commit 7596736e0274915b71d5b40e2702b078a5d47999
parent 8396bf70ed01484dab4bca01e3acebd9920586d2
Author: Jacki <jacki@thejackimonster.de>
Date: Thu, 11 Jul 2024 21:43:39 +0200
Use file descriptor to pipe audio data from GStreamer into GNUnet thread fixing synchronization
Signed-off-by: Jacki <jacki@thejackimonster.de>
Diffstat:
| M | src/discourse.c | | | 106 | +++++++++---------------------------------------------------------------------- |
1 file changed, 11 insertions(+), 95 deletions(-)
diff --git a/src/discourse.c b/src/discourse.c
@@ -69,7 +69,7 @@ _setup_gst_pipelines_of_subscription(MESSENGER_DiscourseSubscriptionInfo *info)
NULL
);
- g_object_set(info->stream_source, "format", GST_FORMAT_TIME, "caps", caps, NULL);
+ g_object_set(info->stream_source, "format", GST_FORMAT_TIME, "caps", caps, "is-live", TRUE, NULL);
gst_caps_unref(caps);
}
@@ -192,103 +192,13 @@ skip_buffer:
return;
}
-static gboolean
-_new_sample_task(gpointer user_data)
-{
- MESSENGER_DiscourseInfo *info = (MESSENGER_DiscourseInfo*) user_data;
-
- pthread_mutex_lock(&(info->mutex));
- info->sending_task = 0;
-
- GList *list = info->samples;
- while (list)
- {
- GArray *array = (GArray*) (list->data);
-
- if (array)
- {
- GNUNET_CHAT_discourse_write(
- info->discourse,
- (const char*) array->data,
- array->len
- );
-
- g_array_free(array, TRUE);
- }
-
- list = g_list_next(list);
- }
-
- g_list_free(info->samples);
- info->samples = NULL;
-
- pthread_mutex_unlock(&(info->mutex));
- return FALSE;
-}
-
-static GstFlowReturn
-_new_audio_sample(GstElement *sink, gpointer data)
-{
- g_assert((sink) && (data));
-
- MESSENGER_DiscourseInfo *info = (MESSENGER_DiscourseInfo*) data;
-
- GstSample *sample;
- g_signal_emit_by_name(sink, "pull-sample", &sample);
-
- if (!sample)
- return GST_FLOW_ERROR;
-
- GstBuffer *buffer = gst_sample_get_buffer(sample);
-
- if (!buffer)
- goto cleanup_sample;
-
- const gsize size = gst_buffer_get_size(buffer);
- if (!size)
- goto cleanup_sample;
-
- GArray *array = NULL;
- GstMapInfo mapping;
- if (gst_buffer_map(buffer, &mapping, GST_MAP_READ))
- {
- if (mapping.size)
- array = g_array_insert_vals(
- g_array_sized_new(FALSE, FALSE, 1, mapping.size),
- 0,
- mapping.data,
- mapping.size
- );
-
- gst_buffer_unmap(buffer, &mapping);
- }
-
- if (!array)
- goto cleanup_sample;
-
- pthread_mutex_lock(&(info->mutex));
- info->samples = g_list_append(info->samples, array);
-
- if (!(info->sending_task))
- info->sending_task = util_immediate_add(
- G_SOURCE_FUNC(_new_sample_task),
- info
- );
-
- pthread_mutex_unlock(&(info->mutex));
-
-cleanup_sample:
- gst_sample_unref(sample);
- return GST_FLOW_OK;
-}
-
static void
_setup_gst_pipelines(MESSENGER_DiscourseInfo *info)
{
g_assert(info);
info->record_pipeline = gst_parse_launch(
- "autoaudiosrc ! audioconvert ! rtpL16pay ! appsink name=sink",
+ "autoaudiosrc ! audioconvert ! rtpL16pay ! capsfilter name=filter ! fdsink name=sink",
NULL
);
@@ -296,6 +206,10 @@ _setup_gst_pipelines(MESSENGER_DiscourseInfo *info)
GST_BIN(info->record_pipeline), "sink"
);
+ GstElement *filter = gst_bin_get_by_name(
+ GST_BIN(info->record_pipeline), "filter"
+ );
+
{
GstBus *bus = gst_element_get_bus(info->record_pipeline);
gst_bus_add_signal_watch(bus);
@@ -312,11 +226,13 @@ _setup_gst_pipelines(MESSENGER_DiscourseInfo *info)
NULL
);
- g_object_set(info->record_sink, "emit-signals", TRUE, "caps", caps, NULL);
- g_signal_connect(info->record_sink, "new-sample", G_CALLBACK(_new_audio_sample), info);
-
+ g_object_set(filter, "caps", caps, NULL);
gst_caps_unref(caps);
+ const int fd = GNUNET_CHAT_discourse_get_fd(info->discourse);
+ if (-1 != fd)
+ g_object_set(info->record_sink, "fd", fd, NULL);
+
gst_element_set_state(info->record_pipeline, GST_STATE_PLAYING);
}