/*
Copyright (c) 2010 Martin Lucina
This file is part of zeromq-examples.
zeromq-examples is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
zeromq-examples is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/*
* Overall architecture:
*
* zmq-camera uses 3 application threads:
*
* 1) the main thread, which receives and displays video from either the
* sender thread using inproc://local-camera if running in sender mode (-s)
* or from a remote endpoint specified by the user if running in
* receive-only mode (-r).
*
* the main thread also receives and handles signals and timers,
* using inproc://local-signal and periodically polls for UI events.
*
* 2) the signal handling thread, which handles all signals synchronously
* using a sigwait() loop and sends a single message for each signal
* received to the main thread.
*
* 3) the sender thread which does the actual video capture and sends one
* message per raw video frame both to inproc://local-camera so that the
* user sees what they are sending and to the remote endpoint specified.
*
* Message formats:
*
* inproc://local-signal
*
* [ signo (int) ]
*
* Video data:
*
* [ width (uint32_t in network byte order),
* height (uint32_t in network byte order),
* (RGB24 pixel data) ]
*
*/
/* Pack four character codes into a single unsigned int FOURCC value:
   `a` ends up in the least significant byte and `d` in the most
   significant one.  Every argument is parenthesised and converted to
   unsigned int before use, so arbitrary expressions (for example a
   conditional expression) can be passed without changing the result. */
#define FOURCC(a,b,c,d) ((((unsigned int)(d)) << 24) + \
                         (((unsigned int)(c)) << 16) + \
                         (((unsigned int)(b)) << 8) + \
                         ((unsigned int)(a)))
/* In-process endpoint carrying the "local loopback" view of the camera:
   the main thread binds it and the sender thread connects to it. */
const char local_camera[] = "inproc://local-camera";

/* In-process endpoint on which the signal handling thread notifies the
   main thread: the main thread binds it, the signal thread connects. */
const char local_signal[] = "inproc://local-signal";

/* Multicast transport options, applied to the video sockets through
   ZMQ_RATE and ZMQ_RECOVERY_IVL respectively. */
const uint64_t mcast_rate   = 100000;   /* Rate limit for multicast, kbps */
const uint64_t recovery_ivl = 5;        /* Recovery interval for multicast,
                                           seconds */
