summaryrefslogtreecommitdiff
path: root/src/session_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/session_base.cpp')
-rw-r--r--src/session_base.cpp45
1 files changed, 23 insertions, 22 deletions
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 32dcd4f..f2ee713 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -26,7 +28,6 @@
#include "likely.hpp"
#include "tcp_connecter.hpp"
#include "ipc_connecter.hpp"
-#include "vtcp_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
@@ -40,7 +41,6 @@
#include "xsub.hpp"
#include "push.hpp"
#include "pull.hpp"
-#include "router.hpp"
#include "pair.hpp"
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
@@ -88,10 +88,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
s = new (std::nothrow) pull_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
- case ZMQ_ROUTER:
- s = new (std::nothrow) router_session_t (io_thread_, connect_,
- socket_, options_, protocol_, address_);
- break;
case ZMQ_PAIR:
s = new (std::nothrow) pair_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
@@ -116,7 +112,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
engine (NULL),
socket (socket_),
io_thread (io_thread_),
- has_linger_timer (false)
+ has_linger_timer (false),
+ send_identity (options_.send_identity),
+ recv_identity (options_.recv_identity)
{
if (protocol_)
protocol = protocol_;
@@ -150,18 +148,33 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
int zmq::session_base_t::read (msg_t *msg_)
{
+ // First message to send is identity (if required).
+ if (send_identity) {
+ zmq_assert (!(msg_->flags () & msg_t::more));
+ msg_->init_size (options.identity_size);
+ memcpy (msg_->data (), options.identity, options.identity_size);
+ send_identity = false;
+ incomplete_in = false;
+ return 0;
+ }
+
if (!pipe || !pipe->read (msg_)) {
errno = EAGAIN;
return -1;
}
+ incomplete_in = msg_->flags () & msg_t::more ? true : false;
- incomplete_in =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
return 0;
}
int zmq::session_base_t::write (msg_t *msg_)
{
+ // First message to receive is identity (if required).
+ if (recv_identity) {
+ msg_->set_flags (msg_t::identity);
+ recv_identity = false;
+ }
+
if (pipe && pipe->write (msg_)) {
int rc = msg_->init ();
errno_assert (rc == 0);
@@ -398,18 +411,6 @@ void zmq::session_base_t::start_connecting (bool wait_)
}
#endif
-#if defined ZMQ_HAVE_VTCP
- if (protocol == "vtcp") {
-
- vtcp_connecter_t *connecter = new (std::nothrow) vtcp_connecter_t (
- io_thread, this, options, address.c_str (),
- wait_);
- alloc_assert (connecter);
- launch_child (connecter);
- return;
- }
-#endif
-
#if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure.