diff options
author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-09-02 10:22:23 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-09-02 10:22:23 +0200 |
commit | 6a5120b1f1c48d19b777f76ac756b00fb624d110 (patch) | |
tree | 33853b4f9aaaf88bcb82a53fe3607d91d7a06ab1 | |
parent | 72fdf47d16c8d3ecd9da657b4649978e414d775c (diff) |
python extension & perf tests
-rw-r--r-- | include/zmq.h | 6 | ||||
-rw-r--r-- | perf/c/local_thr.c | 6 | ||||
-rw-r--r-- | perf/cpp/local_thr.cpp | 5 | ||||
-rw-r--r-- | perf/python/local_lat.py | 42 | ||||
-rw-r--r-- | perf/python/local_thr.py | 44 | ||||
-rw-r--r-- | perf/python/remote_lat.py | 43 | ||||
-rw-r--r-- | perf/python/remote_thr.py | 17 | ||||
-rw-r--r-- | python/pyzmq.cpp | 924 | ||||
-rw-r--r-- | src/i_endpoint.hpp | 2 | ||||
-rw-r--r-- | src/session.cpp | 29 | ||||
-rw-r--r-- | src/session.hpp | 13 | ||||
-rw-r--r-- | src/socket_base.cpp | 30 | ||||
-rw-r--r-- | src/socket_base.hpp | 2 |
13 files changed, 452 insertions, 711 deletions
diff --git a/include/zmq.h b/include/zmq.h index 2b6f996..f321fa9 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -111,7 +111,7 @@ ZMQ_EXPORT int zmq_msg_init (struct zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_init_size (struct zmq_msg_t *msg, size_t size); // Initialise a message from an existing buffer. Message isn't copied, -// instead 0SOCKETS infrastructure take ownership of the buffer and call +// instead 0MQ infrastructure take ownership of the buffer and call // deallocation functio (ffn) once it's not needed anymore. ZMQ_EXPORT int zmq_msg_init_data (struct zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn); @@ -139,7 +139,7 @@ ZMQ_EXPORT size_t zmq_msg_size (struct zmq_msg_t *msg); // Returns type of the message. ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg); -// Initialise 0SOCKETS context. 'app_threads' specifies maximal number +// Initialise 0MQ context. 'app_threads' specifies maximal number // of application threads that can have open sockets at the same time. // 'io_threads' specifies the size of thread pool to handle I/O operations. // @@ -147,7 +147,7 @@ ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg); // threads declared at all. ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads); -// Deinitialise 0SOCKETS context including all the open sockets. Closing +// Deinitialise 0MQ context including all the open sockets. Closing // sockets after zmq_term has been called will result in undefined behaviour. ZMQ_EXPORT int zmq_term (void *context); diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c index 87c3220..b58fac3 100644 --- a/perf/c/local_thr.c +++ b/perf/c/local_thr.c @@ -38,6 +38,7 @@ int main (int argc, char *argv []) struct timeval end; uint64_t elapsed; uint64_t throughput; + double megabits; if (argc != 4) { printf ("usage: local_thr <bind-to> <message-count> " @@ -81,12 +82,15 @@ int main (int argc, char *argv []) elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) - ((uint64_t) start.tv_sec * 1000000 + start.tv_usec); - + if (elapsed == 0) + elapsed = 1; throughput = (uint64_t) message_count * 1000000 / elapsed; + megabits = (double) (throughput * message_size * 8) / 1000000; printf ("message size: %d [B]\n", (int) message_size); printf ("message count: %d\n", (int) message_count); printf ("mean throughput: %d [msg/s]\n", (int) throughput); + printf ("mean throughput: %3f [Mb/s]\n", (double) megabits); return 0; } diff --git a/perf/cpp/local_thr.cpp b/perf/cpp/local_thr.cpp index fdcbc8d..e328117 100644 --- a/perf/cpp/local_thr.cpp +++ b/perf/cpp/local_thr.cpp @@ -63,12 +63,15 @@ int main (int argc, char *argv []) uint64_t elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) - ((uint64_t) start.tv_sec * 1000000 + start.tv_usec); - + if (elapsed == 0) + elapsed = 1; uint64_t throughput = (uint64_t) message_count * 1000000 / elapsed; + double megabits = (double) (throughput * message_size * 8) / 1000000; printf ("message size: %d [B]\n", (int) message_size); printf ("message count: %d\n", (int) message_count); printf ("mean throughput: %d [msg/s]\n", (int) throughput); + printf ("mean throughput: %3f [Mb/s]\n", (double) megabits); return 0; } diff --git a/perf/python/local_lat.py b/perf/python/local_lat.py index 9883dc0..7f1503f 100644 --- a/perf/python/local_lat.py +++ b/perf/python/local_lat.py @@ -18,50 +18,32 @@ # import sys -from datetime import datetime -import libpyzmq import time - +import libpyzmq def main (): - if len (sys.argv) != 5: - print 'usage: py_local_lat <in-interface> <out-interface> <message-size> <roundtrip-count>' + if len (sys.argv) != 4: + print 'usage: local_lat <bind-to> <roundtrip-count> <message-size>' sys.exit (1) try: - in_interface = sys.argv [1] - out_interface = sys.argv [2] + bind_to = sys.argv [1] + roundtrip_count = int (sys.argv [2]) message_size = int (sys.argv [3]) - roundtrip_count = int (sys.argv [4]) except (ValueError, OverflowError), e: print 'message-size and roundtrip-count must be integers' sys.exit (1) - print "message size:", message_size, "[B]" - print "roundtrip count:", roundtrip_count + ctx = libpyzmq.Context (1, 1); + s = libpyzmq.Socket (ctx, libpyzmq.REP) + s.bind (bind_to) - z = libpyzmq.Zmq () - context = z.context (1,1); - - in_socket = z.socket (context, libpyzmq.ZMQ_SUB) - out_socket = z.socket (context, libpyzmq.ZMQ_PUB) - - z.bind (in_socket, addr = in_interface) - z.bind (out_socket, addr = out_interface) - - msg_out = z.init_msg_data (string_msg, type) - - start = datetime.now () for i in range (0, roundtrip_count): - z.send (out_socket, msg_out, True) - list = z.receive (in_socket, True) - msg_in = list [1] - assert len(msg_in) == message_size - end = datetime.now () + msg = s.recv () + assert len (msg) == message_size + s.send (msg) - delta = end - start - delta_us = delta.seconds * 1000000 + delta.microseconds - print 'Your average latency is', delta_us / roundtrip_count, ' [us]' + time.sleep (1) if __name__ == "__main__": main () diff --git a/perf/python/local_thr.py b/perf/python/local_thr.py index 9b199df..6113c82 100644 --- a/perf/python/local_thr.py +++ b/perf/python/local_thr.py @@ -23,46 +23,42 @@ import libpyzmq def main (): if len (sys.argv) != 4: - print ('usage: py_local_thr <in_interface> <message-size> ' + - '<message-count>') + print 'usage: local_thr <bind-to> <message-size> <message-count>' sys.exit (1) try: + bind_to = sys.argv [1] message_size = int (sys.argv [2]) message_count = int (sys.argv [3]) except (ValueError, OverflowError), e: print 'message-size and message-count must be integers' sys.exit (1) - print "message size:", message_size, "[B]" - print "message count:", message_count + ctx = libpyzmq.Context (1, 1); + s = libpyzmq.Socket (ctx, libpyzmq.P2P) + s.bind (bind_to) - z = libpyzmq.Zmq () + msg = s.recv () + assert len (msg) == message_size - context = z.context (1,1) - in_socket = z.socket (context, libpyzmq.ZMQ_SUB) - z.connect (in_socketaddr = sys.argv [1]) - - - list = z.receive (in_socket, True) - msg = list [1] - assert len(msg) == message_size start = datetime.now () + for i in range (1, message_count): - list = z.receive (in_socket, True) - msg = list [1] - assert len(msg) == message_size + msg = s.recv () + assert len (msg) == message_size + end = datetime.now() - delta = end - start - delta_us = delta.seconds * 1000000 + delta.microseconds - if delta_us == 0: - delta_us = 1 - message_thr = (1000000.0 * float (message_count)) / float (delta_us) - megabit_thr = (message_thr * float (message_size) * 8.0) / 1000000.0; + elapsed = (end - start).seconds * 1000000 + (end - start).microseconds + if elapsed == 0: + elapsed = 1 + throughput = (1000000.0 * float (message_count)) / float (elapsed) + megabits = float (throughput * message_size * 8) / 1000000 - print "Your average throughput is %.0f [msg/s]" % (message_thr, ) - print "Your average throughput is %.2f [Mb/s]" % (megabit_thr, ) + print "message size: %.0f [B]" % (message_size, ) + print "message count: %.0f" % (message_count, ) + print "mean throughput: %.0f [msg/s]" % (throughput, ) + print "mean throughput: %.3f [Mb/s]" % (megabits, ) if __name__ == "__main__": main () diff --git a/perf/python/remote_lat.py b/perf/python/remote_lat.py index ac73595..372f567 100644 --- a/perf/python/remote_lat.py +++ b/perf/python/remote_lat.py @@ -20,39 +20,40 @@ import sys from datetime import datetime import libpyzmq -import time - def main (): - if len(sys.argv) != 5: - print ('usage: py_remote_lat <in-interface> ' + - '<out-interface> <message-size> <roundtrip-count>') + if len(sys.argv) != 4: + print 'usage: remote_lat <connect-to> <roundtrip-count> <message-size>' sys.exit (1) try: - message_size = int (sys.argv [3]) - roundtrip_count = int (sys.argv [4]) + connect_to = sys.argv [1] + message_size = int (sys.argv [2]) + roundtrip_count = int (sys.argv [3]) except (ValueError, OverflowError), e: print 'message-size and message-count must be integers' sys.exit (1) - z = libpyzmq.Zmq () + ctx = libpyzmq.Context (1, 1); + s = libpyzmq.Socket (ctx, libpyzmq.REQ) + s.connect (connect_to) + + msg = ''.join ([' ' for n in range (0, message_size)]) + + start = datetime.now () - context = z.context (1,1) - - in_socket = z.socket (context, libpyzmq.ZMQ_SUB) - out_socket = z.socket (context, libpyzmq.ZMQ_PUB) - - z.connect (in_socket, addr = in_interface) - z.connect (out_socket, addr = out_interface) - for i in range (0, roundtrip_count): - list = z.receive (in_socket, True) - message = list [1] - z.send (out_socket, message, True) - - time.sleep (2) + s.send (msg) + msg = s.recv () + assert len (msg) == message_size + + end = datetime.now () + delta = (end - start).microseconds + 1000000 * (end - start).seconds + latency = delta / roundtrip_count / 2 + print "message size: %.0f [B]" % (message_size, ) + print "roundtrip count: %.0f" % (roundtrip_count, ) + print "mean latency: %.3f [us]" % (latency, ) if __name__ == "__main__": main () diff --git a/perf/python/remote_thr.py b/perf/python/remote_thr.py index a4f2b66..a80adfd 100644 --- a/perf/python/remote_thr.py +++ b/perf/python/remote_thr.py @@ -18,33 +18,32 @@ # import sys -from datetime import datetime import libpyzmq import time def main (): if len (sys.argv) != 4: - print 'usage: py_remote_thr <out-interface> <message-size> <message-count>' + print 'usage: remote_thr <connect-to> <message-size> <message-count>' sys.exit (1) try: + connect_to = argv [1] message_size = int (sys.argv [2]) message_count = int (sys.argv [3]) except (ValueError, OverflowError), e: print 'message-size and message-count must be integers' sys.exit (1) - z = libpyzmq.Zmq () - context = z.context (1,1); - out_socket = z.socket (context, libpyzmq.ZMQ_PUB) - z.bind (out_socket, addr = sys.argv [1]) + ctx = libpyzmq.Context (1, 1); + s = libpyzmq.Socket (ctx, libpyzmq.P2P) + s.connect (connect_to) - msg = z.init_msg_data (string_msg, type) + msg = ''.join ([' ' for n in range (0, message_size)]) for i in range (0, message_count): - z.send (out_socket, msg, True) + s.send (msg) - time.sleep (2) + time.sleep (10) if __name__ == "__main__": main () diff --git a/python/pyzmq.cpp b/python/pyzmq.cpp index 019c73c..8913b8a 100644 --- a/python/pyzmq.cpp +++ b/python/pyzmq.cpp @@ -17,740 +17,484 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <stddef.h> +#include <assert.h> +#include <errno.h> +#include <string.h> #include <Python.h> -#include <zmq.hpp> +#include "../include/zmq.h" -struct pyZMQ +struct context_t { PyObject_HEAD - + void *handle; }; -void pyZMQ_dealloc (pyZMQ *self) -{ - - self->ob_type->tp_free ((PyObject*) self); -} - -PyObject *pyZMQ_new (PyTypeObject *type, PyObject *args, PyObject *kwdict) +PyObject *context_new (PyTypeObject *type, PyObject *args, PyObject *kwds) { - pyZMQ *self = (pyZMQ*) type->tp_alloc (type, 0); +printf ("context_new\n"); + context_t *self = (context_t*) type->tp_alloc (type, 0); - return (PyObject*) self; -} + if (self) + self->handle = NULL; -PyObject *pyZMQ_term (PyTypeObject *type, PyObject *args, PyObject *kwdict) -{ - pyZMQ *self = (pyZMQ*) type->tp_alloc (type, 0); - - static const char *kwlist [] = {"context", NULL}; - PyObject *context; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &context)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_term ((void *) context); - assert (rc != 0); - return (PyObject*) self; } -int pyZMQ_init (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - return 0; -} -PyObject *pyZMQ_context (pyZMQ *self, PyObject *args, PyObject *kwdict) +int context_init (context_t *self, PyObject *args, PyObject *kwdict) { - static const char *kwlist [] = {"app_threads", "io_threads", NULL}; - +printf ("context_init\n"); int app_threads; int io_threads; - + static const char *kwlist [] = {"app_threads", "io_threads", NULL}; if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii", (char**) kwlist, - &app_threads, &io_threads)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - void *context = zmq_init (app_threads, io_threads); - if (context == NULL) { - assert (errno == EINVAL); - PyErr_SetString (PyExc_ValueError, "Invalid argument"); + &app_threads, &io_threads)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); +printf ("context_init err1\n"); + return -1; // ? } - - return (PyObject*) context; -} -PyObject *pyZMQ_msg_init (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - zmq_msg *msg; +printf ("app_threads=%d io_threads=%d\n", app_threads, io_threads); - int rc = zmq_msg_init (msg); - - if (rc == -1) { - assert (rc == ENOMEM); - PyErr_SetString( PyExc_MemoryError, "Out of memory"); - } - - return (PyObject*) msg; -} - - -PyObject *pyZMQ_msg_init_size (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - static const char *kwlist [] = {"size", NULL}; - - zmq_msg *msg; - int size; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "i", (char**) kwlist, - &size)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_msg_init_size (msg, size); - - if (rc == -1) { - assert (rc == ENOMEM); - PyErr_SetString( PyExc_ValueError, "Out of memory"); - } - - return (PyObject*) msg; -} - -PyObject *pyZMQ_msg_init_data (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - static const char *kwlist [] = {"data", "size", "ffn", NULL}; - - PyObject *data = PyString_FromStringAndSize (NULL, 0); - zmq_msg *msg; - PyObject *ffn; - int size; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "SiO", (char**) kwlist, - &data, &size, &ffn)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_msg_init_data (msg, data, size, NULL); - assert (rc == 0); - - return (PyObject*) msg; -} - -PyObject *pyZMQ_msg_close (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - static const char *kwlist [] = {"msg", NULL}; - - PyObject *msg; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &msg)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_msg_close ((zmq_msg *) msg); - assert (rc == 0); + assert (!self->handle); + self->handle = zmq_init (app_threads, io_threads); + if (!self->handle) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return -1; // ? +printf ("context_init err2\n"); + } - return (PyObject*) self; +printf ("context_init ok\n"); + return 0; } -PyObject *pyZMQ_msg_move (pyZMQ *self, PyObject *args, PyObject *kwdict) +void context_dealloc (context_t *self) { - static const char *kwlist [] = {"src", NULL}; - - zmq_msg *dest; - PyObject *src; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &src)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_msg_move (dest, (zmq_msg*) src); - assert (rc == 0); +printf ("context_dealloc\n"); + if (self->handle) { + int rc = zmq_term (self->handle); + if (rc != 0) + PyErr_SetString (PyExc_SystemError, strerror (errno)); + } - return (PyObject*) dest; + self->ob_type->tp_free ((PyObject*) self); } -PyObject *pyZMQ_msg_copy (pyZMQ *self, PyObject *args, PyObject *kwdict) +struct socket_t { - static const char *kwlist [] = {"src", NULL}; - - PyObject *dest; - PyObject *src; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &src)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_msg_copy ((zmq_msg*) dest, (zmq_msg*) src); - assert (rc == 0); - - return (PyObject*) dest; -} + PyObject_HEAD + void *handle; +}; -PyObject *pyZMQ_msg_data (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_new (PyTypeObject *type, PyObject *args, PyObject *kwds) { - static const char *kwlist [] = {"msg", NULL}; - - PyObject *msg; - PyObject *data = PyString_FromStringAndSize (NULL, 0); - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &msg)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - data = (PyObject *) zmq_msg_data ((zmq_msg *) msg); - - return (PyObject*) data; -} +printf ("socket_new\n"); + socket_t *self = (socket_t*) type->tp_alloc (type, 0); -int pyZMQ_msg_size (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - static const char *kwlist [] = {"msg", NULL}; - - PyObject *msg; - int size; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &msg)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - size = zmq_msg_size ((zmq_msg*) msg); - - return size; -} + if (self) + self->handle = NULL; -int pyZMQ_msg_type (pyZMQ *self, PyObject *args, PyObject *kwdict) -{ - static const char *kwlist [] = {"msg", NULL}; - - PyObject *msg; - int type; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &msg)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - type = zmq_msg_type ((zmq_msg*) msg); - - return type; + return (PyObject*) self; } -PyObject *pyZMQ_socket (pyZMQ *self, PyObject *args, PyObject *kwdict) +int socket_init (socket_t *self, PyObject *args, PyObject *kwdict) { +printf ("socket_init\n"); + context_t *context; + int socket_type; static const char *kwlist [] = {"context", "type", NULL}; - void* context; - int type; - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Oi", (char**) kwlist, - &context, &type)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); + &context, &socket_type)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } + // TODO: Check whether 'context' is really a libpyzmq.Context object. - void *socket = zmq_socket ((void *) context, type); - - if (socket == NULL) { - assert (errno == EMFILE || errno == EINVAL); - if (errno == EMFILE) - PyErr_SetString (PyExc_MemoryError, "Too many threads"); - else - PyErr_SetString (PyExc_ValueError, "Invalid argument"); + assert (!self->handle); + self->handle = zmq_socket (context->handle, socket_type); + if (!self->handle) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return -1; // ? } - return (PyObject*) socket; + return 0; } -PyObject *pyZMQ_close (pyZMQ *self, PyObject *args, PyObject *kwdict) +void socket_dealloc (socket_t *self) { - static const char *kwlist [] = {"socket", NULL}; - - PyObject* socket; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &socket)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - - int rc = zmq_close ((void *)socket); - assert (rc == 0); +printf ("socket_dealloc\n"); + if (self->handle) { + int rc = zmq_close (self->handle); + if (rc != 0) + PyErr_SetString (PyExc_SystemError, strerror (errno)); + } - return (PyObject *) self; + self->ob_type->tp_free ((PyObject*) self); } -PyObject *pyZMQ_setsockopt (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_setsockopt (socket_t *self, PyObject *args, PyObject *kwdict) { - static const char *kwlist [] = {"socket", "option", "optval", NULL}; - printf ("setsockopt\n"); - PyObject* socket; int option; PyObject* optval; - int optvallen; + static const char *kwlist [] = {"option", "optval", NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "iO", (char**) kwlist, + &option, &optval)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } + int rc; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "OiO", (char**) kwlist, - &socket, &option, &optval)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - if (PyInt_Check (optval)) - rc = zmq_setsockopt ((void *) socket, option, (void *) optval, - 4); - if (PyBool_Check (optval)) - rc = zmq_setsockopt ((void *) socket, option, (void *) optval, - 1); - if (PyFloat_Check (optval)) - rc = zmq_setsockopt ((void *) socket, option, (void *) optval, - 4); + if (PyInt_Check (optval)) { + int val = PyInt_AsLong (optval); + rc = zmq_setsockopt (self->handle, option, &val, sizeof (int)); + } if (PyString_Check (optval)) - rc = zmq_setsockopt ((void *) socket, option, (void *) optval, - PyString_Size (optval)); + rc = zmq_setsockopt (self->handle, option, PyString_AsString (optval), + PyString_Size (optval)); + else { + rc = -1; + errno = EINVAL; + } - assert (rc == 0); + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; + } - return (PyObject *) self; + Py_INCREF (Py_None); + return Py_None; } -PyObject *pyZMQ_bind (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_bind (socket_t *self, PyObject *args, PyObject *kwdict) { - char const *addr = NULL; - PyObject* socket; - - static const char *kwlist [] = {"socket", "addr", NULL}; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Os", (char**) kwlist, - &socket, &addr)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_bind ((void*) socket, addr); - if (rc == -1) { - assert (errno == EINVAL || errno == EADDRINUSE); - if (errno == EINVAL) - PyErr_SetString (PyExc_ValueError, "Invalid argument"); - else - PyErr_SetString (PyExc_ValueError, "Address in use"); + char const *addr; + static const char *kwlist [] = {"addr", NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "s", (char**) kwlist, + &addr)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; } - return (PyObject *) self; + int rc = zmq_bind (self->handle, addr); + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; + } + + Py_INCREF (Py_None); + return Py_None; } -PyObject *pyZMQ_connect (pyZMQ *self, PyObject *args, PyObject *kw) + +PyObject *socket_connect (socket_t *self, PyObject *args, PyObject *kwdict) { - char const *addr = NULL; - PyObject* socket; - - static const char* kwlist [] = {"socket", "addr", NULL}; - - if (!PyArg_ParseTupleAndKeywords (args, kw, "Os", (char**) kwlist, - &socket, &addr)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_connect ((void *) socket, addr); - if (rc == -1) { - assert (errno == EINVAL || errno == EADDRINUSE); - if (errno == EINVAL) - PyErr_SetString (PyExc_ValueError, "Invalid argument"); - else - PyErr_SetString (PyExc_ValueError, "Address in use"); + char const *addr; + static const char *kwlist [] = {"addr", NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "s", (char**) kwlist, + &addr)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } + + int rc = zmq_connect (self->handle, addr); + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; } - return (PyObject *) self; + Py_INCREF (Py_None); + return Py_None; } -PyObject *pyZMQ_flush (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_send (socket_t *self, PyObject *args, PyObject *kwdict) { + PyObject *msg; /* = PyString_FromStringAndSize (NULL, 0); */ + int flags = 0; + static const char *kwlist [] = {"msg", "flags", NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "S|i", (char**) kwlist, + &msg, &flags)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } - static const char *kwlist [] = {"socket", NULL}; - PyObject *socket; - - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist, - &socket)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_flush ((void*) socket); - assert (rc == 0); + zmq_msg_t data; + int rc = zmq_msg_init_size (&data, PyString_Size (msg)); + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; + } + memcpy (zmq_msg_data (&data), PyString_AsString (msg), + zmq_msg_size (&data)); + + rc = zmq_send (self->handle, &data, flags); + int rc2 = zmq_msg_close (&data); + assert (rc2 == 0); + + if (rc != 0 && errno == EAGAIN) + return PyBool_FromLong (0); + + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; + } - return (PyObject *) self; + return PyBool_FromLong (1); } -PyObject *pyZMQ_send (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_flush (socket_t *self, PyObject *args, PyObject *kwdict) { - PyObject *msg; - PyObject *socket; - int flags = 0; - - static const char *kwlist [] = {"socket", "msg", "flags", NULL}; - - if (!PyArg_ParseTupleAndKeywords(args, kwdict, "OOi", (char**) kwlist, - &socket, &msg, &flags)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_send ((void*) socket, (zmq_msg*) msg, flags); - assert (rc == 0 || (rc == -1 && errno == EAGAIN)); - - return (PyObject *) self; + static const char *kwlist [] = {NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "", (char**) kwlist)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } + + int rc = zmq_flush (self->handle); + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, strerror (errno)); + return NULL; + } + + Py_INCREF (Py_None); + return Py_None; } -PyObject *pyZMQ_receive (pyZMQ *self, PyObject *args, PyObject *kwdict) +PyObject *socket_recv (socket_t *self, PyObject *args, PyObject *kwdict) { - zmq_msg *msg; - zmq_msg_init (msg); - PyObject *socket; - int flags = 0; - static const char *kwlist [] = {"socket", "flags", NULL}; + static const char *kwlist [] = {"flags", NULL}; + if (!PyArg_ParseTupleAndKeywords (args, kwdict, "|i", (char**) kwlist, + &flags)) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } - if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Oi", (char**) kwlist, - &socket, &flags)) - PyErr_SetString (PyExc_SystemError, - "PyArg_ParseTupleAndKeywords error"); - - int rc = zmq_recv (socket, msg, flags); - assert (rc == 0 || (rc == -1 && errno == EAGAIN)); - - PyObject *py_message = PyString_FromStringAndSize (NULL, 0); - py_message = (PyObject *) zmq_msg_data (msg); - int py_message_size = zmq_msg_size (msg); - int py_message_type = zmq_msg_type (msg); - - zmq_msg_close (msg); - - return Py_BuildValue ("isi", rc, py_message, - py_message_size, py_message_type); + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + assert (rc == 0); + + rc = zmq_recv (self->handle, &msg, flags); + + if (rc != 0 && errno == EAGAIN) { + Py_INCREF (Py_None); + return Py_None; + } + + if (rc != 0) { + PyErr_SetString (PyExc_SystemError, "invalid arguments"); + return NULL; + } + + PyObject *result = PyString_FromStringAndSize ((char*) zmq_msg_data (&msg), + zmq_msg_size (&msg)); + rc = zmq_msg_close (&msg); + assert (rc == 0); + return result; } -static PyMethodDef pyZMQ_methods [] = +static PyMethodDef context_methods [] = { { - "context", - (PyCFunction) pyZMQ_context, - METH_VARARGS | METH_KEYWORDS, - "context (app_threads, io_threads) -> None\n\n" - "Creates new context\n\n" - "app_threads is the number of application threads.\n\n" - "io_threads is the number of io threads.\n\n" - - }, - { - "msg_init", - (PyCFunction) pyZMQ_msg_init, - METH_VARARGS | METH_KEYWORDS, - "msg_init () -> None\n\n" - "Creates new message\n\n" - - }, - { - "msg_init_size", - (PyCFunction) pyZMQ_msg_init_size, - METH_VARARGS | METH_KEYWORDS, - "msg_init_size (size) -> None\n\n" - "Creates new message of a specified size.\n\n" - "size if integer specifying the size of the message to be created.\n\n" - - }, - { - "msg_init_data", - (PyCFunction) pyZMQ_msg_init_data, - METH_VARARGS | METH_KEYWORDS, - "msg_init_data (data, size, ffn) -> None\n\n" - "Initialises new message with data\n\n" - "data is pointer to the data of the message\n\n" - "size is integer specifying size of data\n\n" - "ffn is function to free alocated data\n\n" - - }, - - { - "msg_close", - (PyCFunction) pyZMQ_msg_close, - METH_VARARGS | METH_KEYWORDS, - "msg_close (msg) -> None\n\n" - "Deletes the message.\n\n" - "msg is the message the be freed\n\n" - - }, - { - "msg_move", - (PyCFunction) pyZMQ_msg_move, - METH_VARARGS | METH_KEYWORDS, - "msg_move (src) -> dest\n\n" - "Move the content of the message from 'src' to 'dest'.\n\n" - "The content isn't copied, just moved. 'src' is an empty\n\n" - "message after the call. Original content of 'dest' message\n\n" - "is deallocated.\n\n" - - }, - { - "msg_copy", - (PyCFunction) pyZMQ_msg_copy, - METH_VARARGS | METH_KEYWORDS, - "msg_copy (src) -> dest\n\n" - "Copy the 'src' message to 'dest'. The content isn't copied, \n\n" - "instead reference count is increased. Don't modify the message \n\n" - "data after the call as they are shared between two messages.\n\n" - "Original content of 'dest' message is deallocated.\n\n" - - }, - { - "msg_data", - (PyCFunction) pyZMQ_msg_data, - METH_VARARGS | METH_KEYWORDS, - "msg_data (msg) -> data\n\n" - "Returns pointer to message data.\n\n" - - }, - { - "msg_size", - (PyCFunction) pyZMQ_msg_size, - METH_VARARGS | METH_KEYWORDS, - "msg_size (msg) -> size\n\n" - "Returns size of a message.\n\n" - - }, - { - "msg_type", - (PyCFunction) pyZMQ_msg_type, - METH_VARARGS | METH_KEYWORDS, - "msg_type (msg) -> type\n\n" - "Returns type of a message.\n\n" - - }, - { - "term", - (PyCFunction) pyZMQ_term, - METH_VARARGS | METH_KEYWORDS, - "term (context) -> None\n\n" - "Deinitialise 0SOCKETS context including all the open sockets.\n\n" - "Closing sockets after zmq_term has been called will result in\n\n" - "undefined behaviour.\n\n" - - }, - { - "close", - (PyCFunction) pyZMQ_close, - METH_VARARGS | METH_KEYWORDS, - "close (socket) -> None\n\n" - "Close the socket.\n\n" - - }, - - { - "socket", - (PyCFunction) pyZMQ_socket, - METH_VARARGS | METH_KEYWORDS, - "socket (context, type) -> None\n\n" - "Creates new socket.\n\n" - "'context' is a context_t object.\n" - "'type' is one of 'ZMQ_NOBLOCK', 'ZMQ_NOFLUSH', 'ZMQ_P2P', 'ZMQ_PUB', " - " 'ZMQ_SUB', 'ZMQ_REQ ZMQ_REP'\n" - }, + NULL + } +}; + +static PyTypeObject context_type = +{ + PyObject_HEAD_INIT (NULL) + 0, + "libpyzmq.Context", /* tp_name */ + sizeof (context_t), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor) context_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + "", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + context_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc) context_init, /* tp_init */ + 0, /* tp_alloc */ + context_new, /* tp_new */ +}; + +static PyMethodDef socket_methods [] = +{ { "setsockopt", - (PyCFunction) pyZMQ_setsockopt, + (PyCFunction) socket_setsockopt, METH_VARARGS | METH_KEYWORDS, - "setsockopt (socket, option, value) -> None\n\n" - "Set socket options." - "Possible options are: 'ZMQ_HWM', 'ZMQ_LWM', 'ZMQ_SWAP', 'ZMQ_MASK', " - "'ZMQ_AFFINITY', 'ZMQ_IDENTITY'." + "setsockopt (option, optval) -> None\n\n" }, { "bind", - (PyCFunction) pyZMQ_bind, + (PyCFunction) socket_bind, METH_VARARGS | METH_KEYWORDS, "bind (addr) -> None\n\n" - "Bind socket to specified address." }, { "connect", - (PyCFunction) pyZMQ_connect, + (PyCFunction) socket_connect, METH_VARARGS | METH_KEYWORDS, "connect (addr) -> None\n\n" - "connect socket to specified address." }, { - "flush", - (PyCFunction) pyZMQ_flush, + "send", + (PyCFunction) socket_send, METH_VARARGS | METH_KEYWORDS, - "flush (addr) -> None\n\n" - "flush " + "send (msg, [flags]) -> Bool\n\n" }, { - "send", - (PyCFunction) pyZMQ_send, + "flush", + (PyCFunction) socket_flush, METH_VARARGS | METH_KEYWORDS, - "send (message, flags) -> sent\n\n" - "Send a message to within the socket, " - "returns integer specifying if the message was sent.\n" - "'message' is message to be sent.\n" - "'flags' is integer specifying send options.\n" + "flush () -> None\n\n" }, { - "receive", - (PyCFunction) pyZMQ_receive, + "recv", + (PyCFunction) socket_recv, METH_VARARGS | METH_KEYWORDS, - "receive (flags) -> (received, message, type)\n\n" - "Receive a message." - "'flags' is integer specifying receive options.\n" - "'message' is string storing the message received.\n" - "'type' is type of the message received.\n" - + "recv ([flags]) -> String\n\n" }, { NULL } }; -static const char* pyZMQ_ZMQ_doc = - "0MQ messaging session\n\n" - "Available functions:\n" - " context\n" - " socket\n" - " setsockopt\n" - " bind\n" - " send\n" - " flush\n" - " receive\n\n"; - -static PyTypeObject pyZMQType = +static PyTypeObject socket_type = { PyObject_HEAD_INIT (NULL) 0, - "libpyzmq.Zmq", /* tp_name (This will appear in the default - textual representation of our objects and - in some error messages)*/ - sizeof (pyZMQ), /* tp_basicsize */ - 0, /* tp_itemsize */ - (destructor) pyZMQ_dealloc,/* tp_dealloc */ - 0, /* tp_print */ - 0, /* tp_getattr */ - 0, /* tp_setattr */ - 0, /* tp_compare */ - 0, /* tp_repr */ - 0, /* tp_as_number */ - 0, /* tp_as_sequence */ - 0, /* tp_as_mapping */ - 0, /* tp_hash */ - 0, /* tp_call */ - 0, /* tp_str */ - 0, /* tp_getattro */ - 0, /* tp_setattro */ - 0, /* tp_as_buffer */ - Py_TPFLAGS_DEFAULT, /* tp_flags */ - (char*) pyZMQ_ZMQ_doc, /* tp_doc */ - 0, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - pyZMQ_methods, /* tp_methods */ - 0, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - (initproc) pyZMQ_init, /* tp_init */ - 0, /* tp_alloc */ - pyZMQ_new, /* tp_new */ + "libpyzmq.Socket" , /* tp_name */ + sizeof (socket_t), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor) socket_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + "", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + socket_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc) socket_init, /* tp_init */ + 0, /* tp_alloc */ + socket_new, /* tp_new */ }; -static PyMethodDef module_methods[] = -{ - { NULL, NULL, 0, NULL } -}; +static PyMethodDef module_methods [] = {{ NULL, NULL, 0, NULL }}; + +static const char* libpyzmq_doc = + "Python API for 0MQ lightweight messaging kernel.\n" + "For more information see http://www.zeromq.org.\n" + "0MQ is distributed under GNU Lesser General Public License v3.\n"; #ifndef PyMODINIT_FUNC #define PyMODINIT_FUNC void #endif -static const char* pyZMQ_doc = - "0MQ Python Module\n\n" - "Constructor:\n" - " z = libpyzmq.Zmq ()\n" - "Available functions:\n" - " context\n" - " socket\n" - " setsockopt\n" - " bind\n" - " send\n" - " flush\n" - " receive\n" - "\n" - "For more information see http://www.zeromq.org.\n" - "\n" - "0MQ is distributed under GNU Lesser General Public License v3\n"; - -PyMODINIT_FUNC initlibpyzmq (void) +PyMODINIT_FUNC initlibpyzmq () { - if (PyType_Ready (&pyZMQType) < 0) + if (PyType_Ready (&context_type) < 0 && PyType_Ready (&socket_type) < 0) return; - PyObject *m = Py_InitModule3 ("libpyzmq", module_methods, - (char*) pyZMQ_doc); - if (!m) + PyObject *module = Py_InitModule3 ("libpyzmq", module_methods, + libpyzmq_doc); + if (!module) return; - Py_INCREF (&pyZMQType); + Py_INCREF (&context_type); + Py_INCREF (&socket_type); + PyModule_AddObject (module, "Context", (PyObject*) &context_type); + PyModule_AddObject (module, "Socket", (PyObject*) &socket_type); - PyModule_AddObject (m, "Zmq", (PyObject*) &pyZMQType); - - PyObject *d = PyModule_GetDict (m); - - - PyObject *t = PyInt_FromLong (ZMQ_NOBLOCK); - PyDict_SetItemString (d, "ZMQ_NOBLOCK", t); + PyObject *dict = PyModule_GetDict (module); + assert (dict); + PyObject *t; + t = PyInt_FromLong (ZMQ_NOBLOCK); + PyDict_SetItemString (dict, "NOBLOCK", t); Py_DECREF (t); t = PyInt_FromLong (ZMQ_NOFLUSH); - PyDict_SetItemString (d, "ZMQ_NOFLUSH", t); + PyDict_SetItemString (dict, "NOFLUSH", t); Py_DECREF (t); t = PyInt_FromLong (ZMQ_P2P); - PyDict_SetItemString (d, "ZMQ_P2P", t); + PyDict_SetItemString (dict, "P2P", t); Py_DECREF (t); t = PyInt_FromLong (ZMQ_PUB); - PyDict_SetItemString (d, "ZMQ_PUB", t); + PyDict_SetItemString (dict, "PUB", t); Py_DECREF (t); t = PyInt_FromLong (ZMQ_SUB); - PyDict_SetItemString (d, "ZMQ_SUB", t); + PyDict_SetItemString (dict, "SUB", t); Py_DECREF (t); t = PyInt_FromLong (ZMQ_REQ); - PyDict_SetItemString (d, "ZMQ_REQ", t); + PyDict_SetItemString (dict, "REQ", t); Py_DECREF (t); t = PyInt_FromLong (ZMQ_REP); - PyDict_SetItemString (d, "ZMQ_REP", t); + PyDict_SetItemString (dict, "REP", t); Py_DECREF (t); - t = PyInt_FromLong (ZMQ_HWM); - PyDict_SetItemString (d, "ZMQ_HWM", t); + PyDict_SetItemString (dict, "HWM", t); Py_DECREF (t); t = PyInt_FromLong (ZMQ_LWM); - PyDict_SetItemString (d, "ZMQ_LWM", t); - Py_DECREF (t); + PyDict_SetItemString (dict, "LWM", t); + Py_DECREF (t); t = PyInt_FromLong (ZMQ_SWAP); - PyDict_SetItemString (d, "ZMQ_SWAP", t); + PyDict_SetItemString (dict, "SWAP", t); Py_DECREF (t); t = PyInt_FromLong (ZMQ_MASK); - PyDict_SetItemString (d, "ZMQ_MASK", t); - Py_DECREF (t); + PyDict_SetItemString (dict, "MASK", t); + Py_DECREF (t); t = PyInt_FromLong (ZMQ_AFFINITY); - PyDict_SetItemString (d, "ZMQ_AFFINITY", t); + PyDict_SetItemString (dict, "AFFINITY", t); Py_DECREF (t); t = PyInt_FromLong (ZMQ_IDENTITY); - PyDict_SetItemString (d, "ZMQ_IDENTITY", t); - Py_DECREF (t); - - + PyDict_SetItemString (dict, "IDENTITY", t); + Py_DECREF (t); } diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index 14a479e..8ee2984 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -25,6 +25,8 @@ namespace zmq struct i_endpoint { + virtual void attach_inpipe (class reader_t *pipe_) = 0; + virtual void attach_outpipe (class writer_t *pipe_) = 0; virtual void revive (class reader_t *pipe_) = 0; virtual void detach_inpipe (class reader_t *pipe_) = 0; virtual void detach_outpipe (class writer_t *pipe_) = 0; diff --git a/src/session.cpp b/src/session.cpp index f562bd5..ef17d6d 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -43,21 +43,6 @@ zmq::session_t::~session_t () out_pipe->term (); } -void zmq::session_t::set_inbound_pipe (reader_t *pipe_) -{ - zmq_assert (!in_pipe); - in_pipe = pipe_; - active = true; - in_pipe->set_endpoint (this); -} -void zmq::session_t::set_outbound_pipe (writer_t *pipe_) -{ - zmq_assert (!out_pipe); - out_pipe = pipe_; - out_pipe->set_endpoint (this); -} - - bool zmq::session_t::read (::zmq_msg_t *msg_) { if (!active) @@ -90,6 +75,20 @@ void zmq::session_t::detach () // term (); } +void zmq::session_t::attach_inpipe (reader_t *pipe_) +{ + zmq_assert (!in_pipe); + in_pipe = pipe_; + active = true; + in_pipe->set_endpoint (this); +} +void zmq::session_t::attach_outpipe (writer_t *pipe_) +{ + zmq_assert (!out_pipe); + out_pipe = pipe_; + out_pipe->set_endpoint (this); +} + void zmq::session_t::revive (reader_t *pipe_) { zmq_assert (in_pipe == pipe_); diff --git a/src/session.hpp b/src/session.hpp index 46699cf..195bdca 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -37,13 +37,6 @@ namespace zmq session_t (object_t *parent_, socket_base_t *owner_, const char *name_, const options_t &options_); - void set_inbound_pipe (class reader_t *pipe_); - void set_outbound_pipe (class writer_t *pipe_); - - private: - - ~session_t (); - // i_inout interface implementation. bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); @@ -51,10 +44,16 @@ namespace zmq void detach (); // i_endpoint interface implementation. + void attach_inpipe (class reader_t *pipe_); + void attach_outpipe (class writer_t *pipe_); void revive (class reader_t *pipe_); void detach_inpipe (class reader_t *pipe_); void detach_outpipe (class writer_t *pipe_); + private: + + ~session_t (); + // Handlers for incoming commands. void process_plug (); void process_unplug (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index e14065b..4e14c68 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -173,7 +173,7 @@ int zmq::socket_base_t::connect (const char *addr_) pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm); zmq_assert (in_pipe); in_pipe->reader.set_endpoint (this); - session->set_outbound_pipe (&in_pipe->writer); + session->attach_outpipe (&in_pipe->writer); in_pipes.push_back (&in_pipe->reader); in_pipes.back ()->set_index (active); in_pipes [active]->set_index (in_pipes.size () - 1); @@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_) pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm); zmq_assert (out_pipe); out_pipe->writer.set_endpoint (this); - session->set_inbound_pipe (&out_pipe->reader); + session->attach_inpipe (&out_pipe->reader); out_pipes.push_back (&out_pipe->writer); // Activate the session. @@ -327,6 +327,22 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_) return it->second; } +void zmq::socket_base_t::attach_inpipe (class reader_t *pipe_) +{ + pipe_->set_endpoint (this); + in_pipes.push_back (pipe_); + in_pipes.back ()->set_index (active); + in_pipes [active]->set_index (in_pipes.size () - 1); + std::swap (in_pipes.back (), in_pipes [active]); + active++; +} + +void zmq::socket_base_t::attach_outpipe (class writer_t *pipe_) +{ + pipe_->set_endpoint (this); + out_pipes.push_back (pipe_); +} + void zmq::socket_base_t::revive (reader_t *pipe_) { // Move the pipe to the list of active pipes. @@ -372,15 +388,9 @@ void zmq::socket_base_t::process_bind (owned_t *session_, reader_t *in_pipe_, writer_t *out_pipe_) { zmq_assert (in_pipe_); - in_pipe_->set_endpoint (this); - in_pipes.push_back (in_pipe_); - in_pipes.back ()->set_index (active); - in_pipes [active]->set_index (in_pipes.size () - 1); - std::swap (in_pipes.back (), in_pipes [active]); - active++; + attach_inpipe (in_pipe_); zmq_assert (out_pipe_); - out_pipe_->set_endpoint (this); - out_pipes.push_back (out_pipe_); + attach_outpipe (out_pipe_); } void zmq::socket_base_t::process_term_req (owned_t *object_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 490c09a..284d2c4 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -60,6 +60,8 @@ namespace zmq class session_t *find_session (const char *name_); // i_endpoint interface implementation. + void attach_inpipe (class reader_t *pipe_); + void attach_outpipe (class writer_t *pipe_); void revive (class reader_t *pipe_); void detach_inpipe (class reader_t *pipe_); void detach_outpipe (class writer_t *pipe_); |