/* Copyright (c) 2010 Martin Lucina This file is part of zeromq-examples. zeromq-examples is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. zeromq-examples is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #include #include #include #include #include #include #include #include #include #include #include #include #include /* * Overall architecture: * * zmq-camera uses 3 application threads: * * 1) the main thread, which receives and displays video from either the * sender thread using inproc://local-camera if running in sender mode (-s) * or from a remote endpoint specified by the user if running in * receive-only mode (-r). * * the main thread also receives and handles signals and timers, * using inproc://local-signal and periodically polls for UI events. * * 2) the signal handling thread, which handles all signals synchronously * using a sigwait() loop and sends a single message for each signal * received to the main thread. * * 3) the sender thread which does the actual video capture and sends one * message per raw video frame both to inproc://local-camera so that the * user sees what they are sending and to the remote endpoint specified. * * Message formats: * * inproc://local-signal * * [ signo (int) ] * * Video data: * * [ width (uint32_t in network byte order), * height (uint32_t in network byte order), * (RGB24 pixel data) ] * */ #define FOURCC(a,b,c,d) (unsigned int)((((unsigned int)d)<<24)+\ (((unsigned int)c)<<16)+(((unsigned int)b)<<8)+a) /* inproc endpoint used to provide the "local loopback" view of the camera */ const char local_camera[] = "inproc://local-camera"; /* inproc endpoint used for signalling the main thread */ const char local_signal[] = "inproc://local-signal"; /* multicast transport options */ const uint64_t mcast_rate = 100000; /* Rate limit for multicast, kbps */ const uint64_t recovery_ivl = 5; /* Recovery interval for multicast, seconds */ /* * Sender thread */ /* Sender thread initialiser */ struct sender_args_t { const char *endpoint; void *ctx; int frame_rate; }; void *sender_thread (void *arg) { struct sender_args_t *sender_args; void *s; int rc; unicap_handle_t handle; unicap_device_t device; unicap_format_t src_format; unicap_format_t dest_format; unicap_data_buffer_t src_buffer; unicap_data_buffer_t dest_buffer; unicap_data_buffer_t *returned_buffer; int conversion_found = 0; int index = 0; sender_args = (struct sender_args_t *)arg; /* Create a ZMQ_PUB socket for sending video data */ s = zmq_socket (sender_args->ctx, ZMQ_PUB); assert (s); /* Set a suitable rate limit and recovery window for multicast transports. This must be done before attempting to connect or bind the socket to the multicast transport. */ rc = zmq_setsockopt (s, ZMQ_RATE, &mcast_rate, sizeof mcast_rate); assert (rc == 0); rc = zmq_setsockopt (s, ZMQ_RECOVERY_IVL, &recovery_ivl, sizeof recovery_ivl); assert (rc == 0); /* Connect it to both an inproc endpoint for the local loopback view and bind to the remote endpoint specified by the user. */ rc = zmq_connect (s, local_camera); assert (rc == 0); rc = zmq_bind (s, sender_args->endpoint); if (rc != 0) { fprintf (stderr, "zmq_bind (\"%s\"): %s\n", sender_args->endpoint, zmq_strerror (errno)); exit (1); } /* Open first available video capture device. */ if (!SUCCESS (unicap_enumerate_devices (NULL, &device, 0))) { fprintf (stderr, "Could not enumerate devices\n"); exit (1); } if (!SUCCESS (unicap_open (&handle, &device))) { fprintf (stderr, "Failed to open device: %s\n", device.identifier); exit (1); } printf( "Opened video capture device: %s\n", device.identifier ); /* Find a suitable video format that we can convert to RGB24. */ while (SUCCESS (unicap_enumerate_formats (handle, NULL, &src_format, index))) { printf ("Trying video format: %s\n", src_format.identifier); if (ucil_conversion_supported (FOURCC ('R', 'G', 'B', '3'), src_format.fourcc)) { conversion_found = 1; break; } index++; } if (!conversion_found) { fprintf (stderr, "Could not find a suitable video format\n\n"); fprintf (stderr, "This probably means you have a cheap camera.\n"); fprintf (stderr, "Feel free to either get a better camera, or improve\n"); fprintf (stderr, "the colorspace conversion code in this example.\n"); exit (1); } src_format.buffer_type = UNICAP_BUFFER_TYPE_USER; if (!SUCCESS (unicap_set_format (handle, &src_format))) { fprintf (stderr, "Failed to set video format\n"); exit (1); } printf ("Using video format: %s [%dx%d]\n", src_format.identifier, src_format.size.width, src_format.size.height); /* Clone destination format with equal dimensions, but RGB24 colorspace. */ unicap_copy_format (&dest_format, &src_format); strcpy (dest_format.identifier, "RGB 24bpp"); dest_format.fourcc = FOURCC ('R', 'G', 'B', '3'); dest_format.bpp = 24; dest_format.buffer_size = dest_format.size.width * dest_format.size.height * 3; /* Initialise image buffers. */ memset (&src_buffer, 0, sizeof (unicap_data_buffer_t)); src_buffer.data = (unsigned char*) malloc (src_format.buffer_size); src_buffer.buffer_size = src_format.buffer_size; memset (&dest_buffer, 0, sizeof (unicap_data_buffer_t)); dest_buffer.data = (unsigned char*) malloc (dest_format.buffer_size); dest_buffer.buffer_size = dest_format.buffer_size; dest_buffer.format = dest_format; /* Start video capture. */ if (!SUCCESS (unicap_start_capture (handle))) { fprintf (stderr, "Failed to start capture on device: %s\n", device.identifier); exit (1); } /* Loop, publising one message per raw video frame. */ while (1) { zmq_msg_t msg; size_t msg_size; uint32_t image_width, image_height; unsigned char *data; /* Queue buffer for video capture. */ if (!SUCCESS (unicap_queue_buffer (handle, &src_buffer))) { fprintf (stderr, "Failed to queue a buffer on device: %s\n", device.identifier); exit (1); } /* Wait until buffer is ready. */ if (!SUCCESS (unicap_wait_buffer (handle, &returned_buffer))) { fprintf (stderr, "Failed to wait for buffer on device: %s\n", device.identifier); exit (1); } /* Convert colorspace. */ if (!SUCCESS (ucil_convert_buffer (&dest_buffer, &src_buffer))) { /* TODO: This fails sometimes for unknown reasons, just skip the frame for now. */ /* fprintf (stderr, "Failed to convert video buffer\n"); */ } /* Create 0MQ message and fill in data. */ msg_size = dest_format.buffer_size + (2 * sizeof (uint32_t)); rc = zmq_msg_init_size (&msg, msg_size); assert (rc == 0); data = (unsigned char *)zmq_msg_data (&msg); /* Image width (uint32_t in network byte order) */ image_width = htonl (dest_format.size.width); memcpy (data, &image_width, sizeof (uint32_t)); data += sizeof (uint32_t); /* Image height (uint32_t in network byte order) */ image_height = htonl (dest_format.size.height); memcpy (data, &image_height, sizeof (uint32_t)); data += sizeof (uint32_t); /* RGB24 image data. */ memcpy (data, dest_buffer.data, dest_format.buffer_size); /* Send message to network. */ rc = zmq_send (s, &msg, 0); assert (rc == 0); zmq_msg_close (&msg); /* Limit to maximum frame rate if requested */ if (sender_args->frame_rate > 1) usleep (1000000 / sender_args->frame_rate); } return NULL; } /* * Signal handling thread */ /* Signal thread initialiser */ struct signal_args_t { void *ctx; }; void *signal_thread (void *arg) { sigset_t wait_set; int rc, signo; struct signal_args_t *signal_args; void *s; zmq_msg_t msg; signal_args = (struct signal_args_t *)arg; /* Create a ZMQ_PUB socket for publising signal events back to the main thread */ s = zmq_socket (signal_args->ctx, ZMQ_PUB); assert (s); rc = zmq_connect (s, local_signal); assert (rc == 0); /* Handle various termination signals and SIGALRM for alarm timer */ sigemptyset (&wait_set); sigaddset (&wait_set, SIGTERM); sigaddset (&wait_set, SIGINT); sigaddset (&wait_set, SIGQUIT); sigaddset (&wait_set, SIGALRM); /* Loop and send a message to the main thread for each signal received */ while (1) { rc = sigwait (&wait_set, &signo); assert (rc == 0); rc = zmq_msg_init_size (&msg, sizeof (int)); assert (rc == 0); memcpy (zmq_msg_data (&msg), &signo, sizeof (int)); rc = zmq_send (s, &msg, 0); assert (rc == 0); zmq_msg_close (&msg); } } /* * Main thread */ /* Print usage and exit. */ void err_usage(char *error) { const char usage[] = \ "Usage: zmq-camera [OPTIONS] -r | -s ENDPOINT\n" "Sends or receives video using 0MQ.\n" "\n" "Mode:\n" " -r receive video from ENDPOINT\n" " -s send video to ENDPOINT\n" "\n" "ENDPOINT is any 0MQ endpoint valid for a ZMQ_PUB/ZMQ_SUB socket.\n" "\n" "Options:\n" " -f RATE limit maximum frame rate to RATE\n" "\n" "Examples:\n" "\n" " zmq-camera -s tcp://eth0:5555\n" " zmq-camera -r tcp://eth0:5555\n"; if (error) fprintf (stderr, "Error: %s\n", error); fprintf (stderr, "%s", usage); exit (1); } /* Main program. */ int main (int argc, char *argv []) { void *ctx; void *s_video; /* Video socket */ void *s_signal; /* Signalling socket */ int rc; int opt; int opt_sender; /* Sender mode? */ int opt_receiver; int opt_frame_rate, frame_rate; char *endpoint = NULL; SDL_Surface *screen = NULL; SDL_Surface *rgb_surface = NULL; SDL_Event event; /* Event from UI */ uint32_t image_width, image_height; int sdl_initialised = 0; int quit = 0; sigset_t block_set; struct itimerval alarm_timer; uint64_t received_fps = 0; /* Frames received in the last second */ uint64_t received_bps = 0; /* Bytes received in the last second */ double received_mbps; /* Video bandwidth in megabits per second */ char window_title[80]; zmq_msg_t msg; size_t msg_size; unsigned char *data; pthread_t signal_pthread, sender_pthread; struct signal_args_t signal_args; struct sender_args_t sender_args; int received_signo; /* signo received from signal thread */ /* Parse command line. */ opt_sender = opt_receiver = opt_frame_rate = frame_rate = 0; while ((opt = getopt (argc, argv, "rsf:")) != -1) { switch (opt) { case 'r': opt_receiver = 1; break; case 's': opt_sender = 1; break; case 'f': opt_frame_rate = 1; frame_rate = atoi (optarg); if (frame_rate < 1) err_usage ("RATE must be numeric and non-zero"); printf ("Limiting frame rate to %d fps\n", frame_rate); break; default: err_usage (NULL); } } if (optind >= argc) err_usage ("Expected ENDPOINT argument after options"); if (opt_receiver && opt_sender) err_usage ("Please specify either the -r or -s option"); if ((opt_receiver == 0) && (opt_sender == 0)) err_usage ("Please specify either the -r or -s option"); endpoint = argv [optind]; /* Block signal handling in all application and 0MQ threads */ sigfillset (&block_set); pthread_sigmask (SIG_SETMASK, &block_set, NULL); /* Initialise 0MQ infrastructure with a single I/O thread */ ctx = zmq_init (1); assert (ctx); /* Create a ZMQ_SUB socket to receive signal events and start the signal handling thread */ s_signal = zmq_socket (ctx, ZMQ_SUB); assert (s_signal); rc = zmq_bind (s_signal, local_signal); assert (rc == 0); signal_args.ctx = ctx; rc = pthread_create (&signal_pthread, NULL, signal_thread, (void *)&signal_args); assert (rc == 0); /* Create a ZMQ_SUB socket to receive video data. */ s_video = zmq_socket (ctx, ZMQ_SUB); assert (s_video); /* Set a suitable rate limit and recovery window for multicast transports. This must be done before attempting to connect or bind the socket to the multicast transport. */ rc = zmq_setsockopt (s_video, ZMQ_RATE, &mcast_rate, sizeof mcast_rate); assert (rc == 0); rc = zmq_setsockopt (s_video, ZMQ_RECOVERY_IVL, &recovery_ivl, sizeof recovery_ivl); assert (rc == 0); /* If we're sending video, bind to an inproc: endpoint that the sender thread will connect to, otherwise connect to the remote endpoint the user specified. */ if (opt_sender) { rc = zmq_bind (s_video, local_camera); assert (rc == 0); /* Start the sender thread after binding to the inproc: endpoint since this must exist for it to connect. */ sender_args.endpoint = endpoint; sender_args.ctx = ctx; sender_args.frame_rate = frame_rate; rc = pthread_create (&sender_pthread, NULL, sender_thread, (void *)&sender_args); assert (rc == 0); } else { rc = zmq_connect (s_video, endpoint); if (rc != 0) { fprintf (stderr, "zmq_connect (\"%s\"): %s\n", endpoint, zmq_strerror (errno)); exit (1); } } /* Subscribe to all messages on video and signal sockets. */ rc = zmq_setsockopt (s_video, ZMQ_SUBSCRIBE, "", 0); assert (rc == 0); rc = zmq_setsockopt (s_signal, ZMQ_SUBSCRIBE, "", 0); assert (rc == 0); /* Set up an interval timer to fire SIGALRM once a second so that we can update bandwidth and fps statistics */ alarm_timer.it_interval.tv_sec = 1; alarm_timer.it_interval.tv_usec = 0; alarm_timer.it_value.tv_sec = 1; alarm_timer.it_value.tv_usec = 0; rc = setitimer (ITIMER_REAL, &alarm_timer, NULL); assert (rc == 0); /* Intial window title is the endpoint we are receiving from */ snprintf (window_title, 80, "%s", opt_sender ? local_camera : endpoint); /* Set up pollset */ zmq_pollitem_t items[2]; items[0].socket = s_video; items[0].events = ZMQ_POLLIN; items[1].socket = s_signal; items[1].events = ZMQ_POLLIN; /* * Main event loop */ while (!quit) { /* Poll for incoming messages on either the video or signalling socket */ rc = zmq_poll (items, 2, 100000); assert (rc >= 0); /* * Signal received */ if (items[1].revents == ZMQ_POLLIN) { rc = zmq_msg_init (&msg); assert (rc == 0); rc = zmq_recv (s_signal, &msg, 0); assert (rc == 0); assert (zmq_msg_size (&msg) == sizeof (int)); memcpy (&received_signo, zmq_msg_data (&msg), sizeof (int)); switch (received_signo) { /* Termination request */ case SIGTERM: case SIGINT: case SIGQUIT: fprintf (stderr, "Exiting.\n"); exit (1); /* Alarm timer fired, update bandwidth statistics */ case SIGALRM: received_mbps = (double)received_bps * 8 / (1<<20); snprintf (window_title, 80, "%s (%llu fps, %.2lf Mbps)", opt_sender ? local_camera : endpoint, (long long unsigned int) received_fps, received_mbps); if (sdl_initialised) SDL_WM_SetCaption (window_title, window_title); received_fps = received_bps = 0; break; /* Ignore anything else */ default: ; } zmq_msg_close (&msg); } /* * Video frame received */ if (items[0].revents == ZMQ_POLLIN) { rc = zmq_msg_init (&msg); assert (rc == 0); rc = zmq_recv (s_video, &msg, 0); assert (rc == 0); /* Parse message data. */ data = (unsigned char*) zmq_msg_data (&msg); /* Sanity check that we have at least the width, height in the message data */ msg_size = zmq_msg_size (&msg); assert (msg_size >= sizeof (uint32_t) + sizeof (uint32_t)); /* Get image width in pixels. */ memcpy (&image_width, data, sizeof (uint32_t)); image_width = ntohl (image_width); data += sizeof (uint32_t); /* Get image height in pixels. */ memcpy (&image_height, data, sizeof (uint32_t)); image_height = ntohl (image_height); data += sizeof (uint32_t); /* Sanity check message size again */ assert (msg_size >= image_width * image_height * 3); /* data now points to RGB24 pixel data. */ if (!sdl_initialised) { /* Initialise SDL if not already done. We need to have received at least one message, so that we know what the image size being sent is. */ if (SDL_Init (SDL_INIT_VIDEO) < 0) { fprintf (stderr, "Failed to initialize SDL: %s\n", SDL_GetError()); exit (1); } screen = SDL_SetVideoMode (image_width, image_height, 32, SDL_HWSURFACE); if (screen == NULL) { fprintf (stderr, "Unable to set video mode: %s\n", SDL_GetError ()); SDL_Quit (); exit (1); } /* Set initial window title */ SDL_WM_SetCaption (window_title, window_title); sdl_initialised = 1; } /* Create RGB surface. */ rgb_surface = SDL_CreateRGBSurfaceFrom ( data, /* Pixel data */ image_width, /* Width */ image_height, /* Height */ 24, /* Depth */ image_width * 3, /* Scanline pitch */ 0x000000ff, /* Red */ 0x0000ff00, /* Green */ 0x00ff0000, /* Blue */ 0); /* No alpha */ /* Blit surface to screen. */ SDL_BlitSurface (rgb_surface, NULL, screen, NULL); SDL_UpdateRect (screen, 0, 0, 0, 0); SDL_FreeSurface (rgb_surface); /* Free zmq_msg we received */ zmq_msg_close (&msg); /* Update statistics */ received_fps += 1; received_bps += msg_size; } /* * Poll for UI events */ while (sdl_initialised && SDL_PollEvent (&event)) { if (event.type == SDL_QUIT) quit = 1; if (event.type == SDL_KEYDOWN) if (event.key.keysym.sym == SDLK_ESCAPE) quit = 1; } } /* TODO: Send a 'stop' message to sender thread rather than killing it forcefully by terminating the main thread. This would allow it to call zmq_close() correctly and us to call zmq_term() here. */ /* Cleanup */ SDL_Quit (); return 0; }