summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/zmq.h6
-rw-r--r--perf/c/local_thr.c6
-rw-r--r--perf/cpp/local_thr.cpp5
-rw-r--r--perf/python/local_lat.py42
-rw-r--r--perf/python/local_thr.py44
-rw-r--r--perf/python/remote_lat.py43
-rw-r--r--perf/python/remote_thr.py17
-rw-r--r--python/pyzmq.cpp924
-rw-r--r--src/i_endpoint.hpp2
-rw-r--r--src/session.cpp29
-rw-r--r--src/session.hpp13
-rw-r--r--src/socket_base.cpp30
-rw-r--r--src/socket_base.hpp2
13 files changed, 452 insertions, 711 deletions
diff --git a/include/zmq.h b/include/zmq.h
index 2b6f996..f321fa9 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -111,7 +111,7 @@ ZMQ_EXPORT int zmq_msg_init (struct zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_init_size (struct zmq_msg_t *msg, size_t size);
// Initialise a message from an existing buffer. Message isn't copied,
-// instead 0SOCKETS infrastructure take ownership of the buffer and call
+// instead 0MQ infrastructure take ownership of the buffer and call
// deallocation functio (ffn) once it's not needed anymore.
ZMQ_EXPORT int zmq_msg_init_data (struct zmq_msg_t *msg, void *data,
size_t size, zmq_free_fn *ffn);
@@ -139,7 +139,7 @@ ZMQ_EXPORT size_t zmq_msg_size (struct zmq_msg_t *msg);
// Returns type of the message.
ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg);
-// Initialise 0SOCKETS context. 'app_threads' specifies maximal number
+// Initialise 0MQ context. 'app_threads' specifies maximal number
// of application threads that can have open sockets at the same time.
// 'io_threads' specifies the size of thread pool to handle I/O operations.
//
@@ -147,7 +147,7 @@ ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg);
// threads declared at all.
ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads);
-// Deinitialise 0SOCKETS context including all the open sockets. Closing
+// Deinitialise 0MQ context including all the open sockets. Closing
// sockets after zmq_term has been called will result in undefined behaviour.
ZMQ_EXPORT int zmq_term (void *context);
diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c
index 87c3220..b58fac3 100644
--- a/perf/c/local_thr.c
+++ b/perf/c/local_thr.c
@@ -38,6 +38,7 @@ int main (int argc, char *argv [])
struct timeval end;
uint64_t elapsed;
uint64_t throughput;
+ double megabits;
if (argc != 4) {
printf ("usage: local_thr <bind-to> <message-count> "
@@ -81,12 +82,15 @@ int main (int argc, char *argv [])
elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) -
((uint64_t) start.tv_sec * 1000000 + start.tv_usec);
-
+ if (elapsed == 0)
+ elapsed = 1;
throughput = (uint64_t) message_count * 1000000 / elapsed;
+ megabits = (double) (throughput * message_size * 8) / 1000000;
printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count);
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
+ printf ("mean throughput: %3f [Mb/s]\n", (double) megabits);
return 0;
}
diff --git a/perf/cpp/local_thr.cpp b/perf/cpp/local_thr.cpp
index fdcbc8d..e328117 100644
--- a/perf/cpp/local_thr.cpp
+++ b/perf/cpp/local_thr.cpp
@@ -63,12 +63,15 @@ int main (int argc, char *argv [])
uint64_t elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) -
((uint64_t) start.tv_sec * 1000000 + start.tv_usec);
-
+ if (elapsed == 0)
+ elapsed = 1;
uint64_t throughput = (uint64_t) message_count * 1000000 / elapsed;
+ double megabits = (double) (throughput * message_size * 8) / 1000000;
printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count);
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
+ printf ("mean throughput: %3f [Mb/s]\n", (double) megabits);
return 0;
}
diff --git a/perf/python/local_lat.py b/perf/python/local_lat.py
index 9883dc0..7f1503f 100644
--- a/perf/python/local_lat.py
+++ b/perf/python/local_lat.py
@@ -18,50 +18,32 @@
#
import sys
-from datetime import datetime
-import libpyzmq
import time
-
+import libpyzmq
def main ():
- if len (sys.argv) != 5:
- print 'usage: py_local_lat <in-interface> <out-interface> <message-size> <roundtrip-count>'
+ if len (sys.argv) != 4:
+ print 'usage: local_lat <bind-to> <roundtrip-count> <message-size>'
sys.exit (1)
try:
- in_interface = sys.argv [1]
- out_interface = sys.argv [2]
+ bind_to = sys.argv [1]
+ roundtrip_count = int (sys.argv [2])
message_size = int (sys.argv [3])
- roundtrip_count = int (sys.argv [4])
except (ValueError, OverflowError), e:
print 'message-size and roundtrip-count must be integers'
sys.exit (1)
- print "message size:", message_size, "[B]"
- print "roundtrip count:", roundtrip_count
+ ctx = libpyzmq.Context (1, 1);
+ s = libpyzmq.Socket (ctx, libpyzmq.REP)
+ s.bind (bind_to)
- z = libpyzmq.Zmq ()
- context = z.context (1,1);
-
- in_socket = z.socket (context, libpyzmq.ZMQ_SUB)
- out_socket = z.socket (context, libpyzmq.ZMQ_PUB)
-
- z.bind (in_socket, addr = in_interface)
- z.bind (out_socket, addr = out_interface)
-
- msg_out = z.init_msg_data (string_msg, type)
-
- start = datetime.now ()
for i in range (0, roundtrip_count):
- z.send (out_socket, msg_out, True)
- list = z.receive (in_socket, True)
- msg_in = list [1]
- assert len(msg_in) == message_size
- end = datetime.now ()
+ msg = s.recv ()
+ assert len (msg) == message_size
+ s.send (msg)
- delta = end - start
- delta_us = delta.seconds * 1000000 + delta.microseconds
- print 'Your average latency is', delta_us / roundtrip_count, ' [us]'
+ time.sleep (1)
if __name__ == "__main__":
main ()
diff --git a/perf/python/local_thr.py b/perf/python/local_thr.py
index 9b199df..6113c82 100644
--- a/perf/python/local_thr.py
+++ b/perf/python/local_thr.py
@@ -23,46 +23,42 @@ import libpyzmq
def main ():
if len (sys.argv) != 4:
- print ('usage: py_local_thr <in_interface> <message-size> ' +
- '<message-count>')
+ print 'usage: local_thr <bind-to> <message-size> <message-count>'
sys.exit (1)
try:
+ bind_to = sys.argv [1]
message_size = int (sys.argv [2])
message_count = int (sys.argv [3])
except (ValueError, OverflowError), e:
print 'message-size and message-count must be integers'
sys.exit (1)
- print "message size:", message_size, "[B]"
- print "message count:", message_count
+ ctx = libpyzmq.Context (1, 1);
+ s = libpyzmq.Socket (ctx, libpyzmq.P2P)
+ s.bind (bind_to)
- z = libpyzmq.Zmq ()
+ msg = s.recv ()
+ assert len (msg) == message_size
- context = z.context (1,1)
- in_socket = z.socket (context, libpyzmq.ZMQ_SUB)
- z.connect (in_socketaddr = sys.argv [1])
-
-
- list = z.receive (in_socket, True)
- msg = list [1]
- assert len(msg) == message_size
start = datetime.now ()
+
for i in range (1, message_count):
- list = z.receive (in_socket, True)
- msg = list [1]
- assert len(msg) == message_size
+ msg = s.recv ()
+ assert len (msg) == message_size
+
end = datetime.now()
- delta = end - start
- delta_us = delta.seconds * 1000000 + delta.microseconds
- if delta_us == 0:
- delta_us = 1
- message_thr = (1000000.0 * float (message_count)) / float (delta_us)
- megabit_thr = (message_thr * float (message_size) * 8.0) / 1000000.0;
+ elapsed = (end - start).seconds * 1000000 + (end - start).microseconds
+ if elapsed == 0:
+ elapsed = 1
+ throughput = (1000000.0 * float (message_count)) / float (elapsed)
+ megabits = float (throughput * message_size * 8) / 1000000
- print "Your average throughput is %.0f [msg/s]" % (message_thr, )
- print "Your average throughput is %.2f [Mb/s]" % (megabit_thr, )
+ print "message size: %.0f [B]" % (message_size, )
+ print "message count: %.0f" % (message_count, )
+ print "mean throughput: %.0f [msg/s]" % (throughput, )
+ print "mean throughput: %.3f [Mb/s]" % (megabits, )
if __name__ == "__main__":
main ()
diff --git a/perf/python/remote_lat.py b/perf/python/remote_lat.py
index ac73595..372f567 100644
--- a/perf/python/remote_lat.py
+++ b/perf/python/remote_lat.py
@@ -20,39 +20,40 @@
import sys
from datetime import datetime
import libpyzmq
-import time
-
def main ():
- if len(sys.argv) != 5:
- print ('usage: py_remote_lat <in-interface> ' +
- '<out-interface> <message-size> <roundtrip-count>')
+ if len(sys.argv) != 4:
+ print 'usage: remote_lat <connect-to> <roundtrip-count> <message-size>'
sys.exit (1)
try:
- message_size = int (sys.argv [3])
- roundtrip_count = int (sys.argv [4])
+ connect_to = sys.argv [1]
+ message_size = int (sys.argv [2])
+ roundtrip_count = int (sys.argv [3])
except (ValueError, OverflowError), e:
print 'message-size and message-count must be integers'
sys.exit (1)
- z = libpyzmq.Zmq ()
+ ctx = libpyzmq.Context (1, 1);
+ s = libpyzmq.Socket (ctx, libpyzmq.REQ)
+ s.connect (connect_to)
+
+ msg = ''.join ([' ' for n in range (0, message_size)])
+
+ start = datetime.now ()
- context = z.context (1,1)
-
- in_socket = z.socket (context, libpyzmq.ZMQ_SUB)
- out_socket = z.socket (context, libpyzmq.ZMQ_PUB)
-
- z.connect (in_socket, addr = in_interface)
- z.connect (out_socket, addr = out_interface)
-
for i in range (0, roundtrip_count):
- list = z.receive (in_socket, True)
- message = list [1]
- z.send (out_socket, message, True)
-
- time.sleep (2)
+ s.send (msg)
+ msg = s.recv ()
+ assert len (msg) == message_size
+
+ end = datetime.now ()
+ delta = (end - start).microseconds + 1000000 * (end - start).seconds
+ latency = delta / roundtrip_count / 2
+ print "message size: %.0f [B]" % (message_size, )
+ print "roundtrip count: %.0f" % (roundtrip_count, )
+ print "mean latency: %.3f [us]" % (latency, )
if __name__ == "__main__":
main ()
diff --git a/perf/python/remote_thr.py b/perf/python/remote_thr.py
index a4f2b66..a80adfd 100644
--- a/perf/python/remote_thr.py
+++ b/perf/python/remote_thr.py
@@ -18,33 +18,32 @@
#
import sys
-from datetime import datetime
import libpyzmq
import time
def main ():
if len (sys.argv) != 4:
- print 'usage: py_remote_thr <out-interface> <message-size> <message-count>'
+ print 'usage: remote_thr <connect-to> <message-size> <message-count>'
sys.exit (1)
try:
+ connect_to = argv [1]
message_size = int (sys.argv [2])
message_count = int (sys.argv [3])
except (ValueError, OverflowError), e:
print 'message-size and message-count must be integers'
sys.exit (1)
- z = libpyzmq.Zmq ()
- context = z.context (1,1);
- out_socket = z.socket (context, libpyzmq.ZMQ_PUB)
- z.bind (out_socket, addr = sys.argv [1])
+ ctx = libpyzmq.Context (1, 1);
+ s = libpyzmq.Socket (ctx, libpyzmq.P2P)
+ s.connect (connect_to)
- msg = z.init_msg_data (string_msg, type)
+ msg = ''.join ([' ' for n in range (0, message_size)])
for i in range (0, message_count):
- z.send (out_socket, msg, True)
+ s.send (msg)
- time.sleep (2)
+ time.sleep (10)
if __name__ == "__main__":
main ()
diff --git a/python/pyzmq.cpp b/python/pyzmq.cpp
index 019c73c..8913b8a 100644
--- a/python/pyzmq.cpp
+++ b/python/pyzmq.cpp
@@ -17,740 +17,484 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <stddef.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
#include <Python.h>
-#include <zmq.hpp>
+#include "../include/zmq.h"
-struct pyZMQ
+struct context_t
{
PyObject_HEAD
-
+ void *handle;
};
-void pyZMQ_dealloc (pyZMQ *self)
-{
-
- self->ob_type->tp_free ((PyObject*) self);
-}
-
-PyObject *pyZMQ_new (PyTypeObject *type, PyObject *args, PyObject *kwdict)
+PyObject *context_new (PyTypeObject *type, PyObject *args, PyObject *kwds)
{
- pyZMQ *self = (pyZMQ*) type->tp_alloc (type, 0);
+printf ("context_new\n");
+ context_t *self = (context_t*) type->tp_alloc (type, 0);
- return (PyObject*) self;
-}
+ if (self)
+ self->handle = NULL;
-PyObject *pyZMQ_term (PyTypeObject *type, PyObject *args, PyObject *kwdict)
-{
- pyZMQ *self = (pyZMQ*) type->tp_alloc (type, 0);
-
- static const char *kwlist [] = {"context", NULL};
- PyObject *context;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &context))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_term ((void *) context);
- assert (rc != 0);
-
return (PyObject*) self;
}
-int pyZMQ_init (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- return 0;
-}
-PyObject *pyZMQ_context (pyZMQ *self, PyObject *args, PyObject *kwdict)
+int context_init (context_t *self, PyObject *args, PyObject *kwdict)
{
- static const char *kwlist [] = {"app_threads", "io_threads", NULL};
-
+printf ("context_init\n");
int app_threads;
int io_threads;
-
+ static const char *kwlist [] = {"app_threads", "io_threads", NULL};
if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii", (char**) kwlist,
- &app_threads, &io_threads))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- void *context = zmq_init (app_threads, io_threads);
- if (context == NULL) {
- assert (errno == EINVAL);
- PyErr_SetString (PyExc_ValueError, "Invalid argument");
+ &app_threads, &io_threads)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+printf ("context_init err1\n");
+ return -1; // ?
}
-
- return (PyObject*) context;
-}
-PyObject *pyZMQ_msg_init (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- zmq_msg *msg;
+printf ("app_threads=%d io_threads=%d\n", app_threads, io_threads);
- int rc = zmq_msg_init (msg);
-
- if (rc == -1) {
- assert (rc == ENOMEM);
- PyErr_SetString( PyExc_MemoryError, "Out of memory");
- }
-
- return (PyObject*) msg;
-}
-
-
-PyObject *pyZMQ_msg_init_size (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- static const char *kwlist [] = {"size", NULL};
-
- zmq_msg *msg;
- int size;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "i", (char**) kwlist,
- &size))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_msg_init_size (msg, size);
-
- if (rc == -1) {
- assert (rc == ENOMEM);
- PyErr_SetString( PyExc_ValueError, "Out of memory");
- }
-
- return (PyObject*) msg;
-}
-
-PyObject *pyZMQ_msg_init_data (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- static const char *kwlist [] = {"data", "size", "ffn", NULL};
-
- PyObject *data = PyString_FromStringAndSize (NULL, 0);
- zmq_msg *msg;
- PyObject *ffn;
- int size;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "SiO", (char**) kwlist,
- &data, &size, &ffn))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_msg_init_data (msg, data, size, NULL);
- assert (rc == 0);
-
- return (PyObject*) msg;
-}
-
-PyObject *pyZMQ_msg_close (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- static const char *kwlist [] = {"msg", NULL};
-
- PyObject *msg;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &msg))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_msg_close ((zmq_msg *) msg);
- assert (rc == 0);
+ assert (!self->handle);
+ self->handle = zmq_init (app_threads, io_threads);
+ if (!self->handle) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return -1; // ?
+printf ("context_init err2\n");
+ }
- return (PyObject*) self;
+printf ("context_init ok\n");
+ return 0;
}
-PyObject *pyZMQ_msg_move (pyZMQ *self, PyObject *args, PyObject *kwdict)
+void context_dealloc (context_t *self)
{
- static const char *kwlist [] = {"src", NULL};
-
- zmq_msg *dest;
- PyObject *src;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &src))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_msg_move (dest, (zmq_msg*) src);
- assert (rc == 0);
+printf ("context_dealloc\n");
+ if (self->handle) {
+ int rc = zmq_term (self->handle);
+ if (rc != 0)
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ }
- return (PyObject*) dest;
+ self->ob_type->tp_free ((PyObject*) self);
}
-PyObject *pyZMQ_msg_copy (pyZMQ *self, PyObject *args, PyObject *kwdict)
+struct socket_t
{
- static const char *kwlist [] = {"src", NULL};
-
- PyObject *dest;
- PyObject *src;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &src))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_msg_copy ((zmq_msg*) dest, (zmq_msg*) src);
- assert (rc == 0);
-
- return (PyObject*) dest;
-}
+ PyObject_HEAD
+ void *handle;
+};
-PyObject *pyZMQ_msg_data (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_new (PyTypeObject *type, PyObject *args, PyObject *kwds)
{
- static const char *kwlist [] = {"msg", NULL};
-
- PyObject *msg;
- PyObject *data = PyString_FromStringAndSize (NULL, 0);
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &msg))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- data = (PyObject *) zmq_msg_data ((zmq_msg *) msg);
-
- return (PyObject*) data;
-}
+printf ("socket_new\n");
+ socket_t *self = (socket_t*) type->tp_alloc (type, 0);
-int pyZMQ_msg_size (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- static const char *kwlist [] = {"msg", NULL};
-
- PyObject *msg;
- int size;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &msg))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- size = zmq_msg_size ((zmq_msg*) msg);
-
- return size;
-}
+ if (self)
+ self->handle = NULL;
-int pyZMQ_msg_type (pyZMQ *self, PyObject *args, PyObject *kwdict)
-{
- static const char *kwlist [] = {"msg", NULL};
-
- PyObject *msg;
- int type;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &msg))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- type = zmq_msg_type ((zmq_msg*) msg);
-
- return type;
+ return (PyObject*) self;
}
-PyObject *pyZMQ_socket (pyZMQ *self, PyObject *args, PyObject *kwdict)
+int socket_init (socket_t *self, PyObject *args, PyObject *kwdict)
{
+printf ("socket_init\n");
+ context_t *context;
+ int socket_type;
static const char *kwlist [] = {"context", "type", NULL};
- void* context;
- int type;
-
if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Oi", (char**) kwlist,
- &context, &type))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
+ &context, &socket_type)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
+ // TODO: Check whether 'context' is really a libpyzmq.Context object.
- void *socket = zmq_socket ((void *) context, type);
-
- if (socket == NULL) {
- assert (errno == EMFILE || errno == EINVAL);
- if (errno == EMFILE)
- PyErr_SetString (PyExc_MemoryError, "Too many threads");
- else
- PyErr_SetString (PyExc_ValueError, "Invalid argument");
+ assert (!self->handle);
+ self->handle = zmq_socket (context->handle, socket_type);
+ if (!self->handle) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return -1; // ?
}
- return (PyObject*) socket;
+ return 0;
}
-PyObject *pyZMQ_close (pyZMQ *self, PyObject *args, PyObject *kwdict)
+void socket_dealloc (socket_t *self)
{
- static const char *kwlist [] = {"socket", NULL};
-
- PyObject* socket;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &socket))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
-
- int rc = zmq_close ((void *)socket);
- assert (rc == 0);
+printf ("socket_dealloc\n");
+ if (self->handle) {
+ int rc = zmq_close (self->handle);
+ if (rc != 0)
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ }
- return (PyObject *) self;
+ self->ob_type->tp_free ((PyObject*) self);
}
-PyObject *pyZMQ_setsockopt (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_setsockopt (socket_t *self, PyObject *args, PyObject *kwdict)
{
- static const char *kwlist [] = {"socket", "option", "optval", NULL};
- printf ("setsockopt\n");
- PyObject* socket;
int option;
PyObject* optval;
- int optvallen;
+ static const char *kwlist [] = {"option", "optval", NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "iO", (char**) kwlist,
+ &option, &optval)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
+
int rc;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "OiO", (char**) kwlist,
- &socket, &option, &optval))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- if (PyInt_Check (optval))
- rc = zmq_setsockopt ((void *) socket, option, (void *) optval,
- 4);
- if (PyBool_Check (optval))
- rc = zmq_setsockopt ((void *) socket, option, (void *) optval,
- 1);
- if (PyFloat_Check (optval))
- rc = zmq_setsockopt ((void *) socket, option, (void *) optval,
- 4);
+ if (PyInt_Check (optval)) {
+ int val = PyInt_AsLong (optval);
+ rc = zmq_setsockopt (self->handle, option, &val, sizeof (int));
+ }
if (PyString_Check (optval))
- rc = zmq_setsockopt ((void *) socket, option, (void *) optval,
- PyString_Size (optval));
+ rc = zmq_setsockopt (self->handle, option, PyString_AsString (optval),
+ PyString_Size (optval));
+ else {
+ rc = -1;
+ errno = EINVAL;
+ }
- assert (rc == 0);
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
+ }
- return (PyObject *) self;
+ Py_INCREF (Py_None);
+ return Py_None;
}
-PyObject *pyZMQ_bind (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_bind (socket_t *self, PyObject *args, PyObject *kwdict)
{
- char const *addr = NULL;
- PyObject* socket;
-
- static const char *kwlist [] = {"socket", "addr", NULL};
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Os", (char**) kwlist,
- &socket, &addr))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_bind ((void*) socket, addr);
- if (rc == -1) {
- assert (errno == EINVAL || errno == EADDRINUSE);
- if (errno == EINVAL)
- PyErr_SetString (PyExc_ValueError, "Invalid argument");
- else
- PyErr_SetString (PyExc_ValueError, "Address in use");
+ char const *addr;
+ static const char *kwlist [] = {"addr", NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "s", (char**) kwlist,
+ &addr)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
}
- return (PyObject *) self;
+ int rc = zmq_bind (self->handle, addr);
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
+ }
+
+ Py_INCREF (Py_None);
+ return Py_None;
}
-PyObject *pyZMQ_connect (pyZMQ *self, PyObject *args, PyObject *kw)
+
+PyObject *socket_connect (socket_t *self, PyObject *args, PyObject *kwdict)
{
- char const *addr = NULL;
- PyObject* socket;
-
- static const char* kwlist [] = {"socket", "addr", NULL};
-
- if (!PyArg_ParseTupleAndKeywords (args, kw, "Os", (char**) kwlist,
- &socket, &addr))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_connect ((void *) socket, addr);
- if (rc == -1) {
- assert (errno == EINVAL || errno == EADDRINUSE);
- if (errno == EINVAL)
- PyErr_SetString (PyExc_ValueError, "Invalid argument");
- else
- PyErr_SetString (PyExc_ValueError, "Address in use");
+ char const *addr;
+ static const char *kwlist [] = {"addr", NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "s", (char**) kwlist,
+ &addr)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
+
+ int rc = zmq_connect (self->handle, addr);
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
}
- return (PyObject *) self;
+ Py_INCREF (Py_None);
+ return Py_None;
}
-PyObject *pyZMQ_flush (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_send (socket_t *self, PyObject *args, PyObject *kwdict)
{
+ PyObject *msg; /* = PyString_FromStringAndSize (NULL, 0); */
+ int flags = 0;
+ static const char *kwlist [] = {"msg", "flags", NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "S|i", (char**) kwlist,
+ &msg, &flags)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
- static const char *kwlist [] = {"socket", NULL};
- PyObject *socket;
-
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "O", (char**) kwlist,
- &socket))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_flush ((void*) socket);
- assert (rc == 0);
+ zmq_msg_t data;
+ int rc = zmq_msg_init_size (&data, PyString_Size (msg));
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
+ }
+ memcpy (zmq_msg_data (&data), PyString_AsString (msg),
+ zmq_msg_size (&data));
+
+ rc = zmq_send (self->handle, &data, flags);
+ int rc2 = zmq_msg_close (&data);
+ assert (rc2 == 0);
+
+ if (rc != 0 && errno == EAGAIN)
+ return PyBool_FromLong (0);
+
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
+ }
- return (PyObject *) self;
+ return PyBool_FromLong (1);
}
-PyObject *pyZMQ_send (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_flush (socket_t *self, PyObject *args, PyObject *kwdict)
{
- PyObject *msg;
- PyObject *socket;
- int flags = 0;
-
- static const char *kwlist [] = {"socket", "msg", "flags", NULL};
-
- if (!PyArg_ParseTupleAndKeywords(args, kwdict, "OOi", (char**) kwlist,
- &socket, &msg, &flags))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_send ((void*) socket, (zmq_msg*) msg, flags);
- assert (rc == 0 || (rc == -1 && errno == EAGAIN));
-
- return (PyObject *) self;
+ static const char *kwlist [] = {NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "", (char**) kwlist)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
+
+ int rc = zmq_flush (self->handle);
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, strerror (errno));
+ return NULL;
+ }
+
+ Py_INCREF (Py_None);
+ return Py_None;
}
-PyObject *pyZMQ_receive (pyZMQ *self, PyObject *args, PyObject *kwdict)
+PyObject *socket_recv (socket_t *self, PyObject *args, PyObject *kwdict)
{
- zmq_msg *msg;
- zmq_msg_init (msg);
- PyObject *socket;
-
int flags = 0;
- static const char *kwlist [] = {"socket", "flags", NULL};
+ static const char *kwlist [] = {"flags", NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "|i", (char**) kwlist,
+ &flags)) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "Oi", (char**) kwlist,
- &socket, &flags))
- PyErr_SetString (PyExc_SystemError,
- "PyArg_ParseTupleAndKeywords error");
-
- int rc = zmq_recv (socket, msg, flags);
- assert (rc == 0 || (rc == -1 && errno == EAGAIN));
-
- PyObject *py_message = PyString_FromStringAndSize (NULL, 0);
- py_message = (PyObject *) zmq_msg_data (msg);
- int py_message_size = zmq_msg_size (msg);
- int py_message_type = zmq_msg_type (msg);
-
- zmq_msg_close (msg);
-
- return Py_BuildValue ("isi", rc, py_message,
- py_message_size, py_message_type);
+ zmq_msg_t msg;
+ int rc = zmq_msg_init (&msg);
+ assert (rc == 0);
+
+ rc = zmq_recv (self->handle, &msg, flags);
+
+ if (rc != 0 && errno == EAGAIN) {
+ Py_INCREF (Py_None);
+ return Py_None;
+ }
+
+ if (rc != 0) {
+ PyErr_SetString (PyExc_SystemError, "invalid arguments");
+ return NULL;
+ }
+
+ PyObject *result = PyString_FromStringAndSize ((char*) zmq_msg_data (&msg),
+ zmq_msg_size (&msg));
+ rc = zmq_msg_close (&msg);
+ assert (rc == 0);
+ return result;
}
-static PyMethodDef pyZMQ_methods [] =
+static PyMethodDef context_methods [] =
{
{
- "context",
- (PyCFunction) pyZMQ_context,
- METH_VARARGS | METH_KEYWORDS,
- "context (app_threads, io_threads) -> None\n\n"
- "Creates new context\n\n"
- "app_threads is the number of application threads.\n\n"
- "io_threads is the number of io threads.\n\n"
-
- },
- {
- "msg_init",
- (PyCFunction) pyZMQ_msg_init,
- METH_VARARGS | METH_KEYWORDS,
- "msg_init () -> None\n\n"
- "Creates new message\n\n"
-
- },
- {
- "msg_init_size",
- (PyCFunction) pyZMQ_msg_init_size,
- METH_VARARGS | METH_KEYWORDS,
- "msg_init_size (size) -> None\n\n"
- "Creates new message of a specified size.\n\n"
- "size if integer specifying the size of the message to be created.\n\n"
-
- },
- {
- "msg_init_data",
- (PyCFunction) pyZMQ_msg_init_data,
- METH_VARARGS | METH_KEYWORDS,
- "msg_init_data (data, size, ffn) -> None\n\n"
- "Initialises new message with data\n\n"
- "data is pointer to the data of the message\n\n"
- "size is integer specifying size of data\n\n"
- "ffn is function to free alocated data\n\n"
-
- },
-
- {
- "msg_close",
- (PyCFunction) pyZMQ_msg_close,
- METH_VARARGS | METH_KEYWORDS,
- "msg_close (msg) -> None\n\n"
- "Deletes the message.\n\n"
- "msg is the message the be freed\n\n"
-
- },
- {
- "msg_move",
- (PyCFunction) pyZMQ_msg_move,
- METH_VARARGS | METH_KEYWORDS,
- "msg_move (src) -> dest\n\n"
- "Move the content of the message from 'src' to 'dest'.\n\n"
- "The content isn't copied, just moved. 'src' is an empty\n\n"
- "message after the call. Original content of 'dest' message\n\n"
- "is deallocated.\n\n"
-
- },
- {
- "msg_copy",
- (PyCFunction) pyZMQ_msg_copy,
- METH_VARARGS | METH_KEYWORDS,
- "msg_copy (src) -> dest\n\n"
- "Copy the 'src' message to 'dest'. The content isn't copied, \n\n"
- "instead reference count is increased. Don't modify the message \n\n"
- "data after the call as they are shared between two messages.\n\n"
- "Original content of 'dest' message is deallocated.\n\n"
-
- },
- {
- "msg_data",
- (PyCFunction) pyZMQ_msg_data,
- METH_VARARGS | METH_KEYWORDS,
- "msg_data (msg) -> data\n\n"
- "Returns pointer to message data.\n\n"
-
- },
- {
- "msg_size",
- (PyCFunction) pyZMQ_msg_size,
- METH_VARARGS | METH_KEYWORDS,
- "msg_size (msg) -> size\n\n"
- "Returns size of a message.\n\n"
-
- },
- {
- "msg_type",
- (PyCFunction) pyZMQ_msg_type,
- METH_VARARGS | METH_KEYWORDS,
- "msg_type (msg) -> type\n\n"
- "Returns type of a message.\n\n"
-
- },
- {
- "term",
- (PyCFunction) pyZMQ_term,
- METH_VARARGS | METH_KEYWORDS,
- "term (context) -> None\n\n"
- "Deinitialise 0SOCKETS context including all the open sockets.\n\n"
- "Closing sockets after zmq_term has been called will result in\n\n"
- "undefined behaviour.\n\n"
-
- },
- {
- "close",
- (PyCFunction) pyZMQ_close,
- METH_VARARGS | METH_KEYWORDS,
- "close (socket) -> None\n\n"
- "Close the socket.\n\n"
-
- },
-
- {
- "socket",
- (PyCFunction) pyZMQ_socket,
- METH_VARARGS | METH_KEYWORDS,
- "socket (context, type) -> None\n\n"
- "Creates new socket.\n\n"
- "'context' is a context_t object.\n"
- "'type' is one of 'ZMQ_NOBLOCK', 'ZMQ_NOFLUSH', 'ZMQ_P2P', 'ZMQ_PUB', "
- " 'ZMQ_SUB', 'ZMQ_REQ ZMQ_REP'\n"
- },
+ NULL
+ }
+};
+
+static PyTypeObject context_type =
+{
+ PyObject_HEAD_INIT (NULL)
+ 0,
+ "libpyzmq.Context", /* tp_name */
+ sizeof (context_t), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor) context_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT, /* tp_flags */
+ "", /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ context_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ (initproc) context_init, /* tp_init */
+ 0, /* tp_alloc */
+ context_new, /* tp_new */
+};
+
+static PyMethodDef socket_methods [] =
+{
{
"setsockopt",
- (PyCFunction) pyZMQ_setsockopt,
+ (PyCFunction) socket_setsockopt,
METH_VARARGS | METH_KEYWORDS,
- "setsockopt (socket, option, value) -> None\n\n"
- "Set socket options."
- "Possible options are: 'ZMQ_HWM', 'ZMQ_LWM', 'ZMQ_SWAP', 'ZMQ_MASK', "
- "'ZMQ_AFFINITY', 'ZMQ_IDENTITY'."
+ "setsockopt (option, optval) -> None\n\n"
},
{
"bind",
- (PyCFunction) pyZMQ_bind,
+ (PyCFunction) socket_bind,
METH_VARARGS | METH_KEYWORDS,
"bind (addr) -> None\n\n"
- "Bind socket to specified address."
},
{
"connect",
- (PyCFunction) pyZMQ_connect,
+ (PyCFunction) socket_connect,
METH_VARARGS | METH_KEYWORDS,
"connect (addr) -> None\n\n"
- "connect socket to specified address."
},
{
- "flush",
- (PyCFunction) pyZMQ_flush,
+ "send",
+ (PyCFunction) socket_send,
METH_VARARGS | METH_KEYWORDS,
- "flush (addr) -> None\n\n"
- "flush "
+ "send (msg, [flags]) -> Bool\n\n"
},
{
- "send",
- (PyCFunction) pyZMQ_send,
+ "flush",
+ (PyCFunction) socket_flush,
METH_VARARGS | METH_KEYWORDS,
- "send (message, flags) -> sent\n\n"
- "Send a message to within the socket, "
- "returns integer specifying if the message was sent.\n"
- "'message' is message to be sent.\n"
- "'flags' is integer specifying send options.\n"
+ "flush () -> None\n\n"
},
{
- "receive",
- (PyCFunction) pyZMQ_receive,
+ "recv",
+ (PyCFunction) socket_recv,
METH_VARARGS | METH_KEYWORDS,
- "receive (flags) -> (received, message, type)\n\n"
- "Receive a message."
- "'flags' is integer specifying receive options.\n"
- "'message' is string storing the message received.\n"
- "'type' is type of the message received.\n"
-
+ "recv ([flags]) -> String\n\n"
},
{
NULL
}
};
-static const char* pyZMQ_ZMQ_doc =
- "0MQ messaging session\n\n"
- "Available functions:\n"
- " context\n"
- " socket\n"
- " setsockopt\n"
- " bind\n"
- " send\n"
- " flush\n"
- " receive\n\n";
-
-static PyTypeObject pyZMQType =
+static PyTypeObject socket_type =
{
PyObject_HEAD_INIT (NULL)
0,
- "libpyzmq.Zmq", /* tp_name (This will appear in the default
- textual representation of our objects and
- in some error messages)*/
- sizeof (pyZMQ), /* tp_basicsize */
- 0, /* tp_itemsize */
- (destructor) pyZMQ_dealloc,/* tp_dealloc */
- 0, /* tp_print */
- 0, /* tp_getattr */
- 0, /* tp_setattr */
- 0, /* tp_compare */
- 0, /* tp_repr */
- 0, /* tp_as_number */
- 0, /* tp_as_sequence */
- 0, /* tp_as_mapping */
- 0, /* tp_hash */
- 0, /* tp_call */
- 0, /* tp_str */
- 0, /* tp_getattro */
- 0, /* tp_setattro */
- 0, /* tp_as_buffer */
- Py_TPFLAGS_DEFAULT, /* tp_flags */
- (char*) pyZMQ_ZMQ_doc, /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- pyZMQ_methods, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- (initproc) pyZMQ_init, /* tp_init */
- 0, /* tp_alloc */
- pyZMQ_new, /* tp_new */
+ "libpyzmq.Socket" , /* tp_name */
+ sizeof (socket_t), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor) socket_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT, /* tp_flags */
+ "", /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ socket_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ (initproc) socket_init, /* tp_init */
+ 0, /* tp_alloc */
+ socket_new, /* tp_new */
};
-static PyMethodDef module_methods[] =
-{
- { NULL, NULL, 0, NULL }
-};
+static PyMethodDef module_methods [] = {{ NULL, NULL, 0, NULL }};
+
+static const char* libpyzmq_doc =
+ "Python API for 0MQ lightweight messaging kernel.\n"
+ "For more information see http://www.zeromq.org.\n"
+ "0MQ is distributed under GNU Lesser General Public License v3.\n";
#ifndef PyMODINIT_FUNC
#define PyMODINIT_FUNC void
#endif
-static const char* pyZMQ_doc =
- "0MQ Python Module\n\n"
- "Constructor:\n"
- " z = libpyzmq.Zmq ()\n"
- "Available functions:\n"
- " context\n"
- " socket\n"
- " setsockopt\n"
- " bind\n"
- " send\n"
- " flush\n"
- " receive\n"
- "\n"
- "For more information see http://www.zeromq.org.\n"
- "\n"
- "0MQ is distributed under GNU Lesser General Public License v3\n";
-
-PyMODINIT_FUNC initlibpyzmq (void)
+PyMODINIT_FUNC initlibpyzmq ()
{
- if (PyType_Ready (&pyZMQType) < 0)
+ if (PyType_Ready (&context_type) < 0 && PyType_Ready (&socket_type) < 0)
return;
- PyObject *m = Py_InitModule3 ("libpyzmq", module_methods,
- (char*) pyZMQ_doc);
- if (!m)
+ PyObject *module = Py_InitModule3 ("libpyzmq", module_methods,
+ libpyzmq_doc);
+ if (!module)
return;
- Py_INCREF (&pyZMQType);
+ Py_INCREF (&context_type);
+ Py_INCREF (&socket_type);
+ PyModule_AddObject (module, "Context", (PyObject*) &context_type);
+ PyModule_AddObject (module, "Socket", (PyObject*) &socket_type);
- PyModule_AddObject (m, "Zmq", (PyObject*) &pyZMQType);
-
- PyObject *d = PyModule_GetDict (m);
-
-
- PyObject *t = PyInt_FromLong (ZMQ_NOBLOCK);
- PyDict_SetItemString (d, "ZMQ_NOBLOCK", t);
+ PyObject *dict = PyModule_GetDict (module);
+ assert (dict);
+ PyObject *t;
+ t = PyInt_FromLong (ZMQ_NOBLOCK);
+ PyDict_SetItemString (dict, "NOBLOCK", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_NOFLUSH);
- PyDict_SetItemString (d, "ZMQ_NOFLUSH", t);
+ PyDict_SetItemString (dict, "NOFLUSH", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_P2P);
- PyDict_SetItemString (d, "ZMQ_P2P", t);
+ PyDict_SetItemString (dict, "P2P", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_PUB);
- PyDict_SetItemString (d, "ZMQ_PUB", t);
+ PyDict_SetItemString (dict, "PUB", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_SUB);
- PyDict_SetItemString (d, "ZMQ_SUB", t);
+ PyDict_SetItemString (dict, "SUB", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_REQ);
- PyDict_SetItemString (d, "ZMQ_REQ", t);
+ PyDict_SetItemString (dict, "REQ", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_REP);
- PyDict_SetItemString (d, "ZMQ_REP", t);
+ PyDict_SetItemString (dict, "REP", t);
Py_DECREF (t);
-
t = PyInt_FromLong (ZMQ_HWM);
- PyDict_SetItemString (d, "ZMQ_HWM", t);
+ PyDict_SetItemString (dict, "HWM", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_LWM);
- PyDict_SetItemString (d, "ZMQ_LWM", t);
- Py_DECREF (t);
+ PyDict_SetItemString (dict, "LWM", t);
+ Py_DECREF (t);
t = PyInt_FromLong (ZMQ_SWAP);
- PyDict_SetItemString (d, "ZMQ_SWAP", t);
+ PyDict_SetItemString (dict, "SWAP", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_MASK);
- PyDict_SetItemString (d, "ZMQ_MASK", t);
- Py_DECREF (t);
+ PyDict_SetItemString (dict, "MASK", t);
+ Py_DECREF (t);
t = PyInt_FromLong (ZMQ_AFFINITY);
- PyDict_SetItemString (d, "ZMQ_AFFINITY", t);
+ PyDict_SetItemString (dict, "AFFINITY", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_IDENTITY);
- PyDict_SetItemString (d, "ZMQ_IDENTITY", t);
- Py_DECREF (t);
-
-
+ PyDict_SetItemString (dict, "IDENTITY", t);
+ Py_DECREF (t);
}
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index 14a479e..8ee2984 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -25,6 +25,8 @@ namespace zmq
struct i_endpoint
{
+ virtual void attach_inpipe (class reader_t *pipe_) = 0;
+ virtual void attach_outpipe (class writer_t *pipe_) = 0;
virtual void revive (class reader_t *pipe_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
diff --git a/src/session.cpp b/src/session.cpp
index f562bd5..ef17d6d 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -43,21 +43,6 @@ zmq::session_t::~session_t ()
out_pipe->term ();
}
-void zmq::session_t::set_inbound_pipe (reader_t *pipe_)
-{
- zmq_assert (!in_pipe);
- in_pipe = pipe_;
- active = true;
- in_pipe->set_endpoint (this);
-}
-void zmq::session_t::set_outbound_pipe (writer_t *pipe_)
-{
- zmq_assert (!out_pipe);
- out_pipe = pipe_;
- out_pipe->set_endpoint (this);
-}
-
-
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
if (!active)
@@ -90,6 +75,20 @@ void zmq::session_t::detach ()
// term ();
}
+void zmq::session_t::attach_inpipe (reader_t *pipe_)
+{
+ zmq_assert (!in_pipe);
+ in_pipe = pipe_;
+ active = true;
+ in_pipe->set_endpoint (this);
+}
+void zmq::session_t::attach_outpipe (writer_t *pipe_)
+{
+ zmq_assert (!out_pipe);
+ out_pipe = pipe_;
+ out_pipe->set_endpoint (this);
+}
+
void zmq::session_t::revive (reader_t *pipe_)
{
zmq_assert (in_pipe == pipe_);
diff --git a/src/session.hpp b/src/session.hpp
index 46699cf..195bdca 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -37,13 +37,6 @@ namespace zmq
session_t (object_t *parent_, socket_base_t *owner_, const char *name_,
const options_t &options_);
- void set_inbound_pipe (class reader_t *pipe_);
- void set_outbound_pipe (class writer_t *pipe_);
-
- private:
-
- ~session_t ();
-
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
@@ -51,10 +44,16 @@ namespace zmq
void detach ();
// i_endpoint interface implementation.
+ void attach_inpipe (class reader_t *pipe_);
+ void attach_outpipe (class writer_t *pipe_);
void revive (class reader_t *pipe_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
+ private:
+
+ ~session_t ();
+
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index e14065b..4e14c68 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -173,7 +173,7 @@ int zmq::socket_base_t::connect (const char *addr_)
pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
zmq_assert (in_pipe);
in_pipe->reader.set_endpoint (this);
- session->set_outbound_pipe (&in_pipe->writer);
+ session->attach_outpipe (&in_pipe->writer);
in_pipes.push_back (&in_pipe->reader);
in_pipes.back ()->set_index (active);
in_pipes [active]->set_index (in_pipes.size () - 1);
@@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_)
pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe);
out_pipe->writer.set_endpoint (this);
- session->set_inbound_pipe (&out_pipe->reader);
+ session->attach_inpipe (&out_pipe->reader);
out_pipes.push_back (&out_pipe->writer);
// Activate the session.
@@ -327,6 +327,22 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
return it->second;
}
+void zmq::socket_base_t::attach_inpipe (class reader_t *pipe_)
+{
+ pipe_->set_endpoint (this);
+ in_pipes.push_back (pipe_);
+ in_pipes.back ()->set_index (active);
+ in_pipes [active]->set_index (in_pipes.size () - 1);
+ std::swap (in_pipes.back (), in_pipes [active]);
+ active++;
+}
+
+void zmq::socket_base_t::attach_outpipe (class writer_t *pipe_)
+{
+ pipe_->set_endpoint (this);
+ out_pipes.push_back (pipe_);
+}
+
void zmq::socket_base_t::revive (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
@@ -372,15 +388,9 @@ void zmq::socket_base_t::process_bind (owned_t *session_,
reader_t *in_pipe_, writer_t *out_pipe_)
{
zmq_assert (in_pipe_);
- in_pipe_->set_endpoint (this);
- in_pipes.push_back (in_pipe_);
- in_pipes.back ()->set_index (active);
- in_pipes [active]->set_index (in_pipes.size () - 1);
- std::swap (in_pipes.back (), in_pipes [active]);
- active++;
+ attach_inpipe (in_pipe_);
zmq_assert (out_pipe_);
- out_pipe_->set_endpoint (this);
- out_pipes.push_back (out_pipe_);
+ attach_outpipe (out_pipe_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 490c09a..284d2c4 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -60,6 +60,8 @@ namespace zmq
class session_t *find_session (const char *name_);
// i_endpoint interface implementation.
+ void attach_inpipe (class reader_t *pipe_);
+ void attach_outpipe (class writer_t *pipe_);
void revive (class reader_t *pipe_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);