summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-27 16:24:21 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-27 16:24:21 +0200
commit2dd501651592baa7f9e49f52e1321ae2b9b4e126 (patch)
treefe5d221061004894eb259304fcdbd4f8092de99d
parent67194267f89d63391288600f127205a2b7a8a5ae (diff)
multiple bugs fixed
-rw-r--r--src/i_inout.hpp8
-rw-r--r--src/pipe.hpp3
-rw-r--r--src/session.cpp13
-rw-r--r--src/session.hpp1
-rw-r--r--src/zmq_connecter_init.cpp6
-rw-r--r--src/zmq_connecter_init.hpp1
-rw-r--r--src/zmq_encoder.cpp1
-rw-r--r--src/zmq_engine.cpp5
-rw-r--r--src/zmq_listener_init.cpp33
-rw-r--r--src/zmq_listener_init.hpp5
10 files changed, 61 insertions, 15 deletions
diff --git a/src/i_inout.hpp b/src/i_inout.hpp
index 8901c04..89b9fbd 100644
--- a/src/i_inout.hpp
+++ b/src/i_inout.hpp
@@ -27,9 +27,17 @@ namespace zmq
struct i_inout
{
+ // Engine asks to get a message to send to the network.
virtual bool read (::zmq_msg_t *msg_) = 0;
+
+ // Engine sends the incoming message further on downstream.
virtual bool write (::zmq_msg_t *msg_) = 0;
+
+ // Flush all the previously written messages downstream.
virtual void flush () = 0;
+
+ // Drop all the references to the engine.
+ virtual void detach () = 0;
};
}
diff --git a/src/pipe.hpp b/src/pipe.hpp
index d48fc47..b7593c7 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -42,8 +42,9 @@ namespace zmq
// Reads a message to the underlying pipe.
bool read (struct zmq_msg_t *msg_);
- // Mnaipulation of index of the pipe.
void set_endpoint (i_endpoint *endpoint_);
+
+ // Mnaipulation of index of the pipe.
void set_index (int index_);
int get_index ();
diff --git a/src/session.cpp b/src/session.cpp
index 115fb85..0b1b947 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -26,7 +26,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const char *name_, const options_t &options_) :
owned_t (parent_, owner_),
in_pipe (NULL),
- active (false),
+ active (true),
out_pipe (NULL),
engine (NULL),
name (name_),
@@ -74,6 +74,16 @@ void zmq::session_t::flush ()
out_pipe->flush ();
}
+void zmq::session_t::detach ()
+{
+ // Engine is terminating itself.
+ engine = NULL;
+
+ // TODO: In the case od anonymous connection, terminate the session.
+// if (anonymous)
+// term ();
+}
+
void zmq::session_t::revive (reader_t *pipe_)
{
zmq_assert (in_pipe == pipe_);
@@ -98,6 +108,7 @@ void zmq::session_t::process_plug ()
pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm);
zmq_assert (inbound);
in_pipe = &inbound->reader;
+ in_pipe->set_endpoint (this);
pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);
zmq_assert (outbound);
out_pipe = &outbound->writer;
diff --git a/src/session.hpp b/src/session.hpp
index b79fb4b..4a0882b 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -48,6 +48,7 @@ namespace zmq
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
+ void detach ();
// i_endpoint interface implementation.
void revive (class reader_t *pipe_);
diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp
index 7048bd1..7326ebe 100644
--- a/src/zmq_connecter_init.cpp
+++ b/src/zmq_connecter_init.cpp
@@ -80,6 +80,12 @@ void zmq::zmq_connecter_init_t::flush ()
zmq_assert (false);
}
+void zmq::zmq_connecter_init_t::detach ()
+{
+ // TODO: Engine is closing down. Init object is to be closed as well.
+ zmq_assert (false);
+}
+
void zmq::zmq_connecter_init_t::process_plug ()
{
zmq_assert (engine);
diff --git a/src/zmq_connecter_init.hpp b/src/zmq_connecter_init.hpp
index 79ea9e2..3f42fc6 100644
--- a/src/zmq_connecter_init.hpp
+++ b/src/zmq_connecter_init.hpp
@@ -49,6 +49,7 @@ namespace zmq
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
+ void detach ();
// Handlers for incoming commands.
void process_plug ();
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index 39b7192..55e1a83 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -73,4 +73,3 @@ bool zmq::zmq_encoder_t::message_ready ()
}
return true;
}
-
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index cd7ad7e..3cab4c9 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -136,5 +136,8 @@ void zmq::zmq_engine_t::revive ()
void zmq::zmq_engine_t::error ()
{
- zmq_assert (false);
+ zmq_assert (inout);
+ inout->detach ();
+ unplug ();
+ delete this;
}
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index 7e2f311..c188030 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -17,8 +17,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include <string>
-
#include "zmq_listener_init.hpp"
#include "io_thread.hpp"
#include "session.hpp"
@@ -27,7 +25,8 @@
zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
socket_base_t *owner_, fd_t fd_, const options_t &options_) :
owned_t (parent_, owner_),
- options (options_)
+ options (options_),
+ has_peer_identity (false)
{
// Create associated engine object.
engine = new zmq_engine_t (parent_, fd_);
@@ -47,19 +46,33 @@ bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_)
bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
{
+ // Once we've got peer's identity we aren't interested in subsequent
+ // messages.
+ if (has_peer_identity)
+ return false;
+
// Retreieve the remote identity. We'll use it as a local session name.
- std::string session_name = std::string ((const char*) zmq_msg_data (msg_),
+ has_peer_identity = true;
+ peer_identity.assign ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
+
+
+ return true;
+}
+
+void zmq::zmq_listener_init_t::flush ()
+{
+ zmq_assert (has_peer_identity);
// Initialisation is done. Disconnect the engine from the init object.
engine->unplug ();
// Have a look whether the session already exists. If it does, attach it
// to the engine. If it doesn't create it first.
- session_t *session = owner->find_session (session_name.c_str ());
+ session_t *session = owner->find_session (peer_identity.c_str ());
if (!session) {
io_thread_t *io_thread = choose_io_thread (options.affinity);
- session = new session_t (io_thread, owner, session_name.c_str (),
+ session = new session_t (io_thread, owner, peer_identity.c_str (),
options);
zmq_assert (session);
send_plug (session);
@@ -73,14 +86,12 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
// Destroy the init object.
term ();
-
- return true;
}
-void zmq::zmq_listener_init_t::flush ()
+void zmq::zmq_listener_init_t::detach ()
{
- // No need to do anything. zmq_listener_init_t does no batching
- // of messages. Each message is processed immediately on write.
+ // TODO: Engine is closing down. Init object is to be closed as well.
+ zmq_assert (false);
}
void zmq::zmq_listener_init_t::process_plug ()
diff --git a/src/zmq_listener_init.hpp b/src/zmq_listener_init.hpp
index b061eaa..885b36b 100644
--- a/src/zmq_listener_init.hpp
+++ b/src/zmq_listener_init.hpp
@@ -49,6 +49,7 @@ namespace zmq
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
+ void detach ();
// Handlers for incoming commands.
void process_plug ();
@@ -62,6 +63,10 @@ namespace zmq
// Associated socket options.
options_t options;
+ // Indetity on the other end of the connection.
+ bool has_peer_identity;
+ std::string peer_identity;
+
zmq_listener_init_t (const zmq_listener_init_t&);
void operator = (const zmq_listener_init_t&);
};