summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile.am8
-rw-r--r--src/app_thread.cpp20
-rw-r--r--src/app_thread.hpp3
-rw-r--r--src/i_endpoint.hpp7
-rw-r--r--src/options.cpp80
-rw-r--r--src/options.hpp3
-rw-r--r--src/p2p.cpp92
-rw-r--r--src/p2p.hpp56
-rw-r--r--src/pipe.cpp28
-rw-r--r--src/pipe.hpp19
-rw-r--r--src/pub.cpp129
-rw-r--r--src/pub.hpp24
-rw-r--r--src/rep.cpp204
-rw-r--r--src/rep.hpp79
-rw-r--r--src/req.cpp206
-rw-r--r--src/req.hpp84
-rw-r--r--src/session.cpp49
-rw-r--r--src/session.hpp6
-rw-r--r--src/socket_base.cpp460
-rw-r--r--src/socket_base.hpp77
-rw-r--r--src/sub.cpp88
-rw-r--r--src/sub.hpp38
-rw-r--r--src/yarray.hpp110
-rw-r--r--src/yarray_item.hpp62
24 files changed, 1460 insertions, 472 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 2701237..f75c3a1 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -68,7 +68,10 @@ libzmq_la_SOURCES = $(pgm_sources) \
pipe.hpp \
platform.hpp \
poll.hpp \
+ p2p.hpp \
pub.hpp \
+ rep.hpp \
+ req.hpp \
select.hpp \
session.hpp \
simple_semaphore.hpp \
@@ -82,6 +85,8 @@ libzmq_la_SOURCES = $(pgm_sources) \
uuid.hpp \
windows.hpp \
wire.hpp \
+ yarray.hpp \
+ yarray_item.hpp \
ypipe.hpp \
ypollset.hpp \
yqueue.hpp \
@@ -108,9 +113,12 @@ libzmq_la_SOURCES = $(pgm_sources) \
pgm_receiver.cpp \
pgm_sender.cpp \
pgm_socket.cpp \
+ p2p.cpp \
pipe.cpp \
poll.cpp \
pub.cpp \
+ rep.cpp \
+ req.cpp \
select.cpp \
session.cpp \
socket_base.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 303c6a1..d12b126 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -39,6 +39,9 @@
#include "socket_base.hpp"
#include "pub.hpp"
#include "sub.hpp"
+#include "req.hpp"
+#include "rep.hpp"
+#include "p2p.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -158,26 +161,27 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_SUB:
s = new sub_t (this);
break;
- case ZMQ_P2P:
case ZMQ_REQ:
+ s = new req_t (this);
+ break;
case ZMQ_REP:
- s = new socket_base_t (this, type_);
+ s = new rep_t (this);
+ break;
+ case ZMQ_P2P:
+ s = new p2p_t (this);
break;
default:
// TODO: This should be EINVAL.
zmq_assert (false);
}
zmq_assert (s);
- s->set_index (sockets.size ());
+
sockets.push_back (s);
+
return s;
}
void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
{
- int i = socket_->get_index ();
- socket_->set_index (-1);
- sockets [i] = sockets.back ();
- sockets [i]->set_index (i);
- sockets.pop_back ();
+ sockets.erase (socket_);
}
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 4fe67fb..14cb8c5 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -24,6 +24,7 @@
#include "stdint.hpp"
#include "object.hpp"
+#include "yarray.hpp"
#include "thread.hpp"
namespace zmq
@@ -67,7 +68,7 @@ namespace zmq
private:
// All the sockets created from this application thread.
- typedef std::vector <class socket_base_t*> sockets_t;
+ typedef yarray_t <socket_base_t> sockets_t;
sockets_t sockets;
// If false, app_thread_t object is not associated with any OS thread.
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index 8ee2984..3bab2a5 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -25,11 +25,12 @@ namespace zmq
struct i_endpoint
{
- virtual void attach_inpipe (class reader_t *pipe_) = 0;
- virtual void attach_outpipe (class writer_t *pipe_) = 0;
- virtual void revive (class reader_t *pipe_) = 0;
+ virtual void attach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
+ virtual void kill (class reader_t *pipe_) = 0;
+ virtual void revive (class reader_t *pipe_) = 0;
};
}
diff --git a/src/options.cpp b/src/options.cpp
index 55417f5..b0e6e6e 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -17,7 +17,10 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include "../bindings/c/zmq.h"
+
#include "options.hpp"
+#include "err.hpp"
zmq::options_t::options_t () :
hwm (0),
@@ -29,3 +32,80 @@ zmq::options_t::options_t () :
use_multicast_loop (false)
{
}
+
+int zmq::options_t::setsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ switch (option_) {
+
+ case ZMQ_HWM:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ hwm = *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_LWM:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ lwm = *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_SWAP:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ swap = *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_AFFINITY:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ affinity = (uint64_t) *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_IDENTITY:
+ identity.assign ((const char*) optval_, optvallen_);
+ return 0;
+
+ case ZMQ_RATE:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ rate = (uint32_t) *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_RECOVERY_IVL:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ recovery_ivl = (uint32_t) *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_MCAST_LOOP:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ if ((int64_t) *((int64_t*) optval_) == 0)
+ use_multicast_loop = false;
+ else if ((int64_t) *((int64_t*) optval_) == 1)
+ use_multicast_loop = true;
+ else {
+ errno = EINVAL;
+ return -1;
+ }
+ return 0;
+ }
+
+ errno = EINVAL;
+ return -1;
+}
diff --git a/src/options.hpp b/src/options.hpp
index c1ecb57..cde144c 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -22,6 +22,7 @@
#include <string>
+#include "stddef.h"
#include "stdint.hpp"
namespace zmq
@@ -31,6 +32,8 @@ namespace zmq
{
options_t ();
+ int setsockopt (int option_, const void *optval_, size_t optvallen_);
+
int64_t hwm;
int64_t lwm;
int64_t swap;
diff --git a/src/p2p.cpp b/src/p2p.cpp
new file mode 100644
index 0000000..537f3ce
--- /dev/null
+++ b/src/p2p.cpp
@@ -0,0 +1,92 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "p2p.hpp"
+#include "err.hpp"
+
+zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
+ socket_base_t (parent_, ZMQ_P2P)
+{
+}
+
+zmq::p2p_t::~p2p_t ()
+{
+}
+
+bool zmq::p2p_t::xrequires_in ()
+{
+ return true;
+}
+
+bool zmq::p2p_t::xrequires_out ()
+{
+ return true;
+}
+
+void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::p2p_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::p2p_t::xkill (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::p2p_t::xrevive (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::p2p_t::xsend (struct zmq_msg_t *msg_, int flags_)
+{
+ zmq_assert (false);
+}
+
+int zmq::p2p_t::xflush ()
+{
+ zmq_assert (false);
+}
+
+int zmq::p2p_t::xrecv (struct zmq_msg_t *msg_, int flags_)
+{
+ zmq_assert (false);
+}
+
+
diff --git a/src/p2p.hpp b/src/p2p.hpp
new file mode 100644
index 0000000..84790a1
--- /dev/null
+++ b/src/p2p.hpp
@@ -0,0 +1,56 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_P2P_INCLUDED__
+#define __ZMQ_P2P_INCLUDED__
+
+#include "socket_base.hpp"
+
+namespace zmq
+{
+
+ class p2p_t : public socket_base_t
+ {
+ public:
+
+ p2p_t (class app_thread_t *parent_);
+ ~p2p_t ();
+
+ // Overloads of functions from socket_base_t.
+ bool xrequires_in ();
+ bool xrequires_out ();
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (struct zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (struct zmq_msg_t *msg_, int flags_);
+
+ private:
+
+ p2p_t (const p2p_t&);
+ void operator = (const p2p_t&);
+ };
+
+}
+
+#endif
diff --git a/src/pipe.cpp b/src/pipe.cpp
index f4cf0c4..9f70586 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -28,7 +28,6 @@ zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
peer (&pipe_->writer),
hwm (hwm_),
lwm (lwm_),
- index (-1),
endpoint (NULL)
{
}
@@ -39,8 +38,10 @@ zmq::reader_t::~reader_t ()
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
- if (!pipe->read (msg_))
+ if (!pipe->read (msg_)) {
+ endpoint->kill (this);
return false;
+ }
// If delimiter was read, start termination process of the pipe.
unsigned char *offset = 0;
@@ -61,17 +62,6 @@ void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
endpoint = endpoint_;
}
-void zmq::reader_t::set_index (int index_)
-{
- index = index_;
-}
-
-int zmq::reader_t::get_index ()
-{
- zmq_assert (index != -1);
- return index;
-}
-
void zmq::reader_t::term ()
{
endpoint = NULL;
@@ -96,7 +86,6 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,
peer (&pipe_->reader),
hwm (hwm_),
lwm (lwm_),
- index (-1),
endpoint (NULL)
{
}
@@ -106,17 +95,6 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
endpoint = endpoint_;
}
-void zmq::writer_t::set_index (int index_)
-{
- index = index_;
-}
-
-int zmq::writer_t::get_index ()
-{
- zmq_assert (index != -1);
- return index;
-}
-
zmq::writer_t::~writer_t ()
{
}
diff --git a/src/pipe.hpp b/src/pipe.hpp
index ede73b8..177b1b4 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -24,6 +24,7 @@
#include "stdint.hpp"
#include "i_endpoint.hpp"
+#include "yarray_item.hpp"
#include "ypipe.hpp"
#include "config.hpp"
#include "object.hpp"
@@ -31,7 +32,7 @@
namespace zmq
{
- class reader_t : public object_t
+ class reader_t : public object_t, public yarray_item_t
{
public:
@@ -44,10 +45,6 @@ namespace zmq
// Reads a message to the underlying pipe.
bool read (struct zmq_msg_t *msg_);
- // Mnaipulation of index of the pipe.
- void set_index (int index_);
- int get_index ();
-
// Ask pipe to terminate.
void term ();
@@ -72,9 +69,6 @@ namespace zmq
uint64_t tail;
uint64_t last_sent_head;
- // Index of the pipe in the socket's list of inbound pipes.
- int index;
-
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
@@ -82,7 +76,7 @@ namespace zmq
void operator = (const reader_t&);
};
- class writer_t : public object_t
+ class writer_t : public object_t, public yarray_item_t
{
public:
@@ -104,10 +98,6 @@ namespace zmq
// Flush the messages downsteam.
void flush ();
- // Mnaipulation of index of the pipe.
- void set_index (int index_);
- int get_index ();
-
// Ask pipe to terminate.
void term ();
@@ -130,9 +120,6 @@ namespace zmq
uint64_t head;
uint64_t tail;
- // Index of the pipe in the socket's list of outbound pipes.
- int index;
-
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
diff --git a/src/pub.cpp b/src/pub.cpp
index ca8afae..020d789 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -21,6 +21,8 @@
#include "pub.hpp"
#include "err.hpp"
+#include "msg_content.hpp"
+#include "pipe.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_PUB)
@@ -29,9 +31,134 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) :
zmq::pub_t::~pub_t ()
{
+ for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++)
+ out_pipes [i]->term ();
+ out_pipes.clear ();
}
-int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_)
+bool zmq::pub_t::xrequires_in ()
+{
+ return false;
+}
+
+bool zmq::pub_t::xrequires_out ()
+{
+ return true;
+}
+
+void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (!inpipe_);
+ out_pipes.push_back (outpipe_);
+}
+
+void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ out_pipes.erase (pipe_);
+}
+
+void zmq::pub_t::xkill (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::pub_t::xrevive (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::pub_t::xsend (struct zmq_msg_t *msg_, int flags_)
+{
+ out_pipes_t::size_type pipes_count = out_pipes.size ();
+
+ // If there are no pipes available, simply drop the message.
+ if (pipes_count == 0) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // First check whether all pipes are available for writing.
+ for (out_pipes_t::size_type i = 0; i != pipes_count; i++)
+ if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ msg_content_t *content = (msg_content_t*) msg_->content;
+
+ // For VSMs the copying is straighforward.
+ if (content == (msg_content_t*) ZMQ_VSM) {
+ for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
+ out_pipes [i]->write (msg_);
+ if (!(flags_ & ZMQ_NOFLUSH))
+ out_pipes [i]->flush ();
+ }
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // Optimisation for the case when there's only a single pipe
+ // to send the message to - no refcount adjustment i.e. no atomic
+ // operations are needed.
+ if (pipes_count == 1) {
+ out_pipes [0]->write (msg_);
+ if (!(flags_ & ZMQ_NOFLUSH))
+ out_pipes [0]->flush ();
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // There are at least 2 destinations for the message. That means we have
+ // to deal with reference counting. First add N-1 references to
+ // the content (we are holding one reference anyway, that's why -1).
+ if (msg_->shared)
+ content->refcnt.add (pipes_count - 1);
+ else {
+ content->refcnt.set (pipes_count);
+ msg_->shared = true;
+ }
+
+ // Push the message to all destinations.
+ for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
+ out_pipes [i]->write (msg_);
+ if (!(flags_ & ZMQ_NOFLUSH))
+ out_pipes [i]->flush ();
+ }
+
+ // Detach the original message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+int zmq::pub_t::xflush ()
+{
+ out_pipes_t::size_type pipe_count = out_pipes.size ();
+ for (out_pipes_t::size_type i = 0; i != pipe_count; i++)
+ out_pipes [i]->flush ();
+ return 0;
+}
+
+int zmq::pub_t::xrecv (struct zmq_msg_t *msg_, int flags_)
{
errno = EFAULT;
return -1;
diff --git a/src/pub.hpp b/src/pub.hpp
index 2f03b8e..8255c6f 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_PUB_INCLUDED__
#include "socket_base.hpp"
+#include "yarray.hpp"
namespace zmq
{
@@ -32,8 +33,27 @@ namespace zmq
pub_t (class app_thread_t *parent_);
~pub_t ();
- // Overloads of API functions from socket_base_t.
- int recv (struct zmq_msg_t *msg_, int flags_);
+ // Overloads of functions from socket_base_t.
+ bool xrequires_in ();
+ bool xrequires_out ();
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (struct zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (struct zmq_msg_t *msg_, int flags_);
+
+ private:
+
+ // Outbound pipes, i.e. those the socket is sending messages to.
+ typedef yarray_t <class writer_t> out_pipes_t;
+ out_pipes_t out_pipes;
+
+ pub_t (const pub_t&);
+ void operator = (const pub_t&);
};
}
diff --git a/src/rep.cpp b/src/rep.cpp
new file mode 100644
index 0000000..2fbb66c
--- /dev/null
+++ b/src/rep.cpp
@@ -0,0 +1,204 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "rep.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::rep_t::rep_t (class app_thread_t *parent_) :
+ socket_base_t (parent_, ZMQ_REP),
+ active (0),
+ current (0),
+ waiting_for_reply (false),
+ reply_pipe (NULL)
+{
+}
+
+zmq::rep_t::~rep_t ()
+{
+}
+
+bool zmq::rep_t::xrequires_in ()
+{
+ return true;
+}
+
+bool zmq::rep_t::xrequires_out ()
+{
+ return true;
+}
+
+void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (inpipe_ && outpipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ in_pipes.push_back (inpipe_);
+ in_pipes.swap (active, in_pipes.size () - 1);
+ out_pipes.push_back (outpipe_);
+ out_pipes.swap (active, out_pipes.size () - 1);
+ active++;
+}
+
+void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ zmq_assert (pipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ in_pipes_t::size_type index = in_pipes.index (pipe_);
+
+ // If corresponding outpipe is still in place simply nullify the pointer
+ // to the inpipe and move it to the passive state.
+ if (out_pipes [index]) {
+ in_pipes [index] = NULL;
+ if (in_pipes.index (pipe_) < active) {
+ active--;
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+ }
+ return;
+ }
+
+ // Now both inpipe and outpipe are detached. Remove them from the lists.
+ if (in_pipes.index (pipe_) < active)
+ active--;
+ in_pipes.erase (index);
+ out_pipes.erase (index);
+}
+
+void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (pipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ out_pipes_t::size_type index = out_pipes.index (pipe_);
+
+ // TODO: If the connection we've got the request from disconnects,
+ // there's nowhere to send the reply. DLQ?
+ if (waiting_for_reply && pipe_ == reply_pipe) {
+ zmq_assert (false);
+ }
+
+ // If corresponding inpipe is still in place simply nullify the pointer
+ // to the outpipe.
+ if (in_pipes [index]) {
+ out_pipes [index] = NULL;
+ if (out_pipes.index (pipe_) < active) {
+ active--;
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+ }
+ return;
+ }
+
+ // Now both inpipe and outpipe are detached. Remove them from the lists.
+ if (out_pipes.index (pipe_) < active)
+ active--;
+ in_pipes.erase (index);
+ out_pipes.erase (index);
+}
+
+void zmq::rep_t::xkill (class reader_t *pipe_)
+{
+ // Move the pipe to the list of inactive pipes.
+ in_pipes_t::size_type index = in_pipes.index (pipe_);
+ active--;
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+}
+
+void zmq::rep_t::xrevive (class reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ in_pipes_t::size_type index = in_pipes.index (pipe_);
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+ active++;
+}
+
+int zmq::rep_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::rep_t::xsend (struct zmq_msg_t *msg_, int flags_)
+{
+ if (!waiting_for_reply) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ // TODO: Implement this once queue limits are in-place. If the reply
+ // overloads the buffer, connection should be torn down.
+ zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_)));
+
+ // Push message to the selected pipe.
+ reply_pipe->write (msg_);
+ reply_pipe->flush ();
+
+ waiting_for_reply = false;
+ reply_pipe = NULL;
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+}
+
+int zmq::rep_t::xflush ()
+{
+ errno = EFAULT;
+ return -1;
+}
+
+int zmq::rep_t::xrecv (struct zmq_msg_t *msg_, int flags_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ if (waiting_for_reply) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ // Round-robin over the pipes to get next message.
+ for (int count = active; count != 0; count--) {
+ bool fetched = in_pipes [current]->read (msg_);
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched) {
+ reply_pipe = out_pipes [current];
+ waiting_for_reply = true;
+ return 0;
+ }
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+}
+
+
diff --git a/src/rep.hpp b/src/rep.hpp
new file mode 100644
index 0000000..6e55f47
--- /dev/null
+++ b/src/rep.hpp
@@ -0,0 +1,79 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_REP_INCLUDED__
+#define __ZMQ_REP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class rep_t : public socket_base_t
+ {
+ public:
+
+ rep_t (class app_thread_t *parent_);
+ ~rep_t ();
+
+ // Overloads of functions from socket_base_t.
+ bool xrequires_in ();
+ bool xrequires_out ();
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (struct zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (struct zmq_msg_t *msg_, int flags_);
+
+ private:
+
+ // List in outbound and inbound pipes. Note that the two lists are
+ // always in sync. I.e. outpipe with index N communicates with the
+ // same session as inpipe with index N.
+ typedef yarray_t <class writer_t> out_pipes_t;
+ out_pipes_t out_pipes;
+ typedef yarray_t <class reader_t> in_pipes_t;
+ in_pipes_t in_pipes;
+
+ // Number of active inpipes. All the active inpipes are located at the
+ // beginning of the in_pipes array.
+ in_pipes_t::size_type active;
+
+ // Index of the next inbound pipe to read a request from.
+ in_pipes_t::size_type current;
+
+ // If true, request was already received and reply wasn't sent yet.
+ bool waiting_for_reply;
+
+ // Pipe we are going to send reply to.
+ class writer_t *reply_pipe;
+
+ rep_t (const rep_t&);
+ void operator = (const rep_t&);
+
+ };
+
+}
+
+#endif
diff --git a/src/req.cpp b/src/req.cpp
new file mode 100644
index 0000000..05629df
--- /dev/null
+++ b/src/req.cpp
@@ -0,0 +1,206 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either vers