diff options
| -rw-r--r-- | include/zmq.h | 2 | ||||
| -rw-r--r-- | src/Makefile.am | 4 | ||||
| -rw-r--r-- | src/dealer.cpp | 77 | ||||
| -rw-r--r-- | src/dealer.hpp | 65 | ||||
| -rw-r--r-- | src/router.cpp | 292 | ||||
| -rw-r--r-- | src/router.hpp | 105 | ||||
| -rw-r--r-- | src/socket_base.cpp | 8 | 
7 files changed, 0 insertions, 553 deletions
diff --git a/include/zmq.h b/include/zmq.h index fa319f5..450bed9 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -158,8 +158,6 @@ ZMQ_EXPORT int zmq_term (void *context);  #define ZMQ_PUSH 8  #define ZMQ_XPUB 9  #define ZMQ_XSUB 10 -#define ZMQ_ROUTER 11 -#define ZMQ_DEALER 12  /*  Socket options.                                                           */  #define ZMQ_AFFINITY 4 diff --git a/src/Makefile.am b/src/Makefile.am index 9c2fbf1..e78eb1c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -15,7 +15,6 @@ libzmq_la_SOURCES = \      config.hpp \      connect_session.hpp \      ctx.hpp \ -    dealer.hpp \      decoder.hpp \      devpoll.hpp \      dist.hpp \ @@ -56,7 +55,6 @@ libzmq_la_SOURCES = \      reaper.hpp \      rep.hpp \      req.hpp \ -    router.hpp \      select.hpp \      semaphore.hpp \      session.hpp \ @@ -87,7 +85,6 @@ libzmq_la_SOURCES = \      command.cpp \      ctx.cpp \      connect_session.cpp \ -    dealer.cpp \      decoder.cpp \      devpoll.cpp \      dist.cpp \ @@ -121,7 +118,6 @@ libzmq_la_SOURCES = \      random.cpp \      rep.cpp \      req.cpp \ -    router.cpp \      select.cpp \      session.cpp \      signaler.cpp \ diff --git a/src/dealer.cpp b/src/dealer.cpp deleted file mode 100644 index e1e141f..0000000 --- a/src/dealer.cpp +++ /dev/null @@ -1,77 +0,0 @@ -/* -    Copyright (c) 2007-2011 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "dealer.hpp" -#include "err.hpp" -#include "msg.hpp" - -zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_) : -    socket_base_t (parent_, tid_) -{ -    options.type = ZMQ_XREQ; -} - -zmq::dealer_t::~dealer_t () -{ -} - -void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) -{ -    zmq_assert (pipe_); -    fq.attach (pipe_); -    lb.attach (pipe_); -} - -int zmq::dealer_t::xsend (msg_t *msg_, int flags_) -{ -    return lb.send (msg_, flags_); -} - -int zmq::dealer_t::xrecv (msg_t *msg_, int flags_) -{ -    return fq.recv (msg_, flags_); -} - -bool zmq::dealer_t::xhas_in () -{ -    return fq.has_in (); -} - -bool zmq::dealer_t::xhas_out () -{ -    return lb.has_out (); -} - -void zmq::dealer_t::xread_activated (pipe_t *pipe_) -{ -    fq.activated (pipe_); -} - -void zmq::dealer_t::xwrite_activated (pipe_t *pipe_) -{ -    lb.activated (pipe_); -} - -void zmq::dealer_t::xterminated (pipe_t *pipe_) -{ -    fq.terminated (pipe_); -    lb.terminated (pipe_); -} - diff --git a/src/dealer.hpp b/src/dealer.hpp deleted file mode 100644 index 4f85c46..0000000 --- a/src/dealer.hpp +++ /dev/null @@ -1,65 +0,0 @@ -/* -    Copyright (c) 2007-2011 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_DEALER_HPP_INCLUDED__ -#define __ZMQ_DEALER_HPP_INCLUDED__ - -#include "socket_base.hpp" -#include "fq.hpp" -#include "lb.hpp" - -namespace zmq -{ - -    class dealer_t : -        public socket_base_t -    { -    public: - -        dealer_t (class ctx_t *parent_, uint32_t tid_); -        ~dealer_t (); - -    protected: - -        //  Overloads of functions from socket_base_t. -        void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); -        int xsend (class msg_t *msg_, int flags_); -        int xrecv (class msg_t *msg_, int flags_); -        bool xhas_in (); -        bool xhas_out (); -        void xread_activated (class pipe_t *pipe_); -        void xwrite_activated (class pipe_t *pipe_); -        void xterminated (class pipe_t *pipe_); - -    private: - -        //  Messages are fair-queued from inbound pipes. And load-balanced to -        //  the outbound pipes. -        fq_t fq; -        lb_t lb; - -        dealer_t (const dealer_t&); -        const dealer_t &operator = (const dealer_t&); -    }; - -} - -#endif diff --git a/src/router.cpp b/src/router.cpp deleted file mode 100644 index 0d1b77e..0000000 --- a/src/router.cpp +++ /dev/null @@ -1,292 +0,0 @@ -/* -    Copyright (c) 2007-2011 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "router.hpp" -#include "pipe.hpp" -#include "err.hpp" - -zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_) : -    socket_base_t (parent_, tid_), -    current_in (0), -    prefetched (false), -    more_in (false), -    current_out (NULL), -    more_out (false) -{ -    options.type = ZMQ_XREP; - -    //  On connect, pipes are created only after initial handshaking. -    //  That way we are aware of the peer's identity when binding to the pipes. -    options.immediate_connect = false; -} - -zmq::router_t::~router_t () -{ -    zmq_assert (inpipes.empty ()); -    zmq_assert (outpipes.empty ()); -} - -void zmq::router_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) -{ -    zmq_assert (pipe_); - -    //  Add the pipe to the map out outbound pipes. -    //  TODO: What if new connection has same peer identity as the old one? -    outpipe_t outpipe = {pipe_, true}; -    bool ok = outpipes.insert (outpipes_t::value_type ( -        peer_identity_, outpipe)).second; -    zmq_assert (ok); - -    //  Add the pipe to the list of inbound pipes. -    inpipe_t inpipe = {pipe_, peer_identity_, true}; -    inpipes.push_back (inpipe); -} - -void zmq::router_t::xterminated (pipe_t *pipe_) -{ -    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); -          ++it) { -        if (it->pipe == pipe_) { -            if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in) -                current_in--; -            inpipes.erase (it); -            if (current_in >= inpipes.size ()) -                current_in = 0; -            goto clean_outpipes; -        } -    } -    zmq_assert (false); - -clean_outpipes: -    for (outpipes_t::iterator it = outpipes.begin (); -          it != outpipes.end (); ++it) { -        if (it->second.pipe == pipe_) { -            outpipes.erase (it); -            if (pipe_ == current_out) -                current_out = NULL; -            return; -        } -    } -    zmq_assert (false); -} - -void zmq::router_t::xread_activated (pipe_t *pipe_) -{ -    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); -          ++it) { -        if (it->pipe == pipe_) { -            zmq_assert (!it->active); -            it->active = true; -            return; -        } -    } -    zmq_assert (false); -} - -void zmq::router_t::xwrite_activated (pipe_t *pipe_) -{ -    for (outpipes_t::iterator it = outpipes.begin (); -          it != outpipes.end (); ++it) { -        if (it->second.pipe == pipe_) { -            zmq_assert (!it->second.active); -            it->second.active = true; -            return; -        } -    } -    zmq_assert (false); -} - -int zmq::router_t::xsend (msg_t *msg_, int flags_) -{ -    //  If this is the first part of the message it's the identity of the -    //  peer to send the message to. -    if (!more_out) { -        zmq_assert (!current_out); - -        //  If we have malformed message (prefix with no subsequent message) -        //  then just silently ignore it. -        //  TODO: The connections should be killed instead. -        if (msg_->flags () & msg_t::label) { - -            more_out = true; - -            //  Find the pipe associated with the identity stored in the prefix. -            //  If there's no such pipe just silently ignore the message. -            blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); -            outpipes_t::iterator it = outpipes.find (identity); - -            if (it != outpipes.end ()) { -                current_out = it->second.pipe; -                msg_t empty; -                int rc = empty.init (); -                errno_assert (rc == 0); -                if (!current_out->check_write (&empty)) { -                    it->second.active = false; -                    more_out = false; -                    current_out = NULL; -                    rc = empty.close (); -                    errno_assert (rc == 0); -                    errno = EAGAIN; -                    return -1; -                } -                rc = empty.close (); -                errno_assert (rc == 0); -            } -        } - -        int rc = msg_->close (); -        errno_assert (rc == 0); -        rc = msg_->init (); -        errno_assert (rc == 0); -        return 0; -    } - -    //  Check whether this is the last part of the message. -    more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - -    //  Push the message into the pipe. If there's no out pipe, just drop it. -    if (current_out) { -        bool ok = current_out->write (msg_); -        zmq_assert (ok); -        if (!more_out) { -            current_out->flush (); -            current_out = NULL; -        } -    } -    else { -        int rc = msg_->close (); -        errno_assert (rc == 0); -    } - -    //  Detach the message from the data buffer. -    int rc = msg_->init (); -    errno_assert (rc == 0); - -    return 0; -} - -int zmq::router_t::xrecv (msg_t *msg_, int flags_) -{ -    //  If there is a prefetched message, return it. -    if (prefetched) { -        int rc = msg_->move (prefetched_msg); -        errno_assert (rc == 0); -        more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; -        prefetched = false; -        return 0; -    } - -    //  Deallocate old content of the message. -    int rc = msg_->close (); -    errno_assert (rc == 0); - -    //  If we are in the middle of reading a message, just grab next part of it. -    if (more_in) { -        zmq_assert (inpipes [current_in].active); -        bool fetched = inpipes [current_in].pipe->read (msg_); -        zmq_assert (fetched); -        more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; -        if (!more_in) { -            current_in++; -            if (current_in >= inpipes.size ()) -                current_in = 0; -        } -        return 0; -    } - -    //  Round-robin over the pipes to get the next message. -    for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) { - -        //  Try to fetch new message. -        if (inpipes [current_in].active) -            prefetched = inpipes [current_in].pipe->read (&prefetched_msg); - -        //  If we have a message, create a prefix and return it to the caller. -        if (prefetched) { -            int rc = msg_->init_size (inpipes [current_in].identity.size ()); -            errno_assert (rc == 0); -            memcpy (msg_->data (), inpipes [current_in].identity.data (), -                msg_->size ()); -            msg_->set_flags (msg_t::label); -            return 0; -        } - -        //  If me don't have a message, mark the pipe as passive and -        //  move to next pipe. -        inpipes [current_in].active = false; -        current_in++; -        if (current_in >= inpipes.size ()) -            current_in = 0; -    } - -    //  No message is available. Initialise the output parameter -    //  to be a 0-byte message. -    rc = msg_->init (); -    errno_assert (rc == 0); -    errno = EAGAIN; -    return -1; -} - -int zmq::router_t::rollback (void) -{ -    if (current_out) { -        current_out->rollback (); -        current_out = NULL; -        more_out = false; -    } -    return 0; -} - -bool zmq::router_t::xhas_in () -{ -    //  There are subsequent parts of the partly-read message available. -    if (prefetched || more_in) -        return true; - -    //  Note that messing with current doesn't break the fairness of fair -    //  queueing algorithm. If there are no messages available current will -    //  get back to its original value. Otherwise it'll point to the first -    //  pipe holding messages, skipping only pipes with no messages available. -    for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) { -        if (inpipes [current_in].active && -              inpipes [current_in].pipe->check_read ()) -            return true; - -        //  If me don't have a message, mark the pipe as passive and -        //  move to next pipe. -        inpipes [current_in].active = false; -        current_in++; -        if (current_in >= inpipes.size ()) -            current_in = 0; -    } - -    return false; -} - -bool zmq::router_t::xhas_out () -{ -    //  In theory, XREP socket is always ready for writing. Whether actual -    //  attempt to write succeeds depends on whitch pipe the message is going -    //  to be routed to. -    return true; -} - - - diff --git a/src/router.hpp b/src/router.hpp deleted file mode 100644 index 9655bba..0000000 --- a/src/router.hpp +++ /dev/null @@ -1,105 +0,0 @@ -/* -    Copyright (c) 2007-2011 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_ROUTER_HPP_INCLUDED__ -#define __ZMQ_ROUTER_HPP_INCLUDED__ - -#include <map> -#include <vector> - -#include "socket_base.hpp" -#include "blob.hpp" -#include "msg.hpp" - -namespace zmq -{ - -    //  TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. -    class router_t : -        public socket_base_t -    { -    public: - -        router_t (class ctx_t *parent_, uint32_t tid_); -        ~router_t (); - -        //  Overloads of functions from socket_base_t. -        void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); -        int xsend (class msg_t *msg_, int flags_); -        int xrecv (class msg_t *msg_, int flags_); -        bool xhas_in (); -        bool xhas_out (); -        void xread_activated (class pipe_t *pipe_); -        void xwrite_activated (class pipe_t *pipe_); -        void xterminated (class pipe_t *pipe_); - -    protected: - -        //  Rollback any message parts that were sent but not yet flushed. -        int rollback (); - -    private: - -        struct inpipe_t -        { -            class pipe_t *pipe; -            blob_t identity; -            bool active; -        }; - -        //  Inbound pipes with the names of corresponging peers. -        typedef std::vector <inpipe_t> inpipes_t; -        inpipes_t inpipes; - -        //  The pipe we are currently reading from. -        inpipes_t::size_type current_in; - -        //  Have we prefetched a message. -        bool prefetched; - -        //  Holds the prefetched message. -        msg_t prefetched_msg; - -        //  If true, more incoming message parts are expected. -        bool more_in; - -        struct outpipe_t -        { -            class pipe_t *pipe; -            bool active; -        }; - -        //  Outbound pipes indexed by the peer names. -        typedef std::map <blob_t, outpipe_t> outpipes_t; -        outpipes_t outpipes; - -        //  The pipe we are currently writing to. -        class pipe_t *current_out; - -        //  If true, more outgoing message parts are expected. -        bool more_out; - -        router_t (const router_t&); -        const router_t &operator = (const router_t&); -    }; - -} - -#endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6541661..98268de 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -59,8 +59,6 @@  #include "xrep.hpp"  #include "xpub.hpp"  #include "xsub.hpp" -#include "router.hpp" -#include "dealer.hpp"  bool zmq::socket_base_t::check_tag ()  { @@ -106,12 +104,6 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,      case ZMQ_XSUB:          s = new (std::nothrow) xsub_t (parent_, tid_);          break; -    case ZMQ_ROUTER: -        s = new (std::nothrow) router_t (parent_, tid_); -        break; -    case ZMQ_DEALER: -        s = new (std::nothrow) dealer_t (parent_, tid_); -        break;      default:          errno = EINVAL;          return NULL;  | 
