summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-28 16:51:46 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-28 16:51:46 +0200
commitcb09c6951e2c4405318b422a1f9213af3e4b6b8a (patch)
treefb5d4dfd6a71745e885b2501f19cfbbb38c6f441 /src
parent2dd501651592baa7f9e49f52e1321ae2b9b4e126 (diff)
pipe deallocation added
Diffstat (limited to 'src')
-rw-r--r--src/command.hpp11
-rw-r--r--src/dispatcher.cpp20
-rw-r--r--src/dispatcher.hpp18
-rw-r--r--src/i_endpoint.hpp2
-rw-r--r--src/object.cpp44
-rw-r--r--src/object.hpp8
-rw-r--r--src/pipe.cpp81
-rw-r--r--src/pipe.hpp30
-rw-r--r--src/session.cpp18
-rw-r--r--src/session.hpp2
-rw-r--r--src/socket_base.cpp80
-rw-r--r--src/socket_base.hpp13
12 files changed, 290 insertions, 37 deletions
diff --git a/src/command.hpp b/src/command.hpp
index d3bad79..d16d4fa 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -40,6 +40,8 @@ namespace zmq
attach,
bind,
revive,
+ pipe_term,
+ pipe_term_ack,
term_req,
term,
term_ack
@@ -78,6 +80,15 @@ namespace zmq
struct {
} revive;
+ // Sent by pipe reader to pipe writer to ask it to terminate
+ // its end of the pipe.
+ struct {
+ } pipe_term;
+
+ // Pipe writer acknowledges pipe_term command.
+ struct {
+ } pipe_term_ack;
+
// Sent by I/O object ot the socket to request the shutdown of
// the I/O object.
struct {
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index c0f4541..71e20df 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -83,6 +83,10 @@ zmq::dispatcher_t::~dispatcher_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
+ // Deallocate all the orphaned pipes.
+ for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++)
+ delete *it;
+
delete [] command_pipes;
#ifdef ZMQ_HAVE_WINDOWS
@@ -146,3 +150,19 @@ zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_)
return io_threads [result];
}
+
+void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_)
+{
+ pipes_sync.lock ();
+ bool inserted = pipes.insert (pipe_).second;
+ zmq_assert (inserted);
+ pipes_sync.unlock ();
+}
+
+void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
+{
+ pipes_sync.lock ();
+ pipes_t::size_type erased = pipes.erase (pipe_);
+ zmq_assert (erased == 1);
+ pipes_sync.unlock ();
+}
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index 08596cb..cb445ef 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_DISPATCHER_HPP_INCLUDED__
#include <vector>
+#include <set>
#include <map>
#include <string>
@@ -85,6 +86,11 @@ namespace zmq
// Taskset specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_);
+ // All pipes are registered with the dispatcher so that even the
+ // orphaned pipes can be deallocated on the terminal shutdown.
+ void register_pipe (class pipe_t *pipe_);
+ void unregister_pipe (class pipe_t *pipe_);
+
private:
// Returns the app thread associated with the current thread.
@@ -112,6 +118,18 @@ namespace zmq
// Synchronisation of accesses to shared thread data.
mutex_t threads_sync;
+ // As pipes may reside in orphaned state in particular moments
+ // of the pipe shutdown process, i.e. neither pipe reader nor
+ // pipe writer hold reference to the pipe, we have to hold references
+ // to all pipes in dispatcher so that we can deallocate them
+ // during terminal shutdown even though it conincides with the
+ // pipe being in the orphaned state.
+ typedef std::set <class pipe_t*> pipes_t;
+ pipes_t pipes;
+
+ // Synchronisation of access to the pipes repository.
+ mutex_t pipes_sync;
+
dispatcher_t (const dispatcher_t&);
void operator = (const dispatcher_t&);
};
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index bb7409e..14a479e 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -26,6 +26,8 @@ namespace zmq
struct i_endpoint
{
virtual void revive (class reader_t *pipe_) = 0;
+ virtual void detach_inpipe (class reader_t *pipe_) = 0;
+ virtual void detach_outpipe (class writer_t *pipe_) = 0;
};
}
diff --git a/src/object.cpp b/src/object.cpp
index 4d54ebf..b3cf898 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -83,6 +83,14 @@ void zmq::object_t::process_command (command_t &cmd_)
cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
return;
+ case command_t::pipe_term:
+ process_pipe_term ();
+ return;
+
+ case command_t::pipe_term_ack:
+ process_pipe_term_ack ();
+ return;
+
case command_t::term_req:
process_term_req (cmd_.args.term_req.object);
return;
@@ -100,6 +108,16 @@ void zmq::object_t::process_command (command_t &cmd_)
}
}
+void zmq::object_t::register_pipe (class pipe_t *pipe_)
+{
+ dispatcher->register_pipe (pipe_);
+}
+
+void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
+{
+ dispatcher->unregister_pipe (pipe_);
+}
+
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
return dispatcher->choose_io_thread (taskset_);
@@ -166,6 +184,22 @@ void zmq::object_t::send_revive (object_t *destination_)
send_command (cmd);
}
+void zmq::object_t::send_pipe_term (writer_t *destination_)
+{
+ command_t cmd;
+ cmd.destination = destination_;
+ cmd.type = command_t::pipe_term;
+ send_command (cmd);
+}
+
+void zmq::object_t::send_pipe_term_ack (reader_t *destination_)
+{
+ command_t cmd;
+ cmd.destination = destination_;
+ cmd.type = command_t::pipe_term_ack;
+ send_command (cmd);
+}
+
void zmq::object_t::send_term_req (socket_base_t *destination_,
owned_t *object_)
{
@@ -223,6 +257,16 @@ void zmq::object_t::process_revive ()
zmq_assert (false);
}
+void zmq::object_t::process_pipe_term ()
+{
+ zmq_assert (false);
+}
+
+void zmq::object_t::process_pipe_term_ack ()
+{
+ zmq_assert (false);
+}
+
void zmq::object_t::process_term_req (owned_t *object_)
{
zmq_assert (false);
diff --git a/src/object.hpp b/src/object.hpp
index 0dbac24..8ce569e 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -42,6 +42,10 @@ namespace zmq
int get_thread_slot ();
void process_command (struct command_t &cmd_);
+ // Allow pipe to access corresponding dispatcher functions.
+ void register_pipe (class pipe_t *pipe_);
+ void unregister_pipe (class pipe_t *pipe_);
+
protected:
// Derived object can use following functions to interact with
@@ -60,6 +64,8 @@ namespace zmq
void send_bind (object_t *destination_, class owned_t *session_,
class reader_t *in_pipe_, class writer_t *out_pipe_);
void send_revive (class object_t *destination_);
+ void send_pipe_term (class writer_t *destination_);
+ void send_pipe_term_ack (class reader_t *destination_);
void send_term_req (class socket_base_t *destination_,
class owned_t *object_);
void send_term (class owned_t *destination_);
@@ -74,6 +80,8 @@ namespace zmq
virtual void process_bind (class owned_t *session_,
class reader_t *in_pipe_, class writer_t *out_pipe_);
virtual void process_revive ();
+ virtual void process_pipe_term ();
+ virtual void process_pipe_term_ack ();
virtual void process_term_req (class owned_t *object_);
virtual void process_term ();
virtual void process_term_ack ();
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 5016631..3748ae9 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -19,6 +19,8 @@
#include <pthread.h>
+#include <../include/zmq.h>
+
#include "pipe.hpp"
zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
@@ -39,9 +41,21 @@ zmq::reader_t::~reader_t ()
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
- return pipe->read (msg_);
+ if (!pipe->read (msg_))
+ return false;
+
+ // If delimiter was read, start termination process of the pipe.
+ unsigned char *offset = 0;
+ if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) {
+ if (endpoint)
+ endpoint->detach_inpipe (this);
+ term ();
+ return false;
+ }
// TODO: Adjust the size of the pipe.
+
+ return true;
}
void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
@@ -59,19 +73,48 @@ int zmq::reader_t::get_index ()
return index;
}
+void zmq::reader_t::term ()
+{
+ endpoint = NULL;
+ send_pipe_term (peer);
+}
+
void zmq::reader_t::process_revive ()
{
endpoint->revive (this);
}
+void zmq::reader_t::process_pipe_term_ack ()
+{
+ peer = NULL;
+ delete pipe;
+}
+
zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,
uint64_t hwm_, uint64_t lwm_) :
object_t (parent_),
pipe (pipe_),
peer (&pipe_->reader),
hwm (hwm_),
- lwm (lwm_)
+ lwm (lwm_),
+ index (-1),
+ endpoint (NULL)
+{
+}
+
+void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
+{
+ endpoint = endpoint_;
+}
+
+void zmq::writer_t::set_index (int index_)
+{
+ index = index_;
+}
+
+int zmq::writer_t::get_index ()
{
+ return index;
}
zmq::writer_t::~writer_t ()
@@ -99,14 +142,46 @@ void zmq::writer_t::flush ()
send_revive (peer);
}
+void zmq::writer_t::term ()
+{
+ endpoint = NULL;
+
+ // Push delimiter into the pipe.
+ // Trick the compiler to belive that the tag is a valid pointer.
+ zmq_msg_t msg;
+ const unsigned char *offset = 0;
+ msg.content = (void*) (offset + ZMQ_DELIMITER);
+ msg.shared = false;
+ pipe->write (msg);
+ pipe->flush ();
+}
+
+void zmq::writer_t::process_pipe_term ()
+{
+ if (endpoint)
+ endpoint->detach_outpipe (this);
+
+ reader_t *p = peer;
+ peer = NULL;
+ send_pipe_term_ack (p);
+}
+
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, uint64_t lwm_) :
reader (reader_parent_, this, hwm_, lwm_),
writer (writer_parent_, this, hwm_, lwm_)
{
+ reader.register_pipe (this);
}
zmq::pipe_t::~pipe_t ()
{
+ // Deallocate all the unread messages in the pipe. We have to do it by
+ // hand because zmq_msg_t is a POD, not a class, so there's no associated
+ // destructor.
+ zmq_msg_t msg;
+ while (read (&msg))
+ zmq_msg_close (&msg);
+
+ reader.unregister_pipe (this);
}
-
diff --git a/src/pipe.hpp b/src/pipe.hpp
index b7593c7..b4e592a 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -39,25 +39,29 @@ namespace zmq
uint64_t hwm_, uint64_t lwm_);
~reader_t ();
+ void set_endpoint (i_endpoint *endpoint_);
+
// Reads a message to the underlying pipe.
bool read (struct zmq_msg_t *msg_);
- void set_endpoint (i_endpoint *endpoint_);
-
// Mnaipulation of index of the pipe.
void set_index (int index_);
int get_index ();
+ // Ask pipe to terminate.
+ void term ();
+
private:
// Command handlers.
void process_revive ();
+ void process_pipe_term_ack ();
// The underlying pipe.
class pipe_t *pipe;
// Pipe writer associated with the other side of the pipe.
- class object_t *peer;
+ class writer_t *peer;
// High and low watermarks for in-memory storage (in bytes).
uint64_t hwm;
@@ -86,6 +90,8 @@ namespace zmq
uint64_t hwm_, uint64_t lwm_);
~writer_t ();
+ void set_endpoint (i_endpoint *endpoint_);
+
// Checks whether message with specified size can be written to the
// pipe. If writing the message would cause high watermark to be
// exceeded, the function returns false.
@@ -98,13 +104,23 @@ namespace zmq
// Flush the messages downsteam.
void flush ();
+ // Mnaipulation of index of the pipe.
+ void set_index (int index_);
+ int get_index ();
+
+ // Ask pipe to terminate.
+ void term ();
+
private:
+ // Command handlers.
+ void process_pipe_term ();
+
// The underlying pipe.
class pipe_t *pipe;
// Pipe reader associated with the other side of the pipe.
- class object_t *peer;
+ class reader_t *peer;
// High and low watermarks for in-memory storage (in bytes).
uint64_t hwm;
@@ -114,6 +130,12 @@ namespace zmq
uint64_t head;
uint64_t tail;
+ // Index of the pipe in the socket's list of outbound pipes.
+ int index;
+
+ // Endpoint (either session or socket) the pipe is attached to.
+ i_endpoint *endpoint;
+
writer_t (const writer_t&);
void operator = (const writer_t&);
};
diff --git a/src/session.cpp b/src/session.cpp
index 0b1b947..d667851 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -36,6 +36,11 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
zmq::session_t::~session_t ()
{
+ // Ask associated pipes to terminate.
+ if (in_pipe)
+ in_pipe->term ();
+ if (out_pipe)
+ out_pipe->term ();
}
void zmq::session_t::set_inbound_pipe (reader_t *pipe_)
@@ -49,6 +54,7 @@ void zmq::session_t::set_outbound_pipe (writer_t *pipe_)
{
zmq_assert (!out_pipe);
out_pipe = pipe_;
+ out_pipe->set_endpoint (this);
}
@@ -92,6 +98,17 @@ void zmq::session_t::revive (reader_t *pipe_)
engine->revive ();
}
+void zmq::session_t::detach_inpipe (reader_t *pipe_)
+{
+ active = false;
+ in_pipe = NULL;
+}
+
+void zmq::session_t::detach_outpipe (writer_t *pipe_)
+{
+ out_pipe = NULL;
+}
+
void zmq::session_t::process_plug ()
{
// Register the session with the socket.
@@ -112,6 +129,7 @@ void zmq::session_t::process_plug ()
pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);
zmq_assert (outbound);
out_pipe = &outbound->writer;
+ out_pipe->set_endpoint (this);
send_bind (owner, this, &outbound->reader, &inbound->writer);
}
diff --git a/src/session.hpp b/src/session.hpp
index 4a0882b..ba5bcdd 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -52,6 +52,8 @@ namespace zmq
// i_endpoint interface implementation.
void revive (class reader_t *pipe_);
+ void detach_inpipe (class reader_t *pipe_);
+ void detach_outpipe (class writer_t *pipe_);
// Handlers for incoming commands.
void process_plug ();
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 68fc82b..e14065b 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -50,6 +50,16 @@ zmq::socket_base_t::~socket_base_t ()
{
shutting_down = true;
+ // Ask all pipes to terminate.
+ for (in_pipes_t::iterator it = in_pipes.begin ();
+ it != in_pipes.end (); it++)
+ (*it)->term ();
+ in_pipes.clear ();
+ for (out_pipes_t::iterator it = out_pipes.begin ();
+ it != out_pipes.end (); it++)
+ (*it)->term ();
+ out_pipes.clear ();
+
while (true) {
// On third pass of the loop there should be no more I/O objects
@@ -164,17 +174,18 @@ int zmq::socket_base_t::connect (const char *addr_)
zmq_assert (in_pipe);
in_pipe->reader.set_endpoint (this);
session->set_outbound_pipe (&in_pipe->writer);
- in_pipes.push_back (std::make_pair (&in_pipe->reader, session));
- in_pipes.back ().first->set_index (active);
- in_pipes [active].first->set_index (in_pipes.size () - 1);
+ in_pipes.push_back (&in_pipe->reader);
+ in_pipes.back ()->set_index (active);
+ in_pipes [active]->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
// Create outbound pipe.
pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe);
+ out_pipe->writer.set_endpoint (this);
session->set_inbound_pipe (&out_pipe->reader);
- out_pipes.push_back (std::make_pair (&out_pipe->writer, session));
+ out_pipes.push_back (&out_pipe->writer);
// Activate the session.
send_plug (session);
@@ -225,7 +236,7 @@ int zmq::socket_base_t::flush ()
{
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++)
- it->first->flush ();
+ (*it)->flush ();
return 0;
}
@@ -320,12 +331,38 @@ void zmq::socket_base_t::revive (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
- in_pipes [index].first->set_index (active);
- in_pipes [active].first->set_index (index);
+ in_pipes [index]->set_index (active);
+ in_pipes [active]->set_index (index);
std::swap (in_pipes [index], in_pipes [active]);
active++;
}
+void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
+{
+ // Remove the pipe from the list of inbound pipes.
+ in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
+ if (index < active) {
+ in_pipes [index]->set_index (active - 1);
+ in_pipes [active - 1]->set_index (index);
+ std::swap (in_pipes [index], in_pipes [active - 1]);
+ active--;
+ index = active;
+ }
+ in_pipes [index]->set_index (in_pipes.size () - 1);
+ in_pipes [in_pipes.size () - 1]->set_index (index);
+ std::swap (in_pipes [index], in_pipes [in_pipes.size () - 1]);
+ in_pipes.pop_back ();
+}
+
+void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_)
+{
+ out_pipes_t::size_type index = (out_pipes_t::size_type) pipe_->get_index ();
+ out_pipes [index]->set_index (out_pipes.size () - 1);
+ out_pipes [out_pipes.size () - 1]->set_index (index);
+ std::swap (out_pipes [index], out_pipes [out_pipes.size () - 1]);
+ out_pipes.pop_back ();
+}
+
void zmq::socket_base_t::process_own (owned_t *object_)
{
io_objects.insert (object_);
@@ -336,13 +373,14 @@ void zmq::socket_base_t::process_bind (owned_t *session_,
{
zmq_assert (in_pipe_);
in_pipe_->set_endpoint (this);
- in_pipes.push_back (std::make_pair (in_pipe_, session_));
- in_pipes.back ().first->set_index (active);
- in_pipes [active].first->set_index (in_pipes.size () - 1);
+ in_pipes.push_back (in_pipe_);
+ in_pipes.back ()->set_index (active);
+ in_pipes [active]->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
zmq_assert (out_pipe_);
- out_pipes.push_back (std::make_pair (out_pipe_, session_));
+ out_pipe_->set_endpoint (this);
+ out_pipes.push_back (out_pipe_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
@@ -388,7 +426,7 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
// First check whether all pipes are available for writing.
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++)
- if (!it->first->check_write (zmq_msg_size (msg_)))
+ if (!(*it)->check_write (zmq_msg_size (msg_)))
return false;
msg_content_t *content = (msg_content_t*) msg_->content;
@@ -397,9 +435,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
if (content == (msg_content_t*) ZMQ_VSM) {
for (out_pipes_t::iterator it = out_pipes.begin ();
it != out_pipes.end (); it++) {
- it->first->write (msg_);
+ (*it)->write (msg_);
if (flush_)
- it->first->flush ();
+ (*it)->flush ();
}
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
@@ -410,9 +448,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
// to send the message to - no refcount adjustment i.e. no atomic
// operations are needed.
if (pipes_count == 1) {
- out_pipes.begin ()->first->write (msg_);
+ (*out_pipes.begin ())->write (msg_);
if (flush_)
- out_pipes.begin ()->first->flush ();
+ (*out_pipes.begin ())->flush ();
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return true;
@@ -431,9 +469,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
// Push the message to all destinations.
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++) {
- it->first->write (msg_);
+ (*it)->write (msg_);
if (flush_)
- it->first->flush ();
+ (*it)->flush ();
}
// Detach the original message from the data buffer.
@@ -451,13 +489,13 @@ bool zmq::socket_base_t::fetch (zmq_msg_t *msg_)
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
- bool fetched = in_pipes [current].first->read (msg_);
+ bool fetched = in_pipes [current]->read (msg_);
// If there's no message in the pipe, move it to the list of
// non-active pipes.
if (!fetched) {
- in_pipes [current].first->set_index (active - 1);
- in_pipes [active - 1].first->set_index (current);
+ in_pipes [current]->set_index (active - 1);
+ in_pipes [active - 1]->set_index (current);
std::swap (in_pipes [current], in_pipes [active - 1]);
active--;
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 1f04bda..490c09a 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -24,7 +24,6 @@
#include <map>
#include <vector>
#include <string>
-#include <utility>
#include "i_endpoint.hpp"
#include "object.hpp"
@@ -62,6 +61,8 @@ namespace zmq
// i_endpoint interface implementation.
void revive (class reader_t *pipe_);
+ void detach_inpipe (class reader_t *pipe_);
+ void detach_outpipe (class writer_t *pipe_);
private:
@@ -86,10 +87,7 @@ namespace zmq
io_objects_t io_objects;
// Inbound pipes, i.e. those the socket is getting messages from.
- // The second member in the pair indicates the object on the other
- // side of the pipe.
- typedef std::vector <std::pair <class reader_t*, owned_t*> >
- in_pipes_t;
+ typedef std::vector <class reader_t*> in_pipes_t;
in_pipes_t in_pipes;
// Index of the next inbound pipe to read messages from.
@@ -100,10 +98,7 @@ namespace zmq
in_pipes_t::size_type active;
// Outbound pipes, i.e. those the socket is sending messages to.
- // The second member in the pair indicates the object on the other
- // side of the pipe.
- typedef std::vector <std::pair <class writer_t*, owned_t*> >
- out_pipes_t;
+ typedef std::vector <class writer_t*> out_pipes_t;
out_pipes_t out_pipes;
// Number of I/O objects that were already asked to terminate