diff options
author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-06 12:51:32 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-06 12:51:32 +0200 |
commit | 0b5cc026fbe7ccc6de66907be29471562a2d344d (patch) | |
tree | a6051f238152e2261ea48942f0c216a3984cc9fd /src | |
parent | b8b4acef4c2ba1a169ce84c1fb4c70a5676ebba3 (diff) |
clean up - session/socket/engine stuff removed
Diffstat (limited to 'src')
73 files changed, 109 insertions, 4856 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 27f4412..bde9c39 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -7,53 +7,30 @@ libzmq_la_SOURCES = \ atomic_ptr.hpp \ command.hpp \ config.hpp \ - connecter.hpp \ context.hpp \ - data_distributor.hpp \ decoder.hpp \ devpoll.hpp \ - dummy_aggregator.hpp \ - dummy_distributor.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ - fair_aggregator.hpp \ fd.hpp \ fd_signaler.hpp \ - io_object.hpp \ io_thread.hpp \ ip.hpp \ i_api.hpp \ - i_demux.hpp \ - i_mux.hpp \ i_poller.hpp \ i_poll_events.hpp \ - i_session.hpp \ i_signaler.hpp \ - i_engine.hpp \ - i_thread.hpp \ - listener.hpp \ + i_socket.hpp \ kqueue.hpp \ - load_balancer.hpp \ msg.hpp \ mutex.hpp \ object.hpp \ - p2p.hpp \ pipe.hpp \ - pipe_reader.hpp \ - pipe_writer.hpp \ platform.hpp \ poll.hpp \ - pub.hpp \ - rep.hpp \ - req.hpp \ - safe_object.hpp \ select.hpp \ - session.hpp \ - session_stub.hpp \ simple_semaphore.hpp \ - socket_base.hpp \ - sub.hpp \ stdint.hpp \ tcp_connecter.hpp \ tcp_listener.hpp \ @@ -65,50 +42,24 @@ libzmq_la_SOURCES = \ ypipe.hpp \ ypollset.hpp \ yqueue.hpp \ - zmq_decoder.hpp \ - zmq_encoder.hpp \ - zmq_tcp_engine.hpp \ app_thread.cpp \ - connecter.cpp \ context.cpp \ - data_distributor.cpp \ devpoll.hpp \ - dummy_aggregator.cpp \ - dummy_distributor.cpp \ epoll.cpp \ err.cpp \ - fair_aggregator.cpp \ fd_signaler.cpp \ - io_object.cpp \ io_thread.cpp \ ip.cpp \ kqueue.cpp \ - listener.cpp \ - load_balancer.cpp \ object.cpp \ - p2p.cpp \ - pipe.cpp \ - pipe_reader.cpp \ - pipe_writer.cpp \ poll.cpp \ - pub.cpp \ - rep.cpp \ - req.cpp \ - safe_object.cpp \ select.cpp \ - session.cpp \ - session_stub.cpp \ - socket_base.cpp \ - sub.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ uuid.cpp \ ypollset.cpp \ - zmq_decoder.cpp \ - zmq_encoder.cpp \ - zmq_tcp_engine.cpp \ zmq.cpp libzmq_la_LDFLAGS = -version-info 0:0:0 diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 9cc61c7..23a055a 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -28,20 +28,8 @@ #include "app_thread.hpp" #include "context.hpp" #include "err.hpp" -#include "session.hpp" #include "pipe.hpp" #include "config.hpp" -#include "i_api.hpp" -#include "dummy_aggregator.hpp" -#include "fair_aggregator.hpp" -#include "dummy_distributor.hpp" -#include "data_distributor.hpp" -#include "load_balancer.hpp" -#include "p2p.hpp" -#include "pub.hpp" -#include "sub.hpp" -#include "req.hpp" -#include "rep.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -58,37 +46,15 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : { } -void zmq::app_thread_t::shutdown () -{ - // Deallocate all the sessions associated with the thread. - while (!sessions.empty ()) - sessions [0]->shutdown (); - - delete this; -} - zmq::app_thread_t::~app_thread_t () { -} + // Ask all the sockets to start termination, then wait till it is complete. + for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) + (*it)->stop (); + for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) + delete *it; -void zmq::app_thread_t::attach_session (session_t *session_) -{ - session_->set_index (sessions.size ()); - sessions.push_back (session_); -} - -void zmq::app_thread_t::detach_session (session_t *session_) -{ - // O(1) removal of the session from the list. - sessions_t::size_type i = session_->get_index (); - sessions [i] = sessions [sessions.size () - 1]; - sessions [i]->set_index (i); - sessions.pop_back (); -} - -zmq::i_poller *zmq::app_thread_t::get_poller () -{ - zmq_assert (false); + delete this; } zmq::i_signaler *zmq::app_thread_t::get_signaler () @@ -98,76 +64,20 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler () bool zmq::app_thread_t::is_current () { - return !sessions.empty () && tid == getpid (); + return !sockets.empty () && tid == getpid (); } bool zmq::app_thread_t::make_current () { // If there are object managed by this slot we cannot assign the slot // to a different thread. - if (!sessions.empty ()) + if (!sockets.empty ()) return false; tid = getpid (); return true; } -zmq::i_api *zmq::app_thread_t::create_socket (int type_) -{ - i_mux *mux = NULL; - i_demux *demux = NULL; - session_t *session = NULL; - i_api *api = NULL; - - switch (type_) { - case ZMQ_P2P: - mux = new dummy_aggregator_t; - zmq_assert (mux); - demux = new dummy_distributor_t; - zmq_assert (demux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new p2p_t (this, session); - zmq_assert (api); - break; - case ZMQ_PUB: - demux = new data_distributor_t; - zmq_assert (demux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new pub_t (this, session); - zmq_assert (api); - break; - case ZMQ_SUB: - mux = new fair_aggregator_t; - zmq_assert (mux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new sub_t (this, session); - zmq_assert (api); - break; - case ZMQ_REQ: - // TODO - zmq_assert (false); - api = new req_t (this, session); - zmq_assert (api); - break; - case ZMQ_REP: - // TODO - zmq_assert (false); - api = new rep_t (this, session); - zmq_assert (api); - break; - default: - errno = EINVAL; - return NULL; - } - - attach_session (session); - - return api; -} - void zmq::app_thread_t::process_commands (bool block_) { ypollset_t::signals_t signals; diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 8295c2f..31679b8 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -22,7 +22,7 @@ #include <vector> -#include "i_thread.hpp" +#include "i_socket.hpp" #include "stdint.hpp" #include "object.hpp" #include "ypollset.hpp" @@ -30,23 +30,18 @@ namespace zmq { - class app_thread_t : public object_t, public i_thread + class app_thread_t : public object_t { public: app_thread_t (class context_t *context_, int thread_slot_); - // To be called when the whole infrastrucure is being closed. - void shutdown (); + ~app_thread_t (); // Returns signaler associated with this application thread. i_signaler *get_signaler (); - // Create socket engine in this thread. Return false if the calling - // thread doesn't match the thread handled by this app thread object. - struct i_api *create_socket (int type_); - - // Nota bene: The following two functions are accessed from different + // Nota bene: Following two functions are accessed from different // threads. The caller (context) is responsible for synchronisation // of accesses. @@ -61,25 +56,17 @@ namespace zmq // set to true, returns only after at least one command was processed. void process_commands (bool block_); - // i_thread implementation. - void attach_session (class session_t *session_); - void detach_session (class session_t *session_); - struct i_poller *get_poller (); - private: - // Clean-up. - ~app_thread_t (); + // All the sockets created from this application thread. + typedef std::vector <i_socket*> sockets_t; + sockets_t sockets; // Thread ID associated with this slot. // TODO: Virtualise pid_t! // TODO: Check whether getpid returns unique ID for each thread. int tid; - // Vector of all sessionss associated with this app thread. - typedef std::vector <class session_t*> sessions_t; - sessions_t sessions; - // App thread's signaler object. ypollset_t pollset; diff --git a/src/connecter.cpp b/src/connecter.cpp deleted file mode 100644 index 970dcf7..0000000 --- a/src/connecter.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "connecter.hpp" -#include "io_thread.hpp" -#include "session.hpp" -#include "err.hpp" -#include "simple_semaphore.hpp" -#include "zmq_tcp_engine.hpp" - -zmq::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_, - session_t *session_) : - io_object_t (thread_), - state (idle), - poller (NULL), - session (session_), - addr (addr_), - identity ("abcde"), - engine (NULL) -{ -} - -void zmq::connecter_t::terminate () -{ - delete this; -} - -void zmq::connecter_t::shutdown () -{ - delete this; -} - -zmq::connecter_t::~connecter_t () -{ -} - -void zmq::connecter_t::process_reg (simple_semaphore_t *smph_) -{ - // Fet poller pointer for further use. - zmq_assert (!poller); - poller = get_poller (); - - // Ask the session to register itself with the I/O thread. Note that - // the session is living in the same I/O thread, thus this results - // in a synchronous call. - session->inc_seqnum (); - send_reg (session, NULL); - - // Unlock the application thread that created the connecter. - if (smph_) - smph_->post (); - - // Manually trigger timer event which will launch asynchronous connect. - state = waiting; - timer_event (); -} - -void zmq::connecter_t::process_unreg (simple_semaphore_t *smph_) -{ - // Unregister connecter/engine from the poller. - zmq_assert (poller); - if (state == connecting) - poller->rm_fd (handle); - else if (state == waiting) - poller->cancel_timer (this); - else if (state == sending) - engine->terminate (); - - // Unlock the application thread closing the connecter. - if (smph_) - smph_->post (); -} - -void zmq::connecter_t::in_event () -{ - // Error occured in asynchronous connect. Retry to connect after a while. - if (state == connecting) { - fd_t fd = tcp_connecter.connect (); - zmq_assert (fd == retired_fd); - poller->rm_fd (handle); - poller->add_timer (this); - state = waiting; - return; - } - - zmq_assert (false); -} - -void zmq::connecter_t::out_event () -{ - if (state == connecting) { - - fd_t fd = tcp_connecter.connect (); - if (fd == retired_fd) { - poller->rm_fd (handle); - poller->add_timer (this); - state = waiting; - return; - } - - poller->rm_fd (handle); - engine = new zmq_tcp_engine_t (fd); - zmq_assert (engine); - engine->attach (poller, this); - state = sending; - return; - } - - zmq_assert (false); -} - -void zmq::connecter_t::timer_event () -{ - zmq_assert (state == waiting); - - // Initiate async connect and start polling for its completion. If async - // connect fails instantly, try to reconnect after a while. - int rc = tcp_connecter.open (addr.c_str ()); - if (rc == 0) { - state = connecting; - in_event (); - } - else if (rc == 1) { - handle = poller->add_fd (tcp_connecter.get_fd (), this); - poller->set_pollout (handle); - state = connecting; - } - else { - poller->add_timer (this); - state = waiting; - } -} - -void zmq::connecter_t::set_engine (struct i_engine *engine_) -{ - engine = engine_; -} - -bool zmq::connecter_t::read (zmq_msg *msg_) -{ - zmq_assert (state == sending); - - // Deallocate old content of the message just in case. - zmq_msg_close (msg_); - - // Send the identity. - zmq_msg_init_size (msg_, identity.size ()); - memcpy (zmq_msg_data (msg_), identity.c_str (), identity.size ()); - - // Ask engine to unregister from the poller. - i_engine *e = engine; - engine->detach (); - - // Attach the engine to the session. (Note that this is actually - // a synchronous call. - session->inc_seqnum (); - send_engine (session, e); - - state = idle; - - return true; -} - -bool zmq::connecter_t::write (struct zmq_msg *msg_) -{ - // No incoming messages are accepted till identity is sent. - return false; -} - -void zmq::connecter_t::flush () -{ - // No incoming messages are accepted till identity is sent. -} diff --git a/src/connecter.hpp b/src/connecter.hpp deleted file mode 100644 index 1f11c63..0000000 --- a/src/connecter.hpp +++ /dev/null @@ -1,99 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_CONNECTER_HPP_INCLUDED__ -#define __ZMQ_CONNECTER_HPP_INCLUDED__ - -#include <string> - -#include "../include/zmq.h" - -#include "i_poller.hpp" -#include "io_object.hpp" -#include "i_poll_events.hpp" -#include "i_session.hpp" -#include "tcp_connecter.hpp" - -namespace zmq -{ - - class connecter_t : public io_object_t, public i_poll_events, - public i_session - { - public: - - connecter_t (class io_thread_t *thread_, const char *addr_, - class session_t *session_); - - void terminate (); - void shutdown (); - - void process_reg (class simple_semaphore_t *smph_); - void process_unreg (class simple_semaphore_t *smph_); - - // i_poll_events implementation. - void in_event (); - void out_event (); - void timer_event (); - - // i_session implementation - void set_engine (struct i_engine *engine_); - // void shutdown (); - bool read (struct zmq_msg *msg_); - bool write (struct zmq_msg *msg_); - void flush (); - - private: - - // Clean-up. - ~connecter_t (); - - enum { - idle, - waiting, - connecting, - sending - } state; |