diff options
Diffstat (limited to 'src')
51 files changed, 1441 insertions, 994 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 937372f..d7509dc 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -55,6 +55,7 @@ libzmq_la_SOURCES = \ blob.hpp \ command.hpp \ config.hpp \ + connect_session.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ @@ -70,15 +71,17 @@ libzmq_la_SOURCES = \ ip.hpp \ i_engine.hpp \ i_poll_events.hpp \ + i_terminate_events.hpp \ kqueue.hpp \ lb.hpp \ likely.hpp \ msg_content.hpp \ msg_store.hpp \ mutex.hpp \ + named_session.hpp \ object.hpp \ options.hpp \ - owned.hpp \ + own.hpp \ pgm_receiver.hpp \ pgm_sender.hpp \ pgm_socket.hpp \ @@ -106,6 +109,7 @@ libzmq_la_SOURCES = \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ + transient_session.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -123,6 +127,7 @@ libzmq_la_SOURCES = \ zmq_listener.hpp \ command.cpp \ ctx.cpp \ + connect_session.cpp \ devpoll.cpp \ epoll.cpp \ err.cpp \ @@ -134,9 +139,10 @@ libzmq_la_SOURCES = \ kqueue.cpp \ lb.cpp \ msg_store.cpp \ + named_session.cpp \ object.cpp \ options.cpp \ - owned.cpp \ + own.cpp \ pair.cpp \ pgm_receiver.cpp \ pgm_sender.cpp \ @@ -160,6 +166,7 @@ libzmq_la_SOURCES = \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ + transient_session.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/command.hpp b/src/command.hpp index 3d00cd7..a924b4e 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -61,7 +61,7 @@ namespace zmq // Sent to socket to let it know about the newly created object. struct { - class owned_t *object; + class own_t *object; } own; // Attach the engine to the session. @@ -104,7 +104,7 @@ namespace zmq // Sent by I/O object ot the socket to request the shutdown of // the I/O object. struct { - class owned_t *object; + class own_t *object; } term_req; // Sent by socket to I/O object to start its shutdown. diff --git a/src/connect_session.cpp b/src/connect_session.cpp new file mode 100644 index 0000000..5c088f6 --- /dev/null +++ b/src/connect_session.cpp @@ -0,0 +1,115 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "connect_session.hpp" +#include "zmq_connecter.hpp" + +zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_t (io_thread_, socket_, options_), + protocol (protocol_), + address (address_) +{ +} + +zmq::connect_session_t::~connect_session_t () +{ +} + +void zmq::connect_session_t::process_plug () +{ + // Start connection process immediately. + start_connecting (); +} + +void zmq::connect_session_t::start_connecting () +{ + // Create the connecter object. + + // Both TCP and IPC transports are using the same infrastructure. + if (protocol == "tcp" || protocol == "ipc") { + zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( + choose_io_thread (options.affinity), this, options, + protocol.c_str (), address.c_str ()); + zmq_assert (connecter); + launch_child (connecter); + return; + } + +#if defined ZMQ_HAVE_OPENPGM + + // Both PGM and EPGM transports are using the same infrastructure. + if (addr_type == "pgm" || addr_type == "epgm") { + + // For EPGM transport with UDP encapsulation of PGM is used. + bool udp_encapsulation = (addr_type == "epgm"); + + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. + if (options.requires_out) { + + // PGM sender. + pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( + choose_io_thread (options.affinity), options); + zmq_assert (pgm_sender); + + int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); + if (rc != 0) { + delete pgm_sender; + return -1; + } + + send_attach (this, pgm_sender, blob_t ()); + } + else if (options.requires_in) { + + // PGM receiver. + pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( + choose_io_thread (options.affinity), options); + zmq_assert (pgm_receiver); + + int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); + if (rc != 0) { + delete pgm_receiver; + return -1; + } + + send_attach (this, pgm_receiver, blob_t ()); + } + else + zmq_assert (false); + + return; + } +#endif + + zmq_assert (false); +} + +void zmq::connect_session_t::detach () +{ + // Clean up the mess left over by the failed connection. + clean_pipes (); + + // Reconnect. + start_connecting (); +} + diff --git a/src/connect_session.hpp b/src/connect_session.hpp new file mode 100644 index 0000000..8303dda --- /dev/null +++ b/src/connect_session.hpp @@ -0,0 +1,60 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_CONNECT_SESSION_HPP_INCLUDED__ +#define __ZMQ_CONNECT_SESSION_HPP_INCLUDED__ + +#include <string> + +#include "session.hpp" + +namespace zmq +{ + + // Connect session contains an address to connect to. On disconnect it + // attempts to reconnect. + + class connect_session_t : public session_t + { + public: + + connect_session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~connect_session_t (); + + // i_inout interface implementation. + void detach (); + + private: + + // Start the connection process. + void start_connecting (); + + // Command handlers. + void process_plug (); + + // Address to connect to. + std::string protocol; + std::string address; + }; + +} + +#endif diff --git a/src/ctx.cpp b/src/ctx.cpp index 91157a5..d096b91 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -119,6 +119,7 @@ int zmq::ctx_t::term () // We don't even have to synchronise access to data. zmq_assert (sockets.empty ()); +// TODO: We are accessing the list of zombies in unsynchronised manner here! // Get rid of remaining zombie sockets. while (!zombies.empty ()) { dezombify (); @@ -173,7 +174,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) return s; } -void zmq::ctx_t::zombify (socket_base_t *socket_) +void zmq::ctx_t::zombify_socket (socket_base_t *socket_) { // Zombification of socket basically means that its ownership is tranferred // from the application that created it to the context. @@ -284,7 +285,8 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) void zmq::ctx_t::dezombify () { - // Try to dezombify each zombie in the list. + // Try to dezombify each zombie in the list. Note that caller is + // responsible for calling this method in the slot_sync critical section. for (zombies_t::size_type i = 0; i != zombies.size ();) if (zombies [i]->dezombify ()) { empty_slots.push_back (zombies [i]->get_slot ()); diff --git a/src/ctx.hpp b/src/ctx.hpp index cb9a2d9..c44cca6 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -58,7 +58,7 @@ namespace zmq class socket_base_t *create_socket (int type_); // Make socket a zombie. - void zombify (socket_base_t *socket_); + void zombify_socket (socket_base_t *socket_); // Send command to the destination slot. void send_command (uint32_t slot_, const command_t &command_); @@ -22,11 +22,14 @@ #include "fq.hpp" #include "pipe.hpp" #include "err.hpp" +#include "i_terminate_events.hpp" -zmq::fq_t::fq_t () : +zmq::fq_t::fq_t (i_terminate_events *sink_) : active (0), current (0), - more (false) + more (false), + sink (sink_), + terminating (false) { } @@ -42,6 +45,10 @@ void zmq::fq_t::attach (reader_t *pipe_) pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; + + // If we are already terminating, ask the pipe to terminate straight away. + if (terminating) + pipe_->terminate (); } void zmq::fq_t::terminated (reader_t *pipe_) @@ -59,15 +66,15 @@ void zmq::fq_t::terminated (reader_t *pipe_) current = 0; } pipes.erase (pipe_); -} -bool zmq::fq_t::has_pipes () -{ - return !pipes.empty (); + if (terminating && pipes.empty ()) + sink->terminated (); } -void zmq::fq_t::term_pipes () +void zmq::fq_t::terminate () { + terminating = true; + for (pipes_t::size_type i = 0; i != pipes.size (); i++) pipes [i]->terminate (); } @@ -33,12 +33,11 @@ namespace zmq { public: - fq_t (); + fq_t (struct i_terminate_events *sink_); ~fq_t (); void attach (reader_t *pipe_); - bool has_pipes (); - void term_pipes (); + void terminate (); int recv (zmq_msg_t *msg_, int flags_); bool has_in (); @@ -64,6 +63,12 @@ namespace zmq // there are following parts still waiting in the current pipe. bool more; + // Object to send events to. + i_terminate_events *sink; + + // If true, termination process is already underway. + bool terminating; + fq_t (const fq_t&); void operator = (const fq_t&); }; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index ea6b850..0ba94f5 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -20,8 +20,6 @@ #ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__ #define __ZMQ_I_ENGINE_HPP_INCLUDED__ -#include <stddef.h> - namespace zmq { @@ -30,18 +28,19 @@ namespace zmq virtual ~i_engine () {} // Plug the engine to the session. - virtual void plug (struct i_inout *inout_) = 0; + virtual void plug (class io_thread_t *io_thread_, + struct i_inout *inout_) = 0; // Unplug the engine from the session. virtual void unplug () = 0; - // This method is called by the session to signalise that there - // are messages to send available. - virtual void revive () = 0; - // This method is called by the session to signalise that more // messages can be written to the pipe. - virtual void resume_input () = 0; + virtual void activate_in () = 0; + + // This method is called by the session to signalise that there + // are messages to send available. + virtual void activate_out () = 0; }; } diff --git a/src/i_inout.hpp b/src/i_inout.hpp index 21d1838..60bc518 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -31,28 +31,17 @@ namespace zmq { virtual ~i_inout () {} - // Engine asks to get a message to send to the network. + // Engine asks for a message to send to the network. virtual bool read (::zmq_msg_t *msg_) = 0; - // Engine sends the incoming message further on downstream. + // Engine received message from the network and sends it further on. virtual bool write (::zmq_msg_t *msg_) = 0; - // Flush all the previously written messages downstream. + // Flush all the previously written messages. virtual void flush () = 0; - - // Drop all the references to the engine. The parameter is the object - // to use to reconnect. If reconnection is not required, the argument - // is set to NULL. - virtual void detach (class owned_t *reconnecter_) = 0; - // Returns least loaded I/O thread. - virtual class io_thread_t *get_io_thread () = 0; - - // Return pointer to the owning socket. - virtual class socket_base_t *get_owner () = 0; - - // Return ordinal number of the session. - virtual uint64_t get_ordinal () = 0; + // Engine is dead. Drop all the references to it. + virtual void detach () = 0; }; } diff --git a/src/i_terminate_events.hpp b/src/i_terminate_events.hpp new file mode 100644 index 0000000..08599ff --- /dev/null +++ b/src/i_terminate_events.hpp @@ -0,0 +1,38 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__ +#define __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__ + +namespace zmq +{ + + // Algorithms such as fair queueing (fq_t) and load balancing (lb_t) + // use this interface to communicate termination event to the socket. + + struct i_terminate_events + { + virtual ~i_terminate_events () {} + + virtual void terminated () = 0; + }; + +} + +#endif diff --git a/src/io_object.cpp b/src/io_object.cpp index 086f173..b3b45ee 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -21,21 +21,35 @@ #include "io_thread.hpp" #include "err.hpp" -zmq::io_object_t::io_object_t (io_thread_t *io_thread_) +zmq::io_object_t::io_object_t (io_thread_t *io_thread_) : + poller (NULL) { - // Retrieve the poller from the thread we are running in. - poller = io_thread_->get_poller (); + if (io_thread_) + plug (io_thread_); } zmq::io_object_t::~io_object_t () { } -void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_) +void zmq::io_object_t::plug (io_thread_t *io_thread_) { + zmq_assert (io_thread_); + zmq_assert (!poller); + + // Retrieve the poller from the thread we are running in. poller = io_thread_->get_poller (); } +void zmq::io_object_t::unplug () +{ + zmq_assert (poller); + + // Forget about old poller in preparation to be migrated + // to a different I/O thread. + poller = NULL; +} + zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_) { return poller->add_fd (fd_, this); diff --git a/src/io_object.hpp b/src/io_object.hpp index 655e7f5..284e6d1 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -40,15 +40,15 @@ namespace zmq io_object_t (class io_thread_t *io_thread_ = NULL); ~io_object_t (); + // When migrating an object from one I/O thread to another, first + // unplug it, then migrate it, then plug it to the new thread. + void plug (class io_thread_t *io_thread_); + void unplug (); + protected: typedef poller_t::handle_t handle_t; - // Derived class can init/swap the underlying I/O thread. - // Caution: Remove all the file descriptors from the old I/O thread - // before swapping to the new one! - void set_io_thread (class io_thread_t *io_thread_); - // Methods to access underlying poller object. handle_t add_fd (fd_t fd_); void rm_fd (handle_t handle_); @@ -22,11 +22,14 @@ #include "lb.hpp" #include "pipe.hpp" #include "err.hpp" +#include "i_terminate_events.hpp" -zmq::lb_t::lb_t () : +zmq::lb_t::lb_t (i_terminate_events *sink_) : active (0), current (0), - more (false) + more (false), + sink (sink_), + terminating (false) { } @@ -42,17 +45,22 @@ void zmq::lb_t::attach (writer_t *pipe_) pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; + + if (terminating) + pipe_->terminate (); } -void zmq::lb_t::term_pipes () +void zmq::lb_t::terminate () { + terminating = true; + for (pipes_t::size_type i = 0; i != pipes.size (); i++) pipes [i]->terminate (); } void zmq::lb_t::terminated (writer_t *pipe_) { - // ??? + // TODO: ??? zmq_assert (!more || pipes [current] != pipe_); // Remove the pipe from the list; adjust number of active pipes @@ -63,11 +71,9 @@ void zmq::lb_t::terminated (writer_t *pipe_) current = 0; } pipes.erase (pipe_); -} -bool zmq::lb_t::has_pipes () -{ - return !pipes.empty (); + if (terminating && pipes.empty ()) + sink->terminated (); } void zmq::lb_t::activated (writer_t *pipe_) @@ -32,12 +32,11 @@ namespace zmq { public: - lb_t (); + lb_t (struct i_terminate_events *sink_); ~lb_t (); void attach (writer_t *pipe_); - void term_pipes (); - bool has_pipes (); + void terminate (); int send (zmq_msg_t *msg_, int flags_); bool has_out (); @@ -61,6 +60,12 @@ namespace zmq // True if last we are in the middle of a multipart message. bool more; + // Object to send events to. + struct i_terminate_events *sink; + + // If true, termination process is already underway. + bool terminating; + lb_t (const lb_t&); void operator = (const lb_t&); }; diff --git a/src/named_session.cpp b/src/named_session.cpp new file mode 100644 index 0000000..d219286 --- /dev/null +++ b/src/named_session.cpp @@ -0,0 +1,87 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "named_session.hpp" +#include "socket_base.hpp" + +/* +zmq::named_session_t::named_session_t (class io_thread_t *io_thread_, + socket_base_t *socket_, const options_t &options_, + const blob_t &name_) : + session_t (io_thread_, socket_, options_), + name (name_) +{ + // Make double sure that the session has valid name. + zmq_assert (!name.empty ()); + zmq_assert (name [0] != 0); + + if (!socket_->register_session (name, this)) { + + // TODO: There's already a session with the specified + // identity. We should log the error and drop the + // session. + zmq_assert (false); + } +} + +zmq::named_session_t::~named_session_t () +{ +} + +void zmq::named_session_t::detach () +{ + // TODO: + zmq_assert (false); +} + +void zmq::named_session_t::attached (const blob_t &peer_identity_) +{ + if (!peer_identity.empty ()) { + + // If both IDs are temporary, no checking is needed. + // TODO: Old ID should be reused in this case... + if (peer_identity.empty () || peer_identity [0] != 0 || + peer_identity_.empty () || peer_identity_ [0] != 0) { |