diff options
-rw-r--r-- | src/Makefile.am | 8 | ||||
-rw-r--r-- | src/app_thread.cpp | 20 | ||||
-rw-r--r-- | src/app_thread.hpp | 3 | ||||
-rw-r--r-- | src/i_endpoint.hpp | 7 | ||||
-rw-r--r-- | src/options.cpp | 80 | ||||
-rw-r--r-- | src/options.hpp | 3 | ||||
-rw-r--r-- | src/p2p.cpp | 92 | ||||
-rw-r--r-- | src/p2p.hpp | 56 | ||||
-rw-r--r-- | src/pipe.cpp | 28 | ||||
-rw-r--r-- | src/pipe.hpp | 19 | ||||
-rw-r--r-- | src/pub.cpp | 129 | ||||
-rw-r--r-- | src/pub.hpp | 24 | ||||
-rw-r--r-- | src/rep.cpp | 204 | ||||
-rw-r--r-- | src/rep.hpp | 79 | ||||
-rw-r--r-- | src/req.cpp | 206 | ||||
-rw-r--r-- | src/req.hpp | 84 | ||||
-rw-r--r-- | src/session.cpp | 49 | ||||
-rw-r--r-- | src/session.hpp | 6 | ||||
-rw-r--r-- | src/socket_base.cpp | 460 | ||||
-rw-r--r-- | src/socket_base.hpp | 77 | ||||
-rw-r--r-- | src/sub.cpp | 88 | ||||
-rw-r--r-- | src/sub.hpp | 38 | ||||
-rw-r--r-- | src/yarray.hpp | 110 | ||||
-rw-r--r-- | src/yarray_item.hpp | 62 |
24 files changed, 1460 insertions, 472 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 2701237..f75c3a1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -68,7 +68,10 @@ libzmq_la_SOURCES = $(pgm_sources) \ pipe.hpp \ platform.hpp \ poll.hpp \ + p2p.hpp \ pub.hpp \ + rep.hpp \ + req.hpp \ select.hpp \ session.hpp \ simple_semaphore.hpp \ @@ -82,6 +85,8 @@ libzmq_la_SOURCES = $(pgm_sources) \ uuid.hpp \ windows.hpp \ wire.hpp \ + yarray.hpp \ + yarray_item.hpp \ ypipe.hpp \ ypollset.hpp \ yqueue.hpp \ @@ -108,9 +113,12 @@ libzmq_la_SOURCES = $(pgm_sources) \ pgm_receiver.cpp \ pgm_sender.cpp \ pgm_socket.cpp \ + p2p.cpp \ pipe.cpp \ poll.cpp \ pub.cpp \ + rep.cpp \ + req.cpp \ select.cpp \ session.cpp \ socket_base.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 303c6a1..d12b126 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -39,6 +39,9 @@ #include "socket_base.hpp" #include "pub.hpp" #include "sub.hpp" +#include "req.hpp" +#include "rep.hpp" +#include "p2p.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -158,26 +161,27 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_SUB: s = new sub_t (this); break; - case ZMQ_P2P: case ZMQ_REQ: + s = new req_t (this); + break; case ZMQ_REP: - s = new socket_base_t (this, type_); + s = new rep_t (this); + break; + case ZMQ_P2P: + s = new p2p_t (this); break; default: // TODO: This should be EINVAL. zmq_assert (false); } zmq_assert (s); - s->set_index (sockets.size ()); + sockets.push_back (s); + return s; } void zmq::app_thread_t::remove_socket (socket_base_t *socket_) { - int i = socket_->get_index (); - socket_->set_index (-1); - sockets [i] = sockets.back (); - sockets [i]->set_index (i); - sockets.pop_back (); + sockets.erase (socket_); } diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 4fe67fb..14cb8c5 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -24,6 +24,7 @@ #include "stdint.hpp" #include "object.hpp" +#include "yarray.hpp" #include "thread.hpp" namespace zmq @@ -67,7 +68,7 @@ namespace zmq private: // All the sockets created from this application thread. - typedef std::vector <class socket_base_t*> sockets_t; + typedef yarray_t <socket_base_t> sockets_t; sockets_t sockets; // If false, app_thread_t object is not associated with any OS thread. diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index 8ee2984..3bab2a5 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -25,11 +25,12 @@ namespace zmq struct i_endpoint { - virtual void attach_inpipe (class reader_t *pipe_) = 0; - virtual void attach_outpipe (class writer_t *pipe_) = 0; - virtual void revive (class reader_t *pipe_) = 0; + virtual void attach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) = 0; virtual void detach_inpipe (class reader_t *pipe_) = 0; virtual void detach_outpipe (class writer_t *pipe_) = 0; + virtual void kill (class reader_t *pipe_) = 0; + virtual void revive (class reader_t *pipe_) = 0; }; } diff --git a/src/options.cpp b/src/options.cpp index 55417f5..b0e6e6e 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -17,7 +17,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include "../bindings/c/zmq.h" + #include "options.hpp" +#include "err.hpp" zmq::options_t::options_t () : hwm (0), @@ -29,3 +32,80 @@ zmq::options_t::options_t () : use_multicast_loop (false) { } + +int zmq::options_t::setsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + switch (option_) { + + case ZMQ_HWM: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + hwm = *((int64_t*) optval_); + return 0; + + case ZMQ_LWM: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + lwm = *((int64_t*) optval_); + return 0; + + case ZMQ_SWAP: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + swap = *((int64_t*) optval_); + return 0; + + case ZMQ_AFFINITY: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + affinity = (uint64_t) *((int64_t*) optval_); + return 0; + + case ZMQ_IDENTITY: + identity.assign ((const char*) optval_, optvallen_); + return 0; + + case ZMQ_RATE: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + rate = (uint32_t) *((int64_t*) optval_); + return 0; + + case ZMQ_RECOVERY_IVL: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + recovery_ivl = (uint32_t) *((int64_t*) optval_); + return 0; + + case ZMQ_MCAST_LOOP: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + if ((int64_t) *((int64_t*) optval_) == 0) + use_multicast_loop = false; + else if ((int64_t) *((int64_t*) optval_) == 1) + use_multicast_loop = true; + else { + errno = EINVAL; + return -1; + } + return 0; + } + + errno = EINVAL; + return -1; +} diff --git a/src/options.hpp b/src/options.hpp index c1ecb57..cde144c 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -22,6 +22,7 @@ #include <string> +#include "stddef.h" #include "stdint.hpp" namespace zmq @@ -31,6 +32,8 @@ namespace zmq { options_t (); + int setsockopt (int option_, const void *optval_, size_t optvallen_); + int64_t hwm; int64_t lwm; int64_t swap; diff --git a/src/p2p.cpp b/src/p2p.cpp new file mode 100644 index 0000000..537f3ce --- /dev/null +++ b/src/p2p.cpp @@ -0,0 +1,92 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "p2p.hpp" +#include "err.hpp" + +zmq::p2p_t::p2p_t (class app_thread_t *parent_) : + socket_base_t (parent_, ZMQ_P2P) +{ +} + +zmq::p2p_t::~p2p_t () +{ +} + +bool zmq::p2p_t::xrequires_in () +{ + return true; +} + +bool zmq::p2p_t::xrequires_out () +{ + return true; +} + +void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (false); +} + +void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::p2p_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::p2p_t::xkill (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::p2p_t::xrevive (class reader_t *pipe_) +{ + zmq_assert (false); +} + +int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +int zmq::p2p_t::xsend (struct zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + +int zmq::p2p_t::xflush () +{ + zmq_assert (false); +} + +int zmq::p2p_t::xrecv (struct zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + + diff --git a/src/p2p.hpp b/src/p2p.hpp new file mode 100644 index 0000000..84790a1 --- /dev/null +++ b/src/p2p.hpp @@ -0,0 +1,56 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_P2P_INCLUDED__ +#define __ZMQ_P2P_INCLUDED__ + +#include "socket_base.hpp" + +namespace zmq +{ + + class p2p_t : public socket_base_t + { + public: + + p2p_t (class app_thread_t *parent_); + ~p2p_t (); + + // Overloads of functions from socket_base_t. + bool xrequires_in (); + bool xrequires_out (); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (struct zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (struct zmq_msg_t *msg_, int flags_); + + private: + + p2p_t (const p2p_t&); + void operator = (const p2p_t&); + }; + +} + +#endif diff --git a/src/pipe.cpp b/src/pipe.cpp index f4cf0c4..9f70586 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -28,7 +28,6 @@ zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, peer (&pipe_->writer), hwm (hwm_), lwm (lwm_), - index (-1), endpoint (NULL) { } @@ -39,8 +38,10 @@ zmq::reader_t::~reader_t () bool zmq::reader_t::read (zmq_msg_t *msg_) { - if (!pipe->read (msg_)) + if (!pipe->read (msg_)) { + endpoint->kill (this); return false; + } // If delimiter was read, start termination process of the pipe. unsigned char *offset = 0; @@ -61,17 +62,6 @@ void zmq::reader_t::set_endpoint (i_endpoint *endpoint_) endpoint = endpoint_; } -void zmq::reader_t::set_index (int index_) -{ - index = index_; -} - -int zmq::reader_t::get_index () -{ - zmq_assert (index != -1); - return index; -} - void zmq::reader_t::term () { endpoint = NULL; @@ -96,7 +86,6 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, peer (&pipe_->reader), hwm (hwm_), lwm (lwm_), - index (-1), endpoint (NULL) { } @@ -106,17 +95,6 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) endpoint = endpoint_; } -void zmq::writer_t::set_index (int index_) -{ - index = index_; -} - -int zmq::writer_t::get_index () -{ - zmq_assert (index != -1); - return index; -} - zmq::writer_t::~writer_t () { } diff --git a/src/pipe.hpp b/src/pipe.hpp index ede73b8..177b1b4 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -24,6 +24,7 @@ #include "stdint.hpp" #include "i_endpoint.hpp" +#include "yarray_item.hpp" #include "ypipe.hpp" #include "config.hpp" #include "object.hpp" @@ -31,7 +32,7 @@ namespace zmq { - class reader_t : public object_t + class reader_t : public object_t, public yarray_item_t { public: @@ -44,10 +45,6 @@ namespace zmq // Reads a message to the underlying pipe. bool read (struct zmq_msg_t *msg_); - // Mnaipulation of index of the pipe. - void set_index (int index_); - int get_index (); - // Ask pipe to terminate. void term (); @@ -72,9 +69,6 @@ namespace zmq uint64_t tail; uint64_t last_sent_head; - // Index of the pipe in the socket's list of inbound pipes. - int index; - // Endpoint (either session or socket) the pipe is attached to. i_endpoint *endpoint; @@ -82,7 +76,7 @@ namespace zmq void operator = (const reader_t&); }; - class writer_t : public object_t + class writer_t : public object_t, public yarray_item_t { public: @@ -104,10 +98,6 @@ namespace zmq // Flush the messages downsteam. void flush (); - // Mnaipulation of index of the pipe. - void set_index (int index_); - int get_index (); - // Ask pipe to terminate. void term (); @@ -130,9 +120,6 @@ namespace zmq uint64_t head; uint64_t tail; - // Index of the pipe in the socket's list of outbound pipes. - int index; - // Endpoint (either session or socket) the pipe is attached to. i_endpoint *endpoint; diff --git a/src/pub.cpp b/src/pub.cpp index ca8afae..020d789 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -21,6 +21,8 @@ #include "pub.hpp" #include "err.hpp" +#include "msg_content.hpp" +#include "pipe.hpp" zmq::pub_t::pub_t (class app_thread_t *parent_) : socket_base_t (parent_, ZMQ_PUB) @@ -29,9 +31,134 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) : zmq::pub_t::~pub_t () { + for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++) + out_pipes [i]->term (); + out_pipes.clear (); } -int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_) +bool zmq::pub_t::xrequires_in () +{ + return false; +} + +bool zmq::pub_t::xrequires_out () +{ + return true; +} + +void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (!inpipe_); + out_pipes.push_back (outpipe_); +} + +void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_) +{ + out_pipes.erase (pipe_); +} + +void zmq::pub_t::xkill (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::pub_t::xrevive (class reader_t *pipe_) +{ + zmq_assert (false); +} + +int zmq::pub_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +int zmq::pub_t::xsend (struct zmq_msg_t *msg_, int flags_) +{ + out_pipes_t::size_type pipes_count = out_pipes.size (); + + // If there are no pipes available, simply drop the message. + if (pipes_count == 0) { + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + + // First check whether all pipes are available for writing. + for (out_pipes_t::size_type i = 0; i != pipes_count; i++) + if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) { + errno = EAGAIN; + return -1; + } + + msg_content_t *content = (msg_content_t*) msg_->content; + + // For VSMs the copying is straighforward. + if (content == (msg_content_t*) ZMQ_VSM) { + for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { + out_pipes [i]->write (msg_); + if (!(flags_ & ZMQ_NOFLUSH)) + out_pipes [i]->flush (); + } + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + + // Optimisation for the case when there's only a single pipe + // to send the message to - no refcount adjustment i.e. no atomic + // operations are needed. + if (pipes_count == 1) { + out_pipes [0]->write (msg_); + if (!(flags_ & ZMQ_NOFLUSH)) + out_pipes [0]->flush (); + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + + // There are at least 2 destinations for the message. That means we have + // to deal with reference counting. First add N-1 references to + // the content (we are holding one reference anyway, that's why -1). + if (msg_->shared) + content->refcnt.add (pipes_count - 1); + else { + content->refcnt.set (pipes_count); + msg_->shared = true; + } + + // Push the message to all destinations. + for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { + out_pipes [i]->write (msg_); + if (!(flags_ & ZMQ_NOFLUSH)) + out_pipes [i]->flush (); + } + + // Detach the original message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return 0; +} + +int zmq::pub_t::xflush () +{ + out_pipes_t::size_type pipe_count = out_pipes.size (); + for (out_pipes_t::size_type i = 0; i != pipe_count; i++) + out_pipes [i]->flush (); + return 0; +} + +int zmq::pub_t::xrecv (struct zmq_msg_t *msg_, int flags_) { errno = EFAULT; return -1; diff --git a/src/pub.hpp b/src/pub.hpp index 2f03b8e..8255c6f 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -21,6 +21,7 @@ #define __ZMQ_PUB_INCLUDED__ #include "socket_base.hpp" +#include "yarray.hpp" namespace zmq { @@ -32,8 +33,27 @@ namespace zmq pub_t (class app_thread_t *parent_); ~pub_t (); - // Overloads of API functions from socket_base_t. - int recv (struct zmq_msg_t *msg_, int flags_); + // Overloads of functions from socket_base_t. + bool xrequires_in (); + bool xrequires_out (); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (struct zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (struct zmq_msg_t *msg_, int flags_); + + private: + + // Outbound pipes, i.e. those the socket is sending messages to. + typedef yarray_t <class writer_t> out_pipes_t; + out_pipes_t out_pipes; + + pub_t (const pub_t&); + void operator = (const pub_t&); }; } diff --git a/src/rep.cpp b/src/rep.cpp new file mode 100644 index 0000000..2fbb66c --- /dev/null +++ b/src/rep.cpp @@ -0,0 +1,204 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "rep.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::rep_t::rep_t (class app_thread_t *parent_) : + socket_base_t (parent_, ZMQ_REP), + active (0), + current (0), + waiting_for_reply (false), + reply_pipe (NULL) +{ +} + +zmq::rep_t::~rep_t () +{ +} + +bool zmq::rep_t::xrequires_in () +{ + return true; +} + +bool zmq::rep_t::xrequires_out () +{ + return true; +} + +void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (inpipe_ && outpipe_); + zmq_assert (in_pipes.size () == out_pipes.size ()); + + in_pipes.push_back (inpipe_); + in_pipes.swap (active, in_pipes.size () - 1); + out_pipes.push_back (outpipe_); + out_pipes.swap (active, out_pipes.size () - 1); + active++; +} + +void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (pipe_); + zmq_assert (in_pipes.size () == out_pipes.size ()); + + in_pipes_t::size_type index = in_pipes.index (pipe_); + + // If corresponding outpipe is still in place simply nullify the pointer + // to the inpipe and move it to the passive state. + if (out_pipes [index]) { + in_pipes [index] = NULL; + if (in_pipes.index (pipe_) < active) { + active--; + in_pipes.swap (index, active); + out_pipes.swap (index, active); + } + return; + } + + // Now both inpipe and outpipe are detached. Remove them from the lists. + if (in_pipes.index (pipe_) < active) + active--; + in_pipes.erase (index); + out_pipes.erase (index); +} + +void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (pipe_); + zmq_assert (in_pipes.size () == out_pipes.size ()); + + out_pipes_t::size_type index = out_pipes.index (pipe_); + + // TODO: If the connection we've got the request from disconnects, + // there's nowhere to send the reply. DLQ? + if (waiting_for_reply && pipe_ == reply_pipe) { + zmq_assert (false); + } + + // If corresponding inpipe is still in place simply nullify the pointer + // to the outpipe. + if (in_pipes [index]) { + out_pipes [index] = NULL; + if (out_pipes.index (pipe_) < active) { + active--; + in_pipes.swap (index, active); + out_pipes.swap (index, active); + } + return; + } + + // Now both inpipe and outpipe are detached. Remove them from the lists. + if (out_pipes.index (pipe_) < active) + active--; + in_pipes.erase (index); + out_pipes.erase (index); +} + +void zmq::rep_t::xkill (class reader_t *pipe_) +{ + // Move the pipe to the list of inactive pipes. + in_pipes_t::size_type index = in_pipes.index (pipe_); + active--; + in_pipes.swap (index, active); + out_pipes.swap (index, active); +} + +void zmq::rep_t::xrevive (class reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + in_pipes_t::size_type index = in_pipes.index (pipe_); + in_pipes.swap (index, active); + out_pipes.swap (index, active); + active++; +} + +int zmq::rep_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +int zmq::rep_t::xsend (struct zmq_msg_t *msg_, int flags_) +{ + if (!waiting_for_reply) { + errno = EFAULT; + return -1; + } + + // TODO: Implement this once queue limits are in-place. If the reply + // overloads the buffer, connection should be torn down. + zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_))); + + // Push message to the selected pipe. + reply_pipe->write (msg_); + reply_pipe->flush (); + + waiting_for_reply = false; + reply_pipe = NULL; + + // Detach the message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); +} + +int zmq::rep_t::xflush () +{ + errno = EFAULT; + return -1; +} + +int zmq::rep_t::xrecv (struct zmq_msg_t *msg_, int flags_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + if (waiting_for_reply) { + errno = EFAULT; + return -1; + } + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + bool fetched = in_pipes [current]->read (msg_); + current++; + if (current >= active) + current = 0; + if (fetched) { + reply_pipe = out_pipes [current]; + waiting_for_reply = true; + return 0; + } + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; +} + + diff --git a/src/rep.hpp b/src/rep.hpp new file mode 100644 index 0000000..6e55f47 --- /dev/null +++ b/src/rep.hpp @@ -0,0 +1,79 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_REP_INCLUDED__ +#define __ZMQ_REP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class rep_t : public socket_base_t + { + public: + + rep_t (class app_thread_t *parent_); + ~rep_t (); + + // Overloads of functions from socket_base_t. + bool xrequires_in (); + bool xrequires_out (); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (struct zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (struct zmq_msg_t *msg_, int flags_); + + private: + + // List in outbound and inbound pipes. Note that the two lists are + // always in sync. I.e. outpipe with index N communicates with the + // same session as inpipe with index N. + typedef yarray_t <class writer_t> out_pipes_t; + out_pipes_t out_pipes; + typedef yarray_t <class reader_t> in_pipes_t; + in_pipes_t in_pipes; + + // Number of active inpipes. All the active inpipes are located at the + // beginning of the in_pipes array. + in_pipes_t::size_type active; + + // Index of the next inbound pipe to read a request from. + in_pipes_t::size_type current; + + // If true, request was already received and reply wasn't sent yet. + bool waiting_for_reply; + + // Pipe we are going to send reply to. + class writer_t *reply_pipe; + + rep_t (const rep_t&); + void operator = (const rep_t&); + + }; + +} + +#endif diff --git a/src/req.cpp b/src/req.cpp new file mode 100644 index 0000000..05629df --- /dev/null +++ b/src/req.cpp @@ -0,0 +1,206 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either vers |