From 2dd501651592baa7f9e49f52e1321ae2b9b4e126 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 27 Aug 2009 16:24:21 +0200 Subject: multiple bugs fixed --- src/i_inout.hpp | 8 ++++++++ src/pipe.hpp | 3 ++- src/session.cpp | 13 ++++++++++++- src/session.hpp | 1 + src/zmq_connecter_init.cpp | 6 ++++++ src/zmq_connecter_init.hpp | 1 + src/zmq_encoder.cpp | 1 - src/zmq_engine.cpp | 5 ++++- src/zmq_listener_init.cpp | 33 ++++++++++++++++++++++----------- src/zmq_listener_init.hpp | 5 +++++ 10 files changed, 61 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/i_inout.hpp b/src/i_inout.hpp index 8901c04..89b9fbd 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -27,9 +27,17 @@ namespace zmq struct i_inout { + // Engine asks to get a message to send to the network. virtual bool read (::zmq_msg_t *msg_) = 0; + + // Engine sends the incoming message further on downstream. virtual bool write (::zmq_msg_t *msg_) = 0; + + // Flush all the previously written messages downstream. virtual void flush () = 0; + + // Drop all the references to the engine. + virtual void detach () = 0; }; } diff --git a/src/pipe.hpp b/src/pipe.hpp index d48fc47..b7593c7 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -42,8 +42,9 @@ namespace zmq // Reads a message to the underlying pipe. bool read (struct zmq_msg_t *msg_); - // Mnaipulation of index of the pipe. void set_endpoint (i_endpoint *endpoint_); + + // Mnaipulation of index of the pipe. void set_index (int index_); int get_index (); diff --git a/src/session.cpp b/src/session.cpp index 115fb85..0b1b947 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -26,7 +26,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, const char *name_, const options_t &options_) : owned_t (parent_, owner_), in_pipe (NULL), - active (false), + active (true), out_pipe (NULL), engine (NULL), name (name_), @@ -74,6 +74,16 @@ void zmq::session_t::flush () out_pipe->flush (); } +void zmq::session_t::detach () +{ + // Engine is terminating itself. + engine = NULL; + + // TODO: In the case od anonymous connection, terminate the session. +// if (anonymous) +// term (); +} + void zmq::session_t::revive (reader_t *pipe_) { zmq_assert (in_pipe == pipe_); @@ -98,6 +108,7 @@ void zmq::session_t::process_plug () pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm); zmq_assert (inbound); in_pipe = &inbound->reader; + in_pipe->set_endpoint (this); pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm); zmq_assert (outbound); out_pipe = &outbound->writer; diff --git a/src/session.hpp b/src/session.hpp index b79fb4b..4a0882b 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -48,6 +48,7 @@ namespace zmq bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); void flush (); + void detach (); // i_endpoint interface implementation. void revive (class reader_t *pipe_); diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp index 7048bd1..7326ebe 100644 --- a/src/zmq_connecter_init.cpp +++ b/src/zmq_connecter_init.cpp @@ -80,6 +80,12 @@ void zmq::zmq_connecter_init_t::flush () zmq_assert (false); } +void zmq::zmq_connecter_init_t::detach () +{ + // TODO: Engine is closing down. Init object is to be closed as well. + zmq_assert (false); +} + void zmq::zmq_connecter_init_t::process_plug () { zmq_assert (engine); diff --git a/src/zmq_connecter_init.hpp b/src/zmq_connecter_init.hpp index 79ea9e2..3f42fc6 100644 --- a/src/zmq_connecter_init.hpp +++ b/src/zmq_connecter_init.hpp @@ -49,6 +49,7 @@ namespace zmq bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); void flush (); + void detach (); // Handlers for incoming commands. void process_plug (); diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 39b7192..55e1a83 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -73,4 +73,3 @@ bool zmq::zmq_encoder_t::message_ready () } return true; } - diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index cd7ad7e..3cab4c9 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -136,5 +136,8 @@ void zmq::zmq_engine_t::revive () void zmq::zmq_engine_t::error () { - zmq_assert (false); + zmq_assert (inout); + inout->detach (); + unplug (); + delete this; } diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 7e2f311..c188030 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -17,8 +17,6 @@ along with this program. If not, see . */ -#include - #include "zmq_listener_init.hpp" #include "io_thread.hpp" #include "session.hpp" @@ -27,7 +25,8 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_, socket_base_t *owner_, fd_t fd_, const options_t &options_) : owned_t (parent_, owner_), - options (options_) + options (options_), + has_peer_identity (false) { // Create associated engine object. engine = new zmq_engine_t (parent_, fd_); @@ -47,19 +46,33 @@ bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_) bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) { + // Once we've got peer's identity we aren't interested in subsequent + // messages. + if (has_peer_identity) + return false; + // Retreieve the remote identity. We'll use it as a local session name. - std::string session_name = std::string ((const char*) zmq_msg_data (msg_), + has_peer_identity = true; + peer_identity.assign ((const char*) zmq_msg_data (msg_), zmq_msg_size (msg_)); + + + return true; +} + +void zmq::zmq_listener_init_t::flush () +{ + zmq_assert (has_peer_identity); // Initialisation is done. Disconnect the engine from the init object. engine->unplug (); // Have a look whether the session already exists. If it does, attach it // to the engine. If it doesn't create it first. - session_t *session = owner->find_session (session_name.c_str ()); + session_t *session = owner->find_session (peer_identity.c_str ()); if (!session) { io_thread_t *io_thread = choose_io_thread (options.affinity); - session = new session_t (io_thread, owner, session_name.c_str (), + session = new session_t (io_thread, owner, peer_identity.c_str (), options); zmq_assert (session); send_plug (session); @@ -73,14 +86,12 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) // Destroy the init object. term (); - - return true; } -void zmq::zmq_listener_init_t::flush () +void zmq::zmq_listener_init_t::detach () { - // No need to do anything. zmq_listener_init_t does no batching - // of messages. Each message is processed immediately on write. + // TODO: Engine is closing down. Init object is to be closed as well. + zmq_assert (false); } void zmq::zmq_listener_init_t::process_plug () diff --git a/src/zmq_listener_init.hpp b/src/zmq_listener_init.hpp index b061eaa..885b36b 100644 --- a/src/zmq_listener_init.hpp +++ b/src/zmq_listener_init.hpp @@ -49,6 +49,7 @@ namespace zmq bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); void flush (); + void detach (); // Handlers for incoming commands. void process_plug (); @@ -62,6 +63,10 @@ namespace zmq // Associated socket options. options_t options; + // Indetity on the other end of the connection. + bool has_peer_identity; + std::string peer_identity; + zmq_listener_init_t (const zmq_listener_init_t&); void operator = (const zmq_listener_init_t&); }; -- cgit v1.2.3