summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp32
1 files changed, 26 insertions, 6 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 0e856ff..eb9b491 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -111,6 +111,9 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,
return NULL;
}
alloc_assert (s);
+ int rc = s->init ();
+ if (rc != 0)
+ return NULL;
return s;
}
@@ -119,6 +122,7 @@ xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
tag (0xbaddecaf),
ctx_terminated (false),
destroyed (false),
+ initialised (false),
last_tsc (0),
ticks (0),
rcvmore (false)
@@ -126,9 +130,25 @@ xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
options.socket_id = sid_;
}
+int xs::socket_base_t::init ()
+{
+ xs_assert (!initialised);
+ int rc = mailbox_init (&mailbox);
+ if (rc != 0) {
+ destroyed = true;
+ delete this;
+ return -1;
+ }
+ initialised = true;
+ return 0;
+}
+
xs::socket_base_t::~socket_base_t ()
{
xs_assert (destroyed);
+
+ if (initialised)
+ mailbox_close (&mailbox);
}
xs::mailbox_t *xs::socket_base_t::get_mailbox ()
@@ -146,7 +166,7 @@ void xs::socket_base_t::stop ()
}
int xs::socket_base_t::parse_uri (const char *uri_,
- std::string &protocol_, std::string &address_)
+ std::string &protocol_, std::string &address_)
{
xs_assert (uri_ != NULL);
@@ -265,7 +285,7 @@ int xs::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- *((fd_t*) optval_) = mailbox.get_fd ();
+ *((fd_t*) optval_) = mailbox_fd (&mailbox);
*optvallen_ = sizeof (fd_t);
return 0;
}
@@ -666,7 +686,7 @@ void xs::socket_base_t::start_reaping (io_thread_t *io_thread_)
{
// Plug the socket to the reaper thread.
io_thread = io_thread_;
- handle = io_thread->add_fd (mailbox.get_fd (), this);
+ handle = io_thread->add_fd (mailbox_fd (&mailbox), this);
io_thread->set_pollin (handle);
// Initialise the termination and check whether it can be deallocated
@@ -682,7 +702,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_)
if (timeout_ != 0) {
// If we are asked to wait, simply ask mailbox to wait.
- rc = mailbox.recv (&cmd, timeout_);
+ rc = mailbox_recv (&mailbox, &cmd, timeout_);
}
else {
@@ -709,7 +729,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_)
}
// Check whether there are any commands pending for this thread.
- rc = mailbox.recv (&cmd, 0);
+ rc = mailbox_recv (&mailbox, &cmd, 0);
}
// Process all the commands available at the moment.
@@ -720,7 +740,7 @@ int xs::socket_base_t::process_commands (int timeout_, bool throttle_)
return -1;
errno_assert (rc == 0);
cmd.destination->process_command (cmd);
- rc = mailbox.recv (&cmd, 0);
+ rc = mailbox_recv (&mailbox, &cmd, 0);
}
if (ctx_terminated) {