From 0b5cc026fbe7ccc6de66907be29471562a2d344d Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 6 Aug 2009 12:51:32 +0200 Subject: clean up - session/socket/engine stuff removed --- src/Makefile.am | 51 +-------- src/app_thread.cpp | 106 ++---------------- src/app_thread.hpp | 27 ++--- src/connecter.cpp | 189 -------------------------------- src/connecter.hpp | 99 ----------------- src/context.cpp | 139 +++-------------------- src/context.hpp | 52 +-------- src/data_distributor.cpp | 155 -------------------------- src/data_distributor.hpp | 70 ------------ src/devpoll.cpp | 9 +- src/devpoll.hpp | 3 +- src/dummy_aggregator.cpp | 111 ------------------- src/dummy_aggregator.hpp | 73 ------------- src/dummy_distributor.cpp | 85 --------------- src/dummy_distributor.hpp | 68 ------------ src/epoll.cpp | 10 +- src/epoll.hpp | 3 +- src/fair_aggregator.cpp | 143 ------------------------ src/fair_aggregator.hpp | 77 ------------- src/i_api.hpp | 42 +++---- src/i_demux.hpp | 57 ---------- src/i_engine.hpp | 53 --------- src/i_mux.hpp | 60 ---------- src/i_poller.hpp | 7 +- src/i_session.hpp | 37 ------- src/i_socket.hpp | 36 ++++++ src/i_thread.hpp | 38 ------- src/io_object.cpp | 37 ------- src/io_object.hpp | 51 --------- src/io_thread.cpp | 31 ------ src/io_thread.hpp | 27 +---- src/kqueue.cpp | 9 +- src/kqueue.hpp | 3 +- src/listener.cpp | 170 ----------------------------- src/listener.hpp | 110 ------------------- src/load_balancer.cpp | 130 ---------------------- src/load_balancer.hpp | 73 ------------- src/object.cpp | 33 ------ src/object.hpp | 8 -- src/p2p.cpp | 29 ----- src/p2p.hpp | 42 ------- src/pipe.cpp | 47 -------- src/pipe.hpp | 23 +--- src/pipe_reader.cpp | 118 -------------------- src/pipe_reader.hpp | 89 --------------- src/pipe_writer.cpp | 120 -------------------- src/pipe_writer.hpp | 88 --------------- src/poll.cpp | 13 ++- src/poll.hpp | 3 +- src/pub.cpp | 38 ------- src/pub.hpp | 45 -------- src/rep.cpp | 29 ----- src/rep.hpp | 42 ------- src/req.cpp | 29 ----- src/req.hpp | 42 ------- src/safe_object.cpp | 76 ------------- src/safe_object.hpp | 68 ------------ src/select.cpp | 13 ++- src/select.hpp | 2 +- src/session.cpp | 273 ---------------------------------------------- src/session.hpp | 107 ------------------ src/session_stub.cpp | 110 ------------------- src/session_stub.hpp | 83 -------------- src/socket_base.cpp | 267 --------------------------------------------- src/socket_base.hpp | 96 ---------------- src/sub.cpp | 45 -------- src/sub.hpp | 46 -------- src/zmq.cpp | 2 +- src/zmq_decoder.cpp | 79 -------------- src/zmq_decoder.hpp | 57 ---------- src/zmq_encoder.cpp | 75 ------------- src/zmq_encoder.hpp | 54 --------- src/zmq_tcp_engine.cpp | 185 ------------------------------- src/zmq_tcp_engine.hpp | 92 ---------------- 74 files changed, 131 insertions(+), 4878 deletions(-) delete mode 100644 src/connecter.cpp delete mode 100644 src/connecter.hpp delete mode 100644 src/data_distributor.cpp delete mode 100644 src/data_distributor.hpp delete mode 100644 src/dummy_aggregator.cpp delete mode 100644 src/dummy_aggregator.hpp delete mode 100644 src/dummy_distributor.cpp delete mode 100644 src/dummy_distributor.hpp delete mode 100644 src/fair_aggregator.cpp delete mode 100644 src/fair_aggregator.hpp delete mode 100644 src/i_demux.hpp delete mode 100644 src/i_engine.hpp delete mode 100644 src/i_mux.hpp delete mode 100644 src/i_session.hpp create mode 100644 src/i_socket.hpp delete mode 100644 src/i_thread.hpp delete mode 100644 src/io_object.cpp delete mode 100644 src/io_object.hpp delete mode 100644 src/listener.cpp delete mode 100644 src/listener.hpp delete mode 100644 src/load_balancer.cpp delete mode 100644 src/load_balancer.hpp delete mode 100644 src/p2p.cpp delete mode 100644 src/p2p.hpp delete mode 100644 src/pipe.cpp delete mode 100644 src/pipe_reader.cpp delete mode 100644 src/pipe_reader.hpp delete mode 100644 src/pipe_writer.cpp delete mode 100644 src/pipe_writer.hpp delete mode 100644 src/pub.cpp delete mode 100644 src/pub.hpp delete mode 100644 src/rep.cpp delete mode 100644 src/rep.hpp delete mode 100644 src/req.cpp delete mode 100644 src/req.hpp delete mode 100644 src/safe_object.cpp delete mode 100644 src/safe_object.hpp delete mode 100644 src/session.cpp delete mode 100644 src/session.hpp delete mode 100644 src/session_stub.cpp delete mode 100644 src/session_stub.hpp delete mode 100644 src/socket_base.cpp delete mode 100644 src/socket_base.hpp delete mode 100644 src/sub.cpp delete mode 100644 src/sub.hpp delete mode 100644 src/zmq_decoder.cpp delete mode 100644 src/zmq_decoder.hpp delete mode 100644 src/zmq_encoder.cpp delete mode 100644 src/zmq_encoder.hpp delete mode 100644 src/zmq_tcp_engine.cpp delete mode 100644 src/zmq_tcp_engine.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 27f4412..bde9c39 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -7,53 +7,30 @@ libzmq_la_SOURCES = \ atomic_ptr.hpp \ command.hpp \ config.hpp \ - connecter.hpp \ context.hpp \ - data_distributor.hpp \ decoder.hpp \ devpoll.hpp \ - dummy_aggregator.hpp \ - dummy_distributor.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ - fair_aggregator.hpp \ fd.hpp \ fd_signaler.hpp \ - io_object.hpp \ io_thread.hpp \ ip.hpp \ i_api.hpp \ - i_demux.hpp \ - i_mux.hpp \ i_poller.hpp \ i_poll_events.hpp \ - i_session.hpp \ i_signaler.hpp \ - i_engine.hpp \ - i_thread.hpp \ - listener.hpp \ + i_socket.hpp \ kqueue.hpp \ - load_balancer.hpp \ msg.hpp \ mutex.hpp \ object.hpp \ - p2p.hpp \ pipe.hpp \ - pipe_reader.hpp \ - pipe_writer.hpp \ platform.hpp \ poll.hpp \ - pub.hpp \ - rep.hpp \ - req.hpp \ - safe_object.hpp \ select.hpp \ - session.hpp \ - session_stub.hpp \ simple_semaphore.hpp \ - socket_base.hpp \ - sub.hpp \ stdint.hpp \ tcp_connecter.hpp \ tcp_listener.hpp \ @@ -65,50 +42,24 @@ libzmq_la_SOURCES = \ ypipe.hpp \ ypollset.hpp \ yqueue.hpp \ - zmq_decoder.hpp \ - zmq_encoder.hpp \ - zmq_tcp_engine.hpp \ app_thread.cpp \ - connecter.cpp \ context.cpp \ - data_distributor.cpp \ devpoll.hpp \ - dummy_aggregator.cpp \ - dummy_distributor.cpp \ epoll.cpp \ err.cpp \ - fair_aggregator.cpp \ fd_signaler.cpp \ - io_object.cpp \ io_thread.cpp \ ip.cpp \ kqueue.cpp \ - listener.cpp \ - load_balancer.cpp \ object.cpp \ - p2p.cpp \ - pipe.cpp \ - pipe_reader.cpp \ - pipe_writer.cpp \ poll.cpp \ - pub.cpp \ - rep.cpp \ - req.cpp \ - safe_object.cpp \ select.cpp \ - session.cpp \ - session_stub.cpp \ - socket_base.cpp \ - sub.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ uuid.cpp \ ypollset.cpp \ - zmq_decoder.cpp \ - zmq_encoder.cpp \ - zmq_tcp_engine.cpp \ zmq.cpp libzmq_la_LDFLAGS = -version-info 0:0:0 diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 9cc61c7..23a055a 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -28,20 +28,8 @@ #include "app_thread.hpp" #include "context.hpp" #include "err.hpp" -#include "session.hpp" #include "pipe.hpp" #include "config.hpp" -#include "i_api.hpp" -#include "dummy_aggregator.hpp" -#include "fair_aggregator.hpp" -#include "dummy_distributor.hpp" -#include "data_distributor.hpp" -#include "load_balancer.hpp" -#include "p2p.hpp" -#include "pub.hpp" -#include "sub.hpp" -#include "req.hpp" -#include "rep.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -58,37 +46,15 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : { } -void zmq::app_thread_t::shutdown () -{ - // Deallocate all the sessions associated with the thread. - while (!sessions.empty ()) - sessions [0]->shutdown (); - - delete this; -} - zmq::app_thread_t::~app_thread_t () { -} + // Ask all the sockets to start termination, then wait till it is complete. + for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) + (*it)->stop (); + for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) + delete *it; -void zmq::app_thread_t::attach_session (session_t *session_) -{ - session_->set_index (sessions.size ()); - sessions.push_back (session_); -} - -void zmq::app_thread_t::detach_session (session_t *session_) -{ - // O(1) removal of the session from the list. - sessions_t::size_type i = session_->get_index (); - sessions [i] = sessions [sessions.size () - 1]; - sessions [i]->set_index (i); - sessions.pop_back (); -} - -zmq::i_poller *zmq::app_thread_t::get_poller () -{ - zmq_assert (false); + delete this; } zmq::i_signaler *zmq::app_thread_t::get_signaler () @@ -98,76 +64,20 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler () bool zmq::app_thread_t::is_current () { - return !sessions.empty () && tid == getpid (); + return !sockets.empty () && tid == getpid (); } bool zmq::app_thread_t::make_current () { // If there are object managed by this slot we cannot assign the slot // to a different thread. - if (!sessions.empty ()) + if (!sockets.empty ()) return false; tid = getpid (); return true; } -zmq::i_api *zmq::app_thread_t::create_socket (int type_) -{ - i_mux *mux = NULL; - i_demux *demux = NULL; - session_t *session = NULL; - i_api *api = NULL; - - switch (type_) { - case ZMQ_P2P: - mux = new dummy_aggregator_t; - zmq_assert (mux); - demux = new dummy_distributor_t; - zmq_assert (demux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new p2p_t (this, session); - zmq_assert (api); - break; - case ZMQ_PUB: - demux = new data_distributor_t; - zmq_assert (demux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new pub_t (this, session); - zmq_assert (api); - break; - case ZMQ_SUB: - mux = new fair_aggregator_t; - zmq_assert (mux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new sub_t (this, session); - zmq_assert (api); - break; - case ZMQ_REQ: - // TODO - zmq_assert (false); - api = new req_t (this, session); - zmq_assert (api); - break; - case ZMQ_REP: - // TODO - zmq_assert (false); - api = new rep_t (this, session); - zmq_assert (api); - break; - default: - errno = EINVAL; - return NULL; - } - - attach_session (session); - - return api; -} - void zmq::app_thread_t::process_commands (bool block_) { ypollset_t::signals_t signals; diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 8295c2f..31679b8 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -22,7 +22,7 @@ #include -#include "i_thread.hpp" +#include "i_socket.hpp" #include "stdint.hpp" #include "object.hpp" #include "ypollset.hpp" @@ -30,23 +30,18 @@ namespace zmq { - class app_thread_t : public object_t, public i_thread + class app_thread_t : public object_t { public: app_thread_t (class context_t *context_, int thread_slot_); - // To be called when the whole infrastrucure is being closed. - void shutdown (); + ~app_thread_t (); // Returns signaler associated with this application thread. i_signaler *get_signaler (); - // Create socket engine in this thread. Return false if the calling - // thread doesn't match the thread handled by this app thread object. - struct i_api *create_socket (int type_); - - // Nota bene: The following two functions are accessed from different + // Nota bene: Following two functions are accessed from different // threads. The caller (context) is responsible for synchronisation // of accesses. @@ -61,25 +56,17 @@ namespace zmq // set to true, returns only after at least one command was processed. void process_commands (bool block_); - // i_thread implementation. - void attach_session (class session_t *session_); - void detach_session (class session_t *session_); - struct i_poller *get_poller (); - private: - // Clean-up. - ~app_thread_t (); + // All the sockets created from this application thread. + typedef std::vector sockets_t; + sockets_t sockets; // Thread ID associated with this slot. // TODO: Virtualise pid_t! // TODO: Check whether getpid returns unique ID for each thread. int tid; - // Vector of all sessionss associated with this app thread. - typedef std::vector sessions_t; - sessions_t sessions; - // App thread's signaler object. ypollset_t pollset; diff --git a/src/connecter.cpp b/src/connecter.cpp deleted file mode 100644 index 970dcf7..0000000 --- a/src/connecter.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "connecter.hpp" -#include "io_thread.hpp" -#include "session.hpp" -#include "err.hpp" -#include "simple_semaphore.hpp" -#include "zmq_tcp_engine.hpp" - -zmq::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_, - session_t *session_) : - io_object_t (thread_), - state (idle), - poller (NULL), - session (session_), - addr (addr_), - identity ("abcde"), - engine (NULL) -{ -} - -void zmq::connecter_t::terminate () -{ - delete this; -} - -void zmq::connecter_t::shutdown () -{ - delete this; -} - -zmq::connecter_t::~connecter_t () -{ -} - -void zmq::connecter_t::process_reg (simple_semaphore_t *smph_) -{ - // Fet poller pointer for further use. - zmq_assert (!poller); - poller = get_poller (); - - // Ask the session to register itself with the I/O thread. Note that - // the session is living in the same I/O thread, thus this results - // in a synchronous call. - session->inc_seqnum (); - send_reg (session, NULL); - - // Unlock the application thread that created the connecter. - if (smph_) - smph_->post (); - - // Manually trigger timer event which will launch asynchronous connect. - state = waiting; - timer_event (); -} - -void zmq::connecter_t::process_unreg (simple_semaphore_t *smph_) -{ - // Unregister connecter/engine from the poller. - zmq_assert (poller); - if (state == connecting) - poller->rm_fd (handle); - else if (state == waiting) - poller->cancel_timer (this); - else if (state == sending) - engine->terminate (); - - // Unlock the application thread closing the connecter. - if (smph_) - smph_->post (); -} - -void zmq::connecter_t::in_event () -{ - // Error occured in asynchronous connect. Retry to connect after a while. - if (state == connecting) { - fd_t fd = tcp_connecter.connect (); - zmq_assert (fd == retired_fd); - poller->rm_fd (handle); - poller->add_timer (this); - state = waiting; - return; - } - - zmq_assert (false); -} - -void zmq::connecter_t::out_event () -{ - if (state == connecting) { - - fd_t fd = tcp_connecter.connect (); - if (fd == retired_fd) { - poller->rm_fd (handle); - poller->add_timer (this); - state = waiting; - return; - } - - poller->rm_fd (handle); - engine = new zmq_tcp_engine_t (fd); - zmq_assert (engine); - engine->attach (poller, this); - state = sending; - return; - } - - zmq_assert (false); -} - -void zmq::connecter_t::timer_event () -{ - zmq_assert (state == waiting); - - // Initiate async connect and start polling for its completion. If async - // connect fails instantly, try to reconnect after a while. - int rc = tcp_connecter.open (addr.c_str ()); - if (rc == 0) { - state = connecting; - in_event (); - } - else if (rc == 1) { - handle = poller->add_fd (tcp_connecter.get_fd (), this); - poller->set_pollout (handle); - state = connecting; - } - else { - poller->add_timer (this); - state = waiting; - } -} - -void zmq::connecter_t::set_engine (struct i_engine *engine_) -{ - engine = engine_; -} - -bool zmq::connecter_t::read (zmq_msg *msg_) -{ - zmq_assert (state == sending); - - // Deallocate old content of the message just in case. - zmq_msg_close (msg_); - - // Send the identity. - zmq_msg_init_size (msg_, identity.size ()); - memcpy (zmq_msg_data (msg_), identity.c_str (), identity.size ()); - - // Ask engine to unregister from the poller. - i_engine *e = engine; - engine->detach (); - - // Attach the engine to the session. (Note that this is actually - // a synchronous call. - session->inc_seqnum (); - send_engine (session, e); - - state = idle; - - return true; -} - -bool zmq::connecter_t::write (struct zmq_msg *msg_) -{ - // No incoming messages are accepted till identity is sent. - return false; -} - -void zmq::connecter_t::flush () -{ - // No incoming messages are accepted till identity is sent. -} diff --git a/src/connecter.hpp b/src/connecter.hpp deleted file mode 100644 index 1f11c63..0000000 --- a/src/connecter.hpp +++ /dev/null @@ -1,99 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_CONNECTER_HPP_INCLUDED__ -#define __ZMQ_CONNECTER_HPP_INCLUDED__ - -#include - -#include "../include/zmq.h" - -#include "i_poller.hpp" -#include "io_object.hpp" -#include "i_poll_events.hpp" -#include "i_session.hpp" -#include "tcp_connecter.hpp" - -namespace zmq -{ - - class connecter_t : public io_object_t, public i_poll_events, - public i_session - { - public: - - connecter_t (class io_thread_t *thread_, const char *addr_, - class session_t *session_); - - void terminate (); - void shutdown (); - - void process_reg (class simple_semaphore_t *smph_); - void process_unreg (class simple_semaphore_t *smph_); - - // i_poll_events implementation. - void in_event (); - void out_event (); - void timer_event (); - - // i_session implementation - void set_engine (struct i_engine *engine_); - // void shutdown (); - bool read (struct zmq_msg *msg_); - bool write (struct zmq_msg *msg_); - void flush (); - - private: - - // Clean-up. - ~connecter_t (); - - enum { - idle, - waiting, - connecting, - sending - } state; - - // Cached pointer to the poller. - struct i_poller *poller; - - // Handle of the connecting socket. - handle_t handle; - - // Associated session. It lives in the same I/O thread. - class session_t *session; - - // Address to connect to. - std::string addr; - - // Identity of the connection. - std::string identity; - - tcp_connecter_t tcp_connecter; - - struct i_engine *engine; - - connecter_t (const connecter_t&); - void operator = (const connecter_t&); - }; - -} - -#endif diff --git a/src/context.cpp b/src/context.cpp index ab4643e..6b071cf 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -20,15 +20,12 @@ #include "../include/zmq.h" #include "context.hpp" +#include "i_api.hpp" #include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" #include "err.hpp" #include "pipe.hpp" -#include "pipe_reader.hpp" -#include "pipe_writer.hpp" -#include "session.hpp" -#include "i_api.hpp" #if defined ZMQ_HAVE_WINDOWS #include "windows.h" @@ -72,37 +69,23 @@ zmq::context_t::context_t (int app_threads_, int io_threads_) io_threads [i]->start (); } -void zmq::context_t::shutdown () -{ - delete this; -} - zmq::context_t::~context_t () { - // Ask I/O threads to terminate. + // Close all application theads, sockets, io_objects etc. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + delete app_threads [i]; + + // Ask I/O threads to terminate. If stop signal wasn't sent to I/O + // thread subsequent invocation of destructor would hang-up. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) io_threads [i]->stop (); // Wait till I/O threads actually terminate. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) - io_threads [i]->join (); - - // At this point the current thread is the only thread with access to - // our internal data. Deallocation will be done exclusively in this thread. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - app_threads [i]->shutdown (); - for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) - io_threads [i]->shutdown (); + delete io_threads [i]; delete [] command_pipes; - // Deallocate all the pipes, pipe readers and pipe writers. - for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++) { - delete it->pipe; - delete it->reader; - delete it->writer; - } - #ifdef ZMQ_HAVE_WINDOWS // On Windows, uninitialise socket layer. int rc = WSACleanup (); @@ -123,7 +106,11 @@ zmq::i_api *zmq::context_t::create_socket (int type_) threads_sync.unlock (); return NULL; } - i_api *s = thread->create_socket (type_); + + zmq_assert (false); + i_api *s = NULL; + //i_api *s = thread->create_socket (type_); + threads_sync.unlock (); return s; } @@ -164,103 +151,3 @@ zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_) return io_threads [result]; } - -void zmq::context_t::create_pipe (object_t *reader_parent_, - object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, - pipe_reader_t **reader_, pipe_writer_t **writer_) -{ - // Create the pipe, reader & writer triple. - pipe_t *pipe = new pipe_t; - zmq_assert (pipe); - pipe_reader_t *reader = new pipe_reader_t (reader_parent_, pipe, - hwm_, lwm_); - zmq_assert (reader); - pipe_writer_t *writer = new pipe_writer_t (writer_parent_, pipe, reader, - hwm_, lwm_); - zmq_assert (writer); - reader->set_peer (writer); - - // Store the pipe in the repository. - pipe_info_t info = {pipe, reader, writer}; - pipes_sync.lock (); - pipe->set_index (pipes.size ()); - pipes.push_back (info); - pipes_sync.unlock (); - - *reader_ = reader; - *writer_ = writer; -} - -void zmq::context_t::destroy_pipe (pipe_t *pipe_) -{ - // Remove the pipe from the repository. - pipe_info_t info; - pipes_sync.lock (); - pipes_t::size_type i = pipe_->get_index (); - info = pipes [i]; - pipes [i] = pipes.back (); - pipes.pop_back (); - pipes_sync.unlock (); - - // Deallocate the pipe and associated pipe reader & pipe writer. - zmq_assert (info.pipe == pipe_); - delete info.pipe; - delete info.reader; - delete info.writer; -} - -int zmq::context_t::register_inproc_endpoint (const char *endpoint_, - session_t *session_) -{ - inproc_endpoint_sync.lock (); - inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); - - if (it != inproc_endpoints.end ()) { - inproc_endpoint_sync.unlock (); - errno = EADDRINUSE; - return -1; - } - - inproc_endpoints.insert (std::make_pair (endpoint_, session_)); - - inproc_endpoint_sync.unlock (); - return 0; -} - -zmq::object_t *zmq::context_t::get_inproc_endpoint (const char *endpoint_) -{ - inproc_endpoint_sync.lock (); - inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); - - if (it == inproc_endpoints.end ()) { - inproc_endpoint_sync.unlock (); - errno = EADDRNOTAVAIL; - return NULL; - } - - it->second->inc_seqnum (); - object_t *session = it->second; - - inproc_endpoint_sync.unlock (); - return session; -} - -void zmq::context_t::unregister_inproc_endpoints (session_t *session_) -{ - inproc_endpoint_sync.lock (); - - // Remove the connection from the repository. - // TODO: Yes, the algorithm has O(n^2) complexity. Should be O(log n). - for (inproc_endpoints_t::iterator it = inproc_endpoints.begin (); - it != inproc_endpoints.end ();) { - if (it->second == session_) { - inproc_endpoints.erase (it); - it = inproc_endpoints.begin (); - } - else - it++; - } - - inproc_endpoint_sync.unlock (); -} - diff --git a/src/context.hpp b/src/context.hpp index 7701ef7..f2eab1c 100644 --- a/src/context.hpp +++ b/src/context.hpp @@ -52,9 +52,9 @@ namespace zmq context_t (int app_threads_, int io_threads_); // To be called to terminate the whole infrastructure (zmq_term). - void shutdown (); + ~context_t (); - // Create a socket engine. + // Create a socket. struct i_api *create_socket (int type_); // Returns number of thread slots in the context. To be used by @@ -81,37 +81,12 @@ namespace zmq destination_].read (command_); } - // Creates new pipe. - void create_pipe (class object_t *reader_parent_, - class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, - class pipe_reader_t **reader_, class pipe_writer_t **writer_); - - // Deallocates the pipe. - void destroy_pipe (class pipe_t *pipe_); - - // Registers existing session object as an inproc endpoint. - int register_inproc_endpoint (const char *endpoint_, - class session_t *session_); - - // Retrieves an inproc endpoint. Increments the command sequence number - // of the object by one. Caller is thus bound to send the command - // to the connection after invoking this function. Returns NULL if - // the endpoint doesn't exist. - class object_t *get_inproc_endpoint (const char *endpoint_); - - // Removes all the inproc endpoints associated with the given session - // object from the global repository. - void unregister_inproc_endpoints (class session_t *session_); - // Returns the I/O thread that is the least busy at the moment. // Taskset specifies which I/O threads are eligible (0 = all). class io_thread_t *choose_io_thread (uint64_t taskset_); private: - // Clean-up. - ~context_t (); - // Returns the app thread associated with the current thread. // NULL if we are out of app thread slots. class app_thread_t *choose_app_thread (); @@ -137,29 +112,6 @@ namespace zmq // Synchronisation of accesses to shared thread data. mutex_t threads_sync; - // Global repository of pipes. It's used only on terminal shutdown - // to deallocate all the pipes irrespective of whether they are - // referenced from pipe_reader, pipe_writer or both. - struct pipe_info_t - { - class pipe_t *pipe; - class pipe_reader_t *reader; - class pipe_writer_t *writer; - }; - typedef std::vector pipes_t; - pipes_t pipes; - - // Synchronisation of access to global repository of pipes. - mutex_t pipes_sync; - - // Global repository of available inproc endpoints. - typedef std::map inproc_endpoints_t; - inproc_endpoints_t inproc_endpoints; - - // Synchronisation of access to the global repository - // of inproc endpoints. - mutex_t inproc_endpoint_sync; - context_t (const context_t&); void operator = (const context_t&); }; diff --git a/src/data_distributor.cpp b/src/data_distributor.cpp deleted file mode 100644 index 971edce..0000000 --- a/src/data_distributor.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "data_distributor.hpp" -#include "pipe_writer.hpp" -#include "err.hpp" -#include "session.hpp" -#include "msg.hpp" - -zmq::data_distributor_t::data_distributor_t () : - session (NULL) -{ -} - -void zmq::data_distributor_t::set_session (session_t *session_) -{ - zmq_assert (!session); - session = session_; -} - -void zmq::data_distributor_t::shutdown () -{ - // No need to deallocate pipes here. They'll be deallocated during the - // shutdown of the dispatcher. - delete this; -} - -void zmq::data_distributor_t::terminate () -{ - // Pipe unregisters itself during the call to terminate, so the pipes - // list shinks by one in each iteration. - while (!pipes.empty ()) - pipes [0]->terminate (); - - delete this; -} - -zmq::data_distributor_t::~data_distributor_t () -{ -} - -void zmq::data_distributor_t::attach_pipe (pipe_writer_t *pipe_) -{ - // Associate demux with a new pipe. - pipe_->set_demux (this); - pipe_->set_index (pipes.size ()); - pipes.push_back (pipe_); -} - -void zmq::data_distributor_t::detach_pipe (pipe_writer_t *pipe_) -{ - // Release the reference to the pipe. - int index = pipe_->get_index (); - pipe_->set_index (-1); - pipes [index] = pipes.back (); - pipes [index]->set_index (index); - pipes.pop_back (); -} - -bool zmq::data_distributor_t::empty () -{ - return pipes.empty (); -} - -bool zmq::data_distributor_t::send (zmq_msg *msg_) -{ - int pipes_count = pipes.size (); - - // If there are no pipes available, simply drop the message. - if (pipes_count == 0) { - zmq_msg_close (msg_); - zmq_msg_init (msg_); - return true; - } - - // TODO: ??? - // First check whether all pipes are available for writing. -// for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++) -// if (!(*it)->check_write (msg_)) -// return false; - - // For VSMs the copying is straighforward. - if (msg_->content == (zmq_msg_content*) ZMQ_VSM) { - for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++) - write_to_pipe (*it, msg_); - zmq_msg_init (msg_); - return true; - } - - // Optimisation for the case when there's only a single pipe - // to send the message to - no refcount adjustment (i.e. atomic - // operations) needed. - if (pipes_count == 1) { - write_to_pipe (*pipes.begin (), msg_); - zmq_msg_init (msg_); - return true; - } - - // There are at least 2 destinations for the message. That means we have - // to deal with reference counting. First add N-1 references to - // the content (we are holding one reference anyway, that's why the -1). - if (msg_->shared) - msg_->content->refcnt.add (pipes_count - 1); - else { - msg_->shared = true; - // TODO: Add memory barrier here. - msg_->content->refcnt.set (pipes_count); - } - - // Push the message to all destinations. - for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++) - write_to_pipe (*it, msg_); - - // Detach the original message from the data buffer. - zmq_msg_init (msg_); - - return true; -} - -void zmq::data_distributor_t::flush () -{ - // Flush all pipes. If there's large number of pipes, it can be pretty - // inefficient (especially if there's new message only in a single pipe). - // Can it be improved? - for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++) - (*it)->flush (); -} - -void zmq::data_distributor_t::write_to_pipe (class pipe_writer_t *pipe_, - struct zmq_msg *msg_) -{ - if (!pipe_->write (msg_)) { - // TODO: Push gap notification to the pipe. - zmq_assert (false); - } -} - diff --git a/src/data_distributor.hpp b/src/data_distributor.hpp deleted file mode 100644 index 5bde2e8..0000000 --- a/src/data_distributor.hpp +++ /dev/null @@ -1,70 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__ -#define __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__ - -#include - -#include - -namespace zmq -{ - - // Object to distribute messages to outbound pipes. - - class data_distributor_t : public i_demux - { - public: - - data_distributor_t (); - - // i_demux implementation. - void set_session (class session_t *session_); - void shutdown (); - void terminate (); - void attach_pipe (class pipe_writer_t *pipe_); - void detach_pipe (class pipe_writer_t *pipe_); - bool empty (); - bool send (struct zmq_msg *msg_); - void flush (); - - private: - - // Clean-up. - ~data_distributor_t (); - - // Reference to the owner session object. - class session_t *session; - - // Writes the message to the pipe if possible. If it isn't, writes - // a gap notification to the pipe. - void write_to_pipe (class pipe_writer_t *pipe_, struct zmq_msg *msg_); - - // The list of outbound pipes. - typedef std::vector pipes_t; - pipes_t pipes; - - data_distributor_t (const data_distributor_t&); - void operator = (const data_distributor_t&); - }; - -} - -#endif diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 8fb0877..b7d153c 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -52,6 +52,10 @@ zmq::devpoll_t::devpoll_t () zmq::devpoll_t::~devpoll_t () { + // Make sure there are no fds registered on shutdown. + zmq_assert (load.get () == 0); + + worker.stop (); close (devpoll_fd); } @@ -152,11 +156,6 @@ void zmq::devpoll_t::stop () stopping = true; } -void zmq::devpoll_t::join () -{ - worker.stop (); -} - bool zmq::devpoll_t::loop () { // According to the poll(7d) man page, we can retrieve diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 28274c0..57f0156 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -42,7 +42,7 @@ namespace zmq public: devpoll_t (); - virtual ~devpoll_t (); + ~devpoll_t (); // i_poller implementation. handle_t add_fd (fd_t fd_, i_poll_events *events_); @@ -56,7 +56,6 @@ namespace zmq int get_load (); void start (); void stop (); - void join (); private: diff --git a/src/dummy_aggregator.cpp b/src/dummy_aggregator.cpp deleted file mode 100644 index 0b27fab..0000000 --- a/src/dummy_aggregator.cpp +++ /dev/null @@ -1,111 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "dummy_aggregator.hpp" -#include "err.hpp" -#include "pipe_reader.hpp" -#include "session.hpp" - -// Swaps pipes at specified indices. -#define swap_pipes(i1_, i2_) \ - std::swap (pipes [i1_], pipes [i2_]);\ - pipes [i1_]->set_index (i1_);\ - pipes [i2_]->set_index (i2_); - -zmq::dummy_aggregator_t::dummy_aggregator_t () : - session (NULL), - pipe (NULL), - active (false) -{ -} - -void zmq::dummy_aggregator_t::set_session (session_t *session_) -{ - zmq_assert (!session); - session = session_; -} - -void zmq::dummy_aggregator_t::shutdown () -{ - // No need to deallocate the pipe here. It'll be deallocated during the - // shutdown of the dispatcher. - delete this; -} - -void zmq::dummy_aggregator_t::terminate () -{ - if (pipe) - pipe->terminate (); - - delete this; -} - -zmq::dummy_aggregator_t::~dummy_aggregator_t () -{ -} - -void zmq::dummy_aggregator_t::attach_pipe (pipe_reader_t *pipe_) -{ - zmq_assert (!pipe); - pipe = pipe_; - active = true; - - // Associate new pipe with the mux object. - pipe_->set_mux (this); - session->revive (); -} - -void zmq::dummy_aggregator_t::detach_pipe (pipe_reader_t *pipe_) -{ - zmq_assert (pipe == pipe_); - deactivate (pipe_); - pipe = NULL; -} - -bool zmq::dummy_aggregator_t::empty () -{ - return pipe == NULL; -} - -bool zmq::dummy_aggregator_t::recv (zmq_msg *msg_) -{ - // Deallocate old content of the message. - zmq_msg_close (msg_); - - // Try to read from the pipe. - if (pipe && pipe->read (msg_)) - return true; - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - zmq_msg_init (msg_); - return false; -} - -void zmq::dummy_aggregator_t::deactivate (pipe_reader_t *pipe_) -{ - active = false; -} - -void zmq::dummy_aggregator_t::reactivate (pipe_reader_t *pipe_) -{ - active = true; -} diff --git a/src/dummy_aggregator.hpp b/src/dummy_aggregator.hpp deleted file mode 100644 index 6a9e9db..0000000 --- a/src/dummy_aggregator.hpp +++ /dev/null @@ -1,73 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__ -#define __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__ - -#include - -#include "i_mux.hpp" - -namespace zmq -{ - - // Fake message aggregator. There can be at most one pipe bound to it, - // so there's no real aggregation going on. However, it is more efficient - // than a real aggregator. It's intended to be used in the contexts - // where business logic ensures there'll be at most one pipe bound. - - class dummy_aggregator_t : public i_mux - { - public: - - dummy_aggregator_t (); - - // i_mux interface implementation. - void set_session (session_t *session_); - void shutdown (); - void terminate (); - void attach_pipe (class pipe_reader_t *pipe_); - void detach_pipe (class pipe_reader_t *pipe_); - bool empty (); - void deactivate (class pipe_reader_t *pipe_); - void reactivate (class pipe_reader_t *pipe_); - bool recv (struct zmq_msg *msg_); - - - private: - - // Clean-up. - ~dummy_aggregator_t (); - - // Reference to the owner session object. - class session_t *session; - - // The single pipe bound. - class pipe_reader_t *pipe; - - // If true, the pipe is active. - bool active; - - dummy_aggregator_t (const dummy_aggregator_t&); - void operator = (const dummy_aggregator_t&); - }; - -} - -#endif diff --git a/src/dummy_distributor.cpp b/src/dummy_distributor.cpp deleted file mode 100644 index 62e2b88..0000000 --- a/src/dummy_distributor.cpp +++ /dev/null @@ -1,85 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "dummy_distributor.hpp" -#include "pipe_writer.hpp" -#include "err.hpp" -#include "session.hpp" -#include "msg.hpp" - -zmq::dummy_distributor_t::dummy_distributor_t () : - session (NULL) -{ -} - -void zmq::dummy_distributor_t::set_session (session_t *session_) -{ - zmq_assert (!session); - session = session_; -} - -void zmq::dummy_distributor_t::shutdown () -{ - // No need to deallocate pipe here. It'll be deallocated during the - // shutdown of the dispatcher. - delete this; -} - -void zmq::dummy_distributor_t::terminate () -{ - if (pipe) - pipe->terminate (); - - delete this; -} - -zmq::dummy_distributor_t::~dummy_distributor_t () -{ -} - -void zmq::dummy_distributor_t::attach_pipe (pipe_writer_t *pipe_) -{ - zmq_assert (!pipe); - pipe = pipe_; -} - -void zmq::dummy_distributor_t::detach_pipe (pipe_writer_t *pipe_) -{ - zmq_assert (pipe == pipe_); - pipe = NULL; -} - -bool zmq::dummy_distributor_t::empty () -{ - return pipe == NULL; -} - -bool zmq::dummy_distributor_t::send (zmq_msg *msg_) -{ - return pipe && pipe->write (msg_); -} - -void zmq::dummy_distributor_t::flush () -{ - if (pipe) - pipe->flush (); -} - diff --git a/src/dummy_distributor.hpp b/src/dummy_distributor.hpp deleted file mode 100644 index a71cc49..0000000 --- a/src/dummy_distributor.hpp +++ /dev/null @@ -1,68 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_DUMMY_DISTRIBUTOR_HPP_INCLUDED__ -#define __ZMQ_DUMMY_DISTRIBUTOR_HPP_INCLUDED__ - -#include - -#include - -namespace zmq -{ - - // Fake message distributor. There can be only one pipe bound to it - // so there no real distribution going on. However, it is more efficient - // than a real distributor and should be used where business logic - // ensures there'll be at most one pipe bound. - - class dummy_distributor_t : public i_demux - { - public: - - dummy_distributor_t (); - - // i_demux implementation. - void set_session (class session_t *session_); - void shutdown (); - void terminate (); - void attach_pipe (class pipe_writer_t *pipe_); - void detach_pipe (class pipe_writer_t *pipe_); - bool empty (); - bool send (struct zmq_msg *msg_); - void flush (); - - private: - - // Clean-up. - ~dummy_distributor_t (); - - // Reference to the owner session object. - class session_t *session; - - // The bound pipe. - class pipe_writer_t *pipe; - - dummy_distributor_t (const dummy_distributor_t&); - void operator = (const dummy_distributor_t&); - }; - -} - -#endif diff --git a/src/epoll.cpp b/src/epoll.cpp index c4c8fdb..15278c6 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -41,8 +41,11 @@ zmq::epoll_t::epoll_t () : zmq::epoll_t::~epoll_t () { - close (epoll_fd); + // Make sure there are no fds registered on shutdown. + zmq_assert (load.get () == 0); + worker.stop (); + close (epoll_fd); for (retired_t::iterator it = retired.begin (); it != retired.end (); it ++) delete *it; } @@ -144,11 +147,6 @@ void zmq::epoll_t::stop () stopping = true; } -void zmq::epoll_t::join () -{ - worker.stop (); -} - void zmq::epoll_t::loop () { epoll_event ev_buf [max_io_events]; diff --git a/src/epoll.hpp b/src/epoll.hpp index aa363ee..619d4f3 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -44,7 +44,7 @@ namespace zmq public: epoll_t (); - virtual ~epoll_t (); + ~epoll_t (); // i_poller implementation. handle_t add_fd (fd_t fd_, i_poll_events *events_); @@ -58,7 +58,6 @@ namespace zmq int get_load (); void start (); void stop (); - void join (); private: diff --git a/src/fair_aggregator.cpp b/src/fair_aggregator.cpp deleted file mode 100644 index 1e6937f..0000000 --- a/src/fair_aggregator.cpp +++ /dev/null @@ -1,143 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "fair_aggregator.hpp" -#include "err.hpp" -#include "pipe_reader.hpp" -#include "session.hpp" - -// Swaps pipes at specified indices. -#define swap_pipes(i1_, i2_) \ - std::swap (pipes [i1_], pipes [i2_]);\ - pipes [i1_]->set_index (i1_);\ - pipes [i2_]->set_index (i2_); - -zmq::fair_aggregator_t::fair_aggregator_t () : - session (NULL), - active (0), - current (0) -{ -} - -void zmq::fair_aggregator_t::set_session (session_t *session_) -{ - zmq_assert (!session); - session = session_; -} - -void zmq::fair_aggregator_t::shutdown () -{ - // No need to deallocate pipes here. They'll be deallocated during the - // shutdown of the dispatcher. - delete this; -} - -void zmq::fair_aggregator_t::terminate () -{ - // Pipe unregisters itself during the call to terminate, so the pipes - // list shinks by one in each iteration. - while (!pipes.empty ()) - pipes [0]->terminate (); - - delete this; -} - -zmq::fair_aggregator_t::~fair_aggregator_t () -{ -} - -void zmq::fair_aggregator_t::attach_pipe (pipe_reader_t *pipe_) -{ - // Associate new pipe with the mux object. - pipe_->set_mux (this); - pipes.push_back (pipe_); - active++; - if (pipes.size () > active) - swap_pipes (pipes.size () - 1, active - 1); - if (active == 1) - session->revive (); -} - -void zmq::fair_aggregator_t::detach_pipe (pipe_reader_t *pipe_) -{ - // Move the pipe from the list of active pipes to the list of idle pipes. - deactivate (pipe_); - - // Move the pipe to the end of the idle list and remove it. - swap_pipes (pipe_->get_index (), pipes.size () - 1); - pipes.pop_back (); -} - -bool zmq::fair_aggregator_t::empty () -{ - return pipes.empty (); -} - -bool zmq::fair_aggregator_t::recv (zmq_msg *msg_) -{ - // Deallocate old content of the message. - zmq_msg_close (msg_); - - // O(1) fair queueing. Round-robin over the active pipes to get - // next message. - for (pipes_t::size_type i = active; i != 0; i--) { - - // Update current. - current = (current + 1) % active; - - // Try to read from current. - if (pipes [current]->read (msg_)) - return true; - } - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - zmq_msg_init (msg_); - return false; -} - -void zmq::fair_aggregator_t::deactivate (pipe_reader_t *pipe_) -{ - int index = pipe_->get_index (); - - // Suspend an active pipe. - swap_pipes (index, active - 1); - active--; - - // If the deactiveted pipe is the current one, shift the current one pipe - // backwards so that the pipe that replaced the deactiveted one will be - // processed immediately rather than skipped. - if (index == (int) current) { - index--; - if (index == -1) - index = active - 1; - current = index; - } -} - -void zmq::fair_aggregator_t::reactivate (pipe_reader_t *pipe_) -{ - // Revive an idle pipe. - swap_pipes (pipe_->get_index (), active); - active++; - if (active == 1) - session->revive (); -} diff --git a/src/fair_aggregator.hpp b/src/fair_aggregator.hpp deleted file mode 100644 index 6ae1fc5..0000000 --- a/src/fair_aggregator.hpp +++ /dev/null @@ -1,77 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_FAIR_AGGREGATOR_HPP_INCLUDED__ -#define __ZMQ_FAIR_AGGREGATOR_HPP_INCLUDED__ - -#include - -#include "i_mux.hpp" - -namespace zmq -{ - - // Object to aggregate messages from inbound pipes. - - class fair_aggregator_t : public i_mux - { - public: - - fair_aggregator_t (); - - // i_mux interface implementation. - void set_session (session_t *session_); - void shutdown (); - void terminate (); - void attach_pipe (class pipe_reader_t *pipe_); - void detach_pipe (class pipe_reader_t *pipe_); - bool empty (); - void deactivate (class pipe_reader_t *pipe_); - void reactivate (class pipe_reader_t *pipe_); - bool recv (struct zmq_msg *msg_); - - - private: - - // Clean-up. - ~fair_aggregator_t (); - - // Reference to the owner session object. - class session_t *session; - - // The list of inbound pipes. The active pipes are occupying indices - // from 0 to active-1. Suspended pipes occupy indices from 'active' - // to the end of the array. - typedef std::vector pipes_t; - pipes_t pipes; - - // The number of active pipes. - pipes_t::size_type active; - - // Pipe to retrieve next message from. The messages are retrieved - // from the pipes in round-robin fashion (a.k.a. fair queueing). - pipes_t::size_type current; - - fair_aggregator_t (const fair_aggregator_t&); - void operator = (const fair_aggregator_t&); - }; - -} - -#endif diff --git a/src/i_api.hpp b/src/i_api.hpp index fc7275b..a87e41d 100644 --- a/src/i_api.hpp +++ b/src/i_api.hpp @@ -1,28 +1,28 @@ /* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . +Copyright (c) 2007-2009 FastMQ Inc. + +This file is part of 0MQ. + +0MQ is free software; you can redistribute it and/or modify it under +the terms of the Lesser GNU General Public License as published by +the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +0MQ is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +Lesser GNU General Public License for more details. + +You should have received a copy of the Lesser GNU General Public License +along with this program. If not, see . */ - + #ifndef __ZMQ_I_API_HPP_INCLUDED__ #define __ZMQ_I_API_HPP_INCLUDED__ - + namespace zmq { - + struct i_api { virtual int bind (const char *addr_, struct zmq_opts *opts_) = 0; @@ -33,7 +33,7 @@ namespace zmq virtual int recv (struct zmq_msg *msg_, int flags_) = 0; virtual int close () = 0; }; - + } - + #endif diff --git a/src/i_demux.hpp b/src/i_demux.hpp deleted file mode 100644 index c4755b5..0000000 --- a/src/i_demux.hpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_DEMUX_HPP_INCLUDED__ -#define __ZMQ_I_DEMUX_HPP_INCLUDED__ - -namespace zmq -{ - - struct i_demux - { - // Attaches mux to a particular session. - virtual void set_session (class session_t *session_) = 0; - - // To be called when the whole infrastrucure - // is being closed (zmq_term). - virtual void shutdown () = 0; - - // To be called when session is being closed. - virtual void terminate () = 0; - - // Adds new pipe to the demux to send messages to. - virtual void attach_pipe (class pipe_writer_t *pipe_) = 0; - - // Removes pipe from the demux. - virtual void detach_pipe (class pipe_writer_t *pipe_) = 0; - - // Returns true if there's no pipe attached. - virtual bool empty () = 0; - - // Sends the message. Returns false if the message cannot be sent - // because the pipes are full. - virtual bool send (struct zmq_msg *msg_) = 0; - - // Flushes messages downstream. - virtual void flush () = 0; - }; - -} - -#endif diff --git a/src/i_engine.hpp b/src/i_engine.hpp deleted file mode 100644 index 8ca2007..0000000 --- a/src/i_engine.hpp +++ /dev/null @@ -1,53 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__ -#define __ZMQ_I_ENGINE_HPP_INCLUDED__ - -namespace zmq -{ - - // Generic interface to access engines from MD objects. - - struct i_engine - { - // Attach the engine with specified context. - virtual void attach (struct i_poller *poller_, - struct i_session *session_) = 0; - - // Detach the engine from the current context. - virtual void detach () = 0; - - // Notify the engine that new messages are available. - virtual void revive () = 0; - - // Called by session when it decides the engine - // should terminate itself. - virtual void schedule_terminate () = 0; - - // Called by normal object termination process. - virtual void terminate () = 0; - - // To be called by MD when terminal shutdown (zmq_term) is in progress. - virtual void shutdown () = 0; - }; - -} - -#endif diff --git a/src/i_mux.hpp b/src/i_mux.hpp deleted file mode 100644 index 22e0a26..0000000 --- a/src/i_mux.hpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_MUX_HPP_INCLUDED__ -#define __ZMQ_I_MUX_HPP_INCLUDED__ - -namespace zmq -{ - - struct i_mux - { - // Attaches mux to a particular session. - virtual void set_session (class session_t *session_) = 0; - - // To be called when the whole infrastrucure - // is being closed (zmq_term). - virtual void shutdown () = 0; - - // To be called when session is being closed. - virtual void terminate () = 0; - - // Adds new pipe to the mux to send messages to. - virtual void attach_pipe (class pipe_reader_t *pipe_) = 0; - - // Removes pipe from the mux. - virtual void detach_pipe (class pipe_reader_t *pipe_) = 0; - - // Returns true if there's no pipe attached. - virtual bool empty () = 0; - - // Shifts the pipe from active to passive state and vice versa. - // TODO: Check whether state transitions cannot be done by - // mux object itself without a need for external APIs. - virtual void deactivate (class pipe_reader_t *pipe_) = 0; - virtual void reactivate (class pipe_reader_t *pipe_) = 0; - - // Receives a message. Returns false when there is no message - // to receive. - virtual bool recv (struct zmq_msg *msg_) = 0; - }; - -} - -#endif diff --git a/src/i_poller.hpp b/src/i_poller.hpp index 52ca095..2665e82 100644 --- a/src/i_poller.hpp +++ b/src/i_poller.hpp @@ -75,13 +75,8 @@ namespace zmq // This method is called from a foreign thread. virtual void start () = 0; - // Ask underlying I/O thread to stop. This method is called from - // underlying thread (callback from io_thread object). + // Ask underlying I/O thread to stop. virtual void stop () = 0; - - // Wait for termination of undelying I/O thread. - // This method is called from a foreign thread. - virtual void join () = 0; }; } diff --git a/src/i_session.hpp b/src/i_session.hpp deleted file mode 100644 index 21cdc0d..0000000 --- a/src/i_session.hpp +++ /dev/null @@ -1,37 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or