diff options
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 32 |
1 files changed, 26 insertions, 6 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 0e856ff..eb9b491 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -111,6 +111,9 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_, return NULL; } alloc_assert (s); + int rc = s->init (); + if (rc != 0) + return NULL; return s; } @@ -119,6 +122,7 @@ xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : tag (0xbaddecaf), ctx_terminated (false), destroyed (false), + initialised (false), last_tsc (0), ticks (0), rcvmore (false) @@ -126,9 +130,25 @@ xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : options.socket_id = sid_; } +int xs::socket_base_t::init () +{ + xs_assert (!initialised); + int rc = mailbox_init (&mailbox); + if (rc != 0) { + destroyed = true; + delete this; + return -1; + } + initialised = true; + return 0; +} + xs::socket_base_t::~socket_base_t () { xs_assert (destroyed); + + if (initialised) + mailbox_close (&mailbox); } xs::mailbox_t *xs::socket_base_t::get_mailbox () @@ -146,7 +166,7 @@ void xs::socket_base_t::stop () } int xs::socket_base_t::parse_uri (const char *uri_, - std::string &protocol_, std::string &address_) + std::string &protocol_, std::string &address_) { xs_assert (uri_ != NULL); @@ -265,7 +285,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - *((fd_t*) optval_) = mailbox.get_fd (); + *((fd_t*) optval_) = mailbox_fd (&mailbox); *optvallen_ = sizeof (fd_t); return 0; } @@ -666,7 +686,7 @@ void xs::socket_base_t::start_reaping (io_thread_t *io_thread_) { // Plug the socket to the reaper thread. io_thread = io_thread_; - handle = io_thread->add_fd (mailbox.get_fd (), this); + handle = io_thread->add_fd (mailbox_fd (&mailbox), this); io_thread->set_pollin (handle); // Initialise the termination and check whether it can be deallocated @@ -682,7 +702,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_) if (timeout_ != 0) { // If we are asked to wait, simply ask mailbox to wait. - rc = mailbox.recv (&cmd, timeout_); + rc = mailbox_recv (&mailbox, &cmd, timeout_); } else { @@ -709,7 +729,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_) } // Check whether there are any commands pending for this thread. - rc = mailbox.recv (&cmd, 0); + rc = mailbox_recv (&mailbox, &cmd, 0); } // Process all the commands available at the moment. @@ -720,7 +740,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_) return -1; errno_assert (rc == 0); cmd.destination->process_command (cmd); - rc = mailbox.recv (&cmd, 0); + rc = mailbox_recv (&mailbox, &cmd, 0); } if (ctx_terminated) { |