From 7cfd1c58ba244ee0185043c3dac0617bd7a7b938 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 2 Apr 2012 11:47:40 +0200 Subject: 0MQ/2.1 wire format compatibility implemented - XS_PROTOCOL option added - libxs ignores when unused flags are set to 1 (0MQ/2.1 bug) - compatibility tests added Signed-off-by: Martin Sustrik --- builds/msvc/tests/tests.vcxproj | 6 +- builds/msvc/tests/tests.vcxproj.filters | 5 +- include/xs.h | 1 + src/decoder.cpp | 2 +- src/options.cpp | 25 +++++ src/options.hpp | 3 + src/pipe.cpp | 19 ++-- src/pipe.hpp | 14 ++- src/session_base.cpp | 2 +- src/socket_base.cpp | 4 +- src/xpub.cpp | 8 +- src/xsub.cpp | 18 +++- tests/Makefile.am | 4 +- tests/libzmq21.cpp | 175 ++++++++++++++++++++++++++++++++ tests/tests.cpp | 6 ++ 15 files changed, 268 insertions(+), 24 deletions(-) create mode 100644 tests/libzmq21.cpp diff --git a/builds/msvc/tests/tests.vcxproj b/builds/msvc/tests/tests.vcxproj index a3a4434..ee4d414 100644 --- a/builds/msvc/tests/tests.vcxproj +++ b/builds/msvc/tests/tests.vcxproj @@ -160,6 +160,10 @@ true true + + true + true + @@ -176,4 +180,4 @@ - \ No newline at end of file + diff --git a/builds/msvc/tests/tests.vcxproj.filters b/builds/msvc/tests/tests.vcxproj.filters index 02d202b..d858166 100644 --- a/builds/msvc/tests/tests.vcxproj.filters +++ b/builds/msvc/tests/tests.vcxproj.filters @@ -59,6 +59,9 @@ Header Files + + Header Files + @@ -70,4 +73,4 @@ Header Files - \ No newline at end of file + diff --git a/include/xs.h b/include/xs.h index 2adea21..7e49df4 100644 --- a/include/xs.h +++ b/include/xs.h @@ -200,6 +200,7 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval, #define XS_SNDTIMEO 28 #define XS_IPV4ONLY 31 #define XS_KEEPALIVE 32 +#define XS_PROTOCOL 33 /* Message options */ #define XS_MORE 1 diff --git a/src/decoder.cpp b/src/decoder.cpp index 23546f1..dc8e54e 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -126,7 +126,7 @@ bool xs::decoder_t::eight_byte_size_ready () bool xs::decoder_t::flags_ready () { // Store the flags from the wire into the message structure. - in_progress.set_flags (tmpbuf [0]); + in_progress.set_flags (tmpbuf [0] & 0x01); next_step (in_progress.data (), in_progress.size (), &decoder_t::message_ready); diff --git a/src/options.cpp b/src/options.cpp index 07d3752..c9cbaae 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -46,6 +46,7 @@ xs::options_t::options_t () : sndtimeo (-1), ipv4only (1), keepalive (0), + protocol (0), delay_on_close (true), delay_on_disconnect (true), filter (false), @@ -232,6 +233,21 @@ int xs::options_t::setsockopt (int option_, const void *optval_, return 0; } + case XS_PROTOCOL: + { + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + int val = *((int*) optval_); + if (val < 0) { + errno = EINVAL; + return -1; + } + protocol = val; + return 0; + } + } errno = EINVAL; @@ -413,6 +429,15 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int); return 0; + case XS_PROTOCOL: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = protocol; + *optvallen_ = sizeof (int); + return 0; + } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index 3e47336..c1e4dda 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -92,6 +92,9 @@ namespace xs // If 1, keepalives are to be sent periodically. int keepalive; + // Version of wire protocol to use. + int protocol; + // If true, session reads all the pending messages from the pipe and // sends them to the network when socket is closed. bool delay_on_close; diff --git a/src/pipe.cpp b/src/pipe.cpp index 76df1b0..51ecedc 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -27,7 +27,7 @@ #include "err.hpp" int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]) + int hwms_ [2], bool delays_ [2], int protocol_) { // Creates two pipe objects. These objects are connected by two ypipes, // each to pass messages in one direction. @@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], alloc_assert (upipe2); pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, - hwms_ [1], hwms_ [0], delays_ [0]); + hwms_ [1], hwms_ [0], delays_ [0], protocol_); alloc_assert (pipes_ [0]); pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, - hwms_ [0], hwms_ [1], delays_ [1]); + hwms_ [0], hwms_ [1], delays_ [1], protocol_); alloc_assert (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]); @@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], } xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_) : + int inhwm_, int outhwm_, bool delay_, int protocol_) : object_t (parent_), inpipe (inpipe_), outpipe (outpipe_), @@ -65,7 +65,8 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peer (NULL), sink (NULL), state (active), - delay (delay_) + delay (delay_), + protocol (protocol_) { } @@ -375,7 +376,8 @@ void xs::pipe_t::terminate (bool delay_) rollback (); // Push delimiter into the outbound pipe. Note that watermarks are not - // checked thus the delimiter can be written even though the pipe is full. + // checked thus the delimiter can be written even though the pipe + // is full. msg_t msg; msg.init_delimiter (); outpipe->write (msg, false); @@ -383,6 +385,11 @@ void xs::pipe_t::terminate (bool delay_) } } +int xs::pipe_t::get_protocol () +{ + return protocol; +} + bool xs::pipe_t::is_delimiter (msg_t &msg_) { return msg_.is_delimiter (); diff --git a/src/pipe.hpp b/src/pipe.hpp index bce2c04..c298154 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -44,7 +44,7 @@ namespace xs // pipe receives all the pending messages before terminating, otherwise it // terminates straight away. int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]); + int hwms_ [2], bool delays_ [2], int protocol_); struct i_pipe_events { @@ -68,7 +68,8 @@ namespace xs { // This allows pipepair to create pipe objects. friend int pipepair (xs::object_t *parents_ [2], - xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); + xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2], + int protocol_); public: @@ -110,6 +111,9 @@ namespace xs // before actual shutdown. void terminate (bool delay_); + // Returns the ID of the protocol associated with the pipe. + int get_protocol (); + private: // Type of the underlying lock-free pipe. @@ -128,7 +132,7 @@ namespace xs // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_); + int inhwm_, int outhwm_, bool delay_, int protocol_); // Pipepair uses this function to let us know about // the peer pipe object. @@ -188,6 +192,10 @@ namespace xs // asks us to. bool delay; + // ID of the protocol to use. This value is not used by the pipe + // itself, rather it's used by the users of the pipe. + int protocol; + // Identity of the writer. Used uniquely by the reader side. blob_t identity; diff --git a/src/session_base.cpp b/src/session_base.cpp index b752f9f..0c9428b 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -278,7 +278,7 @@ void xs::session_base_t::process_attach (i_engine *engine_) pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.rcvhwm, options.sndhwm}; bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; - int rc = pipepair (parents, pipes, hwms, delays); + int rc = pipepair (parents, pipes, hwms, delays, options.protocol); errno_assert (rc == 0); // Plug the local end of the pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 29961ab..9d33348 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -410,7 +410,7 @@ int xs::socket_base_t::connect (const char *addr_) pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {sndhwm, rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - int rc = pipepair (parents, pipes, hwms, delays); + int rc = pipepair (parents, pipes, hwms, delays, options.protocol); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -452,7 +452,7 @@ int xs::socket_base_t::connect (const char *addr_) pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.sndhwm, options.rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - rc = pipepair (parents, pipes, hwms, delays); + rc = pipepair (parents, pipes, hwms, delays, options.protocol); errno_assert (rc == 0); // PGM does not support subscription forwarding; ask for all data to be diff --git a/src/xpub.cpp b/src/xpub.cpp index 29dd079..255c063 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -42,9 +42,11 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) xs_assert (pipe_); dist.attach (pipe_); - // If icanhasall_ is specified, the caller would like to subscribe - // to all data on this pipe, implicitly. - if (icanhasall_) + // If icanhasall_ is specified, the caller would like to subscribe + // to all data on this pipe, implicitly. Also, if we are using + // 0MQ/2.1-style protocol, there's no subscription forwarding. Thus, + // we need to subscribe for all messages automatically. + if (icanhasall_ || pipe_->get_protocol () == 1) subscriptions.add (NULL, 0, pipe_); // The pipe is active when attached. Let's read the subscriptions from diff --git a/src/xsub.cpp b/src/xsub.cpp index add5ba9..af6789f 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -52,7 +52,11 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); fq.attach (pipe_); - dist.attach (pipe_); + + // Pipes with 0MQ/2.1-style protocol are not eligible for accepting + // subscriptions. + if (pipe_->get_protocol () != 1) + dist.attach (pipe_); // Send all the cached subscriptions to the new upstream peer. subscriptions.apply (send_subscription, pipe_); @@ -72,14 +76,18 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_) void xs::xsub_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); - dist.terminated (pipe_); + if (pipe_->get_protocol () != 1) + dist.terminated (pipe_); } void xs::xsub_t::xhiccuped (pipe_t *pipe_) { - // Send all the cached subscriptions to the hiccuped pipe. - subscriptions.apply (send_subscription, pipe_); - pipe_->flush (); + if (pipe_->get_protocol () != 1) { + + // Send all the cached subscriptions to the hiccuped pipe. + subscriptions.apply (send_subscription, pipe_); + pipe_->flush (); + } } int xs::xsub_t::xsend (msg_t *msg_, int flags_) diff --git a/tests/Makefile.am b/tests/Makefile.am index 817b0c1..8e2bbbf 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -21,7 +21,8 @@ noinst_PROGRAMS = pair_inproc \ max_sockets \ emptyctx \ polltimeo \ - wireformat + wireformat \ + libzmq21 pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp @@ -42,5 +43,6 @@ max_sockets_SOURCES = max_sockets.cpp emptyctx_SOURCES = emptyctx.cpp polltimeo_SOURCES = polltimeo.cpp testutil.hpp wireformat_SOURCES = wireformat.cpp +libzmq21_SOURCES = libzmq21.cpp TESTS = $(noinst_PROGRAMS) diff --git a/tests/libzmq21.cpp b/tests/libzmq21.cpp new file mode 100644 index 0000000..e7affae --- /dev/null +++ b/tests/libzmq21.cpp @@ -0,0 +1,175 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Paul Colomiets + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +#if defined XS_HAVE_WINDOWS +#include +#else +#include +#include +#include +#include +#include +#include +#endif + +#if defined XS_HAVE_OPENVMS +#include +#endif + +int XS_TEST_MAIN () +{ + fprintf (stderr, "libzmq21 test running...\n"); + +#if defined XS_HAVE_WINDOWS + WSADATA info; + int wsarc = WSAStartup (MAKEWORD(1,1), &info); + assert (wsarc == 0); +#endif + + // First, test up-to-date publisher with 0MQ/2.1-style subscriber. + + // Create the basic infrastructure. + void *ctx = xs_init (); + assert (ctx); + void *pub = xs_socket (ctx, XS_PUB); + assert (pub); + int protocol = 1; + int rc = xs_setsockopt (pub, XS_PROTOCOL, &protocol, sizeof (protocol)); + assert (rc == 0); + rc = xs_bind (pub, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + int oldsub = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + struct sockaddr_in address; + address.sin_family = AF_INET; + address.sin_addr.s_addr = inet_addr ("127.0.0.1"); + address.sin_port = htons (5560); + rc = connect (oldsub, (struct sockaddr*) &address, sizeof (address)); + assert (rc == 0); + + // Wait a while to allow connection to be accepted on the publisher side. + sleep (1); + + // Send a message and check whether it arrives although there was no + // subscription sent. + rc = xs_send (pub, "ABC", 3, 0); + assert (rc == 3); + char buf [5]; + rc = recv (oldsub, buf, sizeof (buf), 0); + assert (rc == 5); + assert (!memcmp (buf, "\x04\0ABC", 5)); + + // Tear down the infrastructure. + rc = xs_close (pub); + assert (rc == 0); +#if defined XS_HAVE_WINDOWS + rc = closesocket (oldsub); + assert (rc != SOCKET_ERROR); +#else + rc = close (oldsub); + assert (rc == 0); +#endif + rc = xs_term (ctx); + assert (rc == 0); + + // Second, test the 0MQ/2.1-style publisher with up-to-date subscriber. + + // Create the basic infrastructure. + ctx = xs_init (); + assert (ctx); + void *sub = xs_socket (ctx, XS_SUB); + assert (sub); + protocol = 1; + rc = xs_setsockopt (sub, XS_PROTOCOL, &protocol, sizeof (protocol)); + assert (rc == 0); + rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = xs_bind (sub, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + int oldpub = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + address.sin_family = AF_INET; + address.sin_addr.s_addr = inet_addr ("127.0.0.1"); + address.sin_port = htons (5560); + rc = connect (oldpub, (struct sockaddr*) &address, sizeof (address)); + assert (rc == 0); + + // Wait a while to allow connection to be accepted on the subscriber side. + sleep (1); + + // Set the socket to the non-blocking mode. + #ifdef XS_HAVE_WINDOWS + u_long nonblock = 1; + rc = ioctlsocket (oldpub, FIONBIO, &nonblock); + assert (rc != SOCKET_ERROR); + #elif XS_HAVE_OPENVMS + int nonblock = 1; + rc = ioctl (oldpub, FIONBIO, &nonblock); + assert (rc != -1); + #else + int flags = fcntl (oldpub, F_GETFL, 0); + if (flags == -1) + flags = 0; + rc = fcntl (oldpub, F_SETFL, flags | O_NONBLOCK); + assert (rc != -1); + #endif + + // Check that subscription haven't arrived at the publisher side. + rc = recv (oldpub, buf, sizeof (buf), 0); +#if defined XS_HAVE_WINDOWS + assert (rc == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK); +#else + assert (rc == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)); +#endif + + // Pass one message through. + rc = send (oldpub, "\x04\0ABC", 5, 0); + assert (rc == 5); + rc = xs_recv (sub, buf, sizeof (buf), 0); + assert (rc == 3); + + // Pass one message with usused flags set (0MQ/2.1 bug). + rc = send (oldpub, "\x04\xfe" "ABC", 5, 0); + assert (rc == 5); + rc = xs_recv (sub, buf, sizeof (buf), 0); + assert (rc == 3); + + // Tear down the infrastructure. + rc = xs_close (sub); + assert (rc == 0); +#if defined XS_HAVE_WINDOWS + rc = closesocket (oldpub); + assert (rc != SOCKET_ERROR); +#else + rc = close (oldpub); + assert (rc == 0); +#endif + rc = xs_term (ctx); + assert (rc == 0); + +#if defined XS_HAVE_WINDOWS + WSACleanup (); +#endif + + return 0 ; +} diff --git a/tests/tests.cpp b/tests/tests.cpp index d4756ab..f5c403a 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -103,6 +103,10 @@ #include "wireformat.cpp" #undef XS_TEST_MAIN +#define XS_TEST_MAIN libzmq21 +#include "libzmq21.cpp" +#undef XS_TEST_MAIN + int main () { int rc; @@ -143,6 +147,8 @@ int main () assert (rc == 0); rc = wireformat (); assert (rc == 0); + rc = libzmq21 (); + assert (rc == 0); fprintf (stderr, "SUCCESS\n"); sleep (1); -- cgit v1.2.3