diff options
author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-21 14:29:22 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-21 14:29:22 +0200 |
commit | 6be4b0143793ab5ceebc5d9d6bbe5c2f1333a0d2 (patch) | |
tree | a785065e54317d1d360e2e4b3a4acf1d6e5669f1 /src | |
parent | a801b6d8b37557ccfb53030dca22f89a3f99b59c (diff) |
session management implemented
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 10 | ||||
-rw-r--r-- | src/i_inout.hpp | 4 | ||||
-rw-r--r-- | src/msg_content.hpp (renamed from src/msg.hpp) | 9 | ||||
-rw-r--r-- | src/object.cpp | 5 | ||||
-rw-r--r-- | src/object.hpp | 4 | ||||
-rw-r--r-- | src/owned.cpp | 10 | ||||
-rw-r--r-- | src/owned.hpp | 4 | ||||
-rw-r--r-- | src/pipe.hpp | 2 | ||||
-rw-r--r-- | src/session.cpp | 39 | ||||
-rw-r--r-- | src/session.hpp | 15 | ||||
-rw-r--r-- | src/socket_base.cpp | 77 | ||||
-rw-r--r-- | src/socket_base.hpp | 24 | ||||
-rw-r--r-- | src/zmq.cpp | 111 | ||||
-rw-r--r-- | src/zmq_connecter.cpp | 11 | ||||
-rw-r--r-- | src/zmq_connecter.hpp | 10 | ||||
-rw-r--r-- | src/zmq_connecter_init.cpp | 94 | ||||
-rw-r--r-- | src/zmq_connecter_init.hpp (renamed from src/zmq_init.hpp) | 43 | ||||
-rw-r--r-- | src/zmq_decoder.hpp | 2 | ||||
-rw-r--r-- | src/zmq_encoder.hpp | 2 | ||||
-rw-r--r-- | src/zmq_init.cpp | 112 | ||||
-rw-r--r-- | src/zmq_listener.cpp | 5 | ||||
-rw-r--r-- | src/zmq_listener_init.cpp | 96 | ||||
-rw-r--r-- | src/zmq_listener_init.hpp | 71 |
23 files changed, 499 insertions, 261 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 9d6127c..396e3a3 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,7 +23,7 @@ libzmq_la_SOURCES = \ i_poll_events.hpp \ i_signaler.hpp \ kqueue.hpp \ - msg.hpp \ + msg_content.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -47,11 +47,12 @@ libzmq_la_SOURCES = \ ypollset.hpp \ yqueue.hpp \ zmq_connecter.hpp \ + zmq_connecter_init.hpp \ zmq_decoder.hpp \ zmq_encoder.hpp \ zmq_engine.hpp \ - zmq_init.hpp \ zmq_listener.hpp \ + zmq_listener_init.hpp \ app_thread.cpp \ devpoll.cpp \ dispatcher.cpp \ @@ -77,11 +78,12 @@ libzmq_la_SOURCES = \ ypollset.cpp \ zmq.cpp \ zmq_connecter.cpp \ + zmq_connecter_init.cpp \ zmq_decoder.cpp \ zmq_encoder.cpp \ zmq_engine.cpp \ - zmq_init.cpp \ - zmq_listener.cpp + zmq_listener.cpp \ + zmq_listener_init.cpp libzmq_la_LDFLAGS = -version-info 0:0:0 libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@ diff --git a/src/i_inout.hpp b/src/i_inout.hpp index be2e007..8901c04 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -27,8 +27,8 @@ namespace zmq struct i_inout { - virtual bool read (::zmq_msg *msg_) = 0; - virtual bool write (::zmq_msg *msg_) = 0; + virtual bool read (::zmq_msg_t *msg_) = 0; + virtual bool write (::zmq_msg_t *msg_) = 0; virtual void flush () = 0; }; diff --git a/src/msg.hpp b/src/msg_content.hpp index 4f35961..b468746 100644 --- a/src/msg.hpp +++ b/src/msg_content.hpp @@ -26,8 +26,8 @@ #include "atomic_counter.hpp" -//namespace zmq -//{ +namespace zmq +{ // Shared message buffer. Message data are either allocated in one // continuous block along with this structure - thus avoiding one @@ -36,7 +36,8 @@ // used to deallocate the data. If the buffer is actually shared (there // are at least 2 references to it) refcount member contains number of // references. - struct zmq_msg_content + + struct msg_content_t { void *data; size_t size; @@ -44,6 +45,6 @@ zmq::atomic_counter_t refcnt; }; -//} +} #endif diff --git a/src/object.cpp b/src/object.cpp index b1b7c8a..0a25750 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -131,9 +131,8 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_) { - // Let the object know that it cannot shut down till it gets this command. - destination_->inc_seqnum (); - + // The assumption here is that command sequence number of the destination + // object was already incremented in find_session function. command_t cmd; cmd.destination = destination_; cmd.type = command_t::attach; diff --git a/src/object.hpp b/src/object.hpp index 02a071a..31c8c40 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -30,6 +30,10 @@ namespace zmq class object_t { + // Repository of sessions needs to use caller's send_* functions + // when creating new session. TODO: Get rid of this dependency. + friend class socket_base_t; + public: object_t (class dispatcher_t *dispatcher_, int thread_slot_); diff --git a/src/owned.cpp b/src/owned.cpp index 6995a39..810bed7 100644 --- a/src/owned.cpp +++ b/src/owned.cpp @@ -47,6 +47,14 @@ void zmq::owned_t::process_plug () finalise_command (); } +void zmq::owned_t::process_attach (zmq_engine_t *engine_) +{ + // Keep track of how many commands were processed so far. + processed_seqnum++; + + finalise_command (); +} + void zmq::owned_t::term () { send_term_req (owner, this); @@ -65,8 +73,8 @@ void zmq::owned_t::finalise_command () // If termination request was already received and there are no more // commands to wait for, terminate the object. if (shutting_down && processed_seqnum == sent_seqnum.get ()) { - send_term_ack (owner); process_unplug (); + send_term_ack (owner); delete this; } } diff --git a/src/owned.hpp b/src/owned.hpp index 22595d1..78036a3 100644 --- a/src/owned.hpp +++ b/src/owned.hpp @@ -59,6 +59,10 @@ namespace zmq // handler. void process_plug (); + // It's vital that session invokes io_object_t::process_attach + // at the end of it's own attach handler. + void process_attach (class zmq_engine_t *engine_); + // io_object_t defines a new handler used to disconnect the object // from the poller object. Implement the handlen in the derived // classes to ensure sane cleanup. diff --git a/src/pipe.hpp b/src/pipe.hpp index d771120..28e4b4d 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -29,7 +29,7 @@ namespace zmq { // Message pipe. - class pipe_t : public ypipe_t <zmq_msg, false, message_pipe_granularity> + class pipe_t : public ypipe_t <zmq_msg_t, false, message_pipe_granularity> { }; diff --git a/src/session.cpp b/src/session.cpp index 2bb4ff6..fc1f858 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -22,9 +22,10 @@ #include "err.hpp" zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - zmq_engine_t *engine_) : + const char *name_) : owned_t (parent_, owner_), - engine (engine_) + engine (NULL), + name (name_) { } @@ -32,12 +33,12 @@ zmq::session_t::~session_t () { } -bool zmq::session_t::read (::zmq_msg *msg_) +bool zmq::session_t::read (::zmq_msg_t *msg_) { return false; } -bool zmq::session_t::write (::zmq_msg *msg_) +bool zmq::session_t::write (::zmq_msg_t *msg_) { return false; } @@ -48,14 +49,34 @@ void zmq::session_t::flush () void zmq::session_t::process_plug () { - zmq_assert (engine); - engine->plug (this); + // Register the session with the socket. + bool ok = owner->register_session (name.c_str (), this); + + // There's already a session with the specified identity. + // We should syslog it and drop the session. TODO + zmq_assert (ok); + owned_t::process_plug (); } void zmq::session_t::process_unplug () { - zmq_assert (engine); - engine->unplug (); - delete engine; + // Unregister the session from the socket. + bool ok = owner->unregister_session (name.c_str ()); + zmq_assert (ok); + + if (engine) { + engine->unplug (); + delete engine; + engine = NULL; + } +} + +void zmq::session_t::process_attach (class zmq_engine_t *engine_) +{ + zmq_assert (engine_); + engine = engine_; + engine->plug (this); + + owned_t::process_attach (engine_); } diff --git a/src/session.hpp b/src/session.hpp index 2cb8e18..6d6bcf7 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -20,8 +20,11 @@ #ifndef __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__ +#include <string> + #include "i_inout.hpp" #include "owned.hpp" +#include "options.hpp" namespace zmq { @@ -30,24 +33,28 @@ namespace zmq { public: - session_t (object_t *parent_, socket_base_t *owner_, - class zmq_engine_t *engine_); + session_t (object_t *parent_, socket_base_t *owner_, const char *name_); private: ~session_t (); // i_inout interface implementation. - bool read (::zmq_msg *msg_); - bool write (::zmq_msg *msg_); + bool read (::zmq_msg_t *msg_); + bool write (::zmq_msg_t *msg_); void flush (); // Handlers for incoming commands. void process_plug (); void process_unplug (); + void process_attach (class zmq_engine_t *engine_); class zmq_engine_t *engine; + // The name of the session. One that is used to register it with + // socket-level repository of sessions. + std::string name; + session_t (const session_t&); void operator = (const session_t&); }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fa6c1e3..fb7bdcf 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -17,6 +17,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <string> #include <algorithm> #include "../include/zmq.h" @@ -30,16 +31,20 @@ #include "session.hpp" #include "config.hpp" #include "owned.hpp" +#include "uuid.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), pending_term_acks (0), - app_thread (parent_) + app_thread (parent_), + shutting_down (false) { } zmq::socket_base_t::~socket_base_t () { + shutting_down = true; + while (true) { // On third pass of the loop there should be no more I/O objects @@ -64,10 +69,12 @@ zmq::socket_base_t::~socket_base_t () } // Check whether there are no session leaks. + sessions_sync.lock (); zmq_assert (sessions.empty ()); + sessions_sync.unlock (); } -int zmq::socket_base_t::setsockopt (int option_, void *optval_, +int zmq::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { switch (option_) { @@ -113,11 +120,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, return 0; case ZMQ_IDENTITY: - if (optvallen_ != sizeof (const char*)) { - errno = EINVAL; - return -1; - } - options.identity = (const char*) optval_; + options.identity.assign ((const char*) optval_, optvallen_); return 0; default: @@ -141,18 +144,34 @@ int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::connect (const char *addr_) { + // Generate a unique name for the session. + std::string session_name ("#"); + session_name += uuid_t ().to_string (); + + // Create the session. + io_thread_t *io_thread = choose_io_thread (options.affinity); + session_t *session = new session_t (io_thread, this, session_name.c_str ()); + zmq_assert (session); + send_plug (session); + send_own (this, session); + + // Create the connecter object. Supply it with the session name so that + // it can bind the new connection to the session once it is established. zmq_connecter_t *connecter = new zmq_connecter_t ( - choose_io_thread (options.affinity), this, options); + choose_io_thread (options.affinity), this, options, + session_name.c_str ()); int rc = connecter->set_address (addr_); - if (rc != 0) + if (rc != 0) { + delete connecter; return -1; - + } send_plug (connecter); send_own (this, connecter); + return 0; } -int zmq::socket_base_t::send (struct zmq_msg *msg_, int flags_) +int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) { zmq_assert (false); } @@ -162,7 +181,7 @@ int zmq::socket_base_t::flush () zmq_assert (false); } -int zmq::socket_base_t::recv (struct zmq_msg *msg_, int flags_) +int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) { zmq_assert (false); } @@ -174,35 +193,40 @@ int zmq::socket_base_t::close () return 0; } -void zmq::socket_base_t::register_session (const char *name_, +bool zmq::socket_base_t::register_session (const char *name_, session_t *session_) { sessions_sync.lock (); - bool inserted = sessions.insert (std::make_pair (name_, session_)).second; - zmq_assert (inserted); + bool registered = sessions.insert (std::make_pair (name_, session_)).second; sessions_sync.unlock (); + return registered; } -void zmq::socket_base_t::unregister_session (const char *name_) +bool zmq::socket_base_t::unregister_session (const char *name_) { sessions_sync.lock (); sessions_t::iterator it = sessions.find (name_); - zmq_assert (it != sessions.end ()); + bool unregistered = (it != sessions.end ()); sessions.erase (it); sessions_sync.unlock (); + return unregistered; } -zmq::session_t *zmq::socket_base_t::get_session (const char *name_) +zmq::session_t *zmq::socket_base_t::find_session (const char *name_) { sessions_sync.lock (); + sessions_t::iterator it = sessions.find (name_); - session_t *session = NULL; - if (it != sessions.end ()) { - session = it->second; - session->inc_seqnum (); - } + if (it == sessions.end ()) { + sessions_sync.unlock (); + return NULL; + } + + // Prepare the session for subsequent attach command. + it->second->inc_seqnum (); + sessions_sync.unlock (); - return session; + return it->second; } void zmq::socket_base_t::process_own (owned_t *object_) @@ -212,6 +236,11 @@ void zmq::socket_base_t::process_own (owned_t *object_) void zmq::socket_base_t::process_term_req (owned_t *object_) { + // When shutting down we can ignore termination requests from owned + // objects. They are going to be terminated anyway. + if (shutting_down) + return; + // If I/O object is well and alive ask it to terminate. io_objects_t::iterator it = std::find (io_objects.begin (), io_objects.end (), object_); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 8e99654..20ac4e2 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -40,23 +40,22 @@ namespace zmq ~socket_base_t (); // Interface for communication with the API layer. - virtual int setsockopt (int option_, void *optval_, size_t optvallen_); + virtual int setsockopt (int option_, const void *optval_, + size_t optvallen_); virtual int bind (const char *addr_); virtual int connect (const char *addr_); - virtual int send (struct zmq_msg *msg_, int flags_); + virtual int send (struct zmq_msg_t *msg_, int flags_); virtual int flush (); - virtual int recv (struct zmq_msg *msg_, int flags_); + virtual int recv (struct zmq_msg_t *msg_, int flags_); virtual int close (); - // Functions that owned objects use to manipulate socket's list - // of existing sessions. - // Note that this functionality cannot be implemented via inter-thread + // The list of sessions cannot be accessed via inter-thread // commands as it is unacceptable to wait for the completion of the // action till user application yields control of the application // thread to 0MQ. - void register_session (const char *name_, class session_t *session_); - void unregister_session (const char *name_); - class session_t *get_session (const char *name_); + bool register_session (const char *name_, class session_t *session_); + bool unregister_session (const char *name_); + class session_t *find_session (const char *name_); private: @@ -80,10 +79,17 @@ namespace zmq // Socket options. options_t options; + // If true, socket is already shutting down. No new work should be + // started. + bool shutting_down; + // List of existing sessions. This list is never referenced from within // the socket, instead it is used by I/O objects owned by the session. // As those objects can live in different threads, the access is // synchronised using 'sessions_sync' mutex. + // Local sessions are those named by the local instance of 0MQ. + // Remote sessions are the sessions who's identities are provided by + // the remote party. typedef std::map <std::string, session_t*> sessions_t; sessions_t sessions; mutex_t sessions_sync; diff --git a/src/zmq.cpp b/src/zmq.cpp index 7ebe003..49096ad 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -26,76 +26,79 @@ #include "socket_base.hpp" #include "err.hpp" #include "dispatcher.hpp" -#include "msg.hpp" +#include "msg_content.hpp" -int zmq_msg_init (zmq_msg *msg_) +int zmq_msg_init (zmq_msg_t *msg_) { - msg_->content = (zmq_msg_content*) ZMQ_VSM; + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; msg_->vsm_size = 0; return 0; } -int zmq_msg_init_size (zmq_msg *msg_, size_t size_) +int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) { if (size_ <= ZMQ_MAX_VSM_SIZE) { - msg_->content = (zmq_msg_content*) ZMQ_VSM; + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; msg_->vsm_size = (uint16_t) size_; } else { - msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content) + - size_); + msg_->content = + (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_); if (!msg_->content) { errno = ENOMEM; return -1; } msg_->shared = 0; - - msg_->content->data = (void*) (msg_->content + 1); - msg_->content->size = size_; - msg_->content->ffn = NULL; - new (&msg_->content->refcnt) zmq::atomic_counter_t (); + + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + content->data = (void*) (content + 1); + content->size = size_; + content->ffn = NULL; + new (&content->refcnt) zmq::atomic_counter_t (); } return 0; } -int zmq_msg_init_data (zmq_msg *msg_, void *data_, size_t size_, +int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_) { msg_->shared = 0; - msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content)); + msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); zmq_assert (msg_->content); - msg_->content->data = data_; - msg_->content->size = size_; - msg_->content->ffn = ffn_; - new (&msg_->content->refcnt) zmq::atomic_counter_t (); + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + content->data = data_; + content->size = size_; + content->ffn = ffn_; + new (&content->refcnt) zmq::atomic_counter_t (); return 0; } -int zmq_msg_close (zmq_msg *msg_) +int zmq_msg_close (zmq_msg_t *msg_) { - // For VSMs and delimiters there are no resources to free - if (msg_->content == (zmq_msg_content*) ZMQ_DELIMITER || - msg_->content == (zmq_msg_content*) ZMQ_VSM || - msg_->content == (zmq_msg_content*) ZMQ_GAP) + // For VSMs and delimiters there are no resources to free. + if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER || + msg_->content == (zmq::msg_content_t*) ZMQ_VSM || + msg_->content == (zmq::msg_content_t*) ZMQ_GAP) return 0; - // If the content is not shared, or if it is shared and the reference + // If the content is not shared, or if it is shared and the reference. // count has dropped to zero, deallocate it. - if (!msg_->shared || !msg_->content->refcnt.sub (1)) { + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + if (!msg_->shared || !content->refcnt.sub (1)) { - // We used "placement new" operator to initialize the reference + // We used "placement new" operator to initialize the reference. // counter so we call its destructor now. - msg_->content->refcnt.~atomic_counter_t (); + content->refcnt.~atomic_counter_t (); - if (msg_->content->ffn) - msg_->content->ffn (msg_->content->data); - free (msg_->content); + if (content->ffn) + content->ffn (content->data); + free (content); } return 0; } -int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_) +int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) { zmq_msg_close (dest_); *dest_ = *src_; @@ -103,23 +106,24 @@ int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_) return 0; } -int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_) +int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) { zmq_msg_close (dest_); // VSMs and delimiters require no special handling. if (src_->content != - (zmq_msg_content*) ZMQ_DELIMITER && - src_->content != (zmq_msg_content*) ZMQ_VSM && - src_->content != (zmq_msg_content*) ZMQ_GAP) { + (zmq::msg_content_t*) ZMQ_DELIMITER && + src_->content != (zmq::msg_content_t*) ZMQ_VSM && + src_->content != (zmq::msg_content_t*) ZMQ_GAP) { // One reference is added to shared messages. Non-shared messages // are turned into shared messages and reference count is set to 2. + zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; if (src_->shared) - src_->content->refcnt.add (1); + content->refcnt.add (1); else { src_->shared = true; - src_->content->refcnt.set (2); + content->refcnt.set (2); } } @@ -127,32 +131,34 @@ int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_) return 0; } -void *zmq_msg_data (zmq_msg *msg_) +void *zmq_msg_data (zmq_msg_t *msg_) { - if (msg_->content == (zmq_msg_content*) ZMQ_VSM) + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) return msg_->vsm_data; if (msg_->content == - (zmq_msg_content*) ZMQ_DELIMITER || - msg_->content == (zmq_msg_content*) ZMQ_GAP) + (zmq::msg_content_t*) ZMQ_DELIMITER || + msg_->content == (zmq::msg_content_t*) ZMQ_GAP) return NULL; - return msg_->content->data; + + return ((zmq::msg_content_t*) msg_->content)->data; } -size_t zmq_msg_size (zmq_msg *msg_) +size_t zmq_msg_size (zmq_msg_t *msg_) { - if (msg_->content == (zmq_msg_content*) ZMQ_VSM) + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) return msg_->vsm_size; if (msg_->content == - (zmq_msg_content*) ZMQ_DELIMITER || - msg_->content == (zmq_msg_content*) ZMQ_GAP) + (zmq::msg_content_t*) ZMQ_DELIMITER || + msg_->content == (zmq::msg_content_t*) ZMQ_GAP) return 0; - return msg_->content->size; + + return ((zmq::msg_content_t*) msg_->content)->size; } -int zmq_msg_type (zmq_msg *msg_) +int zmq_msg_type (zmq_msg_t *msg_) { // If it's a genuine message, return 0. - if (msg_->content >= (zmq_msg_content*) ZMQ_VSM) + if (msg_->content >= (zmq::msg_content_t*) ZMQ_VSM) return 0; // Trick the compiler to believe that content is an integer. @@ -192,7 +198,8 @@ int zmq_close (void *s_) return 0; } -int zmq_setsockopt (void *s_, int option_, void *optval_, size_t optvallen_) +int zmq_setsockopt (void *s_, int option_, const void *optval_, + size_t optvallen_) { return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_, optvallen_)); @@ -208,7 +215,7 @@ int zmq_connect (void *s_, const char *addr_) return (((zmq::socket_base_t*) s_)->connect (addr_)); } -int zmq_send (void *s_, zmq_msg *msg_, int flags_) +int zmq_send (void *s_, zmq_msg_t *msg_, int flags_) { return (((zmq::socket_base_t*) s_)->send (msg_, flags_)); } @@ -218,7 +225,7 @@ int zmq_flush (void *s_) return (((zmq::socket_base_t*) s_)->flush ()); } -int zmq_recv (void *s_, zmq_msg *msg_, int flags_) +int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) { return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); } diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 00c8cb2..e4e7eea 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -18,16 +18,18 @@ */ #include "zmq_connecter.hpp" -#include "zmq_init.hpp" +#include "zmq_connecter_init.hpp" #include "io_thread.hpp" #include "err.hpp" zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, - socket_base_t *owner_, const options_t &options_) : + socket_base_t *owner_, const options_t &options_, + const char *session_name_) : owned_t (parent_, owner_), io_object_t (parent_), handle_valid (false), - options (options_) + options (options_), + session_name (session_name_) { } @@ -76,7 +78,8 @@ void zmq::zmq_connecter_t::out_event () // Create an init object. io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, true, options); + zmq_connecter_init_t *init = new zmq_connecter_init_t (io_thread, owner, + fd, options, session_name.c_str ()); zmq_assert (init); send_plug (init); send_own (owner, init); diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index dcdec19..e308502 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -20,6 +20,8 @@ #ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ #define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ +#include <string> + #include "owned.hpp" #include "io_object.hpp" #include "tcp_connecter.hpp" @@ -34,15 +36,14 @@ namespace zmq public: zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_, - const options_t &options_); + const options_t &options_, const char *session_name_); + ~zmq_connecter_t (); // Set IP address to connect to. int set_address (const char *addr_); private: - ~zmq_connecter_t (); - // Handlers for incoming commands. void process_plug (); void process_unplug (); @@ -68,6 +69,9 @@ namespace zmq // Associated socket options. options_t options; + // Name of the session associated with the connecter. + std::string session_name; + zmq_connecter_t (const zmq_connecter_t&); void operator = (const zmq_connecter_t&); }; diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp new file mode 100644 index 0000000..7048bd1 --- /dev/null +++ b/src/zmq_connecter_init.cpp @@ -0,0 +1,94 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "zmq_connecter_init.hpp" +#include "io_thread.hpp" +#include "session.hpp" +#include "err.hpp" + +zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_, + socket_base_t *owner_, fd_t fd_, const options_t &options_, + const char *session_name_) : + owned_t (parent_, owner_), + options (options_), + session_name (session_name_) +{ + // Create associated engine object. + engine = new zmq_engine_t (parent_, fd_); + zmq_assert (engine); +} + +zmq::zmq_connecter_init_t::~zmq_connecter_init_t () +{ + if (engine) + delete engine; +} + +bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_) +{ + // Send identity. + int rc = zmq_msg_init_size (msg_, options.identity.size ()); + zmq_assert (rc == 0); + memcpy (zmq_msg_data (msg_), options.identity.c_str (), + options.identity.size ()); + + // Initialisation is done at this point. Disconnect the engine from + // the init object. + engine->unplug (); + + // Find the session associated with this connecter. If it doesn't exist + // drop the newly created connection. If it does, attach it to the + // connection. + session_t *session = owner->find_session (session_name.c_str ()); + if (!session) { + // TODO + zmq_assert (false); + } + send_attach (session, engine); + engine = NULL; + + // Destroy the init object. + term (); + + return true; +} + +bool zmq::zmq_connecter_init_t::write (::zmq_msg_t *msg_) +{ + return false; +} + +void zmq::zmq_connecter_init_t::flush () +{ + // We are not expecting any messages. No point in flushing. + zmq_assert (false); +} + +void zmq::zmq_connecter_init_t::process_plug () +{ + zmq_assert (engine); + engine->plug (this); + owned_t::process_plug (); +} + +void zmq::zmq_connecter_init_t::process_unplug () +{ + if (engine) + engine->unplug (); +} diff --git a/src/zmq_init.hpp b/src/zmq_connecter_init.hpp index 5eb289e..79ea9e2 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_connecter_init.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__ -#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__ +#ifndef __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__ +#define __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__ #include <string> @@ -33,48 +33,41 @@ namespace zmq { // The class handles initialisation phase of native 0MQ wire-level - // protocol. Currently it can be used to handle both sides of the - // connection. If it grows to complex, we can separate the two into - // distinct classes. + // protocol on the connecting side of the connection. - class zmq_init_t : public owned_t, public i_inout + class zmq_connecter_init_t : public owned_t, public i_inout { public: - // Set 'connected' to true if the connection was created by 'connect' - // function. If it was accepted from a listening socket, set it to - // false. - zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_, fd_t fd_, - bool connected_, const options_t &options); - ~zmq_init_t (); + zmq_connecter_init_t (class io_thread_t *parent_, socket_base_t *owner_, + fd_t fd_, const options_t &options, const char *session_name_); + ~zmq_connecter_init_t (); private: // i_inout interface implementation. - bool read (::zmq_msg *msg_); - bool write (::zmq_msg *msg_); + bool read (::zmq_msg_t *msg_); + bool write (::zmq_msg_t *msg_); void flush (); // Handlers for incoming commands. void process_plug (); void process_unplug (); - void create_session (); - - // Engine is created by zmq_init_t object. Once the initialisation - // phase is over it is passed to a session object, possibly running - // in a different I/O thread. + // Engine is created by zmq_connecter_init_t object. Once the + // initialisation phase is over it is passed to a session object, + // possibly running in a different I/O thread. zmq_engine_t *engine; - // If true, we are on the connecting side. If false, we are on the - // listening side. - bool connected; - // Associated socket options. options_t options; - zmq_init_t (const zmq_init_t&); - void operator = (const zmq_init_t&); + // Name of the session to bind new connection to. Makes sense only + // when 'connected' is true. + std::string session_name; + + zmq_connecter_init_t (const zmq_connecter_init_t&); + void operator = (const zmq_connecter_init_t&); }; } diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index 17c28f8..212e68e 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -45,7 +45,7 @@ namespace zmq struct i_inout *destination; unsigned char tmpbuf [8]; - ::zmq_msg in_progress; + ::zmq_msg_t in_progress; zmq_decoder_t (const zmq_decoder_t&); void operator = (const zmq_decoder_t&); diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp index 89af265..29563fc 100644 --- a/src/zmq_encoder.hpp +++ b/src/zmq_encoder.hpp @@ -43,7 +43,7 @@ namespace zmq bool message_ready (); struct i_inout *source; - ::zmq_msg in_progress; + ::zmq_msg_t in_progress; unsigned char tmpbuf [9]; zmq_encoder_t (const zmq_encoder_t&); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp deleted file mode 100644 index 124622d..0000000 --- a/src/zmq_init.cpp +++ /dev/null @@ -1,112 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "zmq_init.hpp" -#include "io_thread.hpp" -#include "session.hpp" -#include "err.hpp" - -zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, - fd_t fd_, bool connected_, const options_t &options_) : - owned_t (parent_, owner_), - connected (connected_), - options (options_) -{ - // Create associated engine object. - engine = new zmq_engine_t (parent_, fd_); - zmq_assert (engine); -} - -zmq::zmq_init_t::~zmq_init_t () -{ - if (engine) - delete engine; -} - -bool zmq::zmq_init_t::read (::zmq_msg *msg_) -{ - // On the listening side, no initialisation data are sent to the peer. - if (!connected) - return false; - - // Send identity. - int rc = zmq_msg_init_size (msg_, options.identity.size ()); - zmq_assert (rc == 0); - memcpy (zmq_msg_data (msg_), options.identity.c_str (), - options.identity.size ()); - - // Initialisation is done. - create_session (); - - return true; -} - -bool zmq::zmq_init_t::write (::zmq_msg *msg_) -{ - // On the connecting side no initialisation data are expected. - if (connected) - return false; - - // Retreieve the identity. - options.identity = std::string ((const char*) zmq_msg_data (msg_), - zmq_msg_size (msg_)); - - // Initialisation is done. - create_session (); - - return true; -} - -void zmq::zmq_init_t::flush () -{ - // No need to do anything. zmq_init_t does no batching of messages. - // Each message is processed immediately on write. -} - -void zmq::zmq_init_t::process_plug () -{ - zmq_assert (engine); - engine->plug (this); - owned_t::process_plug (); -} - -void zmq::zmq_init_t::process_unplug () -{ - if (engine) - engine->unplug (); -} - -void zmq::zmq_init_t::create_session () -{ - // Disconnect engine from the init object. - engine->unplug (); - - // Create the session instance. - io_thread_t *io_thread = choose_io_thread (options.affinity); - session_t *session = new session_t (io_thread, owner, engine); - zmq_assert (session); - engine = NULL; - - // Pass session/engine pair to a chosen I/O thread. - send_plug (session); - send_own (owner, session); - - // Destroy the init object. - term (); -} diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 49bbf61..97164c1 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -18,7 +18,7 @@ */ #include "zmq_listener.hpp" -#include "zmq_init.hpp" +#include "zmq_listener_init.hpp" #include "io_thread.hpp" #include "err.hpp" @@ -68,7 +68,8 @@ void zmq::zmq_listener_t::in_event () // Create an init object. io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, false, options); + zmq_listener_init_t *init = new zmq_listener_init_t (io_thread, owner, + fd, options); zmq_assert (init); send_plug (init); send_own (owner, init); diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp new file mode 100644 index 0000000..bfd79b4 --- /dev/null +++ b/src/zmq_listener_init.cpp @@ -0,0 +1,96 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <string> + +#include "zmq_listener_init.hpp" +#include "io_thread.hpp" +#include "session.hpp" +#include "err.hpp" + +zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_, + socket_base_t *owner_, fd_t fd_, const options_t &options_) : + owned_t (parent_, owner_), + options (options_) +{ + // Create associated engine object. + engine = new zmq_engine_t (parent_, fd_); + zmq_assert (engine); +} + +zmq::zmq_listener_init_t::~zmq_listener_init_t () +{ + if (engine) + delete engine; +} + +bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_) +{ + return false; +} + +bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) +{ + // Retreieve the remote identity. We'll use it as a local session name. + std::string session_name = std::string ((const char*) zmq_msg_data (msg_), + zmq_msg_size (msg_)); + + // Initialisation is done. Disconnect the engine from the init object. + engine->unplug (); + + // Have a look whether the session already exists. If it does, attach it + // to the engine. If it doesn't create it first. + session_t *session = owner->find_session (session_name.c_str ()); + if (!session) { + io_thread_t *io_thread = choose_io_thread (options.affinity); + session = new session_t (io_thread, owner, session_name.c_str ()); + zmq_assert (session); + send_plug (session); + send_own (owner, session); + + // Reserve a sequence number for following 'attach' command. + session->inc_seqnum (); + } + send_attach (session, engine); + engine = NULL; + + // Destroy the init object. + term (); + + return true; +} + +void zmq::zmq_listener_init_t::flush () +{ + // No need to do anything. zmq_listener_init_t does no batching + // of messages. Each message is processed immediately on write. +} + +void zmq::zmq_listener_init_t::process_plug () +{ + zmq_assert (engine); + engine->plug (this); + owned_t::process_plug (); +} + +void zmq::zmq_listener_init_t::process_unplug () +{ + if (engine) + engine->unplug (); +} diff --git a/src/zmq_listener_init.hpp b/src/zmq_listener_init.hpp new file mode 100644 index 0000000..b061eaa --- /dev/null +++ b/src/zmq_listener_init.hpp @@ -0,0 +1,71 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__ +#define __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__ + +#include <string> + +#include "i_inout.hpp" +#include "owned.hpp" +#include "zmq_engine.hpp" +#include "stdint.hpp" +#include "fd.hpp" +#include "options.hpp" + +namespace zmq +{ + + // The class handles initialisation phase of native 0MQ wire-level + // protocol on the listening side of the connection. + + class zmq_listener_init_t : public owned_t, public i_inout + { + public: + + zmq_listener_init_t (class io_thread_t *parent_, socket_base_t *owner_, + fd_t fd_, const options_t &options); + ~zmq_listener_init_t (); + + private: + + // i_inout interface implementation. + bool read (::zmq_msg_t *msg_); + bool write (::zmq_msg_t *msg_); + void flush (); + + // Handlers for incoming commands. + void process_plug (); + void process_unplug (); + + // Engine is created by zmq_listener_init_t object. Once the + // initialisation phase is over it is passed to a session object, + // possibly running in a different I/O thread. + zmq_engine_t *engine; + + // Associated socket options. + options_t options; + + zmq_listener_init_t (const zmq_listener_init_t&); + void operator = (const zmq_listener_init_t&); + }; + +} + +#endif |