summaryrefslogtreecommitdiff
path: root/src/zmq_listener_init.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-27 16:24:21 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-27 16:24:21 +0200
commit2dd501651592baa7f9e49f52e1321ae2b9b4e126 (patch)
treefe5d221061004894eb259304fcdbd4f8092de99d /src/zmq_listener_init.cpp
parent67194267f89d63391288600f127205a2b7a8a5ae (diff)
multiple bugs fixed
Diffstat (limited to 'src/zmq_listener_init.cpp')
-rw-r--r--src/zmq_listener_init.cpp33
1 files changed, 22 insertions, 11 deletions
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index 7e2f311..c188030 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -17,8 +17,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include <string>
-
#include "zmq_listener_init.hpp"
#include "io_thread.hpp"
#include "session.hpp"
@@ -27,7 +25,8 @@
zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
socket_base_t *owner_, fd_t fd_, const options_t &options_) :
owned_t (parent_, owner_),
- options (options_)
+ options (options_),
+ has_peer_identity (false)
{
// Create associated engine object.
engine = new zmq_engine_t (parent_, fd_);
@@ -47,19 +46,33 @@ bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_)
bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
{
+ // Once we've got peer's identity we aren't interested in subsequent
+ // messages.
+ if (has_peer_identity)
+ return false;
+
// Retreieve the remote identity. We'll use it as a local session name.
- std::string session_name = std::string ((const char*) zmq_msg_data (msg_),
+ has_peer_identity = true;
+ peer_identity.assign ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
+
+
+ return true;
+}
+
+void zmq::zmq_listener_init_t::flush ()
+{
+ zmq_assert (has_peer_identity);
// Initialisation is done. Disconnect the engine from the init object.
engine->unplug ();
// Have a look whether the session already exists. If it does, attach it
// to the engine. If it doesn't create it first.
- session_t *session = owner->find_session (session_name.c_str ());
+ session_t *session = owner->find_session (peer_identity.c_str ());
if (!session) {
io_thread_t *io_thread = choose_io_thread (options.affinity);
- session = new session_t (io_thread, owner, session_name.c_str (),
+ session = new session_t (io_thread, owner, peer_identity.c_str (),
options);
zmq_assert (session);
send_plug (session);
@@ -73,14 +86,12 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
// Destroy the init object.
term ();
-
- return true;
}
-void zmq::zmq_listener_init_t::flush ()
+void zmq::zmq_listener_init_t::detach ()
{
- // No need to do anything. zmq_listener_init_t does no batching
- // of messages. Each message is processed immediately on write.
+ // TODO: Engine is closing down. Init object is to be closed as well.
+ zmq_assert (false);
}
void zmq::zmq_listener_init_t::process_plug ()