diff options
Diffstat (limited to 'src/session_base.cpp')
-rw-r--r-- | src/session_base.cpp | 45 |
1 files changed, 23 insertions, 22 deletions
diff --git a/src/session_base.cpp b/src/session_base.cpp index 32dcd4f..f2ee713 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -26,7 +28,6 @@ #include "likely.hpp" #include "tcp_connecter.hpp" #include "ipc_connecter.hpp" -#include "vtcp_connecter.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" @@ -40,7 +41,6 @@ #include "xsub.hpp" #include "push.hpp" #include "pull.hpp" -#include "router.hpp" #include "pair.hpp" zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, @@ -88,10 +88,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, s = new (std::nothrow) pull_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; - case ZMQ_ROUTER: - s = new (std::nothrow) router_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); - break; case ZMQ_PAIR: s = new (std::nothrow) pair_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); @@ -116,7 +112,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, engine (NULL), socket (socket_), io_thread (io_thread_), - has_linger_timer (false) + has_linger_timer (false), + send_identity (options_.send_identity), + recv_identity (options_.recv_identity) { if (protocol_) protocol = protocol_; @@ -150,18 +148,33 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) int zmq::session_base_t::read (msg_t *msg_) { + // First message to send is identity (if required). + if (send_identity) { + zmq_assert (!(msg_->flags () & msg_t::more)); + msg_->init_size (options.identity_size); + memcpy (msg_->data (), options.identity, options.identity_size); + send_identity = false; + incomplete_in = false; + return 0; + } + if (!pipe || !pipe->read (msg_)) { errno = EAGAIN; return -1; } + incomplete_in = msg_->flags () & msg_t::more ? true : false; - incomplete_in = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; return 0; } int zmq::session_base_t::write (msg_t *msg_) { + // First message to receive is identity (if required). + if (recv_identity) { + msg_->set_flags (msg_t::identity); + recv_identity = false; + } + if (pipe && pipe->write (msg_)) { int rc = msg_->init (); errno_assert (rc == 0); @@ -398,18 +411,6 @@ void zmq::session_base_t::start_connecting (bool wait_) } #endif -#if defined ZMQ_HAVE_VTCP - if (protocol == "vtcp") { - - vtcp_connecter_t *connecter = new (std::nothrow) vtcp_connecter_t ( - io_thread, this, options, address.c_str (), - wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } -#endif - #if defined ZMQ_HAVE_OPENPGM // Both PGM and EPGM transports are using the same infrastructure. |