diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/decoder.cpp | 21 | ||||
-rw-r--r-- | src/decoder.hpp | 5 | ||||
-rw-r--r-- | src/options.cpp | 18 | ||||
-rw-r--r-- | src/options.hpp | 3 | ||||
-rw-r--r-- | src/pgm_receiver.cpp | 3 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 2 |
6 files changed, 45 insertions, 7 deletions
diff --git a/src/decoder.cpp b/src/decoder.cpp index c8ca715..7fb32ae 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -25,9 +25,10 @@ #include "wire.hpp" #include "err.hpp" -zmq::decoder_t::decoder_t (size_t bufsize_) : +zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : decoder_base_t <decoder_t> (bufsize_), - destination (NULL) + destination (NULL), + maxmsgsize (maxmsgsize_) { zmq_msg_init (&in_progress); @@ -63,7 +64,13 @@ bool zmq::decoder_t::one_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); + int rc; + if (maxmsgsize >= 0 && (int64_t) (*tmpbuf - 1) > maxmsgsize) { + rc = -1; + errno = ENOMEM; + } + else + rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); if (rc != 0 && errno == ENOMEM) { rc = zmq_msg_init (&in_progress); errno_assert (rc == 0); @@ -92,7 +99,13 @@ bool zmq::decoder_t::eight_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, size - 1); + int rc; + if (maxmsgsize >= 0 && (int64_t) (size - 1) > maxmsgsize) { + rc = -1; + errno = ENOMEM; + } + else + rc = zmq_msg_init_size (&in_progress, size - 1); if (rc != 0 && errno == ENOMEM) { rc = zmq_msg_init (&in_progress); errno_assert (rc == 0); diff --git a/src/decoder.hpp b/src/decoder.hpp index 6d28801..3b8b460 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -26,6 +26,7 @@ #include <algorithm> #include "err.hpp" +#include "stdint.hpp" #include "../include/zmq.h" @@ -180,7 +181,7 @@ namespace zmq { public: - decoder_t (size_t bufsize_); + decoder_t (size_t bufsize_, int64_t maxmsgsize_); ~decoder_t (); void set_inout (struct i_inout *destination_); @@ -196,6 +197,8 @@ namespace zmq unsigned char tmpbuf [8]; ::zmq_msg_t in_progress; + int64_t maxmsgsize; + decoder_t (const decoder_t&); void operator = (const decoder_t&); }; diff --git a/src/options.cpp b/src/options.cpp index c6d5760..9271b88 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -39,6 +39,7 @@ zmq::options_t::options_t () : reconnect_ivl (100), reconnect_ivl_max (0), backlog (100), + maxmsgsize (-1), requires_in (false), requires_out (false), immediate_connect (true) @@ -182,6 +183,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, backlog = *((int*) optval_); return 0; + case ZMQ_MAXMSGSIZE: + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + maxmsgsize = *((int64_t*) optval_); + return 0; + } errno = EINVAL; @@ -328,6 +337,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int); return 0; + case ZMQ_MAXMSGSIZE: + if (*optvallen_ < sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + *((int64_t*) optval_) = maxmsgsize; + *optvallen_ = sizeof (int64_t); + return 0; + } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index 38c6982..00e59c8 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -69,6 +69,9 @@ namespace zmq // Maximum backlog for pending connections. int backlog; + // Maximal size of message to handle. + int64_t maxmsgsize; + // These options are never set by the user directly. Instead they are // provided by the specific socket type. bool requires_in; diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 1b96198..1b66cc4 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -211,7 +211,8 @@ void zmq::pgm_receiver_t::in_event () it->second.joined = true; // Create and connect decoder for the peer. - it->second.decoder = new (std::nothrow) decoder_t (0); + it->second.decoder = new (std::nothrow) decoder_t (0, + options.maxmsgsize); alloc_assert (it->second.decoder); it->second.decoder->set_inout (inout); } diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index c51b7ad..5c7af3a 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -35,7 +35,7 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : inpos (NULL), insize (0), - decoder (in_batch_size), + decoder (in_batch_size, options_.maxmsgsize), outpos (NULL), outsize (0), encoder (out_batch_size), |