diff options
| -rw-r--r-- | src/Makefile.am | 2 | ||||
| -rw-r--r-- | src/msg_store.cpp | 313 | ||||
| -rw-r--r-- | src/msg_store.hpp | 114 | ||||
| -rw-r--r-- | src/pipe.cpp | 114 | ||||
| -rw-r--r-- | src/pipe.hpp | 29 | ||||
| -rw-r--r-- | src/session.cpp | 4 | ||||
| -rw-r--r-- | src/socket_base.cpp | 8 | 
7 files changed, 545 insertions, 39 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 9b0ff5d..977b655 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -76,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \      lb.hpp \      likely.hpp \      msg_content.hpp \ +    msg_store.hpp \      mutex.hpp \      object.hpp \      options.hpp \ @@ -134,6 +135,7 @@ libzmq_la_SOURCES = app_thread.hpp \      ip.cpp \      kqueue.cpp \      lb.cpp \ +    msg_store.cpp \      object.cpp \      options.cpp \      owned.cpp \ diff --git a/src/msg_store.cpp b/src/msg_store.cpp new file mode 100644 index 0000000..c4b18ca --- /dev/null +++ b/src/msg_store.cpp @@ -0,0 +1,313 @@ +/* +    Copyright (c) 2007-2010 iMatix Corporation + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../include/zmq.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <string.h> +#include <sstream> +#include <algorithm> + +#ifdef ZMQ_HAVE_WINDOWS +#include <io.h> +#else +#include <unistd.h> +#endif + +#include "atomic_counter.hpp" +#include "msg_store.hpp" +#include "err.hpp" + +zmq::msg_store_t::msg_store_t (int64_t filesize_, size_t block_size_) : +    fd (-1), +    filesize (filesize_), +    file_pos (0), +    write_pos (0), +    read_pos (0), +    block_size (block_size_), +    write_buf_start_addr (0) +{ +    zmq_assert (filesize > 0); +    zmq_assert (block_size > 0); + +    buf1 = new (std::nothrow) char [block_size]; +    zmq_assert (buf1); + +    buf2 = new (std::nothrow) char [block_size]; +    zmq_assert (buf2); + +    read_buf = write_buf = buf1; +} + +zmq::msg_store_t::~msg_store_t () +{ +    delete [] buf1; +    delete [] buf2; + +    if (fd == -1) +        return; + +#ifdef ZMQ_HAVE_WINDOWS +    int rc = _close (fd); +#else +    int rc = close (fd); +#endif +    errno_assert (rc == 0); + +#ifdef ZMQ_HAVE_WINDOWS +    rc = _unlink (filename.c_str ()); +#else +    rc = unlink (filename.c_str ()); +#endif +    errno_assert (rc == 0); +} + +int zmq::msg_store_t::init () +{ +    static zmq::atomic_counter_t seqnum (0); + +    //  Get process ID. +#ifdef ZMQ_HAVE_WINDOWS +    int pid = GetCurrentThreadId (); +#else +    pid_t pid = getpid (); +#endif + +    std::ostringstream outs; +    outs << "zmq_" << pid << '_' << seqnum.get () << ".swap"; +    filename = outs.str (); + +    seqnum.add (1); + +    //  Open the backing file. +#ifdef ZMQ_HAVE_WINDOWS +    fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600); +#else +    fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600); +#endif +    if (fd == -1) +        return -1; + +#ifdef ZMQ_HAVE_LINUX +    //  Enable more aggresive read-ahead optimization. +    posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL); +#endif +    return 0; +} + +bool zmq::msg_store_t::store (zmq_msg_t *msg_) +{ +    size_t msg_size = zmq_msg_size (msg_); + +    //  Check buffer space availability. +    //  NOTE: We always keep one byte open. +    if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size)) +        return false; + +    //  Don't store the ZMQ_MSG_SHARED flag. +    uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED; + +    //  Write message length, flags, and message body. +    copy_to_file (&msg_size, sizeof msg_size); +    copy_to_file (&msg_flags, sizeof msg_flags); +    copy_to_file (zmq_msg_data (msg_), msg_size); + +    zmq_msg_close (msg_); + +    return true; +} + +void zmq::msg_store_t::fetch (zmq_msg_t *msg_) +{ +    //  There must be at least one message available. +    zmq_assert (read_pos != write_pos); + +    //  Retrieve the message size. +    size_t msg_size; +    copy_from_file (&msg_size, sizeof msg_size); + +    //  Initialize the message. +    zmq_msg_init_size (msg_, msg_size); + +    //  Retrieve the message flags. +    copy_from_file (&msg_->flags, sizeof msg_->flags); + +    //  Retrieve the message payload. +    copy_from_file (zmq_msg_data (msg_), msg_size); +} + +void zmq::msg_store_t::commit () +{ +    commit_pos = write_pos; +} + +void zmq::msg_store_t::rollback () +{ +    if (commit_pos == write_pos || read_pos == write_pos) +        return; + + +    if (write_pos > read_pos) +        zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos); +    else +        zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos); + +    if (commit_pos / block_size == read_pos / block_size) { +        write_buf_start_addr = commit_pos % block_size; +        write_buf = read_buf; +    } +    else if (commit_pos / block_size != write_pos / block_size) { +        write_buf_start_addr = commit_pos % block_size; +        fill_buf (write_buf, write_buf_start_addr); +    } + +    write_pos = commit_pos; +} + +bool zmq::msg_store_t::empty () +{ +    return read_pos == write_pos; +} + +bool zmq::msg_store_t::full () +{ +    return buffer_space () == 1; +} + +void zmq::msg_store_t::copy_from_file (void *buffer_, size_t count_) +{ +    char *ptr = (char*) buffer_; +    size_t n, n_left = count_; + +    while (n_left > 0) { + +        n = std::min (n_left, std::min ((size_t) (filesize - read_pos), +            (size_t) (block_size - read_pos % block_size))); + +        memcpy (ptr, &read_buf [read_pos % block_size], n); +        ptr += n; + +        read_pos = (read_pos + n) % filesize; +        if (read_pos % block_size == 0) { +            if (read_pos / block_size == write_pos / block_size) +                read_buf = write_buf; +            else +                fill_buf (read_buf, read_pos); +        } + +        n_left -= n; +    } +} + +void zmq::msg_store_t::copy_to_file (const void *buffer_, size_t count_) +{ +    char *ptr = (char*) buffer_; +    size_t n, n_left = count_; + +    while (n_left > 0) { + +        n = std::min (n_left, std::min ((size_t) (filesize - write_pos), +            (size_t) (block_size - write_pos % block_size))); + +        memcpy (&write_buf [write_pos % block_size], ptr, n); +        ptr += n; + +        write_pos = (write_pos + n) % filesize; +        if (write_pos % block_size == 0) { + +            save_write_buf (); +            write_buf_start_addr = write_pos; + +            if (write_buf == read_buf) { +                if (read_buf == buf2) +                    write_buf = buf1; +                else +                    write_buf = buf2; +            } +        } + +        n_left -= n; +    } +} + +void zmq::msg_store_t::fill_buf (char *buf, int64_t pos) +{ +    if (file_pos != pos) { +#ifdef ZMQ_HAVE_WINDOWS +        __int64 offset = _lseeki64 (fd, pos, SEEK_SET); +#else +        off_t offset = lseek (fd, (off_t) pos, SEEK_SET); +#endif +        errno_assert (offset == pos); +        file_pos = pos; +    } + +    size_t i = 0; +    size_t n = std::min (block_size, (size_t) (filesize - file_pos)); + +    while (i < n) { +#ifdef ZMQ_HAVE_WINDOWS +        int rc = _read (fd, &buf [i], n - i); +#else +        ssize_t rc = read (fd, &buf [i], n - i); +#endif +        errno_assert (rc > 0); +        i += rc; +    } + +    file_pos += n; +} + +void zmq::msg_store_t::save_write_buf () +{ +    if (file_pos != write_buf_start_addr) { +#ifdef ZMQ_HAVE_WINDOWS +        __int64 offset = _lseeki64 (fd, write_buf_start_addr, SEEK_SET); +#else +        off_t offset = lseek (fd, (off_t) write_buf_start_addr, SEEK_SET); +#endif +        errno_assert (offset == write_buf_start_addr); +        file_pos = write_buf_start_addr; +    } + +    size_t i = 0; +    size_t n = std::min (block_size, (size_t) (filesize - file_pos)); + +    while (i < n) { +#ifdef ZMQ_HAVE_WINDOWS +        int rc = _write (fd, &write_buf [i], n - i); +#else +        ssize_t rc = write (fd, &write_buf [i], n - i); +#endif +        errno_assert (rc > 0); +        i += rc; +    } + +    file_pos += n; +} + +int64_t zmq::msg_store_t::buffer_space () +{ +    if (write_pos < read_pos) +        return read_pos - write_pos; + +    return filesize - (write_pos - read_pos); +} diff --git a/src/msg_store.hpp b/src/msg_store.hpp new file mode 100644 index 0000000..765fc60 --- /dev/null +++ b/src/msg_store.hpp @@ -0,0 +1,114 @@ +/* +    Copyright (c) 2007-2010 iMatix Corporation + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_MSG_STORE_HPP_INCLUDED__ +#define __ZMQ_MSG_STORE_HPP_INCLUDED__ + +#include "../include/zmq.h" + +#include <string> +#include "stdint.hpp" + +namespace zmq +{ + +    //  This class implements a message store. Messages are retrieved from +    //  the store in the same order as they entered it. + +    class msg_store_t +    { +    public: + +        enum { default_block_size = 8192 }; + +        //  Creates message store. +        msg_store_t (int64_t filesize_, size_t block_size_ = default_block_size); + +        ~msg_store_t (); + +        int init (); + +        //  Stores the message into the message store. The function +        //  returns false if the message store is full; true otherwise. +        bool store (zmq_msg_t *msg_); + +        //  Fetches the oldest message from the message store. It is an error +        //  to call this function when the message store is empty. +        void fetch (zmq_msg_t *msg_); + +        void commit (); + +        void rollback (); + +        //  Returns true if the message store is empty; false otherwise. +        bool empty (); + +        //  Returns true if and only if the store is full. +        bool full (); + +    private: + +        //  Copies data from a memory buffer to the backing file. +        //  Wraps around when reaching maximum file size. +        void copy_from_file (void *buffer_, size_t count_); + +        //  Copies data from the backing file to the memory buffer. +        //  Wraps around when reaching end-of-file. +        void copy_to_file (const void *buffer_, size_t count_); + +        //  Returns the buffer space available. +        int64_t buffer_space (); + +        void fill_buf (char *buf, int64_t pos); + +        void save_write_buf (); + +        //  File descriptor to the backing file. +        int fd; + +        //  Name of the backing file. +        std::string filename; + +        //  Maximum size of the backing file. +        int64_t filesize; + +        //  File offset associated with the fd file descriptor. +        int64_t file_pos; + +        //  File offset the next message will be stored at. +        int64_t write_pos; + +        //  File offset the next message will be read from. +        int64_t read_pos; + +        int64_t commit_pos; + +        size_t block_size; + +        char *buf1; +        char *buf2; +        char *read_buf; +        char *write_buf; + +        int64_t write_buf_start_addr; +    }; + +} + +#endif diff --git a/src/pipe.cpp b/src/pipe.cpp index ff64716..b5c656d 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -21,20 +21,14 @@  #include "pipe.hpp" -zmq::reader_t::reader_t (object_t *parent_, -      uint64_t hwm_, uint64_t lwm_) : +zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) :      object_t (parent_),      pipe (NULL),      peer (NULL), -    hwm (hwm_),      lwm (lwm_),      msgs_read (0),      endpoint (NULL) -{ -    //  Adjust lwm and hwm. -    if (lwm == 0 || lwm > hwm) -        lwm = hwm; -} +{}  zmq::reader_t::~reader_t ()  { @@ -113,20 +107,28 @@ void zmq::reader_t::process_pipe_term_ack ()  }  zmq::writer_t::writer_t (object_t *parent_, -      uint64_t hwm_, uint64_t lwm_) : +      uint64_t hwm_, int64_t swap_size_) :      object_t (parent_),      pipe (NULL),      peer (NULL),      hwm (hwm_), -    lwm (lwm_),      msgs_read (0),      msgs_written (0), +    msg_store (NULL), +    extra_msg_flag (false),      stalled (false), +    pending_close (false),      endpoint (NULL)  { -    //  Adjust lwm and hwm. -    if (lwm == 0 || lwm > hwm) -        lwm = hwm; +    if (swap_size_ > 0) { +        msg_store = new (std::nothrow) msg_store_t (swap_size_); +        if (msg_store != NULL) { +            if (msg_store->init () < 0) { +                delete msg_store; +                msg_store = NULL; +            } +        } +    }  }  void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) @@ -136,6 +138,10 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)  zmq::writer_t::~writer_t ()  { +    if (extra_msg_flag) +        zmq_msg_close (&extra_msg); + +    delete msg_store;  }  void zmq::writer_t::set_pipe (pipe_t *pipe_) @@ -147,7 +153,7 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_)  bool zmq::writer_t::check_write ()  { -    if (pipe_full ()) { +    if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) {          stalled = true;          return false;      } @@ -157,28 +163,45 @@ bool zmq::writer_t::check_write ()  bool zmq::writer_t::write (zmq_msg_t *msg_)  { -    if (pipe_full ()) { -        stalled = true; +    if (!check_write ())          return false; + +    if (pipe_full ()) { +        if (msg_store->store (msg_)) { +            if (!(msg_->flags & ZMQ_MSG_MORE)) +                msg_store->commit (); +        } else { +            extra_msg = *msg_; +            extra_msg_flag = true; +        } +    } +    else { +        pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); +        if (!(msg_->flags & ZMQ_MSG_MORE)) +            msgs_written++;      } -    pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); -    if (!(msg_->flags & ZMQ_MSG_MORE)) -        msgs_written++;      return true;  }  void zmq::writer_t::rollback ()  { -    zmq_msg_t msg; +    if (extra_msg_flag && extra_msg.flags & ZMQ_MSG_MORE) { +        zmq_msg_close (&extra_msg); +        extra_msg_flag = false; +    } + +    if (msg_store != NULL) +        msg_store->rollback (); +    zmq_msg_t msg;      //  Remove all incomplete messages from the pipe.      while (pipe->unwrite (&msg)) {          zmq_assert (msg.flags & ZMQ_MSG_MORE);          zmq_msg_close (&msg);      } -    if (stalled && endpoint != NULL && !pipe_full()) { +    if (stalled && endpoint != NULL && check_write ()) {          stalled = false;          endpoint->revive (this);      } @@ -197,6 +220,14 @@ void zmq::writer_t::term ()      //  Rollback any unfinished messages.      rollback (); +    if (msg_store == NULL || (msg_store->empty () && !extra_msg_flag)) +        write_delimiter (); +    else +        pending_close = true; +} + +void zmq::writer_t::write_delimiter () +{      //  Push delimiter into the pipe.      //  Trick the compiler to belive that the tag is a valid pointer.      zmq_msg_t msg; @@ -209,7 +240,42 @@ void zmq::writer_t::term ()  void zmq::writer_t::process_reader_info (uint64_t msgs_read_)  { +    zmq_msg_t msg; +      msgs_read = msgs_read_; +    if (msg_store) { + +        //  Move messages from backing store into pipe. +        while (!pipe_full () && !msg_store->empty ()) { +            msg_store->fetch(&msg); +            //  Write message into the pipe. +            pipe->write (msg, msg.flags & ZMQ_MSG_MORE); +            if (!(msg.flags & ZMQ_MSG_MORE)) +                msgs_written++; +        } + +        if (extra_msg_flag) { +            if (!pipe_full ()) { +                pipe->write (extra_msg, extra_msg.flags & ZMQ_MSG_MORE); +                if (!(extra_msg.flags & ZMQ_MSG_MORE)) +                    msgs_written++; +                extra_msg_flag = false; +            } +            else if (msg_store->store (&extra_msg)) { +                if (!(extra_msg.flags & ZMQ_MSG_MORE)) +                    msg_store->commit (); +                extra_msg_flag = false; +            } +        } + +        if (pending_close && msg_store->empty () && !extra_msg_flag) { +            write_delimiter (); +            pending_close = false; +        } + +        flush (); +    } +      if (stalled && endpoint != NULL) {          stalled = false;          endpoint->revive (this); @@ -232,9 +298,9 @@ bool zmq::writer_t::pipe_full ()  }  zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, -      uint64_t hwm_) : -    reader (reader_parent_, hwm_, compute_lwm (hwm_)), -    writer (writer_parent_, hwm_, compute_lwm (hwm_)) +      uint64_t hwm_, int64_t swap_size_) : +    reader (reader_parent_, compute_lwm (hwm_)), +    writer (writer_parent_, hwm_, swap_size_)  {      reader.set_pipe (this);      writer.set_pipe (this); diff --git a/src/pipe.hpp b/src/pipe.hpp index 9f57653..a3516b5 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -26,6 +26,7 @@  #include "i_endpoint.hpp"  #include "yarray_item.hpp"  #include "ypipe.hpp" +#include "msg_store.hpp"  #include "config.hpp"  #include "object.hpp" @@ -36,8 +37,7 @@ namespace zmq      {      public: -        reader_t (class object_t *parent_, -            uint64_t hwm_, uint64_t lwm_); +        reader_t (class object_t *parent_, uint64_t lwm_);          ~reader_t ();          void set_pipe (class pipe_t *pipe_); @@ -64,8 +64,7 @@ namespace zmq          //  Pipe writer associated with the other side of the pipe.          class writer_t *peer; -        //  High and low watermarks for in-memory storage (in bytes). -        uint64_t hwm; +        //  Low watermark for in-memory storage (in bytes).          uint64_t lwm;          //  Number of messages read so far. @@ -82,8 +81,7 @@ namespace zmq      {      public: -        writer_t (class object_t *parent_, -            uint64_t hwm_, uint64_t lwm_); +        writer_t (class object_t *parent_, uint64_t hwm_, int64_t swap_size_);          ~writer_t ();          void set_pipe (class pipe_t *pipe_); @@ -117,15 +115,18 @@ namespace zmq          //  Tests whether the pipe is already full.          bool pipe_full (); +        //  Write special message to the pipe so that the reader +        //  can find out we are finished. +        void write_delimiter (); +          //  The underlying pipe.          class pipe_t *pipe;          //  Pipe reader associated with the other side of the pipe.          class reader_t *peer; -        //  High and low watermarks for in-memory storage (in bytes). +        //  High watermark for in-memory storage (in bytes).          uint64_t hwm; -        uint64_t lwm;          //  Last confirmed number of messages read from the pipe.          //  The actual number can be higher. @@ -134,9 +135,19 @@ namespace zmq          //  Number of messages we have written so far.          uint64_t msgs_written; +        //  Pointer to backing store. If NULL, messages are always +        //  kept in main memory. +        msg_store_t *msg_store; + +        bool extra_msg_flag; + +        zmq_msg_t extra_msg; +          //  True iff the last attempt to write a message has failed.          bool stalled; +        bool pending_close; +          //  Endpoint (either session or socket) the pipe is attached to.          i_endpoint *endpoint; @@ -150,7 +161,7 @@ namespace zmq      public:          pipe_t (object_t *reader_parent_, object_t *writer_parent_, -            uint64_t hwm_); +            uint64_t hwm_, int64_t swap_size_);          ~pipe_t ();          reader_t reader; diff --git a/src/session.cpp b/src/session.cpp index 3cd27fb..f798877 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -265,7 +265,7 @@ void zmq::session_t::process_attach (i_engine *engine_,      writer_t *socket_writer = NULL;      if (options.requires_in && !out_pipe) { -        pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm); +        pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap);          zmq_assert (pipe);          out_pipe = &pipe->writer;          out_pipe->set_endpoint (this); @@ -273,7 +273,7 @@ void zmq::session_t::process_attach (i_engine *engine_,      }      if (options.requires_out && !in_pipe) { -        pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm); +        pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap);          zmq_assert (pipe);          in_pipe = &pipe->reader;          in_pipe->set_endpoint (this); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index cca83f7..56c3b1a 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -195,13 +195,13 @@ int zmq::socket_base_t::connect (const char *addr_)          //  Create inbound pipe, if required.          if (options.requires_in) { -            in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm); +            in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm, options.swap);              zmq_assert (in_pipe);          }          //  Create outbound pipe, if required.          if (options.requires_out) { -            out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm); +            out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm, options.swap);              zmq_assert (out_pipe);          } @@ -234,14 +234,14 @@ int zmq::socket_base_t::connect (const char *addr_)          //  Create inbound pipe, if required.          if (options.requires_in) { -            in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm); +            in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm, options.swap);              zmq_assert (in_pipe);          }          //  Create outbound pipe, if required.          if (options.requires_out) { -            out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm); +            out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm, options.swap);              zmq_assert (out_pipe);          } | 
