From 73bbb9dfcfaa75720f90d35f4f9e9bf731ea9bc5 Mon Sep 17 00:00:00 2001 From: LRN Date: Wed, 8 Jan 2014 14:14:44 +0000 Subject: Add GStreamer-based implementation of conversation helpers --- configure.ac | 58 +++- src/Makefile.am | 8 +- src/conversation/Makefile.am | 58 +++- .../gnunet-helper-audio-playback-gst.c | 372 +++++++++++++++++++++ src/conversation/gnunet-helper-audio-record-gst.c | 334 ++++++++++++++++++ 5 files changed, 801 insertions(+), 29 deletions(-) create mode 100755 src/conversation/gnunet-helper-audio-playback-gst.c create mode 100755 src/conversation/gnunet-helper-audio-record-gst.c diff --git a/configure.ac b/configure.ac index a9ba0125c..7aea4d12e 100644 --- a/configure.ac +++ b/configure.ac @@ -388,10 +388,7 @@ AC_CHECK_LIB(pulse,pa_stream_peek, [AC_CHECK_HEADER([pulse/simple.h],pulse=1)]) if test "$pulse" = 1 then - AM_CONDITIONAL(HAVE_PULSE, true) AC_DEFINE([HAVE_PULSE],[1],[Have libpulse(audio) library]) -else - AM_CONDITIONAL(HAVE_PULSE, false) fi if test "$build_target" = "mingw" then @@ -404,12 +401,39 @@ AC_CHECK_LIB(opus,opus_decode_float, [AC_CHECK_HEADER([opus/opus.h],opus=1)]) if test "$opus" = 1 then - AM_CONDITIONAL(HAVE_OPUS, true) AC_DEFINE([HAVE_OPUS],[1],[Have libopus library]) -else - AM_CONDITIONAL(HAVE_OPUS, false) fi +gst=0 +PKG_CHECK_MODULES( + [GST], + [glib-2.0 gobject-2.0 gstreamer-1.0 gstreamer-app-1.0 gstreamer-audio-1.0], + [ + gst=1 + AC_MSG_RESULT(ok) + ], [ + gst=0 + AC_MSG_RESULT(not found) + ]) + +# Pulse Audio +if test "x$pulse" != "x1" -o "x$opus" != "x1" +then + if test "x$gst" != "x1" -o "x$opus" != "x1" + then + conversation_backend=none + AM_CONDITIONAL(BUILD_PULSE_HELPERS, false) + AM_CONDITIONAL(BUILD_GST_HELPERS, false) + else + conversation_backend=gst + AM_CONDITIONAL(BUILD_PULSE_HELPERS, false) + AM_CONDITIONAL(BUILD_GST_HELPERS, true) + fi +else + conversation_backend=pulse + AM_CONDITIONAL(BUILD_PULSE_HELPERS, true) + AM_CONDITIONAL(BUILD_GST_HELPERS, false) +fi # libgnurl LIBGNURL_CHECK_CONFIG(,7.34.0,gnurl=1,gnurl=0) @@ -1519,17 +1543,23 @@ then AC_MSG_NOTICE([NOTICE: libmicrohttpd not found, http transport will not be installed.]) fi -# Pulse Audio -if test "x$pulse" != "x1" +# conversation +if test "x$conversation_backend" == "xnone" then - AC_MSG_NOTICE([NOTICE: libpulse(audio) not found, conversation will not be built.]) + if test "x$pulse" != "x1" + then + AC_MSG_NOTICE([NOTICE: libpulse(audio) not found, conversation will not be built.]) + fi + if test "x$opus" != "x1" + then + AC_MSG_NOTICE([NOTICE: libopus not found, conversation will not be built.]) + fi + if test "x$gst" != "x1" + then + AC_MSG_NOTICE([NOTICE: GStreamer not found, conversation will not be built.]) + fi fi -# Opus -if test "x$opus" != "x1" -then - AC_MSG_NOTICE([NOTICE: libopus not found, conversation will not be built.]) -fi AC_MSG_NOTICE([NOTICE: Database support is set to MySQL: $mysql, SQLite: $sqlite, Postgres: $postgres]) diff --git a/src/Makefile.am b/src/Makefile.am index 94345a5bd..0aa1ec4d1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -24,9 +24,11 @@ if HAVE_EXPERIMENTAL endif -if HAVE_PULSE -if HAVE_OPUS - CONVERSATION_DIR = conversation +if BUILD_PULSE_HELPERS +CONVERSATION_DIR = conversation +else +if BUILD_GST_HELPERS +CONVERSATION_DIR = conversation endif endif diff --git a/src/conversation/Makefile.am b/src/conversation/Makefile.am index e455ba45e..a11763cc5 100644 --- a/src/conversation/Makefile.am +++ b/src/conversation/Makefile.am @@ -2,6 +2,10 @@ SUBDIRS = . plugindir = $(libdir)/gnunet +if MINGW + WINFLAGS = -no-undefined -Wl,--export-all-symbols +endif + AM_CPPFLAGS = \ $(GNUNET_CPPFLAGS) \ -I$(top_srcdir)/src/include \ @@ -69,26 +73,30 @@ bin_PROGRAMS = \ libexec_PROGRAMS = \ gnunet-service-conversation -if HAVE_PULSE -if HAVE_OPUS -libexec_PROGRAMS += \ - gnunet-helper-audio-record \ - gnunet-helper-audio-playback -endif -endif - - check_PROGRAMS = \ test_conversation_api \ test_conversation_api_reject \ test_conversation_api_twocalls -if HAVE_PULSE -if HAVE_OPUS -TESTS = $(check_PROGRAMS) +if BUILD_PULSE_HELPERS +AUDIO_HELPER_RECD=gnunet-helper-audio-record +AUDIO_HELPER_PLAY=gnunet-helper-audio-playback +AUDIO_TESTS=$(check_PROGRAMS) +else +if BUILD_GST_HELPERS +AUDIO_HELPER_RECD=gnunet-helper-audio-record +AUDIO_HELPER_PLAY=gnunet-helper-audio-playback +AUDIO_TESTS=$(check_PROGRAMS) endif endif +libexec_PROGRAMS += \ + $(AUDIO_HELPER_RECD) \ + $(AUDIO_HELPER_PLAY) + +TESTS = $(AUDIO_TESTS) + +if BUILD_PULSE_HELPERS gnunet_helper_audio_record_SOURCES = \ gnunet-helper-audio-record.c gnunet_helper_audio_record_LDADD = \ @@ -106,6 +114,32 @@ gnunet_helper_audio_playback_LDADD = \ $(INTLLIBS) gnunet_helper_audio_playback_LDFLAGS = \ $(GNUNET_LDFLAGS) $(WINFLAGS) +else +if BUILD_GST_HELPERS +gnunet_helper_audio_record_SOURCES = \ + gnunet-helper-audio-record-gst.c +gnunet_helper_audio_record_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(GST_LIBS) \ + $(INTLLIBS) +gnunet_helper_audio_record_LDFLAGS = \ + $(GNUNET_LDFLAGS) $(WINFLAGS) $(GST_LDFLAGS) +gnunet_helper_audio_record_CFLAGS = \ + $(GST_CFLAGS) + +gnunet_helper_audio_playback_SOURCES = \ + gnunet-helper-audio-playback-gst.c +gnunet_helper_audio_playback_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + -lopus \ + $(GST_LIBS) \ + $(INTLLIBS) +gnunet_helper_audio_playback_LDFLAGS = \ + $(GNUNET_LDFLAGS) $(WINFLAGS) $(GST_LDFLAGS) +gnunet_helper_audio_playback_CFLAGS = \ + $(GST_CFLAGS) +endif +endif gnunet_service_conversation_SOURCES = \ gnunet-service-conversation.c diff --git a/src/conversation/gnunet-helper-audio-playback-gst.c b/src/conversation/gnunet-helper-audio-playback-gst.c new file mode 100755 index 000000000..d6d2316fc --- /dev/null +++ b/src/conversation/gnunet-helper-audio-playback-gst.c @@ -0,0 +1,372 @@ +/* + This file is part of GNUnet. + (C) 2013 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ +/** + * @file conversation/gnunet-helper-audio-playback-gst.c + * @brief program to playback audio data to the speaker (GStreamer version) + * @author LRN + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_protocols.h" +#include "conversation.h" +#include "gnunet_constants.h" +#include "gnunet_core_service.h" + +#include +#include +#include +#include + +#include +#include + +/** + * How much data to read in one go + */ +#define MAXLINE 4096 + +#define SAMPLING_RATE 48000 + +#define CHANNELS 1 + +#define FRAME_SIZE (SAMPLING_RATE / 50) + +#define PCM_LENGTH (FRAME_SIZE * CHANNELS * sizeof (int16_t)) + +/** + * Max number of microseconds to buffer in audiosink. + * Default is 200000 + */ +#define BUFFER_TIME 1000 + +/** + * Min number of microseconds to buffer in audiosink. + * Default is 10000 + */ +#define LATENCY_TIME 1000 + +/** + * Tokenizer for the data we get from stdin + */ +struct GNUNET_SERVER_MessageStreamTokenizer *stdin_mst; + +/** + * Main pipeline. + */ +static GstElement *pipeline; + +/** + * Appsrc instance into which we write data for the pipeline. + */ +static GstElement *source; + +/** + * OPUS decoder + */ +static OpusDecoder *dec; + + +/** + * Set to 1 to break the reading loop + */ +static int abort_read; + + +/** + * OPUS initialization + */ +static void +opus_init () +{ + int err; + int channels = 1; + + dec = opus_decoder_create (SAMPLING_RATE, channels, &err); +} + +void +sink_child_added (GstChildProxy *child_proxy, GObject *object, gchar *name, gpointer user_data) +{ + if (GST_IS_AUDIO_BASE_SRC (object)) + g_object_set (object, "buffer-time", (gint64) BUFFER_TIME, "latency-time", (gint64) LATENCY_TIME, NULL); +} + +static void +quit () +{ + if (NULL != source) + gst_app_src_end_of_stream (GST_APP_SRC (source)); + if (NULL != pipeline) + gst_element_set_state (pipeline, GST_STATE_NULL); + abort_read = 1; +} + +static gboolean +bus_call (GstBus *bus, GstMessage *msg, gpointer data) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bus message\n"); + switch (GST_MESSAGE_TYPE (msg)) + { + case GST_MESSAGE_EOS: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "End of stream\n"); + quit (); + break; + + case GST_MESSAGE_ERROR: + { + gchar *debug; + GError *error; + + gst_message_parse_error (msg, &error, &debug); + g_free (debug); + + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Error: %s\n", error->message); + g_error_free (error); + + quit (); + break; + } + default: + break; + } + + return TRUE; +} + + +static void +signalhandler (int s) +{ + quit (); +} + + +/** + * Message callback + */ +static int +stdin_receiver (void *cls, + void *client, + const struct GNUNET_MessageHeader *msg) +{ + struct AudioMessage *audio; + GstBuffer *b; + int16_t *bufspace; + GstFlowReturn flow; + int ret; + + switch (ntohs (msg->type)) + { + case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO: + audio = (struct AudioMessage *) msg; + + bufspace = (int16_t *) g_malloc (PCM_LENGTH); + + ret = opus_decode (dec, + (const unsigned char *) &audio[1], + ntohs (audio->header.size) - sizeof (struct AudioMessage), + bufspace, + FRAME_SIZE, 0); + if (ret < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Opus decoding failed: %d\n", + ret); + g_free (bufspace); + return GNUNET_OK; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Decoded frame with %u bytes\n", + ntohs (audio->header.size)); + + b = gst_buffer_new_wrapped (bufspace, ret * sizeof (int16_t)); + if (NULL == b) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to wrap a buffer\n"); + g_free (bufspace); + return GNUNET_SYSERR; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "pushing...\n"); + flow = gst_app_src_push_buffer (GST_APP_SRC (source), b); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "pushed!\n"); + /* They all return GNUNET_OK, because currently player stops when + * data stops coming. This might need to be changed for the player + * to also stop when pipeline breaks. + */ + switch (flow) + { + case GST_FLOW_OK: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fed %u bytes to the pipeline\n", + (unsigned int) ret * sizeof (int16_t)); + break; + case GST_FLOW_FLUSHING: + /* buffer was dropped, because pipeline state is not PAUSED or PLAYING */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Dropped a buffer\n"); + break; + case GST_FLOW_EOS: + /* end of stream */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "EOS\n"); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Unexpected push result\n"); + break; + } + break; + default: + break; + } + return GNUNET_OK; +} + + +int +main (int argc, char **argv) +{ + GstElement *conv, *resampler, *sink; + GstBus *bus; + GstCaps *caps; + guint bus_watch_id; + uint64_t toff; + + typedef void (*SignalHandlerPointer) (int); + + SignalHandlerPointer inthandler, termhandler; + + inthandler = signal (SIGINT, signalhandler); + termhandler = signal (SIGTERM, signalhandler); + +#ifdef WINDOWS + setmode (0, _O_BINARY); +#endif + + opus_init (); + + /* Initialisation */ + gst_init (&argc, &argv); + + GNUNET_assert (GNUNET_OK == + GNUNET_log_setup ("gnunet-helper-audio-playback", + "WARNING", + NULL)); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Audio sink starts\n"); + + stdin_mst = GNUNET_SERVER_mst_create (&stdin_receiver, NULL); + + /* Create gstreamer elements */ + pipeline = gst_pipeline_new ("audio-player"); + source = gst_element_factory_make ("appsrc", "audio-input"); + conv = gst_element_factory_make ("audioconvert", "converter"); + resampler= gst_element_factory_make ("audioresample", "resampler"); + sink = gst_element_factory_make ("autoaudiosink", "audiosink"); + + if (!pipeline || !source || !conv || !resampler || !sink) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "One element could not be created. Exiting.\n"); + return -1; + } + + g_signal_connect (sink, "child-added", G_CALLBACK (sink_child_added), NULL); + + caps = gst_caps_new_simple ("audio/x-raw", + "format", G_TYPE_STRING, "S16LE", + "rate", G_TYPE_INT, SAMPLING_RATE, + "channels", G_TYPE_INT, CHANNELS, + "layout", G_TYPE_STRING, "interleaved", + NULL); + gst_app_src_set_caps (GST_APP_SRC (source), caps); + gst_caps_unref (caps); + + /* Keep a reference to it, we operate on it */ + gst_object_ref (GST_OBJECT (source)); + + /* Set up the pipeline */ + + /* we feed appsrc as fast as possible, it just blocks when it's full */ + g_object_set (G_OBJECT (source), + "format", GST_FORMAT_TIME, + "block", TRUE, + "is-live", TRUE, + NULL); + + /* we add a message handler */ + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + bus_watch_id = gst_bus_add_watch (bus, bus_call, pipeline); + gst_object_unref (bus); + + /* we add all elements into the pipeline */ + /* audio-input | converter | resampler | audiosink */ + gst_bin_add_many (GST_BIN (pipeline), source, conv, + resampler, sink, NULL); + + /* we link the elements together */ + gst_element_link_many (source, conv, resampler, sink, NULL); + + /* Set the pipeline to "playing" state*/ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Now playing\n"); + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running...\n"); + /* Iterate */ + toff = 0; + while (!abort_read) + { + char readbuf[MAXLINE]; + int ret; + + ret = read (0, readbuf, sizeof (readbuf)); + if (0 > ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Read error from STDIN: %d %s\n"), + ret, strerror (errno)); + break; + } + toff += ret; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received %d bytes of audio data (total: %llu)\n", + (int) ret, + toff); + if (0 == ret) + break; + GNUNET_SERVER_mst_receive (stdin_mst, NULL, + readbuf, ret, + GNUNET_NO, GNUNET_NO); + } + GNUNET_SERVER_mst_destroy (stdin_mst); + + signal (SIGINT, inthandler); + signal (SIGINT, termhandler); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Returned, stopping playback\n"); + quit (); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Deleting pipeline\n"); + gst_object_unref (GST_OBJECT (source)); + source = NULL; + gst_object_unref (GST_OBJECT (pipeline)); + pipeline = NULL; + g_source_remove (bus_watch_id); + + return 0; +} diff --git a/src/conversation/gnunet-helper-audio-record-gst.c b/src/conversation/gnunet-helper-audio-record-gst.c new file mode 100755 index 000000000..8d7a88fab --- /dev/null +++ b/src/conversation/gnunet-helper-audio-record-gst.c @@ -0,0 +1,334 @@ +/* + This file is part of GNUnet. + (C) 2013 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ +/** + * @file conversation/gnunet-helper-audio-record-gst.c + * @brief program to record audio data from the microphone (GStreamer version) + * @author LRN + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_protocols.h" +#include "conversation.h" +#include "gnunet_constants.h" +#include "gnunet_core_service.h" + +#include +#include +#include +#include + +/** + * Number of channels. + * Must be one of the following (from libopusenc documentation): + * 1, 2 + */ +#define OPUS_CHANNELS 1 + +/** + * Maximal size of a single opus packet. + */ +#define MAX_PAYLOAD_SIZE (1024 / OPUS_CHANNELS) + +/** + * Size of a single frame fed to the encoder, in ms. + * Must be one of the following (from libopus documentation): + * 2.5, 5, 10, 20, 40 or 60 + */ +#define OPUS_FRAME_SIZE 20 + +/** + * Expected packet loss to prepare for, in percents. + */ +#define PACKET_LOSS_PERCENTAGE 1 + +/** + * Set to 1 to enable forward error correction. + * Set to 0 to disable. + */ +#define INBAND_FEC_MODE 1 + +/** + * Max number of microseconds to buffer in audiosource. + * Default is 200000 + */ +#define BUFFER_TIME 1000 + +/** + * Min number of microseconds to buffer in audiosource. + * Default is 10000 + */ +#define LATENCY_TIME 1000 + +/** + * Main pipeline. + */ +static GstElement *pipeline; + +static void +quit () +{ + if (NULL != pipeline) + gst_element_set_state (pipeline, GST_STATE_NULL); +} + +static gboolean +bus_call (GstBus *bus, GstMessage *msg, gpointer data) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bus message\n"); + switch (GST_MESSAGE_TYPE (msg)) + { + case GST_MESSAGE_EOS: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "End of stream\n"); + quit (); + break; + + case GST_MESSAGE_ERROR: + { + gchar *debug; + GError *error; + + gst_message_parse_error (msg, &error, &debug); + g_free (debug); + + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Error: %s\n", error->message); + g_error_free (error); + + quit (); + break; + } + default: + break; + } + + return TRUE; +} + +void +source_child_added (GstChildProxy *child_proxy, GObject *object, gchar *name, gpointer user_data) +{ + if (GST_IS_AUDIO_BASE_SRC (object)) + g_object_set (object, "buffer-time", (gint64) BUFFER_TIME, "latency-time", (gint64) LATENCY_TIME, NULL); +} + +static void +signalhandler (int s) +{ + quit (); +} + + +int +main (int argc, char **argv) +{ + GstElement *source, *encoder, *conv, *resampler, *sink; + GstBus *bus; + guint bus_watch_id; + struct AudioMessage audio_message; + int abort_send = 0; + + typedef void (*SignalHandlerPointer) (int); + + SignalHandlerPointer inthandler, termhandler; + inthandler = signal (SIGINT, signalhandler); + termhandler = signal (SIGTERM, signalhandler); + +#ifdef WINDOWS + setmode (1, _O_BINARY); +#endif + + /* Initialisation */ + gst_init (&argc, &argv); + + GNUNET_assert (GNUNET_OK == + GNUNET_log_setup ("gnunet-helper-audio-record", + "WARNING", + NULL)); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Audio source starts\n"); + + audio_message.header.type = htons (GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO); + + /* Create gstreamer elements */ + pipeline = gst_pipeline_new ("audio-recorder"); + source = gst_element_factory_make ("autoaudiosrc", "audiosource"); + conv = gst_element_factory_make ("audioconvert", "converter"); + resampler= gst_element_factory_make ("audioresample", "resampler"); + encoder = gst_element_factory_make ("opusenc", "opus-encoder"); + sink = gst_element_factory_make ("appsink", "audio-output"); + + if (!pipeline || !source || !conv || !resampler || !encoder || !sink) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "One element could not be created. Exiting.\n"); + return -1; + } + + g_signal_connect (source, "child-added", G_CALLBACK (source_child_added), NULL); + + /* Set up the pipeline */ + + g_object_set (G_OBJECT (encoder), +/* "bitrate", 64000, */ +/* "bandwidth", OPUS_BANDWIDTH_FULLBAND, */ + "inband-fec", INBAND_FEC_MODE, + "packet-loss-percentage", PACKET_LOSS_PERCENTAGE, + "max-payload-size", MAX_PAYLOAD_SIZE, + "audio", FALSE, /* VoIP, not audio */ + "frame-size", OPUS_FRAME_SIZE, + NULL); + + /* we add a message handler */ + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + bus_watch_id = gst_bus_add_watch (bus, bus_call, pipeline); + gst_object_unref (bus); + + /* we add all elements into the pipeline */ + /* audiosource | converter | resampler | opus-encoder | audio-output */ + gst_bin_add_many (GST_BIN (pipeline), source, conv, resampler, encoder, + sink, NULL); + + /* we link the elements together */ + gst_element_link_many (source, conv, resampler, encoder, sink, NULL); + + /* Set the pipeline to "playing" state*/ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Now playing\n"); + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running...\n"); + /* Iterate */ + while (!abort_send) + { + GstSample *s; + GstBuffer *b; + GstMapInfo m; + size_t len, msg_size; + const char *ptr; + int phase; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "pulling...\n"); + s = gst_app_sink_pull_sample (GST_APP_SINK (sink)); + if (NULL == s) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "pulled NULL\n"); + break; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "...pulled!\n"); + { + const GstStructure *si; + char *si_str; + GstCaps *s_caps; + char *caps_str; + si = gst_sample_get_info (s); + if (si) + { + si_str = gst_structure_to_string (si); + if (si_str) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got sample %s\n", si_str); + g_free (si_str); + } + } + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got sample with no info\n"); + s_caps = gst_sample_get_caps (s); + if (s_caps) + { + caps_str = gst_caps_to_string (s_caps); + if (caps_str) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got sample with caps %s\n", caps_str); + g_free (caps_str); + } + } + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got sample with no caps\n"); + } + b = gst_sample_get_buffer (s); + if (NULL == b || !gst_buffer_map (b, &m, GST_MAP_READ)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got NULL buffer %p or failed to map the buffer\n", b); + gst_sample_unref (s); + continue; + } + + len = m.size; + if (len > UINT16_MAX - sizeof (struct AudioMessage)) + { + GNUNET_break (0); + len = UINT16_MAX - sizeof (struct AudioMessage); + } + msg_size = sizeof (struct AudioMessage) + len; + audio_message.header.size = htons ((uint16_t) msg_size); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending %u bytes of audio data\n", (unsigned int) msg_size); + for (phase = 0; phase < 2; phase++) + { + size_t offset; + size_t to_send; + ssize_t ret; + if (0 == phase) + { + ptr = (const char *) &audio_message; + to_send = sizeof (audio_message); + } + else + { + ptr = (const char *) m.data; + to_send = len; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending %u bytes on phase %d\n", (unsigned int) to_send, phase); + for (offset = 0; offset < to_send; offset += ret) + { + ret = write (1, &ptr[offset], to_send - offset); + if (0 >= ret) + { + if (-1 == ret) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to write %u bytes at offset %u (total %u) in phase %d: %s\n", + (unsigned int) to_send - offset, (unsigned int) offset, + (unsigned int) (to_send + offset), phase, strerror (errno)); + abort_send = 1; + break; + } + } + if (abort_send) + break; + } + gst_buffer_unmap (b, &m); + gst_sample_unref (s); + } + + signal (SIGINT, inthandler); + signal (SIGINT, termhandler); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Returned, stopping playback\n"); + quit (); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Deleting pipeline\n"); + gst_object_unref (GST_OBJECT (pipeline)); + pipeline = NULL; + g_source_remove (bus_watch_id); + + return 0; +} -- cgit v1.2.3