diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-05-30 10:07:34 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-05-30 10:07:34 +0200 |
commit | 0b59866a84f733e5a53b0d2f32570581691747ef (patch) | |
tree | 8861d97915544dc4385177931f299a6f27603c92 /src | |
parent | 311fb0d852374e769d8ff791c9df38f0464960c6 (diff) |
Patches from sub-forward branch incorporated
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/command.hpp | 8 | ||||
-rw-r--r-- | src/connect_session.cpp | 7 | ||||
-rw-r--r-- | src/connect_session.hpp | 4 | ||||
-rw-r--r-- | src/fq.cpp | 7 | ||||
-rw-r--r-- | src/fq.hpp | 4 | ||||
-rw-r--r-- | src/mtrie.cpp | 218 | ||||
-rw-r--r-- | src/mtrie.hpp | 83 | ||||
-rw-r--r-- | src/named_session.cpp | 5 | ||||
-rw-r--r-- | src/named_session.hpp | 4 | ||||
-rw-r--r-- | src/object.cpp | 25 | ||||
-rw-r--r-- | src/object.hpp | 2 | ||||
-rw-r--r-- | src/pipe.cpp | 43 | ||||
-rw-r--r-- | src/pipe.hpp | 13 | ||||
-rw-r--r-- | src/pub.cpp | 13 | ||||
-rw-r--r-- | src/pub.hpp | 4 | ||||
-rw-r--r-- | src/session.cpp | 28 | ||||
-rw-r--r-- | src/session.hpp | 21 | ||||
-rw-r--r-- | src/socket_base.cpp | 14 | ||||
-rw-r--r-- | src/socket_base.hpp | 6 | ||||
-rw-r--r-- | src/sub.cpp | 115 | ||||
-rw-r--r-- | src/sub.hpp | 21 | ||||
-rw-r--r-- | src/transient_session.cpp | 6 | ||||
-rw-r--r-- | src/transient_session.hpp | 4 | ||||
-rw-r--r-- | src/trie.cpp | 63 | ||||
-rw-r--r-- | src/trie.hpp | 18 | ||||
-rw-r--r-- | src/xpub.cpp | 120 | ||||
-rw-r--r-- | src/xpub.hpp | 26 | ||||
-rw-r--r-- | src/xsub.cpp | 160 | ||||
-rw-r--r-- | src/xsub.hpp | 30 |
30 files changed, 884 insertions, 190 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index de83d76..46c66ee 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -34,6 +34,7 @@ libzmq_la_SOURCES = \ likely.hpp \ mailbox.hpp \ msg.hpp \ + mtrie.hpp \ mutex.hpp \ named_session.hpp \ object.hpp \ @@ -97,6 +98,7 @@ libzmq_la_SOURCES = \ lb.cpp \ mailbox.cpp \ msg.cpp \ + mtrie.cpp \ named_session.cpp \ object.cpp \ options.cpp \ diff --git a/src/command.hpp b/src/command.hpp index ff7b551..15cee0a 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -42,6 +42,7 @@ namespace zmq bind, activate_read, activate_write, + hiccup, pipe_term, pipe_term_ack, term_req, @@ -95,6 +96,13 @@ namespace zmq uint64_t msgs_read; } activate_write; + // Sent by pipe reader to writer after creating a new inpipe. + // The parameter is actually of type pipe_t::upipe_t, however, + // its definition is private so we'll have to do with void*. + struct { + void *pipe; + } hiccup; + // Sent by pipe reader to pipe writer to ask it to terminate // its end of the pipe. struct { diff --git a/src/connect_session.cpp b/src/connect_session.cpp index 9a29bf1..fe7332a 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -111,7 +111,7 @@ void zmq::connect_session_t::start_connecting (bool wait_) zmq_assert (false); } -bool zmq::connect_session_t::attached (const blob_t &peer_identity_) +bool zmq::connect_session_t::xattached (const blob_t &peer_identity_) { // If there was no previous connection... if (!connected) { @@ -153,9 +153,12 @@ bool zmq::connect_session_t::attached (const blob_t &peer_identity_) return true; } -void zmq::connect_session_t::detached () +bool zmq::connect_session_t::xdetached () { // Reconnect. start_connecting (true); + + // Don't tear the session down. + return true; } diff --git a/src/connect_session.hpp b/src/connect_session.hpp index 93e2704..3b8a26b 100644 --- a/src/connect_session.hpp +++ b/src/connect_session.hpp @@ -44,8 +44,8 @@ namespace zmq private: // Handlers for events from session base class. - bool attached (const blob_t &peer_identity_); - void detached (); + bool xattached (const blob_t &peer_identity_); + bool xdetached (); // Start the connection process. void start_connecting (bool wait_); @@ -63,6 +63,11 @@ void zmq::fq_t::activated (pipe_t *pipe_) int zmq::fq_t::recv (msg_t *msg_, int flags_) { + return recvpipe (msg_, flags_, NULL); +} + +int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_) +{ // Deallocate old content of the message. int rc = msg_->close (); errno_assert (rc == 0); @@ -83,6 +88,8 @@ int zmq::fq_t::recv (msg_t *msg_, int flags_) // and replaced by another active pipe. Thus we don't have to increase // the 'current' pointer. if (fetched) { + if (pipe_) + *pipe_ = pipes [current]; more = msg_->flags () & msg_t::more; if (!more) { current++; @@ -29,8 +29,9 @@ namespace zmq { // Class manages a set of inbound pipes. On receive it performs fair - // queueing (RFC970) so that senders gone berserk won't cause denial of + // queueing so that senders gone berserk won't cause denial of // service for decent senders. + class fq_t { public: @@ -43,6 +44,7 @@ namespace zmq void terminated (pipe_t *pipe_); int recv (msg_t *msg_, int flags_); + int recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_); bool has_in (); private: diff --git a/src/mtrie.cpp b/src/mtrie.cpp new file mode 100644 index 0000000..ac1fc31 --- /dev/null +++ b/src/mtrie.cpp @@ -0,0 +1,218 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> + +#include <new> +#include <algorithm> + +#include "platform.hpp" +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#endif + +#include "err.hpp" +#include "pipe.hpp" +#include "mtrie.hpp" + +zmq::mtrie_t::mtrie_t () : + min (0), + count (0) +{ +} + +zmq::mtrie_t::~mtrie_t () +{ + if (count == 1) + delete next.node; + else if (count > 1) { + for (unsigned short i = 0; i != count; ++i) + if (next.table [i]) + delete next.table [i]; + free (next.table); + } +} + +bool zmq::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_) +{ + // We are at the node corresponding to the prefix. We are done. + if (!size_) { + bool result = pipes.empty (); + pipes.insert (pipe_); + return result; + } + + unsigned char c = *prefix_; + if (c < min || c >= min + count) { + + // The character is out of range of currently handled + // charcters. We have to extend the table. + if (!count) { + min = c; + count = 1; + next.node = NULL; + } + else if (count == 1) { + unsigned char oldc = min; + mtrie_t *oldp = next.node; + count = (min < c ? c - min : min - c) + 1; + next.table = (mtrie_t**) + malloc (sizeof (mtrie_t*) * count); + zmq_assert (next.table); + for (unsigned short i = 0; i != count; ++i) + next.table [i] = 0; + min = std::min (min, c); + next.table [oldc - min] = oldp; + } + else if (min < c) { + + // The new character is above the current character range. + unsigned short old_count = count; + count = c - min + 1; + next.table = (mtrie_t**) realloc ((void*) next.table, + sizeof (mtrie_t*) * count); + zmq_assert (next.table); + for (unsigned short i = old_count; i != count; i++) + next.table [i] = NULL; + } + else { + + // The new character is below the current character range. + unsigned short old_count = count; + count = (min + old_count) - c; + next.table = (mtrie_t**) realloc ((void*) next.table, + sizeof (mtrie_t*) * count); + zmq_assert (next.table); + memmove (next.table + min - c, next.table, + old_count * sizeof (mtrie_t*)); + for (unsigned short i = 0; i != min - c; i++) + next.table [i] = NULL; + min = c; + } + } + + // If next node does not exist, create one. + if (count == 1) { + if (!next.node) { + next.node = new (std::nothrow) mtrie_t; + zmq_assert (next.node); + } + return next.node->add (prefix_ + 1, size_ - 1, pipe_); + } + else { + if (!next.table [c - min]) { + next.table [c - min] = new (std::nothrow) mtrie_t; + zmq_assert (next.table [c - min]); + } + return next.table [c - min]->add (prefix_ + 1, size_ - 1, pipe_); + } +} + + +void zmq::mtrie_t::rm (pipe_t *pipe_, + void (*func_) (unsigned char *data_, size_t size_, void *arg_), + void *arg_) +{ + unsigned char *buff = NULL; + rm_helper (pipe_, &buff, 0, 0, func_, arg_); + free (buff); +} + +void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, + size_t buffsize_, size_t maxbuffsize_, + void (*func_) (unsigned char *data_, size_t size_, void *arg_), + void *arg_) +{ + // Remove the subscription from this node. + if (pipes.erase (pipe_) && pipes.empty ()) + func_ (*buff_, buffsize_, arg_); + + // Adjust the buffer. + if (buffsize_ >= maxbuffsize_) { + maxbuffsize_ = buffsize_ + 256; + *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_); + alloc_assert (*buff_); + } + + // If there are no subnodes in the trie, return. + if (count == 0) + return; + + // If there's one subnode (optimisation). + if (count == 1) { + (*buff_) [buffsize_] = min; + buffsize_++; + next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, + func_, arg_); + return; + } + + // If there are multiple subnodes. + for (unsigned char c = 0; c != count; c++) { + (*buff_) [buffsize_] = min + c; + if (next.table [c]) + next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1, + maxbuffsize_, func_, arg_); + } +} + +bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_) +{ + if (!size_) { + pipes_t::size_type erased = pipes.erase (pipe_); + zmq_assert (erased == 1); + return pipes.empty (); + } + + unsigned char c = *prefix_; + if (!count || c < min || c >= min + count) + return false; + + mtrie_t *next_node = + count == 1 ? next.node : next.table [c - min]; + + if (!next_node) + return false; + + return next_node->rm (prefix_ + 1, size_ - 1, pipe_); +} + +void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_) +{ + // Merge the subscriptions from this node to the resultset. + pipes_.insert (pipes.begin (), pipes.end ()); + + // If there are no subnodes in the trie, return. + if (count == 0) + return; + + // If there's one subnode (optimisation). + if (count == 1) { + next.node->match (data_ + 1, size_ - 1, pipes_); + return; + } + + // If there are multiple subnodes. + for (unsigned char c = 0; c != count; c++) { + if (next.table [c]) + next.table [c]->match (data_ + 1, size_ - 1, pipes_); + } +} + diff --git a/src/mtrie.hpp b/src/mtrie.hpp new file mode 100644 index 0000000..99f20e2 --- /dev/null +++ b/src/mtrie.hpp @@ -0,0 +1,83 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_MTRIE_HPP_INCLUDED__ +#define __ZMQ_MTRIE_HPP_INCLUDED__ + +#include <stddef.h> +#include <set> + +#include "stdint.hpp" + +namespace zmq +{ + + // Multi-trie. Each node in the trie is a set of pointers to pipes. + + class mtrie_t + { + public: + + typedef std::set <class pipe_t*> pipes_t; + + mtrie_t (); + ~mtrie_t (); + + // Add key to the trie. Returns true if it's a new subscription + // rather than a duplicate. + bool add (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); + + // Remove all subscriptions for a specific peer from the trie. + // If there are no subscriptions left on some topics, invoke the + // supplied callback function. + void rm (class pipe_t *pipe_, + void (*func_) (unsigned char *data_, size_t size_, void *arg_), + void *arg_); + + // Remove specific subscription from the trie. Return true is it was + // actually removed rather than de-duplicated. + bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); + + // Get all matching pipes. + void match (unsigned char *data_, size_t size_, pipes_t &pipes_); + + private: + + void rm_helper (class pipe_t *pipe_, unsigned char **buff_, + size_t buffsize_, size_t maxbuffsize_, + void (*func_) (unsigned char *data_, size_t size_, void *arg_), + void *arg_); + + pipes_t pipes; + unsigned char min; + unsigned short count; + union { + class mtrie_t *node; + class mtrie_t **table; + } next; + + mtrie_t (const mtrie_t&); + const mtrie_t &operator = (const mtrie_t&); + }; + +} + +#endif + diff --git a/src/named_session.cpp b/src/named_session.cpp index 34f4af4..8e43fb0 100644 --- a/src/named_session.cpp +++ b/src/named_session.cpp @@ -45,7 +45,7 @@ zmq::named_session_t::~named_session_t () unregister_session (peer_identity); } -bool zmq::named_session_t::attached (const blob_t &peer_identity_) +bool zmq::named_session_t::xattached (const blob_t &peer_identity_) { // Double check that identities match. zmq_assert (peer_identity == peer_identity_); @@ -58,9 +58,10 @@ bool zmq::named_session_t::attached (const blob_t &peer_identity_) return true; } -void zmq::named_session_t::detached () +bool zmq::named_session_t::xdetached () { // Do nothing. Named sessions are never destroyed because of disconnection. // Neither they have to actively reconnect. + return true; } diff --git a/src/named_session.hpp b/src/named_session.hpp index e3b5aa3..10af8cc 100644 --- a/src/named_session.hpp +++ b/src/named_session.hpp @@ -40,8 +40,8 @@ namespace zmq ~named_session_t (); // Handlers for events from session base class. - bool attached (const blob_t &peer_identity_); - void detached (); + bool xattached (const blob_t &peer_identity_); + bool xdetached (); private: diff --git a/src/object.cpp b/src/object.cpp index 0a06d5f..ba77bf0 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -74,7 +74,7 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::plug: process_plug (); process_seqnum (); - return; + break; case command_t::own: process_own (cmd_.args.own.object); @@ -96,9 +96,13 @@ void zmq::object_t::process_command (command_t &cmd_) process_seqnum (); break; + case command_t::hiccup: + process_hiccup (cmd_.args.hiccup.pipe); + break; + case command_t::pipe_term: process_pipe_term (); - return; + break; case command_t::pipe_term_ack: process_pipe_term_ack (); @@ -290,6 +294,18 @@ void zmq::object_t::send_activate_write (pipe_t *destination_, send_command (cmd); } +void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_) +{ + command_t cmd; +#if defined ZMQ_MAKE_VALGRIND_HAPPY + memset (&cmd, 0, sizeof (cmd)); +#endif + cmd.destination = destination_; + cmd.type = command_t::hiccup; + cmd.args.hiccup.pipe = pipe_; + send_command (cmd); +} + void zmq::object_t::send_pipe_term (pipe_t *destination_) { command_t cmd; @@ -418,6 +434,11 @@ void zmq::object_t::process_activate_write (uint64_t msgs_read_) zmq_assert (false); } +void zmq::object_t::process_hiccup (void *pipe_) +{ + zmq_assert (false); +} + void zmq::object_t::process_pipe_term () { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 0f47670..fbad0ea 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -71,6 +71,7 @@ namespace zmq void send_activate_read (class pipe_t *destination_); void send_activate_write (class pipe_t *destination_, uint64_t msgs_read_); + void send_hiccup (class pipe_t *destination_, void *pipe_); void send_pipe_term (class pipe_t *destination_); void send_pipe_term_ack (class pipe_t *destination_); void send_term_req (class own_t *destination_, @@ -92,6 +93,7 @@ namespace zmq const blob_t &peer_identity_); virtual void process_activate_read (); virtual void process_activate_write (uint64_t msgs_read_); + virtual void process_hiccup (void *pipe_); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); virtual void process_term_req (class own_t *object_); diff --git a/src/pipe.cpp b/src/pipe.cpp index 48fc3e5..fd7223c 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -205,6 +205,29 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) } } +void zmq::pipe_t::process_hiccup (void *pipe_) +{ + // Destroy old outpipe. Note that the read end of the pipe was already + // migrated to this thread. + zmq_assert (outpipe); + outpipe->flush (); + msg_t msg; + while (outpipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); + } + delete outpipe; + + // Plug in the new outpipe. + zmq_assert (pipe_); + outpipe = (upipe_t*) pipe_; + out_active = true; + + // If appropriate, notify the user about the hiccup. + if (state == active) + sink->hiccuped (this); +} + void zmq::pipe_t::process_pipe_term () { // This is the simple case of peer-induced termination. If there are no @@ -379,3 +402,23 @@ void zmq::pipe_t::delimit () // Delimiter in any other state is invalid. zmq_assert (false); } + +void zmq::pipe_t::hiccup () +{ + // If termination is already under way do nothing. + if (state != active) + return; + + // We'll drop the pointer to the inpipe. From now on, the peer is + // responsible for deallocating it. + inpipe = NULL; + + // Create new inpipe. + inpipe = new (std::nothrow) pipe_t::upipe_t (); + alloc_assert (inpipe); + in_active = true; + + // Notify the peer about the hiccup. + send_hiccup (peer, (void*) inpipe); +} + diff --git a/src/pipe.hpp b/src/pipe.hpp index 3087ab8..bf34a83 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -44,6 +44,7 @@ namespace zmq virtual void read_activated (class pipe_t *pipe_) = 0; virtual void write_activated (class pipe_t *pipe_) = 0; + virtual void hiccuped (class pipe_t *pipe_) = 0; virtual void terminated (class pipe_t *pipe_) = 0; }; @@ -86,6 +87,11 @@ namespace zmq // Flush the messages downsteam. void flush (); + // Temporaraily disconnects the inbound message stream and drops + // all the messages on the fly. Causes 'hiccuped' event to be generated + // in the peer. + void hiccup (); + // Ask pipe to terminate. The termination will happen asynchronously // and user will be notified about actual deallocation by 'terminated' // event. @@ -93,18 +99,19 @@ namespace zmq private: + // Type of the underlying lock-free pipe. + typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t; + // Command handlers. void process_activate_read (); void process_activate_write (uint64_t msgs_read_); + void process_hiccup (void *pipe_); void process_pipe_term (); void process_pipe_term_ack (); // Handler for delimiter read from the pipe. void delimit (); - // Type of the underlying lock-free pipe. - typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t; - // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, diff --git a/src/pub.cpp b/src/pub.cpp index 8558265..4787c32 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -30,3 +30,16 @@ zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) : zmq::pub_t::~pub_t () { } + +int zmq::pub_t::xrecv (class msg_t *msg_, int flags_) +{ + // Messages cannot be received from PUB socket. + errno = ENOTSUP; + return -1; +} + +bool zmq::pub_t::xhas_in () +{ + return false; +} + diff --git a/src/pub.hpp b/src/pub.hpp index e69af3c..c8db55f 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -33,6 +33,10 @@ namespace zmq pub_t (class ctx_t *parent_, uint32_t tid_); ~pub_t (); + // Implementations of virtual functions from socket_base_t. + int xrecv (class msg_t *msg_, int flags_); + bool xhas_in (); + private: pub_t (const pub_t&); diff --git a/src/session.cpp b/src/session.cpp index 5601402..c9f4fdb 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -146,6 +146,13 @@ void zmq::session_t::write_activated (pipe_t *pipe_) engine->activate_in (); } +void zmq::session_t::hiccuped (pipe_t *pipe_) +{ + // Hiccups are always sent from session to socket, not the other + // way round. + zmq_assert (false); +} + void zmq::session_t::process_plug () { } @@ -287,4 +294,25 @@ void zmq::session_t::unregister_session (const blob_t &name_) socket->unregister_session (name_); } +bool zmq::session_t::attached (const blob_t &peer_identity_) +{ + return xattached (peer_identity_); +} + +void zmq::session_t::detached () +{ + if (!xdetached ()) { + + // Derived session type have asked for session termination. + terminate (); + return; + } + + // For subscriber sockets we hiccup the inbound pipe, which will cause + // the socket object to resend all the subscriptions. + if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) + pipe->hiccup (); +} + + diff --git a/src/session.hpp b/src/session.hpp index 8bca735..1e32722 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -44,9 +44,7 @@ namespace zmq // To be used once only, when creating the session. void attach_pipe (class pipe_t *pipe_); - // i_inout interface implementation. Note that detach method is not - // implemented by generic session. Different session types may handle - // engine disconnection in different ways. + // i_inout interface implementation. bool read (msg_t *msg_); bool write (msg_t *msg_); void flush (); @@ -55,17 +53,19 @@ namespace zmq // i_pipe_events interface implementation. void read_activated (class pipe_t *pipe_); void write_activated (class pipe_t *pipe_); + void hiccuped (class pipe_t *pipe_); void terminated (class pipe_t *pipe_); protected: - // Two events for the derived session type. Attached is triggered - // when session is attached to a peer. The function can reject the new - // peer by returning false. Detached is triggered at the beginning of + // Events from the engine. Attached is triggered when session is + // attached to a peer. The function can reject the new peer by + // returning false. Detached is triggered at the beginning of // the termination process when session is about to be detached from - // the peer. - virtual bool attached (const blob_t &peer_identity_) = 0; - virtual void detached () = 0; + // the peer. If it returns false, session will be terminated. + // To be overloaded by the derived session type. + virtual bool xattached (const blob_t &peer_identity_) = 0; + virtual bool xdetached () = 0; // Returns true if there is an engine attached to the session. bool has_engine (); @@ -78,6 +78,9 @@ namespace zmq private: + bool attached (const blob_t &peer_identity_); + void detached (); + // Handlers for incoming commands. void process_plug (); void process_attach (struct i_engine *engine_, diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 1682c05..59e2653 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -764,7 +764,7 @@ bool zmq::socket_base_t::xhas_out () return false; } -int zmq::socket_base_t::xsend (msg_t *msg_, int options_) +int zmq::socket_base_t::xsend (msg_t *msg_, int flags_) { errno = ENOTSUP; return -1; @@ -775,7 +775,7 @@ bool zmq::socket_base_t::xhas_in () return false; } -int zmq::socket_base_t::xrecv (msg_t *msg_, int options_) +int zmq::socket_base_t::xrecv (msg_t *msg_, int flags_) { errno = ENOTSUP; return -1; @@ -790,6 +790,11 @@ void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_) zmq_assert (false); } +void zmq::socket_base_t::xhiccuped (pipe_t *pipe_) +{ + zmq_assert (false); +} + void zmq::socket_base_t::in_event () { // Process any commands from other threads/sockets that may be available @@ -837,6 +842,11 @@ void zmq::socket_base_t::write_activated (pipe_t *pipe_) xwrite_activated (pipe_); } +void zmq::socket_base_t::hiccuped (pipe_t *pipe_) +{ + xhiccuped (pipe_); +} + void zmq::socket_base_t::terminated (pipe_t *pipe_) { // Notify the specific socket type about the pipe termination. diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 98 |