diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/dist.cpp | 6 | ||||
-rw-r--r-- | src/encoder.cpp | 5 | ||||
-rw-r--r-- | src/fq.cpp | 3 | ||||
-rw-r--r-- | src/lb.cpp | 6 | ||||
-rw-r--r-- | src/msg.hpp | 7 | ||||
-rw-r--r-- | src/pipe.cpp | 7 | ||||
-rw-r--r-- | src/rep.cpp | 19 | ||||
-rw-r--r-- | src/req.cpp | 46 | ||||
-rw-r--r-- | src/req.hpp | 7 | ||||
-rw-r--r-- | src/session_base.cpp | 4 | ||||
-rw-r--r-- | src/socket_base.cpp | 33 | ||||
-rw-r--r-- | src/socket_base.hpp | 7 | ||||
-rw-r--r-- | src/xpub.cpp | 4 | ||||
-rw-r--r-- | src/xrep.cpp | 11 | ||||
-rw-r--r-- | src/xsub.cpp | 10 |
15 files changed, 62 insertions, 113 deletions
diff --git a/src/dist.cpp b/src/dist.cpp index 59e6c08..d4be65b 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -112,8 +113,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_) int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_) { // Is this end of a multipart message? - bool msg_more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool msg_more = msg_->flags () & msg_t::more ? true : false; // Push the message to matching pipes. distribute (msg_, flags_); @@ -182,7 +182,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_) eligible--; return false; } - if (!(msg_->flags () & (msg_t::more | msg_t::label))) + if (!(msg_->flags () & msg_t::more)) pipe_->flush (); return true; } diff --git a/src/encoder.cpp b/src/encoder.cpp index a20623f..94af598 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -90,14 +91,14 @@ bool zmq::encoder_t::message_ready () tmpbuf [0] = (unsigned char) size; tmpbuf [1] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 2, &encoder_t::size_ready, - !(in_progress.flags () & (msg_t::more | msg_t::label))); + !(in_progress.flags () & msg_t::more)); } else { tmpbuf [0] = 0xff; put_uint64 (tmpbuf + 1, size); tmpbuf [9] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 10, &encoder_t::size_ready, - !(in_progress.flags () & (msg_t::more | msg_t::label))); + !(in_progress.flags () & msg_t::more)); } return true; } @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -92,7 +93,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_) if (pipe_) *pipe_ = pipes [current]; more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + msg_->flags () & msg_t::more ? true : false; if (!more) { current++; if (current >= active) @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -76,7 +77,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_) // switch back to non-dropping mode. if (dropping) { - more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; if (!more) dropping = false; @@ -89,8 +90,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_) while (active > 0) { if (pipes [current]->write (msg_)) { - more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; break; } diff --git a/src/msg.hpp b/src/msg.hpp index c0dedd5..bc25598 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -48,10 +49,8 @@ namespace zmq // Mesage flags. enum { - label = 1, - command = 2, - shared = 64, - more = 128 + more = 1, + shared = 128 }; bool check (); diff --git a/src/pipe.cpp b/src/pipe.cpp index cbf7bf5..6dcc01a 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -137,7 +138,7 @@ bool zmq::pipe_t::read (msg_t *msg_) return false; } - if (!(msg_->flags () & (msg_t::more | msg_t::label))) + if (!(msg_->flags () & msg_t::more)) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) @@ -166,7 +167,7 @@ bool zmq::pipe_t::write (msg_t *msg_) if (unlikely (!check_write (msg_))) return false; - bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool more = msg_->flags () & msg_t::more ? true : false; outpipe->write (*msg_, more); if (!more) msgs_written++; @@ -180,7 +181,7 @@ void zmq::pipe_t::rollback () msg_t msg; if (outpipe) { while (outpipe->unwrite (&msg)) { - zmq_assert (msg.flags () & (msg_t::more | msg_t::label)); + zmq_assert (msg.flags () & msg_t::more); int rc = msg.close (); errno_assert (rc == 0); } diff --git a/src/rep.cpp b/src/rep.cpp index de99c8a..02a825c 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -43,7 +43,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_) return -1; } - bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool more = msg_->flags () & msg_t::more ? true : false; // Push message to the reply pipe. int rc = xrep_t::xsend (msg_, flags_); @@ -72,19 +72,20 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) int rc = xrep_t::xrecv (msg_, flags_); if (rc != 0) return rc; - if (!(msg_->flags () & msg_t::label)) - break; + zmq_assert (msg_->flags () & msg_t::more); + bool bottom = (msg_->size () == 0); rc = xrep_t::xsend (msg_, flags_); errno_assert (rc == 0); + if (bottom) + break; } request_begins = false; } - else { - int rc = xrep_t::xrecv (msg_, flags_); - if (rc != 0) - return rc; - } - zmq_assert (!(msg_->flags () & msg_t::label)); + + // Get next message part to return to the user. + int rc = xrep_t::xrecv (msg_, flags_); + if (rc != 0) + return rc; // If whole request is read, flip the FSM to reply-sending state. if (!(msg_->flags () & msg_t::more)) { diff --git a/src/req.cpp b/src/req.cpp index 9114daf..9694d2d 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -29,8 +30,7 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) : xreq_t (parent_, tid_), receiving_reply (false), - message_begins (true), - request_id (generate_random ()) + message_begins (true) { options.type = ZMQ_REQ; } @@ -50,19 +50,17 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_) // First part of the request is the request identity. if (message_begins) { - msg_t prefix; - int rc = prefix.init_size (4); + msg_t bottom; + int rc = bottom.init (); errno_assert (rc == 0); - prefix.set_flags (msg_t::label); - unsigned char *data = (unsigned char*) prefix.data (); - put_uint32 (data, request_id); - rc = xreq_t::xsend (&prefix, flags_); + bottom.set_flags (msg_t::more); + rc = xreq_t::xsend (&bottom, 0); if (rc != 0) - return rc; + return -1; message_begins = false; } - bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool more = msg_->flags () & msg_t::more ? true : false; int rc = xreq_t::xsend (msg_, flags_); if (rc != 0) @@ -92,25 +90,11 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) return rc; // TODO: This should also close the connection with the peer! - if (unlikely (!(msg_->flags () & msg_t::label) || msg_->size () != 4)) { + if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) { while (true) { int rc = xreq_t::xrecv (msg_, flags_); errno_assert (rc == 0); - if (!(msg_->flags () & (msg_t::label | msg_t::more))) - break; - } - msg_->close (); - msg_->init (); - errno = EAGAIN; - return -1; - } - - unsigned char *data = (unsigned char*) msg_->data (); - if (unlikely (get_uint32 (data) != request_id)) { - while (true) { - int rc = xreq_t::xrecv (msg_, flags_); - errno_assert (rc == 0); - if (!(msg_->flags () & (msg_t::label | msg_t::more))) + if (!(msg_->flags () & msg_t::more)) break; } msg_->close (); @@ -118,6 +102,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) errno = EAGAIN; return -1; } + message_begins = false; } @@ -126,8 +111,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) return rc; // If the reply is fully received, flip the FSM into request-sending state. - if (!(msg_->flags () & (msg_t::more | msg_t::label))) { - request_id++; + if (!(msg_->flags () & msg_t::more)) { receiving_reply = false; message_begins = true; } @@ -167,8 +151,8 @@ zmq::req_session_t::~req_session_t () int zmq::req_session_t::write (msg_t *msg_) { - if (state == request_id) { - if (msg_->flags () == msg_t::label && msg_->size () == 4) { + if (state == bottom) { + if (msg_->flags () == msg_t::more && msg_->size () == 0) { state = body; return xreq_session_t::write (msg_); } @@ -177,7 +161,7 @@ int zmq::req_session_t::write (msg_t *msg_) if (msg_->flags () == msg_t::more) return xreq_session_t::write (msg_); if (msg_->flags () == 0) { - state = request_id; + state = bottom; return xreq_session_t::write (msg_); } } diff --git a/src/req.hpp b/src/req.hpp index d99b32a..78acbaf 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -51,10 +52,6 @@ namespace zmq // of the message must be empty message part (backtrace stack bottom). bool message_begins; - // Request ID. Request numbers gradually increase (and wrap over) - // so that we don't have to generate random ID for each request. - uint32_t request_id; - req_t (const req_t&); const req_t &operator = (const req_t&); }; @@ -74,7 +71,7 @@ namespace zmq private: enum { - request_id, + bottom, body } state; diff --git a/src/session_base.cpp b/src/session_base.cpp index d1d31c9..591b29e 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -149,9 +150,8 @@ int zmq::session_base_t::read (msg_t *msg_) errno = EAGAIN; return -1; } + incomplete_in = msg_->flags () & msg_t::more ? true : false; - incomplete_in = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; return 0; } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index e990ba1..967d314 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -120,8 +121,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : destroyed (false), last_tsc (0), ticks (0), - rcvlabel (false), - rcvcmd (false), rcvmore (false) { } @@ -252,26 +251,6 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return -1; } - if (option_ == ZMQ_RCVLABEL) { - if (*optvallen_ < sizeof (int)) { - errno = EINVAL; - return -1; - } - *((int*) optval_) = rcvlabel ? 1 : 0; - *optvallen_ = sizeof (int); - return 0; - } - - if (option_ == ZMQ_RCVCMD) { - if (*optvallen_ < sizeof (int)) { - errno = EINVAL; - return -1; - } - *((int*) optval_) = rcvcmd ? 1 : 0; - *optvallen_ = sizeof (int); - return 0; - } - if (option_ == ZMQ_RCVMORE) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; @@ -496,12 +475,8 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) return -1; // At this point we impose the flags on the message. - if (flags_ & ZMQ_SNDLABEL) - msg_->set_flags (msg_t::label); if (flags_ & ZMQ_SNDMORE) msg_->set_flags (msg_t::more); - if (flags_ & ZMQ_SNDCMD) - msg_->set_flags (msg_t::command); // Try to send the message. rc = xsend (msg_, flags_); @@ -870,13 +845,7 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_) void zmq::socket_base_t::extract_flags (msg_t *msg_) { - rcvlabel = msg_->flags () & msg_t::label; - if (rcvlabel) - msg_->reset_flags (msg_t::label); rcvmore = msg_->flags () & msg_t::more ? true : false; if (rcvmore) msg_->reset_flags (msg_t::more); - rcvcmd = msg_->flags () & msg_t::command ? true : false; - if (rcvcmd) - msg_->reset_flags (msg_t::command); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 901aa9e..37effa7 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -184,12 +185,6 @@ namespace zmq // Number of messages received since last command processing. int ticks; - // True if the last message received had LABEL flag set. - bool rcvlabel; - - // True if the last message received had COMMAND flag set. - bool rcvcmd; - // True if the last message received had MORE flag set. bool rcvmore; diff --git a/src/xpub.cpp b/src/xpub.cpp index de55cec..dfc334a 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -101,8 +102,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) int zmq::xpub_t::xsend (msg_t *msg_, int flags_) { - bool msg_more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool msg_more = msg_->flags () & msg_t::more ? true : false; // For the first part of multi-part message, find the matching pipes. if (!more) diff --git a/src/xrep.cpp b/src/xrep.cpp index 61e703b..1b0f44d 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -128,7 +129,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. // TODO: The connections should be killed instead. - if (msg_->flags () & msg_t::label) { + if (msg_->flags () & msg_t::more) { more_out = true; @@ -162,7 +163,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) } // Check whether this is the last part of the message. - more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_out = msg_->flags () & msg_t::more ? true : false; // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { @@ -192,7 +193,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (prefetched) { int rc = msg_->move (prefetched_msg); errno_assert (rc == 0); - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_in = msg_->flags () & msg_t::more ? true : false; prefetched = false; return 0; } @@ -205,7 +206,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // If we are in the middle of reading a message, just return the next part. if (more_in) { - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_in = msg_->flags () & msg_t::more ? true : false; return 0; } @@ -219,7 +220,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) rc = msg_->init_size (4); errno_assert (rc == 0); put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); - msg_->set_flags (msg_t::label); + msg_->set_flags (msg_t::more); return 0; } diff --git a/src/xsub.cpp b/src/xsub.cpp index 58c6951..debcac8 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -117,7 +118,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) int rc = msg_->move (message); errno_assert (rc == 0); has_message = false; - more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; return 0; } @@ -137,14 +138,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) // Check whether the message matches at least one subscription. // Non-initial parts of the message are passed if (more || !options.filter || match (msg_)) { - more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; return 0; } // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (msg_->flags () & (msg_t::more | msg_t::label)) { + while (msg_->flags () & msg_t::more) { rc = fq.recv (msg_, ZMQ_DONTWAIT); zmq_assert (rc == 0); } @@ -184,7 +184,7 @@ bool zmq::xsub_t::xhas_in () // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (message.flags () & (msg_t::more | msg_t::label)) { + while (message.flags () & msg_t::more) { rc = fq.recv (&message, ZMQ_DONTWAIT); zmq_assert (rc == 0); } |