summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp33
1 files changed, 20 insertions, 13 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 3737410..026b317 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -46,8 +46,9 @@ zmq::socket_base_t::~socket_base_t ()
break;
// Send termination request to all associated I/O objects.
- for (io_objects_t::size_type i = 0; i != io_objects.size (); i++)
- send_term (io_objects [i]);
+ for (io_objects_t::iterator it = io_objects.begin ();
+ it != io_objects.end (); it++)
+ send_term (*it);
// Move the objects to the list of pending term acks.
pending_term_acks += io_objects.size ();
@@ -59,16 +60,23 @@ zmq::socket_base_t::~socket_base_t ()
}
}
-int zmq::socket_base_t::bind (const char *addr_, struct zmq_opts *opts_)
+int zmq::socket_base_t::setsockopt (int option_, void *optval_,
+ size_t optvallen_)
{
- uint64_t taskset = opts_ ? opts_->taskset : 0;
+ zmq_assert (false);
+}
+
+int zmq::socket_base_t::bind (const char *addr_)
+{
+ // TODO: The taskset should be taken from socket options.
+ uint64_t taskset = 0;
object_t *listener = new zmq_listener_t (choose_io_thread (taskset), this);
send_plug (listener);
send_own (this, listener);
return 0;
}
-int zmq::socket_base_t::connect (const char *addr_, struct zmq_opts *opts_)
+int zmq::socket_base_t::connect (const char *addr_)
{
zmq_assert (false);
}
@@ -102,24 +110,23 @@ int zmq::socket_base_t::close ()
void zmq::socket_base_t::process_own (object_t *object_)
{
- io_objects.push_back (object_);
+ io_objects.insert (object_);
}
void zmq::socket_base_t::process_term_req (object_t *object_)
{
// If I/O object is well and alive ask it to terminate.
- // TODO: Following find may produce an unacceptable jitter in
- // C10K-style applications. If so, use set instead of vector.
io_objects_t::iterator it = std::find (io_objects.begin (),
io_objects.end (), object_);
- if (it != io_objects.end ()) {
- pending_term_acks++;
- io_objects.erase (it);
- send_term (object_);
- }
// If not found, we assume that termination request was already sent to
// the object so we can sagely ignore the request.
+ if (it == io_objects.end ())
+ return;
+
+ pending_term_acks++;
+ io_objects.erase (it);
+ send_term (object_);
}
void zmq::socket_base_t::process_term_ack ()