diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-04-11 10:26:47 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-04-11 10:26:47 +0200 |
commit | 00cf3ceb8da8cb58b343cb75798a042588f09752 (patch) | |
tree | a4f9909842d45f5467b42fe1400206c3d996ceaa | |
parent | 6fea42258348c8489d2cd64ca0e92981148134f8 (diff) |
multi-part message functionality available via ZMQ_SNDMORE and ZMQ_RCVMORE
-rw-r--r-- | include/zmq.h | 3 | ||||
-rw-r--r-- | src/socket_base.cpp | 27 | ||||
-rw-r--r-- | src/socket_base.hpp | 3 |
3 files changed, 28 insertions, 5 deletions
diff --git a/include/zmq.h b/include/zmq.h index e2b5436..55e7cd7 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -179,8 +179,11 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_MCAST_LOOP 10 #define ZMQ_SNDBUF 11 #define ZMQ_RCVBUF 12 +#define ZMQ_RCVMORE 13 #define ZMQ_NOBLOCK 1 +#define ZMQ_SNDMORE 2 +// Obsolete: #define ZMQ_MORE 2 ZMQ_EXPORT void *zmq_socket (void *context, int type); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ddf2470..3b74359 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -42,6 +42,7 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), pending_term_acks (0), ticks (0), + rcvmore (false), app_thread (parent_), shutting_down (false), sent_seqnum (0), @@ -70,7 +71,16 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { - // At the moment there are no socket-type-specific overloads of getsockopt. + if (option_ == ZMQ_RCVMORE) { + if (*optvallen_ < sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + *((int64_t*) optval_) = rcvmore ? 1 : 0; + *optvallen_ = sizeof (int64_t); + return 0; + } + return options.getsockopt (option_, optval_, optvallen_); } @@ -318,9 +328,8 @@ int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) { - // ZMQ_MORE is actually a message flag, not a real send-flag - // such as ZMQ_NOBLOCK. At this point we impose it on the message. - if (flags_ & ZMQ_MORE) + // At this point we impose the MORE flag on the message. + if (flags_ & ZMQ_SNDMORE) msg_->flags |= ZMQ_MSG_MORE; // Process pending commands, if any. @@ -367,8 +376,12 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) } // If we have the message, return immediately. - if (rc == 0) + if (rc == 0) { + rcvmore = msg_->flags & ZMQ_MSG_MORE; + if (rcvmore) + msg_->flags &= ~ZMQ_MSG_MORE; return 0; + } // If we don't have the message, restore the original cause of the problem. errno = err; @@ -393,6 +406,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) rc = xrecv (msg_, flags_); ticks = 0; } + + rcvmore = msg_->flags & ZMQ_MSG_MORE; + if (rcvmore) + msg_->flags &= ~ZMQ_MSG_MORE; return 0; } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1b70299..3d95cec 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -141,6 +141,9 @@ namespace zmq // Number of messages received since last command processing. int ticks; + // If true there's a half-read message in the socket. + bool rcvmore; + // Application thread the socket lives in. class app_thread_t *app_thread; |