summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp267
1 files changed, 0 insertions, 267 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
deleted file mode 100644
index 6718244..0000000
--- a/src/socket_base.cpp
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <string>
-
-#include "../include/zmq.h"
-
-#include "socket_base.hpp"
-#include "app_thread.hpp"
-#include "err.hpp"
-#include "listener.hpp"
-#include "connecter.hpp"
-#include "simple_semaphore.hpp"
-#include "io_thread.hpp"
-#include "io_object.hpp"
-#include "session.hpp"
-#include "dummy_aggregator.hpp"
-#include "dummy_distributor.hpp"
-
-zmq::socket_base_t::socket_base_t (app_thread_t *thread_, session_t *session_) :
- object_t (thread_),
- thread (thread_),
- session (session_),
- has_in (true),
- has_out (true)
-{
- session->set_engine (this);
-}
-
-void zmq::socket_base_t::shutdown ()
-{
- // Destroy all the I/O objects created from this socket.
- for (io_objects_t::size_type i = 0; i != io_objects.size (); i++)
- io_objects [i]->shutdown ();
-
- delete this;
-}
-
-void zmq::socket_base_t::schedule_terminate ()
-{
- // Terminate is never scheduled on socket engines.
- zmq_assert (false);
-}
-
-void zmq::socket_base_t::terminate ()
-{
- // Destroy all the I/O objects created from this socket.
- // First unregister the object from I/O thread, then terminate it in
- // this application thread.
- simple_semaphore_t smph;
- for (io_objects_t::size_type i = 0; i != io_objects.size (); i++) {
- send_unreg (io_objects [i], &smph);
- smph.wait ();
- io_objects [i]->terminate ();
- }
-
- zmq_assert (session);
- session->disconnected ();
-
- delete this;
-}
-
-zmq::socket_base_t::~socket_base_t ()
-{
-}
-
-void zmq::socket_base_t::disable_in ()
-{
- has_in = false;
-}
-
-void zmq::socket_base_t::disable_out ()
-{
- has_out = false;
-}
-
-int zmq::socket_base_t::bind (const char *addr_, zmq_opts *opts_)
-{
- thread->process_commands (false);
-
- std::string addr (addr_);
- std::string::size_type pos = addr.find ("://");
- if (pos == std::string::npos || addr.substr (0, pos) == "zmq.tcp") {
-
- // Choose the I/O thread with the least load, create the listener.
- // Note that same taskset is used to choose the I/O thread to handle
- // the listening socket and newly created connections.
- // Note that has_in and has_out are twisted at this place - listener
- // is going to create peer objects, so the message flows are viewed
- // from the opposite direction.
- io_thread_t *io_thread = choose_io_thread (opts_ ? opts_->taskset : 0);
- listener_t *listener = new listener_t (io_thread, addr_, session,
- has_out, has_in, opts_ ? opts_->taskset : 0);
-
- // Ask it to start interacting with the I/O thread.
- simple_semaphore_t smph;
- send_reg (listener, &smph);
-
- // Store the reference to the listener so that it can be terminated
- // when the socket is closed.
- io_objects.push_back (listener);
-
- // Wait while listener is actually registered with the I/O thread.
- smph.wait ();
-
- return 0;
- }
- else if (addr.substr (0, pos) == "inproc") {
-
- // For inproc transport the only thing we have to do is to register
- // this socket as an inproc endpoint with the supplied name.
- return register_inproc_endpoint (addr.substr (pos + 3).c_str (),
- session);
- }
- else {
-
- // Unknown protocol requested.
- errno = EINVAL;
- return -1;
- }
-}
-
-int zmq::socket_base_t::connect (const char *addr_, zmq_opts *opts_)
-{
- thread->process_commands (false);
-
- std::string addr (addr_);
- std::string::size_type pos = addr.find ("://");
- if (pos == std::string::npos || addr.substr (0, pos) == "zmq.tcp") {
-
- // Choose the I/O thread with the least load, create the connecter and
- // session.
- io_thread_t *io_thread = choose_io_thread (opts_ ? opts_->taskset : 0);
- i_mux *mux = new dummy_aggregator_t;
- zmq_assert (mux);
- i_demux *demux = new dummy_distributor_t;
- zmq_assert (demux);
- session_t *peer = new session_t (io_thread, io_thread, mux, demux,
- false, true);
- zmq_assert (peer);
- connecter_t *connecter = new connecter_t (io_thread, addr_, peer);
- zmq_assert (connecter);
-
- // Increment session's command sequence number so that it won't get
- // deallocated till the subsequent bind command arrives.
- peer->inc_seqnum ();
-
- // Register the connecter (and session) with its I/O thread.
- simple_semaphore_t smph;
- send_reg (connecter, &smph);
-
- // Store the reference to the connecter so that it can be terminated
- // when the socket is closed.
- io_objects.push_back (connecter);
-
- // Wait till registration succeeds.
- smph.wait ();
-
- // Bind local session with the connecter's session so that messages
- // can flow in both directions.
- session->bind (peer, has_in, has_out);
-
- return 0;
- }
- else if (addr.substr (0, pos) == "inproc") {
-
- // Get the MD responsible for the address. In case of invalid address
- // return error.
- object_t *peer = get_inproc_endpoint (addr.substr (pos + 3).c_str ());
- if (!peer) {
- errno = EADDRNOTAVAIL;
- return -1;
- }
-
- // Create bidirectional message pipes between this session and
- // the peer session.
- session->bind (peer, has_in, has_out);
-
- return 0;
- }
- else {
-
- // Unknown protocol requested.
- errno = EINVAL;
- return -1;
- }
-}
-
-int zmq::socket_base_t::subscribe (const char *criteria_)
-{
- // No implementation at the moment...
- errno = ENOTSUP;
- return -1;
-}
-
-int zmq::socket_base_t::send (zmq_msg *msg_, int flags_)
-{
- thread->process_commands (false);
- while (true) {
- if (session->write (msg_))
- return 0;
- if (flags_ & ZMQ_NOBLOCK) {
- errno = EAGAIN;
- return -1;
- }
- thread->process_commands (true);
- }
-}
-
-int zmq::socket_base_t::flush ()
-{
- thread->process_commands (false);
- session->flush ();
- return 0;
-}
-
-int zmq::socket_base_t::recv (zmq_msg *msg_, int flags_)
-{
- thread->process_commands (false);
- while (true) {
- if (session->read (msg_))
- return 0;
- if (flags_ & ZMQ_NOBLOCK) {
- errno = EAGAIN;
- return -1;
- }
- thread->process_commands (true);
- }
-}
-
-int zmq::socket_base_t::close ()
-{
- terminate ();
- return 0;
-}
-
-void zmq::socket_base_t::attach (struct i_poller *poller_, i_session *session_)
-{
- zmq_assert (false);
-}
-
-void zmq::socket_base_t::detach ()
-{
- zmq_assert (false);
-}
-
-void zmq::socket_base_t::revive ()
-{
- // We can ignore the event safely here.
-}
-