From 26ca5ed8c62f8a88a32106a5c9e003712f4ca655 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 14 Nov 2009 18:57:04 +0100 Subject: Fixing concurrency issue in rep.cpp resulting in broken connections with multiple requesters under heavy load. --- src/rep.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/rep.cpp b/src/rep.cpp index e8a9e39..7599cb5 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -178,14 +178,15 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) // Round-robin over the pipes to get next message. for (int count = active; count != 0; count--) { bool fetched = in_pipes [current]->read (msg_); - current++; - if (current >= active) - current = 0; if (fetched) { reply_pipe = out_pipes [current]; waiting_for_reply = true; - return 0; } + current++; + if (current >= active) + current = 0; + if (fetched) + return 0; } // No message is available. Initialise the output parameter -- cgit v1.2.3 From c2e0661b0afb2f50f47e0275fa6603947f26d240 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 19 Nov 2009 08:06:52 +0100 Subject: uninitialised variable in devpoll_t and kqueue_t --- src/devpoll.cpp | 3 ++- src/kqueue.cpp | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/devpoll.cpp b/src/devpoll.cpp index f28d55e..0ee772b 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -37,7 +37,8 @@ #include "config.hpp" #include "i_poll_events.hpp" -zmq::devpoll_t::devpoll_t () +zmq::devpoll_t::devpoll_t () : + stopping (false) { // Get limit on open files struct rlimit rl; diff --git a/src/kqueue.cpp b/src/kqueue.cpp index f32fa36..69ad0c8 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -33,7 +33,8 @@ #include "config.hpp" #include "i_poll_events.hpp" -zmq::kqueue_t::kqueue_t () +zmq::kqueue_t::kqueue_t () : + stopping (false) { // Create event queue kqueue_fd = kqueue (); -- cgit v1.2.3 From 14f2fecdcd9732fe741c211138a4ba327816a937 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 19 Nov 2009 09:53:49 +0100 Subject: ZMQII-24: SEGFAULT when anonymous session disconnects --- src/pipe.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/pipe.cpp b/src/pipe.cpp index e444520..0e15dce 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -81,7 +81,11 @@ void zmq::reader_t::term () void zmq::reader_t::process_revive () { - endpoint->revive (this); + // Beacuse of command throttling mechanism, incoming termination request + // may not have been processed before subsequent send. + // In that case endpoint is NULL. + if (endpoint) + endpoint->revive (this); } void zmq::reader_t::process_pipe_term_ack () -- cgit v1.2.3 From 0e9ab2e8a3f5bc22f2c331c14236a2918a5512a8 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 21 Nov 2009 20:59:55 +0100 Subject: inproc transport - initial commit --- src/dispatcher.cpp | 56 +++++++++++++++++++++++++++++++++++++++++++++++ src/dispatcher.hpp | 12 +++++++++++ src/object.cpp | 15 +++++++++++++ src/object.hpp | 6 ++++++ src/socket_base.cpp | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++-- src/socket_base.hpp | 12 +++++++++++ src/zmq.cpp | 6 ++++-- 7 files changed, 165 insertions(+), 4 deletions(-) diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 1f6b4f0..1e41ee8 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -20,6 +20,7 @@ #include "../bindings/c/zmq.h" #include "dispatcher.hpp" +#include "socket_base.hpp" #include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" @@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_) zmq_assert (erased == 1); pipes_sync.unlock (); } + +int zmq::dispatcher_t::register_endpoint (const char *addr_, + socket_base_t *socket_) +{ + endpoints_sync.lock (); + + bool inserted = endpoints.insert (std::make_pair (addr_, socket_)).second; + if (!inserted) { + errno = EADDRINUSE; + endpoints_sync.unlock (); + return -1; + } + + endpoints_sync.unlock (); + return 0; +} + +void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.begin (); + while (it != endpoints.end ()) { + if (it->second == socket_) { + endpoints_t::iterator to_erase = it; + it++; + endpoints.erase (to_erase); + continue; + } + it++; + } + + endpoints_sync.unlock (); +} + +zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.find (addr_); + if (it == endpoints.end ()) { + endpoints_sync.unlock (); + errno = ECONNREFUSED; + return NULL; + } + socket_base_t *endpoint = it->second; + + // Increment the command sequence number of the peer so that it won't + // get deallocated until "bind" command is issued by the caller. + endpoint->inc_seqnum (); + + endpoints_sync.unlock (); + return endpoint; +} + diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 23b6a33..8364d4d 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -97,6 +97,11 @@ namespace zmq void register_pipe (class pipe_t *pipe_); void unregister_pipe (class pipe_t *pipe_); + // Management of inproc endpoints. + int register_endpoint (const char *addr_, class socket_base_t *socket_); + void unregister_endpoints (class socket_base_t *socket_); + class socket_base_t *find_endpoint (const char *addr_); + private: ~dispatcher_t (); @@ -149,6 +154,13 @@ namespace zmq // and 'terminated' flag). mutex_t term_sync; + // List of inproc endpoints within this context. + typedef std::map endpoints_t; + endpoints_t endpoints; + + // Synchronisation of access to the list of inproc endpoints. + mutex_t endpoints_sync; + dispatcher_t (const dispatcher_t&); void operator = (const dispatcher_t&); }; diff --git a/src/object.cpp b/src/object.cpp index 1433b7b..d24e477 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -122,6 +122,21 @@ void zmq::object_t::unregister_pipe (class pipe_t *pipe_) dispatcher->unregister_pipe (pipe_); } +int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_) +{ + return dispatcher->register_endpoint (addr_, socket_); +} + +void zmq::object_t::unregister_endpoints (socket_base_t *socket_) +{ + return dispatcher->unregister_endpoints (socket_); +} + +zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_) +{ + return dispatcher->find_endpoint (addr_); +} + zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { return dispatcher->choose_io_thread (taskset_); diff --git a/src/object.hpp b/src/object.hpp index 1954071..6331372 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -49,6 +49,12 @@ namespace zmq protected: + // Using following function, socket is able to access global + // repository of inproc endpoints. + int register_endpoint (const char *addr_, class socket_base_t *socket_); + void unregister_endpoints (class socket_base_t *socket_); + class socket_base_t *find_endpoint (const char *addr_); + // Derived object can use following functions to interact with // global repositories. See dispatcher.hpp for function details. int thread_slot_count (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6583608..86e1205 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -43,7 +43,9 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : pending_term_acks (0), ticks (0), app_thread (parent_), - shutting_down (false) + shutting_down (false), + sent_seqnum (0), + processed_seqnum (0) { } @@ -81,6 +83,9 @@ int zmq::socket_base_t::bind (const char *addr_) addr_type = addr.substr (0, pos); addr_args = addr.substr (pos + 3); + if (addr_type == "inproc") + return register_endpoint (addr_args.c_str (), this); + if (addr_type == "tcp") { zmq_listener_t *listener = new zmq_listener_t ( choose_io_thread (options.affinity), this, options); @@ -126,6 +131,42 @@ int zmq::socket_base_t::connect (const char *addr_) addr_type = addr.substr (0, pos); addr_args = addr.substr (pos + 3); + if (addr_type == "inproc") { + + // Find the peer socket. + socket_base_t *peer = find_endpoint (addr_args.c_str ()); + if (!peer) + return -1; + + pipe_t *in_pipe = NULL; + pipe_t *out_pipe = NULL; + + // Create inbound pipe, if required. + if (options.requires_in) { + in_pipe = new pipe_t (this, peer, options.hwm, options.lwm); + zmq_assert (in_pipe); + } + + // Create outbound pipe, if required. + if (options.requires_out) { + out_pipe = new pipe_t (peer, this, options.hwm, options.lwm); + zmq_assert (out_pipe); + } + + // Attach the pipes to this socket object. + attach_pipes (in_pipe ? &in_pipe->reader : NULL, + out_pipe ? &out_pipe->writer : NULL); + + // Attach the pipes to the peer socket. Note that peer's seqnum + // was incremented in find_endpoint function. When this command + // is delivered, peer will consider the seqnum to be processed. + // TODO: Seems that 'session' parameter is unused... + send_bind (peer, NULL, out_pipe ? &out_pipe->reader : NULL, + in_pipe ? &in_pipe->writer : NULL); + + return 0; + } + // Create the session. io_thread_t *io_thread = choose_io_thread (options.affinity); session_t *session = new session_t (io_thread, this, session_name.c_str (), @@ -319,13 +360,24 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { + shutting_down = true; + + // Let the thread know that the socket is no longer available. app_thread->remove_socket (this); // Pointer to the dispatcher must be retrieved before the socket is // deallocated. Afterwards it is not available. dispatcher_t *dispatcher = get_dispatcher (); - shutting_down = true; + // Unregister all inproc endpoints associated with this socket. + // From this point we are sure that inc_seqnum won't be called again + // on this object. + dispatcher->unregister_endpoints (this); + + // Wait till all undelivered commands are delivered. This should happen + // very quickly. There's no way to wait here for extensive period of time. + while (processed_seqnum != sent_seqnum.get ()) + app_thread->process_commands (true, false); while (true) { @@ -364,6 +416,12 @@ int zmq::socket_base_t::close () return 0; } +void zmq::socket_base_t::inc_seqnum () +{ + // NB: This function may be called from a different thread! + sent_seqnum.add (1); +} + zmq::app_thread_t *zmq::socket_base_t::get_thread () { return app_thread; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 49ff5a5..b6df8c4 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -33,6 +33,7 @@ #include "mutex.hpp" #include "options.hpp" #include "stdint.hpp" +#include "atomic_counter.hpp" namespace zmq { @@ -54,6 +55,11 @@ namespace zmq int recv (zmq_msg_t *msg_, int flags_); int close (); + // When another owned object wants to send command to this object + // it calls this function to let it know it should not shut down + // before the command is delivered. + void inc_seqnum (); + // This function is used by the polling mechanism to determine // whether the socket belongs to the application thread the poll // is called from. @@ -132,6 +138,12 @@ namespace zmq // started. bool shutting_down; + // Sequence number of the last command sent to this object. + atomic_counter_t sent_seqnum; + + // Sequence number of the last command processed by this object. + uint64_t processed_seqnum; + // List of existing sessions. This list is never referenced from within // the socket, instead it is used by I/O objects owned by the session. // As those objects can live in different threads, the access is diff --git a/src/zmq.cpp b/src/zmq.cpp index 7952b61..9b66be8 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -198,8 +198,10 @@ size_t zmq_msg_size (zmq_msg_t *msg_) void *zmq_init (int app_threads_, int io_threads_, int flags_) { - // There should be at least a single thread managed by the dispatcher. - if (app_threads_ <= 0 || io_threads_ <= 0 || + // There should be at least a single application thread managed + // by the dispatcher. There's no need for I/O threads if 0MQ is used + // only for inproc messaging + if (app_threads_ < 1 || io_threads_ < 0 || app_threads_ > 63 || io_threads_ > 63) { errno = EINVAL; return NULL; -- cgit v1.2.3 From 64634605b3ccb90d582cfdf380535c89bf900a0e Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 21 Nov 2009 21:13:29 +0100 Subject: obsolete parameter removed from 'bind' command --- src/command.hpp | 1 - src/object.cpp | 9 +++------ src/object.hpp | 6 +++--- src/session.cpp | 2 +- src/socket_base.cpp | 6 ++---- src/socket_base.hpp | 3 +-- 6 files changed, 10 insertions(+), 17 deletions(-) diff --git a/src/command.hpp b/src/command.hpp index 9a2e5d5..a31805b 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -70,7 +70,6 @@ namespace zmq // Sent from session to socket to establish pipe(s) between them. struct { - class owned_t *session; class reader_t *in_pipe; class writer_t *out_pipe; } bind; diff --git a/src/object.cpp b/src/object.cpp index d24e477..6b05380 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -83,8 +83,7 @@ void zmq::object_t::process_command (command_t &cmd_) return; case command_t::bind: - process_bind (cmd_.args.bind.session, - cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); + process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); return; case command_t::pipe_term: @@ -183,13 +182,12 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_) send_command (cmd); } -void zmq::object_t::send_bind (object_t *destination_, owned_t *session_, +void zmq::object_t::send_bind (object_t *destination_, reader_t *in_pipe_, writer_t *out_pipe_) { command_t cmd; cmd.destination = destination_; cmd.type = command_t::bind; - cmd.args.bind.session = session_; cmd.args.bind.in_pipe = in_pipe_; cmd.args.bind.out_pipe = out_pipe_; send_command (cmd); @@ -265,8 +263,7 @@ void zmq::object_t::process_attach (i_engine *engine_) zmq_assert (false); } -void zmq::object_t::process_bind (owned_t *session_, - reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index 6331372..2f6c0c4 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -68,7 +68,7 @@ namespace zmq class owned_t *object_); void send_attach (class session_t *destination_, struct i_engine *engine_); - void send_bind (object_t *destination_, class owned_t *session_, + void send_bind (object_t *destination_, class reader_t *in_pipe_, class writer_t *out_pipe_); void send_revive (class object_t *destination_); void send_pipe_term (class writer_t *destination_); @@ -84,8 +84,8 @@ namespace zmq virtual void process_plug (); virtual void process_own (class owned_t *object_); virtual void process_attach (struct i_engine *engine_); - virtual void process_bind (class owned_t *session_, - class reader_t *in_pipe_, class writer_t *out_pipe_); + virtual void process_bind (class reader_t *in_pipe_, + class writer_t *out_pipe_); virtual void process_revive (); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); diff --git a/src/session.cpp b/src/session.cpp index eb0a963..388437b 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -155,7 +155,7 @@ void zmq::session_t::process_plug () out_pipe->set_endpoint (this); } - send_bind (owner, this, outbound ? &outbound->reader : NULL, + send_bind (owner, outbound ? &outbound->reader : NULL, inbound ? &inbound->writer : NULL); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 86e1205..e242e05 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -160,8 +160,7 @@ int zmq::socket_base_t::connect (const char *addr_) // Attach the pipes to the peer socket. Note that peer's seqnum // was incremented in find_endpoint function. When this command // is delivered, peer will consider the seqnum to be processed. - // TODO: Seems that 'session' parameter is unused... - send_bind (peer, NULL, out_pipe ? &out_pipe->reader : NULL, + send_bind (peer, out_pipe ? &out_pipe->reader : NULL, in_pipe ? &in_pipe->writer : NULL); return 0; @@ -510,8 +509,7 @@ void zmq::socket_base_t::process_own (owned_t *object_) io_objects.insert (object_); } -void zmq::socket_base_t::process_bind (owned_t *session_, - reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) { attach_pipes (in_pipe_, out_pipe_); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index b6df8c4..c766bda 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -114,8 +114,7 @@ namespace zmq // Handlers for incoming commands. void process_own (class owned_t *object_); - void process_bind (class owned_t *session_, - class reader_t *in_pipe_, class writer_t *out_pipe_); + void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_); void process_term_req (class owned_t *object_); void process_term_ack (); -- cgit v1.2.3 From c41daca3da6ffd033f93c3e24898414567f71eb3 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 21 Nov 2009 21:30:09 +0100 Subject: race condition in inproc transport shutdown fixed --- src/command.hpp | 3 +++ src/object.cpp | 9 ++++++--- src/object.hpp | 6 +++--- src/session.cpp | 4 +++- src/socket_base.cpp | 16 ++++++++++++---- src/socket_base.hpp | 3 ++- 6 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/command.hpp b/src/command.hpp index a31805b..3099852 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -69,9 +69,12 @@ namespace zmq } attach; // Sent from session to socket to establish pipe(s) between them. + // If adjust_seqnum is true, caller have used inc_seqnum beforehand + // and thus the callee should take care of catching up. struct { class reader_t *in_pipe; class writer_t *out_pipe; + bool adjust_seqnum; } bind; // Sent by pipe writer to inform dormant pipe reader that there diff --git a/src/object.cpp b/src/object.cpp index 6b05380..b5d5eee 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -83,7 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_) return; case command_t::bind: - process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); + process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, + cmd_.args.bind.adjust_seqnum); return; case command_t::pipe_term: @@ -183,13 +184,14 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_) } void zmq::object_t::send_bind (object_t *destination_, - reader_t *in_pipe_, writer_t *out_pipe_) + reader_t *in_pipe_, writer_t *out_pipe_, bool adjust_seqnum_) { command_t cmd; cmd.destination = destination_; cmd.type = command_t::bind; cmd.args.bind.in_pipe = in_pipe_; cmd.args.bind.out_pipe = out_pipe_; + cmd.args.bind.adjust_seqnum = adjust_seqnum_; send_command (cmd); } @@ -263,7 +265,8 @@ void zmq::object_t::process_attach (i_engine *engine_) zmq_assert (false); } -void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + bool adjust_seqnum_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index 2f6c0c4..4fd0a8e 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -68,8 +68,8 @@ namespace zmq class owned_t *object_); void send_attach (class session_t *destination_, struct i_engine *engine_); - void send_bind (object_t *destination_, - class reader_t *in_pipe_, class writer_t *out_pipe_); + void send_bind (object_t *destination_, class reader_t *in_pipe_, + class writer_t *out_pipe_, bool adjust_seqnum_); void send_revive (class object_t *destination_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); @@ -85,7 +85,7 @@ namespace zmq virtual void process_own (class owned_t *object_); virtual void process_attach (struct i_engine *engine_); virtual void process_bind (class reader_t *in_pipe_, - class writer_t *out_pipe_); + class writer_t *out_pipe_, bool adjust_seqnum_); virtual void process_revive (); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); diff --git a/src/session.cpp b/src/session.cpp index 388437b..f62de27 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -155,8 +155,10 @@ void zmq::session_t::process_plug () out_pipe->set_endpoint (this); } + // Note that initial call to inc_seqnum was optimised out. Last + // parameter conveys the fact to the callee. send_bind (owner, outbound ? &outbound->reader : NULL, - inbound ? &inbound->writer : NULL); + inbound ? &inbound->writer : NULL, false); } owned_t::process_plug (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index e242e05..a614759 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -158,10 +158,10 @@ int zmq::socket_base_t::connect (const char *addr_) out_pipe ? &out_pipe->writer : NULL); // Attach the pipes to the peer socket. Note that peer's seqnum - // was incremented in find_endpoint function. When this command - // is delivered, peer will consider the seqnum to be processed. + // was incremented in find_endpoint function. The callee is notified + // about the fact via the last parameter. send_bind (peer, out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL); + in_pipe ? &in_pipe->writer : NULL, true); return 0; } @@ -509,8 +509,16 @@ void zmq::socket_base_t::process_own (owned_t *object_) io_objects.insert (object_); } -void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + bool adjust_seqnum_) { + // In case of inproc transport, the seqnum should catch up here. + // For other transports the seqnum modification can be optimised out + // because final handshaking between the socket and the session ensures + // that no 'bind' command will be left unprocessed. + if (adjust_seqnum_) + processed_seqnum++; + attach_pipes (in_pipe_, out_pipe_); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index c766bda..dd7b526 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -114,7 +114,8 @@ namespace zmq // Handlers for incoming commands. void process_own (class owned_t *object_); - void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_); + void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, + bool adjust_seqnum_); void process_term_req (class owned_t *object_); void process_term_ack (); -- cgit v1.2.3 From 55b64a02e7f3b1ce9e512240a9f9a337ead3b54c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 22 Nov 2009 08:47:06 +0100 Subject: man pages - initial (dummy) version --- Makefile.am | 8 ++++++-- configure.in | 8 +++++++- man/Makefile.am | 16 ++++++++++++++++ man/convert2pdf.sh | 48 +++++++++++++++++++++++++++++++++++++++++++++++ man/man1/zmq_forwarder.1 | 11 +++++++++++ man/man3/zmq_bind.3 | 12 ++++++++++++ man/man3/zmq_close.3 | 12 ++++++++++++ man/man3/zmq_connect.3 | 12 ++++++++++++ man/man3/zmq_flush.3 | 12 ++++++++++++ man/man3/zmq_init.3 | 12 ++++++++++++ man/man3/zmq_poll.3 | 12 ++++++++++++ man/man3/zmq_recv.3 | 12 ++++++++++++ man/man3/zmq_send.3 | 12 ++++++++++++ man/man3/zmq_setsockopt.3 | 12 ++++++++++++ man/man3/zmq_socket.3 | 12 ++++++++++++ man/man3/zmq_term.3 | 12 ++++++++++++ man/man7/zmq.7 | 9 +++++++++ 17 files changed, 229 insertions(+), 3 deletions(-) create mode 100644 man/Makefile.am create mode 100644 man/convert2pdf.sh create mode 100644 man/man1/zmq_forwarder.1 create mode 100644 man/man3/zmq_bind.3 create mode 100644 man/man3/zmq_close.3 create mode 100644 man/man3/zmq_connect.3 create mode 100644 man/man3/zmq_flush.3 create mode 100644 man/man3/zmq_init.3 create mode 100644 man/man3/zmq_poll.3 create mode 100644 man/man3/zmq_recv.3 create mode 100644 man/man3/zmq_send.3 create mode 100644 man/man3/zmq_setsockopt.3 create mode 100644 man/man3/zmq_socket.3 create mode 100644 man/man3/zmq_term.3 create mode 100644 man/man7/zmq.7 diff --git a/Makefile.am b/Makefile.am index fe0f1f0..af6ba2e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -2,8 +2,12 @@ if BUILD_PERF DIR_PERF = perf endif -SUBDIRS = src $(DIR_PERF) devices bindings -DIST_SUBDIRS = src perf devices bindings +if INSTALL_MAN +DIR_MAN = man +endif + +SUBDIRS = src $(DIR_MAN) $(DIR_PERF) devices bindings +DIST_SUBDIRS = src man perf devices bindings EXTRA_DIST = $(top_srcdir)/foreign/openpgm/@pgm1_basename@.tar.bz2 \ $(top_srcdir)/foreign/openpgm/@pgm2_basename@ \ diff --git a/configure.in b/configure.in index 1b1db35..c2cf678 100644 --- a/configure.in +++ b/configure.in @@ -49,6 +49,10 @@ on_mingw32="no" # Host speciffic checks AC_CANONICAL_HOST +# Whether or not install manual pages. +# Note that on MinGW manpages are not installed. +install_man="yes" + case "${host_os}" in *linux*) AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS]) @@ -134,6 +138,7 @@ case "${host_os}" in [AC_MSG_ERROR([Could not link with Iphlpapi.dll.])]) CFLAGS="${CFLAGS} -std=c99" on_mingw32="yes" + install_man="no" ;; *) AC_MSG_ERROR([Not supported os: $host.]) @@ -617,6 +622,7 @@ AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes") AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes") AM_CONDITIONAL(BUILD_PGM2_EXAMPLES, test "x$with_pgm2_ext" = "xyes") +AM_CONDITIONAL(INSTALL_MAN, test "x$install_man" = "xyes") AC_SUBST(stdint) AC_SUBST(inttypes) @@ -631,7 +637,7 @@ AC_FUNC_MALLOC AC_TYPE_SIGNAL AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs) -AC_OUTPUT(Makefile src/Makefile bindings/python/Makefile \ +AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \ bindings/python/setup.py bindings/ruby/Makefile \ bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \ perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \ diff --git a/man/Makefile.am b/man/Makefile.am new file mode 100644 index 0000000..84e0e47 --- /dev/null +++ b/man/Makefile.am @@ -0,0 +1,16 @@ +dist_man_MANS = man1/zmq_forwarder.1 man3/zmq_init.3 man3/zmq_term.3 \ + man3/zmq_socket.3 man3/zmq_close.3 man3/zmq_setsockopt.3 man3/zmq_bind.3 \ + man3/zmq_connect.3 man3/zmq_send.3 man3/zmq_flush.3 man3/zmq_recv.3 \ + man3/zmq_poll.3 man7/zmq.7 + +distclean-local: + -rm *.pdf + -rm man1/*.ps + -rm man3/*.ps + -rm man7/*.ps + +dist-hook: + ./convert2pdf.sh + $(mkdir_p) $(top_distdir)/doc + cp $(top_srcdir)/man/*.pdf $(top_distdir)/doc + diff --git a/man/convert2pdf.sh b/man/convert2pdf.sh new file mode 100644 index 0000000..6a7029f --- /dev/null +++ b/man/convert2pdf.sh @@ -0,0 +1,48 @@ +#!/bin/sh +# +# Copyright (c) 2007-2009 FastMQ Inc. +# +# This file is part of 0MQ. +# +# 0MQ is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# 0MQ is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see . + +groff -man -Tps man1/zmq_forwarder.1 > man1/zmq_forwarder.1.ps +ps2pdf man1/zmq_forwarder.1.ps zmq_forwarder.pdf + +groff -man -Tps man3/zmq_init.3 > man3/zmq_init.3.ps +ps2pdf man3/zmq_init.3.ps zmq_init.pdf +groff -man -Tps man3/zmq_term.3 > man3/zmq_term.3.ps +ps2pdf man3/zmq_term.3.ps zmq_term.pdf +groff -man -Tps man3/zmq_socket.3 > man3/zmq_socket.3.ps +ps2pdf man3/zmq_socket.3.ps zmq_socket.pdf +groff -man -Tps man3/zmq_close.3 > man3/zmq_close.3.ps +ps2pdf man3/zmq_close.3.ps zmq_close.pdf +groff -man -Tps man3/zmq_setsockopt.3 > man3/zmq_setsockopt.3.ps +ps2pdf man3/zmq_setsockopt.3.ps zmq_setsockopt.pdf +groff -man -Tps man3/zmq_bind.3 > man3/zmq_bind.3.ps +ps2pdf man3/zmq_bind.3.ps zmq_bind.pdf +groff -man -Tps man3/zmq_connect.3 > man3/zmq_connect.3.ps +ps2pdf man3/zmq_connect.3.ps zmq_connect.pdf +groff -man -Tps man3/zmq_send.3 > man3/zmq_send.3.ps +ps2pdf man3/zmq_send.3.ps zmq_send.pdf +groff -man -Tps man3/zmq_flush.3 > man3/zmq_flush.3.ps +ps2pdf man3/zmq_flush.3.ps zmq_flush.pdf +groff -man -Tps man3/zmq_recv.3 > man3/zmq_recv.3.ps +ps2pdf man3/zmq_recv.3.ps zmq_recv.pdf +groff -man -Tps man3/zmq_poll.3 > man3/zmq_poll.3.ps +ps2pdf man3/zmq_poll.3.ps zmq_poll.pdf + +groff -man -Tps man7/zmq.7 > man7/zmq.7.ps +ps2pdf man7/zmq.7.ps zmq.pdf + diff --git a/man/man1/zmq_forwarder.1 b/man/man1/zmq_forwarder.1 new file mode 100644 index 0000000..63a0b6b --- /dev/null +++ b/man/man1/zmq_forwarder.1 @@ -0,0 +1,11 @@ +.TH zmq_forwarder 1 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_forwarder \- forwards the stream of PUB/SUB messages +.SH SYNOPSIS +.SH DESCRIPTION +.SH OPTIONS +.SH "SEE ALSO" +.SH AUTHOR +Martin Sustrik + + diff --git a/man/man3/zmq_bind.3 b/man/man3/zmq_bind.3 new file mode 100644 index 0000000..70f1df7 --- /dev/null +++ b/man/man3/zmq_bind.3 @@ -0,0 +1,12 @@ +.TH zmq_bind 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_bind \- binds the socket to the specified address +.SH SYNOPSIS +.B int zmq_bind (void *s, const char *addr); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_close.3 b/man/man3/zmq_close.3 new file mode 100644 index 0000000..c04f97a --- /dev/null +++ b/man/man3/zmq_close.3 @@ -0,0 +1,12 @@ +.TH zmq_close 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_close \- destroys 0MQ socket +.SH SYNOPSIS +.B int zmq_close (void *s); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_connect.3 b/man/man3/zmq_connect.3 new file mode 100644 index 0000000..c68101c --- /dev/null +++ b/man/man3/zmq_connect.3 @@ -0,0 +1,12 @@ +.TH zmq_connect 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_connect \- connects the socket to the specified address +.SH SYNOPSIS +.B int zmq_connect (void *s, const char *addr); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_flush.3 b/man/man3/zmq_flush.3 new file mode 100644 index 0000000..f84c561 --- /dev/null +++ b/man/man3/zmq_flush.3 @@ -0,0 +1,12 @@ +.TH zmq_flush 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_flush \- flushes pre-sent messages to the socket +.SH SYNOPSIS +.B int zmq_flush (void *s); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_init.3 b/man/man3/zmq_init.3 new file mode 100644 index 0000000..04f04ef --- /dev/null +++ b/man/man3/zmq_init.3 @@ -0,0 +1,12 @@ +.TH zmq_init 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_init \- initialises 0MQ context +.SH SYNOPSIS +.B void *zmq_init (int app_threads, int io_threads, int flags); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_poll.3 b/man/man3/zmq_poll.3 new file mode 100644 index 0000000..d821e9f --- /dev/null +++ b/man/man3/zmq_poll.3 @@ -0,0 +1,12 @@ +.TH zmq_poll 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_poll \- polls for events on a set of 0MQ and POSIX sockets +.SH SYNOPSIS +.B int zmq_poll (zmq_pollitem_t *items, int nitems); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_recv.3 b/man/man3/zmq_recv.3 new file mode 100644 index 0000000..8308f79 --- /dev/null +++ b/man/man3/zmq_recv.3 @@ -0,0 +1,12 @@ +.TH zmq_recv 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_recv \- retrieves a message from the socket +.SH SYNOPSIS +.B int zmq_recv (void *s, zmq_msg_t *msg, int flags); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_send.3 b/man/man3/zmq_send.3 new file mode 100644 index 0000000..ff6e429 --- /dev/null +++ b/man/man3/zmq_send.3 @@ -0,0 +1,12 @@ +.TH zmq_send 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_send \- sends a message +.SH SYNOPSIS +.B int zmq_send (void *s, zmq_msg_t *msg, int flags); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_setsockopt.3 b/man/man3/zmq_setsockopt.3 new file mode 100644 index 0000000..da528cf --- /dev/null +++ b/man/man3/zmq_setsockopt.3 @@ -0,0 +1,12 @@ +.TH zmq_setsockopt 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_setsockopt \- sets a specified option on a 0MQ socket +.SH SYNOPSIS +.B int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_socket.3 b/man/man3/zmq_socket.3 new file mode 100644 index 0000000..f404b6b --- /dev/null +++ b/man/man3/zmq_socket.3 @@ -0,0 +1,12 @@ +.TH zmq_socket 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_socket \- creates 0MQ socket +.SH SYNOPSIS +.B void *zmq_socket (void *context, int type); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_term.3 b/man/man3/zmq_term.3 new file mode 100644 index 0000000..afd3273 --- /dev/null +++ b/man/man3/zmq_term.3 @@ -0,0 +1,12 @@ +.TH zmq_term 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_init \- terminates 0MQ context +.SH SYNOPSIS +.B int zmq_term (void *context); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man7/zmq.7 b/man/man7/zmq.7 new file mode 100644 index 0000000..02257a7 --- /dev/null +++ b/man/man7/zmq.7 @@ -0,0 +1,9 @@ +.TH zmq 7 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +0MQ \- a lightweight messaging kernel +.SH SYNOPSIS +.SH DESCRIPTION +.SH "SEE ALSO" +.SH AUTHOR +Martin Sustrik + -- cgit v1.2.3 From ed5563f75285197aa0cdbe8a0dc6f80c5bb1f89c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 22 Nov 2009 10:25:53 +0100 Subject: man pages (dummy) added for zmq_msg_* functions --- man/Makefile.am | 5 ++++- man/convert2pdf.sh | 16 ++++++++++++++++ man/man3/zmq_msg_close.3 | 12 ++++++++++++ man/man3/zmq_msg_copy.3 | 12 ++++++++++++ man/man3/zmq_msg_data.3 | 12 ++++++++++++ man/man3/zmq_msg_init.3 | 12 ++++++++++++ man/man3/zmq_msg_init_data.3 | 15 +++++++++++++++ man/man3/zmq_msg_init_size.3 | 12 ++++++++++++ man/man3/zmq_msg_move.3 | 12 ++++++++++++ man/man3/zmq_msg_size.3 | 12 ++++++++++++ 10 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 man/man3/zmq_msg_close.3 create mode 100644 man/man3/zmq_msg_copy.3 create mode 100644 man/man3/zmq_msg_data.3 create mode 100644 man/man3/zmq_msg_init.3 create mode 100644 man/man3/zmq_msg_init_data.3 create mode 100644 man/man3/zmq_msg_init_size.3 create mode 100644 man/man3/zmq_msg_move.3 create mode 100644 man/man3/zmq_msg_size.3 diff --git a/man/Makefile.am b/man/Makefile.am index 84e0e47..9477351 100644 --- a/man/Makefile.am +++ b/man/Makefile.am @@ -1,7 +1,10 @@ dist_man_MANS = man1/zmq_forwarder.1 man3/zmq_init.3 man3/zmq_term.3 \ man3/zmq_socket.3 man3/zmq_close.3 man3/zmq_setsockopt.3 man3/zmq_bind.3 \ man3/zmq_connect.3 man3/zmq_send.3 man3/zmq_flush.3 man3/zmq_recv.3 \ - man3/zmq_poll.3 man7/zmq.7 + man3/zmq_poll.3 man3/zmq_msg_init.3 man3/zmq_msg_init_size.3 \ + man3/zmq_msg_data.3 man3/zmq_msg_close.3 man3/zmq_msg_move.3 \ + man3/zmq_msg_copy.3 man3/zmq_msg_data.3 man3/zmq_msg_size.3 \ + man7/zmq.7 distclean-local: -rm *.pdf diff --git a/man/convert2pdf.sh b/man/convert2pdf.sh index 6a7029f..f47854a 100644 --- a/man/convert2pdf.sh +++ b/man/convert2pdf.sh @@ -42,6 +42,22 @@ groff -man -Tps man3/zmq_recv.3 > man3/zmq_recv.3.ps ps2pdf man3/zmq_recv.3.ps zmq_recv.pdf groff -man -Tps man3/zmq_poll.3 > man3/zmq_poll.3.ps ps2pdf man3/zmq_poll.3.ps zmq_poll.pdf +groff -man -Tps man3/zmq_msg_init.3 > man3/zmq_msg_init.3.ps +ps2pdf man3/zmq_msg_init.3.ps zmq_msg_init.pdf +groff -man -Tps man3/zmq_msg_init_size.3 > man3/zmq_msg_init_size.3.ps +ps2pdf man3/zmq_msg_init_size.3.ps zmq_msg_init_size.pdf +groff -man -Tps man3/zmq_msg_init_data.3 > man3/zmq_msg_init_data.3.ps +ps2pdf man3/zmq_msg_init_data.3.ps zmq_msg_init_data.pdf +groff -man -Tps man3/zmq_msg_close.3 > man3/zmq_msg_close.3.ps +ps2pdf man3/zmq_msg_close.3.ps zmq_msg_close.pdf +groff -man -Tps man3/zmq_msg_move.3 > man3/zmq_msg_move.3.ps +ps2pdf man3/zmq_msg_move.3.ps zmq_msg_move.pdf +groff -man -Tps man3/zmq_msg_copy.3 > man3/zmq_msg_copy.3.ps +ps2pdf man3/zmq_msg_copy.3.ps zmq_msg_copy.pdf +groff -man -Tps man3/zmq_msg_data.3 > man3/zmq_msg_data.3.ps +ps2pdf man3/zmq_msg_data.3.ps zmq_msg_data.pdf +groff -man -Tps man3/zmq_msg_size.3 > man3/zmq_msg_size.3.ps +ps2pdf man3/zmq_msg_size.3.ps zmq_msg_size.pdf groff -man -Tps man7/zmq.7 > man7/zmq.7.ps ps2pdf man7/zmq.7.ps zmq.pdf diff --git a/man/man3/zmq_msg_close.3 b/man/man3/zmq_msg_close.3 new file mode 100644 index 0000000..ee9d009 --- /dev/null +++ b/man/man3/zmq_msg_close.3 @@ -0,0 +1,12 @@ +.TH zmq_msg_close 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_close \- destroys 0MQ message +.SH SYNOPSIS +.B int zmq_msg_close (zmq_msg_t *msg); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_msg_copy.3 b/man/man3/zmq_msg_copy.3 new file mode 100644 index 0000000..239203b --- /dev/null +++ b/man/man3/zmq_msg_copy.3 @@ -0,0 +1,12 @@ +.TH zmq_msg_copy 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_copy \- copies content of a message to another message +.SH SYNOPSIS +.B int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_msg_data.3 b/man/man3/zmq_msg_data.3 new file mode 100644 index 0000000..647e9e9 --- /dev/null +++ b/man/man3/zmq_msg_data.3 @@ -0,0 +1,12 @@ +.TH zmq_msg_data 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_data \- retrieves pointer to the message content +.SH SYNOPSIS +.B void *zmq_msg_data (zmq_msg_t *msg); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_msg_init.3 b/man/man3/zmq_msg_init.3 new file mode 100644 index 0000000..8b18207 --- /dev/null +++ b/man/man3/zmq_msg_init.3 @@ -0,0 +1,12 @@ +.TH zmq_msg_init 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_init \- initialises empty 0MQ message +.SH SYNOPSIS +.B int zmq_msg_init (zmq_msg_t *msg); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_msg_init_data.3 b/man/man3/zmq_msg_init_data.3 new file mode 100644 index 0000000..d00ed9c --- /dev/null +++ b/man/man3/zmq_msg_init_data.3 @@ -0,0 +1,15 @@ +.TH zmq_msg_init_data 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_init \- initialises 0MQ message from the given data +.SH SYNOPSIS +.nf +.B typedef void (zmq_free_fn) (void *data); +.B int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn); +.fi +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_msg_init_size.3 b/man/man3/zmq_msg_init_size.3 new file mode 100644 index 0000000..b1baa75 --- /dev/null +++ b/man/man3/zmq_msg_init_size.3 @@ -0,0 +1,12 @@ +.TH zmq_msg_init_size 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_init \- initialises 0MQ message of a specified size +.SH SYNOPSIS +.B int zmq_msg_init_size (zmq_msg_t *msg, size_t size); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_msg_move.3 b/man/man3/zmq_msg_move.3 new file mode 100644 index 0000000..0daf2f7 --- /dev/null +++ b/man/man3/zmq_msg_move.3 @@ -0,0 +1,12 @@ +.TH zmq_msg_move 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_move \- moves content of a message to another message +.SH SYNOPSIS +.B int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_msg_size.3 b/man/man3/zmq_msg_size.3 new file mode 100644 index 0000000..d8bbc53 --- /dev/null +++ b/man/man3/zmq_msg_size.3 @@ -0,0 +1,12 @@ +.TH zmq_msg_size 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_size \- retrieves size of the message content +.SH SYNOPSIS +.B size_t zmq_msg_size (zmq_msg_t *msg); +.SH DESCRIPTION +.SH RETURN VALUE +.SH ERRORS +.SH EXAMPLE +.SH SEE ALSO +.SH AUTHOR +Martin Sustrik -- cgit v1.2.3 From 6602cce9af93539df8f1c43235e7e7130a3df60d Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 22 Nov 2009 12:05:11 +0100 Subject: zmq_init, zmq_term & zmq_strerror man pages added --- man/Makefile.am | 2 +- man/convert2pdf.sh | 2 ++ man/man3/zmq_init.3 | 26 ++++++++++++++++++++++++++ man/man3/zmq_strerror.3 | 27 +++++++++++++++++++++++++++ man/man3/zmq_term.3 | 11 +++++++++++ 5 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 man/man3/zmq_strerror.3 diff --git a/man/Makefile.am b/man/Makefile.am index 9477351..2efb80d 100644 --- a/man/Makefile.am +++ b/man/Makefile.am @@ -4,7 +4,7 @@ dist_man_MANS = man1/zmq_forwarder.1 man3/zmq_init.3 man3/zmq_term.3 \ man3/zmq_poll.3 man3/zmq_msg_init.3 man3/zmq_msg_init_size.3 \ man3/zmq_msg_data.3 man3/zmq_msg_close.3 man3/zmq_msg_move.3 \ man3/zmq_msg_copy.3 man3/zmq_msg_data.3 man3/zmq_msg_size.3 \ - man7/zmq.7 + man3/zmq_strerror.3 man7/zmq.7 distclean-local: -rm *.pdf diff --git a/man/convert2pdf.sh b/man/convert2pdf.sh index f47854a..85bc22c 100644 --- a/man/convert2pdf.sh +++ b/man/convert2pdf.sh @@ -58,6 +58,8 @@ groff -man -Tps man3/zmq_msg_data.3 > man3/zmq_msg_data.3.ps ps2pdf man3/zmq_msg_data.3.ps zmq_msg_data.pdf groff -man -Tps man3/zmq_msg_size.3 > man3/zmq_msg_size.3.ps ps2pdf man3/zmq_msg_size.3.ps zmq_msg_size.pdf +groff -man -Tps man3/zmq_strerror.3 > man3/zmq_strerror.3.ps +ps2pdf man3/zmq_strerror.3.ps zmq_strerror.pdf groff -man -Tps man7/zmq.7 > man7/zmq.7.ps ps2pdf man7/zmq.7.ps zmq.pdf diff --git a/man/man3/zmq_init.3 b/man/man3/zmq_init.3 index 04f04ef..1e48fd7 100644 --- a/man/man3/zmq_init.3 +++ b/man/man3/zmq_init.3 @@ -4,9 +4,35 @@ zmq_init \- initialises 0MQ context .SH SYNOPSIS .B void *zmq_init (int app_threads, int io_threads, int flags); .SH DESCRIPTION +Initialises 0MQ context. +.IR app_threads +specifies maximal number of application threads that can own open sockets +at the same time. At least one application thread should be defined. +.IR io_threads +specifies the size of thread pool to handle I/O operations. The value shouldn't +be negative. Zero can be used in case only in-process messaging is going to be +used, i.e. there will be no I/O traffic. +'flags' argument is a combination of the flags defined below: + +.IP "\fBZMQ_POLL\fP" +flag specifying that the sockets within this context should be pollable (see +.IR zmq_poll +). Pollable sockets may add a little latency to the message transfer when +compared to non-pollable sockets. + .SH RETURN VALUE +Function returns context handle is successful. Otherwise it returns NULL and +sets errno to one of the values below. .SH ERRORS +.IP "\fBEINVAL\fP" +- there's less than one application thread allocated, or number of I/O threads +is negative. .SH EXAMPLE +.nf +void *ctx = zmq_init (1, 1, ZMQ_POLL); +assert (ctx); +.fi .SH SEE ALSO +.BR zmq_term (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_strerror.3 b/man/man3/zmq_strerror.3 new file mode 100644 index 0000000..343c3ed --- /dev/null +++ b/man/man3/zmq_strerror.3 @@ -0,0 +1,27 @@ +.TH zmq_strerror 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_strerror \- returns string describing the error number +.SH SYNOPSIS +.B const char *zmq_strerror (int errnum); +.SH DESCRIPTION +As 0MQ defines few additional (non-POSIX) error codes, standard +.IR strerror +isn't capable of translating those errors into human readable strings. Instead, +.IR zmq_strerror +should be used. +.SH RETURN VALUE +Returns string describing the error number. +.SH ERRORS +No errors are defined. +.SH EXAMPLE +.nf +void *ctx = zmq_init (1, 1, 0); +if (!ctx) { + printf ("error occured during zmq_init: %s\\n", zmq_strerror (errno)); + abort (); +} +.fi +.SH SEE ALSO +.BR zmq (7) +.SH AUTHOR +Martin Sustrik diff --git a/man/man3/zmq_term.3 b/man/man3/zmq_term.3 index afd3273..14d9da9 100644 --- a/man/man3/zmq_term.3 +++ b/man/man3/zmq_term.3 @@ -4,9 +4,20 @@ zmq_init \- terminates 0MQ context .SH SYNOPSIS .B int zmq_term (void *context); .SH DESCRIPTION +Destroys 0MQ context. However, if there are still any sockets open within +the context, +.IR zmq_term +succeeds but shutdown of the context is delayed till the last socket is closed. .SH RETURN VALUE +Function returns zero is successful. Otherwise it returns -1 and +sets errno to one of the values below. .SH ERRORS +No errors are defined. .SH EXAMPLE +.nf +int rc = zmq_term (context); +assert (rc == 0); +.fi .SH SEE ALSO .SH AUTHOR Martin Sustrik -- cgit v1.2.3 From e90ada0d044636201c57786307a49a52f9cf7643 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 22 Nov 2009 16:51:21 +0100 Subject: more man pages filled in --- bindings/c/zmq.h | 2 +- man/man3/zmq_bind.3 | 36 ++++++++++++++++ man/man3/zmq_close.3 | 15 +++++++ man/man3/zmq_connect.3 | 37 ++++++++++++++++- man/man3/zmq_flush.3 | 25 +++++++++++ man/man3/zmq_init.3 | 3 +- man/man3/zmq_recv.3 | 40 ++++++++++++++++++ man/man3/zmq_send.3 | 52 +++++++++++++++++++++++ man/man3/zmq_setsockopt.3 | 103 ++++++++++++++++++++++++++++++++++++++++++++++ man/man3/zmq_socket.3 | 56 +++++++++++++++++++++++++ man/man3/zmq_term.3 | 2 + 11 files changed, 368 insertions(+), 3 deletions(-) diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index 40750ec..9b11a1d 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -180,7 +180,7 @@ ZMQ_EXPORT int zmq_term (void *context); // Socket to send requests and receive replies. Requests are // load-balanced among all the peers. This socket type allows -// only an alternated sequence of send's and recv's +// only an alternated sequence of send's and recv's. #define ZMQ_REQ 3 // Socket to receive requests and send replies. This socket type allows diff --git a/man/man3/zmq_bind.3 b/man/man3/zmq_bind.3 index 70f1df7..069b966 100644 --- a/man/man3/zmq_bind.3 +++ b/man/man3/zmq_bind.3 @@ -4,9 +4,45 @@ zmq_bind \- binds the socket to the specified address .SH SYNOPSIS .B int zmq_bind (void *s, const char *addr); .SH DESCRIPTION +The function binds socket +.IR s to a particular transport. Actual semantics of the +command depend on the underlying transport mechanism, however, in cases where +peers connect in an asymetric manner, +.IR zmq_bind +should be called first, +.IR zmq_connect +afterwards. For actual formats of +.IR addr +parameter for different types of transport have a look at +.IR zmq(7) . +Note that single socket can be bound (and connected) to +arbitrary number of peers using different transport mechanisms. .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +.IP "\fBEPROTONOSUPPORT\fP" +unsupported protocol. +.IP "\fBENOCOMPATPROTO\fP" +protocol is not compatible with the socket type. +.IP "\fBEADDRINUSE\fP" +the given address is already in use. +.IP "\fBEADDRNOTAVAIL\fP" +a nonexistent interface was requested or the requested address was not local. .SH EXAMPLE +.nf +void *s = zmq_socket (context, ZMQ_PUB); +assert (s); +int rc = zmq_bind (s, "inproc://my_publisher"); +assert (rc == 0); +rc = zmq_bind (s, "tcp://eth0:5555"); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_connect (3) +.BR zmq_socket (3) +.BR zmq (7) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_close.3 b/man/man3/zmq_close.3 index c04f97a..cc49635 100644 --- a/man/man3/zmq_close.3 +++ b/man/man3/zmq_close.3 @@ -4,9 +4,24 @@ zmq_close \- destroys 0MQ socket .SH SYNOPSIS .B int zmq_close (void *s); .SH DESCRIPTION +Destroys 0MQ socket (one created using +.IR zmq_socket +function). All sockets have to be properly closed before the application +terminates, otherwise memory leaks will occur. .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +No errors are defined. .SH EXAMPLE +.nf +int rc = zmq_close (s); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_socket (3) +.BR zmq_term (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_connect.3 b/man/man3/zmq_connect.3 index c68101c..8f09e20 100644 --- a/man/man3/zmq_connect.3 +++ b/man/man3/zmq_connect.3 @@ -1,12 +1,47 @@ .TH zmq_connect 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" .SH NAME -zmq_connect \- connects the socket to the specified address +zmq_connect \- connect the socket to the specified peer .SH SYNOPSIS .B int zmq_connect (void *s, const char *addr); .SH DESCRIPTION +The function connect socket +.IR s to the peer identified by +.IR addr . +Actual semantics of the command depend on the underlying transport mechanism, +however, in cases where peers connect in an asymetric manner, +.IR zmq_bind +should be called first, +.IR zmq_connect +afterwards. For actual formats of +.IR addr +parameter for different types of transport have a look at +.IR zmq(7) . +Note that single socket can be connected (and bound) to +arbitrary number of peers using different transport mechanisms. .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +.IP "\fBEPROTONOSUPPORT\fP" +unsupported protocol. +.IP "\fBENOCOMPATPROTO\fP" +protocol is not compatible with the socket type. +.IP "\fBECONNREFUSED\fP" +no-one listening on the remote address. .SH EXAMPLE +.nf +void *s = zmq_socket (context, ZMQ_SUB); +assert (s); +int rc = zmq_connect (s, "inproc://my_publisher"); +assert (rc == 0); +rc = zmq_connect (s, "tcp://server001:5555"); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_bind (3) +.BR zmq_socket (3) +.BR zmq (7) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_flush.3 b/man/man3/zmq_flush.3 index f84c561..194cf6c 100644 --- a/man/man3/zmq_flush.3 +++ b/man/man3/zmq_flush.3 @@ -4,9 +4,34 @@ zmq_flush \- flushes pre-sent messages to the socket .SH SYNOPSIS .B int zmq_flush (void *s); .SH DESCRIPTION +Flushes all the pre-sent messages - i.e. those that have been sent with +ZMQ_NOFLUSH flag - to the socket. This functionality improves performance in +cases where several messages are sent during a single business operation. +It should not be used as a transaction - ACID properties are not guaranteed. +Note that calling +.IR zmq_send +without ZMQ_NOFLUSH flag automatically flushes all previously pre-sent messages. .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +.IP "\fBENOTSUP\fP" +function isn't supported by particular socket type. +.IP "\fBEFSM\fP" +function cannot be called at the moment, because socket is not in the +approprite state. .SH EXAMPLE +.nf +rc = zmq_send (s, &msg1, ZMQ_NOFLUSH); +assert (rc == 0); +rc = zmq_send (s, &msg2, ZMQ_NOFLUSH); +assert (rc == 0); +rc = zmq_flush (s); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_send (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_init.3 b/man/man3/zmq_init.3 index 1e48fd7..317dbba 100644 --- a/man/man3/zmq_init.3 +++ b/man/man3/zmq_init.3 @@ -25,7 +25,7 @@ Function returns context handle is successful. Otherwise it returns NULL and sets errno to one of the values below. .SH ERRORS .IP "\fBEINVAL\fP" -- there's less than one application thread allocated, or number of I/O threads +there's less than one application thread allocated, or number of I/O threads is negative. .SH EXAMPLE .nf @@ -34,5 +34,6 @@ assert (ctx); .fi .SH SEE ALSO .BR zmq_term (3) +.BR zmq_socket (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_recv.3 b/man/man3/zmq_recv.3 index 8308f79..d3cf2fd 100644 --- a/man/man3/zmq_recv.3 +++ b/man/man3/zmq_recv.3 @@ -4,9 +4,49 @@ zmq_recv \- retrieves a message from the socket .SH SYNOPSIS .B int zmq_recv (void *s, zmq_msg_t *msg, int flags); .SH DESCRIPTION +Receive a message from the socket +.IR s , +store it in +.IR msg . +Any content previously in +.IR msg +will be properly deallocated. +.IR flags +argument can be combination of the flags described below. + +.IP "\fBZMQ_NOBLOCK\fP" +The flag specifies that the operation should be performed in +non-blocking mode. I.e. if it cannot be processed immediately, +error should be returned with +.IR errno +set to EAGAIN. + .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +.IP "\fBEAGAIN\fP" +it's a non-blocking receive and there's no message available at the moment. +.IP "\fBENOTSUP\fP" +function isn't supported by particular socket type. +.IP "\fBEFSM\fP" +function cannot be called at the moment, because socket is not in the +approprite state. This error may occur with sockets that switch between +several states (e.g. ZMQ_REQ). .SH EXAMPLE +.nf +zmq_msg_t msg; +int rc = zmq_msg_init (&msg); +assert (rc == 0); +rc = zmq_recv (s, &msg, 0); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_send (3) +.BR zmq_msg_init (3) +.BR zmq_msg_data (3) +.BR zmq_msg_size (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_send.3 b/man/man3/zmq_send.3 index ff6e429..0ebbd0c 100644 --- a/man/man3/zmq_send.3 +++ b/man/man3/zmq_send.3 @@ -4,9 +4,61 @@ zmq_send \- sends a message .SH SYNOPSIS .B int zmq_send (void *s, zmq_msg_t *msg, int flags); .SH DESCRIPTION +Send the message +.IR msg +to the socket +.IR s . +.IR flags +argument can be combination the flags described below. + +.IP "\fBZMQ_NOBLOCK\fP" +The flag specifies that the operation should be performed in +non-blocking mode. I.e. if it cannot be processed immediately, +error should be returned with +.IR errno +set to EAGAIN. + +.IP "\fBZMQ_NOFLUSH\fP" +The flag specifies that +.IR zmq_send +should not flush the message downstream immediately. Instead, it should batch +ZMQ_NOFLUSH messages and send them downstream only once +.IR zmq_flush +is invoked. This is an optimisation for cases where several messages are sent +in a single business transaction. However, the effect is measurable only in +extremely high-perf scenarios (million messages a second or so). +If that's not your case, use standard flushing send instead. + .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +.IP "\fBEAGAIN\fP" +it's a non-blocking send and message cannot be sent at the moment. +.IP "\fBENOTSUP\fP" +function isn't supported by particular socket type. +.IP "\fBEFSM\fP" +function cannot be called at the moment, because socket is not in the +approprite state. This error may occur with sockets that switch between +several states (e.g. ZMQ_REQ). .SH EXAMPLE +.nf +zmq_msg_t msg; +int rc = zmq_msg_init_size (&msg, 6); +assert (rc == 0); +memset (zmq_msg_data (&msg), 'A', 6); +rc = zmq_send (s, &msg, 0); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_flush (3) +.BR zmq_recv (3) +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_data (3) +.BR zmq_msg_size (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_setsockopt.3 b/man/man3/zmq_setsockopt.3 index da528cf..a79f879 100644 --- a/man/man3/zmq_setsockopt.3 +++ b/man/man3/zmq_setsockopt.3 @@ -4,9 +4,112 @@ zmq_setsockopt \- sets a specified option on a 0MQ socket .SH SYNOPSIS .B int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen); .SH DESCRIPTION +Sets an option on the socket. +.IR option +argument specifies the option from the list below. +.IR optval +is a pointer to the value to set, +.IR optvallen +is the size of the value in bytes. + +.IP "\fBZMQ_HWM\fP" +High watermark for the message pipes associated with the socket. The water +mark cannot be exceeded. If the messages don't fit into the pipe emergency +mechanisms of the particular socket type are used (block, drop etc.) If HWM +is set to zero, there are no limits for the content of the pipe. +Type: int64_t Unit: bytes Default: 0 + +.IP "\fBZMQ_LWM\fP" +Low watermark makes sense only if high watermark is defined (i.e. is non-zero). +When the emergency state is reached when messages overflow the pipe, the +emergency lasts till the size of the pipe decreases to low watermark. +At that point normal state is resumed. +Type: int64_t Unit: bytes Default: 0 + +.IP "\fBZMQ_SWAP\fP" +Swap allows the pipe to exceed high watermark. However, the data are written +to the disk rather than held in the memory. Until high watermark is +exceeded there is no disk activity involved though. The value of the option +defines maximal size of the swap file. +Type: int64_t Unit: bytes Default: 0 + +.IP "\fBZMQ_AFFINITY\fP" +Affinity defines which threads in the thread pool will be used to handle +newly created sockets. This way you can dedicate some of the threads (CPUs) +to a specific work. Value of 0 means no affinity. Work is distributed +fairly among the threads in the thread pool. For non-zero values, the lowest +bit corresponds to the thread 1, second lowest bit to the thread 2 etc. +Thus, value of 3 means that from now on newly created sockets will handle +I/O activity exclusively using threads no. 1 and 2. +Type: int64_t Unit: N/A (bitmap) Default: 0 + +.IP "\fBZMQ_IDENTITY\fP" +Identity of the socket. Identity is important when restarting applications. +If the socket has no identity, each run of the application is completely +separated from other runs. However, with identity application reconnects to +existing infrastructure left by the previous run. Thus it may receive +messages that were sent in the meantime, it shares pipe limits with the +previous run etc. +Type: string Unit: N/A Default: NULL + +.IP "\fBZMQ_SUBSCRIBE\fP" +Applicable only to ZMQ_SUB socket type. It establishes new message filter. +When ZMQ_SUB socket is created all the incoming messages are filtered out. +This option allows you to subscribe for all messages ("*"), messages with +specific topic ("x.y.z") and/or messages with specific topic prefix +("x.y.*"). Topic is one-byte-size-prefixed string located at +the very beginning of the message. Multiple filters can be attached to +a single 'sub' socket. In that case message passes if it matches at least +one of the filters. +Type: string Unit: N/A Default: N/A + +.IP "\fBZMQ_UNSUBSCRIBE\fP" +Applicable only to ZMQ_SUB socket type. Removes existing message filter. +The filter specified must match the string passed to ZMQ_SUBSCRIBE options +exactly. If there were several instances of the same filter created, +this options removes only one of them, leaving the rest in place +and functional. +Type: string Unit: N/A Default: N/A + +.IP "\fBZMQ_RATE\fP" +This option applies only to sending side of multicast transports (pgm & udp). +It specifies maximal outgoing data rate that an individual sender socket +can send. +Type: uint64_t Unit: kilobits/second Default: 100 + +.IP "\fBZMQ_RECOVERY_IVL\fP" +This option applies only to multicast transports (pgm & udp). It specifies +how long can the receiver socket survive when the sender is inaccessible. +Keep in mind that large recovery intervals at high data rates result in +very large recovery buffers, meaning that you can easily overload your box +by setting say 1 minute recovery interval at 1Gb/s rate (requires +7GB in-memory buffer). +Type: uint64_t Unit: seconds Default: 10 + +.IP "\fBZMQ_MCAST_LOOP\fP" +This option applies only to multicast transports (pgm & udp). Value of 1 +means that the mutlicast packets can be received on the box they were sent +from. Setting the value to 0 disables the loopback functionality which +can have negative impact on the performance. If possible, disable +the loopback in production environments. +Type: uint64_t Unit: N/A (boolean value) Default: 1 + .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +.IP "\fBEINVAL\fP" +unknown option, a value with incorrect length or invalid value. .SH EXAMPLE +.nf +int rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE, "*", 1); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_socket (3) +.BR zmq (7) + .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_socket.3 b/man/man3/zmq_socket.3 index f404b6b..8b819b5 100644 --- a/man/man3/zmq_socket.3 +++ b/man/man3/zmq_socket.3 @@ -4,9 +4,65 @@ zmq_socket \- creates 0MQ socket .SH SYNOPSIS .B void *zmq_socket (void *context, int type); .SH DESCRIPTION +Open a socket within the specified +.IR context . +To create a context, use +.IR zmq_init +function. +.IR type +argument can be one of the values defined below. Note that each socket is owned +by exactly one thread (the one that it was created from) and should not be used +from any other thread. + +.IP "\fBZMQ_P2P\fP" +Socket to communicate with a single peer. Allows for only a single connect or a +single bind. There's no message routing or message filtering involved. + +.IP "\fBZMQ_PUB\fP" +Socket to distribute data. Recv fuction is not implemented for this socket +type. Messages are distributed in fanout fashion to all the peers. + +.IP "\fBZMQ_SUB\fP" +Socket to subscribe for data. Send function is not implemented for this +socket type. Initially, socket is subscribed for no messages. Use ZMQ_SUBSCRIBE +option to specify which messages to subscribe for. + +.IP "\fBZMQ_REQ\fP" +Socket to send requests and receive replies. Requests are load-balanced among +all the peers. This socket type allows only an alternated sequence of +send's and recv's. + +.IP "\fBZMQ_REP\fP" +Socket to receive requests and send replies. This socket type allows +only an alternated sequence of recv's and send's. Each send is routed to +the peer that issued the last received request. + .SH RETURN VALUE +Function returns socket handle is successful. Otherwise it returns NULL and +sets errno to one of the values below. .SH ERRORS +.IP "\fBEINVAL\fP" +invalid socket type. +.IP "\fBEMTHREAD\fP" +the number of application threads allowed to own 0MQ sockets was exceeded. See +.IR app_threads +parameter to +.IR zmq_init +function. .SH EXAMPLE +.nf +void *s = zmq_socket (context, ZMQ_PUB); +assert (s); +int rc = zmq_bind (s, "tcp://192.168.0.1:5555"); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_init (3) +.BR zmq_setsockopt (3) +.BR zmq_bind (3) +.BR zmq_connect (3) +.BR zmq_send (3) +.BR zmq_flush (3) +.BR zmq_recv (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_term.3 b/man/man3/zmq_term.3 index 14d9da9..8d822b6 100644 --- a/man/man3/zmq_term.3 +++ b/man/man3/zmq_term.3 @@ -19,5 +19,7 @@ int rc = zmq_term (context); assert (rc == 0); .fi .SH SEE ALSO +.BR zmq_init (3) +.BR zmq_close (3) .SH AUTHOR Martin Sustrik -- cgit v1.2.3 From 5cd98bc575517ea72c435770a5313711484f7d34 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 23 Nov 2009 09:22:25 +0100 Subject: the rest of man3 man pages filled in --- man/man3/zmq_msg_close.3 | 20 +++++++++++++++++ man/man3/zmq_msg_copy.3 | 31 ++++++++++++++++++++++++++ man/man3/zmq_msg_data.3 | 15 +++++++++++++ man/man3/zmq_msg_init.3 | 21 ++++++++++++++++++ man/man3/zmq_msg_init_data.3 | 33 +++++++++++++++++++++++++++ man/man3/zmq_msg_init_size.3 | 32 ++++++++++++++++++++++++++ man/man3/zmq_msg_move.3 | 26 ++++++++++++++++++++++ man/man3/zmq_msg_size.3 | 18 +++++++++++++++ man/man3/zmq_poll.3 | 53 ++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 249 insertions(+) diff --git a/man/man3/zmq_msg_close.3 b/man/man3/zmq_msg_close.3 index ee9d009..6613360 100644 --- a/man/man3/zmq_msg_close.3 +++ b/man/man3/zmq_msg_close.3 @@ -4,9 +4,29 @@ zmq_msg_close \- destroys 0MQ message .SH SYNOPSIS .B int zmq_msg_close (zmq_msg_t *msg); .SH DESCRIPTION +Deallocates message +.IR msg +including any associated buffers (unless the buffer is +shared with another message). Not calling this function can result in +memory leaks. .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +No errors are defined. .SH EXAMPLE +.nf +zmq_msg_t msg; +rc = zmq_msg_init_size (&msg, 1000000); +assert (rc = 0); +rc = zmq_msg_close (&msg); +assert (rc = 0); +.fi .SH SEE ALSO +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_msg_copy.3 b/man/man3/zmq_msg_copy.3 index 239203b..2f70400 100644 --- a/man/man3/zmq_msg_copy.3 +++ b/man/man3/zmq_msg_copy.3 @@ -4,9 +4,40 @@ zmq_msg_copy \- copies content of a message to another message .SH SYNOPSIS .B int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); .SH DESCRIPTION +Copy the +.IR src +message to +.IR dest . +The original content of +.IR dest +is orderly deallocated. +Caution: The implementation may choose not to physically copy the data, rather +to share the buffer between two messages. Thus avoid modifying message data +after the message was copied. Doing so can modify multiple message instances. +If what you need is actual hard copy, allocate new message using +.IR zmq_msg_size +and copy the data using +.IR memcpy . .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +No errors are defined. .SH EXAMPLE +.nf +zmq_msg_t dest; +rc = zmq_msg_init (&dest); +assert (rc == 0); +rc = zmq_msg_copy (&dest, &src); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_msg_move (3) +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_close (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_msg_data.3 b/man/man3/zmq_msg_data.3 index 647e9e9..9876378 100644 --- a/man/man3/zmq_msg_data.3 +++ b/man/man3/zmq_msg_data.3 @@ -4,9 +4,24 @@ zmq_msg_data \- retrieves pointer to the message content .SH SYNOPSIS .B void *zmq_msg_data (zmq_msg_t *msg); .SH DESCRIPTION +Returns pointer to message data. Always use this function to access the data, +never use +.IR zmq_msg_t +members directly. .SH RETURN VALUE +Pointer to the message data. .SH ERRORS +No errors are defined. .SH EXAMPLE +.nf +zmq_msg_t msg; +rc = zmq_msg_init_size (&msg, 100); +memset (zmq_msg_data (&msg), 0, 100); +.fi .SH SEE ALSO +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_close (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_msg_init.3 b/man/man3/zmq_msg_init.3 index 8b18207..a531fc1 100644 --- a/man/man3/zmq_msg_init.3 +++ b/man/man3/zmq_msg_init.3 @@ -4,9 +4,30 @@ zmq_msg_init \- initialises empty 0MQ message .SH SYNOPSIS .B int zmq_msg_init (zmq_msg_t *msg); .SH DESCRIPTION +Initialises 0MQ message zero bytes long. The function is most useful +to initialise a +.IR zmq_msg_t +structure before receiving a message. .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +No errors are defined. .SH EXAMPLE +.nf +zmq_msg_t msg; +rc = zmq_msg_init (&msg); +assert (rc == 0); +rc = zmq_recv (s, &msg, 0); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_msg_close (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_data (3) +.BR zmq_msg_size (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_msg_init_data.3 b/man/man3/zmq_msg_init_data.3 index d00ed9c..a0b14c8 100644 --- a/man/man3/zmq_msg_init_data.3 +++ b/man/man3/zmq_msg_init_data.3 @@ -7,9 +7,42 @@ zmq_msg_init \- initialises 0MQ message from the given data .B int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn); .fi .SH DESCRIPTION +Initialise a message from a supplied buffer. Message isn't copied, +instead 0MQ infrastructure takes ownership of the buffer located at address +.IR data , +.IR size +bytes long. +Deallocation function ( +.IR ffn +) will be called once the data are not needed anymore. Note that deallocation +function prototype is designed so that it complies with standard C +.IR free +function. When using a static constant buffer, +.IR ffn +may be NULL to prevent subsequent deallocation. .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +No errors are defined. .SH EXAMPLE +.nf +void *data = malloc (6); +assert (data); +memcpy (data, "ABCDEF", 6); +zmq_msg_t msg; +rc = zmq_msg_init_data (&msg, data, 6, free); +assert (rc == 0); +rc = zmq_send (s, &msg, 0); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_msg_close (3) +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_data (3) +.BR zmq_msg_size (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_msg_init_size.3 b/man/man3/zmq_msg_init_size.3 index b1baa75..ce1ec94 100644 --- a/man/man3/zmq_msg_init_size.3 +++ b/man/man3/zmq_msg_init_size.3 @@ -4,9 +4,41 @@ zmq_msg_init \- initialises 0MQ message of a specified size .SH SYNOPSIS .B int zmq_msg_init_size (zmq_msg_t *msg, size_t size); .SH DESCRIPTION +Initialises 0MQ message +.IR size +bytes long. The implementation chooses whether it is more efficient to store +message content on the stack (small messages) or on the heap (large messages). +Therefore, never access message data directly via +.IR zmq_msg_t +members, rather use +.IR zmq_msg_data +and +.IR zmq_msg_size +functions to get message data and size. Note that the message data are not +nullified to avoid the associated performance impact. Thus you +should expect your message to contain bogus data after this call. .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +.IP "\fBENOMEM\fP" +memory to hold the message cannot be allocated. .SH EXAMPLE +.nf +zmq_msg_t msg; +rc = zmq_msg_init_size (&msg, 6); +assert (rc == 0); +memcpy (zmq_msg_data (&msg), "ABCDEF", 6); +rc = zmq_send (s, &msg, 0); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_msg_close (3) +.BR zmq_msg_init (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_data (3) +.BR zmq_msg_size (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_msg_move.3 b/man/man3/zmq_msg_move.3 index 0daf2f7..810e105 100644 --- a/man/man3/zmq_msg_move.3 +++ b/man/man3/zmq_msg_move.3 @@ -4,9 +4,35 @@ zmq_msg_move \- moves content of a message to another message .SH SYNOPSIS .B int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); .SH DESCRIPTION +Move the content of the message from +.IR src +to +.IR dest . +The content isn't copied, just moved. +.IR src +becomes an empty message after the call. Original content of +.IR dest +message is deallocated. .SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. .SH ERRORS +No errors are defined. .SH EXAMPLE +.nf +zmq_msg_t dest; +rc = zmq_msg_init (&dest); +assert (rc == 0); +rc = zmq_msg_move (&dest, &src); +assert (rc == 0); +.fi .SH SEE ALSO +.BR zmq_msg_copy (3) +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_close (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_msg_size.3 b/man/man3/zmq_msg_size.3 index d8bbc53..b51d582 100644 --- a/man/man3/zmq_msg_size.3 +++ b/man/man3/zmq_msg_size.3 @@ -4,9 +4,27 @@ zmq_msg_size \- retrieves size of the message content .SH SYNOPSIS .B size_t zmq_msg_size (zmq_msg_t *msg); .SH DESCRIPTION +Returns size of the message data. Always use this function to get the size, +never use +.IR zmq_msg_t +members directly. .SH RETURN VALUE +Size of the message data (bytes). .SH ERRORS +No errors are defined. .SH EXAMPLE +.nf +zmq_msg_t msg; +rc = zmq_msg_init (&msg); +assert (rc == 0); +rc = zmq_recv (s, &msg, 0); +assert (rc == 0); +size_t msg_size = zmq_msg_size (&msg); +.fi .SH SEE ALSO +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_close (3) .SH AUTHOR Martin Sustrik diff --git a/man/man3/zmq_poll.3 b/man/man3/zmq_poll.3 index d821e9f..5ce5055 100644 --- a/man/man3/zmq_poll.3 +++ b/man/man3/zmq_poll.3 @@ -4,9 +4,62 @@ zmq_poll \- polls for events on a set of 0MQ and POSIX sockets .SH SYNOPSIS .B int zmq_poll (zmq_pollitem_t *items, int nitems); .SH DESCRIPTION +Waits for the events specified by +.IR items +parameter. Number of items in the array is determined by +.IR nitems +argument. Each item in the array looks like this: + +.nf +typedef struct +{ + void *socket; + int fd; + short events; + short revents; +} zmq_pollitem_t; +.fi + +0MQ socket to poll on is specified by +.IR socket . +In case you want to poll on standard POSIX socket, set +.IR socket +to NULL and fill the POSIX file descriptor to +.IR fd . +.IR events +specifies which events to wait for. It's a combination of the values below. +Once the call exits, +.IR revent +will be filled with events that have actually occured on the socket. The field +will contain a combination of the following values. + +.IP "\fBZMQ_POLLIN\fP" +poll for incoming messages. +.IP "\fBZMQ_POLLOUT\fP" +wait while message can be set socket. Poll will return if a message of at least +one byte can be written to the socket. However, there is no guarantee that +arbitrarily large message can be sent. + .SH RETURN VALUE +Function returns number of items signaled or -1 in the case of error. .SH ERRORS +.IP "\fBEFAULT\fP" +there's a 0MQ socket in the pollset belonging to a different application thread. +.IP "\fBENOTSUP\fP" +0MQ context was initialised without ZMQ_POLL flag. I/O multiplexing is disabled. .SH EXAMPLE +.nf +zmq_pollitem_t items [2]; +items [0].socket = s; +items [0].events = POLLIN; +items [1].socket = NULL; +items [1].fd = my_fd; +items [1].events = POLLIN; + +int rc = zmq_poll (items, 2); +assert (rc != -1); +.fi .SH SEE ALSO +.BR zmq_socket (3) .SH AUTHOR Martin Sustrik -- cgit v1.2.3 From c98fd6bc3f2a49d7cb0b820a07354168c98f60b7 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 24 Nov 2009 11:23:10 +0100 Subject: ZMQII-25: Implement streamed request/reply --- bindings/c/zmq.h | 6 ++ bindings/java/org/zmq/Socket.java | 2 + bindings/python/pyzmq.cpp | 6 ++ bindings/ruby/rbzmq.cpp | 2 + configure.in | 15 +++- devices/Makefile.am | 8 +- devices/zmq_forwarder/zmq_forwarder.cpp | 7 +- devices/zmq_streamer/Makefile.am | 9 ++ devices/zmq_streamer/zmq_streamer.cpp | 122 +++++++++++++++++++++++++++ man/man3/zmq_socket.3 | 9 ++ src/Makefile.am | 4 + src/app_thread.cpp | 14 +++- src/downstream.cpp | 131 +++++++++++++++++++++++++++++ src/downstream.hpp | 64 ++++++++++++++ src/p2p.hpp | 4 +- src/pub.hpp | 4 +- src/rep.cpp | 2 +- src/rep.hpp | 4 +- src/req.hpp | 4 +- src/sub.hpp | 4 +- src/upstream.cpp | 143 ++++++++++++++++++++++++++++++++ src/upstream.hpp | 69 +++++++++++++++ 22 files changed, 612 insertions(+), 21 deletions(-) create mode 100644 devices/zmq_streamer/Makefile.am create mode 100644 devices/zmq_streamer/zmq_streamer.cpp create mode 100644 src/downstream.cpp create mode 100644 src/downstream.hpp create mode 100644 src/upstream.cpp create mode 100644 src/upstream.hpp diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index 9b11a1d..a65926e 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -188,6 +188,12 @@ ZMQ_EXPORT int zmq_term (void *context); // the peer that issued the last received request. #define ZMQ_REP 4 +// Socket to receive messages from up the stream. +#define ZMQ_UPSTREAM 5 + +// Socket to send messages downstream. +#define ZMQ_DOWNSTREAM 6 + // Open a socket. 'type' is one of the socket types defined above. // // Errors: EINVAL - invalid socket type. diff --git a/bindings/java/org/zmq/Socket.java b/bindings/java/org/zmq/Socket.java index 501bc16..396a6a0 100644 --- a/bindings/java/org/zmq/Socket.java +++ b/bindings/java/org/zmq/Socket.java @@ -34,6 +34,8 @@ public class Socket public static final int SUB = 2; public static final int REQ = 3; public static final int REP = 4; + public static final int UPSTREAM = 4; + public static final int DOWNSTREAM = 4; public static final int HWM = 1; public static final int LWM = 2; diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp index b180bcd..26ca7ac 100644 --- a/bindings/python/pyzmq.cpp +++ b/bindings/python/pyzmq.cpp @@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq () t = PyInt_FromLong (ZMQ_REP); PyDict_SetItemString (dict, "REP", t); Py_DECREF (t); + t = PyInt_FromLong (ZMQ_UPSTREAM); + PyDict_SetItemString (dict, "UPSTREAM", t); + Py_DECREF (t); + t = PyInt_FromLong (ZMQ_DOWNSTREAM); + PyDict_SetItemString (dict, "DOWNSTREAM", t); + Py_DECREF (t); t = PyInt_FromLong (ZMQ_HWM); PyDict_SetItemString (dict, "HWM", t); Py_DECREF (t); diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp index 6112972..2a26ce1 100644 --- a/bindings/ruby/rbzmq.cpp +++ b/bindings/ruby/rbzmq.cpp @@ -275,6 +275,8 @@ extern "C" void Init_librbzmq () rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB)); rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ)); rb_define_global_const ("REP", INT2NUM (ZMQ_REP)); + rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM)); + rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM)); rb_define_global_const ("POLL", INT2NUM (ZMQ_POLL)); } diff --git a/configure.in b/configure.in index c2cf678..bd3a0f4 100644 --- a/configure.in +++ b/configure.in @@ -590,6 +590,14 @@ if test "x$with_forwarder" != "xno"; then forwarder="yes" fi +# streamer device +streamer="no" +AC_ARG_WITH([streamer], [AS_HELP_STRING([--with-streamer], + [build streamer device [default=no]])], [with_streamer=yes], [with_streamer=no]) + +if test "x$with_streamer" != "xno"; then + streamer="yes" +fi # Perf perf="no" @@ -618,7 +626,8 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes") AM_CONDITIONAL(BUILD_PGM1, test "x$pgm1_ext" = "xyes") AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes") AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno" -a "x$pgm1_ext" = "xno") -AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") +AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") +AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes") AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes") AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes") AM_CONDITIONAL(BUILD_PGM2_EXAMPLES, test "x$with_pgm2_ext" = "xyes") @@ -641,7 +650,8 @@ AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \ bindings/python/setup.py bindings/ruby/Makefile \ bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \ perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \ - devices/Makefile devices/zmq_forwarder/Makefile bindings/Makefile) + devices/Makefile devices/zmq_forwarder/Makefile \ + devices/zmq_streamer/Makefile bindings/Makefile) AC_MSG_RESULT([]) AC_MSG_RESULT([ ******************************************************** ]) @@ -676,6 +686,7 @@ AC_MSG_RESULT([ PGM: no]) fi AC_MSG_RESULT([ Devices:]) AC_MSG_RESULT([ forwarder: $forwarder]) +AC_MSG_RESULT([ streamer: $streamer]) AC_MSG_RESULT([ Performance tests: $perf]) AC_MSG_RESULT([]) AC_MSG_RESULT([ ******************************************************** ]) diff --git a/devices/Makefile.am b/devices/Makefile.am index 4cbad14..ab18976 100644 --- a/devices/Makefile.am +++ b/devices/Makefile.am @@ -2,5 +2,9 @@ if BUILD_FORWARDER FORWARDER_DIR = zmq_forwarder endif -SUBDIRS = $(FORWARDER_DIR) -DIST_SUBDIRS = zmq_forwarder +if BUILD_STREAMER +STREAMER_DIR = zmq_streamer +endif + +SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR) +DIST_SUBDIRS = zmq_forwarder zmq_streamer diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp index 32af5dd..d29ed62 100644 --- a/devices/zmq_forwarder/zmq_forwarder.cpp +++ b/devices/zmq_forwarder/zmq_forwarder.cpp @@ -23,7 +23,7 @@ int main (int argc, char *argv []) { if (argc != 2) { - fprintf (stderr, "usage: forwarder \n"); + fprintf (stderr, "usage: zmq_forwarder \n"); return 1; } @@ -53,8 +53,9 @@ int main (int argc, char *argv []) // TODO: make the number of I/O threads configurable. zmq::context_t ctx (1, 1); - zmq::socket_t in_socket (ctx, ZMQ_P2P); - zmq::socket_t out_socket (ctx, ZMQ_P2P); + zmq::socket_t in_socket (ctx, ZMQ_SUB); + in_socket.setsockopt (ZMQ_SUBSCRIBE, "*", 1); + zmq::socket_t out_socket (ctx, ZMQ_PUB); int n = 0; while (true) { diff --git a/devices/zmq_streamer/Makefile.am b/devices/zmq_streamer/Makefile.am new file mode 100644 index 0000000..e3681bf --- /dev/null +++ b/devices/zmq_streamer/Makefile.am @@ -0,0 +1,9 @@ +INCLUDES = -I$(top_builddir)/bindings/c + +bin_PROGRAMS = zmq_streamer + +zmq_streamer_LDADD = $(top_builddir)/src/libzmq.la +zmq_streamer_SOURCES = zmq_streamer.cpp +zmq_streamer_CXXFLAGS = -Wall -pedantic -Werror + + diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp new file mode 100644 index 0000000..84e6569 --- /dev/null +++ b/devices/zmq_streamer/zmq_streamer.cpp @@ -0,0 +1,122 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../../bindings/cpp/zmq.hpp" +#include "../../foreign/xmlParser/xmlParser.cpp" + +int main (int argc, char *argv []) +{ + if (argc != 2) { + fprintf (stderr, "usage: zmq_streamer \n"); + return 1; + } + + XMLNode root = XMLNode::parseFile (argv [1]); + if (root.isEmpty ()) { + fprintf (stderr, "configuration file not found\n"); + return 1; + } + + if (strcmp (root.getName (), "streamer") != 0) { + fprintf (stderr, "root element in the configuration file should be " + "named 'streamer'\n"); + return 1; + } + + XMLNode in_node = root.getChildNode ("in"); + if (in_node.isEmpty ()) { + fprintf (stderr, "'in' node is missing in the configuration file\n"); + return 1; + } + + XMLNode out_node = root.getChildNode ("out"); + if (out_node.isEmpty ()) { + fprintf (stderr, "'out' node is missing in the configuration file\n"); + return 1; + } + + // TODO: make the number of I/O threads configurable. + zmq::context_t ctx (1, 1); + zmq::socket_t in_socket (ctx, ZMQ_UPSTREAM); + zmq::socket_t out_socket (ctx, ZMQ_DOWNSTREAM); + + int n = 0; + while (true) { + XMLNode bind = in_node.getChildNode ("bind", n); + if (bind.isEmpty ()) + break; + const char *addr = bind.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'bind' node is missing 'addr' attribute\n"); + return 1; + } + in_socket.bind (addr); + n++; + } + + n = 0; + while (true) { + XMLNode connect = in_node.getChildNode ("connect", n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'connect' node is missing 'addr' attribute\n"); + return 1; + } + in_socket.connect (addr); + n++; + } + + n = 0; + while (true) { + XMLNode bind = out_node.getChildNode ("bind", n); + if (bind.isEmpty ()) + break; + const char *addr = bind.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'bind' node is missing 'addr' attribute\n"); + return 1; + } + out_socket.bind (addr); + n++; + } + + n = 0; + while (true) { + XMLNode connect = out_node.getChildNode ("connect", n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'connect' node is missing 'addr' attribute\n"); + return 1; + } + out_socket.connect (addr); + n++; + } + + zmq::message_t msg; + while (true) { + in_socket.recv (&msg); + out_socket.send (msg); + } + + return 0; +} diff --git a/man/man3/zmq_socket.3 b/man/man3/zmq_socket.3 index 8b819b5..a73bba5 100644 --- a/man/man3/zmq_socket.3 +++ b/man/man3/zmq_socket.3 @@ -37,6 +37,15 @@ Socket to receive requests and send replies. This socket type allows only an alternated sequence of recv's and send's. Each send is routed to the peer that issued the last received request. +.IP "\fBZMQ_UPSTREAM\fP" +Socket to receive messages from up the stream. Messages are fair-queued +from among all the connected peers. Send function is not implemented for +this socket type. + +.IP "\fBZMQ_DOWNSTREAM\fP" +Socket to send messages down stream. Messages are load-balanced among all the +connected peers. Send function is not implemented for this socket type. + .SH RETURN VALUE Function returns socket handle is successful. Otherwise it returns NULL and sets errno to one of the values below. diff --git a/src/Makefile.am b/src/Makefile.am index 91fb555..3d038b7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \ decoder.hpp \ devpoll.hpp \ dispatcher.hpp \ + downstream.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ + upstream.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \ app_thread.cpp \ devpoll.cpp \ dispatcher.cpp \ + downstream.cpp \ epoll.cpp \ err.cpp \ fd_signaler.cpp \ @@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ + upstream.cpp \ uuid.cpp \ ypollset.cpp \ zmq.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index fbda335..a671822 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -40,11 +40,13 @@ #include "pipe.hpp" #include "config.hpp" #include "socket_base.hpp" +#include "p2p.hpp" #include "pub.hpp" #include "sub.hpp" #include "req.hpp" #include "rep.hpp" -#include "p2p.hpp" +#include "upstream.hpp" +#include "downstream.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) { socket_base_t *s = NULL; switch (type_) { + case ZMQ_P2P: + s = new p2p_t (this); + break; case ZMQ_PUB: s = new pub_t (this); break; @@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_REP: s = new rep_t (this); break; - case ZMQ_P2P: - s = new p2p_t (this); + case ZMQ_UPSTREAM: + s = new upstream_t (this); + break; + case ZMQ_DOWNSTREAM: + s = new downstream_t (this); break; default: // TODO: This should be EINVAL. diff --git a/src/downstream.cpp b/src/downstream.cpp new file mode 100644 index 0000000..4f994e6 --- /dev/null +++ b/src/downstream.cpp @@ -0,0 +1,131 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "downstream.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::downstream_t::downstream_t (class app_thread_t *parent_) : + socket_base_t (parent_), + current (0) +{ + options.requires_in = false; + options.requires_out = true; +} + +zmq::downstream_t::~downstream_t () +{ +} + +void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (!inpipe_ && outpipe_); + pipes.push_back (outpipe_); +} + +void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (pipe_); + pipes.erase (pipes.index (pipe_)); +} + +void zmq::downstream_t::xkill (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::downstream_t::xrevive (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special option for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) +{ + // If there are no pipes we cannot send the message. + if (pipes.empty ()) { + errno = EAGAIN; + return -1; + } + + // Move to the next pipe (load-balancing). + current++; + if (current >= pipes.size ()) + current = 0; + + // TODO: Implement this once queue limits are in-place. + zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); + + // Push message to the selected pipe. + pipes [current]->write (msg_); + pipes [current]->flush (); + + // Detach the message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return 0; +} + +int zmq::downstream_t::xflush () +{ + // TODO: Maybe there's a point in flushing messages downstream. + // It may be useful in the case where number of messages in a single + // transaction is much greater than the number of attached pipes. + errno = ENOTSUP; + return -1; + +} + +int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +bool zmq::downstream_t::xhas_in () +{ + return false; +} + +bool zmq::downstream_t::xhas_out () +{ + // TODO: Modify this code once pipe limits are in place. + return true; +} + + diff --git a/src/downstream.hpp b/src/downstream.hpp new file mode 100644 index 0000000..c6a7ed8 --- /dev/null +++ b/src/downstream.hpp @@ -0,0 +1,64 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ +#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class downstream_t : public socket_base_t + { + public: + + downstream_t (class app_thread_t *parent_); + ~downstream_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // List of outbound pipes. + typedef yarray_t pipes_t; + pipes_t pipes; + + // Points to the last pipe that the most recent message was sent to. + pipes_t::size_type current; + + downstream_t (const downstream_t&); + void operator = (const downstream_t&); + }; + +} + +#endif diff --git a/src/p2p.hpp b/src/p2p.hpp index 1fd7e34..32d7755 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_P2P_INCLUDED__ -#define __ZMQ_P2P_INCLUDED__ +#ifndef __ZMQ_P2P_HPP_INCLUDED__ +#define __ZMQ_P2P_HPP_INCLUDED__ #include "socket_base.hpp" diff --git a/src/pub.hpp b/src/pub.hpp index b3e868d..9dbcb4a 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_PUB_INCLUDED__ -#define __ZMQ_PUB_INCLUDED__ +#ifndef __ZMQ_PUB_HPP_INCLUDED__ +#define __ZMQ_PUB_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/rep.cpp b/src/rep.cpp index 7599cb5..f06f4ab 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) } // Now both inpipe and outpipe are detached. Remove them from the lists. - if (in_pipes.index (pipe_) < active) + if (index < active) active--; in_pipes.erase (index); out_pipes.erase (index); diff --git a/src/rep.hpp b/src/rep.hpp index 3e87dc1..0b327aa 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_REP_INCLUDED__ -#define __ZMQ_REP_INCLUDED__ +#ifndef __ZMQ_REP_HPP_INCLUDED__ +#define __ZMQ_REP_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/req.hpp b/src/req.hpp index 86554b5..756cc42 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_REQ_INCLUDED__ -#define __ZMQ_REQ_INCLUDED__ +#ifndef __ZMQ_REQ_HPP_INCLUDED__ +#define __ZMQ_REQ_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/sub.hpp b/src/sub.hpp index fb881dc..8ad8a18 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_SUB_INCLUDED__ -#define __ZMQ_SUB_INCLUDED__ +#ifndef __ZMQ_SUB_HPP_INCLUDED__ +#define __ZMQ_SUB_HPP_INCLUDED__ #include #include diff --git a/src/upstream.cpp b/src/upstream.cpp new file mode 100644 index 0000000..da202f8 --- /dev/null +++ b/src/upstream.cpp @@ -0,0 +1,143 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "upstream.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::upstream_t::upstream_t (class app_thread_t *parent_) : + socket_base_t (parent_), + active (0), + current (0) +{ + options.requires_in = true; + options.requires_out = false; +} + +zmq::upstream_t::~upstream_t () +{ +} + +void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (inpipe_ && !outpipe_); + + pipes.push_back (inpipe_); + pipes.swap (active, pipes.size () - 1); + active++; +} + +void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_) +{ + // Remove the pipe from the list; adjust number of active pipes + // accordingly. + zmq_assert (pipe_); + pipes_t::size_type index = pipes.index (pipe_); + if (index < active) + active--; + pipes.erase (index); +} + +void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) +{ + // There are no outpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::upstream_t::xkill (class reader_t *pipe_) +{ + // Move the pipe to the list of inactive pipes. + active--; + pipes.swap (pipes.index (pipe_), active); +} + +void zmq::upstream_t::xrevive (class reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + pipes.swap (pipes.index (pipe_), active); + active++; +} + +int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special options for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +int zmq::upstream_t::xflush () +{ + errno = ENOTSUP; + return -1; +} + +int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + bool fetched = pipes [current]->read (msg_); + current++; + if (current >= active) + current = 0; + if (fetched) + return 0; + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; +} + +bool zmq::upstream_t::xhas_in () +{ + // Note that messing with current doesn't break the fairness of fair + // queueing algorithm. If there are no messages available current will + // get back to its original value. Otherwise it'll point to the first + // pipe holding messages, skipping only pipes with no messages available. + for (int count = active; count != 0; count--) { + if (pipes [current]->check_read ()) + return true; + current++; + if (current >= active) + current = 0; + } + + return false; +} + +bool zmq::upstream_t::xhas_out () +{ + return false; +} + diff --git a/src/upstream.hpp b/src/upstream.hpp new file mode 100644 index 0000000..0e2f5ad --- /dev/null +++ b/src/upstream.hpp @@ -0,0 +1,69 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__ +#define __ZMQ_UPSTREAM_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class upstream_t : public socket_base_t + { + public: + + upstream_t (class app_thread_t *parent_); + ~upstream_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // Inbound pipes. + typedef yarray_t pipes_t; + pipes_t pipes; + + // Number of active pipes. All the active pipes are located at the + // beginning of the pipes array. + pipes_t::size_type active; + + // Index of the next bound pipe to read a message from. + pipes_t::size_type current; + + upstream_t (const upstream_t&); + void operator = (const upstream_t&); + + }; + +} + +#endif -- cgit v1.2.3 From 92aa9e94e21b652839faa3dda27c67571bad315d Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 25 Nov 2009 08:55:03 +0100 Subject: experimental code to use futexes instead of mutexes added to simple_semapthore_t --- src/simple_semaphore.hpp | 60 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/src/simple_semaphore.hpp b/src/simple_semaphore.hpp index 209ccb4..3342281 100644 --- a/src/simple_semaphore.hpp +++ b/src/simple_semaphore.hpp @@ -23,7 +23,11 @@ #include "platform.hpp" #include "err.hpp" -#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS +#if 0 //defined ZMQ_HAVE_LINUX +#include +#include +#include +#elif defined ZMQ_HAVE_LINUX ||defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS #include #elif defined ZMQ_HAVE_WINDOWS #include "windows.hpp" @@ -33,13 +37,63 @@ namespace zmq { - // Simple semaphore. Only single thread may be waiting at any given time. // Also, the semaphore may not be posted before the previous post // was matched by corresponding wait and the waiting thread was // released. -#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS +#if 0 //defined ZMQ_HAVE_LINUX + + // In theory, using private futexes should be more efficient on Linux + // platform than using mutexes. However, in uncontended cases of TCP + // transport on loopback interface we haven't seen any latency improvement. + // The code is commented out waiting for more thorough testing. + + class simple_semaphore_t + { + public: + + // Initialise the semaphore. + inline simple_semaphore_t () : + dummy (0) + { + } + + // Destroy the semaphore. + inline ~simple_semaphore_t () + { + } + + // Wait for the semaphore. + inline void wait () + { + int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAIT_PRIVATE, + (int) 0, NULL, NULL, (int) 0); + zmq_assert (rc == 0); + } + + // Post the semaphore. + inline void post () + { + while (true) { + int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAKE_PRIVATE, + (int) 1, NULL, NULL, (int) 0); + zmq_assert (rc != -1 && rc <= 1); + if (rc == 1) + break; + } + } + + private: + + int dummy; + + // Disable copying of the object. + simple_semaphore_t (const simple_semaphore_t&); + void operator = (const simple_semaphore_t&); + }; + +#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS // On platforms that allow for double locking of a mutex from the same // thread, simple semaphore is implemented using mutex, as it is more -- cgit v1.2.3 From 8d85638f77ec0aa886170ba6bb49763ef165393b Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 26 Nov 2009 12:01:26 +0100 Subject: memory leak in message encoder fixed --- AUTHORS | 1 + perf/c/local_thr.c | 3 +++ src/session.cpp | 4 ---- src/zmq_decoder.cpp | 8 +++++++- src/zmq_encoder.cpp | 7 ++++++- src/zmq_listener_init.cpp | 1 - 6 files changed, 17 insertions(+), 7 deletions(-) diff --git a/AUTHORS b/AUTHORS index 13f0076..b37bffa 100644 --- a/AUTHORS +++ b/AUTHORS @@ -9,6 +9,7 @@ Dirk O. Kaar Erich Heine Frank Denis George Neill +Jon Dyte Martin Hurton Martin Lucina Martin Sustrik diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c index c97af11..f785e80 100644 --- a/perf/c/local_thr.c +++ b/perf/c/local_thr.c @@ -79,6 +79,9 @@ int main (int argc, char *argv []) if (elapsed == 0) elapsed = 1; + rc = zmq_msg_close (&msg); + assert (rc == 0); + throughput = (unsigned long) ((double) message_count / (double) elapsed * 1000000); megabits = (double) (throughput * message_size * 8) / 1000000; diff --git a/src/session.cpp b/src/session.cpp index f62de27..87b47b0 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -51,10 +51,6 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_) { - // The communication is unidirectional. - // We don't expect any message to arrive. - zmq_assert (out_pipe); - if (out_pipe->write (msg_)) { zmq_msg_init (msg_); return true; diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index 53811a1..8040f21 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -51,6 +51,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () else { // TODO: Handle over-sized message decently. + // in_progress is initialised at this point so in theory we should + // close it before calling zmq_msg_init_size, however, it's a 0-byte + // message and thus we can treat it as uninitialised... int rc = zmq_msg_init_size (&in_progress, *tmpbuf); errno_assert (rc == 0); @@ -67,6 +70,9 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () size_t size = (size_t) get_uint64 (tmpbuf); // TODO: Handle over-sized message decently. + // in_progress is initialised at this point so in theory we should + // close it before calling zmq_msg_init_size, however, it's a 0-byte + // message and thus we can treat it as uninitialised... int rc = zmq_msg_init_size (&in_progress, size); errno_assert (rc == 0); @@ -78,7 +84,7 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () bool zmq::zmq_decoder_t::message_ready () { // Message is completely read. Push it further and start reading - // new message. + // new message. (in_progress is a 0-byte message after this point.) if (!destination || !destination->write (&in_progress)) return false; diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 44b919b..180bda7 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -50,12 +50,17 @@ bool zmq::zmq_encoder_t::size_ready () bool zmq::zmq_encoder_t::message_ready () { + // Destroy content of the old message. + zmq_msg_close(&in_progress); + // Read new message from the dispatcher. If there is none, return false. // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. - if (!source || !source->read (&in_progress)) + if (!source || !source->read (&in_progress)) { + zmq_msg_init (&in_progress); return false; + } size_t size = zmq_msg_size (&in_progress); diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index eec41c7..0d9488d 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -55,7 +55,6 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) has_peer_identity = true; peer_identity.assign ((const char*) zmq_msg_data (msg_), zmq_msg_size (msg_)); - return true; } -- cgit v1.2.3 From 19ce7c0e77f703ed2ec3b54685ddf4a6f2329ffe Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 26 Nov 2009 12:41:50 +0100 Subject: zmq_msg_data.3 referenced twice in Makefile.am - fixed --- AUTHORS | 1 + man/Makefile.am | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/AUTHORS b/AUTHORS index b37bffa..5e09bd3 100644 --- a/AUTHORS +++ b/AUTHORS @@ -37,3 +37,4 @@ Matt Muggeridge Paulo Henrique Silva Peter Lemenkov Robert Zhang +Vitaly Mayatskikh diff --git a/man/Makefile.am b/man/Makefile.am index 2efb80d..4f8c05a 100644 --- a/man/Makefile.am +++ b/man/Makefile.am @@ -2,8 +2,8 @@ dist_man_MANS = man1/zmq_forwarder.1 man3/zmq_init.3 man3/zmq_term.3 \ man3/zmq_socket.3 man3/zmq_close.3 man3/zmq_setsockopt.3 man3/zmq_bind.3 \ man3/zmq_connect.3 man3/zmq_send.3 man3/zmq_flush.3 man3/zmq_recv.3 \ man3/zmq_poll.3 man3/zmq_msg_init.3 man3/zmq_msg_init_size.3 \ - man3/zmq_msg_data.3 man3/zmq_msg_close.3 man3/zmq_msg_move.3 \ - man3/zmq_msg_copy.3 man3/zmq_msg_data.3 man3/zmq_msg_size.3 \ + man3/zmq_msg_close.3 man3/zmq_msg_move.3 man3/zmq_msg_copy.3 \ + man3/zmq_msg_data.3 man3/zmq_msg_size.3 \ man3/zmq_strerror.3 man7/zmq.7 distclean-local: -- cgit v1.2.3 From fa1641afc593be5926e558381861112b584e861a Mon Sep 17 00:00:00 2001 From: malosek Date: Fri, 27 Nov 2009 14:30:48 +0100 Subject: msvc build fixed --- bindings/c/zmq.h | 3 +++ builds/msvc/libzmq/libzmq.vcproj | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index a65926e..ff94dd4 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -63,6 +63,9 @@ extern "C" { #ifndef EADDRNOTAVAIL #define EADDRNOTAVAIL (ZMQ_HAUSNUMERO + 6) #endif +#ifndef ECONNREFUSED +#define ECONNREFUSED (ZMQ_HAUSNUMERO + 7) +#endif // Native 0MQ error codes. #define EMTHREAD (ZMQ_HAUSNUMERO + 50) diff --git a/builds/msvc/libzmq/libzmq.vcproj b/builds/msvc/libzmq/libzmq.vcproj index 8927fa2..e575d67 100644 --- a/builds/msvc/libzmq/libzmq.vcproj +++ b/builds/msvc/libzmq/libzmq.vcproj @@ -181,6 +181,10 @@ RelativePath="..\..\..\src\dispatcher.cpp" > + + @@ -289,6 +293,10 @@ RelativePath="..\..\..\src\thread.cpp" > + + @@ -375,6 +383,10 @@ RelativePath="..\..\..\src\encoder.hpp" > + + @@ -531,6 +543,10 @@ RelativePath="..\..\..\src\thread.hpp" > + + -- cgit v1.2.3