summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp60
1 files changed, 53 insertions, 7 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 67a9a39..6fa6bfa 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -21,13 +21,21 @@
#include "xrep.hpp"
#include "err.hpp"
+#include "pipe.hpp"
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_XREP;
options.requires_in = true;
options.requires_out = true;
+
+ // On connect, pipes are created only after initial handshaking.
+ // That way we are aware of the peer's identity when binding to the pipes.
+ options.immediate_connect = false;
+
+ // XREP socket adds identity to inbound messages and strips identity
+ // from the outbound messages.
+ options.traceroute = true;
}
zmq::xrep_t::~xrep_t ()
@@ -35,12 +43,15 @@ zmq::xrep_t::~xrep_t ()
}
void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
fq.attach (inpipe_);
- zmq_assert (false);
+ // TODO: What if new connection has same peer identity as the old one?
+ bool ok = outpipes.insert (std::make_pair (
+ peer_identity_, outpipe_)).second;
+ zmq_assert (ok);
}
void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
@@ -51,6 +62,12 @@ void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
{
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it)
+ if (it->second == pipe_) {
+ outpipes.erase (it);
+ return;
+ }
zmq_assert (false);
}
@@ -73,8 +90,35 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
{
- zmq_assert (false);
- return -1;
+ unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
+ size_t size = zmq_msg_size (msg_);
+
+ // Check whether the message is well-formed.
+ zmq_assert (size >= 1);
+ zmq_assert (size_t (*data + 1) <= size);
+
+ // Find the corresponding outbound pipe. If there's none, just drop the
+ // message.
+ // TODO: There's an allocation here! It's the critical path! Get rid of it!
+ blob_t identity (data + 1, *data);
+ outpipes_t::iterator it = outpipes.find (identity);
+ if (it == outpipes.end ()) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // Push message to the selected pipe.
+ it->second->write (msg_);
+ it->second->flush ();
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
}
int zmq::xrep_t::xflush ()
@@ -95,8 +139,10 @@ bool zmq::xrep_t::xhas_in ()
bool zmq::xrep_t::xhas_out ()
{
- zmq_assert (false);
- return false;
+ // In theory, XREP socket is always ready for writing. Whether actual
+ // attempt to write succeeds depends on whitch pipe the message is going
+ // to be routed to.
+ return true;
}