summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile.am8
-rw-r--r--src/app_thread.cpp20
-rw-r--r--src/app_thread.hpp3
-rw-r--r--src/i_endpoint.hpp7
-rw-r--r--src/options.cpp80
-rw-r--r--src/options.hpp3
-rw-r--r--src/p2p.cpp92
-rw-r--r--src/p2p.hpp56
-rw-r--r--src/pipe.cpp28
-rw-r--r--src/pipe.hpp19
-rw-r--r--src/pub.cpp129
-rw-r--r--src/pub.hpp24
-rw-r--r--src/rep.cpp204
-rw-r--r--src/rep.hpp79
-rw-r--r--src/req.cpp206
-rw-r--r--src/req.hpp84
-rw-r--r--src/session.cpp49
-rw-r--r--src/session.hpp6
-rw-r--r--src/socket_base.cpp460
-rw-r--r--src/socket_base.hpp77
-rw-r--r--src/sub.cpp88
-rw-r--r--src/sub.hpp38
-rw-r--r--src/yarray.hpp110
-rw-r--r--src/yarray_item.hpp62
24 files changed, 1460 insertions, 472 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 2701237..f75c3a1 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -68,7 +68,10 @@ libzmq_la_SOURCES = $(pgm_sources) \
pipe.hpp \
platform.hpp \
poll.hpp \
+ p2p.hpp \
pub.hpp \
+ rep.hpp \
+ req.hpp \
select.hpp \
session.hpp \
simple_semaphore.hpp \
@@ -82,6 +85,8 @@ libzmq_la_SOURCES = $(pgm_sources) \
uuid.hpp \
windows.hpp \
wire.hpp \
+ yarray.hpp \
+ yarray_item.hpp \
ypipe.hpp \
ypollset.hpp \
yqueue.hpp \
@@ -108,9 +113,12 @@ libzmq_la_SOURCES = $(pgm_sources) \
pgm_receiver.cpp \
pgm_sender.cpp \
pgm_socket.cpp \
+ p2p.cpp \
pipe.cpp \
poll.cpp \
pub.cpp \
+ rep.cpp \
+ req.cpp \
select.cpp \
session.cpp \
socket_base.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 303c6a1..d12b126 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -39,6 +39,9 @@
#include "socket_base.hpp"
#include "pub.hpp"
#include "sub.hpp"
+#include "req.hpp"
+#include "rep.hpp"
+#include "p2p.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -158,26 +161,27 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_SUB:
s = new sub_t (this);
break;
- case ZMQ_P2P:
case ZMQ_REQ:
+ s = new req_t (this);
+ break;
case ZMQ_REP:
- s = new socket_base_t (this, type_);
+ s = new rep_t (this);
+ break;
+ case ZMQ_P2P:
+ s = new p2p_t (this);
break;
default:
// TODO: This should be EINVAL.
zmq_assert (false);
}
zmq_assert (s);
- s->set_index (sockets.size ());
+
sockets.push_back (s);
+
return s;
}
void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
{
- int i = socket_->get_index ();
- socket_->set_index (-1);
- sockets [i] = sockets.back ();
- sockets [i]->set_index (i);
- sockets.pop_back ();
+ sockets.erase (socket_);
}
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 4fe67fb..14cb8c5 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -24,6 +24,7 @@
#include "stdint.hpp"
#include "object.hpp"
+#include "yarray.hpp"
#include "thread.hpp"
namespace zmq
@@ -67,7 +68,7 @@ namespace zmq
private:
// All the sockets created from this application thread.
- typedef std::vector <class socket_base_t*> sockets_t;
+ typedef yarray_t <socket_base_t> sockets_t;
sockets_t sockets;
// If false, app_thread_t object is not associated with any OS thread.
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index 8ee2984..3bab2a5 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -25,11 +25,12 @@ namespace zmq
struct i_endpoint
{
- virtual void attach_inpipe (class reader_t *pipe_) = 0;
- virtual void attach_outpipe (class writer_t *pipe_) = 0;
- virtual void revive (class reader_t *pipe_) = 0;
+ virtual void attach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
+ virtual void kill (class reader_t *pipe_) = 0;
+ virtual void revive (class reader_t *pipe_) = 0;
};
}
diff --git a/src/options.cpp b/src/options.cpp
index 55417f5..b0e6e6e 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -17,7 +17,10 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include "../bindings/c/zmq.h"
+
#include "options.hpp"
+#include "err.hpp"
zmq::options_t::options_t () :
hwm (0),
@@ -29,3 +32,80 @@ zmq::options_t::options_t () :
use_multicast_loop (false)
{
}
+
+int zmq::options_t::setsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ switch (option_) {
+
+ case ZMQ_HWM:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ hwm = *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_LWM:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ lwm = *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_SWAP:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ swap = *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_AFFINITY:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ affinity = (uint64_t) *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_IDENTITY:
+ identity.assign ((const char*) optval_, optvallen_);
+ return 0;
+
+ case ZMQ_RATE:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ rate = (uint32_t) *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_RECOVERY_IVL:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ recovery_ivl = (uint32_t) *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_MCAST_LOOP:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ if ((int64_t) *((int64_t*) optval_) == 0)
+ use_multicast_loop = false;
+ else if ((int64_t) *((int64_t*) optval_) == 1)
+ use_multicast_loop = true;
+ else {
+ errno = EINVAL;
+ return -1;
+ }
+ return 0;
+ }
+
+ errno = EINVAL;
+ return -1;
+}
diff --git a/src/options.hpp b/src/options.hpp
index c1ecb57..cde144c 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -22,6 +22,7 @@
#include <string>
+#include "stddef.h"
#include "stdint.hpp"
namespace zmq
@@ -31,6 +32,8 @@ namespace zmq
{
options_t ();
+ int setsockopt (int option_, const void *optval_, size_t optvallen_);
+
int64_t hwm;
int64_t lwm;
int64_t swap;
diff --git a/src/p2p.cpp b/src/p2p.cpp
new file mode 100644
index 0000000..537f3ce
--- /dev/null
+++ b/src/p2p.cpp
@@ -0,0 +1,92 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "p2p.hpp"
+#include "err.hpp"
+
+zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
+ socket_base_t (parent_, ZMQ_P2P)
+{
+}
+
+zmq::p2p_t::~p2p_t ()
+{
+}
+
+bool zmq::p2p_t::xrequires_in ()
+{
+ return true;
+}
+
+bool zmq::p2p_t::xrequires_out ()
+{
+ return true;
+}
+
+void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::p2p_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::p2p_t::xkill (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::p2p_t::xrevive (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::p2p_t::xsend (struct zmq_msg_t *msg_, int flags_)
+{
+ zmq_assert (false);
+}
+
+int zmq::p2p_t::xflush ()
+{
+ zmq_assert (false);
+}
+
+int zmq::p2p_t::xrecv (struct zmq_msg_t *msg_, int flags_)
+{
+ zmq_assert (false);
+}
+
+
diff --git a/src/p2p.hpp b/src/p2p.hpp
new file mode 100644
index 0000000..84790a1
--- /dev/null
+++ b/src/p2p.hpp
@@ -0,0 +1,56 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_P2P_INCLUDED__
+#define __ZMQ_P2P_INCLUDED__
+
+#include "socket_base.hpp"
+
+namespace zmq
+{
+
+ class p2p_t : public socket_base_t
+ {
+ public:
+
+ p2p_t (class app_thread_t *parent_);
+ ~p2p_t ();
+
+ // Overloads of functions from socket_base_t.
+ bool xrequires_in ();
+ bool xrequires_out ();
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (struct zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (struct zmq_msg_t *msg_, int flags_);
+
+ private:
+
+ p2p_t (const p2p_t&);
+ void operator = (const p2p_t&);
+ };
+
+}
+
+#endif
diff --git a/src/pipe.cpp b/src/pipe.cpp
index f4cf0c4..9f70586 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -28,7 +28,6 @@ zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
peer (&pipe_->writer),
hwm (hwm_),
lwm (lwm_),
- index (-1),
endpoint (NULL)
{
}
@@ -39,8 +38,10 @@ zmq::reader_t::~reader_t ()
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
- if (!pipe->read (msg_))
+ if (!pipe->read (msg_)) {
+ endpoint->kill (this);
return false;
+ }
// If delimiter was read, start termination process of the pipe.
unsigned char *offset = 0;
@@ -61,17 +62,6 @@ void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
endpoint = endpoint_;
}
-void zmq::reader_t::set_index (int index_)
-{
- index = index_;
-}
-
-int zmq::reader_t::get_index ()
-{
- zmq_assert (index != -1);
- return index;
-}
-
void zmq::reader_t::term ()
{
endpoint = NULL;
@@ -96,7 +86,6 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,
peer (&pipe_->reader),
hwm (hwm_),
lwm (lwm_),
- index (-1),
endpoint (NULL)
{
}
@@ -106,17 +95,6 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
endpoint = endpoint_;
}
-void zmq::writer_t::set_index (int index_)
-{
- index = index_;
-}
-
-int zmq::writer_t::get_index ()
-{
- zmq_assert (index != -1);
- return index;
-}
-
zmq::writer_t::~writer_t ()
{
}
diff --git a/src/pipe.hpp b/src/pipe.hpp
index ede73b8..177b1b4 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -24,6 +24,7 @@
#include "stdint.hpp"
#include "i_endpoint.hpp"
+#include "yarray_item.hpp"
#include "ypipe.hpp"
#include "config.hpp"
#include "object.hpp"
@@ -31,7 +32,7 @@
namespace zmq
{
- class reader_t : public object_t
+ class reader_t : public object_t, public yarray_item_t
{
public:
@@ -44,10 +45,6 @@ namespace zmq
// Reads a message to the underlying pipe.
bool read (struct zmq_msg_t *msg_);
- // Mnaipulation of index of the pipe.
- void set_index (int index_);
- int get_index ();
-
// Ask pipe to terminate.
void term ();
@@ -72,9 +69,6 @@ namespace zmq
uint64_t tail;
uint64_t last_sent_head;
- // Index of the pipe in the socket's list of inbound pipes.
- int index;
-
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
@@ -82,7 +76,7 @@ namespace zmq
void operator = (const reader_t&);
};
- class writer_t : public object_t
+ class writer_t : public object_t, public yarray_item_t
{
public:
@@ -104,10 +98,6 @@ namespace zmq
// Flush the messages downsteam.
void flush ();
- // Mnaipulation of index of the pipe.
- void set_index (int index_);
- int get_index ();
-
// Ask pipe to terminate.
void term ();
@@ -130,9 +120,6 @@ namespace zmq
uint64_t head;
uint64_t tail;
- // Index of the pipe in the socket's list of outbound pipes.
- int index;
-
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
diff --git a/src/pub.cpp b/src/pub.cpp
index ca8afae..020d789 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -21,6 +21,8 @@
#include "pub.hpp"
#include "err.hpp"
+#include "msg_content.hpp"
+#include "pipe.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_PUB)
@@ -29,9 +31,134 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) :
zmq::pub_t::~pub_t ()
{
+ for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++)
+ out_pipes [i]->term ();
+ out_pipes.clear ();
}
-int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_)
+bool zmq::pub_t::xrequires_in ()
+{
+ return false;
+}
+
+bool zmq::pub_t::xrequires_out ()
+{
+ return true;
+}
+
+void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (!inpipe_);
+ out_pipes.push_back (outpipe_);
+}
+
+void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ out_pipes.erase (pipe_);
+}
+
+void zmq::pub_t::xkill (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::pub_t::xrevive (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::pub_t::xsend (struct zmq_msg_t *msg_, int flags_)
+{
+ out_pipes_t::size_type pipes_count = out_pipes.size ();
+
+ // If there are no pipes available, simply drop the message.
+ if (pipes_count == 0) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // First check whether all pipes are available for writing.
+ for (out_pipes_t::size_type i = 0; i != pipes_count; i++)
+ if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ msg_content_t *content = (msg_content_t*) msg_->content;
+
+ // For VSMs the copying is straighforward.
+ if (content == (msg_content_t*) ZMQ_VSM) {
+ for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
+ out_pipes [i]->write (msg_);
+ if (!(flags_ & ZMQ_NOFLUSH))
+ out_pipes [i]->flush ();
+ }
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // Optimisation for the case when there's only a single pipe
+ // to send the message to - no refcount adjustment i.e. no atomic
+ // operations are needed.
+ if (pipes_count == 1) {
+ out_pipes [0]->write (msg_);
+ if (!(flags_ & ZMQ_NOFLUSH))
+ out_pipes [0]->flush ();
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // There are at least 2 destinations for the message. That means we have
+ // to deal with reference counting. First add N-1 references to
+ // the content (we are holding one reference anyway, that's why -1).
+ if (msg_->shared)
+ content->refcnt.add (pipes_count - 1);
+ else {
+ content->refcnt.set (pipes_count);
+ msg_->shared = true;
+ }
+
+ // Push the message to all destinations.
+ for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
+ out_pipes [i]->write (msg_);
+ if (!(flags_ & ZMQ_NOFLUSH))
+ out_pipes [i]->flush ();
+ }
+
+ // Detach the original message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+int zmq::pub_t::xflush ()
+{
+ out_pipes_t::size_type pipe_count = out_pipes.size ();
+ for (out_pipes_t::size_type i = 0; i != pipe_count; i++)
+ out_pipes [i]->flush ();
+ return 0;
+}
+
+int zmq::pub_t::xrecv (struct zmq_msg_t *msg_, int flags_)
{
errno = EFAULT;
return -1;
diff --git a/src/pub.hpp b/src/pub.hpp
index 2f03b8e..8255c6f 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_PUB_INCLUDED__
#include "socket_base.hpp"
+#include "yarray.hpp"
namespace zmq
{
@@ -32,8 +33,27 @@ namespace zmq
pub_t (class app_thread_t *parent_);
~pub_t ();
- // Overloads of API functions from socket_base_t.
- int recv (struct zmq_msg_t *msg_, int flags_);
+ // Overloads of functions from socket_base_t.
+ bool xrequires_in ();
+ bool xrequires_out ();
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (struct zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (struct zmq_msg_t *msg_, int flags_);
+
+ private:
+
+ // Outbound pipes, i.e. those the socket is sending messages to.
+ typedef yarray_t <class writer_t> out_pipes_t;
+ out_pipes_t out_pipes;
+
+ pub_t (const pub_t&);
+ void operator = (const pub_t&);
};
}
diff --git a/src/rep.cpp b/src/rep.cpp
new file mode 100644
index 0000000..2fbb66c
--- /dev/null
+++ b/src/rep.cpp
@@ -0,0 +1,204 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "rep.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::rep_t::rep_t (class app_thread_t *parent_) :
+ socket_base_t (parent_, ZMQ_REP),
+ active (0),
+ current (0),
+ waiting_for_reply (false),
+ reply_pipe (NULL)
+{
+}
+
+zmq::rep_t::~rep_t ()
+{
+}
+
+bool zmq::rep_t::xrequires_in ()
+{
+ return true;
+}
+
+bool zmq::rep_t::xrequires_out ()
+{
+ return true;
+}
+
+void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (inpipe_ && outpipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ in_pipes.push_back (inpipe_);
+ in_pipes.swap (active, in_pipes.size () - 1);
+ out_pipes.push_back (outpipe_);
+ out_pipes.swap (active, out_pipes.size () - 1);
+ active++;
+}
+
+void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ zmq_assert (pipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ in_pipes_t::size_type index = in_pipes.index (pipe_);
+
+ // If corresponding outpipe is still in place simply nullify the pointer
+ // to the inpipe and move it to the passive state.
+ if (out_pipes [index]) {
+ in_pipes [index] = NULL;
+ if (in_pipes.index (pipe_) < active) {
+ active--;
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+ }
+ return;
+ }
+
+ // Now both inpipe and outpipe are detached. Remove them from the lists.
+ if (in_pipes.index (pipe_) < active)
+ active--;
+ in_pipes.erase (index);
+ out_pipes.erase (index);
+}
+
+void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (pipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ out_pipes_t::size_type index = out_pipes.index (pipe_);
+
+ // TODO: If the connection we've got the request from disconnects,
+ // there's nowhere to send the reply. DLQ?
+ if (waiting_for_reply && pipe_ == reply_pipe) {
+ zmq_assert (false);
+ }
+
+ // If corresponding inpipe is still in place simply nullify the pointer
+ // to the outpipe.
+ if (in_pipes [index]) {
+ out_pipes [index] = NULL;
+ if (out_pipes.index (pipe_) < active) {
+ active--;
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+ }
+ return;
+ }
+
+ // Now both inpipe and outpipe are detached. Remove them from the lists.
+ if (out_pipes.index (pipe_) < active)
+ active--;
+ in_pipes.erase (index);
+ out_pipes.erase (index);
+}
+
+void zmq::rep_t::xkill (class reader_t *pipe_)
+{
+ // Move the pipe to the list of inactive pipes.
+ in_pipes_t::size_type index = in_pipes.index (pipe_);
+ active--;
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+}
+
+void zmq::rep_t::xrevive (class reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ in_pipes_t::size_type index = in_pipes.index (pipe_);
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+ active++;
+}
+
+int zmq::rep_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::rep_t::xsend (struct zmq_msg_t *msg_, int flags_)
+{
+ if (!waiting_for_reply) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ // TODO: Implement this once queue limits are in-place. If the reply
+ // overloads the buffer, connection should be torn down.
+ zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_)));
+
+ // Push message to the selected pipe.
+ reply_pipe->write (msg_);
+ reply_pipe->flush ();
+
+ waiting_for_reply = false;
+ reply_pipe = NULL;
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+}
+
+int zmq::rep_t::xflush ()
+{
+ errno = EFAULT;
+ return -1;
+}
+
+int zmq::rep_t::xrecv (struct zmq_msg_t *msg_, int flags_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ if (waiting_for_reply) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ // Round-robin over the pipes to get next message.
+ for (int count = active; count != 0; count--) {
+ bool fetched = in_pipes [current]->read (msg_);
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched) {
+ reply_pipe = out_pipes [current];
+ waiting_for_reply = true;
+ return 0;
+ }
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+}
+
+
diff --git a/src/rep.hpp b/src/rep.hpp
new file mode 100644
index 0000000..6e55f47
--- /dev/null
+++ b/src/rep.hpp
@@ -0,0 +1,79 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_REP_INCLUDED__
+#define __ZMQ_REP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class rep_t : public socket_base_t
+ {
+ public:
+
+ rep_t (class app_thread_t *parent_);
+ ~rep_t ();
+
+ // Overloads of functions from socket_base_t.
+ bool xrequires_in ();
+ bool xrequires_out ();
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (struct zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (struct zmq_msg_t *msg_, int flags_);
+
+ private:
+
+ // List in outbound and inbound pipes. Note that the two lists are
+ // always in sync. I.e. outpipe with index N communicates with the
+ // same session as inpipe with index N.
+ typedef yarray_t <class writer_t> out_pipes_t;
+ out_pipes_t out_pipes;
+ typedef yarray_t <class reader_t> in_pipes_t;
+ in_pipes_t in_pipes;
+
+ // Number of active inpipes. All the active inpipes are located at the
+ // beginning of the in_pipes array.
+ in_pipes_t::size_type active;
+
+ // Index of the next inbound pipe to read a request from.
+ in_pipes_t::size_type current;
+
+ // If true, request was already received and reply wasn't sent yet.
+ bool waiting_for_reply;
+
+ // Pipe we are going to send reply to.
+ class writer_t *reply_pipe;
+
+ rep_t (const rep_t&);
+ void operator = (const rep_t&);
+
+ };
+
+}
+
+#endif
diff --git a/src/req.cpp b/src/req.cpp
new file mode 100644
index 0000000..05629df
--- /dev/null
+++ b/src/req.cpp
@@ -0,0 +1,206 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "req.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::req_t::req_t (class app_thread_t *parent_) :
+ socket_base_t (parent_, ZMQ_REQ),
+ current (0),
+ waiting_for_reply (false),
+ reply_pipe_active (false),
+ reply_pipe (NULL)
+{
+}
+
+zmq::req_t::~req_t ()
+{
+}
+
+bool zmq::req_t::xrequires_in ()
+{
+ return true;
+}
+
+bool zmq::req_t::xrequires_out ()
+{
+ return true;
+}
+
+void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (inpipe_ && outpipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ in_pipes.push_back (inpipe_);
+ out_pipes.push_back (outpipe_);
+}
+
+void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ zmq_assert (pipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ // TODO: The pipe we are awaiting the reply from is detached. What now?
+ // Return ECONNRESET from subsequent recv?
+ if (waiting_for_reply && pipe_ == reply_pipe) {
+ zmq_assert (false);
+ }
+
+ in_pipes_t::size_type index = in_pipes.index (pipe_);
+
+ // If corresponding outpipe is still in place simply nullify the pointer
+ // to the inpipe.
+ if (out_pipes [index]) {
+ in_pipes [index] = NULL;
+ return;
+ }
+
+ // Now both inpipe and outpipe are detached. Remove them from the lists.
+ in_pipes.erase (index);
+ out_pipes.erase (index);
+}
+
+void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (pipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ out_pipes_t::size_type index = out_pipes.index (pipe_);
+
+ // If corresponding inpipe is still in place simply nullify the pointer
+ // to the outpipe.
+ if (in_pipes [index]) {
+ out_pipes [index] = NULL;
+ return;
+ }
+
+ // Now both inpipe and outpipe are detached. Remove them from the lists.
+ in_pipes.erase (index);
+ out_pipes.erase (index);
+}
+
+void zmq::req_t::xkill (class reader_t *pipe_)
+{
+ zmq_assert (pipe_ == reply_pipe);
+
+ reply_pipe_active = false;
+}
+
+void zmq::req_t::xrevive (class reader_t *pipe_)
+{
+ // TODO: Actually, misbehaving peer can cause this kind of thing.
+ // Handle it decently, presumably kill the offending connection.
+ zmq_assert (pipe_ == reply_pipe);
+
+ reply_pipe_active = true;
+}
+
+int zmq::req_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::req_t::xsend (struct zmq_msg_t *msg_, int flags_)
+{
+ // If we've sent a request and we still haven't got the reply,
+ // we can't send another request.
+ if (waiting_for_reply) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ if (out_pipes.empty ()) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ current++;
+ if (current >= out_pipes.size ())
+ current = 0;
+
+ // TODO: Infinite loop can result here. Integrate the algorithm with
+ // the active pipes list (i.e. pipe pair that has one pipe missing is
+ // considered to be inactive.
+ while (!in_pipes [current] || !out_pipes [current]) {
+ current++;
+ if (current >= out_pipes.size ())
+ current = 0;
+ }
+
+ // TODO: Implement this once queue limits are in-place.
+ zmq_assert (out_pipes [current]->check_write (zmq_msg_size (msg_)));
+
+ // Push message to the selected pipe.
+ out_pipes [current]->write (msg_);
+ out_pipes [current]->flush ();
+
+ waiting_for_reply = true;
+ reply_pipe = in_pipes [current];
+
+ // We can safely assume that the reply pipe is active as the last time
+ // we've used it we've read the reply and haven't tried to read from it
+ // anymore.
+ reply_pipe_active = true;
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+int zmq::req_t::xflush ()
+{
+ errno = EFAULT;
+ return -1;
+}
+
+int zmq::req_t::xrecv (struct zmq_msg_t *msg_, int flags_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // If request wasn't send, we can't wait for reply.
+ if (!waiting_for_reply) {
+ zmq_msg_init (msg_);
+ errno = EFAULT;
+ return -1;
+ }
+
+ // Get the reply from the reply pipe.
+ if (!reply_pipe_active || !reply_pipe->read (msg_)) {
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ waiting_for_reply = false;
+ reply_pipe = NULL;
+
+ return 0;
+}
+
+
diff --git a/src/req.hpp b/src/req.hpp
new file mode 100644
index 0000000..9158fbe
--- /dev/null
+++ b/src/req.hpp
@@ -0,0 +1,84 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_REQ_INCLUDED__
+#define __ZMQ_REQ_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class req_t : public socket_base_t
+ {
+ public:
+
+ req_t (class app_thread_t *parent_);
+ ~req_t ();
+
+ // Overloads of functions from socket_base_t.
+ bool xrequires_in ();
+ bool xrequires_out ();
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (struct zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (struct zmq_msg_t *msg_, int flags_);
+
+ private:
+
+ // List in outbound and inbound pipes. Note that the two lists are
+ // always in sync. I.e. outpipe with index N communicates with the
+ // same session as inpipe with index N.
+ //
+ // TODO: Once we have queue limits in place, list of active outpipes
+ // is to be held (presumably by stacking active outpipes at
+ // the beginning of the array). We don't have to do the same thing for
+ // inpipes, because we know which pipe we want to read the
+ // reply from.
+ typedef yarray_t <class writer_t> out_pipes_t;
+ out_pipes_t out_pipes;
+ typedef yarray_t <class reader_t> in_pipes_t;
+ in_pipes_t in_pipes;
+
+ // Req_t load-balances the requests - 'current' points to the session
+ // that's processing the request at the moment.
+ out_pipes_t::size_type current;
+
+ // If true, request was already sent and reply wasn't received yet.
+ bool waiting_for_reply;
+
+ // True, if read can be attempted from the reply pipe.
+ bool reply_pipe_active;
+
+ // Pipe we are awaiting the reply from.
+ class reader_t *reply_pipe;
+
+ req_t (const req_t&);
+ void operator = (const req_t&);
+ };
+
+}
+
+#endif
diff --git a/src/session.cpp b/src/session.cpp
index d455462..b829ae9 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -46,11 +46,7 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
if (!active)
return false;
- bool fetched = in_pipe->read (msg_);
- if (!fetched)
- active = false;
-
- return fetched;
+ return in_pipe->read (msg_);
}
bool zmq::session_t::write (::zmq_msg_t *msg_)
@@ -84,38 +80,45 @@ void zmq::session_t::detach ()
term ();
}
-void zmq::session_t::attach_inpipe (reader_t *pipe_)
+void zmq::session_t::attach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
{
- zmq_assert (!in_pipe);
- in_pipe = pipe_;
- active = true;
- in_pipe->set_endpoint (this);
+ if (inpipe_) {
+ zmq_assert (!in_pipe);
+ in_pipe = inpipe_;
+ active = true;
+ in_pipe->set_endpoint (this);
+ }
+
+ if (outpipe_) {
+ zmq_assert (!out_pipe);
+ out_pipe = outpipe_;
+ out_pipe->set_endpoint (this);
+ }
}
-void zmq::session_t::attach_outpipe (writer_t *pipe_)
+void zmq::session_t::detach_inpipe (reader_t *pipe_)
{
- zmq_assert (!out_pipe);
- out_pipe = pipe_;
- out_pipe->set_endpoint (this);
+ active = false;
+ in_pipe = NULL;
}
-void zmq::session_t::revive (reader_t *pipe_)
+void zmq::session_t::detach_outpipe (writer_t *pipe_)
{
- zmq_assert (in_pipe == pipe_);
- active = true;
- if (engine)
- engine->revive ();
+ out_pipe = NULL;
}
-void zmq::session_t::detach_inpipe (reader_t *pipe_)
+void zmq::session_t::kill (reader_t *pipe_)
{
active = false;
- in_pipe = NULL;
}
-void zmq::session_t::detach_outpipe (writer_t *pipe_)
+void zmq::session_t::revive (reader_t *pipe_)
{
- out_pipe = NULL;
+ zmq_assert (in_pipe == pipe_);
+ active = true;
+ if (engine)
+ engine->revive ();
}
void zmq::session_t::process_plug ()
diff --git a/src/session.hpp b/src/session.hpp
index 55aa8ea..64900f2 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -44,11 +44,11 @@ namespace zmq
void detach ();
// i_endpoint interface implementation.
- void attach_inpipe (class reader_t *pipe_);
- void attach_outpipe (class writer_t *pipe_);
- void revive (class reader_t *pipe_);
+ void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
+ void kill (class reader_t *pipe_);
+ void revive (class reader_t *pipe_);
private:
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index bb8e7c9..c669e04 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -27,7 +27,6 @@
#include "dispatcher.hpp"
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
-#include "msg_content.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "config.hpp"
@@ -42,145 +41,28 @@
zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) :
object_t (parent_),
type (type_),
- current (0),
- active (0),
pending_term_acks (0),
ticks (0),
app_thread (parent_),
- shutting_down (false),
- index (-1)
+ shutting_down (false)
{
}
zmq::socket_base_t::~socket_base_t ()
{
- shutting_down = true;
-
- // Ask all pipes to terminate.
- for (in_pipes_t::iterator it = in_pipes.begin ();
- it != in_pipes.end (); it++)
- (*it)->term ();
- in_pipes.clear ();
- for (out_pipes_t::iterator it = out_pipes.begin ();
- it != out_pipes.end (); it++)
- (*it)->term ();
- out_pipes.clear ();
-
- while (true) {
-
- // On third pass of the loop there should be no more I/O objects
- // because all connecters and listerners were destroyed during
- // the first pass and all engines delivered by delayed 'own' commands
- // are destroyed during the second pass.
- if (io_objects.empty () && !pending_term_acks)
- break;
-
- // Send termination request to all associated I/O objects.
- for (io_objects_t::iterator it = io_objects.begin ();
- it != io_objects.end (); it++)
- send_term (*it);
-
- // Move the objects to the list of pending term acks.
- pending_term_acks += io_objects.size ();
- io_objects.clear ();
-
- // Process commands till we get all the termination acknowledgements.
- while (pending_term_acks)
- app_thread->process_commands (true, false);
- }
-
- // Check whether there are no session leaks.
- sessions_sync.lock ();
- zmq_assert (sessions.empty ());
- sessions_sync.unlock ();
}
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
- switch (option_) {
-
- case ZMQ_HWM:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.hwm = *((int64_t*) optval_);
- return 0;
-
- case ZMQ_LWM:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.lwm = *((int64_t*) optval_);
- return 0;
-
- case ZMQ_SWAP:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.swap = *((int64_t*) optval_);
- return 0;
-
- case ZMQ_AFFINITY:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.affinity = (uint64_t) *((int64_t*) optval_);
- return 0;
-
- case ZMQ_IDENTITY:
- options.identity.assign ((const char*) optval_, optvallen_);
- return 0;
-
- case ZMQ_SUBSCRIBE:
- case ZMQ_UNSUBSCRIBE:
- errno = EFAULT;
- return -1;
-
- case ZMQ_RATE:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.rate = (uint32_t) *((int64_t*) optval_);
- return 0;
-
- case ZMQ_RECOVERY_IVL:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.recovery_ivl = (uint32_t) *((int64_t*) optval_);
- return 0;
-
- case ZMQ_MCAST_LOOP:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
-
- if ((int64_t) *((int64_t*) optval_) == 0) {
-
- options.use_multicast_loop = false;
-
- } else if ((int64_t) *((int64_t*) optval_) == 1) {
-
- options.use_multicast_loop = true;
-
- } else {
- errno = EINVAL;
- return -1;
- }
- return 0;
-
- default:
- errno = EINVAL;
- return -1;
- }
+ // First, check whether specific socket type overloads the option.
+ int rc = xsetsockopt (option_, optval_, optvallen_);
+ if (rc == 0 || errno != EINVAL)
+ return rc;
+
+ // If the socket type doesn't support the option, pass it to
+ // the generic option parser.
+ return options.setsockopt (option_, optval_, optvallen_);
}
int zmq::socket_base_t::bind (const char *addr_)
@@ -251,23 +133,29 @@ int zmq::socket_base_t::connect (const char *addr_)
options, true);
zmq_assert (session);
- // Create inbound pipe.
- pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
- zmq_assert (in_pipe);
- in_pipe->reader.set_endpoint (this);
- session->attach_outpipe (&in_pipe->writer);
- in_pipes.push_back (&in_pipe->reader);
- in_pipes.back ()->set_index (active);
- in_pipes [active]->set_index (in_pipes.size () - 1);
- std::swap (in_pipes.back (), in_pipes [active]);
- active++;
-
- // Create outbound pipe.
- pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
- zmq_assert (out_pipe);
- out_pipe->writer.set_endpoint (this);
- session->attach_inpipe (&out_pipe->reader);
- out_pipes.push_back (&out_pipe->writer);
+ pipe_t *in_pipe = NULL;
+ pipe_t *out_pipe = NULL;
+
+ // Create inbound pipe, if required.
+ if (xrequires_in ()) {
+ in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
+ zmq_assert (in_pipe);
+
+ }
+
+ // Create outbound pipe, if required.
+ if (xrequires_out ()) {
+ out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ }
+
+ // Attach the pipes to the socket object.
+ attach_pipes (in_pipe ? &in_pipe->reader : NULL,
+ out_pipe ? &out_pipe->writer : NULL);
+
+ // Attach the pipes to the session object.
+ session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
+ in_pipe ? &in_pipe->writer : NULL);
// Activate the session.
send_plug (session);
@@ -294,6 +182,13 @@ int zmq::socket_base_t::connect (const char *addr_)
#if defined ZMQ_HAVE_OPENPGM
if (addr_type == "pgm" || addr_type == "udp") {
+ // If the socket type requires bi-directional communication
+ // multicast is not an option (it is uni-directional).
+ if (xrequires_in () && xrequires_out ()) {
+ errno = EFAULT;
+ return -1;
+ }
+
// For udp, pgm transport with udp encapsulation is used.
bool udp_encapsulation = false;
if (addr_type == "udp")
@@ -365,56 +260,61 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
app_thread->process_commands (false, true);
// Try to send the message.
- bool sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH));
-
- if (!(flags_ & ZMQ_NOBLOCK)) {
+ int rc = xsend (msg_, flags_);
+ if (rc == 0)
+ return 0;
- // Oops, we couldn't send the message. Wait for the next
- // command, process it and try to send the message again.
- while (!sent) {
- app_thread->process_commands (true, false);
- sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH));
- }
- }
- else if (!sent) {
- errno = EAGAIN;
+ // In case of non-blocking send we'll simply propagate
+ // the error - including EAGAIN - upwards.
+ if (flags_ & ZMQ_NOBLOCK)
return -1;
- }
+ // Oops, we couldn't send the message. Wait for the next
+ // command, process it and try to send the message again.
+ while (rc != 0) {
+ if (errno != EAGAIN)
+ return -1;
+ app_thread->process_commands (true, false);
+ rc = xsend (msg_, flags_);
+ }
return 0;
}
int zmq::socket_base_t::flush ()
{
- for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
- it++)
- (*it)->flush ();
-
- return 0;
+ return xflush ();
}
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
+ // Get the message and return immediately if successfull.
+ int rc = xrecv (msg_, flags_);
+ if (rc == 0)
+ return 0;
+
// If the message cannot be fetched immediately, there are two scenarios.
- // For non-blocking recv, commands are processed in case there's a message
- // already waiting we don't know about. If it's not, return EAGAIN.
+ // For non-blocking recv, commands are processed in case there's a revive
+ // command already waiting int a command pipe. If it's not, return EAGAIN.
// In blocking scenario, commands are processed over and over again until
// we are able to fetch a message.
- bool fetched = fetch (msg_);
- if (!fetched) {
- if (flags_ & ZMQ_NOBLOCK) {
- app_thread->process_commands (false, false);
- fetched = fetch (msg_);
- }
- else {
- while (!fetched) {
- app_thread->process_commands (true, false);
- ticks = 0;
- fetched = fetch (msg_);
- }
+ if (flags_ & ZMQ_NOBLOCK) {
+ if (errno != EAGAIN)
+ return -1;
+ app_thread->process_commands (false, false);
+ ticks = 0;
+ rc = xrecv (msg_, flags_);
+ }
+ else {
+ while (rc != 0) {
+ if (errno != EAGAIN)
+ return -1;
+ app_thread->process_commands (true, false);
+ ticks = 0;
+ rc = xrecv (msg_, flags_);
}
}
+
// Once every inbound_poll_rate messages check for signals and process
// incoming commands. This happens only if we are not polling altogether
// because there are messages available all the time. If poll occurs,
@@ -428,12 +328,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
ticks = 0;
}
- if (!fetched) {
- errno = EAGAIN;
- return -1;
- }
-
- return 0;
+ return rc;
}
int zmq::socket_base_t::close ()
@@ -443,6 +338,37 @@ int zmq::socket_base_t::close ()
// Pointer to the dispatcher must be retrieved before the socket is
// deallocated. Afterwards it is not available.
dispatcher_t *dispatcher = get_dispatcher ();
+
+ shutting_down = true;
+
+ while (true) {
+
+ // On third pass of the loop there should be no more I/O objects
+ // because all connecters and listerners were destroyed during
+ // the first pass and all engines delivered by delayed 'own' commands
+ // are destroyed during the second pass.
+ if (io_objects.empty () && !pending_term_acks)
+ break;
+
+ // Send termination request to all associated I/O objects.
+ for (io_objects_t::iterator it = io_objects.begin ();
+ it != io_objects.end (); it++)
+ send_term (*it);
+
+ // Move the objects to the list of pending term acks.
+ pending_term_acks += io_objects.size ();
+ io_objects.clear ();
+
+ // Process commands till we get all the termination acknowledgements.
+ while (pending_term_acks)
+ app_thread->process_commands (true, false);
+ }
+
+ // Check whether there are no session leaks.
+ sessions_sync.lock ();
+ zmq_assert (sessions.empty ());
+ sessions_sync.unlock ();
+
delete this;
// This function must be called after the socket is completely deallocated
@@ -488,68 +414,36 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
return it->second;
}
-void zmq::socket_base_t::attach_inpipe (class reader_t *pipe_)
+void zmq::socket_base_t::kill (reader_t *pipe_)
{
- pipe_->set_endpoint (this);
- in_pipes.push_back (pipe_);
- in_pipes.back ()->set_index (active);
- in_pipes [active]->set_index (in_pipes.size () - 1);
- std::swap (in_pipes.back (), in_pipes [active]);
- active++;
+ xkill (pipe_);
}
-void zmq::socket_base_t::attach_outpipe (class writer_t *pipe_)
+void zmq::socket_base_t::revive (reader_t *pipe_)
{
- pipe_->set_endpoint (this);
- out_pipes.push_back (pipe_);
- pipe_->set_index (out_pipes.size () - 1);
+ xrevive (pipe_);
}
-void zmq::socket_base_t::revive (reader_t *pipe_)
+void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
{
- // Move the pipe to the list of active pipes.
- in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
- in_pipes [index]->set_index (active);
- in_pipes [active]->set_index (index);
- std::swap (in_pipes [index], in_pipes [active]);
- active++;
+ if (inpipe_)
+ inpipe_->set_endpoint (this);
+ if (outpipe_)
+ outpipe_->set_endpoint (this);
+ xattach_pipes (inpipe_, outpipe_);
}
void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
{
- // Remove the pipe from the list of inbound pipes.
- in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
- if (index < active) {
- in_pipes [index]->set_index (active - 1);
- in_pipes [active - 1]->set_index (index);
- std::swap (in_pipes [index], in_pipes [active - 1]);
- active--;
- index = active;
- }
- in_pipes [index]->set_index (in_pipes.size () - 1);
- in_pipes [in_pipes.size () - 1]->set_index (index);
- std::swap (in_pipes [index], in_pipes [in_pipes.size () - 1]);
- in_pipes.pop_back ();
+ xdetach_inpipe (pipe_);
+ pipe_->set_endpoint (NULL); // ?
}
void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_)
{
- out_pipes_t::size_type index = (out_pipes_t::size_type) pipe_->get_index ();
- out_pipes [index]->set_index (out_pipes.size () - 1);
- out_pipes [out_pipes.size () - 1]->set_index (index);
- std::swap (out_pipes [index], out_pipes [out_pipes.size () - 1]);
- out_pipes.pop_back ();
-}
-
-void zmq::socket_base_t::set_index (int index_)
-{
- index = index_;
-}
-
-int zmq::socket_base_t::get_index ()
-{
- zmq_assert (index != -1);
- return index;
+ xdetach_outpipe (pipe_);
+ pipe_->set_endpoint (NULL); // ?
}
void zmq::socket_base_t::process_own (owned_t *object_)
@@ -560,10 +454,7 @@ void zmq::socket_base_t::process_own (owned_t *object_)
void zmq::socket_base_t::process_bind (owned_t *session_,
reader_t *in_pipe_, writer_t *out_pipe_)
{
- zmq_assert (in_pipe_);
- attach_inpipe (in_pipe_);
- zmq_assert (out_pipe_);
- attach_outpipe (out_pipe_);
+ attach_pipes (in_pipe_, out_pipe_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
@@ -593,106 +484,3 @@ void zmq::socket_base_t::process_term_ack ()
pending_term_acks--;
}
-bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
-{
- int pipes_count = out_pipes.size ();
-
- // If there are no pipes available, simply drop the message.
- if (pipes_count == 0) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return true;
- }
-
- // First check whether all pipes are available for writing.
- for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
- it++)
- if (!(*it)->check_write (zmq_msg_size (msg_)))
- return false;
-
- msg_content_t *content = (msg_content_t*) msg_->content;
-
- // For VSMs the copying is straighforward.
- if (content == (msg_content_t*) ZMQ_VSM) {
- for (out_pipes_t::iterator it = out_pipes.begin ();
- it != out_pipes.end (); it++) {
- (*it)->write (msg_);
- if (flush_)
- (*it)->flush ();
- }
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return true;
- }
-
- // Optimisation for the case when there's only a single pipe
- // to send the message to - no refcount adjustment i.e. no atomic
- // operations are needed.
- if (pipes_count == 1) {
- (*out_pipes.begin ())->write (msg_);
- if (flush_)
- (*out_pipes.begin ())->flush ();
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return true;
- }
-
- // There are at least 2 destinations for the message. That means we have
- // to deal with reference counting. First add N-1 references to
- // the content (we are holding one reference anyway, that's why -1).
- if (msg_->shared)
- content->refcnt.add (pipes_count - 1);
- else {
- content->refcnt.set (pipes_count);
- msg_->shared = true;
- }
-
- // Push the message to all destinations.
- for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
- it++) {
- (*it)->write (msg_);
- if (flush_)
- (*it)->flush ();
- }
-
- // Detach the original message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
-
- return true;
-}
-
-bool zmq::socket_base_t::fetch (zmq_msg_t *msg_)
-{
- // Deallocate old content of the message.
- zmq_msg_close (msg_);
-
- // Round-robin over the pipes to get next message.
- for (int count = active; count != 0; count--) {
-
- bool fetched = in_pipes [current]->read (msg_);
-
- // If there's no message in the pipe, move it to the list of
- // non-active pipes.
- if (!fetched) {
- in_pipes [current]->set_index (active - 1);
- in_pipes [active - 1]->set_index (current);
- std::swap (in_pipes [current], in_pipes [active - 1]);
- active--;
- }
-
- current ++;
- if (current >= active)
- current = 0;
-
- if (fetched)
- return true;
- }
-
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- zmq_msg_init (msg_);
- return false;
-}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 398cd32..120c932 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -27,6 +27,7 @@
#include "i_endpoint.hpp"
#include "object.hpp"
+#include "yarray_item.hpp"
#include "mutex.hpp"
#include "options.hpp"
#include "stdint.hpp"
@@ -34,41 +35,59 @@
namespace zmq
{
- class socket_base_t : public object_t, public i_endpoint
+ class socket_base_t :
+ public object_t, public i_endpoint, public yarray_item_t
{
public:
socket_base_t (class app_thread_t *parent_, int type_);
- virtual ~socket_base_t ();
// Interface for communication with the API layer.
- virtual int setsockopt (int option_, const void *optval_,
+ int setsockopt (int option_, const void *optval_,
size_t optvallen_);
- virtual int bind (const char *addr_);
- virtual int connect (const char *addr_);
- virtual int send (struct zmq_msg_t *msg_, int flags_);
- virtual int flush ();
- virtual int recv (struct zmq_msg_t *msg_, int flags_);
- virtual int close ();
+ int bind (const char *addr_);
+ int connect (const char *addr_);
+ int send (struct zmq_msg_t *msg_, int flags_);
+ int flush ();
+ int recv (struct zmq_msg_t *msg_, int flags_);
+ int close ();
// The list of sessions cannot be accessed via inter-thread
// commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application
- // thread to 0MQ.
+ // thread to 0MQ. Locking is used instead.
bool register_session (const char *name_, class session_t *session_);
bool unregister_session (const char *name_);
class session_t *find_session (const char *name_);
// i_endpoint interface implementation.
- void attach_inpipe (class reader_t *pipe_);
- void attach_outpipe (class writer_t *pipe_);
- void revive (class reader_t *pipe_);
+ void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
+ void kill (class reader_t *pipe_);
+ void revive (class reader_t *pipe_);
- // Manipulating index in the app_thread's list of sockets.
- void set_index (int index);
- int get_index ();
+ protected:
+
+ // Destructor is protected. Socket is closed using 'close' function.
+ virtual ~socket_base_t ();
+
+ // Pipe management is done by individual socket types.
+ virtual bool xrequires_in () = 0;
+ virtual bool xrequires_out () = 0;
+ virtual void xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_) = 0;
+ virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
+ virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
+ virtual void xkill (class reader_t *pipe_) = 0;
+ virtual void xrevive (class reader_t *pipe_) = 0;
+
+ // Actual algorithms are to be defined by individual socket types.
+ virtual int xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_) = 0;
+ virtual int xsend (struct zmq_msg_t *msg_, int options_) = 0;
+ virtual int xflush () = 0;
+ virtual int xrecv (struct zmq_msg_t *msg_, int options_) = 0;
private:
@@ -79,14 +98,6 @@ namespace zmq
void process_term_req (class owned_t *object_);
void process_term_ack ();
- // Attempts to distribute the message to all the outbound pipes.
- // Returns false if not possible because of pipe overflow.
- bool distribute (struct zmq_msg_t *msg_, bool flush_);
-
- // Gets a message from one of the inbound pipes. Implementation of
- // fair queueing.
- bool fetch (struct zmq_msg_t *msg_);
-
// Type of the socket.
int type;
@@ -95,21 +106,6 @@ namespace zmq
typedef std::set <class owned_t*> io_objects_t;
io_objects_t io_objects;
- // Inbound pipes, i.e. those the socket is getting messages from.
- typedef std::vector <class reader_t*> in_pipes_t;
- in_pipes_t in_pipes;
-
- // Index of the next inbound pipe to read messages from.
- in_pipes_t::size_type current;
-
- // Number of active inbound pipes. Active pipes are stored in the
- // initial section of the in_pipes array.
- in_pipes_t::size_type active;
-
- // Outbound pipes, i.e. those the socket is sending messages to.
- typedef std::vector <class writer_t*> out_pipes_t;
- out_pipes_t out_pipes;
-
// Number of I/O objects that were already asked to terminate
// but haven't acknowledged it yet.
int pending_term_acks;
@@ -138,9 +134,6 @@ namespace zmq
sessions_t sessions;
mutex_t sessions_sync;
- // Index of the socket in the app_thread's list of sockets.
- int index;
-
socket_base_t (const socket_base_t&);
void operator = (const socket_base_t&);
};
diff --git a/src/sub.cpp b/src/sub.cpp
index 515a843..73510c6 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -21,18 +21,69 @@
#include "sub.hpp"
#include "err.hpp"
+#include "pipe.hpp"
zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_SUB),
+ active (0),
+ current (0),
all_count (0)
{
}
zmq::sub_t::~sub_t ()
{
+ for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
+ in_pipes [i]->term ();
+ in_pipes.clear ();
}
-int zmq::sub_t::setsockopt (int option_, const void *optval_,
+bool zmq::sub_t::xrequires_in ()
+{
+ return true;
+}
+
+bool zmq::sub_t::xrequires_out ()
+{
+ return false;
+}
+
+void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (!outpipe_);
+ in_pipes.push_back (inpipe_);
+ in_pipes.swap (active, in_pipes.size () - 1);
+ active++;
+}
+
+void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ if (in_pipes.index (pipe_) < active)
+ active--;
+ in_pipes.erase (pipe_);
+}
+
+void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::sub_t::xkill (class reader_t *pipe_)
+{
+ // Move the pipe to the list of inactive pipes.
+ in_pipes.swap (in_pipes.index (pipe_), active - 1);
+ active--;
+}
+
+void zmq::sub_t::xrevive (class reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ in_pipes.swap (in_pipes.index (pipe_), active);
+ active++;
+}
+
+int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ == ZMQ_SUBSCRIBE) {
@@ -75,27 +126,28 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_,
return 0;
}
- return socket_base_t::setsockopt (option_, optval_, optvallen_);
+ errno = EINVAL;
+ return -1;
}
-int zmq::sub_t::send (struct zmq_msg_t *msg_, int flags_)
+int zmq::sub_t::xsend (struct zmq_msg_t *msg_, int flags_)
{
errno = EFAULT;
return -1;
}
-int zmq::sub_t::flush ()
+int zmq::sub_t::xflush ()
{
errno = EFAULT;
return -1;
}
-int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
+int zmq::sub_t::xrecv (struct zmq_msg_t *msg_, int flags_)
{
while (true) {
- // Get a message.
- int rc = socket_base_t::recv (msg_, flags_);
+ // Get a message using fair queueing algorithm.
+ int rc = fq (msg_, flags_);
// If there's no message available, return immediately.
if (rc != 0 && errno == EAGAIN)
@@ -131,3 +183,25 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
return 0;
}
}
+
+int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // Round-robin over the pipes to get next message.
+ for (int count = active; count != 0; count--) {
+ bool fetched = in_pipes [current]->read (msg_);
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched)
+ return 0;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+}
diff --git a/src/sub.hpp b/src/sub.hpp
index 14fa687..29da27a 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -24,6 +24,7 @@
#include <string>
#include "socket_base.hpp"
+#include "yarray.hpp"
namespace zmq
{
@@ -35,14 +36,38 @@ namespace zmq
sub_t (class app_thread_t *parent_);
~sub_t ();
- // Overloads of API functions from socket_base_t.
- int setsockopt (int option_, const void *optval_, size_t optvallen_);
- int send (struct zmq_msg_t *msg_, int flags_);
- int flush ();
- int recv (struct zmq_msg_t *msg_, int flags_);
+ protected:
+
+ // Overloads of functions from socket_base_t.
+ bool xrequires_in ();
+ bool xrequires_out ();
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (struct zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (struct zmq_msg_t *msg_, int flags_);
private:
+ // Helper function to return one message choosed using
+ // fair queueing algorithm.
+ int fq (struct zmq_msg_t *msg_, int flags_);
+
+ // Inbound pipes, i.e. those the socket is getting messages from.
+ typedef yarray_t <class reader_t> in_pipes_t;
+ in_pipes_t in_pipes;
+
+ // Number of active inbound pipes. Active pipes are stored in the
+ // initial section of the in_pipes array.
+ in_pipes_t::size_type active;
+
+ // Index of the next inbound pipe to read messages from.
+ in_pipes_t::size_type current;
+
// Number of active "*" subscriptions.
int all_count;
@@ -52,6 +77,9 @@ namespace zmq
// List of all exact match subscriptions.
subscriptions_t topics;
+
+ sub_t (const sub_t&);
+ void operator = (const sub_t&);
};
}
diff --git a/src/yarray.hpp b/src/yarray.hpp
new file mode 100644
index 0000000..b2d3f1d
--- /dev/null
+++ b/src/yarray.hpp
@@ -0,0 +1,110 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_YARRAY_INCLUDED__
+#define __ZMQ_YARRAY_INCLUDED__
+
+#include <vector>
+#include <algorithm>
+
+namespace zmq
+{
+
+ // Fast array implementation with O(1) access to item, insertion and
+ // removal. Yarray stores pointers rather than objects. The objects have
+ // to be derived from yarray_item_t class.
+
+ template <typename T> class yarray_t
+ {
+ public:
+
+ typedef typename std::vector <T*>::size_type size_type;
+
+ inline yarray_t ()
+ {
+ }
+
+ inline ~yarray_t ()
+ {
+ }
+
+ inline size_type size ()
+ {
+ return items.size ();
+ }
+
+ inline bool empty ()
+ {
+ return items.empty ();
+ }
+
+ inline T *&operator [] (size_type index_)
+ {
+ return items [index_];
+ }
+
+ inline void push_back (T *item_)
+ {
+ if (item_)
+ item_->set_yarray_index (items.size ());
+ items.push_back (item_);
+ }
+
+ inline void erase (T *item_) {
+ erase (item_->get_yarray_index ());
+ }
+
+ inline void erase (size_type index_) {
+ if (items.back ())
+ items.back ()->set_yarray_index (index_);
+ items [index_] = items.back ();
+ items.pop_back ();
+ }
+
+ inline void swap (size_type index1_, size_type index2_)
+ {
+ if (items [index1_])
+ items [index1_]->set_yarray_index (index2_);
+ if (items [index2_])
+ items [index2_]->set_yarray_index (index1_);
+ std::swap (items [index1_], items [index2_]);
+ }
+
+ inline void clear ()
+ {
+ items.clear ();
+ }
+
+ inline size_type index (T *item_)
+ {
+ return (size_type) item_->get_yarray_index ();
+ }
+
+ private:
+
+ typedef std::vector <T*> items_t;
+ items_t items;
+
+ yarray_t (const yarray_t&);
+ void operator = (const yarray_t&);
+ };
+
+}
+
+#endif
diff --git a/src/yarray_item.hpp b/src/yarray_item.hpp
new file mode 100644
index 0000000..1de62b8
--- /dev/null
+++ b/src/yarray_item.hpp
@@ -0,0 +1,62 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_YARRAY_ITEM_INCLUDED__
+#define __ZMQ_YARRAY_ITEM_INCLUDED__
+
+namespace zmq
+{
+
+ // Base class for objects stored in yarray. Note that each object can
+ // be stored in at most one yarray.
+
+ class yarray_item_t
+ {
+ public:
+
+ inline yarray_item_t () :
+ yarray_index (-1)
+ {
+ }
+
+ inline ~yarray_item_t ()
+ {
+ }
+
+ inline void set_yarray_index (int index_)
+ {
+ yarray_index = index_;
+ }
+
+ inline int get_yarray_index ()
+ {
+ return yarray_index;
+ }
+
+ private:
+
+ int yarray_index;
+
+ yarray_item_t (const yarray_item_t&);
+ void operator = (const yarray_item_t&);
+ };
+
+}
+
+#endif