summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--builds/msvc/tests/tests.vcxproj6
-rw-r--r--builds/msvc/tests/tests.vcxproj.filters5
-rw-r--r--include/xs.h1
-rw-r--r--src/decoder.cpp2
-rw-r--r--src/options.cpp25
-rw-r--r--src/options.hpp3
-rw-r--r--src/pipe.cpp19
-rw-r--r--src/pipe.hpp14
-rw-r--r--src/session_base.cpp2
-rw-r--r--src/socket_base.cpp4
-rw-r--r--src/xpub.cpp8
-rw-r--r--src/xsub.cpp18
-rw-r--r--tests/Makefile.am4
-rw-r--r--tests/libzmq21.cpp175
-rw-r--r--tests/tests.cpp6
15 files changed, 268 insertions, 24 deletions
diff --git a/builds/msvc/tests/tests.vcxproj b/builds/msvc/tests/tests.vcxproj
index a3a4434..ee4d414 100644
--- a/builds/msvc/tests/tests.vcxproj
+++ b/builds/msvc/tests/tests.vcxproj
@@ -160,6 +160,10 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
</ClCompile>
+ <ClCompile Include="..\..\..\tests\libzmq21.cpp">
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\libxs\libxs.vcxproj">
@@ -176,4 +180,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
-</Project> \ No newline at end of file
+</Project>
diff --git a/builds/msvc/tests/tests.vcxproj.filters b/builds/msvc/tests/tests.vcxproj.filters
index 02d202b..d858166 100644
--- a/builds/msvc/tests/tests.vcxproj.filters
+++ b/builds/msvc/tests/tests.vcxproj.filters
@@ -59,6 +59,9 @@
<ClCompile Include="..\..\..\tests\wireformat.cpp">
<Filter>Header Files</Filter>
</ClCompile>
+ <ClCompile Include="..\..\..\tests\libzmq21.cpp">
+ <Filter>Header Files</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="Header Files">
@@ -70,4 +73,4 @@
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
-</Project> \ No newline at end of file
+</Project>
diff --git a/include/xs.h b/include/xs.h
index 2adea21..7e49df4 100644
--- a/include/xs.h
+++ b/include/xs.h
@@ -200,6 +200,7 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval,
#define XS_SNDTIMEO 28
#define XS_IPV4ONLY 31
#define XS_KEEPALIVE 32
+#define XS_PROTOCOL 33
/* Message options */
#define XS_MORE 1
diff --git a/src/decoder.cpp b/src/decoder.cpp
index 23546f1..dc8e54e 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -126,7 +126,7 @@ bool xs::decoder_t::eight_byte_size_ready ()
bool xs::decoder_t::flags_ready ()
{
// Store the flags from the wire into the message structure.
- in_progress.set_flags (tmpbuf [0]);
+ in_progress.set_flags (tmpbuf [0] & 0x01);
next_step (in_progress.data (), in_progress.size (),
&decoder_t::message_ready);
diff --git a/src/options.cpp b/src/options.cpp
index 07d3752..c9cbaae 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -46,6 +46,7 @@ xs::options_t::options_t () :
sndtimeo (-1),
ipv4only (1),
keepalive (0),
+ protocol (0),
delay_on_close (true),
delay_on_disconnect (true),
filter (false),
@@ -232,6 +233,21 @@ int xs::options_t::setsockopt (int option_, const void *optval_,
return 0;
}
+ case XS_PROTOCOL:
+ {
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ int val = *((int*) optval_);
+ if (val < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ protocol = val;
+ return 0;
+ }
+
}
errno = EINVAL;
@@ -413,6 +429,15 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
+ case XS_PROTOCOL:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = protocol;
+ *optvallen_ = sizeof (int);
+ return 0;
+
}
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 3e47336..c1e4dda 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -92,6 +92,9 @@ namespace xs
// If 1, keepalives are to be sent periodically.
int keepalive;
+ // Version of wire protocol to use.
+ int protocol;
+
// If true, session reads all the pending messages from the pipe and
// sends them to the network when socket is closed.
bool delay_on_close;
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 76df1b0..51ecedc 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -27,7 +27,7 @@
#include "err.hpp"
int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2])
+ int hwms_ [2], bool delays_ [2], int protocol_)
{
// Creates two pipe objects. These objects are connected by two ypipes,
// each to pass messages in one direction.
@@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
alloc_assert (upipe2);
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
- hwms_ [1], hwms_ [0], delays_ [0]);
+ hwms_ [1], hwms_ [0], delays_ [0], protocol_);
alloc_assert (pipes_ [0]);
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
- hwms_ [0], hwms_ [1], delays_ [1]);
+ hwms_ [0], hwms_ [1], delays_ [1], protocol_);
alloc_assert (pipes_ [1]);
pipes_ [0]->set_peer (pipes_ [1]);
@@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
}
xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_) :
+ int inhwm_, int outhwm_, bool delay_, int protocol_) :
object_t (parent_),
inpipe (inpipe_),
outpipe (outpipe_),
@@ -65,7 +65,8 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peer (NULL),
sink (NULL),
state (active),
- delay (delay_)
+ delay (delay_),
+ protocol (protocol_)
{
}
@@ -375,7 +376,8 @@ void xs::pipe_t::terminate (bool delay_)
rollback ();
// Push delimiter into the outbound pipe. Note that watermarks are not
- // checked thus the delimiter can be written even though the pipe is full.
+ // checked thus the delimiter can be written even though the pipe
+ // is full.
msg_t msg;
msg.init_delimiter ();
outpipe->write (msg, false);
@@ -383,6 +385,11 @@ void xs::pipe_t::terminate (bool delay_)
}
}
+int xs::pipe_t::get_protocol ()
+{
+ return protocol;
+}
+
bool xs::pipe_t::is_delimiter (msg_t &msg_)
{
return msg_.is_delimiter ();
diff --git a/src/pipe.hpp b/src/pipe.hpp
index bce2c04..c298154 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -44,7 +44,7 @@ namespace xs
// pipe receives all the pending messages before terminating, otherwise it
// terminates straight away.
int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2]);
+ int hwms_ [2], bool delays_ [2], int protocol_);
struct i_pipe_events
{
@@ -68,7 +68,8 @@ namespace xs
{
// This allows pipepair to create pipe objects.
friend int pipepair (xs::object_t *parents_ [2],
- xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]);
+ xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2],
+ int protocol_);
public:
@@ -110,6 +111,9 @@ namespace xs
// before actual shutdown.
void terminate (bool delay_);
+ // Returns the ID of the protocol associated with the pipe.
+ int get_protocol ();
+
private:
// Type of the underlying lock-free pipe.
@@ -128,7 +132,7 @@ namespace xs
// Constructor is private. Pipe can only be created using
// pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_);
+ int inhwm_, int outhwm_, bool delay_, int protocol_);
// Pipepair uses this function to let us know about
// the peer pipe object.
@@ -188,6 +192,10 @@ namespace xs
// asks us to.
bool delay;
+ // ID of the protocol to use. This value is not used by the pipe
+ // itself, rather it's used by the users of the pipe.
+ int protocol;
+
// Identity of the writer. Used uniquely by the reader side.
blob_t identity;
diff --git a/src/session_base.cpp b/src/session_base.cpp
index b752f9f..0c9428b 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -278,7 +278,7 @@ void xs::session_base_t::process_attach (i_engine *engine_)
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
bool delays [2] = {options.delay_on_close, options.delay_on_disconnect};
- int rc = pipepair (parents, pipes, hwms, delays);
+ int rc = pipepair (parents, pipes, hwms, delays, options.protocol);
errno_assert (rc == 0);
// Plug the local end of the pipe.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 29961ab..9d33348 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -410,7 +410,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {sndhwm, rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- int rc = pipepair (parents, pipes, hwms, delays);
+ int rc = pipepair (parents, pipes, hwms, delays, options.protocol);
errno_assert (rc == 0);
// Attach local end of the pipe to this socket object.
@@ -452,7 +452,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- rc = pipepair (parents, pipes, hwms, delays);
+ rc = pipepair (parents, pipes, hwms, delays, options.protocol);
errno_assert (rc == 0);
// PGM does not support subscription forwarding; ask for all data to be
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 29dd079..255c063 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -42,9 +42,11 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
xs_assert (pipe_);
dist.attach (pipe_);
- // If icanhasall_ is specified, the caller would like to subscribe
- // to all data on this pipe, implicitly.
- if (icanhasall_)
+ // If icanhasall_ is specified, the caller would like to subscribe
+ // to all data on this pipe, implicitly. Also, if we are using
+ // 0MQ/2.1-style protocol, there's no subscription forwarding. Thus,
+ // we need to subscribe for all messages automatically.
+ if (icanhasall_ || pipe_->get_protocol () == 1)
subscriptions.add (NULL, 0, pipe_);
// The pipe is active when attached. Let's read the subscriptions from
diff --git a/src/xsub.cpp b/src/xsub.cpp
index add5ba9..af6789f 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -52,7 +52,11 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
fq.attach (pipe_);
- dist.attach (pipe_);
+
+ // Pipes with 0MQ/2.1-style protocol are not eligible for accepting
+ // subscriptions.
+ if (pipe_->get_protocol () != 1)
+ dist.attach (pipe_);
// Send all the cached subscriptions to the new upstream peer.
subscriptions.apply (send_subscription, pipe_);
@@ -72,14 +76,18 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_)
void xs::xsub_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
- dist.terminated (pipe_);
+ if (pipe_->get_protocol () != 1)
+ dist.terminated (pipe_);
}
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
- // Send all the cached subscriptions to the hiccuped pipe.
- subscriptions.apply (send_subscription, pipe_);
- pipe_->flush ();
+ if (pipe_->get_protocol () != 1) {
+
+ // Send all the cached subscriptions to the hiccuped pipe.
+ subscriptions.apply (send_subscription, pipe_);
+ pipe_->flush ();
+ }
}
int xs::xsub_t::xsend (msg_t *msg_, int flags_)
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 817b0c1..8e2bbbf 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -21,7 +21,8 @@ noinst_PROGRAMS = pair_inproc \
max_sockets \
emptyctx \
polltimeo \
- wireformat
+ wireformat \
+ libzmq21
pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp
pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp
@@ -42,5 +43,6 @@ max_sockets_SOURCES = max_sockets.cpp
emptyctx_SOURCES = emptyctx.cpp
polltimeo_SOURCES = polltimeo.cpp testutil.hpp
wireformat_SOURCES = wireformat.cpp
+libzmq21_SOURCES = libzmq21.cpp
TESTS = $(noinst_PROGRAMS)
diff --git a/tests/libzmq21.cpp b/tests/libzmq21.cpp
new file mode 100644
index 0000000..e7affae
--- /dev/null
+++ b/tests/libzmq21.cpp
@@ -0,0 +1,175 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Paul Colomiets
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "testutil.hpp"
+
+#if defined XS_HAVE_WINDOWS
+#include <winsock2.h>
+#else
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#endif
+
+#if defined XS_HAVE_OPENVMS
+#include <ioctl.h>
+#endif
+
+int XS_TEST_MAIN ()
+{
+ fprintf (stderr, "libzmq21 test running...\n");
+
+#if defined XS_HAVE_WINDOWS
+ WSADATA info;
+ int wsarc = WSAStartup (MAKEWORD(1,1), &info);
+ assert (wsarc == 0);
+#endif
+
+ // First, test up-to-date publisher with 0MQ/2.1-style subscriber.
+
+ // Create the basic infrastructure.
+ void *ctx = xs_init ();
+ assert (ctx);
+ void *pub = xs_socket (ctx, XS_PUB);
+ assert (pub);
+ int protocol = 1;
+ int rc = xs_setsockopt (pub, XS_PROTOCOL, &protocol, sizeof (protocol));
+ assert (rc == 0);
+ rc = xs_bind (pub, "tcp://127.0.0.1:5560");
+ assert (rc == 0);
+
+ int oldsub = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ struct sockaddr_in address;
+ address.sin_family = AF_INET;
+ address.sin_addr.s_addr = inet_addr ("127.0.0.1");
+ address.sin_port = htons (5560);
+ rc = connect (oldsub, (struct sockaddr*) &address, sizeof (address));
+ assert (rc == 0);
+
+ // Wait a while to allow connection to be accepted on the publisher side.
+ sleep (1);
+
+ // Send a message and check whether it arrives although there was no
+ // subscription sent.
+ rc = xs_send (pub, "ABC", 3, 0);
+ assert (rc == 3);
+ char buf [5];
+ rc = recv (oldsub, buf, sizeof (buf), 0);
+ assert (rc == 5);
+ assert (!memcmp (buf, "\x04\0ABC", 5));
+
+ // Tear down the infrastructure.
+ rc = xs_close (pub);
+ assert (rc == 0);
+#if defined XS_HAVE_WINDOWS
+ rc = closesocket (oldsub);
+ assert (rc != SOCKET_ERROR);
+#else
+ rc = close (oldsub);
+ assert (rc == 0);
+#endif
+ rc = xs_term (ctx);
+ assert (rc == 0);
+
+ // Second, test the 0MQ/2.1-style publisher with up-to-date subscriber.
+
+ // Create the basic infrastructure.
+ ctx = xs_init ();
+ assert (ctx);
+ void *sub = xs_socket (ctx, XS_SUB);
+ assert (sub);
+ protocol = 1;
+ rc = xs_setsockopt (sub, XS_PROTOCOL, &protocol, sizeof (protocol));
+ assert (rc == 0);
+ rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0);
+ assert (rc == 0);
+ rc = xs_bind (sub, "tcp://127.0.0.1:5560");
+ assert (rc == 0);
+
+ int oldpub = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ address.sin_family = AF_INET;
+ address.sin_addr.s_addr = inet_addr ("127.0.0.1");
+ address.sin_port = htons (5560);
+ rc = connect (oldpub, (struct sockaddr*) &address, sizeof (address));
+ assert (rc == 0);
+
+ // Wait a while to allow connection to be accepted on the subscriber side.
+ sleep (1);
+
+ // Set the socket to the non-blocking mode.
+ #ifdef XS_HAVE_WINDOWS
+ u_long nonblock = 1;
+ rc = ioctlsocket (oldpub, FIONBIO, &nonblock);
+ assert (rc != SOCKET_ERROR);
+ #elif XS_HAVE_OPENVMS
+ int nonblock = 1;
+ rc = ioctl (oldpub, FIONBIO, &nonblock);
+ assert (rc != -1);
+ #else
+ int flags = fcntl (oldpub, F_GETFL, 0);
+ if (flags == -1)
+ flags = 0;
+ rc = fcntl (oldpub, F_SETFL, flags | O_NONBLOCK);
+ assert (rc != -1);
+ #endif
+
+ // Check that subscription haven't arrived at the publisher side.
+ rc = recv (oldpub, buf, sizeof (buf), 0);
+#if defined XS_HAVE_WINDOWS
+ assert (rc == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK);
+#else
+ assert (rc == -1 && (errno == EAGAIN || errno == EWOULDBLOCK));
+#endif
+
+ // Pass one message through.
+ rc = send (oldpub, "\x04\0ABC", 5, 0);
+ assert (rc == 5);
+ rc = xs_recv (sub, buf, sizeof (buf), 0);
+ assert (rc == 3);
+
+ // Pass one message with usused flags set (0MQ/2.1 bug).
+ rc = send (oldpub, "\x04\xfe" "ABC", 5, 0);
+ assert (rc == 5);
+ rc = xs_recv (sub, buf, sizeof (buf), 0);
+ assert (rc == 3);
+
+ // Tear down the infrastructure.
+ rc = xs_close (sub);
+ assert (rc == 0);
+#if defined XS_HAVE_WINDOWS
+ rc = closesocket (oldpub);
+ assert (rc != SOCKET_ERROR);
+#else
+ rc = close (oldpub);
+ assert (rc == 0);
+#endif
+ rc = xs_term (ctx);
+ assert (rc == 0);
+
+#if defined XS_HAVE_WINDOWS
+ WSACleanup ();
+#endif
+
+ return 0 ;
+}
diff --git a/tests/tests.cpp b/tests/tests.cpp
index d4756ab..f5c403a 100644
--- a/tests/tests.cpp
+++ b/tests/tests.cpp
@@ -103,6 +103,10 @@
#include "wireformat.cpp"
#undef XS_TEST_MAIN
+#define XS_TEST_MAIN libzmq21
+#include "libzmq21.cpp"
+#undef XS_TEST_MAIN
+
int main ()
{
int rc;
@@ -143,6 +147,8 @@ int main ()
assert (rc == 0);
rc = wireformat ();
assert (rc == 0);
+ rc = libzmq21 ();
+ assert (rc == 0);
fprintf (stderr, "SUCCESS\n");
sleep (1);