summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-04-11 10:26:47 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-04-11 10:26:47 +0200
commit00cf3ceb8da8cb58b343cb75798a042588f09752 (patch)
treea4f9909842d45f5467b42fe1400206c3d996ceaa /src
parent6fea42258348c8489d2cd64ca0e92981148134f8 (diff)
multi-part message functionality available via ZMQ_SNDMORE and ZMQ_RCVMORE
Diffstat (limited to 'src')
-rw-r--r--src/socket_base.cpp27
-rw-r--r--src/socket_base.hpp3
2 files changed, 25 insertions, 5 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ddf2470..3b74359 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -42,6 +42,7 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
pending_term_acks (0),
ticks (0),
+ rcvmore (false),
app_thread (parent_),
shutting_down (false),
sent_seqnum (0),
@@ -70,7 +71,16 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_)
{
- // At the moment there are no socket-type-specific overloads of getsockopt.
+ if (option_ == ZMQ_RCVMORE) {
+ if (*optvallen_ < sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int64_t*) optval_) = rcvmore ? 1 : 0;
+ *optvallen_ = sizeof (int64_t);
+ return 0;
+ }
+
return options.getsockopt (option_, optval_, optvallen_);
}
@@ -318,9 +328,8 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
- // ZMQ_MORE is actually a message flag, not a real send-flag
- // such as ZMQ_NOBLOCK. At this point we impose it on the message.
- if (flags_ & ZMQ_MORE)
+ // At this point we impose the MORE flag on the message.
+ if (flags_ & ZMQ_SNDMORE)
msg_->flags |= ZMQ_MSG_MORE;
// Process pending commands, if any.
@@ -367,8 +376,12 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
}
// If we have the message, return immediately.
- if (rc == 0)
+ if (rc == 0) {
+ rcvmore = msg_->flags & ZMQ_MSG_MORE;
+ if (rcvmore)
+ msg_->flags &= ~ZMQ_MSG_MORE;
return 0;
+ }
// If we don't have the message, restore the original cause of the problem.
errno = err;
@@ -393,6 +406,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
rc = xrecv (msg_, flags_);
ticks = 0;
}
+
+ rcvmore = msg_->flags & ZMQ_MSG_MORE;
+ if (rcvmore)
+ msg_->flags &= ~ZMQ_MSG_MORE;
return 0;
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 1b70299..3d95cec 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -141,6 +141,9 @@ namespace zmq
// Number of messages received since last command processing.
int ticks;
+ // If true there's a half-read message in the socket.
+ bool rcvmore;
+
// Application thread the socket lives in.
class app_thread_t *app_thread;