summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile.am4
-rw-r--r--src/downstream.cpp34
-rw-r--r--src/downstream.hpp10
-rw-r--r--src/fq.cpp106
-rw-r--r--src/fq.hpp64
-rw-r--r--src/lb.cpp111
-rw-r--r--src/lb.hpp63
-rw-r--r--src/pgm_receiver.cpp2
-rw-r--r--src/pgm_sender.cpp2
-rw-r--r--src/sub.cpp50
-rw-r--r--src/sub.hpp21
-rw-r--r--src/upstream.cpp58
-rw-r--r--src/upstream.hpp14
-rw-r--r--src/xrep.cpp17
-rw-r--r--src/xrep.hpp5
-rw-r--r--src/xreq.cpp28
-rw-r--r--src/xreq.hpp8
-rw-r--r--src/zmq_decoder.cpp40
-rw-r--r--src/zmq_decoder.hpp7
-rw-r--r--src/zmq_encoder.cpp21
-rw-r--r--src/zmq_encoder.hpp4
-rw-r--r--src/zmq_engine.cpp4
22 files changed, 476 insertions, 197 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index a733408..0fdaf37 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -66,6 +66,7 @@ libzmq_la_SOURCES = app_thread.hpp \
err.hpp \
fd.hpp \
fd_signaler.hpp \
+ fq.hpp \
i_inout.hpp \
io_object.hpp \
io_thread.hpp \
@@ -75,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \
i_poll_events.hpp \
i_signaler.hpp \
kqueue.hpp \
+ lb.hpp \
msg_content.hpp \
mutex.hpp \
object.hpp \
@@ -126,10 +128,12 @@ libzmq_la_SOURCES = app_thread.hpp \
epoll.cpp \
err.cpp \
fd_signaler.cpp \
+ fq.cpp \
io_object.cpp \
io_thread.cpp \
ip.cpp \
kqueue.cpp \
+ lb.cpp \
object.cpp \
options.cpp \
owned.cpp \
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 4f994e6..be1c4cc 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -24,8 +24,7 @@
#include "pipe.hpp"
zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
- current (0)
+ socket_base_t (parent_)
{
options.requires_in = false;
options.requires_out = true;
@@ -39,7 +38,7 @@ void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (!inpipe_ && outpipe_);
- pipes.push_back (outpipe_);
+ lb.attach (outpipe_);
}
void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
@@ -51,7 +50,7 @@ void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (pipe_);
- pipes.erase (pipes.index (pipe_));
+ lb.detach (pipe_);
}
void zmq::downstream_t::xkill (class reader_t *pipe_)
@@ -76,29 +75,7 @@ int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
{
- // If there are no pipes we cannot send the message.
- if (pipes.empty ()) {
- errno = EAGAIN;
- return -1;
- }
-
- // Move to the next pipe (load-balancing).
- current++;
- if (current >= pipes.size ())
- current = 0;
-
- // TODO: Implement this once queue limits are in-place.
- zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
-
- // Push message to the selected pipe.
- pipes [current]->write (msg_);
- pipes [current]->flush ();
-
- // Detach the message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
-
- return 0;
+ return lb.send (msg_, flags_);
}
int zmq::downstream_t::xflush ()
@@ -124,8 +101,7 @@ bool zmq::downstream_t::xhas_in ()
bool zmq::downstream_t::xhas_out ()
{
- // TODO: Modify this code once pipe limits are in place.
- return true;
+ return lb.has_out ();
}
diff --git a/src/downstream.hpp b/src/downstream.hpp
index c6a7ed8..bf8cabb 100644
--- a/src/downstream.hpp
+++ b/src/downstream.hpp
@@ -21,7 +21,7 @@
#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
#include "socket_base.hpp"
-#include "yarray.hpp"
+#include "lb.hpp"
namespace zmq
{
@@ -48,12 +48,8 @@ namespace zmq
private:
- // List of outbound pipes.
- typedef yarray_t <class writer_t> pipes_t;
- pipes_t pipes;
-
- // Points to the last pipe that the most recent message was sent to.
- pipes_t::size_type current;
+ // Load balancer managing the outbound pipes.
+ lb_t lb;
downstream_t (const downstream_t&);
void operator = (const downstream_t&);
diff --git a/src/fq.cpp b/src/fq.cpp
new file mode 100644
index 0000000..2c6fffb
--- /dev/null
+++ b/src/fq.cpp
@@ -0,0 +1,106 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "fq.hpp"
+#include "pipe.hpp"
+#include "err.hpp"
+
+zmq::fq_t::fq_t () :
+ active (0),
+ current (0)
+{
+}
+
+zmq::fq_t::~fq_t ()
+{
+ for (pipes_t::size_type i = 0; i != pipes.size (); i++)
+ pipes [i]->term ();
+}
+
+void zmq::fq_t::attach (reader_t *pipe_)
+{
+ pipes.push_back (pipe_);
+ pipes.swap (active, pipes.size () - 1);
+ active++;
+}
+
+void zmq::fq_t::detach (reader_t *pipe_)
+{
+ // Remove the pipe from the list; adjust number of active pipes
+ // accordingly.
+ if (pipes.index (pipe_) < active)
+ active--;
+ pipes.erase (pipe_);
+}
+
+void zmq::fq_t::kill (reader_t *pipe_)
+{
+ // Move the pipe to the list of inactive pipes.
+ active--;
+ pipes.swap (pipes.index (pipe_), active);
+}
+
+void zmq::fq_t::revive (reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ pipes.swap (pipes.index (pipe_), active);
+ active++;
+}
+
+int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // Round-robin over the pipes to get next message.
+ for (int count = active; count != 0; count--) {
+ bool fetched = pipes [current]->read (msg_);
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched)
+ return 0;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+}
+
+bool zmq::fq_t::has_in ()
+{
+ // Note that messing with current doesn't break the fairness of fair
+ // queueing algorithm. If there are no messages available current will
+ // get back to its original value. Otherwise it'll point to the first
+ // pipe holding messages, skipping only pipes with no messages available.
+ for (int count = active; count != 0; count--) {
+ if (pipes [current]->check_read ())
+ return true;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ return false;
+}
+
diff --git a/src/fq.hpp b/src/fq.hpp
new file mode 100644
index 0000000..a823808
--- /dev/null
+++ b/src/fq.hpp
@@ -0,0 +1,64 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_FQ_HPP_INCLUDED__
+#define __ZMQ_FQ_HPP_INCLUDED__
+
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ // Class manages a set of inbound pipes. On receive it performs fair
+ // queueing (RFC970) so that senders gone berserk won't cause denial of
+ // service for decent senders.
+ class fq_t
+ {
+ public:
+
+ fq_t ();
+ ~fq_t ();
+
+ void attach (class reader_t *pipe_);
+ void detach (class reader_t *pipe_);
+ void kill (class reader_t *pipe_);
+ void revive (class reader_t *pipe_);
+ int recv (zmq_msg_t *msg_, int flags_);
+ bool has_in ();
+
+ private:
+
+ // Inbound pipes.
+ typedef yarray_t <class reader_t> pipes_t;
+ pipes_t pipes;
+
+ // Number of active pipes. All the active pipes are located at the
+ // beginning of the pipes array.
+ pipes_t::size_type active;
+
+ // Index of the next bound pipe to read a message from.
+ pipes_t::size_type current;
+
+ fq_t (const fq_t&);
+ void operator = (const fq_t&);
+ };
+
+}
+
+#endif
diff --git a/src/lb.cpp b/src/lb.cpp
new file mode 100644
index 0000000..4db8594
--- /dev/null
+++ b/src/lb.cpp
@@ -0,0 +1,111 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "lb.hpp"
+#include "pipe.hpp"
+#include "err.hpp"
+
+zmq::lb_t::lb_t () :
+ active (0),
+ current (0)
+{
+}
+
+zmq::lb_t::~lb_t ()
+{
+ for (pipes_t::size_type i = 0; i != pipes.size (); i++)
+ pipes [i]->term ();
+}
+
+void zmq::lb_t::attach (writer_t *pipe_)
+{
+ pipes.push_back (pipe_);
+ pipes.swap (active, pipes.size () - 1);
+ active++;
+}
+
+void zmq::lb_t::detach (writer_t *pipe_)
+{
+ // Remove the pipe from the list; adjust number of active pipes
+ // accordingly.
+ if (pipes.index (pipe_) < active)
+ active--;
+ pipes.erase (pipe_);
+}
+
+void zmq::lb_t::kill (writer_t *pipe_)
+{
+ // Move the pipe to the list of inactive pipes.
+ active--;
+ pipes.swap (pipes.index (pipe_), active);
+}
+
+void zmq::lb_t::revive (writer_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ pipes.swap (pipes.index (pipe_), active);
+ active++;
+}
+
+int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
+{
+ // If there are no pipes we cannot send the message.
+ if (pipes.empty ()) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Move to the next pipe (load-balancing).
+ current++;
+ if (current >= active)
+ current = 0;
+
+ // TODO: Implement this once queue limits are in-place.
+ zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
+
+ // Push message to the selected pipe.
+ pipes [current]->write (msg_);
+ pipes [current]->flush ();
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+bool zmq::lb_t::has_out ()
+{
+ for (int count = active; count != 0; count--) {
+
+ // We should be able to write at least 1-byte message to interrupt
+ // polling for POLLOUT.
+ // TODO: Shouldn't we use a saner value here?
+ if (pipes [current]->check_write (1))
+ return true;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ return false;
+}
+
diff --git a/src/lb.hpp b/src/lb.hpp
new file mode 100644
index 0000000..21843c3
--- /dev/null
+++ b/src/lb.hpp
@@ -0,0 +1,63 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_LB_HPP_INCLUDED__
+#define __ZMQ_LB_HPP_INCLUDED__
+
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ // Class manages a set of outbound pipes. On send it load balances
+ // messages fairly among the pipes.
+ class lb_t
+ {
+ public:
+
+ lb_t ();
+ ~lb_t ();
+
+ void attach (class writer_t *pipe_);
+ void detach (class writer_t *pipe_);
+ void kill (class writer_t *pipe_);
+ void revive (class writer_t *pipe_);
+ int send (zmq_msg_t *msg_, int flags_);
+ bool has_out ();
+
+ private:
+
+ // List of outbound pipes.
+ typedef yarray_t <class writer_t> pipes_t;
+ pipes_t pipes;
+
+ // Number of active pipes. All the active pipes are located at the
+ // beginning of the pipes array.
+ pipes_t::size_type active;
+
+ // Points to the last pipe that the most recent message was sent to.
+ pipes_t::size_type current;
+
+ lb_t (const lb_t&);
+ void operator = (const lb_t&);
+ };
+
+}
+
+#endif
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index aaccd0a..e3f7996 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -171,7 +171,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.joined = true;
// Create and connect decoder for joined peer.
- it->second.decoder = new zmq_decoder_t (0);
+ it->second.decoder = new zmq_decoder_t (0, NULL, 0);
it->second.decoder->set_inout (inout);
}
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 69cb586..676ed93 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -35,7 +35,7 @@
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_, const char *session_name_) :
io_object_t (parent_),
- encoder (0),
+ encoder (0, false),
pgm_socket (false, options_),
options (options_),
session_name (session_name_),
diff --git a/src/sub.cpp b/src/sub.cpp
index a7f9783..e5dbe76 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -21,12 +21,9 @@
#include "sub.hpp"
#include "err.hpp"
-#include "pipe.hpp"
zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_),
- active (0),
- current (0),
all_count (0)
{
options.requires_in = true;
@@ -35,44 +32,35 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) :
zmq::sub_t::~sub_t ()
{
- for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
- in_pipes [i]->term ();
- in_pipes.clear ();
}
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
- zmq_assert (!outpipe_);
- in_pipes.push_back (inpipe_);
- in_pipes.swap (active, in_pipes.size () - 1);
- active++;
+ zmq_assert (inpipe_ && !outpipe_);
+ fq.attach (inpipe_);
}
void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_)
{
- if (in_pipes.index (pipe_) < active)
- active--;
- in_pipes.erase (pipe_);
+ zmq_assert (pipe_);
+ fq.detach (pipe_);
}
void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_)
{
+ // SUB socket is read-only thus there should be no outpipes.
zmq_assert (false);
}
void zmq::sub_t::xkill (class reader_t *pipe_)
{
- // Move the pipe to the list of inactive pipes.
- in_pipes.swap (in_pipes.index (pipe_), active - 1);
- active--;
+ fq.kill (pipe_);
}
void zmq::sub_t::xrevive (class reader_t *pipe_)
{
- // Move the pipe to the list of active pipes.
- in_pipes.swap (in_pipes.index (pipe_), active);
- active++;
+ fq.revive (pipe_);
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
@@ -139,7 +127,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
while (true) {
// Get a message using fair queueing algorithm.
- int rc = fq (msg_, flags_);
+ int rc = fq.recv (msg_, flags_);
// If there's no message available, return immediately.
if (rc != 0 && errno == EAGAIN)
@@ -176,28 +164,6 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
}
}
-int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)
-{
- // Deallocate old content of the message.
- zmq_msg_close (msg_);
-
- // Round-robin over the pipes to get next message.
- for (int count = active; count != 0; count--) {
- bool fetched = in_pipes [current]->read (msg_);
- current++;
- if (current >= active)
- current = 0;
- if (fetched)
- return 0;
- }
-
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- zmq_msg_init (msg_);
- errno = EAGAIN;
- return -1;
-}
-
bool zmq::sub_t::xhas_in ()
{
// TODO: This is more complex as we have to ignore all the messages that
diff --git a/src/sub.hpp b/src/sub.hpp
index 8ad8a18..1eafdac 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -24,7 +24,7 @@
#include <string>
#include "socket_base.hpp"
-#include "yarray.hpp"
+#include "fq.hpp"
namespace zmq
{
@@ -53,26 +53,15 @@ namespace zmq
private:
- // Helper function to return one message choosed using
- // fair queueing algorithm.
- int fq (zmq_msg_t *msg_, int flags_);
-
- // Inbound pipes, i.e. those the socket is getting messages from.
- typedef yarray_t <class reader_t> in_pipes_t;
- in_pipes_t in_pipes;
-
- // Number of active inbound pipes. Active pipes are stored in the
- // initial section of the in_pipes array.
- in_pipes_t::size_type active;
-
- // Index of the next inbound pipe to read messages from.
- in_pipes_t::size_type current;
+ // Fair queueing object for inbound pipes.
+ fq_t fq;
// Number of active "*" subscriptions.
int all_count;
- // List of all prefix subscriptions.
typedef std::multiset <std::string> subscriptions_t;
+
+ // List of all prefix subscriptions.
subscriptions_t prefixes;
// List of all exact match subscriptions.
diff --git a/src/upstream.cpp b/src/upstream.cpp
index da202f8..32de63a 100644
--- a/src/upstream.cpp
+++ b/src/upstream.cpp
@@ -21,12 +21,9 @@
#include "upstream.hpp"
#include "err.hpp"
-#include "pipe.hpp"
zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
- active (0),
- current (0)
+ socket_base_t (parent_)
{
options.requires_in = true;
options.requires_out = false;
@@ -40,21 +37,13 @@ void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (inpipe_ && !outpipe_);
-
- pipes.push_back (inpipe_);
- pipes.swap (active, pipes.size () - 1);
- active++;
+ fq.attach (inpipe_);
}
void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)
{
- // Remove the pipe from the list; adjust number of active pipes
- // accordingly.
zmq_assert (pipe_);
- pipes_t::size_type index = pipes.index (pipe_);
- if (index < active)
- active--;
- pipes.erase (index);
+ fq.detach (pipe_);
}
void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
@@ -65,16 +54,12 @@ void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
void zmq::upstream_t::xkill (class reader_t *pipe_)
{
- // Move the pipe to the list of inactive pipes.
- active--;
- pipes.swap (pipes.index (pipe_), active);
+ fq.kill (pipe_);
}
void zmq::upstream_t::xrevive (class reader_t *pipe_)
{
- // Move the pipe to the list of active pipes.
- pipes.swap (pipes.index (pipe_), active);
- active++;
+ fq.revive (pipe_);
}
int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
@@ -99,41 +84,12 @@ int zmq::upstream_t::xflush ()
int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
{
- // Deallocate old content of the message.
- zmq_msg_close (msg_);
-
- // Round-robin over the pipes to get next message.
- for (int count = active; count != 0; count--) {
- bool fetched = pipes [current]->read (msg_);
- current++;
- if (current >= active)
- current = 0;
- if (fetched)
- return 0;
- }
-
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- zmq_msg_init (msg_);
- errno = EAGAIN;
- return -1;
+ return fq.recv (msg_, flags_);
}
bool zmq::upstream_t::xhas_in ()
{
- // Note that messing with current doesn't break the fairness of fair
- // queueing algorithm. If there are no messages available current will
- // get back to its original value. Otherwise it'll point to the first
- // pipe holding messages, skipping only pipes with no messages available.
- for (int count = active; count != 0; count--) {
- if (pipes [current]->check_read ())
- return true;
- current++;
- if (current >= active)
- current = 0;
- }
-
- return false;
+ return fq.has_in ();
}
bool zmq::upstream_t::xhas_out ()
diff --git a/src/upstream.hpp b/src/upstream.hpp
index 0e2f5ad..3c82cdb 100644
--- a/src/upstream.hpp
+++ b/src/upstream.hpp
@@ -21,7 +21,7 @@
#define __ZMQ_UPSTREAM_HPP_INCLUDED__
#include "socket_base.hpp"
-#include "yarray.hpp"
+#include "fq.hpp"
namespace zmq
{
@@ -48,16 +48,8 @@ namespace zmq
private:
- // Inbound pipes.
- typedef yarray_t <class reader_t> pipes_t;
- pipes_t pipes;
-
- // Number of active pipes. All the active pipes are located at the
- // beginning of the pipes array.
- pipes_t::size_type active;
-
- // Index of the next bound pipe to read a message from.
- pipes_t::size_type current;
+ // Fair queueing object for inbound pipes.
+ fq_t fq;
upstream_t (const upstream_t&);
void operator = (const upstream_t&);
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 1b6a536..4fa250b 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -21,7 +21,6 @@
#include "xrep.hpp"
#include "err.hpp"
-#include "pipe.hpp"
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_)
@@ -37,12 +36,16 @@ zmq::xrep_t::~xrep_t ()
void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
+ zmq_assert (inpipe_ && outpipe_);
+ fq.attach (inpipe_);
+
zmq_assert (false);
}
void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
{
- zmq_assert (false);
+ zmq_assert (pipe_);
+ fq.detach (pipe_);
}
void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
@@ -52,12 +55,12 @@ void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
void zmq::xrep_t::xkill (class reader_t *pipe_)
{
- zmq_assert (false);
+ fq.kill (pipe_);
}
void zmq::xrep_t::xrevive (class reader_t *pipe_)
{
- zmq_assert (false);
+ fq.revive (pipe_);
}
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
@@ -81,14 +84,12 @@ int zmq::xrep_t::xflush ()
int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
{
- zmq_assert (false);
- return -1;
+ return fq.recv (msg_, flags_);
}
bool zmq::xrep_t::xhas_in ()
{
- zmq_assert (false);
- return false;
+ return fq.has_in ();
}
bool zmq::xrep_t::xhas_out ()
diff --git a/src/xrep.hpp b/src/xrep.hpp
index de42036..66cb611 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -21,7 +21,7 @@
#define __ZMQ_XREP_HPP_INCLUDED__
#include "socket_base.hpp"
-#include "yarray.hpp"
+#include "fq.hpp"
namespace zmq
{
@@ -48,6 +48,9 @@ namespace zmq
private:
+ // Inbound messages are fair-queued.
+ fq_t fq;
+
xrep_t (const xrep_t&);
void operator = (const xrep_t&);
};
diff --git a/src/xreq.cpp b/src/xreq.cpp
index d359dc0..9b95393 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -21,7 +21,6 @@
#include "xreq.hpp"
#include "err.hpp"
-#include "pipe.hpp"
zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
socket_base_t (parent_)
@@ -37,27 +36,31 @@ zmq::xreq_t::~xreq_t ()
void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
- zmq_assert (false);
+ zmq_assert (inpipe_ && outpipe_);
+ fq.attach (inpipe_);
+ lb.attach (outpipe_);
}
void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_)
{
- zmq_assert (false);
+ zmq_assert (pipe_);
+ fq.detach (pipe_);
}
void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_)
{
- zmq_assert (false);
+ zmq_assert (pipe_);
+ lb.detach (pipe_);
}
void zmq::xreq_t::xkill (class reader_t *pipe_)
{
- zmq_assert (false);
+ fq.kill (pipe_);
}
void zmq::xreq_t::xrevive (class reader_t *pipe_)
{
- zmq_assert (false);
+ fq.revive (pipe_);
}
int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
@@ -69,32 +72,29 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
{
- zmq_assert (false);
- return -1;
+ return lb.send (msg_, flags_);
}
int zmq::xreq_t::xflush ()
{
+ // TODO: Implement flushing.
zmq_assert (false);
return -1;
}
int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)
{
- zmq_assert (false);
- return -1;
+ return fq.recv (msg_, flags_);
}
bool zmq::xreq_t::xhas_in ()
{
- zmq_assert (false);
- return false;
+ return fq.has_in ();
}
bool zmq::xreq_t::xhas_out ()
{
- zmq_assert (false);
- return false;
+ return lb.has_out ();
}
diff --git a/src/xreq.hpp b/src/xreq.hpp
index 8d6a3b2..fdf8b0f 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -21,7 +21,8 @@
#define __ZMQ_XREQ_HPP_INCLUDED__
#include "socket_base.hpp"
-#include "yarray.hpp"
+#include "fq.hpp"
+#include "lb.hpp"
namespace zmq
{
@@ -48,6 +49,11 @@ namespace zmq
private:
+ // Messages are fair-queued from inbound pipes. And load-balanced to
+ // the outbound pipes.
+ fq_t fq;
+ lb_t lb;
+
xreq_t (const xreq_t&);
void operator = (const xreq_t&);
};
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index f488272..b9617fc 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -17,23 +17,41 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <stdlib.h>
+#include <string.h>
+
#include "zmq_decoder.hpp"
#include "i_inout.hpp"
#include "wire.hpp"
#include "err.hpp"
-zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
+zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_,
+ void *prefix_, size_t prefix_size_) :
decoder_t <zmq_decoder_t> (bufsize_),
destination (NULL)
{
zmq_msg_init (&in_progress);
+ if (!prefix_) {
+ prefix = NULL;
+ prefix_size = 0;
+ }
+ else {
+ prefix = malloc (prefix_size_);
+ zmq_assert (prefix);
+ memcpy (prefix, prefix_, prefix_size_);
+ prefix_size = prefix_size_;
+ }
+
// At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
}
zmq::zmq_decoder_t::~zmq_decoder_t ()
{
+ if (prefix)
+ free (prefix);
+
zmq_msg_close (&in_progress);
}
@@ -55,11 +73,15 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
+ int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf);
errno_assert (rc == 0);
- next_step (zmq_msg_data (&in_progress), *tmpbuf,
- &zmq_decoder_t::message_ready);
+ // Fill in the message prefix if any.
+ if (prefix)
+ memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
+
+ next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
+ *tmpbuf, &zmq_decoder_t::message_ready);
}
return true;
}
@@ -74,11 +96,15 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, size);
+ int rc = zmq_msg_init_size (&in_progress, prefix_size + size);
errno_assert (rc == 0);
- next_step (zmq_msg_data (&in_progress), size,
- &zmq_decoder_t::message_ready);
+ // Fill in the message prefix if any.
+ if (prefix)
+ memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
+
+ next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size ,
+ size, &zmq_decoder_t::message_ready);
return true;
}
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
index c5433b7..6df2558 100644
--- a/src/zmq_decoder.hpp
+++ b/src/zmq_decoder.hpp
@@ -32,7 +32,9 @@ namespace zmq
{
public:
- zmq_decoder_t (size_t bufsize_);
+ // If prefix is not NULL, it will be glued to the beginning of every
+ // decoded message.
+ zmq_decoder_t (size_t bufsize_, void *prefix_, size_t prefix_size_);
~zmq_decoder_t ();
void set_inout (struct i_inout *destination_);
@@ -47,6 +49,9 @@ namespace zmq
unsigned char tmpbuf [8];
::zmq_msg_t in_progress;
+ void *prefix;
+ size_t prefix_size;
+
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
};
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index cf129e5..4824cd1 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -21,9 +21,10 @@
#include "i_inout.hpp"
#include "wire.hpp"
-zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) :
+zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_, bool trim_prefix_) :
encoder_t <zmq_encoder_t> (bufsize_),
- source (NULL)
+ source (NULL),
+ trim_prefix (trim_prefix_)
{
zmq_msg_init (&in_progress);
@@ -44,8 +45,16 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_)
bool zmq::zmq_encoder_t::size_ready ()
{
// Write message body into the buffer.
- next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
- &zmq_encoder_t::message_ready, false);
+ if (!trim_prefix) {
+ next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
+ &zmq_encoder_t::message_ready, false);
+ }
+ else {
+ size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress);
+ next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
+ zmq_msg_size (&in_progress) - prefix_size,
+ &zmq_encoder_t::message_ready, false);
+ }
return true;
}
@@ -63,7 +72,11 @@ bool zmq::zmq_encoder_t::message_ready ()
return false;
}
+ // Get the message size. If the prefix is not to be sent, adjust the
+ // size accordingly.
size_t size = zmq_msg_size (&in_progress);
+ if (trim_prefix)
+ size -= *(unsigned char*) zmq_msg_data (&in_progress);
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp
index 825e60f..8d4e956 100644
--- a/src/zmq_encoder.hpp
+++ b/src/zmq_encoder.hpp
@@ -32,7 +32,7 @@ namespace zmq
{
public:
- zmq_encoder_t (size_t bufsize_);
+ zmq_encoder_t (size_t bufsize_, bool trim_prefix_);
~zmq_encoder_t ();
void set_inout (struct i_inout *source_);
@@ -46,6 +46,8 @@ namespace zmq
::zmq_msg_t in_progress;
unsigned char tmpbuf [9];
+ bool trim_prefix;
+
zmq_encoder_t (const zmq_encoder_t&);
void operator = (const zmq_encoder_t&);
};
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index c04f29b..18fc616 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -29,10 +29,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
io_object_t (parent_),
inpos (NULL),
insize (0),
- decoder (in_batch_size),
+ decoder (in_batch_size, NULL, 0),
outpos (NULL),
outsize (0),
- encoder (out_batch_size),
+ encoder (out_batch_size, false),
inout (NULL),
options (options_)
{