summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-09-16 10:56:55 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-09-16 10:56:55 +0200
commit6e03cb2f3eb083e1de8e7161d3ab21b52c87eece (patch)
tree9cf442583cae614f2bcfca22fd0b9a55c7f0ccda /src
parentbce2e60bbb3b7f799a532d9b8f2e171c570b9fea (diff)
parent9fbdcc7940823634d82f51d2b124ccfbca6e9b17 (diff)
Merge branch 'master' of git@github.com:sustrik/zeromq2
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am2
-rw-r--r--src/options.cpp3
-rw-r--r--src/options.hpp7
-rw-r--r--src/pgm_receiver.cpp207
-rw-r--r--src/pgm_receiver.hpp98
-rw-r--r--src/pgm_sender.cpp7
-rw-r--r--src/pgm_sender.hpp2
-rw-r--r--src/pgm_socket.cpp17
-rw-r--r--src/pub.cpp2
-rw-r--r--src/socket_base.cpp75
-rw-r--r--src/sub.cpp6
11 files changed, 400 insertions, 26 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index ce88b26..398c861 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -62,6 +62,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
object.hpp \
options.hpp \
owned.hpp \
+ pgm_receiver.hpp \
pgm_sender.hpp \
pgm_socket.hpp \
pipe.hpp \
@@ -104,6 +105,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
object.cpp \
options.cpp \
owned.cpp \
+ pgm_receiver.cpp \
pgm_sender.cpp \
pgm_socket.cpp \
pipe.cpp \
diff --git a/src/options.cpp b/src/options.cpp
index a39d312..55417f5 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -25,6 +25,7 @@ zmq::options_t::options_t () :
swap (0),
affinity (0),
rate (100),
- recovery_ivl (10)
+ recovery_ivl (10),
+ use_multicast_loop (false)
{
}
diff --git a/src/options.hpp b/src/options.hpp
index 4d359e3..c1ecb57 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -37,11 +37,14 @@ namespace zmq
uint64_t affinity;
std::string identity;
- // Maximum tranfer rate [kb/s].
+ // Maximum tranfer rate [kb/s]. Default 100kb/s.
uint32_t rate;
- // Reliability time interval [s].
+ // Reliability time interval [s]. Default 10s.
uint32_t recovery_ivl;
+
+ // Enable multicast loopback. Default disabled (false).
+ bool use_multicast_loop;
};
}
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
new file mode 100644
index 0000000..6ea310c
--- /dev/null
+++ b/src/pgm_receiver.cpp
@@ -0,0 +1,207 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "platform.hpp"
+
+#if defined ZMQ_HAVE_OPENPGM
+
+#include <iostream>
+
+#include "pgm_receiver.hpp"
+#include "err.hpp"
+#include "stdint.hpp"
+#include "wire.hpp"
+#include "i_inout.hpp"
+
+//#define PGM_RECEIVER_DEBUG
+//#define PGM_RECEIVER_DEBUG_LEVEL 1
+
+// level 1 = key behaviour
+// level 2 = processing flow
+// level 4 = infos
+
+#ifndef PGM_RECEIVER_DEBUG
+# define zmq_log(n, ...) while (0)
+#else
+# define zmq_log(n, ...) do { if ((n) <= PGM_RECEIVER_DEBUG_LEVEL) \
+ { printf (__VA_ARGS__);}} while (0)
+#endif
+
+zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
+ const options_t &options_, const char *session_name_) :
+ io_object_t (parent_),
+ decoder (NULL),
+ pgm_socket (true, options_),
+ options (options_),
+ session_name (session_name_),
+ joined (false),
+ inout (NULL)
+{
+}
+
+zmq::pgm_receiver_t::~pgm_receiver_t ()
+{
+ if (decoder)
+ delete decoder;
+}
+
+int zmq::pgm_receiver_t::init (const char *network_)
+{
+ decoder = new zmq_decoder_t;
+ zmq_assert (decoder);
+
+ return pgm_socket.init (network_);
+}
+
+void zmq::pgm_receiver_t::plug (i_inout *inout_)
+{
+ // Allocate 2 fds one for socket second for waiting pipe.
+ int socket_fd;
+ int waiting_pipe_fd;
+
+ decoder->set_inout (inout_);
+
+ // Fill socket_fd and waiting_pipe_fd from PGM transport
+ pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
+
+ // Add socket_fd into poller.
+ socket_handle = add_fd (socket_fd);
+
+ // Add waiting_pipe_fd into poller.
+ pipe_handle = add_fd (waiting_pipe_fd);
+
+ // Set POLLIN for both handlers.
+ set_pollin (pipe_handle);
+ set_pollin (socket_handle);
+
+ inout = inout_;
+}
+
+void zmq::pgm_receiver_t::unplug ()
+{
+ rm_fd (socket_handle);
+ rm_fd (pipe_handle);
+ decoder->set_inout (NULL);
+ inout = NULL;
+}
+
+void zmq::pgm_receiver_t::revive ()
+{
+ zmq_assert (false);
+}
+
+void zmq::pgm_receiver_t::reconnect ()
+{
+ // Save inout ptr.
+ i_inout *inout_tmp = inout;
+
+ // PGM receiver is not joined anymore.
+ joined = false;
+
+ // Unplug - plug PGM transport.
+ unplug ();
+ delete decoder;
+ decoder = new zmq_decoder_t;
+ zmq_assert (decoder);
+ plug (inout_tmp);
+}
+
+// POLLIN event from socket or waiting_pipe.
+void zmq::pgm_receiver_t::in_event ()
+{
+ void *data_with_offset;
+ ssize_t nbytes = 0;
+
+ // Read all data from pgm socket.
+ while ((nbytes = receive_with_offset (&data_with_offset)) > 0) {
+
+ // Push all the data to the decoder.
+ decoder->write ((unsigned char*)data_with_offset, nbytes);
+ }
+
+ // Flush any messages decoder may have produced to the dispatcher.
+ inout->flush ();
+
+ // Data loss detected.
+ if (nbytes == -1) {
+
+ // Recreate PGM transport.
+ reconnect ();
+ }
+}
+
+void zmq::pgm_receiver_t::out_event ()
+{
+ zmq_assert (false);
+}
+
+ssize_t zmq::pgm_receiver_t::receive_with_offset
+ (void **data_)
+{
+
+ // Data from PGM socket.
+ void *rd = NULL;
+ unsigned char *raw_data = NULL;
+
+ // Read data from underlying pgm_socket.
+ ssize_t nbytes = pgm_socket.receive ((void**) &rd);
+ raw_data = (unsigned char*) rd;
+
+ // No ODATA or RDATA.
+ if (!nbytes)
+ return 0;
+
+ // Data loss.
+ if (nbytes == -1) {
+ return -1;
+ }
+
+ // Read offset of the fist message in current APDU.
+ uint16_t apdu_offset = get_uint16 (raw_data);
+
+ // Shift raw_data & decrease nbytes by the first message offset
+ // information (sizeof uint16_t).
+ *data_ = raw_data + sizeof (uint16_t);
+ nbytes -= sizeof (uint16_t);
+
+ // There is not beginning of the message in current APDU and we
+ // are not joined jet -> throwing data.
+ if (apdu_offset == 0xFFFF && !joined) {
+ *data_ = NULL;
+ return 0;
+ }
+
+ // Now is the possibility to join the stream.
+ if (!joined) {
+
+ // We have to move data to the begining of the first message.
+ *data_ = (unsigned char *)*data_ + apdu_offset;
+ nbytes -= apdu_offset;
+
+ // Joined the stream.
+ joined = true;
+
+ zmq_log (2, "joined into the stream, %s(%i)\n",
+ __FILE__, __LINE__);
+ }
+
+ return nbytes;
+}
+#endif
+
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
new file mode 100644
index 0000000..53d5340
--- /dev/null
+++ b/src/pgm_receiver.hpp
@@ -0,0 +1,98 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_PGM_RECEIVER_HPP_INCLUDED__
+#define __ZMQ_PGM_RECEIVER_HPP_INCLUDED__
+
+#include "platform.hpp"
+
+#if defined ZMQ_HAVE_OPENPGM
+
+#include "io_object.hpp"
+#include "i_engine.hpp"
+#include "options.hpp"
+#include "zmq_decoder.hpp"
+#include "pgm_socket.hpp"
+
+namespace zmq
+{
+
+ class pgm_receiver_t : public io_object_t, public i_engine
+ {
+
+ public:
+
+ // Creates gm_engine. Underlying PGM connection is initialised
+ // using network_ parameter.
+ pgm_receiver_t (class io_thread_t *parent_, const options_t &options_,
+ const char *session_name_);
+ ~pgm_receiver_t ();
+
+ int init (const char *network_);
+ void reconnect ();
+
+ // i_engine interface implementation.
+ void plug (struct i_inout *inout_);
+ void unplug ();
+ void revive ();
+
+ // i_poll_events interface implementation.
+ void in_event ();
+ void out_event ();
+
+ private:
+ // Read exactly iov_len_ count APDUs, function returns number
+ // of bytes received. Note that if we did not join message stream
+ // before and there is not message beginning in the APDUs being
+ // received iov_len for such a APDUs will be 0.
+ ssize_t receive_with_offset (void **data_);
+
+ // Message decoder.
+ zmq_decoder_t *decoder;
+
+ // PGM socket.
+ pgm_socket_t pgm_socket;
+
+ // Socket options.
+ options_t options;
+
+ // Name of the session associated with the connecter.
+ std::string session_name;
+
+ // If receiver joined the messages stream.
+ bool joined;
+
+ // Parent session.
+ i_inout *inout;
+
+ // Poll handle associated with PGM socket.
+ handle_t socket_handle;
+
+ // Poll handle associated with engine PGM waiting pipe.
+ handle_t pipe_handle;
+
+ pgm_receiver_t (const pgm_receiver_t&);
+ void operator = (const pgm_receiver_t&);
+ };
+
+}
+
+#endif
+
+#endif
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 8e7a66b..9b1cef4 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -90,11 +90,6 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
set_pollout (handle);
inout = inout_;
-
- zmq_log (1, "plug: downlink_socket_fd %i, uplink_socket_fd %i, %s(%i)",
- downlink_socket_fd, uplink_socket_fd, __FILE__, __LINE__);
-
- std::cout << std::flush;
}
void zmq::pgm_sender_t::unplug ()
@@ -185,7 +180,7 @@ void zmq::pgm_sender_t::out_event ()
size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_,
size_t size_, uint16_t offset_)
{
- zmq_log (1, "data_size %i, first message offset %i, %s(%i)",
+ zmq_log (1, "data_size %i, first message offset %i, %s(%i)\n",
(int) size_, offset_, __FILE__, __LINE__);
std::cout << std::flush;
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index b1d1599..80be8d4 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -77,7 +77,7 @@ namespace zmq
handle_t handle;
handle_t uplink_handle;
- // ?
+ // Parent session.
i_inout *inout;
// Output buffer from pgm_socket.
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 5e2e764..315b43e 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -386,12 +386,14 @@ int zmq::pgm_socket_t::open_transport (void)
return -1;
}
}
-
+
// Enable multicast loopback.
- rc = pgm_transport_set_multicast_loop (g_transport, true);
- if (rc != 0) {
- errno = EINVAL;
- return -1;
+ if (options.use_multicast_loop) {
+ rc = pgm_transport_set_multicast_loop (g_transport, true);
+ if (rc != 0) {
+ errno = EINVAL;
+ return -1;
+ }
}
// Bind a transport to the specified network devices.
@@ -486,6 +488,7 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)
// Send one APDU, transmit window owned memory.
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{
+
iovec iov = {data_,data_len_};
ssize_t nbytes = pgm_transport_send_packetv (g_transport, &iov, 1,
@@ -561,7 +564,6 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
// returned.
ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
{
-
// We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled.
if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
@@ -575,7 +577,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
}
// If we have are going first time or if we have processed all pgm_msgv_t
- // structure previaously read from the pgm socket.
+ // structure previously read from the pgm socket.
if (nbytes_rec == nbytes_processed) {
// Check program flow.
@@ -615,6 +617,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
}
zmq_log (4, "received %i bytes\n", (int)nbytes_rec);
+
}
zmq_assert (nbytes_rec > 0);
diff --git a/src/pub.cpp b/src/pub.cpp
index d6eca01..5c6e329 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -23,7 +23,7 @@
#include "err.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) :
- socket_base_t (parent_, ZMQ_SUB)
+ socket_base_t (parent_, ZMQ_PUB)
{
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 900f1c5..c195e91 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -37,6 +37,7 @@
#include "err.hpp"
#include "platform.hpp"
#include "pgm_sender.hpp"
+#include "pgm_receiver.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) :
object_t (parent_),
@@ -156,6 +157,14 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
options.recovery_ivl = (uint32_t) *((int64_t*) optval_);
return 0;
+ case ZMQ_MCAST_LOOP:
+ if (optvallen_ != sizeof (bool)) {
+ errno = EINVAL;
+ return -1;
+ }
+ options.use_multicast_loop = optval_;
+ return 0;
+
default:
errno = EINVAL;
return -1;
@@ -164,15 +173,43 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
int zmq::socket_base_t::bind (const char *addr_)
{
- zmq_listener_t *listener = new zmq_listener_t (
- choose_io_thread (options.affinity), this, options);
- int rc = listener->set_address (addr_);
- if (rc != 0)
+ // Parse addr_ string.
+ std::string addr_type;
+ std::string addr_args;
+
+ std::string addr (addr_);
+ std::string::size_type pos = addr.find ("://");
+
+ if (pos == std::string::npos) {
+ errno = EINVAL;
return -1;
+ }
- send_plug (listener);
- send_own (this, listener);
- return 0;
+ addr_type = addr.substr (0, pos);
+ addr_args = addr.substr (pos + 3);
+
+ if (addr_type == "tcp") {
+ zmq_listener_t *listener = new zmq_listener_t (
+ choose_io_thread (options.affinity), this, options);
+ int rc = listener->set_address (addr_args.c_str ());
+ if (rc != 0)
+ return -1;
+
+ send_plug (listener);
+ send_own (this, listener);
+ return 0;
+ }
+
+#if defined ZMQ_HAVE_OPENPGM
+ if (addr_type == "pgm") {
+ // In the case of PGM bind behaves the same like connect.
+ return connect (addr_);
+ }
+#endif
+
+ // Unknown address type.
+ errno = EFAULT;
+ return -1;
}
int zmq::socket_base_t::connect (const char *addr_)
@@ -246,6 +283,8 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "pgm") {
switch (type) {
+
+ // PGM sender.
case ZMQ_PUB:
{
pgm_sender_t *pgm_sender =
@@ -266,9 +305,29 @@ int zmq::socket_base_t::connect (const char *addr_)
break;
}
+
+ // PGM receiver.
case ZMQ_SUB:
- zmq_assert (false);
+ {
+ pgm_receiver_t *pgm_receiver =
+ new pgm_receiver_t (choose_io_thread (options.affinity), options,
+ session_name.c_str ());
+
+ int rc = pgm_receiver->init (addr_args.c_str ());
+ if (rc != 0) {
+ delete pgm_receiver;
+ return -1;
+ }
+
+ // Reserve a sequence number for following 'attach' command.
+ session->inc_seqnum ();
+ send_attach (session, pgm_receiver);
+
+ pgm_receiver = NULL;
+
break;
+ }
+
default:
errno = EINVAL;
return -1;
diff --git a/src/sub.cpp b/src/sub.cpp
index 51e0c23..101c62f 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -101,6 +101,12 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
if (rc != 0 && errno == EAGAIN)
return -1;
+ // If there is no subscription return -1/EAGAIN.
+ if (!all_count && prefixes.empty () && topics.empty ()) {
+ errno = EAGAIN;
+ return -1;
+ }
+
// If there is at least one "*" subscription, the message matches.
if (all_count)
return 0;