summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-11-01 13:39:54 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-11-01 13:39:54 +0100
commit7842c7107358324e8c5b9af7272e6dcab8c97931 (patch)
treed97f0e869ea0d95c60d41c96b48627c38e9c464c /src
parent626099aa2a292178872843c55cc5226e6850f2ed (diff)
LABELS and COMMANDs removed
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/dist.cpp6
-rw-r--r--src/encoder.cpp5
-rw-r--r--src/fq.cpp3
-rw-r--r--src/lb.cpp6
-rw-r--r--src/msg.hpp7
-rw-r--r--src/pipe.cpp7
-rw-r--r--src/rep.cpp19
-rw-r--r--src/req.cpp46
-rw-r--r--src/req.hpp7
-rw-r--r--src/session_base.cpp4
-rw-r--r--src/socket_base.cpp33
-rw-r--r--src/socket_base.hpp7
-rw-r--r--src/xpub.cpp4
-rw-r--r--src/xrep.cpp11
-rw-r--r--src/xsub.cpp10
15 files changed, 62 insertions, 113 deletions
diff --git a/src/dist.cpp b/src/dist.cpp
index 59e6c08..d4be65b 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -112,8 +113,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
- bool msg_more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool msg_more = msg_->flags () & msg_t::more ? true : false;
// Push the message to matching pipes.
distribute (msg_, flags_);
@@ -182,7 +182,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
eligible--;
return false;
}
- if (!(msg_->flags () & (msg_t::more | msg_t::label)))
+ if (!(msg_->flags () & msg_t::more))
pipe_->flush ();
return true;
}
diff --git a/src/encoder.cpp b/src/encoder.cpp
index a20623f..94af598 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -90,14 +91,14 @@ bool zmq::encoder_t::message_ready ()
tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 2, &encoder_t::size_ready,
- !(in_progress.flags () & (msg_t::more | msg_t::label)));
+ !(in_progress.flags () & msg_t::more));
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 10, &encoder_t::size_ready,
- !(in_progress.flags () & (msg_t::more | msg_t::label)));
+ !(in_progress.flags () & msg_t::more));
}
return true;
}
diff --git a/src/fq.cpp b/src/fq.cpp
index 6dd7009..ed1947c 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -92,7 +93,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_)
if (pipe_)
*pipe_ = pipes [current];
more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ msg_->flags () & msg_t::more ? true : false;
if (!more) {
current++;
if (current >= active)
diff --git a/src/lb.cpp b/src/lb.cpp
index bcef48b..e100a9e 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -76,7 +77,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
// switch back to non-dropping mode.
if (dropping) {
- more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
if (!more)
dropping = false;
@@ -89,8 +90,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
while (active > 0) {
if (pipes [current]->write (msg_)) {
- more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
break;
}
diff --git a/src/msg.hpp b/src/msg.hpp
index c0dedd5..bc25598 100644
--- a/src/msg.hpp
+++ b/src/msg.hpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -48,10 +49,8 @@ namespace zmq
// Mesage flags.
enum
{
- label = 1,
- command = 2,
- shared = 64,
- more = 128
+ more = 1,
+ shared = 128
};
bool check ();
diff --git a/src/pipe.cpp b/src/pipe.cpp
index cbf7bf5..6dcc01a 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -137,7 +138,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
return false;
}
- if (!(msg_->flags () & (msg_t::more | msg_t::label)))
+ if (!(msg_->flags () & msg_t::more))
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
@@ -166,7 +167,7 @@ bool zmq::pipe_t::write (msg_t *msg_)
if (unlikely (!check_write (msg_)))
return false;
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
outpipe->write (*msg_, more);
if (!more)
msgs_written++;
@@ -180,7 +181,7 @@ void zmq::pipe_t::rollback ()
msg_t msg;
if (outpipe) {
while (outpipe->unwrite (&msg)) {
- zmq_assert (msg.flags () & (msg_t::more | msg_t::label));
+ zmq_assert (msg.flags () & msg_t::more);
int rc = msg.close ();
errno_assert (rc == 0);
}
diff --git a/src/rep.cpp b/src/rep.cpp
index de99c8a..02a825c 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -43,7 +43,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
return -1;
}
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
// Push message to the reply pipe.
int rc = xrep_t::xsend (msg_, flags_);
@@ -72,19 +72,20 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
- if (!(msg_->flags () & msg_t::label))
- break;
+ zmq_assert (msg_->flags () & msg_t::more);
+ bool bottom = (msg_->size () == 0);
rc = xrep_t::xsend (msg_, flags_);
errno_assert (rc == 0);
+ if (bottom)
+ break;
}
request_begins = false;
}
- else {
- int rc = xrep_t::xrecv (msg_, flags_);
- if (rc != 0)
- return rc;
- }
- zmq_assert (!(msg_->flags () & msg_t::label));
+
+ // Get next message part to return to the user.
+ int rc = xrep_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
// If whole request is read, flip the FSM to reply-sending state.
if (!(msg_->flags () & msg_t::more)) {
diff --git a/src/req.cpp b/src/req.cpp
index 9114daf..9694d2d 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -29,8 +30,7 @@
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
xreq_t (parent_, tid_),
receiving_reply (false),
- message_begins (true),
- request_id (generate_random ())
+ message_begins (true)
{
options.type = ZMQ_REQ;
}
@@ -50,19 +50,17 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
// First part of the request is the request identity.
if (message_begins) {
- msg_t prefix;
- int rc = prefix.init_size (4);
+ msg_t bottom;
+ int rc = bottom.init ();
errno_assert (rc == 0);
- prefix.set_flags (msg_t::label);
- unsigned char *data = (unsigned char*) prefix.data ();
- put_uint32 (data, request_id);
- rc = xreq_t::xsend (&prefix, flags_);
+ bottom.set_flags (msg_t::more);
+ rc = xreq_t::xsend (&bottom, 0);
if (rc != 0)
- return rc;
+ return -1;
message_begins = false;
}
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
int rc = xreq_t::xsend (msg_, flags_);
if (rc != 0)
@@ -92,25 +90,11 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// TODO: This should also close the connection with the peer!
- if (unlikely (!(msg_->flags () & msg_t::label) || msg_->size () != 4)) {
+ if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
while (true) {
int rc = xreq_t::xrecv (msg_, flags_);
errno_assert (rc == 0);
- if (!(msg_->flags () & (msg_t::label | msg_t::more)))
- break;
- }
- msg_->close ();
- msg_->init ();
- errno = EAGAIN;
- return -1;
- }
-
- unsigned char *data = (unsigned char*) msg_->data ();
- if (unlikely (get_uint32 (data) != request_id)) {
- while (true) {
- int rc = xreq_t::xrecv (msg_, flags_);
- errno_assert (rc == 0);
- if (!(msg_->flags () & (msg_t::label | msg_t::more)))
+ if (!(msg_->flags () & msg_t::more))
break;
}
msg_->close ();
@@ -118,6 +102,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
errno = EAGAIN;
return -1;
}
+
message_begins = false;
}
@@ -126,8 +111,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// If the reply is fully received, flip the FSM into request-sending state.
- if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
- request_id++;
+ if (!(msg_->flags () & msg_t::more)) {
receiving_reply = false;
message_begins = true;
}
@@ -167,8 +151,8 @@ zmq::req_session_t::~req_session_t ()
int zmq::req_session_t::write (msg_t *msg_)
{
- if (state == request_id) {
- if (msg_->flags () == msg_t::label && msg_->size () == 4) {
+ if (state == bottom) {
+ if (msg_->flags () == msg_t::more && msg_->size () == 0) {
state = body;
return xreq_session_t::write (msg_);
}
@@ -177,7 +161,7 @@ int zmq::req_session_t::write (msg_t *msg_)
if (msg_->flags () == msg_t::more)
return xreq_session_t::write (msg_);
if (msg_->flags () == 0) {
- state = request_id;
+ state = bottom;
return xreq_session_t::write (msg_);
}
}
diff --git a/src/req.hpp b/src/req.hpp
index d99b32a..78acbaf 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -51,10 +52,6 @@ namespace zmq
// of the message must be empty message part (backtrace stack bottom).
bool message_begins;
- // Request ID. Request numbers gradually increase (and wrap over)
- // so that we don't have to generate random ID for each request.
- uint32_t request_id;
-
req_t (const req_t&);
const req_t &operator = (const req_t&);
};
@@ -74,7 +71,7 @@ namespace zmq
private:
enum {
- request_id,
+ bottom,
body
} state;
diff --git a/src/session_base.cpp b/src/session_base.cpp
index d1d31c9..591b29e 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -149,9 +150,8 @@ int zmq::session_base_t::read (msg_t *msg_)
errno = EAGAIN;
return -1;
}
+ incomplete_in = msg_->flags () & msg_t::more ? true : false;
- incomplete_in =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
return 0;
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index e990ba1..967d314 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -120,8 +121,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
destroyed (false),
last_tsc (0),
ticks (0),
- rcvlabel (false),
- rcvcmd (false),
rcvmore (false)
{
}
@@ -252,26 +251,6 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return -1;
}
- if (option_ == ZMQ_RCVLABEL) {
- if (*optvallen_ < sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- *((int*) optval_) = rcvlabel ? 1 : 0;
- *optvallen_ = sizeof (int);
- return 0;
- }
-
- if (option_ == ZMQ_RCVCMD) {
- if (*optvallen_ < sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- *((int*) optval_) = rcvcmd ? 1 : 0;
- *optvallen_ = sizeof (int);
- return 0;
- }
-
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
@@ -496,12 +475,8 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
return -1;
// At this point we impose the flags on the message.
- if (flags_ & ZMQ_SNDLABEL)
- msg_->set_flags (msg_t::label);
if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more);
- if (flags_ & ZMQ_SNDCMD)
- msg_->set_flags (msg_t::command);
// Try to send the message.
rc = xsend (msg_, flags_);
@@ -870,13 +845,7 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
rcvmore = msg_->flags () & msg_t::more ? true : false;
if (rcvmore)
msg_->reset_flags (msg_t::more);
- rcvcmd = msg_->flags () & msg_t::command ? true : false;
- if (rcvcmd)
- msg_->reset_flags (msg_t::command);
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 901aa9e..37effa7 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -184,12 +185,6 @@ namespace zmq
// Number of messages received since last command processing.
int ticks;
- // True if the last message received had LABEL flag set.
- bool rcvlabel;
-
- // True if the last message received had COMMAND flag set.
- bool rcvcmd;
-
// True if the last message received had MORE flag set.
bool rcvmore;
diff --git a/src/xpub.cpp b/src/xpub.cpp
index de55cec..dfc334a 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -101,8 +102,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{
- bool msg_more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool msg_more = msg_->flags () & msg_t::more ? true : false;
// For the first part of multi-part message, find the matching pipes.
if (!more)
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 61e703b..1b0f44d 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -128,7 +129,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
- if (msg_->flags () & msg_t::label) {
+ if (msg_->flags () & msg_t::more) {
more_out = true;
@@ -162,7 +163,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
}
// Check whether this is the last part of the message.
- more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_out = msg_->flags () & msg_t::more ? true : false;
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
@@ -192,7 +193,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (prefetched) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_in = msg_->flags () & msg_t::more ? true : false;
prefetched = false;
return 0;
}
@@ -205,7 +206,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// If we are in the middle of reading a message, just return the next part.
if (more_in) {
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_in = msg_->flags () & msg_t::more ? true : false;
return 0;
}
@@ -219,7 +220,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
rc = msg_->init_size (4);
errno_assert (rc == 0);
put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
- msg_->set_flags (msg_t::label);
+ msg_->set_flags (msg_t::more);
return 0;
}
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 58c6951..debcac8 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -1,6 +1,7 @@
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -117,7 +118,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false;
- more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
return 0;
}
@@ -137,14 +138,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || !options.filter || match (msg_)) {
- more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (msg_->flags () & (msg_t::more | msg_t::label)) {
+ while (msg_->flags () & msg_t::more) {
rc = fq.recv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
@@ -184,7 +184,7 @@ bool zmq::xsub_t::xhas_in ()
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (message.flags () & (msg_t::more | msg_t::label)) {
+ while (message.flags () & msg_t::more) {
rc = fq.recv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}