From 00cf3ceb8da8cb58b343cb75798a042588f09752 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 11 Apr 2010 10:26:47 +0200 Subject: multi-part message functionality available via ZMQ_SNDMORE and ZMQ_RCVMORE --- src/socket_base.cpp | 27 ++++++++++++++++++++++----- src/socket_base.hpp | 3 +++ 2 files changed, 25 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ddf2470..3b74359 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -42,6 +42,7 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), pending_term_acks (0), ticks (0), + rcvmore (false), app_thread (parent_), shutting_down (false), sent_seqnum (0), @@ -70,7 +71,16 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { - // At the moment there are no socket-type-specific overloads of getsockopt. + if (option_ == ZMQ_RCVMORE) { + if (*optvallen_ < sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + *((int64_t*) optval_) = rcvmore ? 1 : 0; + *optvallen_ = sizeof (int64_t); + return 0; + } + return options.getsockopt (option_, optval_, optvallen_); } @@ -318,9 +328,8 @@ int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) { - // ZMQ_MORE is actually a message flag, not a real send-flag - // such as ZMQ_NOBLOCK. At this point we impose it on the message. - if (flags_ & ZMQ_MORE) + // At this point we impose the MORE flag on the message. + if (flags_ & ZMQ_SNDMORE) msg_->flags |= ZMQ_MSG_MORE; // Process pending commands, if any. @@ -367,8 +376,12 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) } // If we have the message, return immediately. - if (rc == 0) + if (rc == 0) { + rcvmore = msg_->flags & ZMQ_MSG_MORE; + if (rcvmore) + msg_->flags &= ~ZMQ_MSG_MORE; return 0; + } // If we don't have the message, restore the original cause of the problem. errno = err; @@ -393,6 +406,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) rc = xrecv (msg_, flags_); ticks = 0; } + + rcvmore = msg_->flags & ZMQ_MSG_MORE; + if (rcvmore) + msg_->flags &= ~ZMQ_MSG_MORE; return 0; } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1b70299..3d95cec 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -141,6 +141,9 @@ namespace zmq // Number of messages received since last command processing. int ticks; + // If true there's a half-read message in the socket. + bool rcvmore; + // Application thread the socket lives in. class app_thread_t *app_thread; -- cgit v1.2.3