/*
* Sender thread
*/
/* Sender thread initialiser */
/* Arguments handed to sender_thread () through its void * parameter. */
struct sender_args_t
{
    const char *endpoint;   /* Endpoint the video publisher socket binds to */
    void       *ctx;        /* 0MQ context the sender creates its socket in */
    int         frame_rate; /* Requested maximum frame rate in frames per
                               second; main () leaves this at 0 when no -f
                               option was given */
};
void *sender_thread (void *arg)
{
struct sender_args_t *sender_args;
void *s;
int rc;
unicap_handle_t handle;
unicap_device_t device;
unicap_format_t src_format;
unicap_format_t dest_format;
unicap_data_buffer_t src_buffer;
unicap_data_buffer_t dest_buffer;
unicap_data_buffer_t *returned_buffer;
int conversion_found = 0;
int index = 0;
sender_args = (struct sender_args_t *)arg;
/* Create a ZMQ_PUB socket for sending video data */
s = zmq_socket (sender_args->ctx, ZMQ_PUB);
assert (s);
/* Set a suitable rate limit and recovery window for multicast transports.
This must be done before attempting to connect or bind the socket
to the multicast transport. */
rc = zmq_setsockopt (s, ZMQ_RATE, &mcast_rate, sizeof mcast_rate);
assert (rc == 0);
rc = zmq_setsockopt (s, ZMQ_RECOVERY_IVL, &recovery_ivl,
sizeof recovery_ivl);
assert (rc == 0);
/* Connect it to both an inproc endpoint for the local loopback view and
bind to the remote endpoint specified by the user. */
rc = zmq_connect (s, local_camera);
assert (rc == 0);
rc = zmq_bind (s, sender_args->endpoint);
if (rc != 0) {
fprintf (stderr, "zmq_bind (\"%s\"): %s\n", sender_args->endpoint,
zmq_strerror (errno));
exit (1);
}
/* Open first available video capture device. */
if (!SUCCESS (unicap_enumerate_devices (NULL, &device, 0))) {
fprintf (stderr, "Could not enumerate devices\n");
exit (1);
}
if (!SUCCESS (unicap_open (&handle, &device))) {
fprintf (stderr, "Failed to open device: %s\n", device.identifier);
exit (1);
}
printf( "Opened video capture device: %s\n", device.identifier );
/* Find a suitable video format that we can convert to RGB24. */
while (SUCCESS (unicap_enumerate_formats (handle, NULL, &src_format,
index))) {
printf ("Trying video format: %s\n", src_format.identifier);
if (ucil_conversion_supported (FOURCC ('R', 'G', 'B', '3'),
src_format.fourcc)) {
conversion_found = 1;
break;
}
index++;
}
if (!conversion_found) {
fprintf (stderr, "Could not find a suitable video format\n\n");
fprintf (stderr, "This probably means you have a cheap camera.\n");
fprintf (stderr, "Feel free to either get a better camera, or improve\n");
fprintf (stderr, "the colorspace conversion code in this example.\n");
exit (1);
}
src_format.buffer_type = UNICAP_BUFFER_TYPE_USER;
if (!SUCCESS (unicap_set_format (handle, &src_format))) {
fprintf (stderr, "Failed to set video format\n");
exit (1);
}
printf ("Using video format: %s [%dx%d]\n",
src_format.identifier,
src_format.size.width,
src_format.size.height);
/* Clone destination format with equal dimensions, but RGB24 colorspace. */
unicap_copy_format (&dest_format, &src_format);
strcpy (dest_format.identifier, "RGB 24bpp");
dest_format.fourcc = FOURCC ('R', 'G', 'B', '3');
dest_format.bpp = 24;
dest_format.buffer_size = dest_format.size.width *
dest_format.size.height * 3;
/* Initialise image buffers. */
memset (&src_buffer, 0, sizeof (unicap_data_buffer_t));
src_buffer.data = (unsigned char*) malloc (src_format.buffer_size);
src_buffer.buffer_size = src_format.buffer_size;
memset (&dest_buffer, 0, sizeof (unicap_data_buffer_t));
dest_buffer.data = (unsigned char*) malloc (dest_format.buffer_size);
dest_buffer.buffer_size = dest_format.buffer_size;
dest_buffer.format = dest_format;
/* Start video capture. */
if (!SUCCESS (unicap_start_capture (handle))) {
fprintf (stderr, "Failed to start capture on device: %s\n",
device.identifier);
exit (1);
}
/* Loop, publising one message per raw video frame. */
while (1) {
zmq_msg_t msg;
size_t msg_size;
uint32_t image_width, image_height;
unsigned char *data;
/* Queue buffer for video capture. */
if (!SUCCESS (unicap_queue_buffer (handle, &src_buffer))) {
fprintf (stderr, "Failed to queue a buffer on device: %s\n",
device.identifier);
exit (1);
}
/* Wait until buffer is ready. */
if (!SUCCESS (unicap_wait_buffer (handle, &returned_buffer))) {
fprintf (stderr, "Failed to wait for buffer on device: %s\n",
device.identifier);
exit (1);
}
/* Convert colorspace. */
if (!SUCCESS (ucil_convert_buffer (&dest_buffer, &src_buffer))) {
/* TODO: This fails sometimes for unknown reasons,
just skip the frame for now. */
/* fprintf (stderr, "Failed to convert video buffer\n"); */
}
/* Create 0MQ message and fill in data. */
msg_size = dest_format.buffer_size + (2 * sizeof (uint32_t));
rc = zmq_msg_init_size (&msg, msg_size);
assert (rc == 0);
data = (unsigned char *)zmq_msg_data (&msg);
/* Image width (uint32_t in network byte order) */
image_width = htonl (dest_format.size.width);
memcpy (data, &image_width, sizeof (uint32_t));
data += sizeof (uint32_t);
/* Image height (uint32_t in network byte order) */
image_height = htonl (dest_format.size.height);
memcpy (data, &image_height, sizeof (uint32_t));
data += sizeof (uint32_t);
/* RGB24 image data. */
memcpy (data, dest_buffer.data, dest_format.buffer_size);
/* Send message to network. */
rc = zmq_send (s, &msg, 0);
assert (rc == 0);
zmq_msg_close (&msg);
/* Limit to maximum frame rate if requested */
if (sender_args->frame_rate > 1)
usleep (1000000 / sender_args->frame_rate);
}
return NULL;
}
/*
* Signal handling thread
*/
/* Signal thread initialiser */
/* Arguments handed to signal_thread () through its void * parameter. */
struct signal_args_t
{
    void *ctx;  /* 0MQ context the signal thread creates its socket in */
};
/*
 * Signal handling thread entry point.
 *
 * arg must point to a struct signal_args_t.  The thread waits synchronously
 * for SIGTERM, SIGINT, SIGQUIT and SIGALRM with sigwait () and publishes one
 * message per received signal on local_signal.  Each message carries the
 * signal number as a single int.  The loop never exits.
 */
