diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/app_thread.cpp | 5 | ||||
-rw-r--r-- | src/options.cpp | 5 | ||||
-rw-r--r-- | src/options.hpp | 1 | ||||
-rw-r--r-- | src/pub.cpp | 39 | ||||
-rw-r--r-- | src/pub.hpp | 41 | ||||
-rw-r--r-- | src/select.cpp | 4 | ||||
-rw-r--r-- | src/socket_base.cpp | 18 | ||||
-rw-r--r-- | src/sub.cpp | 12 | ||||
-rw-r--r-- | src/sub.hpp | 2 | ||||
-rw-r--r-- | src/zmq_decoder.cpp | 13 | ||||
-rw-r--r-- | src/zmq_listener_init.cpp | 7 |
12 files changed, 125 insertions, 24 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index f4f338e..ce88b26 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -67,6 +67,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ pipe.hpp \ platform.hpp \ poll.hpp \ + pub.hpp \ select.hpp \ session.hpp \ simple_semaphore.hpp \ @@ -107,6 +108,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ pgm_socket.cpp \ pipe.cpp \ poll.cpp \ + pub.cpp \ select.cpp \ session.cpp \ socket_base.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 517b721..feaa4d6 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -35,6 +35,7 @@ #include "pipe.hpp" #include "config.hpp" #include "socket_base.hpp" +#include "pub.hpp" #include "sub.hpp" // If the RDTSC is available we use it to prevent excessive @@ -138,11 +139,13 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) { socket_base_t *s = NULL; switch (type_) { + case ZMQ_PUB: + s = new pub_t (this); + break; case ZMQ_SUB: s = new sub_t (this); break; case ZMQ_P2P: - case ZMQ_PUB: case ZMQ_REQ: case ZMQ_REP: s = new socket_base_t (this, type_); diff --git a/src/options.cpp b/src/options.cpp index 804cb4f..a39d312 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,9 +23,8 @@ zmq::options_t::options_t () : hwm (0), lwm (0), swap (0), - mask (0), affinity (0), - rate (0), - recovery_ivl (0) + rate (100), + recovery_ivl (10) { } diff --git a/src/options.hpp b/src/options.hpp index 9f4a264..4d359e3 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -34,7 +34,6 @@ namespace zmq int64_t hwm; int64_t lwm; int64_t swap; - uint64_t mask; uint64_t affinity; std::string identity; diff --git a/src/pub.cpp b/src/pub.cpp new file mode 100644 index 0000000..d6eca01 --- /dev/null +++ b/src/pub.cpp @@ -0,0 +1,39 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../c/zmq.h" + +#include "pub.hpp" +#include "err.hpp" + +zmq::pub_t::pub_t (class app_thread_t *parent_) : + socket_base_t (parent_, ZMQ_SUB) +{ +} + +zmq::pub_t::~pub_t () +{ +} + +int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_) +{ + errno = EFAULT; + return -1; +} + diff --git a/src/pub.hpp b/src/pub.hpp new file mode 100644 index 0000000..2f03b8e --- /dev/null +++ b/src/pub.hpp @@ -0,0 +1,41 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_PUB_INCLUDED__ +#define __ZMQ_PUB_INCLUDED__ + +#include "socket_base.hpp" + +namespace zmq +{ + + class pub_t : public socket_base_t + { + public: + + pub_t (class app_thread_t *parent_); + ~pub_t (); + + // Overloads of API functions from socket_base_t. + int recv (struct zmq_msg_t *msg_, int flags_); + }; + +} + +#endif diff --git a/src/select.cpp b/src/select.cpp index f10acdc..cb17169 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -53,10 +53,10 @@ zmq::select_t::select_t () : zmq::select_t::~select_t () { + worker.stop (); + // Make sure there are no fds registered on shutdown. zmq_assert (load.get () == 0); - - worker.stop (); } zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 99e8ab1..900f1c5 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -123,14 +123,6 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, options.swap = *((int64_t*) optval_); return 0; - case ZMQ_MASK: - if (optvallen_ != sizeof (int64_t)) { - errno = EINVAL; - return -1; - } - options.mask = (uint64_t) *((int64_t*) optval_); - return 0; - case ZMQ_AFFINITY: if (optvallen_ != sizeof (int64_t)) { errno = EINVAL; @@ -149,19 +141,19 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, return -1; case ZMQ_RATE: - if (optvallen_ != sizeof (uint32_t)) { + if (optvallen_ != sizeof (int64_t)) { errno = EINVAL; return -1; } - options.rate = *((int32_t*) optval_); + options.rate = (uint32_t) *((int64_t*) optval_); return 0; case ZMQ_RECOVERY_IVL: - if (optvallen_ != sizeof (uint32_t)) { + if (optvallen_ != sizeof (int64_t)) { errno = EINVAL; return -1; } - options.recovery_ivl = *((int32_t*) optval_); + options.recovery_ivl = (uint32_t) *((int64_t*) optval_); return 0; default: @@ -287,7 +279,7 @@ int zmq::socket_base_t::connect (const char *addr_) #endif // Unknown address type. - errno = ENOTSUP; + errno = EFAULT; return -1; } diff --git a/src/sub.cpp b/src/sub.cpp index 1503fe2..51e0c23 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -78,6 +78,18 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_, return socket_base_t::setsockopt (option_, optval_, optvallen_); } +int zmq::sub_t::send (struct zmq_msg_t *msg_, int flags_) +{ + errno = EFAULT; + return -1; +} + +int zmq::sub_t::flush () +{ + errno = EFAULT; + return -1; +} + int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) { while (true) { diff --git a/src/sub.hpp b/src/sub.hpp index c88d30c..14fa687 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -37,6 +37,8 @@ namespace zmq // Overloads of API functions from socket_base_t. int setsockopt (int option_, const void *optval_, size_t optvallen_); + int send (struct zmq_msg_t *msg_, int flags_); + int flush (); int recv (struct zmq_msg_t *msg_, int flags_); private: diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index e51d802..53811a1 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -20,6 +20,7 @@ #include "zmq_decoder.hpp" #include "i_inout.hpp" #include "wire.hpp" +#include "err.hpp" zmq::zmq_decoder_t::zmq_decoder_t () : destination (NULL) @@ -48,7 +49,11 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () if (*tmpbuf == 0xff) next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready); else { - zmq_msg_init_size (&in_progress, *tmpbuf); + + // TODO: Handle over-sized message decently. + int rc = zmq_msg_init_size (&in_progress, *tmpbuf); + errno_assert (rc == 0); + next_step (zmq_msg_data (&in_progress), *tmpbuf, &zmq_decoder_t::message_ready); } @@ -60,7 +65,11 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // 8-byte size is read. Allocate the buffer for message body and // read the message data into it. size_t size = (size_t) get_uint64 (tmpbuf); - zmq_msg_init_size (&in_progress, size); + + // TODO: Handle over-sized message decently. + int rc = zmq_msg_init_size (&in_progress, size); + errno_assert (rc == 0); + next_step (zmq_msg_data (&in_progress), size, &zmq_decoder_t::message_ready); return true; diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 98a3780..756e9d8 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -93,8 +93,11 @@ void zmq::zmq_listener_init_t::flush () void zmq::zmq_listener_init_t::detach () { - // TODO: Engine is closing down. Init object is to be closed as well. - zmq_assert (false); + // This function is called by engine when disconnection occurs. + // The engine will destroy itself, so we just drop the pointer here and + // start termination of the init object. + engine = NULL; + term (); } void zmq::zmq_listener_init_t::process_plug () |