diff options
| author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-27 16:24:21 +0200 | 
|---|---|---|
| committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-27 16:24:21 +0200 | 
| commit | 2dd501651592baa7f9e49f52e1321ae2b9b4e126 (patch) | |
| tree | fe5d221061004894eb259304fcdbd4f8092de99d /src | |
| parent | 67194267f89d63391288600f127205a2b7a8a5ae (diff) | |
multiple bugs fixed
Diffstat (limited to 'src')
| -rw-r--r-- | src/i_inout.hpp | 8 | ||||
| -rw-r--r-- | src/pipe.hpp | 3 | ||||
| -rw-r--r-- | src/session.cpp | 13 | ||||
| -rw-r--r-- | src/session.hpp | 1 | ||||
| -rw-r--r-- | src/zmq_connecter_init.cpp | 6 | ||||
| -rw-r--r-- | src/zmq_connecter_init.hpp | 1 | ||||
| -rw-r--r-- | src/zmq_encoder.cpp | 1 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 5 | ||||
| -rw-r--r-- | src/zmq_listener_init.cpp | 33 | ||||
| -rw-r--r-- | src/zmq_listener_init.hpp | 5 | 
10 files changed, 61 insertions, 15 deletions
diff --git a/src/i_inout.hpp b/src/i_inout.hpp index 8901c04..89b9fbd 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -27,9 +27,17 @@ namespace zmq      struct i_inout      { +        //  Engine asks to get a message to send to the network.          virtual bool read (::zmq_msg_t *msg_) = 0; + +        //  Engine sends the incoming message further on downstream.          virtual bool write (::zmq_msg_t *msg_) = 0; + +        //  Flush all the previously written messages downstream.          virtual void flush () = 0; +     +        //  Drop all the references to the engine. +        virtual void detach () = 0;      };  } diff --git a/src/pipe.hpp b/src/pipe.hpp index d48fc47..b7593c7 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -42,8 +42,9 @@ namespace zmq          //  Reads a message to the underlying pipe.          bool read (struct zmq_msg_t *msg_); -        //  Mnaipulation of index of the pipe.          void set_endpoint (i_endpoint *endpoint_); + +        //  Mnaipulation of index of the pipe.          void set_index (int index_);          int get_index (); diff --git a/src/session.cpp b/src/session.cpp index 115fb85..0b1b947 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -26,7 +26,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,        const char *name_, const options_t &options_) :      owned_t (parent_, owner_),      in_pipe (NULL), -    active (false), +    active (true),      out_pipe (NULL),      engine (NULL),      name (name_), @@ -74,6 +74,16 @@ void zmq::session_t::flush ()      out_pipe->flush ();  } +void zmq::session_t::detach () +{ +    //  Engine is terminating itself. +    engine = NULL; + +    //  TODO: In the case od anonymous connection, terminate the session. +//    if (anonymous) +//        term (); +} +  void zmq::session_t::revive (reader_t *pipe_)  {      zmq_assert (in_pipe == pipe_); @@ -98,6 +108,7 @@ void zmq::session_t::process_plug ()          pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm);          zmq_assert (inbound);          in_pipe = &inbound->reader; +        in_pipe->set_endpoint (this);          pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);          zmq_assert (outbound);          out_pipe = &outbound->writer; diff --git a/src/session.hpp b/src/session.hpp index b79fb4b..4a0882b 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -48,6 +48,7 @@ namespace zmq          bool read (::zmq_msg_t *msg_);          bool write (::zmq_msg_t *msg_);          void flush (); +        void detach ();          //  i_endpoint interface implementation.          void revive (class reader_t *pipe_); diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp index 7048bd1..7326ebe 100644 --- a/src/zmq_connecter_init.cpp +++ b/src/zmq_connecter_init.cpp @@ -80,6 +80,12 @@ void zmq::zmq_connecter_init_t::flush ()      zmq_assert (false);  } +void zmq::zmq_connecter_init_t::detach () +{ +    //  TODO: Engine is closing down. Init object is to be closed as well. +    zmq_assert (false); +} +  void zmq::zmq_connecter_init_t::process_plug ()  {      zmq_assert (engine); diff --git a/src/zmq_connecter_init.hpp b/src/zmq_connecter_init.hpp index 79ea9e2..3f42fc6 100644 --- a/src/zmq_connecter_init.hpp +++ b/src/zmq_connecter_init.hpp @@ -49,6 +49,7 @@ namespace zmq          bool read (::zmq_msg_t *msg_);          bool write (::zmq_msg_t *msg_);          void flush (); +        void detach ();          //  Handlers for incoming commands.          void process_plug (); diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 39b7192..55e1a83 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -73,4 +73,3 @@ bool zmq::zmq_encoder_t::message_ready ()      }      return true;  } - diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index cd7ad7e..3cab4c9 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -136,5 +136,8 @@ void zmq::zmq_engine_t::revive ()  void zmq::zmq_engine_t::error ()  { -    zmq_assert (false); +    zmq_assert (inout); +    inout->detach (); +    unplug (); +    delete this;  } diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 7e2f311..c188030 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -17,8 +17,6 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#include <string> -  #include "zmq_listener_init.hpp"  #include "io_thread.hpp"  #include "session.hpp" @@ -27,7 +25,8 @@  zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,        socket_base_t *owner_, fd_t fd_, const options_t &options_) :      owned_t (parent_, owner_), -    options (options_) +    options (options_), +    has_peer_identity (false)  {      //  Create associated engine object.      engine = new zmq_engine_t (parent_, fd_); @@ -47,19 +46,33 @@ bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_)  bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)  { +    //  Once we've got peer's identity we aren't interested in subsequent +    //  messages. +    if (has_peer_identity) +        return false; +      //  Retreieve the remote identity. We'll use it as a local session name. -    std::string session_name = std::string ((const char*) zmq_msg_data (msg_), +    has_peer_identity = true; +    peer_identity.assign ((const char*) zmq_msg_data (msg_),          zmq_msg_size (msg_)); +     + +    return true; +} + +void zmq::zmq_listener_init_t::flush () +{ +    zmq_assert (has_peer_identity);      //  Initialisation is done. Disconnect the engine from the init object.      engine->unplug ();      //  Have a look whether the session already exists. If it does, attach it      //  to the engine. If it doesn't create it first. -    session_t *session = owner->find_session (session_name.c_str ()); +    session_t *session = owner->find_session (peer_identity.c_str ());      if (!session) {          io_thread_t *io_thread = choose_io_thread (options.affinity); -        session = new session_t (io_thread, owner, session_name.c_str (), +        session = new session_t (io_thread, owner, peer_identity.c_str (),              options);          zmq_assert (session);          send_plug (session); @@ -73,14 +86,12 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)      //  Destroy the init object.      term (); - -    return true;  } -void zmq::zmq_listener_init_t::flush () +void zmq::zmq_listener_init_t::detach ()  { -    //  No need to do anything. zmq_listener_init_t does no batching -    //  of messages. Each message is processed immediately on write. +    //  TODO: Engine is closing down. Init object is to be closed as well. +    zmq_assert (false);  }  void zmq::zmq_listener_init_t::process_plug () diff --git a/src/zmq_listener_init.hpp b/src/zmq_listener_init.hpp index b061eaa..885b36b 100644 --- a/src/zmq_listener_init.hpp +++ b/src/zmq_listener_init.hpp @@ -49,6 +49,7 @@ namespace zmq          bool read (::zmq_msg_t *msg_);          bool write (::zmq_msg_t *msg_);          void flush (); +        void detach ();          //  Handlers for incoming commands.          void process_plug (); @@ -62,6 +63,10 @@ namespace zmq          //  Associated socket options.          options_t options; +        //  Indetity on the other end of the connection. +        bool has_peer_identity; +        std::string peer_identity; +          zmq_listener_init_t (const zmq_listener_init_t&);          void operator = (const zmq_listener_init_t&);      };  | 