void *signal_thread (void *arg)
{
    struct signal_args_t *args = (struct signal_args_t *)arg;
    sigset_t handled;
    void *pub;
    int rc;

    /* Publisher socket used to forward signal events to the main thread */
    pub = zmq_socket (args->ctx, ZMQ_PUB);
    assert (pub);
    rc = zmq_connect (pub, local_signal);
    assert (rc == 0);

    /* Termination signals, plus SIGALRM for the alarm timer */
    sigemptyset (&handled);
    sigaddset (&handled, SIGTERM);
    sigaddset (&handled, SIGINT);
    sigaddset (&handled, SIGQUIT);
    sigaddset (&handled, SIGALRM);

    /* Forward every received signal number as one message */
    for (;;) {
        zmq_msg_t note;
        int signo;

        rc = sigwait (&handled, &signo);
        assert (rc == 0);

        rc = zmq_msg_init_size (&note, sizeof (int));
        assert (rc == 0);
        memcpy (zmq_msg_data (&note), &signo, sizeof (int));

        rc = zmq_send (pub, &note, 0);
        assert (rc == 0);
        zmq_msg_close (&note);
    }
}
/*
* Main thread
*/
/* Print usage and exit. */
/*
 * Write the usage text to stderr and terminate the process with status 1.
 * When error is non-NULL it is printed first, prefixed with "Error: ".
 * This function does not return.
 */
void err_usage(char *error)
{
    static const char usage_text[] =
        "Usage: zmq-camera [OPTIONS] -r | -s ENDPOINT\n"
        "Sends or receives video using 0MQ.\n"
        "\n"
        "Mode:\n"
        " -r receive video from ENDPOINT\n"
        " -s send video to ENDPOINT\n"
        "\n"
        "ENDPOINT is any 0MQ endpoint valid for a ZMQ_PUB/ZMQ_SUB socket.\n"
        "\n"
        "Options:\n"
        " -f RATE limit maximum frame rate to RATE\n"
        "\n"
        "Examples:\n"
        "\n"
        " zmq-camera -s tcp://eth0:5555\n"
        " zmq-camera -r tcp://eth0:5555\n";

    if (error != NULL) {
        fprintf (stderr, "Error: %s\n", error);
    }
    fputs (usage_text, stderr);
    exit (1);
}
/* Main program. */
/* Return non-zero when a video message of msg_size bytes can hold the two
   uint32_t header fields followed by width * height RGB24 pixels.  Zero
   dimensions are rejected.  The comparison is done by division so that the
   pixel count cannot overflow for any 32-bit width and height. */
static int video_frame_valid (size_t msg_size, uint32_t width, uint32_t height)
{
    const size_t header_size = 2 * sizeof (uint32_t);
    uint64_t payload;

    if (msg_size < header_size || width == 0 || height == 0)
        return 0;
    payload = (uint64_t)(msg_size - header_size);
    return (uint64_t)height <= payload / ((uint64_t)width * 3);
}

