summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am4
-rw-r--r--src/app_thread.cpp14
-rw-r--r--src/command.hpp4
-rw-r--r--src/devpoll.cpp3
-rw-r--r--src/dispatcher.cpp56
-rw-r--r--src/dispatcher.hpp12
-rw-r--r--src/downstream.cpp131
-rw-r--r--src/downstream.hpp64
-rw-r--r--src/kqueue.cpp3
-rw-r--r--src/object.cpp29
-rw-r--r--src/object.hpp14
-rw-r--r--src/p2p.hpp4
-rw-r--r--src/pipe.cpp6
-rw-r--r--src/pub.hpp4
-rw-r--r--src/rep.cpp11
-rw-r--r--src/rep.hpp4
-rw-r--r--src/req.hpp4
-rw-r--r--src/session.cpp10
-rw-r--r--src/simple_semaphore.hpp60
-rw-r--r--src/socket_base.cpp72
-rw-r--r--src/socket_base.hpp16
-rw-r--r--src/sub.hpp4
-rw-r--r--src/upstream.cpp143
-rw-r--r--src/upstream.hpp69
-rw-r--r--src/zmq.cpp6
-rw-r--r--src/zmq_decoder.cpp8
-rw-r--r--src/zmq_encoder.cpp7
-rw-r--r--src/zmq_listener_init.cpp1
28 files changed, 710 insertions, 53 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 91fb555..3d038b7 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \
decoder.hpp \
devpoll.hpp \
dispatcher.hpp \
+ downstream.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
+ upstream.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \
app_thread.cpp \
devpoll.cpp \
dispatcher.cpp \
+ downstream.cpp \
epoll.cpp \
err.cpp \
fd_signaler.cpp \
@@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
+ upstream.cpp \
uuid.cpp \
ypollset.cpp \
zmq.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index fbda335..a671822 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -40,11 +40,13 @@
#include "pipe.hpp"
#include "config.hpp"
#include "socket_base.hpp"
+#include "p2p.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
-#include "p2p.hpp"
+#include "upstream.hpp"
+#include "downstream.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{
socket_base_t *s = NULL;
switch (type_) {
+ case ZMQ_P2P:
+ s = new p2p_t (this);
+ break;
case ZMQ_PUB:
s = new pub_t (this);
break;
@@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_REP:
s = new rep_t (this);
break;
- case ZMQ_P2P:
- s = new p2p_t (this);
+ case ZMQ_UPSTREAM:
+ s = new upstream_t (this);
+ break;
+ case ZMQ_DOWNSTREAM:
+ s = new downstream_t (this);
break;
default:
// TODO: This should be EINVAL.
diff --git a/src/command.hpp b/src/command.hpp
index 9a2e5d5..3099852 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -69,10 +69,12 @@ namespace zmq
} attach;
// Sent from session to socket to establish pipe(s) between them.
+ // If adjust_seqnum is true, caller have used inc_seqnum beforehand
+ // and thus the callee should take care of catching up.
struct {
- class owned_t *session;
class reader_t *in_pipe;
class writer_t *out_pipe;
+ bool adjust_seqnum;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index f28d55e..0ee772b 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -37,7 +37,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
-zmq::devpoll_t::devpoll_t ()
+zmq::devpoll_t::devpoll_t () :
+ stopping (false)
{
// Get limit on open files
struct rlimit rl;
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 1f6b4f0..1e41ee8 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -20,6 +20,7 @@
#include "../bindings/c/zmq.h"
#include "dispatcher.hpp"
+#include "socket_base.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
@@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
zmq_assert (erased == 1);
pipes_sync.unlock ();
}
+
+int zmq::dispatcher_t::register_endpoint (const char *addr_,
+ socket_base_t *socket_)
+{
+ endpoints_sync.lock ();
+
+ bool inserted = endpoints.insert (std::make_pair (addr_, socket_)).second;
+ if (!inserted) {
+ errno = EADDRINUSE;
+ endpoints_sync.unlock ();
+ return -1;
+ }
+
+ endpoints_sync.unlock ();
+ return 0;
+}
+
+void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)
+{
+ endpoints_sync.lock ();
+
+ endpoints_t::iterator it = endpoints.begin ();
+ while (it != endpoints.end ()) {
+ if (it->second == socket_) {
+ endpoints_t::iterator to_erase = it;
+ it++;
+ endpoints.erase (to_erase);
+ continue;
+ }
+ it++;
+ }
+
+ endpoints_sync.unlock ();
+}
+
+zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_)
+{
+ endpoints_sync.lock ();
+
+ endpoints_t::iterator it = endpoints.find (addr_);
+ if (it == endpoints.end ()) {
+ endpoints_sync.unlock ();
+ errno = ECONNREFUSED;
+ return NULL;
+ }
+ socket_base_t *endpoint = it->second;
+
+ // Increment the command sequence number of the peer so that it won't
+ // get deallocated until "bind" command is issued by the caller.
+ endpoint->inc_seqnum ();
+
+ endpoints_sync.unlock ();
+ return endpoint;
+}
+
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index 23b6a33..8364d4d 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -97,6 +97,11 @@ namespace zmq
void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_);
+ // Management of inproc endpoints.
+ int register_endpoint (const char *addr_, class socket_base_t *socket_);
+ void unregister_endpoints (class socket_base_t *socket_);
+ class socket_base_t *find_endpoint (const char *addr_);
+
private:
~dispatcher_t ();
@@ -149,6 +154,13 @@ namespace zmq
// and 'terminated' flag).
mutex_t term_sync;
+ // List of inproc endpoints within this context.
+ typedef std::map <std::string, class socket_base_t*> endpoints_t;
+ endpoints_t endpoints;
+
+ // Synchronisation of access to the list of inproc endpoints.
+ mutex_t endpoints_sync;
+
dispatcher_t (const dispatcher_t&);
void operator = (const dispatcher_t&);
};
diff --git a/src/downstream.cpp b/src/downstream.cpp
new file mode 100644
index 0000000..4f994e6
--- /dev/null
+++ b/src/downstream.cpp
@@ -0,0 +1,131 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "downstream.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
+ socket_base_t (parent_),
+ current (0)
+{
+ options.requires_in = false;
+ options.requires_out = true;
+}
+
+zmq::downstream_t::~downstream_t ()
+{
+}
+
+void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (!inpipe_ && outpipe_);
+ pipes.push_back (outpipe_);
+}
+
+void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (pipe_);
+ pipes.erase (pipes.index (pipe_));
+}
+
+void zmq::downstream_t::xkill (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::downstream_t::xrevive (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ // No special option for this socket type.
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ // If there are no pipes we cannot send the message.
+ if (pipes.empty ()) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Move to the next pipe (load-balancing).
+ current++;
+ if (current >= pipes.size ())
+ current = 0;
+
+ // TODO: Implement this once queue limits are in-place.
+ zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
+
+ // Push message to the selected pipe.
+ pipes [current]->write (msg_);
+ pipes [current]->flush ();
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+int zmq::downstream_t::xflush ()
+{
+ // TODO: Maybe there's a point in flushing messages downstream.
+ // It may be useful in the case where number of messages in a single
+ // transaction is much greater than the number of attached pipes.
+ errno = ENOTSUP;
+ return -1;
+
+}
+
+int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+bool zmq::downstream_t::xhas_in ()
+{
+ return false;
+}
+
+bool zmq::downstream_t::xhas_out ()
+{
+ // TODO: Modify this code once pipe limits are in place.
+ return true;
+}
+
+
diff --git a/src/downstream.hpp b/src/downstream.hpp
new file mode 100644
index 0000000..c6a7ed8
--- /dev/null
+++ b/src/downstream.hpp
@@ -0,0 +1,64 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class downstream_t : public socket_base_t
+ {
+ public:
+
+ downstream_t (class app_thread_t *parent_);
+ ~downstream_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // List of outbound pipes.
+ typedef yarray_t <class writer_t> pipes_t;
+ pipes_t pipes;
+
+ // Points to the last pipe that the most recent message was sent to.
+ pipes_t::size_type current;
+
+ downstream_t (const downstream_t&);
+ void operator = (const downstream_t&);
+ };
+
+}
+
+#endif
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index f32fa36..69ad0c8 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -33,7 +33,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
-zmq::kqueue_t::kqueue_t ()
+zmq::kqueue_t::kqueue_t () :
+ stopping (false)
{
// Create event queue
kqueue_fd = kqueue ();
diff --git a/src/object.cpp b/src/object.cpp
index 1433b7b..b5d5eee 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -83,8 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_)
return;
case command_t::bind:
- process_bind (cmd_.args.bind.session,
- cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
+ process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
+ cmd_.args.bind.adjust_seqnum);
return;
case command_t::pipe_term:
@@ -122,6 +122,21 @@ void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
dispatcher->unregister_pipe (pipe_);
}
+int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_)
+{
+ return dispatcher->register_endpoint (addr_, socket_);
+}
+
+void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
+{
+ return dispatcher->unregister_endpoints (socket_);
+}
+
+zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)
+{
+ return dispatcher->find_endpoint (addr_);
+}
+
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
return dispatcher->choose_io_thread (taskset_);
@@ -168,15 +183,15 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
send_command (cmd);
}
-void zmq::object_t::send_bind (object_t *destination_, owned_t *session_,
- reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::send_bind (object_t *destination_,
+ reader_t *in_pipe_, writer_t *out_pipe_, bool adjust_seqnum_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::bind;
- cmd.args.bind.session = session_;
cmd.args.bind.in_pipe = in_pipe_;
cmd.args.bind.out_pipe = out_pipe_;
+ cmd.args.bind.adjust_seqnum = adjust_seqnum_;
send_command (cmd);
}
@@ -250,8 +265,8 @@ void zmq::object_t::process_attach (i_engine *engine_)
zmq_assert (false);
}
-void zmq::object_t::process_bind (owned_t *session_,
- reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ bool adjust_seqnum_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index 1954071..4fd0a8e 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -49,6 +49,12 @@ namespace zmq
protected:
+ // Using following function, socket is able to access global
+ // repository of inproc endpoints.
+ int register_endpoint (const char *addr_, class socket_base_t *socket_);
+ void unregister_endpoints (class socket_base_t *socket_);
+ class socket_base_t *find_endpoint (const char *addr_);
+
// Derived object can use following functions to interact with
// global repositories. See dispatcher.hpp for function details.
int thread_slot_count ();
@@ -62,8 +68,8 @@ namespace zmq
class owned_t *object_);
void send_attach (class session_t *destination_,
struct i_engine *engine_);
- void send_bind (object_t *destination_, class owned_t *session_,
- class reader_t *in_pipe_, class writer_t *out_pipe_);
+ void send_bind (object_t *destination_, class reader_t *in_pipe_,
+ class writer_t *out_pipe_, bool adjust_seqnum_);
void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
@@ -78,8 +84,8 @@ namespace zmq
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
virtual void process_attach (struct i_engine *engine_);
- virtual void process_bind (class owned_t *session_,
- class reader_t *in_pipe_, class writer_t *out_pipe_);
+ virtual void process_bind (class reader_t *in_pipe_,
+ class writer_t *out_pipe_, bool adjust_seqnum_);
virtual void process_revive ();
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 1fd7e34..32d7755 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_P2P_INCLUDED__
-#define __ZMQ_P2P_INCLUDED__
+#ifndef __ZMQ_P2P_HPP_INCLUDED__
+#define __ZMQ_P2P_HPP_INCLUDED__
#include "socket_base.hpp"
diff --git a/src/pipe.cpp b/src/pipe.cpp
index e444520..0e15dce 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -81,7 +81,11 @@ void zmq::reader_t::term ()
void zmq::reader_t::process_revive ()
{
- endpoint->revive (this);
+ // Beacuse of command throttling mechanism, incoming termination request
+ // may not have been processed before subsequent send.
+ // In that case endpoint is NULL.
+ if (endpoint)
+ endpoint->revive (this);
}
void zmq::reader_t::process_pipe_term_ack ()
diff --git a/src/pub.hpp b/src/pub.hpp
index b3e868d..9dbcb4a 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_PUB_INCLUDED__
-#define __ZMQ_PUB_INCLUDED__
+#ifndef __ZMQ_PUB_HPP_INCLUDED__
+#define __ZMQ_PUB_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/rep.cpp b/src/rep.cpp
index e8a9e39..f06f4ab 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
- if (in_pipes.index (pipe_) < active)
+ if (index < active)
active--;
in_pipes.erase (index);
out_pipes.erase (index);
@@ -178,14 +178,15 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = in_pipes [current]->read (msg_);
- current++;
- if (current >= active)
- current = 0;
if (fetched) {
reply_pipe = out_pipes [current];
waiting_for_reply = true;
- return 0;
}
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched)
+ return 0;
}
// No message is available. Initialise the output parameter
diff --git a/src/rep.hpp b/src/rep.hpp
index 3e87dc1..0b327aa 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_REP_INCLUDED__
-#define __ZMQ_REP_INCLUDED__
+#ifndef __ZMQ_REP_HPP_INCLUDED__
+#define __ZMQ_REP_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/req.hpp b/src/req.hpp
index 86554b5..756cc42 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_REQ_INCLUDED__
-#define __ZMQ_REQ_INCLUDED__
+#ifndef __ZMQ_REQ_HPP_INCLUDED__
+#define __ZMQ_REQ_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/session.cpp b/src/session.cpp
index eb0a963..87b47b0 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -51,10 +51,6 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
- // The communication is unidirectional.
- // We don't expect any message to arrive.
- zmq_assert (out_pipe);
-
if (out_pipe->write (msg_)) {
zmq_msg_init (msg_);
return true;
@@ -155,8 +151,10 @@ void zmq::session_t::process_plug ()
out_pipe->set_endpoint (this);
}
- send_bind (owner, this, outbound ? &outbound->reader : NULL,
- inbound ? &inbound->writer : NULL);
+ // Note that initial call to inc_seqnum was optimised out. Last
+ // parameter conveys the fact to the callee.
+ send_bind (owner, outbound ? &outbound->reader : NULL,
+ inbound ? &inbound->writer : NULL, false);
}
owned_t::process_plug ();
diff --git a/src/simple_semaphore.hpp b/src/simple_semaphore.hpp
index 209ccb4..3342281 100644
--- a/src/simple_semaphore.hpp
+++ b/src/simple_semaphore.hpp
@@ -23,7 +23,11 @@
#include "platform.hpp"
#include "err.hpp"
-#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
+#if 0 //defined ZMQ_HAVE_LINUX
+#include <sys/syscall.h>
+#include <unistd.h>
+#include <linux/futex.h>
+#elif defined ZMQ_HAVE_LINUX ||defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
#include <pthread.h>
#elif defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
@@ -33,13 +37,63 @@
namespace zmq
{
-
// Simple semaphore. Only single thread may be waiting at any given time.
// Also, the semaphore may not be posted before the previous post
// was matched by corresponding wait and the waiting thread was
// released.
-#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
+#if 0 //defined ZMQ_HAVE_LINUX
+
+ // In theory, using private futexes should be more efficient on Linux
+ // platform than using mutexes. However, in uncontended cases of TCP
+ // transport on loopback interface we haven't seen any latency improvement.
+ // The code is commented out waiting for more thorough testing.
+
+ class simple_semaphore_t
+ {
+ public:
+
+ // Initialise the semaphore.
+ inline simple_semaphore_t () :
+ dummy (0)
+ {
+ }
+
+ // Destroy the semaphore.
+ inline ~simple_semaphore_t ()
+ {
+ }
+
+ // Wait for the semaphore.
+ inline void wait ()
+ {
+ int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAIT_PRIVATE,
+ (int) 0, NULL, NULL, (int) 0);
+ zmq_assert (rc == 0);
+ }
+
+ // Post the semaphore.
+ inline void post ()
+ {
+ while (true) {
+ int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAKE_PRIVATE,
+ (int) 1, NULL, NULL, (int) 0);
+ zmq_assert (rc != -1 && rc <= 1);
+ if (rc == 1)
+ break;
+ }
+ }
+
+ private:
+
+ int dummy;
+
+ // Disable copying of the object.
+ simple_semaphore_t (const simple_semaphore_t&);
+ void operator = (const simple_semaphore_t&);
+ };
+
+#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
// On platforms that allow for double locking of a mutex from the same
// thread, simple semaphore is implemented using mutex, as it is more
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 6583608..a614759 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -43,7 +43,9 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
pending_term_acks (0),
ticks (0),
app_thread (parent_),
- shutting_down (false)
+ shutting_down (false),
+ sent_seqnum (0),
+ processed_seqnum (0)
{
}
@@ -81,6 +83,9 @@ int zmq::socket_base_t::bind (const char *addr_)
addr_type = addr.substr (0, pos);
addr_args = addr.substr (pos + 3);
+ if (addr_type == "inproc")
+ return register_endpoint (addr_args.c_str (), this);
+
if (addr_type == "tcp") {
zmq_listener_t *listener = new zmq_listener_t (
choose_io_thread (options.affinity), this, options);
@@ -126,6 +131,41 @@ int zmq::socket_base_t::connect (const char *addr_)
addr_type = addr.substr (0, pos);
addr_args = addr.substr (pos + 3);
+ if (addr_type == "inproc") {
+
+ // Find the peer socket.
+ socket_base_t *peer = find_endpoint (addr_args.c_str ());
+ if (!peer)
+ return -1;
+
+ pipe_t *in_pipe = NULL;
+ pipe_t *out_pipe = NULL;
+
+ // Create inbound pipe, if required.
+ if (options.requires_in) {
+ in_pipe = new pipe_t (this, peer, options.hwm, options.lwm);
+ zmq_assert (in_pipe);
+ }
+
+ // Create outbound pipe, if required.
+ if (options.requires_out) {
+ out_pipe = new pipe_t (peer, this, options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ }
+
+ // Attach the pipes to this socket object.
+ attach_pipes (in_pipe ? &in_pipe->reader : NULL,
+ out_pipe ? &out_pipe->writer : NULL);
+
+ // Attach the pipes to the peer socket. Note that peer's seqnum
+ // was incremented in find_endpoint function. The callee is notified
+ // about the fact via the last parameter.
+ send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
+ in_pipe ? &in_pipe->writer : NULL, true);
+
+ return 0;
+ }
+
// Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, this, session_name.c_str (),
@@ -319,13 +359,24 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close ()
{
+ shutting_down = true;
+
+ // Let the thread know that the socket is no longer available.
app_thread->remove_socket (this);
// Pointer to the dispatcher must be retrieved before the socket is
// deallocated. Afterwards it is not available.
dispatcher_t *dispatcher = get_dispatcher ();
- shutting_down = true;
+ // Unregister all inproc endpoints associated with this socket.
+ // From this point we are sure that inc_seqnum won't be called again
+ // on this object.
+ dispatcher->unregister_endpoints (this);
+
+ // Wait till all undelivered commands are delivered. This should happen
+ // very quickly. There's no way to wait here for extensive period of time.
+ while (processed_seqnum != sent_seqnum.get ())
+ app_thread->process_commands (true, false);
while (true) {
@@ -364,6 +415,12 @@ int zmq::socket_base_t::close ()
return 0;
}
+void zmq::socket_base_t::inc_seqnum ()
+{
+ // NB: This function may be called from a different thread!
+ sent_seqnum.add (1);
+}
+
zmq::app_thread_t *zmq::socket_base_t::get_thread ()
{
return app_thread;
@@ -452,9 +509,16 @@ void zmq::socket_base_t::process_own (owned_t *object_)
io_objects.insert (object_);
}
-void zmq::socket_base_t::process_bind (owned_t *session_,
- reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ bool adjust_seqnum_)
{
+ // In case of inproc transport, the seqnum should catch up here.
+ // For other transports the seqnum modification can be optimised out
+ // because final handshaking between the socket and the session ensures
+ // that no 'bind' command will be left unprocessed.
+ if (adjust_seqnum_)
+ processed_seqnum++;
+
attach_pipes (in_pipe_, out_pipe_);
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 49ff5a5..dd7b526 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -33,6 +33,7 @@
#include "mutex.hpp"
#include "options.hpp"
#include "stdint.hpp"
+#include "atomic_counter.hpp"
namespace zmq
{
@@ -54,6 +55,11 @@ namespace zmq
int recv (zmq_msg_t *msg_, int flags_);
int close ();
+ // When another owned object wants to send command to this object
+ // it calls this function to let it know it should not shut down
+ // before the command is delivered.
+ void inc_seqnum ();
+
// This function is used by the polling mechanism to determine
// whether the socket belongs to the application thread the poll
// is called from.
@@ -108,8 +114,8 @@ namespace zmq
// Handlers for incoming commands.
void process_own (class owned_t *object_);
- void process_bind (class owned_t *session_,
- class reader_t *in_pipe_, class writer_t *out_pipe_);
+ void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_,
+ bool adjust_seqnum_);
void process_term_req (class owned_t *object_);
void process_term_ack ();
@@ -132,6 +138,12 @@ namespace zmq
// started.
bool shutting_down;
+ // Sequence number of the last command sent to this object.
+ atomic_counter_t sent_seqnum;
+
+ // Sequence number of the last command processed by this object.
+ uint64_t processed_seqnum;
+
// List of existing sessions. This list is never referenced from within
// the socket, instead it is used by I/O objects owned by the session.
// As those objects can live in different threads, the access is
diff --git a/src/sub.hpp b/src/sub.hpp
index fb881dc..8ad8a18 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_SUB_INCLUDED__
-#define __ZMQ_SUB_INCLUDED__
+#ifndef __ZMQ_SUB_HPP_INCLUDED__
+#define __ZMQ_SUB_HPP_INCLUDED__
#include <set>
#include <string>
diff --git a/src/upstream.cpp b/src/upstream.cpp
new file mode 100644
index 0000000..da202f8
--- /dev/null
+++ b/src/upstream.cpp
@@ -0,0 +1,143 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "upstream.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
+ socket_base_t (parent_),
+ active (0),
+ current (0)
+{
+ options.requires_in = true;
+ options.requires_out = false;
+}
+
+zmq::upstream_t::~upstream_t ()
+{
+}
+
+void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (inpipe_ && !outpipe_);
+
+ pipes.push_back (inpipe_);
+ pipes.swap (active, pipes.size () - 1);
+ active++;
+}
+
+void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ // Remove the pipe from the list; adjust number of active pipes
+ // accordingly.
+ zmq_assert (pipe_);
+ pipes_t::size_type index = pipes.index (pipe_);
+ if (index < active)
+ active--;
+ pipes.erase (index);
+}
+
+void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ // There are no outpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::upstream_t::xkill (class reader_t *pipe_)
+{
+ // Move the pipe to the list of inactive pipes.
+ active--;
+ pipes.swap (pipes.index (pipe_), active);
+}
+
+void zmq::upstream_t::xrevive (class reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ pipes.swap (pipes.index (pipe_), active);
+ active++;
+}
+
+int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ // No special options for this socket type.
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int zmq::upstream_t::xflush ()
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // Round-robin over the pipes to get next message.
+ for (int count = active; count != 0; count--) {
+ bool fetched = pipes [current]->read (msg_);
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched)
+ return 0;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+}
+
+bool zmq::upstream_t::xhas_in ()
+{
+ // Note that messing with current doesn't break the fairness of fair
+ // queueing algorithm. If there are no messages available current will
+ // get back to its original value. Otherwise it'll point to the first
+ // pipe holding messages, skipping only pipes with no messages available.
+ for (int count = active; count != 0; count--) {
+ if (pipes [current]->check_read ())
+ return true;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ return false;
+}
+
+bool zmq::upstream_t::xhas_out ()
+{
+ return false;
+}
+
diff --git a/src/upstream.hpp b/src/upstream.hpp
new file mode 100644
index 0000000..0e2f5ad
--- /dev/null
+++ b/src/upstream.hpp
@@ -0,0 +1,69 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__
+#define __ZMQ_UPSTREAM_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class upstream_t : public socket_base_t
+ {
+ public:
+
+ upstream_t (class app_thread_t *parent_);
+ ~upstream_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // Inbound pipes.
+ typedef yarray_t <class reader_t> pipes_t;
+ pipes_t pipes;
+
+ // Number of active pipes. All the active pipes are located at the
+ // beginning of the pipes array.
+ pipes_t::size_type active;
+
+ // Index of the next bound pipe to read a message from.
+ pipes_t::size_type current;
+
+ upstream_t (const upstream_t&);
+ void operator = (const upstream_t&);
+
+ };
+
+}
+
+#endif
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 7952b61..9b66be8 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -198,8 +198,10 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
void *zmq_init (int app_threads_, int io_threads_, int flags_)
{
- // There should be at least a single thread managed by the dispatcher.
- if (app_threads_ <= 0 || io_threads_ <= 0 ||
+ // There should be at least a single application thread managed
+ // by the dispatcher. There's no need for I/O threads if 0MQ is used
+ // only for inproc messaging
+ if (app_threads_ < 1 || io_threads_ < 0 ||
app_threads_ > 63 || io_threads_ > 63) {
errno = EINVAL;
return NULL;
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index 53811a1..8040f21 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -51,6 +51,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
else {
// TODO: Handle over-sized message decently.
+ // in_progress is initialised at this point so in theory we should
+ // close it before calling zmq_msg_init_size, however, it's a 0-byte
+ // message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
errno_assert (rc == 0);
@@ -67,6 +70,9 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
size_t size = (size_t) get_uint64 (tmpbuf);
// TODO: Handle over-sized message decently.
+ // in_progress is initialised at this point so in theory we should
+ // close it before calling zmq_msg_init_size, however, it's a 0-byte
+ // message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, size);
errno_assert (rc == 0);
@@ -78,7 +84,7 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
bool zmq::zmq_decoder_t::message_ready ()
{
// Message is completely read. Push it further and start reading
- // new message.
+ // new message. (in_progress is a 0-byte message after this point.)
if (!destination || !destination->write (&in_progress))
return false;
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index 44b919b..180bda7 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -50,12 +50,17 @@ bool zmq::zmq_encoder_t::size_ready ()
bool zmq::zmq_encoder_t::message_ready ()
{
+ // Destroy content of the old message.
+ zmq_msg_close(&in_progress);
+
// Read new message from the dispatcher. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
- if (!source || !source->read (&in_progress))
+ if (!source || !source->read (&in_progress)) {
+ zmq_msg_init (&in_progress);
return false;
+ }
size_t size = zmq_msg_size (&in_progress);
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index eec41c7..0d9488d 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -55,7 +55,6 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
has_peer_identity = true;
peer_identity.assign ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
-
return true;
}