diff options
| -rw-r--r-- | src/Makefile.am | 8 | ||||
| -rw-r--r-- | src/i_inout.hpp | 6 | ||||
| -rw-r--r-- | src/pgm_receiver.cpp | 3 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 6 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 3 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 6 | ||||
| -rw-r--r-- | src/session.cpp | 54 | ||||
| -rw-r--r-- | src/session.hpp | 24 | ||||
| -rw-r--r-- | src/socket_base.cpp | 83 | ||||
| -rw-r--r-- | src/socket_base.hpp | 27 | ||||
| -rw-r--r-- | src/zmq_connecter.cpp | 15 | ||||
| -rw-r--r-- | src/zmq_connecter.hpp | 8 | ||||
| -rw-r--r-- | src/zmq_connecter_init.cpp | 132 | ||||
| -rw-r--r-- | src/zmq_connecter_init.hpp | 79 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 2 | ||||
| -rw-r--r-- | src/zmq_init.cpp | 195 | ||||
| -rw-r--r-- | src/zmq_init.hpp (renamed from src/zmq_listener_init.hpp) | 52 | ||||
| -rw-r--r-- | src/zmq_listener.cpp | 6 | ||||
| -rw-r--r-- | src/zmq_listener_init.cpp | 137 | 
19 files changed, 387 insertions, 459 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 0fdaf37..0f47e7b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -115,12 +115,11 @@ libzmq_la_SOURCES = app_thread.hpp \      ypollset.hpp \      yqueue.hpp \      zmq_connecter.hpp \ -    zmq_connecter_init.hpp \      zmq_decoder.hpp \      zmq_encoder.hpp \      zmq_engine.hpp \ +    zmq_init.hpp \      zmq_listener.hpp \ -    zmq_listener_init.hpp \      app_thread.cpp \      devpoll.cpp \      dispatcher.cpp \ @@ -161,12 +160,11 @@ libzmq_la_SOURCES = app_thread.hpp \      ypollset.cpp \      zmq.cpp \      zmq_connecter.cpp \ -    zmq_connecter_init.cpp \      zmq_decoder.cpp \      zmq_encoder.cpp \      zmq_engine.cpp \ -    zmq_listener.cpp \ -    zmq_listener_init.cpp +    zmq_init.cpp \ +    zmq_listener.cpp  libzmq_la_LDFLAGS = -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAFS@ diff --git a/src/i_inout.hpp b/src/i_inout.hpp index b82a476..8a0ce6a 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -22,6 +22,8 @@  #include "../bindings/c/zmq.h" +#include "stdint.hpp" +  namespace zmq  { @@ -47,8 +49,8 @@ namespace zmq          //  Return pointer to the owning socket.          virtual class socket_base_t *get_owner () = 0; -        //  Returns the name of associated session. -        virtual const char *get_session_name () = 0; +        //  Return ordinal number of the session. +        virtual uint64_t get_ordinal () = 0;      };  } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 2a24858..1d4d695 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -37,11 +37,10 @@  #include "i_inout.hpp"  zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,  -      const options_t &options_, const char *session_name_) : +      const options_t &options_) :      io_object_t (parent_),      pgm_socket (true, options_),      options (options_), -    session_name (session_name_),      inout (NULL)  {  } diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index fa84acb..91169f4 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -47,8 +47,7 @@ namespace zmq          //  Creates gm_engine. Underlying PGM connection is initialised          //  using network_ parameter. -        pgm_receiver_t (class io_thread_t *parent_, const options_t &options_, -            const char *session_name_); +        pgm_receiver_t (class io_thread_t *parent_, const options_t &options_);          ~pgm_receiver_t ();          int init (bool udp_encapsulation_, const char *network_); @@ -94,9 +93,6 @@ namespace zmq          //  Socket options.          options_t options; -        //  Name of the session associated with the connecter. -        std::string session_name; -          //  Parent session.          i_inout *inout; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 676ed93..880bb09 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -33,12 +33,11 @@  #include "wire.hpp"  zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,  -      const options_t &options_, const char *session_name_) : +      const options_t &options_) :      io_object_t (parent_),      encoder (0, false),      pgm_socket (false, options_),      options (options_), -    session_name (session_name_),      inout (NULL),      out_buffer (NULL),      out_buffer_size (0), diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 9a9844c..30b545d 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -42,8 +42,7 @@ namespace zmq      {      public: -        pgm_sender_t (class io_thread_t *parent_, const options_t &options_,  -            const char *session_name_); +        pgm_sender_t (class io_thread_t *parent_, const options_t &options_);          ~pgm_sender_t ();          int init (bool udp_encapsulation_, const char *network_); @@ -74,9 +73,6 @@ namespace zmq          //  Socket options.          options_t options; -        //  Name of the session associated with the connecter. -        std::string session_name; -          //  Poll handle associated with PGM socket.          handle_t handle;          handle_t uplink_handle; diff --git a/src/session.cpp b/src/session.cpp index 37f2720..a17e205 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -25,16 +25,41 @@  #include "pipe.hpp"  zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, -      const char *name_, const options_t &options_, bool reconnect_) : +      const options_t &options_) :      owned_t (parent_, owner_),      in_pipe (NULL),      active (true),      out_pipe (NULL),      engine (NULL), -    name (name_), -    options (options_), -    reconnect (reconnect_) +    options (options_)  { +    type = unnamed; +     +    //  It's possible to register the session at this point as it will be +    //  searched for only on reconnect, i.e. no race condition (session found +    //  before it is plugged into it's I/O thread) is possible. +    ordinal = owner->register_session (this); +} + +zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, +      const options_t &options_, const char *name_) : +    owned_t (parent_, owner_), +    in_pipe (NULL), +    active (true), +    out_pipe (NULL), +    engine (NULL), +    options (options_) +{ +    if (name_) { +        type = named; +        name = name_; +        ordinal = 0; +    } +    else { +        type = transient; +        //  TODO: Generate unique name here. +        ordinal = 0; +    }  }  zmq::session_t::~session_t () @@ -78,8 +103,8 @@ void zmq::session_t::detach (owned_t *reconnecter_)      //  Engine is terminating itself. No need to deallocate it from here.      engine = NULL; -    //  In the case od anonymous connection, terminate the session. -    if (name.empty ()) +    //  Terminate transient session. +    if (type == transient)          term ();  } @@ -93,9 +118,11 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()      return owner;  } -const char *zmq::session_t::get_session_name () +uint64_t zmq::session_t::get_ordinal ()  { -    return name.c_str (); +    zmq_assert (type == unnamed); +    zmq_assert (ordinal); +    return ordinal;  }  void zmq::session_t::attach_pipes (class reader_t *inpipe_, @@ -181,11 +208,12 @@ void zmq::session_t::process_plug ()  void zmq::session_t::process_unplug ()  { -    //  Unregister the session from the socket. -    if (!name.empty ()) { -        bool ok = owner->unregister_session (name.c_str ()); -        zmq_assert (ok); -    } +    //  Unregister the session from the socket. There's nothing to do here +    //  for transient sessions. +    if (type == unnamed) +        owner->unregister_session (ordinal); +    else if (type == named) +        owner->unregister_session (name.c_str ());      //  Ask associated pipes to terminate.      if (in_pipe) { diff --git a/src/session.hpp b/src/session.hpp index 72e1d59..c60cfc7 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -34,8 +34,14 @@ namespace zmq      {      public: -        session_t (object_t *parent_, socket_base_t *owner_, const char *name_, -            const options_t &options_, bool reconnect_); +        //  Creates unnamed session. +        session_t (object_t *parent_, socket_base_t *owner_, +            const options_t &options_); + +        //  Creates named session. If name is NULL, transient session with +        //  auto-generated name is created. +        session_t (object_t *parent_, socket_base_t *owner_, +            const options_t &options_, const char *name_);          //  i_inout interface implementation.          bool read (::zmq_msg_t *msg_); @@ -44,7 +50,7 @@ namespace zmq          void detach (owned_t *reconnecter_);          class io_thread_t *get_io_thread ();          class socket_base_t *get_owner (); -        const char *get_session_name (); +        uint64_t get_ordinal ();          //  i_endpoint interface implementation.          void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); @@ -73,6 +79,15 @@ namespace zmq          struct i_engine *engine; +        enum { +            transient, +            named, +            unnamed +        } type; + +        //  Ordinal of the session (if any). +        uint64_t ordinal; +          //  The name of the session. One that is used to register it with          //  socket-level repository of sessions.          std::string name; @@ -80,9 +95,6 @@ namespace zmq          //  Inherited socket options.          options_t options; -        //  If true, reconnection is required after connection breaks. -        bool reconnect; -          session_t (const session_t&);          void operator = (const session_t&);      }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fde258c..43209d5 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -32,7 +32,6 @@  #include "session.hpp"  #include "config.hpp"  #include "owned.hpp" -#include "uuid.hpp"  #include "pipe.hpp"  #include "err.hpp"  #include "platform.hpp" @@ -46,7 +45,8 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :      app_thread (parent_),      shutting_down (false),      sent_seqnum (0), -    processed_seqnum (0) +    processed_seqnum (0), +    next_ordinal (1)  {  } @@ -114,10 +114,6 @@ int zmq::socket_base_t::bind (const char *addr_)  int zmq::socket_base_t::connect (const char *addr_)  { -    //  Generate a unique name for the session. -    std::string session_name ("#"); -    session_name += uuid_t ().to_string (); -      //  Parse addr_ string.      std::string addr_type;      std::string addr_args; @@ -170,10 +166,10 @@ int zmq::socket_base_t::connect (const char *addr_)          return 0;      } -    //  Create the session. +    //  Create unnamed session.      io_thread_t *io_thread = choose_io_thread (options.affinity); -    session_t *session = new (std::nothrow) session_t (io_thread, this, -        session_name.c_str (), options, true); +    session_t *session = new (std::nothrow) session_t (io_thread, +        this, options);      zmq_assert (session);      pipe_t *in_pipe = NULL; @@ -213,7 +209,7 @@ int zmq::socket_base_t::connect (const char *addr_)          //  it is established.          zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (              choose_io_thread (options.affinity), this, options, -            session_name.c_str (), false); +            session->get_ordinal (), false);          zmq_assert (connecter);          int rc = connecter->set_address (addr_args.c_str ());          if (rc != 0) { @@ -245,8 +241,7 @@ int zmq::socket_base_t::connect (const char *addr_)              //  PGM sender.              pgm_sender_t *pgm_sender =  new (std::nothrow) pgm_sender_t ( -                choose_io_thread (options.affinity), options, -                session_name.c_str ()); +                choose_io_thread (options.affinity), options);              zmq_assert (pgm_sender);              int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); @@ -261,8 +256,7 @@ int zmq::socket_base_t::connect (const char *addr_)              //  PGM receiver.              pgm_receiver_t *pgm_receiver =  new (std::nothrow) pgm_receiver_t ( -                choose_io_thread (options.affinity), options,  -                session_name.c_str ()); +                choose_io_thread (options.affinity), options);              zmq_assert (pgm_receiver);              int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); @@ -408,7 +402,8 @@ int zmq::socket_base_t::close ()      //  Check whether there are no session leaks.      sessions_sync.lock (); -    zmq_assert (sessions.empty ()); +    zmq_assert (named_sessions.empty ()); +    zmq_assert (unnamed_sessions.empty ());      sessions_sync.unlock ();      delete this; @@ -445,36 +440,74 @@ bool zmq::socket_base_t::register_session (const char *name_,      session_t *session_)  {      sessions_sync.lock (); -    bool registered = sessions.insert (std::make_pair (name_, session_)).second; +    bool registered = +        named_sessions.insert (std::make_pair (name_, session_)).second;      sessions_sync.unlock ();      return registered;  } -bool zmq::socket_base_t::unregister_session (const char *name_) +void zmq::socket_base_t::unregister_session (const char *name_)  {      sessions_sync.lock (); -    sessions_t::iterator it = sessions.find (name_); -    bool unregistered = (it != sessions.end ()); -    sessions.erase (it); +    named_sessions_t::iterator it = named_sessions.find (name_); +    zmq_assert (it != named_sessions.end ()); +    named_sessions.erase (it);      sessions_sync.unlock (); -    return unregistered;  }  zmq::session_t *zmq::socket_base_t::find_session (const char *name_)  {      sessions_sync.lock (); -    sessions_t::iterator it = sessions.find (name_); -    if (it == sessions.end ()) { +    named_sessions_t::iterator it = named_sessions.find (name_); +    if (it == named_sessions.end ()) { +        sessions_sync.unlock (); +        return NULL; +    } +    session_t *session = it->second; + +    //  Prepare the session for subsequent attach command. +    session->inc_seqnum (); + +    sessions_sync.unlock (); +    return session;     +} + +uint64_t zmq::socket_base_t::register_session (session_t *session_) +{ +    sessions_sync.lock (); +    uint64_t ordinal = next_ordinal; +    next_ordinal++; +    unnamed_sessions.insert (std::make_pair (ordinal, session_)).second; +    sessions_sync.unlock (); +    return ordinal; +} + +void zmq::socket_base_t::unregister_session (uint64_t ordinal_) +{ +    sessions_sync.lock (); +    unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); +    zmq_assert (it != unnamed_sessions.end ()); +    unnamed_sessions.erase (it); +    sessions_sync.unlock (); +} + +zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_) +{ +    sessions_sync.lock (); + +    unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); +    if (it == unnamed_sessions.end ()) {          sessions_sync.unlock ();          return NULL;      } +    session_t *session = it->second;      //  Prepare the session for subsequent attach command. -    it->second->inc_seqnum (); +    session->inc_seqnum ();      sessions_sync.unlock (); -    return it->second;     +    return session;   }  void zmq::socket_base_t::kill (reader_t *pipe_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 79a8340..16553ea 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -34,6 +34,7 @@  #include "options.hpp"  #include "stdint.hpp"  #include "atomic_counter.hpp" +#include "stdint.hpp"  namespace zmq  { @@ -74,9 +75,15 @@ namespace zmq          //  commands as it is unacceptable to wait for the completion of the          //  action till user application yields control of the application          //  thread to 0MQ. Locking is used instead. +        //  There are two distinct types of sessions: those identified by name +        //  and those identified by ordinal number. Thus two sets of session +        //  management functions.          bool register_session (const char *name_, class session_t *session_); -        bool unregister_session (const char *name_); +        void unregister_session (const char *name_);          class session_t *find_session (const char *name_); +        uint64_t register_session (class session_t *session_); +        void unregister_session (uint64_t ordinal_); +        class session_t *find_session (uint64_t ordinal_);          //  i_endpoint interface implementation.          void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); @@ -144,15 +151,15 @@ namespace zmq          //  Sequence number of the last command processed by this object.          uint64_t processed_seqnum; -        //  List of existing sessions. This list is never referenced from within -        //  the socket, instead it is used by I/O objects owned by the session. -        //  As those objects can live in different threads, the access is -        //  synchronised using 'sessions_sync' mutex. -        //  Local sessions are those named by the local instance of 0MQ. -        //  Remote sessions are the sessions who's identities are provided by -        //  the remote party. -        typedef std::map <std::string, session_t*> sessions_t; -        sessions_t sessions; +        //  Lists of existing sessions. This lists are never referenced from +        //  within the socket, instead they are used by I/O objects owned by +        //  the socket. As those objects can live in different threads, +        //  the access is synchronised by mutex. +        typedef std::map <std::string, session_t*> named_sessions_t; +        named_sessions_t named_sessions; +        typedef std::map <uint64_t, session_t*> unnamed_sessions_t; +        unnamed_sessions_t unnamed_sessions; +        uint64_t next_ordinal;          mutex_t sessions_sync;          socket_base_t (const socket_base_t&); diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 5bda48d..8f95fc0 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -20,19 +20,20 @@  #include <new>  #include "zmq_connecter.hpp" -#include "zmq_connecter_init.hpp" +#include "zmq_engine.hpp" +#include "zmq_init.hpp"  #include "io_thread.hpp"  #include "err.hpp"  zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,        socket_base_t *owner_, const options_t &options_, -      const char *session_name_, bool wait_) : +      uint64_t session_ordinal_, bool wait_) :      owned_t (parent_, owner_),      io_object_t (parent_),      handle_valid (false),      wait (wait_), -    options (options_), -    session_name (session_name_) +    session_ordinal (session_ordinal_), +    options (options_)  {  } @@ -88,9 +89,9 @@ void zmq::zmq_connecter_t::out_event ()      }      //  Create an init object.  -    io_thread_t *io_thread = choose_io_thread (options.affinity); -    zmq_connecter_init_t *init = new (std::nothrow) zmq_connecter_init_t ( -        io_thread, owner, fd, options, session_name.c_str (), address.c_str ()); +    zmq_init_t *init = new (std::nothrow) zmq_init_t ( +        choose_io_thread (options.affinity), owner, +        fd, options, true, address.c_str (), session_ordinal);      zmq_assert (init);      send_plug (init);      send_own (owner, init); diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index acd3352..e5b4a70 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -36,7 +36,7 @@ namespace zmq      public:          zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_, -            const options_t &options_, const char *session_name_, bool wait_); +            const options_t &options_, uint64_t session_ordinal_, bool wait_);          ~zmq_connecter_t ();          //  Set IP address to connect to. @@ -69,12 +69,12 @@ namespace zmq          //  If true, connecter is waiting a while before trying to connect.          bool wait; +        //  Ordinal of the session to attach to. +        uint64_t session_ordinal; +          //  Associated socket options.          options_t options; -        //  Name of the session associated with the connecter. -        std::string session_name; -          //  Address to connect to.          std::string address; diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp deleted file mode 100644 index f8436a3..0000000 --- a/src/zmq_connecter_init.cpp +++ /dev/null @@ -1,132 +0,0 @@ -/* -    Copyright (c) 2007-2009 FastMQ Inc. - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. - -    You should have received a copy of the Lesser GNU General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <new> - -#include "zmq_connecter_init.hpp" -#include "zmq_connecter.hpp" -#include "io_thread.hpp" -#include "session.hpp" -#include "err.hpp" - -zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_, -      socket_base_t *owner_, fd_t fd_, const options_t &options_, -      const char *session_name_, const char *address_) : -    owned_t (parent_, owner_), -    options (options_), -    session_name (session_name_) -{ -    //  Create associated engine object. -    engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, true, -        address_); -    zmq_assert (engine); -} - -zmq::zmq_connecter_init_t::~zmq_connecter_init_t () -{ -    if (engine) -        delete engine; -} - -bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_) -{ -    //  Send identity. -    int rc = zmq_msg_init_size (msg_, options.identity.size ()); -    zmq_assert (rc == 0); -    memcpy (zmq_msg_data (msg_), options.identity.c_str (), -        options.identity.size ()); - -    //  Initialisation is done at this point. Disconnect the engine from -    //  the init object. -    engine->unplug (); - -    //  Find the session associated with this connecter. If it doesn't exist -    //  drop the newly created connection. If it does, attach it to the -    //  connection. -    session_t *session = NULL; -    if (!session_name.empty ()) -        session = owner->find_session (session_name.c_str ()); -    if (!session) { - -        //  TODO: -        //  The socket is already closing. The session is already shut down, -        //  so no point in continuing with connecting. Shut the connection down. -        zmq_assert (false); -    } - -    //  No need to increment seqnum as it was alredy incremented above. -    send_attach (session, engine, false); -    engine = NULL; - -    //  Destroy the init object. -    term (); - -    return true; -} - -bool zmq::zmq_connecter_init_t::write (::zmq_msg_t *msg_) -{ -    return false; -} - -void zmq::zmq_connecter_init_t::flush () -{ -    //  We are not expecting any messages. No point in flushing. -} - -void zmq::zmq_connecter_init_t::detach (owned_t *reconnecter_) -{ -    //  Plug in the reconnecter object. -    zmq_assert (reconnecter_); -    send_plug (reconnecter_); -    send_own (owner, reconnecter_); - -    //  This function is called by engine when disconnection occurs. -    //  The engine will destroy itself, so we just drop the pointer here and -    //  start termination of the init object. -    engine = NULL; -    term (); -} - -zmq::io_thread_t *zmq::zmq_connecter_init_t::get_io_thread () -{ -    return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::zmq_connecter_init_t::get_owner () -{ -    return owner; -} - -const char *zmq::zmq_connecter_init_t::get_session_name () -{ -    return session_name.c_str (); -} - -void zmq::zmq_connecter_init_t::process_plug () -{ -    zmq_assert (engine); -    engine->plug (this); -} - -void zmq::zmq_connecter_init_t::process_unplug () -{ -    if (engine) -        engine->unplug (); -} diff --git a/src/zmq_connecter_init.hpp b/src/zmq_connecter_init.hpp deleted file mode 100644 index 03ccd24..0000000 --- a/src/zmq_connecter_init.hpp +++ /dev/null @@ -1,79 +0,0 @@ -/* -    Copyright (c) 2007-2009 FastMQ Inc. - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. - -    You should have received a copy of the Lesser GNU General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__ -#define __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__ - -#include <string> - -#include "i_inout.hpp" -#include "owned.hpp" -#include "zmq_engine.hpp" -#include "stdint.hpp" -#include "fd.hpp" -#include "options.hpp" - -namespace zmq -{ - -    //  The class handles initialisation phase of native 0MQ wire-level -    //  protocol on the connecting side of the connection. - -    class zmq_connecter_init_t : public owned_t, public i_inout -    { -    public: - -        zmq_connecter_init_t (class io_thread_t *parent_, socket_base_t *owner_, -            fd_t fd_, const options_t &options, const char *session_name_, -            const char *address_); -        ~zmq_connecter_init_t (); - -    private: - -        //  i_inout interface implementation. -        bool read (::zmq_msg_t *msg_); -        bool write (::zmq_msg_t *msg_); -        void flush (); -        void detach (owned_t *reconnecter_); -        class io_thread_t *get_io_thread (); -        class socket_base_t *get_owner (); -        const char *get_session_name (); - -        //  Handlers for incoming commands. -        void process_plug (); -        void process_unplug (); - -        //  Engine is created by zmq_connecter_init_t object. Once the -        //  initialisation phase is over it is passed to a session object, -        //  possibly running in a different I/O thread. -        zmq_engine_t *engine; - -        //  Associated socket options. -        options_t options; - -        //  Name of the session to bind new connection to. -        std::string session_name; - -        zmq_connecter_init_t (const zmq_connecter_init_t&); -        void operator = (const zmq_connecter_init_t&); -    }; - -} - -#endif diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index c2878a7..cfc87a7 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -159,7 +159,7 @@ void zmq::zmq_engine_t::error ()          //  Ask it to wait for a while before reconnecting.          reconnecter = new (std::nothrow) zmq_connecter_t (              inout->get_io_thread (), inout->get_owner (), -            options, inout->get_session_name (), true); +            options, inout->get_ordinal (), true);          zmq_assert (reconnecter);          reconnecter->set_address (address.c_str ());      } diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp new file mode 100644 index 0000000..e526b34 --- /dev/null +++ b/src/zmq_init.cpp @@ -0,0 +1,195 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "zmq_init.hpp" +#include "zmq_engine.hpp" +#include "io_thread.hpp" +#include "session.hpp" +#include "err.hpp" + +zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, +      fd_t fd_, const options_t &options_, bool reconnect_, +      const char *address_, uint64_t session_ordinal_) : +    owned_t (parent_, owner_), +    sent (false), +    received (false), +    session_ordinal (session_ordinal_), +    options (options_) +{ +    //  Create the engine object for this connection. +    engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, +        reconnect_, address_); +    zmq_assert (engine); +} + +zmq::zmq_init_t::~zmq_init_t () +{ +    if (engine) +        delete engine; +} + +bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) +{ +    //  If the identity was already sent, do nothing. +    if (sent) +        return false; + +    //  Send the identity. +    int rc = zmq_msg_init_size (msg_, options.identity.size ()); +    zmq_assert (rc == 0); +    memcpy (zmq_msg_data (msg_), options.identity.c_str (), +        options.identity.size ()); +    sent = true; + +    //  If initialisation is done, pass the engine to the session and +    //  destroy the init object. +    finalise (); + +    return true; +} + +bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) +{ +    //  If identity was already received, we are not interested +    //  in subsequent messages. +    if (received) +        return false; + +    //  Retreieve the remote identity. +    peer_identity.assign ((const char*) zmq_msg_data (msg_), +        zmq_msg_size (msg_)); +    received = true; + +    return true; +} + +void zmq::zmq_init_t::flush () +{ +    //  Check if there's anything to flush. +    if (!received) +        return; + +    //  If initialisation is done, pass the engine to the session and +    //  destroy the init object. +    finalise (); +} + +void zmq::zmq_init_t::detach (owned_t *reconnecter_) +{ +    // | 
