From 47350adcb6ea48512d732bc323eb1835a5ac9908 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 11 Sep 2009 18:16:47 +0200 Subject: separate class for PUB-style socket added --- c/zmq.h | 6 +++--- src/Makefile.am | 2 ++ src/app_thread.cpp | 5 ++++- src/pub.cpp | 39 +++++++++++++++++++++++++++++++++++++++ src/pub.hpp | 41 +++++++++++++++++++++++++++++++++++++++++ src/sub.cpp | 12 ++++++++++++ src/sub.hpp | 2 ++ 7 files changed, 103 insertions(+), 4 deletions(-) create mode 100644 src/pub.cpp create mode 100644 src/pub.hpp diff --git a/c/zmq.h b/c/zmq.h index df6e04c..cb86dcc 100644 --- a/c/zmq.h +++ b/c/zmq.h @@ -184,12 +184,12 @@ ZMQ_EXPORT int zmq_connect (void *s, const char *addr); // // Errors: EAGAIN - message cannot be sent at the moment (applies only to // non-blocking send). -// ENOTSUP - function isn't supported by particular socket type. +// EFAULT - function isn't supported by particular socket type. ZMQ_EXPORT int zmq_send (void *s, struct zmq_msg_t *msg, int flags); // Flush the messages that were send using ZMQ_NOFLUSH flag down the stream. // -// Errors: ENOTSUP - function isn't supported by particular socket type. +// Errors: FAULT - function isn't supported by particular socket type. ZMQ_EXPORT int zmq_flush (void *s); // Send a message from the socket 's'. 'flags' argument can be combination @@ -198,7 +198,7 @@ ZMQ_EXPORT int zmq_flush (void *s); // // Errors: EAGAIN - message cannot be received at the moment (applies only to // non-blocking receive). -// ENOTSUP - function isn't supported by particular socket type. +// EFAULT - function isn't supported by particular socket type. ZMQ_EXPORT int zmq_recv (void *s, struct zmq_msg_t *msg, int flags); // Helper functions used by perf tests so that they don't have to care diff --git a/src/Makefile.am b/src/Makefile.am index 68b34fa..a09c001 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -65,6 +65,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ pipe.hpp \ platform.hpp \ poll.hpp \ + pub.hpp \ select.hpp \ session.hpp \ simple_semaphore.hpp \ @@ -103,6 +104,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ owned.cpp \ pipe.cpp \ poll.cpp \ + pub.cpp \ select.cpp \ session.cpp \ socket_base.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 2bcc724..c48657a 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -35,6 +35,7 @@ #include "pipe.hpp" #include "config.hpp" #include "socket_base.hpp" +#include "pub.hpp" #include "sub.hpp" // If the RDTSC is available we use it to prevent excessive @@ -138,11 +139,13 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) { socket_base_t *s = NULL; switch (type_) { + case ZMQ_PUB: + s = new pub_t (this); + break; case ZMQ_SUB: s = new sub_t (this); break; case ZMQ_P2P: - case ZMQ_PUB: case ZMQ_REQ: case ZMQ_REP: s = new socket_base_t (this); diff --git a/src/pub.cpp b/src/pub.cpp new file mode 100644 index 0000000..9f2729b --- /dev/null +++ b/src/pub.cpp @@ -0,0 +1,39 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../c/zmq.h" + +#include "pub.hpp" +#include "err.hpp" + +zmq::pub_t::pub_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ +} + +zmq::pub_t::~pub_t () +{ +} + +int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_) +{ + errno = EFAULT; + return -1; +} + diff --git a/src/pub.hpp b/src/pub.hpp new file mode 100644 index 0000000..2f03b8e --- /dev/null +++ b/src/pub.hpp @@ -0,0 +1,41 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PUB_INCLUDED__ +#define __ZMQ_PUB_INCLUDED__ + +#include "socket_base.hpp" + +namespace zmq +{ + + class pub_t : public socket_base_t + { + public: + + pub_t (class app_thread_t *parent_); + ~pub_t (); + + // Overloads of API functions from socket_base_t. + int recv (struct zmq_msg_t *msg_, int flags_); + }; + +} + +#endif diff --git a/src/sub.cpp b/src/sub.cpp index 8c1ef9b..d4545b5 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -78,6 +78,18 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_, return socket_base_t::setsockopt (option_, optval_, optvallen_); } +int zmq::sub_t::send (struct zmq_msg_t *msg_, int flags_) +{ + errno = EFAULT; + return -1; +} + +int zmq::sub_t::flush () +{ + errno = EFAULT; + return -1; +} + int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) { while (true) { diff --git a/src/sub.hpp b/src/sub.hpp index c88d30c..14fa687 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -37,6 +37,8 @@ namespace zmq // Overloads of API functions from socket_base_t. int setsockopt (int option_, const void *optval_, size_t optvallen_); + int send (struct zmq_msg_t *msg_, int flags_); + int flush (); int recv (struct zmq_msg_t *msg_, int flags_); private: -- cgit v1.2.3