diff options
-rw-r--r-- | configure.in | 3 | ||||
-rw-r--r-- | perf/Makefile.am | 4 | ||||
-rw-r--r-- | perf/c/local_lat.c | 11 | ||||
-rw-r--r-- | perf/c/local_thr.c | 10 | ||||
-rw-r--r-- | perf/c/remote_lat.c | 11 | ||||
-rw-r--r-- | perf/c/remote_thr.c | 11 | ||||
-rw-r--r-- | perf/cpp/local_lat.cpp | 8 | ||||
-rw-r--r-- | perf/cpp/local_thr.cpp | 8 | ||||
-rw-r--r-- | perf/cpp/remote_lat.cpp | 8 | ||||
-rw-r--r-- | perf/cpp/remote_thr.cpp | 8 | ||||
-rw-r--r-- | perf/python/Makefile.am | 5 | ||||
-rw-r--r-- | perf/python/local_lat.py | 6 | ||||
-rw-r--r-- | perf/python/remote_lat.py | 4 | ||||
-rw-r--r-- | perf/python/remote_thr.py | 2 | ||||
-rw-r--r-- | python/pyzmq.cpp | 25 | ||||
-rw-r--r-- | src/app_thread.cpp | 4 | ||||
-rw-r--r-- | src/dispatcher.cpp | 36 | ||||
-rw-r--r-- | src/dispatcher.hpp | 25 | ||||
-rw-r--r-- | src/object.cpp | 5 | ||||
-rw-r--r-- | src/object.hpp | 1 | ||||
-rw-r--r-- | src/session.cpp | 7 | ||||
-rw-r--r-- | src/socket_base.cpp | 12 | ||||
-rw-r--r-- | src/zmq.cpp | 3 |
23 files changed, 148 insertions, 69 deletions
diff --git a/configure.in b/configure.in index f1789b6..8feef7b 100644 --- a/configure.in +++ b/configure.in @@ -328,7 +328,8 @@ AC_TYPE_SIGNAL AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs) AC_OUTPUT(Makefile src/Makefile python/Makefile python/setup.py ruby/Makefile \ - java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile) + java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \ + perf/python/Makefile) AC_MSG_RESULT([]) AC_MSG_RESULT([ ******************************************************** ]) diff --git a/perf/Makefile.am b/perf/Makefile.am index 7e87d68..dbebc93 100644 --- a/perf/Makefile.am +++ b/perf/Makefile.am @@ -1,2 +1,2 @@ -SUBDIRS = c cpp -DIST_SUBDIRS = c cpp +SUBDIRS = c cpp python +DIST_SUBDIRS = c cpp python diff --git a/perf/c/local_lat.c b/perf/c/local_lat.c index 81a2e0c..92cfadf 100644 --- a/perf/c/local_lat.c +++ b/perf/c/local_lat.c @@ -35,13 +35,13 @@ int main (int argc, char *argv []) struct zmq_msg_t msg; if (argc != 4) { - printf ("usage: local_lat <bind-to> <roundtrip-count> " - "<message-size>\n"); + printf ("usage: local_lat <bind-to> <message-size> " + "<roundtrip-count>\n"); return 1; } bind_to = argv [1]; - roundtrip_count = atoi (argv [2]); - message_size = atoi (argv [3]); + message_size = atoi (argv [2]); + roundtrip_count = atoi (argv [3]); ctx = zmq_init (1, 1); assert (ctx); @@ -68,6 +68,9 @@ int main (int argc, char *argv []) sleep (1); + rc = zmq_close (s); + assert (rc == 0); + rc = zmq_term (ctx); assert (rc == 0); diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c index 64c492d..71ed21c 100644 --- a/perf/c/local_thr.c +++ b/perf/c/local_thr.c @@ -41,13 +41,12 @@ int main (int argc, char *argv []) double megabits; if (argc != 4) { - printf ("usage: local_thr <bind-to> <message-count> " - "<message-size>\n"); + printf ("usage: local_thr <bind-to> <message-size> <message-count>\n"); return 1; } bind_to = argv [1]; - message_count = atoi (argv [2]); - message_size = atoi (argv [3]); + message_size = atoi (argv [2]); + message_count = atoi (argv [3]); ctx = zmq_init (1, 1); assert (ctx); @@ -92,6 +91,9 @@ int main (int argc, char *argv []) printf ("mean throughput: %d [msg/s]\n", (int) throughput); printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits); + rc = zmq_close (s); + assert (rc == 0); + rc = zmq_term (ctx); assert (rc == 0); diff --git a/perf/c/remote_lat.c b/perf/c/remote_lat.c index 32329b8..6da1c42 100644 --- a/perf/c/remote_lat.c +++ b/perf/c/remote_lat.c @@ -39,13 +39,13 @@ int main (int argc, char *argv []) double latency; if (argc != 4) { - printf ("usage: remote_lat <connect-to> <roundtrip-count> " - "<message-size>\n"); + printf ("usage: remote_lat <connect-to> <message-size> " + "<roundtrip-count>\n"); return 1; } connect_to = argv [1]; - roundtrip_count = atoi (argv [2]); - message_size = atoi (argv [3]); + message_size = atoi (argv [2]); + roundtrip_count = atoi (argv [3]); ctx = zmq_init (1, 1); assert (ctx); @@ -87,6 +87,9 @@ int main (int argc, char *argv []) printf ("roundtrip count: %d\n", (int) roundtrip_count); printf ("average latency: %.3f [us]\n", (double) latency); + rc = zmq_close (s); + assert (rc == 0); + rc = zmq_term (ctx); assert (rc == 0); diff --git a/perf/c/remote_thr.c b/perf/c/remote_thr.c index 1010bc9..9606d00 100644 --- a/perf/c/remote_thr.c +++ b/perf/c/remote_thr.c @@ -35,13 +35,13 @@ int main (int argc, char *argv []) struct zmq_msg_t msg; if (argc != 4) { - printf ("usage: remote_thr <connect-to> <message-count> " - "<message-size>\n"); + printf ("usage: remote_thr <connect-to> <message-size> " + "<message-count>\n"); return 1; } connect_to = argv [1]; - message_count = atoi (argv [2]); - message_size = atoi (argv [3]); + message_size = atoi (argv [2]); + message_count = atoi (argv [3]); ctx = zmq_init (1, 1); assert (ctx); @@ -63,6 +63,9 @@ int main (int argc, char *argv []) sleep (10); + rc = zmq_close (s); + assert (rc == 0); + rc = zmq_term (ctx); assert (rc == 0); diff --git a/perf/cpp/local_lat.cpp b/perf/cpp/local_lat.cpp index 9260f0a..343ca74 100644 --- a/perf/cpp/local_lat.cpp +++ b/perf/cpp/local_lat.cpp @@ -27,13 +27,13 @@ int main (int argc, char *argv []) { if (argc != 4) { - printf ("usage: local_lat <bind-to> <roundtrip-count> " - "<message-size>\n"); + printf ("usage: local_lat <bind-to> <message-size> " + "<roundtrip-count>\n"); return 1; } const char *bind_to = argv [1]; - int roundtrip_count = atoi (argv [2]); - size_t message_size = (size_t) atoi (argv [3]); + size_t message_size = (size_t) atoi (argv [2]); + int roundtrip_count = atoi (argv [3]); zmq::context_t ctx (1, 1); diff --git a/perf/cpp/local_thr.cpp b/perf/cpp/local_thr.cpp index 3e961de..ca81ba9 100644 --- a/perf/cpp/local_thr.cpp +++ b/perf/cpp/local_thr.cpp @@ -28,13 +28,13 @@ int main (int argc, char *argv []) { if (argc != 4) { - printf ("usage: local_thr <bind-to> <message-count> " - "<message-size>\n"); + printf ("usage: local_thr <bind-to> <message-size> " + "<message-count>\n"); return 1; } const char *bind_to = argv [1]; - int message_count = atoi (argv [2]); - size_t message_size = (size_t) atoi (argv [3]); + size_t message_size = (size_t) atoi (argv [2]); + int message_count = atoi (argv [3]); zmq::context_t ctx (1, 1); diff --git a/perf/cpp/remote_lat.cpp b/perf/cpp/remote_lat.cpp index 169ed1e..c3ded10 100644 --- a/perf/cpp/remote_lat.cpp +++ b/perf/cpp/remote_lat.cpp @@ -27,13 +27,13 @@ int main (int argc, char *argv []) { if (argc != 4) { - printf ("usage: remote_lat <connect-to> <roundtrip-count> " - "<message-size>\n"); + printf ("usage: remote_lat <connect-to> <message-size> " + "<roundtrip-count>\n"); return 1; } const char *connect_to = argv [1]; - int roundtrip_count = atoi (argv [2]); - size_t message_size = (size_t) atoi (argv [3]); + size_t message_size = (size_t) atoi (argv [2]); + int roundtrip_count = atoi (argv [3]); zmq::context_t ctx (1, 1); diff --git a/perf/cpp/remote_thr.cpp b/perf/cpp/remote_thr.cpp index 06946f5..5474c6a 100644 --- a/perf/cpp/remote_thr.cpp +++ b/perf/cpp/remote_thr.cpp @@ -27,13 +27,13 @@ int main (int argc, char *argv []) { if (argc != 4) { - printf ("usage: remote_thr <connect-to> <message-count> " - "<message-size>\n"); + printf ("usage: remote_thr <connect-to> <message-size> " + "<message-count>\n"); return 1; } const char *connect_to = argv [1]; - int message_count = atoi (argv [2]); - size_t message_size = (size_t) atoi (argv [3]); + size_t message_size = (size_t) atoi (argv [2]); + int message_count = atoi (argv [3]); zmq::context_t ctx (1, 1); diff --git a/perf/python/Makefile.am b/perf/python/Makefile.am new file mode 100644 index 0000000..cda8477 --- /dev/null +++ b/perf/python/Makefile.am @@ -0,0 +1,5 @@ +EXTRA_DIST = \ + local_lat.py \ + remote_lat.py \ + local_thr.py \ + remote_thr.py diff --git a/perf/python/local_lat.py b/perf/python/local_lat.py index 7f1503f..e9d46e0 100644 --- a/perf/python/local_lat.py +++ b/perf/python/local_lat.py @@ -23,13 +23,13 @@ import libpyzmq def main (): if len (sys.argv) != 4: - print 'usage: local_lat <bind-to> <roundtrip-count> <message-size>' + print 'usage: local_lat <bind-to> <message-size> <roundtrip-count>' sys.exit (1) try: bind_to = sys.argv [1] - roundtrip_count = int (sys.argv [2]) - message_size = int (sys.argv [3]) + message_size = int (sys.argv [2]) + roundtrip_count = int (sys.argv [3]) except (ValueError, OverflowError), e: print 'message-size and roundtrip-count must be integers' sys.exit (1) diff --git a/perf/python/remote_lat.py b/perf/python/remote_lat.py index 372f567..f2ee04a 100644 --- a/perf/python/remote_lat.py +++ b/perf/python/remote_lat.py @@ -23,7 +23,7 @@ import libpyzmq def main (): if len(sys.argv) != 4: - print 'usage: remote_lat <connect-to> <roundtrip-count> <message-size>' + print 'usage: remote_lat <connect-to> <message-size> <roundtrip-count>' sys.exit (1) try: @@ -49,7 +49,7 @@ def main (): end = datetime.now () delta = (end - start).microseconds + 1000000 * (end - start).seconds - latency = delta / roundtrip_count / 2 + latency = float (delta) / roundtrip_count / 2 print "message size: %.0f [B]" % (message_size, ) print "roundtrip count: %.0f" % (roundtrip_count, ) diff --git a/perf/python/remote_thr.py b/perf/python/remote_thr.py index a80adfd..bab001d 100644 --- a/perf/python/remote_thr.py +++ b/perf/python/remote_thr.py @@ -27,7 +27,7 @@ def main (): sys.exit (1) try: - connect_to = argv [1] + connect_to = sys.argv [1] message_size = int (sys.argv [2]) message_count = int (sys.argv [3]) except (ValueError, OverflowError), e: diff --git a/python/pyzmq.cpp b/python/pyzmq.cpp index 8913b8a..2fc32d1 100644 --- a/python/pyzmq.cpp +++ b/python/pyzmq.cpp @@ -33,7 +33,6 @@ struct context_t PyObject *context_new (PyTypeObject *type, PyObject *args, PyObject *kwds) { -printf ("context_new\n"); context_t *self = (context_t*) type->tp_alloc (type, 0); if (self) @@ -45,34 +44,27 @@ printf ("context_new\n"); int context_init (context_t *self, PyObject *args, PyObject *kwdict) { -printf ("context_init\n"); int app_threads; int io_threads; static const char *kwlist [] = {"app_threads", "io_threads", NULL}; if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii", (char**) kwlist, &app_threads, &io_threads)) { PyErr_SetString (PyExc_SystemError, "invalid arguments"); -printf ("context_init err1\n"); return -1; // ? } -printf ("app_threads=%d io_threads=%d\n", app_threads, io_threads); - assert (!self->handle); self->handle = zmq_init (app_threads, io_threads); if (!self->handle) { PyErr_SetString (PyExc_SystemError, strerror (errno)); return -1; // ? -printf ("context_init err2\n"); } -printf ("context_init ok\n"); return 0; } void context_dealloc (context_t *self) { -printf ("context_dealloc\n"); if (self->handle) { int rc = zmq_term (self->handle); if (rc != 0) @@ -90,7 +82,6 @@ struct socket_t PyObject *socket_new (PyTypeObject *type, PyObject *args, PyObject *kwds) { -printf ("socket_new\n"); socket_t *self = (socket_t*) type->tp_alloc (type, 0); if (self) @@ -101,7 +92,6 @@ printf ("socket_new\n"); int socket_init (socket_t *self, PyObject *args, PyObject *kwdict) { -printf ("socket_init\n"); context_t *context; int socket_type; static const char *kwlist [] = {"context", "type", NULL}; @@ -124,7 +114,6 @@ printf ("socket_init\n"); void socket_dealloc (socket_t *self) { -printf ("socket_dealloc\n"); if (self->handle) { int rc = zmq_close (self->handle); if (rc != 0) @@ -340,7 +329,7 @@ static PyTypeObject context_type = 0, /* tp_dictoffset */ (initproc) context_init, /* tp_init */ 0, /* tp_alloc */ - context_new, /* tp_new */ + context_new /* tp_new */ }; static PyMethodDef socket_methods [] = @@ -390,7 +379,7 @@ static PyTypeObject socket_type = { PyObject_HEAD_INIT (NULL) 0, - "libpyzmq.Socket" , /* tp_name */ + "libpyzmq.Socket", /* tp_name */ sizeof (socket_t), /* tp_basicsize */ 0, /* tp_itemsize */ (destructor) socket_dealloc, /* tp_dealloc */ @@ -426,7 +415,7 @@ static PyTypeObject socket_type = 0, /* tp_dictoffset */ (initproc) socket_init, /* tp_init */ 0, /* tp_alloc */ - socket_new, /* tp_new */ + socket_new /* tp_new */ }; static PyMethodDef module_methods [] = {{ NULL, NULL, 0, NULL }}; @@ -442,8 +431,10 @@ static const char* libpyzmq_doc = PyMODINIT_FUNC initlibpyzmq () { - if (PyType_Ready (&context_type) < 0 && PyType_Ready (&socket_type) < 0) - return; + int rc = PyType_Ready (&context_type); + assert (rc == 0); + rc = PyType_Ready (&socket_type); + assert (rc == 0); PyObject *module = Py_InitModule3 ("libpyzmq", module_methods, libpyzmq_doc); @@ -451,8 +442,8 @@ PyMODINIT_FUNC initlibpyzmq () return; Py_INCREF (&context_type); - Py_INCREF (&socket_type); PyModule_AddObject (module, "Context", (PyObject*) &context_type); + Py_INCREF (&socket_type); PyModule_AddObject (module, "Socket", (PyObject*) &socket_type); PyObject *dict = PyModule_GetDict (module); diff --git a/src/app_thread.cpp b/src/app_thread.cpp index e108594..58fe19d 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -51,9 +51,7 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : zmq::app_thread_t::~app_thread_t () { - // Destroy all the sockets owned by this application thread. - for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) - delete *it; + zmq_assert (sockets.empty ()); } zmq::i_signaler *zmq::app_thread_t::get_signaler () diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 71e20df..49c2197 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -30,7 +30,9 @@ #include "windows.h" #endif -zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : + sockets (0), + terminated (false) { #ifdef ZMQ_HAVE_WINDOWS // Intialise Windows sockets. Note that WSAStartup can be called multiple @@ -68,6 +70,20 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) io_threads [i]->start (); } +int zmq::dispatcher_t::term () +{ + term_sync.lock (); + zmq_assert (!terminated); + terminated = true; + bool destroy = (sockets == 0); + term_sync.unlock (); + + if (destroy) + delete this; + + return 0; +} + zmq::dispatcher_t::~dispatcher_t () { // Close all application theads, sockets, io_objects etc. @@ -111,9 +127,27 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) } threads_sync.unlock (); + term_sync.lock (); + sockets++; + term_sync.unlock (); + return thread->create_socket (type_); } +void zmq::dispatcher_t::destroy_socket () +{ + // If zmq_term was already called and there are no more sockets, + // terminate the whole 0MQ infrastructure. + term_sync.lock (); + zmq_assert (sockets > 0); + sockets--; + bool destroy = (sockets == 0 && terminated); + term_sync.unlock (); + + if (destroy) + delete this; +} + zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () { // Check whether thread ID is already assigned. If so, return it. diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index cb445ef..bd1f655 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -52,12 +52,18 @@ namespace zmq // signalers. dispatcher_t (int app_threads_, int io_threads_); - // To be called to terminate the whole infrastructure (zmq_term). - ~dispatcher_t (); + // This function is called when user invokes zmq_term. If there are + // no more sockets open it'll cause all the infrastructure to be shut + // down. If there are open sockets still, the deallocation happens + // after the last one is closed. + int term (); // Create a socket. class socket_base_t *create_socket (int type_); + // Destroy a socket. + void destroy_socket (); + // Returns number of thread slots in the dispatcher. To be used by // individual threads to find out how many distinct signals can be // received. @@ -93,6 +99,8 @@ namespace zmq private: + ~dispatcher_t (); + // Returns the app thread associated with the current thread. // NULL if we are out of app thread slots. class app_thread_t *choose_app_thread (); @@ -127,9 +135,20 @@ namespace zmq typedef std::set <class pipe_t*> pipes_t; pipes_t pipes; - // Synchronisation of access to the pipes repository. + // Synchronisation of access to the pipes repository. mutex_t pipes_sync; + // Number of sockets alive. + int sockets; + + // If true, zmq_term was already called. When last socket is closed + // the whole 0MQ infrastructure should be deallocated. + bool terminated; + + // Synchronisation of access to the termination data (socket count + // and 'terminated' flag). + mutex_t term_sync; + dispatcher_t (const dispatcher_t&); void operator = (const dispatcher_t&); }; diff --git a/src/object.cpp b/src/object.cpp index c0ef21c..1433b7b 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -53,6 +53,11 @@ int zmq::object_t::get_thread_slot () return thread_slot; } +zmq::dispatcher_t *zmq::object_t::get_dispatcher () +{ + return dispatcher; +} + void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { diff --git a/src/object.hpp b/src/object.hpp index 250e856..2e41507 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -40,6 +40,7 @@ namespace zmq ~object_t (); int get_thread_slot (); + dispatcher_t *get_dispatcher (); void process_command (struct command_t &cmd_); // Allow pipe to access corresponding dispatcher functions. diff --git a/src/session.cpp b/src/session.cpp index ac2dd12..bc334e0 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -54,7 +54,12 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_) { - return out_pipe->write (msg_); + if (out_pipe->write (msg_)) { + zmq_msg_init (msg_); + return true; + } + + return false; } void zmq::session_t::flush () diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6ad1f55..93a0a4c 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -24,7 +24,7 @@ #include "socket_base.hpp" #include "app_thread.hpp" -#include "err.hpp" +#include "dispatcher.hpp" #include "zmq_listener.hpp" #include "zmq_connecter.hpp" #include "msg_content.hpp" @@ -34,6 +34,7 @@ #include "owned.hpp" #include "uuid.hpp" #include "pipe.hpp" +#include "err.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), @@ -288,7 +289,16 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { app_thread->remove_socket (this); + + // Pointer to the dispatcher must be retrieved before the socket is + // deallocated. Afterwards it is not available. + dispatcher_t *dispatcher = get_dispatcher (); delete this; + + // This function must be called after the socket is completely deallocated + // as it may cause termination of the whole 0MQ infrastructure. + dispatcher->destroy_socket (); + return 0; } diff --git a/src/zmq.cpp b/src/zmq.cpp index 49096ad..0ffd530 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -183,8 +183,7 @@ void *zmq_init (int app_threads_, int io_threads_) int zmq_term (void *dispatcher_) { - delete (zmq::dispatcher_t*) dispatcher_; - return 0; + return ((zmq::dispatcher_t*) dispatcher_)->term (); } void *zmq_socket (void *dispatcher_, int type_) |