summaryrefslogtreecommitdiff
path: root/src/req.cpp
diff options
context:
space:
mode:
authorsustrik <sustrik@250bpm.com>2011-11-04 02:15:37 -0700
committersustrik <sustrik@250bpm.com>2011-11-04 02:15:37 -0700
commit6cdd720400ea456ccbfdf09cdc5054ab07dbdc6f (patch)
tree6a25ede64ac1252e022feb91b1342cdc38e3dcf5 /src/req.cpp
parent541b83bc02784c721efa3d9dde8f8a191c3c3b7b (diff)
parente9c3a227a7175b4eda5193b1c8ce6985f5ed89f3 (diff)
Merge pull request #220 from 250bpm/HEAD
Refactoring
Diffstat (limited to 'src/req.cpp')
-rw-r--r--src/req.cpp62
1 files changed, 28 insertions, 34 deletions
diff --git a/src/req.cpp b/src/req.cpp
index 0832f60..3ba1ec0 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -28,8 +30,7 @@
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
xreq_t (parent_, tid_),
receiving_reply (false),
- message_begins (true),
- request_id (generate_random ())
+ message_begins (true)
{
options.type = ZMQ_REQ;
}
@@ -49,19 +50,17 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
// First part of the request is the request identity.
if (message_begins) {
- msg_t prefix;
- int rc = prefix.init_size (4);
+ msg_t bottom;
+ int rc = bottom.init ();
errno_assert (rc == 0);
- prefix.set_flags (msg_t::label);
- unsigned char *data = (unsigned char*) prefix.data ();
- put_uint32 (data, request_id);
- rc = xreq_t::xsend (&prefix, flags_);
+ bottom.set_flags (msg_t::more);
+ rc = xreq_t::xsend (&bottom, 0);
if (rc != 0)
- return rc;
+ return -1;
message_begins = false;
}
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
int rc = xreq_t::xsend (msg_, flags_);
if (rc != 0)
@@ -91,25 +90,11 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// TODO: This should also close the connection with the peer!
- if (unlikely (!(msg_->flags () & msg_t::label) || msg_->size () != 4)) {
+ if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
while (true) {
int rc = xreq_t::xrecv (msg_, flags_);
errno_assert (rc == 0);
- if (!(msg_->flags () & (msg_t::label | msg_t::more)))
- break;
- }
- msg_->close ();
- msg_->init ();
- errno = EAGAIN;
- return -1;
- }
-
- unsigned char *data = (unsigned char*) msg_->data ();
- if (unlikely (get_uint32 (data) != request_id)) {
- while (true) {
- int rc = xreq_t::xrecv (msg_, flags_);
- errno_assert (rc == 0);
- if (!(msg_->flags () & (msg_t::label | msg_t::more)))
+ if (!(msg_->flags () & msg_t::more))
break;
}
msg_->close ();
@@ -117,6 +102,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
errno = EAGAIN;
return -1;
}
+
message_begins = false;
}
@@ -125,8 +111,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// If the reply is fully received, flip the FSM into request-sending state.
- if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
- request_id++;
+ if (!(msg_->flags () & msg_t::more)) {
receiving_reply = false;
message_begins = true;
}
@@ -162,23 +147,32 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
zmq::req_session_t::~req_session_t ()
{
+ state = options.recv_identity ? identity : bottom;
}
int zmq::req_session_t::write (msg_t *msg_)
{
- if (state == request_id) {
- if (msg_->flags () == msg_t::label && msg_->size () == 4) {
+ switch (state) {
+ case bottom:
+ if (msg_->flags () == msg_t::more && msg_->size () == 0) {
state = body;
return xreq_session_t::write (msg_);
}
- }
- else {
+ break;
+ case body:
if (msg_->flags () == msg_t::more)
return xreq_session_t::write (msg_);
if (msg_->flags () == 0) {
- state = request_id;
+ state = bottom;
+ return xreq_session_t::write (msg_);
+ }
+ break;
+ case identity:
+ if (msg_->flags () == 0) {
+ state = bottom;
return xreq_session_t::write (msg_);
}
+ break;
}
errno = EFAULT;
return -1;