From 47350adcb6ea48512d732bc323eb1835a5ac9908 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 11 Sep 2009 18:16:47 +0200 Subject: separate class for PUB-style socket added --- src/Makefile.am | 2 ++ src/app_thread.cpp | 5 ++++- src/pub.cpp | 39 +++++++++++++++++++++++++++++++++++++++ src/pub.hpp | 41 +++++++++++++++++++++++++++++++++++++++++ src/sub.cpp | 12 ++++++++++++ src/sub.hpp | 2 ++ 6 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 src/pub.cpp create mode 100644 src/pub.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 68b34fa..a09c001 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -65,6 +65,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ pipe.hpp \ platform.hpp \ poll.hpp \ + pub.hpp \ select.hpp \ session.hpp \ simple_semaphore.hpp \ @@ -103,6 +104,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ owned.cpp \ pipe.cpp \ poll.cpp \ + pub.cpp \ select.cpp \ session.cpp \ socket_base.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 2bcc724..c48657a 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -35,6 +35,7 @@ #include "pipe.hpp" #include "config.hpp" #include "socket_base.hpp" +#include "pub.hpp" #include "sub.hpp" // If the RDTSC is available we use it to prevent excessive @@ -138,11 +139,13 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) { socket_base_t *s = NULL; switch (type_) { + case ZMQ_PUB: + s = new pub_t (this); + break; case ZMQ_SUB: s = new sub_t (this); break; case ZMQ_P2P: - case ZMQ_PUB: case ZMQ_REQ: case ZMQ_REP: s = new socket_base_t (this); diff --git a/src/pub.cpp b/src/pub.cpp new file mode 100644 index 0000000..9f2729b --- /dev/null +++ b/src/pub.cpp @@ -0,0 +1,39 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../c/zmq.h" + +#include "pub.hpp" +#include "err.hpp" + +zmq::pub_t::pub_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ +} + +zmq::pub_t::~pub_t () +{ +} + +int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_) +{ + errno = EFAULT; + return -1; +} + diff --git a/src/pub.hpp b/src/pub.hpp new file mode 100644 index 0000000..2f03b8e --- /dev/null +++ b/src/pub.hpp @@ -0,0 +1,41 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PUB_INCLUDED__ +#define __ZMQ_PUB_INCLUDED__ + +#include "socket_base.hpp" + +namespace zmq +{ + + class pub_t : public socket_base_t + { + public: + + pub_t (class app_thread_t *parent_); + ~pub_t (); + + // Overloads of API functions from socket_base_t. + int recv (struct zmq_msg_t *msg_, int flags_); + }; + +} + +#endif diff --git a/src/sub.cpp b/src/sub.cpp index 8c1ef9b..d4545b5 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -78,6 +78,18 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_, return socket_base_t::setsockopt (option_, optval_, optvallen_); } +int zmq::sub_t::send (struct zmq_msg_t *msg_, int flags_) +{ + errno = EFAULT; + return -1; +} + +int zmq::sub_t::flush () +{ + errno = EFAULT; + return -1; +} + int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) { while (true) { diff --git a/src/sub.hpp b/src/sub.hpp index c88d30c..14fa687 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -37,6 +37,8 @@ namespace zmq // Overloads of API functions from socket_base_t. int setsockopt (int option_, const void *optval_, size_t optvallen_); + int send (struct zmq_msg_t *msg_, int flags_); + int flush (); int recv (struct zmq_msg_t *msg_, int flags_); private: -- cgit v1.2.3 From 36707529a7d82b164b59d42fe0d5d8a35c3dd279 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 14 Sep 2009 09:40:35 +0200 Subject: minor merge problem corrected --- src/pub.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/pub.cpp b/src/pub.cpp index 9f2729b..d6eca01 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -23,7 +23,7 @@ #include "err.hpp" zmq::pub_t::pub_t (class app_thread_t *parent_) : - socket_base_t (parent_) + socket_base_t (parent_, ZMQ_SUB) { } -- cgit v1.2.3 From c806aabb2d3e6b1ba9e3f61319f23d45c7f9a007 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 14 Sep 2009 11:25:57 +0200 Subject: java binding sets socket options using setsockopt function --- src/options.cpp | 5 ++--- src/options.hpp | 1 - src/socket_base.cpp | 16 ++++------------ 3 files changed, 6 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/options.cpp b/src/options.cpp index 804cb4f..a39d312 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,9 +23,8 @@ zmq::options_t::options_t () : hwm (0), lwm (0), swap (0), - mask (0), affinity (0), - rate (0), - recovery_ivl (0) + rate (100), + recovery_ivl (10) { } diff --git a/src/options.hpp b/src/options.hpp index 9f4a264..4d359e3 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -34,7 +34,6 @@ namespace zmq int64_t hwm; int64_t lwm; int64_t swap; - uint64_t mask; uint64_t affinity; std::string identity; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 9412d67..0429726 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -125,14 +125,6 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, options.swap = *((int64_t*) optval_); return 0; - case ZMQ_MASK: - if (optvallen_ != sizeof (int64_t)) { - errno = EINVAL; - return -1; - } - options.mask = (uint64_t) *((int64_t*) optval_); - return 0; - case ZMQ_AFFINITY: if (optvallen_ != sizeof (int64_t)) { errno = EINVAL; @@ -151,19 +143,19 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, return -1; case ZMQ_RATE: - if (optvallen_ != sizeof (uint32_t)) { + if (optvallen_ != sizeof (int64_t)) { errno = EINVAL; return -1; } - options.rate = *((int32_t*) optval_); + options.rate = (uint32_t) *((int64_t*) optval_); return 0; case ZMQ_RECOVERY_IVL: - if (optvallen_ != sizeof (uint32_t)) { + if (optvallen_ != sizeof (int64_t)) { errno = EINVAL; return -1; } - options.recovery_ivl = *((int32_t*) optval_); + options.recovery_ivl = (uint32_t) *((int64_t*) optval_); return 0; default: -- cgit v1.2.3 From 37cacc5700eaaaddbe2df6e3affeca4a335b023a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 14 Sep 2009 12:28:13 +0200 Subject: ZMQII-1: Win32 - failure on shutdown --- src/select.cpp | 4 ++-- src/socket_base.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/select.cpp b/src/select.cpp index f10acdc..cb17169 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -53,10 +53,10 @@ zmq::select_t::select_t () : zmq::select_t::~select_t () { + worker.stop (); + // Make sure there are no fds registered on shutdown. zmq_assert (load.get () == 0); - - worker.stop (); } zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 0429726..51649fb 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -281,7 +281,7 @@ int zmq::socket_base_t::connect (const char *addr_) #endif // Unknown address type. - errno = ENOTSUP; + errno = EFAULT; return -1; } -- cgit v1.2.3 From 2bc9419ced21151fe90c530758dc85b7024fdb70 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 14 Sep 2009 13:54:30 +0200 Subject: ZMQII-10: Make connections interrupted during the init phase be closed silently --- src/zmq_decoder.cpp | 13 +++++++++++-- src/zmq_listener_init.cpp | 7 +++++-- 2 files changed, 16 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index e51d802..53811a1 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -20,6 +20,7 @@ #include "zmq_decoder.hpp" #include "i_inout.hpp" #include "wire.hpp" +#include "err.hpp" zmq::zmq_decoder_t::zmq_decoder_t () : destination (NULL) @@ -48,7 +49,11 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () if (*tmpbuf == 0xff) next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready); else { - zmq_msg_init_size (&in_progress, *tmpbuf); + + // TODO: Handle over-sized message decently. + int rc = zmq_msg_init_size (&in_progress, *tmpbuf); + errno_assert (rc == 0); + next_step (zmq_msg_data (&in_progress), *tmpbuf, &zmq_decoder_t::message_ready); } @@ -60,7 +65,11 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // 8-byte size is read. Allocate the buffer for message body and // read the message data into it. size_t size = (size_t) get_uint64 (tmpbuf); - zmq_msg_init_size (&in_progress, size); + + // TODO: Handle over-sized message decently. + int rc = zmq_msg_init_size (&in_progress, size); + errno_assert (rc == 0); + next_step (zmq_msg_data (&in_progress), size, &zmq_decoder_t::message_ready); return true; diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 98a3780..756e9d8 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -93,8 +93,11 @@ void zmq::zmq_listener_init_t::flush () void zmq::zmq_listener_init_t::detach () { - // TODO: Engine is closing down. Init object is to be closed as well. - zmq_assert (false); + // This function is called by engine when disconnection occurs. + // The engine will destroy itself, so we just drop the pointer here and + // start termination of the init object. + engine = NULL; + term (); } void zmq::zmq_listener_init_t::process_plug () -- cgit v1.2.3