summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Hurton <hurtonm@gmail.com>2010-03-01 10:13:26 +0100
committerMartin Hurton <hurtonm@gmail.com>2010-03-12 11:07:38 +0100
commit61ee6fae536a8000be87b5aaf271f6519a3b7d3f (patch)
tree4c088ad3c62ff35a5e5482d9127dc510e5b3aaf7
parent31d36104aa7caead6f299f0c5cb58a9fde7cf9b0 (diff)
Implement flow control
This commit introduces the necessary changes necessary for implementing flow control. None of the socket types implements the flow control yet. The code will crash when the flow control is enabled and the thw lwm is reached. The following commits will add flow-control support for individual socket types.
-rw-r--r--src/command.hpp8
-rw-r--r--src/downstream.cpp6
-rw-r--r--src/downstream.hpp1
-rw-r--r--src/err.hpp6
-rw-r--r--src/i_endpoint.hpp1
-rw-r--r--src/i_engine.hpp2
-rw-r--r--src/lb.cpp49
-rw-r--r--src/lb.hpp1
-rw-r--r--src/object.cpp19
-rw-r--r--src/object.hpp3
-rw-r--r--src/options.cpp8
-rw-r--r--src/options.hpp4
-rw-r--r--src/p2p.cpp11
-rw-r--r--src/p2p.hpp1
-rw-r--r--src/pgm_receiver.cpp5
-rw-r--r--src/pgm_receiver.hpp1
-rw-r--r--src/pgm_sender.cpp5
-rw-r--r--src/pgm_sender.hpp1
-rw-r--r--src/pipe.cpp57
-rw-r--r--src/pipe.hpp29
-rw-r--r--src/pub.cpp16
-rw-r--r--src/pub.hpp1
-rw-r--r--src/rep.cpp10
-rw-r--r--src/rep.hpp1
-rw-r--r--src/req.cpp11
-rw-r--r--src/req.hpp1
-rw-r--r--src/session.cpp7
-rw-r--r--src/session.hpp1
-rw-r--r--src/socket_base.cpp5
-rw-r--r--src/socket_base.hpp2
-rw-r--r--src/sub.cpp5
-rw-r--r--src/sub.hpp1
-rw-r--r--src/upstream.cpp5
-rw-r--r--src/upstream.hpp1
-rw-r--r--src/xrep.cpp8
-rw-r--r--src/xrep.hpp1
-rw-r--r--src/xreq.cpp6
-rw-r--r--src/xreq.hpp1
-rw-r--r--src/zmq_engine.cpp17
-rw-r--r--src/zmq_engine.hpp1
40 files changed, 242 insertions, 77 deletions
diff --git a/src/command.hpp b/src/command.hpp
index 150cad1..3d00cd7 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -40,6 +40,7 @@ namespace zmq
attach,
bind,
revive,
+ reader_info,
pipe_term,
pipe_term_ack,
term_req,
@@ -84,6 +85,13 @@ namespace zmq
struct {
} revive;
+ // Sent by pipe reader to inform pipe writer
+ // about how many messages it has read so far.
+ // Used to implement the flow control.
+ struct {
+ uint64_t msgs_read;
+ } reader_info;
+
// Sent by pipe reader to pipe writer to ask it to terminate
// its end of the pipe.
struct {
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 7ff88f1..feeb8c3 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -65,6 +65,11 @@ void zmq::downstream_t::xrevive (class reader_t *pipe_)
zmq_assert (false);
}
+void zmq::downstream_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -104,4 +109,3 @@ bool zmq::downstream_t::xhas_out ()
return lb.has_out ();
}
-
diff --git a/src/downstream.hpp b/src/downstream.hpp
index dbd79a5..998ab73 100644
--- a/src/downstream.hpp
+++ b/src/downstream.hpp
@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/err.hpp b/src/err.hpp
index 6c13b02..2b76569 100644
--- a/src/err.hpp
+++ b/src/err.hpp
@@ -115,3 +115,9 @@ namespace zmq
} while (false)
#endif
+
+#define zmq_not_implemented() \
+ do {\
+ fprintf (stderr, "Hic sunt leones (%s:%d)\n", __FILE__, __LINE__);\
+ abort ();\
+ } while (false)
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index ddab6a4..0d14224 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -35,6 +35,7 @@ namespace zmq
virtual void detach_outpipe (class writer_t *pipe_) = 0;
virtual void kill (class reader_t *pipe_) = 0;
virtual void revive (class reader_t *pipe_) = 0;
+ virtual void revive (class writer_t *pipe_) = 0;
};
}
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 81b56df..bb5f391 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -41,6 +41,8 @@ namespace zmq
// are messages to send available.
virtual void revive () = 0;
+ virtual void resume_input () = 0;
+
// Engine should add the prefix supplied to all inbound messages.
virtual void add_prefix (const blob_t &identity_) = 0;
diff --git a/src/lb.cpp b/src/lb.cpp
index 4743ac6..d7193f1 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -54,15 +54,6 @@ void zmq::lb_t::detach (writer_t *pipe_)
pipes.erase (pipe_);
}
-void zmq::lb_t::kill (writer_t *pipe_)
-{
- // Move the pipe to the list of inactive pipes.
- active--;
- if (current == active)
- current = 0;
- pipes.swap (pipes.index (pipe_), active);
-}
-
void zmq::lb_t::revive (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
@@ -72,42 +63,46 @@ void zmq::lb_t::revive (writer_t *pipe_)
int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
{
+ while (active > 0) {
+ if (pipes [current]->write (msg_))
+ break;
+
+ active--;
+ if (current < active)
+ pipes.swap (current, active);
+ else
+ current = 0;
+ }
+
// If there are no pipes we cannot send the message.
- if (pipes.empty ()) {
+ if (active == 0) {
errno = EAGAIN;
return -1;
}
- // TODO: Implement this once queue limits are in-place.
- zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
-
- // Push message to the selected pipe.
- pipes [current]->write (msg_);
- pipes [current]->flush ();
+ if (!(flags_ & ZMQ_NOFLUSH))
+ pipes [current]->flush ();
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
// Move to the next pipe (load-balancing).
- current++;
- if (current >= active)
- current = 0;
+ current = (current + 1) % active;
return 0;
}
bool zmq::lb_t::has_out ()
{
- for (int count = active; count != 0; count--) {
-
- // We should be able to write at least 1-byte message to interrupt
- // polling for POLLOUT.
- // TODO: Shouldn't we use a saner value here?
- if (pipes [current]->check_write (1))
+ while (active > 0) {
+ if (pipes [current]->check_write ())
return true;
- current++;
- if (current >= active)
+
+ active--;
+ if (current < active)
+ pipes.swap (current, active);
+ else
current = 0;
}
diff --git a/src/lb.hpp b/src/lb.hpp
index a0998ad..5bddc1e 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -36,7 +36,6 @@ namespace zmq
void attach (class writer_t *pipe_);
void detach (class writer_t *pipe_);
- void kill (class writer_t *pipe_);
void revive (class writer_t *pipe_);
int send (zmq_msg_t *msg_, int flags_);
bool has_out ();
diff --git a/src/object.cpp b/src/object.cpp
index 356fcd1..5821c89 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -95,6 +95,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_seqnum ();
break;
+ case command_t::reader_info:
+ process_reader_info (cmd_.args.reader_info.msgs_read);
+ break;
+
case command_t::pipe_term:
process_pipe_term ();
return;
@@ -249,6 +253,16 @@ void zmq::object_t::send_revive (object_t *destination_)
send_command (cmd);
}
+void zmq::object_t::send_reader_info (writer_t *destination_,
+ uint64_t msgs_read_)
+{
+ command_t cmd;
+ cmd.destination = destination_;
+ cmd.type = command_t::reader_info;
+ cmd.args.reader_info.msgs_read = msgs_read_;
+ send_command (cmd);
+}
+
void zmq::object_t::send_pipe_term (writer_t *destination_)
{
command_t cmd;
@@ -323,6 +337,11 @@ void zmq::object_t::process_revive ()
zmq_assert (false);
}
+void zmq::object_t::process_reader_info (uint64_t msgs_read_)
+{
+ zmq_assert (false);
+}
+
void zmq::object_t::process_pipe_term ()
{
zmq_assert (false);
diff --git a/src/object.hpp b/src/object.hpp
index 1544109..f29342e 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -71,6 +71,8 @@ namespace zmq
class reader_t *in_pipe_, class writer_t *out_pipe_,
const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_revive (class object_t *destination_);
+ void send_reader_info (class writer_t *destination_,
+ uint64_t msgs_read_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
void send_term_req (class socket_base_t *destination_,
@@ -88,6 +90,7 @@ namespace zmq
virtual void process_bind (class reader_t *in_pipe_,
class writer_t *out_pipe_, const blob_t &peer_identity_);
virtual void process_revive ();
+ virtual void process_reader_info (uint64_t msgs_read_);
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
virtual void process_term_req (class owned_t *object_);
diff --git a/src/options.cpp b/src/options.cpp
index c0d5339..a713ede 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -45,19 +45,19 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
switch (option_) {
case ZMQ_HWM:
- if (optvallen_ != sizeof (int64_t)) {
+ if (optvallen_ != sizeof (uint64_t)) {
errno = EINVAL;
return -1;
}
- hwm = *((int64_t*) optval_);
+ hwm = *((uint64_t*) optval_);
return 0;
case ZMQ_LWM:
- if (optvallen_ != sizeof (int64_t)) {
+ if (optvallen_ != sizeof (uint64_t)) {
errno = EINVAL;
return -1;
}
- lwm = *((int64_t*) optval_);
+ lwm = *((uint64_t*) optval_);
return 0;
case ZMQ_SWAP:
diff --git a/src/options.hpp b/src/options.hpp
index 6d9be4d..eba8ab8 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -33,8 +33,8 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_);
- int64_t hwm;
- int64_t lwm;
+ uint64_t hwm;
+ uint64_t lwm;
int64_t swap;
uint64_t affinity;
blob_t identity;
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 334cfcc..728854b 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -73,6 +73,11 @@ void zmq::p2p_t::xrevive (class reader_t *pipe_)
alive = true;
}
+void zmq::p2p_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -87,10 +92,8 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
- // TODO: Implement this once queue limits are in-place.
- zmq_assert (outpipe->check_write (zmq_msg_size (msg_)));
-
- outpipe->write (msg_);
+ bool written = outpipe->write (msg_);
+ zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
outpipe->flush ();
diff --git a/src/p2p.hpp b/src/p2p.hpp
index bca0eab..e12b58c 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -39,6 +39,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index e708229..5480030 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,6 +88,11 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
+void zmq::pgm_receiver_t::resume_input ()
+{
+ zmq_not_implemented ();
+}
+
void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 3f0ef81..dd9402f 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,6 +54,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void resume_input ();
void add_prefix (const blob_t &identity_);
void trim_prefix ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 27b4d0c..01eac2b 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -102,6 +102,11 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
+void zmq::pgm_sender_t::resume_input ()
+{
+ zmq_assert (false);
+}
+
void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 951c417..4b232b1 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,6 +52,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void resume_input ();
void add_prefix (const blob_t &identity_);
void trim_prefix ();
diff --git a/src/pipe.cpp b/src/pipe.cpp
index da019c1..53dfb21 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -28,8 +28,12 @@ zmq::reader_t::reader_t (object_t *parent_,
peer (NULL),
hwm (hwm_),
lwm (lwm_),
+ msgs_read (0),
endpoint (NULL)
{
+ // Adjust lwm and hwm.
+ if (lwm == 0 || lwm > hwm)
+ lwm = hwm;
}
zmq::reader_t::~reader_t ()
@@ -73,7 +77,9 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
return false;
}
- // TODO: Adjust the size of the pipe.
+ msgs_read++;
+ if (lwm > 0 && msgs_read % lwm == 0)
+ send_reader_info (peer, msgs_read);
return true;
}
@@ -111,8 +117,14 @@ zmq::writer_t::writer_t (object_t *parent_,
peer (NULL),
hwm (hwm_),
lwm (lwm_),
+ msgs_read (0),
+ msgs_written (0),
+ stalled (false),
endpoint (NULL)
{
+ // Adjust lwm and hwm.
+ if (lwm == 0 || lwm > hwm)
+ lwm = hwm;
}
void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
@@ -131,32 +143,41 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_)
peer = &pipe->reader;
}
-bool zmq::writer_t::check_write (uint64_t size_)
+bool zmq::writer_t::check_write ()
{
- // TODO: Check whether hwm is exceeded.
+ if (pipe_full ()) {
+ stalled = true;
+ return false;
+ }
return true;
}
bool zmq::writer_t::write (zmq_msg_t *msg_)
{
+ if (pipe_full ()) {
+ stalled = true;
+ return false;
+ }
+
pipe->write (*msg_);
+ msgs_written++;
return true;
-
- // TODO: Adjust size of the pipe.
}
void zmq::writer_t::rollback ()
{
- while (true) {
- zmq_msg_t msg;
- if (!pipe->unwrite (&msg))
- break;
+ zmq_msg_t msg;
+
+ while (pipe->unwrite (&msg)) {
zmq_msg_close (&msg);
+ msgs_written--;
}
- // TODO: We don't have to inform the reader side of the pipe about
- // the event. We'll simply adjust the pipe size and keep calm.
+ if (stalled && endpoint != NULL && !pipe_full()) {
+ stalled = false;
+ endpoint->revive (this);
+ }
}
void zmq::writer_t::flush ()
@@ -179,6 +200,15 @@ void zmq::writer_t::term ()
pipe->flush ();
}
+void zmq::writer_t::process_reader_info (uint64_t msgs_read_)
+{
+ msgs_read = msgs_read_;
+ if (stalled && endpoint != NULL) {
+ stalled = false;
+ endpoint->revive (this);
+ }
+}
+
void zmq::writer_t::process_pipe_term ()
{
if (endpoint)
@@ -189,6 +219,11 @@ void zmq::writer_t::process_pipe_term ()
send_pipe_term_ack (p);
}
+bool zmq::writer_t::pipe_full ()
+{
+ return hwm > 0 && msgs_written - msgs_read == hwm;
+}
+
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, uint64_t lwm_) :
reader (reader_parent_, hwm_, lwm_),
diff --git a/src/pipe.hpp b/src/pipe.hpp
index df3d0b1..0ac7fc5 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -68,10 +68,8 @@ namespace zmq
uint64_t hwm;
uint64_t lwm;
- // Positions of head and tail of the pipe (in bytes).
- uint64_t head;
- uint64_t tail;
- uint64_t last_sent_head;
+ // Number of messages read so far.
+ uint64_t msgs_read;
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
@@ -91,10 +89,10 @@ namespace zmq
void set_pipe (class pipe_t *pipe_);
void set_endpoint (i_endpoint *endpoint_);
- // Checks whether message with specified size can be written to the
- // pipe. If writing the message would cause high watermark to be
+ // Checks whether a message can be written to the pipe.
+ // If writing the message would cause high watermark to be
// exceeded, the function returns false.
- bool check_write (uint64_t size_);
+ bool check_write ();
// Writes a message to the underlying pipe. Returns false if the
// message cannot be written because high watermark was reached.
@@ -111,9 +109,14 @@ namespace zmq
private:
+ void process_reader_info (uint64_t msgs_read_);
+
// Command handlers.
void process_pipe_term ();
+ // Tests whether the pipe is already full.
+ bool pipe_full ();
+
// The underlying pipe.
class pipe_t *pipe;
@@ -124,9 +127,15 @@ namespace zmq
uint64_t hwm;
uint64_t lwm;
- // Positions of head and tail of the pipe (in bytes).
- uint64_t head;
- uint64_t tail;
+ // Last confirmed number of messages read from the pipe.
+ // The actual number can be higher.
+ uint64_t msgs_read;
+
+ // Number of messages we have written so far.
+ uint64_t msgs_written;
+
+ // True iff the last attempt to write a message has failed.
+ bool stalled;
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
diff --git a/src/pub.cpp b/src/pub.cpp
index 643e29e..b6802fd 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -65,6 +65,11 @@ void zmq::pub_t::xrevive (class reader_t *pipe_)
zmq_assert (false);
}
+void zmq::pub_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -87,7 +92,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// First check whether all pipes are available for writing.
for (out_pipes_t::size_type i = 0; i != pipes_count; i++)
- if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) {
+ if (!out_pipes [i]->check_write ()) {
errno = EAGAIN;
return -1;
}
@@ -97,7 +102,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
- out_pipes [i]->write (msg_);
+ bool written = out_pipes [i]->write (msg_);
+ zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [i]->flush ();
}
@@ -110,7 +116,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// to send the message to - no refcount adjustment i.e. no atomic
// operations are needed.
if (pipes_count == 1) {
- out_pipes [0]->write (msg_);
+ bool written = out_pipes [0]->write (msg_);
+ zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [0]->flush ();
int rc = zmq_msg_init (msg_);
@@ -130,7 +137,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// Push the message to all destinations.
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
- out_pipes [i]->write (msg_);
+ bool written = out_pipes [i]->write (msg_);
+ zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [i]->flush ();
}
diff --git a/src/pub.hpp b/src/pub.hpp
index 26142a4..a85301f 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/rep.cpp b/src/rep.cpp
index 4e69fa3..2cd4144 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -144,6 +144,11 @@ void zmq::rep_t::xrevive (class reader_t *pipe_)
active++;
}
+void zmq::rep_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::rep_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -160,12 +165,13 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
// TODO: Implement this once queue limits are in-place. If the reply
// overloads the buffer, connection should be torn down.
- zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_)));
+ zmq_assert (reply_pipe->check_write ());
// Push message to the selected pipe. If requester have disconnected
// in the meantime, drop the reply.
if (reply_pipe) {
- reply_pipe->write (msg_);
+ bool written = reply_pipe->write (msg_);
+ zmq_assert (written);
reply_pipe->flush ();
}
else {
diff --git a/src/rep.hpp b/src/rep.hpp
index 7ead321..9d2357d 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/req.cpp b/src/req.cpp
index 5c067b3..f21613a 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -108,6 +108,11 @@ void zmq::req_t::xrevive (class reader_t *pipe_)
reply_pipe_active = true;
}
+void zmq::req_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::req_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -142,11 +147,9 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
current = 0;
}
- // TODO: Implement this once queue limits are in-place.
- zmq_assert (out_pipes [current]->check_write (zmq_msg_size (msg_)));
-
// Push message to the selected pipe.
- out_pipes [current]->write (msg_);
+ bool written = out_pipes [current]->write (msg_);
+ zmq_assert (written);
out_pipes [current]->flush ();
waiting_for_reply = true;
diff --git a/src/req.hpp b/src/req.hpp
index da8e61a..4058b08 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/session.cpp b/src/session.cpp
index e1d0b8e..b99a370 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -164,6 +164,13 @@ void zmq::session_t::revive (reader_t *pipe_)
engine->revive ();
}
+void zmq::session_t::revive (writer_t *pipe_)
+{
+ zmq_assert (out_pipe == pipe_);
+ if (engine)
+ engine->resume_input ();
+}
+
void zmq::session_t::process_plug ()
{
}
diff --git a/src/session.hpp b/src/session.hpp
index 872748c..25a0d12 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -57,6 +57,7 @@ namespace zmq
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_);
+ void revive (class writer_t *pipe_);
private:
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 50b4152..fdb2d12 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -552,6 +552,11 @@ void zmq::socket_base_t::revive (reader_t *pipe_)
xrevive (pipe_);
}
+void zmq::socket_base_t::revive (writer_t *pipe_)
+{
+ xrevive (pipe_);
+}
+
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 31c241b..bb40ae6 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -93,6 +93,7 @@ namespace zmq
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_);
+ void revive (class writer_t *pipe_);
protected:
@@ -106,6 +107,7 @@ namespace zmq
virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
virtual void xkill (class reader_t *pipe_) = 0;
virtual void xrevive (class reader_t *pipe_) = 0;
+ virtual void xrevive (class writer_t *pipe_) = 0;
// Actual algorithms are to be defined by individual socket types.
virtual int xsetsockopt (int option_, const void *optval_,
diff --git a/src/sub.cpp b/src/sub.cpp
index fb00bfb..4169ea5 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -67,6 +67,11 @@ void zmq::sub_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_);
}
+void zmq::sub_t::xrevive (class writer_t *pipe_)
+{
+ zmq_assert (false);
+}
+
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
diff --git a/src/sub.hpp b/src/sub.hpp
index 670aa79..c319565 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -45,6 +45,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/upstream.cpp b/src/upstream.cpp
index 7ff1157..8163c18 100644
--- a/src/upstream.cpp
+++ b/src/upstream.cpp
@@ -62,6 +62,11 @@ void zmq::upstream_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_);
}
+void zmq::upstream_t::xrevive (class writer_t *pipe_)
+{
+ zmq_assert (false);
+}
+
int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
diff --git a/src/upstream.hpp b/src/upstream.hpp
index d1ee7b1..d9fb385 100644
--- a/src/upstream.hpp
+++ b/src/upstream.hpp
@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/xrep.cpp b/src/xrep.cpp
index df74302..fae376b 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -81,6 +81,11 @@ void zmq::xrep_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_);
}
+void zmq::xrep_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -111,7 +116,8 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
}
// Push message to the selected pipe.
- it->second->write (msg_);
+ bool written = it->second->write (msg_);
+ zmq_assert (written);
it->second->flush ();
// Detach the message from the data buffer.
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 4534463..f2cdb2b 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -43,6 +43,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 12c3dd6..72ce3a2 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -63,6 +63,11 @@ void zmq::xreq_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_);
}
+void zmq::xreq_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -97,4 +102,3 @@ bool zmq::xreq_t::xhas_out ()
return lb.has_out ();
}
-
diff --git a/src/xreq.hpp b/src/xreq.hpp
index e23e832..3d3f573 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -41,6 +41,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 623ca63..8e0392c 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -102,19 +102,19 @@ void zmq::zmq_engine_t::in_event ()
// Push the data to the decoder.
size_t processed = decoder.process_buffer (inpos, insize);
- // Adjust the buffer.
- inpos += processed;
- insize -= processed;
-
// Stop polling for input if we got stuck.
if (processed < insize) {
- // This may happen if queue limits are implemented or when
+ // This may happen if queue limits are in effect or when
// init object reads all required information from the socket
// and rejects to read more data.
reset_pollin (handle);
}
+ // Adjust the buffer.
+ inpos += processed;
+ insize -= processed;
+
// Flush all messages the decoder may have produced.
inout->flush ();
@@ -162,6 +162,13 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
+void zmq::zmq_engine_t::resume_input ()
+{
+ set_pollin (handle);
+
+ in_event ();
+}
+
void zmq::zmq_engine_t::add_prefix (const blob_t &identity_)
{
decoder.add_prefix (identity_);
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index dc90a98..c4ef756 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -47,6 +47,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void resume_input ();
void add_prefix (const blob_t &identity_);
void trim_prefix ();