diff options
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r-- | src/pipe.cpp | 49 |
1 files changed, 22 insertions, 27 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 2af2dc2..36dc808 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -20,8 +20,6 @@ #include <new> -#include "../include/zmq.h" - #include "pipe.hpp" #include "likely.hpp" @@ -53,11 +51,12 @@ zmq::reader_t::~reader_t () zmq_assert (pipe); // First delete all the unread messages in the pipe. We have to do it by - // hand because zmq_msg_t is a POD, not a class, so there's no associated - // destructor. - zmq_msg_t msg; - while (pipe->read (&msg)) - zmq_msg_close (&msg); + // hand because msg_t doesn't have automatic destructor. + msg_t msg; + while (pipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); + } delete pipe; } @@ -68,11 +67,9 @@ void zmq::reader_t::set_event_sink (i_reader_events *sink_) sink = sink_; } -bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) +bool zmq::reader_t::is_delimiter (msg_t &msg_) { - unsigned char *offset = 0; - - return msg_.content == (void*) (offset + ZMQ_DELIMITER); + return msg_.is_delimiter (); } bool zmq::reader_t::check_read () @@ -89,7 +86,7 @@ bool zmq::reader_t::check_read () // If the next item in the pipe is message delimiter, // initiate its termination. if (pipe->probe (is_delimiter)) { - zmq_msg_t msg; + msg_t msg; bool ok = pipe->read (&msg); zmq_assert (ok); if (sink) @@ -101,7 +98,7 @@ bool zmq::reader_t::check_read () return true; } -bool zmq::reader_t::read (zmq_msg_t *msg_) +bool zmq::reader_t::read (msg_t *msg_) { if (!active) return false; @@ -112,15 +109,14 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) } // If delimiter was read, start termination process of the pipe. - unsigned char *offset = 0; - if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) { + if (msg_->is_delimiter ()) { if (sink) sink->delimited (this); terminate (); return false; } - if (!(msg_->flags & ZMQ_MSG_MORE)) + if (!(msg_->flags () & msg_t::more)) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) @@ -187,7 +183,7 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_) sink = sink_; } -bool zmq::writer_t::check_write (zmq_msg_t *msg_) +bool zmq::writer_t::check_write (msg_t *msg_) { // We've already checked and there's no space free for the new message. // There's no point in checking once again. @@ -202,13 +198,13 @@ bool zmq::writer_t::check_write (zmq_msg_t *msg_) return true; } -bool zmq::writer_t::write (zmq_msg_t *msg_) +bool zmq::writer_t::write (msg_t *msg_) { if (unlikely (!check_write (msg_))) return false; - pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); - if (!(msg_->flags & ZMQ_MSG_MORE)) + pipe->write (*msg_, msg_->flags () & msg_t::more); + if (!(msg_->flags () & msg_t::more)) msgs_written++; return true; @@ -217,10 +213,11 @@ bool zmq::writer_t::write (zmq_msg_t *msg_) void zmq::writer_t::rollback () { // Remove incomplete message from the pipe. - zmq_msg_t msg; + msg_t msg; while (pipe->unwrite (&msg)) { - zmq_assert (msg.flags & ZMQ_MSG_MORE); - zmq_msg_close (&msg); + zmq_assert (msg.flags () & msg_t::more); + int rc = msg.close (); + errno_assert (rc == 0); } } @@ -246,10 +243,8 @@ void zmq::writer_t::terminate () // Push delimiter into the pipe. Trick the compiler to belive that // the tag is a valid pointer. Note that watermarks are not checked // thus the delimiter can be written even though the pipe is full. - zmq_msg_t msg; - const unsigned char *offset = 0; - msg.content = (void*) (offset + ZMQ_DELIMITER); - msg.flags = 0; + msg_t msg; + msg.init_delimiter (); pipe->write (msg, false); flush (); } |