diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-05-30 10:07:34 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-05-30 10:07:34 +0200 |
commit | 0b59866a84f733e5a53b0d2f32570581691747ef (patch) | |
tree | 8861d97915544dc4385177931f299a6f27603c92 | |
parent | 311fb0d852374e769d8ff791c9df38f0464960c6 (diff) |
Patches from sub-forward branch incorporated
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/command.hpp | 8 | ||||
-rw-r--r-- | src/connect_session.cpp | 7 | ||||
-rw-r--r-- | src/connect_session.hpp | 4 | ||||
-rw-r--r-- | src/fq.cpp | 7 | ||||
-rw-r--r-- | src/fq.hpp | 4 | ||||
-rw-r--r-- | src/mtrie.cpp | 218 | ||||
-rw-r--r-- | src/mtrie.hpp | 83 | ||||
-rw-r--r-- | src/named_session.cpp | 5 | ||||
-rw-r--r-- | src/named_session.hpp | 4 | ||||
-rw-r--r-- | src/object.cpp | 25 | ||||
-rw-r--r-- | src/object.hpp | 2 | ||||
-rw-r--r-- | src/pipe.cpp | 43 | ||||
-rw-r--r-- | src/pipe.hpp | 13 | ||||
-rw-r--r-- | src/pub.cpp | 13 | ||||
-rw-r--r-- | src/pub.hpp | 4 | ||||
-rw-r--r-- | src/session.cpp | 28 | ||||
-rw-r--r-- | src/session.hpp | 21 | ||||
-rw-r--r-- | src/socket_base.cpp | 14 | ||||
-rw-r--r-- | src/socket_base.hpp | 6 | ||||
-rw-r--r-- | src/sub.cpp | 115 | ||||
-rw-r--r-- | src/sub.hpp | 21 | ||||
-rw-r--r-- | src/transient_session.cpp | 6 | ||||
-rw-r--r-- | src/transient_session.hpp | 4 | ||||
-rw-r--r-- | src/trie.cpp | 63 | ||||
-rw-r--r-- | src/trie.hpp | 18 | ||||
-rw-r--r-- | src/xpub.cpp | 120 | ||||
-rw-r--r-- | src/xpub.hpp | 26 | ||||
-rw-r--r-- | src/xsub.cpp | 160 | ||||
-rw-r--r-- | src/xsub.hpp | 30 |
30 files changed, 884 insertions, 190 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index de83d76..46c66ee 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -34,6 +34,7 @@ libzmq_la_SOURCES = \ likely.hpp \ mailbox.hpp \ msg.hpp \ + mtrie.hpp \ mutex.hpp \ named_session.hpp \ object.hpp \ @@ -97,6 +98,7 @@ libzmq_la_SOURCES = \ lb.cpp \ mailbox.cpp \ msg.cpp \ + mtrie.cpp \ named_session.cpp \ object.cpp \ options.cpp \ diff --git a/src/command.hpp b/src/command.hpp index ff7b551..15cee0a 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -42,6 +42,7 @@ namespace zmq bind, activate_read, activate_write, + hiccup, pipe_term, pipe_term_ack, term_req, @@ -95,6 +96,13 @@ namespace zmq uint64_t msgs_read; } activate_write; + // Sent by pipe reader to writer after creating a new inpipe. + // The parameter is actually of type pipe_t::upipe_t, however, + // its definition is private so we'll have to do with void*. + struct { + void *pipe; + } hiccup; + // Sent by pipe reader to pipe writer to ask it to terminate // its end of the pipe. struct { diff --git a/src/connect_session.cpp b/src/connect_session.cpp index 9a29bf1..fe7332a 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -111,7 +111,7 @@ void zmq::connect_session_t::start_connecting (bool wait_) zmq_assert (false); } -bool zmq::connect_session_t::attached (const blob_t &peer_identity_) +bool zmq::connect_session_t::xattached (const blob_t &peer_identity_) { // If there was no previous connection... if (!connected) { @@ -153,9 +153,12 @@ bool zmq::connect_session_t::attached (const blob_t &peer_identity_) return true; } -void zmq::connect_session_t::detached () +bool zmq::connect_session_t::xdetached () { // Reconnect. start_connecting (true); + + // Don't tear the session down. + return true; } diff --git a/src/connect_session.hpp b/src/connect_session.hpp index 93e2704..3b8a26b 100644 --- a/src/connect_session.hpp +++ b/src/connect_session.hpp @@ -44,8 +44,8 @@ namespace zmq private: // Handlers for events from session base class. - bool attached (const blob_t &peer_identity_); - void detached (); + bool xattached (const blob_t &peer_identity_); + bool xdetached (); // Start the connection process. void start_connecting (bool wait_); @@ -63,6 +63,11 @@ void zmq::fq_t::activated (pipe_t *pipe_) int zmq::fq_t::recv (msg_t *msg_, int flags_) { + return recvpipe (msg_, flags_, NULL); +} + +int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_) +{ // Deallocate old content of the message. int rc = msg_->close (); errno_assert (rc == 0); @@ -83,6 +88,8 @@ int zmq::fq_t::recv (msg_t *msg_, int flags_) // and replaced by another active pipe. Thus we don't have to increase // the 'current' pointer. if (fetched) { + if (pipe_) + *pipe_ = pipes [current]; more = msg_->flags () & msg_t::more; if (!more) { current++; @@ -29,8 +29,9 @@ namespace zmq { // Class manages a set of inbound pipes. On receive it performs fair - // queueing (RFC970) so that senders gone berserk won't cause denial of + // queueing so that senders gone berserk won't cause denial of // service for decent senders. + class fq_t { public: @@ -43,6 +44,7 @@ namespace zmq void terminated (pipe_t *pipe_); int recv (msg_t *msg_, int flags_); + int recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_); bool has_in (); private: diff --git a/src/mtrie.cpp b/src/mtrie.cpp new file mode 100644 index 0000000..ac1fc31 --- /dev/null +++ b/src/mtrie.cpp @@ -0,0 +1,218 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> + +#include <new> +#include <algorithm> + +#include "platform.hpp" +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#endif + +#include "err.hpp" +#include "pipe.hpp" +#include "mtrie.hpp" + +zmq::mtrie_t::mtrie_t () : + min (0), + count (0) +{ +} + +zmq::mtrie_t::~mtrie_t () +{ + if (count == 1) + delete next.node; + else if (count > 1) { + for (unsigned short i = 0; i != count; ++i) + if (next.table [i]) + delete next.table [i]; + free (next.table); + } +} + +bool zmq::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_) +{ + // We are at the node corresponding to the prefix. We are done. + if (!size_) { + bool result = pipes.empty (); + pipes.insert (pipe_); + return result; + } + + unsigned char c = *prefix_; + if (c < min || c >= min + count) { + + // The character is out of range of currently handled + // charcters. We have to extend the table. + if (!count) { + min = c; + count = 1; + next.node = NULL; + } + else if (count == 1) { + unsigned char oldc = min; + mtrie_t *oldp = next.node; + count = (min < c ? c - min : min - c) + 1; + next.table = (mtrie_t**) + malloc (sizeof (mtrie_t*) * count); + zmq_assert (next.table); + for (unsigned short i = 0; i != count; ++i) + next.table [i] = 0; + min = std::min (min, c); + next.table [oldc - min] = oldp; + } + else if (min < c) { + + // The new character is above the current character range. + unsigned short old_count = count; + count = c - min + 1; + next.table = (mtrie_t**) realloc ((void*) next.table, + sizeof (mtrie_t*) * count); + zmq_assert (next.table); + for (unsigned short i = old_count; i != count; i++) + next.table [i] = NULL; + } + else { + + // The new character is below the current character range. + unsigned short old_count = count; + count = (min + old_count) - c; + next.table = (mtrie_t**) realloc ((void*) next.table, + sizeof (mtrie_t*) * count); + zmq_assert (next.table); + memmove (next.table + min - c, next.table, + old_count * sizeof (mtrie_t*)); + for (unsigned short i = 0; i != min - c; i++) + next.table [i] = NULL; + min = c; + } + } + + // If next node does not exist, create one. + if (count == 1) { + if (!next.node) { + next.node = new (std::nothrow) mtrie_t; + zmq_assert (next.node); + } + return next.node->add (prefix_ + 1, size_ - 1, pipe_); + } + else { + if (!next.table [c - min]) { + next.table [c - min] = new (std::nothrow) mtrie_t; + zmq_assert (next.table [c - min]); + } + return next.table [c - min]->add (prefix_ + 1, size_ - 1, pipe_); + } +} + + +void zmq::mtrie_t::rm (pipe_t *pipe_, + void (*func_) (unsigned char *data_, size_t size_, void *arg_), + void *arg_) +{ + unsigned char *buff = NULL; + rm_helper (pipe_, &buff, 0, 0, func_, arg_); + free (buff); +} + +void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, + size_t buffsize_, size_t maxbuffsize_, + void (*func_) (unsigned char *data_, size_t size_, void *arg_), + void *arg_) +{ + // Remove the subscription from this node. + if (pipes.erase (pipe_) && pipes.empty ()) + func_ (*buff_, buffsize_, arg_); + + // Adjust the buffer. + if (buffsize_ >= maxbuffsize_) { + maxbuffsize_ = buffsize_ + 256; + *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); + alloc_assert (*buff_); + } + + // If there are no subnodes in the trie, return. + if (count == 0) + return; + + // If there's one subnode (optimisation). + if (count == 1) { + (*buff_) [buffsize_] = min; + buffsize_++; + next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, + func_, arg_); + return; + } + + // If there are multiple subnodes. + for (unsigned char c = 0; c != count; c++) { + (*buff_) [buffsize_] = min + c; + if (next.table [c]) + next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1, + maxbuffsize_, func_, arg_); + } +} + +bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_) +{ + if (!size_) { + pipes_t::size_type erased = pipes.erase (pipe_); + zmq_assert (erased == 1); + return pipes.empty (); + } + + unsigned char c = *prefix_; + if (!count || c < min || c >= min + count) + return false; + + mtrie_t *next_node = + count == 1 ? next.node : next.table [c - min]; + + if (!next_node) + return false; + + return next_node->rm (prefix_ + 1, size_ - 1, pipe_); +} + +void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_) +{ + // Merge the subscriptions from this node to the resultset. + pipes_.insert (pipes.begin (), pipes.end ()); + + // If there are no subnodes in the trie, return. + if (count == 0) + return; + + // If there's one subnode (optimisation). + if (count == 1) { + next.node->match (data_ + 1, size_ - 1, pipes_); + return; + } + + // If there are multiple subnodes. + for (unsigned char c = 0; c != count; c++) { + if (next.table [c]) + next.table [c]->match (data_ + 1, size_ - 1, pipes_); + } +} + diff --git a/src/mtrie.hpp b/src/mtrie.hpp new file mode 100644 index 0000000..99f20e2 --- /dev/null +++ b/src/mtrie.hpp @@ -0,0 +1,83 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_MTRIE_HPP_INCLUDED__ +#define __ZMQ_MTRIE_HPP_INCLUDED__ + +#include <stddef.h> +#include <set> + +#include "stdint.hpp" + +namespace zmq +{ + + // Multi-trie. Each node in the trie is a set of pointers to pipes. + + class mtrie_t + { + public: + + typedef std::set <class pipe_t*> pipes_t; + + mtrie_t (); + ~mtrie_t (); + + // Add key to the trie. Returns true if it's a new subscription + // rather than a duplicate. + bool add (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); + + // Remove all subscriptions for a specific peer from the trie. + // If there are no subscriptions left on some topics, invoke the + // supplied callback function. + void rm (class pipe_t *pipe_, + void (*func_) (unsigned char *data_, size_t size_, void *arg_), + void *arg_); + + // Remove specific subscription from the trie. Return true is it was + // actually removed rather than de-duplicated. + bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); + + // Get all matching pipes. + void match (unsigned char *data_, size_t size_, pipes_t &pipes_); + + private: + + void rm_helper (class pipe_t *pipe_, unsigned char **buff_, + size_t buffsize_, size_t maxbuffsize_, + void (*func_) (unsigned char *data_, size_t size_, void *arg_), + void *arg_); + + pipes_t pipes; + unsigned char min; + unsigned short count; + union { + class mtrie_t *node; + class mtrie_t **table; + } next; + + mtrie_t (const mtrie_t&); + const mtrie_t &operator = (const mtrie_t&); + }; + +} + +#endif + diff --git a/src/named_session.cpp b/src/named_session.cpp index 34f4af4..8e43fb0 100644 --- a/src/named_session.cpp +++ b/src/named_session.cpp @@ -45,7 +45,7 @@ zmq::named_session_t::~named_session_t () unregister_session (peer_identity); } -bool zmq::named_session_t::attached (const blob_t &peer_identity_) +bool zmq::named_session_t::xattached (const blob_t &peer_identity_) { // Double check that identities match. zmq_assert (peer_identity == peer_identity_); @@ -58,9 +58,10 @@ bool zmq::named_session_t::attached (const blob_t &peer_identity_) return true; } -void zmq::named_session_t::detached () +bool zmq::named_session_t::xdetached () { // Do nothing. Named sessions are never destroyed because of disconnection. // Neither they have to actively reconnect. + return true; } diff --git a/src/named_session.hpp b/src/named_session.hpp index e3b5aa3..10af8cc 100644 --- a/src/named_session.hpp +++ b/src/named_session.hpp @@ -40,8 +40,8 @@ namespace zmq ~named_session_t (); // Handlers for events from session base class. - bool attached (const blob_t &peer_identity_); - void detached (); + bool xattached (const blob_t &peer_identity_); + bool xdetached (); private: diff --git a/src/object.cpp b/src/object.cpp index 0a06d5f..ba77bf0 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -74,7 +74,7 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::plug: process_plug (); process_seqnum (); - return; + break; case command_t::own: process_own (cmd_.args.own.object); @@ -96,9 +96,13 @@ void zmq::object_t::process_command (command_t &cmd_) process_seqnum (); break; + case command_t::hiccup: + process_hiccup (cmd_.args.hiccup.pipe); + break; + case command_t::pipe_term: process_pipe_term (); - return; + break; case command_t::pipe_term_ack: process_pipe_term_ack (); @@ -290,6 +294,18 @@ void zmq::object_t::send_activate_write (pipe_t *destination_, send_command (cmd); } +void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_) +{ + command_t cmd; +#if defined ZMQ_MAKE_VALGRIND_HAPPY + memset (&cmd, 0, sizeof (cmd)); +#endif + cmd.destination = destination_; + cmd.type = command_t::hiccup; + cmd.args.hiccup.pipe = pipe_; + send_command (cmd); +} + void zmq::object_t::send_pipe_term (pipe_t *destination_) { command_t cmd; @@ -418,6 +434,11 @@ void zmq::object_t::process_activate_write (uint64_t msgs_read_) zmq_assert (false); } +void zmq::object_t::process_hiccup (void *pipe_) +{ + zmq_assert (false); +} + void zmq::object_t::process_pipe_term () { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 0f47670..fbad0ea 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -71,6 +71,7 @@ namespace zmq void send_activate_read (class pipe_t *destination_); void send_activate_write (class pipe_t *destination_, uint64_t msgs_read_); + void send_hiccup (class pipe_t *destination_, void *pipe_); void send_pipe_term (class pipe_t *destination_); void send_pipe_term_ack (class pipe_t *destination_); void send_term_req (class own_t *destination_, @@ -92,6 +93,7 @@ namespace zmq const blob_t &peer_identity_); virtual void process_activate_read (); virtual void process_activate_write (uint64_t msgs_read_); + virtual void process_hiccup (void *pipe_); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); virtual void process_term_req (class own_t *object_); diff --git a/src/pipe.cpp b/src/pipe.cpp index 48fc3e5..fd7223c 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -205,6 +205,29 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) } } +void zmq::pipe_t::process_hiccup (void *pipe_) +{ + // Destroy old outpipe. Note that the read end of the pipe was already + // migrated to this thread. + zmq_assert (outpipe); + outpipe->flush (); + msg_t msg; + while (outpipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); + } + delete outpipe; + + // Plug in the new outpipe. + zmq_assert (pipe_); + outpipe = (upipe_t*) pipe_; + out_active = true; + + // If appropriate, notify the user about the hiccup. + if (state == active) + sink->hiccuped (this); +} + void zmq::pipe_t::process_pipe_term () { // This is the simple case of peer-induced termination. If there are no @@ -379,3 +402,23 @@ void zmq::pipe_t::delimit () // Delimiter in any other state is invalid. zmq_assert (false); } + +void zmq::pipe_t::hiccup () +{ + // If termination is already under way do nothing. + if (state != active) + return; + + // We'll drop the pointer to the inpipe. From now on, the peer is + // responsible for deallocating it. + inpipe = NULL; + + // Create new inpipe. + inpipe = new (std::nothrow) pipe_t::upipe_t (); + alloc_assert (inpipe); + in_active = true; + + // Notify the peer about the hiccup. + send_hiccup (peer, (void*) inpipe); +} + diff --git a/src/pipe.hpp b/src/pipe.hpp index 3087ab8..bf34a83 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -44,6 +44,7 @@ namespace zmq virtual void read_activated (class pipe_t *pipe_) = 0; virtual void write_activated (class pipe_t *pipe_) = 0; + virtual void hiccuped (class pipe_t *pipe_) = 0; virtual void terminated (class pipe_t *pipe_) = 0; }; @@ -86,6 +87,11 @@ namespace zmq // Flush the messages downsteam. void flush (); + // Temporaraily disconnects the inbound message stream and drops + // all the messages on the fly. Causes 'hiccuped' event to be generated + // in the peer. + void hiccup (); + // Ask pipe to terminate. The termination will happen asynchronously // and user will be notified about actual deallocation by 'terminated' // event. @@ -93,18 +99,19 @@ namespace zmq private: + // Type of the underlying lock-free pipe. + typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t; + // Command handlers. void process_activate_read (); void process_activate_write (uint64_t msgs_read_); + void process_hiccup (void *pipe_); void process_pipe_term (); void process_pipe_term_ack (); // Handler for delimiter read from the pipe. void delimit (); - // Type of the underlying lock-free pipe. - typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t; - // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, diff --git a/src/pub.cpp b/src/pub.cpp index 8558265..4787c32 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -30,3 +30,16 @@ zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) : zmq::pub_t::~pub_t () { } + +int zmq::pub_t::xrecv (class msg_t *msg_, int flags_) +{ + // Messages cannot be received from PUB socket. + errno = ENOTSUP; + return -1; +} + +bool zmq::pub_t::xhas_in () +{ + return false; +} + diff --git a/src/pub.hpp b/src/pub.hpp index e69af3c..c8db55f 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -33,6 +33,10 @@ namespace zmq pub_t (class ctx_t *parent_, uint32_t tid_); ~pub_t (); + // Implementations of virtual functions from socket_base_t. + int xrecv (class msg_t *msg_, int flags_); + bool xhas_in (); + private: pub_t (const pub_t&); diff --git a/src/session.cpp b/src/session.cpp index 5601402..c9f4fdb 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -146,6 +146,13 @@ void zmq::session_t::write_activated (pipe_t *pipe_) engine->activate_in (); } +void zmq::session_t::hiccuped (pipe_t *pipe_) +{ + // Hiccups are always sent from session to socket, not the other + // way round. + zmq_assert (false); +} + void zmq::session_t::process_plug () { } @@ -287,4 +294,25 @@ void zmq::session_t::unregister_session (const blob_t &name_) socket->unregister_session (name_); } +bool zmq::session_t::attached (const blob_t &peer_identity_) +{ + return xattached (peer_identity_); +} + +void zmq::session_t::detached () +{ + if (!xdetached ()) { + + // Derived session type have asked for session termination. + terminate (); + return; + } + + // For subscriber sockets we hiccup the inbound pipe, which will cause + // the socket object to resend all the subscriptions. + if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) + pipe->hiccup (); +} + + diff --git a/src/session.hpp b/src/session.hpp index 8bca735..1e32722 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -44,9 +44,7 @@ namespace zmq // To be used once only, when creating the session. void attach_pipe (class pipe_t *pipe_); - // i_inout interface implementation. Note that detach method is not - // implemented by generic session. Different session types may handle - // engine disconnection in different ways. + // i_inout interface implementation. bool read (msg_t *msg_); bool write (msg_t *msg_); void flush (); @@ -55,17 +53,19 @@ namespace zmq // i_pipe_events interface implementation. void read_activated (class pipe_t *pipe_); void write_activated (class pipe_t *pipe_); + void hiccuped (class pipe_t *pipe_); void terminated (class pipe_t *pipe_); protected: - // Two events for the derived session type. Attached is triggered - // when session is attached to a peer. The function can reject the new - // peer by returning false. Detached is triggered at the beginning of + // Events from the engine. Attached is triggered when session is + // attached to a peer. The function can reject the new peer by + // returning false. Detached is triggered at the beginning of // the termination process when session is about to be detached from - // the peer. - virtual bool attached (const blob_t &peer_identity_) = 0; - virtual void detached () = 0; + // the peer. If it returns false, session will be terminated. + // To be overloaded by the derived session type. + virtual bool xattached (const blob_t &peer_identity_) = 0; + virtual bool xdetached () = 0; // Returns true if there is an engine attached to the session. bool has_engine (); @@ -78,6 +78,9 @@ namespace zmq private: + bool attached (const blob_t &peer_identity_); + void detached (); + // Handlers for incoming commands. void process_plug (); void process_attach (struct i_engine *engine_, diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 1682c05..59e2653 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -764,7 +764,7 @@ bool zmq::socket_base_t::xhas_out () return false; } -int zmq::socket_base_t::xsend (msg_t *msg_, int options_) +int zmq::socket_base_t::xsend (msg_t *msg_, int flags_) { errno = ENOTSUP; return -1; @@ -775,7 +775,7 @@ bool zmq::socket_base_t::xhas_in () return false; } -int zmq::socket_base_t::xrecv (msg_t *msg_, int options_) +int zmq::socket_base_t::xrecv (msg_t *msg_, int flags_) { errno = ENOTSUP; return -1; @@ -790,6 +790,11 @@ void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_) zmq_assert (false); } +void zmq::socket_base_t::xhiccuped (pipe_t *pipe_) +{ + zmq_assert (false); +} + void zmq::socket_base_t::in_event () { // Process any commands from other threads/sockets that may be available @@ -837,6 +842,11 @@ void zmq::socket_base_t::write_activated (pipe_t *pipe_) xwrite_activated (pipe_); } +void zmq::socket_base_t::hiccuped (pipe_t *pipe_) +{ + xhiccuped (pipe_); +} + void zmq::socket_base_t::terminated (pipe_t *pipe_) { // Notify the specific socket type about the pipe termination. diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 98046af..ed5620c 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -96,6 +96,7 @@ namespace zmq // i_pipe_events interface implementation. void read_activated (pipe_t *pipe_); void write_activated (pipe_t *pipe_); + void hiccuped (pipe_t *pipe_); void terminated (pipe_t *pipe_); protected: @@ -116,15 +117,16 @@ namespace zmq // The default implementation assumes that send is not supported. virtual bool xhas_out (); - virtual int xsend (class msg_t *msg_, int options_); + virtual int xsend (class msg_t *msg_, int flags_); // The default implementation assumes that recv in not supported. virtual bool xhas_in (); - virtual int xrecv (class msg_t *msg_, int options_); + virtual int xrecv (class msg_t *msg_, int flags_); // i_pipe_events will be forwarded to these functions. virtual void xread_activated (pipe_t *pipe_); virtual void xwrite_activated (pipe_t *pipe_); + virtual void xhiccuped (pipe_t *pipe_); virtual void xterminated (pipe_t *pipe_) = 0; // Delay actual destruction of the socket. diff --git a/src/sub.cpp b/src/sub.cpp index 11c4532..c8ffd2e 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -22,43 +22,18 @@ #include "msg.hpp" zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : - xsub_t (parent_, tid_), - has_message (false), - more (false) + xsub_t (parent_, tid_) { options.type = ZMQ_SUB; - int rc = message.init (); - errno_assert (rc == 0); } zmq::sub_t::~sub_t () { - int rc = message.close (); - errno_assert (rc == 0); } int zmq::sub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { - // Process a subscription. - if (option_ == ZMQ_SUBSCRIBE) - subscriptions.add ((unsigned char*) optval_, optvallen_); - - // Process an unsubscription. Return error if there is no corresponding - // subscription. - else if (option_ == ZMQ_UNSUBSCRIBE) { - if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) { - errno = EINVAL; - return -1; - } - } - - // Unknow option. - else { - errno = EINVAL; - return -1; - } - // Create the subscription message. msg_t msg; int rc = msg.init_size (optvallen_ + 1); @@ -82,7 +57,7 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, return rc; } -int zmq::sub_t::xsend (msg_t *msg_, int options_) +int zmq::sub_t::xsend (msg_t *msg_, int flags_) { // Overload the XSUB's send. errno = ENOTSUP; @@ -95,89 +70,3 @@ bool zmq::sub_t::xhas_out () return false; } -int zmq::sub_t::xrecv (msg_t *msg_, int flags_) -{ - // If there's already a message prepared by a previous call to zmq_poll, - // return it straight ahead. - if (has_message) { - int rc = msg_->move (message); - errno_assert (rc == 0); - has_message = false; - more = msg_->flags () & msg_t::more; - return 0; - } - - // TODO: This can result in infinite loop in the case of continuous - // stream of non-matching messages which breaks the non-blocking recv - // semantics. - while (true) { - - // Get a message using fair queueing algorithm. - int rc = xsub_t::xrecv (msg_, flags_); - - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) - return -1; - - // Check whether the message matches at least one subscription. - // Non-initial parts of the message are passed - if (more || match (msg_)) { - more = msg_->flags () & msg_t::more; - return 0; - } - - // Message doesn't match. Pop any remaining parts of the message - // from the pipe. - while (msg_->flags () & msg_t::more) { - rc = xsub_t::xrecv (msg_, ZMQ_DONTWAIT); - zmq_assert (rc == 0); - } - } -} - -bool zmq::sub_t::xhas_in () -{ - // There are subsequent parts of the partly-read message available. - if (more) - return true; - - // If there's already a message prepared by a previous call to zmq_poll, - // return straight ahead. - if (has_message) - return true; - - // TODO: This can result in infinite loop in the case of continuous - // stream of non-matching messages. - while (true) { - - // Get a message using fair queueing algorithm. - int rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT); - - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) { - zmq_assert (errno == EAGAIN); - return false; - } - - // Check whether the message matches at least one subscription. - if (match (&message)) { - has_message = true; - return true; - } - - // Message doesn't match. Pop any remaining parts of the message - // from the pipe. - while (message.flags () & msg_t::more) { - rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT); - zmq_assert (rc == 0); - } - } -} - -bool zmq::sub_t::match (msg_t *msg_) -{ - return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); -} - diff --git a/src/sub.hpp b/src/sub.hpp index 91a5b65..b5980ba 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -22,8 +22,6 @@ #define __ZMQ_SUB_HPP_INCLUDED__ #include "xsub.hpp" -#include "trie.hpp" -#include "msg.hpp" namespace zmq { @@ -38,28 +36,11 @@ namespace zmq protected: int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (class msg_t *msg_, int options_); + int xsend (class msg_t *msg_, int flags_); bool xhas_out (); - int xrecv (class msg_t *msg_, int flags_); - bool xhas_in (); private: - // Check whether the message matches at least one subscription. - bool match (class msg_t *msg_); - - // The repository of subscriptions. - trie_t subscriptions; - - // If true, 'message' contains a matching message to return on the - // next recv call. - bool has_message; - msg_t message; - - // If true, part of a multipart message was already received, but - // there are following parts still waiting. - bool more; - sub_t (const sub_t&); const sub_t &operator = (const sub_t&); }; diff --git a/src/transient_session.cpp b/src/transient_session.cpp index 10a086f..b3c80b0 100644 --- a/src/transient_session.cpp +++ b/src/transient_session.cpp @@ -30,14 +30,14 @@ zmq::transient_session_t::~transient_session_t () { } -bool zmq::transient_session_t::attached (const blob_t &peer_identity_) +bool zmq::transient_session_t::xattached (const blob_t &peer_identity_) { // Transient session is always valid. return true; } -void zmq::transient_session_t::detached () +bool zmq::transient_session_t::xdetached () { // There's no way to reestablish a transient session. Tear it down. - terminate (); + return false; } diff --git a/src/transient_session.hpp b/src/transient_session.hpp index c70a2d7..55c2b8a 100644 --- a/src/transient_session.hpp +++ b/src/transient_session.hpp @@ -40,8 +40,8 @@ namespace zmq private: // Handlers for events from session base class. - bool attached (const blob_t &peer_identity_); - void detached (); + bool xattached (const blob_t &peer_identity_); + bool xdetached (); transient_session_t (const transient_session_t&); const transient_session_t &operator = (const transient_session_t&); diff --git a/src/trie.cpp b/src/trie.cpp index 4198ff3..cd6cb7b 100644 --- a/src/trie.cpp +++ b/src/trie.cpp @@ -50,12 +50,12 @@ zmq::trie_t::~trie_t () } } -void zmq::trie_t::add (unsigned char *prefix_, size_t size_) +bool zmq::trie_t::add (unsigned char *prefix_, size_t size_) { // We are at the node corresponding to the prefix. We are done. if (!size_) { ++refcnt; - return; + return refcnt == 1; } unsigned char c = *prefix_; @@ -74,7 +74,7 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_) count = (min < c ? c - min : min - c) + 1; next.table = (trie_t**) malloc (sizeof (trie_t*) * count); - alloc_assert (next.table); + zmq_assert (next.table); for (unsigned short i = 0; i != count; ++i) next.table [i] = 0; min = std::min (min, c); @@ -111,26 +111,28 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_) if (count == 1) { if (!next.node) { next.node = new (std::nothrow) trie_t; - alloc_assert (next.node); + zmq_assert (next.node); } - next.node->add (prefix_ + 1, size_ - 1); + return next.node->add (prefix_ + 1, size_ - 1); } else { if (!next.table [c - min]) { next.table [c - min] = new (std::nothrow) trie_t; - alloc_assert (next.table [c - min]); + zmq_assert (next.table [c - min]); } - next.table [c - min]->add (prefix_ + 1, size_ - 1); + return next.table [c - min]->add (prefix_ + 1, size_ - 1); } } bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_) { + // TODO: Shouldn't an error be reported if the key does not exist? + if (!size_) { if (!refcnt) return false; refcnt--; - return true; + return refcnt == 0; } unsigned char c = *prefix_; @@ -179,3 +181,48 @@ bool zmq::trie_t::check (unsigned char *data_, size_t size_) size_--; } } + +void zmq::trie_t::apply (void (*func_) (unsigned char *data_, size_t size_, + void *arg_), void *arg_) +{ + unsigned char *buff = NULL; + apply_helper (&buff, 0, 0, func_, arg_); + free (buff); +} + +void zmq::trie_t::apply_helper ( + unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, + void (*func_) (unsigned char *data_, size_t size_, void *arg_), void *arg_) +{ + // If this node is a subscription, apply the function. + if (refcnt) + func_ (*buff_, buffsize_, arg_); + + // Adjust the buffer. + if (buffsize_ >= maxbuffsize_) { + maxbuffsize_ = buffsize_ + 256; + *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); + zmq_assert (*buff_); + } + + // If there are no subnodes in the trie, return. + if (count == 0) + return; + + // If there's one subnode (optimisation). + if (count == 1) { + (*buff_) [buffsize_] = min; + buffsize_++; + next.node->apply_helper (buff_, buffsize_, maxbuffsize_, func_, arg_); + return; + } + + // If there are multiple subnodes. + for (unsigned char c = 0; c != count; c++) { + (*buff_) [buffsize_] = min + c; + if (next.table [c]) + next.table [c]->apply_helper (buff_, buffsize_ + 1, maxbuffsize_, + func_, arg_); + } +} + diff --git a/src/trie.hpp b/src/trie.hpp index dbf1cb1..a2b55c6 100644 --- a/src/trie.hpp +++ b/src/trie.hpp @@ -35,12 +35,28 @@ namespace zmq trie_t (); ~trie_t (); - void add (unsigned char *prefix_, size_t size_); + // Add key to the trie. Returns true if this is a new item in the trie + // rather than a duplicate. + bool add (unsigned char *prefix_, size_t size_); + + // Remove key from the trie. Returns true if the item is actually + // removed from the trie. bool rm (unsigned char *prefix_, size_t size_); + + // Check whether particular key is in the trie. bool check (unsigned char *data_, size_t size_); + // Apply the function supplied to each subscription in the trie. + void apply (void (*func_) (unsigned char *data_, size_t size_, + void *arg_), void *arg_); + private: + void apply_helper ( + unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, + void (*func_) (unsigned char *data_, size_t size_, void *arg_), + void *arg_); + uint32_t refcnt; unsigned char min; unsigned short count; diff --git a/src/xpub.cpp b/src/xpub.cpp index d5dba9f..9158cf4 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -18,6 +18,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <string.h> + #include "xpub.hpp" #include "pipe.hpp" #include "err.hpp" @@ -37,6 +39,12 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); dist.attach (pipe_); + fq.attach (pipe_); +} + +void zmq::xpub_t::xread_activated (pipe_t *pipe_) +{ + fq.activated (pipe_); } void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) @@ -46,11 +54,37 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) void zmq::xpub_t::xterminated (pipe_t *pipe_) { + // Remove the pipe from the trie. If there are topics that nobody + // is interested in anymore, send corresponding unsubscriptions + // upstream. + subscriptions.rm (pipe_, send_unsubscription, this); + dist.terminated (pipe_); + fq.terminated (pipe_); } int zmq::xpub_t::xsend (msg_t *msg_, int flags_) { + // First, process any (un)subscriptions from downstream. + msg_t sub; + sub.init (); + while (true) { + + // Grab next subscription. + pipe_t *pipe; + int rc = fq.recvpipe (&sub, 0, &pipe); + if (rc != 0 && errno == EAGAIN) + break; + errno_assert (rc == 0); + + // Apply the subscription to the trie. If it's not a duplicate, + // store it so that it can be passed to used on next recv call. + if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) + pending.push_back (blob_t ((unsigned char*) sub.data (), + sub.size ())); + } + sub.close (); + return dist.send (msg_, flags_); } @@ -61,12 +95,92 @@ bool zmq::xpub_t::xhas_out () int zmq::xpub_t::xrecv (msg_t *msg_, int flags_) { - errno = EAGAIN; - return -1; + // If there is at least one + if (!pending.empty ()) { + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init_size (pending.front ().size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), pending.front ().data (), + pending.front ().size ()); + pending.pop_front (); + return 0; + } + + // Grab and apply next subscription. + pipe_t *pipe; + int rc = fq.recvpipe (msg_, 0, &pipe); + if (rc != 0) + return -1; + if (!apply_subscription (msg_, pipe)) { +// TODO: This should be a loop rather! + msg_->close (); + msg_->init (); + errno = EAGAIN; + return -1; + } + return 0; } bool zmq::xpub_t::xhas_in () { - return false; + if (!pending.empty ()) + return true; + + // Even if there are subscriptions in the fair-queuer they may be + // duplicates. Thus, we have to check by hand wheter there is any + // subscription available to pass upstream. + // First, process any (un)subscriptions from downstream. + msg_t sub; + sub.init (); + while (true) { + + // Grab next subscription. + pipe_t *pipe; + int rc = fq.recvpipe (&sub, 0, &pipe); + if (rc != 0 && errno == EAGAIN) { + sub.close (); + return false; + } + errno_assert (rc == 0); + + // Apply the subscription to the trie. If it's not a duplicate store + // it so that it can be passed to used on next recv call. + if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) { + pending.push_back (blob_t ((unsigned char*) sub.data (), + sub.size ())); + sub.close (); + return true; + } + } +} + +bool zmq::xpub_t::apply_subscription (msg_t *sub_, pipe_t *pipe_) +{ + unsigned char *data = (unsigned char*) sub_->data (); + size_t size = sub_->size (); + zmq_assert (size > 0 && (*data == 0 || *data == 1)); + + if (*data == 0) + return subscriptions.rm (data + 1, size - 1, pipe_); + else + return subscriptions.add (data + 1, size - 1, pipe_); +} + +void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, + void *arg_) +{ + xpub_t *self = (xpub_t*) arg_; + + if (self->options.type != ZMQ_PUB) { + + // Place the unsubscription to the queue of pending (un)sunscriptions + // to be retrived by the user later on. + xpub_t *self = (xpub_t*) arg_; + blob_t unsub (size_ + 1, 0); + unsub [0] = 0; + memcpy (&unsub [1], data_, size_); + self->pending.push_back (unsub); + } } diff --git a/src/xpub.hpp b/src/xpub.hpp index 8a6ff73..b824548 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -21,9 +21,14 @@ #ifndef __ZMQ_XPUB_HPP_INCLUDED__ #define __ZMQ_XPUB_HPP_INCLUDED__ +#include <deque> + #include "socket_base.hpp" +#include "mtrie.hpp" #include "array.hpp" +#include "blob.hpp" #include "dist.hpp" +#include "fq.hpp" namespace zmq { @@ -42,14 +47,35 @@ namespace zmq bool xhas_out (); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); + void xread_activated (class pipe_t *pipe_); void xwrite_activated (class pipe_t *pipe_); void xterminated (class pipe_t *pipe_); private: + // Applies the subscription to the trie. Return false if it is a + // duplicate. + bool apply_subscription (class msg_t *sub_, class pipe_t *pipe_); + + // Function to be applied to the trie to send all the subsciptions + // upstream. + static void send_unsubscription (unsigned char *data_, size_t size_, + void *arg_); + + // List of all subscriptions mapped to corresponding pipes. + mtrie_t subscriptions; + // Distributor of messages holding the list of outbound pipes. dist_t dist; + // Object to fair-queue the subscription requests. + fq_t fq; + + // List of pending (un)subscriptions, ie. those that were already + // applied to the trie, but not yet received by the user. + typedef std::deque <blob_t> pending_t; + pending_t pending; + xpub_t (const xpub_t&); const xpub_t &operator = (const xpub_t&); }; diff --git a/src/xsub.cpp b/src/xsub.cpp index 6928d82..729f6a4 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -24,19 +24,30 @@ #include "err.hpp" zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_) + socket_base_t (parent_, tid_), + has_message (false), + more (false) { options.type = ZMQ_XSUB; + int rc = message.init (); + errno_assert (rc == 0); } zmq::xsub_t::~xsub_t () { + int rc = message.close (); + errno_assert (rc == 0); } void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); fq.attach (pipe_); + dist.attach (pipe_); + + // Send all the cached subscriptions to the new upstream peer. + subscriptions.apply (send_subscription, pipe_); + pipe_->flush (); } void zmq::xsub_t::xread_activated (pipe_t *pipe_) @@ -44,20 +55,49 @@ void zmq::xsub_t::xread_activated (pipe_t *pipe_) fq.activated (pipe_); } +void zmq::xsub_t::xwrite_activated (pipe_t *pipe_) +{ + dist.activated (pipe_); +} + void zmq::xsub_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); + dist.terminated (pipe_); } -int zmq::xsub_t::xsend (msg_t *msg_, int options_) +void zmq::xsub_t::xhiccuped (pipe_t *pipe_) { - // TODO: Once we'll send the subscription upstream here. For now - // just empty the message. - int rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init (); - errno_assert (rc == 0); - return 0; + // Send all the cached subscriptions to the hiccuped pipe. + subscriptions.apply (send_subscription, pipe_); + pipe_->flush (); +} + +int zmq::xsub_t::xsend (msg_t *msg_, int flags_) +{ + size_t size = msg_->size (); + unsigned char *data = (unsigned char*) msg_->data (); + + // Malformed subscriptions. + if (size < 1 || (*data != 0 && *data != 1)) { + errno = EINVAL; + return -1; + } + + // Process the subscription. + if (*data == 1) { + if (subscriptions.add (data + 1, size - 1)) + return dist.send (msg_, flags_); + else + return 0; + } + else if (*data == 0) { + if (subscriptions.rm (data + 1, size - 1)) + return dist.send (msg_, flags_); + else + return 0; + } + zmq_assert (false); } bool zmq::xsub_t::xhas_out () @@ -66,13 +106,109 @@ bool zmq::xsub_t::xhas_out () return true; } -int zmq::xsub_t::xrecv (class msg_t *msg_, int flags_) +int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) { - return fq.recv (msg_, flags_); + // If there's already a message prepared by a previous call to zmq_poll, + // return it straight ahead. + if (has_message) { + int rc = msg_->move (message); + errno_assert (rc == 0); + has_message = false; + more = msg_->flags () & msg_t::more; + return 0; + } + + // TODO: This can result in infinite loop in the case of continuous + // stream of non-matching messages which breaks the non-blocking recv + // semantics. + while (true) { + + // Get a message using fair queueing algorithm. + int rc = fq.recv (msg_, flags_); + + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) + return -1; + + // Check whether the message matches at least one subscription. + // Non-initial parts of the message are passed + if (more || match (msg_)) { + more = msg_->flags () & msg_t::more; + return 0; + } + + // Message doesn't match. Pop any remaining parts of the message + // from the pipe. + while (msg_->flags () & msg_t::more) { + rc = fq.recv (msg_, ZMQ_DONTWAIT); + zmq_assert (rc == 0); + } + } } bool zmq::xsub_t::xhas_in () { - return fq.has_in (); + // There are subsequent parts of the partly-read message available. + if (more) + return true; + + // If there's already a message prepared by a previous call to zmq_poll, + // return straight ahead. + if (has_message) + return true; + + // TODO: This can result in infinite loop in the case of continuous + // stream of non-matching messages. + while (true) { + + // Get a message using fair queueing algorithm. + int rc = fq.recv (&message, ZMQ_DONTWAIT); + + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) { + zmq_assert (errno == EAGAIN); + return false; + } + + // Check whether the message matches at least one subscription. + if (match (&message)) { + has_message = true; + return true; + } + + // Message doesn't match. Pop any remaining parts of the message + // from the pipe. + while (message.flags () & msg_t::more) { + rc = fq.recv (&message, ZMQ_DONTWAIT); + zmq_assert (rc == 0); + } + } +} + +bool zmq::xsub_t::match (msg_t *msg_) +{ + return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); } +void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_, + void *arg_) +{ + pipe_t *pipe = (pipe_t*) arg_; + + // Create the subsctription message. + msg_t msg; + int rc = msg.init_size (size_ + 1); + zmq_assert (rc == 0); + unsigned char *data = (unsigned char*) msg.data (); + data [0] = 1; + memcpy (data + 1, data_, size_); + + // Send it to the pipe. + bool sent = pipe->write (&msg); + zmq_assert (sent); + msg.close (); +} + + diff --git a/src/xsub.hpp b/src/xsub.hpp index ebd6259..03b3178 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -22,7 +22,10 @@ #define __ZMQ_XSUB_HPP_INCLUDED__ #include "socket_base.hpp" +#include "dist.hpp" #include "fq.hpp" +#include "trie.hpp" +#include "msg.hpp" namespace zmq { @@ -39,18 +42,43 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); - int xsend (class msg_t *msg_, int options_); + int xsend (class msg_t *msg_, int flags_); bool xhas_out (); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); void xread_activated (class pipe_t *pipe_); + void xwrite_activated (class pipe_t *pipe_); + void xhiccuped (pipe_t *pipe_); void xterminated (class pipe_t *pipe_); private: + // Check whether the message matches at least one subscription. + bool match (class msg_t *msg_); + + // Function to be applied to the trie to send all the subsciptions + // upstream. + static void send_subscription (unsigned char *data_, size_t size_, + void *arg_); + // Fair queueing object for inbound pipes. fq_t fq; + // Object for distributing the subscriptions upstream. + dist_t dist; + + // The repository of subscriptions. + trie_t subscriptions; + + // If true, 'message' contains a matching message to return on the + // next recv call. + bool has_message; + msg_t message; + + // If true, part of a multipart message was already received, but + // there are following parts still waiting. + bool more; + xsub_t (const xsub_t&); const xsub_t &operator = (const xsub_t&); }; |