diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/blob.hpp | 33 | ||||
-rw-r--r-- | src/command.cpp | 38 | ||||
-rw-r--r-- | src/command.hpp | 7 | ||||
-rw-r--r-- | src/dispatcher.cpp | 4 | ||||
-rw-r--r-- | src/downstream.cpp | 1 | ||||
-rw-r--r-- | src/i_engine.hpp | 5 | ||||
-rw-r--r-- | src/object.cpp | 67 | ||||
-rw-r--r-- | src/object.hpp | 11 | ||||
-rw-r--r-- | src/options.cpp | 7 | ||||
-rw-r--r-- | src/options.hpp | 17 | ||||
-rw-r--r-- | src/p2p.cpp | 1 | ||||
-rw-r--r-- | src/pgm_receiver.cpp | 3 | ||||
-rw-r--r-- | src/pgm_receiver.hpp | 2 | ||||
-rw-r--r-- | src/pgm_sender.cpp | 3 | ||||
-rw-r--r-- | src/pgm_sender.hpp | 2 | ||||
-rw-r--r-- | src/pgm_socket.cpp | 7 | ||||
-rw-r--r-- | src/pub.cpp | 1 | ||||
-rw-r--r-- | src/rep.cpp | 6 | ||||
-rw-r--r-- | src/req.cpp | 1 | ||||
-rw-r--r-- | src/session.cpp | 127 | ||||
-rw-r--r-- | src/session.hpp | 26 | ||||
-rw-r--r-- | src/socket_base.cpp | 79 | ||||
-rw-r--r-- | src/socket_base.hpp | 14 | ||||
-rw-r--r-- | src/sub.cpp | 1 | ||||
-rw-r--r-- | src/upstream.cpp | 1 | ||||
-rw-r--r-- | src/xrep.cpp | 9 | ||||
-rw-r--r-- | src/xreq.cpp | 1 | ||||
-rw-r--r-- | src/zmq_decoder.cpp | 34 | ||||
-rw-r--r-- | src/zmq_decoder.hpp | 6 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 5 | ||||
-rw-r--r-- | src/zmq_engine.hpp | 2 | ||||
-rw-r--r-- | src/zmq_init.cpp | 17 | ||||
-rw-r--r-- | src/zmq_init.hpp | 5 |
34 files changed, 349 insertions, 196 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 446b1e2..4146f68 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -63,6 +63,7 @@ libzmq_la_SOURCES = app_thread.hpp \ atomic_bitmap.hpp \ atomic_counter.hpp \ atomic_ptr.hpp \ + blob.hpp \ command.hpp \ config.hpp \ decoder.hpp \ @@ -131,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \ zmq_init.hpp \ zmq_listener.hpp \ app_thread.cpp \ + command.cpp \ devpoll.cpp \ dispatcher.cpp \ downstream.cpp \ diff --git a/src/blob.hpp b/src/blob.hpp new file mode 100644 index 0000000..a4fa8cd --- /dev/null +++ b/src/blob.hpp @@ -0,0 +1,33 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_BLOB_HPP_INCLUDED__ +#define __ZMQ_BLOB_HPP_INCLUDED__ + +#include <string> + +namespace zmq +{ + + // Object to hold dynamically allocated opaque binary data. + typedef std::basic_string <unsigned char> blob_t; + +} + +#endif diff --git a/src/command.cpp b/src/command.cpp new file mode 100644 index 0000000..8bf7ea2 --- /dev/null +++ b/src/command.cpp @@ -0,0 +1,38 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> + +#include "command.hpp" + +void zmq::deallocate_command (command_t *cmd_) +{ + switch (cmd_->type) { + case command_t::attach: + if (cmd_->args.attach.peer_identity) + free (cmd_->args.attach.peer_identity); + break; + case command_t::bind: + if (cmd_->args.bind.peer_identity) + free (cmd_->args.bind.peer_identity); + break; + default: + /* noop */; + } +} diff --git a/src/command.hpp b/src/command.hpp index 469d6ec..150cad1 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -66,6 +66,8 @@ namespace zmq // Attach the engine to the session. struct { struct i_engine *engine; + unsigned char peer_identity_size; + unsigned char *peer_identity; } attach; // Sent from session to socket to establish pipe(s) between them. @@ -73,6 +75,8 @@ namespace zmq struct { class reader_t *in_pipe; class writer_t *out_pipe; + unsigned char peer_identity_size; + unsigned char *peer_identity; } bind; // Sent by pipe writer to inform dormant pipe reader that there @@ -107,6 +111,9 @@ namespace zmq } args; }; + // Function to deallocate dynamically allocated components of the command. + void deallocate_command (command_t *cmd_); + } #endif diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 8aafcf8..4233278 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -117,6 +117,10 @@ zmq::dispatcher_t::~dispatcher_t () while (!pipes.empty ()) delete *pipes.begin (); + // TODO: Deallocate any commands still in the pipes. Keep in mind that + // simple reading from a pipe and deallocating commands won't do as + // command pipe has template parameter D set to true, meaning that + // read may return false even if there are still commands in the pipe. delete [] command_pipes; #ifdef ZMQ_HAVE_WINDOWS diff --git a/src/downstream.cpp b/src/downstream.cpp index 29b0689..2da08e3 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -26,7 +26,6 @@ zmq::downstream_t::downstream_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_DOWNSTREAM; options.requires_in = false; options.requires_out = true; } diff --git a/src/i_engine.hpp b/src/i_engine.hpp index bcb4297..d64027d 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -22,6 +22,8 @@ #include <stddef.h> +#include "blob.hpp" + namespace zmq { @@ -42,8 +44,7 @@ namespace zmq // Start tracing the message route. Engine should add the identity // supplied to all inbound messages and trim identity from all the // outbound messages. - virtual void traceroute (unsigned char *identity_, - size_t identity_size_) = 0; + virtual void traceroute (const blob_t &identity_) = 0; }; } diff --git a/src/object.cpp b/src/object.cpp index a977f39..356fcd1 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <string.h> + #include "object.hpp" #include "dispatcher.hpp" #include "err.hpp" @@ -77,17 +79,21 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::own: process_own (cmd_.args.own.object); - return; + break; case command_t::attach: - process_attach (cmd_.args.attach.engine); + process_attach (cmd_.args.attach.engine, + blob_t (cmd_.args.attach.peer_identity, + cmd_.args.attach.peer_identity_size)); process_seqnum (); - return; + break; case command_t::bind: - process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); + process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, + blob_t (cmd_.args.bind.peer_identity, + cmd_.args.bind.peer_identity_size)); process_seqnum (); - return; + break; case command_t::pipe_term: process_pipe_term (); @@ -95,23 +101,27 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::pipe_term_ack: process_pipe_term_ack (); - return; + break; case command_t::term_req: process_term_req (cmd_.args.term_req.object); - return; + break; case command_t::term: process_term (); - return; + break; case command_t::term_ack: process_term_ack (); - return; + break; default: zmq_assert (false); } + + // The assumption here is that each command is processed once only, + // so deallocating it after processing is all right. + deallocate_command (&cmd_); } void zmq::object_t::register_pipe (class pipe_t *pipe_) @@ -176,7 +186,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) } void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, - bool inc_seqnum_) + const blob_t &peer_identity_, bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -185,11 +195,26 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, cmd.destination = destination_; cmd.type = command_t::attach; cmd.args.attach.engine = engine_; + if (peer_identity_.empty ()) { + cmd.args.attach.peer_identity_size = 0; + cmd.args.attach.peer_identity = NULL; + } + else { + zmq_assert (peer_identity_.size () <= 0xff); + cmd.args.attach.peer_identity_size = + (unsigned char) peer_identity_.size (); + cmd.args.attach.peer_identity = + (unsigned char*) malloc (peer_identity_.size ()); + zmq_assert (cmd.args.attach.peer_identity_size); + memcpy (cmd.args.attach.peer_identity, peer_identity_.data (), + peer_identity_.size ()); + } send_command (cmd); } void zmq::object_t::send_bind (socket_base_t *destination_, - reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_) + reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_, + bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -199,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_, cmd.type = command_t::bind; cmd.args.bind.in_pipe = in_pipe_; cmd.args.bind.out_pipe = out_pipe_; + if (peer_identity_.empty ()) { + cmd.args.bind.peer_identity_size = 0; + cmd.args.bind.peer_identity = NULL; + } + else { + zmq_assert (peer_identity_.size () <= 0xff); + cmd.args.bind.peer_identity_size = + (unsigned char) peer_identity_.size (); + cmd.args.bind.peer_identity = + (unsigned char*) malloc (peer_identity_.size ()); + zmq_assert (cmd.args.bind.peer_identity_size); + memcpy (cmd.args.bind.peer_identity, peer_identity_.data (), + peer_identity_.size ()); + } send_command (cmd); } @@ -267,12 +306,14 @@ void zmq::object_t::process_own (owned_t *object_) zmq_assert (false); } -void zmq::object_t::process_attach (i_engine *engine_) +void zmq::object_t::process_attach (i_engine *engine_, + const blob_t &peer_identity_) { zmq_assert (false); } -void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + const blob_t &peer_identity_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index e6b2379..1544109 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -21,6 +21,7 @@ #define __ZMQ_OBJECT_HPP_INCLUDED__ #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -64,10 +65,11 @@ namespace zmq void send_own (class socket_base_t *destination_, class owned_t *object_); void send_attach (class session_t *destination_, - struct i_engine *engine_, bool inc_seqnum_ = true); + struct i_engine *engine_, const blob_t &peer_identity_, + bool inc_seqnum_ = true); void send_bind (class socket_base_t *destination_, class reader_t *in_pipe_, class writer_t *out_pipe_, - bool inc_seqnum_ = true); + const blob_t &peer_identity_, bool inc_seqnum_ = true); void send_revive (class object_t *destination_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); @@ -81,9 +83,10 @@ namespace zmq virtual void process_stop (); virtual void process_plug (); virtual void process_own (class owned_t *object_); - virtual void process_attach (struct i_engine *engine_); + virtual void process_attach (struct i_engine *engine_, + const blob_t &peer_identity_); virtual void process_bind (class reader_t *in_pipe_, - class writer_t *out_pipe_); + class writer_t *out_pipe_, const blob_t &peer_identity_); virtual void process_revive (); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); diff --git a/src/options.cpp b/src/options.cpp index f9d93d6..b77af24 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,7 +23,6 @@ #include "err.hpp" zmq::options_t::options_t () : - type (-1), hwm (0), lwm (0), swap (0), @@ -34,7 +33,9 @@ zmq::options_t::options_t () : sndbuf (0), rcvbuf (0), requires_in (false), - requires_out (false) + requires_out (false), + immediate_connect (true), + traceroute (false) { } @@ -76,7 +77,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; case ZMQ_IDENTITY: - identity.assign ((const char*) optval_, optvallen_); + identity.assign ((const unsigned char*) optval_, optvallen_); return 0; case ZMQ_RATE: diff --git a/src/options.hpp b/src/options.hpp index dbe3701..6d9be4d 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -20,10 +20,9 @@ #ifndef __ZMQ_OPTIONS_HPP_INCLUDED__ #define __ZMQ_OPTIONS_HPP_INCLUDED__ -#include <string> - #include "stddef.h" #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -34,14 +33,11 @@ namespace zmq int setsockopt (int option_, const void *optval_, size_t optvallen_); - // Type of the associated socket. One of the constants defined in zmq.h - int type; - int64_t hwm; int64_t lwm; int64_t swap; uint64_t affinity; - std::string identity; + blob_t identity; // Maximum tranfer rate [kb/s]. Default 100kb/s. uint32_t rate; @@ -59,6 +55,15 @@ namespace zmq // provided by the specific socket type. bool requires_in; bool requires_out; + + // If true, when connecting, pipes are created immediately without + // waiting for the connection to be established. That way the socket + // is not aware of the peer's identity, however, it is able to send + // messages straight away. + bool immediate_connect; + + // If true, socket requires tracerouting the messages. + bool traceroute; }; } diff --git a/src/p2p.cpp b/src/p2p.cpp index 46bbd0b..72bc26b 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -29,7 +29,6 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) : outpipe (NULL), alive (true) { - options.type = ZMQ_P2P; options.requires_in = true; options.requires_out = true; } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index a2ba9c6..b7ca327 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,8 +88,7 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } -void zmq::pgm_receiver_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::pgm_receiver_t::traceroute (const blob_t &identity_) { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index f03551f..b01e5b0 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,7 +54,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void traceroute (const blob_t &identity_); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index fa7d7e0..acbc3fb 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -102,8 +102,7 @@ void zmq::pgm_sender_t::revive () out_event (); } -void zmq::pgm_sender_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::pgm_sender_t::traceroute (const blob_t &identity_) { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 89357f5..7041610 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,7 +52,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void traceroute (const blob_t &identity_); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 1eeb34f..462a3a9 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (options.identity.size () > 0) { - // Create gsi from identity string. - gsi_base = options.identity; + // Create gsi from identity. + // TODO: We assume that identity is standard C string here. + // What if it contains binary zeroes? + gsi_base.assign ((const char*) options.identity.data (), + options.identity.size ()); } else { // Generate random gsi. diff --git a/src/pub.cpp b/src/pub.cpp index 9a2dcc6..05bfdcf 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -27,7 +27,6 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_PUB; options.requires_in = false; options.requires_out = true; } diff --git a/src/rep.cpp b/src/rep.cpp index b7685b4..8c5b86c 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -30,9 +30,13 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) : waiting_for_reply (false), reply_pipe (NULL) { - options.type = ZMQ_REP; options.requires_in = true; options.requires_out = true; + + // We don't need immediate connect. We'll be able to send messages + // (replies) only when connection is established and thus requests + // can arrive anyway. + options.immediate_connect = false; } zmq::rep_t::~rep_t () diff --git a/src/req.cpp b/src/req.cpp index 9b1766e..c9240e0 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -30,7 +30,6 @@ zmq::req_t::req_t (class app_thread_t *parent_) : reply_pipe_active (false), reply_pipe (NULL) { - options.type = ZMQ_REQ; options.requires_in = true; options.requires_out = true; } diff --git a/src/session.cpp b/src/session.cpp index 1aece4d..f86327e 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, out_pipe (NULL), engine (NULL), options (options_) -{ - type = unnamed; - +{ // It's possible to register the session at this point as it will be // searched for only on reconnect, i.e. no race condition (session found // before it is plugged into it's I/O thread) is possible. @@ -42,23 +40,24 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, } zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const char *name_) : + const options_t &options_, const blob_t &peer_identity_) : owned_t (parent_, owner_), in_pipe (NULL), active (true), out_pipe (NULL), engine (NULL), + ordinal (0), + peer_identity (peer_identity_), options (options_) { - if (name_) { - type = named; - name = name_; - ordinal = 0; - } - else { - type = transient; - // TODO: Generate unique name here. - ordinal = 0; + if (!peer_identity.empty ()) { + if (!owner->register_session (peer_identity, this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } } } @@ -104,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_) engine = NULL; // Terminate transient session. - if (type == transient) + if (!ordinal && peer_identity.empty ()) term (); } @@ -120,7 +119,6 @@ class zmq::socket_base_t *zmq::session_t::get_owner () uint64_t zmq::session_t::get_ordinal () { - zmq_assert (type == unnamed); zmq_assert (ordinal); return ordinal; } @@ -168,52 +166,15 @@ void zmq::session_t::revive (reader_t *pipe_) void zmq::session_t::process_plug () { - // Register the session with the socket. - if (!name.empty ()) { - bool ok = owner->register_session (name.c_str (), this); - - // There's already a session with the specified identity. - // We should syslog it and drop the session. TODO - zmq_assert (ok); - } - - // If session is created by 'connect' function, it has the pipes set - // already. Otherwise, it's being created by the listener and the pipes - // are yet to be created. - if (!in_pipe && !out_pipe) { - - pipe_t *inbound = NULL; - pipe_t *outbound = NULL; - - if (options.requires_out) { - inbound = new (std::nothrow) pipe_t (this, owner, - options.hwm, options.lwm); - zmq_assert (inbound); - in_pipe = &inbound->reader; - in_pipe->set_endpoint (this); - } - - if (options.requires_in) { - outbound = new (std::nothrow) pipe_t (owner, this, - options.hwm, options.lwm); - zmq_assert (outbound); - out_pipe = &outbound->writer; - out_pipe->set_endpoint (this); - } - - send_bind (owner, outbound ? &outbound->reader : NULL, - inbound ? &inbound->writer : NULL); - } } void zmq::session_t::process_unplug () { - // Unregister the session from the socket. There's nothing to do here - // for transient sessions. - if (type == unnamed) + // Unregister the session from the socket. + if (ordinal) owner->unregister_session (ordinal); - else if (type == named) - owner->unregister_session (name.c_str ()); + else if (!peer_identity.empty ()) + owner->unregister_session (peer_identity); // Ask associated pipes to terminate. if (in_pipe) { @@ -232,8 +193,60 @@ void zmq::session_t::process_unplug () } } -void zmq::session_t::process_attach (i_engine *engine_) +void zmq::session_t::process_attach (i_engine *engine_, + const blob_t &peer_identity_) { + if (!peer_identity.empty ()) { + + // If we already know the peer name do nothing, just check whether + // it haven't changed. + zmq_assert (peer_identity == peer_identity_); + } + else if (!peer_identity_.empty ()) { + + // Store the peer identity. + peer_identity = peer_identity_; + + // If the session is not registered with the ordinal, let's register + // it using the peer name. + if (!ordinal) { + if (!owner->register_session (peer_identity, this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } + } + } + + // Check whether the required pipes already exist. If not so, we'll + // create them and bind them to the socket object. + reader_t *socket_reader = NULL; + writer_t *socket_writer = NULL; + + if (options.requires_in && !out_pipe) { + pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, + options.hwm, options.lwm); + zmq_assert (pipe); + out_pipe = &pipe->writer; + out_pipe->set_endpoint (this); + socket_reader = &pipe->reader; + } + + if (options.requires_out && !in_pipe) { + pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, + options.hwm, options.lwm); + zmq_assert (pipe); + in_pipe = &pipe->reader; + in_pipe->set_endpoint (this); + socket_writer = &pipe->writer; + } + + if (socket_reader || socket_writer) + send_bind (owner, socket_reader, socket_writer, peer_identity); + + // Plug in the engine. zmq_assert (!engine); zmq_assert (engine_); engine = engine_; diff --git a/src/session.hpp b/src/session.hpp index 375d095..d412728 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -20,12 +20,11 @@ #ifndef __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__ -#include <string> - #include "i_inout.hpp" #include "i_endpoint.hpp" #include "owned.hpp" #include "options.hpp" +#include "blob.hpp" namespace zmq { @@ -38,10 +37,9 @@ namespace zmq session_t (object_t *parent_, socket_base_t *owner_, const options_t &options_); |