summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp267
1 files changed, 267 insertions, 0 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
new file mode 100644
index 0000000..07606ad
--- /dev/null
+++ b/src/socket_base.cpp
@@ -0,0 +1,267 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <string>
+
+#include "../include/zs.h"
+
+#include "socket_base.hpp"
+#include "app_thread.hpp"
+#include "err.hpp"
+#include "listener.hpp"
+#include "connecter.hpp"
+#include "simple_semaphore.hpp"
+#include "io_thread.hpp"
+#include "io_object.hpp"
+#include "session.hpp"
+#include "dummy_aggregator.hpp"
+#include "dummy_distributor.hpp"
+
+zs::socket_base_t::socket_base_t (app_thread_t *thread_, session_t *session_) :
+ object_t (thread_),
+ thread (thread_),
+ session (session_),
+ has_in (true),
+ has_out (true)
+{
+ session->set_engine (this);
+}
+
+void zs::socket_base_t::shutdown ()
+{
+ // Destroy all the I/O objects created from this socket.
+ for (io_objects_t::size_type i = 0; i != io_objects.size (); i++)
+ io_objects [i]->shutdown ();
+
+ delete this;
+}
+
+void zs::socket_base_t::schedule_terminate ()
+{
+ // Terminate is never scheduled on socket engines.
+ zs_assert (false);
+}
+
+void zs::socket_base_t::terminate ()
+{
+ // Destroy all the I/O objects created from this socket.
+ // First unregister the object from I/O thread, then terminate it in
+ // this application thread.
+ simple_semaphore_t smph;
+ for (io_objects_t::size_type i = 0; i != io_objects.size (); i++) {
+ send_unreg (io_objects [i], &smph);
+ smph.wait ();
+ io_objects [i]->terminate ();
+ }
+
+ zs_assert (session);
+ session->disconnected ();
+
+ delete this;
+}
+
+zs::socket_base_t::~socket_base_t ()
+{
+}
+
+void zs::socket_base_t::disable_in ()
+{
+ has_in = false;
+}
+
+void zs::socket_base_t::disable_out ()
+{
+ has_out = false;
+}
+
+int zs::socket_base_t::bind (const char *addr_, zs_opts *opts_)
+{
+ thread->process_commands (false);
+
+ std::string addr (addr_);
+ std::string::size_type pos = addr.find ("://");
+ if (pos == std::string::npos || addr.substr (0, pos) == "zmq.tcp") {
+
+ // Choose the I/O thread with the least load, create the listener.
+ // Note that same taskset is used to choose the I/O thread to handle
+ // the listening socket and newly created connections.
+ // Note that has_in and has_out are twisted at this place - listener
+ // is going to create peer objects, so the message flows are viewed
+ // from the opposite direction.
+ io_thread_t *io_thread = choose_io_thread (opts_ ? opts_->taskset : 0);
+ listener_t *listener = new listener_t (io_thread, addr_, session,
+ has_out, has_in, opts_ ? opts_->taskset : 0);
+
+ // Ask it to start interacting with the I/O thread.
+ simple_semaphore_t smph;
+ send_reg (listener, &smph);
+
+ // Store the reference to the listener so that it can be terminated
+ // when the socket is closed.
+ io_objects.push_back (listener);
+
+ // Wait while listener is actually registered with the I/O thread.
+ smph.wait ();
+
+ return 0;
+ }
+ else if (addr.substr (0, pos) == "inproc") {
+
+ // For inproc transport the only thing we have to do is to register
+ // this socket as an inproc endpoint with the supplied name.
+ return register_inproc_endpoint (addr.substr (pos + 3).c_str (),
+ session);
+ }
+ else {
+
+ // Unknown protocol requested.
+ errno = EINVAL;
+ return -1;
+ }
+}
+
+int zs::socket_base_t::connect (const char *addr_, zs_opts *opts_)
+{
+ thread->process_commands (false);
+
+ std::string addr (addr_);
+ std::string::size_type pos = addr.find ("://");
+ if (pos == std::string::npos || addr.substr (0, pos) == "zmq.tcp") {
+
+ // Choose the I/O thread with the least load, create the connecter and
+ // session.
+ io_thread_t *io_thread = choose_io_thread (opts_ ? opts_->taskset : 0);
+ i_mux *mux = new dummy_aggregator_t;
+ zs_assert (mux);
+ i_demux *demux = new dummy_distributor_t;
+ zs_assert (demux);
+ session_t *peer = new session_t (io_thread, io_thread, mux, demux,
+ false, true);
+ zs_assert (peer);
+ connecter_t *connecter = new connecter_t (io_thread, addr_, peer);
+ zs_assert (connecter);
+
+ // Increment session's command sequence number so that it won't get
+ // deallocated till the subsequent bind command arrives.
+ peer->inc_seqnum ();
+
+ // Register the connecter (and session) with its I/O thread.
+ simple_semaphore_t smph;
+ send_reg (connecter, &smph);
+
+ // Store the reference to the connecter so that it can be terminated
+ // when the socket is closed.
+ io_objects.push_back (connecter);
+
+ // Wait till registration succeeds.
+ smph.wait ();
+
+ // Bind local session with the connecter's session so that messages
+ // can flow in both directions.
+ session->bind (peer, has_in, has_out);
+
+ return 0;
+ }
+ else if (addr.substr (0, pos) == "inproc") {
+
+ // Get the MD responsible for the address. In case of invalid address
+ // return error.
+ object_t *peer = get_inproc_endpoint (addr.substr (pos + 3).c_str ());
+ if (!peer) {
+ errno = EADDRNOTAVAIL;
+ return -1;
+ }
+
+ // Create bidirectional message pipes between this session and
+ // the peer session.
+ session->bind (peer, has_in, has_out);
+
+ return 0;
+ }
+ else {
+
+ // Unknown protocol requested.
+ errno = EINVAL;
+ return -1;
+ }
+}
+
+int zs::socket_base_t::subscribe (const char *criteria_)
+{
+ // No implementation at the moment...
+ errno = ENOTSUP;
+ return -1;
+}
+
+int zs::socket_base_t::send (zs_msg *msg_, int flags_)
+{
+ thread->process_commands (false);
+ while (true) {
+ if (session->write (msg_))
+ return 0;
+ if (flags_ & ZS_NOBLOCK) {
+ errno = EAGAIN;
+ return -1;
+ }
+ thread->process_commands (true);
+ }
+}
+
+int zs::socket_base_t::flush ()
+{
+ thread->process_commands (false);
+ session->flush ();
+ return 0;
+}
+
+int zs::socket_base_t::recv (zs_msg *msg_, int flags_)
+{
+ thread->process_commands (false);
+ while (true) {
+ if (session->read (msg_))
+ return 0;
+ if (flags_ & ZS_NOBLOCK) {
+ errno = EAGAIN;
+ return -1;
+ }
+ thread->process_commands (true);
+ }
+}
+
+int zs::socket_base_t::close ()
+{
+ terminate ();
+ return 0;
+}
+
+void zs::socket_base_t::attach (struct i_poller *poller_, i_session *session_)
+{
+ zs_assert (false);
+}
+
+void zs::socket_base_t::detach ()
+{
+ zs_assert (false);
+}
+
+void zs::socket_base_t::revive ()
+{
+ // We can ignore the event safely here.
+}
+