diff options
-rw-r--r-- | include/zmq.h | 6 | ||||
-rw-r--r-- | perf/c/local_thr.c | 6 | ||||
-rw-r--r-- | perf/cpp/local_thr.cpp | 5 | ||||
-rw-r--r-- | perf/python/local_lat.py | 42 | ||||
-rw-r--r-- | perf/python/local_thr.py | 44 | ||||
-rw-r--r-- | perf/python/remote_lat.py | 43 | ||||
-rw-r--r-- | perf/python/remote_thr.py | 17 | ||||
-rw-r--r-- | python/pyzmq.cpp | 924 | ||||
-rw-r--r-- | src/i_endpoint.hpp | 2 | ||||
-rw-r--r-- | src/session.cpp | 29 | ||||
-rw-r--r-- | src/session.hpp | 13 | ||||
-rw-r--r-- | src/socket_base.cpp | 30 | ||||
-rw-r--r-- | src/socket_base.hpp | 2 |
13 files changed, 452 insertions, 711 deletions
diff --git a/include/zmq.h b/include/zmq.h index 2b6f996..f321fa9 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -111,7 +111,7 @@ ZMQ_EXPORT int zmq_msg_init (struct zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_init_size (struct zmq_msg_t *msg, size_t size); // Initialise a message from an existing buffer. Message isn't copied, -// instead 0SOCKETS infrastructure take ownership of the buffer and call +// instead 0MQ infrastructure take ownership of the buffer and call // deallocation functio (ffn) once it's not needed anymore. ZMQ_EXPORT int zmq_msg_init_data (struct zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn); @@ -139,7 +139,7 @@ ZMQ_EXPORT size_t zmq_msg_size (struct zmq_msg_t *msg); // Returns type of the message. ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg); -// Initialise 0SOCKETS context. 'app_threads' specifies maximal number +// Initialise 0MQ context. 'app_threads' specifies maximal number // of application threads that can have open sockets at the same time. // 'io_threads' specifies the size of thread pool to handle I/O operations. // @@ -147,7 +147,7 @@ ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg); // threads declared at all. ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads); -// Deinitialise 0SOCKETS context including all the open sockets. Closing +// Deinitialise 0MQ context including all the open sockets. Closing // sockets after zmq_term has been called will result in undefined behaviour. ZMQ_EXPORT int zmq_term (void *context); diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c index 87c3220..b58fac3 100644 --- a/perf/c/local_thr.c +++ b/perf/c/local_thr.c @@ -38,6 +38,7 @@ int main (int argc, char *argv []) struct timeval end; uint64_t elapsed; uint64_t throughput; + double megabits; if (argc != 4) { printf ("usage: local_thr <bind-to> <message-count> " @@ -81,12 +82,15 @@ int main (int argc, char *argv []) elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) - ((uint64_t) start.tv_sec * 1000000 + start.tv_usec); - + if (elapsed == 0) + elapsed = 1; throughput = (uint64_t) message_count * 1000000 / elapsed; + megabits = (double) (throughput * message_size * 8) / 1000000; printf ("message size: %d [B]\n", (int) message_size); printf ("message count: %d\n", (int) message_count); printf ("mean throughput: %d [msg/s]\n", (int) throughput); + printf ("mean throughput: %3f [Mb/s]\n", (double) megabits); return 0; } diff --git a/perf/cpp/local_thr.cpp b/perf/cpp/local_thr.cpp index fdcbc8d..e328117 100644 --- a/perf/cpp/local_thr.cpp +++ b/perf/cpp/local_thr.cpp @@ -63,12 +63,15 @@ int main (int argc, char *argv []) uint64_t elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) - ((uint64_t) start.tv_sec * 1000000 + start.tv_usec); - + if (elapsed == 0) + elapsed = 1; uint64_t throughput = (uint64_t) message_count * 1000000 / elapsed; + double megabits = (double) (throughput * message_size * 8) / 1000000; printf ("message size: %d [B]\n", (int) message_size); printf ("message count: %d\n", (int) message_count); printf ("mean throughput: %d [msg/s]\n", (int) throughput); + printf ("mean throughput: %3f [Mb/s]\n", (double) megabits); return 0; } diff --git a/perf/python/local_lat.py b/perf/python/local_lat.py index 9883dc0..7f1503f 100644 --- a/perf/python/local_lat.py +++ b/perf/python/local_lat.py @@ -18,50 +18,32 @@ # import sys -from datetime import datetime -import libpyzmq import time - +import libpyzmq def main (): - if len (sys.argv) != 5: - print 'usage: py_local_lat <in-interface> <out-interface> <message-size> <roundtrip-count>' + if len (sys.argv) != 4: + print 'usage: local_lat <bind-to> <roundtrip-count> <message-size>' sys.exit (1) try: - in_interface = sys.argv [1] - out_interface = sys.argv [2] + bind_to = sys.argv [1] + roundtrip_count = int (sys.argv [2]) message_size = int (sys.argv [3]) - roundtrip_count = int (sys.argv [4]) except (ValueError, OverflowError), e: print 'message-size and roundtrip-count must be integers' sys.exit (1) - print "message size:", message_size, "[B]" - print "roundtrip count:", roundtrip_count + ctx = libpyzmq.Context (1, 1); + s = libpyzmq.Socket (ctx, libpyzmq.REP) + s.bind (bind_to) - z = libpyzmq.Zmq () - context = z.context (1,1); - - in_socket = z.socket (context, libpyzmq.ZMQ_SUB) - out_socket = z.socket (context, libpyzmq.ZMQ_PUB) - - z.bind (in_socket, addr = in_interface) - z.bind (out_socket, addr = out_interface) - - msg_out = z.init_msg_data (string_msg, type) - - start = datetime.now () for i in range (0, roundtrip_count): - z.send (out_socket, msg_out, True) - list = z.receive (in_socket, True) - msg_in = list [1] - assert len(msg_in) == message_size - end = datetime.now () + msg = s.recv () + assert len (msg) == message_size + s.send (msg) - delta = end - start - delta_us = delta.seconds * 1000000 + delta.microseconds - print 'Your average latency is', delta_us / roundtrip_count, ' [us]' + time.sleep (1) if __name__ == "__main__": main () diff --git a/perf/python/local_thr.py b/perf/python/local_thr.py index 9b199df..6113c82 100644 --- a/perf/python/local_thr.py +++ b/perf/python/local_thr.py @@ -23,46 +23,42 @@ import libpyzmq def main (): if len (sys.argv) != 4: - print ('usage: py_local_thr <in_interface> <message-size> ' + - '<message-count>') + print 'usage: local_thr <bind-to> <message-size> <message-count>' sys.exit (1) try: + bind_to = sys.argv [1] message_size = int (sys.argv [2]) message_count = int (sys.argv [3]) except (ValueError, OverflowError), e: print 'message-size and message-count must be integers' sys.exit (1) - print "message size:", message_size, "[B]" - print "message count:", message_count + ctx = libpyzmq.Context (1, 1); + s = libpyzmq.Socket (ctx, libpyzmq.P2P) + s.bind (bind_to) - z = libpyzmq.Zmq () + msg = s.recv () + assert len (msg) == message_size - context = z.context (1,1) - in_socket = z.socket (context, libpyzmq.ZMQ_SUB) - z.connect (in_socketaddr = sys.argv [1]) - - - list = z.receive (in_socket, True) - msg = list [1] - assert len(msg) == message_size start = datetime.now () + for i in range (1, message_count): - list = z.receive (in_socket, True) - msg = list [1] - assert len(msg) == message_size + msg = s.recv () + assert len (msg) == message_size + end = datetime.now() - delta = end - start - delta_us = delta.seconds * 1000000 + delta.microseconds - if delta_us == 0: - delta_us = 1 - message_thr = (1000000.0 * float (message_count)) / float (delta_us) - megabit_thr = (message_thr * float (message_size) * 8.0) / 1000000.0; + elapsed = (end - start).seconds * 1000000 + (end - start).microseconds + if elapsed == 0: + elapsed = 1 + throughput = (1000000.0 * float (message_count)) / float (elapsed) + megabits = float (throughput * message_size * 8) / 1000000 - print "Your average throughput is %.0f [msg/s]" % (message_thr, ) - print "Your average throughput is %.2f [Mb/s]" % (megabit_thr, ) + print "message size: %.0f [B]" % (message_size, ) + print "message count: %.0f" % (message_count, ) + print "mean throughput: %.0f [msg/s]" % (throughput, ) + print "mean throughput: %.3f [Mb/s]" % (megabits, ) if __name__ == "__main__": main () diff --git a/perf/python/remote_lat.py b/perf/python/remote_lat.py index ac73595..372f567 100644 --- a/perf/python/remote_lat.py +++ b/perf/python/remote_lat.py @@ -20,39 +20,40 @@ import sys from datetime import datetime import libpyzmq -import time - def main (): - if len(sys.argv) != 5: - print ('usage: py_remote_lat <in-interface> ' + - '<out-interface> <message-size> <roundtrip-count>') + if len(sys.argv) != 4: + print 'usage: remote_lat <connect-to> <roundtrip-count> <message-size>' sys.exit (1) try: - message_size = int (sys.argv [3]) - roundtrip_count = int (sys.argv [4]) + connect_to = sys.argv [1] + message_size = int (sys.argv [2]) + roundtrip_count = int (sys.argv [3]) except (ValueError, OverflowError), e: print 'message-size and message-count must be integers' sys.exit (1) - z = libpyzmq.Zmq () + ctx = libpyzmq.Context (1, 1); + s = libpyzmq.Socket (ctx, libpyzmq.REQ) + s.connect (connect_to) + + msg = ''.join ([' ' for n in range (0, message_size)]) + + start = datetime.now () - context = z.context (1,1) - - in_socket = z.socket (context, libpyzmq.ZMQ_SUB) - out_socket = z.socket (context, libpyzmq.ZMQ_PUB) - - z.connect (in_socket, addr = in_interface) - z.connect (out_socket, addr = out_interface) - for i in range (0, roundtrip_count): - list = z.receive (in_socket, True) - message = list [1] - z.send (out_socket, message, True) - - time.sleep (2) + s.send (msg) + msg = s.recv () + assert len (msg) == message_size + + end = datetime.now () + delta = (end - start).microseconds + 1000000 * (end - start).seconds + latency = delta / roundtrip_count / 2 + print "message size: %.0f [B]" % (message_size, ) + print "roundtrip count: %.0f" % (roundtrip_count, ) + print "mean latency: %.3f [us]" % (latency, ) if __name__ == "__main__": main () diff --git a/perf/python/remote_thr.py b/perf/python/remote_thr.py index a4f2b66..a80adfd 100644 --- a/perf/python/remote_thr.py +++ b/perf/python/remote_thr.py @@ -18,33 +18,32 @@ # import sys -from datetime import datetime import libpyzmq import time def main (): if len (sys.argv) != 4: - print 'usage: py_remote_thr <out-interface> <message-size> <message-count>' + print 'usage: remote_thr <connect-to> <message-size> <message-count>' sys.exit (1) try: + connect_to = argv [1] message_size = int (sys.argv [2]) message_count = int (sys.argv [3]) except (ValueError, OverflowError), e: print 'message-size and message-count must be integers' sys.exit (1) - z = libpyzmq.Zmq () - context = z.context (1,1); - out_socket = z.socket (context, libpyzmq.ZMQ_PUB) - z.bind (out_socket, addr = sys.argv [1]) + ctx = libpyzmq.Context (1, 1); + s = libpyzmq.Socket (ctx, libpyzmq.P2P) + s.connect (connect_to) - msg = z.init_msg_data (string_msg, type) + msg = ''.join ([' ' for n in range (0, message_size)]) for i in range (0, message_count): - z.send (out_socket, msg, True) + s.send (msg) - time.sleep (2) + time.sleep (10) if __name__ == "__main__": main () diff --git a/python/pyzmq.cpp b/python/pyzmq.cpp index 019c73c..8913b8a 100644 --- a/python/pyzmq.cpp +++ b/python/pyzmq.cpp @@ -17,740 +17,484 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <stddef.h> +#include <assert.h> +#include <errno.h> +#include <string.h> #include <Python.h> -#include <zmq.hpp> +#include "../include/zmq.h" -struct pyZMQ +struct context_t { PyObject_HEAD - + void *handle; }; -void pyZMQ_dealloc (pyZMQ *self) -{ - - self->ob_type->tp_free ((PyObject*) self); -} - -PyObject *pyZMQ_new (PyTypeObject *type, PyObject *args, PyObject *kwdict) +PyObject *context_new (PyTypeObject *type, PyObject *args, PyObject *kwds) { - pyZMQ *self = (pyZMQ*) type->tp_alloc (type, 0); +printf ("context_new\n"); + context_t *self = (context_t*) type->tp_alloc (type, 0); - return (PyObject*) self; -} + if (self) + self->handle = NULL; -PyObject *pyZMQ_term (PyTypeObject *type, PyObject *args, PyObject *kwdict) -{ - pyZMQ *self = (pyZMQ*) type->tp_alloc (type, 0); - - static const char *kwlist [] = {"context", NULL}; - PyObject *context; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &context)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_term ((void *) context); - assert (rc != 0); - return (PyObject*) self; } -int pyZMQ_init (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - return 0; -} -PyObject *pyZMQ_context (pyZMQ *self, PyObject *args, PyObject *kwdict) +int context_init (context_t *self, PyObject *args, PyObject *kwdict) { - static const char *kwlist [] = {"app_threads", "io_threads", NULL}; - +printf ("context_init\n"); int app_threads; int io_threads; - + static const char *kwlist [] = {"app_threads", "io_threads", NULL}; if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii", (char**) kwlist, - &app_threads, &io_threads)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - void *context = zmq_init (app_threads, io_threads); - if (context == NULL) { - assert (errno == EINVAL); - PyErr_SetString (PyExc_ValueError, "Invalid argument"); + &app_threads, &io_threads)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); +printf ("context_init err1\n"); + return -1; // ? } - - return (PyObject*) context; -} -PyObject *pyZMQ_msg_init (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - zmq_msg *msg; +printf ("app_threads=%d io_threads=%d\n", app_threads, io_threads); - int rc = zmq_msg_init (msg); - - if (rc == -1) { - assert (rc == ENOMEM); - PyErr_SetString( PyExc_MemoryError, "Out of memory"); - } - - return (PyObject*) msg; -} - - -PyObject *pyZMQ_msg_init_size (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - static const char *kwlist [] = {"size", NULL}; - - zmq_msg *msg; - int size; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "i", (char**) kwlist, - &size)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_msg_init_size (msg, size); - - if (rc == -1) { - assert (rc == ENOMEM); - PyErr_SetString( PyExc_ValueError, "Out of memory"); - } - - return (PyObject*) msg; -} - -PyObject *pyZMQ_msg_init_data (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - static const char *kwlist [] = {"data", "size", "ffn", NULL}; - - PyObject *data = PyString_FromStringAndSize (NULL, 0); - zmq_msg *msg; - PyObject *ffn; - int size; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "SiO", (char**) kwlist, - &data, &size, &ffn)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_msg_init_data (msg, data, size, NULL); - assert (rc == 0); - - return (PyObject*) msg; -} - -PyObject *pyZMQ_msg_close (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - static const char *kwlist [] = {"msg", NULL}; - - PyObject *msg; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &msg)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_msg_close ((zmq_msg *) msg); - assert (rc == 0); + assert (!self->handle); + self->handle = zmq_init (app_threads, io_threads); + if (!self->handle) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return -1; // ? +printf ("context_init err2\n"); + } - return (PyObject*) self; +printf ("context_init ok\n"); + return 0; } -PyObject *pyZMQ_msg_move (pyZMQ *self, PyObject *args, PyObject *kwdict) +void context_dealloc (context_t *self) { - static const char *kwlist [] = {"src", NULL}; - - zmq_msg *dest; - PyObject *src; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &src)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_msg_move (dest, (zmq_msg*) src); - assert (rc == 0); +printf ("context_dealloc\n"); + if (self->handle) { + int rc = zmq_term (self->handle); + if (rc != 0) + PyErr_SetString (PyExc_SystemError, strerror (errno)); + } - return (PyObject*) dest; + self->ob_type->tp_free ((PyObject*) self); } -PyObject *pyZMQ_msg_copy (pyZMQ *self, PyObject *args, PyObject *kwdict) +struct socket_t { - static const char *kwlist [] = {"src", NULL}; - - PyObject *dest; - PyObject *src; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &src)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_msg_copy ((zmq_msg*) dest, (zmq_msg*) src); - assert (rc == 0); - - return (PyObject*) dest; -} + PyObject_HEAD + void *handle; +}; -PyObject *pyZMQ_msg_data (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_new (PyTypeObject *type, PyObject *args, PyObject *kwds) { - static const char *kwlist [] = {"msg", NULL}; - - PyObject *msg; - PyObject *data = PyString_FromStringAndSize (NULL, 0); - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &msg)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - data = (PyObject *) zmq_msg_data ((zmq_msg *) msg); - - return (PyObject*) data; -} +printf ("socket_new\n"); + socket_t *self = (socket_t*) type->tp_alloc (type, 0); -int pyZMQ_msg_size (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - static const char *kwlist [] = {"msg", NULL}; - - PyObject *msg; - int size; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &msg)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - size = zmq_msg_size ((zmq_msg*) msg); - - return size; -} + if (self) + self->handle = NULL; -int pyZMQ_msg_type (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - static const char *kwlist [] = {"msg", NULL}; - - PyObject *msg; - int type; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &msg)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - type = zmq_msg_type ((zmq_msg*) msg); - - return type; + return (PyObject*) self; } -PyObject *pyZMQ_socket (pyZMQ *self, PyObject *args, PyObject *kwdict) +int socket_init (socket_t *self, PyObject *args, PyObject *kwdict) { +printf ("socket_init\n"); + context_t *context; + int socket_type; static const char *kwlist [] = {"context", "type", NULL}; - void* context; - int type; - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Oi", (char**) kwlist, - &context, &type)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); + &context, &socket_type)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } + // TODO: Check whether 'context' is really a libpyzmq.Context object. - void *socket = zmq_socket ((void *) context, type); - - if (socket == NULL) { - assert (errno == EMFILE || errno == EINVAL); - if (errno == EMFILE) - PyErr_SetString (PyExc_MemoryError, "Too many threads"); - else - PyErr_SetString (PyExc_ValueError, "Invalid argument"); + assert (!self->handle); + self->handle = zmq_socket (context->handle, socket_type); + if (!self->handle) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return -1; // ? } - return (PyObject*) socket; + return 0; } -PyObject *pyZMQ_close (pyZMQ *self, PyObject *args, PyObject *kwdict) +void socket_dealloc (socket_t *self) { - static const char *kwlist [] = {"socket", NULL}; - - PyObject* socket; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &socket)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - - int rc = zmq_close ((void *)socket); - assert (rc == 0); +printf ("socket_dealloc\n"); + if (self->handle) { + int rc = zmq_close (self->handle); + if (rc != 0) + PyErr_SetString (PyExc_SystemError, strerror (errno)); + } - return (PyObject *) self; + self->ob_type->tp_free ((PyObject*) self); } -PyObject *pyZMQ_setsockopt (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_setsockopt (socket_t *self, PyObject *args, PyObject *kwdict) { - static const char *kwlist [] = {"socket", "option", "optval", NULL}; - printf ("setsockopt\n"); - PyObject* socket; int option; PyObject* optval; - int optvallen; + static const char *kwlist [] = {"option", "optval", NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "iO", (char**) kwlist, + &option, &optval)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } + int rc; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "OiO", (char**) kwlist, - &socket, &option, &optval)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - if (PyInt_Check (optval)) - rc = zmq_setsockopt ((void *) socket, option, (void *) optval, - 4); - if (PyBool_Check (optval)) - rc = zmq_setsockopt ((void *) socket, option, (void *) optval, - 1); - if (PyFloat_Check (optval)) - rc = zmq_setsockopt ((void *) socket, option, (void *) optval, - 4); + if (PyInt_Check (optval)) { + int val = PyInt_AsLong (optval); + rc = zmq_setsockopt (self->handle, option, &val, sizeof (int)); + } if (PyString_Check (optval)) - rc = zmq_setsockopt ((void *) socket, option, (void *) optval, - PyString_Size (optval)); + rc = zmq_setsockopt (self->handle, option, PyString_AsString (optval), + PyString_Size (optval)); + else { + rc = -1; + errno = EINVAL; + } - assert (rc == 0); + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; + } - return (PyObject *) self; + Py_INCREF (Py_None); + return Py_None; } -PyObject *pyZMQ_bind (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_bind (socket_t *self, PyObject *args, PyObject *kwdict) { - char const *addr = NULL; - PyObject* socket; - - static const char *kwlist [] = {"socket", "addr", NULL}; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Os", (char**) kwlist, - &socket, &addr)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_bind ((void*) socket, addr); - if (rc == -1) { - assert (errno == EINVAL || errno == EADDRINUSE); - if (errno == EINVAL) - PyErr_SetString (PyExc_ValueError, "Invalid argument"); - else - PyErr_SetString (PyExc_ValueError, "Address in use"); + char const *addr; + static const char *kwlist [] = {"addr", NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "s", (char**) kwlist, + &addr)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; } - return (PyObject *) self; + int rc = zmq_bind (self->handle, addr); + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; + } + + Py_INCREF (Py_None); + return Py_None; } -PyObject *pyZMQ_connect (pyZMQ *self, PyObject *args, PyObject *kw) + +PyObject *socket_connect (socket_t *self, PyObject *args, PyObject *kwdict) { - char const *addr = NULL; - PyObject* socket; - - static const char* kwlist [] = {"socket", "addr", NULL}; - - if (!PyArg_ParseTupleAndKeywords (args, kw, "Os", (char**) kwlist, - &socket, &addr)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_connect ((void *) socket, addr); - if (rc == -1) { - assert (errno == EINVAL || errno == EADDRINUSE); - if (errno == EINVAL) - PyErr_SetString (PyExc_ValueError, "Invalid argument"); - else - PyErr_SetString (PyExc_ValueError, "Address in use"); + char const *addr; + static const char *kwlist [] = {"addr", NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "s", (char**) kwlist, + &addr)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } + + int rc = zmq_connect (self->handle, addr); + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; } - return (PyObject *) self; + Py_INCREF (Py_None); + return Py_None; } -PyObject *pyZMQ_flush (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_send (socket_t *self, PyObject *args, PyObject *kwdict) { + PyObject *msg; /* = PyString_FromStringAndSize (NULL, 0); */ + int flags = 0; + static const char *kwlist [] = {"msg", "flags", NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "S|i", (char**) kwlist, + &msg, &flags)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } - static const char *kwlist [] = {"socket", NULL}; - PyObject *socket; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &socket)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_flush ((void*) socket); - assert (rc == 0); + zmq_msg_t data; + int rc = zmq_msg_init_size (&data, PyString_Size (msg)); + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; + } + memcpy (zmq_msg_data (&data), PyString_AsString (msg), + zmq_msg_size (&data)); + + rc = zmq_send (self->handle, &data, flags); + int rc2 = zmq_msg_close (&data); + assert (rc2 == 0); + + if (rc != 0 && errno == EAGAIN) + return PyBool_FromLong (0); + + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; + } - return (PyObject *) self; + return PyBool_FromLong (1); } -PyObject *pyZMQ_send (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_flush (socket_t *self, PyObject *args, PyObject *kwdict) { - PyObject *msg; - PyObject *socket; - int flags = 0; - - static const char *kwlist [] = {"socket", "msg", "flags", NULL}; - - if (!PyArg_ParseTupleAndKeywords(args, kwdict, "OOi", (char**) kwlist, - &socket, &msg, &flags)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_send ((void*) socket, (zmq_msg*) msg, flags); - assert (rc == 0 || (rc == -1 && errno == EAGAIN)); - - return (PyObject *) self; + static const char *kwlist [] = {NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "", (char**) kwlist)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } + + int rc = zmq_flush (self->handle); + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; + } + + Py_INCREF (Py_None); + return Py_None; } -PyObject *pyZMQ_receive (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_recv (socket_t *self, PyObject *args, PyObject *kwdict) { - zmq_msg *msg; - zmq_msg_init (msg); - PyObject *socket; - int flags = 0; - static const char *kwlist [] = {"socket", "flags", NULL}; + static const char *kwlist [] = {"flags", NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "|i", (char**) kwlist, + &flags)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Oi", (char**) kwlist, - &socket, &flags)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_recv (socket, msg, flags); - assert (rc == 0 || (rc == -1 && errno == EAGAIN)); - - PyObject *py_message = PyString_FromStringAndSize (NULL, 0); - py_message = (PyObject *) zmq_msg_data (msg); - int py_message_size = zmq_msg_size (msg); - int py_message_type = zmq_msg_type (msg); - - zmq_msg_close (msg); - - return Py_BuildValue ("isi", rc, py_message, - py_message_size, py_message_type); + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + assert (rc == 0); + + rc = zmq_recv (self->handle, &msg, flags); + + if (rc != 0 && errno == EAGAIN) { + Py_INCREF (Py_None); + return Py_None; + } + + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } + + PyObject *result = PyString_FromStringAndSize ((char*) zmq_msg_data (&msg), + zmq_msg_size (&msg)); + rc = zmq_msg_close (&msg); + assert (rc == 0); + return result; } -static PyMethodDef pyZMQ_methods [] = +static PyMethodDef context_methods [] = { { - "context", - (PyCFunction) pyZMQ_context, - METH_VARARGS | METH_KEYWORDS, - "context (app_threads, io_threads) -> None\n\n" - "Creates new context\n\n" - "app_threads is the number of application threads.\n\n" - "io_threads is the number of io threads.\n\n" - - }, - { - "msg_init", - (PyCFunction) pyZMQ_msg_init, - METH_VARARGS | METH_KEYWORDS, - "msg_init () -> None\n\n" - "Creates new message\n\n" - - }, - { - "msg_init_size", - (PyCFunction) pyZMQ_msg_init_size, - METH_VARARGS | METH_KEYWORDS, - "msg_init_size (size) -> None\n\n" - "Creates new message of a specified size.\n\n" - "size if integer specifying the size of the message to be created.\n\n" - - }, - { - "msg_init_data", - (PyCFunction) pyZMQ_msg_init_data, - METH_VARARGS | METH_KEYWORDS, - "msg_init_data (data, size, ffn) -> None\n\n" - "Initialises new message with data\n\n" - "data is pointer to the data of the message\n\n" - "size is integer specifying size of data\n\n" - "ffn is function to free alocated data\n\n" - - }, - - { - "msg_close", - (PyCFunction) pyZMQ_msg_close, - METH_VARARGS | METH_KEYWORDS, - "msg_close (msg) -> None\n\n" - "Deletes the message.\n\n" - "msg is the message the be freed\n\n" - - }, - { - "msg_move", - (PyCFunction) pyZMQ_msg_move, - METH_VARARGS | METH_KEYWORDS, - "msg_move (src) -> dest\n\n" - "Move the content of the message from 'src' to 'dest'.\n\n" - "The content isn't copied, just moved. 'src' is an empty\n\n" - "message after the call. Original content of 'dest' message\n\n" - "is deallocated.\n\n" - - }, - { - "msg_copy", - (PyCFunction) pyZMQ_msg_copy, - METH_VARARGS | METH_KEYWORDS, - "msg_copy (src) -> dest\n\n" - "Copy the 'src' message to 'dest'. The content isn't copied, \n\n" - "instead reference count is increased. Don't modify the message \n\n" - "data after the call as they are shared between two messages.\n\n" |