From 43e34d028115c43577713c0c3e1f0c33b0aac94a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 14 Aug 2010 08:37:38 +0200 Subject: engine leak fixed; pgm compilation fixed --- src/connect_session.cpp | 20 ++++++++------------ src/i_engine.hpp | 4 ++++ src/pgm_receiver.cpp | 12 +++++++++--- src/pgm_receiver.hpp | 7 ++++--- src/pgm_sender.cpp | 12 +++++++++--- src/pgm_sender.hpp | 7 ++++--- src/session.cpp | 3 +++ src/socket_base.cpp | 2 -- src/zmq_engine.cpp | 6 ++++++ src/zmq_engine.hpp | 1 + 10 files changed, 48 insertions(+), 26 deletions(-) diff --git a/src/connect_session.cpp b/src/connect_session.cpp index 896cc48..afa80b8 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -19,6 +19,8 @@ #include "connect_session.hpp" #include "zmq_connecter.hpp" +#include "pgm_sender.hpp" +#include "pgm_receiver.hpp" zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_, class socket_base_t *socket_, const options_t &options_, @@ -56,10 +58,10 @@ void zmq::connect_session_t::start_connecting () #if defined ZMQ_HAVE_OPENPGM // Both PGM and EPGM transports are using the same infrastructure. - if (addr_type == "pgm" || addr_type == "epgm") { + if (protocol == "pgm" || protocol == "epgm") { // For EPGM transport with UDP encapsulation of PGM is used. - bool udp_encapsulation = (addr_type == "epgm"); + bool udp_encapsulation = (protocol == "epgm"); // At this point we'll create message pipes to the session straight // away. There's no point in delaying it as no concept of 'connect' @@ -71,11 +73,8 @@ void zmq::connect_session_t::start_connecting () choose_io_thread (options.affinity), options); zmq_assert (pgm_sender); - int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); - if (rc != 0) { - delete pgm_sender; - return -1; - } + int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); + zmq_assert (rc == 0); send_attach (this, pgm_sender, blob_t ()); } @@ -86,11 +85,8 @@ void zmq::connect_session_t::start_connecting () choose_io_thread (options.affinity), options); zmq_assert (pgm_receiver); - int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); - if (rc != 0) { - delete pgm_receiver; - return -1; - } + int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); + zmq_assert (rc == 0); send_attach (this, pgm_receiver, blob_t ()); } diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 0ba94f5..e104a9c 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -34,6 +34,10 @@ namespace zmq // Unplug the engine from the session. virtual void unplug () = 0; + // Terminate and deallocate the engine. Note that 'detached' + // events in not fired on termination. + virtual void terminate () = 0; + // This method is called by the session to signalise that more // messages can be written to the pipe. virtual void activate_in () = 0; diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 048c529..ff61b96 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -55,7 +55,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) return pgm_socket.init (udp_encapsulation_, network_); } -void zmq::pgm_receiver_t::plug (i_inout *inout_) +void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_) { // Retrieve PGM fds and start polling. int socket_fd; @@ -88,12 +88,18 @@ void zmq::pgm_receiver_t::unplug () inout = NULL; } -void zmq::pgm_receiver_t::revive () +void zmq::pgm_receiver_t::terminate () +{ + unplug (); + delete this; +} + +void zmq::pgm_receiver_t::activate_out () { zmq_assert (false); } -void zmq::pgm_receiver_t::resume_input () +void zmq::pgm_receiver_t::activate_in () { // It is possible that the most recently used decoder // processed the whole buffer but failed to write diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 1b367bf..7215324 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -51,10 +51,11 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (struct i_inout *inout_); + void plug (class io_thread_t *io_thread_, struct i_inout *inout_); void unplug (); - void revive (); - void resume_input (); + void terminate (); + void activate_in (); + void activate_out (); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 9aeb7a9..5c9020d 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -58,7 +58,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) return rc; } -void zmq::pgm_sender_t::plug (i_inout *inout_) +void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_) { // Alocate 2 fds for PGM socket. int downlink_socket_fd = 0; @@ -96,13 +96,19 @@ void zmq::pgm_sender_t::unplug () encoder.set_inout (NULL); } -void zmq::pgm_sender_t::revive () +void zmq::pgm_sender_t::terminate () +{ + unplug (); + delete this; +} + +void zmq::pgm_sender_t::activate_out () { set_pollout (handle); out_event (); } -void zmq::pgm_sender_t::resume_input () +void zmq::pgm_sender_t::activate_in () { zmq_assert (false); } diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 23a53bc..a1ac329 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -49,10 +49,11 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (struct i_inout *inout_); + void plug (class io_thread_t *io_thread_, struct i_inout *inout_); void unplug (); - void revive (); - void resume_input (); + void terminate (); + void activate_in (); + void activate_out (); // i_poll_events interface implementation. void in_event (); diff --git a/src/session.cpp b/src/session.cpp index e208ebf..9655e64 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -45,6 +45,9 @@ zmq::session_t::~session_t () { zmq_assert (!in_pipe); zmq_assert (!out_pipe); + + if (engine) + engine->terminate (); } void zmq::session_t::terminate () diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 060480f..0103618 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -44,8 +44,6 @@ #include "err.hpp" #include "ctx.hpp" #include "platform.hpp" -#include "pgm_sender.hpp" -#include "pgm_receiver.hpp" #include "likely.hpp" #include "pair.hpp" #include "pub.hpp" diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index de26b27..6551bc3 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -87,6 +87,12 @@ void zmq::zmq_engine_t::unplug () inout = NULL; } +void zmq::zmq_engine_t::terminate () +{ + unplug (); + delete this; +} + void zmq::zmq_engine_t::in_event () { bool disconnection = false; diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 328ec95..1023051 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -44,6 +44,7 @@ namespace zmq // i_engine interface implementation. void plug (class io_thread_t *io_thread_, struct i_inout *inout_); void unplug (); + void terminate (); void activate_in (); void activate_out (); -- cgit v1.2.3