diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-07-15 08:16:40 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-07-15 08:16:40 +0200 |
commit | e4f98d1e100c225abca67b4aad90be08a729e02f (patch) | |
tree | 22da4a529e8a0cc9b051b1367dca38bf6fb5f007 /src | |
parent | 73630de98aeb8add878b1d421aa2bfc22c735c63 (diff) |
ROUTER and DEALER sockets removed
To be replaced by new generic socket type
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 4 | ||||
-rw-r--r-- | src/dealer.cpp | 77 | ||||
-rw-r--r-- | src/dealer.hpp | 65 | ||||
-rw-r--r-- | src/router.cpp | 292 | ||||
-rw-r--r-- | src/router.hpp | 105 | ||||
-rw-r--r-- | src/socket_base.cpp | 8 |
6 files changed, 0 insertions, 551 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 9c2fbf1..e78eb1c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -15,7 +15,6 @@ libzmq_la_SOURCES = \ config.hpp \ connect_session.hpp \ ctx.hpp \ - dealer.hpp \ decoder.hpp \ devpoll.hpp \ dist.hpp \ @@ -56,7 +55,6 @@ libzmq_la_SOURCES = \ reaper.hpp \ rep.hpp \ req.hpp \ - router.hpp \ select.hpp \ semaphore.hpp \ session.hpp \ @@ -87,7 +85,6 @@ libzmq_la_SOURCES = \ command.cpp \ ctx.cpp \ connect_session.cpp \ - dealer.cpp \ decoder.cpp \ devpoll.cpp \ dist.cpp \ @@ -121,7 +118,6 @@ libzmq_la_SOURCES = \ random.cpp \ rep.cpp \ req.cpp \ - router.cpp \ select.cpp \ session.cpp \ signaler.cpp \ diff --git a/src/dealer.cpp b/src/dealer.cpp deleted file mode 100644 index e1e141f..0000000 --- a/src/dealer.cpp +++ /dev/null @@ -1,77 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "dealer.hpp" -#include "err.hpp" -#include "msg.hpp" - -zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_) -{ - options.type = ZMQ_XREQ; -} - -zmq::dealer_t::~dealer_t () -{ -} - -void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) -{ - zmq_assert (pipe_); - fq.attach (pipe_); - lb.attach (pipe_); -} - -int zmq::dealer_t::xsend (msg_t *msg_, int flags_) -{ - return lb.send (msg_, flags_); -} - -int zmq::dealer_t::xrecv (msg_t *msg_, int flags_) -{ - return fq.recv (msg_, flags_); -} - -bool zmq::dealer_t::xhas_in () -{ - return fq.has_in (); -} - -bool zmq::dealer_t::xhas_out () -{ - return lb.has_out (); -} - -void zmq::dealer_t::xread_activated (pipe_t *pipe_) -{ - fq.activated (pipe_); -} - -void zmq::dealer_t::xwrite_activated (pipe_t *pipe_) -{ - lb.activated (pipe_); -} - -void zmq::dealer_t::xterminated (pipe_t *pipe_) -{ - fq.terminated (pipe_); - lb.terminated (pipe_); -} - diff --git a/src/dealer.hpp b/src/dealer.hpp deleted file mode 100644 index 4f85c46..0000000 --- a/src/dealer.hpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_DEALER_HPP_INCLUDED__ -#define __ZMQ_DEALER_HPP_INCLUDED__ - -#include "socket_base.hpp" -#include "fq.hpp" -#include "lb.hpp" - -namespace zmq -{ - - class dealer_t : - public socket_base_t - { - public: - - dealer_t (class ctx_t *parent_, uint32_t tid_); - ~dealer_t (); - - protected: - - // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); - int xsend (class msg_t *msg_, int flags_); - int xrecv (class msg_t *msg_, int flags_); - bool xhas_in (); - bool xhas_out (); - void xread_activated (class pipe_t *pipe_); - void xwrite_activated (class pipe_t *pipe_); - void xterminated (class pipe_t *pipe_); - - private: - - // Messages are fair-queued from inbound pipes. And load-balanced to - // the outbound pipes. - fq_t fq; - lb_t lb; - - dealer_t (const dealer_t&); - const dealer_t &operator = (const dealer_t&); - }; - -} - -#endif diff --git a/src/router.cpp b/src/router.cpp deleted file mode 100644 index 0d1b77e..0000000 --- a/src/router.cpp +++ /dev/null @@ -1,292 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "router.hpp" -#include "pipe.hpp" -#include "err.hpp" - -zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), - current_in (0), - prefetched (false), - more_in (false), - current_out (NULL), - more_out (false) -{ - options.type = ZMQ_XREP; - - // On connect, pipes are created only after initial handshaking. - // That way we are aware of the peer's identity when binding to the pipes. - options.immediate_connect = false; -} - -zmq::router_t::~router_t () -{ - zmq_assert (inpipes.empty ()); - zmq_assert (outpipes.empty ()); -} - -void zmq::router_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) -{ - zmq_assert (pipe_); - - // Add the pipe to the map out outbound pipes. - // TODO: What if new connection has same peer identity as the old one? - outpipe_t outpipe = {pipe_, true}; - bool ok = outpipes.insert (outpipes_t::value_type ( - peer_identity_, outpipe)).second; - zmq_assert (ok); - - // Add the pipe to the list of inbound pipes. - inpipe_t inpipe = {pipe_, peer_identity_, true}; - inpipes.push_back (inpipe); -} - -void zmq::router_t::xterminated (pipe_t *pipe_) -{ - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); - ++it) { - if (it->pipe == pipe_) { - if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in) - current_in--; - inpipes.erase (it); - if (current_in >= inpipes.size ()) - current_in = 0; - goto clean_outpipes; - } - } - zmq_assert (false); - -clean_outpipes: - for (outpipes_t::iterator it = outpipes.begin (); - it != outpipes.end (); ++it) { - if (it->second.pipe == pipe_) { - outpipes.erase (it); - if (pipe_ == current_out) - current_out = NULL; - return; - } - } - zmq_assert (false); -} - -void zmq::router_t::xread_activated (pipe_t *pipe_) -{ - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); - ++it) { - if (it->pipe == pipe_) { - zmq_assert (!it->active); - it->active = true; - return; - } - } - zmq_assert (false); -} - -void zmq::router_t::xwrite_activated (pipe_t *pipe_) -{ - for (outpipes_t::iterator it = outpipes.begin (); - it != outpipes.end (); ++it) { - if (it->second.pipe == pipe_) { - zmq_assert (!it->second.active); - it->second.active = true; - return; - } - } - zmq_assert (false); -} - -int zmq::router_t::xsend (msg_t *msg_, int flags_) -{ - // If this is the first part of the message it's the identity of the - // peer to send the message to. - if (!more_out) { - zmq_assert (!current_out); - - // If we have malformed message (prefix with no subsequent message) - // then just silently ignore it. - // TODO: The connections should be killed instead. - if (msg_->flags () & msg_t::label) { - - more_out = true; - - // Find the pipe associated with the identity stored in the prefix. - // If there's no such pipe just silently ignore the message. - blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); - outpipes_t::iterator it = outpipes.find (identity); - - if (it != outpipes.end ()) { - current_out = it->second.pipe; - msg_t empty; - int rc = empty.init (); - errno_assert (rc == 0); - if (!current_out->check_write (&empty)) { - it->second.active = false; - more_out = false; - current_out = NULL; - rc = empty.close (); - errno_assert (rc == 0); - errno = EAGAIN; - return -1; - } - rc = empty.close (); - errno_assert (rc == 0); - } - } - - int rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init (); - errno_assert (rc == 0); - return 0; - } - - // Check whether this is the last part of the message. - more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - - // Push the message into the pipe. If there's no out pipe, just drop it. - if (current_out) { - bool ok = current_out->write (msg_); - zmq_assert (ok); - if (!more_out) { - current_out->flush (); - current_out = NULL; - } - } - else { - int rc = msg_->close (); - errno_assert (rc == 0); - } - - // Detach the message from the data buffer. - int rc = msg_->init (); - errno_assert (rc == 0); - - return 0; -} - -int zmq::router_t::xrecv (msg_t *msg_, int flags_) -{ - // If there is a prefetched message, return it. - if (prefetched) { - int rc = msg_->move (prefetched_msg); - errno_assert (rc == 0); - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - prefetched = false; - return 0; - } - - // Deallocate old content of the message. - int rc = msg_->close (); - errno_assert (rc == 0); - - // If we are in the middle of reading a message, just grab next part of it. - if (more_in) { - zmq_assert (inpipes [current_in].active); - bool fetched = inpipes [current_in].pipe->read (msg_); - zmq_assert (fetched); - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - if (!more_in) { - current_in++; - if (current_in >= inpipes.size ()) - current_in = 0; - } - return 0; - } - - // Round-robin over the pipes to get the next message. - for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) { - - // Try to fetch new message. - if (inpipes [current_in].active) - prefetched = inpipes [current_in].pipe->read (&prefetched_msg); - - // If we have a message, create a prefix and return it to the caller. - if (prefetched) { - int rc = msg_->init_size (inpipes [current_in].identity.size ()); - errno_assert (rc == 0); - memcpy (msg_->data (), inpipes [current_in].identity.data (), - msg_->size ()); - msg_->set_flags (msg_t::label); - return 0; - } - - // If me don't have a message, mark the pipe as passive and - // move to next pipe. - inpipes [current_in].active = false; - current_in++; - if (current_in >= inpipes.size ()) - current_in = 0; - } - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - rc = msg_->init (); - errno_assert (rc == 0); - errno = EAGAIN; - return -1; -} - -int zmq::router_t::rollback (void) -{ - if (current_out) { - current_out->rollback (); - current_out = NULL; - more_out = false; - } - return 0; -} - -bool zmq::router_t::xhas_in () -{ - // There are subsequent parts of the partly-read message available. - if (prefetched || more_in) - return true; - - // Note that messing with current doesn't break the fairness of fair - // queueing algorithm. If there are no messages available current will - // get back to its original value. Otherwise it'll point to the first - // pipe holding messages, skipping only pipes with no messages available. - for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) { - if (inpipes [current_in].active && - inpipes [current_in].pipe->check_read ()) - return true; - - // If me don't have a message, mark the pipe as passive and - // move to next pipe. - inpipes [current_in].active = false; - current_in++; - if (current_in >= inpipes.size ()) - current_in = 0; - } - - return false; -} - -bool zmq::router_t::xhas_out () -{ - // In theory, XREP socket is always ready for writing. Whether actual - // attempt to write succeeds depends on whitch pipe the message is going - // to be routed to. - return true; -} - - - diff --git a/src/router.hpp b/src/router.hpp deleted file mode 100644 index 9655bba..0000000 --- a/src/router.hpp +++ /dev/null @@ -1,105 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_ROUTER_HPP_INCLUDED__ -#define __ZMQ_ROUTER_HPP_INCLUDED__ - -#include <map> -#include <vector> - -#include "socket_base.hpp" -#include "blob.hpp" -#include "msg.hpp" - -namespace zmq -{ - - // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. - class router_t : - public socket_base_t - { - public: - - router_t (class ctx_t *parent_, uint32_t tid_); - ~router_t (); - - // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); - int xsend (class msg_t *msg_, int flags_); - int xrecv (class msg_t *msg_, int flags_); - bool xhas_in (); - bool xhas_out (); - void xread_activated (class pipe_t *pipe_); - void xwrite_activated (class pipe_t *pipe_); - void xterminated (class pipe_t *pipe_); - - protected: - - // Rollback any message parts that were sent but not yet flushed. - int rollback (); - - private: - - struct inpipe_t - { - class pipe_t *pipe; - blob_t identity; - bool active; - }; - - // Inbound pipes with the names of corresponging peers. - typedef std::vector <inpipe_t> inpipes_t; - inpipes_t inpipes; - - // The pipe we are currently reading from. - inpipes_t::size_type current_in; - - // Have we prefetched a message. - bool prefetched; - - // Holds the prefetched message. - msg_t prefetched_msg; - - // If true, more incoming message parts are expected. - bool more_in; - - struct outpipe_t - { - class pipe_t *pipe; - bool active; - }; - - // Outbound pipes indexed by the peer names. - typedef std::map <blob_t, outpipe_t> outpipes_t; - outpipes_t outpipes; - - // The pipe we are currently writing to. - class pipe_t *current_out; - - // If true, more outgoing message parts are expected. - bool more_out; - - router_t (const router_t&); - const router_t &operator = (const router_t&); - }; - -} - -#endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6541661..98268de 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -59,8 +59,6 @@ #include "xrep.hpp" #include "xpub.hpp" #include "xsub.hpp" -#include "router.hpp" -#include "dealer.hpp" bool zmq::socket_base_t::check_tag () { @@ -106,12 +104,6 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, case ZMQ_XSUB: s = new (std::nothrow) xsub_t (parent_, tid_); break; - case ZMQ_ROUTER: - s = new (std::nothrow) router_t (parent_, tid_); - break; - case ZMQ_DEALER: - s = new (std::nothrow) dealer_t (parent_, tid_); - break; default: errno = EINVAL; return NULL; |