diff options
Diffstat (limited to 'src')
73 files changed, 109 insertions, 4856 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 27f4412..bde9c39 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -7,53 +7,30 @@ libzmq_la_SOURCES = \      atomic_ptr.hpp \      command.hpp \      config.hpp \ -    connecter.hpp \      context.hpp \ -    data_distributor.hpp \      decoder.hpp \      devpoll.hpp \ -    dummy_aggregator.hpp \ -    dummy_distributor.hpp \      encoder.hpp \      epoll.hpp \      err.hpp \ -    fair_aggregator.hpp \      fd.hpp \      fd_signaler.hpp \ -    io_object.hpp \      io_thread.hpp \      ip.hpp \      i_api.hpp \ -    i_demux.hpp \ -    i_mux.hpp \      i_poller.hpp \      i_poll_events.hpp \ -    i_session.hpp \      i_signaler.hpp \ -    i_engine.hpp \ -    i_thread.hpp \ -    listener.hpp \ +    i_socket.hpp \      kqueue.hpp \ -    load_balancer.hpp \      msg.hpp \      mutex.hpp \      object.hpp \ -    p2p.hpp \      pipe.hpp \ -    pipe_reader.hpp \ -    pipe_writer.hpp \      platform.hpp \      poll.hpp \ -    pub.hpp \ -    rep.hpp \ -    req.hpp \ -    safe_object.hpp \      select.hpp \ -    session.hpp \ -    session_stub.hpp \      simple_semaphore.hpp \ -    socket_base.hpp \ -    sub.hpp \      stdint.hpp \      tcp_connecter.hpp \      tcp_listener.hpp \ @@ -65,50 +42,24 @@ libzmq_la_SOURCES = \      ypipe.hpp \      ypollset.hpp \      yqueue.hpp \ -    zmq_decoder.hpp \ -    zmq_encoder.hpp \ -    zmq_tcp_engine.hpp \      app_thread.cpp \ -    connecter.cpp \      context.cpp \ -    data_distributor.cpp \      devpoll.hpp \ -    dummy_aggregator.cpp \ -    dummy_distributor.cpp \      epoll.cpp \      err.cpp \ -    fair_aggregator.cpp \      fd_signaler.cpp \ -    io_object.cpp \      io_thread.cpp \      ip.cpp \      kqueue.cpp \ -    listener.cpp \ -    load_balancer.cpp \      object.cpp \ -    p2p.cpp \ -    pipe.cpp \ -    pipe_reader.cpp \ -    pipe_writer.cpp \      poll.cpp \ -    pub.cpp \ -    rep.cpp \ -    req.cpp \ -    safe_object.cpp \      select.cpp \ -    session.cpp \ -    session_stub.cpp \ -    socket_base.cpp \ -    sub.cpp \      tcp_connecter.cpp \      tcp_listener.cpp \      tcp_socket.cpp \      thread.cpp \      uuid.cpp \      ypollset.cpp \ -    zmq_decoder.cpp \ -    zmq_encoder.cpp \ -    zmq_tcp_engine.cpp \      zmq.cpp  libzmq_la_LDFLAGS = -version-info 0:0:0 diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 9cc61c7..23a055a 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -28,20 +28,8 @@  #include "app_thread.hpp"  #include "context.hpp"  #include "err.hpp" -#include "session.hpp"  #include "pipe.hpp"  #include "config.hpp" -#include "i_api.hpp" -#include "dummy_aggregator.hpp" -#include "fair_aggregator.hpp" -#include "dummy_distributor.hpp" -#include "data_distributor.hpp" -#include "load_balancer.hpp" -#include "p2p.hpp" -#include "pub.hpp" -#include "sub.hpp" -#include "req.hpp" -#include "rep.hpp"  //  If the RDTSC is available we use it to prevent excessive  //  polling for commands. The nice thing here is that it will work on any @@ -58,37 +46,15 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :  {  } -void zmq::app_thread_t::shutdown () -{ -    //  Deallocate all the sessions associated with the thread. -    while (!sessions.empty ()) -        sessions [0]->shutdown (); - -    delete this; -} -  zmq::app_thread_t::~app_thread_t ()  { -} +    //  Ask all the sockets to start termination, then wait till it is complete. +    for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) +        (*it)->stop (); +    for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) +        delete *it; -void zmq::app_thread_t::attach_session (session_t *session_) -{ -    session_->set_index (sessions.size ()); -    sessions.push_back (session_);  -} - -void zmq::app_thread_t::detach_session (session_t *session_) -{ -    //  O(1) removal of the session from the list. -    sessions_t::size_type i = session_->get_index (); -    sessions [i] = sessions [sessions.size () - 1]; -    sessions [i]->set_index (i); -    sessions.pop_back (); -} - -zmq::i_poller *zmq::app_thread_t::get_poller () -{ -    zmq_assert (false); +    delete this;  }  zmq::i_signaler *zmq::app_thread_t::get_signaler () @@ -98,76 +64,20 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler ()  bool zmq::app_thread_t::is_current ()  { -    return !sessions.empty () && tid == getpid (); +    return !sockets.empty () && tid == getpid ();  }  bool zmq::app_thread_t::make_current ()  {      //  If there are object managed by this slot we cannot assign the slot      //  to a different thread. -    if (!sessions.empty ()) +    if (!sockets.empty ())          return false;      tid = getpid ();      return true;  } -zmq::i_api *zmq::app_thread_t::create_socket (int type_) -{ -    i_mux *mux = NULL; -    i_demux *demux = NULL; -    session_t *session = NULL; -    i_api *api = NULL; - -    switch (type_) { -    case ZMQ_P2P: -        mux = new dummy_aggregator_t; -        zmq_assert (mux); -        demux = new dummy_distributor_t; -        zmq_assert (demux); -        session = new session_t (this, this, mux, demux, true, false); -        zmq_assert (session); -        api = new p2p_t (this, session); -        zmq_assert (api); -        break; -    case ZMQ_PUB: -        demux = new data_distributor_t; -        zmq_assert (demux); -        session = new session_t (this, this, mux, demux, true, false); -        zmq_assert (session); -        api = new pub_t (this, session); -        zmq_assert (api); -        break; -    case ZMQ_SUB: -        mux = new fair_aggregator_t; -        zmq_assert (mux); -        session = new session_t (this, this, mux, demux, true, false); -        zmq_assert (session); -        api = new sub_t (this, session); -        zmq_assert (api); -        break; -    case ZMQ_REQ: -        //  TODO -        zmq_assert (false); -        api = new req_t (this, session); -        zmq_assert (api); -        break; -    case ZMQ_REP: -        //  TODO -        zmq_assert (false); -        api = new rep_t (this, session); -        zmq_assert (api); -        break; -    default: -        errno = EINVAL; -        return NULL; -    } - -    attach_session (session); -     -    return api; -} -  void zmq::app_thread_t::process_commands (bool block_)  {      ypollset_t::signals_t signals; diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 8295c2f..31679b8 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -22,7 +22,7 @@  #include <vector> -#include "i_thread.hpp" +#include "i_socket.hpp"  #include "stdint.hpp"  #include "object.hpp"  #include "ypollset.hpp" @@ -30,23 +30,18 @@  namespace zmq  { -    class app_thread_t : public object_t, public i_thread +    class app_thread_t : public object_t      {      public:          app_thread_t (class context_t *context_, int thread_slot_); -        //  To be called when the whole infrastrucure is being closed. -        void shutdown (); +        ~app_thread_t ();          //  Returns signaler associated with this application thread.          i_signaler *get_signaler (); -        //  Create socket engine in this thread. Return false if the calling -        //  thread doesn't match the thread handled by this app thread object. -        struct i_api *create_socket (int type_); - -        //  Nota bene: The following two functions are accessed from different +        //  Nota bene: Following two functions are accessed from different          //  threads. The caller (context) is responsible for synchronisation          //  of accesses. @@ -61,25 +56,17 @@ namespace zmq          //  set to true, returns only after at least one command was processed.          void process_commands (bool block_); -        //  i_thread implementation. -        void attach_session (class session_t *session_); -        void detach_session (class session_t *session_); -        struct i_poller *get_poller (); -      private: -        //  Clean-up. -        ~app_thread_t (); +        //  All the sockets created from this application thread. +        typedef std::vector <i_socket*> sockets_t; +        sockets_t sockets;          //  Thread ID associated with this slot.          //  TODO: Virtualise pid_t!          //  TODO: Check whether getpid returns unique ID for each thread.          int tid; -        //  Vector of all sessionss associated with this app thread. -        typedef std::vector <class session_t*> sessions_t; -        sessions_t sessions; -          //  App thread's signaler object.          ypollset_t pollset; diff --git a/src/connecter.cpp b/src/connecter.cpp deleted file mode 100644 index 970dcf7..0000000 --- a/src/connecter.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* -    Copyright (c) 2007-2009 FastMQ Inc. - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. - -    You should have received a copy of the Lesser GNU General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "connecter.hpp" -#include "io_thread.hpp" -#include "session.hpp" -#include "err.hpp" -#include "simple_semaphore.hpp" -#include "zmq_tcp_engine.hpp" - -zmq::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_, -      session_t *session_) : -    io_object_t (thread_), -    state (idle), -    poller (NULL), -    session (session_), -    addr (addr_), -    identity ("abcde"), -    engine (NULL) -{ -} - -void zmq::connecter_t::terminate () -{ -    delete this; -} - -void zmq::connecter_t::shutdown () -{ -    delete this; -} - -zmq::connecter_t::~connecter_t () -{ -} - -void zmq::connecter_t::process_reg (simple_semaphore_t *smph_) -{ -    //  Fet poller pointer for further use. -    zmq_assert (!poller); -    poller = get_poller (); - -    //  Ask the session to register itself with the I/O thread. Note that -    //  the session is living in the same I/O thread, thus this results -    //  in a synchronous call. -    session->inc_seqnum (); -    send_reg (session, NULL); - -    //  Unlock the application thread that created the connecter. -    if (smph_) -        smph_->post (); - -    //  Manually trigger timer event which will launch asynchronous connect. -    state = waiting; -    timer_event (); -} - -void zmq::connecter_t::process_unreg (simple_semaphore_t *smph_) -{ -    //  Unregister connecter/engine from the poller. -    zmq_assert (poller); -    if (state == connecting) -        poller->rm_fd (handle); -    else if (state == waiting) -        poller->cancel_timer (this); -    else if (state == sending) -        engine->terminate (); - -    //  Unlock the application thread closing the connecter. -    if (smph_) -        smph_->post (); -} - -void zmq::connecter_t::in_event () -{ -    //  Error occured in asynchronous connect. Retry to connect after a while. -    if (state == connecting) { -        fd_t fd = tcp_connecter.connect (); -        zmq_assert (fd == retired_fd); -        poller->rm_fd (handle); -        poller->add_timer (this); -        state = waiting; -        return; -    } - -    zmq_assert (false); -} - -void zmq::connecter_t::out_event () -{ -    if (state == connecting) { - -        fd_t fd = tcp_connecter.connect (); -        if (fd == retired_fd) { -            poller->rm_fd (handle); -            poller->add_timer (this); -            state = waiting; -            return; -        } - -        poller->rm_fd (handle); -        engine = new zmq_tcp_engine_t (fd); -        zmq_assert (engine); -        engine->attach (poller, this); -        state = sending; -        return; -    } - -    zmq_assert (false); -} - -void zmq::connecter_t::timer_event () -{ -    zmq_assert (state == waiting); - -    //  Initiate async connect and start polling for its completion. If async -    //  connect fails instantly, try to reconnect after a while. -    int rc = tcp_connecter.open (addr.c_str ()); -    if (rc == 0) { -        state = connecting; -        in_event (); -    } -    else if (rc == 1) { -        handle = poller->add_fd (tcp_connecter.get_fd (), this); -        poller->set_pollout (handle); -        state = connecting; -    } -    else { -        poller->add_timer (this); -        state = waiting; -    } -} - -void zmq::connecter_t::set_engine (struct i_engine *engine_) -{ -    engine = engine_; -} - -bool zmq::connecter_t::read (zmq_msg *msg_) -{ -    zmq_assert (state == sending); - -    //  Deallocate old content of the message just in case. -    zmq_msg_close (msg_); - -    //  Send the identity. -    zmq_msg_init_size (msg_, identity.size ()); -    memcpy (zmq_msg_data (msg_), identity.c_str (), identity.size ()); - -    //  Ask engine to unregister from the poller. -    i_engine *e = engine; -    engine->detach (); - -    //  Attach the engine to the session. (Note that this is actually -    //  a synchronous call. -    session->inc_seqnum (); -    send_engine (session, e); - -    state = idle; - -    return true;     -} - -bool zmq::connecter_t::write (struct zmq_msg *msg_) -{ -    //  No incoming messages are accepted till identity is sent. -    return false; -} - -void zmq::connecter_t::flush () -{ -    //  No incoming messages are accepted till identity is sent. -} diff --git a/src/connecter.hpp b/src/connecter.hpp deleted file mode 100644 index 1f11c63..0000000 --- a/src/connecter.hpp +++ /dev/null @@ -1,99 +0,0 @@ -/* -    Copyright (c) 2007-2009 FastMQ Inc. - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. - -    You should have received a copy of the Lesser GNU General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_CONNECTER_HPP_INCLUDED__ -#define __ZMQ_CONNECTER_HPP_INCLUDED__ - -#include <string> - -#include "../include/zmq.h" - -#include "i_poller.hpp" -#include "io_object.hpp" -#include "i_poll_events.hpp" -#include "i_session.hpp" -#include "tcp_connecter.hpp" - -namespace zmq -{ - -    class connecter_t : public io_object_t, public i_poll_events, -        public i_session -    { -    public: - -        connecter_t (class io_thread_t *thread_, const char *addr_,< | 
