summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/zmq.h5
-rw-r--r--src/dist.cpp6
-rw-r--r--src/encoder.cpp5
-rw-r--r--src/fq.cpp3
-rw-r--r--src/lb.cpp6
-rw-r--r--src/msg.hpp7
-rw-r--r--src/pipe.cpp7
-rw-r--r--src/rep.cpp19
-rw-r--r--src/req.cpp46
-rw-r--r--src/req.hpp7
-rw-r--r--src/session_base.cpp4
-rw-r--r--src/socket_base.cpp33
-rw-r--r--src/socket_base.hpp7
-rw-r--r--src/xpub.cpp4
-rw-r--r--src/xrep.cpp11
-rw-r--r--src/xsub.cpp10
-rw-r--r--tests/Makefile.am2
-rw-r--r--tests/test_invalid_rep.cpp13
-rw-r--r--tests/test_reqrep_device.cpp31
-rw-r--r--tests/test_reqrep_drop.cpp144
20 files changed, 75 insertions, 295 deletions
diff --git a/include/zmq.h b/include/zmq.h
index bb80d86..55bb793 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -189,15 +190,11 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_MULTICAST_HOPS 25
#define ZMQ_RCVTIMEO 27
#define ZMQ_SNDTIMEO 28
-#define ZMQ_RCVLABEL 29
-#define ZMQ_RCVCMD 30
#define ZMQ_IPV4ONLY 31
/* Send/recv options. */
#define ZMQ_DONTWAIT 1
#define ZMQ_SNDMORE 2
-#define ZMQ_SNDLABEL 4
-#define ZMQ_SNDCMD 8
ZMQ_EXPORT void *zmq_socket (void *context, int type);
ZMQ_EXPORT int zmq_close (void *s);
diff --git a/src/dist.cpp b/src/dist.cpp
index 59e6c08..d4be65b 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -112,8 +113,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
- bool msg_more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool msg_more = msg_->flags () & msg_t::more ? true : false;
// Push the message to matching pipes.
distribute (msg_, flags_);
@@ -182,7 +182,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
eligible--;
return false;
}
- if (!(msg_->flags () & (msg_t::more | msg_t::label)))
+ if (!(msg_->flags () & msg_t::more))
pipe_->flush ();
return true;
}
diff --git a/src/encoder.cpp b/src/encoder.cpp
index a20623f..94af598 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -90,14 +91,14 @@ bool zmq::encoder_t::message_ready ()
tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 2, &encoder_t::size_ready,
- !(in_progress.flags () & (msg_t::more | msg_t::label)));
+ !(in_progress.flags () & msg_t::more));
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 10, &encoder_t::size_ready,
- !(in_progress.flags () & (msg_t::more | msg_t::label)));
+ !(in_progress.flags () & msg_t::more));
}
return true;
}
diff --git a/src/fq.cpp b/src/fq.cpp
index 6dd7009..ed1947c 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -92,7 +93,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_)
if (pipe_)
*pipe_ = pipes [current];
more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ msg_->flags () & msg_t::more ? true : false;
if (!more) {
current++;
if (current >= active)
diff --git a/src/lb.cpp b/src/lb.cpp
index bcef48b..e100a9e 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -76,7 +77,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
// switch back to non-dropping mode.
if (dropping) {
- more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
if (!more)
dropping = false;
@@ -89,8 +90,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
while (active > 0) {
if (pipes [current]->write (msg_)) {
- more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
break;
}
diff --git a/src/msg.hpp b/src/msg.hpp
index c0dedd5..bc25598 100644
--- a/src/msg.hpp
+++ b/src/msg.hpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -48,10 +49,8 @@ namespace zmq
// Mesage flags.
enum
{
- label = 1,
- command = 2,
- shared = 64,
- more = 128
+ more = 1,
+ shared = 128
};
bool check ();
diff --git a/src/pipe.cpp b/src/pipe.cpp
index cbf7bf5..6dcc01a 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -137,7 +138,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
return false;
}
- if (!(msg_->flags () & (msg_t::more | msg_t::label)))
+ if (!(msg_->flags () & msg_t::more))
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
@@ -166,7 +167,7 @@ bool zmq::pipe_t::write (msg_t *msg_)
if (unlikely (!check_write (msg_)))
return false;
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
outpipe->write (*msg_, more);
if (!more)
msgs_written++;
@@ -180,7 +181,7 @@ void zmq::pipe_t::rollback ()
msg_t msg;
if (outpipe) {
while (outpipe->unwrite (&msg)) {
- zmq_assert (msg.flags () & (msg_t::more | msg_t::label));
+ zmq_assert (msg.flags () & msg_t::more);
int rc = msg.close ();
errno_assert (rc == 0);
}
diff --git a/src/rep.cpp b/src/rep.cpp
index de99c8a..02a825c 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -43,7 +43,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
return -1;
}
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
// Push message to the reply pipe.
int rc = xrep_t::xsend (msg_, flags_);
@@ -72,19 +72,20 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
- if (!(msg_->flags () & msg_t::label))
- break;
+ zmq_assert (msg_->flags () & msg_t::more);
+ bool bottom = (msg_->size () == 0);
rc = xrep_t::xsend (msg_, flags_);
errno_assert (rc == 0);
+ if (bottom)
+ break;
}
request_begins = false;
}
- else {
- int rc = xrep_t::xrecv (msg_, flags_);
- if (rc != 0)
- return rc;
- }
- zmq_assert (!(msg_->flags () & msg_t::label));
+
+ // Get next message part to return to the user.
+ int rc = xrep_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
// If whole request is read, flip the FSM to reply-sending state.
if (!(msg_->flags () & msg_t::more)) {
diff --git a/src/req.cpp b/src/req.cpp
index 9114daf..9694d2d 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -29,8 +30,7 @@
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
xreq_t (parent_, tid_),
receiving_reply (false),
- message_begins (true),
- request_id (generate_random ())
+ message_begins (true)
{
options.type = ZMQ_REQ;
}
@@ -50,19 +50,17 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
// First part of the request is the request identity.
if (message_begins) {
- msg_t prefix;
- int rc = prefix.init_size (4);
+ msg_t bottom;
+ int rc = bottom.init ();
errno_assert (rc == 0);
- prefix.set_flags (msg_t::label);
- unsigned char *data = (unsigned char*) prefix.data ();
- put_uint32 (data, request_id);
- rc = xreq_t::xsend (&prefix, flags_);
+ bottom.set_flags (msg_t::more);
+ rc = xreq_t::xsend (&bottom, 0);
if (rc != 0)
- return rc;
+ return -1;
message_begins = false;
}
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
int rc = xreq_t::xsend (msg_, flags_);
if (rc != 0)
@@ -92,25 +90,11 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// TODO: This should also close the connection with the peer!
- if (unlikely (!(msg_->flags () & msg_t::label) || msg_->size () != 4)) {
+ if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
while (true) {
int rc = xreq_t::xrecv (msg_, flags_);
errno_assert (rc == 0);
- if (!(msg_->flags () & (msg_t::label | msg_t::more)))
- break;
- }
- msg_->close ();
- msg_->init ();
- errno = EAGAIN;
- return -1;
- }
-
- unsigned char *data = (unsigned char*) msg_->data ();
- if (unlikely (get_uint32 (data) != request_id)) {
- while (true) {
- int rc = xreq_t::xrecv (msg_, flags_);
- errno_assert (rc == 0);
- if (!(msg_->flags () & (msg_t::label | msg_t::more)))
+ if (!(msg_->flags () & msg_t::more))
break;
}
msg_->close ();
@@ -118,6 +102,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
errno = EAGAIN;
return -1;
}
+
message_begins = false;
}
@@ -126,8 +111,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// If the reply is fully received, flip the FSM into request-sending state.
- if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
- request_id++;
+ if (!(msg_->flags () & msg_t::more)) {
receiving_reply = false;
message_begins = true;
}
@@ -167,8 +151,8 @@ zmq::req_session_t::~req_session_t ()
int zmq::req_session_t::write (msg_t *msg_)
{
- if (state == request_id) {
- if (msg_->flags () == msg_t::label && msg_->size () == 4) {
+ if (state == bottom) {
+ if (msg_->flags () == msg_t::more && msg_->size () == 0) {
state = body;
return xreq_session_t::write (msg_);
}
@@ -177,7 +161,7 @@ int zmq::req_session_t::write (msg_t *msg_)
if (msg_->flags () == msg_t::more)
return xreq_session_t::write (msg_);
if (msg_->flags () == 0) {
- state = request_id;
+ state = bottom;
return xreq_session_t::write (msg_);
}
}
diff --git a/src/req.hpp b/src/req.hpp
index d99b32a..78acbaf 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -51,10 +52,6 @@ namespace zmq
// of the message must be empty message part (backtrace stack bottom).
bool message_begins;
- // Request ID. Request numbers gradually increase (and wrap over)
- // so that we don't have to generate random ID for each request.
- uint32_t request_id;
-
req_t (const req_t&);
const req_t &operator = (const req_t&);
};
@@ -74,7 +71,7 @@ namespace zmq
private:
enum {
- request_id,
+ bottom,
body
} state;
diff --git a/src/session_base.cpp b/src/session_base.cpp
index d1d31c9..591b29e 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -149,9 +150,8 @@ int zmq::session_base_t::read (msg_t *msg_)
errno = EAGAIN;
return -1;
}
+ incomplete_in = msg_->flags () & msg_t::more ? true : false;
- incomplete_in =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
return 0;
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index e990ba1..967d314 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -120,8 +121,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
destroyed (false),
last_tsc (0),
ticks (0),
- rcvlabel (false),
- rcvcmd (false),
rcvmore (false)
{
}
@@ -252,26 +251,6 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return -1;
}
- if (option_ == ZMQ_RCVLABEL) {
- if (*optvallen_ < sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- *((int*) optval_) = rcvlabel ? 1 : 0;
- *optvallen_ = sizeof (int);
- return 0;
- }
-
- if (option_ == ZMQ_RCVCMD) {
- if (*optvallen_ < sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- *((int*) optval_) = rcvcmd ? 1 : 0;
- *optvallen_ = sizeof (int);
- return 0;
- }
-
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
@@ -496,12 +475,8 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
return -1;
// At this point we impose the flags on the message.
- if (flags_ & ZMQ_SNDLABEL)
- msg_->set_flags (msg_t::label);
if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more);
- if (flags_ & ZMQ_SNDCMD)
- msg_->set_flags (msg_t::command);
// Try to send the message.
rc = xsend (msg_, flags_);
@@ -870,13 +845,7 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
rcvmore = msg_->flags () & msg_t::more ? true : false;
if (rcvmore)
msg_->reset_flags (msg_t::more);
- rcvcmd = msg_->flags () & msg_t::command ? true : false;
- if (rcvcmd)
- msg_->reset_flags (msg_t::command);
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 901aa9e..37effa7 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -184,12 +185,6 @@ namespace zmq
// Number of messages received since last command processing.
int ticks;
- // True if the last message received had LABEL flag set.
- bool rcvlabel;
-
- // True if the last message received had COMMAND flag set.
- bool rcvcmd;
-
// True if the last message received had MORE flag set.
bool rcvmore;
diff --git a/src/xpub.cpp b/src/xpub.cpp
index de55cec..dfc334a 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -101,8 +102,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{
- bool msg_more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool msg_more = msg_->flags () & msg_t::more ? true : false;
// For the first part of multi-part message, find the matching pipes.
if (!more)
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 61e703b..1b0f44d 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -128,7 +129,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
- if (msg_->flags () & msg_t::label) {
+ if (msg_->flags () & msg_t::more) {
more_out = true;
@@ -162,7 +163,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
}
// Check whether this is the last part of the message.
- more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_out = msg_->flags () & msg_t::more ? true : false;
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
@@ -192,7 +193,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (prefetched) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_in = msg_->flags () & msg_t::more ? true : false;
prefetched = false;
return 0;
}
@@ -205,7 +206,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// If we are in the middle of reading a message, just return the next part.
if (more_in) {
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_in = msg_->flags () & msg_t::more ? true : false;
return 0;
}
@@ -219,7 +220,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
rc = msg_->init_size (4);
errno_assert (rc == 0);
put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
- msg_->set_flags (msg_t::label);
+ msg_->set_flags (msg_t::more);
return 0;
}
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 58c6951..debcac8 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -117,7 +118,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false;
- more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
return 0;
}
@@ -137,14 +138,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || !options.filter || match (msg_)) {
- more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (msg_->flags () & (msg_t::more | msg_t::label)) {
+ while (msg_->flags () & msg_t::more) {
rc = fq.recv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
@@ -184,7 +184,7 @@ bool zmq::xsub_t::xhas_in ()
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (message.flags () & (msg_t::more | msg_t::label)) {
+ while (message.flags () & msg_t::more) {
rc = fq.recv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 6ed3762..5f0cfc1 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -7,7 +7,6 @@ noinst_PROGRAMS = test_pair_inproc \
test_reqrep_tcp \
test_hwm \
test_reqrep_device \
- test_reqrep_drop \
test_sub_forward \
test_invalid_rep
@@ -24,7 +23,6 @@ test_reqrep_inproc_SOURCES = test_reqrep_inproc.cpp testutil.hpp
test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp
test_hwm_SOURCES = test_hwm.cpp
test_reqrep_device_SOURCES = test_reqrep_device.cpp
-test_reqrep_drop_SOURCES = test_reqrep_drop.cpp
test_sub_forward_SOURCES = test_sub_forward.cpp
test_invalid_rep_SOURCES = test_invalid_rep.cpp
diff --git a/tests/test_invalid_rep.cpp b/tests/test_invalid_rep.cpp
index dc902c2..f158b05 100644
--- a/tests/test_invalid_rep.cpp
+++ b/tests/test_invalid_rep.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -47,12 +48,12 @@ int main (int argc, char *argv [])
// Receive the request.
char addr [4];
- char seqn [4];
+ char bottom [1];
char body [1];
rc = zmq_recv (xrep_socket, addr, sizeof (addr), 0);
assert (rc == 4);
- rc = zmq_recv (xrep_socket, seqn, sizeof (seqn), 0);
- assert (rc == 4);
+ rc = zmq_recv (xrep_socket, bottom, sizeof (bottom), 0);
+ assert (rc == 0);
rc = zmq_recv (xrep_socket, body, sizeof (body), 0);
assert (rc == 1);
@@ -61,10 +62,10 @@ int main (int argc, char *argv [])
assert (rc == 4);
// Send valid reply.
- rc = zmq_send (xrep_socket, addr, 4, ZMQ_SNDLABEL);
- assert (rc == 4);
- rc = zmq_send (xrep_socket, seqn, 4, ZMQ_SNDLABEL);
+ rc = zmq_send (xrep_socket, addr, 4, ZMQ_SNDMORE);
assert (rc == 4);
+ rc = zmq_send (xrep_socket, bottom, 0, ZMQ_SNDMORE);
+ assert (rc == 0);
rc = zmq_send (xrep_socket, "b", 1, 0);
assert (rc == 1);
diff --git a/tests/test_reqrep_device.cpp b/tests/test_reqrep_device.cpp
index a451956..4ee7cf2 100644
--- a/tests/test_reqrep_device.cpp
+++ b/tests/test_reqrep_device.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -64,15 +65,11 @@ int main (int argc, char *argv [])
assert (rc == 0);
rc = zmq_recvmsg (xrep, &msg, 0);
assert (rc >= 0);
- int rcvlabel;
- size_t sz = sizeof (rcvlabel);
- rc = zmq_getsockopt (xrep, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
int rcvmore;
+ size_t sz = sizeof (rcvmore);
rc = zmq_getsockopt (xrep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
- rc = zmq_sendmsg (xreq, &msg,
- (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0));
+ rc = zmq_sendmsg (xreq, &msg, rcvmore ? ZMQ_SNDMORE : 0);
assert (rc >= 0);
}
@@ -81,21 +78,14 @@ int main (int argc, char *argv [])
rc = zmq_recv (rep, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "ABC", 3) == 0);
- int rcvlabel;
- size_t sz = sizeof (rcvlabel);
- rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
- assert (!rcvlabel);
int rcvmore;
+ size_t sz = sizeof (rcvmore);
rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (rcvmore);
rc = zmq_recv (rep, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "DEF", 3) == 0);
- rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
- assert (!rcvlabel);
rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
@@ -113,15 +103,10 @@ int main (int argc, char *argv [])
assert (rc == 0);
rc = zmq_recvmsg (xreq, &msg, 0);
assert (rc >= 0);
- int rcvlabel;
- size_t sz = sizeof (rcvlabel);
- rc = zmq_getsockopt (xreq, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
int rcvmore;
rc = zmq_getsockopt (xreq, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
- rc = zmq_sendmsg (xrep, &msg,
- (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0));
+ rc = zmq_sendmsg (xrep, &msg, rcvmore ? ZMQ_SNDMORE : 0);
assert (rc >= 0);
}
@@ -129,18 +114,12 @@ int main (int argc, char *argv [])
rc = zmq_recv (req, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "GHI", 3) == 0);
- rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
- assert (!rcvlabel);
rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (rcvmore);
rc = zmq_recv (req, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "JKL", 3) == 0);
- rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
- assert (!rcvlabel);
rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
diff --git a/tests/test_reqrep_drop.cpp b/tests/test_reqrep_drop.cpp
deleted file mode 100644
index 2829f5f..0000000
--- a/tests/test_reqrep_drop.cpp
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- Copyright (c) 2009-2011 250bpm s.r.o.
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <assert.h>
-
-#include "../include/zmq.h"
-#include "../include/zmq_utils.h"
-
-int main (int argc, char *argv [])
-{
- void *ctx = zmq_init (1);
- assert (ctx);
-
- // Check whether requests are discarded because of disconnected requester.
-
- // Create a server.
- void *xrep = zmq_socket (ctx, ZMQ_XREP);
- assert (xrep);
- int rc = zmq_bind (xrep, "tcp://127.0.0.1:5560");
- assert (rc == 0);
-
- // Create a client.
- void *xreq = zmq_socket (ctx, ZMQ_XREQ);
- assert (xreq);
- rc = zmq_connect (xreq, "tcp://127.0.0.1:5560");
- assert (rc == 0);
-
- // Send requests.
- rc = zmq_send (xreq, "ABC", 3, 0);
- assert (rc == 3);
- rc = zmq_send (xreq, "DEF", 3, 0);
- assert (rc == 3);
-
- // Disconnect client.
- rc = zmq_close (xreq);
- assert (rc == 0);
-
- // Wait a while for disconnect to happen.
- zmq_sleep (1);
-
- // Try to receive a request -- it should have been discarded.
- char buff [3];
- rc = zmq_recv (xrep, buff, 3, ZMQ_DONTWAIT);
- assert (rc < 0);
- assert (errno == EAGAIN);
-
- // Clean up.
- rc = zmq_close (xrep);
- assert (rc == 0);
-
- // New test. Check whether reply is dropped because of HWM overflow.
-
- int one = 1;
- xreq = zmq_socket (ctx, ZMQ_XREQ);
- assert (xreq);
- rc = zmq_setsockopt (xreq, ZMQ_RCVHWM, &one, sizeof(one));
- assert (rc == 0);
- rc = zmq_bind (xreq, "inproc://a");
- assert (rc == 0);
-
- void *rep = zmq_socket (ctx, ZMQ_REP);
- assert (rep);
- rc = zmq_setsockopt (rep, ZMQ_SNDHWM, &one, sizeof(one));
- assert (rc == 0);
- rc = zmq_connect (rep, "inproc://a");
- assert (rc == 0);
-
- // Send request 1
- rc = zmq_send (xreq, buff, 1, 0);
- assert (rc == 1);
-
- // Send request 2
- rc = zmq_send (xreq, buff, 1, 0);
- assert (rc == 1);
-
- // Receive request 1
- rc = zmq_recv (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Send request 3
- rc = zmq_send (xreq, buff, 1, 0);
- assert (rc == 1);
-
- // Send reply 1
- rc = zmq_send (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Receive request 2
- rc = zmq_recv (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Send reply 2
- rc = zmq_send (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Receive request 3
- rc = zmq_recv (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Send reply 3
- rc = zmq_send (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Receive reply 1
- rc = zmq_recv (xreq, buff, 1, 0);
- assert (rc == 1);
-
- // Receive reply 2
- rc = zmq_recv (xreq, buff, 1, 0);
- assert (rc == 1);
-
- // Try to receive reply 3, it should have been dropped.
- rc = zmq_recv (xreq, buff, 1, ZMQ_DONTWAIT);
- assert (rc == -1 && errno == EAGAIN);
-
- // Clean up.
- rc = zmq_close (xreq);
- assert (rc == 0);
- rc = zmq_close (rep);
- assert (rc == 0);
-
- rc = zmq_term (ctx);
- assert (rc == 0);
-
- return 0 ;
-}