diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/app_thread.cpp | 23 | ||||
-rw-r--r-- | src/dispatcher.cpp | 12 | ||||
-rw-r--r-- | src/epoll.cpp | 3 | ||||
-rw-r--r-- | src/io_thread.cpp | 4 | ||||
-rw-r--r-- | src/kqueue.cpp | 3 | ||||
-rw-r--r-- | src/pgm_receiver.cpp | 4 | ||||
-rw-r--r-- | src/pgm_socket.cpp | 2 | ||||
-rw-r--r-- | src/session.cpp | 8 | ||||
-rw-r--r-- | src/socket_base.cpp | 33 | ||||
-rw-r--r-- | src/tcp_listener.cpp | 1 | ||||
-rw-r--r-- | src/yqueue.hpp | 5 | ||||
-rw-r--r-- | src/zmq.cpp | 4 | ||||
-rw-r--r-- | src/zmq_connecter.cpp | 6 | ||||
-rw-r--r-- | src/zmq_connecter_init.cpp | 5 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 4 | ||||
-rw-r--r-- | src/zmq_listener.cpp | 6 | ||||
-rw-r--r-- | src/zmq_listener_init.cpp | 9 |
17 files changed, 85 insertions, 47 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 308fc36..666116f 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -17,6 +17,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> #include <algorithm> #include "../bindings/c/zmq.h" @@ -65,11 +66,11 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_, last_processing_time (0) { if (flags_ & ZMQ_POLL) { - signaler = new fd_signaler_t; + signaler = new (std::nothrow) fd_signaler_t; zmq_assert (signaler); } else { - signaler = new ypollset_t; + signaler = new (std::nothrow) ypollset_t; zmq_assert (signaler); } } @@ -163,31 +164,31 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) socket_base_t *s = NULL; switch (type_) { case ZMQ_P2P: - s = new p2p_t (this); + s = new (std::nothrow) p2p_t (this); break; case ZMQ_PUB: - s = new pub_t (this); + s = new (std::nothrow) pub_t (this); break; case ZMQ_SUB: - s = new sub_t (this); + s = new (std::nothrow) sub_t (this); break; case ZMQ_REQ: - s = new req_t (this); + s = new (std::nothrow) req_t (this); break; case ZMQ_REP: - s = new rep_t (this); + s = new (std::nothrow) rep_t (this); break; case ZMQ_XREQ: - s = new xreq_t (this); + s = new (std::nothrow) xreq_t (this); break; case ZMQ_XREP: - s = new xrep_t (this); + s = new (std::nothrow) xrep_t (this); break; case ZMQ_UPSTREAM: - s = new upstream_t (this); + s = new (std::nothrow) upstream_t (this); break; case ZMQ_DOWNSTREAM: - s = new downstream_t (this); + s = new (std::nothrow) downstream_t (this); break; default: // TODO: This should be EINVAL. diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 51143b3..7115bca 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> + #include "../bindings/c/zmq.h" #include "dispatcher.hpp" @@ -49,7 +51,8 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, // Create application thread proxies. for (int i = 0; i != app_threads_; i++) { - app_thread_t *app_thread = new app_thread_t (this, i, flags_); + app_thread_t *app_thread = new (std::nothrow) app_thread_t (this, i, + flags_); zmq_assert (app_thread); app_threads.push_back (app_thread); signalers.push_back (app_thread->get_signaler ()); @@ -57,15 +60,16 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, // Create I/O thread objects. for (int i = 0; i != io_threads_; i++) { - io_thread_t *io_thread = new io_thread_t (this, i + app_threads_, - flags_); + io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, + i + app_threads_, flags_); zmq_assert (io_thread); io_threads.push_back (io_thread); signalers.push_back (io_thread->get_signaler ()); } // Create command pipe matrix. - command_pipes = new command_pipe_t [signalers.size () * signalers.size ()]; + command_pipes = new (std::nothrow) command_pipe_t [signalers.size () * + signalers.size ()]; zmq_assert (command_pipes); // Launch I/O threads. diff --git a/src/epoll.cpp b/src/epoll.cpp index 8f576f8..b25883f 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -26,6 +26,7 @@ #include <string.h> #include <unistd.h> #include <algorithm> +#include <new> #include "epoll.hpp" #include "err.hpp" @@ -54,7 +55,7 @@ zmq::epoll_t::~epoll_t () zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) { - poll_entry_t *pe = new poll_entry_t; + poll_entry_t *pe = new (std::nothrow) poll_entry_t; zmq_assert (pe != NULL); // The memset is not actually needed. It's here to prevent debugging diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 6d4710a..1332795 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> + #include "../bindings/c/zmq.h" #include "io_thread.hpp" @@ -31,7 +33,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_, int flags_) : object_t (dispatcher_, thread_slot_) { - poller = new poller_t; + poller = new (std::nothrow) poller_t; zmq_assert (poller); signaler_handle = poller->add_fd (signaler.get_fd (), this); diff --git a/src/kqueue.cpp b/src/kqueue.cpp index 69ad0c8..8e33bdb 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -27,6 +27,7 @@ #include <stdlib.h> #include <unistd.h> #include <algorithm> +#include <new> #include "kqueue.hpp" #include "err.hpp" @@ -72,7 +73,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) { - poll_entry_t *pe = new poll_entry_t; + poll_entry_t *pe = new (std::nothrow) poll_entry_t; zmq_assert (pe != NULL); pe->fd = fd_; diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index e3f7996..2a24858 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -21,6 +21,8 @@ #if defined ZMQ_HAVE_OPENPGM +#include <new> + #ifdef ZMQ_HAVE_WINDOWS #include "windows.hpp" #endif @@ -171,7 +173,7 @@ void zmq::pgm_receiver_t::in_event () it->second.joined = true; // Create and connect decoder for joined peer. - it->second.decoder = new zmq_decoder_t (0, NULL, 0); + it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); it->second.decoder->set_inout (inout); } diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index a16acd6..66f2396 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -86,6 +86,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) // in_batch_size configured in confing.hpp if (receiver) { pgm_msgv_len = get_max_apdu_at_once (in_batch_size); + // TODO: use malloc instead of new pgm_msgv = new pgm_msgv_t [pgm_msgv_len]; } @@ -443,6 +444,7 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_) *size_ = get_max_tsdu_size (); // Allocate buffer. + // TODO: use malloc instead of new unsigned char *apdu_buff = new unsigned char [*size_]; zmq_assert (apdu_buff); return apdu_buff; diff --git a/src/session.cpp b/src/session.cpp index cbcc883..37f2720 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> + #include "session.hpp" #include "i_engine.hpp" #include "err.hpp" @@ -157,14 +159,16 @@ void zmq::session_t::process_plug () pipe_t *outbound = NULL; if (options.requires_out) { - inbound = new pipe_t (this, owner, options.hwm, options.lwm); + inbound = new (std::nothrow) pipe_t (this, owner, + options.hwm, options.lwm); zmq_assert (inbound); in_pipe = &inbound->reader; in_pipe->set_endpoint (this); } if (options.requires_in) { - outbound = new pipe_t (owner, this, options.hwm, options.lwm); + outbound = new (std::nothrow) pipe_t (owner, this, + options.hwm, options.lwm); zmq_assert (outbound); out_pipe = &outbound->writer; out_pipe->set_endpoint (this); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 888b6ea..fde258c 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -17,6 +17,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> #include <string> #include <algorithm> @@ -87,8 +88,9 @@ int zmq::socket_base_t::bind (const char *addr_) return register_endpoint (addr_args.c_str (), this); if (addr_type == "tcp") { - zmq_listener_t *listener = new zmq_listener_t ( + zmq_listener_t *listener = new (std::nothrow) zmq_listener_t ( choose_io_thread (options.affinity), this, options); + zmq_assert (listener); int rc = listener->set_address (addr_args.c_str ()); if (rc != 0) return -1; @@ -143,13 +145,15 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) { - in_pipe = new pipe_t (this, peer, options.hwm, options.lwm); + in_pipe = new (std::nothrow) pipe_t (this, peer, + options.hwm, options.lwm); zmq_assert (in_pipe); } // Create outbound pipe, if required. if (options.requires_out) { - out_pipe = new pipe_t (peer, this, options.hwm, options.lwm); + out_pipe = new (std::nothrow) pipe_t (peer, this, + options.hwm, options.lwm); zmq_assert (out_pipe); } @@ -168,8 +172,8 @@ int zmq::socket_base_t::connect (const char *addr_) // Create the session. io_thread_t *io_thread = choose_io_thread (options.affinity); - session_t *session = new session_t (io_thread, this, session_name.c_str (), - options, true); + session_t *session = new (std::nothrow) session_t (io_thread, this, + session_name.c_str (), options, true); zmq_assert (session); pipe_t *in_pipe = NULL; @@ -177,14 +181,16 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) { - in_pipe = new pipe_t (this, session, options.hwm, options.lwm); + in_pipe = new (std::nothrow) pipe_t (this, session, + options.hwm, options.lwm); zmq_assert (in_pipe); } // Create outbound pipe, if required. if (options.requires_out) { - out_pipe = new pipe_t (session, this, options.hwm, options.lwm); + out_pipe = new (std::nothrow) pipe_t (session, this, + options.hwm, options.lwm); zmq_assert (out_pipe); } @@ -205,9 +211,10 @@ int zmq::socket_base_t::connect (const char *addr_) // Create the connecter object. Supply it with the session name // so that it can bind the new connection to the session once // it is established. - zmq_connecter_t *connecter = new zmq_connecter_t ( + zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( choose_io_thread (options.affinity), this, options, session_name.c_str (), false); + zmq_assert (connecter); int rc = connecter->set_address (addr_args.c_str ()); if (rc != 0) { delete connecter; @@ -237,9 +244,10 @@ int zmq::socket_base_t::connect (const char *addr_) if (options.requires_out) { // PGM sender. - pgm_sender_t *pgm_sender = - new pgm_sender_t (choose_io_thread (options.affinity), options, + pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( + choose_io_thread (options.affinity), options, session_name.c_str ()); + zmq_assert (pgm_sender); int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); if (rc != 0) { @@ -252,9 +260,10 @@ int zmq::socket_base_t::connect (const char *addr_) else if (options.requires_in) { // PGM receiver. - pgm_receiver_t *pgm_receiver = - new pgm_receiver_t (choose_io_thread (options.affinity), options, + pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( + choose_io_thread (options.affinity), options, session_name.c_str ()); + zmq_assert (pgm_receiver); int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); if (rc != 0) { diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 383aebe..11707fb 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -66,7 +66,6 @@ int zmq::tcp_listener_t::set_address (const char *addr_) // Bind the socket to the network interface and port. rc = bind (s, (struct sockaddr*) &addr, sizeof (addr)); - // TODO: Convert error code to errno. if (rc == SOCKET_ERROR) { wsa_error_to_errno (); return -1; diff --git a/src/yqueue.hpp b/src/yqueue.hpp index f80af9c..1d3f8e8 100644 --- a/src/yqueue.hpp +++ b/src/yqueue.hpp @@ -20,6 +20,7 @@ #ifndef __ZMQ_YQUEUE_HPP_INCLUDED__ #define __ZMQ_YQUEUE_HPP_INCLUDED__ +#include <new> #include <stddef.h> #include "err.hpp" @@ -47,7 +48,7 @@ namespace zmq // Create the queue. inline yqueue_t () { - begin_chunk = new chunk_t; + begin_chunk = new (std::nothrow) chunk_t; zmq_assert (begin_chunk); begin_pos = 0; back_chunk = NULL; @@ -93,7 +94,7 @@ namespace zmq if (++end_pos != N) return; - end_chunk->next = new chunk_t; + end_chunk->next = new (std::nothrow) chunk_t; zmq_assert (end_chunk->next); end_chunk = end_chunk->next; end_pos = 0; diff --git a/src/zmq.cpp b/src/zmq.cpp index 3497fe1..d523036 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -208,8 +208,8 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_) return NULL; } - zmq::dispatcher_t *dispatcher = new zmq::dispatcher_t (app_threads_, - io_threads_, flags_); + zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t ( + app_threads_, io_threads_, flags_); zmq_assert (dispatcher); return (void*) dispatcher; } diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index cd7d1b5..5bda48d 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> + #include "zmq_connecter.hpp" #include "zmq_connecter_init.hpp" #include "io_thread.hpp" @@ -87,8 +89,8 @@ void zmq::zmq_connecter_t::out_event () // Create an init object. io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_connecter_init_t *init = new zmq_connecter_init_t (io_thread, owner, - fd, options, session_name.c_str (), address.c_str ()); + zmq_connecter_init_t *init = new (std::nothrow) zmq_connecter_init_t ( + io_thread, owner, fd, options, session_name.c_str (), address.c_str ()); zmq_assert (init); send_plug (init); send_own (owner, init); diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp index 1418429..f8436a3 100644 --- a/src/zmq_connecter_init.cpp +++ b/src/zmq_connecter_init.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> + #include "zmq_connecter_init.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" @@ -31,7 +33,8 @@ zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_, session_name (session_name_) { // Create associated engine object. - engine = new zmq_engine_t (parent_, fd_, options, true, address_); + engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, true, + address_); zmq_assert (engine); } diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 6b439f5..c2878a7 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> + #include "zmq_engine.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" @@ -155,7 +157,7 @@ void zmq::zmq_engine_t::error () // Create a connecter object to attempt reconnect. // Ask it to wait for a while before reconnecting. - reconnecter = new zmq_connecter_t ( + reconnecter = new (std::nothrow) zmq_connecter_t ( inout->get_io_thread (), inout->get_owner (), options, inout->get_session_name (), true); zmq_assert (reconnecter); diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 5d678a2..5c7552b 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> + #include "zmq_listener.hpp" #include "zmq_listener_init.hpp" #include "io_thread.hpp" @@ -62,8 +64,8 @@ void zmq::zmq_listener_t::in_event () // Create an init object. io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_listener_init_t *init = new zmq_listener_init_t (io_thread, owner, - fd, options); + zmq_listener_init_t *init = new (std::nothrow) zmq_listener_init_t ( + io_thread, owner, fd, options); zmq_assert (init); send_plug (init); send_own (owner, init); diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 752f062..f7b3001 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> + #include "zmq_listener_init.hpp" #include "io_thread.hpp" #include "session.hpp" @@ -29,7 +31,8 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_, has_peer_identity (false) { // Create associated engine object. - engine = new zmq_engine_t (parent_, fd_, options, false, NULL); + engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, + false, NULL); zmq_assert (engine); } @@ -74,8 +77,8 @@ void zmq::zmq_listener_init_t::flush () session = owner->find_session (peer_identity.c_str ()); if (!session) { io_thread_t *io_thread = choose_io_thread (options.affinity); - session = new session_t (io_thread, owner, peer_identity.c_str (), - options, false); + session = new (std::nothrow) session_t (io_thread, owner, + peer_identity.c_str (), options, false); zmq_assert (session); send_plug (session); send_own (owner, session); |