-rw-r--r--  include/zmq.h          70
-rw-r--r--  src/Makefile.am        15
-rw-r--r--  src/app_thread.cpp    195
-rw-r--r--  src/app_thread.hpp     88
-rw-r--r--  src/config.hpp          5
-rw-r--r--  src/ctx.cpp           249
-rw-r--r--  src/ctx.hpp            94
-rw-r--r--  src/fq.cpp             29
-rw-r--r--  src/fq.hpp             17
-rw-r--r--  src/i_endpoint.hpp     43
-rw-r--r--  src/io_thread.cpp       5
-rw-r--r--  src/io_thread.hpp       2
-rw-r--r--  src/lb.cpp             21
-rw-r--r--  src/lb.hpp             13
-rw-r--r--  src/object.cpp         29
-rw-r--r--  src/object.hpp         14
-rw-r--r--  src/owned.cpp          10
-rw-r--r--  src/owned.hpp           9
-rw-r--r--  src/pair.cpp           72
-rw-r--r--  src/pair.hpp           26
-rw-r--r--  src/pipe.cpp          250
-rw-r--r--  src/pipe.hpp          111
-rw-r--r--  src/pub.cpp            62
-rw-r--r--  src/pub.hpp            21
-rw-r--r--  src/pull.cpp           48
-rw-r--r--  src/pull.hpp           16
-rw-r--r--  src/push.cpp           50
-rw-r--r--  src/push.hpp           16
-rw-r--r--  src/rep.cpp            81
-rw-r--r--  src/rep.hpp            30
-rw-r--r--  src/req.cpp            65
-rw-r--r--  src/req.hpp            32
-rw-r--r--  src/semaphore.hpp     135
-rw-r--r--  src/session.cpp        62
-rw-r--r--  src/session.hpp        26
-rw-r--r--  src/socket_base.cpp   495
-rw-r--r--  src/socket_base.hpp   100
-rw-r--r--  src/sub.cpp            40
-rw-r--r--  src/sub.hpp            11
-rw-r--r--  src/thread.cpp         20
-rw-r--r--  src/thread.hpp          9
-rw-r--r--  src/xrep.cpp           65
-rw-r--r--  src/xrep.hpp           26
-rw-r--r--  src/xreq.cpp           37
-rw-r--r--  src/xreq.hpp           10
-rw-r--r--  src/zmq.cpp             7
-rw-r--r--  src/zmq_encoder.cpp     2
47 files changed, 1427 insertions, 1406 deletions
diff --git a/include/zmq.h b/include/zmq.h
index bce1215..075ff3c 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -82,7 +82,6 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
#endif
/* Native 0MQ error codes. */
-#define EMTHREAD (ZMQ_HAUSNUMERO + 50)
#define EFSM (ZMQ_HAUSNUMERO + 51)
#define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52)
#define ETERM (ZMQ_HAUSNUMERO + 53)
@@ -152,36 +151,39 @@ ZMQ_EXPORT int zmq_term (void *context);
/******************************************************************************/
/* Socket types. */
-#define ZMQ_PAIR 0
-#define ZMQ_PUB 1
-#define ZMQ_SUB 2
-#define ZMQ_REQ 3
-#define ZMQ_REP 4
-#define ZMQ_XREQ 5
-#define ZMQ_XREP 6
-#define ZMQ_PULL 7
-#define ZMQ_PUSH 8
-#define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */
-#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */
+#define ZMQ_PAIR 0
+#define ZMQ_PUB 1
+#define ZMQ_SUB 2
+#define ZMQ_REQ 3
+#define ZMQ_REP 4
+#define ZMQ_XREQ 5
+#define ZMQ_XREP 6
+#define ZMQ_PULL 7
+#define ZMQ_PUSH 8
+
+/* Deprecated aliases, to be removed in release 3.x */
+#define ZMQ_UPSTREAM ZMQ_PULL
+#define ZMQ_DOWNSTREAM ZMQ_PUSH
/* Socket options. */
-#define ZMQ_HWM 1
-/* ZMQ_LWM 2 no longer supported */
-#define ZMQ_SWAP 3
-#define ZMQ_AFFINITY 4
-#define ZMQ_IDENTITY 5
-#define ZMQ_SUBSCRIBE 6
-#define ZMQ_UNSUBSCRIBE 7
-#define ZMQ_RATE 8
+#define ZMQ_HWM 1
+#define ZMQ_SWAP 3
+#define ZMQ_AFFINITY 4
+#define ZMQ_IDENTITY 5
+#define ZMQ_SUBSCRIBE 6
+#define ZMQ_UNSUBSCRIBE 7
+#define ZMQ_RATE 8
#define ZMQ_RECOVERY_IVL 9
-#define ZMQ_MCAST_LOOP 10
-#define ZMQ_SNDBUF 11
-#define ZMQ_RCVBUF 12
-#define ZMQ_RCVMORE 13
+#define ZMQ_MCAST_LOOP 10
+#define ZMQ_SNDBUF 11
+#define ZMQ_RCVBUF 12
+#define ZMQ_RCVMORE 13
+#define ZMQ_FD 14
+#define ZMQ_EVENTS 15
/* Send/recv options. */
-#define ZMQ_NOBLOCK 1
-#define ZMQ_SNDMORE 2
+#define ZMQ_NOBLOCK 1
+#define ZMQ_SNDMORE 2
ZMQ_EXPORT void *zmq_socket (void *context, int type);
ZMQ_EXPORT int zmq_close (void *s);
@@ -198,9 +200,9 @@ ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
/* I/O multiplexing. */
/******************************************************************************/
-#define ZMQ_POLLIN 1
-#define ZMQ_POLLOUT 2
-#define ZMQ_POLLERR 4
+#define ZMQ_POLLIN 1
+#define ZMQ_POLLOUT 2
+#define ZMQ_POLLERR 4
typedef struct
{
@@ -217,17 +219,15 @@ typedef struct
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/******************************************************************************/
-/* Devices */
+/* Devices - Experimental. */
/******************************************************************************/
-#define ZMQ_QUEUE 1
-#define ZMQ_FORWARDER 2
-#define ZMQ_STREAMER 3
+#define ZMQ_STREAMER 1
+#define ZMQ_FORWARDER 2
+#define ZMQ_QUEUE 3
ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket);
-#undef ZMQ_EXPORT
-
#ifdef __cplusplus
}
#endif
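
The zmq.h changes above retire the unused EMTHREAD error code, add two new socket options, ZMQ_FD (14) and ZMQ_EVENTS (15), and renumber the device constants. This commit only declares the two options; the sketch below shows how an application could use them to drive a 0MQ socket from an external event loop, assuming the getsockopt semantics the options eventually acquired (ZMQ_FD yields a pollable descriptor that merely signals activity, ZMQ_EVENTS a ZMQ_POLLIN/ZMQ_POLLOUT bitmask). It is an illustrative sketch, not code from this commit.

    #include <errno.h>
    #include <stdint.h>
    #include <zmq.h>

    /* Hypothetical usage: drive socket 's' from an external event loop.
       Error handling is omitted for brevity. */
    void setup_and_drain (void *s)
    {
        /* Obtain the descriptor to register with select/poll/epoll. */
        int fd;
        size_t fd_size = sizeof (fd);
        zmq_getsockopt (s, ZMQ_FD, &fd, &fd_size);
        /* ... register fd with the application's event loop ... */

        /* When fd signals readiness, check the real socket state and
           drain all pending messages. */
        uint32_t events;
        size_t events_size = sizeof (events);
        zmq_getsockopt (s, ZMQ_EVENTS, &events, &events_size);
        while (events & ZMQ_POLLIN) {
            zmq_msg_t msg;
            zmq_msg_init (&msg);
            zmq_recv (s, &msg, ZMQ_NOBLOCK);
            /* ... process msg ... */
            zmq_msg_close (&msg);
            zmq_getsockopt (s, ZMQ_EVENTS, &events, &events_size);
        }
    }
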
diff --git a/src/Makefile.am b/src/Makefile.am
index 19a80d0..937372f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -49,7 +49,7 @@ endif
nodist_libzmq_la_SOURCES = $(pgm_sources)
-libzmq_la_SOURCES = app_thread.hpp \
+libzmq_la_SOURCES = \
atomic_counter.hpp \
atomic_ptr.hpp \
blob.hpp \
@@ -58,7 +58,6 @@ libzmq_la_SOURCES = app_thread.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
- push.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -69,7 +68,6 @@ libzmq_la_SOURCES = app_thread.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
- i_endpoint.hpp \
i_engine.hpp \
i_poll_events.hpp \
kqueue.hpp \
@@ -91,10 +89,13 @@ libzmq_la_SOURCES = app_thread.hpp \
pair.hpp \
prefix_tree.hpp \
pub.hpp \
+ pull.hpp \
+ push.hpp \
queue.hpp \
rep.hpp \
req.hpp \
select.hpp \
+ semaphore.hpp \
session.hpp \
signaler.hpp \
socket_base.hpp \
@@ -105,7 +106,6 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
- pull.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -121,11 +121,9 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_engine.hpp \
zmq_init.hpp \
zmq_listener.hpp \
- app_thread.cpp \
command.cpp \
ctx.cpp \
devpoll.cpp \
- push.cpp \
epoll.cpp \
err.cpp \
forwarder.cpp \
@@ -139,13 +137,15 @@ libzmq_la_SOURCES = app_thread.hpp \
object.cpp \
options.cpp \
owned.cpp \
+ pair.cpp \
pgm_receiver.cpp \
pgm_sender.cpp \
pgm_socket.cpp \
- pair.cpp \
prefix_tree.cpp \
pipe.cpp \
poll.cpp \
+ pull.cpp \
+ push.cpp \
pub.cpp \
queue.cpp \
rep.cpp \
@@ -160,7 +160,6 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
- pull.cpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
deleted file mode 100644
index ac59464..0000000
--- a/src/app_thread.cpp
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <new>
-#include <algorithm>
-
-#include "../include/zmq.h"
-
-#include "platform.hpp"
-
-#if defined ZMQ_HAVE_WINDOWS
-#include "windows.hpp"
-#if defined _MSC_VER
-#include <intrin.h>
-#endif
-#else
-#include <unistd.h>
-#endif
-
-#include "app_thread.hpp"
-#include "ctx.hpp"
-#include "err.hpp"
-#include "pipe.hpp"
-#include "config.hpp"
-#include "socket_base.hpp"
-#include "pair.hpp"
-#include "pub.hpp"
-#include "sub.hpp"
-#include "req.hpp"
-#include "rep.hpp"
-#include "xreq.hpp"
-#include "xrep.hpp"
-#include "pull.hpp"
-#include "push.hpp"
-
-// If the RDTSC is available we use it to prevent excessive
-// polling for commands. The nice thing here is that it will work on any
-// system with x86 architecture and gcc or MSVC compiler.
-#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\
- (defined _MSC_VER && (defined _M_IX86 || defined _M_X64))
-#define ZMQ_DELAY_COMMANDS
-#endif
-
-zmq::app_thread_t::app_thread_t (ctx_t *ctx_,
- uint32_t thread_slot_) :
- object_t (ctx_, thread_slot_),
- last_processing_time (0),
- terminated (false)
-{
-}
-
-zmq::app_thread_t::~app_thread_t ()
-{
- zmq_assert (sockets.empty ());
-}
-
-void zmq::app_thread_t::stop ()
-{
- send_stop ();
-}
-
-zmq::signaler_t *zmq::app_thread_t::get_signaler ()
-{
- return &signaler;
-}
-
-bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
-{
- bool received;
- command_t cmd;
- if (block_) {
- received = signaler.recv (&cmd, true);
- zmq_assert (received);
- }
- else {
-
-#if defined ZMQ_DELAY_COMMANDS
- // Optimised version of command processing - it doesn't have to check
- // for incoming commands each time. It does so only if certain time
- // elapsed since last command processing. Command delay varies
- // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
- // etc. The optimisation makes sense only on platforms where getting
- // a timestamp is a very cheap operation (tens of nanoseconds).
- if (throttle_) {
-
- // Get timestamp counter.
-#if defined __GNUC__
- uint32_t low;
- uint32_t high;
- __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
- uint64_t current_time = (uint64_t) high << 32 | low;
-#elif defined _MSC_VER
- uint64_t current_time = __rdtsc ();
-#else
-#error
-#endif
-
- // Check whether certain time have elapsed since last command
- // processing.
- if (current_time - last_processing_time <= max_command_delay)
- return !terminated;
- last_processing_time = current_time;
- }
-#endif
-
- // Check whether there are any commands pending for this thread.
- received = signaler.recv (&cmd, false);
- }
-
- // Process all the commands available at the moment.
- while (received) {
- cmd.destination->process_command (cmd);
- received = signaler.recv (&cmd, false);
- }
-
- return !terminated;
-}
-
-zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
-{
- socket_base_t *s = NULL;
- switch (type_) {
- case ZMQ_PAIR:
- s = new (std::nothrow) pair_t (this);
- break;
- case ZMQ_PUB:
- s = new (std::nothrow) pub_t (this);
- break;
- case ZMQ_SUB:
- s = new (std::nothrow) sub_t (this);
- break;
- case ZMQ_REQ:
- s = new (std::nothrow) req_t (this);
- break;
- case ZMQ_REP:
- s = new (std::nothrow) rep_t (this);
- break;
- case ZMQ_XREQ:
- s = new (std::nothrow) xreq_t (this);
- break;
- case ZMQ_XREP:
- s = new (std::nothrow) xrep_t (this);
- break;
- case ZMQ_PULL:
- s = new (std::nothrow) pull_t (this);
- break;
- case ZMQ_PUSH:
- s = new (std::nothrow) push_t (this);
- break;
- default:
- if (sockets.empty ())
- get_ctx ()->no_sockets (this);
- errno = EINVAL;
- return NULL;
- }
- zmq_assert (s);
-
- sockets.push_back (s);
-
- return s;
-}
-
-void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
-{
- sockets.erase (socket_);
- if (sockets.empty ())
- get_ctx ()->no_sockets (this);
-}
-
-void zmq::app_thread_t::process_stop ()
-{
- terminated = true;
-}
-
-bool zmq::app_thread_t::is_terminated ()
-{
- return terminated;
-}
-
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
deleted file mode 100644
index f0deaab..0000000
--- a/src/app_thread.hpp
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_APP_THREAD_HPP_INCLUDED__
-#define __ZMQ_APP_THREAD_HPP_INCLUDED__
-
-#include <vector>
-
-#include "stdint.hpp"
-#include "object.hpp"
-#include "yarray.hpp"
-#include "signaler.hpp"
-
-namespace zmq
-{
-
- class app_thread_t : public object_t
- {
- public:
-
- app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
-
- ~app_thread_t ();
-
- // Interrupt blocking call if the app thread is stuck in one.
- // This function is is called from a different thread!
- void stop ();
-
- // Returns signaler associated with this application thread.
- signaler_t *get_signaler ();
-
- // Processes commands sent to this thread (if any). If 'block' is
- // set to true, returns only after at least one command was processed.
- // If throttle argument is true, commands are processed at most once
- // in a predefined time period. The function returns false is the
- // associated context was terminated, true otherwise.
- bool process_commands (bool block_, bool throttle_);
-
- // Create a socket of a specified type.
- class socket_base_t *create_socket (int type_);
-
- // Unregister the socket from the app_thread (called by socket itself).
- void remove_socket (class socket_base_t *socket_);
-
- // Returns true is the associated context was already terminated.
- bool is_terminated ();
-
- private:
-
- // Command handlers.
- void process_stop ();
-
- // All the sockets created from this application thread.
- typedef yarray_t <socket_base_t> sockets_t;
- sockets_t sockets;
-
- // App thread's signaler object.
- signaler_t signaler;
-
- // Timestamp of when commands were processed the last time.
- uint64_t last_processing_time;
-
- // If true, 'stop' command was already received.
- bool terminated;
-
- app_thread_t (const app_thread_t&);
- void operator = (const app_thread_t&);
- };
-
-}
-
-#endif
diff --git a/src/config.hpp b/src/config.hpp
index 2c0ac2d..83e612a 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -27,9 +27,8 @@ namespace zmq
enum
{
- // Maximal number of OS threads that can own 0MQ sockets
- // at the same time.
- max_app_threads = 512,
+ // Maximum number of sockets that can be opened at the same time.
+ max_sockets = 512,
// Number of new messages in message pipe needed to trigger new memory
// allocation. Setting this parameter to 256 decreases the impact of
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 397f692..91157a5 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -24,7 +24,6 @@
#include "ctx.hpp"
#include "socket_base.hpp"
-#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
@@ -32,11 +31,12 @@
#if defined ZMQ_HAVE_WINDOWS
#include "windows.h"
+#else
+#include "unistd.h"
#endif
zmq::ctx_t::ctx_t (uint32_t io_threads_) :
- sockets (0),
- terminated (false)
+ no_sockets_notify (false)
{
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
@@ -50,44 +50,32 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
#endif
// Initialise the array of signalers.
- signalers_count = max_app_threads + io_threads_;
- signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count);
- zmq_assert (signalers);
- memset (signalers, 0, sizeof (signaler_t*) * signalers_count);
+ slot_count = max_sockets + io_threads_;
+ slots = (signaler_t**) malloc (sizeof (signaler_t*) * slot_count);
+ zmq_assert (slots);
// Create I/O thread objects and launch them.
for (uint32_t i = 0; i != io_threads_; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
- signalers [i] = io_thread->get_signaler ();
+ slots [i] = io_thread->get_signaler ();
io_thread->start ();
}
-}
-
-int zmq::ctx_t::term ()
-{
- // First send stop command to application threads so that any
- // blocking calls are interrupted.
- for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- app_threads [i].app_thread->stop ();
-
- // Then mark context as terminated.
- term_sync.lock ();
- zmq_assert (!terminated);
- terminated = true;
- bool destroy = (sockets == 0);
- term_sync.unlock ();
-
- // If there are no sockets open, destroy the context immediately.
- if (destroy)
- delete this;
- return 0;
+ // In the unused part of the slot array, create a list of empty slots.
+ for (uint32_t i = slot_count - 1; i >= io_threads_; i--) {
+ empty_slots.push_back (i);
+ slots [i] = NULL;
+ }
}
zmq::ctx_t::~ctx_t ()
{
+ // Check that there are no remaining open or zombie sockets.
+ zmq_assert (sockets.empty ());
+ zmq_assert (zombies.empty ());
+
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
@@ -97,18 +85,10 @@ zmq::ctx_t::~ctx_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
- // Close all application theads, sockets, io_objects etc.
- for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- delete app_threads [i].app_thread;
-
- // Deallocate all the orphaned pipes.
- while (!pipes.empty ())
- delete *pipes.begin ();
-
- // Deallocate the array of pointers to signalers. No special work is
+ // Deallocate the array of slot. No special work is
// needed as signalers themselves were deallocated with their
- // corresponding (app_/io_) thread objects.
- free (signalers);
+ // corresponding io_thread/socket objects.
+ free (slots);
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
@@ -117,110 +97,113 @@ zmq::ctx_t::~ctx_t ()
#endif
}
-zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
+int zmq::ctx_t::term ()
{
- app_threads_sync.lock ();
-
- // Find whether the calling thread has app_thread_t object associated
- // already. At the same time find an unused app_thread_t so that it can
- // be used if there's no associated object for the calling thread.
- // Check whether thread ID is already assigned. If so, return it.
- app_threads_t::size_type unused = app_threads.size ();
- app_threads_t::size_type current;
- for (current = 0; current != app_threads.size (); current++) {
- if (app_threads [current].associated &&
- thread_t::equal (thread_t::id (), app_threads [current].tid))
- break;
- if (!app_threads [current].associated)
- unused = current;
+ // First send stop command to sockets so that any
+ // blocking calls are interrupted.
+ for (sockets_t::size_type i = 0; i != sockets.size (); i++)
+ sockets [i]->stop ();
+
+ // Find out whether there are any open sockets to care about.
+ // If so, sleep till they are closed. Note that we can use
+ // no_sockets_notify safely out of the critical section as once set
+ // its value is never changed again.
+ slot_sync.lock ();
+ if (!sockets.empty ())
+ no_sockets_notify = true;
+ slot_sync.unlock ();
+ if (no_sockets_notify)
+ no_sockets_sync.wait ();
+
+ // At this point there's only one application thread (this one) remaining.
+ // We don't even have to synchronise access to data.
+ zmq_assert (sockets.empty ());
+
+ // Get rid of remaining zombie sockets.
+ while (!zombies.empty ()) {
+ dezombify ();
+
+ // Sleep for 1ms not to end up busy-looping in the case the I/O threads
+ // are still busy sending data. We can possibly add a grand poll here
+ // (polling for fds associated with all the zombie sockets), but it's
+ // probably not worth of implementing it.
+#if defined ZMQ_HAVE_WINDOWS
+ Sleep (1);
+#else
+ usleep (1000);
+#endif
}
- // If no app_thread_t is associated with the calling thread,
- // associate it with one of the unused app_thread_t objects.
- if (current == app_threads.size ()) {
+ // Deallocate the resources.
+ delete this;
- // If all the existing app_threads are already used, create one more.
- if (unused == app_threads.size ()) {
-
- // If max_app_threads limit was reached, return error.
- if (app_threads.size () == max_app_threads) {
- app_threads_sync.unlock ();
- errno = EMTHREAD;
- return NULL;
- }
+ return 0;
+}
- // Create the new application thread proxy object.
- app_thread_info_t info;
- memset (&info, 0, sizeof (info));
- info.associated = false;
- info.app_thread = new (std::nothrow) app_thread_t (this,
- io_threads.size () + app_threads.size ());
- zmq_assert (info.app_thread);
- signalers [io_threads.size () + app_threads.size ()] =
- info.app_thread->get_signaler ();
- app_threads.push_back (info);
- }
+zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
+{
+ slot_sync.lock ();
- // Incidentally, this works both when there is an unused app_thread
- // and when a new one is created.
- current = unused;
+ // Free the slots, if possible.
+ dezombify ();
- // Associate the selected app_thread with the OS thread.
- app_threads [current].associated = true;
- app_threads [current].tid = thread_t::id ();
+ // If max_sockets limit was reached, return error.
+ if (empty_slots.empty ()) {
+ slot_sync.unlock ();
+ errno = EMFILE;
+ return NULL;
}
- app_thread_t *thread = app_threads [current].app_thread;
- app_threads_sync.unlock ();
+ // Choose a slot for the socket.
+ uint32_t slot = empty_slots.back ();
+ empty_slots.pop_back ();
- socket_base_t *s = thread->create_socket (type_);
- if (!s)
+ // Create the socket and register its signaler.
+ socket_base_t *s = socket_base_t::create (type_, this, slot);
+ if (!s) {
+ empty_slots.push_back (slot);
+ slot_sync.unlock ();
return NULL;
+ }
+ sockets.push_back (s);
+ slots [slot] = s->get_signaler ();
- term_sync.lock ();
- sockets++;
- term_sync.unlock ();
+ slot_sync.unlock ();
return s;
}
-void zmq::ctx_t::destroy_socket ()
+void zmq::ctx_t::zombify (socket_base_t *socket_)
{
- // If zmq_term was already called and there are no more sockets,
- // terminate the whole 0MQ infrastructure.
- term_sync.lock ();
- zmq_assert (sockets > 0);
- sockets--;
- bool destroy = (sockets == 0 && terminated);
- term_sync.unlock ();
-
- if (destroy)
- delete this;
-}
+ // Zombification of socket basically means that its ownership is tranferred
+ // from the application that created it to the context.
-void zmq::ctx_t::no_sockets (app_thread_t *thread_)
-{
- app_threads_sync.lock ();
- app_threads_t::size_type i;
- for (i = 0; i != app_threads.size (); i++)
- if (app_threads [i].app_thread == thread_) {
- app_threads [i].associated = false;
- break;
- }
- zmq_assert (i != app_threads.size ());
- app_threads_sync.unlock ();
+ // Note that the lock provides the memory barrier needed to migrate
+ // zombie-to-be socket from it's native thread to shared data area
+ // synchronised by slot_sync.
+ slot_sync.lock ();
+ sockets.erase (socket_);
+ zombies.push_back (socket_);
+
+ // Try to get rid of at least some zombie sockets at this point.
+ dezombify ();
+
+ // If shutdown thread is interested in notification about no more
+ // open sockets, notify it now.
+ if (sockets.empty () && no_sockets_notify)
+ no_sockets_sync.post ();
+
+ slot_sync.unlock ();
}
-void zmq::ctx_t::send_command (uint32_t destination_,
- const command_t &command_)
+void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
{
- signalers [destination_]->send (command_);
+ slots [slot_]->send (command_);
}
-bool zmq::ctx_t::recv_command (uint32_t thread_slot_,
- command_t *command_, bool block_)
+bool zmq::ctx_t::recv_command (uint32_t slot_, command_t *command_, bool block_)
{
- return signalers [thread_slot_]->recv (command_, block_);
+ return slots [slot_]->recv (command_, block_);
}
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
@@ -242,22 +225,6 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
return io_threads [result];
}
-void zmq::ctx_t::register_pipe (class pipe_t *pipe_)
-{
- pipes_sync.lock ();
- bool inserted = pipes.insert (pipe_).second;
- zmq_assert (inserted);
- pipes_sync.unlock ();
-}
-
-void zmq::ctx_t::unregister_pipe (class pipe_t *pipe_)
-{
- pipes_sync.lock ();
- pipes_t::size_type erased = pipes.erase (pipe_);
- zmq_assert (erased == 1);
- pipes_sync.unlock ();
-}
-
int zmq::ctx_t::register_endpoint (const char *addr_,
socket_base_t *socket_)
{
@@ -315,3 +282,15 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
return endpoint;
}
+void zmq::ctx_t::dezombify ()
+{
+ // Try to dezombify each zombie in the list.
+ for (zombies_t::size_type i = 0; i != zombies.size ();)
+ if (zombies [i]->dezombify ()) {
+ empty_slots.push_back (zombies [i]->get_slot ());
+ zombies.erase (zombies [i]);
+ }
+ else
+ i++;
+}
+
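
With ctx.cpp rewritten this way, the context no longer tracks application threads at all: each socket occupies a slot in a fixed-size table (max_sockets = 512), a closed socket is handed back to the context as a zombie, and zmq_term blocks on the no_sockets_sync semaphore until every socket has been closed, then reaps the remaining zombies. A minimal sketch of the resulting application-side contract follows; it assumes the single-argument zmq_init form (not shown in this diff) and elides most error handling.

    #include <errno.h>
    #include <stdio.h>
    #include <zmq.h>

    int main (void)
    {
        void *ctx = zmq_init (1);          /* one I/O thread; signature assumed */

        void *s = zmq_socket (ctx, ZMQ_PAIR);
        if (!s && errno == EMFILE) {
            /* The slot table is full: max_sockets (512) sockets are
               already open in this context. */
            fprintf (stderr, "too many sockets\n");
            return 1;
        }

        /* ... bind/connect and exchange messages ... */

        zmq_close (s);                     /* socket becomes a context-owned zombie */
        zmq_term (ctx);                    /* waits for open sockets, reaps zombies */
        return 0;
    }
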
diff --git a/src/ctx.hpp b/src/ctx.hpp
index c96a923..cb9a2d9 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -26,7 +26,9 @@
#include <string>
#include "signaler.hpp"
+#include "semaphore.hpp"
#include "ypipe.hpp"
+#include "yarray.hpp"
#include "config.hpp"
#include "mutex.hpp"
#include "stdint.hpp"
@@ -55,29 +57,19 @@ namespace zmq
// Create a socket.
class socket_base_t *create_socket (int type_);
- // Destroy a socket.
- void destroy_socket ();
+ // Make socket a zombie.
+ void zombify (socket_base_t *socket_);
- // Called by app_thread_t when it has no more sockets. The function
- // should disassociate the object from the current OS thread.
- void no_sockets (class app_thread_t *thread_);
+ // Send command to the destination slot.
+ void send_command (uint32_t slot_, const command_t &command_);
- // Send command to the destination thread.
- void send_command (uint32_t destination_, const command_t &command_);
-
- // Receive command from another thread.
- bool recv_command (uint32_t thread_slot_, command_t *command_,
- bool block_);
+ // Receive command from the source slot.
+ bool recv_command (uint32_t slot_, command_t *command_, bool block_);
// Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_);
- // All pipes are registered with the context so that even the
- // orphaned pipes can be deallocated on the terminal shutdown.
- void register_pipe (class pipe_t *pipe_);
- void unregister_pipe (class pipe_t *pipe_);
-
// Management of inproc endpoints.
int register_endpoint (const char *addr_, class socket_base_t *socket_);
void unregister_endpoints (class socket_base_t *socket_);
@@ -87,57 +79,45 @@ namespace zmq
~ctx_t ();
- struct app_thread_info_t
- {
- // If false, 0MQ application thread is free, there's no associated
- // OS thread.
- bool associated;
+ // Sockets belonging to this context.
+ typedef yarray_t <socket_base_t> sockets_t;
+ sockets_t sockets;
+
+ // Array of sockets that were already closed but not yet deallocated.
+ // These sockets still have some pipes and I/O objects attached.
+ typedef yarray_t <socket_base_t> zombies_t;
+ zombies_t zombies;
+
+ // List of unused slots.
+ typedef std::vector <uint32_t> emtpy_slots_t;
+ emtpy_slots_t empty_slots;
- // ID of the associated OS thread. If 'associated' is false,
- // this field contains bogus data.
- thread_t::id_t tid;
+ // If true, shutdown thread wants to be informed when there are no
+ // more open sockets. Do so by posting no_sockets_sync semaphore.
+ // Note that this variable is synchronised by slot_sync mutex.
+ bool no_sockets_notify;
- // Pointer to the 0MQ application thread object.
- class app_thread_t *app_thread;
- };
+ // Object used by zmq_term to wait while all the sockets are closed
+ // by different application threads.
+ semaphore_t no_sockets_sync;
- // Application threads.
- typedef std::vector <app_thread_info_t> app_threads_t;
- app_threads_t app_threads;
+ // Synchronisation of accesses to global slot-related data:
+ // sockets, zombies, empty_slots, terminated. It also synchronises
+ // access to zombie sockets as such (as oposed to slots) and provides
+ // a memory barrier to ensure that all CPU cores see the same data.
+ mutex_t slot_sync;
- // Synchronisation of accesses to shared application thread data.
- mutex_t app_threads_sync;
+ // This function attempts to deallocate as many zombie sockets as
+ // possible. It must be called within a slot_sync critical section.
+ void dezombify ();
// I/O threads.
typedef std::vector <class io_thread_t*> io_threads_t;
io_threads_t io_threads;
// Array of pointers to signalers for both application and I/O threads.
- int signalers_count;
- signaler_t **signalers;
-
- // As pipes may reside in orphaned state in particular moments
- // of the pipe shutdown process, i.e. neither pipe reader nor
- // pipe writer hold reference to the pipe, we have to hold references
- // to all pipes in context so that we can deallocate them
- // during terminal shutdown even though it conincides with the
- // pipe being in the orphaned state.
- typedef std::set <class pipe_t*> pipes_t;
- pipes_t pipes;
-
- // Synchronisation of access to the pipes repository.
- mutex_t pipes_sync;
-
- // Number of sockets alive.
- int sockets;
-
- // If true, zmq_term was already called. When last socket is closed
- // the whole 0MQ infrastructure should be deallocated.
- bool terminated;
-
- // Synchronisation of access to the termination data (socket count
- // and 'terminated' flag).
- mutex_t term_sync;
+ uint32_t slot_count;
+ signaler_t **slots;
// List of inproc endpoints within this context.
typedef std::map <std::string, class socket_base_t*> endpoints_t;
diff --git a/src/fq.cpp b/src/fq.cpp
index 9028853..48a7029 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -32,18 +32,19 @@ zmq::fq_t::fq_t () :
zmq::fq_t::~fq_t ()
{
- for (pipes_t::size_type i = 0; i != pipes.size (); i++)
- pipes [i]->term ();
+ zmq_assert (pipes.empty ());
}
void zmq::fq_t::attach (reader_t *pipe_)
{
+ pipe_->set_event_sink (this);
+
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
}
-void zmq::fq_t::detach (reader_t *pipe_)
+void zmq::fq_t::terminated (reader_t *pipe_)
{
zmq_assert (!more || pipes [current] != pipe_);
@@ -57,16 +58,18 @@ void zmq::fq_t::detach (reader_t *pipe_)
pipes.erase (pipe_);
}
-void zmq::fq_t::kill (reader_t *pipe_)
+bool zmq::fq_t::has_pipes ()
{
- // Move the pipe to the list of inactive pipes.
- active--;
- if (current == active)
- current = 0;
- pipes.swap (pipes.index (pipe_), active);
+ return !pipes.empty ();
+}
+
+void zmq::fq_t::term_pipes ()
+{
+ for (pipes_t::size_type i = 0; i != pipes.size (); i++)
+ pipes [i]->terminate ();
}
-void zmq::fq_t::revive (reader_t *pipe_)
+void zmq::fq_t::activated (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
@@ -98,6 +101,12 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
}
return 0;
}
+ else {
+ active--;
+ pipes.swap (current, active);
+ if (current == active)
+ current = 0;
+ }
}
// No message is available. Initialise the output parameter
diff --git a/src/fq.hpp b/src/fq.hpp
index 5c699ee..2e09809 100644
--- a/src/fq.hpp
+++ b/src/fq.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_FQ_HPP_INCLUDED__
#include "yarray.hpp"
+#include "pipe.hpp"
namespace zmq
{
@@ -28,24 +29,28 @@ namespace zmq
// Class manages a set of inbound pipes. On receive it performs fair
// queueing (RFC970) so that senders gone berserk won't cause denial of
// service for decent senders.
- class fq_t
+ class fq_t : public i_reader_events
{
public:
fq_t ();
~fq_t ();
- void attach (class reader_t *pipe_);
- void detach (class reader_t *pipe_);
- void kill (class reader_t *pipe_);
- void revive (class reader_t *pipe_);
+ void attach (reader_t *pipe_);
+ bool has_pipes ();
+ void term_pipes ();
+
int recv (zmq_msg_t *msg_, int flags_);
bool has_in ();
+ // i_reader_events implementation.
+ void activated (reader_t *pipe_);
+ void terminated (reader_t *pipe_);
+
private:
// Inbound pipes.
- typedef yarray_t <class reader_t> pipes_t;
+ typedef yarray_t <reader_t> pipes_t;
pipes_t pipes;
// Number of active pipes. All the active pipes are located at the
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
deleted file mode 100644
index 0d14224..0000000
--- a/src/i_endpoint.hpp
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
-#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
-
-#include "blob.hpp"
-
-namespace zmq
-{
-
- struct i_endpoint
- {
- virtual ~i_endpoint () {}
-
- virtual void attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
- virtual void detach_inpipe (class reader_t *pipe_) = 0;
- virtual void detach_outpipe (class writer_t *pipe_) = 0;
- virtual void kill (class reader_t *pipe_) = 0;
- virtual void revive (class reader_t *pipe_) = 0;
- virtual void revive (class writer_t *pipe_) = 0;
- };
-
-}
-
-#endif
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index fac6961..3d202cf 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -26,9 +26,8 @@
#include "err.hpp"
#include "ctx.hpp"
-zmq::io_thread_t::io_thread_t (ctx_t *ctx_,
- uint32_t thread_slot_) :
- object_t (ctx_, thread_slot_)
+zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t slot_) :
+ object_t (ctx_, slot_)
{
poller = new (std::nothrow) poller_t;
zmq_assert (poller);
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index 3d832c0..9e7c2ea 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -38,7 +38,7 @@ namespace zmq
{
public:
- io_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
+ io_thread_t (class ctx_t *ctx_, uint32_t slot_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
diff --git a/src/lb.cpp b/src/lb.cpp
index ca93ba2..ccfaaae 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -32,19 +32,27 @@ zmq::lb_t::lb_t () :
zmq::lb_t::~lb_t ()
{
- for (pipes_t::size_type i = 0; i != pipes.size (); i++)
- pipes [i]->term ();
+ zmq_assert (pipes.empty ());
}
void zmq::lb_t::attach (writer_t *pipe_)
{
+ pipe_->set_event_sink (this);
+
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
}
-void zmq::lb_t::detach (writer_t *pipe_)
+void zmq::lb_t::term_pipes ()
{
+ for (pipes_t::size_type i = 0; i != pipes.size (); i++)
+ pipes [i]->terminate ();
+}
+
+void zmq::lb_t::terminated (writer_t *pipe_)
+{
+ // ???
zmq_assert (!more || pipes [current] != pipe_);
// Remove the pipe from the list; adjust number of active pipes
@@ -57,7 +65,12 @@ void zmq::lb_t::detach (writer_t *pipe_)
pipes.erase (pipe_);
}
-void zmq::lb_t::revive (writer_t *pipe_)
+bool zmq::lb_t::has_pipes ()
+{
+ return !pipes.empty ();
+}
+
+void zmq::lb_t::activated (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
diff --git a/src/lb.hpp b/src/lb.hpp
index 526a727..e69385e 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -21,25 +21,30 @@
#define __ZMQ_LB_HPP_INCLUDED__
#include "yarray.hpp"
+#include "pipe.hpp"
namespace zmq
{
// Class manages a set of outbound pipes. On send it load balances
// messages fairly among the pipes.
- class lb_t
+ class lb_t : public i_writer_events
{
public:
lb_t ();
~lb_t ();
- void attach (class writer_t *pipe_);
- void detach (class writer_t *pipe_);
- void revive (class writer_t *pipe_);
+ void attach (writer_t *pipe_);
+ void term_pipes ();
+ bool has_pipes ();
int send (zmq_msg_t *msg_, int flags_);
bool has_out ();
+ // i_writer_events interface implementation.
+ void activated (writer_t *pipe_);
+ void terminated (writer_t *pipe_);
+
private:
// List of outbound pipes.
diff --git a/src/object.cpp b/src/object.cpp
index 324450f..cdb177f 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -28,15 +28,15 @@
#include "session.hpp"
#include "socket_base.hpp"
-zmq::object_t::object_t (ctx_t *ctx_, uint32_t thread_slot_) :
+zmq::object_t::object_t (ctx_t *ctx_, uint32_t slot_) :
ctx (ctx_),
- thread_slot (thread_slot_)
+ slot (slot_)
{
}
zmq::object_t::object_t (object_t *parent_) :
ctx (parent_->ctx),
- thread_slot (parent_->thread_slot)
+ slot (parent_->slot)
{
}
@@ -44,9 +44,9 @@ zmq::object_t::~object_t ()
{
}
-uint32_t zmq::object_t::get_thread_slot ()
+uint32_t zmq::object_t::get_slot ()
{
- return thread_slot;
+ return slot;
}
zmq::ctx_t *zmq::object_t::get_ctx ()
@@ -123,16 +123,6 @@ void zmq::object_t::process_command (command_t &cmd_)
deallocate_command (&cmd_);
}
-void zmq::object_t::register_pipe (class pipe_t *pipe_)
-{
- ctx->register_pipe (pipe_);
-}
-
-void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
-{
- ctx->unregister_pipe (pipe_);
-}
-
int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_)
{
return ctx->register_endpoint (addr_, socket_);
@@ -153,6 +143,11 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
return ctx->choose_io_thread (taskset_);
}
+void zmq::object_t::zombify (socket_base_t *socket_)
+{
+ ctx->zombify (socket_);
+}
+
void zmq::object_t::send_stop ()
{
// 'stop' command goes always from administrative thread to
@@ -160,7 +155,7 @@ void zmq::object_t::send_stop ()
command_t cmd;
cmd.destination = this;
cmd.type = command_t::stop;
- ctx->send_command (thread_slot, cmd);
+ ctx->send_command (slot, cmd);
}
void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
@@ -369,6 +364,6 @@ void zmq::object_t::process_seqnum ()
void zmq::object_t::send_command (command_t &cmd_)
{
- ctx->send_command (cmd_.destination->get_thread_slot (), cmd_);
+ ctx->send_command (cmd_.destination->get_slot (), cmd_);
}
diff --git a/src/object.hpp b/src/object.hpp
index a38b0a6..c75a95a 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -32,18 +32,14 @@ namespace zmq
{
public:
- object_t (class ctx_t *ctx_, uint32_t thread_slot_);
+ object_t (class ctx_t *ctx_, uint32_t slot_);
object_t (object_t *parent_);
virtual ~object_t ();
- uint32_t get_thread_slot ();
+ uint32_t get_slot ();
ctx_t *get_ctx ();
void process_command (struct command_t &cmd_);
- // Allow pipe to access corresponding context functions.
- void register_pipe (class pipe_t *pipe_);
- void unregister_pipe (class pipe_t *pipe_);
-
protected:
// Using following function, socket is able to access global
@@ -55,6 +51,10 @@ namespace zmq
// Chooses least loaded I/O thread.
class io_thread_t *choose_io_thread (uint64_t taskset_);
+ // Zombify particular socket. In other words, pass the ownership to
+ // the context.
+ void zombify (class socket_base_t *socket_);
+
// Derived object can use these functions to send commands
// to other objects.
void send_stop ();
@@ -105,7 +105,7 @@ namespace zmq
class ctx_t *ctx;
// Slot ID of the thread the object belongs to.
- uint32_t thread_slot;
+ uint32_t slot;
void send_command (command_t &cmd_);
diff --git a/src/owned.cpp b/src/owned.cpp
index d6be444..7d1cf5e 100644
--- a/src/owned.cpp
+++ b/src/owned.cpp
@@ -35,7 +35,7 @@ zmq::owned_t::~owned_t ()
void zmq::owned_t::inc_seqnum ()
{
- // NB: This function may be called from a different thread!
+ // This function may be called from a different thread!
sent_seqnum.add (1);
}
@@ -62,10 +62,16 @@ void zmq::owned_t::finalise ()
{
// If termination request was already received and there are no more
// commands to wait for, terminate the object.
- if (shutting_down && processed_seqnum == sent_seqnum.get ()) {
+ if (shutting_down && processed_seqnum == sent_seqnum.get ()
+ && is_terminable ()) {
process_unplug ();
send_term_ack (owner);
delete this;
}
}
+bool zmq::owned_t::is_terminable ()
+{
+ return true;
+}
+
diff --git a/src/owned.hpp b/src/owned.hpp
index 91189a1..80cf42f 100644
--- a/src/owned.hpp
+++ b/src/owned.hpp
@@ -45,6 +45,13 @@ namespace zmq
protected:
+ // A mechanism allowing derived owned objects to postpone the
+ // termination process. Default implementation defines no such delay.
+ // Note that the derived object has to call finalise method when the
+ // delay is over.
+ virtual bool is_terminable ();
+ void finalise ();
+
// Ask owner socket to terminate this object.
void term ();
@@ -69,8 +76,6 @@ namespace zmq
void process_term ();
void process_seqnum ();
- void finalise ();
-
// Sequence number of the last command sent to this object.
atomic_counter_t sent_seqnum;
diff --git a/src/pair.cpp b/src/pair.cpp
index 3872b28..1ff2e1a 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -23,11 +23,12 @@
#include "err.hpp"
#include "pipe.hpp"
-zmq::pair_t::pair_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
+zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t slot_) :
+ socket_base_t (parent_, slot_),
inpipe (NULL),
outpipe (NULL),
- alive (true)
+ inpipe_alive (false),
+ outpipe_alive (false)
{
options.requires_in = true;
options.requires_out = true;
@@ -35,56 +36,61 @@ zmq::pair_t::pair_t (class app_thread_t *parent_) :
zmq::pair_t::~pair_t ()
{
- if (inpipe)
- inpipe->term ();
- if (outpipe)
- outpipe->term ();
+ zmq_assert (!inpipe);
+ zmq_assert (!outpipe);
}
void zmq::pair_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe && !outpipe);
+
inpipe = inpipe_;
+ inpipe_alive = true;
+ inpipe->set_event_sink (this);
+
outpipe = outpipe_;
outpipe_alive = true;
+ outpipe->set_event_sink (this);
}
-void zmq::pair_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::pair_t::terminated (class reader_t *pipe_)
{
zmq_assert (pipe_ == inpipe);
inpipe = NULL;
+ inpipe_alive = false;
}
-void zmq::pair_t::xdetach_outpipe (class writer_t *pipe_)
+void zmq::pair_t::terminated (class writer_t *pipe_)
{
zmq_assert (pipe_ == outpipe);
outpipe = NULL;
+ outpipe_alive = false;
}
-void zmq::pair_t::xkill (class reader_t *pipe_)
+void zmq::pair_t::xterm_pipes ()
{
- zmq_assert (alive);
- alive = false;
+ if (inpipe)
+ inpipe->terminate ();
+ if (outpipe)
+ outpipe->terminate ();
}
-void zmq::pair_t::xrevive (class reader_t *pipe_)
+bool zmq::pair_t::xhas_pipes ()
{
- zmq_assert (!alive);
- alive = true;
+ return inpipe != NULL || outpipe != NULL;
}
-void zmq::pair_t::xrevive (class writer_t *pipe_)
+void zmq::pair_t::activated (class reader_t *pipe_)
{
- zmq_assert (!outpipe_alive);
- outpipe_alive = true;
+ zmq_assert (!inpipe_alive);
+ inpipe_alive = true;
}
-int zmq::pair_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
+void zmq::pair_t::activated (class writer_t *pipe_)
{
- errno = EINVAL;
- return -1;
+ zmq_assert (!outpipe_alive);
+ outpipe_alive = true;
}
int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_)
@@ -100,7 +106,8 @@ int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
- outpipe->flush ();
+ if (!(flags_ & ZMQ_SNDMORE))
+ outpipe->flush ();
// Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_);
@@ -114,9 +121,12 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
// Deallocate old content of the message.
zmq_msg_close (msg_);
- if (!alive || !inpipe || !inpipe->read (msg_)) {
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
+ if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) {
+
+ // No message is available.
+ inpipe_alive = false;
+
+ // Initialise the output parameter to be a 0-byte message.
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
@@ -126,14 +136,16 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
bool zmq::pair_t::xhas_in ()
{
- if (alive && inpipe && inpipe->check_read ())
- return true;
- return false;
+ if (!inpipe || !inpipe_alive)
+ return false;
+
+ inpipe_alive = inpipe->check_read ();
+ return inpipe_alive;
}
bool zmq::pair_t::xhas_out ()
{
- if (outpipe == NULL || !outpipe_alive)
+ if (!outpipe || !outpipe_alive)
return false;
outpipe_alive = outpipe->check_write ();
diff --git a/src/pair.hpp b/src/pair.hpp
index aea249f..0c484d7 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -21,37 +21,45 @@
#define __ZMQ_PAIR_HPP_INCLUDED__
#include "socket_base.hpp"
+#include "pipe.hpp"
namespace zmq
{
- class pair_t : public socket_base_t
+ class pair_t :
+ public socket_base_t,
+ public i_reader_events,
+ public i_writer_events
{
public:
- pair_t (class app_thread_t *parent_);
+ pair_t (class ctx_t *parent_, uint32_t slot_);
~pair_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void xdetach_inpipe (class reader_t *pipe_);
- void xdetach_outpipe (class writer_t *pipe_);
- void xkill (class reader_t *pipe_);
- void xrevive (class reader_t *pipe_);
- void xrevive (class writer_t *pipe_);
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ void xterm_pipes ();
+ bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
+ // i_reader_events interface implementation.
+ void activated (class reader_t *pipe_);
+ void terminated (class reader_t *pipe_);
+
+ // i_writer_events interface implementation.
+ void activated (class writer_t *pipe_);
+ void terminated (class writer_t *pipe_);
+
private:
class reader_t *inpipe;
class writer_t *outpipe;
- bool alive;
+ bool inpipe_alive;
bool outpipe_alive;
pair_t (const pair_t&);
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 200beb0..1903422 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -17,31 +17,54 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
+
#include "../include/zmq.h"
#include "pipe.hpp"
+#include "likely.hpp"
-zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) :
+zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
+ uint64_t lwm_) :
object_t (parent_),
- pipe (NULL),
- peer (NULL),
+ pipe (pipe_),
+ writer (NULL),
lwm (lwm_),
msgs_read (0),
- endpoint (NULL)
-{}
+ sink (NULL),
+ terminating (false)
+{
+ // Note that writer is not set here. Writer will inform reader about its
+ // address once it is created (via set_writer method).
+}
+
+void zmq::reader_t::set_writer (writer_t *writer_)
+{
+ zmq_assert (!writer);
+ writer = writer_;
+}
zmq::reader_t::~reader_t ()
{
- if (pipe)
- unregister_pipe (pipe);
+ // Pipe as such is owned and deallocated by reader object.
+ // The point is that reader processes the last step of terminal
+ // handshaking (term_ack).
+ zmq_assert (pipe);
+
+ // First delete all the unread messages in the pipe. We have to do it by
+ // hand because zmq_msg_t is a POD, not a class, so there's no associated
+ // destructor.
+ zmq_msg_t msg;
+ while (pipe->read (&msg))
+ zmq_msg_close (&msg);
+
+ delete pipe;
}
-void zmq::reader_t::set_pipe (pipe_t *pipe_)
+void zmq::reader_t::set_event_sink (i_reader_events *sink_)
{
- zmq_assert (!pipe);
- pipe = pipe_;
- peer = &pipe->writer;
- register_pipe (pipe);
+ zmq_assert (!sink);
+ sink = sink_;
}
bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
@@ -53,19 +76,20 @@ bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
bool zmq::reader_t::check_read ()
{
+ if (unlikely (terminating))
+ return false;
+
// Check if there's an item in the pipe.
// If not, deactivate the pipe.
if (!pipe->check_read ()) {
- endpoint->kill (this);
+ terminate ();
return false;
}
// If the next item in the pipe is message delimiter,
// initiate its termination.
if (pipe->probe (is_delimiter)) {
- if (endpoint)
- endpoint->detach_inpipe (this);
- term ();
+ terminate ();
return false;
}
@@ -74,17 +98,16 @@ bool zmq::reader_t::check_read ()
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
- if (!pipe->read (msg_)) {
- endpoint->kill (this);
+ if (unlikely (terminating))
+ return false;
+
+ if (!pipe->read (msg_))
return false;
- }
// If delimiter was read, start termination process of the pipe.
unsigned char *offset = 0;
if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) {
- if (endpoint)
- endpoint->detach_inpipe (this);
- term ();
+ terminate ();
return false;
}
@@ -92,51 +115,64 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
- send_reader_info (peer, msgs_read);
+ send_reader_info (writer, msgs_read);
return true;
}
-void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
+void zmq::reader_t::terminate ()
{
- endpoint = endpoint_;
+ // If termination was already started by the peer, do nothing.
+ if (terminating)
+ return;
+
+ terminating = true;
+ send_pipe_term (writer);
}
-void zmq::reader_t::term ()
+bool zmq::reader_t::is_terminating ()
{
- endpoint = NULL;
- send_pipe_term (peer);
+ return terminating;
}
void zmq::reader_t::process_revive ()
{
- // Beacuse of command throttling mechanism, incoming termination request
- // may not have been processed before subsequent send.
- // In that case endpoint is NULL.
- if (endpoint)
- endpoint->revive (this);
+ // Forward the event to the sink (either socket or session).
+ sink->activated (this);
}
void zmq::reader_t::process_pipe_term_ack ()
{
- peer = NULL;
- delete pipe;
+ // At this point writer may already be deallocated.
+ // For safety's sake drop the reference to it.
+ writer = NULL;
+
+ // Notify owner about the termination.
+ zmq_assert (sink);
+ sink->terminated (this);
+
+ // Deallocate resources.
+ delete this;
}
-zmq::writer_t::writer_t (object_t *parent_,
+zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_,
uint64_t hwm_, int64_t swap_size_) :
object_t (parent_),
- pipe (NULL),
- peer (NULL),
+ pipe (pipe_),
+ reader (reader_),
hwm (hwm_),
msgs_read (0),
msgs_written (0),
msg_store (NULL),
extra_msg_flag (false),
stalled (false),
- pending_close (false),
- endpoint (NULL)
+ sink (NULL),
+ terminating (false),
+ pending_close (false)
{
+ // Inform reader about the writer.
+ reader->set_writer (this);
+
if (swap_size_ > 0) {
msg_store = new (std::nothrow) msg_store_t (swap_size_);
if (msg_store != NULL) {
@@ -148,11 +184,6 @@ zmq::writer_t::writer_t (object_t *parent_,
}
}
-void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
-{
- endpoint = endpoint_;
-}
-
zmq::writer_t::~writer_t ()
{
if (extra_msg_flag)
@@ -161,15 +192,17 @@ zmq::writer_t::~writer_t ()
delete msg_store;
}
-void zmq::writer_t::set_pipe (pipe_t *pipe_)
+void zmq::writer_t::set_event_sink (i_writer_events *sink_)
{
- zmq_assert (!pipe);
- pipe = pipe_;
- peer = &pipe->reader;
+ zmq_assert (!sink);
+ sink = sink_;
}
bool zmq::writer_t::check_write ()
{
+ if (terminating)
+ return false;
+
if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) {
stalled = true;
return false;
@@ -180,6 +213,9 @@ bool zmq::writer_t::check_write ()
bool zmq::writer_t::write (zmq_msg_t *msg_)
{
+ if (terminating)
+ return false;
+
if (!check_write ())
return false;
@@ -216,23 +252,27 @@ void zmq::writer_t::rollback ()
while (pipe->unwrite (&msg)) {
zmq_assert (msg.flags & ZMQ_MSG_MORE);
zmq_msg_close (&msg);
+ msgs_written--;
}
- if (stalled && endpoint != NULL && check_write ()) {
+ if (stalled && check_write ()) {
stalled = false;
- endpoint->revive (this);
+ zmq_assert (sink);
+ sink->activated (this);
}
}
void zmq::writer_t::flush ()
{
if (!pipe->flush ())
- send_revive (peer);
+ send_revive (reader);
}
-void zmq::writer_t::term ()
+void zmq::writer_t::terminate ()
{
- endpoint = NULL;
+ // Prevent double termination.
+ if (terminating)
+ return;
// Rollback any unfinished messages.
rollback ();
@@ -293,71 +333,69 @@ void zmq::writer_t::process_reader_info (uint64_t msgs_read_)
flush ();
}
- if (stalled && endpoint != NULL) {
+ if (stalled) {
stalled = false;
- endpoint->revive (this);
+ zmq_assert (sink);
+ sink->activated (this);
}
}
void zmq::writer_t::process_pipe_term ()
{
- if (endpoint)
- endpoint->detach_outpipe (this);
+ send_pipe_term_ack (reader);
- reader_t *p = peer;
- peer = NULL;
- send_pipe_term_ack (p);
-}
+ // The above command allows reader to deallocate itself and the pipe.
+ // For safety's sake we'll drop the pointers here.
+ reader = NULL;
+ pipe = NULL;
-bool zmq::writer_t::pipe_full ()
-{
- return hwm > 0 && msgs_written - msgs_read == hwm;
-}
+ // Notify owner about the termination.
+ zmq_assert (sink);
+ sink->terminated (this);
-zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_, int64_t swap_size_) :
- reader (reader_parent_, compute_lwm (hwm_)),
- writer (writer_parent_, hwm_, swap_size_)
-{
- reader.set_pipe (this);
- writer.set_pipe (this);
+ // Deallocate the resources.
+ delete this;
}
-zmq::pipe_t::~pipe_t ()
+bool zmq::writer_t::pipe_full ()
{
- // Deallocate all the unread messages in the pipe. We have to do it by
- // hand because zmq_msg_t is a POD, not a class, so there's no associated
- // destructor.
- zmq_msg_t msg;
- while (read (&msg))
- zmq_msg_close (&msg);
+ return hwm > 0 && msgs_written - msgs_read == hwm;
}
-uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_)
+void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
+ uint64_t hwm_, int64_t swap_size_, reader_t **reader_, writer_t **writer_)
{
- // Following point should be taken into consideration when computing
- // low watermark:
- //
- // 1. LWM has to be less than HWM.
- // 2. LWM cannot be set to very low value (such as zero) as after filling
- // the queue it would start to refill only after all the messages are
- // read from it and thus unnecessarily hold the progress back.
- // 3. LWM cannot be set to very high value (such as HWM-1) as it would
- // result in lock-step filling of the queue - if a single message is read
- // from a full queue, writer thread is resumed to write exactly one
- // message to the queue and go back to sleep immediately. This would
- // result in low performance.
- //
- // Given the 3. it would be good to keep HWM and LWM as far apart as
- // possible to reduce the thread switching overhead to almost zero,
- // say HWM-LWM should be 500 (max_wm_delta).
- //
- // That done, we still we have to account for the cases where HWM<500 thus
- // driving LWM to negative numbers. Let's make LWM 1/2 of HWM in such cases.
-
- if (hwm_ > max_wm_delta * 2)
- return hwm_ - max_wm_delta;
- else
- return (hwm_ + 1) / 2;
+ // First compute the low water mark. Following point should be taken
+ // into consideration:
+ //
+ // 1. LWM has to be less than HWM.
+ // 2. LWM cannot be set to very low value (such as zero) as after filling
+ // the queue it would start to refill only after all the messages are
+ // read from it and thus unnecessarily hold the progress back.
+ // 3. LWM cannot be set to very high value (such as HWM-1) as it would
+ // result in lock-step filling of the queue - if a single message is
+ // read from a full queue, writer thread is resumed to write exactly one
+ // message to the queue and go back to sleep immediately. This would
+ // result in low performance.
+ //
+ // Given the 3. it would be good to keep HWM and LWM as far apart as
+ // possible to reduce the thread switching overhead to almost zero,
+ // say HWM-LWM should be max_wm_delta.
+ //
+ // That done, we still we have to account for the cases where
+ // HWM < max_wm_delta thus driving LWM to negative numbers.
+ // Let's make LWM 1/2 of HWM in such cases.
+ uint64_t lwm = (hwm_ > max_wm_delta * 2) ?
+ hwm_ - max_wm_delta : (hwm_ + 1) / 2;
+
+ // Create all three objects pipe consists of: the pipe per se, reader and
+ // writer. The pipe will be handled by reader and writer, its never passed
+ // to the user. Reader and writer are returned to the user.
+ pipe_t *pipe = new (std::nothrow) pipe_t ();
+ zmq_assert (pipe);
+ *reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm);
+ zmq_assert (*reader_);
+ *writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_,
+ hwm_, swap_size_);
+ zmq_assert (*writer_);
}
-
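
create_pipe keeps the low-watermark rule that used to live in pipe_t::compute_lwm: when HWM exceeds twice max_wm_delta, LWM trails it by exactly max_wm_delta; otherwise it falls back to roughly half of HWM. A standalone restatement of that formula, assuming max_wm_delta = 500 as the removed compute_lwm comment suggested:

    #include <assert.h>
    #include <stdint.h>

    /* Same computation as in create_pipe above; max_wm_delta = 500 is an
       assumption taken from the comment in the removed compute_lwm. */
    static uint64_t compute_lwm (uint64_t hwm, uint64_t max_wm_delta)
    {
        return (hwm > max_wm_delta * 2) ? hwm - max_wm_delta : (hwm + 1) / 2;
    }

    int main (void)
    {
        assert (compute_lwm (10000, 500) == 9500); /* large HWM: fixed gap of 500 */
        assert (compute_lwm (800, 500) == 400);    /* small HWM: roughly HWM / 2  */
        assert (compute_lwm (0, 500) == 0);        /* HWM 0 disables the limit    */
        return 0;
    }
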
diff --git a/src/pipe.hpp b/src/pipe.hpp
index ece678a..34c5600 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -23,7 +23,6 @@
#include "../include/zmq.h"
#include "stdint.hpp"
-#include "i_endpoint.hpp"
#include "yarray_item.hpp"
#include "ypipe.hpp"
#include "msg_store.hpp"
@@ -33,15 +32,31 @@
namespace zmq
{
+ // The shutdown mechanism for pipe works as follows: Either endpoint
+ // (or even both of them) can ask pipe to terminate by calling 'terminate'
+ // method. Pipe then terminates in asynchronous manner. When the part of
+ // the shutdown tied to the endpoint is done it triggers 'terminated'
+ // event. When endpoint processes the event and returns, associated
+ // reader/writer object is deallocated.
+
+ typedef ypipe_t <zmq_msg_t, message_pipe_granularity> pipe_t;
+
+ struct i_reader_events
+ {
+ virtual void terminated (class reader_t *pipe_) = 0;
+ virtual void activated (class reader_t *pipe_) = 0;
+ };
+
class reader_t : public object_t, public yarray_item_t
{
- public:
+ friend void zmq::create_pipe (object_t*, object_t*, uint64_t,
+ int64_t, reader_t**, writer_t**);
+ friend class writer_t;
- reader_t (class object_t *parent_, uint64_t lwm_);
- ~reader_t ();
+ public:
- void set_pipe (class pipe_t *pipe_);
- void set_endpoint (i_endpoint *endpoint_);
+ // Specifies the object that will receive events from the reader.
+ void set_event_sink (i_reader_events *endpoint_);
// Returns true if there is at least one message to read in the pipe.
bool check_read ();
@@ -50,10 +65,20 @@ namespace zmq
bool read (zmq_msg_t *msg_);
// Ask pipe to terminate.
- void term ();
+ void terminate ();
+
+ // Returns true if the pipe is already terminating
+ // (e.g. if the delimiter was already read).
+ bool is_terminating ();
private:
+ reader_t (class object_t *parent_, pipe_t *pipe_, uint64_t lwm_);
+ ~reader_t ();
+
+ // To be called only by the writer itself!
+ void set_writer (class writer_t *writer_);
+
// Command handlers.
void process_revive ();
void process_pipe_term_ack ();
@@ -62,10 +87,10 @@ namespace zmq
static bool is_delimiter (zmq_msg_t &msg_);
// The underlying pipe.
- class pipe_t *pipe;
+ pipe_t *pipe;
// Pipe writer associated with the other side of the pipe.
- class writer_t *peer;
+ class writer_t *writer;
// Low watermark for in-memory storage (in bytes).
uint64_t lwm;
@@ -73,22 +98,32 @@ namespace zmq
// Number of messages read so far.
uint64_t msgs_read;
- // Endpoint (either session or socket) the pipe is attached to.
- i_endpoint *endpoint;
+ // Sink for the events (either the socket or the session).
+ i_reader_events *sink;
+
+ // True if the 'terminate' method was called or the delimiter
+ // was read from the pipe.
+ bool terminating;
reader_t (const reader_t&);
void operator = (const reader_t&);
};
+ struct i_writer_events
+ {
+ virtual void terminated (class writer_t *pipe_) = 0;
+ virtual void activated (class writer_t *pipe_) = 0;
+ };
+
class writer_t : public object_t, public yarray_item_t
{
- public:
+ friend void zmq::create_pipe (object_t*, object_t*, uint64_t,
+ int64_t, reader_t**, writer_t**);
- writer_t (class object_t *parent_, uint64_t hwm_, int64_t swap_size_);
- ~writer_t ();
+ public:
- void set_pipe (class pipe_t *pipe_);
- void set_endpoint (i_endpoint *endpoint_);
+ // Specifies the object that will receive events from the writer.
+ void set_event_sink (i_writer_events *endpoint_);
// Checks whether a message can be written to the pipe.
// If writing the message would cause high watermark to be
@@ -106,10 +141,14 @@ namespace zmq
void flush ();
// Ask pipe to terminate.
- void term ();
+ void terminate ();
private:
+ writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_,
+ uint64_t hwm_, int64_t swap_size_);
+ ~writer_t ();
+
void process_reader_info (uint64_t msgs_read_);
// Command handlers.
@@ -123,10 +162,10 @@ namespace zmq
void write_delimiter ();
// The underlying pipe.
- class pipe_t *pipe;
+ pipe_t *pipe;
// Pipe reader associated with the other side of the pipe.
- class reader_t *peer;
+ reader_t *reader;
// High watermark for in-memory storage (in bytes).
uint64_t hwm;
@@ -149,35 +188,23 @@ namespace zmq
// True iff the last attempt to write a message has failed.
bool stalled;
- bool pending_close;
+ // Sink for the events (either the socket or the session).
+ i_writer_events *sink;
- // Endpoint (either session or socket) the pipe is attached to.
- i_endpoint *endpoint;
+ // True if the 'terminate' method was called or the 'pipe_term' command
+ // arrived from the reader.
+ bool terminating;
+
+ bool pending_close;
writer_t (const writer_t&);
void operator = (const writer_t&);
};
- // Message pipe.
- class pipe_t : public ypipe_t <zmq_msg_t, message_pipe_granularity>
- {
- public:
-
- pipe_t (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_, int64_t swap_size_);
- ~pipe_t ();
-
- reader_t reader;
- writer_t writer;
-
- private:
-
- uint64_t compute_lwm (uint64_t hwm_);
-
- pipe_t (const pipe_t&);
- void operator = (const pipe_t&);
- };
-
+ // Creates a pipe. Returns pointers to the reader and writer objects.
+ void create_pipe (object_t *reader_parent_, object_t *writer_parent_,
+ uint64_t hwm_, int64_t swap_size_, reader_t **reader_,
+ writer_t **writer_);
}
#endif
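To make the shutdown and event protocol above concrete, here is a minimal,
purely illustrative sink (my_sink_t is hypothetical and not part of the
patch; the real sinks are the socket and session classes changed below).
The key point is that once 'terminated' returns, the reader object is
deallocated by the pipe machinery, so the sink must drop its pointer:

    #include <cstddef>
    #include "pipe.hpp"

    class my_sink_t : public zmq::i_reader_events
    {
    public:
        my_sink_t (zmq::reader_t *reader_) : reader (reader_)
        {
            reader->set_event_sink (this);
        }

        void activated (zmq::reader_t *pipe_)
        {
            // Messages became available again; schedule a read.
        }

        void terminated (zmq::reader_t *pipe_)
        {
            // After this callback returns the reader is deallocated,
            // so the pointer must not be used any more.
            reader = NULL;
        }

    private:
        zmq::reader_t *reader;
    };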
diff --git a/src/pub.cpp b/src/pub.cpp
index 4e73b19..d1d1c72 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -24,8 +24,8 @@
#include "msg_content.hpp"
#include "pipe.hpp"
-zmq::pub_t::pub_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
+zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t slot_) :
+ socket_base_t (parent_, slot_),
active (0)
{
options.requires_in = false;
@@ -34,56 +34,47 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) :
zmq::pub_t::~pub_t ()
{
- for (pipes_t::size_type i = 0; i != pipes.size (); i++)
- pipes [i]->term ();
- pipes.clear ();
+ zmq_assert (pipes.empty ());
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
+
+ outpipe_->set_event_sink (this);
+
pipes.push_back (outpipe_);
pipes.swap (active, pipes.size () - 1);
active++;
}
-void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_)
-{
- zmq_assert (false);
-}
-
-void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_)
-{
- // Remove the pipe from the list; adjust number of active pipes
- // accordingly.
- if (pipes.index (pipe_) < active)
- active--;
- pipes.erase (pipe_);
-}
-
-void zmq::pub_t::xkill (class reader_t *pipe_)
+void zmq::pub_t::xterm_pipes ()
{
- zmq_assert (false);
+ // Start shutdown process for all the pipes.
+ for (pipes_t::size_type i = 0; i != pipes.size (); i++)
+ pipes [i]->terminate ();
}
-void zmq::pub_t::xrevive (class reader_t *pipe_)
+bool zmq::pub_t::xhas_pipes ()
{
- zmq_assert (false);
+ return !pipes.empty ();
}
-void zmq::pub_t::xrevive (class writer_t *pipe_)
+void zmq::pub_t::activated (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
active++;
}
-int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
+void zmq::pub_t::terminated (writer_t *pipe_)
{
- errno = EINVAL;
- return -1;
+ // Remove the pipe from the list; adjust number of active pipes
+ // accordingly.
+ if (pipes.index (pipe_) < active)
+ active--;
+ pipes.erase (pipe_);
}
int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
@@ -101,7 +92,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
- for (pipes_t::size_type i = 0; i != active;)
+ for (pipes_t::size_type i = 0; i < active;)
if (write (pipes [i], msg_))
i++;
int rc = zmq_msg_init (msg_);
@@ -133,7 +124,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
}
// Push the message to all destinations.
- for (pipes_t::size_type i = 0; i != active;) {
+ for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_))
content->refcnt.sub (1);
else
@@ -147,17 +138,6 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
-{
- errno = ENOTSUP;
- return -1;
-}
-
-bool zmq::pub_t::xhas_in ()
-{
- return false;
-}
-
bool zmq::pub_t::xhas_out ()
{
return true;
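pub_t (and rep_t/req_t below) all rely on the bookkeeping visible in this
hunk: entries [0, active) of the pipe array are usable, stalled pipes live
at the tail, and moving a pipe between the two groups costs one swap. A
standalone sketch of just that indexing scheme, using std::vector in place
of yarray_t (illustrative only, not the library's container):

    #include <vector>
    #include <algorithm>
    #include <cstddef>

    struct active_list_t
    {
        std::vector <void*> items;   // stand-in for the pipe pointers
        std::size_t active;          // items [0, active) are active

        active_list_t () : active (0) {}

        void add (void *item)
        {
            // New entries start out active: append, swap into the active
            // region, then grow the region - just like xattach_pipes.
            items.push_back (item);
            std::swap (items [active], items [items.size () - 1]);
            active++;
        }

        void deactivate (std::size_t i)
        {
            // A stalled pipe is swapped just past the active boundary.
            active--;
            std::swap (items [i], items [active]);
        }

        void activate (std::size_t i)
        {
            // A revived pipe is swapped back into the active region.
            std::swap (items [i], items [active]);
            active++;
        }
    };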
diff --git a/src/pub.hpp b/src/pub.hpp
index ac3924a..a81edfe 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -22,31 +22,30 @@
#include "socket_base.hpp"
#include "yarray.hpp"
+#include "pipe.hpp"
namespace zmq
{
- class pub_t : public socket_base_t
+ class pub_t : public socket_base_t, public i_writer_events
{
public:
- pub_t (class app_thread_t *parent_);
+ pub_t (class ctx_t *parent_, uint32_t slot_);
~pub_t ();
- // Overloads of functions from socket_base_t.
+ // Implementations of virtual functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void xdetach_inpipe (class reader_t *pipe_);
- void xdetach_outpipe (class writer_t *pipe_);
- void xkill (class reader_t *pipe_);
- void xrevive (class reader_t *pipe_);
- void xrevive (class writer_t *pipe_);
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ void xterm_pipes ();
+ bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
- int xrecv (zmq_msg_t *msg_, int flags_);
- bool xhas_in ();
bool xhas_out ();
+ // i_writer_events interface implementation.
+ void activated (writer_t *pipe_);
+ void terminated (writer_t *pipe_);
+
private:
// Write the message to the pipe. Make the pipe inactive if writing
diff --git a/src/pull.cpp b/src/pull.cpp
index b2413ee..4f4a8b3 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -22,8 +22,8 @@
#include "pull.hpp"
#include "err.hpp"
-zmq::pull_t::pull_t (class app_thread_t *parent_) :
- socket_base_t (parent_)
+zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t slot_) :
+ socket_base_t (parent_, slot_)
{
options.requires_in = true;
options.requires_out = false;
@@ -40,45 +40,14 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_);
}
-void zmq::pull_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::pull_t::xterm_pipes ()
{
- zmq_assert (pipe_);
- fq.detach (pipe_);
+ fq.term_pipes ();
}
-void zmq::pull_t::xdetach_outpipe (class writer_t *pipe_)
+bool zmq::pull_t::xhas_pipes ()
{
- // There are no outpipes, so this function shouldn't be called at all.
- zmq_assert (false);
-}
-
-void zmq::pull_t::xkill (class reader_t *pipe_)
-{
- fq.kill (pipe_);
-}
-
-void zmq::pull_t::xrevive (class reader_t *pipe_)
-{
- fq.revive (pipe_);
-}
-
-void zmq::pull_t::xrevive (class writer_t *pipe_)
-{
- zmq_assert (false);
-}
-
-int zmq::pull_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- // No special options for this socket type.
- errno = EINVAL;
- return -1;
-}
-
-int zmq::pull_t::xsend (zmq_msg_t *msg_, int flags_)
-{
- errno = ENOTSUP;
- return -1;
+ return fq.has_pipes ();
}
int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
@@ -91,8 +60,3 @@ bool zmq::pull_t::xhas_in ()
return fq.has_in ();
}
-bool zmq::pull_t::xhas_out ()
-{
- return false;
-}
-
diff --git a/src/pull.hpp b/src/pull.hpp
index 7f249e9..4be40dd 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_PULL_HPP_INCLUDED__
-#define __ZMQ_PULL_HPP_INCLUDED__
+#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__
+#define __ZMQ_UPSTREAM_HPP_INCLUDED__
#include "socket_base.hpp"
#include "fq.hpp"
@@ -30,22 +30,16 @@ namespace zmq
{
public:
- pull_t (class app_thread_t *parent_);
+ pull_t (class ctx_t *parent_, uint32_t slot_);
~pull_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void xdetach_inpipe (class reader_t *pipe_);
- void xdetach_outpipe (class writer_t *pipe_);
- void xkill (class reader_t *pipe_);
- void xrevive (class reader_t *pipe_);
- void xrevive (class writer_t *pipe_);
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
- int xsend (zmq_msg_t *msg_, int flags_);
+ void xterm_pipes ();
+ bool xhas_pipes ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
- bool xhas_out ();
private:
diff --git a/src/push.cpp b/src/push.cpp
index 522101f..3a3d258 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -23,8 +23,8 @@
#include "err.hpp"
#include "pipe.hpp"
-zmq::push_t::push_t (class app_thread_t *parent_) :
- socket_base_t (parent_)
+zmq::push_t::push_t (class ctx_t *parent_, uint32_t slot_) :
+ socket_base_t (parent_, slot_)
{
options.requires_in = false;
options.requires_out = true;
@@ -41,41 +41,14 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_);
}
-void zmq::push_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::push_t::xterm_pipes ()
{
- // There are no inpipes, so this function shouldn't be called at all.
- zmq_assert (false);
+ lb.term_pipes ();
}
-void zmq::push_t::xdetach_outpipe (class writer_t *pipe_)
+bool zmq::push_t::xhas_pipes ()
{
- zmq_assert (pipe_);
- lb.detach (pipe_);
-}
-
-void zmq::push_t::xkill (class reader_t *pipe_)
-{
- // There are no inpipes, so this function shouldn't be called at all.
- zmq_assert (false);
-}
-
-void zmq::push_t::xrevive (class reader_t *pipe_)
-{
- // There are no inpipes, so this function shouldn't be called at all.
- zmq_assert (false);
-}
-
-void zmq::push_t::xrevive (class writer_t *pipe_)
-{
- lb.revive (pipe_);
-}
-
-int zmq::push_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- // No special option for this socket type.
- errno = EINVAL;
- return -1;
+ return lb.has_pipes ();
}
int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
@@ -83,17 +56,6 @@ int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
return lb.send (msg_, flags_);
}
-int zmq::push_t::xrecv (zmq_msg_t *msg_, int flags_)
-{
- errno = ENOTSUP;
- return -1;
-}
-
-bool zmq::push_t::xhas_in ()
-{
- return false;
-}
-
bool zmq::push_t::xhas_out ()
{
return lb.has_out ();
diff --git a/src/push.hpp b/src/push.hpp
index b3c8d87..e604abc 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_PUSH_HPP_INCLUDED__
-#define __ZMQ_PUSH_HPP_INCLUDED__
+#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
#include "socket_base.hpp"
#include "lb.hpp"
@@ -30,21 +30,15 @@ namespace zmq
{
public:
- push_t (class app_thread_t *parent_);
+ push_t (class ctx_t *parent_, uint32_t slot_);
~push_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void xdetach_inpipe (class reader_t *pipe_);
- void xdetach_outpipe (class writer_t *pipe_);
- void xkill (class reader_t *pipe_);
- void xrevive (class reader_t *pipe_);
- void xrevive (class writer_t *pipe_);
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ void xterm_pipes ();
+ bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
- int xrecv (zmq_msg_t *msg_, int flags_);
- bool xhas_in ();
bool xhas_out ();
private:
diff --git a/src/rep.cpp b/src/rep.cpp
index 34b77c4..7636d13 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -23,8 +23,8 @@
#include "err.hpp"
#include "pipe.hpp"
-zmq::rep_t::rep_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
+zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t slot_) :
+ socket_base_t (parent_, slot_),
active (0),
current (0),
sending_reply (false),
@@ -42,6 +42,8 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
zmq::rep_t::~rep_t ()
{
+ zmq_assert (in_pipes.empty ());
+ zmq_assert (out_pipes.empty ());
}
void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
@@ -50,15 +52,28 @@ void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
+ inpipe_->set_event_sink (this);
in_pipes.push_back (inpipe_);
in_pipes.swap (active, in_pipes.size () - 1);
+
+ outpipe_->set_event_sink (this);
out_pipes.push_back (outpipe_);
out_pipes.swap (active, out_pipes.size () - 1);
+
active++;
}
-void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::rep_t::xterm_pipes ()
+{
+ for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
+ in_pipes [i]->terminate ();
+ for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++)
+ out_pipes [i]->terminate ();
+}
+
+void zmq::rep_t::terminated (reader_t *pipe_)
{
+ // ???
zmq_assert (sending_reply || !more || in_pipes [current] != pipe_);
zmq_assert (pipe_);
@@ -71,14 +86,17 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
if (current == active)
current = 0;
}
-
- if (out_pipes [index])
- out_pipes [index]->term ();
in_pipes.erase (index);
- out_pipes.erase (index);
+
+ // ???
+ if (!zombie) {
+ if (out_pipes [index])
+ out_pipes [index]->terminate ();
+ out_pipes.erase (index);
+ }
}
-void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)
+void zmq::rep_t::terminated (writer_t *pipe_)
{
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
@@ -97,22 +115,22 @@ void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)
current = 0;
}
- if (in_pipes [index])
- in_pipes [index]->term ();
- in_pipes.erase (index);
out_pipes.erase (index);
+
+ // ???
+ if (!zombie) {
+ if (in_pipes [index])
+ in_pipes [index]->terminate ();
+ in_pipes.erase (index);
+ }
}
-void zmq::rep_t::xkill (class reader_t *pipe_)
+bool zmq::rep_t::xhas_pipes ()
{
- // Move the pipe to the list of inactive pipes.
- in_pipes_t::size_type index = in_pipes.index (pipe_);
- active--;
- in_pipes.swap (index, active);
- out_pipes.swap (index, active);
+ return !in_pipes.empty () || !out_pipes.empty ();
}
-void zmq::rep_t::xrevive (class reader_t *pipe_)
+void zmq::rep_t::activated (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
in_pipes_t::size_type index = in_pipes.index (pipe_);
@@ -121,15 +139,10 @@ void zmq::rep_t::xrevive (class reader_t *pipe_)
active++;
}
-void zmq::rep_t::xrevive (class writer_t *pipe_)
-{
-}
-
-int zmq::rep_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
+void zmq::rep_t::activated (writer_t *pipe_)
{
- errno = EINVAL;
- return -1;
+ // TODO: What here?
+ zmq_assert (false);
}
int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
@@ -151,6 +164,8 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
// misbehaving requesters stop collecting replies.
// TODO: Tear down the underlying connection (?)
if (!written) {
+
+ // TODO: The reply socket becomes deactivated here...
errno = EAGAIN;
return -1;
}
@@ -198,6 +213,13 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
for (count = active; count != 0; count--) {
if (in_pipes [current]->read (msg_))
break;
+
+ // Move the pipe to the list of inactive pipes.
+ active--;
+ in_pipes.swap (current, active);
+ out_pipes.swap (current, active);
+
+ // Move to next pipe.
current++;
if (current >= active)
current = 0;
@@ -258,6 +280,13 @@ bool zmq::rep_t::xhas_in ()
for (int count = active; count != 0; count--) {
if (in_pipes [current]->check_read ())
return !sending_reply;
+
+ // Move the pipe to the list of inactive pipes.
+ active--;
+ in_pipes.swap (current, active);
+ out_pipes.swap (current, active);
+
+ // Move to the next pipe.
current++;
if (current >= active)
current = 0;
diff --git a/src/rep.hpp b/src/rep.hpp
index aef4318..7d82a28 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -22,39 +22,47 @@
#include "socket_base.hpp"
#include "yarray.hpp"
+#include "pipe.hpp"
namespace zmq
{
- class rep_t : public socket_base_t
+ class rep_t :
+ public socket_base_t,
+ public i_reader_events,
+ public i_writer_events
{
public:
- rep_t (class app_thread_t *parent_);
+ rep_t (class ctx_t *parent_, uint32_t slot_);
~rep_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void xdetach_inpipe (class reader_t *pipe_);
- void xdetach_outpipe (class writer_t *pipe_);
- void xkill (class reader_t *pipe_);
- void xrevive (class reader_t *pipe_);
- void xrevive (class writer_t *pipe_);
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ void xterm_pipes ();
+ bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
+ // i_reader_events interface implementation.
+ void activated (reader_t *pipe_);
+ void terminated (reader_t *pipe_);
+
+ // i_writer_events interface implementation.
+ void activated (writer_t *pipe_);
+ void terminated (writer_t *pipe_);
+
private:
// List in outbound and inbound pipes. Note that the two lists are
// always in sync. I.e. outpipe with index N communicates with the
// same session as inpipe with index N.
- typedef yarray_t <class writer_t> out_pipes_t;
+ typedef yarray_t <writer_t> out_pipes_t;
out_pipes_t out_pipes;
- typedef yarray_t <class reader_t> in_pipes_t;
+ typedef yarray_t <reader_t> in_pipes_t;
in_pipes_t in_pipes;
// Number of active inpipes. All the active inpipes are located at the
@@ -73,7 +81,7 @@ namespace zmq
bool more;
// Pipe we are going to send reply to.
- class writer_t *reply_pipe;
+ writer_t *reply_pipe;
rep_t (const rep_t&);
void operator = (const rep_t&);
diff --git a/src/req.cpp b/src/req.cpp
index f3695a2..b900961 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -23,8 +23,8 @@
#include "err.hpp"
#include "pipe.hpp"
-zmq::req_t::req_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
+zmq::req_t::req_t (class ctx_t *parent_, uint32_t slot_) :
+ socket_base_t (parent_, slot_),
active (0),
current (0),
receiving_reply (false),
@@ -38,24 +38,36 @@ zmq::req_t::req_t (class app_thread_t *parent_) :
zmq::req_t::~req_t ()
{
+ zmq_assert (in_pipes.empty ());
+ zmq_assert (out_pipes.empty ());
}
-void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::req_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
+ const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
+ inpipe_->set_event_sink (this);
in_pipes.push_back (inpipe_);
in_pipes.swap (active, in_pipes.size () - 1);
+ outpipe_->set_event_sink (this);
out_pipes.push_back (outpipe_);
out_pipes.swap (active, out_pipes.size () - 1);
active++;
}
-void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::req_t::xterm_pipes ()
+{
+ for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
+ in_pipes [i]->terminate ();
+ for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++)
+ out_pipes [i]->terminate ();
+}
+
+void zmq::req_t::terminated (reader_t *pipe_)
{
zmq_assert (!receiving_reply || !more || reply_pipe != pipe_);
@@ -63,17 +75,21 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
zmq_assert (in_pipes.size () == out_pipes.size ());
// TODO: The pipe we are awaiting the reply from is detached. What now?
- // Return ECONNRESET from subsequent recv?
if (receiving_reply && pipe_ == reply_pipe) {
zmq_assert (false);
}
in_pipes_t::size_type index = in_pipes.index (pipe_);
- if (out_pipes [index])
- out_pipes [index]->term ();
+ // ???
+ if (!zombie) {
+ if (out_pipes [index])
+ out_pipes [index]->terminate ();
+ out_pipes.erase (index);
+ }
+
in_pipes.erase (index);
- out_pipes.erase (index);
+
if (index < active) {
active--;
if (current == active)
@@ -81,7 +97,7 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
}
}
-void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
+void zmq::req_t::terminated (writer_t *pipe_)
{
zmq_assert (receiving_reply || !more || out_pipes [current] != pipe_);
@@ -90,9 +106,13 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
out_pipes_t::size_type index = out_pipes.index (pipe_);
- if (in_pipes [index])
- in_pipes [index]->term ();
- in_pipes.erase (index);
+ // ???
+ if (!zombie) {
+ if (in_pipes [index])
+ in_pipes [index]->terminate ();
+ in_pipes.erase (index);
+ }
+
out_pipes.erase (index);
if (index < active) {
active--;
@@ -101,15 +121,12 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
}
}
-void zmq::req_t::xkill (class reader_t *pipe_)
+bool zmq::req_t::xhas_pipes ()
{
- zmq_assert (receiving_reply);
- zmq_assert (pipe_ == reply_pipe);
-
- reply_pipe_active = false;
+ return !in_pipes.empty () || !out_pipes.empty ();
}
-void zmq::req_t::xrevive (class reader_t *pipe_)
+void zmq::req_t::activated (reader_t *pipe_)
{
// TODO: Actually, misbehaving peer can cause this kind of thing.
// Handle it decently, presumably kill the offending connection.
@@ -117,7 +134,7 @@ void zmq::req_t::xrevive (class reader_t *pipe_)
reply_pipe_active = true;
}
-void zmq::req_t::xrevive (class writer_t *pipe_)
+void zmq::req_t::activated (writer_t *pipe_)
{
out_pipes_t::size_type index = out_pipes.index (pipe_);
zmq_assert (index >= active);
@@ -129,13 +146,6 @@ void zmq::req_t::xrevive (class writer_t *pipe_)
}
}
-int zmq::req_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- errno = EINVAL;
- return -1;
-}
-
int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
{
// If we've sent a request and we still haven't got the reply,
@@ -214,6 +224,7 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
// Get the reply from the reply pipe.
if (!reply_pipe_active || !reply_pipe->read (msg_)) {
+ reply_pipe_active = false;
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
diff --git a/src/req.hpp b/src/req.hpp
index 5ab7bca..5fd5642 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -22,31 +22,39 @@
#include "socket_base.hpp"
#include "yarray.hpp"
+#include "pipe.hpp"
namespace zmq
{
- class req_t : public socket_base_t
+ class req_t :
+ public socket_base_t,
+ public i_reader_events,
+ public i_writer_events
{
public:
- req_t (class app_thread_t *parent_);
+ req_t (class ctx_t *parent_, uint32_t slot_);
~req_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
const blob_t &peer_identity_);
- void xdetach_inpipe (class reader_t *pipe_);
- void xdetach_outpipe (class writer_t *pipe_);
- void xkill (class reader_t *pipe_);
- void xrevive (class reader_t *pipe_);
- void xrevive (class writer_t *pipe_);
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ void xterm_pipes ();
+ bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
+ // i_reader_events interface implementation.
+ void activated (reader_t *pipe_);
+ void terminated (reader_t *pipe_);
+
+ // i_writer_events interface implementation.
+ void activated (writer_t *pipe_);
+ void terminated (writer_t *pipe_);
+
private:
// List in outbound and inbound pipes. Note that the two lists are
@@ -58,9 +66,9 @@ namespace zmq
// the beginning of the array). We don't have to do the same thing for
// inpipes, because we know which pipe we want to read the
// reply from.
- typedef yarray_t <class writer_t> out_pipes_t;
+ typedef yarray_t <writer_t> out_pipes_t;
out_pipes_t out_pipes;
- typedef yarray_t <class reader_t> in_pipes_t;
+ typedef yarray_t <reader_t> in_pipes_t;
in_pipes_t in_pipes;
// Number of active pipes.
@@ -82,7 +90,7 @@ namespace zmq
bool more;
// Pipe we are awaiting the reply from.
- class reader_t *reply_pipe;
+ reader_t *reply_pipe;
req_t (const req_t&);
void operator = (const req_t&);
diff --git a/src/semaphore.hpp b/src/semaphore.hpp
new file mode 100644
index 0000000..1c4d2a0
--- /dev/null
+++ b/src/semaphore.hpp
@@ -0,0 +1,135 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_SEMAPHORE_HPP_INCLUDED__
+#define __ZMQ_SEMAPHORE_HPP_INCLUDED__
+
+#include "platform.hpp"
+#include "err.hpp"
+
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include <semaphore.h>
+#endif
+
+namespace zmq
+{
+ // Simple semaphore. Only a single thread may be waiting at any given time.
+ // Also, the semaphore may not be posted again before the previous post
+ // was matched by a corresponding wait and the waiting thread was
+ // released.
+
+#if defined ZMQ_HAVE_WINDOWS
+
+ // On the Windows platform the simple semaphore is implemented using an event object.
+
+ class semaphore_t
+ {
+ public:
+
+ // Initialise the semaphore.
+ inline semaphore_t ()
+ {
+ ev = CreateEvent (NULL, FALSE, FALSE, NULL);
+ win_assert (ev != NULL);
+ }
+
+ // Destroy the semaphore.
+ inline ~semaphore_t ()
+ {
+ int rc = CloseHandle (ev);
+ win_assert (rc != 0);
+ }
+
+ // Wait for the semaphore.
+ inline void wait ()
+ {
+ DWORD rc = WaitForSingleObject (ev, INFINITE);
+ win_assert (rc != WAIT_FAILED);
+ }
+
+ // Post the semaphore.
+ inline void post ()
+ {
+ int rc = SetEvent (ev);
+ win_assert (rc != 0);
+ }
+
+ private:
+
+ HANDLE ev;
+
+ // Disable copying of the object.
+ semaphore_t (const semaphore_t&);
+ void operator = (const semaphore_t&);
+ };
+
+#else
+
+ // The default implementation maps the simple semaphore to a POSIX semaphore.
+
+ class semaphore_t
+ {
+ public:
+
+ // Initialise the semaphore.
+ inline semaphore_t ()
+ {
+ int rc = sem_init (&sem, 0, 0);
+ errno_assert (rc != -1);
+ }
+
+ // Destroy the semaphore.
+ inline ~semaphore_t ()
+ {
+ int rc = sem_destroy (&sem);
+ errno_assert (rc != -1);
+ }
+
+ // Wait for the semaphore.
+ inline void wait ()
+ {
+ int rc = sem_wait (&sem);
+ errno_assert (rc != -1);
+ }
+
+ // Post the semaphore.
+ inline void post ()
+ {
+ int rc = sem_post (&sem);
+ errno_assert (rc != -1);
+ }
+
+ private:
+
+ // Underlying system semaphore object.
+ sem_t sem;
+
+ // Disable copying of the object.
+ semaphore_t (const semaphore_t&);
+ void operator = (const semaphore_t&);
+ };
+
+#endif
+
+}
+
+#endif
+
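A minimal usage sketch for the new semaphore_t (not part of the patch; POSIX
threads are used here only for brevity). It respects the restrictions stated
in the header comment: a single waiter, and each post matched by a wait
before the next post:

    #include <pthread.h>
    #include <cstdio>
    #include "semaphore.hpp"

    static zmq::semaphore_t sem;

    static void *worker (void *)
    {
        std::printf ("worker: finished, releasing the main thread\n");
        sem.post ();
        return NULL;
    }

    int main ()
    {
        pthread_t t;
        if (pthread_create (&t, NULL, worker, NULL) != 0)
            return 1;
        sem.wait ();               // blocks until the worker posts
        pthread_join (t, NULL);
        return 0;
    }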
diff --git a/src/session.cpp b/src/session.cpp
index f798877..86086fb 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -69,13 +69,22 @@ zmq::session_t::~session_t ()
zmq_assert (!out_pipe);
}
+bool zmq::session_t::is_terminable ()
+{
+ return in_pipe->is_terminating ();
+}
+
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
if (!in_pipe || !active)
return false;
- if (!in_pipe->read (msg_))
+ if (!in_pipe->read (msg_)) {
+ active = false;
+ if (in_pipe->is_terminating ())
+ finalise ();
return false;
+ }
incomplete_in = msg_->flags & ZMQ_MSG_MORE;
return true;
@@ -156,33 +165,28 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_,
zmq_assert (!in_pipe);
in_pipe = inpipe_;
active = true;
- in_pipe->set_endpoint (this);
+ in_pipe->set_event_sink (this);
}
if (outpipe_) {
zmq_assert (!out_pipe);
out_pipe = outpipe_;
- out_pipe->set_endpoint (this);
+ out_pipe->set_event_sink (this);
}
}
-void zmq::session_t::detach_inpipe (reader_t *pipe_)
+void zmq::session_t::terminated (reader_t *pipe_)
{
active = false;
in_pipe = NULL;
}
-void zmq::session_t::detach_outpipe (writer_t *pipe_)
+void zmq::session_t::terminated (writer_t *pipe_)
{
out_pipe = NULL;
}
-void zmq::session_t::kill (reader_t *pipe_)
-{
- active = false;
-}
-
-void zmq::session_t::revive (reader_t *pipe_)
+void zmq::session_t::activated (reader_t *pipe_)
{
zmq_assert (in_pipe == pipe_);
active = true;
@@ -190,7 +194,7 @@ void zmq::session_t::revive (reader_t *pipe_)
engine->revive ();
}
-void zmq::session_t::revive (writer_t *pipe_)
+void zmq::session_t::activated (writer_t *pipe_)
{
zmq_assert (out_pipe == pipe_);
if (engine)
@@ -203,6 +207,11 @@ void zmq::session_t::process_plug ()
void zmq::session_t::process_unplug ()
{
+ // TODO: There may be a problem here. The caller ensures that all the
+ // commands on the fly have been delivered. However, given that the
+ // session is unregistered from the global repository only at this point,
+ // there may be some commands being sent to the session right now.
+
// Unregister the session from the socket.
if (ordinal)
owner->unregister_session (ordinal);
@@ -210,14 +219,10 @@ void zmq::session_t::process_unplug ()
owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.
- if (in_pipe) {
- in_pipe->term ();
- in_pipe = NULL;
- }
- if (out_pipe) {
- out_pipe->term ();
- out_pipe = NULL;
- }
+ if (in_pipe)
+ in_pipe->terminate ();
+ if (out_pipe)
+ out_pipe->terminate ();
if (engine) {
engine->unplug ();
@@ -265,19 +270,15 @@ void zmq::session_t::process_attach (i_engine *engine_,
writer_t *socket_writer = NULL;
if (options.requires_in && !out_pipe) {
- pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap);
- zmq_assert (pipe);
- out_pipe = &pipe->writer;
- out_pipe->set_endpoint (this);
- socket_reader = &pipe->reader;
+ create_pipe (owner, this, options.hwm, options.swap, &socket_reader,
+ &out_pipe);
+ out_pipe->set_event_sink (this);
}
if (options.requires_out && !in_pipe) {
- pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap);
- zmq_assert (pipe);
- in_pipe = &pipe->reader;
- in_pipe->set_endpoint (this);
- socket_writer = &pipe->writer;
+ create_pipe (this, owner, options.hwm, options.swap, &in_pipe,
+ &socket_writer);
+ in_pipe->set_event_sink (this);
}
if (socket_reader || socket_writer)
@@ -289,3 +290,4 @@ void zmq::session_t::process_attach (i_engine *engine_,
engine = engine_;
engine->plug (this);
}
+
diff --git a/src/session.hpp b/src/session.hpp
index 9bda1ad..603b50c 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -21,15 +21,19 @@
#define __ZMQ_SESSION_HPP_INCLUDED__
#include "i_inout.hpp"
-#include "i_endpoint.hpp"
#include "owned.hpp"
#include "options.hpp"
#include "blob.hpp"
+#include "pipe.hpp"
namespace zmq
{
- class session_t : public owned_t, public i_inout, public i_endpoint
+ class session_t :
+ public owned_t,
+ public i_inout,
+ public i_reader_events,
+ public i_writer_events
{
public:
@@ -50,19 +54,25 @@ namespace zmq
class socket_base_t *get_owner ();
uint64_t get_ordinal ();
- // i_endpoint interface implementation.
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void detach_inpipe (class reader_t *pipe_);
- void detach_outpipe (class writer_t *pipe_);
- void kill (class reader_t *pipe_);
- void revive (class reader_t *pipe_);
- void revive (class writer_t *pipe_);
+
+ // i_reader_events interface implementation.
+ void activated (class reader_t *pipe_);
+ void terminated (class reader_t *pipe_);
+
+ // i_writer_events interface implementation.
+ void activated (class writer_t *pipe_);
+ void terminated (class writer_t *pipe_);
private:
~session_t ();
+ // Defines the delayed termination. (I.e. termination is postponed
+ // till all the data is flushed to the kernel.)
+ bool is_terminable ();
+
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index c933954..5d3175a 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -23,9 +23,18 @@
#include "../include/zmq.h"
-#include "socket_base.hpp"
-#include "app_thread.hpp"
+#include "platform.hpp"
+
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#if defined _MSC_VER
+#include <intrin.h>
+#endif
+#else
+#include <unistd.h>
+#endif
+#include "socket_base.hpp"
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
@@ -39,15 +48,73 @@
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "likely.hpp"
+#include "pair.hpp"
+#include "pub.hpp"
+#include "sub.hpp"
+#include "req.hpp"
+#include "rep.hpp"
+#include "pull.hpp"
+#include "push.hpp"
+#include "xreq.hpp"
+#include "xrep.hpp"
#include "uuid.hpp"
-zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
- object_t (parent_),
+// If the RDTSC is available we use it to prevent excessive
+// polling for commands. The nice thing here is that it will work on any
+// system with x86 architecture and gcc or MSVC compiler.
+#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\
+ (defined _MSC_VER && (defined _M_IX86 || defined _M_X64))
+#define ZMQ_DELAY_COMMANDS
+#endif
+
+zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
+ uint32_t slot_)
+{
+ socket_base_t *s = NULL;
+ switch (type_) {
+
+ case ZMQ_PAIR:
+ s = new (std::nothrow) pair_t (parent_, slot_);
+ break;
+ case ZMQ_PUB:
+ s = new (std::nothrow) pub_t (parent_, slot_);
+ break;
+ case ZMQ_SUB:
+ s = new (std::nothrow) sub_t (parent_, slot_);
+ break;
+ case ZMQ_REQ:
+ s = new (std::nothrow) req_t (parent_, slot_);
+ break;
+ case ZMQ_REP:
+ s = new (std::nothrow) rep_t (parent_, slot_);
+ break;
+ case ZMQ_XREQ:
+ s = new (std::nothrow) xreq_t (parent_, slot_);
+ break;
+ case ZMQ_XREP:
+ s = new (std::nothrow) xrep_t (parent_, slot_);
+ break;
+ case ZMQ_PULL:
+ s = new (std::nothrow) pull_t (parent_, slot_);
+ break;
+ case ZMQ_PUSH:
+ s = new (std::nothrow) push_t (parent_, slot_);
+ break;
+ default:
+ errno = EINVAL;
+ return NULL;
+ }
+ zmq_assert (s);
+ return s;
+}
+
+zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
+ object_t (parent_, slot_),
+ zombie (false),
+ last_processing_time (0),
pending_term_acks (0),
ticks (0),
rcvmore (false),
- app_thread (parent_),
- shutting_down (false),
sent_seqnum (0),
processed_seqnum (0),
next_ordinal (1)
@@ -58,10 +125,38 @@ zmq::socket_base_t::~socket_base_t ()
{
}
+zmq::signaler_t *zmq::socket_base_t::get_signaler ()
+{
+ return &signaler;
+}
+
+void zmq::socket_base_t::stop ()
+{
+ // Called by ctx when it is terminated (zmq_term).
+ // The 'stop' command is sent from the thread that called zmq_term to
+ // the thread owning the socket. This way, a blocking call in the
+ // owner thread can be interrupted.
+ send_stop ();
+}
+
+void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_, const blob_t &peer_identity_)
+{
+ // If the peer hasn't specified its identity, let's generate one.
+ if (peer_identity_.size ()) {
+ xattach_pipes (inpipe_, outpipe_, peer_identity_);
+ }
+ else {
+ blob_t identity (1, 0);
+ identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len);
+ xattach_pipes (inpipe_, outpipe_, identity);
+ }
+}
+
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
- if (unlikely (app_thread->is_terminated ())) {
+ if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
@@ -79,7 +174,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_)
{
- if (unlikely (app_thread->is_terminated ())) {
+ if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
@@ -94,12 +189,37 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return 0;
}
+ if (option_ == ZMQ_FD) {
+ if (*optvallen_ < sizeof (fd_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((fd_t*) optval_) = signaler.get_fd ();
+ *optvallen_ = sizeof (fd_t);
+ return 0;
+ }
+
+ if (option_ == ZMQ_EVENTS) {
+ if (*optvallen_ < sizeof (uint32_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ process_commands (false, false);
+ *((uint32_t*) optval_) = 0;
+ if (has_out ())
+ *((uint32_t*) optval_) |= ZMQ_POLLOUT;
+ if (has_in ())
+ *((uint32_t*) optval_) |= ZMQ_POLLIN;
+ *optvallen_ = sizeof (uint32_t);
+ return 0;
+ }
+
return options.getsockopt (option_, optval_, optvallen_);
}
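The two new options are what makes it possible to plug a 0MQ socket into an
external event loop: ZMQ_FD exposes the signaler's file descriptor, which
only indicates that commands may be pending, while ZMQ_EVENTS processes
those commands and reports actual readiness. A hedged application-side
sketch (wait_readable is a hypothetical helper; poll() and the int file
descriptor assume a POSIX platform):

    #include <poll.h>
    #include <stdint.h>
    #include <zmq.h>

    // Block until a message can be received on 'socket'.
    static void wait_readable (void *socket)
    {
        int fd;                          // fd_t is an int on POSIX systems
        size_t fd_size = sizeof (fd);
        zmq_getsockopt (socket, ZMQ_FD, &fd, &fd_size);

        while (true) {
            uint32_t events;
            size_t events_size = sizeof (events);
            zmq_getsockopt (socket, ZMQ_EVENTS, &events, &events_size);
            if (events & ZMQ_POLLIN)
                return;                  // zmq_recv will not block now

            // The descriptor becomes readable whenever commands may be
            // pending; ZMQ_EVENTS has to be re-checked after waking up.
            struct pollfd pfd = {fd, POLLIN, 0};
            poll (&pfd, 1, -1);
        }
    }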
int zmq::socket_base_t::bind (const char *addr_)
{
- if (unlikely (app_thread->is_terminated ())) {
+ if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
@@ -159,7 +279,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_)
{
- if (unlikely (app_thread->is_terminated ())) {
+ if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
@@ -190,30 +310,29 @@ int zmq::socket_base_t::connect (const char *addr_)
if (!peer)
return -1;
- pipe_t *in_pipe = NULL;
- pipe_t *out_pipe = NULL;
-
+ reader_t *inpipe_reader = NULL;
+ writer_t *inpipe_writer = NULL;
+ reader_t *outpipe_reader = NULL;
+ writer_t *outpipe_writer = NULL;
+
// Create inbound pipe, if required.
- if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm, options.swap);
- zmq_assert (in_pipe);
- }
+ if (options.requires_in)
+ create_pipe (this, peer, options.hwm, options.swap,
+ &inpipe_reader, &inpipe_writer);
// Create outbound pipe, if required.
- if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm, options.swap);
- zmq_assert (out_pipe);
- }
+ if (options.requires_out)
+ create_pipe (peer, this, options.hwm, options.swap,
+ &outpipe_reader, &outpipe_writer);
// Attach the pipes to this socket object.
- attach_pipes (in_pipe ? &in_pipe->reader : NULL,
- out_pipe ? &out_pipe->writer : NULL, blob_t ());
+ attach_pipes (inpipe_reader, outpipe_writer, blob_t ());
// Attach the pipes to the peer socket. Note that peer's seqnum
// was incremented in find_endpoint function. The callee is notified
// about the fact via the last parameter.
- send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL, options.identity, false);
+ send_bind (peer, outpipe_reader, inpipe_writer,
+ options.identity, false);
return 0;
}
@@ -224,34 +343,31 @@ int zmq::socket_base_t::connect (const char *addr_)
this, options);
zmq_assert (session);
- // If 'immediate connect' feature is required, we'll created the pipes
+ // If 'immediate connect' feature is required, we'll create the pipes
// to the session straight away. Otherwise, they'll be created by the
// session once the connection is established.
if (options.immediate_connect) {
- pipe_t *in_pipe = NULL;
- pipe_t *out_pipe = NULL;
+ reader_t *inpipe_reader = NULL;
+ writer_t *inpipe_writer = NULL;
+ reader_t *outpipe_reader = NULL;
+ writer_t *outpipe_writer = NULL;
// Create inbound pipe, if required.
- if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm, options.swap);
- zmq_assert (in_pipe);
-
- }
+ if (options.requires_in)
+ create_pipe (this, session, options.hwm, options.swap,
+ &inpipe_reader, &inpipe_writer);
// Create outbound pipe, if required.
- if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm, options.swap);
- zmq_assert (out_pipe);
- }
+ if (options.requires_out)
+ create_pipe (session, this, options.hwm, options.swap,
+ &outpipe_reader, &outpipe_writer);
// Attach the pipes to the socket object.
- attach_pipes (in_pipe ? &in_pipe->reader : NULL,
- out_pipe ? &out_pipe->writer : NULL, blob_t ());
+ attach_pipes (inpipe_reader, outpipe_writer, blob_t ());
// Attach the pipes to the session object.
- session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL, blob_t ());
+ session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ());
}
// Activate the session.
@@ -347,8 +463,14 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
+ if (unlikely (zombie)) {
+ errno = ETERM;
+ return -1;
+ }
+
// Process pending commands, if any.
- if (unlikely (!app_thread->process_commands (false, true))) {
+ process_commands (false, true);
+ if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
@@ -372,7 +494,8 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
while (rc != 0) {
if (errno != EAGAIN)
return -1;
- if (unlikely (!app_thread->process_commands (true, false))) {
+ process_commands (true, false);
+ if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
@@ -383,6 +506,11 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
+ if (unlikely (zombie)) {
+ errno = ETERM;
+ return -1;
+ }
+
// Get the message.
int rc = xrecv (msg_, flags_);
int err = errno;
@@ -396,7 +524,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing rdtsc all the time.
if (++ticks == inbound_poll_rate) {
- if (unlikely (!app_thread->process_commands (false, false))) {
+ process_commands (false, false);
+ if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
@@ -420,7 +549,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
if (flags_ & ZMQ_NOBLOCK) {
if (errno != EAGAIN)
return -1;
- if (unlikely (!app_thread->process_commands (false, false))) {
+ process_commands (false, false);
+ if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
@@ -440,7 +570,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
while (rc != 0) {
if (errno != EAGAIN)
return -1;
- if (unlikely (!app_thread->process_commands (true, false))) {
+ process_commands (true, false);
+ if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
@@ -456,74 +587,72 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close ()
{
- shutting_down = true;
-
- // Let the thread know that the socket is no longer available.
- app_thread->remove_socket (this);
-
- // Pointer to the context must be retrieved before the socket is
- // deallocated. Afterwards it is not available.
- ctx_t *ctx = get_ctx ();
+ // The socket becomes a zombie. From now on all newly arrived pipes (bind
+ // command) and I/O objects (own command) are immediately terminated.
+ // Also, any further requests for I/O object termination are ignored
+ // (we are going to shut them down anyway -- this way we ensure that
+ // we do so only once).
+ zombie = true;
// Unregister all inproc endpoints associated with this socket.
- // From this point we are sure that inc_seqnum won't be called again
- // on this object.
- ctx->unregister_endpoints (this);
-
- // Wait till all undelivered commands are delivered. This should happen
- // very quickly. There's no way to wait here for extensive period of time.
+ // Doing this we make sure that no new pipes from other sockets (inproc)
+ // will be initiated. However, there may be some inproc pipes already
+ // on the fly, but not yet received by this socket. To finish with them
+ // we subsequently wait for the on-the-fly commands to be delivered.
+ // This should happen very quickly. There's no way to block here for
+ // an extensive period of time.
+ unregister_endpoints (this);
while (processed_seqnum != sent_seqnum.get ())
- app_thread->process_commands (true, false);
-
- while (true) {
-
- // On third pass of the loop there should be no more I/O objects
- // because all connecters and listerners were destroyed during
- // the first pass and all engines delivered by delayed 'own' commands
- // are destroyed during the second pass.
- if (io_objects.empty () && !pending_term_acks)
- break;
-
- // Send termination request to all associated I/O objects.
- for (io_objects_t::iterator it = io_objects.begin ();
- it != io_objects.end (); it++)
- send_term (*it);
-
- // Move the objects to the list of pending term acks.
- pending_term_acks += io_objects.size ();
- io_objects.clear ();
-
- // Process commands till we get all the termination acknowledgements.
- while (pending_term_acks)
- app_thread->process_commands (true, false);
- }
-
- // Check whether there are no session leaks.
- sessions_sync.lock ();
- zmq_assert (named_sessions.empty ());
- zmq_assert (unnamed_sessions.empty ());
- sessions_sync.unlock ();
-
- delete this;
-
- // This function must be called after the socket is completely deallocated
- // as it may cause termination of the whole 0MQ infrastructure.
- ctx->destroy_socket ();
+ process_commands (true, false);
+ // TODO: My feeling is that the above has to be done in the dezombification
+ // loop, otherwise we may end up with the number of I/O objects dropping to
+ // zero even though there are more I/O objects on the way.
+
+ // The above process ensures that the only pipes to arrive from now on
+ // are those initiated by sessions. These in turn have the nice property of
+ // not arriving totally asynchronously. When a session -- being an I/O
+ // object -- acknowledges its termination, we are 100% sure that we'll get
+ // no new pipes from it.
+
+ // Start termination of all the pipes presently associated with the socket.
+ xterm_pipes ();
+
+ // Send a termination request to all associated I/O objects and start
+ // waiting for the acks. Note that the actual waiting is not done in this
+ // function. Rather, it is done in a delayed manner as the socket is
+ // being dezombified. The reason is that I/O object shutdown can take
+ // a considerable amount of time in case there's still a lot of data to
+ // push to the network.
+ for (io_objects_t::iterator it = io_objects.begin ();
+ it != io_objects.end (); it++)
+ send_term (*it);
+ pending_term_acks += io_objects.size ();
+ io_objects.clear ();
+
+ // Note that new I/O objects may arrive even in zombie state (say a new
+ // session initiated by a listener object); however, in such a case the
+ // number of pending acks never drops to zero. Here's the scenario: We have
+ // a pending ack for the listener object. Then an 'own' command arrives from
+ // the listener notifying the socket about a new session. It immediately
+ // triggers a termination request and the number of pending acks is
+ // incremented. Then term_ack arrives from the listener and the number of
+ // pending acks is decremented. Later on, the session itself will ack its
+ // termination. During the process, the number of pending acks never dropped
+ // to zero and thus the socket remains safely in the zombie state.
+
+ // Transfer the ownership of the socket from this application thread
+ // to the context, which will take care of the rest of the shutdown process.
+ zombify (this);
return 0;
}
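For context, dezombify () below is not called from the socket itself; the
close path above only hands the zombie over via zombify (this). A purely
illustrative sketch of how the owning context might drive it (the real loop
lives in ctx.cpp, which is outside this diff; reap_zombies and the vector
are hypothetical):

    #include <vector>
    #include "socket_base.hpp"

    static void reap_zombies (std::vector <zmq::socket_base_t*> &zombies)
    {
        for (std::vector <zmq::socket_base_t*>::size_type i = 0;
              i != zombies.size ();) {
            // dezombify returns true once the socket has deleted itself.
            if (zombies [i]->dezombify ())
                zombies.erase (zombies.begin () + i);
            else
                i++;
        }
    }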
void zmq::socket_base_t::inc_seqnum ()
{
- // NB: This function may be called from a different thread!
+ // Be aware: This function may be called from a different thread!
sent_seqnum.add (1);
}
-zmq::app_thread_t *zmq::socket_base_t::get_thread ()
-{
- return app_thread;
-}
-
bool zmq::socket_base_t::has_in ()
{
return xhas_in ();
@@ -607,68 +736,133 @@ zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_)
return session;
}
-void zmq::socket_base_t::kill (reader_t *pipe_)
+bool zmq::socket_base_t::dezombify ()
{
- xkill (pipe_);
-}
+ zmq_assert (zombie);
-void zmq::socket_base_t::revive (reader_t *pipe_)
-{
- xrevive (pipe_);
-}
+ // Process any commands from other threads/sockets that may be available
+ // at the moment.
+ process_commands (false, false);
-void zmq::socket_base_t::revive (writer_t *pipe_)
-{
- xrevive (pipe_);
+ // If there are no more pipes attached and there are no more I/O objects
+ // owned by the socket, we can kill the zombie.
+ if (!pending_term_acks && !xhas_pipes ()) {
+
+ // If all objects have acknowledged their termination there should
+ // definitely be no I/O object remaining in the list.
+ zmq_assert (io_objects.empty ());
+
+ // Check whether there are no session leaks.
+ sessions_sync.lock ();
+ zmq_assert (named_sessions.empty ());
+ zmq_assert (unnamed_sessions.empty ());
+ sessions_sync.unlock ();
+
+ // Deallocate all the resources tied to this socket.
+ delete this;
+
+ // Notify the caller about the fact that the zombie is finally dead.
+ return true;
+ }
+
+ // The zombie remains undead.
+ return false;
}
-void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
{
- if (inpipe_)
- inpipe_->set_endpoint (this);
- if (outpipe_)
- outpipe_->set_endpoint (this);
-
- // If the peer haven't specified it's identity, let's generate one.
- if (peer_identity_.size ()) {
- xattach_pipes (inpipe_, outpipe_, peer_identity_);
+ bool received;
+ command_t cmd;
+ if (block_) {
+ received = signaler.recv (&cmd, true);
+ zmq_assert (received);
}
else {
- blob_t identity (1, 0);
- identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len);
- xattach_pipes (inpipe_, outpipe_, identity);
+
+#if defined ZMQ_DELAY_COMMANDS
+ // Optimised version of command processing - it doesn't have to check
+ // for incoming commands each time. It does so only if a certain time
+ // has elapsed since the last command processing. The command delay varies
+ // depending on CPU speed: it's ~1ms on a 3GHz CPU, ~2ms on a 1.5GHz CPU
+ // etc. The optimisation makes sense only on platforms where getting
+ // a timestamp is a very cheap operation (tens of nanoseconds).
+ if (throttle_) {
+
+ // Get timestamp counter.
+#if defined __GNUC__
+ uint32_t low;
+ uint32_t high;
+ __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
+ uint64_t current_time = (uint64_t) high << 32 | low;
+#elif defined _MSC_VER
+ uint64_t current_time = __rdtsc ();
+#else
+#error
+#endif
+
+ // Check whether a certain time has elapsed since the last command
+ // processing.
+ if (current_time - last_processing_time <= max_command_delay)
+ return;
+ last_processing_time = current_time;
+ }
+#endif
+
+ // Check whether there are any commands pending for this thread.
+ received = signaler.recv (&cmd, false);
}
-}
-void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
-{
- xdetach_inpipe (pipe_);
- pipe_->set_endpoint (NULL); // ?
+ // Process all the commands available at the moment.
+ while (received) {
+ cmd.destination->process_command (cmd);
+ received = signaler.recv (&cmd, false);
+ }
}
-void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_)
+void zmq::socket_base_t::process_stop ()
{
- xdetach_outpipe (pipe_);
- pipe_->set_endpoint (NULL); // ?
+ // Here, someone has called zmq_term while the socket was still alive.
+ // We'll zombify it so that any blocking call is interrupted and any
+ // further attempt to use the socket will return ETERM. The user is still
+ // responsible for calling zmq_close on the socket though!
+ zombie = true;
}
void zmq::socket_base_t::process_own (owned_t *object_)
{
+ // If the socket is already being shut down, new owned objects are
+ // immediately asked to terminate.
+ if (zombie) {
+ send_term (object_);
+ pending_term_acks++;
+ return;
+ }
+
io_objects.insert (object_);
}
void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
const blob_t &peer_identity_)
{
+ // If the socket is already being shut down, the termination process on
+ // the new pipes is started immediately. However, they are still attached
+ // so as to let the process finish in a decent manner.
+ if (unlikely (zombie)) {
+ if (in_pipe_)
+ in_pipe_->terminate ();
+ if (out_pipe_)
+ out_pipe_->terminate ();
+ }
+
attach_pipes (in_pipe_, out_pipe_, peer_identity_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
{
// When shutting down we can ignore termination requests from owned
- // objects. They are going to be terminated anyway.
- if (shutting_down)
+ // objects. It means the termination request was already sent to
+ // the object.
+ if (zombie)
return;
// If I/O object is well and alive ask it to terminate.
@@ -676,7 +870,7 @@ void zmq::socket_base_t::process_term_req (owned_t *object_)
io_objects.end (), object_);
// If not found, we assume that termination request was already sent to
- // the object so we can sagely ignore the request.
+ // the object so we can safely ignore the request.
if (it == io_objects.end ())
return;
@@ -696,3 +890,32 @@ void zmq::socket_base_t::process_seqnum ()
processed_seqnum++;
}
+int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+bool zmq::socket_base_t::xhas_out ()
+{
+ return false;
+}
+
+int zmq::socket_base_t::xsend (zmq_msg_t *msg_, int options_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+bool zmq::socket_base_t::xhas_in ()
+{
+ return false;
+}
+
+int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 3d95cec..386fdbb 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -26,13 +26,13 @@
#include "../include/zmq.h"
-#include "i_endpoint.hpp"
#include "object.hpp"
#include "yarray_item.hpp"
#include "mutex.hpp"
#include "options.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
+#include "signaler.hpp"
#include "stdint.hpp"
#include "blob.hpp"
@@ -40,11 +40,21 @@ namespace zmq
{
class socket_base_t :
- public object_t, public i_endpoint, public yarray_item_t
+ public object_t,
+ public yarray_item_t
{
public:
- socket_base_t (class app_thread_t *parent_);
+ // Create a socket of a specified type.
+ static socket_base_t *create (int type_, class ctx_t *parent_,
+ uint32_t slot_);
+
+ // Returns the signaler associated with this socket.
+ signaler_t *get_signaler ();
+
+ // Interrupt a blocking call if the socket is stuck in one.
+ // This function can be called from a different thread!
+ void stop ();
// Interface for communication with the API layer.
int setsockopt (int option_, const void *optval_, size_t optvallen_);
@@ -60,11 +70,6 @@ namespace zmq
// before the command is delivered.
void inc_seqnum ();
- // This function is used by the polling mechanism to determine
- // whether the socket belongs to the application thread the poll
- // is called from.
- class app_thread_t *get_thread ();
-
// These functions are used by the polling mechanism to determine
// which events are to be reported from this socket.
bool has_in ();
@@ -85,43 +90,67 @@ namespace zmq
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
- // i_endpoint interface implementation.
- void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
- const blob_t &peer_identity_);
- void detach_inpipe (class reader_t *pipe_);
- void detach_outpipe (class writer_t *pipe_);
- void kill (class reader_t *pipe_);
- void revive (class reader_t *pipe_);
- void revive (class writer_t *pipe_);
+ // i_reader_events interface implementation.
+ void activated (class reader_t *pipe_);
+ void terminated (class reader_t *pipe_);
+
+ // i_writer_events interface implementation.
+ void activated (class writer_t *pipe_);
+ void terminated (class writer_t *pipe_);
+
+ // This function should be called only on zombie sockets. It tries
+ // to deallocate the zombie and returns true if successful.
+ bool dezombify ();
protected:
- // Destructor is protected. Socket is closed using 'close' function.
+ socket_base_t (class ctx_t *parent_, uint32_t slot_);
virtual ~socket_base_t ();
- // Pipe management is done by individual socket types.
+ // Concrete algorithms for the x- methods are to be defined by
+ // individual socket types.
+
virtual void xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
- virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
- virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
- virtual void xkill (class reader_t *pipe_) = 0;
- virtual void xrevive (class reader_t *pipe_) = 0;
- virtual void xrevive (class writer_t *pipe_) = 0;
+ virtual void xterm_pipes () = 0;
+ virtual bool xhas_pipes () = 0;
- // Actual algorithms are to be defined by individual socket types.
+ // The default implementation assumes there are no specific socket
+ // options for the particular socket type. If there are, override
+ // this method.
virtual int xsetsockopt (int option_, const void *optval_,
- size_t optvallen_) = 0;
- virtual int xsend (zmq_msg_t *msg_, int options_) = 0;
- virtual int xrecv (zmq_msg_t *msg_, int options_) = 0;
- virtual bool xhas_in () = 0;
- virtual bool xhas_out () = 0;
+ size_t optvallen_);
+
+ // The default implementation assumes that send is not supported.
+ virtual bool xhas_out ();
+ virtual int xsend (zmq_msg_t *msg_, int options_);
+
+ // The default implementation assumes that recv is not supported.
+ virtual bool xhas_in ();
+ virtual int xrecv (zmq_msg_t *msg_, int options_);
// Socket options.
options_t options;
+ // If true, socket was already closed but not yet deallocated
+ // because either shutdown is in process or there are still pipes
+ // attached to the socket.
+ bool zombie;
+
private:
+ // If no identity is set, generate one and call xattach_pipes ().
+ void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
+
+ // Processes commands sent to this socket (if any). If 'block' is
+ // set to true, returns only after at least one command was processed.
+ // If throttle argument is true, commands are processed at most once
+ // in a predefined time period.
+ void process_commands (bool block_, bool throttle_);
+
// Handlers for incoming commands.
+ void process_stop ();
void process_own (class owned_t *object_);
void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_,
const blob_t &peer_identity_);
@@ -129,6 +158,12 @@ namespace zmq
void process_term_ack ();
void process_seqnum ();
+ // App thread's signaler object.
+ signaler_t signaler;
+
+ // Timestamp of when commands were processed the last time.
+ uint64_t last_processing_time;
+
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
typedef std::set <class owned_t*> io_objects_t;
@@ -144,13 +179,6 @@ namespace zmq
// If true there's a half-read message in the socket.
bool rcvmore;
- // Application thread the socket lives in.
- class app_thread_t *app_thread;
-
- // If true, socket is already shutting down. No new work should be
- // started.
- bool shutting_down;
-
// Sequence number of the last command sent to this object.
atomic_counter_t sent_seqnum;
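The new process_commands (block_, throttle_) declaration together with last_processing_time suggests a simple throttling scheme: when throttling is requested, the command queue is drained at most once per fixed interval. A hedged sketch of that idea follows; the clock source, the interval and the throttled_processor_t type are illustrative, not the actual socket_base implementation.

#include <ctime>
#include <deque>
#include <string>
#include <cstdio>

class throttled_processor_t
{
public:
    throttled_processor_t () : last_processing_time (0) {}

    void queue (const std::string &cmd_) { commands.push_back (cmd_); }

    void process_commands (bool throttle_)
    {
        const std::clock_t interval = CLOCKS_PER_SEC / 100;   //  roughly 10ms
        std::clock_t now = std::clock ();

        //  If throttling is requested and commands were processed
        //  recently, skip this round entirely.
        if (throttle_ && now - last_processing_time < interval)
            return;
        last_processing_time = now;

        //  Drain the queue.
        while (!commands.empty ()) {
            std::printf ("processing: %s\n", commands.front ().c_str ());
            commands.pop_front ();
        }
    }

private:
    std::clock_t last_processing_time;
    std::deque <std::string> commands;
};

int main ()
{
    throttled_processor_t p;
    p.queue ("bind");
    p.process_commands (true);    //  processed (nothing was done recently)
    p.queue ("term_req");
    p.process_commands (true);    //  likely skipped: still within the interval
    p.process_commands (false);   //  unthrottled: always drains the queue
    return 0;
}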
diff --git a/src/sub.cpp b/src/sub.cpp
index eeb50cd..a1e8fb7 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -24,8 +24,8 @@
#include "sub.hpp"
#include "err.hpp"
-zmq::sub_t::sub_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
+zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t slot_) :
+ socket_base_t (parent_, slot_),
has_message (false),
more (false)
{
@@ -46,31 +46,14 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_);
}
-void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::sub_t::xterm_pipes ()
{
- zmq_assert (pipe_);
- fq.detach (pipe_);
+ fq.term_pipes ();
}
-void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_)
+bool zmq::sub_t::xhas_pipes ()
{
- // SUB socket is read-only thus there should be no outpipes.
- zmq_assert (false);
-}
-
-void zmq::sub_t::xkill (class reader_t *pipe_)
-{
- fq.kill (pipe_);
-}
-
-void zmq::sub_t::xrevive (class reader_t *pipe_)
-{
- fq.revive (pipe_);
-}
-
-void zmq::sub_t::xrevive (class writer_t *pipe_)
-{
- zmq_assert (false);
+ return fq.has_pipes ();
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
@@ -93,12 +76,6 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
return -1;
}
-int zmq::sub_t::xsend (zmq_msg_t *msg_, int flags_)
-{
- errno = ENOTSUP;
- return -1;
-}
-
int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// If there's already a message prepared by a previous call to zmq_poll,
@@ -179,11 +156,6 @@ bool zmq::sub_t::xhas_in ()
}
}
-bool zmq::sub_t::xhas_out ()
-{
- return false;
-}
-
bool zmq::sub_t::match (zmq_msg_t *msg_)
{
return subscriptions.check ((unsigned char*) zmq_msg_data (msg_),
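The surviving xrecv/xhas_in path still relies on sub_t::match, which checks whether the message body starts with any subscribed prefix (the real code keeps subscriptions in a prefix tree). A simplified, self-contained sketch of that matching rule, using a plain vector of prefixes instead of the trie:

#include <cstddef>
#include <cstring>
#include <cstdio>
#include <string>
#include <vector>

//  Returns true if the message body starts with any of the prefixes.
static bool match (const std::vector <std::string> &subscriptions_,
    const unsigned char *data_, size_t size_)
{
    for (size_t i = 0; i != subscriptions_.size (); i++) {
        const std::string &sub = subscriptions_ [i];
        if (sub.size () <= size_ &&
              std::memcmp (sub.data (), data_, sub.size ()) == 0)
            return true;
    }
    return false;
}

int main ()
{
    std::vector <std::string> subs;
    subs.push_back ("weather.");          //  as if set via ZMQ_SUBSCRIBE

    const char *msg = "weather.london 12C";
    std::printf ("%s\n", match (subs, (const unsigned char*) msg,
        std::strlen (msg)) ? "delivered" : "dropped");
    return 0;
}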
diff --git a/src/sub.hpp b/src/sub.hpp
index 7b997c9..da69892 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -33,7 +33,7 @@ namespace zmq
{
public:
- sub_t (class app_thread_t *parent_);
+ sub_t (class ctx_t *parent_, uint32_t slot_);
~sub_t ();
protected:
@@ -41,16 +41,11 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void xdetach_inpipe (class reader_t *pipe_);
- void xdetach_outpipe (class writer_t *pipe_);
- void xkill (class reader_t *pipe_);
- void xrevive (class reader_t *pipe_);
- void xrevive (class writer_t *pipe_);
+ void xterm_pipes ();
+ bool xhas_pipes ();
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
- int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
- bool xhas_out ();
private:
diff --git a/src/thread.cpp b/src/thread.cpp
index 602ca8b..4e86531 100644
--- a/src/thread.cpp
+++ b/src/thread.cpp
@@ -38,16 +38,6 @@ void zmq::thread_t::stop ()
win_assert (rc != WAIT_FAILED);
}
-zmq::thread_t::id_t zmq::thread_t::id ()
-{
- return GetCurrentThreadId ();
-}
-
-bool zmq::thread_t::equal (id_t id1_, id_t id2_)
-{
- return id1_ == id2_;
-}
-
unsigned int __stdcall zmq::thread_t::thread_routine (void *arg_)
{
thread_t *self = (thread_t*) arg_;
@@ -73,16 +63,6 @@ void zmq::thread_t::stop ()
errno_assert (rc == 0);
}
-zmq::thread_t::id_t zmq::thread_t::id ()
-{
- return pthread_self ();
-}
-
-bool zmq::thread_t::equal (id_t id1_, id_t id2_)
-{
- return pthread_equal (id1_, id2_) != 0;
-}
-
void *zmq::thread_t::thread_routine (void *arg_)
{
#if !defined ZMQ_HAVE_OPENVMS
diff --git a/src/thread.hpp b/src/thread.hpp
index 432770c..8af6ea5 100644
--- a/src/thread.hpp
+++ b/src/thread.hpp
@@ -54,15 +54,6 @@ namespace zmq
// Waits for thread termination.
void stop ();
-
-#ifdef ZMQ_HAVE_WINDOWS
- typedef DWORD id_t;
-#else
- typedef pthread_t id_t;
-#endif
-
- static id_t id ();
- static bool equal (id_t id1_, id_t id2_);
private:
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 5fd6cbb..73d7856 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -23,8 +23,8 @@
#include "err.hpp"
#include "pipe.hpp"
-zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
+zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t slot_) :
+ socket_base_t (parent_, slot_),
current_in (0),
prefetched (false),
more_in (false),
@@ -41,31 +41,41 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
zmq::xrep_t::~xrep_t ()
{
- for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); it++)
- it->reader->term ();
- for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end ();
- it++)
- it->second.writer->term ();
+ zmq_assert (inpipes.empty ());
+ zmq_assert (outpipes.empty ());
}
-void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
+ const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
+ outpipe_->set_event_sink (this);
+
// TODO: What if new connection has same peer identity as the old one?
outpipe_t outpipe = {outpipe_, true};
bool ok = outpipes.insert (std::make_pair (
peer_identity_, outpipe)).second;
zmq_assert (ok);
+ inpipe_->set_event_sink (this);
+
inpipe_t inpipe = {inpipe_, peer_identity_, true};
inpipes.push_back (inpipe);
}
-void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::xrep_t::xterm_pipes ()
+{
+ for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
+ it++)
+ it->reader->terminate ();
+ for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end ();
+ it++)
+ it->second.writer->terminate ();
+}
+
+void zmq::xrep_t::terminated (reader_t *pipe_)
{
-// TODO:!
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
it++) {
if (it->reader == pipe_) {
@@ -76,7 +86,7 @@ void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
zmq_assert (false);
}
-void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
+void zmq::xrep_t::terminated (writer_t *pipe_)
{
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
@@ -90,20 +100,12 @@ void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
zmq_assert (false);
}
-void zmq::xrep_t::xkill (class reader_t *pipe_)
+bool zmq::xrep_t::xhas_pipes ()
{
- for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
- it++) {
- if (it->reader == pipe_) {
- zmq_assert (it->active);
- it->active = false;
- return;
- }
- }
- zmq_assert (false);
+ return !inpipes.empty () || !outpipes.empty ();
}
-void zmq::xrep_t::xrevive (class reader_t *pipe_)
+void zmq::xrep_t::activated (reader_t *pipe_)
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
it++) {
@@ -116,7 +118,7 @@ void zmq::xrep_t::xrevive (class reader_t *pipe_)
zmq_assert (false);
}
-void zmq::xrep_t::xrevive (class writer_t *pipe_)
+void zmq::xrep_t::activated (writer_t *pipe_)
{
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
@@ -129,13 +131,6 @@ void zmq::xrep_t::xrevive (class writer_t *pipe_)
zmq_assert (false);
}
-int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- errno = EINVAL;
- return -1;
-}
-
int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
{
// If this is the first part of the message it's the identity of the
@@ -232,7 +227,9 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0;
}
- // If me don't have a message, move to next pipe.
+ // If we don't have a message, mark the pipe as passive and
+ // move to the next pipe.
+ inpipes [current_in].active = false;
current_in++;
if (current_in >= inpipes.size ())
current_in = 0;
@@ -259,6 +256,10 @@ bool zmq::xrep_t::xhas_in ()
if (inpipes [current_in].active &&
inpipes [current_in].reader->check_read ())
return true;
+
+ // If we don't have a message, mark the pipe as passive and
+ // move to the next pipe.
+ inpipes [current_in].active = false;
current_in++;
if (current_in >= inpipes.size ())
current_in = 0;
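The xhas_in and xrecv hunks above both implement the O(n) round-robin scan that the TODO in xrep.hpp refers to: start at current_in, return as soon as an active pipe has data, otherwise mark the pipe passive and advance, giving up after one full cycle. A standalone sketch of that scheduler; inpipe_t here is a stand-in for the real reader pipes.

#include <cstdio>
#include <vector>

struct inpipe_t
{
    int pending;     //  readable message count (stand-in for check_read)
    bool active;
};

//  Scans at most one full cycle of pipes starting at current_in_.
static bool has_in (std::vector <inpipe_t> &inpipes_, size_t &current_in_)
{
    for (size_t count = inpipes_.size (); count != 0; count--) {
        if (inpipes_ [current_in_].active &&
              inpipes_ [current_in_].pending > 0)
            return true;

        //  Nothing to read here: mark the pipe passive and move on.
        inpipes_ [current_in_].active = false;
        current_in_++;
        if (current_in_ >= inpipes_.size ())
            current_in_ = 0;
    }
    return false;
}

int main ()
{
    std::vector <inpipe_t> pipes (3);
    pipes [0].pending = 0; pipes [0].active = true;
    pipes [1].pending = 2; pipes [1].active = true;
    pipes [2].pending = 0; pipes [2].active = true;

    size_t current_in = 0;
    std::printf ("readable pipe found: %s\n",
        has_in (pipes, current_in) ? "yes" : "no");
    std::printf ("scheduler stopped at pipe %zu\n", current_in);
    return 0;
}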
diff --git a/src/xrep.hpp b/src/xrep.hpp
index da1b3d8..1c240ff 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -25,32 +25,40 @@
#include "socket_base.hpp"
#include "blob.hpp"
+#include "pipe.hpp"
namespace zmq
{
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
- class xrep_t : public socket_base_t
+ class xrep_t :
+ public socket_base_t,
+ public i_reader_events,
+ public i_writer_events
{
public:
- xrep_t (class app_thread_t *parent_);
+ xrep_t (class ctx_t *parent_, uint32_t slot_);
~xrep_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
const blob_t &peer_identity_);
- void xdetach_inpipe (class reader_t *pipe_);
- void xdetach_outpipe (class writer_t *pipe_);
- void xkill (class reader_t *pipe_);
- void xrevive (class reader_t *pipe_);
- void xrevive (class writer_t *pipe_);
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ void xterm_pipes ();
+ bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
+ // i_reader_events interface implementation.
+ void activated (reader_t *pipe_);
+ void terminated (reader_t *pipe_);
+
+ // i_writer_events interface implementation.
+ void activated (writer_t *pipe_);
+ void terminated (writer_t *pipe_);
+
private:
struct inpipe_t
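xrep_t now receives pipe notifications through the i_reader_events / i_writer_events interfaces instead of the old x- callbacks: each pipe is handed an event sink in xattach_pipes and calls back into it when it is activated or when its termination completes. A self-contained sketch of that sink pattern; i_pipe_events, simple_pipe_t and xrep_like_t are simplified stand-ins for the real interfaces in pipe.hpp.

#include <cstdio>

class simple_pipe_t;

struct i_pipe_events
{
    virtual ~i_pipe_events () {}
    virtual void activated (simple_pipe_t *pipe_) = 0;
    virtual void terminated (simple_pipe_t *pipe_) = 0;
};

class simple_pipe_t
{
public:
    simple_pipe_t () : sink (0) {}
    void set_event_sink (i_pipe_events *sink_) { sink = sink_; }
    void revive () { if (sink) sink->activated (this); }
    void terminate () { if (sink) sink->terminated (this); }
private:
    i_pipe_events *sink;
};

class xrep_like_t : public i_pipe_events
{
public:
    void activated (simple_pipe_t *) { std::printf ("pipe activated\n"); }
    void terminated (simple_pipe_t *) { std::printf ("pipe terminated\n"); }
};

int main ()
{
    xrep_like_t socket;
    simple_pipe_t pipe;
    pipe.set_event_sink (&socket);   //  as done in xattach_pipes above
    pipe.revive ();
    pipe.terminate ();
    return 0;
}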
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 66e5cc3..893c18e 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -22,8 +22,8 @@
#include "xreq.hpp"
#include "err.hpp"
-zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
- socket_base_t (parent_)
+zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t slot_) :
+ socket_base_t (parent_, slot_)
{
options.requires_in = true;
options.requires_out = true;
@@ -41,38 +41,15 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_);
}
-void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::xreq_t::xterm_pipes ()
{
- zmq_assert (pipe_);
- fq.detach (pipe_);
+ fq.term_pipes ();
+ lb.term_pipes ();
}
-void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_)
+bool zmq::xreq_t::xhas_pipes ()
{
- zmq_assert (pipe_);
- lb.detach (pipe_);
-}
-
-void zmq::xreq_t::xkill (class reader_t *pipe_)
-{
- fq.kill (pipe_);
-}
-
-void zmq::xreq_t::xrevive (class reader_t *pipe_)
-{
- fq.revive (pipe_);
-}
-
-void zmq::xreq_t::xrevive (class writer_t *pipe_)
-{
- lb.revive (pipe_);
-}
-
-int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- errno = EINVAL;
- return -1;
+ return fq.has_pipes () || lb.has_pipes ();
}
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
diff --git a/src/xreq.hpp b/src/xreq.hpp
index 8ee0bb9..b8b9a0b 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -31,18 +31,14 @@ namespace zmq
{
public:
- xreq_t (class app_thread_t *parent_);
+ xreq_t (class ctx_t *parent_, uint32_t slot_);
~xreq_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void xdetach_inpipe (class reader_t *pipe_);
- void xdetach_outpipe (class writer_t *pipe_);
- void xkill (class reader_t *pipe_);
- void xrevive (class reader_t *pipe_);
- void xrevive (class writer_t *pipe_);
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ void xterm_pipes ();
+ bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 342a8a6..6dc577e 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -29,7 +29,6 @@
#include "queue.hpp"
#include "streamer.hpp"
#include "socket_base.hpp"
-#include "app_thread.hpp"
#include "msg_content.hpp"
#include "platform.hpp"
#include "stdint.hpp"
@@ -83,8 +82,6 @@ const char *zmq_strerror (int errnum_)
case EINPROGRESS:
return "Operation in progress";
#endif
- case EMTHREAD:
- return "Number of preallocated application threads exceeded";
case EFSM:
return "Operation cannot be accomplished in current state";
case ENOCOMPATPROTO:
@@ -367,6 +364,7 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
+/*
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
@@ -679,6 +677,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno = ENOTSUP;
return -1;
#endif
+*/
+zmq_assert (false);
+return -1;
}
int zmq_errno ()
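With zmq_poll stubbed out for the time being, the socket-level signaler exposed by get_signaler () points at the intended replacement: let the application retrieve the socket's notification file descriptor and its current event state, and block on the descriptor itself. The sketch below assumes the zmq_getsockopt call and the ZMQ_FD / ZMQ_EVENTS semantics of later 0MQ releases (an int descriptor on POSIX, a uint32_t event mask, and edge-triggered wakeups that require re-reading ZMQ_EVENTS); it illustrates the intended usage and is not code present in this tree.

#include <poll.h>
#include <stddef.h>
#include <stdint.h>
#include <zmq.h>

//  Blocks until socket_ (a 0MQ socket) has a message to read.
//  Returns 0 on success, -1 on error.
static int wait_readable (void *socket_)
{
    int fd;
    size_t fd_size = sizeof fd;
    if (zmq_getsockopt (socket_, ZMQ_FD, &fd, &fd_size) != 0)
        return -1;

    while (true) {
        uint32_t events;
        size_t events_size = sizeof events;
        if (zmq_getsockopt (socket_, ZMQ_EVENTS, &events, &events_size) != 0)
            return -1;
        if (events & ZMQ_POLLIN)
            return 0;

        //  Nothing pending; block on the signaler fd until 0MQ wakes us up.
        pollfd pfd = {fd, POLLIN, 0};
        if (poll (&pfd, 1, -1) == -1)
            return -1;
    }
}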
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index 077286f..d552c61 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -52,7 +52,7 @@ bool zmq::zmq_encoder_t::size_ready ()
bool zmq::zmq_encoder_t::message_ready ()
{
// Destroy content of the old message.
- zmq_msg_close(&in_progress);
+ zmq_msg_close (&in_progress);
// Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way