commit 9a1631e0c57d9e927e51740a90dc1ebd7300fb71
parent 6c8633fb2c677030d84df89587def575c74ee960
Author: Jacki <jacki@thejackimonster.de>
Date: Mon, 8 Jul 2024 16:38:33 +0200
Synchronize GStreamer pulling samples with GTK events using mutex
Signed-off-by: Jacki <jacki@thejackimonster.de>
Diffstat:
2 files changed, 88 insertions(+), 1 deletion(-)
diff --git a/src/discourse.c b/src/discourse.c
@@ -24,9 +24,11 @@
#include "discourse.h"
+#include <glib-2.0/glib.h>
#include <gnunet/gnunet_common.h>
#include <gnunet/gnunet_chat_lib.h>
#include <gstreamer-1.0/gst/gst.h>
+#include <pthread.h>
#include <stdlib.h>
static void
@@ -190,6 +192,40 @@ skip_buffer:
return;
}
+static gboolean
+_new_sample_task(gpointer user_data)
+{
+ MESSENGER_DiscourseInfo *info = (MESSENGER_DiscourseInfo*) user_data;
+
+ pthread_mutex_lock(&(info->mutex));
+ info->sending_task = 0;
+
+ GList *list = info->samples;
+ while (list)
+ {
+ GArray *array = (GArray*) (list->data);
+
+ if (array)
+ {
+ GNUNET_CHAT_discourse_write(
+ info->discourse,
+ (const char*) array->data,
+ array->len
+ );
+
+ g_array_free(array, TRUE);
+ }
+
+ list = g_list_next(list);
+ }
+
+ g_list_free(info->samples);
+ info->samples = NULL;
+
+ pthread_mutex_unlock(&(info->mutex));
+ return FALSE;
+}
+
static GstFlowReturn
_new_audio_sample(GstElement *sink, gpointer data)
{
@@ -212,15 +248,35 @@ _new_audio_sample(GstElement *sink, gpointer data)
if (!size)
goto cleanup_sample;
+ GArray *array = NULL;
GstMapInfo mapping;
if (gst_buffer_map(buffer, &mapping, GST_MAP_READ))
{
if (mapping.size)
- GNUNET_CHAT_discourse_write(info->discourse, (const char*) mapping.data, mapping.size);
+ array = g_array_insert_vals(
+ g_array_sized_new(FALSE, FALSE, 1, mapping.size),
+ 0,
+ mapping.data,
+ mapping.size
+ );
gst_buffer_unmap(buffer, &mapping);
}
+ if (!array)
+ goto cleanup_sample;
+
+ pthread_mutex_lock(&(info->mutex));
+ info->samples = g_list_append(info->samples, array);
+
+ if (!(info->sending_task))
+ info->sending_task = util_immediate_add(
+ G_SOURCE_FUNC(_new_sample_task),
+ info
+ );
+
+ pthread_mutex_unlock(&(info->mutex));
+
cleanup_sample:
gst_sample_unref(sample);
return GST_FLOW_OK;
@@ -307,6 +363,10 @@ discourse_create_info(struct GNUNET_CHAT_Discourse *discourse)
info->mix_element = NULL;
info->volume_element = NULL;
+ info->sending_task = 0;
+ pthread_mutex_init(&(info->mutex), NULL);
+
+ info->samples = NULL;
info->subscriptions = NULL;
_setup_gst_pipelines(info);
@@ -351,6 +411,28 @@ discourse_destroy_info(struct GNUNET_CHAT_Discourse *discourse)
gst_object_unref(GST_OBJECT(info->record_pipeline));
}
+ pthread_mutex_lock(&(info->mutex));
+
+ if (info->samples)
+ {
+ GArray *array;
+ GList *list = info->samples;
+ while (list)
+ {
+ array = (GArray*) (list->data);
+ g_array_free(array, TRUE);
+ list = g_list_next(list);
+ }
+
+ g_list_free(info->samples);
+ }
+
+ if (info->sending_task)
+ util_source_remove(info->sending_task);
+
+ pthread_mutex_unlock(&(info->mutex));
+ pthread_mutex_destroy(&(info->mutex));
+
g_free(info);
GNUNET_CHAT_discourse_set_user_pointer(discourse, NULL);
diff --git a/src/discourse.h b/src/discourse.h
@@ -29,6 +29,7 @@
#include <glib-2.0/glib.h>
#include <gnunet/gnunet_chat_lib.h>
+#include <pthread.h>
typedef struct MESSENGER_DiscourseInfo
{
@@ -41,6 +42,10 @@ typedef struct MESSENGER_DiscourseInfo
GstElement *mix_element;
GstElement *volume_element;
+ guint sending_task;
+ pthread_mutex_t mutex;
+
+ GList *samples;
GList *subscriptions;
} MESSENGER_DiscourseInfo;