/*
 * Main program.
 *
 * Parses the command line, blocks all signals, starts the signal handling
 * thread and (in -s mode) the sender thread, then runs the event loop:
 * it polls the video and signal sockets, displays received frames with SDL,
 * updates the window title with statistics on every SIGALRM and exits on
 * SIGTERM/SIGINT/SIGQUIT, on SDL_QUIT, or when Escape is pressed.
 *
 * Returns 0 when the user closes the window or presses Escape; terminates
 * with exit (1) on usage errors, setup failures and termination signals.
 */
int main (int argc, char *argv [])
{
    void *ctx;
    void *s_video;                  /* Video socket */
    void *s_signal;                 /* Signalling socket */
    int rc;
    int opt;
    int opt_sender;                 /* Sender mode? */
    int opt_receiver;
    int frame_rate;
    long rate_value;                /* -f argument as parsed by strtol */
    char *rate_end;                 /* First unparsed character of -f argument */
    char *endpoint = NULL;
    SDL_Surface *screen = NULL;
    SDL_Surface *rgb_surface = NULL;
    SDL_Event event;                /* Event from UI */
    uint32_t image_width, image_height;
    int sdl_initialised = 0;
    int quit = 0;
    int frame_ok;                   /* Current video message passed validation? */
    sigset_t block_set;
    struct itimerval alarm_timer;
    uint64_t received_fps = 0;      /* Frames received in the last second */
    uint64_t received_bps = 0;      /* Bytes received in the last second */
    double received_mbps;           /* Video bandwidth in megabits per second */
    char window_title[80];
    zmq_msg_t msg;
    size_t msg_size;
    unsigned char *data;
    pthread_t signal_pthread, sender_pthread;
    struct signal_args_t signal_args;
    struct sender_args_t sender_args;
    int received_signo;             /* signo received from signal thread */

    /* Parse command line. */
    opt_sender = opt_receiver = frame_rate = 0;
    while ((opt = getopt (argc, argv, "rsf:")) != -1) {
        switch (opt) {
        case 'r':
            opt_receiver = 1;
            break;
        case 's':
            opt_sender = 1;
            break;
        case 'f':
            /* strtol instead of atoi: trailing garbage is rejected and
               out-of-range input cannot overflow.  Rates above 1000000
               would make the sender's usleep interval round down to 0. */
            rate_end = NULL;
            rate_value = strtol (optarg, &rate_end, 10);
            if (rate_end == optarg || *rate_end != '\0' ||
                  rate_value < 1 || rate_value > 1000000)
                err_usage ("RATE must be a whole number between 1 and 1000000");
            frame_rate = (int)rate_value;
            printf ("Limiting frame rate to %d fps\n", frame_rate);
            break;
        default:
            err_usage (NULL);
        }
    }
    if (optind >= argc)
        err_usage ("Expected ENDPOINT argument after options");
    if (opt_receiver && opt_sender)
        err_usage ("Please specify either the -r or -s option");
    if ((opt_receiver == 0) && (opt_sender == 0))
        err_usage ("Please specify either the -r or -s option");
    endpoint = argv [optind];

    /* Block signal handling in all application and 0MQ threads */
    sigfillset (&block_set);
    pthread_sigmask (SIG_SETMASK, &block_set, NULL);

    /* Initialise 0MQ infrastructure with a single I/O thread */
    ctx = zmq_init (1);
    assert (ctx);

    /* Create a ZMQ_SUB socket to receive signal events and start the
       signal handling thread */
    s_signal = zmq_socket (ctx, ZMQ_SUB);
    assert (s_signal);
    rc = zmq_bind (s_signal, local_signal);
    assert (rc == 0);
    signal_args.ctx = ctx;
    rc = pthread_create (&signal_pthread, NULL, signal_thread,
        (void *)&signal_args);
    assert (rc == 0);

    /* Create a ZMQ_SUB socket to receive video data. */
    s_video = zmq_socket (ctx, ZMQ_SUB);
    assert (s_video);

    /* Set a suitable rate limit and recovery window for multicast transports.
       This must be done before attempting to connect or bind the socket
       to the multicast transport. */
    rc = zmq_setsockopt (s_video, ZMQ_RATE, &mcast_rate, sizeof mcast_rate);
    assert (rc == 0);
    rc = zmq_setsockopt (s_video, ZMQ_RECOVERY_IVL, &recovery_ivl,
        sizeof recovery_ivl);
    assert (rc == 0);

    /* If we're sending video, bind to an inproc: endpoint that the sender
       thread will connect to, otherwise connect to the remote endpoint the
       user specified. */
    if (opt_sender) {
        rc = zmq_bind (s_video, local_camera);
        assert (rc == 0);

        /* Start the sender thread after binding to the inproc: endpoint
           since this must exist for it to connect. */
        sender_args.endpoint = endpoint;
        sender_args.ctx = ctx;
        sender_args.frame_rate = frame_rate;
        rc = pthread_create (&sender_pthread, NULL, sender_thread,
            (void *)&sender_args);
        assert (rc == 0);
    }
    else {
        rc = zmq_connect (s_video, endpoint);
        if (rc != 0) {
            fprintf (stderr, "zmq_connect (\"%s\"): %s\n", endpoint,
                zmq_strerror (errno));
            exit (1);
        }
    }

    /* Subscribe to all messages on video and signal sockets. */
    rc = zmq_setsockopt (s_video, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);
    rc = zmq_setsockopt (s_signal, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);

    /* Set up an interval timer to fire SIGALRM once a second so that
       we can update bandwidth and fps statistics */
    alarm_timer.it_interval.tv_sec = 1;
    alarm_timer.it_interval.tv_usec = 0;
    alarm_timer.it_value.tv_sec = 1;
    alarm_timer.it_value.tv_usec = 0;
    rc = setitimer (ITIMER_REAL, &alarm_timer, NULL);
    assert (rc == 0);

    /* Initial window title is the endpoint we are receiving from */
    snprintf (window_title, sizeof window_title, "%s",
        opt_sender ? local_camera : endpoint);

    /* Set up pollset */
    zmq_pollitem_t items[2];
    items[0].socket = s_video;
    items[0].events = ZMQ_POLLIN;
    items[1].socket = s_signal;
    items[1].events = ZMQ_POLLIN;

    /*
     * Main event loop
     */
    while (!quit) {
        /* Poll for incoming messages on either the video or signalling
           socket.  NOTE(review): the timeout of 100000 is presumably in
           microseconds, as in the 0MQ 2.x API this file appears to use;
           confirm against the installed library version. */
        rc = zmq_poll (items, 2, 100000);
        assert (rc >= 0);

        /*
         * Signal received
         */
        if (items[1].revents & ZMQ_POLLIN) {
            rc = zmq_msg_init (&msg);
            assert (rc == 0);
            rc = zmq_recv (s_signal, &msg, 0);
            assert (rc == 0);
            assert (zmq_msg_size (&msg) == sizeof (int));
            memcpy (&received_signo, zmq_msg_data (&msg), sizeof (int));
            switch (received_signo) {
            /* Termination request */
            case SIGTERM:
            case SIGINT:
            case SIGQUIT:
                fprintf (stderr, "Exiting.\n");
                /* Shut SDL down before leaving so the video subsystem is
                   released even though we bypass the normal cleanup. */
                if (sdl_initialised)
                    SDL_Quit ();
                exit (1);
            /* Alarm timer fired, update bandwidth statistics */
            case SIGALRM:
                received_mbps = (double)received_bps * 8 / (1<<20);
                snprintf (window_title, sizeof window_title,
                    "%s (%llu fps, %.2lf Mbps)",
                    opt_sender ? local_camera : endpoint,
                    (long long unsigned int) received_fps, received_mbps);
                if (sdl_initialised)
                    SDL_WM_SetCaption (window_title, window_title);
                received_fps = received_bps = 0;
                break;
            /* Ignore anything else */
            default:
                ;
            }
            zmq_msg_close (&msg);
        }

        /*
         * Video frame received
         */
        if (items[0].revents & ZMQ_POLLIN) {
            rc = zmq_msg_init (&msg);
            assert (rc == 0);
            rc = zmq_recv (s_video, &msg, 0);
            assert (rc == 0);

            /* Parse message data. */
            data = (unsigned char*) zmq_msg_data (&msg);
            msg_size = zmq_msg_size (&msg);

            /* The message may come from the network, so validate it with
               real checks rather than assert (): it must hold the width
               and height header and enough RGB24 pixel data after it. */
            frame_ok = 0;
            image_width = image_height = 0;
            if (msg_size >= sizeof (uint32_t) + sizeof (uint32_t)) {
                /* Get image width in pixels. */
                memcpy (&image_width, data, sizeof (uint32_t));
                image_width = ntohl (image_width);
                data += sizeof (uint32_t);

                /* Get image height in pixels. */
                memcpy (&image_height, data, sizeof (uint32_t));
                image_height = ntohl (image_height);
                data += sizeof (uint32_t);

                frame_ok = video_frame_valid (msg_size, image_width,
                    image_height);
            }

            if (!frame_ok) {
                fprintf (stderr,
                    "Dropping malformed video message (%lu bytes)\n",
                    (unsigned long) msg_size);
            }
            else {
                /* data now points to RGB24 pixel data. */
                if (!sdl_initialised) {
                    /* Initialise SDL if not already done.
                       We need to have received at least one message, so
                       that we know what the image size being sent is. */
                    if (SDL_Init (SDL_INIT_VIDEO) < 0)
                    {
                        fprintf (stderr, "Failed to initialize SDL: %s\n",
                            SDL_GetError());
                        exit (1);
                    }
                    screen = SDL_SetVideoMode (image_width, image_height, 32,
                        SDL_HWSURFACE);
                    if (screen == NULL) {
                        fprintf (stderr, "Unable to set video mode: %s\n",
                            SDL_GetError ());
                        SDL_Quit ();
                        exit (1);
                    }

                    /* Set initial window title */
                    SDL_WM_SetCaption (window_title, window_title);
                    sdl_initialised = 1;
                }

                /* Create RGB surface that wraps the pixel data in place. */
                rgb_surface = SDL_CreateRGBSurfaceFrom (
                    data,               /* Pixel data */
                    image_width,        /* Width */
                    image_height,       /* Height */
                    24,                 /* Depth */
                    image_width * 3,    /* Scanline pitch */
                    0x000000ff,         /* Red */
                    0x0000ff00,         /* Green */
                    0x00ff0000,         /* Blue */
                    0);                 /* No alpha */
                if (rgb_surface == NULL) {
                    fprintf (stderr, "Unable to create RGB surface: %s\n",
                        SDL_GetError ());
                }
                else {
                    /* Blit surface to screen. */
                    SDL_BlitSurface (rgb_surface, NULL, screen, NULL);
                    SDL_UpdateRect (screen, 0, 0, 0, 0);
                    SDL_FreeSurface (rgb_surface);
                }

                /* Update statistics */
                received_fps += 1;
                received_bps += msg_size;
            }

            /* Free zmq_msg we received; the surface that referenced its
               data has already been released above. */
            zmq_msg_close (&msg);
        }

        /*
         * Poll for UI events
         */
        while (sdl_initialised && SDL_PollEvent (&event))
        {
            if (event.type == SDL_QUIT)
                quit = 1;
            if (event.type == SDL_KEYDOWN)
                if (event.key.keysym.sym == SDLK_ESCAPE)
                    quit = 1;
        }
    }

    /* TODO: Send a 'stop' message to sender thread rather than killing it
       forcefully by terminating the main thread. This would allow it
       to call zmq_close() correctly and us to call zmq_term() here. */

    /* Cleanup */
    SDL_Quit ();
    return 0;
}