summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
authormalosek <malosek@fastmq.com>2009-09-11 11:13:15 +0200
committermalosek <malosek@fastmq.com>2009-09-11 11:13:15 +0200
commit88695aaee607e6f4db1f4fd052e2596653fb18b2 (patch)
tree21b9439d327239053bfcd3c080e1ffa92a5855db /src/socket_base.cpp
parentf824b8a067c60b32260c56020742d6428ed3bb98 (diff)
link libzmq with glib when congifured --with-pgm
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp74
1 files changed, 62 insertions, 12 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index a26c280..2384d80 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <iostream>
+
#include <string>
#include <algorithm>
@@ -35,6 +37,7 @@
#include "uuid.hpp"
#include "pipe.hpp"
#include "err.hpp"
+#include "platform.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
@@ -145,6 +148,22 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
errno = ENOTSUP;
return -1;
+ case ZMQ_RATE:
+ if (optvallen_ != sizeof (uint32_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ options.rate = *((int32_t*) optval_);
+ return 0;
+
+ case ZMQ_RECOVERY_IVL:
+ if (optvallen_ != sizeof (uint32_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+ options.recovery_ivl = *((int32_t*) optval_);
+ return 0;
+
default:
errno = EINVAL;
return -1;
@@ -170,6 +189,21 @@ int zmq::socket_base_t::connect (const char *addr_)
std::string session_name ("#");
session_name += uuid_t ().to_string ();
+ // Parse addr_ string.
+ std::string addr_type;
+ std::string addr_args;
+
+ std::string addr (addr_);
+ std::string::size_type pos = addr.find ("://");
+
+ if (pos == std::string::npos) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ addr_type = addr.substr (0, pos);
+ addr_args = addr.substr (pos + 3);
+
// Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, this, session_name.c_str (),
@@ -198,20 +232,36 @@ int zmq::socket_base_t::connect (const char *addr_)
send_plug (session);
send_own (this, session);
- // Create the connecter object. Supply it with the session name so that
- // it can bind the new connection to the session once it is established.
- zmq_connecter_t *connecter = new zmq_connecter_t (
- choose_io_thread (options.affinity), this, options,
- session_name.c_str ());
- int rc = connecter->set_address (addr_);
- if (rc != 0) {
- delete connecter;
- return -1;
+ if (addr_type == "tcp") {
+
+ // Create the connecter object. Supply it with the session name so that
+ // it can bind the new connection to the session once it is established.
+ zmq_connecter_t *connecter = new zmq_connecter_t (
+ choose_io_thread (options.affinity), this, options,
+ session_name.c_str ());
+ int rc = connecter->set_address (addr_args.c_str ());
+ if (rc != 0) {
+ delete connecter;
+ return -1;
+ }
+ send_plug (connecter);
+ send_own (this, connecter);
+
+ return 0;
}
- send_plug (connecter);
- send_own (this, connecter);
- return 0;
+#if defined ZMQ_HAVE_OPENPGM
+ if (addr_type == "pgm") {
+
+ zmq_assert (false);
+
+ return 0;
+ }
+#endif
+
+ // Unknown address type.
+ errno = ENOTSUP;
+ return -1;
}
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)