summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--configure.in3
-rw-r--r--perf/Makefile.am4
-rw-r--r--perf/c/local_lat.c11
-rw-r--r--perf/c/local_thr.c10
-rw-r--r--perf/c/remote_lat.c11
-rw-r--r--perf/c/remote_thr.c11
-rw-r--r--perf/cpp/local_lat.cpp8
-rw-r--r--perf/cpp/local_thr.cpp8
-rw-r--r--perf/cpp/remote_lat.cpp8
-rw-r--r--perf/cpp/remote_thr.cpp8
-rw-r--r--perf/python/Makefile.am5
-rw-r--r--perf/python/local_lat.py6
-rw-r--r--perf/python/remote_lat.py4
-rw-r--r--perf/python/remote_thr.py2
-rw-r--r--python/pyzmq.cpp25
-rw-r--r--src/app_thread.cpp4
-rw-r--r--src/dispatcher.cpp36
-rw-r--r--src/dispatcher.hpp25
-rw-r--r--src/object.cpp5
-rw-r--r--src/object.hpp1
-rw-r--r--src/session.cpp7
-rw-r--r--src/socket_base.cpp12
-rw-r--r--src/zmq.cpp3
23 files changed, 148 insertions, 69 deletions
diff --git a/configure.in b/configure.in
index f1789b6..8feef7b 100644
--- a/configure.in
+++ b/configure.in
@@ -328,7 +328,8 @@ AC_TYPE_SIGNAL
AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs)
AC_OUTPUT(Makefile src/Makefile python/Makefile python/setup.py ruby/Makefile \
- java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile)
+ java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \
+ perf/python/Makefile)
AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ])
diff --git a/perf/Makefile.am b/perf/Makefile.am
index 7e87d68..dbebc93 100644
--- a/perf/Makefile.am
+++ b/perf/Makefile.am
@@ -1,2 +1,2 @@
-SUBDIRS = c cpp
-DIST_SUBDIRS = c cpp
+SUBDIRS = c cpp python
+DIST_SUBDIRS = c cpp python
diff --git a/perf/c/local_lat.c b/perf/c/local_lat.c
index 81a2e0c..92cfadf 100644
--- a/perf/c/local_lat.c
+++ b/perf/c/local_lat.c
@@ -35,13 +35,13 @@ int main (int argc, char *argv [])
struct zmq_msg_t msg;
if (argc != 4) {
- printf ("usage: local_lat <bind-to> <roundtrip-count> "
- "<message-size>\n");
+ printf ("usage: local_lat <bind-to> <message-size> "
+ "<roundtrip-count>\n");
return 1;
}
bind_to = argv [1];
- roundtrip_count = atoi (argv [2]);
- message_size = atoi (argv [3]);
+ message_size = atoi (argv [2]);
+ roundtrip_count = atoi (argv [3]);
ctx = zmq_init (1, 1);
assert (ctx);
@@ -68,6 +68,9 @@ int main (int argc, char *argv [])
sleep (1);
+ rc = zmq_close (s);
+ assert (rc == 0);
+
rc = zmq_term (ctx);
assert (rc == 0);
diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c
index 64c492d..71ed21c 100644
--- a/perf/c/local_thr.c
+++ b/perf/c/local_thr.c
@@ -41,13 +41,12 @@ int main (int argc, char *argv [])
double megabits;
if (argc != 4) {
- printf ("usage: local_thr <bind-to> <message-count> "
- "<message-size>\n");
+ printf ("usage: local_thr <bind-to> <message-size> <message-count>\n");
return 1;
}
bind_to = argv [1];
- message_count = atoi (argv [2]);
- message_size = atoi (argv [3]);
+ message_size = atoi (argv [2]);
+ message_count = atoi (argv [3]);
ctx = zmq_init (1, 1);
assert (ctx);
@@ -92,6 +91,9 @@ int main (int argc, char *argv [])
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
+ rc = zmq_close (s);
+ assert (rc == 0);
+
rc = zmq_term (ctx);
assert (rc == 0);
diff --git a/perf/c/remote_lat.c b/perf/c/remote_lat.c
index 32329b8..6da1c42 100644
--- a/perf/c/remote_lat.c
+++ b/perf/c/remote_lat.c
@@ -39,13 +39,13 @@ int main (int argc, char *argv [])
double latency;
if (argc != 4) {
- printf ("usage: remote_lat <connect-to> <roundtrip-count> "
- "<message-size>\n");
+ printf ("usage: remote_lat <connect-to> <message-size> "
+ "<roundtrip-count>\n");
return 1;
}
connect_to = argv [1];
- roundtrip_count = atoi (argv [2]);
- message_size = atoi (argv [3]);
+ message_size = atoi (argv [2]);
+ roundtrip_count = atoi (argv [3]);
ctx = zmq_init (1, 1);
assert (ctx);
@@ -87,6 +87,9 @@ int main (int argc, char *argv [])
printf ("roundtrip count: %d\n", (int) roundtrip_count);
printf ("average latency: %.3f [us]\n", (double) latency);
+ rc = zmq_close (s);
+ assert (rc == 0);
+
rc = zmq_term (ctx);
assert (rc == 0);
diff --git a/perf/c/remote_thr.c b/perf/c/remote_thr.c
index 1010bc9..9606d00 100644
--- a/perf/c/remote_thr.c
+++ b/perf/c/remote_thr.c
@@ -35,13 +35,13 @@ int main (int argc, char *argv [])
struct zmq_msg_t msg;
if (argc != 4) {
- printf ("usage: remote_thr <connect-to> <message-count> "
- "<message-size>\n");
+ printf ("usage: remote_thr <connect-to> <message-size> "
+ "<message-count>\n");
return 1;
}
connect_to = argv [1];
- message_count = atoi (argv [2]);
- message_size = atoi (argv [3]);
+ message_size = atoi (argv [2]);
+ message_count = atoi (argv [3]);
ctx = zmq_init (1, 1);
assert (ctx);
@@ -63,6 +63,9 @@ int main (int argc, char *argv [])
sleep (10);
+ rc = zmq_close (s);
+ assert (rc == 0);
+
rc = zmq_term (ctx);
assert (rc == 0);
diff --git a/perf/cpp/local_lat.cpp b/perf/cpp/local_lat.cpp
index 9260f0a..343ca74 100644
--- a/perf/cpp/local_lat.cpp
+++ b/perf/cpp/local_lat.cpp
@@ -27,13 +27,13 @@
int main (int argc, char *argv [])
{
if (argc != 4) {
- printf ("usage: local_lat <bind-to> <roundtrip-count> "
- "<message-size>\n");
+ printf ("usage: local_lat <bind-to> <message-size> "
+ "<roundtrip-count>\n");
return 1;
}
const char *bind_to = argv [1];
- int roundtrip_count = atoi (argv [2]);
- size_t message_size = (size_t) atoi (argv [3]);
+ size_t message_size = (size_t) atoi (argv [2]);
+ int roundtrip_count = atoi (argv [3]);
zmq::context_t ctx (1, 1);
diff --git a/perf/cpp/local_thr.cpp b/perf/cpp/local_thr.cpp
index 3e961de..ca81ba9 100644
--- a/perf/cpp/local_thr.cpp
+++ b/perf/cpp/local_thr.cpp
@@ -28,13 +28,13 @@
int main (int argc, char *argv [])
{
if (argc != 4) {
- printf ("usage: local_thr <bind-to> <message-count> "
- "<message-size>\n");
+ printf ("usage: local_thr <bind-to> <message-size> "
+ "<message-count>\n");
return 1;
}
const char *bind_to = argv [1];
- int message_count = atoi (argv [2]);
- size_t message_size = (size_t) atoi (argv [3]);
+ size_t message_size = (size_t) atoi (argv [2]);
+ int message_count = atoi (argv [3]);
zmq::context_t ctx (1, 1);
diff --git a/perf/cpp/remote_lat.cpp b/perf/cpp/remote_lat.cpp
index 169ed1e..c3ded10 100644
--- a/perf/cpp/remote_lat.cpp
+++ b/perf/cpp/remote_lat.cpp
@@ -27,13 +27,13 @@
int main (int argc, char *argv [])
{
if (argc != 4) {
- printf ("usage: remote_lat <connect-to> <roundtrip-count> "
- "<message-size>\n");
+ printf ("usage: remote_lat <connect-to> <message-size> "
+ "<roundtrip-count>\n");
return 1;
}
const char *connect_to = argv [1];
- int roundtrip_count = atoi (argv [2]);
- size_t message_size = (size_t) atoi (argv [3]);
+ size_t message_size = (size_t) atoi (argv [2]);
+ int roundtrip_count = atoi (argv [3]);
zmq::context_t ctx (1, 1);
diff --git a/perf/cpp/remote_thr.cpp b/perf/cpp/remote_thr.cpp
index 06946f5..5474c6a 100644
--- a/perf/cpp/remote_thr.cpp
+++ b/perf/cpp/remote_thr.cpp
@@ -27,13 +27,13 @@
int main (int argc, char *argv [])
{
if (argc != 4) {
- printf ("usage: remote_thr <connect-to> <message-count> "
- "<message-size>\n");
+ printf ("usage: remote_thr <connect-to> <message-size> "
+ "<message-count>\n");
return 1;
}
const char *connect_to = argv [1];
- int message_count = atoi (argv [2]);
- size_t message_size = (size_t) atoi (argv [3]);
+ size_t message_size = (size_t) atoi (argv [2]);
+ int message_count = atoi (argv [3]);
zmq::context_t ctx (1, 1);
diff --git a/perf/python/Makefile.am b/perf/python/Makefile.am
new file mode 100644
index 0000000..cda8477
--- /dev/null
+++ b/perf/python/Makefile.am
@@ -0,0 +1,5 @@
+EXTRA_DIST = \
+ local_lat.py \
+ remote_lat.py \
+ local_thr.py \
+ remote_thr.py
diff --git a/perf/python/local_lat.py b/perf/python/local_lat.py
index 7f1503f..e9d46e0 100644
--- a/perf/python/local_lat.py
+++ b/perf/python/local_lat.py
@@ -23,13 +23,13 @@ import libpyzmq
def main ():
if len (sys.argv) != 4:
- print 'usage: local_lat <bind-to> <roundtrip-count> <message-size>'
+ print 'usage: local_lat <bind-to> <message-size> <roundtrip-count>'
sys.exit (1)
try:
bind_to = sys.argv [1]
- roundtrip_count = int (sys.argv [2])
- message_size = int (sys.argv [3])
+ message_size = int (sys.argv [2])
+ roundtrip_count = int (sys.argv [3])
except (ValueError, OverflowError), e:
print 'message-size and roundtrip-count must be integers'
sys.exit (1)
diff --git a/perf/python/remote_lat.py b/perf/python/remote_lat.py
index 372f567..f2ee04a 100644
--- a/perf/python/remote_lat.py
+++ b/perf/python/remote_lat.py
@@ -23,7 +23,7 @@ import libpyzmq
def main ():
if len(sys.argv) != 4:
- print 'usage: remote_lat <connect-to> <roundtrip-count> <message-size>'
+ print 'usage: remote_lat <connect-to> <message-size> <roundtrip-count>'
sys.exit (1)
try:
@@ -49,7 +49,7 @@ def main ():
end = datetime.now ()
delta = (end - start).microseconds + 1000000 * (end - start).seconds
- latency = delta / roundtrip_count / 2
+ latency = float (delta) / roundtrip_count / 2
print "message size: %.0f [B]" % (message_size, )
print "roundtrip count: %.0f" % (roundtrip_count, )
diff --git a/perf/python/remote_thr.py b/perf/python/remote_thr.py
index a80adfd..bab001d 100644
--- a/perf/python/remote_thr.py
+++ b/perf/python/remote_thr.py
@@ -27,7 +27,7 @@ def main ():
sys.exit (1)
try:
- connect_to = argv [1]
+ connect_to = sys.argv [1]
message_size = int (sys.argv [2])
message_count = int (sys.argv [3])
except (ValueError, OverflowError), e:
diff --git a/python/pyzmq.cpp b/python/pyzmq.cpp
index 8913b8a..2fc32d1 100644
--- a/python/pyzmq.cpp
+++ b/python/pyzmq.cpp
@@ -33,7 +33,6 @@ struct context_t
PyObject *context_new (PyTypeObject *type, PyObject *args, PyObject *kwds)
{
-printf ("context_new\n");
context_t *self = (context_t*) type->tp_alloc (type, 0);
if (self)
@@ -45,34 +44,27 @@ printf ("context_new\n");
int context_init (context_t *self, PyObject *args, PyObject *kwdict)
{
-printf ("context_init\n");
int app_threads;
int io_threads;
static const char *kwlist [] = {"app_threads", "io_threads", NULL};
if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii", (char**) kwlist,
&app_threads, &io_threads)) {
PyErr_SetString (PyExc_SystemError, "invalid arguments");
-printf ("context_init err1\n");
return -1; // ?
}
-printf ("app_threads=%d io_threads=%d\n", app_threads, io_threads);
-
assert (!self->handle);
self->handle = zmq_init (app_threads, io_threads);
if (!self->handle) {
PyErr_SetString (PyExc_SystemError, strerror (errno));
return -1; // ?
-printf ("context_init err2\n");
}
-printf ("context_init ok\n");
return 0;
}
void context_dealloc (context_t *self)
{
-printf ("context_dealloc\n");
if (self->handle) {
int rc = zmq_term (self->handle);
if (rc != 0)
@@ -90,7 +82,6 @@ struct socket_t
PyObject *socket_new (PyTypeObject *type, PyObject *args, PyObject *kwds)
{
-printf ("socket_new\n");
socket_t *self = (socket_t*) type->tp_alloc (type, 0);
if (self)
@@ -101,7 +92,6 @@ printf ("socket_new\n");
int socket_init (socket_t *self, PyObject *args, PyObject *kwdict)
{
-printf ("socket_init\n");
context_t *context;
int socket_type;
static const char *kwlist [] = {"context", "type", NULL};
@@ -124,7 +114,6 @@ printf ("socket_init\n");
void socket_dealloc (socket_t *self)
{
-printf ("socket_dealloc\n");
if (self->handle) {
int rc = zmq_close (self->handle);
if (rc != 0)
@@ -340,7 +329,7 @@ static PyTypeObject context_type =
0, /* tp_dictoffset */
(initproc) context_init, /* tp_init */
0, /* tp_alloc */
- context_new, /* tp_new */
+ context_new /* tp_new */
};
static PyMethodDef socket_methods [] =
@@ -390,7 +379,7 @@ static PyTypeObject socket_type =
{
PyObject_HEAD_INIT (NULL)
0,
- "libpyzmq.Socket" , /* tp_name */
+ "libpyzmq.Socket", /* tp_name */
sizeof (socket_t), /* tp_basicsize */
0, /* tp_itemsize */
(destructor) socket_dealloc, /* tp_dealloc */
@@ -426,7 +415,7 @@ static PyTypeObject socket_type =
0, /* tp_dictoffset */
(initproc) socket_init, /* tp_init */
0, /* tp_alloc */
- socket_new, /* tp_new */
+ socket_new /* tp_new */
};
static PyMethodDef module_methods [] = {{ NULL, NULL, 0, NULL }};
@@ -442,8 +431,10 @@ static const char* libpyzmq_doc =
PyMODINIT_FUNC initlibpyzmq ()
{
- if (PyType_Ready (&context_type) < 0 && PyType_Ready (&socket_type) < 0)
- return;
+ int rc = PyType_Ready (&context_type);
+ assert (rc == 0);
+ rc = PyType_Ready (&socket_type);
+ assert (rc == 0);
PyObject *module = Py_InitModule3 ("libpyzmq", module_methods,
libpyzmq_doc);
@@ -451,8 +442,8 @@ PyMODINIT_FUNC initlibpyzmq ()
return;
Py_INCREF (&context_type);
- Py_INCREF (&socket_type);
PyModule_AddObject (module, "Context", (PyObject*) &context_type);
+ Py_INCREF (&socket_type);
PyModule_AddObject (module, "Socket", (PyObject*) &socket_type);
PyObject *dict = PyModule_GetDict (module);
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index e108594..58fe19d 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -51,9 +51,7 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
zmq::app_thread_t::~app_thread_t ()
{
- // Destroy all the sockets owned by this application thread.
- for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
- delete *it;
+ zmq_assert (sockets.empty ());
}
zmq::i_signaler *zmq::app_thread_t::get_signaler ()
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 71e20df..49c2197 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -30,7 +30,9 @@
#include "windows.h"
#endif
-zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
+zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
+ sockets (0),
+ terminated (false)
{
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
@@ -68,6 +70,20 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
io_threads [i]->start ();
}
+int zmq::dispatcher_t::term ()
+{
+ term_sync.lock ();
+ zmq_assert (!terminated);
+ terminated = true;
+ bool destroy = (sockets == 0);
+ term_sync.unlock ();
+
+ if (destroy)
+ delete this;
+
+ return 0;
+}
+
zmq::dispatcher_t::~dispatcher_t ()
{
// Close all application theads, sockets, io_objects etc.
@@ -111,9 +127,27 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
}
threads_sync.unlock ();
+ term_sync.lock ();
+ sockets++;
+ term_sync.unlock ();
+
return thread->create_socket (type_);
}
+void zmq::dispatcher_t::destroy_socket ()
+{
+ // If zmq_term was already called and there are no more sockets,
+ // terminate the whole 0MQ infrastructure.
+ term_sync.lock ();
+ zmq_assert (sockets > 0);
+ sockets--;
+ bool destroy = (sockets == 0 && terminated);
+ term_sync.unlock ();
+
+ if (destroy)
+ delete this;
+}
+
zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
{
// Check whether thread ID is already assigned. If so, return it.
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index cb445ef..bd1f655 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -52,12 +52,18 @@ namespace zmq
// signalers.
dispatcher_t (int app_threads_, int io_threads_);
- // To be called to terminate the whole infrastructure (zmq_term).
- ~dispatcher_t ();
+ // This function is called when user invokes zmq_term. If there are
+ // no more sockets open it'll cause all the infrastructure to be shut
+ // down. If there are open sockets still, the deallocation happens
+ // after the last one is closed.
+ int term ();
// Create a socket.
class socket_base_t *create_socket (int type_);
+ // Destroy a socket.
+ void destroy_socket ();
+
// Returns number of thread slots in the dispatcher. To be used by
// individual threads to find out how many distinct signals can be
// received.
@@ -93,6 +99,8 @@ namespace zmq
private:
+ ~dispatcher_t ();
+
// Returns the app thread associated with the current thread.
// NULL if we are out of app thread slots.
class app_thread_t *choose_app_thread ();
@@ -127,9 +135,20 @@ namespace zmq
typedef std::set <class pipe_t*> pipes_t;
pipes_t pipes;
- // Synchronisation of access to the pipes repository.
+ // Synchronisation of access to the pipes repository.
mutex_t pipes_sync;
+ // Number of sockets alive.
+ int sockets;
+
+ // If true, zmq_term was already called. When last socket is closed
+ // the whole 0MQ infrastructure should be deallocated.
+ bool terminated;
+
+ // Synchronisation of access to the termination data (socket count
+ // and 'terminated' flag).
+ mutex_t term_sync;
+
dispatcher_t (const dispatcher_t&);
void operator = (const dispatcher_t&);
};
diff --git a/src/object.cpp b/src/object.cpp
index c0ef21c..1433b7b 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -53,6 +53,11 @@ int zmq::object_t::get_thread_slot ()
return thread_slot;
}
+zmq::dispatcher_t *zmq::object_t::get_dispatcher ()
+{
+ return dispatcher;
+}
+
void zmq::object_t::process_command (command_t &cmd_)
{
switch (cmd_.type) {
diff --git a/src/object.hpp b/src/object.hpp
index 250e856..2e41507 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -40,6 +40,7 @@ namespace zmq
~object_t ();
int get_thread_slot ();
+ dispatcher_t *get_dispatcher ();
void process_command (struct command_t &cmd_);
// Allow pipe to access corresponding dispatcher functions.
diff --git a/src/session.cpp b/src/session.cpp
index ac2dd12..bc334e0 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -54,7 +54,12 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
- return out_pipe->write (msg_);
+ if (out_pipe->write (msg_)) {
+ zmq_msg_init (msg_);
+ return true;
+ }
+
+ return false;
}
void zmq::session_t::flush ()
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 6ad1f55..93a0a4c 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -24,7 +24,7 @@
#include "socket_base.hpp"
#include "app_thread.hpp"
-#include "err.hpp"
+#include "dispatcher.hpp"
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "msg_content.hpp"
@@ -34,6 +34,7 @@
#include "owned.hpp"
#include "uuid.hpp"
#include "pipe.hpp"
+#include "err.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
@@ -288,7 +289,16 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close ()
{
app_thread->remove_socket (this);
+
+ // Pointer to the dispatcher must be retrieved before the socket is
+ // deallocated. Afterwards it is not available.
+ dispatcher_t *dispatcher = get_dispatcher ();
delete this;
+
+ // This function must be called after the socket is completely deallocated
+ // as it may cause termination of the whole 0MQ infrastructure.
+ dispatcher->destroy_socket ();
+
return 0;
}
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 49096ad..0ffd530 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -183,8 +183,7 @@ void *zmq_init (int app_threads_, int io_threads_)
int zmq_term (void *dispatcher_)
{
- delete (zmq::dispatcher_t*) dispatcher_;
- return 0;
+ return ((zmq::dispatcher_t*) dispatcher_)->term ();
}
void *zmq_socket (void *dispatcher_, int type_)