summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/zmq.h6
-rw-r--r--perf/c/local_thr.c6
-rw-r--r--perf/cpp/local_thr.cpp5
-rw-r--r--perf/python/local_lat.py42
-rw-r--r--perf/python/local_thr.py44
-rw-r--r--perf/python/remote_lat.py43
-rw-r--r--perf/python/remote_thr.py17
-rw-r--r--python/pyzmq.cpp924
-rw-r--r--src/i_endpoint.hpp2
-rw-r--r--src/session.cpp29
-rw-r--r--src/session.hpp13
-rw-r--r--src/socket_base.cpp30
-rw-r--r--src/socket_base.hpp2
13 files changed, 452 insertions, 711 deletions
diff --git a/include/zmq.h b/include/zmq.h
index 2b6f996..f321fa9 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -111,7 +111,7 @@ ZMQ_EXPORT int zmq_msg_init (struct zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_init_size (struct zmq_msg_t *msg, size_t size);
// Initialise a message from an existing buffer. Message isn't copied,
-// instead 0SOCKETS infrastructure take ownership of the buffer and call
+// instead 0MQ infrastructure take ownership of the buffer and call
// deallocation functio (ffn) once it's not needed anymore.
ZMQ_EXPORT int zmq_msg_init_data (struct zmq_msg_t *msg, void *data,
size_t size, zmq_free_fn *ffn);
@@ -139,7 +139,7 @@ ZMQ_EXPORT size_t zmq_msg_size (struct zmq_msg_t *msg);
// Returns type of the message.
ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg);
-// Initialise 0SOCKETS context. 'app_threads' specifies maximal number
+// Initialise 0MQ context. 'app_threads' specifies maximal number
// of application threads that can have open sockets at the same time.
// 'io_threads' specifies the size of thread pool to handle I/O operations.
//
@@ -147,7 +147,7 @@ ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg);
// threads declared at all.
ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads);
-// Deinitialise 0SOCKETS context including all the open sockets. Closing
+// Deinitialise 0MQ context including all the open sockets. Closing
// sockets after zmq_term has been called will result in undefined behaviour.
ZMQ_EXPORT int zmq_term (void *context);
diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c
index 87c3220..b58fac3 100644
--- a/perf/c/local_thr.c
+++ b/perf/c/local_thr.c
@@ -38,6 +38,7 @@ int main (int argc, char *argv [])
struct timeval end;
uint64_t elapsed;
uint64_t throughput;
+ double megabits;
if (argc != 4) {
printf ("usage: local_thr <bind-to> <message-count> "
@@ -81,12 +82,15 @@ int main (int argc, char *argv [])
elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) -
((uint64_t) start.tv_sec * 1000000 + start.tv_usec);
-
+ if (elapsed == 0)
+ elapsed = 1;
throughput = (uint64_t) message_count * 1000000 / elapsed;
+ megabits = (double) (throughput * message_size * 8) / 1000000;
printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count);
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
+ printf ("mean throughput: %3f [Mb/s]\n", (double) megabits);
return 0;
}
diff --git a/perf/cpp/local_thr.cpp b/perf/cpp/local_thr.cpp
index fdcbc8d..e328117 100644
--- a/perf/cpp/local_thr.cpp
+++ b/perf/cpp/local_thr.cpp
@@ -63,12 +63,15 @@ int main (int argc, char *argv [])
uint64_t elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) -
((uint64_t) start.tv_sec * 1000000 + start.tv_usec);
-
+ if (elapsed == 0)
+ elapsed = 1;
uint64_t throughput = (uint64_t) message_count * 1000000 / elapsed;
+ double megabits = (double) (throughput * message_size * 8) / 1000000;
printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count);
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
+ printf ("mean throughput: %3f [Mb/s]\n", (double) megabits);
return 0;
}
diff --git a/perf/python/local_lat.py b/perf/python/local_lat.py
index 9883dc0..7f1503f 100644
--- a/perf/python/local_lat.py
+++ b/perf/python/local_lat.py
@@ -18,50 +18,32 @@
#
import sys
-from datetime import datetime
-import libpyzmq
import time
-
+import libpyzmq
def main ():
- if len (sys.argv) != 5:
- print 'usage: py_local_lat <in-interface> <out-interface> <message-size> <roundtrip-count>'
+ if len (sys.argv) != 4:
+ print 'usage: local_lat <bind-to> <roundtrip-count> <message-size>'
sys.exit (1)
try:
- in_interface = sys.argv [1]
- out_interface = sys.argv [2]
+ bind_to = sys.argv [1]
+ roundtrip_count = int (sys.argv [2])
message_size = int (sys.argv [3])
- roundtrip_count = int (sys.argv [4])
except (ValueError, OverflowError), e:
print 'message-size and roundtrip-count must be integers'
sys.exit (1)
- print "message size:", message_size, "[B]"
- print "roundtrip count:", roundtrip_count
+ ctx = libpyzmq.Context (1, 1);
+ s = libpyzmq.Socket (ctx, libpyzmq.REP)
+ s.bind (bind_to)
- z = libpyzmq.Zmq ()
- context = z.context (1,1);
-
- in_socket = z.socket (context, libpyzmq.ZMQ_SUB)
- out_socket = z.socket (context, libpyzmq.ZMQ_PUB)
-
- z.bind (in_socket, addr = in_interface)
- z.bind (out_socket, addr = out_interface)
-
- msg_out = z.init_msg_data (string_msg, type)
-
- start = datetime.now ()
for i in range (0, roundtrip_count):
- z.send (out_socket, msg_out, True)
- list = z.receive (in_socket, True)
- msg_in = list [1]
- assert len(msg_in) == message_size
- end = datetime.now ()
+ msg = s.recv ()
+ assert len (msg) == message_size
+ s.send (msg)
- delta = end - start
- delta_us = delta.seconds * 1000000 + delta.microseconds
- print 'Your average latency is', delta_us / roundtrip_count, ' [us]'
+ time.sleep (1)
if __name__ == "__main__":
main ()
diff --git a/perf/python/local_thr.py b/perf/python/local_thr.py
index 9b199df..6113c82 100644
--- a/perf/python/local_thr.py
+++ b/perf/python/local_thr.py
@@ -23,46 +23,42 @@ import libpyzmq
def main ():
if len (sys.argv) != 4:
- print ('usage: py_local_thr <in_interface> <message-size> ' +
- '<message-count>')
+ print 'usage: local_thr <bind-to> <message-size> <message-count>'
sys.exit (1)
try:
+ bind_to = sys.argv [1]
message_size = int (sys.argv [2])
message_count = int (sys.argv [3])
except (ValueError, OverflowError), e:
print 'message-size and message-count must be integers'
sys.exit (1)
- print "message size:", message_size, "[B]"
- print "message count:", message_count
+ ctx = libpyzmq.Context (1, 1);
+ s = libpyzmq.Socket (ctx, libpyzmq.P2P)
+ s.bind (bind_to)
- z = libpyzmq.Zmq ()
+ msg = s.recv ()
+ assert len (msg) == message_size
- context = z.context (1,1)
- in_socket = z.socket (context, libpyzmq.ZMQ_SUB)
- z.connect (in_socketaddr = sys.argv [1])
-
-
- list = z.receive (in_socket, True)
- msg = list [1]
- assert len(msg) == message_size
start = datetime.now ()
+
for i in range (1, message_count):
- list = z.receive (in_socket, True)
- msg = list [1]
- assert len(msg) == message_size
+ msg = s.recv ()
+ assert len (msg) == message_size
+
end = datetime.now()
- delta = end - start
- delta_us = delta.seconds * 1000000 + delta.microseconds
- if delta_us == 0:
- delta_us = 1
- message_thr = (1000000.0 * float (message_count)) / float (delta_us)
- megabit_thr = (message_thr * float (message_size) * 8.0) / 1000000.0;
+ elapsed = (end - start).seconds * 1000000 + (end - start).microseconds
+ if elapsed == 0:
+ elapsed = 1
+ throughput = (1000000.0 * float (message_count)) / float (elapsed)
+ megabits = float (throughput * message_size * 8) / 1000000
- print "Your average throughput is %.0f [msg/s]" % (message_thr, )
- print "Your average throughput is %.2f [Mb/s]" % (megabit_thr, )
+ print "message size: %.0f [B]" % (message_size, )
+ print "message count: %.0f" % (message_count, )
+ print "mean throughput: %.0f [msg/s]" % (throughput, )
+ print "mean throughput: %.3f [Mb/s]" % (megabits, )
if __name__ == "__main__":
main ()
diff --git a/perf/python/remote_lat.py b/perf/python/remote_lat.py
index ac73595..372f567 100644
--- a/perf/python/remote_lat.py
+++ b/perf/python/remote_lat.py
@@ -20,39 +20,40 @@
import sys
from datetime import datetime
import libpyzmq
-import time
-
def main ():
- if len(sys.argv) != 5:
- print ('usage: py_remote_lat <in-interface> ' +
- '<out-interface> <message-size> <roundtrip-count>')
+ if len(sys.argv) != 4:
+ print 'usage: remote_lat <connect-to> <roundtrip-count> <message-size>'
sys.exit (1)
try:
- message_size = int (sys.argv [3])
- roundtrip_count = int (sys.argv [4])
+ connect_to = sys.argv [1]
+ message_size = int (sys.argv [2])
+ roundtrip_count = int (sys.argv [3])
except (ValueError, OverflowError), e:
print 'message-size and message-count must be integers'
sys.exit (1)
- z = libpyzmq.Zmq ()
+ ctx = libpyzmq.Context (1, 1);
+ s = libpyzmq.Socket (ctx, libpyzmq.REQ)
+ s.connect (connect_to)
+
+ msg = ''.join ([' ' for n in range (0, message_size)])
+
+ start = datetime.now ()
- context = z.context (1,1)
-
- in_socket = z.socket (context, libpyzmq.ZMQ_SUB)
- out_socket = z.socket (context, libpyzmq.ZMQ_PUB)
-
- z.connect (in_socket, addr = in_interface)
- z.connect (out_socket, addr = out_interface)
-
for i in range (0, roundtrip_count):
- list = z.receive (in_socket, True)
- message = list [1]
- z.send (out_socket, message, True)
-
- time.sleep (2)
+ s.send (msg)
+ msg = s.recv ()
+ assert len (msg) == message_size
+
+ end = datetime.now ()
+ delta = (end - start).microseconds + 1000000 * (end - start).seconds
+ latency = delta / roundtrip_count / 2
+ print "message size: %.0f [B]" % (message_size, )
+ print "roundtrip count: %.0f" % (roundtrip_count, )
+ print "mean latency: %.3f [us]" % (latency, )
if __name__ == "__main__":
main ()
diff --git a/perf/python/remote_thr.py b/perf/python/remote_thr.py
index a4f2b66..a80adfd 100644
--- a/perf/python/remote_thr.py
+++ b/perf/python/remote_thr.py
@@ -18,33 +18,32 @@
#
import sys
-from datetime import datetime
import libpyzmq
import time
def main ():
if len (sys.argv) != 4:
- print 'usage: py_remote_thr <out-interface> <message-size> <message-count>'
+ print 'usage: remote_thr <connect-to> <message-size> <message-count>'
sys.exit (1)
try:
+ connect_to = argv [1]
message_size = int (sys.argv [2])
message_count = int (sys.argv [3])
except (ValueError, OverflowError), e:
print 'message-size and message-count must be integers'
sys.exit (1)
- z = libpyzmq.Zmq ()
- context = z.context (1,1);
- out_socket = z.socket (context, libpyzmq.ZMQ_PUB)
- z.bind (out_socket, addr = sys.argv [1])
+ ctx = libpyzmq.Context (1, 1);
+ s = libpyzmq.Socket (ctx, libpyzmq.P2P)
+ s.connect (connect_to)
- msg = z.init_msg_data (string_msg, type)
+ msg = ''.join ([' ' for n in range (0, message_size)])
for i in range (0, message_count):
- z.send (out_socket, msg, True)
+ s.send (msg)
- time.sleep (2)
+ time.sleep (10)
if __name__ == "__main__":
main ()
diff --git a/python/pyzmq.cpp b/python/pyzmq.cpp
index 019c73c..8913b8a 100644
--- a/python/pyzmq.cpp
+++ b/python/pyzmq.cpp
@@ -17,740 +17,484 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <stddef.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
#include <Python.h>
-#include <zmq.hpp>
+#include "../include/zmq.h"
-struct pyZMQ
+struct context_t
{
PyObject_HEAD
-
+ void *handle;
};
-void pyZMQ_dealloc (pyZMQ *self)
-{
-
- self->ob_type->tp_free ((PyObject*) self);
-}
-
-PyObject *pyZMQ_new (PyTypeObject *type, PyObject *args, PyObject *kwdict)
+PyObject *context_new (PyTypeObject *type, PyObject *args, PyObject *kwds)
{
- pyZMQ *self = (pyZMQ*) type->tp_alloc (type, 0);
+printf ("context_new\n");
+ context_t *self = (context_t*) type->tp_alloc (type, 0);
- return (PyObject*) self;
-}
+ if (self)
+ self->handle = NULL;
-PyObject *pyZMQ_term (PyTypeObject *type, PyObject *args, PyObject *kwdict)
-{
- pyZMQ *self = (pyZMQ*) type->tp_alloc (type, 0);
-
- static const char *kwlist [] = {"context", NULL};
- PyObject *context;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &context))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_term ((void *) context);
- assert (rc != 0);
-
return (PyObject*) self;
}
-int pyZMQ_init (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- return 0;
-}
-PyObject *pyZMQ_context (pyZMQ *self, PyObject *args, PyObject *kwdict)
+int context_init (context_t *self, PyObject *args, PyObject *kwdict)
{
- static const char *kwlist [] = {"app_threads", "io_threads", NULL};
-
+printf ("context_init\n");
int app_threads;
int io_threads;
-
+ static const char *kwlist [] = {"app_threads", "io_threads", NULL};
if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii", (char**) kwlist,
- &app_threads, &io_threads))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- void *context = zmq_init (app_threads, io_threads);
- if (context == NULL) {
- assert (errno == EINVAL);
- PyErr_SetString (PyExc_ValueError, "Invalid argument");
+ &app_threads, &io_threads)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+printf ("context_init err1\n");
+ return -1; // ?
}
-
- return (PyObject*) context;
-}
-PyObject *pyZMQ_msg_init (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- zmq_msg *msg;
+printf ("app_threads=%d io_threads=%d\n", app_threads, io_threads);
- int rc = zmq_msg_init (msg);
-
- if (rc == -1) {
- assert (rc == ENOMEM);
- PyErr_SetString( PyExc_MemoryError, "Out of memory");
- }
-
- return (PyObject*) msg;
-}
-
-
-PyObject *pyZMQ_msg_init_size (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- static const char *kwlist [] = {"size", NULL};
-
- zmq_msg *msg;
- int size;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "i", (char**) kwlist,
- &size))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_msg_init_size (msg, size);
-
- if (rc == -1) {
- assert (rc == ENOMEM);
- PyErr_SetString( PyExc_ValueError, "Out of memory");
- }
-
- return (PyObject*) msg;
-}
-
-PyObject *pyZMQ_msg_init_data (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- static const char *kwlist [] = {"data", "size", "ffn", NULL};
-
- PyObject *data = PyString_FromStringAndSize (NULL, 0);
- zmq_msg *msg;
- PyObject *ffn;
- int size;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "SiO", (char**) kwlist,
- &data, &size, &ffn))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_msg_init_data (msg, data, size, NULL);
- assert (rc == 0);
-
- return (PyObject*) msg;
-}
-
-PyObject *pyZMQ_msg_close (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- static const char *kwlist [] = {"msg", NULL};
-
- PyObject *msg;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &msg))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_msg_close ((zmq_msg *) msg);
- assert (rc == 0);
+ assert (!self->handle);
+ self->handle = zmq_init (app_threads, io_threads);
+ if (!self->handle) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return -1; // ?
+printf ("context_init err2\n");
+ }
- return (PyObject*) self;
+printf ("context_init ok\n");
+ return 0;
}
-PyObject *pyZMQ_msg_move (pyZMQ *self, PyObject *args, PyObject *kwdict)
+void context_dealloc (context_t *self)
{
- static const char *kwlist [] = {"src", NULL};
-
- zmq_msg *dest;
- PyObject *src;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &src))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_msg_move (dest, (zmq_msg*) src);
- assert (rc == 0);
+printf ("context_dealloc\n");
+ if (self->handle) {
+ int rc = zmq_term (self->handle);
+ if (rc != 0)
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ }
- return (PyObject*) dest;
+ self->ob_type->tp_free ((PyObject*) self);
}
-PyObject *pyZMQ_msg_copy (pyZMQ *self, PyObject *args, PyObject *kwdict)
+struct socket_t
{
- static const char *kwlist [] = {"src", NULL};
-
- PyObject *dest;
- PyObject *src;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &src))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_msg_copy ((zmq_msg*) dest, (zmq_msg*) src);
- assert (rc == 0);
-
- return (PyObject*) dest;
-}
+ PyObject_HEAD
+ void *handle;
+};
-PyObject *pyZMQ_msg_data (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_new (PyTypeObject *type, PyObject *args, PyObject *kwds)
{
- static const char *kwlist [] = {"msg", NULL};
-
- PyObject *msg;
- PyObject *data = PyString_FromStringAndSize (NULL, 0);
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &msg))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- data = (PyObject *) zmq_msg_data ((zmq_msg *) msg);
-
- return (PyObject*) data;
-}
+printf ("socket_new\n");
+ socket_t *self = (socket_t*) type->tp_alloc (type, 0);
-int pyZMQ_msg_size (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- static const char *kwlist [] = {"msg", NULL};
-
- PyObject *msg;
- int size;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &msg))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- size = zmq_msg_size ((zmq_msg*) msg);
-
- return size;
-}
+ if (self)
+ self->handle = NULL;
-int pyZMQ_msg_type (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- static const char *kwlist [] = {"msg", NULL};
-
- PyObject *msg;
- int type;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &msg))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- type = zmq_msg_type ((zmq_msg*) msg);
-
- return type;
+ return (PyObject*) self;
}
-PyObject *pyZMQ_socket (pyZMQ *self, PyObject *args, PyObject *kwdict)
+int socket_init (socket_t *self, PyObject *args, PyObject *kwdict)
{
+printf ("socket_init\n");
+ context_t *context;
+ int socket_type;
static const char *kwlist [] = {"context", "type", NULL};
- void* context;
- int type;
-
if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Oi", (char**) kwlist,
- &context, &type))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
+ &context, &socket_type)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
+ // TODO: Check whether 'context' is really a libpyzmq.Context object.
- void *socket = zmq_socket ((void *) context, type);
-
- if (socket == NULL) {
- assert (errno == EMFILE || errno == EINVAL);
- if (errno == EMFILE)
- PyErr_SetString (PyExc_MemoryError, "Too many threads");
- else
- PyErr_SetString (PyExc_ValueError, "Invalid argument");
+ assert (!self->handle);
+ self->handle = zmq_socket (context->handle, socket_type);
+ if (!self->handle) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return -1; // ?
}
- return (PyObject*) socket;
+ return 0;
}
-PyObject *pyZMQ_close (pyZMQ *self, PyObject *args, PyObject *kwdict)
+void socket_dealloc (socket_t *self)
{
- static const char *kwlist [] = {"socket", NULL};
-
- PyObject* socket;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &socket))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
-
- int rc = zmq_close ((void *)socket);
- assert (rc == 0);
+printf ("socket_dealloc\n");
+ if (self->handle) {
+ int rc = zmq_close (self->handle);
+ if (rc != 0)
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ }
- return (PyObject *) self;
+ self->ob_type->tp_free ((PyObject*) self);
}
-PyObject *pyZMQ_setsockopt (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_setsockopt (socket_t *self, PyObject *args, PyObject *kwdict)
{
- static const char *kwlist [] = {"socket", "option", "optval", NULL};
- printf ("setsockopt\n");
- PyObject* socket;
int option;
PyObject* optval;
- int optvallen;
+ static const char *kwlist [] = {"option", "optval", NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "iO", (char**) kwlist,
+ &option, &optval)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
+
int rc;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "OiO", (char**) kwlist,
- &socket, &option, &optval))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- if (PyInt_Check (optval))
- rc = zmq_setsockopt ((void *) socket, option, (void *) optval,
- 4);
- if (PyBool_Check (optval))
- rc = zmq_setsockopt ((void *) socket, option, (void *) optval,
- 1);
- if (PyFloat_Check (optval))
- rc = zmq_setsockopt ((void *) socket, option, (void *) optval,
- 4);
+ if (PyInt_Check (optval)) {
+ int val = PyInt_AsLong (optval);
+ rc = zmq_setsockopt (self->handle, option, &val, sizeof (int));
+ }
if (PyString_Check (optval))
- rc = zmq_setsockopt ((void *) socket, option, (void *) optval,
- PyString_Size (optval));
+ rc = zmq_setsockopt (self->handle, option, PyString_AsString (optval),
+ PyString_Size (optval));
+ else {
+ rc = -1;
+ errno = EINVAL;
+ }
- assert (rc == 0);
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
+ }
- return (PyObject *) self;
+ Py_INCREF (Py_None);
+ return Py_None;
}
-PyObject *pyZMQ_bind (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_bind (socket_t *self, PyObject *args, PyObject *kwdict)
{
- char const *addr = NULL;
- PyObject* socket;
-
- static const char *kwlist [] = {"socket", "addr", NULL};
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Os", (char**) kwlist,
- &socket, &addr))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_bind ((void*) socket, addr);
- if (rc == -1) {
- assert (errno == EINVAL || errno == EADDRINUSE);
- if (errno == EINVAL)
- PyErr_SetString (PyExc_ValueError, "Invalid argument");
- else
- PyErr_SetString (PyExc_ValueError, "Address in use");
+ char const *addr;
+ static const char *kwlist [] = {"addr", NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "s", (char**) kwlist,
+ &addr)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
}
- return (PyObject *) self;
+ int rc = zmq_bind (self->handle, addr);
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
+ }
+
+ Py_INCREF (Py_None);
+ return Py_None;
}
-PyObject *pyZMQ_connect (pyZMQ *self, PyObject *args, PyObject *kw)
+
+PyObject *socket_connect (socket_t *self, PyObject *args, PyObject *kwdict)
{
- char const *addr = NULL;
- PyObject* socket;
-
- static const char* kwlist [] = {"socket", "addr", NULL};
-
- if (!PyArg_ParseTupleAndKeywords (args, kw, "Os", (char**) kwlist,
- &socket, &addr))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_connect ((void *) socket, addr);
- if (rc == -1) {
- assert (errno == EINVAL || errno == EADDRINUSE);
- if (errno == EINVAL)
- PyErr_SetString (PyExc_ValueError, "Invalid argument");
- else
- PyErr_SetString (PyExc_ValueError, "Address in use");
+ char const *addr;
+ static const char *kwlist [] = {"addr", NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "s", (char**) kwlist,
+ &addr)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
+
+ int rc = zmq_connect (self->handle, addr);
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
}
- return (PyObject *) self;
+ Py_INCREF (Py_None);
+ return Py_None;
}
-PyObject *pyZMQ_flush (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_send (socket_t *self, PyObject *args, PyObject *kwdict)
{
+ PyObject *msg; /* = PyString_FromStringAndSize (NULL, 0); */
+ int flags = 0;
+ static const char *kwlist [] = {"msg", "flags", NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "S|i", (char**) kwlist,
+ &msg, &flags)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
- static const char *kwlist [] = {"socket", NULL};
- PyObject *socket;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &socket))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_flush ((void*) socket);
- assert (rc == 0);
+ zmq_msg_t data;
+ int rc = zmq_msg_init_size (&data, PyString_Size (msg));
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
+ }
+ memcpy (zmq_msg_data (&data), PyString_AsString (msg),
+ zmq_msg_size (&data));
+
+ rc = zmq_send (self->handle, &data, flags);
+ int rc2 = zmq_msg_close (&data);
+ assert (rc2 == 0);
+
+ if (rc != 0 && errno == EAGAIN)
+ return PyBool_FromLong (0);
+
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
+ }
- return (PyObject *) self;
+ return PyBool_FromLong (1);
}
-PyObject *pyZMQ_send (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_flush (socket_t *self, PyObject *args, PyObject *kwdict)
{
- PyObject *msg;
- PyObject *socket;
- int flags = 0;
-
- static const char *kwlist [] = {"socket", "msg", "flags", NULL};
-
- if (!PyArg_ParseTupleAndKeywords(args, kwdict, "OOi", (char**) kwlist,
- &socket, &msg, &flags))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_send ((void*) socket, (zmq_msg*) msg, flags);
- assert (rc == 0 || (rc == -1 && errno == EAGAIN));
-
- return (PyObject *) self;
+ static const char *kwlist [] = {NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "", (char**) kwlist)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
+
+ int rc = zmq_flush (self->handle);
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
+ }
+
+ Py_INCREF (Py_None);
+ return Py_None;
}
-PyObject *pyZMQ_receive (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_recv (socket_t *self, PyObject *args, PyObject *kwdict)
{
- zmq_msg *msg;
- zmq_msg_init (msg);
- PyObject *socket;
-
int flags = 0;
- static const char *kwlist [] = {"socket", "flags", NULL};
+ static const char *kwlist [] = {"flags", NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "|i", (char**) kwlist,
+ &flags)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Oi", (char**) kwlist,
- &socket, &flags))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_recv (socket, msg, flags);
- assert (rc == 0 || (rc == -1 && errno == EAGAIN));
-
- PyObject *py_message = PyString_FromStringAndSize (NULL, 0);
- py_message = (PyObject *) zmq_msg_data (msg);
- int py_message_size = zmq_msg_size (msg);
- int py_message_type = zmq_msg_type (msg);
-
- zmq_msg_close (msg);
-
- return Py_BuildValue ("isi", rc, py_message,
- py_message_size, py_message_type);
+ zmq_msg_t msg;
+ int rc = zmq_msg_init (&msg);
+ assert (rc == 0);
+
+ rc = zmq_recv (self->handle, &msg, flags);
+
+ if (rc != 0 && errno == EAGAIN) {
+ Py_INCREF (Py_None);
+ return Py_None;
+ }
+
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
+
+ PyObject *result = PyString_FromStringAndSize ((char*) zmq_msg_data (&msg),
+ zmq_msg_size (&msg));
+ rc = zmq_msg_close (&msg);
+ assert (rc == 0);
+ return result;
}
-static PyMethodDef pyZMQ_methods [] =
+static PyMethodDef context_methods [] =
{
{
- "context",
- (PyCFunction) pyZMQ_context,
- METH_VARARGS | METH_KEYWORDS,
- "context (app_threads, io_threads) -> None\n\n"
- "Creates new context\n\n"
- "app_threads is the number of application threads.\n\n"
- "io_threads is the number of io threads.\n\n"
-
- },
- {
- "msg_init",
- (PyCFunction) pyZMQ_msg_init,
- METH_VARARGS | METH_KEYWORDS,
- "msg_init () -> None\n\n"
- "Creates new message\n\n"
-
- },
- {
- "msg_init_size",
- (PyCFunction) pyZMQ_msg_init_size,
- METH_VARARGS | METH_KEYWORDS,
- "msg_init_size (size) -> None\n\n"
- "Creates new message of a specified size.\n\n"
- "size if integer specifying the size of the message to be created.\n\n"
-
- },
- {
- "msg_init_data",
- (PyCFunction) pyZMQ_msg_init_data,
- METH_VARARGS | METH_KEYWORDS,
- "msg_init_data (data, size, ffn) -> None\n\n"
- "Initialises new message with data\n\n"
- "data is pointer to the data of the message\n\n"
- "size is integer specifying size of data\n\n"
- "ffn is function to free alocated data\n\n"
-
- },
-
- {
- "msg_close",
- (PyCFunction) pyZMQ_msg_close,
- METH_VARARGS | METH_KEYWORDS,
- "msg_close (msg) -> None\n\n"
- "Deletes the message.\n\n"
- "msg is the message the be freed\n\n"
-
- },
- {
- "msg_move",
- (PyCFunction) pyZMQ_msg_move,
- METH_VARARGS | METH_KEYWORDS,
- "msg_move (src) -> dest\n\n"
- "Move the content of the message from 'src' to 'dest'.\n\n"
- "The content isn't copied, just moved. 'src' is an empty\n\n"
- "message after the call. Original content of 'dest' message\n\n"
- "is deallocated.\n\n"
-
- },
- {
- "msg_copy",
- (PyCFunction) pyZMQ_msg_copy,
- METH_VARARGS | METH_KEYWORDS,
- "msg_copy (src) -> dest\n\n"
- "Copy the 'src' message to 'dest'. The content isn't copied, \n\n"
- "instead reference count is increased. Don't modify the message \n\n"
- "data after the call as they are shared between two messages.\n\n"