summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authormalosek <malosek@fastmq.com>2009-09-15 09:44:44 +0200
committermalosek <malosek@fastmq.com>2009-09-15 09:44:44 +0200
commit364281343ce0fd03b25bc6b1b451ee7ba8db436b (patch)
tree32cd09900e2ecdc31d6a8f2a47a68fe128980169 /src
parentbdf22e9c2fe82366283f4edc02fd59c37fdb2c4b (diff)
parentf7ad4a203ad184d97002111f4ffb8bfe6a8f7c01 (diff)
Merge branch 'master' of git@github.com:sustrik/zeromq2
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am2
-rw-r--r--src/app_thread.cpp5
-rw-r--r--src/options.cpp5
-rw-r--r--src/options.hpp1
-rw-r--r--src/pub.cpp39
-rw-r--r--src/pub.hpp41
-rw-r--r--src/select.cpp4
-rw-r--r--src/socket_base.cpp18
-rw-r--r--src/sub.cpp12
-rw-r--r--src/sub.hpp2
-rw-r--r--src/zmq_decoder.cpp13
-rw-r--r--src/zmq_listener_init.cpp7
12 files changed, 125 insertions, 24 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index f4f338e..ce88b26 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -67,6 +67,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
pipe.hpp \
platform.hpp \
poll.hpp \
+ pub.hpp \
select.hpp \
session.hpp \
simple_semaphore.hpp \
@@ -107,6 +108,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
pgm_socket.cpp \
pipe.cpp \
poll.cpp \
+ pub.cpp \
select.cpp \
session.cpp \
socket_base.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 517b721..feaa4d6 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -35,6 +35,7 @@
#include "pipe.hpp"
#include "config.hpp"
#include "socket_base.hpp"
+#include "pub.hpp"
#include "sub.hpp"
// If the RDTSC is available we use it to prevent excessive
@@ -138,11 +139,13 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{
socket_base_t *s = NULL;
switch (type_) {
+ case ZMQ_PUB:
+ s = new pub_t (this);
+ break;
case ZMQ_SUB:
s = new sub_t (this);
break;
case ZMQ_P2P:
- case ZMQ_PUB:
case ZMQ_REQ:
case ZMQ_REP:
s = new socket_base_t (this, type_);
diff --git a/src/options.cpp b/src/options.cpp
index 804cb4f..a39d312 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -23,9 +23,8 @@ zmq::options_t::options_t () :
hwm (0),
lwm (0),
swap (0),
- mask (0),
affinity (0),
- rate (0),
- recovery_ivl (0)
+ rate (100),
+ recovery_ivl (10)
{
}
diff --git a/src/options.hpp b/src/options.hpp
index 9f4a264..4d359e3 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -34,7 +34,6 @@ namespace zmq
int64_t hwm;
int64_t lwm;
int64_t swap;
- uint64_t mask;
uint64_t affinity;
std::string identity;
diff --git a/src/pub.cpp b/src/pub.cpp
new file mode 100644
index 0000000..d6eca01
--- /dev/null
+++ b/src/pub.cpp
@@ -0,0 +1,39 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../c/zmq.h"
+
+#include "pub.hpp"
+#include "err.hpp"
+
+zmq::pub_t::pub_t (class app_thread_t *parent_) :
+ socket_base_t (parent_, ZMQ_SUB)
+{
+}
+
+zmq::pub_t::~pub_t ()
+{
+}
+
+int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_)
+{
+ errno = EFAULT;
+ return -1;
+}
+
diff --git a/src/pub.hpp b/src/pub.hpp
new file mode 100644
index 0000000..2f03b8e
--- /dev/null
+++ b/src/pub.hpp
@@ -0,0 +1,41 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_PUB_INCLUDED__
+#define __ZMQ_PUB_INCLUDED__
+
+#include "socket_base.hpp"
+
+namespace zmq
+{
+
+ class pub_t : public socket_base_t
+ {
+ public:
+
+ pub_t (class app_thread_t *parent_);
+ ~pub_t ();
+
+ // Overloads of API functions from socket_base_t.
+ int recv (struct zmq_msg_t *msg_, int flags_);
+ };
+
+}
+
+#endif
diff --git a/src/select.cpp b/src/select.cpp
index f10acdc..cb17169 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -53,10 +53,10 @@ zmq::select_t::select_t () :
zmq::select_t::~select_t ()
{
+ worker.stop ();
+
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
-
- worker.stop ();
}
zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 99e8ab1..900f1c5 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -123,14 +123,6 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
options.swap = *((int64_t*) optval_);
return 0;
- case ZMQ_MASK:
- if (optvallen_ != sizeof (int64_t)) {
- errno = EINVAL;
- return -1;
- }
- options.mask = (uint64_t) *((int64_t*) optval_);
- return 0;
-
case ZMQ_AFFINITY:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
@@ -149,19 +141,19 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
return -1;
case ZMQ_RATE:
- if (optvallen_ != sizeof (uint32_t)) {
+ if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
- options.rate = *((int32_t*) optval_);
+ options.rate = (uint32_t) *((int64_t*) optval_);
return 0;
case ZMQ_RECOVERY_IVL:
- if (optvallen_ != sizeof (uint32_t)) {
+ if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
- options.recovery_ivl = *((int32_t*) optval_);
+ options.recovery_ivl = (uint32_t) *((int64_t*) optval_);
return 0;
default:
@@ -287,7 +279,7 @@ int zmq::socket_base_t::connect (const char *addr_)
#endif
// Unknown address type.
- errno = ENOTSUP;
+ errno = EFAULT;
return -1;
}
diff --git a/src/sub.cpp b/src/sub.cpp
index 1503fe2..51e0c23 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -78,6 +78,18 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_,
return socket_base_t::setsockopt (option_, optval_, optvallen_);
}
+int zmq::sub_t::send (struct zmq_msg_t *msg_, int flags_)
+{
+ errno = EFAULT;
+ return -1;
+}
+
+int zmq::sub_t::flush ()
+{
+ errno = EFAULT;
+ return -1;
+}
+
int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
{
while (true) {
diff --git a/src/sub.hpp b/src/sub.hpp
index c88d30c..14fa687 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -37,6 +37,8 @@ namespace zmq
// Overloads of API functions from socket_base_t.
int setsockopt (int option_, const void *optval_, size_t optvallen_);
+ int send (struct zmq_msg_t *msg_, int flags_);
+ int flush ();
int recv (struct zmq_msg_t *msg_, int flags_);
private:
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index e51d802..53811a1 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -20,6 +20,7 @@
#include "zmq_decoder.hpp"
#include "i_inout.hpp"
#include "wire.hpp"
+#include "err.hpp"
zmq::zmq_decoder_t::zmq_decoder_t () :
destination (NULL)
@@ -48,7 +49,11 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
if (*tmpbuf == 0xff)
next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready);
else {
- zmq_msg_init_size (&in_progress, *tmpbuf);
+
+ // TODO: Handle over-sized message decently.
+ int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
+ errno_assert (rc == 0);
+
next_step (zmq_msg_data (&in_progress), *tmpbuf,
&zmq_decoder_t::message_ready);
}
@@ -60,7 +65,11 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// 8-byte size is read. Allocate the buffer for message body and
// read the message data into it.
size_t size = (size_t) get_uint64 (tmpbuf);
- zmq_msg_init_size (&in_progress, size);
+
+ // TODO: Handle over-sized message decently.
+ int rc = zmq_msg_init_size (&in_progress, size);
+ errno_assert (rc == 0);
+
next_step (zmq_msg_data (&in_progress), size,
&zmq_decoder_t::message_ready);
return true;
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index 98a3780..756e9d8 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -93,8 +93,11 @@ void zmq::zmq_listener_init_t::flush ()
void zmq::zmq_listener_init_t::detach ()
{
- // TODO: Engine is closing down. Init object is to be closed as well.
- zmq_assert (false);
+ // This function is called by engine when disconnection occurs.
+ // The engine will destroy itself, so we just drop the pointer here and
+ // start termination of the init object.
+ engine = NULL;
+ term ();
}
void zmq::zmq_listener_init_t::process_plug ()