/* Copyright (c) 2012 250bpm s.r.o. Copyright (c) 2012 Martin Lucina Copyright (c) 2012 Other contributors as noted in the AUTHORS file This file is part of Crossroads I/O project. Crossroads I/O is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. Crossroads is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include "../include/zmq.h" #include "../include/zmq_utils.h" #include "../include/xs/xs.h" #include "platform.hpp" #include #include #include #if !defined XS_HAVE_WINDOWS #include #else #include "windows.hpp" #endif void zmq_version (int *major_, int *minor_, int *patch_) { *major_ = ZMQ_VERSION_MAJOR; *minor_ = ZMQ_VERSION_MINOR; *patch_ = ZMQ_VERSION_PATCH; } int zmq_errno () { return xs_errno (); } const char *zmq_strerror (int errnum) { return xs_strerror (errnum); } int zmq_msg_init (zmq_msg_t *msg) { xs_msg_t *content = (xs_msg_t*) malloc (sizeof (xs_msg_t)); assert (content); int rc = xs_msg_init (content); if (rc != 0) return -1; msg->content = (void*) content; return 0; } int zmq_msg_init_size (zmq_msg_t *msg, size_t size) { xs_msg_t *content = (xs_msg_t*) malloc (sizeof (xs_msg_t)); assert (content); int rc = xs_msg_init_size (content, size); if (rc != 0) return -1; msg->content = (void*) content; return 0; } int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint) { xs_msg_t *content = (xs_msg_t*) malloc (sizeof (xs_msg_t)); assert (content); int rc = xs_msg_init_data (content, data, size, ffn, hint); if (rc != 0) return -1; msg->content = (void*) content; return 0; } int zmq_msg_close (zmq_msg_t *msg) { int rc = xs_msg_close ((xs_msg_t*) msg->content); if (rc != 0) return -1; free (msg->content); msg->content = NULL; return 0; } int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src) { return xs_msg_move ((xs_msg_t*) dest->content, (xs_msg_t*) src->content); } int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src) { return xs_msg_copy ((xs_msg_t*) dest->content, (xs_msg_t*) src->content); } void *zmq_msg_data (zmq_msg_t *msg) { return xs_msg_data ((xs_msg_t*) msg->content); } size_t zmq_msg_size (zmq_msg_t *msg) { return xs_msg_size ((xs_msg_t*) msg->content); } void *zmq_init (int io_threads) { void *ctx = xs_init (); if (!ctx) return NULL; // Crossroads don't allow for zero I/O threads. if (io_threads < 1) io_threads = 1; int rc = xs_setctxopt (ctx, XS_IO_THREADS, &io_threads, sizeof (io_threads)); if (rc != 0) { xs_term (ctx); return NULL; } return ctx; } int zmq_term (void *context) { return xs_term (context); } void *zmq_socket (void *context, int type) { return xs_socket (context, type); } int zmq_close (void *s) { return xs_close (s); } int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen) { switch (option) { case ZMQ_AFFINITY: case ZMQ_IDENTITY: case ZMQ_SUBSCRIBE: case ZMQ_UNSUBSCRIBE: case ZMQ_LINGER: case ZMQ_RECONNECT_IVL: case ZMQ_RECONNECT_IVL_MAX: case ZMQ_BACKLOG: return xs_setsockopt (s, option, optval, optvallen); case ZMQ_HWM: { if (optvallen != sizeof (uint64_t)) { errno = EINVAL; return -1; } int val = (int) *(uint64_t*) optval; int rc = xs_setsockopt (s, XS_SNDHWM, &val, sizeof (int)); if (rc < 0) return -1; return xs_setsockopt (s, XS_RCVHWM, &val, sizeof (int)); } case ZMQ_RATE: { if (optvallen != sizeof (int64_t)) { errno = EINVAL; return -1; } int val = (int) *(int64_t*) optval; return xs_setsockopt (s, option, &val, sizeof (int)); } case ZMQ_RECOVERY_IVL: { if (optvallen != sizeof (int64_t)) { errno = EINVAL; return -1; } int val = ((int) *(int64_t*) optval) * 1000; return xs_setsockopt (s, option, &val, sizeof (int)); } case ZMQ_RECOVERY_IVL_MSEC: { if (optvallen != sizeof (int64_t)) { errno = EINVAL; return -1; } int val = (int) *(int64_t*) optval; return xs_setsockopt (s, option, &val, sizeof (int)); } case ZMQ_SNDBUF: case ZMQ_RCVBUF: { if (optvallen != sizeof (uint64_t)) { errno = EINVAL; return -1; } int val = (int) *(uint64_t*) optval; return xs_setsockopt (s, option, &val, sizeof (int)); } default: errno = EINVAL; return -1; } } int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen) { switch (option) { case ZMQ_TYPE: case ZMQ_AFFINITY: case ZMQ_IDENTITY: case ZMQ_FD: case ZMQ_BACKLOG: case ZMQ_LINGER: case ZMQ_RECONNECT_IVL: case ZMQ_RECONNECT_IVL_MAX: return xs_getsockopt (s, option, optval, optvallen); case ZMQ_RCVMORE: case ZMQ_RATE: { if (!optvallen || *optvallen < sizeof (int64_t)) { errno = EINVAL; return -1; } int val; size_t size = sizeof (int); int rc = xs_getsockopt (s, option, &val, &size); if (rc < 0) return -1; assert (size == sizeof (int)); *(int64_t*) optval = (int64_t) val; *optvallen = sizeof (int64_t); return 0; } case ZMQ_HWM: { if (!optvallen || *optvallen < sizeof (uint64_t)) { errno = EINVAL; return -1; } int val; size_t size = sizeof (int); int rc = xs_getsockopt (s, XS_SNDHWM, &val, &size); if (rc < 0) return -1; assert (size == sizeof (int)); *(uint64_t*) optval = (uint64_t) val; *optvallen = sizeof (uint64_t); return 0; } case ZMQ_RECOVERY_IVL: { if (!optvallen || *optvallen < sizeof (int64_t)) { errno = EINVAL; return -1; } int val; size_t size = sizeof (int); int rc = xs_getsockopt (s, option, &val, &size); if (rc < 0) return -1; val /= 1000; assert (size == sizeof (int)); *(int64_t*) optval = (int64_t) val; *optvallen = sizeof (int64_t); return 0; } case ZMQ_RECOVERY_IVL_MSEC: { if (!optvallen || *optvallen < sizeof (int64_t)) { errno = EINVAL; return -1; } int val; size_t size = sizeof (int); int rc = xs_getsockopt (s, option, &val, &size); if (rc < 0) return -1; assert (size == sizeof (int)); *(int64_t*) optval = (int64_t) val; *optvallen = sizeof (int64_t); return 0; } case ZMQ_SNDBUF: case ZMQ_RCVBUF: { if (!optvallen || *optvallen < sizeof (uint64_t)) { errno = EINVAL; return -1; } int val; size_t size = sizeof (int); int rc = xs_getsockopt (s, option, &val, &size); if (rc < 0) return -1; assert (size == sizeof (int)); *(int64_t*) optval = (uint64_t) val; *optvallen = sizeof (uint64_t); return 0; } case ZMQ_EVENTS: { if (!optvallen || *optvallen < sizeof (uint32_t)) { errno = EINVAL; return -1; } int val; size_t size = sizeof (int); int rc = xs_getsockopt (s, option, &val, &size); if (rc < 0) return -1; assert (size == sizeof (int)); *(int32_t*) optval = (uint32_t) val; *optvallen = sizeof (uint32_t); return 0; } default: errno = EINVAL; return -1; } } int zmq_bind (void *s, const char *addr) { int rc = xs_bind (s, addr); return rc < 0 ? -1 : 0; } int zmq_connect (void *s, const char *addr) { int rc = xs_connect (s, addr); return rc < 0 ? -1 : 0; } int zmq_send (void *s, zmq_msg_t *msg, int flags) { int rc = xs_sendmsg (s, (xs_msg_t*) msg->content, flags); return rc < 0 ? -1 : 0; } int zmq_recv (void *s, zmq_msg_t *msg, int flags) { int rc = xs_recvmsg (s, (xs_msg_t*) msg->content, flags); return rc < 0 ? -1 : 0; } int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout) { return xs_poll ((xs_pollitem_t*) items, nitems, timeout < 0 ? -1 : (int) (timeout / 1000)); } int zmq_device (int device, void *frontend, void *backend) { int more; size_t size; xs_msg_t msg; xs_pollitem_t items [2]; int rc = xs_msg_init (&msg); if (rc != 0) return -1; items [0].socket = frontend; items [0].events = XS_POLLIN; items [1].socket = backend; items [1].events = XS_POLLIN; while (1) { rc = xs_poll (&items [0], 2, -1); if (rc < 0) return -1; if (items [0].revents & XS_POLLIN) { while (1) { rc = xs_recvmsg (frontend, &msg, 0); if (rc < 0) return -1; size = sizeof (more); rc = xs_getsockopt (frontend, XS_RCVMORE, &more, &size); if (rc < 0) return -1; rc = xs_sendmsg (backend, &msg, more ? XS_SNDMORE : 0); if (rc < 0) return -1; if (!more) break; } } if (items [1].revents & XS_POLLIN) { while (1) { rc = xs_recvmsg (backend, &msg, 0); if (rc < 0) return -1; size = sizeof (more); rc = xs_getsockopt (backend, XS_RCVMORE, &more, &size); if (rc < 0) return -1; rc = xs_sendmsg (frontend, &msg, more ? ZMQ_SNDMORE : 0); if (rc < 0) return -1; if (!more) break; } } } return 0; } void *zmq_stopwatch_start () { return xs_stopwatch_start (); } unsigned long zmq_stopwatch_stop (void *watch) { return xs_stopwatch_stop (watch); } void zmq_sleep (int seconds_) { #if defined XS_HAVE_WINDOWS Sleep (seconds_ * 1000); #else sleep (seconds_); #endif }