diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 4 | ||||
| -rw-r--r-- | src/downstream.cpp | 34 | ||||
| -rw-r--r-- | src/downstream.hpp | 10 | ||||
| -rw-r--r-- | src/fq.cpp | 106 | ||||
| -rw-r--r-- | src/fq.hpp | 64 | ||||
| -rw-r--r-- | src/lb.cpp | 111 | ||||
| -rw-r--r-- | src/lb.hpp | 63 | ||||
| -rw-r--r-- | src/pgm_receiver.cpp | 2 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 2 | ||||
| -rw-r--r-- | src/sub.cpp | 50 | ||||
| -rw-r--r-- | src/sub.hpp | 21 | ||||
| -rw-r--r-- | src/upstream.cpp | 58 | ||||
| -rw-r--r-- | src/upstream.hpp | 14 | ||||
| -rw-r--r-- | src/xrep.cpp | 17 | ||||
| -rw-r--r-- | src/xrep.hpp | 5 | ||||
| -rw-r--r-- | src/xreq.cpp | 28 | ||||
| -rw-r--r-- | src/xreq.hpp | 8 | ||||
| -rw-r--r-- | src/zmq_decoder.cpp | 40 | ||||
| -rw-r--r-- | src/zmq_decoder.hpp | 7 | ||||
| -rw-r--r-- | src/zmq_encoder.cpp | 21 | ||||
| -rw-r--r-- | src/zmq_encoder.hpp | 4 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 4 | 
22 files changed, 476 insertions, 197 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index a733408..0fdaf37 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -66,6 +66,7 @@ libzmq_la_SOURCES = app_thread.hpp \      err.hpp \      fd.hpp \      fd_signaler.hpp \ +    fq.hpp \      i_inout.hpp \      io_object.hpp \      io_thread.hpp \ @@ -75,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \      i_poll_events.hpp \      i_signaler.hpp \      kqueue.hpp \ +    lb.hpp \      msg_content.hpp \      mutex.hpp \      object.hpp \ @@ -126,10 +128,12 @@ libzmq_la_SOURCES = app_thread.hpp \      epoll.cpp \      err.cpp \      fd_signaler.cpp \ +    fq.cpp \      io_object.cpp \      io_thread.cpp \      ip.cpp \      kqueue.cpp \ +    lb.cpp \      object.cpp \      options.cpp \      owned.cpp \ diff --git a/src/downstream.cpp b/src/downstream.cpp index 4f994e6..be1c4cc 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -24,8 +24,7 @@  #include "pipe.hpp"  zmq::downstream_t::downstream_t (class app_thread_t *parent_) : -    socket_base_t (parent_), -    current (0) +    socket_base_t (parent_)  {      options.requires_in = false;      options.requires_out = true; @@ -39,7 +38,7 @@ void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,      class writer_t *outpipe_)  {      zmq_assert (!inpipe_ && outpipe_); -    pipes.push_back (outpipe_); +    lb.attach (outpipe_);  }  void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) @@ -51,7 +50,7 @@ void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)  void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)  {      zmq_assert (pipe_); -    pipes.erase (pipes.index (pipe_)); +    lb.detach (pipe_);  }  void zmq::downstream_t::xkill (class reader_t *pipe_) @@ -76,29 +75,7 @@ int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,  int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)  { -    //  If there are no pipes we cannot send the message. -    if (pipes.empty ()) { -        errno = EAGAIN; -        return -1; -    } - -    //  Move to the next pipe (load-balancing). -    current++; -    if (current >= pipes.size ()) -        current = 0; - -    //  TODO: Implement this once queue limits are in-place. -    zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); - -    //  Push message to the selected pipe. -    pipes [current]->write (msg_); -    pipes [current]->flush (); - -    //  Detach the message from the data buffer. -    int rc = zmq_msg_init (msg_); -    zmq_assert (rc == 0); - -    return 0; +    return lb.send (msg_, flags_);  }  int zmq::downstream_t::xflush () @@ -124,8 +101,7 @@ bool zmq::downstream_t::xhas_in ()  bool zmq::downstream_t::xhas_out ()  { -    //  TODO: Modify this code once pipe limits are in place. -    return true; +    return lb.has_out ();  } diff --git a/src/downstream.hpp b/src/downstream.hpp index c6a7ed8..bf8cabb 100644 --- a/src/downstream.hpp +++ b/src/downstream.hpp @@ -21,7 +21,7 @@  #define __ZMQ_DOWNSTREAM_HPP_INCLUDED__  #include "socket_base.hpp" -#include "yarray.hpp" +#include "lb.hpp"  namespace zmq  { @@ -48,12 +48,8 @@ namespace zmq      private: -        //  List of outbound pipes. -        typedef yarray_t <class writer_t> pipes_t; -        pipes_t pipes; - -        //  Points to the last pipe that the most recent message was sent to. -        pipes_t::size_type current; +        //  Load balancer managing the outbound pipes. +        lb_t lb;          downstream_t (const downstream_t&);          void operator = (const downstream_t&); diff --git a/src/fq.cpp b/src/fq.cpp new file mode 100644 index 0000000..2c6fffb --- /dev/null +++ b/src/fq.cpp @@ -0,0 +1,106 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "fq.hpp" +#include "pipe.hpp" +#include "err.hpp" + +zmq::fq_t::fq_t () : +    active (0), +    current (0) +{ +} + +zmq::fq_t::~fq_t () +{ +    for (pipes_t::size_type i = 0; i != pipes.size (); i++) +        pipes [i]->term (); +} + +void zmq::fq_t::attach (reader_t *pipe_) +{ +    pipes.push_back (pipe_); +    pipes.swap (active, pipes.size () - 1); +    active++; +} + +void zmq::fq_t::detach (reader_t *pipe_) +{ +    //  Remove the pipe from the list; adjust number of active pipes +    //  accordingly. +    if (pipes.index (pipe_) < active) +        active--; +    pipes.erase (pipe_); +} + +void zmq::fq_t::kill (reader_t *pipe_) +{ +    //  Move the pipe to the list of inactive pipes. +    active--; +    pipes.swap (pipes.index (pipe_), active); +} + +void zmq::fq_t::revive (reader_t *pipe_) +{ +    //  Move the pipe to the list of active pipes. +    pipes.swap (pipes.index (pipe_), active); +    active++; +} + +int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) +{ +    //  Deallocate old content of the message. +    zmq_msg_close (msg_); + +    //  Round-robin over the pipes to get next message. +    for (int count = active; count != 0; count--) { +        bool fetched = pipes [current]->read (msg_); +        current++; +        if (current >= active) +            current = 0; +        if (fetched) +            return 0; +    } + +    //  No message is available. Initialise the output parameter +    //  to be a 0-byte message. +    zmq_msg_init (msg_); +    errno = EAGAIN; +    return -1; +} + +bool zmq::fq_t::has_in () +{ +    //  Note that messing with current doesn't break the fairness of fair +    //  queueing algorithm. If there are no messages available current will +    //  get back to its original value. Otherwise it'll point to the first +    //  pipe holding messages, skipping only pipes with no messages available. +    for (int count = active; count != 0; count--) { +        if (pipes [current]->check_read ()) +            return true; +        current++; +        if (current >= active) +            current = 0; +    } + +    return false; +} + diff --git a/src/fq.hpp b/src/fq.hpp new file mode 100644 index 0000000..a823808 --- /dev/null +++ b/src/fq.hpp @@ -0,0 +1,64 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_FQ_HPP_INCLUDED__ +#define __ZMQ_FQ_HPP_INCLUDED__ + +#include "yarray.hpp" + +namespace zmq +{ + +    //  Class manages a set of inbound pipes. On receive it performs fair +    //  queueing (RFC970) so that senders gone berserk won't cause denial of +    //  service for decent senders. +    class fq_t +    { +    public: + +        fq_t (); +        ~fq_t (); + +        void attach (class reader_t *pipe_); +        void detach (class reader_t *pipe_); +        void kill (class reader_t *pipe_); +        void revive (class reader_t *pipe_); +        int recv (zmq_msg_t *msg_, int flags_); +        bool has_in (); + +    private: + +        //  Inbound pipes. +        typedef yarray_t <class reader_t> pipes_t; +        pipes_t pipes; + +        //  Number of active pipes. All the active pipes are located at the +        //  beginning of the pipes array. +        pipes_t::size_type active; + +        //  Index of the next bound pipe to read a message from. +        pipes_t::size_type current; + +        fq_t (const fq_t&); +        void operator = (const fq_t&); +    }; + +} + +#endif diff --git a/src/lb.cpp b/src/lb.cpp new file mode 100644 index 0000000..4db8594 --- /dev/null +++ b/src/lb.cpp @@ -0,0 +1,111 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "lb.hpp" +#include "pipe.hpp" +#include "err.hpp" + +zmq::lb_t::lb_t () : +    active (0), +    current (0) +{ +} + +zmq::lb_t::~lb_t () +{ +    for (pipes_t::size_type i = 0; i != pipes.size (); i++) +        pipes [i]->term (); +} + +void zmq::lb_t::attach (writer_t *pipe_) +{ +    pipes.push_back (pipe_); +    pipes.swap (active, pipes.size () - 1); +    active++; +} + +void zmq::lb_t::detach (writer_t *pipe_) +{ +    //  Remove the pipe from the list; adjust number of active pipes +    //  accordingly. +    if (pipes.index (pipe_) < active) +        active--; +    pipes.erase (pipe_); +} + +void zmq::lb_t::kill (writer_t *pipe_) +{ +    //  Move the pipe to the list of inactive pipes. +    active--; +    pipes.swap (pipes.index (pipe_), active); +} + +void zmq::lb_t::revive (writer_t *pipe_) +{ +    //  Move the pipe to the list of active pipes. +    pipes.swap (pipes.index (pipe_), active); +    active++; +} + +int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) +{ +    //  If there are no pipes we cannot send the message. +    if (pipes.empty ()) { +        errno = EAGAIN; +        return -1; +    } + +    //  Move to the next pipe (load-balancing). +    current++; +    if (current >= active) +        current = 0; + +    //  TODO: Implement this once queue limits are in-place. +    zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); + +    //  Push message to the selected pipe. +    pipes [current]->write (msg_); +    pipes [current]->flush (); + +    //  Detach the message from the data buffer. +    int rc = zmq_msg_init (msg_); +    zmq_assert (rc == 0); + +    return 0; +} + +bool zmq::lb_t::has_out () +{ +    for (int count = active; count != 0; count--) { + +        //  We should be able to write at least 1-byte message to interrupt +        //  polling for POLLOUT. +        //  TODO: Shouldn't we use a saner value here? +        if (pipes [current]->check_write (1)) +            return true; +        current++; +        if (current >= active) +            current = 0; +    } + +    return false; +} + diff --git a/src/lb.hpp b/src/lb.hpp new file mode 100644 index 0000000..21843c3 --- /dev/null +++ b/src/lb.hpp @@ -0,0 +1,63 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_LB_HPP_INCLUDED__ +#define __ZMQ_LB_HPP_INCLUDED__ + +#include "yarray.hpp" + +namespace zmq +{ + +    //  Class manages a set of outbound pipes. On send it load balances +    //  messages fairly among the pipes. +    class lb_t +    { +    public: + +        lb_t (); +        ~lb_t (); + +        void attach (class writer_t *pipe_); +        void detach (class writer_t *pipe_); +        void kill (class writer_t *pipe_); +        void revive (class writer_t *pipe_); +        int send (zmq_msg_t *msg_, int flags_); +        bool has_out (); + +    private: + +        //  List of outbound pipes. +        typedef yarray_t <class writer_t> pipes_t; +        pipes_t pipes; + +        //  Number of active pipes. All the active pipes are located at the +        //  beginning of the pipes array. +        pipes_t::size_type active; + +        //  Points to the last pipe that the most recent message was sent to. +        pipes_t::size_type current; + +        lb_t (const lb_t&); +        void operator = (const lb_t&); +    }; + +} + +#endif diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index aaccd0a..e3f7996 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -171,7 +171,7 @@ void zmq::pgm_receiver_t::in_event ()              it->second.joined = true;              //  Create and connect decoder for joined peer. -            it->second.decoder = new zmq_decoder_t (0); +            it->second.decoder = new zmq_decoder_t (0, NULL, 0);              it->second.decoder->set_inout (inout);          } diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 69cb586..676ed93 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -35,7 +35,7 @@  zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,         const options_t &options_, const char *session_name_) :      io_object_t (parent_), -    encoder (0), +    encoder (0, false),      pgm_socket (false, options_),      options (options_),      session_name (session_name_), diff --git a/src/sub.cpp b/src/sub.cpp index a7f9783..e5dbe76 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -21,12 +21,9 @@  #include "sub.hpp"  #include "err.hpp" -#include "pipe.hpp"  zmq::sub_t::sub_t (class app_thread_t *parent_) :      socket_base_t (parent_), -    active (0), -    current (0),      all_count (0)  {      options.requires_in = true; @@ -35,44 +32,35 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) :  zmq::sub_t::~sub_t ()  { -    for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++) -        in_pipes [i]->term (); -    in_pipes.clear ();  }  void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,      class writer_t *outpipe_)  { -    zmq_assert (!outpipe_); -    in_pipes.push_back (inpipe_); -    in_pipes.swap (active, in_pipes.size () - 1); -    active++; +    zmq_assert (inpipe_ && !outpipe_); +    fq.attach (inpipe_);  }  void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_)  { -    if (in_pipes.index (pipe_) < active) -        active--; -    in_pipes.erase (pipe_); +    zmq_assert (pipe_); +    fq.detach (pipe_);  }  void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_)  { +    //  SUB socket is read-only thus there should be no outpipes.      zmq_assert (false);  }  void zmq::sub_t::xkill (class reader_t *pipe_)  { -    //  Move the pipe to the list of inactive pipes. -    in_pipes.swap (in_pipes.index (pipe_), active - 1); -    active--; +    fq.kill (pipe_);  }  void zmq::sub_t::xrevive (class reader_t *pipe_)  { -    //  Move the pipe to the list of active pipes. -    in_pipes.swap (in_pipes.index (pipe_), active); -    active++; +    fq.revive (pipe_);  }  int zmq::sub_t::xsetsockopt (int option_, const void *optval_, @@ -139,7 +127,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)      while (true) {          //  Get a message using fair queueing algorithm. -        int rc = fq (msg_, flags_); +        int rc = fq.recv (msg_, flags_);          //  If there's no message available, return immediately.          if (rc != 0 && errno == EAGAIN) @@ -176,28 +164,6 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)      }  } -int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_) -{ -    //  Deallocate old content of the message. -    zmq_msg_close (msg_); - -    //  Round-robin over the pipes to get next message. -    for (int count = active; count != 0; count--) { -        bool fetched = in_pipes [current]->read (msg_); -        current++; -        if (current >= active) -            current = 0; -        if (fetched) -            return 0; -    } - -    //  No message is available. Initialise the output parameter -    //  to be a 0-byte message. -    zmq_msg_init (msg_); -    errno = EAGAIN; -    return -1; -} -  bool zmq::sub_t::xhas_in ()  {      //  TODO:  This is more complex as we have to ignore all the messages that diff --git a/src/sub.hpp b/src/sub.hpp index 8ad8a18..1eafdac 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -24,7 +24,7 @@  #include <string>  #include "socket_base.hpp" -#include "yarray.hpp" +#include "fq.hpp"  namespace zmq  { @@ -53,26 +53,15 @@ namespace zmq      private: -        //  Helper function to return one message choosed using -        //  fair queueing algorithm. -        int fq (zmq_msg_t *msg_, int flags_); - -        //  Inbound pipes, i.e. those the socket is getting messages from. -        typedef yarray_t <class reader_t> in_pipes_t; -        in_pipes_t in_pipes; - -        //  Number of active inbound pipes. Active pipes are stored in the -        //  initial section of the in_pipes array. -        in_pipes_t::size_type active; - -        //  Index of the next inbound pipe to read messages from. -        in_pipes_t::size_type current; +        //  Fair queueing object for inbound pipes. +         fq_t fq;          //  Number of active "*" subscriptions.          int all_count; -        //  List of all prefix subscriptions.          typedef std::multiset <std::string> subscriptions_t; + +        //  List of all prefix subscriptions.          subscriptions_t prefixes;          //  List of all exact match subscriptions. diff --git a/src/upstream.cpp b/src/upstream.cpp index da202f8..32de63a 100644 --- a/src/upstream.cpp +++ b/src/upstream.cpp @@ -21,12 +21,9 @@  #include "upstream.hpp"  #include "err.hpp" -#include "pipe.hpp"  zmq::upstream_t::upstream_t (class app_thread_t *parent_) : -    socket_base_t (parent_), -    active (0), -    current (0) +    socket_base_t (parent_)  {      options.requires_in = true;      options.requires_out = false; @@ -40,21 +37,13 @@ void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,      class writer_t *outpipe_)  {      zmq_assert (inpipe_ && !outpipe_); - -    pipes.push_back (inpipe_); -    pipes.swap (active, pipes.size () - 1); -    active++; +    fq.attach (inpipe_);  }  void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)  { -    //  Remove the pipe from the list; adjust number of active pipes -    //  accordingly.      zmq_assert (pipe_); -    pipes_t::size_type index = pipes.index (pipe_); -    if (index < active) -        active--; -    pipes.erase (index); +    fq.detach (pipe_);  }  void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) @@ -65,16 +54,12 @@ void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)  void zmq::upstream_t::xkill (class reader_t *pipe_)  { -    //  Move the pipe to the list of inactive pipes. -    active--; -    pipes.swap (pipes.index (pipe_), active); +    fq.kill (pipe_);  }  void zmq::upstream_t::xrevive (class reader_t *pipe_)  { -    //  Move the pipe to the list of active pipes. -    pipes.swap (pipes.index (pipe_), active); -    active++; +    fq.revive (pipe_);  }  int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, @@ -99,41 +84,12 @@ int zmq::upstream_t::xflush ()  int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)  { -    //  Deallocate old content of the message. -    zmq_msg_close (msg_); - -    //  Round-robin over the pipes to get next message. -    for (int count = active; count != 0; count--) { -        bool fetched = pipes [current]->read (msg_); -        current++; -        if (current >= active) -            current = 0; -        if (fetched) -            return 0; -    } - -    //  No message is available. Initialise the output parameter -    //  to be a 0-byte message. -    zmq_msg_init (msg_); -    errno = EAGAIN; -    return -1; +    return fq.recv (msg_, flags_);  }  bool zmq::upstream_t::xhas_in ()  { -    //  Note that messing with current doesn't break the fairness of fair -    //  queueing algorithm. If there are no messages available current will -    //  get back to its original value. Otherwise it'll point to the first -    //  pipe holding messages, skipping only pipes with no messages available. -    for (int count = active; count != 0; count--) { -        if (pipes [current]->check_read ()) -            return true; -        current++; -        if (current >= active) -            current = 0; -    } - -    return false; +    return fq.has_in ();  }  bool zmq::upstream_t::xhas_out () diff --git a/src/upstream.hpp b/src/upstream.hpp index 0e2f5ad..3c82cdb 100644 --- a/src/upstream.hpp +++ b/src/upstream.hpp @@ -21,7 +21,7 @@  #define __ZMQ_UPSTREAM_HPP_INCLUDED__  #include "socket_base.hpp" -#include "yarray.hpp" +#include "fq.hpp"  namespace zmq  { @@ -48,16 +48,8 @@ namespace zmq      private: -        //  Inbound pipes. -        typedef yarray_t <class reader_t> pipes_t; -        pipes_t pipes; - -        //  Number of active pipes. All the active pipes are located at the -        //  beginning of the pipes array. -        pipes_t::size_type active; - -        //  Index of the next bound pipe to read a message from. -        pipes_t::size_type current; +        //  Fair queueing object for inbound pipes. +        fq_t fq;          upstream_t (const upstream_t&);          void operator = (const upstream_t&); diff --git a/src/xrep.cpp b/src/xrep.cpp index 1b6a536..4fa250b 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -21,7 +21,6 @@  #include "xrep.hpp"  #include "err.hpp" -#include "pipe.hpp"  zmq::xrep_t::xrep_t (class app_thread_t *parent_) :      socket_base_t (parent_) @@ -37,12 +36,16 @@ zmq::xrep_t::~xrep_t ()  void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,      class writer_t *outpipe_)  { +    zmq_assert (inpipe_ && outpipe_); +    fq.attach (inpipe_); +      zmq_assert (false);  }  void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)  { -    zmq_assert (false); +    zmq_assert (pipe_); +    fq.detach (pipe_);  }  void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) @@ -52,12 +55,12 @@ void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)  void zmq::xrep_t::xkill (class reader_t *pipe_)  { -    zmq_assert (false); +    fq.kill (pipe_);  }  void zmq::xrep_t::xrevive (class reader_t *pipe_)  { -    zmq_assert (false); +    fq.revive (pipe_);  }  int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, @@ -81,14 +84,12 @@ int zmq::xrep_t::xflush ()  int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)  { -    zmq_assert (false); -    return -1; +    return fq.recv (msg_, flags_);  }  bool zmq::xrep_t::xhas_in ()  { -    zmq_assert (false); -    return false; +    return fq.has_in ();  }  bool zmq::xrep_t::xhas_out () diff --git a/src/xrep.hpp b/src/xrep.hpp index de42036..66cb611 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -21,7 +21,7 @@  #define __ZMQ_XREP_HPP_INCLUDED__  #include "socket_base.hpp" -#include "yarray.hpp" +#include "fq.hpp"  namespace zmq  { @@ -48,6 +48,9 @@ namespace zmq      private: +        //  Inbound messages are fair-queued. +        fq_t fq; +          xrep_t (const xrep_t&);          void operator = (const xrep_t&);      }; diff --git a/src/xreq.cpp b/src/xreq.cpp index d359dc0..9b95393 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -21,7 +21,6 @@  #include "xreq.hpp"  #include "err.hpp" -#include "pipe.hpp"  zmq::xreq_t::xreq_t (class app_thread_t *parent_) :      socket_base_t (parent_) @@ -37,27 +36,31 @@ zmq::xreq_t::~xreq_t ()  void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,      class writer_t *outpipe_)  { -    zmq_assert (false); +    zmq_assert (inpipe_ && outpipe_); +    fq.attach (inpipe_); +    lb.attach (outpipe_);  }  void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_)  { -    zmq_assert (false); +    zmq_assert (pipe_); +    fq.detach (pipe_);  }  void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_)  { -    zmq_assert (false); +    zmq_assert (pipe_); +    lb.detach (pipe_);  }  void zmq::xreq_t::xkill (class reader_t *pipe_)  { -    zmq_assert (false); +    fq.kill (pipe_);  }  void zmq::xreq_t::xrevive (class reader_t *pipe_)  { -    zmq_assert (false); +    fq.revive (pipe_);  }  int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, @@ -69,32 +72,29 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,  int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)  { -    zmq_assert (false); -    return -1; +    return lb.send (msg_, flags_);  }  int zmq::xreq_t::xflush ()  { +    //  TODO: Implement flushing.      zmq_assert (false);      return -1;  }  int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)  { -    zmq_assert (false); -    return -1; +    return fq.recv (msg_, flags_);  }  bool zmq::xreq_t::xhas_in ()  { -    zmq_assert (false); -    return false; +    return fq.has_in ();  }  bool zmq::xreq_t::xhas_out ()  { -    zmq_assert (false); -    return false; +    return lb.has_out ();  } diff --git a/src/xreq.hpp b/src/xreq.hpp index 8d6a3b2..fdf8b0f 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -21,7 +21,8 @@  #define __ZMQ_XREQ_HPP_INCLUDED__  #include "socket_base.hpp" -#include "yarray.hpp" +#include "fq.hpp" +#include "lb.hpp"  namespace zmq  { @@ -48,6 +49,11 @@ namespace zmq      private: +        //  Messages are fair-queued from inbound pipes. And load-balanced to +        //  the outbound pipes. +        fq_t fq; +        lb_t lb; +          xreq_t (const xreq_t&);          void operator = (const xreq_t&);      }; diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index f488272..b9617fc 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -17,23 +17,41 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ +#include <stdlib.h> +#include <string.h> +  #include "zmq_decoder.hpp"  #include "i_inout.hpp"  #include "wire.hpp"  #include "err.hpp" -zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : +zmq::zmq_decode | 
