From 4ed70a930202b103e7e80b8dc925e0aaa4622595 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 29 Jul 2009 12:07:54 +0200 Subject: initial commit --- AUTHORS | 0 ChangeLog | 0 Makefile.am | 4 + NEWS | 0 README | 0 autogen.sh | 29 +++++ configure.in | 188 +++++++++++++++++++++++++++ examples/Makefile.am | 2 + examples/chat/Makefile.am | 15 +++ examples/chat/chatroom.cpp | 74 +++++++++++ examples/chat/display.cpp | 56 ++++++++ examples/chat/prompt.cpp | 61 +++++++++ include/zs.h | 206 ++++++++++++++++++++++++++++++ include/zs.hpp | 231 +++++++++++++++++++++++++++++++++ src/Makefile.am | 120 ++++++++++++++++++ src/app_thread.cpp | 221 ++++++++++++++++++++++++++++++++ src/app_thread.hpp | 95 ++++++++++++++ src/atomic.hpp | 310 +++++++++++++++++++++++++++++++++++++++++++++ src/atomic_bitmap.hpp | 286 +++++++++++++++++++++++++++++++++++++++++ src/atomic_counter.hpp | 197 ++++++++++++++++++++++++++++ src/atomic_ptr.hpp | 189 +++++++++++++++++++++++++++ src/command.hpp | 98 ++++++++++++++ src/config.hpp | 71 +++++++++++ src/connecter.cpp | 189 +++++++++++++++++++++++++++ src/connecter.hpp | 99 +++++++++++++++ src/data_distributor.cpp | 155 +++++++++++++++++++++++ src/data_distributor.hpp | 70 ++++++++++ src/decoder.hpp | 101 +++++++++++++++ src/devpoll.cpp | 224 ++++++++++++++++++++++++++++++++ src/devpoll.hpp | 110 ++++++++++++++++ src/dispatcher.cpp | 266 ++++++++++++++++++++++++++++++++++++++ src/dispatcher.hpp | 170 +++++++++++++++++++++++++ src/dummy_aggregator.cpp | 111 ++++++++++++++++ src/dummy_aggregator.hpp | 73 +++++++++++ src/dummy_distributor.cpp | 85 +++++++++++++ src/dummy_distributor.hpp | 68 ++++++++++ src/encoder.hpp | 108 ++++++++++++++++ src/epoll.cpp | 214 +++++++++++++++++++++++++++++++ src/epoll.hpp | 107 ++++++++++++++++ src/err.cpp | 146 +++++++++++++++++++++ src/err.hpp | 90 +++++++++++++ src/fair_aggregator.cpp | 143 +++++++++++++++++++++ src/fair_aggregator.hpp | 77 +++++++++++ src/fd.hpp | 44 +++++++ src/fd_signaler.cpp | 278 ++++++++++++++++++++++++++++++++++++++++ src/fd_signaler.hpp | 92 ++++++++++++++ src/i_api.hpp | 39 ++++++ src/i_demux.hpp | 56 ++++++++ src/i_engine.hpp | 53 ++++++++ src/i_mux.hpp | 59 +++++++++ src/i_poll_events.hpp | 45 +++++++ src/i_poller.hpp | 89 +++++++++++++ src/i_session.hpp | 37 ++++++ src/i_signaler.hpp | 38 ++++++ src/i_thread.hpp | 38 ++++++ src/io_object.cpp | 37 ++++++ src/io_object.hpp | 51 ++++++++ src/io_thread.cpp | 177 ++++++++++++++++++++++++++ src/io_thread.hpp | 99 +++++++++++++++ src/ip.cpp | 310 +++++++++++++++++++++++++++++++++++++++++++++ src/ip.hpp | 47 +++++++ src/kqueue.cpp | 214 +++++++++++++++++++++++++++++++ src/kqueue.hpp | 112 ++++++++++++++++ src/listener.cpp | 170 +++++++++++++++++++++++++ src/listener.hpp | 110 ++++++++++++++++ src/load_balancer.cpp | 130 +++++++++++++++++++ src/load_balancer.hpp | 73 +++++++++++ src/msg.hpp | 49 +++++++ src/mutex.hpp | 116 +++++++++++++++++ src/object.cpp | 294 ++++++++++++++++++++++++++++++++++++++++++ src/object.hpp | 105 +++++++++++++++ src/p2p.cpp | 29 +++++ src/p2p.hpp | 42 ++++++ src/pipe.cpp | 47 +++++++ src/pipe.hpp | 57 +++++++++ src/pipe_reader.cpp | 118 +++++++++++++++++ src/pipe_reader.hpp | 89 +++++++++++++ src/pipe_writer.cpp | 120 ++++++++++++++++++ src/pipe_writer.hpp | 88 +++++++++++++ src/platform.hpp.in | 210 ++++++++++++++++++++++++++++++ src/poll.cpp | 205 ++++++++++++++++++++++++++++++ src/poll.hpp | 112 ++++++++++++++++ src/pub.cpp | 38 ++++++ src/pub.hpp | 45 +++++++ src/rep.cpp | 29 +++++ src/rep.hpp | 42 ++++++ src/req.cpp | 29 +++++ src/req.hpp | 42 ++++++ src/safe_object.cpp | 76 +++++++++++ src/safe_object.hpp | 68 ++++++++++ src/select.cpp | 236 ++++++++++++++++++++++++++++++++++ src/select.hpp | 122 ++++++++++++++++++ src/session.cpp | 273 +++++++++++++++++++++++++++++++++++++++ src/session.hpp | 107 ++++++++++++++++ src/session_stub.cpp | 110 ++++++++++++++++ src/session_stub.hpp | 83 ++++++++++++ src/simple_semaphore.hpp | 188 +++++++++++++++++++++++++++ src/socket_base.cpp | 267 ++++++++++++++++++++++++++++++++++++++ src/socket_base.hpp | 96 ++++++++++++++ src/stdint.hpp | 70 ++++++++++ src/sub.cpp | 45 +++++++ src/sub.hpp | 46 +++++++ src/tcp_connecter.cpp | 138 ++++++++++++++++++++ src/tcp_connecter.hpp | 65 ++++++++++ src/tcp_listener.cpp | 165 ++++++++++++++++++++++++ src/tcp_listener.hpp | 65 ++++++++++ src/tcp_socket.cpp | 116 +++++++++++++++++ src/tcp_socket.hpp | 70 ++++++++++ src/thread.cpp | 88 +++++++++++++ src/thread.hpp | 77 +++++++++++ src/uuid.cpp | 136 ++++++++++++++++++++ src/uuid.hpp | 82 ++++++++++++ src/windows.hpp | 56 ++++++++ src/wire.hpp | 98 ++++++++++++++ src/ypipe.hpp | 209 ++++++++++++++++++++++++++++++ src/ypollset.cpp | 56 ++++++++ src/ypollset.hpp | 74 +++++++++++ src/yqueue.hpp | 138 ++++++++++++++++++++ src/zmq_decoder.cpp | 78 ++++++++++++ src/zmq_decoder.hpp | 57 +++++++++ src/zmq_encoder.cpp | 75 +++++++++++ src/zmq_encoder.hpp | 54 ++++++++ src/zmq_tcp_engine.cpp | 185 +++++++++++++++++++++++++++ src/zmq_tcp_engine.hpp | 92 ++++++++++++++ src/zs.cpp | 222 ++++++++++++++++++++++++++++++++ 125 files changed, 13546 insertions(+) create mode 100644 AUTHORS create mode 100644 ChangeLog create mode 100644 Makefile.am create mode 100644 NEWS create mode 100644 README create mode 100755 autogen.sh create mode 100644 configure.in create mode 100644 examples/Makefile.am create mode 100644 examples/chat/Makefile.am create mode 100644 examples/chat/chatroom.cpp create mode 100644 examples/chat/display.cpp create mode 100644 examples/chat/prompt.cpp create mode 100644 include/zs.h create mode 100644 include/zs.hpp create mode 100644 src/Makefile.am create mode 100644 src/app_thread.cpp create mode 100644 src/app_thread.hpp create mode 100644 src/atomic.hpp create mode 100644 src/atomic_bitmap.hpp create mode 100644 src/atomic_counter.hpp create mode 100644 src/atomic_ptr.hpp create mode 100644 src/command.hpp create mode 100644 src/config.hpp create mode 100644 src/connecter.cpp create mode 100644 src/connecter.hpp create mode 100644 src/data_distributor.cpp create mode 100644 src/data_distributor.hpp create mode 100644 src/decoder.hpp create mode 100644 src/devpoll.cpp create mode 100644 src/devpoll.hpp create mode 100644 src/dispatcher.cpp create mode 100644 src/dispatcher.hpp create mode 100644 src/dummy_aggregator.cpp create mode 100644 src/dummy_aggregator.hpp create mode 100644 src/dummy_distributor.cpp create mode 100644 src/dummy_distributor.hpp create mode 100644 src/encoder.hpp create mode 100644 src/epoll.cpp create mode 100644 src/epoll.hpp create mode 100644 src/err.cpp create mode 100644 src/err.hpp create mode 100644 src/fair_aggregator.cpp create mode 100644 src/fair_aggregator.hpp create mode 100644 src/fd.hpp create mode 100644 src/fd_signaler.cpp create mode 100644 src/fd_signaler.hpp create mode 100644 src/i_api.hpp create mode 100644 src/i_demux.hpp create mode 100644 src/i_engine.hpp create mode 100644 src/i_mux.hpp create mode 100644 src/i_poll_events.hpp create mode 100644 src/i_poller.hpp create mode 100644 src/i_session.hpp create mode 100644 src/i_signaler.hpp create mode 100644 src/i_thread.hpp create mode 100644 src/io_object.cpp create mode 100644 src/io_object.hpp create mode 100644 src/io_thread.cpp create mode 100644 src/io_thread.hpp create mode 100644 src/ip.cpp create mode 100644 src/ip.hpp create mode 100644 src/kqueue.cpp create mode 100644 src/kqueue.hpp create mode 100644 src/listener.cpp create mode 100644 src/listener.hpp create mode 100644 src/load_balancer.cpp create mode 100644 src/load_balancer.hpp create mode 100644 src/msg.hpp create mode 100644 src/mutex.hpp create mode 100644 src/object.cpp create mode 100644 src/object.hpp create mode 100644 src/p2p.cpp create mode 100644 src/p2p.hpp create mode 100644 src/pipe.cpp create mode 100644 src/pipe.hpp create mode 100644 src/pipe_reader.cpp create mode 100644 src/pipe_reader.hpp create mode 100644 src/pipe_writer.cpp create mode 100644 src/pipe_writer.hpp create mode 100644 src/platform.hpp.in create mode 100644 src/poll.cpp create mode 100644 src/poll.hpp create mode 100644 src/pub.cpp create mode 100644 src/pub.hpp create mode 100644 src/rep.cpp create mode 100644 src/rep.hpp create mode 100644 src/req.cpp create mode 100644 src/req.hpp create mode 100644 src/safe_object.cpp create mode 100644 src/safe_object.hpp create mode 100644 src/select.cpp create mode 100644 src/select.hpp create mode 100644 src/session.cpp create mode 100644 src/session.hpp create mode 100644 src/session_stub.cpp create mode 100644 src/session_stub.hpp create mode 100644 src/simple_semaphore.hpp create mode 100644 src/socket_base.cpp create mode 100644 src/socket_base.hpp create mode 100644 src/stdint.hpp create mode 100644 src/sub.cpp create mode 100644 src/sub.hpp create mode 100644 src/tcp_connecter.cpp create mode 100644 src/tcp_connecter.hpp create mode 100644 src/tcp_listener.cpp create mode 100644 src/tcp_listener.hpp create mode 100644 src/tcp_socket.cpp create mode 100644 src/tcp_socket.hpp create mode 100644 src/thread.cpp create mode 100644 src/thread.hpp create mode 100644 src/uuid.cpp create mode 100644 src/uuid.hpp create mode 100644 src/windows.hpp create mode 100644 src/wire.hpp create mode 100644 src/ypipe.hpp create mode 100644 src/ypollset.cpp create mode 100644 src/ypollset.hpp create mode 100644 src/yqueue.hpp create mode 100644 src/zmq_decoder.cpp create mode 100644 src/zmq_decoder.hpp create mode 100644 src/zmq_encoder.cpp create mode 100644 src/zmq_encoder.hpp create mode 100644 src/zmq_tcp_engine.cpp create mode 100644 src/zmq_tcp_engine.hpp create mode 100644 src/zs.cpp diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..e69de29 diff --git a/ChangeLog b/ChangeLog new file mode 100644 index 0000000..e69de29 diff --git a/Makefile.am b/Makefile.am new file mode 100644 index 0000000..cc77307 --- /dev/null +++ b/Makefile.am @@ -0,0 +1,4 @@ +include_HEADERS = include/zs.h include/zs.hpp + +SUBDIRS = src examples +DIST_SUBDIRS = src examples diff --git a/NEWS b/NEWS new file mode 100644 index 0000000..e69de29 diff --git a/README b/README new file mode 100644 index 0000000..e69de29 diff --git a/autogen.sh b/autogen.sh new file mode 100755 index 0000000..19f2174 --- /dev/null +++ b/autogen.sh @@ -0,0 +1,29 @@ +#!/bin/sh +# Copyright (c) 2007 FastMQ Inc. +# +# This file is part of 0MQ. +# +# 0MQ is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. + +# 0MQ is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# Script to generate all required files from fresh svn checkout. + + + +autoreconf --install --force --verbose -I config + +if [ $? -ne 0 ]; then + echo + echo "Could not run autoreconf, check autotools installation." + echo +fi diff --git a/configure.in b/configure.in new file mode 100644 index 0000000..e75ab11 --- /dev/null +++ b/configure.in @@ -0,0 +1,188 @@ +# -*- Autoconf -*- +# Process this file with autoconf to produce a configure script. +AC_PREREQ(2.61) +AC_INIT([zsock],[dev]) +AC_CONFIG_AUX_DIR(config) +AM_CONFIG_HEADER(src/platform.hpp) +AM_INIT_AUTOMAKE(AC_PACKAGE_NAME, AC_PACKAGE_VERSION) + +AM_PROG_CC_C_O + +# Checks for programs. +AC_PROG_CXX +AC_PROG_LIBTOOL + +# Checks for libraries. +AC_CHECK_LIB(pthread, pthread_create) + +# Host speciffic checks +AC_CANONICAL_HOST + +case "${host_os}" in + *linux*) + AC_DEFINE(ZS_HAVE_LINUX, 1, [Have Linux OS]) + CPPFLAGS="-D_REENTRANT $CPPFLAGS" + sed < libtool > libtool-2 \ + 's/^hardcode_libdir_flag_spec.*$'/'hardcode_libdir_flag_spec=" "/' + mv libtool-2 libtool + chmod 755 libtool + AC_CHECK_LIB(uuid, uuid_generate) + ;; + *solaris*) + AC_DEFINE(ZS_HAVE_SOLARIS, 1, [Have Solaris OS]) + AC_CHECK_LIB(socket, main) + AC_CHECK_LIB(nsl, main) + AC_CHECK_LIB(rt, main) + CPPFLAGS="-D_REENTRANT -D_PTHREADS $CPPFLAGS" + AC_MSG_CHECKING([wheter atomic operations can be used]) + AC_COMPILE_IFELSE([AC_LANG_PROGRAM( + [[#include ]], + [[uint32_t value; + atomic_cas_32 (&value, 0, 0); + return 0;]])], + [solaris_has_atomic=yes], + [solaris_has_atomic=no]) + AC_MSG_RESULT([$solaris_has_atomic]) + # Solaris 8 does not have atomic operations exported to user space. + if test "x$solaris_has_atomic" = "xno"; then + AC_DEFINE(ZS_FORCE_MUTEXES, 1, [Force to use mutexes]) + fi + ;; + *freebsd*) + AC_DEFINE(ZS_HAVE_FREEBSD, 1, [Have FreeBSD OS]) + CPPFLAGS="-D_THREAD_SAFE $CPPFLAGS" + LIBS="-pthread" + ;; + *darwin*) + AC_DEFINE(ZS_HAVE_OSX, 1, [Have DarwinOSX OS]) + LIBS="-pthread" + ZS_EXTRA_CXXFLAGS+="-Wno-uninitialized" + ;; + *openbsd*) + AC_DEFINE(ZS_HAVE_OPENBSD, 1, [Have OpenBSD OS]) + CPPFLAGS="-pthread $CPPFLAGS" + LIBS="-pthread" + ;; + *nto-qnx*) + AC_DEFINE(ZS_HAVE_QNXNTO, 1, [Have QNX Neutrino OS]) + CPPFLAGS="-D_THREAD_SAFE $CPPFLAGS" + AC_CHECK_LIB(socket,main) + ;; + *aix*) + AC_DEFINE(ZS_HAVE_AIX, 1, [Have AIX OS]) + if test "x$GXX" = "xyes"; then + CPPFLAGS="-D_THREAD_SAFE $CPPFLAGS" + fi + ;; + *hpux*) + AC_DEFINE(ZS_HAVE_HPUX, 1, [Have HPUX OS]) + if test "x$GXX" = "xyes"; then + CPPFLAGS="-D_THREAD_SAFE $CPPFLAGS" + fi + AC_CHECK_LIB(rt, main) + sed < libtool > libtool-2 \ + 's/^hardcode_libdir_flag_spec.*$'/'hardcode_libdir_flag_spec=" "/' + mv libtool-2 libtool + chmod 755 libtool + ;; + *mingw32*) + AC_DEFINE(ZS_HAVE_WINDOWS, 1, [Have Windows OS]) + AC_DEFINE(ZS_HAVE_MINGW32, 1, [Have MinGW32]) + AC_CHECK_HEADERS(windows.h) + LIBS="-lwsock32 -lws2_32 -no-undefined" + CFLAGS="-std=c99" + install_man="no" + ;; + *) + AC_MSG_ERROR([Not supported os: $host.]) + ;; +esac + +# Check if we are running at sparc harware +AC_MSG_CHECKING([wheter __sparc__ is defined]) +AC_COMPILE_IFELSE([AC_LANG_PROGRAM( + [[#if defined __sparc__ + //OK we are on sparc + #else + error: we are not on sparc + #endif + ]])], + [sparc=yes], + [sparc=no]) + +AC_MSG_RESULT([$sparc]) + +if test "x$sparc" = "xyes"; then + CPPFLAGS="$CPPFLAGS -mcpu=v9" +fi + +# Checks for header files. +AC_HEADER_STDC +AC_CHECK_HEADERS(errno.h arpa/inet.h netinet/tcp.h netinet/in.h stddef.h \ +stdlib.h string.h sys/socket.h sys/time.h unistd.h limits.h) + +# Check if we have eventfd.h header file. +AC_CHECK_HEADERS(sys/eventfd.h, [AC_DEFINE(ZS_HAVE_EVENTFD, 1, [Have eventfd extension.])]) + +# Check if we have ifaddrs.h header file. +AC_CHECK_HEADERS(ifaddrs.h, [AC_DEFINE(ZS_HAVE_IFADDRS, 1, [Have ifaddrs.h header.])]) + +# Use c++ in subsequent tests +AC_LANG(C++) + +# Optional stuff +AC_CHECK_PROG(have_pkg_config, pkg-config, yes, no) + +if test "x$have_pkg_config" != "xno"; then + # First instance of PKG_CHECK_ has to be executed + PKG_CHECK_EXISTS([dummy_pkg], [], []) +fi + +# Checks for typedefs, structures, and compiler characteristics. +AC_HEADER_STDBOOL +AC_C_CONST +AC_C_INLINE +AC_TYPE_SIZE_T +AC_TYPE_SSIZE_T +AC_HEADER_TIME +AC_TYPE_UINT32_T +AC_C_VOLATILE + +# Substs +stdint="0" +if test "x$HAVE_STDINT_H" = "xyes"; then + stdint="1" +fi + +inttypes="0" +if test "x$HAVE_INTTYPES_H" = "xyes"; then + inttypes="1" +fi + +AC_SUBST(stdint) +AC_SUBST(inttypes) + +# Subst ZS_EXTRA_CXXFLAGS +AC_SUBST(ZS_EXTRA_CXXFLAGS) + + +# Checks for library functions. +AC_FUNC_MALLOC +AC_TYPE_SIGNAL +AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs) + +AC_OUTPUT(Makefile src/Makefile examples/Makefile examples/chat/Makefile) + +AC_MSG_RESULT([]) +AC_MSG_RESULT([ ******************************************************** ]) +AC_MSG_RESULT([ 0SOCKETS ]) +AC_MSG_RESULT([ ******************************************************** ]) +AC_MSG_RESULT([ This software is distributed under the terms and ]) +AC_MSG_RESULT([ conditions of the LESSER GNU GENERAL PUBLIC LICENSE. ]) +AC_MSG_RESULT([ See the file COPYING and COPYING.LESSER for the full ]) +AC_MSG_RESULT([ license text. ]) +AC_MSG_RESULT([ ******************************************************** ]) +AC_MSG_RESULT([]) +AC_MSG_RESULT([ zsock install dir: $prefix]) +AC_MSG_RESULT([]) + diff --git a/examples/Makefile.am b/examples/Makefile.am new file mode 100644 index 0000000..5ab090f --- /dev/null +++ b/examples/Makefile.am @@ -0,0 +1,2 @@ +SUBDIRS = chat +DIST_SUBDIRS = chat diff --git a/examples/chat/Makefile.am b/examples/chat/Makefile.am new file mode 100644 index 0000000..afdb827 --- /dev/null +++ b/examples/chat/Makefile.am @@ -0,0 +1,15 @@ +INCLUDES = -I$(top_builddir) -I$(top_builddir)/include + +noinst_PROGRAMS = chatroom display prompt + +chatroom_SOURCES = chatroom.cpp +chatroom_LDADD = $(top_builddir)/src/libzs.la +chatroom_CXXFLAGS = -Wall -pedantic -Werror + +display_SOURCES = display.cpp +display_LDADD = $(top_builddir)/src/libzs.la +display_CXXFLAGS = -Wall -pedantic -Werror + +prompt_SOURCES = prompt.cpp +prompt_LDADD = $(top_builddir)/src/libzs.la +prompt_CXXFLAGS = -Wall -pedantic -Werror diff --git a/examples/chat/chatroom.cpp b/examples/chat/chatroom.cpp new file mode 100644 index 0000000..f2240ab --- /dev/null +++ b/examples/chat/chatroom.cpp @@ -0,0 +1,74 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#include +#include +#include + +using namespace std; + +#include + +int main (int argc, const char *argv []) +{ + // Check the command line syntax + if (argc != 3) { + cerr << "usage: chatroom " << endl; + return 1; + } + + // Retrieve command line arguments + const char *in_interface = argv [1]; + const char *out_interface = argv [2]; + + // Initialise 0MQ infrastructure + zs::context_t ctx (1, 1); + + // Create two sockets. One for receiving messages from 'propmt' + // applications, one for sending messages to 'display' applications + zs::socket_t in_socket (ctx, ZS_SUB); + in_socket.bind (in_interface); + zs::socket_t out_socket (ctx, ZS_PUB); + out_socket.bind (out_interface); + + while (true) { + + // Get a message + zs::message_t in_message; + in_socket.recv (&in_message); + + // Get the current time. Replace the newline character at the end + // by space character. + char timebuf [256]; + time_t current_time; + time (¤t_time); + snprintf (timebuf, 256, "%s", ctime (¤t_time)); + timebuf [strlen (timebuf) - 1] = ' '; + + // Create and fill in the message + zs::message_t out_message (strlen (timebuf) + in_message.size ()); + char *data = (char*) out_message.data (); + memcpy (data, timebuf, strlen (timebuf)); + data += strlen (timebuf); + memcpy (data, in_message.data (), in_message.size ()); + + // Send the message + out_socket.send (out_message); + } +} diff --git a/examples/chat/display.cpp b/examples/chat/display.cpp new file mode 100644 index 0000000..ceb096f --- /dev/null +++ b/examples/chat/display.cpp @@ -0,0 +1,56 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#include +#include +#include + +using namespace std; + +#include + +int main (int argc, const char *argv []) +{ + // Check the command line syntax. + if (argc != 2) { + cerr << "usage: display " << endl; + return 1; + } + + // Retrieve command line arguments + const char *chatroom_out_address = argv [1]; + + // Initialise 0MQ infrastructure, connect to the chatroom and ask for all + // messages and gap notifications. + zs::context_t ctx (1, 1); + zs::socket_t s (ctx, ZS_SUB); + s.connect (chatroom_out_address); + s.subscribe ("*"); + + while (true) { + + // Get a message and print it to the console. + zs::message_t message; + s.recv (&message); + if (message.type () == zs::message_gap) + cout << "Problems connecting to the chatroom..." << endl; + else + cout << (char*) message.data () << flush; + } +} diff --git a/examples/chat/prompt.cpp b/examples/chat/prompt.cpp new file mode 100644 index 0000000..461e7b8 --- /dev/null +++ b/examples/chat/prompt.cpp @@ -0,0 +1,61 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#include +#include +#include + +using namespace std; + +#include + +int main (int argc, const char *argv []) +{ + // Check the command line syntax. + if (argc != 3) { + cerr << "usage: prompt " << endl; + return 1; + } + + // Retrieve command line arguments + const char *chatroom_in_address = argv [1]; + const char *user_name = argv [2]; + + // Initialise 0MQ infrastructure and connect to the chatroom. + zs::context_t ctx (1, 1); + zs::socket_t s (ctx, ZS_PUB); + s.connect (chatroom_in_address); + + while (true) { + + // Allow user to input the message text. Prepend it by user name. + char textbuf [1024]; + char *rcc = fgets (textbuf, sizeof (textbuf), stdin); + assert (rcc); + string text (user_name); + text = text + ": " + textbuf; + + // Create the message (terminating zero is part of the message) + zs::message_t message (text.size () + 1); + memcpy (message.data (), text.c_str (), text.size () + 1); + + // Send the message + s.send (message); + } +} diff --git a/include/zs.h b/include/zs.h new file mode 100644 index 0000000..cae1a17 --- /dev/null +++ b/include/zs.h @@ -0,0 +1,206 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZSOCKETS_H_INCLUDED__ +#define __ZSOCKETS_H_INCLUDED__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +#if defined MSC_VER && defined ZS_BUILDING_LIBZS +#define ZS_EXPORT __declspec(dllexport) +#else +#define ZS_EXPORT +#endif + +// Maximal size of "Very Small Message". VSMs are passed by value +// to avoid excessive memory allocation/deallocation. +#define ZS_MAX_VSM_SIZE 30 + +// Message & notification types. +#define ZS_GAP 1 +#define ZS_DELIMITER 31 +#define ZS_VSM 32 + +// The operation should be performed in non-blocking mode. I.e. if it cannot +// be processed immediately, error should be returned with errno set to EAGAIN. +#define ZS_NOBLOCK 1 + +// zs_send should not flush the message downstream immediately. Instead, it +// should batch ZS_NOFLUSH messages and send them downstream only when zs_flush +// is invoked. This is an optimisation for cases where several messages are +// sent in a single business transaction. However, the effect is measurable +// only in extremely high-perf scenarios (million messages a second or so). +// If that's not your case, use standard flushing send instead. See exchange +// example for illustration of ZS_NOFLUSH functionality. +#define ZS_NOFLUSH 2 + +// Socket to communicate with a single peer. Allows for a singe connect or a +// single accept. There's no message routing or message filtering involved. +#define ZS_P2P 0 + +// Socket to distribute data. Recv fuction is not implemeted for this socket +// type. Messages are distributed in fanout fashion to all peers. +#define ZS_PUB 1 + +// Socket to subscribe to distributed data. Send function is not implemented +// for this socket type. However, subscribe function can be used to modify the +// message filter. +#define ZS_SUB 2 + +// Socket to send requests on and receive replies from. Requests are +// load-balanced among all the peers. This socket type doesn't allow for more +// recv's that there were send's. +#define ZS_REQ 3 + +// Socket to receive requests from and send replies to. This socket type allows +// only an alternated sequence of recv's and send's. Each send is routed to +// the peer that the previous recv delivered message from. +#define ZS_REP 4 + +// Prototype for the message body deallocation functions. +// It is deliberately defined in the way to comply with standard C free. +typedef void (zs_free_fn) (void *data); + +// A message. If 'shared' is true, message content pointed to by 'content' +// is shared, i.e. reference counting is used to manage its lifetime +// rather than straighforward malloc/free. struct zs_msg_content is +// not declared in the API. +struct zs_msg +{ + struct zs_msg_content *content; + unsigned char shared; + uint16_t vsm_size; + unsigned char vsm_data [ZS_MAX_VSM_SIZE]; +}; + +// TODO: Different options... +struct zs_opts +{ + uint64_t hwm; + uint64_t lwm; + uint64_t swap; + uint64_t mask; + uint64_t taskset; + const char *identity; + const char *args; +}; + +// Initialise an empty message (zero bytes long). +ZS_EXPORT int zs_msg_init (zs_msg *msg); + +// Initialise a message 'size' bytes long. +// +// Errors: ENOMEM - the size is too large to allocate. +ZS_EXPORT int zs_msg_init_size (zs_msg *msg, size_t size); + +// Initialise a message from an existing buffer. Message isn't copied, +// instead 0SOCKETS infrastructure take ownership of the buffer and call +// deallocation functio (ffn) once it's not needed anymore. +ZS_EXPORT int zs_msg_init_data (zs_msg *msg, void *data, size_t size, + zs_free_fn *ffn); + +// Deallocate the message. +ZS_EXPORT int zs_msg_close (zs_msg *msg); + +// Move the content of the message from 'src' to 'dest'. The content isn't +// copied, just moved. 'src' is an empty message after the call. Original +// content of 'dest' message is deallocated. +ZS_EXPORT int zs_msg_move (zs_msg *dest, zs_msg *src); + +// Copy the 'src' message to 'dest'. The content isn't copied, instead +// reference count is increased. Don't modify the message data after the +// call as they are shared between two messages. Original content of 'dest' +// message is deallocated. +ZS_EXPORT int zs_msg_copy (zs_msg *dest, zs_msg *src); + +// Returns pointer to message data. +ZS_EXPORT void *zs_msg_data (zs_msg *msg); + +// Return size of message data (in bytes). +ZS_EXPORT size_t zs_msg_size (zs_msg *msg); + +// Returns type of the message. +ZS_EXPORT int zs_msg_type (zs_msg *msg); + +// Initialise 0SOCKETS context. 'app_threads' specifies maximal number +// of application threads that can have open sockets at the same time. +// 'io_threads' specifies the size of thread pool to handle I/O operations. +// +// Errors: EINVAL - one of the arguments is less than zero or there are no +// threads declared at all. +ZS_EXPORT void *zs_init (int app_threads, int io_threads); + +// Deinitialise 0SOCKETS context including all the open sockets. Closing +// sockets after zs_term has been called will result in undefined behaviour. +ZS_EXPORT int zs_term (void *context); + +// Open a socket. +// +// Errors: EINVAL - invalid socket type. +// EMFILE - the number of application threads entitled to hold open +// sockets at the same time was exceeded. +ZS_EXPORT void *zs_socket (void *context, int type); + +// Close the socket. +ZS_EXPORT int zs_close (void *s); + +// Bind the socket to a particular address. +ZS_EXPORT int zs_bind (void *s, const char *addr, zs_opts *opts); + +// Connect the socket to a particular address. +ZS_EXPORT int zs_connect (void *s, const char *addr, zs_opts *opts); + +// Subscribe for the subset of messages identified by 'criteria' argument. +ZS_EXPORT int zs_subscribe (void *s, const char *criteria); + +// Send the message 'msg' to the socket 's'. 'flags' argument can be +// combination of following values: +// ZS_NOBLOCK - if message cannot be sent, return immediately. +// ZS_NOFLUSH - message won't be sent immediately. It'll be sent with either +// subsequent flushing send or explicit call to zs_flush function. +// +// Errors: EAGAIN - message cannot be sent at the moment (applies only to +// non-blocking send). +// ENOTSUP - function isn't supported by particular socket type. +ZS_EXPORT int zs_send (void *s, zs_msg *msg, int flags); + +// Flush the messages that were send using ZS_NOFLUSH flag down the stream. +// +// Errors: ENOTSUP - function isn't supported by particular socket type. +ZS_EXPORT int zs_flush (void *s); + +// Send a message from the socket 's'. 'flags' argument can be combination +// of following values: +// ZS_NOBLOCK - if message cannot be received, return immediately. +// +// Errors: EAGAIN - message cannot be received at the moment (applies only to +// non-blocking receive). +// ENOTSUP - function isn't supported by particular socket type. +ZS_EXPORT int zs_recv (void *s, zs_msg *msg, int flags); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/include/zs.hpp b/include/zs.hpp new file mode 100644 index 0000000..d0f607f --- /dev/null +++ b/include/zs.hpp @@ -0,0 +1,231 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZSOCKETS_HPP_INCLUDED__ +#define __ZSOCKETS_HPP_INCLUDED__ + +#include "zs.h" + +#include + +namespace zs +{ + + typedef zs_free_fn free_fn; + + enum message_type_t + { + message_data = 1 << 0, + message_gap = 1 << ZS_GAP, + message_delimiter = 1 << ZS_DELIMITER + }; + + // A message. Caution: Don't change the body of the message once you've + // copied it - the behaviour is undefined. Don't change the body of the + // received message either - other threads may be accessing it in parallel. + + class message_t : private zs_msg + { + friend class socket_t; + + public: + + // Creates message size_ bytes long. + inline message_t (size_t size_ = 0) + { + int rc = zs_msg_init_size (this, size_); + assert (rc == 0); + } + + // Creates message from the supplied buffer. 0MQ takes care of + // deallocating the buffer once it is not needed. The deallocation + // function is supplied in ffn_ parameter. If ffn_ is NULL, no + // deallocation happens - this is useful for sending static buffers. + inline message_t (void *data_, size_t size_, + free_fn *ffn_) + { + int rc = zs_msg_init_data (this, data_, size_, ffn_); + assert (rc == 0); + } + + // Destroys the message. + inline ~message_t () + { + int rc = zs_msg_close (this); + assert (rc == 0); + } + + // Destroys old content of the message and allocates buffer for the + // new message body. Having this as a separate function allows user + // to reuse once-allocated message for multiple times. + inline void rebuild (size_t size_) + { + int rc = zs_msg_close (this); + assert (rc == 0); + rc = zs_msg_init_size (this, size_); + assert (rc == 0); + } + + // Same as above, however, the message is rebuilt from the supplied + // buffer. See appropriate constructor for discussion of buffer + // deallocation mechanism. + inline void rebuild (void *data_, size_t size_, free_fn *ffn_) + { + int rc = zs_msg_close (this); + assert (rc == 0); + rc = zs_msg_init_data (this, data_, size_, ffn_); + assert (rc == 0); + } + + // Moves the message content from one message to the another. If the + // destination message have contained data prior to the operation + // these get deallocated. The source message will contain 0 bytes + // of data after the operation. + inline void move_to (message_t *msg_) + { + int rc = zs_msg_move (this, (zs_msg*) msg_); + assert (rc == 0); + } + + // Copies the message content from one message to the another. If the + // destination message have contained data prior to the operation + // these get deallocated. + inline void copy_to (message_t *msg_) + { + int rc = zs_msg_copy (this, (zs_msg*) msg_); + assert (rc == 0); + } + + // Returns message type. + inline message_type_t type () + { + return (message_type_t) (1 << zs_msg_type (this)); + } + + // Returns pointer to message's data buffer. + inline void *data () + { + return zs_msg_data (this); + } + + // Returns the size of message data buffer. + inline size_t size () + { + return zs_msg_size (this); + } + + private: + + // Disable implicit message copying, so that users won't use shared + // messages (less efficient) without being aware of the fact. + message_t (const message_t&); + void operator = (const message_t&); + }; + + class context_t + { + friend class socket_t; + + public: + + inline context_t (int app_threads_, int io_threads_) + { + ptr = zs_init (app_threads_, io_threads_); + assert (ptr); + } + + inline ~context_t () + { + int rc = zs_term (ptr); + assert (rc == 0); + } + + private: + + void *ptr; + + // Disable copying. + context_t (const context_t&); + void operator = (const context_t&); + }; + + class socket_t + { + public: + + inline socket_t (context_t &context_, int type_ = 0) + { + ptr = zs_socket (context_.ptr, type_); + assert (ptr); + } + + inline ~socket_t () + { + int rc = zs_close (ptr); + assert (rc == 0); + } + + inline void bind (const char *addr_, zs_opts *opts_ = NULL) + { + int rc = zs_bind (ptr, addr_, opts_); + assert (rc == 0); + } + + inline void connect (const char *addr_, zs_opts *opts_ = NULL) + { + int rc = zs_connect (ptr, addr_, opts_); + assert (rc == 0); + } + + inline void subscribe (const char *criteria_) + { + int rc = zs_subscribe (ptr, criteria_); + assert (rc == 0); + } + + inline void send (message_t &msg_, int flags_ = 0) + { + int rc = zs_send (ptr, &msg_, flags_); + assert (rc == 0); + } + + inline void flush () + { + int rc = zs_flush (ptr); + assert (rc == 0); + } + + inline void recv (message_t *msg_, int flags_ = 0) + { + int rc = zs_recv (ptr, msg_, flags_); + assert (rc == 0); + } + + private: + + void *ptr; + + // Disable copying. + socket_t (const socket_t&); + void operator = (const socket_t&); + }; + +} + +#endif diff --git a/src/Makefile.am b/src/Makefile.am new file mode 100644 index 0000000..bb648ec --- /dev/null +++ b/src/Makefile.am @@ -0,0 +1,120 @@ +lib_LTLIBRARIES = libzs.la + +libzs_la_SOURCES = \ + app_thread.hpp \ + atomic_bitmap.hpp \ + atomic_counter.hpp \ + atomic_ptr.hpp \ + command.hpp \ + config.hpp \ + connecter.hpp \ + data_distributor.hpp \ + decoder.hpp \ + devpoll.hpp \ + dispatcher.hpp \ + dummy_aggregator.hpp \ + dummy_distributor.hpp \ + encoder.hpp \ + epoll.hpp \ + err.hpp \ + fair_aggregator.hpp \ + fd.hpp \ + fd_signaler.hpp \ + io_object.hpp \ + io_thread.hpp \ + ip.hpp \ + i_api.hpp \ + i_demux.hpp \ + i_mux.hpp \ + i_poller.hpp \ + i_poll_events.hpp \ + i_session.hpp \ + i_signaler.hpp \ + i_engine.hpp \ + i_thread.hpp \ + listener.hpp \ + kqueue.hpp \ + load_balancer.hpp \ + msg.hpp \ + mutex.hpp \ + object.hpp \ + p2p.hpp \ + pipe.hpp \ + pipe_reader.hpp \ + pipe_writer.hpp \ + platform.hpp \ + poll.hpp \ + pub.hpp \ + rep.hpp \ + req.hpp \ + safe_object.hpp \ + select.hpp \ + session.hpp \ + session_stub.hpp \ + simple_semaphore.hpp \ + socket_base.hpp \ + sub.hpp \ + stdint.hpp \ + tcp_connecter.hpp \ + tcp_listener.hpp \ + tcp_socket.hpp \ + thread.hpp \ + uuid.hpp \ + windows.hpp \ + wire.hpp \ + ypipe.hpp \ + ypollset.hpp \ + yqueue.hpp \ + zmq_decoder.hpp \ + zmq_encoder.hpp \ + zmq_tcp_engine.hpp \ + app_thread.cpp \ + connecter.cpp \ + data_distributor.cpp \ + devpoll.hpp \ + dispatcher.cpp \ + dummy_aggregator.cpp \ + dummy_distributor.cpp \ + epoll.cpp \ + err.cpp \ + fair_aggregator.cpp \ + fd_signaler.cpp \ + io_object.cpp \ + io_thread.cpp \ + ip.cpp \ + kqueue.cpp \ + listener.cpp \ + load_balancer.cpp \ + object.cpp \ + p2p.cpp \ + pipe.cpp \ + pipe_reader.cpp \ + pipe_writer.cpp \ + poll.cpp \ + pub.cpp \ + rep.cpp \ + req.cpp \ + safe_object.cpp \ + select.cpp \ + session.cpp \ + session_stub.cpp \ + socket_base.cpp \ + sub.cpp \ + tcp_connecter.cpp \ + tcp_listener.cpp \ + tcp_socket.cpp \ + thread.cpp \ + uuid.cpp \ + ypollset.cpp \ + zmq_decoder.cpp \ + zmq_encoder.cpp \ + zmq_tcp_engine.cpp \ + zs.cpp + +libzs_la_LDFLAGS = -version-info 0:0:0 +libzs_la_CXXFLAGS = -Wall -pedantic -Werror @ZS_EXTRA_CXXFLAGS@ + +dist-hook: + -rm $(distdir)/src/platform.hpp + + diff --git a/src/app_thread.cpp b/src/app_thread.cpp new file mode 100644 index 0000000..ca08976 --- /dev/null +++ b/src/app_thread.cpp @@ -0,0 +1,221 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zs.h" + +#if defined ZS_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#endif + +#include "app_thread.hpp" +#include "dispatcher.hpp" +#include "err.hpp" +#include "session.hpp" +#include "pipe.hpp" +#include "config.hpp" +#include "i_api.hpp" +#include "dummy_aggregator.hpp" +#include "fair_aggregator.hpp" +#include "dummy_distributor.hpp" +#include "data_distributor.hpp" +#include "load_balancer.hpp" +#include "p2p.hpp" +#include "pub.hpp" +#include "sub.hpp" +#include "req.hpp" +#include "rep.hpp" + +// If the RDTSC is available we use it to prevent excessive +// polling for commands. The nice thing here is that it will work on any +// system with x86 architecture and gcc or MSVC compiler. +#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\ + (defined _MSC_VER && (defined _M_IX86 || defined _M_X64)) +#define ZS_DELAY_COMMANDS +#endif + +zs::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : + object_t (dispatcher_, thread_slot_), + tid (0), + last_processing_time (0) +{ +} + +void zs::app_thread_t::shutdown () +{ + // Deallocate all the sessions associated with the thread. + while (!sessions.empty ()) + sessions [0]->shutdown (); + + delete this; +} + +zs::app_thread_t::~app_thread_t () +{ +} + +void zs::app_thread_t::attach_session (session_t *session_) +{ + session_->set_index (sessions.size ()); + sessions.push_back (session_); +} + +void zs::app_thread_t::detach_session (session_t *session_) +{ + // O(1) removal of the session from the list. + sessions_t::size_type i = session_->get_index (); + sessions [i] = sessions [sessions.size () - 1]; + sessions [i]->set_index (i); + sessions.pop_back (); +} + +zs::i_poller *zs::app_thread_t::get_poller () +{ + zs_assert (false); +} + +zs::i_signaler *zs::app_thread_t::get_signaler () +{ + return &pollset; +} + +bool zs::app_thread_t::is_current () +{ + return !sessions.empty () && tid == getpid (); +} + +bool zs::app_thread_t::make_current () +{ + // If there are object managed by this slot we cannot assign the slot + // to a different thread. + if (!sessions.empty ()) + return false; + + tid = getpid (); + return true; +} + +zs::i_api *zs::app_thread_t::create_socket (int type_) +{ + i_mux *mux = NULL; + i_demux *demux = NULL; + session_t *session = NULL; + i_api *api = NULL; + + switch (type_) { + case ZS_P2P: + mux = new dummy_aggregator_t; + zs_assert (mux); + demux = new dummy_distributor_t; + zs_assert (demux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new p2p_t (this, session); + zs_assert (api); + break; + case ZS_PUB: + demux = new data_distributor_t; + zs_assert (demux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new pub_t (this, session); + zs_assert (api); + break; + case ZS_SUB: + mux = new fair_aggregator_t; + zs_assert (mux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new sub_t (this, session); + zs_assert (api); + break; + case ZS_REQ: + // TODO + zs_assert (false); + api = new req_t (this, session); + zs_assert (api); + break; + case ZS_REP: + // TODO + zs_assert (false); + api = new rep_t (this, session); + zs_assert (api); + break; + default: + errno = EINVAL; + return NULL; + } + + attach_session (session); + + return api; +} + +void zs::app_thread_t::process_commands (bool block_) +{ + ypollset_t::signals_t signals; + if (block_) + signals = pollset.poll (); + else { + +#if defined ZS_DELAY_COMMANDS + // Optimised version of command processing - it doesn't have to check + // for incoming commands each time. It does so only if certain time + // elapsed since last command processing. Command delay varies + // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU + // etc. The optimisation makes sense only on platforms where getting + // a timestamp is a very cheap operation (tens of nanoseconds). + + // Get timestamp counter. +#if defined __GNUC__ + uint32_t low; + uint32_t high; + __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high)); + uint64_t current_time = (uint64_t) high << 32 | low; +#elif defined _MSC_VER + uint64_t current_time = __rdtsc (); +#else +#error +#endif + + // Check whether certain time have elapsed since last command + // processing. + if (current_time - last_processing_time <= max_command_delay) + return; + last_processing_time = current_time; +#endif + + // Check whether there are any commands pending for this thread. + signals = pollset.check (); + } + + if (signals) { + + // Traverse all the possible sources of commands and process + // all the commands from all of them. + for (int i = 0; i != thread_slot_count (); i++) { + if (signals & (ypollset_t::signals_t (1) << i)) { + command_t cmd; + while (dispatcher->read (i, get_thread_slot (), &cmd)) + cmd.destination->process_command (cmd); + } + } + } +} diff --git a/src/app_thread.hpp b/src/app_thread.hpp new file mode 100644 index 0000000..61e7ff1 --- /dev/null +++ b/src/app_thread.hpp @@ -0,0 +1,95 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZS_APP_THREAD_HPP_INCLUDED__ +#define __ZS_APP_THREAD_HPP_INCLUDED__ + +#include + +#include "i_thread.hpp" +#include "stdint.hpp" +#include "object.hpp" +#include "ypollset.hpp" + +namespace zs +{ + + class app_thread_t : public object_t, public i_thread + { + public: + + app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); + + // To be called when the whole infrastrucure is being closed (zs_term). + void shutdown (); + + // Returns signaler associated with this application thread. + i_signaler *get_signaler (); + + // Create socket engine in this thread. Return false if the calling + // thread doesn't match the thread handled by this app thread object. + struct i_api *create_socket (int type_); + + // Nota bene: The following two functions are accessed from different + // threads. The caller (dispatcher) is responsible for synchronisation + // of accesses. + + // Returns true is current thread is associated with the app thread. + bool is_current (); + + // Tries to associate current thread with the app thread object. + // Returns true is successfull, false otherwise. + bool make_current (); + + // Processes commands sent to this thread (if any). If 'block' is + // set to true, returns only after at least one command was processed. + void process_commands (bool block_); + + // i_thread implementation. + void attach_session (class session_t *session_); + void detach_session (class session_t *session_); + struct i_poller *get_poller (); + + private: + + // Clean-up. + ~app_thread_t (); + + // Thread ID associated with this slot. + // TODO: Virtualise pid_t! + // TODO: Check whether getpid returns unique ID for each thread. + int tid; + + // Vector of all sessionss associated with this app thread. + typedef std::vector sessions_t; + sessions_t sessions; + + // App thread's signaler object. + ypollset_t pollset; + + // Timestamp of when commands were processed the last time. + uint64_t last_processing_time; + + app_thread_t (const app_thread_t&); + void operator = (const app_thread_t&); + }; + +} + +#endif diff --git a/src/atomic.hpp b/src/atomic.hpp new file mode 100644 index 0000000..e24b719 --- /dev/null +++ b/src/atomic.hpp @@ -0,0 +1,310 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZS_ATOMIC_HPP_INCLUDED__ +#define __ZS_ATOMIC_HPP_INCLUDED__ + +#include "stdint.hpp" + +#if defined ZS_FORCE_MUTEXES +#define ZS_ATOMIC_MUTEX +#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ +#define ZS_ATOMIC_X86 +#elif defined ZMQ_HAVE_WINDOWS +#define ZS_ATOMIC_WINDOWS +#elif defined ZMQ_HAVE_SOLARIS +#define ZS_ATOMIC_SOLARIS +#else +#define ZS_ATOMIC_MUTEX +#endif + +namespace zs +{ + + // Atomic assignement. + inline void atomic_uint32_set (volatile uint32_t *p_, uint32_t value_) + { + *p_ = value_; + // StoreLoad memory barrier should go here on platforms with + // memory models that require it. + } + + // Atomic retrieval of an integer. + inline uint32_t atomic_uint32_get (volatile uint32_t *p_) + { + // StoreLoad memory barrier should go here on platforms with + // memory models that require it. + return *p_; + } + + // Atomic addition. Returns the old value. + inline uint32_t atomic_uint32_add (volatile uint32_t *p_, uint32_t delta_) + { +#if defined ZS_ATOMIC_WINDOWS + return InterlockedExchangeAdd ((LONG*) &value, increment_); +#elif defined ZS_ATOMIC_SOLARIS + return atomic_add_32_nv (&value, increment_) - delta_; +#elif defined ZS_ATOMIC_X86 + uint32_t old; + __asm__ volatile ( + "lock; xadd %0, %1\n\t" + : "=r" (old), "=m" (*p_) + : "0" (delta_), "m" (*p_) + : "cc", "memory"); + return old; +#else +#error // TODO: + sync.lock (); + uint32_t old = *p_; + *p_ += delta_; + sync.unlock (); +#endif + } + + // Atomic subtraction. Returns the old value. + inline uint32_t atomic_uint32_sub (volatile uint32_t *p_, uint32_t delta_) + { +#if defined ZS_ATOMIC_WINDOWS + LONG delta = - ((LONG) delta_); + return InterlockedExchangeAdd ((LONG*) &value, delta); +#elif defined ZS_ATOMIC_SOLARIS + int32_t delta = - ((int32_t) delta_); + return atomic_add_32_nv (&value, delta) + delta_; +#elif defined ZS_ATOMIC_X86 + uint32_t old = -delta_; + __asm__ volatile ("lock; xaddl %0,%1" + : "=r" (old), "=m" (*p_) + : "0" (old), "m" (*p_) + : "cc"); + return old; +#else +#error // TODO: + sync.lock (); + uint32_t old = *p_; + *p_ -= delta_; + sync.unlock (); + return old; +#endif + } + + // Atomic assignement. + template + inline void atomic_ptr_set (volatile T **p_, T *value_) + { + *p_ = value_; + // StoreLoad memory barrier should go here on platforms with + // memory models that require it. + } + + // Perform atomic 'exchange pointers' operation. Old value is returned. + template + inline void *atomic_ptr_xchg (volatile T **p_, T *value_) + { +#if defined ZS_ATOMIC_WINDOWS + return InterlockedExchangePointer (p_, value_); +#elif defined ZS_ATOMIC_SOLARIS + return atomic_swap_ptr (p_, value_); +#elif defined ZS_ATOMIC_X86 + void *old; + __asm__ volatile ( + "lock; xchg %0, %2" + : "=r" (old), "=m" (*p_) + : "m" (*p_), "0" (value_)); + return old; +#else +#error // TODO: + sync.lock (); + void *old = *p_; + *p_ = value_; + sync.unlock (); + return old; +#endif + } + + // Perform atomic 'compare and swap' operation on the pointer. + // The pointer is compared to 'cmp' argument and if they are + // equal, its value is set to 'value'. Old value of the pointer + // is returned. + template + inline void *atomic_ptr_cas (volatile T **p_, T *cmp_, T *value_) + { +#if defined ZS_ATOMIC_WINDOWS + return InterlockedCompareExchangePointer (p_, value_, cmp_); +#elif defined ZS_ATOMIC_SOLARIS + return atomic_cas_ptr (p_, cmp_, value_); +#elif defined ZS_ATOMIC_X86 + void *old; + __asm__ volatile ( + "lock; cmpxchg %2, %3" + : "=a" (old), "=m" (*p_) + : "r" (value_), "m" (*p_), "0" (cmp_) + : "cc"); + return old; +#else +#error // TODO: + sync.lock (); + void *old = *p_; + if (old == cmp_) + *p_ = value_; + sync.unlock (); + return old; +#endif + } + +#if defined ZS_ATOMIC_X86 && defined __x86_64__ + typedef uint64_t atomic_bitmap_t; +#else + typedef uint32_t atomic_bitmap_t; +#endif + + // Atomic assignement. + inline void atomic_bitmap_set (volatile atomic_bitmap_t *p_, + atomic_bitmap_t value_) + { + *p_ = value_; + // StoreLoad memory barrier should go here on platforms with + // memory models that require it. + } + + // Bit-test-set-and-reset. Sets one bit of the value and resets + // another one. Returns the original value of the reset bit. + inline bool atomic_bitmap_btsr (volatile atomic_bitmap_t *p_, + int set_index_, int reset_index_) + { +#if defined ZS_ATOMIC_WINDOWS + while (true) { + atomic_bitmap_t oldval = *p_; + atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) << + set_index_)) & ~(integer_t (1) << reset_index_); + if (InterlockedCompareExchange ((volatile LONG*) p_, newval, + oldval) == (LONG) oldval) + return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? + true : false; + } +#elif defined ZS_ATOMIC_SOLARIS + while (true) { + atomic_bitmap_t oldval = *p_; + atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) << + set_index_)) & ~(integer_t (1) << reset_index_); + if (atomic_cas_32 (p_, oldval, newval) == oldval) + return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? + true : false; + } +#elif defined ZS_ATOMIC_X86 + atomic_bitmap_t oldval, dummy; + __asm__ volatile ( + "mov %0, %1\n\t" + "1:" + "mov %1, %2\n\t" + "bts %3, %2\n\t" + "btr %4, %2\n\t" + "lock cmpxchg %2, %0\n\t" + "jnz 1b\n\t" + : "+m" (*p_), "=&a" (oldval), "=&r" (dummy) + : "r" (atomic_bitmap_t (set_index_)), + "r" (atomic_bitmap_t (reset_index_)) + : "cc"); + return (bool) (oldval & (atomic_bitmap_t (1) << reset_index_)); +#else +#error // TODO: + sync.lock (); + atomic_bitmap_t oldval = *p_; + *p_ = (oldval | (atomic_bitmap_t (1) << set_index_)) & + ~(atomic_bitmap_t (1) << reset_index_); + sync.unlock (); + return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? true : false; +#endif + } + + // Sets value to newval. Returns the original value. + inline atomic_bitmap_t atomic_bitmap_xchg (volatile atomic_bitmap_t *p_, + atomic_bitmap_t newval_) + { +#if defined ZS_ATOMIC_WINDOWS + return InterlockedExchange ((volatile LONG*) p_, newval_); +#elif defined ZS_ATOMIC_SOLARIS + return atomic_swap_32 (p_, newval_); +#elif defined ZS_ATOMIC_X86 + atomic_bitmap_t oldval = newval_; + __asm__ volatile ( + "lock; xchg %0, %1" + : "=r" (oldval) + : "m" (*p_), "0" (oldval) + : "memory"); + return oldval; +#else +#error // TODO: + sync.lock (); + atomic_bitmap_t oldval = *p_; + *p_ = newval_; + sync.unlock (); +#endif + } + + // izte is "if-zero-then-else" atomic operation - if the value is zero + // it substitutes it by 'thenval' else it rewrites it by 'elseval'. + // Original value of the integer is returned from this function. + inline atomic_bitmap_t atomic_bitmap_izte (volatile atomic_bitmap_t *p_, + atomic_bitmap_t thenval_, atomic_bitmap_t elseval_) + { +#if defined ZS_ATOMIC_WINDOWS + while (true) { + atomic_bitmap_t oldval = *p_; + atomic_bitmap_t newval = (oldval ? elseval_ : thenval_); + if (InterlockedCompareExchange ((volatile LONG*) p_, newval, + oldval) == (LONG) oldval) + return oldval; + } +#elif defined ZS_ATOMIC_SOLARIS + while (true) { + atomic_bitmap_t oldval = *p_; + atomic_bitmap_t newval = (oldval ? elseval_ : thenval_); + if (atomic_cas_32 (p_, oldval, newval) == oldval) + return oldval; + } +#elif defined ZS_ATOMIC_X86 + atomic_bitmap_t oldval; + atomic_bitmap_t dummy; + __asm__ volatile ( + "mov %0, %1\n\t" + "1:" + "mov %3, %2\n\t" + "test %1, %1\n\t" + "jz 2f\n\t" + "mov %4, %2\n\t" + "2:" + "lock cmpxchg %2, %0\n\t" + "jnz 1b\n\t" + : "+m" (*p_), "=&a" (oldval), "=&r" (dummy) + : "r" (thenval_), "r" (elseval_) + : "cc"); + return oldval; +#else +#error // TODO: + sync.lock (); + atomic_bitmap_t oldval = *p_; + *p_ = oldval ? elseval_ : thenval_; + sync.unlock (); + return oldval; +#endif + } + +} + +#endif diff --git a/src/atomic_bitmap.hpp b/src/atomic_bitmap.hpp new file mode 100644 index 0000000..a5440de --- /dev/null +++ b/src/atomic_bitmap.hpp @@ -0,0 +1,286 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZS_ATOMIC_BITMAP_HPP_INCLUDED__ +#define __ZS_ATOMIC_BITMAP_HPP_INCLUDED__ + +#include "stdint.hpp" +#include "platform.hpp" + +// These are the conditions to choose between different implementations +// of atomic_bitmap. + +#if defined ZS_FORCE_MUTEXES +#define ZS_ATOMIC_BITMAP_MUTEX +#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ +#define ZS_ATOMIC_BITMAP_X86 +#elif 0 && defined __sparc__ && defined __GNUC__ +#define ZS_ATOMIC_BITMAP_SPARC +#elif defined ZS_HAVE_WINDOWS +#define ZS_ATOMIC_BITMAP_WINDOWS +#elif defined ZS_HAVE_SOLARIS +#define ZS_ATOMIC_BITMAP_SOLARIS +#else +#define ZS_ATOMIC_BITMAP_MUTEX +#endif + +#if defined ZS_ATOMIC_BITMAP_MUTEX +#include "mutex.hpp" +#elif defined ZS_ATOMIC_BITMAP_WINDOWS +#include "windows.hpp" +#elif defined ZS_ATOMIC_BITMAP_SOLARIS +#include +#endif + +namespace zs +{ + +