diff options
Diffstat (limited to 'src/zmq_listener_init.cpp')
-rw-r--r-- | src/zmq_listener_init.cpp | 33 |
1 files changed, 22 insertions, 11 deletions
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 7e2f311..c188030 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -17,8 +17,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include <string> - #include "zmq_listener_init.hpp" #include "io_thread.hpp" #include "session.hpp" @@ -27,7 +25,8 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_, socket_base_t *owner_, fd_t fd_, const options_t &options_) : owned_t (parent_, owner_), - options (options_) + options (options_), + has_peer_identity (false) { // Create associated engine object. engine = new zmq_engine_t (parent_, fd_); @@ -47,19 +46,33 @@ bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_) bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) { + // Once we've got peer's identity we aren't interested in subsequent + // messages. + if (has_peer_identity) + return false; + // Retreieve the remote identity. We'll use it as a local session name. - std::string session_name = std::string ((const char*) zmq_msg_data (msg_), + has_peer_identity = true; + peer_identity.assign ((const char*) zmq_msg_data (msg_), zmq_msg_size (msg_)); + + + return true; +} + +void zmq::zmq_listener_init_t::flush () +{ + zmq_assert (has_peer_identity); // Initialisation is done. Disconnect the engine from the init object. engine->unplug (); // Have a look whether the session already exists. If it does, attach it // to the engine. If it doesn't create it first. - session_t *session = owner->find_session (session_name.c_str ()); + session_t *session = owner->find_session (peer_identity.c_str ()); if (!session) { io_thread_t *io_thread = choose_io_thread (options.affinity); - session = new session_t (io_thread, owner, session_name.c_str (), + session = new session_t (io_thread, owner, peer_identity.c_str (), options); zmq_assert (session); send_plug (session); @@ -73,14 +86,12 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) // Destroy the init object. term (); - - return true; } -void zmq::zmq_listener_init_t::flush () +void zmq::zmq_listener_init_t::detach () { - // No need to do anything. zmq_listener_init_t does no batching - // of messages. Each message is processed immediately on write. + // TODO: Engine is closing down. Init object is to be closed as well. + zmq_assert (false); } void zmq::zmq_listener_init_t::process_plug () |