summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Lucina <mato@kotelna.sk>2010-01-30 20:10:21 +0100
committerMartin Lucina <mato@kotelna.sk>2010-01-30 20:10:21 +0100
commit4ffc0ea124c4b1632fe29bbb1743393e024ae95e (patch)
tree7d9db073b6cabcc84c173954e56c3a02b85c9248
parentdac8e8449dd134f9c43378e9ad1993ebc86080da (diff)
Signal handling take 2, use zmq_poll in main thread
Signal handling now uses an inproc transport to send a message to the main thread instead of acting on SIGINT and other termination signals and using a global variable to signal alarm expiration (yuck). This also changes the main/receiver thread architecture which now uses zmq_poll() and gives us the additional advantage of being able to poll for GUI events regularly. This solves the problem of the receive mode UI freezing when the sender has gone away.
-rw-r--r--zmq-camera.c294
1 files changed, 180 insertions, 114 deletions
diff --git a/zmq-camera.c b/zmq-camera.c
index fb46899..007df9b 100644
--- a/zmq-camera.c
+++ b/zmq-camera.c
@@ -40,6 +40,13 @@
/* inproc endpoint used to provide the "local loopback" view of the camera */
const char local_camera[] = "inproc://local-camera";
+/* inproc endpoint used for signalling the main thread */
+const char local_signal[] = "inproc://local-signal";
+
+/*
+ * Sender thread
+ */
+
/* Sender thread initialiser */
struct sender_args_t
{
@@ -48,10 +55,6 @@ struct sender_args_t
int frame_rate;
};
-/*
- * Sender thread
- */
-
void *sender_thread (void *arg)
{
struct sender_args_t *sender_args;
@@ -214,13 +217,32 @@ void *sender_thread (void *arg)
* Signal handling thread
*/
-/* Global variable used to signal if an alarm has expired */
-volatile int alarm_expired = 0;
+/* Signal thread initialiser */
+struct signal_args_t
+{
+ void *ctx;
+};
void *signal_thread (void *arg)
{
sigset_t wait_set;
int rc, signo;
+ struct signal_args_t *signal_args;
+ void *s;
+ zmq_msg_t msg;
+
+ signal_args = (struct signal_args_t *)arg;
+
+ /* Create a ZMQ_PUB socket for publising signal events back to the
+ main thread */
+ s = zmq_socket (signal_args->ctx, ZMQ_PUB);
+ assert (s);
+ rc = zmq_connect (s, local_signal);
+ if (rc != 0) {
+ fprintf (stderr, "zmq_connect (\"%s\"): %s\n", local_signal,
+ zmq_strerror (errno));
+ exit (1);
+ }
sigemptyset (&wait_set);
sigaddset (&wait_set, SIGTERM);
@@ -231,19 +253,12 @@ void *signal_thread (void *arg)
while (1) {
rc = sigwait (&wait_set, &signo);
assert (rc == 0);
-
- switch (signo) {
- case SIGTERM:
- case SIGINT:
- case SIGQUIT:
- fprintf (stderr, "Exiting.\n");
- exit (1);
- case SIGALRM:
- alarm_expired = 1;
- break;
- default:
- ; /* Ignored */
- }
+ rc = zmq_msg_init_size (&msg, sizeof (int));
+ assert (rc == 0);
+ memcpy (zmq_msg_data (&msg), &signo, sizeof (int));
+ rc = zmq_send (s, &msg, 0);
+ assert (rc == 0);
+ zmq_msg_close (&msg);
}
}
@@ -281,7 +296,9 @@ void err_usage(char *error)
/* Main program. */
int main (int argc, char *argv [])
{
- void *ctx, *s;
+ void *ctx,
+ *s_sub,
+ *s_signal;
int rc;
int opt, opt_sender, opt_receiver, opt_frame_rate, frame_rate;
char *endpoint = NULL;
@@ -294,6 +311,7 @@ int main (int argc, char *argv [])
struct itimerval alarm_timer;
uint64_t received_fps = 0;
uint64_t received_bps = 0;
+ double received_mbps;
char window_title[80];
/* Parse command line. */
@@ -325,26 +343,34 @@ int main (int argc, char *argv [])
err_usage ("Please specify either the -r or -s option");
endpoint = argv [optind];
- /* Block signal handling in all application and 0MQ threads and
- start a separate thread to handle all signals synchronously */
+ /* Block signal handling in all application and 0MQ threads */
sigfillset (&block_set);
pthread_sigmask (SIG_SETMASK, &block_set, NULL);
- pthread_t signal;
- rc = pthread_create (&signal, NULL, signal_thread, NULL);
- assert (rc == 0);
- /* Initialise 0MQ infrastructure for 2 application threads and
+ /* Initialise 0MQ infrastructure for 3 application threads and
a single I/O thread */
- ctx = zmq_init (2, 1, 0);
+ ctx = zmq_init (3, 1, ZMQ_POLL);
assert (ctx);
+
+ /* Create a ZMQ_SUB socket to receive signal events and start the
+ signal handling thread */
+ s_signal = zmq_socket (ctx, ZMQ_SUB);
+ assert (s_signal);
+ rc = zmq_bind (s_signal, local_signal);
+ assert (rc == 0);
+
+ pthread_t signal;
+ struct signal_args_t signal_args = {ctx};
+ rc = pthread_create (&signal, NULL, signal_thread, (void *) &signal_args);
+ assert (rc == 0);
/* Create a ZMQ_SUB socket to receive video data. If we're sending video,
bind to an inproc: endpoint that the sender thread will connect to,
otherwise connect to the endpoint the user specified. */
- s = zmq_socket (ctx, ZMQ_SUB);
- assert (s);
+ s_sub = zmq_socket (ctx, ZMQ_SUB);
+ assert (s_sub);
if (opt_sender) {
- rc = zmq_bind (s, local_camera);
+ rc = zmq_bind (s_sub, local_camera);
if (rc != 0) {
fprintf (stderr, "zmq_bind (\"%s\"): %s\n", local_camera,
zmq_strerror (errno));
@@ -360,7 +386,7 @@ int main (int argc, char *argv [])
assert (rc == 0);
}
else {
- rc = zmq_connect (s, endpoint);
+ rc = zmq_connect (s_sub, endpoint);
if (rc != 0) {
fprintf (stderr, "zmq_connect (\"%s\"): %s\n", endpoint,
zmq_strerror (errno));
@@ -368,8 +394,10 @@ int main (int argc, char *argv [])
}
}
- /* Subscribe to all messages on socket. */
- rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE, "", 0);
+ /* Subscribe to all messages on video and signal sockets. */
+ rc = zmq_setsockopt (s_sub, ZMQ_SUBSCRIBE, "", 0);
+ assert (rc == 0);
+ rc = zmq_setsockopt (s_signal, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
/* Set up an interval timer to fire SIGALRM once a second so that
@@ -384,6 +412,13 @@ int main (int argc, char *argv [])
/* Intial window title is the endpoint we are receiving from */
snprintf (window_title, 80, "%s", opt_sender ? local_camera : endpoint);
+ /* Set up pollset */
+ zmq_pollitem_t items[2];
+ items[0].socket = s_sub;
+ items[0].events = ZMQ_POLLIN;
+ items[1].socket = s_signal;
+ items[1].events = ZMQ_POLLIN;
+
/* Display video until user asks to quit. */
while (!quit) {
@@ -392,95 +427,126 @@ int main (int argc, char *argv [])
unsigned char *data;
SDL_Event event;
- /* Receive single message. */
- rc = zmq_msg_init (&msg);
- assert (rc == 0);
- rc = zmq_recv (s, &msg, 0);
- assert (rc == 0);
-
- /* Parse message data. */
- data = (unsigned char*) zmq_msg_data (&msg);
- /* Sanity check that we have at least the width, height in
- the message data */
- msg_size = zmq_msg_size (&msg);
- assert (msg_size >= sizeof (uint32_t) + sizeof (uint32_t));
-
- /* Get image width in pixels. */
- memcpy (&image_width, data, sizeof (uint32_t));
- image_width = ntohl (image_width);
- data += sizeof (uint32_t);
-
- /* Get image height in pixels. */
- memcpy (&image_height, data, sizeof (uint32_t));
- image_height = ntohl (image_height);
- data += sizeof (uint32_t);
-
- /* Sanity check message size again */
- assert (msg_size >= image_width * image_height * 3);
-
- /* data now points to RGB24 pixel data. */
-
- if (!sdl_initialised) {
-
- /* Initialise SDL if not already done.
- We need to have received at least one message, so that we
- know what the image size being sent is. */
- if (SDL_Init (SDL_INIT_VIDEO) < 0)
- {
- fprintf (stderr, "Failed to initialize SDL: %s\n",
- SDL_GetError());
- exit (1);
+ /* Poll for incoming messages on either socket */
+ rc = zmq_poll (items, 2, 100000);
+ assert (rc >= 0);
+
+ /* Signal received */
+ if (items[1].revents == ZMQ_POLLIN) {
+ rc = zmq_msg_init (&msg);
+ assert (rc == 0);
+ rc = zmq_recv (s_signal, &msg, 0);
+ assert (rc == 0);
+ assert (zmq_msg_size (&msg) == sizeof (int));
+ int signo;
+ memcpy (&signo, zmq_msg_data (&msg), sizeof (int));
+
+ switch (signo) {
+ case SIGTERM:
+ case SIGINT:
+ case SIGQUIT:
+ fprintf (stderr, "Exiting.\n");
+ exit (1);
+ case SIGALRM:
+ /* If the timer fired, recalculate bandwidth and update window title */
+ received_mbps = (double)received_bps * 8 / (1<<20);
+ snprintf (window_title, 80, "%s (%llu fps, %.2lf Mbps)",
+ opt_sender ? local_camera : endpoint,
+ (long long unsigned int) received_fps, received_mbps);
+ if (sdl_initialised)
+ SDL_WM_SetCaption (window_title, window_title);
+ received_fps = received_bps = 0;
+ break;
+ default:
+ ; /* Ignore anything else */
}
- screen = SDL_SetVideoMode (image_width, image_height, 32,
- SDL_HWSURFACE);
- if (screen == NULL) {
- fprintf (stderr, "Unable to set video mode: %s\n",
- SDL_GetError ());
- SDL_Quit ();
- exit (1);
- }
- /* Set initial window title */
- SDL_WM_SetCaption (window_title, window_title);
- sdl_initialised = 1;
+ zmq_msg_close (&msg);
}
- /* Create RGB surface. */
- rgb_surface = SDL_CreateRGBSurfaceFrom (
- data, /* Pixel data */
- image_width, /* Width */
- image_height, /* Height */
- 24, /* Depth */
- image_width * 3, /* Scanline pitch */
- 0, 0, 0, 0); /* TODO: RGBA mask */
-
- /* Blit surface to screen. */
- SDL_BlitSurface (rgb_surface, NULL, screen, NULL);
- SDL_UpdateRect (screen, 0, 0, 0, 0);
- SDL_FreeSurface (rgb_surface);
-
- /* Free zmq_msg we received */
- zmq_msg_close (&msg);
-
- /* Update statistics */
- received_fps += 1;
- received_bps += msg_size;
- /* If the timer fired, recalculate bandwidth and update window title */
- if (alarm_expired) {
- double received_mbps = (double)received_bps * 8 / (1<<20);
- snprintf (window_title, 80, "%s (%llu fps, %.2lf Mbps)",
- opt_sender ? local_camera : endpoint,
- (long long unsigned int) received_fps, received_mbps);
- SDL_WM_SetCaption (window_title, window_title);
- received_fps = received_bps = 0;
- alarm_expired = 0;
+ /* Video frame received */
+ if (items[0].revents == ZMQ_POLLIN) {
+ rc = zmq_msg_init (&msg);
+ assert (rc == 0);
+ rc = zmq_recv (s_sub, &msg, 0);
+ assert (rc == 0);
+
+ /* Parse message data. */
+ data = (unsigned char*) zmq_msg_data (&msg);
+ /* Sanity check that we have at least the width, height in
+ the message data */
+ msg_size = zmq_msg_size (&msg);
+ assert (msg_size >= sizeof (uint32_t) + sizeof (uint32_t));
+
+ /* Get image width in pixels. */
+ memcpy (&image_width, data, sizeof (uint32_t));
+ image_width = ntohl (image_width);
+ data += sizeof (uint32_t);
+
+ /* Get image height in pixels. */
+ memcpy (&image_height, data, sizeof (uint32_t));
+ image_height = ntohl (image_height);
+ data += sizeof (uint32_t);
+
+ /* Sanity check message size again */
+ assert (msg_size >= image_width * image_height * 3);
+
+ /* data now points to RGB24 pixel data. */
+
+ if (!sdl_initialised) {
+
+ /* Initialise SDL if not already done.
+ We need to have received at least one message, so that we
+ know what the image size being sent is. */
+ if (SDL_Init (SDL_INIT_VIDEO) < 0)
+ {
+ fprintf (stderr, "Failed to initialize SDL: %s\n",
+ SDL_GetError());
+ exit (1);
+ }
+ screen = SDL_SetVideoMode (image_width, image_height, 32,
+ SDL_HWSURFACE);
+ if (screen == NULL) {
+ fprintf (stderr, "Unable to set video mode: %s\n",
+ SDL_GetError ());
+ SDL_Quit ();
+ exit (1);
+ }
+ /* Set initial window title */
+ SDL_WM_SetCaption (window_title, window_title);
+
+ sdl_initialised = 1;
+ }
+
+ /* Create RGB surface. */
+ rgb_surface = SDL_CreateRGBSurfaceFrom (
+ data, /* Pixel data */
+ image_width, /* Width */
+ image_height, /* Height */
+ 24, /* Depth */
+ image_width * 3, /* Scanline pitch */
+ 0, 0, 0, 0); /* TODO: RGBA mask */
+
+ /* Blit surface to screen. */
+ SDL_BlitSurface (rgb_surface, NULL, screen, NULL);
+ SDL_UpdateRect (screen, 0, 0, 0, 0);
+ SDL_FreeSurface (rgb_surface);
+
+ /* Free zmq_msg we received */
+ zmq_msg_close (&msg);
+
+ /* Update statistics */
+ received_fps += 1;
+ received_bps += msg_size;
}
/* Check if user asked to quit. */
+ if (sdl_initialised) {
while (SDL_PollEvent (&event))
- {
- if (event.type == SDL_QUIT)
- quit = 1;
+ {
+ if (event.type == SDL_QUIT)
+ quit = 1;
+ }
}
}