summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-11 14:09:56 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-25 15:39:20 +0200
commitd13933bc62fce71b5a58118020e0dd3776e79aa9 (patch)
tree6586d5b9cc637dbf8acae4b32d24da9c8e046014
parentee1f1af0091d9bdffa0e5ce1783da925b3cd7e56 (diff)
I/O object hierarchy implemented
-rw-r--r--src/Makefile.am11
-rw-r--r--src/command.hpp4
-rw-r--r--src/connect_session.cpp115
-rw-r--r--src/connect_session.hpp60
-rw-r--r--src/ctx.cpp6
-rw-r--r--src/ctx.hpp2
-rw-r--r--src/fq.cpp21
-rw-r--r--src/fq.hpp11
-rw-r--r--src/i_engine.hpp15
-rw-r--r--src/i_inout.hpp21
-rw-r--r--src/i_terminate_events.hpp38
-rw-r--r--src/io_object.cpp22
-rw-r--r--src/io_object.hpp10
-rw-r--r--src/lb.cpp22
-rw-r--r--src/lb.hpp11
-rw-r--r--src/named_session.cpp87
-rw-r--r--src/named_session.hpp56
-rw-r--r--src/object.cpp26
-rw-r--r--src/object.hpp23
-rw-r--r--src/own.cpp198
-rw-r--r--src/own.hpp132
-rw-r--r--src/owned.cpp77
-rw-r--r--src/owned.hpp94
-rw-r--r--src/pair.cpp28
-rw-r--r--src/pair.hpp7
-rw-r--r--src/pub.cpp26
-rw-r--r--src/pub.hpp8
-rw-r--r--src/pull.cpp14
-rw-r--r--src/pull.hpp17
-rw-r--r--src/push.cpp14
-rw-r--r--src/push.hpp17
-rw-r--r--src/session.cpp161
-rw-r--r--src/session.hpp73
-rw-r--r--src/socket_base.cpp435
-rw-r--r--src/socket_base.hpp68
-rw-r--r--src/sub.cpp12
-rw-r--r--src/sub.hpp11
-rw-r--r--src/transient_session.cpp36
-rw-r--r--src/transient_session.hpp46
-rw-r--r--src/xrep.cpp25
-rw-r--r--src/xrep.hpp12
-rw-r--r--src/xreq.cpp17
-rw-r--r--src/xreq.hpp13
-rw-r--r--src/zmq_connecter.cpp38
-rw-r--r--src/zmq_connecter.hpp24
-rw-r--r--src/zmq_engine.cpp52
-rw-r--r--src/zmq_engine.hpp14
-rw-r--r--src/zmq_init.cpp142
-rw-r--r--src/zmq_init.hpp33
-rw-r--r--src/zmq_listener.cpp19
-rw-r--r--src/zmq_listener.hpp11
51 files changed, 1441 insertions, 994 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 937372f..d7509dc 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -55,6 +55,7 @@ libzmq_la_SOURCES = \
blob.hpp \
command.hpp \
config.hpp \
+ connect_session.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
@@ -70,15 +71,17 @@ libzmq_la_SOURCES = \
ip.hpp \
i_engine.hpp \
i_poll_events.hpp \
+ i_terminate_events.hpp \
kqueue.hpp \
lb.hpp \
likely.hpp \
msg_content.hpp \
msg_store.hpp \
mutex.hpp \
+ named_session.hpp \
object.hpp \
options.hpp \
- owned.hpp \
+ own.hpp \
pgm_receiver.hpp \
pgm_sender.hpp \
pgm_socket.hpp \
@@ -106,6 +109,7 @@ libzmq_la_SOURCES = \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
+ transient_session.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -123,6 +127,7 @@ libzmq_la_SOURCES = \
zmq_listener.hpp \
command.cpp \
ctx.cpp \
+ connect_session.cpp \
devpoll.cpp \
epoll.cpp \
err.cpp \
@@ -134,9 +139,10 @@ libzmq_la_SOURCES = \
kqueue.cpp \
lb.cpp \
msg_store.cpp \
+ named_session.cpp \
object.cpp \
options.cpp \
- owned.cpp \
+ own.cpp \
pair.cpp \
pgm_receiver.cpp \
pgm_sender.cpp \
@@ -160,6 +166,7 @@ libzmq_la_SOURCES = \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
+ transient_session.cpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
diff --git a/src/command.hpp b/src/command.hpp
index 3d00cd7..a924b4e 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -61,7 +61,7 @@ namespace zmq
// Sent to socket to let it know about the newly created object.
struct {
- class owned_t *object;
+ class own_t *object;
} own;
// Attach the engine to the session.
@@ -104,7 +104,7 @@ namespace zmq
// Sent by I/O object ot the socket to request the shutdown of
// the I/O object.
struct {
- class owned_t *object;
+ class own_t *object;
} term_req;
// Sent by socket to I/O object to start its shutdown.
diff --git a/src/connect_session.cpp b/src/connect_session.cpp
new file mode 100644
index 0000000..5c088f6
--- /dev/null
+++ b/src/connect_session.cpp
@@ -0,0 +1,115 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "connect_session.hpp"
+#include "zmq_connecter.hpp"
+
+zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_t (io_thread_, socket_, options_),
+ protocol (protocol_),
+ address (address_)
+{
+}
+
+zmq::connect_session_t::~connect_session_t ()
+{
+}
+
+void zmq::connect_session_t::process_plug ()
+{
+ // Start connection process immediately.
+ start_connecting ();
+}
+
+void zmq::connect_session_t::start_connecting ()
+{
+ // Create the connecter object.
+
+ // Both TCP and IPC transports are using the same infrastructure.
+ if (protocol == "tcp" || protocol == "ipc") {
+ zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
+ choose_io_thread (options.affinity), this, options,
+ protocol.c_str (), address.c_str ());
+ zmq_assert (connecter);
+ launch_child (connecter);
+ return;
+ }
+
+#if defined ZMQ_HAVE_OPENPGM
+
+ // Both PGM and EPGM transports are using the same infrastructure.
+ if (addr_type == "pgm" || addr_type == "epgm") {
+
+ // For EPGM transport with UDP encapsulation of PGM is used.
+ bool udp_encapsulation = (addr_type == "epgm");
+
+ // At this point we'll create message pipes to the session straight
+ // away. There's no point in delaying it as no concept of 'connect'
+ // exists with PGM anyway.
+ if (options.requires_out) {
+
+ // PGM sender.
+ pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
+ choose_io_thread (options.affinity), options);
+ zmq_assert (pgm_sender);
+
+ int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ());
+ if (rc != 0) {
+ delete pgm_sender;
+ return -1;
+ }
+
+ send_attach (this, pgm_sender, blob_t ());
+ }
+ else if (options.requires_in) {
+
+ // PGM receiver.
+ pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
+ choose_io_thread (options.affinity), options);
+ zmq_assert (pgm_receiver);
+
+ int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ());
+ if (rc != 0) {
+ delete pgm_receiver;
+ return -1;
+ }
+
+ send_attach (this, pgm_receiver, blob_t ());
+ }
+ else
+ zmq_assert (false);
+
+ return;
+ }
+#endif
+
+ zmq_assert (false);
+}
+
+void zmq::connect_session_t::detach ()
+{
+ // Clean up the mess left over by the failed connection.
+ clean_pipes ();
+
+ // Reconnect.
+ start_connecting ();
+}
+
diff --git a/src/connect_session.hpp b/src/connect_session.hpp
new file mode 100644
index 0000000..8303dda
--- /dev/null
+++ b/src/connect_session.hpp
@@ -0,0 +1,60 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
+#define __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
+
+#include <string>
+
+#include "session.hpp"
+
+namespace zmq
+{
+
+ // Connect session contains an address to connect to. On disconnect it
+ // attempts to reconnect.
+
+ class connect_session_t : public session_t
+ {
+ public:
+
+ connect_session_t (class io_thread_t *io_thread_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~connect_session_t ();
+
+ // i_inout interface implementation.
+ void detach ();
+
+ private:
+
+ // Start the connection process.
+ void start_connecting ();
+
+ // Command handlers.
+ void process_plug ();
+
+ // Address to connect to.
+ std::string protocol;
+ std::string address;
+ };
+
+}
+
+#endif
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 91157a5..d096b91 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -119,6 +119,7 @@ int zmq::ctx_t::term ()
// We don't even have to synchronise access to data.
zmq_assert (sockets.empty ());
+// TODO: We are accessing the list of zombies in unsynchronised manner here!
// Get rid of remaining zombie sockets.
while (!zombies.empty ()) {
dezombify ();
@@ -173,7 +174,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
return s;
}
-void zmq::ctx_t::zombify (socket_base_t *socket_)
+void zmq::ctx_t::zombify_socket (socket_base_t *socket_)
{
// Zombification of socket basically means that its ownership is tranferred
// from the application that created it to the context.
@@ -284,7 +285,8 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
void zmq::ctx_t::dezombify ()
{
- // Try to dezombify each zombie in the list.
+ // Try to dezombify each zombie in the list. Note that caller is
+ // responsible for calling this method in the slot_sync critical section.
for (zombies_t::size_type i = 0; i != zombies.size ();)
if (zombies [i]->dezombify ()) {
empty_slots.push_back (zombies [i]->get_slot ());
diff --git a/src/ctx.hpp b/src/ctx.hpp
index cb9a2d9..c44cca6 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -58,7 +58,7 @@ namespace zmq
class socket_base_t *create_socket (int type_);
// Make socket a zombie.
- void zombify (socket_base_t *socket_);
+ void zombify_socket (socket_base_t *socket_);
// Send command to the destination slot.
void send_command (uint32_t slot_, const command_t &command_);
diff --git a/src/fq.cpp b/src/fq.cpp
index ddade28..8f6485f 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -22,11 +22,14 @@
#include "fq.hpp"
#include "pipe.hpp"
#include "err.hpp"
+#include "i_terminate_events.hpp"
-zmq::fq_t::fq_t () :
+zmq::fq_t::fq_t (i_terminate_events *sink_) :
active (0),
current (0),
- more (false)
+ more (false),
+ sink (sink_),
+ terminating (false)
{
}
@@ -42,6 +45,10 @@ void zmq::fq_t::attach (reader_t *pipe_)
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
+
+ // If we are already terminating, ask the pipe to terminate straight away.
+ if (terminating)
+ pipe_->terminate ();
}
void zmq::fq_t::terminated (reader_t *pipe_)
@@ -59,15 +66,15 @@ void zmq::fq_t::terminated (reader_t *pipe_)
current = 0;
}
pipes.erase (pipe_);
-}
-bool zmq::fq_t::has_pipes ()
-{
- return !pipes.empty ();
+ if (terminating && pipes.empty ())
+ sink->terminated ();
}
-void zmq::fq_t::term_pipes ()
+void zmq::fq_t::terminate ()
{
+ terminating = true;
+
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
}
diff --git a/src/fq.hpp b/src/fq.hpp
index 2e09809..97e9469 100644
--- a/src/fq.hpp
+++ b/src/fq.hpp
@@ -33,12 +33,11 @@ namespace zmq
{
public:
- fq_t ();
+ fq_t (struct i_terminate_events *sink_);
~fq_t ();
void attach (reader_t *pipe_);
- bool has_pipes ();
- void term_pipes ();
+ void terminate ();
int recv (zmq_msg_t *msg_, int flags_);
bool has_in ();
@@ -64,6 +63,12 @@ namespace zmq
// there are following parts still waiting in the current pipe.
bool more;
+ // Object to send events to.
+ i_terminate_events *sink;
+
+ // If true, termination process is already underway.
+ bool terminating;
+
fq_t (const fq_t&);
void operator = (const fq_t&);
};
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index ea6b850..0ba94f5 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -20,8 +20,6 @@
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
-#include <stddef.h>
-
namespace zmq
{
@@ -30,18 +28,19 @@ namespace zmq
virtual ~i_engine () {}
// Plug the engine to the session.
- virtual void plug (struct i_inout *inout_) = 0;
+ virtual void plug (class io_thread_t *io_thread_,
+ struct i_inout *inout_) = 0;
// Unplug the engine from the session.
virtual void unplug () = 0;
- // This method is called by the session to signalise that there
- // are messages to send available.
- virtual void revive () = 0;
-
// This method is called by the session to signalise that more
// messages can be written to the pipe.
- virtual void resume_input () = 0;
+ virtual void activate_in () = 0;
+
+ // This method is called by the session to signalise that there
+ // are messages to send available.
+ virtual void activate_out () = 0;
};
}
diff --git a/src/i_inout.hpp b/src/i_inout.hpp
index 21d1838..60bc518 100644
--- a/src/i_inout.hpp
+++ b/src/i_inout.hpp
@@ -31,28 +31,17 @@ namespace zmq
{
virtual ~i_inout () {}
- // Engine asks to get a message to send to the network.
+ // Engine asks for a message to send to the network.
virtual bool read (::zmq_msg_t *msg_) = 0;
- // Engine sends the incoming message further on downstream.
+ // Engine received message from the network and sends it further on.
virtual bool write (::zmq_msg_t *msg_) = 0;
- // Flush all the previously written messages downstream.
+ // Flush all the previously written messages.
virtual void flush () = 0;
-
- // Drop all the references to the engine. The parameter is the object
- // to use to reconnect. If reconnection is not required, the argument
- // is set to NULL.
- virtual void detach (class owned_t *reconnecter_) = 0;
- // Returns least loaded I/O thread.
- virtual class io_thread_t *get_io_thread () = 0;
-
- // Return pointer to the owning socket.
- virtual class socket_base_t *get_owner () = 0;
-
- // Return ordinal number of the session.
- virtual uint64_t get_ordinal () = 0;
+ // Engine is dead. Drop all the references to it.
+ virtual void detach () = 0;
};
}
diff --git a/src/i_terminate_events.hpp b/src/i_terminate_events.hpp
new file mode 100644
index 0000000..08599ff
--- /dev/null
+++ b/src/i_terminate_events.hpp
@@ -0,0 +1,38 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__
+#define __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__
+
+namespace zmq
+{
+
+ // Algorithms such as fair queueing (fq_t) and load balancing (lb_t)
+ // use this interface to communicate termination event to the socket.
+
+ struct i_terminate_events
+ {
+ virtual ~i_terminate_events () {}
+
+ virtual void terminated () = 0;
+ };
+
+}
+
+#endif
diff --git a/src/io_object.cpp b/src/io_object.cpp
index 086f173..b3b45ee 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -21,21 +21,35 @@
#include "io_thread.hpp"
#include "err.hpp"
-zmq::io_object_t::io_object_t (io_thread_t *io_thread_)
+zmq::io_object_t::io_object_t (io_thread_t *io_thread_) :
+ poller (NULL)
{
- // Retrieve the poller from the thread we are running in.
- poller = io_thread_->get_poller ();
+ if (io_thread_)
+ plug (io_thread_);
}
zmq::io_object_t::~io_object_t ()
{
}
-void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_)
+void zmq::io_object_t::plug (io_thread_t *io_thread_)
{
+ zmq_assert (io_thread_);
+ zmq_assert (!poller);
+
+ // Retrieve the poller from the thread we are running in.
poller = io_thread_->get_poller ();
}
+void zmq::io_object_t::unplug ()
+{
+ zmq_assert (poller);
+
+ // Forget about old poller in preparation to be migrated
+ // to a different I/O thread.
+ poller = NULL;
+}
+
zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_)
{
return poller->add_fd (fd_, this);
diff --git a/src/io_object.hpp b/src/io_object.hpp
index 655e7f5..284e6d1 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -40,15 +40,15 @@ namespace zmq
io_object_t (class io_thread_t *io_thread_ = NULL);
~io_object_t ();
+ // When migrating an object from one I/O thread to another, first
+ // unplug it, then migrate it, then plug it to the new thread.
+ void plug (class io_thread_t *io_thread_);
+ void unplug ();
+
protected:
typedef poller_t::handle_t handle_t;
- // Derived class can init/swap the underlying I/O thread.
- // Caution: Remove all the file descriptors from the old I/O thread
- // before swapping to the new one!
- void set_io_thread (class io_thread_t *io_thread_);
-
// Methods to access underlying poller object.
handle_t add_fd (fd_t fd_);
void rm_fd (handle_t handle_);
diff --git a/src/lb.cpp b/src/lb.cpp
index ccfaaae..2468b48 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -22,11 +22,14 @@
#include "lb.hpp"
#include "pipe.hpp"
#include "err.hpp"
+#include "i_terminate_events.hpp"
-zmq::lb_t::lb_t () :
+zmq::lb_t::lb_t (i_terminate_events *sink_) :
active (0),
current (0),
- more (false)
+ more (false),
+ sink (sink_),
+ terminating (false)
{
}
@@ -42,17 +45,22 @@ void zmq::lb_t::attach (writer_t *pipe_)
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
+
+ if (terminating)
+ pipe_->terminate ();
}
-void zmq::lb_t::term_pipes ()
+void zmq::lb_t::terminate ()
{
+ terminating = true;
+
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
}
void zmq::lb_t::terminated (writer_t *pipe_)
{
- // ???
+ // TODO: ???
zmq_assert (!more || pipes [current] != pipe_);
// Remove the pipe from the list; adjust number of active pipes
@@ -63,11 +71,9 @@ void zmq::lb_t::terminated (writer_t *pipe_)
current = 0;
}
pipes.erase (pipe_);
-}
-bool zmq::lb_t::has_pipes ()
-{
- return !pipes.empty ();
+ if (terminating && pipes.empty ())
+ sink->terminated ();
}
void zmq::lb_t::activated (writer_t *pipe_)
diff --git a/src/lb.hpp b/src/lb.hpp
index e69385e..cb2ce72 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -32,12 +32,11 @@ namespace zmq
{
public:
- lb_t ();
+ lb_t (struct i_terminate_events *sink_);
~lb_t ();
void attach (writer_t *pipe_);
- void term_pipes ();
- bool has_pipes ();
+ void terminate ();
int send (zmq_msg_t *msg_, int flags_);
bool has_out ();
@@ -61,6 +60,12 @@ namespace zmq
// True if last we are in the middle of a multipart message.
bool more;
+ // Object to send events to.
+ struct i_terminate_events *sink;
+
+ // If true, termination process is already underway.
+ bool terminating;
+
lb_t (const lb_t&);
void operator = (const lb_t&);
};
diff --git a/src/named_session.cpp b/src/named_session.cpp
new file mode 100644
index 0000000..d219286
--- /dev/null
+++ b/src/named_session.cpp
@@ -0,0 +1,87 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "named_session.hpp"
+#include "socket_base.hpp"
+
+/*
+zmq::named_session_t::named_session_t (class io_thread_t *io_thread_,
+ socket_base_t *socket_, const options_t &options_,
+ const blob_t &name_) :
+ session_t (io_thread_, socket_, options_),
+ name (name_)
+{
+ // Make double sure that the session has valid name.
+ zmq_assert (!name.empty ());
+ zmq_assert (name [0] != 0);
+
+ if (!socket_->register_session (name, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should log the error and drop the
+ // session.
+ zmq_assert (false);
+ }