summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/zmq.h6
-rw-r--r--perf/local_lat.cpp12
-rw-r--r--perf/local_thr.cpp12
-rw-r--r--perf/remote_lat.cpp12
-rw-r--r--perf/remote_thr.cpp6
-rw-r--r--src/zmq.cpp59
-rw-r--r--tests/test_hwm.cpp34
-rw-r--r--tests/testutil.hpp38
8 files changed, 96 insertions, 83 deletions
diff --git a/include/zmq.h b/include/zmq.h
index 6f8e766..cd1bc90 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -216,8 +216,10 @@ ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval,
size_t *optvallen);
ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
-ZMQ_EXPORT int zmq_send (void *s, zmq_msg_t *msg, int flags);
-ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
+ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
+ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
+ZMQ_EXPORT int zmq_sendmsg (void *s, zmq_msg_t *msg, int flags);
+ZMQ_EXPORT int zmq_recvmsg (void *s, zmq_msg_t *msg, int flags);
/******************************************************************************/
/* I/O multiplexing. */
diff --git a/perf/local_lat.cpp b/perf/local_lat.cpp
index 8d51662..999e799 100644
--- a/perf/local_lat.cpp
+++ b/perf/local_lat.cpp
@@ -68,18 +68,18 @@ int main (int argc, char *argv [])
}
for (i = 0; i != roundtrip_count; i++) {
- rc = zmq_recv (s, &msg, 0);
- if (rc != 0) {
- printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
+ rc = zmq_recvmsg (s, &msg, 0);
+ if (rc < 0) {
+ printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
}
- rc = zmq_send (s, &msg, 0);
- if (rc != 0) {
- printf ("error in zmq_send: %s\n", zmq_strerror (errno));
+ rc = zmq_sendmsg (s, &msg, 0);
+ if (rc < 0) {
+ printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1;
}
}
diff --git a/perf/local_thr.cpp b/perf/local_thr.cpp
index 0dea8c6..b5379d8 100644
--- a/perf/local_thr.cpp
+++ b/perf/local_thr.cpp
@@ -79,9 +79,9 @@ int main (int argc, char *argv [])
return -1;
}
- rc = zmq_recv (s, &msg, 0);
- if (rc != 0) {
- printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
+ rc = zmq_recvmsg (s, &msg, 0);
+ if (rc < 0) {
+ printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
@@ -92,9 +92,9 @@ int main (int argc, char *argv [])
watch = zmq_stopwatch_start ();
for (i = 0; i != message_count - 1; i++) {
- rc = zmq_recv (s, &msg, 0);
- if (rc != 0) {
- printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
+ rc = zmq_recvmsg (s, &msg, 0);
+ if (rc < 0) {
+ printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
diff --git a/perf/remote_lat.cpp b/perf/remote_lat.cpp
index f353ed6..0d438e9 100644
--- a/perf/remote_lat.cpp
+++ b/perf/remote_lat.cpp
@@ -75,14 +75,14 @@ int main (int argc, char *argv [])
watch = zmq_stopwatch_start ();
for (i = 0; i != roundtrip_count; i++) {
- rc = zmq_send (s, &msg, 0);
- if (rc != 0) {
- printf ("error in zmq_send: %s\n", zmq_strerror (errno));
+ rc = zmq_sendmsg (s, &msg, 0);
+ if (rc < 0) {
+ printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1;
}
- rc = zmq_recv (s, &msg, 0);
- if (rc != 0) {
- printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
+ rc = zmq_recvmsg (s, &msg, 0);
+ if (rc < 0) {
+ printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp
index 78b8f72..ba36b98 100644
--- a/perf/remote_thr.cpp
+++ b/perf/remote_thr.cpp
@@ -76,9 +76,9 @@ int main (int argc, char *argv [])
memset (zmq_msg_data (&msg), 0, message_size);
#endif
- rc = zmq_send (s, &msg, 0);
- if (rc != 0) {
- printf ("error in zmq_send: %s\n", zmq_strerror (errno));
+ rc = zmq_sendmsg (s, &msg, 0);
+ if (rc < 0) {
+ printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_msg_close (&msg);
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 89848e7..b533d23 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -343,22 +343,73 @@ int zmq_connect (void *s_, const char *addr_)
return (((zmq::socket_base_t*) s_)->connect (addr_));
}
-int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
+int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
+{
+ zmq_msg_t msg;
+ int rc = zmq_msg_init_size (&msg, len_);
+ if (rc != 0)
+ return -1;
+ memcpy (zmq_msg_data (&msg), buf_, len_);
+
+ rc = zmq_sendmsg (s_, &msg, flags_);
+ if (unlikely (rc < 0)) {
+ int err = errno;
+ int rc2 = zmq_msg_close (&msg);
+ errno_assert (rc2 == 0);
+ errno = err;
+ return -1;
+ }
+
+ // Note the optimisation here. We don't close the msg object as it is
+ // empty anyway. This may change when implementation of zmq_msg_t changes.
+ return rc;
+}
+
+int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
+{
+ zmq_msg_t msg;
+ int rc = zmq_msg_init (&msg);
+ errno_assert (rc == 0);
+
+ rc = zmq_recvmsg (s_, &msg, flags_);
+ if (unlikely (rc < 0)) {
+ int err = errno;
+ int rc2 = zmq_msg_close (&msg);
+ errno_assert (rc2 == 0);
+ errno = err;
+ return -1;
+ }
+
+ // At the moment an oversized message is silently truncated.
+ // TODO: Build in a notification mechanism to report the overflows.
+ size_t to_copy = size_t (rc) < len_ ? size_t (rc) : len_;
+ memcpy (buf_, zmq_msg_data (&msg), to_copy);
+ return (int) to_copy;
+}
+
+int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
if (!s_) {
errno = EFAULT;
return -1;
}
- return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
+ int sz = (int) zmq_msg_size (msg_);
+ int rc = (((zmq::socket_base_t*) s_)->send (msg_, flags_));
+ if (unlikely (rc < 0))
+ return -1;
+ return sz;
}
-int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
+int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
if (!s_) {
errno = EFAULT;
return -1;
}
- return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
+ int rc = (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
+ if (unlikely (rc < 0))
+ return -1;
+ return (int) zmq_msg_size (msg_);
}
#if defined ZMQ_FORCE_SELECT
diff --git a/tests/test_hwm.cpp b/tests/test_hwm.cpp
index 58d3a82..d579f9d 100644
--- a/tests/test_hwm.cpp
+++ b/tests/test_hwm.cpp
@@ -49,49 +49,25 @@ int main (int argc, char *argv [])
// Try to send 10 messages. Only 4 should succeed.
for (int i = 0; i < 10; i++)
{
- zmq_msg_t msg;
- rc = zmq_msg_init (&msg);
- assert (rc == 0);
-
- int rc = zmq_send (sc, &msg, ZMQ_NOBLOCK);
+ int rc = zmq_send (sc, NULL, 0, ZMQ_NOBLOCK);
if (i < 4)
assert (rc == 0);
else
- assert (rc != 0 && errno == EAGAIN);
-
- rc = zmq_msg_close (&msg);
- assert (rc == 0);
+ assert (rc < 0 && errno == EAGAIN);
}
// There should be now 4 messages pending, consume them.
for (int i = 0; i != 4; i++) {
-
- zmq_msg_t msg;
- rc = zmq_msg_init (&msg);
- assert (rc == 0);
-
- rc = zmq_recv (sb, &msg, 0);
- assert (rc == 0);
-
- rc = zmq_msg_close (&msg);
+ rc = zmq_recv (sb, NULL, 0, 0);
assert (rc == 0);
}
// Now it should be possible to send one more.
- zmq_msg_t msg;
- rc = zmq_msg_init (&msg);
- assert (rc == 0);
- rc = zmq_send (sc, &msg, 0);
- assert (rc == 0);
- rc = zmq_msg_close (&msg);
+ rc = zmq_send (sc, NULL, 0, 0);
assert (rc == 0);
// Consume the remaining message.
- rc = zmq_msg_init (&msg);
- assert (rc == 0);
- rc = zmq_recv (sb, &msg, 0);
- assert (rc == 0);
- rc = zmq_msg_close (&msg);
+ rc = zmq_recv (sb, NULL, 0, 0);
assert (rc == 0);
rc = zmq_close (sc);
diff --git a/tests/testutil.hpp b/tests/testutil.hpp
index 6879dff..1fbf1f3 100644
--- a/tests/testutil.hpp
+++ b/tests/testutil.hpp
@@ -31,39 +31,23 @@ inline void bounce (void *sb, void *sc)
const char *content = "12345678ABCDEFGH12345678abcdefgh";
// Send the message.
- zmq_msg_t msg1;
- int rc = zmq_msg_init_size (&msg1, 32);
- memcpy (zmq_msg_data (&msg1), content, 32);
- rc = zmq_send (sc, &msg1, 0);
- assert (rc == 0);
- rc = zmq_msg_close (&msg1);
- assert (rc == 0);
+ int rc = zmq_send (sc, content, 32, 0);
+ assert (rc == 32);
// Bounce the message back.
- zmq_msg_t msg2;
- rc = zmq_msg_init (&msg2);
- assert (rc == 0);
- rc = zmq_recv (sb, &msg2, 0);
- assert (rc == 0);
- rc = zmq_send (sb, &msg2, 0);
- assert (rc == 0);
- rc = zmq_msg_close (&msg2);
- assert (rc == 0);
+ char buf1 [32];
+ rc = zmq_recv (sb, buf1, 32, 0);
+ assert (rc == 32);
+ rc = zmq_send (sb, buf1, 32, 0);
+ assert (rc == 32);
// Receive the bounced message.
- zmq_msg_t msg3;
- rc = zmq_msg_init (&msg3);
- assert (rc == 0);
- rc = zmq_recv (sc, &msg3, 0);
- assert (rc == 0);
+ char buf2 [32];
+ rc = zmq_recv (sc, buf2, 32, 0);
+ assert (rc == 32);
// Check whether the message is still the same.
- assert (zmq_msg_size (&msg3) == 32);
- assert (memcmp (zmq_msg_data (&msg3), content, 32) == 0);
-
- rc = zmq_msg_close (&msg3);
- assert (rc == 0);
+ assert (memcmp (buf2, content, 32) == 0);
}
-
#endif