diff options
40 files changed, 242 insertions, 77 deletions
diff --git a/src/command.hpp b/src/command.hpp index 150cad1..3d00cd7 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -40,6 +40,7 @@ namespace zmq attach, bind, revive, + reader_info, pipe_term, pipe_term_ack, term_req, @@ -84,6 +85,13 @@ namespace zmq struct { } revive; + // Sent by pipe reader to inform pipe writer + // about how many messages it has read so far. + // Used to implement the flow control. + struct { + uint64_t msgs_read; + } reader_info; + // Sent by pipe reader to pipe writer to ask it to terminate // its end of the pipe. struct { diff --git a/src/downstream.cpp b/src/downstream.cpp index 7ff88f1..feeb8c3 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -65,6 +65,11 @@ void zmq::downstream_t::xrevive (class reader_t *pipe_) zmq_assert (false); } +void zmq::downstream_t::xrevive (class writer_t *pipe_) +{ + zmq_not_implemented (); +} + int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { @@ -104,4 +109,3 @@ bool zmq::downstream_t::xhas_out () return lb.has_out (); } - diff --git a/src/downstream.hpp b/src/downstream.hpp index dbd79a5..998ab73 100644 --- a/src/downstream.hpp +++ b/src/downstream.hpp @@ -40,6 +40,7 @@ namespace zmq void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); int xflush (); diff --git a/src/err.hpp b/src/err.hpp index 6c13b02..2b76569 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -115,3 +115,9 @@ namespace zmq } while (false) #endif + +#define zmq_not_implemented() \ + do {\ + fprintf (stderr, "Hic sunt leones (%s:%d)\n", __FILE__, __LINE__);\ + abort ();\ + } while (false) diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index ddab6a4..0d14224 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -35,6 +35,7 @@ namespace zmq virtual void detach_outpipe (class writer_t *pipe_) = 0; virtual void kill (class reader_t *pipe_) = 0; virtual void revive (class reader_t *pipe_) = 0; + virtual void revive (class writer_t *pipe_) = 0; }; } diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 81b56df..bb5f391 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -41,6 +41,8 @@ namespace zmq // are messages to send available. virtual void revive () = 0; + virtual void resume_input () = 0; + // Engine should add the prefix supplied to all inbound messages. virtual void add_prefix (const blob_t &identity_) = 0; @@ -54,15 +54,6 @@ void zmq::lb_t::detach (writer_t *pipe_) pipes.erase (pipe_); } -void zmq::lb_t::kill (writer_t *pipe_) -{ - // Move the pipe to the list of inactive pipes. - active--; - if (current == active) - current = 0; - pipes.swap (pipes.index (pipe_), active); -} - void zmq::lb_t::revive (writer_t *pipe_) { // Move the pipe to the list of active pipes. @@ -72,42 +63,46 @@ void zmq::lb_t::revive (writer_t *pipe_) int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) { + while (active > 0) { + if (pipes [current]->write (msg_)) + break; + + active--; + if (current < active) + pipes.swap (current, active); + else + current = 0; + } + // If there are no pipes we cannot send the message. - if (pipes.empty ()) { + if (active == 0) { errno = EAGAIN; return -1; } - // TODO: Implement this once queue limits are in-place. - zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); - - // Push message to the selected pipe. - pipes [current]->write (msg_); - pipes [current]->flush (); + if (!(flags_ & ZMQ_NOFLUSH)) + pipes [current]->flush (); // Detach the message from the data buffer. int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); // Move to the next pipe (load-balancing). - current++; - if (current >= active) - current = 0; + current = (current + 1) % active; return 0; } bool zmq::lb_t::has_out () { - for (int count = active; count != 0; count--) { - - // We should be able to write at least 1-byte message to interrupt - // polling for POLLOUT. - // TODO: Shouldn't we use a saner value here? - if (pipes [current]->check_write (1)) + while (active > 0) { + if (pipes [current]->check_write ()) return true; - current++; - if (current >= active) + + active--; + if (current < active) + pipes.swap (current, active); + else current = 0; } @@ -36,7 +36,6 @@ namespace zmq void attach (class writer_t *pipe_); void detach (class writer_t *pipe_); - void kill (class writer_t *pipe_); void revive (class writer_t *pipe_); int send (zmq_msg_t *msg_, int flags_); bool has_out (); diff --git a/src/object.cpp b/src/object.cpp index 356fcd1..5821c89 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -95,6 +95,10 @@ void zmq::object_t::process_command (command_t &cmd_) process_seqnum (); break; + case command_t::reader_info: + process_reader_info (cmd_.args.reader_info.msgs_read); + break; + case command_t::pipe_term: process_pipe_term (); return; @@ -249,6 +253,16 @@ void zmq::object_t::send_revive (object_t *destination_) send_command (cmd); } +void zmq::object_t::send_reader_info (writer_t *destination_, + uint64_t msgs_read_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::reader_info; + cmd.args.reader_info.msgs_read = msgs_read_; + send_command (cmd); +} + void zmq::object_t::send_pipe_term (writer_t *destination_) { command_t cmd; @@ -323,6 +337,11 @@ void zmq::object_t::process_revive () zmq_assert (false); } +void zmq::object_t::process_reader_info (uint64_t msgs_read_) +{ + zmq_assert (false); +} + void zmq::object_t::process_pipe_term () { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 1544109..f29342e 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -71,6 +71,8 @@ namespace zmq class reader_t *in_pipe_, class writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_ = true); void send_revive (class object_t *destination_); + void send_reader_info (class writer_t *destination_, + uint64_t msgs_read_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); void send_term_req (class socket_base_t *destination_, @@ -88,6 +90,7 @@ namespace zmq virtual void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, const blob_t &peer_identity_); virtual void process_revive (); + virtual void process_reader_info (uint64_t msgs_read_); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); virtual void process_term_req (class owned_t *object_); diff --git a/src/options.cpp b/src/options.cpp index c0d5339..a713ede 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -45,19 +45,19 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, switch (option_) { case ZMQ_HWM: - if (optvallen_ != sizeof (int64_t)) { + if (optvallen_ != sizeof (uint64_t)) { errno = EINVAL; return -1; } - hwm = *((int64_t*) optval_); + hwm = *((uint64_t*) optval_); return 0; case ZMQ_LWM: - if (optvallen_ != sizeof (int64_t)) { + if (optvallen_ != sizeof (uint64_t)) { errno = EINVAL; return -1; } - lwm = *((int64_t*) optval_); + lwm = *((uint64_t*) optval_); return 0; case ZMQ_SWAP: diff --git a/src/options.hpp b/src/options.hpp index 6d9be4d..eba8ab8 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -33,8 +33,8 @@ namespace zmq int setsockopt (int option_, const void *optval_, size_t optvallen_); - int64_t hwm; - int64_t lwm; + uint64_t hwm; + uint64_t lwm; int64_t swap; uint64_t affinity; blob_t identity; diff --git a/src/p2p.cpp b/src/p2p.cpp index 334cfcc..728854b 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -73,6 +73,11 @@ void zmq::p2p_t::xrevive (class reader_t *pipe_) alive = true; } +void zmq::p2p_t::xrevive (class writer_t *pipe_) +{ + zmq_not_implemented (); +} + int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { @@ -87,10 +92,8 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_) return -1; } - // TODO: Implement this once queue limits are in-place. - zmq_assert (outpipe->check_write (zmq_msg_size (msg_))); - - outpipe->write (msg_); + bool written = outpipe->write (msg_); + zmq_assert (written); if (!(flags_ & ZMQ_NOFLUSH)) outpipe->flush (); diff --git a/src/p2p.hpp b/src/p2p.hpp index bca0eab..e12b58c 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -39,6 +39,7 @@ namespace zmq void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); int xflush (); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index e708229..5480030 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,6 +88,11 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } +void zmq::pgm_receiver_t::resume_input () +{ + zmq_not_implemented (); +} + void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_) { // No need for tracerouting functionality in PGM socket at the moment. diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 3f0ef81..dd9402f 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,6 +54,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); + void resume_input (); void add_prefix (const blob_t &identity_); void trim_prefix (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 27b4d0c..01eac2b 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -102,6 +102,11 @@ void zmq::pgm_sender_t::revive () out_event (); } +void zmq::pgm_sender_t::resume_input () +{ + zmq_assert (false); +} + void zmq::pgm_sender_t::add_prefix (const blob_t &identity_) { // No need for tracerouting functionality in PGM socket at the moment. diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 951c417..4b232b1 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,6 +52,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); + void resume_input (); void add_prefix (const blob_t &identity_); void trim_prefix (); diff --git a/src/pipe.cpp b/src/pipe.cpp index da019c1..53dfb21 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -28,8 +28,12 @@ zmq::reader_t::reader_t (object_t *parent_, peer (NULL), hwm (hwm_), lwm (lwm_), + msgs_read (0), endpoint (NULL) { + // Adjust lwm and hwm. + if (lwm == 0 || lwm > hwm) + lwm = hwm; } zmq::reader_t::~reader_t () @@ -73,7 +77,9 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) return false; } - // TODO: Adjust the size of the pipe. + msgs_read++; + if (lwm > 0 && msgs_read % lwm == 0) + send_reader_info (peer, msgs_read); return true; } @@ -111,8 +117,14 @@ zmq::writer_t::writer_t (object_t *parent_, peer (NULL), hwm (hwm_), lwm (lwm_), + msgs_read (0), + msgs_written (0), + stalled (false), endpoint (NULL) { + // Adjust lwm and hwm. + if (lwm == 0 || lwm > hwm) + lwm = hwm; } void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) @@ -131,32 +143,41 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_) peer = &pipe->reader; } -bool zmq::writer_t::check_write (uint64_t size_) +bool zmq::writer_t::check_write () { - // TODO: Check whether hwm is exceeded. + if (pipe_full ()) { + stalled = true; + return false; + } return true; } bool zmq::writer_t::write (zmq_msg_t *msg_) { + if (pipe_full ()) { + stalled = true; + return false; + } + pipe->write (*msg_); + msgs_written++; return true; - - // TODO: Adjust size of the pipe. } void zmq::writer_t::rollback () { - while (true) { - zmq_msg_t msg; - if (!pipe->unwrite (&msg)) - break; + zmq_msg_t msg; + + while (pipe->unwrite (&msg)) { zmq_msg_close (&msg); + msgs_written--; } - // TODO: We don't have to inform the reader side of the pipe about - // the event. We'll simply adjust the pipe size and keep calm. + if (stalled && endpoint != NULL && !pipe_full()) { + stalled = false; + endpoint->revive (this); + } } void zmq::writer_t::flush () @@ -179,6 +200,15 @@ void zmq::writer_t::term () pipe->flush (); } +void zmq::writer_t::process_reader_info (uint64_t msgs_read_) +{ + msgs_read = msgs_read_; + if (stalled && endpoint != NULL) { + stalled = false; + endpoint->revive (this); + } +} + void zmq::writer_t::process_pipe_term () { if (endpoint) @@ -189,6 +219,11 @@ void zmq::writer_t::process_pipe_term () send_pipe_term_ack (p); } +bool zmq::writer_t::pipe_full () +{ + return hwm > 0 && msgs_written - msgs_read == hwm; +} + zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_) : reader (reader_parent_, hwm_, lwm_), diff --git a/src/pipe.hpp b/src/pipe.hpp index df3d0b1..0ac7fc5 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -68,10 +68,8 @@ namespace zmq uint64_t hwm; uint64_t lwm; - // Positions of head and tail of the pipe (in bytes). - uint64_t head; - uint64_t tail; - uint64_t last_sent_head; + // Number of messages read so far. + uint64_t msgs_read; // Endpoint (either session or socket) the pipe is attached to. i_endpoint *endpoint; @@ -91,10 +89,10 @@ namespace zmq void set_pipe (class pipe_t *pipe_); void set_endpoint (i_endpoint *endpoint_); - // Checks whether message with specified size can be written to the - // pipe. If writing the message would cause high watermark to be + // Checks whether a message can be written to the pipe. + // If writing the message would cause high watermark to be // exceeded, the function returns false. - bool check_write (uint64_t size_); + bool check_write (); // Writes a message to the underlying pipe. Returns false if the // message cannot be written because high watermark was reached. @@ -111,9 +109,14 @@ namespace zmq private: + void process_reader_info (uint64_t msgs_read_); + // Command handlers. void process_pipe_term (); + // Tests whether the pipe is already full. + bool pipe_full (); + // The underlying pipe. class pipe_t *pipe; @@ -124,9 +127,15 @@ namespace zmq uint64_t hwm; uint64_t lwm; - // Positions of head and tail of the pipe (in bytes). - uint64_t head; - uint64_t tail; + // Last confirmed number of messages read from the pipe. + // The actual number can be higher. + uint64_t msgs_read; + + // Number of messages we have written so far. + uint64_t msgs_written; + + // True iff the last attempt to write a message has failed. + bool stalled; // Endpoint (either session or socket) the pipe is attached to. i_endpoint *endpoint; diff --git a/src/pub.cpp b/src/pub.cpp index 643e29e..b6802fd 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -65,6 +65,11 @@ void zmq::pub_t::xrevive (class reader_t *pipe_) zmq_assert (false); } +void zmq::pub_t::xrevive (class writer_t *pipe_) +{ + zmq_not_implemented (); +} + int zmq::pub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { @@ -87,7 +92,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) // First check whether all pipes are available for writing. for (out_pipes_t::size_type i = 0; i != pipes_count; i++) - if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) { + if (!out_pipes [i]->check_write ()) { errno = EAGAIN; return -1; } @@ -97,7 +102,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) // For VSMs the copying is straighforward. if (content == (msg_content_t*) ZMQ_VSM) { for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { - out_pipes [i]->write (msg_); + bool written = out_pipes [i]->write (msg_); + zmq_assert (written); if (!(flags_ & ZMQ_NOFLUSH)) out_pipes [i]->flush (); } @@ -110,7 +116,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) // to send the message to - no refcount adjustment i.e. no atomic // operations are needed. if (pipes_count == 1) { - out_pipes [0]->write (msg_); + bool written = out_pipes [0]->write (msg_); + zmq_assert (written); if (!(flags_ & ZMQ_NOFLUSH)) out_pipes [0]->flush (); int rc = zmq_msg_init (msg_); @@ -130,7 +137,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) // Push the message to all destinations. for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { - out_pipes [i]->write (msg_); + bool written = out_pipes [i]->write (msg_); + zmq_assert (written); if (!(flags_ & ZMQ_NOFLUSH)) out_pipes [i]->flush (); } diff --git a/src/pub.hpp b/src/pub.hpp index 26142a4..a85301f 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -40,6 +40,7 @@ namespace zmq void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); int xflush (); diff --git a/src/rep.cpp b/src/rep.cpp index 4e69fa3..2cd4144 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -144,6 +144,11 @@ void zmq::rep_t::xrevive (class reader_t *pipe_) active++; } +void zmq::rep_t::xrevive (class writer_t *pipe_) +{ + zmq_not_implemented (); +} + int zmq::rep_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { @@ -160,12 +165,13 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) // TODO: Implement this once queue limits are in-place. If the reply // overloads the buffer, connection should be torn down. - zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_))); + zmq_assert (reply_pipe->check_write ()); // Push message to the selected pipe. If requester have disconnected // in the meantime, drop the reply. if (reply_pipe) { - reply_pipe->write (msg_); + bool written = reply_pipe->write (msg_); + zmq_assert (written); reply_pipe->flush (); } else { diff --git a/src/rep.hpp b/src/rep.hpp index 7ead321..9d2357d 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -40,6 +40,7 @@ namespace zmq void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); int xflush (); diff --git a/src/req.cpp b/src/req.cpp index 5c067b3..f21613a 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -108,6 +108,11 @@ void zmq::req_t::xrevive (class reader_t *pipe_) reply_pipe_active = true; } +void zmq::req_t::xrevive (class writer_t *pipe_) +{ + zmq_not_implemented (); +} + int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { @@ -142,11 +147,9 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) current = 0; } - // TODO: Implement this once queue limits are in-place. - zmq_assert (out_pipes [current]->check_write (zmq_msg_size (msg_))); - // Push message to the selected pipe. - out_pipes [current]->write (msg_); + bool written = out_pipes [current]->write (msg_); + zmq_assert (written); out_pipes [current]->flush (); waiting_for_reply = true; diff --git a/src/req.hpp b/src/req.hpp index da8e61a..4058b08 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -40,6 +40,7 @@ namespace zmq void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); int xflush (); diff --git a/src/session.cpp b/src/session.cpp index e1d0b8e..b99a370 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -164,6 +164,13 @@ void zmq::session_t::revive (reader_t *pipe_) engine->revive (); } +void zmq::session_t::revive (writer_t *pipe_) +{ + zmq_assert (out_pipe == pipe_); + if (engine) + engine->resume_input (); +} + void zmq::session_t::process_plug () { } diff --git a/src/session.hpp b/src/session.hpp index 872748c..25a0d12 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -57,6 +57,7 @@ namespace zmq void detach_outpipe (class writer_t *pipe_); void kill (class reader_t *pipe_); void revive (class reader_t *pipe_); + void revive (class writer_t *pipe_); private: diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 50b4152..fdb2d12 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -552,6 +552,11 @@ void zmq::socket_base_t::revive (reader_t *pipe_) xrevive (pipe_); } +void zmq::socket_base_t::revive (writer_t *pipe_) +{ + xrevive (pipe_); +} + void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 31c241b..bb40ae6 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -93,6 +93,7 @@ namespace zmq void detach_outpipe (class writer_t *pipe_); void kill (class reader_t *pipe_); void revive (class reader_t *pipe_); + void revive (class writer_t *pipe_); protected: @@ -106,6 +107,7 @@ namespace zmq virtual void xdetach_outpipe (class writer_t *pipe_) = 0; virtual void xkill (class reader_t *pipe_) = 0; virtual void xrevive (class reader_t *pipe_) = 0; + virtual void xrevive (class writer_t *pipe_) = 0; // Actual algorithms are to be defined by individual socket types. virtual int xsetsockopt (int option_, const void *optval_, diff --git a/src/sub.cpp b/src/sub.cpp index fb00bfb..4169ea5 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -67,6 +67,11 @@ void zmq::sub_t::xrevive (class reader_t *pipe_) fq.revive (pipe_); } +void zmq::sub_t::xrevive (class writer_t *pipe_) +{ + zmq_assert (false); +} + int zmq::sub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { diff --git a/src/sub.hpp b/src/sub.hpp index 670aa79..c319565 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -45,6 +45,7 @@ namespace zmq void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); int xflush (); diff --git a/src/upstream.cpp b/src/upstream.cpp index 7ff1157..8163c18 100644 --- a/src/upstream.cpp +++ b/src/upstream.cpp @@ -62,6 +62,11 @@ void zmq::upstream_t::xrevive (class reader_t *pipe_) fq.revive (pipe_); } +void zmq::upstream_t::xrevive (class writer_t *pipe_) +{ + zmq_assert (false); +} + int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { diff --git a/src/upstream.hpp b/src/upstream.hpp index d1ee7b1..d9fb385 100644 --- a/src/upstream.hpp +++ b/src/upstream.hpp @@ -40,6 +40,7 @@ namespace zmq void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); int xflush (); diff --git a/src/xrep.cpp b/src/xrep.cpp index df74302..fae376b 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -81,6 +81,11 @@ void zmq::xrep_t::xrevive (class reader_t *pipe_) fq.revive (pipe_); } +void zmq::xrep_t::xrevive (class writer_t *pipe_) +{ + zmq_not_implemented (); +} + int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { @@ -111,7 +116,8 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) } // Push message to the selected pipe. - it->second->write (msg_); + bool written = it->second->write (msg_); + zmq_assert (written); it->second->flush (); // Detach the message from the data buffer. diff --git a/src/xrep.hpp b/src/xrep.hpp index 4534463..f2cdb2b 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -43,6 +43,7 @@ namespace zmq void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); int xflush (); diff --git a/src/xreq.cpp b/src/xreq.cpp index 12c3dd6..72ce3a2 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -63,6 +63,11 @@ void zmq::xreq_t::xrevive (class reader_t *pipe_) fq.revive (pipe_); } +void zmq::xreq_t::xrevive (class writer_t *pipe_) +{ + zmq_not_implemented (); +} + int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { @@ -97,4 +102,3 @@ bool zmq::xreq_t::xhas_out () return lb.has_out (); } - diff --git a/src/xreq.hpp b/src/xreq.hpp index e23e832..3d3f573 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -41,6 +41,7 @@ namespace zmq void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); int xflush (); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 623ca63..8e0392c 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -102,19 +102,19 @@ void zmq::zmq_engine_t::in_event () // Push the data to the decoder. size_t processed = decoder.process_buffer (inpos, insize); - // Adjust the buffer. - inpos += processed; - insize -= processed; - // Stop polling for input if we got stuck. if (processed < insize) { - // This may happen if queue limits are implemented or when + // This may happen if queue limits are in effect or when // init object reads all required information from the socket // and rejects to read more data. reset_pollin (handle); } + // Adjust the buffer. + inpos += processed; + insize -= processed; + // Flush all messages the decoder may have produced. inout->flush (); @@ -162,6 +162,13 @@ void zmq::zmq_engine_t::revive () out_event (); } +void zmq::zmq_engine_t::resume_input () +{ + set_pollin (handle); + + in_event (); +} + void zmq::zmq_engine_t::add_prefix (const blob_t &identity_) { decoder.add_prefix (identity_); diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index dc90a98..c4ef756 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -47,6 +47,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); + void resume_input (); void add_prefix (const blob_t &identity_); void trim_prefix (); |