summaryrefslogtreecommitdiff
path: root/src/pub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pub.cpp')
-rw-r--r--src/pub.cpp16
1 files changed, 12 insertions, 4 deletions
diff --git a/src/pub.cpp b/src/pub.cpp
index 643e29e..b6802fd 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -65,6 +65,11 @@ void zmq::pub_t::xrevive (class reader_t *pipe_)
zmq_assert (false);
}
+void zmq::pub_t::xrevive (class writer_t *pipe_)
+{
+ zmq_not_implemented ();
+}
+
int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -87,7 +92,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// First check whether all pipes are available for writing.
for (out_pipes_t::size_type i = 0; i != pipes_count; i++)
- if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) {
+ if (!out_pipes [i]->check_write ()) {
errno = EAGAIN;
return -1;
}
@@ -97,7 +102,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
- out_pipes [i]->write (msg_);
+ bool written = out_pipes [i]->write (msg_);
+ zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [i]->flush ();
}
@@ -110,7 +116,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// to send the message to - no refcount adjustment i.e. no atomic
// operations are needed.
if (pipes_count == 1) {
- out_pipes [0]->write (msg_);
+ bool written = out_pipes [0]->write (msg_);
+ zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [0]->flush ();
int rc = zmq_msg_init (msg_);
@@ -130,7 +137,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// Push the message to all destinations.
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
- out_pipes [i]->write (msg_);
+ bool written = out_pipes [i]->write (msg_);
+ zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [i]->flush ();
}