diff options
| -rw-r--r-- | include/zmq.h | 1 | ||||
| -rw-r--r-- | src/Makefile.am | 2 | ||||
| -rwxr-xr-x | src/router.cpp | 285 | ||||
| -rwxr-xr-x | src/router.hpp | 123 | ||||
| -rw-r--r-- | src/session_base.cpp | 5 | ||||
| -rw-r--r-- | src/socket_base.cpp | 4 | 
6 files changed, 0 insertions, 420 deletions
| diff --git a/include/zmq.h b/include/zmq.h index e236b2a..a262d01 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -162,7 +162,6 @@ ZMQ_EXPORT int zmq_term (void *context);  #define ZMQ_PUSH 8  #define ZMQ_XPUB 9  #define ZMQ_XSUB 10 -#define ZMQ_ROUTER 13  /*  Socket options.                                                           */  #define ZMQ_AFFINITY 4 diff --git a/src/Makefile.am b/src/Makefile.am index 3b7dec6..137ba73 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -55,7 +55,6 @@ libzmq_la_SOURCES = \      reaper.hpp \      rep.hpp \      req.hpp \ -    router.hpp \      select.hpp \      session_base.hpp \      signaler.hpp \ @@ -113,7 +112,6 @@ libzmq_la_SOURCES = \      reaper.cpp \      pub.cpp \      random.cpp \ -    router.cpp \      rep.cpp \      req.cpp \      select.cpp \ diff --git a/src/router.cpp b/src/router.cpp deleted file mode 100755 index b7e19fb..0000000 --- a/src/router.cpp +++ /dev/null @@ -1,285 +0,0 @@ -/* -    Copyright (c) 2007-2011 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "router.hpp" -#include "pipe.hpp" -#include "wire.hpp" -#include "random.hpp" -#include "likely.hpp" -#include "wire.hpp" -#include "err.hpp" - -zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_) : -    socket_base_t (parent_, tid_), -    prefetched (false), -    more_in (false), -    current_out (NULL), -    more_out (false), -    next_peer_id (generate_random ()) -{ -    options.type = ZMQ_ROUTER; - -    prefetched_msg.init (); -} - -zmq::router_t::~router_t () -{ -    zmq_assert (outpipes.empty ()); -    prefetched_msg.close (); -} - -void zmq::router_t::xattach_pipe (pipe_t *pipe_) -{ -    zmq_assert (pipe_); - -    //  Generate a new peer ID. Take care to avoid duplicates. -    outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); -    if (!outpipes.empty ()) { -        while (true) { -            if (it == outpipes.end ()) -                it = outpipes.begin (); -            if (it->first != next_peer_id) -                break; -            ++next_peer_id; -            ++it; -        } -    } - -    //  Add the pipe to the map out outbound pipes. -    outpipe_t outpipe = {pipe_, true}; -    bool ok = outpipes.insert (outpipes_t::value_type ( -        next_peer_id, outpipe)).second; -    zmq_assert (ok); - -    //  Add the pipe to the list of inbound pipes. -    pipe_->set_pipe_id (next_peer_id); -    fq.attach (pipe_); - -    //  Queue the connection command. -    pending_command_t cmd = {1, next_peer_id}; -    pending_commands.push_back (cmd); - -    //  Advance next peer ID so that if new connection is dropped shortly after -    //  its creation we don't accidentally get two subsequent peers with -    //  the same ID. -    ++next_peer_id; -} - -void zmq::router_t::xterminated (pipe_t *pipe_) -{ -    fq.terminated (pipe_); - -    for (outpipes_t::iterator it = outpipes.begin (); -          it != outpipes.end (); ++it) { -        if (it->second.pipe == pipe_) { - -            //  Queue the disconnection command. -            pending_command_t cmd = {2, it->first}; -            pending_commands.push_back (cmd); - -            //  Remove the pipe. -            outpipes.erase (it); -            if (pipe_ == current_out) -                current_out = NULL; -            return; -        } -    } -    zmq_assert (false); -} - -void zmq::router_t::xread_activated (pipe_t *pipe_) -{ -    fq.activated (pipe_); -} - -void zmq::router_t::xwrite_activated (pipe_t *pipe_) -{ -    for (outpipes_t::iterator it = outpipes.begin (); -          it != outpipes.end (); ++it) { -        if (it->second.pipe == pipe_) { -            zmq_assert (!it->second.active); -            it->second.active = true; -            return; -        } -    } -    zmq_assert (false); -} - -int zmq::router_t::xsend (msg_t *msg_, int flags_) -{ -    //  If this is the first part of the message it's the ID of the -    //  peer to send the message to. -    if (!more_out) { -        zmq_assert (!current_out); - -        //  The first message part has to be label. -        if (unlikely (!(msg_->flags () & msg_t::label))) { -            errno = EFSM; -            return -1; -        } - -        //  Find the pipe associated with the peer ID stored in the message. -        if (unlikely (msg_->size () != 4)) { -            errno = ECANTROUTE; -            return -1; -        } -        uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); -        outpipes_t::iterator it = outpipes.find (peer_id); -        if (unlikely (it == outpipes.end ())) { -            errno = ECANTROUTE; -            return -1; -        } - -        //  Check whether the pipe is available for writing. -        msg_t empty; -        int rc = empty.init (); -        errno_assert (rc == 0); -        if (!it->second.pipe->check_write (&empty)) { -            rc = empty.close (); -            errno_assert (rc == 0); -            it->second.active = false; -            errno = EAGAIN; -            return -1; -        } -        rc = empty.close (); -        errno_assert (rc == 0); - -        //  Mark the pipe to send the message to. -        current_out = it->second.pipe; -        more_out = true; -         -        //  Clean up the message object. -        rc = msg_->close (); -        errno_assert (rc == 0); -        rc = msg_->init (); -        errno_assert (rc == 0); -        return 0; -    } - -    //  Check whether this is the last part of the message. -    more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - -    //  Push the message into the pipe. If there's no out pipe, just drop it. -    if (current_out) { -        bool ok = current_out->write (msg_); -        if (unlikely (!ok)) -            current_out = NULL; -        else if (!more_out) { -            current_out->flush (); -            current_out = NULL; -        } -    } -    else { -        int rc = msg_->close (); -        errno_assert (rc == 0); -    } - -    //  Detach the message from the data buffer. -    int rc = msg_->init (); -    errno_assert (rc == 0); - -    return 0; -} - -int zmq::router_t::xrecv (msg_t *msg_, int flags_) -{ -    //  If there's a queued command, pass it to the caller. -    if (unlikely (!more_in && !pending_commands.empty ())) { -        msg_->init_size (5); -        unsigned char *data = (unsigned char*) msg_->data (); -        put_uint8 (data, pending_commands.front ().cmd); -        put_uint32 (data + 1, pending_commands.front ().peer); -        msg_->set_flags (msg_t::command); -        pending_commands.pop_front (); -        return 0; -    } - -    //  If there is a prefetched message, return it. -    if (prefetched) { -        int rc = msg_->move (prefetched_msg); -        errno_assert (rc == 0); -        more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; -        prefetched = false; -        return 0; -    } - -    //  Get next message part. -    pipe_t *pipe; -    int rc = fq.recvpipe (msg_, flags_, &pipe); -    if (rc != 0) -        return -1; - -    //  If we are in the middle of reading a message, just return the next part. -    if (more_in) { -        more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; -        return 0; -    } -  -    //  We are at the beginning of a new message. Move the message part we -    //  have to the prefetched and return the ID of the peer instead. -    rc = prefetched_msg.move (*msg_); -    errno_assert (rc == 0); -    prefetched = true; -    rc = msg_->close (); -    errno_assert (rc == 0); -    rc = msg_->init_size (4); -    errno_assert (rc == 0); -    put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); -    msg_->set_flags (msg_t::label); -    return 0; -} - -int zmq::router_t::rollback (void) -{ -    if (current_out) { -        current_out->rollback (); -        current_out = NULL; -        more_out = false; -    } -    return 0; -} - -bool zmq::router_t::xhas_in () -{ -    if (prefetched) -        return true; -    return fq.has_in () || !pending_commands.empty(); -} - -bool zmq::router_t::xhas_out () -{ -    //  In theory, GENERIC socket is always ready for writing. Whether actual -    //  attempt to write succeeds depends on whitch pipe the message is going -    //  to be routed to. -    return true; -} - -zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_, -      socket_base_t *socket_, const options_t &options_, -      const char *protocol_, const char *address_) : -    session_base_t (io_thread_, connect_, socket_, options_, protocol_, -        address_) -{ -} - -zmq::router_session_t::~router_session_t () -{ -} - diff --git a/src/router.hpp b/src/router.hpp deleted file mode 100755 index 9a5c0f9..0000000 --- a/src/router.hpp +++ /dev/null @@ -1,123 +0,0 @@ -/* -    Copyright (c) 2007-2011 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_ROUTER_HPP_INCLUDED__ -#define __ZMQ_ROUTER_HPP_INCLUDED__ - -#include <map> -#include <deque> - -#include "socket_base.hpp" -#include "session_base.hpp" -#include "stdint.hpp" -#include "msg.hpp" -#include "fq.hpp" - -namespace zmq -{ - -    class router_t : -        public socket_base_t -    { -    public: - -        router_t (class ctx_t *parent_, uint32_t tid_); -        ~router_t (); - -        //  Overloads of functions from socket_base_t. -        void xattach_pipe (class pipe_t *pipe_); -        int xsend (class msg_t *msg_, int flags_); -        int xrecv (class msg_t *msg_, int flags_); -        bool xhas_in (); -        bool xhas_out (); -        void xread_activated (class pipe_t *pipe_); -        void xwrite_activated (class pipe_t *pipe_); -        void xterminated (class pipe_t *pipe_); - -    protected: - -        //  Rollback any message parts that were sent but not yet flushed. -        int rollback (); - -    private: - -        //  Fair queueing object for inbound pipes. -        fq_t fq; - -        //  Have we prefetched a message. -        bool prefetched; - -        //  Holds the prefetched message. -        msg_t prefetched_msg; - -        //  If true, more incoming message parts are expected. -        bool more_in; - -        struct outpipe_t -        { -            class pipe_t *pipe; -            bool active; -        }; - -        //  Outbound pipes indexed by the peer IDs. -        typedef std::map <uint32_t, outpipe_t> outpipes_t; -        outpipes_t outpipes; - -        //  The pipe we are currently writing to. -        class pipe_t *current_out; - -        //  If true, more outgoing message parts are expected. -        bool more_out; - -        //  Peer ID are generated. It's a simple increment and wrap-over -        //  algorithm. This value is the next ID to use (if not used already). -        uint32_t next_peer_id; - -        //  Commands to be delivered to the user. -        struct pending_command_t -        { -            uint8_t cmd; -            uint32_t peer; -        }; -        typedef std::deque <pending_command_t> pending_commands_t; -        pending_commands_t pending_commands; - -        router_t (const router_t&); -        const router_t &operator = (const router_t&); -    }; - -    class router_session_t : public session_base_t -    { -    public: - -        router_session_t (class io_thread_t *io_thread_, bool connect_, -            class socket_base_t *socket_, const options_t &options_, -            const char *protocol_, const char *address_); -        ~router_session_t (); - -    private: - -        router_session_t (const router_session_t&); -        const router_session_t &operator = (const router_session_t&); -    }; - -} - -#endif diff --git a/src/session_base.cpp b/src/session_base.cpp index 32dcd4f..35c0b46 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -40,7 +40,6 @@  #include "xsub.hpp"  #include "push.hpp"  #include "pull.hpp" -#include "router.hpp"  #include "pair.hpp"  zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, @@ -88,10 +87,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,          s = new (std::nothrow) pull_session_t (io_thread_, connect_,              socket_, options_, protocol_, address_);          break; -    case ZMQ_ROUTER: -        s = new (std::nothrow) router_session_t (io_thread_, connect_, -            socket_, options_, protocol_, address_); -        break;      case ZMQ_PAIR:          s = new (std::nothrow) pair_session_t (io_thread_, connect_,              socket_, options_, protocol_, address_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index a4d89db..0a5f732 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -60,7 +60,6 @@  #include "xrep.hpp"  #include "xpub.hpp"  #include "xsub.hpp" -#include "router.hpp"  bool zmq::socket_base_t::check_tag ()  { @@ -106,9 +105,6 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,      case ZMQ_XSUB:          s = new (std::nothrow) xsub_t (parent_, tid_);          break; -    case ZMQ_ROUTER: -        s = new (std::nothrow) router_t (parent_, tid_); -        break;      default:          errno = EINVAL;          return NULL; | 
