summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile.am2
-rw-r--r--src/blob.hpp33
-rw-r--r--src/command.cpp38
-rw-r--r--src/command.hpp7
-rw-r--r--src/dispatcher.cpp4
-rw-r--r--src/downstream.cpp1
-rw-r--r--src/i_engine.hpp5
-rw-r--r--src/object.cpp67
-rw-r--r--src/object.hpp11
-rw-r--r--src/options.cpp7
-rw-r--r--src/options.hpp17
-rw-r--r--src/p2p.cpp1
-rw-r--r--src/pgm_receiver.cpp3
-rw-r--r--src/pgm_receiver.hpp2
-rw-r--r--src/pgm_sender.cpp3
-rw-r--r--src/pgm_sender.hpp2
-rw-r--r--src/pgm_socket.cpp7
-rw-r--r--src/pub.cpp1
-rw-r--r--src/rep.cpp6
-rw-r--r--src/req.cpp1
-rw-r--r--src/session.cpp127
-rw-r--r--src/session.hpp26
-rw-r--r--src/socket_base.cpp79
-rw-r--r--src/socket_base.hpp14
-rw-r--r--src/sub.cpp1
-rw-r--r--src/upstream.cpp1
-rw-r--r--src/xrep.cpp9
-rw-r--r--src/xreq.cpp1
-rw-r--r--src/zmq_decoder.cpp34
-rw-r--r--src/zmq_decoder.hpp6
-rw-r--r--src/zmq_engine.cpp5
-rw-r--r--src/zmq_engine.hpp2
-rw-r--r--src/zmq_init.cpp17
-rw-r--r--src/zmq_init.hpp5
34 files changed, 349 insertions, 196 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 446b1e2..4146f68 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -63,6 +63,7 @@ libzmq_la_SOURCES = app_thread.hpp \
atomic_bitmap.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
+ blob.hpp \
command.hpp \
config.hpp \
decoder.hpp \
@@ -131,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_init.hpp \
zmq_listener.hpp \
app_thread.cpp \
+ command.cpp \
devpoll.cpp \
dispatcher.cpp \
downstream.cpp \
diff --git a/src/blob.hpp b/src/blob.hpp
new file mode 100644
index 0000000..a4fa8cd
--- /dev/null
+++ b/src/blob.hpp
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_BLOB_HPP_INCLUDED__
+#define __ZMQ_BLOB_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+ // Object to hold dynamically allocated opaque binary data.
+ typedef std::basic_string <unsigned char> blob_t;
+
+}
+
+#endif
diff --git a/src/command.cpp b/src/command.cpp
new file mode 100644
index 0000000..8bf7ea2
--- /dev/null
+++ b/src/command.cpp
@@ -0,0 +1,38 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <stdlib.h>
+
+#include "command.hpp"
+
+void zmq::deallocate_command (command_t *cmd_)
+{
+ switch (cmd_->type) {
+ case command_t::attach:
+ if (cmd_->args.attach.peer_identity)
+ free (cmd_->args.attach.peer_identity);
+ break;
+ case command_t::bind:
+ if (cmd_->args.bind.peer_identity)
+ free (cmd_->args.bind.peer_identity);
+ break;
+ default:
+ /* noop */;
+ }
+}
diff --git a/src/command.hpp b/src/command.hpp
index 469d6ec..150cad1 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -66,6 +66,8 @@ namespace zmq
// Attach the engine to the session.
struct {
struct i_engine *engine;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} attach;
// Sent from session to socket to establish pipe(s) between them.
@@ -73,6 +75,8 @@ namespace zmq
struct {
class reader_t *in_pipe;
class writer_t *out_pipe;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
@@ -107,6 +111,9 @@ namespace zmq
} args;
};
+ // Function to deallocate dynamically allocated components of the command.
+ void deallocate_command (command_t *cmd_);
+
}
#endif
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 8aafcf8..4233278 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -117,6 +117,10 @@ zmq::dispatcher_t::~dispatcher_t ()
while (!pipes.empty ())
delete *pipes.begin ();
+ // TODO: Deallocate any commands still in the pipes. Keep in mind that
+ // simple reading from a pipe and deallocating commands won't do as
+ // command pipe has template parameter D set to true, meaning that
+ // read may return false even if there are still commands in the pipe.
delete [] command_pipes;
#ifdef ZMQ_HAVE_WINDOWS
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 29b0689..2da08e3 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -26,7 +26,6 @@
zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_DOWNSTREAM;
options.requires_in = false;
options.requires_out = true;
}
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index bcb4297..d64027d 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -22,6 +22,8 @@
#include <stddef.h>
+#include "blob.hpp"
+
namespace zmq
{
@@ -42,8 +44,7 @@ namespace zmq
// Start tracing the message route. Engine should add the identity
// supplied to all inbound messages and trim identity from all the
// outbound messages.
- virtual void traceroute (unsigned char *identity_,
- size_t identity_size_) = 0;
+ virtual void traceroute (const blob_t &identity_) = 0;
};
}
diff --git a/src/object.cpp b/src/object.cpp
index a977f39..356fcd1 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "object.hpp"
#include "dispatcher.hpp"
#include "err.hpp"
@@ -77,17 +79,21 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::own:
process_own (cmd_.args.own.object);
- return;
+ break;
case command_t::attach:
- process_attach (cmd_.args.attach.engine);
+ process_attach (cmd_.args.attach.engine,
+ blob_t (cmd_.args.attach.peer_identity,
+ cmd_.args.attach.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::bind:
- process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
+ process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
+ blob_t (cmd_.args.bind.peer_identity,
+ cmd_.args.bind.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::pipe_term:
process_pipe_term ();
@@ -95,23 +101,27 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::pipe_term_ack:
process_pipe_term_ack ();
- return;
+ break;
case command_t::term_req:
process_term_req (cmd_.args.term_req.object);
- return;
+ break;
case command_t::term:
process_term ();
- return;
+ break;
case command_t::term_ack:
process_term_ack ();
- return;
+ break;
default:
zmq_assert (false);
}
+
+ // The assumption here is that each command is processed once only,
+ // so deallocating it after processing is all right.
+ deallocate_command (&cmd_);
}
void zmq::object_t::register_pipe (class pipe_t *pipe_)
@@ -176,7 +186,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
}
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
- bool inc_seqnum_)
+ const blob_t &peer_identity_, bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -185,11 +195,26 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
+ if (peer_identity_.empty ()) {
+ cmd.args.attach.peer_identity_size = 0;
+ cmd.args.attach.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.attach.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.attach.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.attach.peer_identity_size);
+ memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
void zmq::object_t::send_bind (socket_base_t *destination_,
- reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_)
+ reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_,
+ bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -199,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_,
cmd.type = command_t::bind;
cmd.args.bind.in_pipe = in_pipe_;
cmd.args.bind.out_pipe = out_pipe_;
+ if (peer_identity_.empty ()) {
+ cmd.args.bind.peer_identity_size = 0;
+ cmd.args.bind.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.bind.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.bind.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.bind.peer_identity_size);
+ memcpy (cmd.args.bind.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
@@ -267,12 +306,14 @@ void zmq::object_t::process_own (owned_t *object_)
zmq_assert (false);
}
-void zmq::object_t::process_attach (i_engine *engine_)
+void zmq::object_t::process_attach (i_engine *engine_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
-void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index e6b2379..1544109 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_OBJECT_HPP_INCLUDED__
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -64,10 +65,11 @@ namespace zmq
void send_own (class socket_base_t *destination_,
class owned_t *object_);
void send_attach (class session_t *destination_,
- struct i_engine *engine_, bool inc_seqnum_ = true);
+ struct i_engine *engine_, const blob_t &peer_identity_,
+ bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_,
- bool inc_seqnum_ = true);
+ const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
@@ -81,9 +83,10 @@ namespace zmq
virtual void process_stop ();
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
- virtual void process_attach (struct i_engine *engine_);
+ virtual void process_attach (struct i_engine *engine_,
+ const blob_t &peer_identity_);
virtual void process_bind (class reader_t *in_pipe_,
- class writer_t *out_pipe_);
+ class writer_t *out_pipe_, const blob_t &peer_identity_);
virtual void process_revive ();
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
diff --git a/src/options.cpp b/src/options.cpp
index f9d93d6..b77af24 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -23,7 +23,6 @@
#include "err.hpp"
zmq::options_t::options_t () :
- type (-1),
hwm (0),
lwm (0),
swap (0),
@@ -34,7 +33,9 @@ zmq::options_t::options_t () :
sndbuf (0),
rcvbuf (0),
requires_in (false),
- requires_out (false)
+ requires_out (false),
+ immediate_connect (true),
+ traceroute (false)
{
}
@@ -76,7 +77,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
case ZMQ_IDENTITY:
- identity.assign ((const char*) optval_, optvallen_);
+ identity.assign ((const unsigned char*) optval_, optvallen_);
return 0;
case ZMQ_RATE:
diff --git a/src/options.hpp b/src/options.hpp
index dbe3701..6d9be4d 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -20,10 +20,9 @@
#ifndef __ZMQ_OPTIONS_HPP_INCLUDED__
#define __ZMQ_OPTIONS_HPP_INCLUDED__
-#include <string>
-
#include "stddef.h"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -34,14 +33,11 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_);
- // Type of the associated socket. One of the constants defined in zmq.h
- int type;
-
int64_t hwm;
int64_t lwm;
int64_t swap;
uint64_t affinity;
- std::string identity;
+ blob_t identity;
// Maximum tranfer rate [kb/s]. Default 100kb/s.
uint32_t rate;
@@ -59,6 +55,15 @@ namespace zmq
// provided by the specific socket type.
bool requires_in;
bool requires_out;
+
+ // If true, when connecting, pipes are created immediately without
+ // waiting for the connection to be established. That way the socket
+ // is not aware of the peer's identity, however, it is able to send
+ // messages straight away.
+ bool immediate_connect;
+
+ // If true, socket requires tracerouting the messages.
+ bool traceroute;
};
}
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 46bbd0b..72bc26b 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -29,7 +29,6 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
outpipe (NULL),
alive (true)
{
- options.type = ZMQ_P2P;
options.requires_in = true;
options.requires_out = true;
}
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index a2ba9c6..b7ca327 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,8 +88,7 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
-void zmq::pgm_receiver_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_receiver_t::traceroute (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index f03551f..b01e5b0 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,7 +54,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void traceroute (const blob_t &identity_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index fa7d7e0..acbc3fb 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -102,8 +102,7 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
-void zmq::pgm_sender_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_sender_t::traceroute (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 89357f5..7041610 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,7 +52,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void traceroute (const blob_t &identity_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 1eeb34f..462a3a9 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (options.identity.size () > 0) {
- // Create gsi from identity string.
- gsi_base = options.identity;
+ // Create gsi from identity.
+ // TODO: We assume that identity is standard C string here.
+ // What if it contains binary zeroes?
+ gsi_base.assign ((const char*) options.identity.data (),
+ options.identity.size ());
} else {
// Generate random gsi.
diff --git a/src/pub.cpp b/src/pub.cpp
index 9a2dcc6..05bfdcf 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -27,7 +27,6 @@
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_PUB;
options.requires_in = false;
options.requires_out = true;
}
diff --git a/src/rep.cpp b/src/rep.cpp
index b7685b4..8c5b86c 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -30,9 +30,13 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
waiting_for_reply (false),
reply_pipe (NULL)
{
- options.type = ZMQ_REP;
options.requires_in = true;
options.requires_out = true;
+
+ // We don't need immediate connect. We'll be able to send messages
+ // (replies) only when connection is established and thus requests
+ // can arrive anyway.
+ options.immediate_connect = false;
}
zmq::rep_t::~rep_t ()
diff --git a/src/req.cpp b/src/req.cpp
index 9b1766e..c9240e0 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -30,7 +30,6 @@ zmq::req_t::req_t (class app_thread_t *parent_) :
reply_pipe_active (false),
reply_pipe (NULL)
{
- options.type = ZMQ_REQ;
options.requires_in = true;
options.requires_out = true;
}
diff --git a/src/session.cpp b/src/session.cpp
index 1aece4d..f86327e 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
out_pipe (NULL),
engine (NULL),
options (options_)
-{
- type = unnamed;
-
+{
// It's possible to register the session at this point as it will be
// searched for only on reconnect, i.e. no race condition (session found
// before it is plugged into it's I/O thread) is possible.
@@ -42,23 +40,24 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
}
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, const char *name_) :
+ const options_t &options_, const blob_t &peer_identity_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (true),
out_pipe (NULL),
engine (NULL),
+ ordinal (0),
+ peer_identity (peer_identity_),
options (options_)
{
- if (name_) {
- type = named;
- name = name_;
- ordinal = 0;
- }
- else {
- type = transient;
- // TODO: Generate unique name here.
- ordinal = 0;
+ if (!peer_identity.empty ()) {
+ if (!owner->register_session (peer_identity, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
}
}
@@ -104,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_)
engine = NULL;
// Terminate transient session.
- if (type == transient)
+ if (!ordinal && peer_identity.empty ())
term ();
}
@@ -120,7 +119,6 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()
uint64_t zmq::session_t::get_ordinal ()
{
- zmq_assert (type == unnamed);
zmq_assert (ordinal);
return ordinal;
}
@@ -168,52 +166,15 @@ void zmq::session_t::revive (reader_t *pipe_)
void zmq::session_t::process_plug ()
{
- // Register the session with the socket.
- if (!name.empty ()) {
- bool ok = owner->register_session (name.c_str (), this);
-
- // There's already a session with the specified identity.
- // We should syslog it and drop the session. TODO
- zmq_assert (ok);
- }
-
- // If session is created by 'connect' function, it has the pipes set
- // already. Otherwise, it's being created by the listener and the pipes
- // are yet to be created.
- if (!in_pipe && !out_pipe) {
-
- pipe_t *inbound = NULL;
- pipe_t *outbound = NULL;
-
- if (options.requires_out) {
- inbound = new (std::nothrow) pipe_t (this, owner,
- options.hwm, options.lwm);
- zmq_assert (inbound);
- in_pipe = &inbound->reader;
- in_pipe->set_endpoint (this);
- }
-
- if (options.requires_in) {
- outbound = new (std::nothrow) pipe_t (owner, this,
- options.hwm, options.lwm);
- zmq_assert (outbound);
- out_pipe = &outbound->writer;
- out_pipe->set_endpoint (this);
- }
-
- send_bind (owner, outbound ? &outbound->reader : NULL,
- inbound ? &inbound->writer : NULL);
- }
}
void zmq::session_t::process_unplug ()
{
- // Unregister the session from the socket. There's nothing to do here
- // for transient sessions.
- if (type == unnamed)
+ // Unregister the session from the socket.
+ if (ordinal)
owner->unregister_session (ordinal);
- else if (type == named)
- owner->unregister_session (name.c_str ());
+ else if (!peer_identity.empty ())
+ owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.
if (in_pipe) {
@@ -232,8 +193,60 @@ void zmq::session_t::process_unplug ()
}
}
-void zmq::session_t::process_attach (i_engine *engine_)
+void zmq::session_t::process_attach (i_engine *engine_,
+ const blob_t &peer_identity_)
{
+ if (!peer_identity.empty ()) {
+
+ // If we already know the peer name do nothing, just check whether
+ // it haven't changed.
+ zmq_assert (peer_identity == peer_identity_);
+ }
+ else if (!peer_identity_.empty ()) {
+
+ // Store the peer identity.
+ peer_identity = peer_identity_;
+
+ // If the session is not registered with the ordinal, let's register
+ // it using the peer name.
+ if (!ordinal) {
+ if (!owner->register_session (peer_identity, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
+ }
+ }
+
+ // Check whether the required pipes already exist. If not so, we'll
+ // create them and bind them to the socket object.
+ reader_t *socket_reader = NULL;
+ writer_t *socket_writer = NULL;
+
+ if (options.requires_in && !out_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ out_pipe = &pipe->writer;
+ out_pipe->set_endpoint (this);
+ socket_reader = &pipe->reader;
+ }
+
+ if (options.requires_out && !in_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ in_pipe = &pipe->reader;
+ in_pipe->set_endpoint (this);
+ socket_writer = &pipe->writer;
+ }
+
+ if (socket_reader || socket_writer)
+ send_bind (owner, socket_reader, socket_writer, peer_identity);
+
+ // Plug in the engine.
zmq_assert (!engine);
zmq_assert (engine_);
engine = engine_;
diff --git a/src/session.hpp b/src/session.hpp
index 375d095..d412728 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -20,12 +20,11 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
-#include <string>
-
#include "i_inout.hpp"
#include "i_endpoint.hpp"
#include "owned.hpp"
#include "options.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -38,10 +37,9 @@ namespace zmq
session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_);
- // Creates named session. If name is NULL, transient session with
- // auto-generated name is created.
+ // Creates named session.
session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, const char *name_);
+ const options_t &options_, const blob_t &peer_identity_);
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
@@ -66,7 +64,8 @@ namespace zmq
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
- void process_attach (struct i_engine *engine_);
+ void process_attach (struct i_engine *engine_,
+ const blob_t &peer_identity_);
// Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe;
@@ -79,18 +78,13 @@ namespace zmq
struct i_engine *engine;
- enum {
- transient,
- named,
- unnamed
- } type;
-
- // Ordinal of the session (if any).
+ // Session is identified by ordinal in the case when it was created
+ // before connection to the peer was established and thus we are
+ // unaware of peer's identity.
uint64_t ordinal;
- // The name of the session. One that is used to register it with
- // socket-level repository of sessions.
- std::string name;
+ // Identity of the peer.
+ blob_t peer_identity;
// Inherited socket options.
options_t options;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 720e8cd..222b769 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "inproc") {
+ // TODO: inproc connect is specific with respect to creating pipes
+ // as there's no 'reconnect' functionality implemented. Once that
+ // is in place we should follow generic pipe creation algorithm.
+
// Find the peer socket.
socket_base_t *peer = find_endpoint (addr_args.c_str ());
if (!peer)
@@ -171,7 +175,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// was incremented in find_endpoint function. The callee is notified
// about the fact via the last parameter.
send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL, false);
+ in_pipe ? &in_pipe->writer : NULL, options.identity, false);
return 0;
}
@@ -182,31 +186,37 @@ int zmq::socket_base_t::connect (const char *addr_)
this, options);
zmq_assert (session);
- pipe_t *in_pipe = NULL;
- pipe_t *out_pipe = NULL;
+ // If 'immediate connect' feature is required, we'll created the pipes
+ // to the session straight away. Otherwise, they'll be created by the
+ // session once the connection is established.
+ if (options.immediate_connect) {
- // Create inbound pipe, if required.
- if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, session,
- options.hwm, options.lwm);
- zmq_assert (in_pipe);
+ pipe_t *in_pipe = NULL;
+ pipe_t *out_pipe = NULL;
- }
+ // Create inbound pipe, if required.
+ if (options.requires_in) {
+ in_pipe = new (std::nothrow) pipe_t (this, session,
+ options.hwm, options.lwm);
+ zmq_assert (in_pipe);
- // Create outbound pipe, if required.
- if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (session, this,
- options.hwm, options.lwm);
- zmq_assert (out_pipe);
- }
+ }
- // Attach the pipes to the socket object.
- attach_pipes (in_pipe ? &in_pipe->reader : NULL,
- out_pipe ? &out_pipe->writer : NULL);
+ // Create outbound pipe, if required.
+ if (options.requires_out) {
+ out_pipe = new (std::nothrow) pipe_t (session, this,
+ options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ }
- // Attach the pipes to the session object.
- session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL);
+ // Attach the pipes to the socket object.
+ attach_pipes (in_pipe ? &in_pipe->reader : NULL,
+ out_pipe ? &out_pipe->writer : NULL);
+
+ // Attach the pipes to the session object.
+ session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
+ in_pipe ? &in_pipe->writer : NULL);
+ }
// Activate the session.
send_plug (session);
@@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "tcp" || addr_type == "ipc") {
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+ // Windows named pipes are not compatible with Winsock API.
+ // There's no UNIX domain socket implementation on OpenVMS.
if (addr_type == "ipc") {
errno = EPROTONOSUPPORT;
return -1;
@@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "udp")
udp_encapsulation = true;
+ // At this point we'll create message pipes to the session straight
+ // away. There's no point in delaying it as no concept of 'connect'
+ // exists with PGM anyway.
if (options.requires_out) {
// PGM sender.
@@ -267,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_sender);
+ send_attach (session, pgm_sender, blob_t ());
}
else if (options.requires_in) {
@@ -282,7 +297,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_receiver);
+ send_attach (session, pgm_receiver, blob_t ());
}
else
zmq_assert (false);
@@ -454,30 +469,29 @@ bool zmq::socket_base_t::has_out ()
return xhas_out ();
}
-bool zmq::socket_base_t::register_session (const char *name_,
+bool zmq::socket_base_t::register_session (const blob_t &peer_identity_,
session_t *session_)
{
sessions_sync.lock ();
- bool registered =
- named_sessions.insert (std::make_pair (name_, session_)).second;
+ bool registered = named_sessions.insert (
+ std::make_pair (peer_identity_, session_)).second;
sessions_sync.unlock ();
return registered;
}
-void zmq::socket_base_t::unregister_session (const char *name_)
+void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
- named_sessions_t::iterator it = named_sessions.find (name_);
+ named_sessions_t::iterator it = named_sessions.find (peer_identity_);
zmq_assert (it != named_sessions.end ());
named_sessions.erase (it);
sessions_sync.unlock ();
}
-zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
+zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
-
- named_sessions_t::iterator it = named_sessions.find (name_);
+ named_sessions_t::iterator it = named_sessions.find (peer_identity_);
if (it == named_sessions.end ()) {
sessions_sync.unlock ();
return NULL;
@@ -565,7 +579,8 @@ void zmq::socket_base_t::process_own (owned_t *object_)
io_objects.insert (object_);
}
-void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ const blob_t &peer_identity_)
{
attach_pipes (in_pipe_, out_pipe_);
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 1ad9ed1..a1702a7 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -23,7 +23,6 @@
#include <set>
#include <map>
#include <vector>
-#include <string>
#include "../bindings/c/zmq.h"
@@ -35,6 +34,7 @@
#include "stdint.hpp"
#include "atomic_counter.hpp"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -78,9 +78,10 @@ namespace zmq
// There are two distinct types of sessions: those identified by name
// and those identified by ordinal number. Thus two sets of session
// management functions.
- bool register_session (const char *name_, class session_t *session_);
- void unregister_session (const char *name_);
- class session_t *find_session (const char *name_);
+ bool register_session (const blob_t &peer_identity_,
+ class session_t *session_);
+ void unregister_session (const blob_t &peer_identity_);
+ class session_t *find_session (const blob_t &peer_identity_);
uint64_t register_session (class session_t *session_);
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
@@ -121,7 +122,8 @@ namespace zmq
// Handlers for incoming commands.
void process_own (class owned_t *object_);
- void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_);
+ void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_,
+ const blob_t &peer_identity_);
void process_term_req (class owned_t *object_);
void process_term_ack ();
void process_seqnum ();
@@ -155,7 +157,7 @@ namespace zmq
// within the socket, instead they are used by I/O objects owned by
// the socket. As those objects can live in different threads,
// the access is synchronised by mutex.
- typedef std::map <std::string, session_t*> named_sessions_t;
+ typedef std::map <blob_t, session_t*> named_sessions_t;
named_sessions_t named_sessions;
typedef std::map <uint64_t, session_t*> unnamed_sessions_t;
unnamed_sessions_t unnamed_sessions;
diff --git a/src/sub.cpp b/src/sub.cpp
index 31ee222..06ed896 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -28,7 +28,6 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_),
has_message (false)
{
- options.type = ZMQ_SUB;
options.requires_in = true;
options.requires_out = false;
zmq_msg_init (&message);
diff --git a/src/upstream.cpp b/src/upstream.cpp
index 390dcbe..bdcd5ef 100644
--- a/src/upstream.cpp
+++ b/src/upstream.cpp
@@ -25,7 +25,6 @@
zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_UPSTREAM;
options.requires_in = true;
options.requires_out = false;
}
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 67a9a39..328a832 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -25,9 +25,16 @@
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_XREP;
options.requires_in = true;
options.requires_out = true;
+
+ // On connect, pipes are created only after initial handshaking.
+ // That way we are aware of the peer's identity when binding to the pipes.
+ options.immediate_connect = false;
+
+ // XREP socket adds identity to inbound messages and strips identity
+ // from the outbound messages.
+ options.traceroute = true;
}
zmq::xrep_t::~xrep_t ()
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 691b66e..a4310f8 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -25,7 +25,6 @@
zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_REQ;
options.requires_in = true;
options.requires_out = true;
}
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index f502ffd..20e07bc 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -27,9 +27,7 @@
zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
decoder_t <zmq_decoder_t> (bufsize_),
- destination (NULL),
- prefix (NULL),
- prefix_size (0)
+ destination (NULL)
{
zmq_msg_init (&in_progress);
@@ -39,9 +37,6 @@ zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
zmq::zmq_decoder_t::~zmq_decoder_t ()
{
- if (prefix)
- free (prefix);
-
zmq_msg_close (&in_progress);
}
@@ -50,13 +45,9 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
destination = destination_;
}
-void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_,
- size_t prefix_size_)
+void zmq::zmq_decoder_t::add_prefix (const blob_t &prefix_)
{
- prefix = malloc (prefix_size_);
- zmq_assert (prefix);
- memcpy (prefix, prefix_, prefix_size_);
- prefix_size = prefix_size_;
+ prefix = prefix_;
}
bool zmq::zmq_decoder_t::one_byte_size_ready ()
@@ -72,15 +63,16 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf);
+ int rc = zmq_msg_init_size (&in_progress, prefix.size () + *tmpbuf);
errno_assert (rc == 0);
// Fill in the message prefix if any.
- if (prefix)
- memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
+ if (!prefix.empty ())
+ memcpy (zmq_msg_data (&in_progress), prefix.data (),
+ prefix.size ());
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
- *tmpbuf, &zmq_decoder_t::message_ready);
+ next_step ((unsigned char*) zmq_msg_data (&in_progress) +
+ prefix.size (), *tmpbuf, &zmq_decoder_t::message_ready);
}
return true;
}
@@ -95,14 +87,14 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, prefix_size + size);
+ int rc = zmq_msg_init_size (&in_progress, prefix.size () + size);
errno_assert (rc == 0);
// Fill in the message prefix if any.
- if (prefix)
- memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
+ if (!prefix.empty ())
+ memcpy (zmq_msg_data (&in_progress), prefix.data (), prefix.size ());
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size ,
+ next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix.size (),
size, &zmq_decoder_t::message_ready);
return true;
}
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
index dfabece..11ee6c2 100644
--- a/src/zmq_decoder.hpp
+++ b/src/zmq_decoder.hpp
@@ -23,6 +23,7 @@
#include "../bindings/c/zmq.h"
#include "decoder.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -41,7 +42,7 @@ namespace zmq
// Once called, all decoded messages will be prefixed by the specified
// prefix.
- void add_prefix (unsigned char *prefix_, size_t prefix_size_);
+ void add_prefix (const blob_t &prefix_);
private:
@@ -53,8 +54,7 @@ namespace zmq
unsigned char tmpbuf [8];
::zmq_msg_t in_progress;
- void *prefix;
- size_t prefix_size;
+ blob_t prefix;
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index bda098c..75f3441 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -160,11 +160,10 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
-void zmq::zmq_engine_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::zmq_engine_t::traceroute (const blob_t &identity_)
{
encoder.trim_prefix ();
- decoder.add_prefix (identity_, identity_size_);
+ decoder.add_prefix (identity_);
}
void zmq::zmq_engine_t::error ()
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index 174dd1a..8657e8e 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -47,7 +47,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void traceroute (const blob_t &identity_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index 9492caa..9aebad0 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -72,15 +72,14 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
return false;
// Retreieve the remote identity.
- peer_identity.assign ((const char*) zmq_msg_data (msg_),
+ peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
received = true;
// Once the initial handshaking is over, XREP sockets should start
// tracerouting individual messages.
- if (options.type == ZMQ_XREP)
- engine->traceroute ((unsigned char*) peer_identity.data (),
- peer_identity.size ());
+ if (options.traceroute)
+ engine->traceroute (peer_identity);
return true;
}
@@ -164,11 +163,11 @@ void zmq::zmq_init_t::finalise ()
// If the peer has a unique name, find the associated session. If it
// doesn't exist, create it.
else if (!peer_identity.empty ()) {
- session = owner->find_session (peer_identity.c_str ());
+ session = owner->find_session (peer_identity);
if (!session) {
session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options,
- peer_identity.c_str ());
+ peer_identity);
zmq_assert (session);
send_plug (session);
send_own (owner, session);
@@ -182,7 +181,7 @@ void zmq::zmq_init_t::finalise ()
// transient session.
else {
session = new (std::nothrow) session_t (
- choose_io_thread (options.affinity), owner, options, NULL);
+ choose_io_thread (options.affinity), owner, options, blob_t ());
zmq_assert (session);
send_plug (session);
send_own (owner, session);
@@ -191,8 +190,8 @@ void zmq::zmq_init_t::finalise ()
session->inc_seqnum ();
}
- // No need to increment seqnum as it was laready incremented above.
- send_attach (session, engine, false);
+ // No need to increment seqnum as it was already incremented above.
+ send_attach (session, engine, peer_identity, false);
// Destroy the init object.
engine = NULL;
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index df14293..6f935c2 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -20,8 +20,6 @@
#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
-#include <string>
-
#include "i_inout.hpp"
#include "i_engine.hpp"
#include "owned.hpp"
@@ -29,6 +27,7 @@
#include "stdint.hpp"
#include "options.hpp"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -72,7 +71,7 @@ namespace zmq
bool received;
// Identity of the peer socket.
- std::string peer_identity;
+ blob_t peer_identity;
// TCP connecter creates session before the name of the peer is known.
// Thus we know only its ordinal number.