diff options
-rw-r--r-- | src/Makefile.am | 8 | ||||
-rw-r--r-- | src/app_thread.cpp | 20 | ||||
-rw-r--r-- | src/app_thread.hpp | 3 | ||||
-rw-r--r-- | src/i_endpoint.hpp | 7 | ||||
-rw-r--r-- | src/options.cpp | 80 | ||||
-rw-r--r-- | src/options.hpp | 3 | ||||
-rw-r--r-- | src/p2p.cpp | 92 | ||||
-rw-r--r-- | src/p2p.hpp | 56 | ||||
-rw-r--r-- | src/pipe.cpp | 28 | ||||
-rw-r--r-- | src/pipe.hpp | 19 | ||||
-rw-r--r-- | src/pub.cpp | 129 | ||||
-rw-r--r-- | src/pub.hpp | 24 | ||||
-rw-r--r-- | src/rep.cpp | 204 | ||||
-rw-r--r-- | src/rep.hpp | 79 | ||||
-rw-r--r-- | src/req.cpp | 206 | ||||
-rw-r--r-- | src/req.hpp | 84 | ||||
-rw-r--r-- | src/session.cpp | 49 | ||||
-rw-r--r-- | src/session.hpp | 6 | ||||
-rw-r--r-- | src/socket_base.cpp | 460 | ||||
-rw-r--r-- | src/socket_base.hpp | 77 | ||||
-rw-r--r-- | src/sub.cpp | 88 | ||||
-rw-r--r-- | src/sub.hpp | 38 | ||||
-rw-r--r-- | src/yarray.hpp | 110 | ||||
-rw-r--r-- | src/yarray_item.hpp | 62 |
24 files changed, 1460 insertions, 472 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 2701237..f75c3a1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -68,7 +68,10 @@ libzmq_la_SOURCES = $(pgm_sources) \ pipe.hpp \ platform.hpp \ poll.hpp \ + p2p.hpp \ pub.hpp \ + rep.hpp \ + req.hpp \ select.hpp \ session.hpp \ simple_semaphore.hpp \ @@ -82,6 +85,8 @@ libzmq_la_SOURCES = $(pgm_sources) \ uuid.hpp \ windows.hpp \ wire.hpp \ + yarray.hpp \ + yarray_item.hpp \ ypipe.hpp \ ypollset.hpp \ yqueue.hpp \ @@ -108,9 +113,12 @@ libzmq_la_SOURCES = $(pgm_sources) \ pgm_receiver.cpp \ pgm_sender.cpp \ pgm_socket.cpp \ + p2p.cpp \ pipe.cpp \ poll.cpp \ pub.cpp \ + rep.cpp \ + req.cpp \ select.cpp \ session.cpp \ socket_base.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 303c6a1..d12b126 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -39,6 +39,9 @@ #include "socket_base.hpp" #include "pub.hpp" #include "sub.hpp" +#include "req.hpp" +#include "rep.hpp" +#include "p2p.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -158,26 +161,27 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_SUB: s = new sub_t (this); break; - case ZMQ_P2P: case ZMQ_REQ: + s = new req_t (this); + break; case ZMQ_REP: - s = new socket_base_t (this, type_); + s = new rep_t (this); + break; + case ZMQ_P2P: + s = new p2p_t (this); break; default: // TODO: This should be EINVAL. zmq_assert (false); } zmq_assert (s); - s->set_index (sockets.size ()); + sockets.push_back (s); + return s; } void zmq::app_thread_t::remove_socket (socket_base_t *socket_) { - int i = socket_->get_index (); - socket_->set_index (-1); - sockets [i] = sockets.back (); - sockets [i]->set_index (i); - sockets.pop_back (); + sockets.erase (socket_); } diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 4fe67fb..14cb8c5 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -24,6 +24,7 @@ #include "stdint.hpp" #include "object.hpp" +#include "yarray.hpp" #include "thread.hpp" namespace zmq @@ -67,7 +68,7 @@ namespace zmq private: // All the sockets created from this application thread. - typedef std::vector <class socket_base_t*> sockets_t; + typedef yarray_t <socket_base_t> sockets_t; sockets_t sockets; // If false, app_thread_t object is not associated with any OS thread. diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index 8ee2984..3bab2a5 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -25,11 +25,12 @@ namespace zmq struct i_endpoint { - virtual void attach_inpipe (class reader_t *pipe_) = 0; - virtual void attach_outpipe (class writer_t *pipe_) = 0; - virtual void revive (class reader_t *pipe_) = 0; + virtual void attach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) = 0; virtual void detach_inpipe (class reader_t *pipe_) = 0; virtual void detach_outpipe (class writer_t *pipe_) = 0; + virtual void kill (class reader_t *pipe_) = 0; + virtual void revive (class reader_t *pipe_) = 0; }; } diff --git a/src/options.cpp b/src/options.cpp index 55417f5..b0e6e6e 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -17,7 +17,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include "../bindings/c/zmq.h" + #include "options.hpp" +#include "err.hpp" zmq::options_t::options_t () : hwm (0), @@ -29,3 +32,80 @@ zmq::options_t::options_t () : use_multicast_loop (false) { } + +int zmq::options_t::setsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + switch (option_) { + + case ZMQ_HWM: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + hwm = *((int64_t*) optval_); + return 0; + + case ZMQ_LWM: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + lwm = *((int64_t*) optval_); + return 0; + + case ZMQ_SWAP: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + swap = *((int64_t*) optval_); + return 0; + + case ZMQ_AFFINITY: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + affinity = (uint64_t) *((int64_t*) optval_); + return 0; + + case ZMQ_IDENTITY: + identity.assign ((const char*) optval_, optvallen_); + return 0; + + case ZMQ_RATE: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + rate = (uint32_t) *((int64_t*) optval_); + return 0; + + case ZMQ_RECOVERY_IVL: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + recovery_ivl = (uint32_t) *((int64_t*) optval_); + return 0; + + case ZMQ_MCAST_LOOP: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + if ((int64_t) *((int64_t*) optval_) == 0) + use_multicast_loop = false; + else if ((int64_t) *((int64_t*) optval_) == 1) + use_multicast_loop = true; + else { + errno = EINVAL; + return -1; + } + return 0; + } + + errno = EINVAL; + return -1; +} diff --git a/src/options.hpp b/src/options.hpp index c1ecb57..cde144c 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -22,6 +22,7 @@ #include <string> +#include "stddef.h" #include "stdint.hpp" namespace zmq @@ -31,6 +32,8 @@ namespace zmq { options_t (); + int setsockopt (int option_, const void *optval_, size_t optvallen_); + int64_t hwm; int64_t lwm; int64_t swap; diff --git a/src/p2p.cpp b/src/p2p.cpp new file mode 100644 index 0000000..537f3ce --- /dev/null +++ b/src/p2p.cpp @@ -0,0 +1,92 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "p2p.hpp" +#include "err.hpp" + +zmq::p2p_t::p2p_t (class app_thread_t *parent_) : + socket_base_t (parent_, ZMQ_P2P) +{ +} + +zmq::p2p_t::~p2p_t () +{ +} + +bool zmq::p2p_t::xrequires_in () +{ + return true; +} + +bool zmq::p2p_t::xrequires_out () +{ + return true; +} + +void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (false); +} + +void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::p2p_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::p2p_t::xkill (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::p2p_t::xrevive (class reader_t *pipe_) +{ + zmq_assert (false); +} + +int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +int zmq::p2p_t::xsend (struct zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + +int zmq::p2p_t::xflush () +{ + zmq_assert (false); +} + +int zmq::p2p_t::xrecv (struct zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + + diff --git a/src/p2p.hpp b/src/p2p.hpp new file mode 100644 index 0000000..84790a1 --- /dev/null +++ b/src/p2p.hpp @@ -0,0 +1,56 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_P2P_INCLUDED__ +#define __ZMQ_P2P_INCLUDED__ + +#include "socket_base.hpp" + +namespace zmq +{ + + class p2p_t : public socket_base_t + { + public: + + p2p_t (class app_thread_t *parent_); + ~p2p_t (); + + // Overloads of functions from socket_base_t. + bool xrequires_in (); + bool xrequires_out (); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (struct zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (struct zmq_msg_t *msg_, int flags_); + + private: + + p2p_t (const p2p_t&); + void operator = (const p2p_t&); + }; + +} + +#endif diff --git a/src/pipe.cpp b/src/pipe.cpp index f4cf0c4..9f70586 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -28,7 +28,6 @@ zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, peer (&pipe_->writer), hwm (hwm_), lwm (lwm_), - index (-1), endpoint (NULL) { } @@ -39,8 +38,10 @@ zmq::reader_t::~reader_t () bool zmq::reader_t::read (zmq_msg_t *msg_) { - if (!pipe->read (msg_)) + if (!pipe->read (msg_)) { + endpoint->kill (this); return false; + } // If delimiter was read, start termination process of the pipe. unsigned char *offset = 0; @@ -61,17 +62,6 @@ void zmq::reader_t::set_endpoint (i_endpoint *endpoint_) endpoint = endpoint_; } -void zmq::reader_t::set_index (int index_) -{ - index = index_; -} - -int zmq::reader_t::get_index () -{ - zmq_assert (index != -1); - return index; -} - void zmq::reader_t::term () { endpoint = NULL; @@ -96,7 +86,6 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, peer (&pipe_->reader), hwm (hwm_), lwm (lwm_), - index (-1), endpoint (NULL) { } @@ -106,17 +95,6 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) endpoint = endpoint_; } -void zmq::writer_t::set_index (int index_) -{ - index = index_; -} - -int zmq::writer_t::get_index () -{ - zmq_assert (index != -1); - return index; -} - zmq::writer_t::~writer_t () { } diff --git a/src/pipe.hpp b/src/pipe.hpp index ede73b8..177b1b4 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -24,6 +24,7 @@ #include "stdint.hpp" #include "i_endpoint.hpp" +#include "yarray_item.hpp" #include "ypipe.hpp" #include "config.hpp" #include "object.hpp" @@ -31,7 +32,7 @@ namespace zmq { - class reader_t : public object_t + class reader_t : public object_t, public yarray_item_t { public: @@ -44,10 +45,6 @@ namespace zmq // Reads a message to the underlying pipe. bool read (struct zmq_msg_t *msg_); - // Mnaipulation of index of the pipe. - void set_index (int index_); - int get_index (); - // Ask pipe to terminate. void term (); @@ -72,9 +69,6 @@ namespace zmq uint64_t tail; uint64_t last_sent_head; - // Index of the pipe in the socket's list of inbound pipes. - int index; - // Endpoint (either session or socket) the pipe is attached to. i_endpoint *endpoint; @@ -82,7 +76,7 @@ namespace zmq void operator = (const reader_t&); }; - class writer_t : public object_t + class writer_t : public object_t, public yarray_item_t { public: @@ -104,10 +98,6 @@ namespace zmq // Flush the messages downsteam. void flush (); - // Mnaipulation of index of the pipe. - void set_index (int index_); - int get_index (); - // Ask pipe to terminate. void term (); @@ -130,9 +120,6 @@ namespace zmq uint64_t head; uint64_t tail; - // Index of the pipe in the socket's list of outbound pipes. - int index; - // Endpoint (either session or socket) the pipe is attached to. i_endpoint *endpoint; diff --git a/src/pub.cpp b/src/pub.cpp index ca8afae..020d789 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -21,6 +21,8 @@ #include "pub.hpp" #include "err.hpp" +#include "msg_content.hpp" +#include "pipe.hpp" zmq::pub_t::pub_t (class app_thread_t *parent_) : socket_base_t (parent_, ZMQ_PUB) @@ -29,9 +31,134 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) : zmq::pub_t::~pub_t () { + for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++) + out_pipes [i]->term (); + out_pipes.clear (); } -int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_) +bool zmq::pub_t::xrequires_in () +{ + return false; +} + +bool zmq::pub_t::xrequires_out () +{ + return true; +} + +void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (!inpipe_); + out_pipes.push_back (outpipe_); +} + +void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_) +{ + out_pipes.erase (pipe_); +} + +void zmq::pub_t::xkill (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::pub_t::xrevive (class reader_t *pipe_) +{ + zmq_assert (false); +} + +int zmq::pub_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +int zmq::pub_t::xsend (struct zmq_msg_t *msg_, int flags_) +{ + out_pipes_t::size_type pipes_count = out_pipes.size (); + + // If there are no pipes available, simply drop the message. + if (pipes_count == 0) { + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + + // First check whether all pipes are available for writing. + for (out_pipes_t::size_type i = 0; i != pipes_count; i++) + if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) { + errno = EAGAIN; + return -1; + } + + msg_content_t *content = (msg_content_t*) msg_->content; + + // For VSMs the copying is straighforward. + if (content == (msg_content_t*) ZMQ_VSM) { + for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { + out_pipes [i]->write (msg_); + if (!(flags_ & ZMQ_NOFLUSH)) + out_pipes [i]->flush (); + } + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + + // Optimisation for the case when there's only a single pipe + // to send the message to - no refcount adjustment i.e. no atomic + // operations are needed. + if (pipes_count == 1) { + out_pipes [0]->write (msg_); + if (!(flags_ & ZMQ_NOFLUSH)) + out_pipes [0]->flush (); + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + + // There are at least 2 destinations for the message. That means we have + // to deal with reference counting. First add N-1 references to + // the content (we are holding one reference anyway, that's why -1). + if (msg_->shared) + content->refcnt.add (pipes_count - 1); + else { + content->refcnt.set (pipes_count); + msg_->shared = true; + } + + // Push the message to all destinations. + for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { + out_pipes [i]->write (msg_); + if (!(flags_ & ZMQ_NOFLUSH)) + out_pipes [i]->flush (); + } + + // Detach the original message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return 0; +} + +int zmq::pub_t::xflush () +{ + out_pipes_t::size_type pipe_count = out_pipes.size (); + for (out_pipes_t::size_type i = 0; i != pipe_count; i++) + out_pipes [i]->flush (); + return 0; +} + +int zmq::pub_t::xrecv (struct zmq_msg_t *msg_, int flags_) { errno = EFAULT; return -1; diff --git a/src/pub.hpp b/src/pub.hpp index 2f03b8e..8255c6f 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -21,6 +21,7 @@ #define __ZMQ_PUB_INCLUDED__ #include "socket_base.hpp" +#include "yarray.hpp" namespace zmq { @@ -32,8 +33,27 @@ namespace zmq pub_t (class app_thread_t *parent_); ~pub_t (); - // Overloads of API functions from socket_base_t. - int recv (struct zmq_msg_t *msg_, int flags_); + // Overloads of functions from socket_base_t. + bool xrequires_in (); + bool xrequires_out (); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (struct zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (struct zmq_msg_t *msg_, int flags_); + + private: + + // Outbound pipes, i.e. those the socket is sending messages to. + typedef yarray_t <class writer_t> out_pipes_t; + out_pipes_t out_pipes; + + pub_t (const pub_t&); + void operator = (const pub_t&); }; } diff --git a/src/rep.cpp b/src/rep.cpp new file mode 100644 index 0000000..2fbb66c --- /dev/null +++ b/src/rep.cpp @@ -0,0 +1,204 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "rep.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::rep_t::rep_t (class app_thread_t *parent_) : + socket_base_t (parent_, ZMQ_REP), + active (0), + current (0), + waiting_for_reply (false), + reply_pipe (NULL) +{ +} + +zmq::rep_t::~rep_t () +{ +} + +bool zmq::rep_t::xrequires_in () +{ + return true; +} + +bool zmq::rep_t::xrequires_out () +{ + return true; +} + +void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (inpipe_ && outpipe_); + zmq_assert (in_pipes.size () == out_pipes.size ()); + + in_pipes.push_back (inpipe_); + in_pipes.swap (active, in_pipes.size () - 1); + out_pipes.push_back (outpipe_); + out_pipes.swap (active, out_pipes.size () - 1); + active++; +} + +void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (pipe_); + zmq_assert (in_pipes.size () == out_pipes.size ()); + + in_pipes_t::size_type index = in_pipes.index (pipe_); + + // If corresponding outpipe is still in place simply nullify the pointer + // to the inpipe and move it to the passive state. + if (out_pipes [index]) { + in_pipes [index] = NULL; + if (in_pipes.index (pipe_) < active) { + active--; + in_pipes.swap (index, active); + out_pipes.swap (index, active); + } + return; + } + + // Now both inpipe and outpipe are detached. Remove them from the lists. + if (in_pipes.index (pipe_) < active) + active--; + in_pipes.erase (index); + out_pipes.erase (index); +} + +void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (pipe_); + zmq_assert (in_pipes.size () == out_pipes.size ()); + + out_pipes_t::size_type index = out_pipes.index (pipe_); + + // TODO: If the connection we've got the request from disconnects, + // there's nowhere to send the reply. DLQ? + if (waiting_for_reply && pipe_ == reply_pipe) { + zmq_assert (false); + } + + // If corresponding inpipe is still in place simply nullify the pointer + // to the outpipe. + if (in_pipes [index]) { + out_pipes [index] = NULL; + if (out_pipes.index (pipe_) < active) { + active--; + in_pipes.swap (index, active); + out_pipes.swap (index, active); + } + return; + } + + // Now both inpipe and outpipe are detached. Remove them from the lists. + if (out_pipes.index (pipe_) < active) + active--; + in_pipes.erase (index); + out_pipes.erase (index); +} + +void zmq::rep_t::xkill (class reader_t *pipe_) +{ + // Move the pipe to the list of inactive pipes. + in_pipes_t::size_type index = in_pipes.index (pipe_); + active--; + in_pipes.swap (index, active); + out_pipes.swap (index, active); +} + +void zmq::rep_t::xrevive (class reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + in_pipes_t::size_type index = in_pipes.index (pipe_); + in_pipes.swap (index, active); + out_pipes.swap (index, active); + active++; +} + +int zmq::rep_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +int zmq::rep_t::xsend (struct zmq_msg_t *msg_, int flags_) +{ + if (!waiting_for_reply) { + errno = EFAULT; + return -1; + } + + // TODO: Implement this once queue limits are in-place. If the reply + // overloads the buffer, connection should be torn down. + zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_))); + + // Push message to the selected pipe. + reply_pipe->write (msg_); + reply_pipe->flush (); + + waiting_for_reply = false; + reply_pipe = NULL; + + // Detach the message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); +} + +int zmq::rep_t::xflush () +{ + errno = EFAULT; + return -1; +} + +int zmq::rep_t::xrecv (struct zmq_msg_t *msg_, int flags_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + if (waiting_for_reply) { + errno = EFAULT; + return -1; + } + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + bool fetched = in_pipes [current]->read (msg_); + current++; + if (current >= active) + current = 0; + if (fetched) { + reply_pipe = out_pipes [current]; + waiting_for_reply = true; + return 0; + } + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; +} + + diff --git a/src/rep.hpp b/src/rep.hpp new file mode 100644 index 0000000..6e55f47 --- /dev/null +++ b/src/rep.hpp @@ -0,0 +1,79 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_REP_INCLUDED__ +#define __ZMQ_REP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class rep_t : public socket_base_t + { + public: + + rep_t (class app_thread_t *parent_); + ~rep_t (); + + // Overloads of functions from socket_base_t. + bool xrequires_in (); + bool xrequires_out (); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (struct zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (struct zmq_msg_t *msg_, int flags_); + + private: + + // List in outbound and inbound pipes. Note that the two lists are + // always in sync. I.e. outpipe with index N communicates with the + // same session as inpipe with index N. + typedef yarray_t <class writer_t> out_pipes_t; + out_pipes_t out_pipes; + typedef yarray_t <class reader_t> in_pipes_t; + in_pipes_t in_pipes; + + // Number of active inpipes. All the active inpipes are located at the + // beginning of the in_pipes array. + in_pipes_t::size_type active; + + // Index of the next inbound pipe to read a request from. + in_pipes_t::size_type current; + + // If true, request was already received and reply wasn't sent yet. + bool waiting_for_reply; + + // Pipe we are going to send reply to. + class writer_t *reply_pipe; + + rep_t (const rep_t&); + void operator = (const rep_t&); + + }; + +} + +#endif diff --git a/src/req.cpp b/src/req.cpp new file mode 100644 index 0000000..05629df --- /dev/null +++ b/src/req.cpp @@ -0,0 +1,206 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "req.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::req_t::req_t (class app_thread_t *parent_) : + socket_base_t (parent_, ZMQ_REQ), + current (0), + waiting_for_reply (false), + reply_pipe_active (false), + reply_pipe (NULL) +{ +} + +zmq::req_t::~req_t () +{ +} + +bool zmq::req_t::xrequires_in () +{ + return true; +} + +bool zmq::req_t::xrequires_out () +{ + return true; +} + +void zmq::req_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (inpipe_ && outpipe_); + zmq_assert (in_pipes.size () == out_pipes.size ()); + + in_pipes.push_back (inpipe_); + out_pipes.push_back (outpipe_); +} + +void zmq::req_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (pipe_); + zmq_assert (in_pipes.size () == out_pipes.size ()); + + // TODO: The pipe we are awaiting the reply from is detached. What now? + // Return ECONNRESET from subsequent recv? + if (waiting_for_reply && pipe_ == reply_pipe) { + zmq_assert (false); + } + + in_pipes_t::size_type index = in_pipes.index (pipe_); + + // If corresponding outpipe is still in place simply nullify the pointer + // to the inpipe. + if (out_pipes [index]) { + in_pipes [index] = NULL; + return; + } + + // Now both inpipe and outpipe are detached. Remove them from the lists. + in_pipes.erase (index); + out_pipes.erase (index); +} + +void zmq::req_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (pipe_); + zmq_assert (in_pipes.size () == out_pipes.size ()); + + out_pipes_t::size_type index = out_pipes.index (pipe_); + + // If corresponding inpipe is still in place simply nullify the pointer + // to the outpipe. + if (in_pipes [index]) { + out_pipes [index] = NULL; + return; + } + + // Now both inpipe and outpipe are detached. Remove them from the lists. + in_pipes.erase (index); + out_pipes.erase (index); +} + +void zmq::req_t::xkill (class reader_t *pipe_) +{ + zmq_assert (pipe_ == reply_pipe); + + reply_pipe_active = false; +} + +void zmq::req_t::xrevive (class reader_t *pipe_) +{ + // TODO: Actually, misbehaving peer can cause this kind of thing. + // Handle it decently, presumably kill the offending connection. + zmq_assert (pipe_ == reply_pipe); + + reply_pipe_active = true; +} + +int zmq::req_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +int zmq::req_t::xsend (struct zmq_msg_t *msg_, int flags_) +{ + // If we've sent a request and we still haven't got the reply, + // we can't send another request. + if (waiting_for_reply) { + errno = EFAULT; + return -1; + } + + if (out_pipes.empty ()) { + errno = EFAULT; + return -1; + } + + current++; + if (current >= out_pipes.size ()) + current = 0; + + // TODO: Infinite loop can result here. Integrate the algorithm with + // the active pipes list (i.e. pipe pair that has one pipe missing is + // considered to be inactive. + while (!in_pipes [current] || !out_pipes [current]) { + current++; + if (current >= out_pipes.size ()) + current = 0; + } + + // TODO: Implement this once queue limits are in-place. + zmq_assert (out_pipes [current]->check_write (zmq_msg_size (msg_))); + + // Push message to the selected pipe. + out_pipes [current]->write (msg_); + out_pipes [current]->flush (); + + waiting_for_reply = true; + reply_pipe = in_pipes [current]; + + // We can safely assume that the reply pipe is active as the last time + // we've used it we've read the reply and haven't tried to read from it + // anymore. + reply_pipe_active = true; + + // Detach the message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return 0; +} + +int zmq::req_t::xflush () +{ + errno = EFAULT; + return -1; +} + +int zmq::req_t::xrecv (struct zmq_msg_t *msg_, int flags_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // If request wasn't send, we can't wait for reply. + if (!waiting_for_reply) { + zmq_msg_init (msg_); + errno = EFAULT; + return -1; + } + + // Get the reply from the reply pipe. + if (!reply_pipe_active || !reply_pipe->read (msg_)) { + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; + } + + waiting_for_reply = false; + reply_pipe = NULL; + + return 0; +} + + diff --git a/src/req.hpp b/src/req.hpp new file mode 100644 index 0000000..9158fbe --- /dev/null +++ b/src/req.hpp @@ -0,0 +1,84 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_REQ_INCLUDED__ +#define __ZMQ_REQ_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class req_t : public socket_base_t + { + public: + + req_t (class app_thread_t *parent_); + ~req_t (); + + // Overloads of functions from socket_base_t. + bool xrequires_in (); + bool xrequires_out (); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (struct zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (struct zmq_msg_t *msg_, int flags_); + + private: + + // List in outbound and inbound pipes. Note that the two lists are + // always in sync. I.e. outpipe with index N communicates with the + // same session as inpipe with index N. + // + // TODO: Once we have queue limits in place, list of active outpipes + // is to be held (presumably by stacking active outpipes at + // the beginning of the array). We don't have to do the same thing for + // inpipes, because we know which pipe we want to read the + // reply from. + typedef yarray_t <class writer_t> out_pipes_t; + out_pipes_t out_pipes; + typedef yarray_t <class reader_t> in_pipes_t; + in_pipes_t in_pipes; + + // Req_t load-balances the requests - 'current' points to the session + // that's processing the request at the moment. + out_pipes_t::size_type current; + + // If true, request was already sent and reply wasn't received yet. + bool waiting_for_reply; + + // True, if read can be attempted from the reply pipe. + bool reply_pipe_active; + + // Pipe we are awaiting the reply from. + class reader_t *reply_pipe; + + req_t (const req_t&); + void operator = (const req_t&); + }; + +} + +#endif diff --git a/src/session.cpp b/src/session.cpp index d455462..b829ae9 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -46,11 +46,7 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) if (!active) return false; - bool fetched = in_pipe->read (msg_); - if (!fetched) - active = false; - - return fetched; + return in_pipe->read (msg_); } bool zmq::session_t::write (::zmq_msg_t *msg_) @@ -84,38 +80,45 @@ void zmq::session_t::detach () term (); } -void zmq::session_t::attach_inpipe (reader_t *pipe_) +void zmq::session_t::attach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) { - zmq_assert (!in_pipe); - in_pipe = pipe_; - active = true; - in_pipe->set_endpoint (this); + if (inpipe_) { + zmq_assert (!in_pipe); + in_pipe = inpipe_; + active = true; + in_pipe->set_endpoint (this); + } + + if (outpipe_) { + zmq_assert (!out_pipe); + out_pipe = outpipe_; + out_pipe->set_endpoint (this); + } } -void zmq::session_t::attach_outpipe (writer_t *pipe_) +void zmq::session_t::detach_inpipe (reader_t *pipe_) { - zmq_assert (!out_pipe); - out_pipe = pipe_; - out_pipe->set_endpoint (this); + active = false; + in_pipe = NULL; } -void zmq::session_t::revive (reader_t *pipe_) +void zmq::session_t::detach_outpipe (writer_t *pipe_) { - zmq_assert (in_pipe == pipe_); - active = true; - if (engine) - engine->revive (); + out_pipe = NULL; } -void zmq::session_t::detach_inpipe (reader_t *pipe_) +void zmq::session_t::kill (reader_t *pipe_) { active = false; - in_pipe = NULL; } -void zmq::session_t::detach_outpipe (writer_t *pipe_) +void zmq::session_t::revive (reader_t *pipe_) { - out_pipe = NULL; + zmq_assert (in_pipe == pipe_); + active = true; + if (engine) + engine->revive (); } void zmq::session_t::process_plug () diff --git a/src/session.hpp b/src/session.hpp index 55aa8ea..64900f2 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -44,11 +44,11 @@ namespace zmq void detach (); // i_endpoint interface implementation. - void attach_inpipe (class reader_t *pipe_); - void attach_outpipe (class writer_t *pipe_); - void revive (class reader_t *pipe_); + void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void detach_inpipe (class reader_t *pipe_); void detach_outpipe (class writer_t *pipe_); + void kill (class reader_t *pipe_); + void revive (class reader_t *pipe_); private: diff --git a/src/socket_base.cpp b/src/socket_base.cpp index bb8e7c9..c669e04 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -27,7 +27,6 @@ #include "dispatcher.hpp" #include "zmq_listener.hpp" #include "zmq_connecter.hpp" -#include "msg_content.hpp" #include "io_thread.hpp" #include "session.hpp" #include "config.hpp" @@ -42,145 +41,28 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) : object_t (parent_), type (type_), - current (0), - active (0), pending_term_acks (0), ticks (0), app_thread (parent_), - shutting_down (false), - index (-1) + shutting_down (false) { } zmq::socket_base_t::~socket_base_t () { - shutting_down = true; - - // Ask all pipes to terminate. - for (in_pipes_t::iterator it = in_pipes.begin (); - it != in_pipes.end (); it++) - (*it)->term (); - in_pipes.clear (); - for (out_pipes_t::iterator it = out_pipes.begin (); - it != out_pipes.end (); it++) - (*it)->term (); - out_pipes.clear (); - - while (true) { - - // On third pass of the loop there should be no more I/O objects - // because all connecters and listerners were destroyed during - // the first pass and all engines delivered by delayed 'own' commands - // are destroyed during the second pass. - if (io_objects.empty () && !pending_term_acks) - break; - - // Send termination request to all associated I/O objects. - for (io_objects_t::iterator it = io_objects.begin (); - it != io_objects.end (); it++) - send_term (*it); - - // Move the objects to the list of pending term acks. - pending_term_acks += io_objects.size (); - io_objects.clear (); - - // Process commands till we get all the termination acknowledgements. - while (pending_term_acks) - app_thread->process_commands (true, false); - } - - // Check whether there are no session leaks. - sessions_sync.lock (); - zmq_assert (sessions.empty ()); - sessions_sync.unlock (); } int zmq::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { - switch (option_) { - - case ZMQ_HWM: - if (optvallen_ != sizeof (int64_t)) { - errno = EINVAL; - return -1; - } - options.hwm = *((int64_t*) optval_); - return 0; - - case ZMQ_LWM: - if (optvallen_ != sizeof (int64_t)) { - errno = EINVAL; - return -1; - } - options.lwm = *((int64_t*) optval_); - return 0; - - case ZMQ_SWAP: - if (optvallen_ != sizeof (int64_t)) { - errno = EINVAL; - return -1; - } - options.swap = *((int64_t*) optval_); - return 0; - - case ZMQ_AFFINITY: - if (optvallen_ != sizeof (int64_t)) { - errno = EINVAL; - return -1; - } - options.affinity = (uint64_t) *((int64_t*) optval_); - return 0; - - case ZMQ_IDENTITY: - options.identity.assign ((const char*) optval_, optvallen_); - return 0; - - case ZMQ_SUBSCRIBE: - case ZMQ_UNSUBSCRIBE: - errno = EFAULT; - return -1; - - case ZMQ_RATE: - if (optvallen_ != sizeof (int64_t)) { - errno = EINVAL; - return -1; - } - options.rate = (uint32_t) *((int64_t*) optval_); - return 0; - - case ZMQ_RECOVERY_IVL: - if (optvallen_ != sizeof (int64_t)) { - errno = EINVAL; - return -1; - } - options.recovery_ivl = (uint32_t) *((int64_t*) optval_); - return 0; - - case ZMQ_MCAST_LOOP: - if (optvallen_ != sizeof (int64_t)) { - errno = EINVAL; - return -1; - } - - if ((int64_t) *((int64_t*) optval_) == 0) { - - options.use_multicast_loop = false; - - } else if ((int64_t) *((int64_t*) optval_) == 1) { - - options.use_multicast_loop = true; - - } else { - errno = EINVAL; - return -1; - } - return 0; - - default: - errno = EINVAL; - return -1; - } + // First, check whether specific socket type overloads the option. + int rc = xsetsockopt (option_, optval_, optvallen_); + if (rc == 0 || errno != EINVAL) + return rc; + + // If the socket type doesn't support the option, pass it to + // the generic option parser. + return options.setsockopt (option_, optval_, optvallen_); } int zmq::socket_base_t::bind (const char *addr_) @@ -251,23 +133,29 @@ int zmq::socket_base_t::connect (const char *addr_) options, true); zmq_assert (session); - // Create inbound pipe. - pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm); - zmq_assert (in_pipe); - in_pipe->reader.set_endpoint (this); - session->attach_outpipe (&in_pipe->writer); - in_pipes.push_back (&in_pipe->reader); - in_pipes.back ()->set_index (active); - in_pipes [active]->set_index (in_pipes.size () - 1); - std::swap (in_pipes.back (), in_pipes [active]); - active++; - - // Create outbound pipe. - pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm); - zmq_assert (out_pipe); - out_pipe->writer.set_endpoint (this); - session->attach_inpipe (&out_pipe->reader); - out_pipes.push_back (&out_pipe->writer); + pipe_t *in_pipe = NULL; + pipe_t *out_pipe = NULL; + + // Create inbound pipe, if required. + if (xrequires_in ()) { + in_pipe = new pipe_t (this, session, options.hwm, options.lwm); + zmq_assert (in_pipe); + + } + + // Create outbound pipe, if required. + if (xrequires_out ()) { + out_pipe = new pipe_t (session, this, options.hwm, options.lwm); + zmq_assert (out_pipe); + } + + // Attach the pipes to the socket object. + attach_pipes (in_pipe ? &in_pipe->reader : NULL, + out_pipe ? &out_pipe->writer : NULL); + + // Attach the pipes to the session object. + session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, + in_pipe ? &in_pipe->writer : NULL); // Activate the session. send_plug (session); @@ -294,6 +182,13 @@ int zmq::socket_base_t::connect (const char *addr_) #if defined ZMQ_HAVE_OPENPGM if (addr_type == "pgm" || addr_type == "udp") { + // If the socket type requires bi-directional communication + // multicast is not an option (it is uni-directional). + if (xrequires_in () && xrequires_out ()) { + errno = EFAULT; + return -1; + } + // For udp, pgm transport with udp encapsulation is used. bool udp_encapsulation = false; if (addr_type == "udp") @@ -365,56 +260,61 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) app_thread->process_commands (false, true); // Try to send the message. - bool sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH)); - - if (!(flags_ & ZMQ_NOBLOCK)) { + int rc = xsend (msg_, flags_); + if (rc == 0) + return 0; - // Oops, we couldn't send the message. Wait for the next - // command, process it and try to send the message again. - while (!sent) { - app_thread->process_commands (true, false); - sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH)); - } - } - else if (!sent) { - errno = EAGAIN; + // In case of non-blocking send we'll simply propagate + // the error - including EAGAIN - upwards. + if (flags_ & ZMQ_NOBLOCK) return -1; - } + // Oops, we couldn't send the message. Wait for the next + // command, process it and try to send the message again. + while (rc != 0) { + if (errno != EAGAIN) + return -1; + app_thread->process_commands (true, false); + rc = xsend (msg_, flags_); + } return 0; } int zmq::socket_base_t::flush () { - for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); - it++) - (*it)->flush (); - - return 0; + return xflush (); } int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) { + // Get the message and return immediately if successfull. + int rc = xrecv (msg_, flags_); + if (rc == 0) + return 0; + // If the message cannot be fetched immediately, there are two scenarios. - // For non-blocking recv, commands are processed in case there's a message - // already waiting we don't know about. If it's not, return EAGAIN. + // For non-blocking recv, commands are processed in case there's a revive + // command already waiting int a command pipe. If it's not, return EAGAIN. // In blocking scenario, commands are processed over and over again until // we are able to fetch a message. - bool fetched = fetch (msg_); - if (!fetched) { - if (flags_ & ZMQ_NOBLOCK) { - app_thread->process_commands (false, false); - fetched = fetch (msg_); - } - else { - while (!fetched) { - app_thread->process_commands (true, false); - ticks = 0; - fetched = fetch (msg_); - } + if (flags_ & ZMQ_NOBLOCK) { + if (errno != EAGAIN) + return -1; + app_thread->process_commands (false, false); + ticks = 0; + rc = xrecv (msg_, flags_); + } + else { + while (rc != 0) { + if (errno != EAGAIN) + return -1; + app_thread->process_commands (true, false); + ticks = 0; + rc = xrecv (msg_, flags_); } } + // Once every inbound_poll_rate messages check for signals and process // incoming commands. This happens only if we are not polling altogether // because there are messages available all the time. If poll occurs, @@ -428,12 +328,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ticks = 0; } - if (!fetched) { - errno = EAGAIN; - return -1; - } - - return 0; + return rc; } int zmq::socket_base_t::close () @@ -443,6 +338,37 @@ int zmq::socket_base_t::close () // Pointer to the dispatcher must be retrieved before the socket is // deallocated. Afterwards it is not available. dispatcher_t *dispatcher = get_dispatcher (); + + shutting_down = true; + + while (true) { + + // On third pass of the loop there should be no more I/O objects + // because all connecters and listerners were destroyed during + // the first pass and all engines delivered by delayed 'own' commands + // are destroyed during the second pass. + if (io_objects.empty () && !pending_term_acks) + break; + + // Send termination request to all associated I/O objects. + for (io_objects_t::iterator it = io_objects.begin (); + it != io_objects.end (); it++) + send_term (*it); + + // Move the objects to the list of pending term acks. + pending_term_acks += io_objects.size (); + io_objects.clear (); + + // Process commands till we get all the termination acknowledgements. + while (pending_term_acks) + app_thread->process_commands (true, false); + } + + // Check whether there are no session leaks. + sessions_sync.lock (); + zmq_assert (sessions.empty ()); + sessions_sync.unlock (); + delete this; // This function must be called after the socket is completely deallocated @@ -488,68 +414,36 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_) return it->second; } -void zmq::socket_base_t::attach_inpipe (class reader_t *pipe_) +void zmq::socket_base_t::kill (reader_t *pipe_) { - pipe_->set_endpoint (this); - in_pipes.push_back (pipe_); - in_pipes.back ()->set_index (active); - in_pipes [active]->set_index (in_pipes.size () - 1); - std::swap (in_pipes.back (), in_pipes [active]); - active++; + xkill (pipe_); } -void zmq::socket_base_t::attach_outpipe (class writer_t *pipe_) +void zmq::socket_base_t::revive (reader_t *pipe_) { - pipe_->set_endpoint (this); - out_pipes.push_back (pipe_); - pipe_->set_index (out_pipes.size () - 1); + xrevive (pipe_); } -void zmq::socket_base_t::revive (reader_t *pipe_) +void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) { - // Move the pipe to the list of active pipes. - in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); - in_pipes [index]->set_index (active); - in_pipes [active]->set_index (index); - std::swap (in_pipes [index], in_pipes [active]); - active++; + if (inpipe_) + inpipe_->set_endpoint (this); + if (outpipe_) + outpipe_->set_endpoint (this); + xattach_pipes (inpipe_, outpipe_); } void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) { - // Remove the pipe from the list of inbound pipes. - in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); - if (index < active) { - in_pipes [index]->set_index (active - 1); - in_pipes [active - 1]->set_index (index); - std::swap (in_pipes [index], in_pipes [active - 1]); - active--; - index = active; - } - in_pipes [index]->set_index (in_pipes.size () - 1); - in_pipes [in_pipes.size () - 1]->set_index (index); - std::swap (in_pipes [index], in_pipes [in_pipes.size () - 1]); - in_pipes.pop_back (); + xdetach_inpipe (pipe_); + pipe_->set_endpoint (NULL); // ? } void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_) { - out_pipes_t::size_type index = (out_pipes_t::size_type) pipe_->get_index (); - out_pipes [index]->set_index (out_pipes.size () - 1); - out_pipes [out_pipes.size () - 1]->set_index (index); - std::swap (out_pipes [index], out_pipes [out_pipes.size () - 1]); - out_pipes.pop_back (); -} - -void zmq::socket_base_t::set_index (int index_) -{ - index = index_; -} - -int zmq::socket_base_t::get_index () -{ - zmq_assert (index != -1); - return index; + xdetach_outpipe (pipe_); + pipe_->set_endpoint (NULL); // ? } void zmq::socket_base_t::process_own (owned_t *object_) @@ -560,10 +454,7 @@ void zmq::socket_base_t::process_own (owned_t *object_) void zmq::socket_base_t::process_bind (owned_t *session_, reader_t *in_pipe_, writer_t *out_pipe_) { - zmq_assert (in_pipe_); - attach_inpipe (in_pipe_); - zmq_assert (out_pipe_); - attach_outpipe (out_pipe_); + attach_pipes (in_pipe_, out_pipe_); } void zmq::socket_base_t::process_term_req (owned_t *object_) @@ -593,106 +484,3 @@ void zmq::socket_base_t::process_term_ack () pending_term_acks--; } -bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) -{ - int pipes_count = out_pipes.size (); - - // If there are no pipes available, simply drop the message. - if (pipes_count == 0) { - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - return true; - } - - // First check whether all pipes are available for writing. - for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); - it++) - if (!(*it)->check_write (zmq_msg_size (msg_))) - return false; - - msg_content_t *content = (msg_content_t*) msg_->content; - - // For VSMs the copying is straighforward. - if (content == (msg_content_t*) ZMQ_VSM) { - for (out_pipes_t::iterator it = out_pipes.begin (); - it != out_pipes.end (); it++) { - (*it)->write (msg_); - if (flush_) - (*it)->flush (); - } - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - return true; - } - - // Optimisation for the case when there's only a single pipe - // to send the message to - no refcount adjustment i.e. no atomic - // operations are needed. - if (pipes_count == 1) { - (*out_pipes.begin ())->write (msg_); - if (flush_) - (*out_pipes.begin ())->flush (); - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - return true; - } - - // There are at least 2 destinations for the message. That means we have - // to deal with reference counting. First add N-1 references to - // the content (we are holding one reference anyway, that's why -1). - if (msg_->shared) - content->refcnt.add (pipes_count - 1); - else { - content->refcnt.set (pipes_count); - msg_->shared = true; - } - - // Push the message to all destinations. - for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); - it++) { - (*it)->write (msg_); - if (flush_) - (*it)->flush (); - } - - // Detach the original message from the data buffer. - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - - return true; -} - -bool zmq::socket_base_t::fetch (zmq_msg_t *msg_) -{ - // Deallocate old content of the message. - zmq_msg_close (msg_); - - // Round-robin over the pipes to get next message. - for (int count = active; count != 0; count--) { - - bool fetched = in_pipes [current]->read (msg_); - - // If there's no message in the pipe, move it to the list of - // non-active pipes. - if (!fetched) { - in_pipes [current]->set_index (active - 1); - in_pipes [active - 1]->set_index (current); - std::swap (in_pipes [current], in_pipes [active - 1]); - active--; - } - - current ++; - if (current >= active) - current = 0; - - if (fetched) - return true; - } - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - zmq_msg_init (msg_); - return false; -} diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 398cd32..120c932 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -27,6 +27,7 @@ #include "i_endpoint.hpp" #include "object.hpp" +#include "yarray_item.hpp" #include "mutex.hpp" #include "options.hpp" #include "stdint.hpp" @@ -34,41 +35,59 @@ namespace zmq { - class socket_base_t : public object_t, public i_endpoint + class socket_base_t : + public object_t, public i_endpoint, public yarray_item_t { public: socket_base_t (class app_thread_t *parent_, int type_); - virtual ~socket_base_t (); // Interface for communication with the API layer. - virtual int setsockopt (int option_, const void *optval_, + int setsockopt (int option_, const void *optval_, size_t optvallen_); - virtual int bind (const char *addr_); - virtual int connect (const char *addr_); - virtual int send (struct zmq_msg_t *msg_, int flags_); - virtual int flush (); - virtual int recv (struct zmq_msg_t *msg_, int flags_); - virtual int close (); + int bind (const char *addr_); + int connect (const char *addr_); + int send (struct zmq_msg_t *msg_, int flags_); + int flush (); + int recv (struct zmq_msg_t *msg_, int flags_); + int close (); // The list of sessions cannot be accessed via inter-thread // commands as it is unacceptable to wait for the completion of the // action till user application yields control of the application - // thread to 0MQ. + // thread to 0MQ. Locking is used instead. bool register_session (const char *name_, class session_t *session_); bool unregister_session (const char *name_); class session_t *find_session (const char *name_); // i_endpoint interface implementation. - void attach_inpipe (class reader_t *pipe_); - void attach_outpipe (class writer_t *pipe_); - void revive (class reader_t *pipe_); + void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void detach_inpipe (class reader_t *pipe_); void detach_outpipe (class writer_t *pipe_); + void kill (class reader_t *pipe_); + void revive (class reader_t *pipe_); - // Manipulating index in the app_thread's list of sockets. - void set_index (int index); - int get_index (); + protected: + + // Destructor is protected. Socket is closed using 'close' function. + virtual ~socket_base_t (); + + // Pipe management is done by individual socket types. + virtual bool xrequires_in () = 0; + virtual bool xrequires_out () = 0; + virtual void xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) = 0; + virtual void xdetach_inpipe (class reader_t *pipe_) = 0; + virtual void xdetach_outpipe (class writer_t *pipe_) = 0; + virtual void xkill (class reader_t *pipe_) = 0; + virtual void xrevive (class reader_t *pipe_) = 0; + + // Actual algorithms are to be defined by individual socket types. + virtual int xsetsockopt (int option_, const void *optval_, + size_t optvallen_) = 0; + virtual int xsend (struct zmq_msg_t *msg_, int options_) = 0; + virtual int xflush () = 0; + virtual int xrecv (struct zmq_msg_t *msg_, int options_) = 0; private: @@ -79,14 +98,6 @@ namespace zmq void process_term_req (class owned_t *object_); void process_term_ack (); - // Attempts to distribute the message to all the outbound pipes. - // Returns false if not possible because of pipe overflow. - bool distribute (struct zmq_msg_t *msg_, bool flush_); - - // Gets a message from one of the inbound pipes. Implementation of - // fair queueing. - bool fetch (struct zmq_msg_t *msg_); - // Type of the socket. int type; @@ -95,21 +106,6 @@ namespace zmq typedef std::set <class owned_t*> io_objects_t; io_objects_t io_objects; - // Inbound pipes, i.e. those the socket is getting messages from. - typedef std::vector <class reader_t*> in_pipes_t; - in_pipes_t in_pipes; - - // Index of the next inbound pipe to read messages from. - in_pipes_t::size_type current; - - // Number of active inbound pipes. Active pipes are stored in the - // initial section of the in_pipes array. - in_pipes_t::size_type active; - - // Outbound pipes, i.e. those the socket is sending messages to. - typedef std::vector <class writer_t*> out_pipes_t; - out_pipes_t out_pipes; - // Number of I/O objects that were already asked to terminate // but haven't acknowledged it yet. int pending_term_acks; @@ -138,9 +134,6 @@ namespace zmq sessions_t sessions; mutex_t sessions_sync; - // Index of the socket in the app_thread's list of sockets. - int index; - socket_base_t (const socket_base_t&); void operator = (const socket_base_t&); }; diff --git a/src/sub.cpp b/src/sub.cpp index 515a843..73510c6 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -21,18 +21,69 @@ #include "sub.hpp" #include "err.hpp" +#include "pipe.hpp" zmq::sub_t::sub_t (class app_thread_t *parent_) : socket_base_t (parent_, ZMQ_SUB), + active (0), + current (0), all_count (0) { } zmq::sub_t::~sub_t () { + for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++) + in_pipes [i]->term (); + in_pipes.clear (); } -int zmq::sub_t::setsockopt (int option_, const void *optval_, +bool zmq::sub_t::xrequires_in () +{ + return true; +} + +bool zmq::sub_t::xrequires_out () +{ + return false; +} + +void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (!outpipe_); + in_pipes.push_back (inpipe_); + in_pipes.swap (active, in_pipes.size () - 1); + active++; +} + +void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_) +{ + if (in_pipes.index (pipe_) < active) + active--; + in_pipes.erase (pipe_); +} + +void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::sub_t::xkill (class reader_t *pipe_) +{ + // Move the pipe to the list of inactive pipes. + in_pipes.swap (in_pipes.index (pipe_), active - 1); + active--; +} + +void zmq::sub_t::xrevive (class reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + in_pipes.swap (in_pipes.index (pipe_), active); + active++; +} + +int zmq::sub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { if (option_ == ZMQ_SUBSCRIBE) { @@ -75,27 +126,28 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_, return 0; } - return socket_base_t::setsockopt (option_, optval_, optvallen_); + errno = EINVAL; + return -1; } -int zmq::sub_t::send (struct zmq_msg_t *msg_, int flags_) +int zmq::sub_t::xsend (struct zmq_msg_t *msg_, int flags_) { errno = EFAULT; return -1; } -int zmq::sub_t::flush () +int zmq::sub_t::xflush () { errno = EFAULT; return -1; } -int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) +int zmq::sub_t::xrecv (struct zmq_msg_t *msg_, int flags_) { while (true) { - // Get a message. - int rc = socket_base_t::recv (msg_, flags_); + // Get a message using fair queueing algorithm. + int rc = fq (msg_, flags_); // If there's no message available, return immediately. if (rc != 0 && errno == EAGAIN) @@ -131,3 +183,25 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) return 0; } } + +int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + bool fetched = in_pipes [current]->read (msg_); + current++; + if (current >= active) + current = 0; + if (fetched) + return 0; + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; +} diff --git a/src/sub.hpp b/src/sub.hpp index 14fa687..29da27a 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -24,6 +24,7 @@ #include <string> #include "socket_base.hpp" +#include "yarray.hpp" namespace zmq { @@ -35,14 +36,38 @@ namespace zmq sub_t (class app_thread_t *parent_); ~sub_t (); - // Overloads of API functions from socket_base_t. - int setsockopt (int option_, const void *optval_, size_t optvallen_); - int send (struct zmq_msg_t *msg_, int flags_); - int flush (); - int recv (struct zmq_msg_t *msg_, int flags_); + protected: + + // Overloads of functions from socket_base_t. + bool xrequires_in (); + bool xrequires_out (); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (struct zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (struct zmq_msg_t *msg_, int flags_); private: + // Helper function to return one message choosed using + // fair queueing algorithm. + int fq (struct zmq_msg_t *msg_, int flags_); + + // Inbound pipes, i.e. those the socket is getting messages from. + typedef yarray_t <class reader_t> in_pipes_t; + in_pipes_t in_pipes; + + // Number of active inbound pipes. Active pipes are stored in the + // initial section of the in_pipes array. + in_pipes_t::size_type active; + + // Index of the next inbound pipe to read messages from. + in_pipes_t::size_type current; + // Number of active "*" subscriptions. int all_count; @@ -52,6 +77,9 @@ namespace zmq // List of all exact match subscriptions. subscriptions_t topics; + + sub_t (const sub_t&); + void operator = (const sub_t&); }; } diff --git a/src/yarray.hpp b/src/yarray.hpp new file mode 100644 index 0000000..b2d3f1d --- /dev/null +++ b/src/yarray.hpp @@ -0,0 +1,110 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_YARRAY_INCLUDED__ +#define __ZMQ_YARRAY_INCLUDED__ + +#include <vector> +#include <algorithm> + +namespace zmq +{ + + // Fast array implementation with O(1) access to item, insertion and + // removal. Yarray stores pointers rather than objects. The objects have + // to be derived from yarray_item_t class. + + template <typename T> class yarray_t + { + public: + + typedef typename std::vector <T*>::size_type size_type; + + inline yarray_t () + { + } + + inline ~yarray_t () + { + } + + inline size_type size () + { + return items.size (); + } + + inline bool empty () + { + return items.empty (); + } + + inline T *&operator [] (size_type index_) + { + return items [index_]; + } + + inline void push_back (T *item_) + { + if (item_) + item_->set_yarray_index (items.size ()); + items.push_back (item_); + } + + inline void erase (T *item_) { + erase (item_->get_yarray_index ()); + } + + inline void erase (size_type index_) { + if (items.back ()) + items.back ()->set_yarray_index (index_); + items [index_] = items.back (); + items.pop_back (); + } + + inline void swap (size_type index1_, size_type index2_) + { + if (items [index1_]) + items [index1_]->set_yarray_index (index2_); + if (items [index2_]) + items [index2_]->set_yarray_index (index1_); + std::swap (items [index1_], items [index2_]); + } + + inline void clear () + { + items.clear (); + } + + inline size_type index (T *item_) + { + return (size_type) item_->get_yarray_index (); + } + + private: + + typedef std::vector <T*> items_t; + items_t items; + + yarray_t (const yarray_t&); + void operator = (const yarray_t&); + }; + +} + +#endif diff --git a/src/yarray_item.hpp b/src/yarray_item.hpp new file mode 100644 index 0000000..1de62b8 --- /dev/null +++ b/src/yarray_item.hpp @@ -0,0 +1,62 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_YARRAY_ITEM_INCLUDED__ +#define __ZMQ_YARRAY_ITEM_INCLUDED__ + +namespace zmq +{ + + // Base class for objects stored in yarray. Note that each object can + // be stored in at most one yarray. + + class yarray_item_t + { + public: + + inline yarray_item_t () : + yarray_index (-1) + { + } + + inline ~yarray_item_t () + { + } + + inline void set_yarray_index (int index_) + { + yarray_index = index_; + } + + inline int get_yarray_index () + { + return yarray_index; + } + + private: + + int yarray_index; + + yarray_item_t (const yarray_item_t&); + void operator = (const yarray_item_t&); + }; + +} + +#endif |