From 5daa0dec0f473c84aab6b12cdc5541335e74f07f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 11 Sep 2009 13:20:24 +0200 Subject: ZMQII-5: Only one consumer in a process gets the message --- src/session.cpp | 23 ++++++++++++++--------- src/zmq_connecter_init.cpp | 4 +++- src/zmq_engine.cpp | 2 ++ src/zmq_listener_init.cpp | 4 +++- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/session.cpp b/src/session.cpp index bc334e0..31c6354 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -72,9 +72,9 @@ void zmq::session_t::detach () // Engine is terminating itself. engine = NULL; - // TODO: In the case od anonymous connection, terminate the session. -// if (anonymous) -// term (); + // In the case od anonymous connection, terminate the session. + if (name.empty ()) + term (); } void zmq::session_t::attach_inpipe (reader_t *pipe_) @@ -114,11 +114,13 @@ void zmq::session_t::detach_outpipe (writer_t *pipe_) void zmq::session_t::process_plug () { // Register the session with the socket. - bool ok = owner->register_session (name.c_str (), this); + if (!name.empty ()) { + bool ok = owner->register_session (name.c_str (), this); - // There's already a session with the specified identity. - // We should syslog it and drop the session. TODO - zmq_assert (ok); + // There's already a session with the specified identity. + // We should syslog it and drop the session. TODO + zmq_assert (ok); + } // If session is created by 'connect' function, it has the pipes set // already. Otherwise, it's being created by the listener and the pipes @@ -141,8 +143,10 @@ void zmq::session_t::process_plug () void zmq::session_t::process_unplug () { // Unregister the session from the socket. - bool ok = owner->unregister_session (name.c_str ()); - zmq_assert (ok); + if (!name.empty ()) { + bool ok = owner->unregister_session (name.c_str ()); + zmq_assert (ok); + } // Ask associated pipes to terminate. if (in_pipe) { @@ -163,6 +167,7 @@ void zmq::session_t::process_unplug () void zmq::session_t::process_attach (i_engine *engine_) { + zmq_assert (!engine); zmq_assert (engine_); engine = engine_; engine->plug (this); diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp index 4e996d8..730077a 100644 --- a/src/zmq_connecter_init.cpp +++ b/src/zmq_connecter_init.cpp @@ -55,7 +55,9 @@ bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_) // Find the session associated with this connecter. If it doesn't exist // drop the newly created connection. If it does, attach it to the // connection. - session_t *session = owner->find_session (session_name.c_str ()); + session_t *session = NULL; + if (!session_name.empty ()) + session = owner->find_session (session_name.c_str ()); if (!session) { // TODO zmq_assert (false); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 3ed9ba5..2c5eb1c 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -50,6 +50,8 @@ zmq::zmq_engine_t::~zmq_engine_t () void zmq::zmq_engine_t::plug (i_inout *inout_) { + zmq_assert (!inout); + encoder.set_inout (inout_); decoder.set_inout (inout_); diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 5d2dc2e..98a3780 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -70,7 +70,9 @@ void zmq::zmq_listener_init_t::flush () // Have a look whether the session already exists. If it does, attach it // to the engine. If it doesn't create it first. - session_t *session = owner->find_session (peer_identity.c_str ()); + session_t *session = NULL; + if (!peer_identity.empty ()) + session = owner->find_session (peer_identity.c_str ()); if (!session) { io_thread_t *io_thread = choose_io_thread (options.affinity); session = new session_t (io_thread, owner, peer_identity.c_str (), -- cgit v1.2.3