diff options
-rw-r--r-- | doc/zmq_getsockopt.txt | 16 | ||||
-rw-r--r-- | doc/zmq_setsockopt.txt | 16 | ||||
-rw-r--r-- | include/zmq.h | 1 | ||||
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/config.hpp | 4 | ||||
-rw-r--r-- | src/options.cpp | 18 | ||||
-rw-r--r-- | src/options.hpp | 1 | ||||
-rw-r--r-- | src/pipe.cpp | 93 | ||||
-rw-r--r-- | src/pipe.hpp | 30 | ||||
-rw-r--r-- | src/session.cpp | 6 | ||||
-rw-r--r-- | src/socket_base.cpp | 20 | ||||
-rw-r--r-- | src/swap.cpp | 325 | ||||
-rw-r--r-- | src/swap.hpp | 123 |
13 files changed, 24 insertions, 631 deletions
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index eef4b1f..81dcba3 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -79,22 +79,6 @@ Default value:: 0 Applicable socket types:: all -ZMQ_SWAP: Retrieve disk offload size -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_SWAP' option shall retrieve the disk offload (swap) size for the -specified 'socket'. A socket which has 'ZMQ_SWAP' set to a non-zero value may -exceed it's high water mark; in this case outstanding messages shall be -offloaded to storage on disk rather than held in memory. - -The value of 'ZMQ_SWAP' defines the maximum size of the swap space in bytes. - -[horizontal] -Option value type:: int64_t -Option value unit:: bytes -Default value:: 0 -Applicable socket types:: all - - ZMQ_AFFINITY: Retrieve I/O thread affinity ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_AFFINITY' option shall retrieve the I/O thread affinity for newly diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 9387116..5d93a62 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -47,22 +47,6 @@ Default value:: 0 Applicable socket types:: all -ZMQ_SWAP: Set disk offload size -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_SWAP' option shall set the disk offload (swap) size for the specified -'socket'. A socket which has 'ZMQ_SWAP' set to a non-zero value may exceed it's -high water mark; in this case outstanding messages shall be offloaded to -storage on disk rather than held in memory. - -The value of 'ZMQ_SWAP' defines the maximum size of the swap space in bytes. - -[horizontal] -Option value type:: int64_t -Option value unit:: bytes -Default value:: 0 -Applicable socket types:: all - - ZMQ_AFFINITY: Set I/O thread affinity ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_AFFINITY' option shall set the I/O thread affinity for newly created diff --git a/include/zmq.h b/include/zmq.h index cd1bc90..aa16554 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -184,7 +184,6 @@ ZMQ_EXPORT int zmq_term (void *context); /* Socket options. */ #define ZMQ_HWM 1 -#define ZMQ_SWAP 3 #define ZMQ_AFFINITY 4 #define ZMQ_IDENTITY 5 #define ZMQ_SUBSCRIBE 6 diff --git a/src/Makefile.am b/src/Makefile.am index d0644ef..d2f6d09 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -115,7 +115,6 @@ libzmq_la_SOURCES = \ socket_base.hpp \ stdint.hpp \ sub.hpp \ - swap.hpp \ tcp_connecter.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ @@ -173,7 +172,6 @@ libzmq_la_SOURCES = \ session.cpp \ socket_base.cpp \ sub.cpp \ - swap.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ tcp_socket.cpp \ diff --git a/src/config.hpp b/src/config.hpp index f144512..3df66c7 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -59,10 +59,6 @@ namespace zmq // Maximal delta between high and low watermark. max_wm_delta = 1024, - // Swap inteligently batches data for writing to disk. The size of - // the batch in bytes is specified by this option. - swap_block_size = 8192, - // Maximum number of events the I/O thread can process in one go. max_io_events = 256, diff --git a/src/options.cpp b/src/options.cpp index 79e4029..9916475 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -27,7 +27,6 @@ zmq::options_t::options_t () : hwm (0), - swap (0), affinity (0), rate (100), recovery_ivl (10), @@ -59,14 +58,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, hwm = *((uint64_t*) optval_); return 0; - case ZMQ_SWAP: - if (optvallen_ != sizeof (int64_t) || *((int64_t*) optval_) < 0) { - errno = EINVAL; - return -1; - } - swap = *((int64_t*) optval_); - return 0; - case ZMQ_AFFINITY: if (optvallen_ != sizeof (uint64_t)) { errno = EINVAL; @@ -195,15 +186,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (uint64_t); return 0; - case ZMQ_SWAP: - if (*optvallen_ < sizeof (int64_t)) { - errno = EINVAL; - return -1; - } - *((int64_t*) optval_) = swap; - *optvallen_ = sizeof (int64_t); - return 0; - case ZMQ_AFFINITY: if (*optvallen_ < sizeof (uint64_t)) { errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index 62ecadf..a1f9f33 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -36,7 +36,6 @@ namespace zmq int getsockopt (int option_, void *optval_, size_t *optvallen_); uint64_t hwm; - int64_t swap; uint64_t affinity; blob_t identity; diff --git a/src/pipe.cpp b/src/pipe.cpp index 8dcf577..f09dea4 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -163,7 +163,7 @@ void zmq::reader_t::process_pipe_term_ack () } zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, - uint64_t hwm_, int64_t swap_size_) : + uint64_t hwm_) : object_t (parent_), active (true), pipe (pipe_), @@ -171,28 +171,15 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, hwm (hwm_), msgs_read (0), msgs_written (0), - swap (NULL), sink (NULL), - swapping (false), - pending_delimiter (false), terminating (false) { // Inform reader about the writer. reader->set_writer (this); - - // Open the swap file, if required. - if (swap_size_ > 0) { - swap = new (std::nothrow) swap_t (swap_size_); - alloc_assert (swap); - int rc = swap->init (); - zmq_assert (rc == 0); - } } zmq::writer_t::~writer_t () { - if (swap) - delete swap; } void zmq::writer_t::set_event_sink (i_writer_events *sink_) @@ -208,21 +195,9 @@ bool zmq::writer_t::check_write (zmq_msg_t *msg_) if (unlikely (!active)) return false; - if (unlikely (swapping)) { - if (unlikely (!swap->fits (msg_))) { - active = false; - return false; - } - } - else { - if (unlikely (pipe_full ())) { - if (swap) - swapping = true; - else { - active = false; - return false; - } - } + if (unlikely (pipe_full ())) { + active = false; + return false; } return true; @@ -233,14 +208,6 @@ bool zmq::writer_t::write (zmq_msg_t *msg_) if (unlikely (!check_write (msg_))) return false; - if (unlikely (swapping)) { - bool stored = swap->store (msg_); - zmq_assert (stored); - if (!(msg_->flags & ZMQ_MSG_MORE)) - swap->commit (); - return true; - } - pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); if (!(msg_->flags & ZMQ_MSG_MORE)) msgs_written++; @@ -250,12 +217,6 @@ bool zmq::writer_t::write (zmq_msg_t *msg_) void zmq::writer_t::rollback () { - // Remove incomplete message from the swap. - if (unlikely (swapping)) { - swap->rollback (); - return; - } - // Remove incomplete message from the pipe. zmq_msg_t msg; while (pipe->unwrite (&msg)) { @@ -266,8 +227,7 @@ void zmq::writer_t::rollback () void zmq::writer_t::flush () { - // In the swapping mode, flushing is automatically handled by swap object. - if (!swapping && !pipe->flush ()) + if (!pipe->flush ()) send_activate_reader (reader); } @@ -284,11 +244,6 @@ void zmq::writer_t::terminate () // Rollback any unfinished messages. rollback (); - if (swapping) { - pending_delimiter = true; - return; - } - // Push delimiter into the pipe. Trick the compiler to belive that // the tag is a valid pointer. Note that watermarks are not checked // thus the delimiter can be written even though the pipe is full. @@ -305,40 +260,6 @@ void zmq::writer_t::process_activate_writer (uint64_t msgs_read_) // Store the reader's message sequence number. msgs_read = msgs_read_; - // If we are in the swapping mode, we have some messages in the swap. - // Given that pipe is now ready for writing we can move part of the - // swap into the pipe. - if (swapping) { - zmq_msg_t msg; - while (!pipe_full () && !swap->empty ()) { - swap->fetch(&msg); - pipe->write (msg, msg.flags & ZMQ_MSG_MORE); - if (!(msg.flags & ZMQ_MSG_MORE)) - msgs_written++; - } - if (!pipe->flush ()) - send_activate_reader (reader); - - // There are no more messages in the swap. We can switch into - // standard in-memory mode. - if (swap->empty ()) { - swapping = false; - - // Push delimiter into the pipe. Trick the compiler to belive that - // the tag is a valid pointer. Note that watermarks are not checked - // thus the delimiter can be written even though the pipe is full. - if (pending_delimiter) { - zmq_msg_t msg; - const unsigned char *offset = 0; - msg.content = (void*) (offset + ZMQ_DELIMITER); - msg.flags = 0; - pipe->write (msg, false); - flush (); - return; - } - } - } - // If the writer was non-active before, let's make it active // (available for writing messages to). if (!active && !terminating) { @@ -371,7 +292,7 @@ bool zmq::writer_t::pipe_full () } void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, int64_t swap_size_, reader_t **reader_, writer_t **writer_) + uint64_t hwm_, reader_t **reader_, writer_t **writer_) { // First compute the low water mark. Following point should be taken // into consideration: @@ -404,6 +325,6 @@ void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, *reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm); alloc_assert (*reader_); *writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_, - hwm_, swap_size_); + hwm_); alloc_assert (*writer_); } diff --git a/src/pipe.hpp b/src/pipe.hpp index b4a0ffa..ed13478 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -26,7 +26,6 @@ #include "stdint.hpp" #include "array.hpp" #include "ypipe.hpp" -#include "swap.hpp" #include "config.hpp" #include "object.hpp" @@ -35,7 +34,7 @@ namespace zmq // Creates a pipe. Returns pointer to reader and writer objects. void create_pipe (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, int64_t swap_size_, class reader_t **reader_, + uint64_t hwm_, class reader_t **reader_, class writer_t **writer_); // The shutdown mechanism for pipe works as follows: Either endpoint @@ -59,7 +58,7 @@ namespace zmq class reader_t : public object_t, public array_item_t { friend void create_pipe (object_t*, object_t*, uint64_t, - int64_t, reader_t**, writer_t**); + reader_t**, writer_t**); friend class writer_t; public: @@ -128,7 +127,7 @@ namespace zmq class writer_t : public object_t, public array_item_t { friend void create_pipe (object_t*, object_t*, uint64_t, - int64_t, reader_t**, writer_t**); + reader_t**, writer_t**); public: @@ -136,8 +135,8 @@ namespace zmq void set_event_sink (i_writer_events *endpoint_); // Checks whether messages can be written to the pipe. - // If writing the message would cause high watermark and (optionally) - // if the swap is full, the function returns false. + // If writing the message would cause high watermark + // the function returns false. bool check_write (zmq_msg_t *msg_); // Writes a message to the underlying pipe. Returns false if the @@ -156,19 +155,17 @@ namespace zmq private: writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_, - uint64_t hwm_, int64_t swap_size_); + uint64_t hwm_); ~writer_t (); // Command handlers. void process_activate_writer (uint64_t msgs_read_); void process_pipe_term (); - // Tests whether underlying pipe is already full. The swap is not - // taken into account. + // Tests whether underlying pipe is already full. bool pipe_full (); - // True, if this object can be written to. Undelying ypipe may be full - // but as long as there's swap space available, this flag is true. + // True, if this object can be written to. bool active; // The underlying pipe. @@ -187,20 +184,9 @@ namespace zmq // Number of messages we have written so far. uint64_t msgs_written; - // Pointer to the message swap. If NULL, messages are always - // kept in main memory. - swap_t *swap; - // Sink for the events (either the socket or the session). i_writer_events *sink; - // If true, swap is active. New messages are to be written to the swap. - bool swapping; - - // If true, there's a delimiter to be written to the pipe after the - // swap is empied. - bool pending_delimiter; - // True is 'terminate' method was called of 'pipe_term' command // arrived from the reader. bool terminating; diff --git a/src/session.cpp b/src/session.cpp index 3ba971f..176f0ef 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -247,13 +247,11 @@ void zmq::session_t::process_attach (i_engine *engine_, // Create the pipes, as required. if (options.requires_in) { - create_pipe (socket, this, options.hwm, options.swap, - &socket_reader, &out_pipe); + create_pipe (socket, this, options.hwm, &socket_reader, &out_pipe); out_pipe->set_event_sink (this); } if (options.requires_out) { - create_pipe (this, socket, options.hwm, options.swap, &in_pipe, - &socket_writer); + create_pipe (this, socket, options.hwm, &in_pipe, &socket_writer); in_pipe->set_event_sink (this); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index abeddb9..093bbe0 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -376,28 +376,22 @@ int zmq::socket_base_t::connect (const char *addr_) writer_t *outpipe_writer = NULL; // The total HWM for an inproc connection should be the sum of - // the binder's HWM and the connector's HWM. (Similarly for the - // SWAP.) + // the binder's HWM and the connector's HWM. int64_t hwm; if (options.hwm == 0 || peer.options.hwm == 0) hwm = 0; else hwm = options.hwm + peer.options.hwm; - int64_t swap; - if (options.swap == 0 && peer.options.swap == 0) - swap = 0; - else - swap = options.swap + peer.options.swap; // Create inbound pipe, if required. if (options.requires_in) - create_pipe (this, peer.socket, hwm, swap, - &inpipe_reader, &inpipe_writer); + create_pipe (this, peer.socket, hwm, &inpipe_reader, + &inpipe_writer); // Create outbound pipe, if required. if (options.requires_out) - create_pipe (peer.socket, this, hwm, swap, - &outpipe_reader, &outpipe_writer); + create_pipe (peer.socket, this, hwm, &outpipe_reader, + &outpipe_writer); // Attach the pipes to this socket object. attach_pipes (inpipe_reader, outpipe_writer, peer.options.identity); @@ -435,12 +429,12 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) - create_pipe (this, session, options.hwm, options.swap, + create_pipe (this, session, options.hwm, &inpipe_reader, &inpipe_writer); // Create outbound pipe, if required. if (options.requires_out) - create_pipe (session, this, options.hwm, options.swap, + create_pipe (session, this, options.hwm, &outpipe_reader, &outpipe_writer); // Attach the pipes to the socket object. diff --git a/src/swap.cpp b/src/swap.cpp deleted file mode 100644 index 936f30e..0000000 --- a/src/swap.cpp +++ /dev/null @@ -1,325 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "platform.hpp" - -#ifdef ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#include <io.h> -#else -#include <unistd.h> -#endif - -#include "../include/zmq.h" - -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> -#include <string.h> -#include <sstream> -#include <algorithm> - -#include "swap.hpp" -#include "config.hpp" -#include "atomic_counter.hpp" -#include "err.hpp" - -zmq::swap_t::swap_t (int64_t filesize_) : - fd (-1), - filesize (filesize_), - file_pos (0), - write_pos (0), - read_pos (0), - block_size (swap_block_size), - write_buf_start_addr (0) -{ - zmq_assert (filesize > 0); - zmq_assert (block_size > 0); - - buf1 = new (std::nothrow) char [block_size]; - alloc_assert (buf1); - - buf2 = new (std::nothrow) char [block_size]; - alloc_assert (buf2); - - read_buf = write_buf = buf1; -} - -zmq::swap_t::~swap_t () -{ - delete [] buf1; - delete [] buf2; - - if (fd == -1) - return; - -#ifdef ZMQ_HAVE_WINDOWS - int rc = _close (fd); -#else - int rc = close (fd); -#endif - errno_assert (rc == 0); - -#ifdef ZMQ_HAVE_WINDOWS - rc = _unlink (filename.c_str ()); -#else - rc = unlink (filename.c_str ()); -#endif - errno_assert (rc == 0); -} - -int zmq::swap_t::init () -{ - static zmq::atomic_counter_t seqnum (0); - - // Get process ID. -#ifdef ZMQ_HAVE_WINDOWS - int pid = GetCurrentThreadId (); -#else - pid_t pid = getpid (); -#endif - - std::ostringstream outs; - outs << "zmq_" << pid << '_' << seqnum.get () << ".swap"; - filename = outs.str (); - - seqnum.add (1); - - // Open the backing file. -#ifdef ZMQ_HAVE_WINDOWS - fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600); -#else - fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600); -#endif - if (fd == -1) - return -1; - -#ifdef ZMQ_HAVE_LINUX - // Enable more aggresive read-ahead optimization. - posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL); -#endif - return 0; -} - -bool zmq::swap_t::store (zmq_msg_t *msg_) -{ - size_t msg_size = zmq_msg_size (msg_); - - // Check buffer space availability. - // NOTE: We always keep one byte open. - if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size)) - return false; - - // Don't store the ZMQ_MSG_SHARED flag. - uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED; - - // Write message length, flags, and message body. - copy_to_file (&msg_size, sizeof msg_size); - copy_to_file (&msg_flags, sizeof msg_flags); - copy_to_file (zmq_msg_data (msg_), msg_size); - - zmq_msg_close (msg_); - - return true; -} - -void zmq::swap_t::fetch (zmq_msg_t *msg_) -{ - // There must be at least one message available. - zmq_assert (read_pos != write_pos); - - // Retrieve the message size. - size_t msg_size; - copy_from_file (&msg_size, sizeof msg_size); - - // Initialize the message. - zmq_msg_init_size (msg_, msg_size); - - // Retrieve the message flags. - copy_from_file (&msg_->flags, sizeof msg_->flags); - - // Retrieve the message payload. - copy_from_file (zmq_msg_data (msg_), msg_size); -} - -void zmq::swap_t::commit () -{ - commit_pos = write_pos; -} - -void zmq::swap_t::rollback () -{ - if (commit_pos == write_pos || read_pos == write_pos) - return; - - if (write_pos > read_pos) - zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos); - else - zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos); - - if (commit_pos / block_size == read_pos / block_size) { - write_buf_start_addr = commit_pos % block_size; - write_buf = read_buf; - } - else if (commit_pos / block_size != write_pos / block_size) { - write_buf_start_addr = commit_pos % block_size; - fill_buf (write_buf, write_buf_start_addr); - } - write_pos = commit_pos; -} - -bool zmq::swap_t::empty () -{ - return read_pos == write_pos; -} - -/* -bool zmq::swap_t::full () -{ - // Check that at least the message size can be written to the swap. - return buffer_space () < (int64_t) (sizeof (size_t) + 1); -} -*/ - -bool zmq::swap_t::fits (zmq_msg_t *msg_) -{ - // Check whether whole binary representation of the message - // fits into the swap. - size_t msg_size = zmq_msg_size (msg_); - if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size)) - return false; - return true; - } - -void zmq::swap_t::copy_from_file (void *buffer_, size_t count_) -{ - char *dest_ptr = (char *) buffer_; - size_t chunk_size, remainder = count_; - - while (remainder > 0) { - chunk_size = std::min (remainder, - std::min ((size_t) (filesize - read_pos), - (size_t) (block_size - read_pos % block_size))); - - memcpy (dest_ptr, &read_buf [read_pos % block_size], chunk_size); - dest_ptr += chunk_size; - - read_pos = (read_pos + chunk_size) % filesize; - if (read_pos % block_size == 0) { - if (read_pos / block_size == write_pos / block_size) - read_buf = write_buf; - else - fill_buf (read_buf, read_pos); - } - remainder -= chunk_size; - } -} - -void zmq::swap_t::copy_to_file (const void *buffer_, size_t count_) -{ - char *source_ptr = (char *) buffer_; - size_t chunk_size, remainder = count_; - - while (remainder > 0) { - chunk_size = std::min (remainder, - std::min ((size_t) (filesize - write_pos), - (size_t) (block_size - write_pos % block_size))); - - memcpy (&write_buf [write_pos % block_size], source_ptr, chunk_size); - source_ptr += chunk_size; - - write_pos = (write_pos + chunk_size) % filesize; - if (write_pos % block_size == 0) { - save_write_buf (); - write_buf_start_addr = write_pos; - - if (write_buf == read_buf) { - if (read_buf == buf2) - write_buf = buf1; - else - write_buf = buf2; - } - } - remainder -= chunk_size; - } -} - -void zmq::swap_t::fill_buf (char *buf, int64_t pos) -{ - if (file_pos != pos) { -#ifdef ZMQ_HAVE_WINDOWS - __int64 offset = _lseeki64 (fd, pos, SEEK_SET); -#else - off_t offset = lseek (fd, (off_t) pos, SEEK_SET); -#endif - errno_assert (offset == pos); - file_pos = pos; - } - size_t octets_stored = 0; - size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos)); - - while (octets_stored < octets_total) { -#ifdef ZMQ_HAVE_WINDOWS - int rc = _read (fd, &buf [octets_stored], octets_total - octets_stored); -#else - ssize_t rc = read (fd, &buf [octets_stored], - octets_total - octets_stored); -#endif - errno_assert (rc > 0); - octets_stored += rc; - } - file_pos += octets_total; -} - -void zmq::swap_t::save_write_buf () -{ - if (file_pos != write_buf_start_addr) { -#ifdef ZMQ_HAVE_WINDOWS - __int64 offset = _lseeki64 (fd, write_buf_start_addr, SEEK_SET); -#else - off_t offset = lseek (fd, (off_t) write_buf_start_addr, SEEK_SET); -#endif - errno_assert (offset == write_buf_start_addr); - file_pos = write_buf_start_addr; - } - size_t octets_stored = 0; - size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos)); - - while (octets_stored < octets_total) { -#ifdef ZMQ_HAVE_WINDOWS - int rc = _write (fd, &write_buf [octets_stored], - octets_total - octets_stored); -#else - ssize_t rc = write (fd, &write_buf [octets_stored], - octets_total - octets_stored); -#endif - errno_assert (rc > 0); - octets_stored += rc; - } - file_pos += octets_total; -} - -int64_t zmq::swap_t::buffer_space () -{ - if (write_pos < read_pos) - return read_pos - write_pos; - - return filesize - (write_pos - read_pos); -} diff --git a/src/swap.hpp b/src/swap.hpp deleted file mode 100644 index ad2bcc3..0000000 --- a/src/swap.hpp +++ /dev/null @@ -1,123 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_SWAP_HPP_INCLUDED__ -#define __ZMQ_SWAP_HPP_INCLUDED__ - -#include "../include/zmq.h" - -#include <string> -#include "stdint.hpp" - -namespace zmq -{ - - // This class implements a message swap. Messages are retrieved from - // the swap in the same order as they entered it. - - class swap_t - { - public: - - enum { default_block_size = 8192 }; - - // Creates the swap. - swap_t (int64_t filesize_); - - ~swap_t (); - - int init (); - - // Stores the message into the swap. The function - // returns false if the swap is full; true otherwise. - bool store (zmq_msg_t *msg_); - - // Fetches the oldest message from the swap. It is an error - // to call this function when the swap is empty. - void fetch (zmq_msg_t *msg_); - - void commit (); - - void rollback (); - - // Returns true if the swap is empty; false otherwise. - bool empty (); - - -// // Returns true if and only if the swap is full. -// bool full (); - - // Returns true if the message fits into swap. - bool fits (zmq_msg_t *msg_); - - private: - - // Copies data from a memory buffer to the backing file. - // Wraps around when reaching maximum file size. - void copy_from_file (void *buffer_, size_t count_); - - // Copies data from the backing file to the memory buffer. - // Wraps around when reaching end-of-file. - void copy_to_file (const void *buffer_, size_t count_); - - // Returns the buffer space available. - int64_t buffer_space (); - - void fill_buf (char *buf, int64_t pos); - - void save_write_buf (); - - // File descriptor to the backing file. - int fd; - - // Name of the backing file. - std::string filename; - - // Maximum size of the backing file. - int64_t filesize; - - // File offset associated with the fd file descriptor. - int64_t file_pos; - - // File offset the next message will be stored at. - int64_t write_pos; - - // File offset the next message will be read from. - int64_t read_pos; - - int64_t commit_pos; - - size_t block_size; - - char *buf1; - char *buf2; - char *read_buf; - char *write_buf; - - int64_t write_buf_start_addr; - - // Disable copying of the swap object. - swap_t (const swap_t&); - const swap_t &operator = (const swap_t&); - }; - -} - -#endif |