summaryrefslogtreecommitdiff
path: root/src/stream_engine.cpp
diff options
context:
space:
mode:
authorMartin Lucina <martin@lucina.net>2012-05-20 07:40:11 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-05-25 14:02:16 +0200
commita34ea4d80609395150742259fd8c9caa4409e961 (patch)
treeaa5c1793e7e5e276e4ded626adbe042e75740ff6 /src/stream_engine.cpp
parent6b7089891bdb3a4c55b43d0854787c96fae3bf2b (diff)
Implement SP wire protocol
Implements the SP wire protocol, and infrastructure for legacy wire protocol support. Also added an XS_SERVICE_ID socket option to set the service id and renamed the XS_PROTOCOL option to XS_PATTERN_VERSION. The following pattern versions are supported: PAIR: v3 PUBSUB: v1 (legacy), v4 REQREP: v2 PIPELINE: v3 SURVEY: v2 Note that all existing pattern versions have been bumped by 1 to allow for use of legacy protocols (otherwise there would be no way to distinguish between e.g. PUBSUB v3 and PUBSUB v3 using SP). Signed-off-by: Martin Lucina <martin@lucina.net>
Diffstat (limited to 'src/stream_engine.cpp')
-rw-r--r--src/stream_engine.cpp58
1 files changed, 57 insertions, 1 deletions
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index e70d1d3..45a6af6 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -53,8 +53,21 @@ xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
session (NULL),
leftover_session (NULL),
options (options_),
- plugged (false)
+ plugged (false),
+ header_pos (in_header),
+ header_remaining (sizeof in_header),
+ header_received (false),
+ header_sent (false)
{
+ // Fill in outgoing SP protocol header and the complementary (desired)
+ // header.
+ if (!options.legacy_protocol) {
+ sp_get_header (out_header, options.sp_service, options.sp_pattern,
+ options.sp_version, options.sp_role);
+ sp_get_header (desired_header, options.sp_service, options.sp_pattern,
+ options.sp_version, options.sp_complement);
+ }
+
// Get the socket into non-blocking mode.
unblock_socket (s);
@@ -155,6 +168,35 @@ void xs::stream_engine_t::in_event (fd_t fd_)
{
bool disconnection = false;
+ // If we have not yet received the full protocol header...
+ if (unlikely (!options.legacy_protocol && !header_received)) {
+
+ // Read remaining header bytes.
+ int hbytes = read (header_pos, header_remaining);
+
+ // Check whether the peer has closed the connection.
+ if (hbytes == -1) {
+ error ();
+ return;
+ }
+
+ header_remaining -= hbytes;
+ header_pos += hbytes;
+
+ // If we did not read the whole header, poll for more.
+ if (header_remaining)
+ return;
+
+ // If the protocol headers do not match, close the connection.
+ if (memcmp (in_header, desired_header, sizeof in_header) != 0) {
+ error ();
+ return;
+ }
+
+ // Done with protocol header; proceed to read data.
+ header_received = true;
+ }
+
// If there's no data to process in the buffer...
if (!insize) {
@@ -210,6 +252,20 @@ void xs::stream_engine_t::out_event (fd_t fd_)
{
bool more_data = true;
+ // If protocol header was not yet sent...
+ if (unlikely (!options.legacy_protocol && !header_sent)) {
+ int hbytes = write (out_header, sizeof out_header);
+
+ // It should always be possible to write the full protocol header to a
+ // freshly connected TCP socket. Therefore, if we get an error or
+ // partial write here the peer has disconnected.
+ if (hbytes != sizeof out_header) {
+ error ();
+ return;
+ }
+ header_sent = true;
+ }
+
// If write buffer is empty, try to read new data from the encoder.
if (!outsize) {