summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp69
1 files changed, 64 insertions, 5 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 3141517..ac3b4b9 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -26,12 +26,18 @@
#include "err.hpp"
#include "zmq_listener.hpp"
#include "io_thread.hpp"
+#include "config.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
pending_term_acks (0),
- app_thread (parent_)
-{
+ app_thread (parent_),
+ hwm (0),
+ lwm (0),
+ swap (0),
+ mask (0),
+ affinity (0)
+{
}
zmq::socket_base_t::~socket_base_t ()
@@ -63,14 +69,67 @@ zmq::socket_base_t::~socket_base_t ()
int zmq::socket_base_t::setsockopt (int option_, void *optval_,
size_t optvallen_)
{
- zmq_assert (false);
+ switch (option_) {
+
+ case ZMQ_HWM:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ hwm = *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_LWM:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ lwm = *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_SWAP:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ swap = *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_MASK:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ mask = (uint64_t) *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_AFFINITY:
+ if (optvallen_ != sizeof (int64_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ affinity = (uint64_t) *((int64_t*) optval_);
+ return 0;
+
+ case ZMQ_SESSIONID:
+ if (optvallen_ != sizeof (const char*)) {
+ errno = EINVAL;
+ return -1;
+ }
+ session_id = (const char*) optval_;
+ return 0;
+
+ default:
+ errno = EINVAL;
+ return -1;
+ }
}
int zmq::socket_base_t::bind (const char *addr_)
{
// TODO: The taskset should be taken from socket options.
- uint64_t taskset = 0;
- zmq_listener_t *listener = new zmq_listener_t (choose_io_thread (taskset), this);
+ zmq_listener_t *listener =
+ new zmq_listener_t (choose_io_thread (affinity), this);
int rc = listener->set_address (addr_);
if (rc != 0)
return -1;