From e49115224a7957b0e5d49326bc02ae6af186eaf9 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 15 Dec 2009 09:09:19 +0100 Subject: zmq_encoder/decoder are able to add/trim prefixes from messages; fair queueing and load balancing algorithms factorised into separate classes --- src/Makefile.am | 4 ++ src/downstream.cpp | 34 +++------------- src/downstream.hpp | 10 ++--- src/fq.cpp | 106 ++++++++++++++++++++++++++++++++++++++++++++++++ src/fq.hpp | 64 +++++++++++++++++++++++++++++ src/lb.cpp | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/lb.hpp | 63 +++++++++++++++++++++++++++++ src/pgm_receiver.cpp | 2 +- src/pgm_sender.cpp | 2 +- src/sub.cpp | 50 ++++------------------- src/sub.hpp | 21 +++------- src/upstream.cpp | 58 ++++----------------------- src/upstream.hpp | 14 ++----- src/xrep.cpp | 17 ++++---- src/xrep.hpp | 5 ++- src/xreq.cpp | 28 ++++++------- src/xreq.hpp | 8 +++- src/zmq_decoder.cpp | 40 +++++++++++++++---- src/zmq_decoder.hpp | 7 +++- src/zmq_encoder.cpp | 21 ++++++++-- src/zmq_encoder.hpp | 4 +- src/zmq_engine.cpp | 4 +- 22 files changed, 476 insertions(+), 197 deletions(-) create mode 100644 src/fq.cpp create mode 100644 src/fq.hpp create mode 100644 src/lb.cpp create mode 100644 src/lb.hpp diff --git a/src/Makefile.am b/src/Makefile.am index a733408..0fdaf37 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -66,6 +66,7 @@ libzmq_la_SOURCES = app_thread.hpp \ err.hpp \ fd.hpp \ fd_signaler.hpp \ + fq.hpp \ i_inout.hpp \ io_object.hpp \ io_thread.hpp \ @@ -75,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \ i_poll_events.hpp \ i_signaler.hpp \ kqueue.hpp \ + lb.hpp \ msg_content.hpp \ mutex.hpp \ object.hpp \ @@ -126,10 +128,12 @@ libzmq_la_SOURCES = app_thread.hpp \ epoll.cpp \ err.cpp \ fd_signaler.cpp \ + fq.cpp \ io_object.cpp \ io_thread.cpp \ ip.cpp \ kqueue.cpp \ + lb.cpp \ object.cpp \ options.cpp \ owned.cpp \ diff --git a/src/downstream.cpp b/src/downstream.cpp index 4f994e6..be1c4cc 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -24,8 +24,7 @@ #include "pipe.hpp" zmq::downstream_t::downstream_t (class app_thread_t *parent_) : - socket_base_t (parent_), - current (0) + socket_base_t (parent_) { options.requires_in = false; options.requires_out = true; @@ -39,7 +38,7 @@ void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { zmq_assert (!inpipe_ && outpipe_); - pipes.push_back (outpipe_); + lb.attach (outpipe_); } void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) @@ -51,7 +50,7 @@ void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_) { zmq_assert (pipe_); - pipes.erase (pipes.index (pipe_)); + lb.detach (pipe_); } void zmq::downstream_t::xkill (class reader_t *pipe_) @@ -76,29 +75,7 @@ int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) { - // If there are no pipes we cannot send the message. - if (pipes.empty ()) { - errno = EAGAIN; - return -1; - } - - // Move to the next pipe (load-balancing). - current++; - if (current >= pipes.size ()) - current = 0; - - // TODO: Implement this once queue limits are in-place. - zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); - - // Push message to the selected pipe. - pipes [current]->write (msg_); - pipes [current]->flush (); - - // Detach the message from the data buffer. - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - - return 0; + return lb.send (msg_, flags_); } int zmq::downstream_t::xflush () @@ -124,8 +101,7 @@ bool zmq::downstream_t::xhas_in () bool zmq::downstream_t::xhas_out () { - // TODO: Modify this code once pipe limits are in place. - return true; + return lb.has_out (); } diff --git a/src/downstream.hpp b/src/downstream.hpp index c6a7ed8..bf8cabb 100644 --- a/src/downstream.hpp +++ b/src/downstream.hpp @@ -21,7 +21,7 @@ #define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ #include "socket_base.hpp" -#include "yarray.hpp" +#include "lb.hpp" namespace zmq { @@ -48,12 +48,8 @@ namespace zmq private: - // List of outbound pipes. - typedef yarray_t pipes_t; - pipes_t pipes; - - // Points to the last pipe that the most recent message was sent to. - pipes_t::size_type current; + // Load balancer managing the outbound pipes. + lb_t lb; downstream_t (const downstream_t&); void operator = (const downstream_t&); diff --git a/src/fq.cpp b/src/fq.cpp new file mode 100644 index 0000000..2c6fffb --- /dev/null +++ b/src/fq.cpp @@ -0,0 +1,106 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "fq.hpp" +#include "pipe.hpp" +#include "err.hpp" + +zmq::fq_t::fq_t () : + active (0), + current (0) +{ +} + +zmq::fq_t::~fq_t () +{ + for (pipes_t::size_type i = 0; i != pipes.size (); i++) + pipes [i]->term (); +} + +void zmq::fq_t::attach (reader_t *pipe_) +{ + pipes.push_back (pipe_); + pipes.swap (active, pipes.size () - 1); + active++; +} + +void zmq::fq_t::detach (reader_t *pipe_) +{ + // Remove the pipe from the list; adjust number of active pipes + // accordingly. + if (pipes.index (pipe_) < active) + active--; + pipes.erase (pipe_); +} + +void zmq::fq_t::kill (reader_t *pipe_) +{ + // Move the pipe to the list of inactive pipes. + active--; + pipes.swap (pipes.index (pipe_), active); +} + +void zmq::fq_t::revive (reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + pipes.swap (pipes.index (pipe_), active); + active++; +} + +int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + bool fetched = pipes [current]->read (msg_); + current++; + if (current >= active) + current = 0; + if (fetched) + return 0; + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; +} + +bool zmq::fq_t::has_in () +{ + // Note that messing with current doesn't break the fairness of fair + // queueing algorithm. If there are no messages available current will + // get back to its original value. Otherwise it'll point to the first + // pipe holding messages, skipping only pipes with no messages available. + for (int count = active; count != 0; count--) { + if (pipes [current]->check_read ()) + return true; + current++; + if (current >= active) + current = 0; + } + + return false; +} + diff --git a/src/fq.hpp b/src/fq.hpp new file mode 100644 index 0000000..a823808 --- /dev/null +++ b/src/fq.hpp @@ -0,0 +1,64 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_FQ_HPP_INCLUDED__ +#define __ZMQ_FQ_HPP_INCLUDED__ + +#include "yarray.hpp" + +namespace zmq +{ + + // Class manages a set of inbound pipes. On receive it performs fair + // queueing (RFC970) so that senders gone berserk won't cause denial of + // service for decent senders. + class fq_t + { + public: + + fq_t (); + ~fq_t (); + + void attach (class reader_t *pipe_); + void detach (class reader_t *pipe_); + void kill (class reader_t *pipe_); + void revive (class reader_t *pipe_); + int recv (zmq_msg_t *msg_, int flags_); + bool has_in (); + + private: + + // Inbound pipes. + typedef yarray_t pipes_t; + pipes_t pipes; + + // Number of active pipes. All the active pipes are located at the + // beginning of the pipes array. + pipes_t::size_type active; + + // Index of the next bound pipe to read a message from. + pipes_t::size_type current; + + fq_t (const fq_t&); + void operator = (const fq_t&); + }; + +} + +#endif diff --git a/src/lb.cpp b/src/lb.cpp new file mode 100644 index 0000000..4db8594 --- /dev/null +++ b/src/lb.cpp @@ -0,0 +1,111 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "lb.hpp" +#include "pipe.hpp" +#include "err.hpp" + +zmq::lb_t::lb_t () : + active (0), + current (0) +{ +} + +zmq::lb_t::~lb_t () +{ + for (pipes_t::size_type i = 0; i != pipes.size (); i++) + pipes [i]->term (); +} + +void zmq::lb_t::attach (writer_t *pipe_) +{ + pipes.push_back (pipe_); + pipes.swap (active, pipes.size () - 1); + active++; +} + +void zmq::lb_t::detach (writer_t *pipe_) +{ + // Remove the pipe from the list; adjust number of active pipes + // accordingly. + if (pipes.index (pipe_) < active) + active--; + pipes.erase (pipe_); +} + +void zmq::lb_t::kill (writer_t *pipe_) +{ + // Move the pipe to the list of inactive pipes. + active--; + pipes.swap (pipes.index (pipe_), active); +} + +void zmq::lb_t::revive (writer_t *pipe_) +{ + // Move the pipe to the list of active pipes. + pipes.swap (pipes.index (pipe_), active); + active++; +} + +int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) +{ + // If there are no pipes we cannot send the message. + if (pipes.empty ()) { + errno = EAGAIN; + return -1; + } + + // Move to the next pipe (load-balancing). + current++; + if (current >= active) + current = 0; + + // TODO: Implement this once queue limits are in-place. + zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); + + // Push message to the selected pipe. + pipes [current]->write (msg_); + pipes [current]->flush (); + + // Detach the message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return 0; +} + +bool zmq::lb_t::has_out () +{ + for (int count = active; count != 0; count--) { + + // We should be able to write at least 1-byte message to interrupt + // polling for POLLOUT. + // TODO: Shouldn't we use a saner value here? + if (pipes [current]->check_write (1)) + return true; + current++; + if (current >= active) + current = 0; + } + + return false; +} + diff --git a/src/lb.hpp b/src/lb.hpp new file mode 100644 index 0000000..21843c3 --- /dev/null +++ b/src/lb.hpp @@ -0,0 +1,63 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_LB_HPP_INCLUDED__ +#define __ZMQ_LB_HPP_INCLUDED__ + +#include "yarray.hpp" + +namespace zmq +{ + + // Class manages a set of outbound pipes. On send it load balances + // messages fairly among the pipes. + class lb_t + { + public: + + lb_t (); + ~lb_t (); + + void attach (class writer_t *pipe_); + void detach (class writer_t *pipe_); + void kill (class writer_t *pipe_); + void revive (class writer_t *pipe_); + int send (zmq_msg_t *msg_, int flags_); + bool has_out (); + + private: + + // List of outbound pipes. + typedef yarray_t pipes_t; + pipes_t pipes; + + // Number of active pipes. All the active pipes are located at the + // beginning of the pipes array. + pipes_t::size_type active; + + // Points to the last pipe that the most recent message was sent to. + pipes_t::size_type current; + + lb_t (const lb_t&); + void operator = (const lb_t&); + }; + +} + +#endif diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index aaccd0a..e3f7996 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -171,7 +171,7 @@ void zmq::pgm_receiver_t::in_event () it->second.joined = true; // Create and connect decoder for joined peer. - it->second.decoder = new zmq_decoder_t (0); + it->second.decoder = new zmq_decoder_t (0, NULL, 0); it->second.decoder->set_inout (inout); } diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 69cb586..676ed93 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -35,7 +35,7 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, const options_t &options_, const char *session_name_) : io_object_t (parent_), - encoder (0), + encoder (0, false), pgm_socket (false, options_), options (options_), session_name (session_name_), diff --git a/src/sub.cpp b/src/sub.cpp index a7f9783..e5dbe76 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -21,12 +21,9 @@ #include "sub.hpp" #include "err.hpp" -#include "pipe.hpp" zmq::sub_t::sub_t (class app_thread_t *parent_) : socket_base_t (parent_), - active (0), - current (0), all_count (0) { options.requires_in = true; @@ -35,44 +32,35 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) : zmq::sub_t::~sub_t () { - for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++) - in_pipes [i]->term (); - in_pipes.clear (); } void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { - zmq_assert (!outpipe_); - in_pipes.push_back (inpipe_); - in_pipes.swap (active, in_pipes.size () - 1); - active++; + zmq_assert (inpipe_ && !outpipe_); + fq.attach (inpipe_); } void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_) { - if (in_pipes.index (pipe_) < active) - active--; - in_pipes.erase (pipe_); + zmq_assert (pipe_); + fq.detach (pipe_); } void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_) { + // SUB socket is read-only thus there should be no outpipes. zmq_assert (false); } void zmq::sub_t::xkill (class reader_t *pipe_) { - // Move the pipe to the list of inactive pipes. - in_pipes.swap (in_pipes.index (pipe_), active - 1); - active--; + fq.kill (pipe_); } void zmq::sub_t::xrevive (class reader_t *pipe_) { - // Move the pipe to the list of active pipes. - in_pipes.swap (in_pipes.index (pipe_), active); - active++; + fq.revive (pipe_); } int zmq::sub_t::xsetsockopt (int option_, const void *optval_, @@ -139,7 +127,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) while (true) { // Get a message using fair queueing algorithm. - int rc = fq (msg_, flags_); + int rc = fq.recv (msg_, flags_); // If there's no message available, return immediately. if (rc != 0 && errno == EAGAIN) @@ -176,28 +164,6 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) } } -int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_) -{ - // Deallocate old content of the message. - zmq_msg_close (msg_); - - // Round-robin over the pipes to get next message. - for (int count = active; count != 0; count--) { - bool fetched = in_pipes [current]->read (msg_); - current++; - if (current >= active) - current = 0; - if (fetched) - return 0; - } - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - zmq_msg_init (msg_); - errno = EAGAIN; - return -1; -} - bool zmq::sub_t::xhas_in () { // TODO: This is more complex as we have to ignore all the messages that diff --git a/src/sub.hpp b/src/sub.hpp index 8ad8a18..1eafdac 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -24,7 +24,7 @@ #include #include "socket_base.hpp" -#include "yarray.hpp" +#include "fq.hpp" namespace zmq { @@ -53,26 +53,15 @@ namespace zmq private: - // Helper function to return one message choosed using - // fair queueing algorithm. - int fq (zmq_msg_t *msg_, int flags_); - - // Inbound pipes, i.e. those the socket is getting messages from. - typedef yarray_t in_pipes_t; - in_pipes_t in_pipes; - - // Number of active inbound pipes. Active pipes are stored in the - // initial section of the in_pipes array. - in_pipes_t::size_type active; - - // Index of the next inbound pipe to read messages from. - in_pipes_t::size_type current; + // Fair queueing object for inbound pipes. + fq_t fq; // Number of active "*" subscriptions. int all_count; - // List of all prefix subscriptions. typedef std::multiset subscriptions_t; + + // List of all prefix subscriptions. subscriptions_t prefixes; // List of all exact match subscriptions. diff --git a/src/upstream.cpp b/src/upstream.cpp index da202f8..32de63a 100644 --- a/src/upstream.cpp +++ b/src/upstream.cpp @@ -21,12 +21,9 @@ #include "upstream.hpp" #include "err.hpp" -#include "pipe.hpp" zmq::upstream_t::upstream_t (class app_thread_t *parent_) : - socket_base_t (parent_), - active (0), - current (0) + socket_base_t (parent_) { options.requires_in = true; options.requires_out = false; @@ -40,21 +37,13 @@ void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { zmq_assert (inpipe_ && !outpipe_); - - pipes.push_back (inpipe_); - pipes.swap (active, pipes.size () - 1); - active++; + fq.attach (inpipe_); } void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_) { - // Remove the pipe from the list; adjust number of active pipes - // accordingly. zmq_assert (pipe_); - pipes_t::size_type index = pipes.index (pipe_); - if (index < active) - active--; - pipes.erase (index); + fq.detach (pipe_); } void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) @@ -65,16 +54,12 @@ void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) void zmq::upstream_t::xkill (class reader_t *pipe_) { - // Move the pipe to the list of inactive pipes. - active--; - pipes.swap (pipes.index (pipe_), active); + fq.kill (pipe_); } void zmq::upstream_t::xrevive (class reader_t *pipe_) { - // Move the pipe to the list of active pipes. - pipes.swap (pipes.index (pipe_), active); - active++; + fq.revive (pipe_); } int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, @@ -99,41 +84,12 @@ int zmq::upstream_t::xflush () int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) { - // Deallocate old content of the message. - zmq_msg_close (msg_); - - // Round-robin over the pipes to get next message. - for (int count = active; count != 0; count--) { - bool fetched = pipes [current]->read (msg_); - current++; - if (current >= active) - current = 0; - if (fetched) - return 0; - } - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - zmq_msg_init (msg_); - errno = EAGAIN; - return -1; + return fq.recv (msg_, flags_); } bool zmq::upstream_t::xhas_in () { - // Note that messing with current doesn't break the fairness of fair - // queueing algorithm. If there are no messages available current will - // get back to its original value. Otherwise it'll point to the first - // pipe holding messages, skipping only pipes with no messages available. - for (int count = active; count != 0; count--) { - if (pipes [current]->check_read ()) - return true; - current++; - if (current >= active) - current = 0; - } - - return false; + return fq.has_in (); } bool zmq::upstream_t::xhas_out () diff --git a/src/upstream.hpp b/src/upstream.hpp index 0e2f5ad..3c82cdb 100644 --- a/src/upstream.hpp +++ b/src/upstream.hpp @@ -21,7 +21,7 @@ #define __ZMQ_UPSTREAM_HPP_INCLUDED__ #include "socket_base.hpp" -#include "yarray.hpp" +#include "fq.hpp" namespace zmq { @@ -48,16 +48,8 @@ namespace zmq private: - // Inbound pipes. - typedef yarray_t pipes_t; - pipes_t pipes; - - // Number of active pipes. All the active pipes are located at the - // beginning of the pipes array. - pipes_t::size_type active; - - // Index of the next bound pipe to read a message from. - pipes_t::size_type current; + // Fair queueing object for inbound pipes. + fq_t fq; upstream_t (const upstream_t&); void operator = (const upstream_t&); diff --git a/src/xrep.cpp b/src/xrep.cpp index 1b6a536..4fa250b 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -21,7 +21,6 @@ #include "xrep.hpp" #include "err.hpp" -#include "pipe.hpp" zmq::xrep_t::xrep_t (class app_thread_t *parent_) : socket_base_t (parent_) @@ -37,12 +36,16 @@ zmq::xrep_t::~xrep_t () void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { + zmq_assert (inpipe_ && outpipe_); + fq.attach (inpipe_); + zmq_assert (false); } void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) { - zmq_assert (false); + zmq_assert (pipe_); + fq.detach (pipe_); } void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) @@ -52,12 +55,12 @@ void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) void zmq::xrep_t::xkill (class reader_t *pipe_) { - zmq_assert (false); + fq.kill (pipe_); } void zmq::xrep_t::xrevive (class reader_t *pipe_) { - zmq_assert (false); + fq.revive (pipe_); } int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, @@ -81,14 +84,12 @@ int zmq::xrep_t::xflush () int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) { - zmq_assert (false); - return -1; + return fq.recv (msg_, flags_); } bool zmq::xrep_t::xhas_in () { - zmq_assert (false); - return false; + return fq.has_in (); } bool zmq::xrep_t::xhas_out () diff --git a/src/xrep.hpp b/src/xrep.hpp index de42036..66cb611 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -21,7 +21,7 @@ #define __ZMQ_XREP_HPP_INCLUDED__ #include "socket_base.hpp" -#include "yarray.hpp" +#include "fq.hpp" namespace zmq { @@ -48,6 +48,9 @@ namespace zmq private: + // Inbound messages are fair-queued. + fq_t fq; + xrep_t (const xrep_t&); void operator = (const xrep_t&); }; diff --git a/src/xreq.cpp b/src/xreq.cpp index d359dc0..9b95393 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -21,7 +21,6 @@ #include "xreq.hpp" #include "err.hpp" -#include "pipe.hpp" zmq::xreq_t::xreq_t (class app_thread_t *parent_) : socket_base_t (parent_) @@ -37,27 +36,31 @@ zmq::xreq_t::~xreq_t () void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { - zmq_assert (false); + zmq_assert (inpipe_ && outpipe_); + fq.attach (inpipe_); + lb.attach (outpipe_); } void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_) { - zmq_assert (false); + zmq_assert (pipe_); + fq.detach (pipe_); } void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_) { - zmq_assert (false); + zmq_assert (pipe_); + lb.detach (pipe_); } void zmq::xreq_t::xkill (class reader_t *pipe_) { - zmq_assert (false); + fq.kill (pipe_); } void zmq::xreq_t::xrevive (class reader_t *pipe_) { - zmq_assert (false); + fq.revive (pipe_); } int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, @@ -69,32 +72,29 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) { - zmq_assert (false); - return -1; + return lb.send (msg_, flags_); } int zmq::xreq_t::xflush () { + // TODO: Implement flushing. zmq_assert (false); return -1; } int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) { - zmq_assert (false); - return -1; + return fq.recv (msg_, flags_); } bool zmq::xreq_t::xhas_in () { - zmq_assert (false); - return false; + return fq.has_in (); } bool zmq::xreq_t::xhas_out () { - zmq_assert (false); - return false; + return lb.has_out (); } diff --git a/src/xreq.hpp b/src/xreq.hpp index 8d6a3b2..fdf8b0f 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -21,7 +21,8 @@ #define __ZMQ_XREQ_HPP_INCLUDED__ #include "socket_base.hpp" -#include "yarray.hpp" +#include "fq.hpp" +#include "lb.hpp" namespace zmq { @@ -48,6 +49,11 @@ namespace zmq private: + // Messages are fair-queued from inbound pipes. And load-balanced to + // the outbound pipes. + fq_t fq; + lb_t lb; + xreq_t (const xreq_t&); void operator = (const xreq_t&); }; diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index f488272..b9617fc 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -17,23 +17,41 @@ along with this program. If not, see . */ +#include +#include + #include "zmq_decoder.hpp" #include "i_inout.hpp" #include "wire.hpp" #include "err.hpp" -zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : +zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_, + void *prefix_, size_t prefix_size_) : decoder_t (bufsize_), destination (NULL) { zmq_msg_init (&in_progress); + if (!prefix_) { + prefix = NULL; + prefix_size = 0; + } + else { + prefix = malloc (prefix_size_); + zmq_assert (prefix); + memcpy (prefix, prefix_, prefix_size_); + prefix_size = prefix_size_; + } + // At the beginning, read one byte and go to one_byte_size_ready state. next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); } zmq::zmq_decoder_t::~zmq_decoder_t () { + if (prefix) + free (prefix); + zmq_msg_close (&in_progress); } @@ -55,11 +73,15 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, *tmpbuf); + int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf); errno_assert (rc == 0); - next_step (zmq_msg_data (&in_progress), *tmpbuf, - &zmq_decoder_t::message_ready); + // Fill in the message prefix if any. + if (prefix) + memcpy (zmq_msg_data (&in_progress), prefix, prefix_size); + + next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size, + *tmpbuf, &zmq_decoder_t::message_ready); } return true; } @@ -74,11 +96,15 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, size); + int rc = zmq_msg_init_size (&in_progress, prefix_size + size); errno_assert (rc == 0); - next_step (zmq_msg_data (&in_progress), size, - &zmq_decoder_t::message_ready); + // Fill in the message prefix if any. + if (prefix) + memcpy (zmq_msg_data (&in_progress), prefix, prefix_size); + + next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size , + size, &zmq_decoder_t::message_ready); return true; } diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index c5433b7..6df2558 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -32,7 +32,9 @@ namespace zmq { public: - zmq_decoder_t (size_t bufsize_); + // If prefix is not NULL, it will be glued to the beginning of every + // decoded message. + zmq_decoder_t (size_t bufsize_, void *prefix_, size_t prefix_size_); ~zmq_decoder_t (); void set_inout (struct i_inout *destination_); @@ -47,6 +49,9 @@ namespace zmq unsigned char tmpbuf [8]; ::zmq_msg_t in_progress; + void *prefix; + size_t prefix_size; + zmq_decoder_t (const zmq_decoder_t&); void operator = (const zmq_decoder_t&); }; diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index cf129e5..4824cd1 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -21,9 +21,10 @@ #include "i_inout.hpp" #include "wire.hpp" -zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) : +zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_, bool trim_prefix_) : encoder_t (bufsize_), - source (NULL) + source (NULL), + trim_prefix (trim_prefix_) { zmq_msg_init (&in_progress); @@ -44,8 +45,16 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_) bool zmq::zmq_encoder_t::size_ready () { // Write message body into the buffer. - next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), - &zmq_encoder_t::message_ready, false); + if (!trim_prefix) { + next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), + &zmq_encoder_t::message_ready, false); + } + else { + size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress); + next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size, + zmq_msg_size (&in_progress) - prefix_size, + &zmq_encoder_t::message_ready, false); + } return true; } @@ -63,7 +72,11 @@ bool zmq::zmq_encoder_t::message_ready () return false; } + // Get the message size. If the prefix is not to be sent, adjust the + // size accordingly. size_t size = zmq_msg_size (&in_progress); + if (trim_prefix) + size -= *(unsigned char*) zmq_msg_data (&in_progress); // For messages less than 255 bytes long, write one byte of message size. // For longer messages write 0xff escape character followed by 8-byte diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp index 825e60f..8d4e956 100644 --- a/src/zmq_encoder.hpp +++ b/src/zmq_encoder.hpp @@ -32,7 +32,7 @@ namespace zmq { public: - zmq_encoder_t (size_t bufsize_); + zmq_encoder_t (size_t bufsize_, bool trim_prefix_); ~zmq_encoder_t (); void set_inout (struct i_inout *source_); @@ -46,6 +46,8 @@ namespace zmq ::zmq_msg_t in_progress; unsigned char tmpbuf [9]; + bool trim_prefix; + zmq_encoder_t (const zmq_encoder_t&); void operator = (const zmq_encoder_t&); }; diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index c04f29b..18fc616 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -29,10 +29,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, io_object_t (parent_), inpos (NULL), insize (0), - decoder (in_batch_size), + decoder (in_batch_size, NULL, 0), outpos (NULL), outsize (0), - encoder (out_batch_size), + encoder (out_batch_size, false), inout (NULL), options (options_) { -- cgit v1.2.3