summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/command.hpp8
-rw-r--r--src/downstream.cpp6
-rw-r--r--src/downstream.hpp1
-rw-r--r--src/err.hpp6
-rw-r--r--src/i_endpoint.hpp1
-rw-r--r--src/i_engine.hpp2
-rw-r--r--src/lb.cpp49
-rw-r--r--src/lb.hpp1
-rw-r--r--src/object.cpp19
-rw-r--r--src/object.hpp3
-rw-r--r--src/options.cpp8
-rw-r--r--src/options.hpp4
-rw-r--r--src/p2p.cpp11
-rw-r--r--src/p2p.hpp1
-rw-r--r--src/pgm_receiver.cpp5
-rw-r--r--src/pgm_receiver.hpp1
-rw-r--r--src/pgm_sender.cpp5
-rw-r--r--src/pgm_sender.hpp1
-rw-r--r--src/pipe.cpp57
-rw-r--r--src/pipe.hpp29
-rw-r--r--src/pub.cpp16
-rw-r--r--src/pub.hpp1
-rw-r--r--src/rep.cpp10
-rw-r--r--src/rep.hpp1
-rw-r--r--src/req.cpp11
-rw-r--r--src/req.hpp1
-rw-r--r--src/session.cpp7
-rw-r--r--src/session.hpp1
-rw-r--r--src/socket_base.cpp5
-rw-r--r--src/socket_base.hpp2
-rw-r--r--src/sub.cpp5
-rw-r--r--src/sub.hpp1
-rw-r--r--src/upstream.cpp5
-rw-r--r--src/upstream.hpp1
-rw-r--r--src/xrep.cpp8
-rw-r--r--src/xrep.hpp1
-rw-r--r--src/xreq.cpp6
-rw-r--r--src/xreq.hpp1
-rw-r--r--src/zmq_engine.cpp17
-rw-r--r--src/zmq_engine.hpp1
40 files changed, 242 insertions, 77 deletions
diff --git a/src/command.hpp b/src/command.hpp
index 150cad1..3d00cd7 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -40,6 +40,7 @@ namespace zmq
attach,
bind,
revive,
+ reader_info,
pipe_term,
pipe_term_ack,
term_req,
@@ -84,6 +85,13 @@ namespace zmq
struct {
} revive;
+ // Sent by pipe reader to inform pipe writer
+ // about how many messages it has read so far.
+ // Used to implement the flow control.
+ struct {
+ uint64_t msgs_read;
+ } reader_info;
+
// Sent by pipe reader to pipe writer to ask it to terminate
// its end of the pipe.
struct {
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 7ff88f1..feeb8c3 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -65,6 +65,11 @@ void zmq::downstream_t::xrevive (class reader_t *pipe_)
zmq_assert (false);
}
+void zmq::downstream_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -104,4 +109,3 @@ bool zmq::downstream_t::xhas_out ()
return lb.has_out ();
}
-
diff --git a/src/downstream.hpp b/src/downstream.hpp
index dbd79a5..998ab73 100644
--- a/src/downstream.hpp
+++ b/src/downstream.hpp
@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/err.hpp b/src/err.hpp
index 6c13b02..2b76569 100644
--- a/src/err.hpp
+++ b/src/err.hpp
@@ -115,3 +115,9 @@ namespace zmq
} while (false)
#endif
+
+#define zmq_not_implemented() \
+ do {\
+ fprintf (stderr, "Hic sunt leones (%s:%d)\n", __FILE__, __LINE__);\
+ abort ();\
+ } while (false)
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index ddab6a4..0d14224 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -35,6 +35,7 @@ namespace zmq
virtual void detach_outpipe (class writer_t *pipe_) = 0;
virtual void kill (class reader_t *pipe_) = 0;
virtual void revive (class reader_t *pipe_) = 0;
+ virtual void revive (class writer_t *pipe_) = 0;
};
}
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 81b56df..bb5f391 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -41,6 +41,8 @@ namespace zmq
// are messages to send available.
virtual void revive () = 0;
+ virtual void resume_input () = 0;
+
// Engine should add the prefix supplied to all inbound messages.
virtual void add_prefix (const blob_t &identity_) = 0;
diff --git a/src/lb.cpp b/src/lb.cpp
index 4743ac6..d7193f1 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -54,15 +54,6 @@ void zmq::lb_t::detach (writer_t *pipe_)
pipes.erase (pipe_);
}
-void zmq::lb_t::kill (writer_t *pipe_)
-{
- // Move the pipe to the list of inactive pipes.
- active--;
- if (current == active)
- current = 0;
- pipes.swap (pipes.index (pipe_), active);
-}
-
void zmq::lb_t::revive (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
@@ -72,42 +63,46 @@ void zmq::lb_t::revive (writer_t *pipe_)
int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
{
+ while (active > 0) {
+ if (pipes [current]->write (msg_))
+ break;
+
+ active--;
+ if (current < active)
+ pipes.swap (current, active);
+ else
+ current = 0;
+ }
+
// If there are no pipes we cannot send the message.
- if (pipes.empty ()) {
+ if (active == 0) {
errno = EAGAIN;
return -1;
}
- // TODO: Implement this once queue limits are in-place.
- zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
-
- // Push message to the selected pipe.
- pipes [current]->write (msg_);
- pipes [current]->flush ();
+ if (!(flags_ & ZMQ_NOFLUSH))
+ pipes [current]->flush ();
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
// Move to the next pipe (load-balancing).
- current++;
- if (current >= active)
- current = 0;
+ current = (current + 1) % active;
return 0;
}
bool zmq::lb_t::has_out ()
{
- for (int count = active; count != 0; count--) {
-
- // We should be able to write at least 1-byte message to interrupt
- // polling for POLLOUT.
- // TODO: Shouldn't we use a saner value here?
- if (pipes [current]->check_write (1))
+ while (active > 0) {
+ if (pipes [current]->check_write ())
return true;
- current++;
- if (current >= active)
+
+ active--;
+ if (current < active)
+ pipes.swap (current, active);
+ else
current = 0;
}
diff --git a/src/lb.hpp b/src/lb.hpp
index a0998ad..5bddc1e 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -36,7 +36,6 @@ namespace zmq
void attach (class writer_t *pipe_);
void detach (class writer_t *pipe_);
- void kill (class writer_t *pipe_);
void revive (class writer_t *pipe_);
int send (zmq_msg_t *msg_, int flags_);
bool has_out ();
diff --git a/src/object.cpp b/src/object.cpp
index 356fcd1..5821c89 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -95,6 +95,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_seqnum ();
break;
+ case command_t::reader_info:
+ process_reader_info (cmd_.args.reader_info.msgs_read);
+ break;
+
case command_t::pipe_term:
process_pipe_term ();
return;
@@ -249,6 +253,16 @@ void zmq::object_t::send_revive (object_t *destination_)
send_command (cmd);
}
+void zmq::object_t::send_reader_info (writer_t *destination_,
+ uint64_t msgs_read_)
+{
+ command_t cmd;
+ cmd.destination = destination_;
+ cmd.type = command_t::reader_info;
+ cmd.args.reader_info.msgs_read = msgs_read_;
+ send_command (cmd);
+}
+
void zmq::object_t::send_pipe_term (writer_t *destination_)
{
command_t cmd;
@@ -323,6 +337,11 @@ void zmq::object_t::process_revive ()
zmq_assert (false);
}
+void zmq::object_t::process_reader_info (uint64_t msgs_read_)
+{
+ zmq_assert (false);
+}
+
void zmq::object_t::process_pipe_term ()
{
zmq_assert (false);
diff --git a/src/object.hpp b/src/object.hpp
index 1544109..f29342e 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -71,6 +71,8 @@ namespace zmq
class reader_t *in_pipe_, class writer_t *out_pipe_,
const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_revive (class object_t *destination_);
+ void send_reader_info (class writer_t *destination_,
+ uint64_t msgs_read_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
void send_term_req (class socket_base_t *destination_,
@@ -88,6 +90,7 @@ namespace zmq
virtual void process_bind (class reader_t *in_pipe_,
class writer_t *out_pipe_, const blob_t &peer_identity_);
virtual void process_revive ();
+ virtual void process_reader_info (uint64_t msgs_read_);
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
virtual void process_term_req (class owned_t *object_);
diff --git a/src/options.cpp b/src/options.cpp
index c0d5339..a713ede 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -45,19 +45,19 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
switch (option_) {
case ZMQ_HWM:
- if (optvallen_ != sizeof (int64_t)) {
+ if (optvallen_ != sizeof (uint64_t)) {
errno = EINVAL;
return -1;
}
- hwm = *((int64_t*) optval_);
+ hwm = *((uint64_t*) optval_);
return 0;
case ZMQ_LWM:
- if (optvallen_ != sizeof (int64_t)) {
+ if (optvallen_ != sizeof (uint64_t)) {
errno = EINVAL;
return -1;
}
- lwm = *((int64_t*) optval_);
+ lwm = *((uint64_t*) optval_);
return 0;
case ZMQ_SWAP:
diff --git a/src/options.hpp b/src/options.hpp
index 6d9be4d..eba8ab8 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -33,8 +33,8 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_);
- int64_t hwm;
- int64_t lwm;
+ uint64_t hwm;
+ uint64_t lwm;
int64_t swap;
uint64_t affinity;
blob_t identity;
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 334cfcc..728854b 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -73,6 +73,11 @@ void zmq::p2p_t::xrevive (class reader_t *pipe_)
alive = true;
}
+void zmq::p2p_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -87,10 +92,8 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
- // TODO: Implement this once queue limits are in-place.
- zmq_assert (outpipe->check_write (zmq_msg_size (msg_)));
-
- outpipe->write (msg_);
+ bool written = outpipe->write (msg_);
+ zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
outpipe->flush ();
diff --git a/src/p2p.hpp b/src/p2p.hpp
index bca0eab..e12b58c 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -39,6 +39,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index e708229..5480030 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,6 +88,11 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
+void zmq::pgm_receiver_t::resume_input ()
+{
+ zmq_not_implemented ();
+}
+
void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 3f0ef81..dd9402f 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,6 +54,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void resume_input ();
void add_prefix (const blob_t &identity_);
void trim_prefix ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 27b4d0c..01eac2b 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -102,6 +102,11 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
+void zmq::pgm_sender_t::resume_input ()
+{
+ zmq_assert (false);
+}
+
void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 951c417..4b232b1 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,6 +52,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void resume_input ();
void add_prefix (const blob_t &identity_);
void trim_prefix ();
diff --git a/src/pipe.cpp b/src/pipe.cpp
index da019c1..53dfb21 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -28,8 +28,12 @@ zmq::reader_t::reader_t (object_t *parent_,
peer (NULL),
hwm (hwm_),
lwm (lwm_),
+ msgs_read (0),
endpoint (NULL)
{
+ // Adjust lwm and hwm.
+ if (lwm == 0 || lwm > hwm)
+ lwm = hwm;
}
zmq::reader_t::~reader_t ()
@@ -73,7 +77,9 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
return false;
}
- // TODO: Adjust the size of the pipe.
+ msgs_read++;
+ if (lwm > 0 && msgs_read % lwm == 0)
+ send_reader_info (peer, msgs_read);
return true;
}
@@ -111,8 +117,14 @@ zmq::writer_t::writer_t (object_t *parent_,
peer (NULL),
hwm (hwm_),
lwm (lwm_),
+ msgs_read (0),
+ msgs_written (0),
+ stalled (false),
endpoint (NULL)
{
+ // Adjust lwm and hwm.
+ if (lwm == 0 || lwm > hwm)
+ lwm = hwm;
}
void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
@@ -131,32 +143,41 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_)
peer = &pipe->reader;
}
-bool zmq::writer_t::check_write (uint64_t size_)
+bool zmq::writer_t::check_write ()
{
- // TODO: Check whether hwm is exceeded.
+ if (pipe_full ()) {
+ stalled = true;
+ return false;
+ }
return true;
}
bool zmq::writer_t::write (zmq_msg_t *msg_)
{
+ if (pipe_full ()) {
+ stalled = true;
+ return false;
+ }
+
pipe->write (*msg_);
+ msgs_written++;
return true;
-
- // TODO: Adjust size of the pipe.
}
void zmq::writer_t::rollback ()
{
- while (true) {
- zmq_msg_t msg;
- if (!pipe->unwrite (&msg))
- break;
+ zmq_msg_t msg;
+
+ while (pipe->unwrite (&msg)) {
zmq_msg_close (&msg);
+ msgs_written--;
}
- // TODO: We don't have to inform the reader side of the pipe about
- // the event. We'll simply adjust the pipe size and keep calm.
+ if (stalled && endpoint != NULL && !pipe_full()) {
+ stalled = false;
+ endpoint->revive (this);
+ }
}
void zmq::writer_t::flush ()
@@ -179,6 +200,15 @@ void zmq::writer_t::term ()
pipe->flush ();
}
+void zmq::writer_t::process_reader_info (uint64_t msgs_read_)
+{
+ msgs_read = msgs_read_;
+ if (stalled && endpoint != NULL) {
+ stalled = false;
+ endpoint->revive (this);
+ }
+}
+
void zmq::writer_t::process_pipe_term ()
{
if (endpoint)
@@ -189,6 +219,11 @@ void zmq::writer_t::process_pipe_term ()
send_pipe_term_ack (p);
}
+bool zmq::writer_t::pipe_full ()
+{
+ return hwm > 0 && msgs_written - msgs_read == hwm;
+}
+
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, uint64_t lwm_) :
reader (reader_parent_, hwm_, lwm_),
diff --git a/src/pipe.hpp b/src/pipe.hpp
index df3d0b1..0ac7fc5 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -68,10 +68,8 @@ namespace zmq
uint64_t hwm;
uint64_t lwm;
- // Positions of head and tail of the pipe (in bytes).
- uint64_t head;
- uint64_t tail;
- uint64_t last_sent_head;
+ // Number of messages read so far.
+ uint64_t msgs_read;
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
@@ -91,10 +89,10 @@ namespace zmq
void set_pipe (class pipe_t *pipe_);
void set_endpoint (i_endpoint *endpoint_);
- // Checks whether message with specified size can be written to the
- // pipe. If writing the message would cause high watermark to be
+ // Checks whether a message can be written to the pipe.
+ // If writing the message would cause high watermark to be
// exceeded, the function returns false.
- bool check_write (uint64_t size_);
+ bool check_write ();
// Writes a message to the underlying pipe. Returns false if the
// message cannot be written because high watermark was reached.
@@ -111,9 +109,14 @@ namespace zmq
private:
+ void process_reader_info (uint64_t msgs_read_);
+
// Command handlers.
void process_pipe_term ();
+ // Tests whether the pipe is already full.
+ bool pipe_full ();
+
// The underlying pipe.
class pipe_t *pipe;
@@ -124,9 +127,15 @@ namespace zmq
uint64_t hwm;
uint64_t lwm;
- // Positions of head and tail of the pipe (in bytes).
- uint64_t head;
- uint64_t tail;
+ // Last confirmed number of messages read from the pipe.
+ // The actual number can be higher.
+ uint64_t msgs_read;
+
+ // Number of messages we have written so far.
+ uint64_t msgs_written;
+
+ // True iff the last attempt to write a message has failed.
+ bool stalled;
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
diff --git a/src/pub.cpp b/src/pub.cpp
index 643e29e..b6802fd 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -65,6 +65,11 @@ void zmq::pub_t::xrevive (class reader_t *pipe_)
zmq_assert (false);
}
+void zmq::pub_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -87,7 +92,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// First check whether all pipes are available for writing.
for (out_pipes_t::size_type i = 0; i != pipes_count; i++)
- if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) {
+ if (!out_pipes [i]->check_write ()) {
errno = EAGAIN;
return -1;
}
@@ -97,7 +102,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
- out_pipes [i]->write (msg_);
+ bool written = out_pipes [i]->write (msg_);
+ zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [i]->flush ();
}
@@ -110,7 +116,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// to send the message to - no refcount adjustment i.e. no atomic
// operations are needed.
if (pipes_count == 1) {
- out_pipes [0]->write (msg_);
+ bool written = out_pipes [0]->write (msg_);
+ zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [0]->flush ();
int rc = zmq_msg_init (msg_);
@@ -130,7 +137,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// Push the message to all destinations.
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
- out_pipes [i]->write (msg_);
+ bool written = out_pipes [i]->write (msg_);
+ zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [i]->flush ();
}
diff --git a/src/pub.hpp b/src/pub.hpp
index 26142a4..a85301f 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/rep.cpp b/src/rep.cpp
index 4e69fa3..2cd4144 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -144,6 +144,11 @@ void zmq::rep_t::xrevive (class reader_t *pipe_)
active++;
}
+void zmq::rep_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::rep_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -160,12 +165,13 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
// TODO: Implement this once queue limits are in-place. If the reply
// overloads the buffer, connection should be torn down.
- zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_)));
+ zmq_assert (reply_pipe->check_write ());
// Push message to the selected pipe. If requester have disconnected
// in the meantime, drop the reply.
if (reply_pipe) {
- reply_pipe->write (msg_);
+ bool written = reply_pipe->write (msg_);
+ zmq_assert (written);
reply_pipe->flush ();
}
else {
diff --git a/src/rep.hpp b/src/rep.hpp
index 7ead321..9d2357d 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/req.cpp b/src/req.cpp
index 5c067b3..f21613a 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -108,6 +108,11 @@ void zmq::req_t::xrevive (class reader_t *pipe_)
reply_pipe_active = true;
}
+void zmq::req_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::req_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -142,11 +147,9 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
current = 0;
}
- // TODO: Implement this once queue limits are in-place.
- zmq_assert (out_pipes [current]->check_write (zmq_msg_size (msg_)));
-
// Push message to the selected pipe.
- out_pipes [current]->write (msg_);
+ bool written = out_pipes [current]->write (msg_);
+ zmq_assert (written);
out_pipes [current]->flush ();
waiting_for_reply = true;
diff --git a/src/req.hpp b/src/req.hpp
index da8e61a..4058b08 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/session.cpp b/src/session.cpp
index e1d0b8e..b99a370 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -164,6 +164,13 @@ void zmq::session_t::revive (reader_t *pipe_)
engine->revive ();
}
+void zmq::session_t::revive (writer_t *pipe_)
+{
+ zmq_assert (out_pipe == pipe_);
+ if (engine)
+ engine->resume_input ();
+}
+
void zmq::session_t::process_plug ()
{
}
diff --git a/src/session.hpp b/src/session.hpp
index 872748c..25a0d12 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -57,6 +57,7 @@ namespace zmq
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_);
+ void revive (class writer_t *pipe_);
private:
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 50b4152..fdb2d12 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -552,6 +552,11 @@ void zmq::socket_base_t::revive (reader_t *pipe_)
xrevive (pipe_);
}
+void zmq::socket_base_t::revive (writer_t *pipe_)
+{
+ xrevive (pipe_);
+}
+
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 31c241b..bb40ae6 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -93,6 +93,7 @@ namespace zmq
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_);
+ void revive (class writer_t *pipe_);
protected:
@@ -106,6 +107,7 @@ namespace zmq
virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
virtual void xkill (class reader_t *pipe_) = 0;
virtual void xrevive (class reader_t *pipe_) = 0;
+ virtual void xrevive (class writer_t *pipe_) = 0;
// Actual algorithms are to be defined by individual socket types.
virtual int xsetsockopt (int option_, const void *optval_,
diff --git a/src/sub.cpp b/src/sub.cpp
index fb00bfb..4169ea5 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -67,6 +67,11 @@ void zmq::sub_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_);
}
+void zmq::sub_t::xrevive (class writer_t *pipe_)
+{
+ zmq_assert (false);
+}
+
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
diff --git a/src/sub.hpp b/src/sub.hpp
index 670aa79..c319565 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -45,6 +45,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/upstream.cpp b/src/upstream.cpp
index 7ff1157..8163c18 100644
--- a/src/upstream.cpp
+++ b/src/upstream.cpp
@@ -62,6 +62,11 @@ void zmq::upstream_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_);
}
+void zmq::upstream_t::xrevive (class writer_t *pipe_)
+{
+ zmq_assert (false);
+}
+
int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
diff --git a/src/upstream.hpp b/src/upstream.hpp
index d1ee7b1..d9fb385 100644
--- a/src/upstream.hpp
+++ b/src/upstream.hpp
@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/xrep.cpp b/src/xrep.cpp
index df74302..fae376b 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -81,6 +81,11 @@ void zmq::xrep_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_);
}
+void zmq::xrep_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -111,7 +116,8 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
}
// Push message to the selected pipe.
- it->second->write (msg_);
+ bool written = it->second->write (msg_);
+ zmq_assert (written);
it->second->flush ();
// Detach the message from the data buffer.
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 4534463..f2cdb2b 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -43,6 +43,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 12c3dd6..72ce3a2 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -63,6 +63,11 @@ void zmq::xreq_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_);
}
+void zmq::xreq_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -97,4 +102,3 @@ bool zmq::xreq_t::xhas_out ()
return lb.has_out ();
}
-
diff --git a/src/xreq.hpp b/src/xreq.hpp
index e23e832..3d3f573 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -41,6 +41,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
+ void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 623ca63..8e0392c 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -102,19 +102,19 @@ void zmq::zmq_engine_t::in_event ()
// Push the data to the decoder.
size_t processed = decoder.process_buffer (inpos, insize);
- // Adjust the buffer.
- inpos += processed;
- insize -= processed;
-
// Stop polling for input if we got stuck.
if (processed < insize) {
- // This may happen if queue limits are implemented or when
+ // This may happen if queue limits are in effect or when
// init object reads all required information from the socket
// and rejects to read more data.
reset_pollin (handle);
}
+ // Adjust the buffer.
+ inpos += processed;
+ insize -= processed;
+
// Flush all messages the decoder may have produced.
inout->flush ();
@@ -162,6 +162,13 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
+void zmq::zmq_engine_t::resume_input ()
+{
+ set_pollin (handle);
+
+ in_event ();
+}
+
void zmq::zmq_engine_t::add_prefix (const blob_t &identity_)
{
decoder.add_prefix (identity_);
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index dc90a98..c4ef756 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -47,6 +47,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void resume_input ();
void add_prefix (const blob_t &identity_);
void trim_prefix ();