summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authormalosek <malosek@fastmq.com>2009-11-30 16:45:36 +0100
committermalosek <malosek@fastmq.com>2009-11-30 16:45:36 +0100
commitc637bf292d0dc97be5c94c5c96a033c2d665576c (patch)
treef6e82c3003ac1e4a646f588a7423d60c0e7dcc23 /src
parent9ccf2b42cf932b4c29ea20cc9c6e3d5d8e7a62b4 (diff)
parentfa1641afc593be5926e558381861112b584e861a (diff)
Merge branch 'master' of git@github.com:sustrik/zeromq2
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am4
-rw-r--r--src/app_thread.cpp14
-rw-r--r--src/command.hpp4
-rw-r--r--src/devpoll.cpp3
-rw-r--r--src/dispatcher.cpp56
-rw-r--r--src/dispatcher.hpp12
-rw-r--r--src/downstream.cpp131
-rw-r--r--src/downstream.hpp64
-rw-r--r--src/kqueue.cpp3
-rw-r--r--src/object.cpp29
-rw-r--r--src/object.hpp14
-rw-r--r--src/p2p.hpp4
-rw-r--r--src/pipe.cpp6
-rw-r--r--src/pub.hpp4
-rw-r--r--src/rep.cpp11
-rw-r--r--src/rep.hpp4
-rw-r--r--src/req.hpp4
-rw-r--r--src/session.cpp10
-rw-r--r--src/simple_semaphore.hpp60
-rw-r--r--src/socket_base.cpp72
-rw-r--r--src/socket_base.hpp16
-rw-r--r--src/sub.hpp4
-rw-r--r--src/upstream.cpp143
-rw-r--r--src/upstream.hpp69
-rw-r--r--src/zmq.cpp6
-rw-r--r--src/zmq_decoder.cpp8
-rw-r--r--src/zmq_encoder.cpp7
-rw-r--r--src/zmq_listener_init.cpp1
28 files changed, 710 insertions, 53 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 91fb555..3d038b7 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \
decoder.hpp \
devpoll.hpp \
dispatcher.hpp \
+ downstream.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
+ upstream.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \
app_thread.cpp \
devpoll.cpp \
dispatcher.cpp \
+ downstream.cpp \
epoll.cpp \
err.cpp \
fd_signaler.cpp \
@@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
+ upstream.cpp \
uuid.cpp \
ypollset.cpp \
zmq.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index fbda335..a671822 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -40,11 +40,13 @@
#include "pipe.hpp"
#include "config.hpp"
#include "socket_base.hpp"
+#include "p2p.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
-#include "p2p.hpp"
+#include "upstream.hpp"
+#include "downstream.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{
socket_base_t *s = NULL;
switch (type_) {
+ case ZMQ_P2P:
+ s = new p2p_t (this);
+ break;
case ZMQ_PUB:
s = new pub_t (this);
break;
@@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_REP:
s = new rep_t (this);
break;
- case ZMQ_P2P:
- s = new p2p_t (this);
+ case ZMQ_UPSTREAM:
+ s = new upstream_t (this);
+ break;
+ case ZMQ_DOWNSTREAM:
+ s = new downstream_t (this);
break;
default:
// TODO: This should be EINVAL.
diff --git a/src/command.hpp b/src/command.hpp
index 9a2e5d5..3099852 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -69,10 +69,12 @@ namespace zmq
} attach;
// Sent from session to socket to establish pipe(s) between them.
+ // If adjust_seqnum is true, caller have used inc_seqnum beforehand
+ // and thus the callee should take care of catching up.
struct {
- class owned_t *session;
class reader_t *in_pipe;
class writer_t *out_pipe;
+ bool adjust_seqnum;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index f28d55e..0ee772b 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -37,7 +37,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
-zmq::devpoll_t::devpoll_t ()
+zmq::devpoll_t::devpoll_t () :
+ stopping (false)
{
// Get limit on open files
struct rlimit rl;
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 1f6b4f0..1e41ee8 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -20,6 +20,7 @@
#include "../bindings/c/zmq.h"
#include "dispatcher.hpp"
+#include "socket_base.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
@@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
zmq_assert (erased == 1);
pipes_sync.unlock ();
}
+
+int zmq::dispatcher_t::register_endpoint (const char *addr_,
+ socket_base_t *socket_)
+{
+ endpoints_sync.lock ();
+
+ bool inserted = endpoints.insert (std::make_pair (addr_, socket_)).second;
+ if (!inserted) {
+ errno = EADDRINUSE;
+ endpoints_sync.unlock ();
+ return -1;
+ }
+
+ endpoints_sync.unlock ();
+ return 0;
+}
+
+void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)
+{
+ endpoints_sync.lock ();
+
+ endpoints_t::iterator it = endpoints.begin ();
+ while (it != endpoints.end ()) {
+ if (it->second == socket_) {
+ endpoints_t::iterator to_erase = it;
+ it++;
+ endpoints.erase (to_erase);
+ continue;
+ }
+ it++;
+ }
+
+ endpoints_sync.unlock ();
+}
+
+zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_)
+{
+ endpoints_sync.lock ();
+
+ endpoints_t::iterator it = endpoints.find (addr_);
+ if (it == endpoints.end ()) {
+ endpoints_sync.unlock ();
+ errno = ECONNREFUSED;
+ return NULL;
+ }
+ socket_base_t *endpoint = it->second;
+
+ // Increment the command sequence number of the peer so that it won't
+ // get deallocated until "bind" command is issued by the caller.
+ endpoint->inc_seqnum ();
+
+ endpoints_sync.unlock ();
+ return endpoint;
+}
+
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index 23b6a33..8364d4d 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -97,6 +97,11 @@ namespace zmq
void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_);
+ // Management of inproc endpoints.
+ int register_endpoint (const char *addr_, class socket_base_t *socket_);
+ void unregister_endpoints (class socket_base_t *socket_);
+ class socket_base_t *find_endpoint (const char *addr_);
+
private:
~dispatcher_t ();
@@ -149,6 +154,13 @@ namespace zmq
// and 'terminated' flag).
mutex_t term_sync;
+ // List of inproc endpoints within this context.
+ typedef std::map <std::string, class socket_base_t*> endpoints_t;
+ endpoints_t endpoints;
+
+ // Synchronisation of access to the list of inproc endpoints.
+ mutex_t endpoints_sync;
+
dispatcher_t (const dispatcher_t&);
void operator = (const dispatcher_t&);
};
diff --git a/src/downstream.cpp b/src/downstream.cpp
new file mode 100644
index 0000000..4f994e6
--- /dev/null
+++ b/src/downstream.cpp
@@ -0,0 +1,131 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "downstream.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
+ socket_base_t (parent_),
+ current (0)
+{
+ options.requires_in = false;
+ options.requires_out = true;
+}
+
+zmq::downstream_t::~downstream_t ()
+{
+}
+
+void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (!inpipe_ && outpipe_);
+ pipes.push_back (outpipe_);
+}
+
+void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (pipe_);
+ pipes.erase (pipes.index (pipe_));
+}
+
+void zmq::downstream_t::xkill (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::downstream_t::xrevive (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ // No special option for this socket type.
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ // If there are no pipes we cannot send the message.
+ if (pipes.empty ()) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Move to the next pipe (load-balancing).
+ current++;
+ if (current >= pipes.size ())
+ current = 0;
+
+ // TODO: Implement this once queue limits are in-place.
+ zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
+
+ // Push message to the selected pipe.
+ pipes [current]->write (msg_);
+ pipes [current]->flush ();
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+int zmq::downstream_t::xflush ()
+{
+ // TODO: Maybe there's a point in flushing messages downstream.
+ // It may be useful in the case where number of messages in a single
+ // transaction is much greater than the number of attached pipes.
+ errno = ENOTSUP;
+ return -1;
+
+}
+
+int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+bool zmq::downstream_t::xhas_in ()
+{
+ return false;
+}
+
+bool zmq::downstream_t::xhas_out ()
+{
+ // TODO: Modify this code once pipe limits are in place.
+ return true;
+}
+
+
diff --git a/src/downstream.hpp b/src/downstream.hpp
new file mode 100644
index 0000000..c6a7ed8
--- /dev/null
+++ b/src/downstream.hpp
@@ -0,0 +1,64 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class downstream_t : public socket_base_t
+ {
+ public:
+
+ downstream_t (class app_thread_t *parent_);
+ ~downstream_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // List of outbound pipes.
+ typedef yarray_t <class writer_t> pipes_t;
+ pipes_t pipes;
+
+ // Points to the last pipe that the most recent message was sent to.
+ pipes_t::size_type current;
+
+ downstream_t (const downstream_t&);
+ void operator = (const downstream_t&);
+ };
+
+}
+
+#endif
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index f32fa36..69ad0c8 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -33,7 +33,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
-zmq::kqueue_t::kqueue_t ()
+zmq::kqueue_t::kqueue_t () :
+ stopping (false)
{
// Create event queue
kqueue_fd = kqueue ();
diff --git a/src/object.cpp b/src/object.cpp
index 1433b7b..b5d5eee 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -83,8 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_)
return;
case command_t::bind:
- process_bind (cmd_.args.bind.session,
- cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
+ process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
+ cmd_.args.bind.adjust_seqnum);
return;
case command_t::pipe_term:
@@ -122,6 +122,21 @@ void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
dispatcher->unregister_pipe (pipe_);
}
+int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_)
+{
+ return dispatcher->register_endpoint (addr_, socket_);
+}
+
+void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
+{
+ return dispatcher->unregister_endpoints (socket_);
+}
+
+zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)
+{
+ return dispatcher->find_endpoint (addr_);
+}
+
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
return dispatcher->choose_io_thread (taskset_);
@@ -168,15 +183,15 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
send_command (cmd);
}
-void zmq::object_t::send_bind (object_t *destination_, owned_t *session_,
- reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::send_bind (object_t *destination_,
+ reader_t *in_pipe_, writer_t *out_pipe_, bool adjust_seqnum_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::bind;
- cmd.args.bind.session = session_;
cmd.args.bind.in_pipe = in_pipe_;
cmd.args.bind.out_pipe = out_pipe_;
+ cmd.args.bind.adjust_seqnum = adjust_seqnum_;
send_command (cmd);
}
@@ -250,8 +265,8 @@ void zmq::object_t::process_attach (i_engine *engine_)
zmq_assert (false);
}
-void zmq::object_t::process_bind (owned_t *session_,
- reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ bool adjust_seqnum_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index 1954071..4fd0a8e 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -49,6 +49,12 @@ namespace zmq
protected:
+ // Using following function, socket is able to access global
+ // repository of inproc endpoints.
+ int register_endpoint (const char *addr_, class socket_base_t *socket_);
+ void unregister_endpoints (class socket_base_t *socket_);
+ class socket_base_t *find_endpoint (const char *addr_);
+
// Derived object can use following functions to interact with
// global repositories. See dispatcher.hpp for function details.
int thread_slot_count ();
@@ -62,8 +68,8 @@ namespace zmq
class owned_t *object_);
void send_attach (class session_t *destination_,
struct i_engine *engine_);
- void send_bind (object_t *destination_, class owned_t *session_,
- class reader_t *in_pipe_, class writer_t *out_pipe_);
+ void send_bind (object_t *destination_, class reader_t *in_pipe_,
+ class writer_t *out_pipe_, bool adjust_seqnum_);
void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
@@ -78,8 +84,8 @@ namespace zmq
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
virtual void process_attach (struct i_engine *engine_);
- virtual void process_bind (class owned_t *session_,
- class reader_t *in_pipe_, class writer_t *out_pipe_);
+ virtual void process_bind (class reader_t *in_pipe_,
+ class writer_t *out_pipe_, bool adjust_seqnum_);
virtual void process_revive ();
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 1fd7e34..32d7755 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_P2P_INCLUDED__
-#define __ZMQ_P2P_INCLUDED__
+#ifndef __ZMQ_P2P_HPP_INCLUDED__
+#define __ZMQ_P2P_HPP_INCLUDED__
#include "socket_base.hpp"
diff --git a/src/pipe.cpp b/src/pipe.cpp
index e444520..0e15dce 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -81,7 +81,11 @@ void zmq::reader_t::term ()
void zmq::reader_t::process_revive ()
{
- endpoint->revive (this);
+ // Beacuse of command throttling mechanism, incoming termination request
+ // may not have been processed before subsequent send.
+ // In that case endpoint is NULL.
+ if (endpoint)
+ endpoint->revive (this);
}
void zmq::reader_t::process_pipe_term_ack ()
diff --git a/src/pub.hpp b/src/pub.hpp
index b3e868d..9dbcb4a 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_PUB_INCLUDED__
-#define __ZMQ_PUB_INCLUDED__
+#ifndef __ZMQ_PUB_HPP_INCLUDED__
+#define __ZMQ_PUB_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/rep.cpp b/src/rep.cpp
index e8a9e39..f06f4ab 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
- if (in_pipes.index (pipe_) < active)
+ if (index < active)
active--;
in_pipes.erase (index);
out_pipes.erase (index);
@@ -178,14 +178,15 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = in_pipes [current]->read (msg_);
- current++;
- if (current >= active)
- current = 0;
if (fetched) {
reply_pipe = out_pipes [current];
waiting_for_reply = true;
- return 0;
}
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched)
+ return 0;
}
// No message is available. Initialise the output parameter
diff --git a/src/rep.hpp b/src/rep.hpp
index 3e87dc1..0b327aa 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_REP_INCLUDED__
-#define __ZMQ_REP_INCLUDED__
+#ifndef __ZMQ_REP_HPP_INCLUDED__
+#define __ZMQ_REP_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/req.hpp b/src/req.hpp
index 86554b5..756cc42 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_REQ_INCLUDED__
-#define __ZMQ_REQ_INCLUDED__
+#ifndef __ZMQ_REQ_HPP_INCLUDED__
+#define __ZMQ_REQ_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/session.cpp b/src/session.cpp
index eb0a963..87b47b0 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -51,10 +51,6 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
- // The communication is unidirectional.
- // We don't expect any message to arrive.
- zmq_assert (out_pipe);
-
if (out_pipe->write (msg_)) {
zmq_msg_init (msg_);
return true;
@@ -155,8 +151,10 @@ void zmq::session_t::process_plug ()
out_pipe->set_endpoint (this);
}
- send_bind (owner, this, outbound ? &outbound->reader : NULL,
- inbound ? &inbound->writer : NULL);
+ // Note that initial call to inc_seqnum was optimised out. Last
+ // parameter conveys the fact to the callee.
+ send_bind (owner, outbound ? &outbound->reader : NULL,
+ inbound ? &inbound->writer : NULL, false);
}
owned_t::process_plug ();
diff --git a/src/simple_semaphore.hpp b/src/simple_semaphore.hpp
index 209ccb4..3342281 100644
--- a/src/simple_semaphore.hpp
+++ b/src/simple_semaphore.hpp
@@ -23,7 +23,11 @@
#include "platform.hpp"
#include "err.hpp"
-#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
+#if 0 //defined ZMQ_HAVE_LINUX
+#include <sys/syscall.h>
+#include <unistd.h>
+#include <linux/futex.h>
+#elif defined ZMQ_HAVE_LINUX ||defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
#include <pthread.h>
#elif defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
@@ -33,13 +37,63 @@
namespace zmq
{
-
// Simple semaphore. Only single thread may be waiting at any given time.
// Also, the semaphore may not be posted before the previous post
// was matched by corresponding wait and the waiting thread was
// released.
-#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
+#if 0 //defined ZMQ_HAVE_LINUX
+
+ // In theory, using private futexes should be more efficient on Linux
+ // platform than using mutexes. However, in uncontended cases of TCP
+ // transport on loopback interface we haven't seen any latency improvement.
+ // The code is commented out waiting for more thorough testing.
+
+ class simple_semaphore_t
+ {
+ public:
+
+ // Initialise the semaphore.
+ inline simple_semaphore_t () :
+ dummy (0)
+ {
+ }
+
+ // Destroy the semaphore.
+ inline ~simple_semaphore_t ()
+ {
+ }
+
+ // Wait for the semaphore.
+ inline void wait ()
+ {
+ int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAIT_PRIVATE,
+ (int) 0, NULL, NULL, (int) 0);
+ zmq_assert (rc == 0);
+ }
+
+ // Post the semaphore.
+ inline void post ()
+ {
+ while (true) {
+ int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAKE_PRIVATE,
+ (int) 1, NULL, NULL, (int) 0);
+ zmq_assert (rc != -1 && rc <= 1);
+ if (rc == 1)
+ break;
+ }
+ }
+
+ private:
+
+ int dummy;
+
+ // Disable copying of the object.
+ simple_semaphore_t (const simple_semaphore_t&);
+ void operator = (const simple_semaphore_t&);
+ };
+
+#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
// On platforms that allow for double locking of a mutex from the same
// thread, simple semaphore is implemented using mutex, as it is more
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 6583608..a614759 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -43,7 +43,9 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
pending_term_acks (0),
ticks (0),
app_thread (parent_),
- shutting_down (false)
+ shutting_down (false),
+ sent_seqnum (0),
+ processed_seqnum (0)
{
}
@@ -81,6 +83,9 @@ int zmq::socket_base_t::bind (const char *addr_)
addr_type = addr.substr (0, pos);
addr_args = addr.substr (pos + 3);
+ if (addr_type == "inproc")
+ return register_endpoint (addr_args.c_str (), this);
+
if (addr_type == "tcp") {
zmq_listener_t *listener = new zmq_listener_t (
choose_io_thread (options.affinity), this, options);
@@ -126,6 +131,41 @@ int zmq::socket_base_t::connect (const char *addr_)
addr_type = addr.substr (0, pos);
addr_args = addr.substr (pos + 3);
+ if (addr_type == "inproc") {
+
+ // Find the peer socket.
+ socket_base_t *peer = find_endpoint (addr_args.c_str ());
+ if (!peer)
+ return -1;
+
+ pipe_t *in_pipe = NULL;
+ pipe_t *out_pipe = NULL;
+
+ // Create inbound pipe, if required.
+ if (options.requires_in) {
+ in_pipe = new pipe_t (this, peer, options.hwm, options.lwm);
+ zmq_assert (in_pipe);
+ }
+
+ // Create outbound pipe, if required.
+ if (options.requires_out) {
+ out_pipe = new pipe_t (peer, this, options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ }
+
+ // Attach the pipes to this socket object.
+ attach_pipes (in_pipe ? &in_pipe->reader : NULL,
+ out_pipe ? &out_pipe->writer : NULL);
+
+ // Attach the pipes to the peer socket. Note that peer's seqnum
+ // was incremented in find_endpoint function. The callee is notified
+ // about the fact via the last parameter.
+ send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
+ in_pipe ? &in_pipe->writer : NULL, true);
+
+ return 0;
+ }
+
// Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, this, session_name.c_str (),
@@ -319,13 +359,24 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close ()
{
+ shutting_down = true;
+
+ // Let the thread know that the socket is no longer available.
app_thread->remove_socket (this);
// Pointer to the dispatcher must be retrieved before the socket is
// deallocated. Afterwards it is not available.
dispatcher_t *dispatcher = get_dispatcher ();
- shutting_down = true;
+ // Unregister all inproc endpoints associated with this socket.
+ // From this point we are sure that inc_seqnum won't be called again
+ // on this object.
+ dispatcher->unregister_endpoints (this);
+
+ // Wait till all undelivered commands are delivered. This should happen
+ // very quickly. There's no way to wait here for extensive period of time.
+ while (processed_seqnum != sent_seqnum.get ())
+ app_thread->process_commands (true, false);
while (true) {
@@ -364,6 +415,12 @@ int zmq::socket_base_t::close ()
return 0;
}
+void zmq::socket_base_t::inc_seqnum ()
+{
+ /