diff options
-rw-r--r-- | include/zmq.h | 6 | ||||
-rw-r--r-- | perf/local_lat.cpp | 12 | ||||
-rw-r--r-- | perf/local_thr.cpp | 12 | ||||
-rw-r--r-- | perf/remote_lat.cpp | 12 | ||||
-rw-r--r-- | perf/remote_thr.cpp | 6 | ||||
-rw-r--r-- | src/zmq.cpp | 59 | ||||
-rw-r--r-- | tests/test_hwm.cpp | 34 | ||||
-rw-r--r-- | tests/testutil.hpp | 38 |
8 files changed, 96 insertions, 83 deletions
diff --git a/include/zmq.h b/include/zmq.h index 6f8e766..cd1bc90 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -216,8 +216,10 @@ ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen); ZMQ_EXPORT int zmq_bind (void *s, const char *addr); ZMQ_EXPORT int zmq_connect (void *s, const char *addr); -ZMQ_EXPORT int zmq_send (void *s, zmq_msg_t *msg, int flags); -ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags); +ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags); +ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags); +ZMQ_EXPORT int zmq_sendmsg (void *s, zmq_msg_t *msg, int flags); +ZMQ_EXPORT int zmq_recvmsg (void *s, zmq_msg_t *msg, int flags); /******************************************************************************/ /* I/O multiplexing. */ diff --git a/perf/local_lat.cpp b/perf/local_lat.cpp index 8d51662..999e799 100644 --- a/perf/local_lat.cpp +++ b/perf/local_lat.cpp @@ -68,18 +68,18 @@ int main (int argc, char *argv []) } for (i = 0; i != roundtrip_count; i++) { - rc = zmq_recv (s, &msg, 0); - if (rc != 0) { - printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); + rc = zmq_recvmsg (s, &msg, 0); + if (rc < 0) { + printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); return -1; } if (zmq_msg_size (&msg) != message_size) { printf ("message of incorrect size received\n"); return -1; } - rc = zmq_send (s, &msg, 0); - if (rc != 0) { - printf ("error in zmq_send: %s\n", zmq_strerror (errno)); + rc = zmq_sendmsg (s, &msg, 0); + if (rc < 0) { + printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno)); return -1; } } diff --git a/perf/local_thr.cpp b/perf/local_thr.cpp index 0dea8c6..b5379d8 100644 --- a/perf/local_thr.cpp +++ b/perf/local_thr.cpp @@ -79,9 +79,9 @@ int main (int argc, char *argv []) return -1; } - rc = zmq_recv (s, &msg, 0); - if (rc != 0) { - printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); + rc = zmq_recvmsg (s, &msg, 0); + if (rc < 0) { + printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); return -1; } if (zmq_msg_size (&msg) != message_size) { @@ -92,9 +92,9 @@ int main (int argc, char *argv []) watch = zmq_stopwatch_start (); for (i = 0; i != message_count - 1; i++) { - rc = zmq_recv (s, &msg, 0); - if (rc != 0) { - printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); + rc = zmq_recvmsg (s, &msg, 0); + if (rc < 0) { + printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); return -1; } if (zmq_msg_size (&msg) != message_size) { diff --git a/perf/remote_lat.cpp b/perf/remote_lat.cpp index f353ed6..0d438e9 100644 --- a/perf/remote_lat.cpp +++ b/perf/remote_lat.cpp @@ -75,14 +75,14 @@ int main (int argc, char *argv []) watch = zmq_stopwatch_start (); for (i = 0; i != roundtrip_count; i++) { - rc = zmq_send (s, &msg, 0); - if (rc != 0) { - printf ("error in zmq_send: %s\n", zmq_strerror (errno)); + rc = zmq_sendmsg (s, &msg, 0); + if (rc < 0) { + printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno)); return -1; } - rc = zmq_recv (s, &msg, 0); - if (rc != 0) { - printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); + rc = zmq_recvmsg (s, &msg, 0); + if (rc < 0) { + printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); return -1; } if (zmq_msg_size (&msg) != message_size) { diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp index 78b8f72..ba36b98 100644 --- a/perf/remote_thr.cpp +++ b/perf/remote_thr.cpp @@ -76,9 +76,9 @@ int main (int argc, char *argv []) memset (zmq_msg_data (&msg), 0, message_size); #endif - rc = zmq_send (s, &msg, 0); - if (rc != 0) { - printf ("error in zmq_send: %s\n", zmq_strerror (errno)); + rc = zmq_sendmsg (s, &msg, 0); + if (rc < 0) { + printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno)); return -1; } rc = zmq_msg_close (&msg); diff --git a/src/zmq.cpp b/src/zmq.cpp index 89848e7..b533d23 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -343,22 +343,73 @@ int zmq_connect (void *s_, const char *addr_) return (((zmq::socket_base_t*) s_)->connect (addr_)); } -int zmq_send (void *s_, zmq_msg_t *msg_, int flags_) +int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) +{ + zmq_msg_t msg; + int rc = zmq_msg_init_size (&msg, len_); + if (rc != 0) + return -1; + memcpy (zmq_msg_data (&msg), buf_, len_); + + rc = zmq_sendmsg (s_, &msg, flags_); + if (unlikely (rc < 0)) { + int err = errno; + int rc2 = zmq_msg_close (&msg); + errno_assert (rc2 == 0); + errno = err; + return -1; + } + + // Note the optimisation here. We don't close the msg object as it is + // empty anyway. This may change when implementation of zmq_msg_t changes. + return rc; +} + +int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) +{ + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + errno_assert (rc == 0); + + rc = zmq_recvmsg (s_, &msg, flags_); + if (unlikely (rc < 0)) { + int err = errno; + int rc2 = zmq_msg_close (&msg); + errno_assert (rc2 == 0); + errno = err; + return -1; + } + + // At the moment an oversized message is silently truncated. + // TODO: Build in a notification mechanism to report the overflows. + size_t to_copy = size_t (rc) < len_ ? size_t (rc) : len_; + memcpy (buf_, zmq_msg_data (&msg), to_copy); + return (int) to_copy; +} + +int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_) { if (!s_) { errno = EFAULT; return -1; } - return (((zmq::socket_base_t*) s_)->send (msg_, flags_)); + int sz = (int) zmq_msg_size (msg_); + int rc = (((zmq::socket_base_t*) s_)->send (msg_, flags_)); + if (unlikely (rc < 0)) + return -1; + return sz; } -int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) +int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_) { if (!s_) { errno = EFAULT; return -1; } - return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); + int rc = (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); + if (unlikely (rc < 0)) + return -1; + return (int) zmq_msg_size (msg_); } #if defined ZMQ_FORCE_SELECT diff --git a/tests/test_hwm.cpp b/tests/test_hwm.cpp index 58d3a82..d579f9d 100644 --- a/tests/test_hwm.cpp +++ b/tests/test_hwm.cpp @@ -49,49 +49,25 @@ int main (int argc, char *argv []) // Try to send 10 messages. Only 4 should succeed. for (int i = 0; i < 10; i++) { - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - - int rc = zmq_send (sc, &msg, ZMQ_NOBLOCK); + int rc = zmq_send (sc, NULL, 0, ZMQ_NOBLOCK); if (i < 4) assert (rc == 0); else - assert (rc != 0 && errno == EAGAIN); - - rc = zmq_msg_close (&msg); - assert (rc == 0); + assert (rc < 0 && errno == EAGAIN); } // There should be now 4 messages pending, consume them. for (int i = 0; i != 4; i++) { - - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - - rc = zmq_recv (sb, &msg, 0); - assert (rc == 0); - - rc = zmq_msg_close (&msg); + rc = zmq_recv (sb, NULL, 0, 0); assert (rc == 0); } // Now it should be possible to send one more. - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_send (sc, &msg, 0); - assert (rc == 0); - rc = zmq_msg_close (&msg); + rc = zmq_send (sc, NULL, 0, 0); assert (rc == 0); // Consume the remaining message. - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_recv (sb, &msg, 0); - assert (rc == 0); - rc = zmq_msg_close (&msg); + rc = zmq_recv (sb, NULL, 0, 0); assert (rc == 0); rc = zmq_close (sc); diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 6879dff..1fbf1f3 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -31,39 +31,23 @@ inline void bounce (void *sb, void *sc) const char *content = "12345678ABCDEFGH12345678abcdefgh"; // Send the message. - zmq_msg_t msg1; - int rc = zmq_msg_init_size (&msg1, 32); - memcpy (zmq_msg_data (&msg1), content, 32); - rc = zmq_send (sc, &msg1, 0); - assert (rc == 0); - rc = zmq_msg_close (&msg1); - assert (rc == 0); + int rc = zmq_send (sc, content, 32, 0); + assert (rc == 32); // Bounce the message back. - zmq_msg_t msg2; - rc = zmq_msg_init (&msg2); - assert (rc == 0); - rc = zmq_recv (sb, &msg2, 0); - assert (rc == 0); - rc = zmq_send (sb, &msg2, 0); - assert (rc == 0); - rc = zmq_msg_close (&msg2); - assert (rc == 0); + char buf1 [32]; + rc = zmq_recv (sb, buf1, 32, 0); + assert (rc == 32); + rc = zmq_send (sb, buf1, 32, 0); + assert (rc == 32); // Receive the bounced message. - zmq_msg_t msg3; - rc = zmq_msg_init (&msg3); - assert (rc == 0); - rc = zmq_recv (sc, &msg3, 0); - assert (rc == 0); + char buf2 [32]; + rc = zmq_recv (sc, buf2, 32, 0); + assert (rc == 32); // Check whether the message is still the same. - assert (zmq_msg_size (&msg3) == 32); - assert (memcmp (zmq_msg_data (&msg3), content, 32) == 0); - - rc = zmq_msg_close (&msg3); - assert (rc == 0); + assert (memcmp (buf2, content, 32) == 0); } - #endif |