summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-30 10:07:34 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-30 10:07:34 +0200
commit0b59866a84f733e5a53b0d2f32570581691747ef (patch)
tree8861d97915544dc4385177931f299a6f27603c92
parent311fb0d852374e769d8ff791c9df38f0464960c6 (diff)
Patches from sub-forward branch incorporated
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--src/Makefile.am2
-rw-r--r--src/command.hpp8
-rw-r--r--src/connect_session.cpp7
-rw-r--r--src/connect_session.hpp4
-rw-r--r--src/fq.cpp7
-rw-r--r--src/fq.hpp4
-rw-r--r--src/mtrie.cpp218
-rw-r--r--src/mtrie.hpp83
-rw-r--r--src/named_session.cpp5
-rw-r--r--src/named_session.hpp4
-rw-r--r--src/object.cpp25
-rw-r--r--src/object.hpp2
-rw-r--r--src/pipe.cpp43
-rw-r--r--src/pipe.hpp13
-rw-r--r--src/pub.cpp13
-rw-r--r--src/pub.hpp4
-rw-r--r--src/session.cpp28
-rw-r--r--src/session.hpp21
-rw-r--r--src/socket_base.cpp14
-rw-r--r--src/socket_base.hpp6
-rw-r--r--src/sub.cpp115
-rw-r--r--src/sub.hpp21
-rw-r--r--src/transient_session.cpp6
-rw-r--r--src/transient_session.hpp4
-rw-r--r--src/trie.cpp63
-rw-r--r--src/trie.hpp18
-rw-r--r--src/xpub.cpp120
-rw-r--r--src/xpub.hpp26
-rw-r--r--src/xsub.cpp160
-rw-r--r--src/xsub.hpp30
30 files changed, 884 insertions, 190 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index de83d76..46c66ee 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -34,6 +34,7 @@ libzmq_la_SOURCES = \
likely.hpp \
mailbox.hpp \
msg.hpp \
+ mtrie.hpp \
mutex.hpp \
named_session.hpp \
object.hpp \
@@ -97,6 +98,7 @@ libzmq_la_SOURCES = \
lb.cpp \
mailbox.cpp \
msg.cpp \
+ mtrie.cpp \
named_session.cpp \
object.cpp \
options.cpp \
diff --git a/src/command.hpp b/src/command.hpp
index ff7b551..15cee0a 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -42,6 +42,7 @@ namespace zmq
bind,
activate_read,
activate_write,
+ hiccup,
pipe_term,
pipe_term_ack,
term_req,
@@ -95,6 +96,13 @@ namespace zmq
uint64_t msgs_read;
} activate_write;
+ // Sent by pipe reader to writer after creating a new inpipe.
+ // The parameter is actually of type pipe_t::upipe_t, however,
+ // its definition is private so we'll have to do with void*.
+ struct {
+ void *pipe;
+ } hiccup;
+
// Sent by pipe reader to pipe writer to ask it to terminate
// its end of the pipe.
struct {
diff --git a/src/connect_session.cpp b/src/connect_session.cpp
index 9a29bf1..fe7332a 100644
--- a/src/connect_session.cpp
+++ b/src/connect_session.cpp
@@ -111,7 +111,7 @@ void zmq::connect_session_t::start_connecting (bool wait_)
zmq_assert (false);
}
-bool zmq::connect_session_t::attached (const blob_t &peer_identity_)
+bool zmq::connect_session_t::xattached (const blob_t &peer_identity_)
{
// If there was no previous connection...
if (!connected) {
@@ -153,9 +153,12 @@ bool zmq::connect_session_t::attached (const blob_t &peer_identity_)
return true;
}
-void zmq::connect_session_t::detached ()
+bool zmq::connect_session_t::xdetached ()
{
// Reconnect.
start_connecting (true);
+
+ // Don't tear the session down.
+ return true;
}
diff --git a/src/connect_session.hpp b/src/connect_session.hpp
index 93e2704..3b8a26b 100644
--- a/src/connect_session.hpp
+++ b/src/connect_session.hpp
@@ -44,8 +44,8 @@ namespace zmq
private:
// Handlers for events from session base class.
- bool attached (const blob_t &peer_identity_);
- void detached ();
+ bool xattached (const blob_t &peer_identity_);
+ bool xdetached ();
// Start the connection process.
void start_connecting (bool wait_);
diff --git a/src/fq.cpp b/src/fq.cpp
index 7318822..63a50ff 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -63,6 +63,11 @@ void zmq::fq_t::activated (pipe_t *pipe_)
int zmq::fq_t::recv (msg_t *msg_, int flags_)
{
+ return recvpipe (msg_, flags_, NULL);
+}
+
+int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_)
+{
// Deallocate old content of the message.
int rc = msg_->close ();
errno_assert (rc == 0);
@@ -83,6 +88,8 @@ int zmq::fq_t::recv (msg_t *msg_, int flags_)
// and replaced by another active pipe. Thus we don't have to increase
// the 'current' pointer.
if (fetched) {
+ if (pipe_)
+ *pipe_ = pipes [current];
more = msg_->flags () & msg_t::more;
if (!more) {
current++;
diff --git a/src/fq.hpp b/src/fq.hpp
index 106e978..be9c695 100644
--- a/src/fq.hpp
+++ b/src/fq.hpp
@@ -29,8 +29,9 @@ namespace zmq
{
// Class manages a set of inbound pipes. On receive it performs fair
- // queueing (RFC970) so that senders gone berserk won't cause denial of
+ // queueing so that senders gone berserk won't cause denial of
// service for decent senders.
+
class fq_t
{
public:
@@ -43,6 +44,7 @@ namespace zmq
void terminated (pipe_t *pipe_);
int recv (msg_t *msg_, int flags_);
+ int recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_);
bool has_in ();
private:
diff --git a/src/mtrie.cpp b/src/mtrie.cpp
new file mode 100644
index 0000000..ac1fc31
--- /dev/null
+++ b/src/mtrie.cpp
@@ -0,0 +1,218 @@
+/*
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <stdlib.h>
+
+#include <new>
+#include <algorithm>
+
+#include "platform.hpp"
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#endif
+
+#include "err.hpp"
+#include "pipe.hpp"
+#include "mtrie.hpp"
+
+zmq::mtrie_t::mtrie_t () :
+ min (0),
+ count (0)
+{
+}
+
+zmq::mtrie_t::~mtrie_t ()
+{
+ if (count == 1)
+ delete next.node;
+ else if (count > 1) {
+ for (unsigned short i = 0; i != count; ++i)
+ if (next.table [i])
+ delete next.table [i];
+ free (next.table);
+ }
+}
+
+bool zmq::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
+{
+ // We are at the node corresponding to the prefix. We are done.
+ if (!size_) {
+ bool result = pipes.empty ();
+ pipes.insert (pipe_);
+ return result;
+ }
+
+ unsigned char c = *prefix_;
+ if (c < min || c >= min + count) {
+
+ // The character is out of range of currently handled
+ // charcters. We have to extend the table.
+ if (!count) {
+ min = c;
+ count = 1;
+ next.node = NULL;
+ }
+ else if (count == 1) {
+ unsigned char oldc = min;
+ mtrie_t *oldp = next.node;
+ count = (min < c ? c - min : min - c) + 1;
+ next.table = (mtrie_t**)
+ malloc (sizeof (mtrie_t*) * count);
+ zmq_assert (next.table);
+ for (unsigned short i = 0; i != count; ++i)
+ next.table [i] = 0;
+ min = std::min (min, c);
+ next.table [oldc - min] = oldp;
+ }
+ else if (min < c) {
+
+ // The new character is above the current character range.
+ unsigned short old_count = count;
+ count = c - min + 1;
+ next.table = (mtrie_t**) realloc ((void*) next.table,
+ sizeof (mtrie_t*) * count);
+ zmq_assert (next.table);
+ for (unsigned short i = old_count; i != count; i++)
+ next.table [i] = NULL;
+ }
+ else {
+
+ // The new character is below the current character range.
+ unsigned short old_count = count;
+ count = (min + old_count) - c;
+ next.table = (mtrie_t**) realloc ((void*) next.table,
+ sizeof (mtrie_t*) * count);
+ zmq_assert (next.table);
+ memmove (next.table + min - c, next.table,
+ old_count * sizeof (mtrie_t*));
+ for (unsigned short i = 0; i != min - c; i++)
+ next.table [i] = NULL;
+ min = c;
+ }
+ }
+
+ // If next node does not exist, create one.
+ if (count == 1) {
+ if (!next.node) {
+ next.node = new (std::nothrow) mtrie_t;
+ zmq_assert (next.node);
+ }
+ return next.node->add (prefix_ + 1, size_ - 1, pipe_);
+ }
+ else {
+ if (!next.table [c - min]) {
+ next.table [c - min] = new (std::nothrow) mtrie_t;
+ zmq_assert (next.table [c - min]);
+ }
+ return next.table [c - min]->add (prefix_ + 1, size_ - 1, pipe_);
+ }
+}
+
+
+void zmq::mtrie_t::rm (pipe_t *pipe_,
+ void (*func_) (unsigned char *data_, size_t size_, void *arg_),
+ void *arg_)
+{
+ unsigned char *buff = NULL;
+ rm_helper (pipe_, &buff, 0, 0, func_, arg_);
+ free (buff);
+}
+
+void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
+ size_t buffsize_, size_t maxbuffsize_,
+ void (*func_) (unsigned char *data_, size_t size_, void *arg_),
+ void *arg_)
+{
+ // Remove the subscription from this node.
+ if (pipes.erase (pipe_) && pipes.empty ())
+ func_ (*buff_, buffsize_, arg_);
+
+ // Adjust the buffer.
+ if (buffsize_ >= maxbuffsize_) {
+ maxbuffsize_ = buffsize_ + 256;
+ *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
+ alloc_assert (*buff_);
+ }
+
+ // If there are no subnodes in the trie, return.
+ if (count == 0)
+ return;
+
+ // If there's one subnode (optimisation).
+ if (count == 1) {
+ (*buff_) [buffsize_] = min;
+ buffsize_++;
+ next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_,
+ func_, arg_);
+ return;
+ }
+
+ // If there are multiple subnodes.
+ for (unsigned char c = 0; c != count; c++) {
+ (*buff_) [buffsize_] = min + c;
+ if (next.table [c])
+ next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1,
+ maxbuffsize_, func_, arg_);
+ }
+}
+
+bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
+{
+ if (!size_) {
+ pipes_t::size_type erased = pipes.erase (pipe_);
+ zmq_assert (erased == 1);
+ return pipes.empty ();
+ }
+
+ unsigned char c = *prefix_;
+ if (!count || c < min || c >= min + count)
+ return false;
+
+ mtrie_t *next_node =
+ count == 1 ? next.node : next.table [c - min];
+
+ if (!next_node)
+ return false;
+
+ return next_node->rm (prefix_ + 1, size_ - 1, pipe_);
+}
+
+void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_)
+{
+ // Merge the subscriptions from this node to the resultset.
+ pipes_.insert (pipes.begin (), pipes.end ());
+
+ // If there are no subnodes in the trie, return.
+ if (count == 0)
+ return;
+
+ // If there's one subnode (optimisation).
+ if (count == 1) {
+ next.node->match (data_ + 1, size_ - 1, pipes_);
+ return;
+ }
+
+ // If there are multiple subnodes.
+ for (unsigned char c = 0; c != count; c++) {
+ if (next.table [c])
+ next.table [c]->match (data_ + 1, size_ - 1, pipes_);
+ }
+}
+
diff --git a/src/mtrie.hpp b/src/mtrie.hpp
new file mode 100644
index 0000000..99f20e2
--- /dev/null
+++ b/src/mtrie.hpp
@@ -0,0 +1,83 @@
+/*
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_MTRIE_HPP_INCLUDED__
+#define __ZMQ_MTRIE_HPP_INCLUDED__
+
+#include <stddef.h>
+#include <set>
+
+#include "stdint.hpp"
+
+namespace zmq
+{
+
+ // Multi-trie. Each node in the trie is a set of pointers to pipes.
+
+ class mtrie_t
+ {
+ public:
+
+ typedef std::set <class pipe_t*> pipes_t;
+
+ mtrie_t ();
+ ~mtrie_t ();
+
+ // Add key to the trie. Returns true if it's a new subscription
+ // rather than a duplicate.
+ bool add (unsigned char *prefix_, size_t size_, class pipe_t *pipe_);
+
+ // Remove all subscriptions for a specific peer from the trie.
+ // If there are no subscriptions left on some topics, invoke the
+ // supplied callback function.
+ void rm (class pipe_t *pipe_,
+ void (*func_) (unsigned char *data_, size_t size_, void *arg_),
+ void *arg_);
+
+ // Remove specific subscription from the trie. Return true is it was
+ // actually removed rather than de-duplicated.
+ bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_);
+
+ // Get all matching pipes.
+ void match (unsigned char *data_, size_t size_, pipes_t &pipes_);
+
+ private:
+
+ void rm_helper (class pipe_t *pipe_, unsigned char **buff_,
+ size_t buffsize_, size_t maxbuffsize_,
+ void (*func_) (unsigned char *data_, size_t size_, void *arg_),
+ void *arg_);
+
+ pipes_t pipes;
+ unsigned char min;
+ unsigned short count;
+ union {
+ class mtrie_t *node;
+ class mtrie_t **table;
+ } next;
+
+ mtrie_t (const mtrie_t&);
+ const mtrie_t &operator = (const mtrie_t&);
+ };
+
+}
+
+#endif
+
diff --git a/src/named_session.cpp b/src/named_session.cpp
index 34f4af4..8e43fb0 100644
--- a/src/named_session.cpp
+++ b/src/named_session.cpp
@@ -45,7 +45,7 @@ zmq::named_session_t::~named_session_t ()
unregister_session (peer_identity);
}
-bool zmq::named_session_t::attached (const blob_t &peer_identity_)
+bool zmq::named_session_t::xattached (const blob_t &peer_identity_)
{
// Double check that identities match.
zmq_assert (peer_identity == peer_identity_);
@@ -58,9 +58,10 @@ bool zmq::named_session_t::attached (const blob_t &peer_identity_)
return true;
}
-void zmq::named_session_t::detached ()
+bool zmq::named_session_t::xdetached ()
{
// Do nothing. Named sessions are never destroyed because of disconnection.
// Neither they have to actively reconnect.
+ return true;
}
diff --git a/src/named_session.hpp b/src/named_session.hpp
index e3b5aa3..10af8cc 100644
--- a/src/named_session.hpp
+++ b/src/named_session.hpp
@@ -40,8 +40,8 @@ namespace zmq
~named_session_t ();
// Handlers for events from session base class.
- bool attached (const blob_t &peer_identity_);
- void detached ();
+ bool xattached (const blob_t &peer_identity_);
+ bool xdetached ();
private:
diff --git a/src/object.cpp b/src/object.cpp
index 0a06d5f..ba77bf0 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -74,7 +74,7 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::plug:
process_plug ();
process_seqnum ();
- return;
+ break;
case command_t::own:
process_own (cmd_.args.own.object);
@@ -96,9 +96,13 @@ void zmq::object_t::process_command (command_t &cmd_)
process_seqnum ();
break;
+ case command_t::hiccup:
+ process_hiccup (cmd_.args.hiccup.pipe);
+ break;
+
case command_t::pipe_term:
process_pipe_term ();
- return;
+ break;
case command_t::pipe_term_ack:
process_pipe_term_ack ();
@@ -290,6 +294,18 @@ void zmq::object_t::send_activate_write (pipe_t *destination_,
send_command (cmd);
}
+void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
+{
+ command_t cmd;
+#if defined ZMQ_MAKE_VALGRIND_HAPPY
+ memset (&cmd, 0, sizeof (cmd));
+#endif
+ cmd.destination = destination_;
+ cmd.type = command_t::hiccup;
+ cmd.args.hiccup.pipe = pipe_;
+ send_command (cmd);
+}
+
void zmq::object_t::send_pipe_term (pipe_t *destination_)
{
command_t cmd;
@@ -418,6 +434,11 @@ void zmq::object_t::process_activate_write (uint64_t msgs_read_)
zmq_assert (false);
}
+void zmq::object_t::process_hiccup (void *pipe_)
+{
+ zmq_assert (false);
+}
+
void zmq::object_t::process_pipe_term ()
{
zmq_assert (false);
diff --git a/src/object.hpp b/src/object.hpp
index 0f47670..fbad0ea 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -71,6 +71,7 @@ namespace zmq
void send_activate_read (class pipe_t *destination_);
void send_activate_write (class pipe_t *destination_,
uint64_t msgs_read_);
+ void send_hiccup (class pipe_t *destination_, void *pipe_);
void send_pipe_term (class pipe_t *destination_);
void send_pipe_term_ack (class pipe_t *destination_);
void send_term_req (class own_t *destination_,
@@ -92,6 +93,7 @@ namespace zmq
const blob_t &peer_identity_);
virtual void process_activate_read ();
virtual void process_activate_write (uint64_t msgs_read_);
+ virtual void process_hiccup (void *pipe_);
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
virtual void process_term_req (class own_t *object_);
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 48fc3e5..fd7223c 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -205,6 +205,29 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
}
}
+void zmq::pipe_t::process_hiccup (void *pipe_)
+{
+ // Destroy old outpipe. Note that the read end of the pipe was already
+ // migrated to this thread.
+ zmq_assert (outpipe);
+ outpipe->flush ();
+ msg_t msg;
+ while (outpipe->read (&msg)) {
+ int rc = msg.close ();
+ errno_assert (rc == 0);
+ }
+ delete outpipe;
+
+ // Plug in the new outpipe.
+ zmq_assert (pipe_);
+ outpipe = (upipe_t*) pipe_;
+ out_active = true;
+
+ // If appropriate, notify the user about the hiccup.
+ if (state == active)
+ sink->hiccuped (this);
+}
+
void zmq::pipe_t::process_pipe_term ()
{
// This is the simple case of peer-induced termination. If there are no
@@ -379,3 +402,23 @@ void zmq::pipe_t::delimit ()
// Delimiter in any other state is invalid.
zmq_assert (false);
}
+
+void zmq::pipe_t::hiccup ()
+{
+ // If termination is already under way do nothing.
+ if (state != active)
+ return;
+
+ // We'll drop the pointer to the inpipe. From now on, the peer is
+ // responsible for deallocating it.
+ inpipe = NULL;
+
+ // Create new inpipe.
+ inpipe = new (std::nothrow) pipe_t::upipe_t ();
+ alloc_assert (inpipe);
+ in_active = true;
+
+ // Notify the peer about the hiccup.
+ send_hiccup (peer, (void*) inpipe);
+}
+
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 3087ab8..bf34a83 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -44,6 +44,7 @@ namespace zmq
virtual void read_activated (class pipe_t *pipe_) = 0;
virtual void write_activated (class pipe_t *pipe_) = 0;
+ virtual void hiccuped (class pipe_t *pipe_) = 0;
virtual void terminated (class pipe_t *pipe_) = 0;
};
@@ -86,6 +87,11 @@ namespace zmq
// Flush the messages downsteam.
void flush ();
+ // Temporaraily disconnects the inbound message stream and drops
+ // all the messages on the fly. Causes 'hiccuped' event to be generated
+ // in the peer.
+ void hiccup ();
+
// Ask pipe to terminate. The termination will happen asynchronously
// and user will be notified about actual deallocation by 'terminated'
// event.
@@ -93,18 +99,19 @@ namespace zmq
private:
+ // Type of the underlying lock-free pipe.
+ typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
+
// Command handlers.
void process_activate_read ();
void process_activate_write (uint64_t msgs_read_);
+ void process_hiccup (void *pipe_);
void process_pipe_term ();
void process_pipe_term_ack ();
// Handler for delimiter read from the pipe.
void delimit ();
- // Type of the underlying lock-free pipe.
- typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
-
// Constructor is private. Pipe can only be created using
// pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
diff --git a/src/pub.cpp b/src/pub.cpp
index 8558265..4787c32 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -30,3 +30,16 @@ zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) :
zmq::pub_t::~pub_t ()
{
}
+
+int zmq::pub_t::xrecv (class msg_t *msg_, int flags_)
+{
+ // Messages cannot be received from PUB socket.
+ errno = ENOTSUP;
+ return -1;
+}
+
+bool zmq::pub_t::xhas_in ()
+{
+ return false;
+}
+
diff --git a/src/pub.hpp b/src/pub.hpp
index e69af3c..c8db55f 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -33,6 +33,10 @@ namespace zmq
pub_t (class ctx_t *parent_, uint32_t tid_);
~pub_t ();
+ // Implementations of virtual functions from socket_base_t.
+ int xrecv (class msg_t *msg_, int flags_);
+ bool xhas_in ();
+
private:
pub_t (const pub_t&);
diff --git a/src/session.cpp b/src/session.cpp
index 5601402..c9f4fdb 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -146,6 +146,13 @@ void zmq::session_t::write_activated (pipe_t *pipe_)
engine->activate_in ();
}
+void zmq::session_t::hiccuped (pipe_t *pipe_)
+{
+ // Hiccups are always sent from session to socket, not the other
+ // way round.
+ zmq_assert (false);
+}
+
void zmq::session_t::process_plug ()
{
}
@@ -287,4 +294,25 @@ void zmq::session_t::unregister_session (const blob_t &name_)
socket->unregister_session (name_);
}
+bool zmq::session_t::attached (const blob_t &peer_identity_)
+{
+ return xattached (peer_identity_);
+}
+
+void zmq::session_t::detached ()
+{
+ if (!xdetached ()) {
+
+ // Derived session type have asked for session termination.
+ terminate ();
+ return;
+ }
+
+ // For subscriber sockets we hiccup the inbound pipe, which will cause
+ // the socket object to resend all the subscriptions.
+ if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
+ pipe->hiccup ();
+}
+
+
diff --git a/src/session.hpp b/src/session.hpp
index 8bca735..1e32722 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -44,9 +44,7 @@ namespace zmq
// To be used once only, when creating the session.
void attach_pipe (class pipe_t *pipe_);
- // i_inout interface implementation. Note that detach method is not
- // implemented by generic session. Different session types may handle
- // engine disconnection in different ways.
+ // i_inout interface implementation.
bool read (msg_t *msg_);
bool write (msg_t *msg_);
void flush ();
@@ -55,17 +53,19 @@ namespace zmq
// i_pipe_events interface implementation.
void read_activated (class pipe_t *pipe_);
void write_activated (class pipe_t *pipe_);
+ void hiccuped (class pipe_t *pipe_);
void terminated (class pipe_t *pipe_);
protected:
- // Two events for the derived session type. Attached is triggered
- // when session is attached to a peer. The function can reject the new
- // peer by returning false. Detached is triggered at the beginning of
+ // Events from the engine. Attached is triggered when session is
+ // attached to a peer. The function can reject the new peer by
+ // returning false. Detached is triggered at the beginning of
// the termination process when session is about to be detached from
- // the peer.
- virtual bool attached (const blob_t &peer_identity_) = 0;
- virtual void detached () = 0;
+ // the peer. If it returns false, session will be terminated.
+ // To be overloaded by the derived session type.
+ virtual bool xattached (const blob_t &peer_identity_) = 0;
+ virtual bool xdetached () = 0;
// Returns true if there is an engine attached to the session.
bool has_engine ();
@@ -78,6 +78,9 @@ namespace zmq
private:
+ bool attached (const blob_t &peer_identity_);
+ void detached ();
+
// Handlers for incoming commands.
void process_plug ();
void process_attach (struct i_engine *engine_,
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 1682c05..59e2653 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -764,7 +764,7 @@ bool zmq::socket_base_t::xhas_out ()
return false;
}
-int zmq::socket_base_t::xsend (msg_t *msg_, int options_)
+int zmq::socket_base_t::xsend (msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
@@ -775,7 +775,7 @@ bool zmq::socket_base_t::xhas_in ()
return false;
}
-int zmq::socket_base_t::xrecv (msg_t *msg_, int options_)
+int zmq::socket_base_t::xrecv (msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
@@ -790,6 +790,11 @@ void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_)
zmq_assert (false);
}
+void zmq::socket_base_t::xhiccuped (pipe_t *pipe_)
+{
+ zmq_assert (false);
+}
+
void zmq::socket_base_t::in_event ()
{
// Process any commands from other threads/sockets that may be available
@@ -837,6 +842,11 @@ void zmq::socket_base_t::write_activated (pipe_t *pipe_)
xwrite_activated (pipe_);
}
+void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
+{
+ xhiccuped (pipe_);
+}
+
void zmq::socket_base_t::terminated (pipe_t *pipe_)
{
// Notify the specific socket type about the pipe termination.
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 98046af..ed5620c 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -96,6 +96,7 @@ namespace zmq
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
+ void hiccuped (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
protected:
@@ -116,15 +117,16 @@ namespace zmq
// The default implementation assumes that send is not supported.
virtual bool xhas_out ();
- virtual int xsend (class msg_t *msg_, int options_);
+ virtual int xsend (class msg_t *msg_, int flags_);
// The default implementation assumes that recv in not supported.
virtual bool xhas_in ();
- virtual int xrecv (class msg_t *msg_, int options_);
+ virtual int xrecv (class msg_t *msg_, int flags_);
// i_pipe_events will be forwarded to these functions.
virtual void xread_activated (pipe_t *pipe_);
virtual void xwrite_activated (pipe_t *pipe_);
+ virtual void xhiccuped (pipe_t *pipe_);
virtual void xterminated (pipe_t *pipe_) = 0;
// Delay actual destruction of the socket.
diff --git a/src/sub.cpp b/src/sub.cpp
index 11c4532..c8ffd2e 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -22,43 +22,18 @@
#include "msg.hpp"
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) :
- xsub_t (parent_, tid_),
- has_message (false),
- more (false)
+ xsub_t (parent_, tid_)
{
options.type = ZMQ_SUB;
- int rc = message.init ();
- errno_assert (rc == 0);
}
zmq::sub_t::~sub_t ()
{
- int rc = message.close ();
- errno_assert (rc == 0);
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
- // Process a subscription.
- if (option_ == ZMQ_SUBSCRIBE)
- subscriptions.add ((unsigned char*) optval_, optvallen_);
-
- // Process an unsubscription. Return error if there is no corresponding
- // subscription.
- else if (option_ == ZMQ_UNSUBSCRIBE) {
- if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) {
- errno = EINVAL;
- return -1;
- }
- }
-
- // Unknow option.
- else {
- errno = EINVAL;
- return -1;
- }
-
// Create the subscription message.
msg_t msg;
int rc = msg.init_size (optvallen_ + 1);
@@ -82,7 +57,7 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
return rc;
}
-int zmq::sub_t::xsend (msg_t *msg_, int options_)
+int zmq::sub_t::xsend (msg_t *msg_, int flags_)
{
// Overload the XSUB's send.
errno = ENOTSUP;
@@ -95,89 +70,3 @@ bool zmq::sub_t::xhas_out ()
return false;
}
-int zmq::sub_t::xrecv (msg_t *msg_, int flags_)
-{
- // If there's already a message prepared by a previous call to zmq_poll,
- // return it straight ahead.
- if (has_message) {
- int rc = msg_->move (message);
- errno_assert (rc == 0);
- has_message = false;
- more = msg_->flags () & msg_t::more;
- return 0;
- }
-
- // TODO: This can result in infinite loop in the case of continuous
- // stream of non-matching messages which breaks the non-blocking recv
- // semantics.
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = xsub_t::xrecv (msg_, flags_);
-
- // If there's no message available, return immediately.
- // The same when error occurs.
- if (rc != 0)
- return -1;
-
- // Check whether the message matches at least one subscription.
- // Non-initial parts of the message are passed
- if (more || match (msg_)) {
- more = msg_->flags () & msg_t::more;
- return 0;
- }
-
- // Message doesn't match. Pop any remaining parts of the message
- // from the pipe.
- while (msg_->flags () & msg_t::more) {
- rc = xsub_t::xrecv (msg_, ZMQ_DONTWAIT);
- zmq_assert (rc == 0);
- }
- }
-}
-
-bool zmq::sub_t::xhas_in ()
-{
- // There are subsequent parts of the partly-read message available.
- if (more)
- return true;
-
- // If there's already a message prepared by a previous call to zmq_poll,
- // return straight ahead.
- if (has_message)
- return true;
-
- // TODO: This can result in infinite loop in the case of continuous
- // stream of non-matching messages.
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT);
-
- // If there's no message available, return immediately.
- // The same when error occurs.
- if (rc != 0) {
- zmq_assert (errno == EAGAIN);
- return false;
- }
-
- // Check whether the message matches at least one subscription.
- if (match (&message)) {
- has_message = true;
- return true;
- }
-
- // Message doesn't match. Pop any remaining parts of the message
- // from the pipe.
- while (message.flags () & msg_t::more) {
- rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT);
- zmq_assert (rc == 0);
- }
- }
-}
-
-bool zmq::sub_t::match (msg_t *msg_)
-{
- return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
-}
-
diff --git a/src/sub.hpp b/src/sub.hpp
index 91a5b65..b5980ba 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -22,8 +22,6 @@
#define __ZMQ_SUB_HPP_INCLUDED__
#include "xsub.hpp"
-#include "trie.hpp"
-#include "msg.hpp"
namespace zmq
{
@@ -38,28 +36,11 @@ namespace zmq
protected:
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
- int xsend (class msg_t *msg_, int options_);
+ int xsend (class msg_t *msg_, int flags_);
bool xhas_out ();
- int xrecv (class msg_t *msg_, int flags_);
- bool xhas_in ();
private:
- // Check whether the message matches at least one subscription.
- bool match (class msg_t *msg_);
-
- // The repository of subscriptions.
- trie_t subscriptions;
-
- // If true, 'message' contains a matching message to return on the
- // next recv call.
- bool has_message;
- msg_t message;
-
- // If true, part of a multipart message was already received, but
- // there are following parts still waiting.
- bool more;
-
sub_t (const sub_t&);
const sub_t &operator = (const sub_t&);
};
diff --git a/src/transient_session.cpp b/src/transient_session.cpp
index 10a086f..b3c80b0 100644
--- a/src/transient_session.cpp
+++ b/src/transient_session.cpp
@@ -30,14 +30,14 @@ zmq::transient_session_t::~transient_session_t ()
{
}
-bool zmq::transient_session_t::attached (const blob_t &peer_identity_)
+bool zmq::transient_session_t::xattached (const blob_t &peer_identity_)
{
// Transient session is always valid.
return true;
}
-void zmq::transient_session_t::detached ()
+bool zmq::transient_session_t::xdetached ()
{
// There's no way to reestablish a transient session. Tear it down.
- terminate ();
+ return false;
}
diff --git a/src/transient_session.hpp b/src/transient_session.hpp
index c70a2d7..55c2b8a 100644
--- a/src/transient_session.hpp
+++ b/src/transient_session.hpp
@@ -40,8 +40,8 @@ namespace zmq
private:
// Handlers for events from session base class.
- bool attached (const blob_t &peer_identity_);
- void detached ();
+ bool xattached (const blob_t &peer_identity_);
+ bool xdetached ();
transient_session_t (const transient_session_t&);
const transient_session_t &operator = (const transient_session_t&);
diff --git a/src/trie.cpp b/src/trie.cpp
index 4198ff3..cd6cb7b 100644
--- a/src/trie.cpp
+++ b/src/trie.cpp
@@ -50,12 +50,12 @@ zmq::trie_t::~trie_t ()
}
}
-void zmq::trie_t::add (unsigned char *prefix_, size_t size_)
+bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
{
// We are at the node corresponding to the prefix. We are done.
if (!size_) {
++refcnt;
- return;
+ return refcnt == 1;
}
unsigned char c = *prefix_;
@@ -74,7 +74,7 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_)
count = (min < c ? c - min : min - c) + 1;
next.table = (trie_t**)
malloc (sizeof (trie_t*) * count);
- alloc_assert (next.table);
+ zmq_assert (next.table);
for (unsigned short i = 0; i != count; ++i)
next.table [i] = 0;
min = std::min (min, c);
@@ -111,26 +111,28 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_)
if (count == 1) {
if (!next.node) {
next.node = new (std::nothrow) trie_t;
- alloc_assert (next.node);
+ zmq_assert (next.node);
}
- next.node->add (prefix_ + 1, size_ - 1);
+ return next.node->add (prefix_ + 1, size_ - 1);
}
else {
if (!next.table [c - min]) {
next.table [c - min] = new (std::nothrow) trie_t;
- alloc_assert (next.table [c - min]);
+ zmq_assert (next.table [c - min]);
}
- next.table [c - min]->add (prefix_ + 1, size_ - 1);
+ return next.table [c - min]->add (prefix_ + 1, size_ - 1);
}
}
bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
{
+ // TODO: Shouldn't an error be reported if the key does not exist?
+
if (!size_) {
if (!refcnt)
return false;
refcnt--;
- return true;
+ return refcnt == 0;
}
unsigned char c = *prefix_;
@@ -179,3 +181,48 @@ bool zmq::trie_t::check (unsigned char *data_, size_t size_)
size_--;
}
}
+
+void zmq::trie_t::apply (void (*func_) (unsigned char *data_, size_t size_,
+ void *arg_), void *arg_)
+{
+ unsigned char *buff = NULL;
+ apply_helper (&buff, 0, 0, func_, arg_);
+ free (buff);
+}
+
+void zmq::trie_t::apply_helper (
+ unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_,
+ void (*func_) (unsigned char *data_, size_t size_, void *arg_), void *arg_)
+{
+ // If this node is a subscription, apply the function.
+ if (refcnt)
+ func_ (*buff_, buffsize_, arg_);
+
+ // Adjust the buffer.
+ if (buffsize_ >= maxbuffsize_) {
+ maxbuffsize_ = buffsize_ + 256;
+ *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
+ zmq_assert (*buff_);
+ }
+
+ // If there are no subnodes in the trie, return.
+ if (count == 0)
+ return;
+
+ // If there's one subnode (optimisation).
+ if (count == 1) {
+ (*buff_) [buffsize_] = min;
+ buffsize_++;
+ next.node->apply_helper (buff_, buffsize_, maxbuffsize_, func_, arg_);
+ return;
+ }
+
+ // If there are multiple subnodes.
+ for (unsigned char c = 0; c != count; c++) {
+ (*buff_) [buffsize_] = min + c;
+ if (next.table [c])
+ next.table [c]->apply_helper (buff_, buffsize_ + 1, maxbuffsize_,
+ func_, arg_);
+ }
+}
+
diff --git a/src/trie.hpp b/src/trie.hpp
index dbf1cb1..a2b55c6 100644
--- a/src/trie.hpp
+++ b/src/trie.hpp
@@ -35,12 +35,28 @@ namespace zmq
trie_t ();
~trie_t ();
- void add (unsigned char *prefix_, size_t size_);
+ // Add key to the trie. Returns true if this is a new item in the trie
+ // rather than a duplicate.
+ bool add (unsigned char *prefix_, size_t size_);
+
+ // Remove key from the trie. Returns true if the item is actually
+ // removed from the trie.
bool rm (unsigned char *prefix_, size_t size_);
+
+ // Check whether particular key is in the trie.
bool check (unsigned char *data_, size_t size_);
+ // Apply the function supplied to each subscription in the trie.
+ void apply (void (*func_) (unsigned char *data_, size_t size_,
+ void *arg_), void *arg_);
+
private:
+ void apply_helper (
+ unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_,
+ void (*func_) (unsigned char *data_, size_t size_, void *arg_),
+ void *arg_);
+
uint32_t refcnt;
unsigned char min;
unsigned short count;
diff --git a/src/xpub.cpp b/src/xpub.cpp
index d5dba9f..9158cf4 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -18,6 +18,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "xpub.hpp"
#include "pipe.hpp"
#include "err.hpp"
@@ -37,6 +39,12 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
dist.attach (pipe_);
+ fq.attach (pipe_);
+}
+
+void zmq::xpub_t::xread_activated (pipe_t *pipe_)
+{
+ fq.activated (pipe_);
}
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
@@ -46,11 +54,37 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
void zmq::xpub_t::xterminated (pipe_t *pipe_)
{
+ // Remove the pipe from the trie. If there are topics that nobody
+ // is interested in anymore, send corresponding unsubscriptions
+ // upstream.
+ subscriptions.rm (pipe_, send_unsubscription, this);
+
dist.terminated (pipe_);
+ fq.terminated (pipe_);
}
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{
+ // First, process any (un)subscriptions from downstream.
+ msg_t sub;
+ sub.init ();
+ while (true) {
+
+ // Grab next subscription.
+ pipe_t *pipe;
+ int rc = fq.recvpipe (&sub, 0, &pipe);
+ if (rc != 0 && errno == EAGAIN)
+ break;
+ errno_assert (rc == 0);
+
+ // Apply the subscription to the trie. If it's not a duplicate,
+ // store it so that it can be passed to used on next recv call.
+ if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB)
+ pending.push_back (blob_t ((unsigned char*) sub.data (),
+ sub.size ()));
+ }
+ sub.close ();
+
return dist.send (msg_, flags_);
}
@@ -61,12 +95,92 @@ bool zmq::xpub_t::xhas_out ()
int zmq::xpub_t::xrecv (msg_t *msg_, int flags_)
{
- errno = EAGAIN;
- return -1;
+ // If there is at least one
+ if (!pending.empty ()) {
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init_size (pending.front ().size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), pending.front ().data (),
+ pending.front ().size ());
+ pending.pop_front ();
+ return 0;
+ }
+
+ // Grab and apply next subscription.
+ pipe_t *pipe;
+ int rc = fq.recvpipe (msg_, 0, &pipe);
+ if (rc != 0)
+ return -1;
+ if (!apply_subscription (msg_, pipe)) {
+// TODO: This should be a loop rather!
+ msg_->close ();
+ msg_->init ();
+ errno = EAGAIN;
+ return -1;
+ }
+ return 0;
}
bool zmq::xpub_t::xhas_in ()
{
- return false;
+ if (!pending.empty ())
+ return true;
+
+ // Even if there are subscriptions in the fair-queuer they may be
+ // duplicates. Thus, we have to check by hand wheter there is any
+ // subscription available to pass upstream.
+ // First, process any (un)subscriptions from downstream.
+ msg_t sub;
+ sub.init ();
+ while (true) {
+
+ // Grab next subscription.
+ pipe_t *pipe;
+ int rc = fq.recvpipe (&sub, 0, &pipe);
+ if (rc != 0 && errno == EAGAIN) {
+ sub.close ();
+ return false;
+ }
+ errno_assert (rc == 0);
+
+ // Apply the subscription to the trie. If it's not a duplicate store
+ // it so that it can be passed to used on next recv call.
+ if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) {
+ pending.push_back (blob_t ((unsigned char*) sub.data (),
+ sub.size ()));
+ sub.close ();
+ return true;
+ }
+ }
+}
+
+bool zmq::xpub_t::apply_subscription (msg_t *sub_, pipe_t *pipe_)
+{
+ unsigned char *data = (unsigned char*) sub_->data ();
+ size_t size = sub_->size ();
+ zmq_assert (size > 0 && (*data == 0 || *data == 1));
+
+ if (*data == 0)
+ return subscriptions.rm (data + 1, size - 1, pipe_);
+ else
+ return subscriptions.add (data + 1, size - 1, pipe_);
+}
+
+void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
+ void *arg_)
+{
+ xpub_t *self = (xpub_t*) arg_;
+
+ if (self->options.type != ZMQ_PUB) {
+
+ // Place the unsubscription to the queue of pending (un)sunscriptions
+ // to be retrived by the user later on.
+ xpub_t *self = (xpub_t*) arg_;
+ blob_t unsub (size_ + 1, 0);
+ unsub [0] = 0;
+ memcpy (&unsub [1], data_, size_);
+ self->pending.push_back (unsub);
+ }
}
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 8a6ff73..b824548 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -21,9 +21,14 @@
#ifndef __ZMQ_XPUB_HPP_INCLUDED__
#define __ZMQ_XPUB_HPP_INCLUDED__
+#include <deque>
+
#include "socket_base.hpp"
+#include "mtrie.hpp"
#include "array.hpp"
+#include "blob.hpp"
#include "dist.hpp"
+#include "fq.hpp"
namespace zmq
{
@@ -42,14 +47,35 @@ namespace zmq
bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
+ void xread_activated (class pipe_t *pipe_);
void xwrite_activated (class pipe_t *pipe_);
void xterminated (class pipe_t *pipe_);
private:
+ // Applies the subscription to the trie. Return false if it is a
+ // duplicate.
+ bool apply_subscription (class msg_t *sub_, class pipe_t *pipe_);
+
+ // Function to be applied to the trie to send all the subsciptions
+ // upstream.
+ static void send_unsubscription (unsigned char *data_, size_t size_,
+ void *arg_);
+
+ // List of all subscriptions mapped to corresponding pipes.
+ mtrie_t subscriptions;
+
// Distributor of messages holding the list of outbound pipes.
dist_t dist;
+ // Object to fair-queue the subscription requests.
+ fq_t fq;
+
+ // List of pending (un)subscriptions, ie. those that were already
+ // applied to the trie, but not yet received by the user.
+ typedef std::deque <blob_t> pending_t;
+ pending_t pending;
+
xpub_t (const xpub_t&);
const xpub_t &operator = (const xpub_t&);
};
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 6928d82..729f6a4 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -24,19 +24,30 @@
#include "err.hpp"
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
- socket_base_t (parent_, tid_)
+ socket_base_t (parent_, tid_),
+ has_message (false),
+ more (false)
{
options.type = ZMQ_XSUB;
+ int rc = message.init ();
+ errno_assert (rc == 0);
}
zmq::xsub_t::~xsub_t ()
{
+ int rc = message.close ();
+ errno_assert (rc == 0);
}
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
fq.attach (pipe_);
+ dist.attach (pipe_);
+
+ // Send all the cached subscriptions to the new upstream peer.
+ subscriptions.apply (send_subscription, pipe_);
+ pipe_->flush ();
}
void zmq::xsub_t::xread_activated (pipe_t *pipe_)
@@ -44,20 +55,49 @@ void zmq::xsub_t::xread_activated (pipe_t *pipe_)
fq.activated (pipe_);
}
+void zmq::xsub_t::xwrite_activated (pipe_t *pipe_)
+{
+ dist.activated (pipe_);
+}
+
void zmq::xsub_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
+ dist.terminated (pipe_);
}
-int zmq::xsub_t::xsend (msg_t *msg_, int options_)
+void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
{
- // TODO: Once we'll send the subscription upstream here. For now
- // just empty the message.
- int rc = msg_->close ();
- errno_assert (rc == 0);
- rc = msg_->init ();
- errno_assert (rc == 0);
- return 0;
+ // Send all the cached subscriptions to the hiccuped pipe.
+ subscriptions.apply (send_subscription, pipe_);
+ pipe_->flush ();
+}
+
+int zmq::xsub_t::xsend (msg_t *msg_, int flags_)
+{
+ size_t size = msg_->size ();
+ unsigned char *data = (unsigned char*) msg_->data ();
+
+ // Malformed subscriptions.
+ if (size < 1 || (*data != 0 && *data != 1)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Process the subscription.
+ if (*data == 1) {
+ if (subscriptions.add (data + 1, size - 1))
+ return dist.send (msg_, flags_);
+ else
+ return 0;
+ }
+ else if (*data == 0) {
+ if (subscriptions.rm (data + 1, size - 1))
+ return dist.send (msg_, flags_);
+ else
+ return 0;
+ }
+ zmq_assert (false);
}
bool zmq::xsub_t::xhas_out ()
@@ -66,13 +106,109 @@ bool zmq::xsub_t::xhas_out ()
return true;
}
-int zmq::xsub_t::xrecv (class msg_t *msg_, int flags_)
+int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
{
- return fq.recv (msg_, flags_);
+ // If there's already a message prepared by a previous call to zmq_poll,
+ // return it straight ahead.
+ if (has_message) {
+ int rc = msg_->move (message);
+ errno_assert (rc == 0);
+ has_message = false;
+ more = msg_->flags () & msg_t::more;
+ return 0;
+ }
+
+ // TODO: This can result in infinite loop in the case of continuous
+ // stream of non-matching messages which breaks the non-blocking recv
+ // semantics.
+ while (true) {
+
+ // Get a message using fair queueing algorithm.
+ int rc = fq.recv (msg_, flags_);
+
+ // If there's no message available, return immediately.
+ // The same when error occurs.
+ if (rc != 0)
+ return -1;
+
+ // Check whether the message matches at least one subscription.
+ // Non-initial parts of the message are passed
+ if (more || match (msg_)) {
+ more = msg_->flags () & msg_t::more;
+ return 0;
+ }
+
+ // Message doesn't match. Pop any remaining parts of the message
+ // from the pipe.
+ while (msg_->flags () & msg_t::more) {
+ rc = fq.recv (msg_, ZMQ_DONTWAIT);
+ zmq_assert (rc == 0);
+ }
+ }
}
bool zmq::xsub_t::xhas_in ()
{
- return fq.has_in ();
+ // There are subsequent parts of the partly-read message available.
+ if (more)
+ return true;
+
+ // If there's already a message prepared by a previous call to zmq_poll,
+ // return straight ahead.
+ if (has_message)
+ return true;
+
+ // TODO: This can result in infinite loop in the case of continuous
+ // stream of non-matching messages.
+ while (true) {
+
+ // Get a message using fair queueing algorithm.
+ int rc = fq.recv (&message, ZMQ_DONTWAIT);
+
+ // If there's no message available, return immediately.
+ // The same when error occurs.
+ if (rc != 0) {
+ zmq_assert (errno == EAGAIN);
+ return false;
+ }
+
+ // Check whether the message matches at least one subscription.
+ if (match (&message)) {
+ has_message = true;
+ return true;
+ }
+
+ // Message doesn't match. Pop any remaining parts of the message
+ // from the pipe.
+ while (message.flags () & msg_t::more) {
+ rc = fq.recv (&message, ZMQ_DONTWAIT);
+ zmq_assert (rc == 0);
+ }
+ }
+}
+
+bool zmq::xsub_t::match (msg_t *msg_)
+{
+ return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
}
+void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
+ void *arg_)
+{
+ pipe_t *pipe = (pipe_t*) arg_;
+
+ // Create the subsctription message.
+ msg_t msg;
+ int rc = msg.init_size (size_ + 1);
+ zmq_assert (rc == 0);
+ unsigned char *data = (unsigned char*) msg.data ();
+ data [0] = 1;
+ memcpy (data + 1, data_, size_);
+
+ // Send it to the pipe.
+ bool sent = pipe->write (&msg);
+ zmq_assert (sent);
+ msg.close ();
+}
+
+
diff --git a/src/xsub.hpp b/src/xsub.hpp
index ebd6259..03b3178 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -22,7 +22,10 @@
#define __ZMQ_XSUB_HPP_INCLUDED__
#include "socket_base.hpp"
+#include "dist.hpp"
#include "fq.hpp"
+#include "trie.hpp"
+#include "msg.hpp"
namespace zmq
{
@@ -39,18 +42,43 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
- int xsend (class msg_t *msg_, int options_);
+ int xsend (class msg_t *msg_, int flags_);
bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
void xread_activated (class pipe_t *pipe_);
+ void xwrite_activated (class pipe_t *pipe_);
+ void xhiccuped (pipe_t *pipe_);
void xterminated (class pipe_t *pipe_);
private:
+ // Check whether the message matches at least one subscription.
+ bool match (class msg_t *msg_);
+
+ // Function to be applied to the trie to send all the subsciptions
+ // upstream.
+ static void send_subscription (unsigned char *data_, size_t size_,
+ void *arg_);
+
// Fair queueing object for inbound pipes.
fq_t fq;
+ // Object for distributing the subscriptions upstream.
+ dist_t dist;
+
+ // The repository of subscriptions.
+ trie_t subscriptions;
+
+ // If true, 'message' contains a matching message to return on the
+ // next recv call.
+ bool has_message;
+ msg_t message;
+
+ // If true, part of a multipart message was already received, but
+ // there are following parts still waiting.
+ bool more;
+
xsub_t (const xsub_t&);
const xsub_t &operator = (const xsub_t&);
};