summaryrefslogtreecommitdiff
path: root/src/zmq.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq.cpp')
-rw-r--r--src/zmq.cpp113
1 files changed, 113 insertions, 0 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp
index c8f419a..f3ccaac 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -18,6 +18,7 @@
*/
#include "../include/zmq.h"
+#include "../include/zmq_utils.h"
#include <string.h>
#include <errno.h>
@@ -271,6 +272,11 @@ void *zmq_init (int io_threads_)
int zmq_term (void *ctx_)
{
+ if (!ctx_) {
+ errno = EFAULT;
+ return -1;
+ }
+
int rc = ((zmq::ctx_t*) ctx_)->term ();
int en = errno;
@@ -286,11 +292,19 @@ int zmq_term (void *ctx_)
void *zmq_socket (void *ctx_, int type_)
{
+ if (!ctx_) {
+ errno = EFAULT;
+ return NULL;
+ }
return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));
}
int zmq_close (void *s_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
((zmq::socket_base_t*) s_)->close ();
return 0;
}
@@ -298,33 +312,57 @@ int zmq_close (void *s_)
int zmq_setsockopt (void *s_, int option_, const void *optval_,
size_t optvallen_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
optvallen_));
}
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_,
optvallen_));
}
int zmq_bind (void *s_, const char *addr_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->bind (addr_));
}
int zmq_connect (void *s_, const char *addr_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->connect (addr_));
}
int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
}
int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
}
@@ -336,6 +374,10 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
defined ZMQ_HAVE_NETBSD
+ if (!items_) {
+ errno = EFAULT;
+ return -1;
+ }
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds);
int npollfds = 0;
@@ -489,6 +531,10 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
zmq::fd_t maxfd = zmq::retired_fd;
zmq::fd_t notify_fd = zmq::retired_fd;
+ // Ensure we do not attempt to select () on more than FD_SETSIZE
+ // file descriptors.
+ zmq_assert (nitems_ <= FD_SETSIZE);
+
for (int i = 0; i != nitems_; i++) {
// 0MQ sockets.
@@ -647,6 +693,10 @@ int zmq_errno ()
int zmq_device (int device_, void *insocket_, void *outsocket_)
{
+ if (!insocket_ || !outsocket_) {
+ errno = EFAULT;
+ return -1;
+ }
switch (device_) {
case ZMQ_FORWARDER:
return zmq::forwarder ((zmq::socket_base_t*) insocket_,
@@ -661,3 +711,66 @@ int zmq_device (int device_, void *insocket_, void *outsocket_)
return EINVAL;
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+// 0MQ utils - to be used by perf tests
+////////////////////////////////////////////////////////////////////////////////
+
+#if defined ZMQ_HAVE_WINDOWS
+
+static uint64_t now ()
+{
+ // Get the high resolution counter's accuracy.
+ LARGE_INTEGER ticksPerSecond;
+ QueryPerformanceFrequency (&ticksPerSecond);
+
+ // What time is it?
+ LARGE_INTEGER tick;
+ QueryPerformanceCounter (&tick);
+
+ // Convert the tick number into the number of seconds
+ // since the system was started.
+ double ticks_div = (double) (ticksPerSecond.QuadPart / 1000000);
+ return (uint64_t) (tick.QuadPart / ticks_div);
+}
+
+void zmq_sleep (int seconds_)
+{
+ Sleep (seconds_ * 1000);
+}
+
+#else
+
+static uint64_t now ()
+{
+ struct timeval tv;
+ int rc;
+
+ rc = gettimeofday (&tv, NULL);
+ assert (rc == 0);
+ return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
+}
+
+void zmq_sleep (int seconds_)
+{
+ sleep (seconds_);
+}
+
+#endif
+
+void *zmq_stopwatch_start ()
+{
+ uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t));
+ assert (watch);
+ *watch = now ();
+ return (void*) watch;
+}
+
+unsigned long zmq_stopwatch_stop (void *watch_)
+{
+ uint64_t end = now ();
+ uint64_t start = *(uint64_t*) watch_;
+ free (watch_);
+ return (unsigned long) (end - start);
+}
+