diff options
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 74 |
1 files changed, 62 insertions, 12 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index a26c280..2384d80 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <iostream> + #include <string> #include <algorithm> @@ -35,6 +37,7 @@ #include "uuid.hpp" #include "pipe.hpp" #include "err.hpp" +#include "platform.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), @@ -145,6 +148,22 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, errno = ENOTSUP; return -1; + case ZMQ_RATE: + if (optvallen_ != sizeof (uint32_t)) { + errno = EINVAL; + return -1; + } + options.rate = *((int32_t*) optval_); + return 0; + + case ZMQ_RECOVERY_IVL: + if (optvallen_ != sizeof (uint32_t)) { + errno = EINVAL; + return -1; + } + options.recovery_ivl = *((int32_t*) optval_); + return 0; + default: errno = EINVAL; return -1; @@ -170,6 +189,21 @@ int zmq::socket_base_t::connect (const char *addr_) std::string session_name ("#"); session_name += uuid_t ().to_string (); + // Parse addr_ string. + std::string addr_type; + std::string addr_args; + + std::string addr (addr_); + std::string::size_type pos = addr.find ("://"); + + if (pos == std::string::npos) { + errno = EINVAL; + return -1; + } + + addr_type = addr.substr (0, pos); + addr_args = addr.substr (pos + 3); + // Create the session. io_thread_t *io_thread = choose_io_thread (options.affinity); session_t *session = new session_t (io_thread, this, session_name.c_str (), @@ -198,20 +232,36 @@ int zmq::socket_base_t::connect (const char *addr_) send_plug (session); send_own (this, session); - // Create the connecter object. Supply it with the session name so that - // it can bind the new connection to the session once it is established. - zmq_connecter_t *connecter = new zmq_connecter_t ( - choose_io_thread (options.affinity), this, options, - session_name.c_str ()); - int rc = connecter->set_address (addr_); - if (rc != 0) { - delete connecter; - return -1; + if (addr_type == "tcp") { + + // Create the connecter object. Supply it with the session name so that + // it can bind the new connection to the session once it is established. + zmq_connecter_t *connecter = new zmq_connecter_t ( + choose_io_thread (options.affinity), this, options, + session_name.c_str ()); + int rc = connecter->set_address (addr_args.c_str ()); + if (rc != 0) { + delete connecter; + return -1; + } + send_plug (connecter); + send_own (this, connecter); + + return 0; } - send_plug (connecter); - send_own (this, connecter); - return 0; +#if defined ZMQ_HAVE_OPENPGM + if (addr_type == "pgm") { + + zmq_assert (false); + + return 0; + } +#endif + + // Unknown address type. + errno = ENOTSUP; + return -1; } int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) |