diff options
-rw-r--r-- | include/zmq.h | 5 | ||||
-rw-r--r-- | src/dist.cpp | 6 | ||||
-rw-r--r-- | src/encoder.cpp | 5 | ||||
-rw-r--r-- | src/fq.cpp | 3 | ||||
-rw-r--r-- | src/lb.cpp | 6 | ||||
-rw-r--r-- | src/msg.hpp | 7 | ||||
-rw-r--r-- | src/pipe.cpp | 7 | ||||
-rw-r--r-- | src/rep.cpp | 19 | ||||
-rw-r--r-- | src/req.cpp | 46 | ||||
-rw-r--r-- | src/req.hpp | 7 | ||||
-rw-r--r-- | src/session_base.cpp | 4 | ||||
-rw-r--r-- | src/socket_base.cpp | 33 | ||||
-rw-r--r-- | src/socket_base.hpp | 7 | ||||
-rw-r--r-- | src/xpub.cpp | 4 | ||||
-rw-r--r-- | src/xrep.cpp | 11 | ||||
-rw-r--r-- | src/xsub.cpp | 10 | ||||
-rw-r--r-- | tests/Makefile.am | 2 | ||||
-rw-r--r-- | tests/test_invalid_rep.cpp | 13 | ||||
-rw-r--r-- | tests/test_reqrep_device.cpp | 31 | ||||
-rw-r--r-- | tests/test_reqrep_drop.cpp | 144 |
20 files changed, 75 insertions, 295 deletions
diff --git a/include/zmq.h b/include/zmq.h index bb80d86..55bb793 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -189,15 +190,11 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_MULTICAST_HOPS 25 #define ZMQ_RCVTIMEO 27 #define ZMQ_SNDTIMEO 28 -#define ZMQ_RCVLABEL 29 -#define ZMQ_RCVCMD 30 #define ZMQ_IPV4ONLY 31 /* Send/recv options. */ #define ZMQ_DONTWAIT 1 #define ZMQ_SNDMORE 2 -#define ZMQ_SNDLABEL 4 -#define ZMQ_SNDCMD 8 ZMQ_EXPORT void *zmq_socket (void *context, int type); ZMQ_EXPORT int zmq_close (void *s); diff --git a/src/dist.cpp b/src/dist.cpp index 59e6c08..d4be65b 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -112,8 +113,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_) int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_) { // Is this end of a multipart message? - bool msg_more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool msg_more = msg_->flags () & msg_t::more ? true : false; // Push the message to matching pipes. distribute (msg_, flags_); @@ -182,7 +182,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_) eligible--; return false; } - if (!(msg_->flags () & (msg_t::more | msg_t::label))) + if (!(msg_->flags () & msg_t::more)) pipe_->flush (); return true; } diff --git a/src/encoder.cpp b/src/encoder.cpp index a20623f..94af598 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -90,14 +91,14 @@ bool zmq::encoder_t::message_ready () tmpbuf [0] = (unsigned char) size; tmpbuf [1] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 2, &encoder_t::size_ready, - !(in_progress.flags () & (msg_t::more | msg_t::label))); + !(in_progress.flags () & msg_t::more)); } else { tmpbuf [0] = 0xff; put_uint64 (tmpbuf + 1, size); tmpbuf [9] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 10, &encoder_t::size_ready, - !(in_progress.flags () & (msg_t::more | msg_t::label))); + !(in_progress.flags () & msg_t::more)); } return true; } @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -92,7 +93,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_) if (pipe_) *pipe_ = pipes [current]; more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + msg_->flags () & msg_t::more ? true : false; if (!more) { current++; if (current >= active) @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -76,7 +77,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_) // switch back to non-dropping mode. if (dropping) { - more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; if (!more) dropping = false; @@ -89,8 +90,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_) while (active > 0) { if (pipes [current]->write (msg_)) { - more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; break; } diff --git a/src/msg.hpp b/src/msg.hpp index c0dedd5..bc25598 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -48,10 +49,8 @@ namespace zmq // Mesage flags. enum { - label = 1, - command = 2, - shared = 64, - more = 128 + more = 1, + shared = 128 }; bool check (); diff --git a/src/pipe.cpp b/src/pipe.cpp index cbf7bf5..6dcc01a 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -137,7 +138,7 @@ bool zmq::pipe_t::read (msg_t *msg_) return false; } - if (!(msg_->flags () & (msg_t::more | msg_t::label))) + if (!(msg_->flags () & msg_t::more)) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) @@ -166,7 +167,7 @@ bool zmq::pipe_t::write (msg_t *msg_) if (unlikely (!check_write (msg_))) return false; - bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool more = msg_->flags () & msg_t::more ? true : false; outpipe->write (*msg_, more); if (!more) msgs_written++; @@ -180,7 +181,7 @@ void zmq::pipe_t::rollback () msg_t msg; if (outpipe) { while (outpipe->unwrite (&msg)) { - zmq_assert (msg.flags () & (msg_t::more | msg_t::label)); + zmq_assert (msg.flags () & msg_t::more); int rc = msg.close (); errno_assert (rc == 0); } diff --git a/src/rep.cpp b/src/rep.cpp index de99c8a..02a825c 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -43,7 +43,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_) return -1; } - bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool more = msg_->flags () & msg_t::more ? true : false; // Push message to the reply pipe. int rc = xrep_t::xsend (msg_, flags_); @@ -72,19 +72,20 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) int rc = xrep_t::xrecv (msg_, flags_); if (rc != 0) return rc; - if (!(msg_->flags () & msg_t::label)) - break; + zmq_assert (msg_->flags () & msg_t::more); + bool bottom = (msg_->size () == 0); rc = xrep_t::xsend (msg_, flags_); errno_assert (rc == 0); + if (bottom) + break; } request_begins = false; } - else { - int rc = xrep_t::xrecv (msg_, flags_); - if (rc != 0) - return rc; - } - zmq_assert (!(msg_->flags () & msg_t::label)); + + // Get next message part to return to the user. + int rc = xrep_t::xrecv (msg_, flags_); + if (rc != 0) + return rc; // If whole request is read, flip the FSM to reply-sending state. if (!(msg_->flags () & msg_t::more)) { diff --git a/src/req.cpp b/src/req.cpp index 9114daf..9694d2d 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -29,8 +30,7 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) : xreq_t (parent_, tid_), receiving_reply (false), - message_begins (true), - request_id (generate_random ()) + message_begins (true) { options.type = ZMQ_REQ; } @@ -50,19 +50,17 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_) // First part of the request is the request identity. if (message_begins) { - msg_t prefix; - int rc = prefix.init_size (4); + msg_t bottom; + int rc = bottom.init (); errno_assert (rc == 0); - prefix.set_flags (msg_t::label); - unsigned char *data = (unsigned char*) prefix.data (); - put_uint32 (data, request_id); - rc = xreq_t::xsend (&prefix, flags_); + bottom.set_flags (msg_t::more); + rc = xreq_t::xsend (&bottom, 0); if (rc != 0) - return rc; + return -1; message_begins = false; } - bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool more = msg_->flags () & msg_t::more ? true : false; int rc = xreq_t::xsend (msg_, flags_); if (rc != 0) @@ -92,25 +90,11 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) return rc; // TODO: This should also close the connection with the peer! - if (unlikely (!(msg_->flags () & msg_t::label) || msg_->size () != 4)) { + if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) { while (true) { int rc = xreq_t::xrecv (msg_, flags_); errno_assert (rc == 0); - if (!(msg_->flags () & (msg_t::label | msg_t::more))) - break; - } - msg_->close (); - msg_->init (); - errno = EAGAIN; - return -1; - } - - unsigned char *data = (unsigned char*) msg_->data (); - if (unlikely (get_uint32 (data) != request_id)) { - while (true) { - int rc = xreq_t::xrecv (msg_, flags_); - errno_assert (rc == 0); - if (!(msg_->flags () & (msg_t::label | msg_t::more))) + if (!(msg_->flags () & msg_t::more)) break; } msg_->close (); @@ -118,6 +102,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) errno = EAGAIN; return -1; } + message_begins = false; } @@ -126,8 +111,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) return rc; // If the reply is fully received, flip the FSM into request-sending state. - if (!(msg_->flags () & (msg_t::more | msg_t::label))) { - request_id++; + if (!(msg_->flags () & msg_t::more)) { receiving_reply = false; message_begins = true; } @@ -167,8 +151,8 @@ zmq::req_session_t::~req_session_t () int zmq::req_session_t::write (msg_t *msg_) { - if (state == request_id) { - if (msg_->flags () == msg_t::label && msg_->size () == 4) { + if (state == bottom) { + if (msg_->flags () == msg_t::more && msg_->size () == 0) { state = body; return xreq_session_t::write (msg_); } @@ -177,7 +161,7 @@ int zmq::req_session_t::write (msg_t *msg_) if (msg_->flags () == msg_t::more) return xreq_session_t::write (msg_); if (msg_->flags () == 0) { - state = request_id; + state = bottom; return xreq_session_t::write (msg_); } } diff --git a/src/req.hpp b/src/req.hpp index d99b32a..78acbaf 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -51,10 +52,6 @@ namespace zmq // of the message must be empty message part (backtrace stack bottom). bool message_begins; - // Request ID. Request numbers gradually increase (and wrap over) - // so that we don't have to generate random ID for each request. - uint32_t request_id; - req_t (const req_t&); const req_t &operator = (const req_t&); }; @@ -74,7 +71,7 @@ namespace zmq private: enum { - request_id, + bottom, body } state; diff --git a/src/session_base.cpp b/src/session_base.cpp index d1d31c9..591b29e 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -149,9 +150,8 @@ int zmq::session_base_t::read (msg_t *msg_) errno = EAGAIN; return -1; } + incomplete_in = msg_->flags () & msg_t::more ? true : false; - incomplete_in = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; return 0; } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index e990ba1..967d314 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -120,8 +121,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : destroyed (false), last_tsc (0), ticks (0), - rcvlabel (false), - rcvcmd (false), rcvmore (false) { } @@ -252,26 +251,6 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return -1; } - if (option_ == ZMQ_RCVLABEL) { - if (*optvallen_ < sizeof (int)) { - errno = EINVAL; - return -1; - } - *((int*) optval_) = rcvlabel ? 1 : 0; - *optvallen_ = sizeof (int); - return 0; - } - - if (option_ == ZMQ_RCVCMD) { - if (*optvallen_ < sizeof (int)) { - errno = EINVAL; - return -1; - } - *((int*) optval_) = rcvcmd ? 1 : 0; - *optvallen_ = sizeof (int); - return 0; - } - if (option_ == ZMQ_RCVMORE) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; @@ -496,12 +475,8 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) return -1; // At this point we impose the flags on the message. - if (flags_ & ZMQ_SNDLABEL) - msg_->set_flags (msg_t::label); if (flags_ & ZMQ_SNDMORE) msg_->set_flags (msg_t::more); - if (flags_ & ZMQ_SNDCMD) - msg_->set_flags (msg_t::command); // Try to send the message. rc = xsend (msg_, flags_); @@ -870,13 +845,7 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_) void zmq::socket_base_t::extract_flags (msg_t *msg_) { - rcvlabel = msg_->flags () & msg_t::label; - if (rcvlabel) - msg_->reset_flags (msg_t::label); rcvmore = msg_->flags () & msg_t::more ? true : false; if (rcvmore) msg_->reset_flags (msg_t::more); - rcvcmd = msg_->flags () & msg_t::command ? true : false; - if (rcvcmd) - msg_->reset_flags (msg_t::command); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 901aa9e..37effa7 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -184,12 +185,6 @@ namespace zmq // Number of messages received since last command processing. int ticks; - // True if the last message received had LABEL flag set. - bool rcvlabel; - - // True if the last message received had COMMAND flag set. - bool rcvcmd; - // True if the last message received had MORE flag set. bool rcvmore; diff --git a/src/xpub.cpp b/src/xpub.cpp index de55cec..dfc334a 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -101,8 +102,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) int zmq::xpub_t::xsend (msg_t *msg_, int flags_) { - bool msg_more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool msg_more = msg_->flags () & msg_t::more ? true : false; // For the first part of multi-part message, find the matching pipes. if (!more) diff --git a/src/xrep.cpp b/src/xrep.cpp index 61e703b..1b0f44d 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -128,7 +129,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. // TODO: The connections should be killed instead. - if (msg_->flags () & msg_t::label) { + if (msg_->flags () & msg_t::more) { more_out = true; @@ -162,7 +163,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) } // Check whether this is the last part of the message. - more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_out = msg_->flags () & msg_t::more ? true : false; // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { @@ -192,7 +193,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (prefetched) { int rc = msg_->move (prefetched_msg); errno_assert (rc == 0); - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_in = msg_->flags () & msg_t::more ? true : false; prefetched = false; return 0; } @@ -205,7 +206,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // If we are in the middle of reading a message, just return the next part. if (more_in) { - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_in = msg_->flags () & msg_t::more ? true : false; return 0; } @@ -219,7 +220,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) rc = msg_->init_size (4); errno_assert (rc == 0); put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); - msg_->set_flags (msg_t::label); + msg_->set_flags (msg_t::more); return 0; } diff --git a/src/xsub.cpp b/src/xsub.cpp index 58c6951..debcac8 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -117,7 +118,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) int rc = msg_->move (message); errno_assert (rc == 0); has_message = false; - more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; return 0; } @@ -137,14 +138,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) // Check whether the message matches at least one subscription. // Non-initial parts of the message are passed if (more || !options.filter || match (msg_)) { - more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; return 0; } // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (msg_->flags () & (msg_t::more | msg_t::label)) { + while (msg_->flags () & msg_t::more) { rc = fq.recv (msg_, ZMQ_DONTWAIT); zmq_assert (rc == 0); } @@ -184,7 +184,7 @@ bool zmq::xsub_t::xhas_in () // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (message.flags () & (msg_t::more | msg_t::label)) { + while (message.flags () & msg_t::more) { rc = fq.recv (&message, ZMQ_DONTWAIT); zmq_assert (rc == 0); } diff --git a/tests/Makefile.am b/tests/Makefile.am index 6ed3762..5f0cfc1 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -7,7 +7,6 @@ noinst_PROGRAMS = test_pair_inproc \ test_reqrep_tcp \ test_hwm \ test_reqrep_device \ - test_reqrep_drop \ test_sub_forward \ test_invalid_rep @@ -24,7 +23,6 @@ test_reqrep_inproc_SOURCES = test_reqrep_inproc.cpp testutil.hpp test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp test_hwm_SOURCES = test_hwm.cpp test_reqrep_device_SOURCES = test_reqrep_device.cpp -test_reqrep_drop_SOURCES = test_reqrep_drop.cpp test_sub_forward_SOURCES = test_sub_forward.cpp test_invalid_rep_SOURCES = test_invalid_rep.cpp diff --git a/tests/test_invalid_rep.cpp b/tests/test_invalid_rep.cpp index dc902c2..f158b05 100644 --- a/tests/test_invalid_rep.cpp +++ b/tests/test_invalid_rep.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -47,12 +48,12 @@ int main (int argc, char *argv []) // Receive the request. char addr [4]; - char seqn [4]; + char bottom [1]; char body [1]; rc = zmq_recv (xrep_socket, addr, sizeof (addr), 0); assert (rc == 4); - rc = zmq_recv (xrep_socket, seqn, sizeof (seqn), 0); - assert (rc == 4); + rc = zmq_recv (xrep_socket, bottom, sizeof (bottom), 0); + assert (rc == 0); rc = zmq_recv (xrep_socket, body, sizeof (body), 0); assert (rc == 1); @@ -61,10 +62,10 @@ int main (int argc, char *argv []) assert (rc == 4); // Send valid reply. - rc = zmq_send (xrep_socket, addr, 4, ZMQ_SNDLABEL); - assert (rc == 4); - rc = zmq_send (xrep_socket, seqn, 4, ZMQ_SNDLABEL); + rc = zmq_send (xrep_socket, addr, 4, ZMQ_SNDMORE); assert (rc == 4); + rc = zmq_send (xrep_socket, bottom, 0, ZMQ_SNDMORE); + assert (rc == 0); rc = zmq_send (xrep_socket, "b", 1, 0); assert (rc == 1); diff --git a/tests/test_reqrep_device.cpp b/tests/test_reqrep_device.cpp index a451956..4ee7cf2 100644 --- a/tests/test_reqrep_device.cpp +++ b/tests/test_reqrep_device.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -64,15 +65,11 @@ int main (int argc, char *argv []) assert (rc == 0); rc = zmq_recvmsg (xrep, &msg, 0); assert (rc >= 0); - int rcvlabel; - size_t sz = sizeof (rcvlabel); - rc = zmq_getsockopt (xrep, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); int rcvmore; + size_t sz = sizeof (rcvmore); rc = zmq_getsockopt (xrep, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); - rc = zmq_sendmsg (xreq, &msg, - (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0)); + rc = zmq_sendmsg (xreq, &msg, rcvmore ? ZMQ_SNDMORE : 0); assert (rc >= 0); } @@ -81,21 +78,14 @@ int main (int argc, char *argv []) rc = zmq_recv (rep, buff, 3, 0); assert (rc == 3); assert (memcmp (buff, "ABC", 3) == 0); - int rcvlabel; - size_t sz = sizeof (rcvlabel); - rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); - assert (!rcvlabel); int rcvmore; + size_t sz = sizeof (rcvmore); rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); assert (rcvmore); rc = zmq_recv (rep, buff, 3, 0); assert (rc == 3); assert (memcmp (buff, "DEF", 3) == 0); - rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); - assert (!rcvlabel); rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); assert (!rcvmore); @@ -113,15 +103,10 @@ int main (int argc, char *argv []) assert (rc == 0); rc = zmq_recvmsg (xreq, &msg, 0); assert (rc >= 0); - int rcvlabel; - size_t sz = sizeof (rcvlabel); - rc = zmq_getsockopt (xreq, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); int rcvmore; rc = zmq_getsockopt (xreq, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); - rc = zmq_sendmsg (xrep, &msg, - (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0)); + rc = zmq_sendmsg (xrep, &msg, rcvmore ? ZMQ_SNDMORE : 0); assert (rc >= 0); } @@ -129,18 +114,12 @@ int main (int argc, char *argv []) rc = zmq_recv (req, buff, 3, 0); assert (rc == 3); assert (memcmp (buff, "GHI", 3) == 0); - rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); - assert (!rcvlabel); rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); assert (rcvmore); rc = zmq_recv (req, buff, 3, 0); assert (rc == 3); assert (memcmp (buff, "JKL", 3) == 0); - rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); - assert (!rcvlabel); rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); assert (!rcvmore); diff --git a/tests/test_reqrep_drop.cpp b/tests/test_reqrep_drop.cpp deleted file mode 100644 index 2829f5f..0000000 --- a/tests/test_reqrep_drop.cpp +++ /dev/null @@ -1,144 +0,0 @@ -/* - Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <assert.h> - -#include "../include/zmq.h" -#include "../include/zmq_utils.h" - -int main (int argc, char *argv []) -{ - void *ctx = zmq_init (1); - assert (ctx); - - // Check whether requests are discarded because of disconnected requester. - - // Create a server. - void *xrep = zmq_socket (ctx, ZMQ_XREP); - assert (xrep); - int rc = zmq_bind (xrep, "tcp://127.0.0.1:5560"); - assert (rc == 0); - - // Create a client. - void *xreq = zmq_socket (ctx, ZMQ_XREQ); - assert (xreq); - rc = zmq_connect (xreq, "tcp://127.0.0.1:5560"); - assert (rc == 0); - - // Send requests. - rc = zmq_send (xreq, "ABC", 3, 0); - assert (rc == 3); - rc = zmq_send (xreq, "DEF", 3, 0); - assert (rc == 3); - - // Disconnect client. - rc = zmq_close (xreq); - assert (rc == 0); - - // Wait a while for disconnect to happen. - zmq_sleep (1); - - // Try to receive a request -- it should have been discarded. - char buff [3]; - rc = zmq_recv (xrep, buff, 3, ZMQ_DONTWAIT); - assert (rc < 0); - assert (errno == EAGAIN); - - // Clean up. - rc = zmq_close (xrep); - assert (rc == 0); - - // New test. Check whether reply is dropped because of HWM overflow. - - int one = 1; - xreq = zmq_socket (ctx, ZMQ_XREQ); - assert (xreq); - rc = zmq_setsockopt (xreq, ZMQ_RCVHWM, &one, sizeof(one)); - assert (rc == 0); - rc = zmq_bind (xreq, "inproc://a"); - assert (rc == 0); - - void *rep = zmq_socket (ctx, ZMQ_REP); - assert (rep); - rc = zmq_setsockopt (rep, ZMQ_SNDHWM, &one, sizeof(one)); - assert (rc == 0); - rc = zmq_connect (rep, "inproc://a"); - assert (rc == 0); - - // Send request 1 - rc = zmq_send (xreq, buff, 1, 0); - assert (rc == 1); - - // Send request 2 - rc = zmq_send (xreq, buff, 1, 0); - assert (rc == 1); - - // Receive request 1 - rc = zmq_recv (rep, buff, 1, 0); - assert (rc == 1); - - // Send request 3 - rc = zmq_send (xreq, buff, 1, 0); - assert (rc == 1); - - // Send reply 1 - rc = zmq_send (rep, buff, 1, 0); - assert (rc == 1); - - // Receive request 2 - rc = zmq_recv (rep, buff, 1, 0); - assert (rc == 1); - - // Send reply 2 - rc = zmq_send (rep, buff, 1, 0); - assert (rc == 1); - - // Receive request 3 - rc = zmq_recv (rep, buff, 1, 0); - assert (rc == 1); - - // Send reply 3 - rc = zmq_send (rep, buff, 1, 0); - assert (rc == 1); - - // Receive reply 1 - rc = zmq_recv (xreq, buff, 1, 0); - assert (rc == 1); - - // Receive reply 2 - rc = zmq_recv (xreq, buff, 1, 0); - assert (rc == 1); - - // Try to receive reply 3, it should have been dropped. - rc = zmq_recv (xreq, buff, 1, ZMQ_DONTWAIT); - assert (rc == -1 && errno == EAGAIN); - - // Clean up. - rc = zmq_close (xreq); - assert (rc == 0); - rc = zmq_close (rep); - assert (rc == 0); - - rc = zmq_term (ctx); - assert (rc == 0); - - return 0 ; -} |