diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 4 | ||||
-rw-r--r-- | src/app_thread.cpp | 14 | ||||
-rw-r--r-- | src/command.hpp | 4 | ||||
-rw-r--r-- | src/devpoll.cpp | 3 | ||||
-rw-r--r-- | src/dispatcher.cpp | 56 | ||||
-rw-r--r-- | src/dispatcher.hpp | 12 | ||||
-rw-r--r-- | src/downstream.cpp | 131 | ||||
-rw-r--r-- | src/downstream.hpp | 64 | ||||
-rw-r--r-- | src/kqueue.cpp | 3 | ||||
-rw-r--r-- | src/object.cpp | 29 | ||||
-rw-r--r-- | src/object.hpp | 14 | ||||
-rw-r--r-- | src/p2p.hpp | 4 | ||||
-rw-r--r-- | src/pipe.cpp | 6 | ||||
-rw-r--r-- | src/pub.hpp | 4 | ||||
-rw-r--r-- | src/rep.cpp | 11 | ||||
-rw-r--r-- | src/rep.hpp | 4 | ||||
-rw-r--r-- | src/req.hpp | 4 | ||||
-rw-r--r-- | src/session.cpp | 10 | ||||
-rw-r--r-- | src/simple_semaphore.hpp | 60 | ||||
-rw-r--r-- | src/socket_base.cpp | 72 | ||||
-rw-r--r-- | src/socket_base.hpp | 16 | ||||
-rw-r--r-- | src/sub.hpp | 4 | ||||
-rw-r--r-- | src/upstream.cpp | 143 | ||||
-rw-r--r-- | src/upstream.hpp | 69 | ||||
-rw-r--r-- | src/zmq.cpp | 6 | ||||
-rw-r--r-- | src/zmq_decoder.cpp | 8 | ||||
-rw-r--r-- | src/zmq_encoder.cpp | 7 | ||||
-rw-r--r-- | src/zmq_listener_init.cpp | 1 |
28 files changed, 710 insertions, 53 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 91fb555..3d038b7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \ decoder.hpp \ devpoll.hpp \ dispatcher.hpp \ + downstream.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ + upstream.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \ app_thread.cpp \ devpoll.cpp \ dispatcher.cpp \ + downstream.cpp \ epoll.cpp \ err.cpp \ fd_signaler.cpp \ @@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ + upstream.cpp \ uuid.cpp \ ypollset.cpp \ zmq.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index fbda335..a671822 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -40,11 +40,13 @@ #include "pipe.hpp" #include "config.hpp" #include "socket_base.hpp" +#include "p2p.hpp" #include "pub.hpp" #include "sub.hpp" #include "req.hpp" #include "rep.hpp" -#include "p2p.hpp" +#include "upstream.hpp" +#include "downstream.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) { socket_base_t *s = NULL; switch (type_) { + case ZMQ_P2P: + s = new p2p_t (this); + break; case ZMQ_PUB: s = new pub_t (this); break; @@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_REP: s = new rep_t (this); break; - case ZMQ_P2P: - s = new p2p_t (this); + case ZMQ_UPSTREAM: + s = new upstream_t (this); + break; + case ZMQ_DOWNSTREAM: + s = new downstream_t (this); break; default: // TODO: This should be EINVAL. diff --git a/src/command.hpp b/src/command.hpp index 9a2e5d5..3099852 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -69,10 +69,12 @@ namespace zmq } attach; // Sent from session to socket to establish pipe(s) between them. + // If adjust_seqnum is true, caller have used inc_seqnum beforehand + // and thus the callee should take care of catching up. struct { - class owned_t *session; class reader_t *in_pipe; class writer_t *out_pipe; + bool adjust_seqnum; } bind; // Sent by pipe writer to inform dormant pipe reader that there diff --git a/src/devpoll.cpp b/src/devpoll.cpp index f28d55e..0ee772b 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -37,7 +37,8 @@ #include "config.hpp" #include "i_poll_events.hpp" -zmq::devpoll_t::devpoll_t () +zmq::devpoll_t::devpoll_t () : + stopping (false) { // Get limit on open files struct rlimit rl; diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 1f6b4f0..1e41ee8 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -20,6 +20,7 @@ #include "../bindings/c/zmq.h" #include "dispatcher.hpp" +#include "socket_base.hpp" #include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" @@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_) zmq_assert (erased == 1); pipes_sync.unlock (); } + +int zmq::dispatcher_t::register_endpoint (const char *addr_, + socket_base_t *socket_) +{ + endpoints_sync.lock (); + + bool inserted = endpoints.insert (std::make_pair (addr_, socket_)).second; + if (!inserted) { + errno = EADDRINUSE; + endpoints_sync.unlock (); + return -1; + } + + endpoints_sync.unlock (); + return 0; +} + +void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.begin (); + while (it != endpoints.end ()) { + if (it->second == socket_) { + endpoints_t::iterator to_erase = it; + it++; + endpoints.erase (to_erase); + continue; + } + it++; + } + + endpoints_sync.unlock (); +} + +zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.find (addr_); + if (it == endpoints.end ()) { + endpoints_sync.unlock (); + errno = ECONNREFUSED; + return NULL; + } + socket_base_t *endpoint = it->second; + + // Increment the command sequence number of the peer so that it won't + // get deallocated until "bind" command is issued by the caller. + endpoint->inc_seqnum (); + + endpoints_sync.unlock (); + return endpoint; +} + diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 23b6a33..8364d4d 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -97,6 +97,11 @@ namespace zmq void register_pipe (class pipe_t *pipe_); void unregister_pipe (class pipe_t *pipe_); + // Management of inproc endpoints. + int register_endpoint (const char *addr_, class socket_base_t *socket_); + void unregister_endpoints (class socket_base_t *socket_); + class socket_base_t *find_endpoint (const char *addr_); + private: ~dispatcher_t (); @@ -149,6 +154,13 @@ namespace zmq // and 'terminated' flag). mutex_t term_sync; + // List of inproc endpoints within this context. + typedef std::map <std::string, class socket_base_t*> endpoints_t; + endpoints_t endpoints; + + // Synchronisation of access to the list of inproc endpoints. + mutex_t endpoints_sync; + dispatcher_t (const dispatcher_t&); void operator = (const dispatcher_t&); }; diff --git a/src/downstream.cpp b/src/downstream.cpp new file mode 100644 index 0000000..4f994e6 --- /dev/null +++ b/src/downstream.cpp @@ -0,0 +1,131 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "downstream.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::downstream_t::downstream_t (class app_thread_t *parent_) : + socket_base_t (parent_), + current (0) +{ + options.requires_in = false; + options.requires_out = true; +} + +zmq::downstream_t::~downstream_t () +{ +} + +void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (!inpipe_ && outpipe_); + pipes.push_back (outpipe_); +} + +void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (pipe_); + pipes.erase (pipes.index (pipe_)); +} + +void zmq::downstream_t::xkill (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::downstream_t::xrevive (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special option for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) +{ + // If there are no pipes we cannot send the message. + if (pipes.empty ()) { + errno = EAGAIN; + return -1; + } + + // Move to the next pipe (load-balancing). + current++; + if (current >= pipes.size ()) + current = 0; + + // TODO: Implement this once queue limits are in-place. + zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); + + // Push message to the selected pipe. + pipes [current]->write (msg_); + pipes [current]->flush (); + + // Detach the message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return 0; +} + +int zmq::downstream_t::xflush () +{ + // TODO: Maybe there's a point in flushing messages downstream. + // It may be useful in the case where number of messages in a single + // transaction is much greater than the number of attached pipes. + errno = ENOTSUP; + return -1; + +} + +int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +bool zmq::downstream_t::xhas_in () +{ + return false; +} + +bool zmq::downstream_t::xhas_out () +{ + // TODO: Modify this code once pipe limits are in place. + return true; +} + + diff --git a/src/downstream.hpp b/src/downstream.hpp new file mode 100644 index 0000000..c6a7ed8 --- /dev/null +++ b/src/downstream.hpp @@ -0,0 +1,64 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ +#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class downstream_t : public socket_base_t + { + public: + + downstream_t (class app_thread_t *parent_); + ~downstream_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // List of outbound pipes. + typedef yarray_t <class writer_t> pipes_t; + pipes_t pipes; + + // Points to the last pipe that the most recent message was sent to. + pipes_t::size_type current; + + downstream_t (const downstream_t&); + void operator = (const downstream_t&); + }; + +} + +#endif diff --git a/src/kqueue.cpp b/src/kqueue.cpp index f32fa36..69ad0c8 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -33,7 +33,8 @@ #include "config.hpp" #include "i_poll_events.hpp" -zmq::kqueue_t::kqueue_t () +zmq::kqueue_t::kqueue_t () : + stopping (false) { // Create event queue kqueue_fd = kqueue (); diff --git a/src/object.cpp b/src/object.cpp index 1433b7b..b5d5eee 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -83,8 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_) return; case command_t::bind: - process_bind (cmd_.args.bind.session, - cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); + process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, + cmd_.args.bind.adjust_seqnum); return; case command_t::pipe_term: @@ -122,6 +122,21 @@ void zmq::object_t::unregister_pipe (class pipe_t *pipe_) dispatcher->unregister_pipe (pipe_); } +int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_) +{ + return dispatcher->register_endpoint (addr_, socket_); +} + +void zmq::object_t::unregister_endpoints (socket_base_t *socket_) +{ + return dispatcher->unregister_endpoints (socket_); +} + +zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_) +{ + return dispatcher->find_endpoint (addr_); +} + zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { return dispatcher->choose_io_thread (taskset_); @@ -168,15 +183,15 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_) send_command (cmd); } -void zmq::object_t::send_bind (object_t *destination_, owned_t *session_, - reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::send_bind (object_t *destination_, + reader_t *in_pipe_, writer_t *out_pipe_, bool adjust_seqnum_) { command_t cmd; cmd.destination = destination_; cmd.type = command_t::bind; - cmd.args.bind.session = session_; cmd.args.bind.in_pipe = in_pipe_; cmd.args.bind.out_pipe = out_pipe_; + cmd.args.bind.adjust_seqnum = adjust_seqnum_; send_command (cmd); } @@ -250,8 +265,8 @@ void zmq::object_t::process_attach (i_engine *engine_) zmq_assert (false); } -void zmq::object_t::process_bind (owned_t *session_, - reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + bool adjust_seqnum_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index 1954071..4fd0a8e 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -49,6 +49,12 @@ namespace zmq protected: + // Using following function, socket is able to access global + // repository of inproc endpoints. + int register_endpoint (const char *addr_, class socket_base_t *socket_); + void unregister_endpoints (class socket_base_t *socket_); + class socket_base_t *find_endpoint (const char *addr_); + // Derived object can use following functions to interact with // global repositories. See dispatcher.hpp for function details. int thread_slot_count (); @@ -62,8 +68,8 @@ namespace zmq class owned_t *object_); void send_attach (class session_t *destination_, struct i_engine *engine_); - void send_bind (object_t *destination_, class owned_t *session_, - class reader_t *in_pipe_, class writer_t *out_pipe_); + void send_bind (object_t *destination_, class reader_t *in_pipe_, + class writer_t *out_pipe_, bool adjust_seqnum_); void send_revive (class object_t *destination_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); @@ -78,8 +84,8 @@ namespace zmq virtual void process_plug (); virtual void process_own (class owned_t *object_); virtual void process_attach (struct i_engine *engine_); - virtual void process_bind (class owned_t *session_, - class reader_t *in_pipe_, class writer_t *out_pipe_); + virtual void process_bind (class reader_t *in_pipe_, + class writer_t *out_pipe_, bool adjust_seqnum_); virtual void process_revive (); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); diff --git a/src/p2p.hpp b/src/p2p.hpp index 1fd7e34..32d7755 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_P2P_INCLUDED__ -#define __ZMQ_P2P_INCLUDED__ +#ifndef __ZMQ_P2P_HPP_INCLUDED__ +#define __ZMQ_P2P_HPP_INCLUDED__ #include "socket_base.hpp" diff --git a/src/pipe.cpp b/src/pipe.cpp index e444520..0e15dce 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -81,7 +81,11 @@ void zmq::reader_t::term () void zmq::reader_t::process_revive () { - endpoint->revive (this); + // Beacuse of command throttling mechanism, incoming termination request + // may not have been processed before subsequent send. + // In that case endpoint is NULL. + if (endpoint) + endpoint->revive (this); } void zmq::reader_t::process_pipe_term_ack () diff --git a/src/pub.hpp b/src/pub.hpp index b3e868d..9dbcb4a 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_PUB_INCLUDED__ -#define __ZMQ_PUB_INCLUDED__ +#ifndef __ZMQ_PUB_HPP_INCLUDED__ +#define __ZMQ_PUB_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/rep.cpp b/src/rep.cpp index e8a9e39..f06f4ab 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) } // Now both inpipe and outpipe are detached. Remove them from the lists. - if (in_pipes.index (pipe_) < active) + if (index < active) active--; in_pipes.erase (index); out_pipes.erase (index); @@ -178,14 +178,15 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) // Round-robin over the pipes to get next message. for (int count = active; count != 0; count--) { bool fetched = in_pipes [current]->read (msg_); - current++; - if (current >= active) - current = 0; if (fetched) { reply_pipe = out_pipes [current]; waiting_for_reply = true; - return 0; } + current++; + if (current >= active) + current = 0; + if (fetched) + return 0; } // No message is available. Initialise the output parameter diff --git a/src/rep.hpp b/src/rep.hpp index 3e87dc1..0b327aa 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_REP_INCLUDED__ -#define __ZMQ_REP_INCLUDED__ +#ifndef __ZMQ_REP_HPP_INCLUDED__ +#define __ZMQ_REP_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/req.hpp b/src/req.hpp index 86554b5..756cc42 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_REQ_INCLUDED__ -#define __ZMQ_REQ_INCLUDED__ +#ifndef __ZMQ_REQ_HPP_INCLUDED__ +#define __ZMQ_REQ_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/session.cpp b/src/session.cpp index eb0a963..87b47b0 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -51,10 +51,6 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_) { - // The communication is unidirectional. - // We don't expect any message to arrive. - zmq_assert (out_pipe); - if (out_pipe->write (msg_)) { zmq_msg_init (msg_); return true; @@ -155,8 +151,10 @@ void zmq::session_t::process_plug () out_pipe->set_endpoint (this); } - send_bind (owner, this, outbound ? &outbound->reader : NULL, - inbound ? &inbound->writer : NULL); + // Note that initial call to inc_seqnum was optimised out. Last + // parameter conveys the fact to the callee. + send_bind (owner, outbound ? &outbound->reader : NULL, + inbound ? &inbound->writer : NULL, false); } owned_t::process_plug (); diff --git a/src/simple_semaphore.hpp b/src/simple_semaphore.hpp index 209ccb4..3342281 100644 --- a/src/simple_semaphore.hpp +++ b/src/simple_semaphore.hpp @@ -23,7 +23,11 @@ #include "platform.hpp" #include "err.hpp" -#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS +#if 0 //defined ZMQ_HAVE_LINUX +#include <sys/syscall.h> +#include <unistd.h> +#include <linux/futex.h> +#elif defined ZMQ_HAVE_LINUX ||defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS #include <pthread.h> #elif defined ZMQ_HAVE_WINDOWS #include "windows.hpp" @@ -33,13 +37,63 @@ namespace zmq { - // Simple semaphore. Only single thread may be waiting at any given time. // Also, the semaphore may not be posted before the previous post // was matched by corresponding wait and the waiting thread was // released. -#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS +#if 0 //defined ZMQ_HAVE_LINUX + + // In theory, using private futexes should be more efficient on Linux + // platform than using mutexes. However, in uncontended cases of TCP + // transport on loopback interface we haven't seen any latency improvement. + // The code is commented out waiting for more thorough testing. + + class simple_semaphore_t + { + public: + + // Initialise the semaphore. + inline simple_semaphore_t () : + dummy (0) + { + } + + // Destroy the semaphore. + inline ~simple_semaphore_t () + { + } + + // Wait for the semaphore. + inline void wait () + { + int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAIT_PRIVATE, + (int) 0, NULL, NULL, (int) 0); + zmq_assert (rc == 0); + } + + // Post the semaphore. + inline void post () + { + while (true) { + int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAKE_PRIVATE, + (int) 1, NULL, NULL, (int) 0); + zmq_assert (rc != -1 && rc <= 1); + if (rc == 1) + break; + } + } + + private: + + int dummy; + + // Disable copying of the object. + simple_semaphore_t (const simple_semaphore_t&); + void operator = (const simple_semaphore_t&); + }; + +#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS // On platforms that allow for double locking of a mutex from the same // thread, simple semaphore is implemented using mutex, as it is more diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6583608..a614759 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -43,7 +43,9 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : pending_term_acks (0), ticks (0), app_thread (parent_), - shutting_down (false) + shutting_down (false), + sent_seqnum (0), + processed_seqnum (0) { } @@ -81,6 +83,9 @@ int zmq::socket_base_t::bind (const char *addr_) addr_type = addr.substr (0, pos); addr_args = addr.substr (pos + 3); + if (addr_type == "inproc") + return register_endpoint (addr_args.c_str (), this); + if (addr_type == "tcp") { zmq_listener_t *listener = new zmq_listener_t ( choose_io_thread (options.affinity), this, options); @@ -126,6 +131,41 @@ int zmq::socket_base_t::connect (const char *addr_) addr_type = addr.substr (0, pos); addr_args = addr.substr (pos + 3); + if (addr_type == "inproc") { + + // Find the peer socket. + socket_base_t *peer = find_endpoint (addr_args.c_str ()); + if (!peer) + return -1; + + pipe_t *in_pipe = NULL; + pipe_t *out_pipe = NULL; + + // Create inbound pipe, if required. + if (options.requires_in) { + in_pipe = new pipe_t (this, peer, options.hwm, options.lwm); + zmq_assert (in_pipe); + } + + // Create outbound pipe, if required. + if (options.requires_out) { + out_pipe = new pipe_t (peer, this, options.hwm, options.lwm); + zmq_assert (out_pipe); + } + + // Attach the pipes to this socket object. + attach_pipes (in_pipe ? &in_pipe->reader : NULL, + out_pipe ? &out_pipe->writer : NULL); + + // Attach the pipes to the peer socket. Note that peer's seqnum + // was incremented in find_endpoint function. The callee is notified + // about the fact via the last parameter. + send_bind (peer, out_pipe ? &out_pipe->reader : NULL, + in_pipe ? &in_pipe->writer : NULL, true); + + return 0; + } + // Create the session. io_thread_t *io_thread = choose_io_thread (options.affinity); session_t *session = new session_t (io_thread, this, session_name.c_str (), @@ -319,13 +359,24 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { + shutting_down = true; + + // Let the thread know that the socket is no longer available. app_thread->remove_socket (this); // Pointer to the dispatcher must be retrieved before the socket is // deallocated. Afterwards it is not available. dispatcher_t *dispatcher = get_dispatcher (); - shutting_down = true; + // Unregister all inproc endpoints associated with this socket. + // From this point we are sure that inc_seqnum won't be called again + // on this object. + dispatcher->unregister_endpoints (this); + + // Wait till all undelivered commands are delivered. This should happen + // very quickly. There's no way to wait here for extensive period of time. + while (processed_seqnum != sent_seqnum.get ()) + app_thread->process_commands (true, false); while (true) { @@ -364,6 +415,12 @@ int zmq::socket_base_t::close () return 0; } +void zmq::socket_base_t::inc_seqnum () +{ + // NB: This function may be called from a different thread! + sent_seqnum.add (1); +} + zmq::app_thread_t *zmq::socket_base_t::get_thread () { return app_thread; @@ -452,9 +509,16 @@ void zmq::socket_base_t::process_own (owned_t *object_) io_objects.insert (object_); } -void zmq::socket_base_t::process_bind (owned_t *session_, - reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + bool adjust_seqnum_) { + // In case of inproc transport, the seqnum should catch up here. + // For other transports the seqnum modification can be optimised out + // because final handshaking between the socket and the session ensures + // that no 'bind' command will be left unprocessed. + if (adjust_seqnum_) + processed_seqnum++; + attach_pipes (in_pipe_, out_pipe_); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 49ff5a5..dd7b526 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -33,6 +33,7 @@ #include "mutex.hpp" #include "options.hpp" #include "stdint.hpp" +#include "atomic_counter.hpp" namespace zmq { @@ -54,6 +55,11 @@ namespace zmq int recv (zmq_msg_t *msg_, int flags_); int close (); + // When another owned object wants to send command to this object + // it calls this function to let it know it should not shut down + // before the command is delivered. + void inc_seqnum (); + // This function is used by the polling mechanism to determine // whether the socket belongs to the application thread the poll // is called from. @@ -108,8 +114,8 @@ namespace zmq // Handlers for incoming commands. void process_own (class owned_t *object_); - void process_bind (class owned_t *session_, - class reader_t *in_pipe_, class writer_t *out_pipe_); + void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, + bool adjust_seqnum_); void process_term_req (class owned_t *object_); void process_term_ack (); @@ -132,6 +138,12 @@ namespace zmq // started. bool shutting_down; + // Sequence number of the last command sent to this object. + atomic_counter_t sent_seqnum; + + // Sequence number of the last command processed by this object. + uint64_t processed_seqnum; + // List of existing sessions. This list is never referenced from within // the socket, instead it is used by I/O objects owned by the session. // As those objects can live in different threads, the access is diff --git a/src/sub.hpp b/src/sub.hpp index fb881dc..8ad8a18 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_SUB_INCLUDED__ -#define __ZMQ_SUB_INCLUDED__ +#ifndef __ZMQ_SUB_HPP_INCLUDED__ +#define __ZMQ_SUB_HPP_INCLUDED__ #include <set> #include <string> diff --git a/src/upstream.cpp b/src/upstream.cpp new file mode 100644 index 0000000..da202f8 --- /dev/null +++ b/src/upstream.cpp @@ -0,0 +1,143 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "upstream.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::upstream_t::upstream_t (class app_thread_t *parent_) : + socket_base_t (parent_), + active (0), + current (0) +{ + options.requires_in = true; + options.requires_out = false; +} + +zmq::upstream_t::~upstream_t () +{ +} + +void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (inpipe_ && !outpipe_); + + pipes.push_back (inpipe_); + pipes.swap (active, pipes.size () - 1); + active++; +} + +void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_) +{ + // Remove the pipe from the list; adjust number of active pipes + // accordingly. + zmq_assert (pipe_); + pipes_t::size_type index = pipes.index (pipe_); + if (index < active) + active--; + pipes.erase (index); +} + +void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) +{ + // There are no outpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::upstream_t::xkill (class reader_t *pipe_) +{ + // Move the pipe to the list of inactive pipes. + active--; + pipes.swap (pipes.index (pipe_), active); +} + +void zmq::upstream_t::xrevive (class reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + pipes.swap (pipes.index (pipe_), active); + active++; +} + +int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special options for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +int zmq::upstream_t::xflush () +{ + errno = ENOTSUP; + return -1; +} + +int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + bool fetched = pipes [current]->read (msg_); + current++; + if (current >= active) + current = 0; + if (fetched) + return 0; + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; +} + +bool zmq::upstream_t::xhas_in () +{ + // Note that messing with current doesn't break the fairness of fair + // queueing algorithm. If there are no messages available current will + // get back to its original value. Otherwise it'll point to the first + // pipe holding messages, skipping only pipes with no messages available. + for (int count = active; count != 0; count--) { + if (pipes [current]->check_read ()) + return true; + current++; + if (current >= active) + current = 0; + } + + return false; +} + +bool zmq::upstream_t::xhas_out () +{ + return false; +} + diff --git a/src/upstream.hpp b/src/upstream.hpp new file mode 100644 index 0000000..0e2f5ad --- /dev/null +++ b/src/upstream.hpp @@ -0,0 +1,69 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__ +#define __ZMQ_UPSTREAM_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class upstream_t : public socket_base_t + { + public: + + upstream_t (class app_thread_t *parent_); + ~upstream_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // Inbound pipes. + typedef yarray_t <class reader_t> pipes_t; + pipes_t pipes; + + // Number of active pipes. All the active pipes are located at the + // beginning of the pipes array. + pipes_t::size_type active; + + // Index of the next bound pipe to read a message from. + pipes_t::size_type current; + + upstream_t (const upstream_t&); + void operator = (const upstream_t&); + + }; + +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 7952b61..9b66be8 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -198,8 +198,10 @@ size_t zmq_msg_size (zmq_msg_t *msg_) void *zmq_init (int app_threads_, int io_threads_, int flags_) { - // There should be at least a single thread managed by the dispatcher. - if (app_threads_ <= 0 || io_threads_ <= 0 || + // There should be at least a single application thread managed + // by the dispatcher. There's no need for I/O threads if 0MQ is used + // only for inproc messaging + if (app_threads_ < 1 || io_threads_ < 0 || app_threads_ > 63 || io_threads_ > 63) { errno = EINVAL; return NULL; diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index 53811a1..8040f21 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -51,6 +51,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () else { // TODO: Handle over-sized message decently. + // in_progress is initialised at this point so in theory we should + // close it before calling zmq_msg_init_size, however, it's a 0-byte + // message and thus we can treat it as uninitialised... int rc = zmq_msg_init_size (&in_progress, *tmpbuf); errno_assert (rc == 0); @@ -67,6 +70,9 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () size_t size = (size_t) get_uint64 (tmpbuf); // TODO: Handle over-sized message decently. + // in_progress is initialised at this point so in theory we should + // close it before calling zmq_msg_init_size, however, it's a 0-byte + // message and thus we can treat it as uninitialised... int rc = zmq_msg_init_size (&in_progress, size); errno_assert (rc == 0); @@ -78,7 +84,7 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () bool zmq::zmq_decoder_t::message_ready () { // Message is completely read. Push it further and start reading - // new message. + // new message. (in_progress is a 0-byte message after this point.) if (!destination || !destination->write (&in_progress)) return false; diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 44b919b..180bda7 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -50,12 +50,17 @@ bool zmq::zmq_encoder_t::size_ready () bool zmq::zmq_encoder_t::message_ready () { + // Destroy content of the old message. + zmq_msg_close(&in_progress); + // Read new message from the dispatcher. If there is none, return false. // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. - if (!source || !source->read (&in_progress)) + if (!source || !source->read (&in_progress)) { + zmq_msg_init (&in_progress); return false; + } size_t size = zmq_msg_size (&in_progress); diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index eec41c7..0d9488d 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -55,7 +55,6 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) has_peer_identity = true; peer_identity.assign ((const char*) zmq_msg_data (msg_), zmq_msg_size (msg_)); - return true; } |