commit a11fef539efb477a4ccd223150715f1792d1b5b5
parent d8b35ac19fe6bfff949a915fd0603a5dd1a38dfd
Author: Jacki <jacki@thejackimonster.de>
Date: Mon, 24 Jun 2024 23:47:51 +0200
Complete implementation of basic voice chat pipeline
Signed-off-by: Jacki <jacki@thejackimonster.de>
Diffstat:
3 files changed, 337 insertions(+), 41 deletions(-)
diff --git a/src/discourse.c b/src/discourse.c
@@ -27,25 +27,167 @@
#include <gnunet/gnunet_common.h>
#include <gnunet/gnunet_chat_lib.h>
#include <gstreamer-1.0/gst/gst.h>
+#include <stdlib.h>
+
+static void
+error_cb(GstBus *bus,
+ GstMessage *msg,
+ gpointer data)
+{
+ GError *err;
+ gchar *debug_info;
+
+ gst_message_parse_error (msg, &err, &debug_info);
+ g_printerr ("Error received from element %s: %s\n", GST_OBJECT_NAME (msg->src), err->message);
+ g_printerr ("Debugging information: %s\n", debug_info ? debug_info : "none");
+ g_clear_error (&err);
+ g_free (debug_info);
+}
static void
_setup_gst_pipelines_of_subscription(MESSENGER_DiscourseSubscriptionInfo *info)
{
g_assert(info);
- // TODO: Create stream_pipeline per subscription in discourse!
+ info->stream_src = gst_element_factory_make("appsrc", "src");
+ info->decoder = gst_element_factory_make("rtpL16depay", "decoder");
+ info->converter = gst_element_factory_make("audioconvert", "audio");
- info->stream_pipeline = gst_parse_launch(
- "appsrc name=src ! rtpL16depay ! audioconvert",
- NULL
+ gst_bin_add_many(GST_BIN(info->discourse->mix_pipeline), info->stream_src, info->decoder, info->converter, NULL);
+ gst_element_link_many(info->stream_src, info->decoder, info->converter, NULL);
+
+ {
+ GstCaps *caps = gst_caps_new_simple (
+ "application/x-rtp",
+ "media", G_TYPE_STRING, "audio",
+ "payload", G_TYPE_INT, 96,
+ "clock-rate", G_TYPE_INT, 44100,
+ "encoding-name", G_TYPE_STRING, "L16",
+ "channels", G_TYPE_INT, 1,
+ NULL
+ );
+
+ g_object_set(info->stream_src, "format", GST_FORMAT_TIME, "caps", caps, NULL);
+ gst_caps_unref(caps);
+ }
+
+ info->mix_pad = gst_element_request_pad_simple(
+ info->discourse->mix_element, "sink_%u"
);
- info->stream_src = gst_bin_get_by_name(
- GST_BIN(info->stream_pipeline), "src"
+ {
+ GstPad *pad = gst_element_get_static_pad(
+ info->converter, "src"
+ );
+
+ g_object_set(info->mix_pad, "mute", FALSE, "volume", 1.0, NULL);
+ gst_pad_link(pad, info->mix_pad);
+ }
+
+ gst_element_sync_state_with_parent(info->stream_src);
+ gst_element_sync_state_with_parent(info->decoder);
+ gst_element_sync_state_with_parent(info->converter);
+}
+
+static MESSENGER_DiscourseSubscriptionInfo*
+discourse_subscription_create_info(MESSENGER_DiscourseInfo *discourse,
+ struct GNUNET_CHAT_Contact *contact)
+{
+ g_assert((discourse) && (contact));
+
+ MESSENGER_DiscourseSubscriptionInfo* info = g_malloc(
+ sizeof(MESSENGER_DiscourseSubscriptionInfo)
);
- // TODO: Connect stream_pipeline to audiomixer element of mix_pipeline!
- // https://gstreamer.freedesktop.org/documentation/audiomixer/audiomixer.html?gi-language=c
+ if (!info)
+ return NULL;
+
+ info->discourse = discourse;
+ info->contact = contact;
+
+ info->stream_src = NULL;
+ info->decoder = NULL;
+ info->converter = NULL;
+
+ info->mix_pad = NULL;
+
+ info->position = 0;
+
+ _setup_gst_pipelines_of_subscription(info);
+ return info;
+}
+
+static void
+discourse_subscription_destroy_info(MESSENGER_DiscourseSubscriptionInfo *info)
+{
+ g_assert(info);
+
+ gst_element_set_state(info->stream_src, GST_STATE_NULL);
+ gst_element_set_state(info->decoder, GST_STATE_NULL);
+ gst_element_set_state(info->converter, GST_STATE_NULL);
+
+ if (info->mix_pad)
+ {
+ GstPad *pad = gst_element_get_static_pad(
+ info->converter, "src"
+ );
+
+ gst_pad_unlink(pad, info->mix_pad);
+
+ gst_element_release_request_pad(info->discourse->mix_element, info->mix_pad);
+ gst_object_unref(GST_OBJECT(info->mix_pad));
+ }
+
+ gst_element_unlink_many(info->stream_src, info->decoder, info->converter, NULL);
+ gst_bin_remove_many(GST_BIN(info->discourse->mix_pipeline), info->stream_src, info->decoder, info->converter, NULL);
+
+ g_free(info);
+}
+
+static void
+discourse_subscription_stream_message(MESSENGER_DiscourseSubscriptionInfo *info,
+ const struct GNUNET_CHAT_Message *message)
+{
+ g_assert((info) && (message));
+
+ const uint64_t available = GNUNET_CHAT_message_available(message);
+
+ if (!available)
+ return;
+
+ const uint64_t samples = available / 2;
+
+ GstBuffer *buffer = gst_buffer_new_and_alloc(available);
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ if (!buffer)
+ return;
+
+ GST_BUFFER_TIMESTAMP(buffer) = gst_util_uint64_scale(info->position, GST_SECOND, 44100);
+ GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale(samples, GST_SECOND, 44100);
+
+ GstMapInfo mapping;
+ if (gst_buffer_map(buffer, &mapping, GST_MAP_WRITE))
+ {
+ if (mapping.size)
+ {
+ if (GNUNET_OK != GNUNET_CHAT_message_read(message, (char*) mapping.data, mapping.size))
+ memset(mapping.data, 0, mapping.size);
+ }
+
+ gst_buffer_unmap(buffer, &mapping);
+ }
+ else
+ goto skip_buffer;
+
+ g_signal_emit_by_name(info->stream_src, "push-buffer", buffer, &ret);
+ info->position += samples;
+
+skip_buffer:
+ gst_buffer_unref(buffer);
+
+ if (GST_FLOW_OK != ret)
+ return;
}
static GstFlowReturn
@@ -66,14 +208,16 @@ _new_audio_sample(GstElement *sink, gpointer data)
if (!buffer)
goto cleanup_sample;
- gsize size = gst_buffer_get_size(buffer);
+ const gsize size = gst_buffer_get_size(buffer);
if (!size)
goto cleanup_sample;
GstMapInfo mapping;
if (gst_buffer_map(buffer, &mapping, GST_MAP_READ))
{
- GNUNET_CHAT_discourse_write(info->discourse, (const char*) mapping.data, mapping.size);
+ if (mapping.size)
+ GNUNET_CHAT_discourse_write(info->discourse, (const char*) mapping.data, mapping.size);
+
gst_buffer_unmap(buffer, &mapping);
}
@@ -82,30 +226,11 @@ cleanup_sample:
return GST_FLOW_OK;
}
-/* This function is called when an error message is posted on the bus */
-static void
-error_cb(GstBus *bus, GstMessage *msg, gpointer data)
-{
- GError *err;
- gchar *debug_info;
-
- /* Print error details on the screen */
- gst_message_parse_error (msg, &err, &debug_info);
- g_printerr ("Error received from element %s: %s\n", GST_OBJECT_NAME (msg->src), err->message);
- g_printerr ("Debugging information: %s\n", debug_info ? debug_info : "none");
- g_clear_error (&err);
- g_free (debug_info);
-}
-
static void
_setup_gst_pipelines(MESSENGER_DiscourseInfo *info)
{
g_assert(info);
- // TODO: Use record_pipeline to record and send data via discourse!
- // https://gstreamer.freedesktop.org/documentation/tutorials/basic/short-cutting-the-pipeline.html?gi-language=c
- // https://gstreamer.freedesktop.org/documentation/app/appsink.html?gi-language=c
-
info->record_pipeline = gst_parse_launch(
"autoaudiosrc ! audioconvert ! rtpL16pay ! appsink name=sink",
NULL
@@ -139,8 +264,6 @@ _setup_gst_pipelines(MESSENGER_DiscourseInfo *info)
gst_element_set_state(info->record_pipeline, GST_STATE_PLAYING);
}
- // TODO: Have mix_pipeline in background while being subscribed to discourse!
-
info->mix_pipeline = gst_parse_launch(
"audiomixer name=mixer ! autoaudiosink",
NULL
@@ -149,6 +272,15 @@ _setup_gst_pipelines(MESSENGER_DiscourseInfo *info)
info->mix_element = gst_bin_get_by_name(
GST_BIN(info->mix_pipeline), "mixer"
);
+
+ {
+ GstBus *bus = gst_element_get_bus(info->mix_pipeline);
+ gst_bus_add_signal_watch(bus);
+ g_signal_connect(G_OBJECT(bus), "message::error", (GCallback)error_cb, info);
+ gst_object_unref(bus);
+
+ gst_element_set_state(info->mix_pipeline, GST_STATE_PLAYING);
+ }
}
enum GNUNET_GenericReturnValue
@@ -167,6 +299,8 @@ discourse_create_info(struct GNUNET_CHAT_Discourse *discourse)
info->mix_pipeline = NULL;
info->mix_element = NULL;
+ info->subscriptions = NULL;
+
_setup_gst_pipelines(info);
GNUNET_CHAT_discourse_set_user_pointer(discourse, info);
@@ -183,10 +317,18 @@ discourse_destroy_info(struct GNUNET_CHAT_Discourse *discourse)
if (!info)
return;
- if (info->record_pipeline)
+ if (info->subscriptions)
{
- gst_element_set_state(info->record_pipeline, GST_STATE_NULL);
- gst_object_unref(GST_OBJECT(info->record_pipeline));
+ MESSENGER_DiscourseSubscriptionInfo *sub_info;
+ GList *sub = info->subscriptions;
+ while (sub)
+ {
+ sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data);
+ discourse_subscription_destroy_info(sub_info);
+ sub = g_list_next(sub);
+ }
+
+ g_list_free(info->subscriptions);
}
if (info->mix_pipeline)
@@ -195,7 +337,131 @@ discourse_destroy_info(struct GNUNET_CHAT_Discourse *discourse)
gst_object_unref(GST_OBJECT(info->mix_pipeline));
}
+ if (info->record_pipeline)
+ {
+ gst_element_set_state(info->record_pipeline, GST_STATE_NULL);
+ gst_object_unref(GST_OBJECT(info->record_pipeline));
+ }
+
g_free(info);
GNUNET_CHAT_discourse_set_user_pointer(discourse, NULL);
}
+
+static enum GNUNET_GenericReturnValue
+_append_contact_to_subscription_list(void *cls,
+ const struct GNUNET_CHAT_Discourse *discourse,
+ struct GNUNET_CHAT_Contact *contact)
+{
+ g_assert((cls) && (discourse) && (contact));
+
+ GList **list = cls;
+
+ if (GNUNET_YES == GNUNET_CHAT_contact_is_owned(contact))
+ return GNUNET_YES;
+
+ *list = g_list_append(*list, contact);
+ return GNUNET_YES;
+}
+
+void
+discourse_update_subscriptions(struct GNUNET_CHAT_Discourse *discourse)
+{
+ g_assert(discourse);
+
+ MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
+
+ if (!info)
+ return;
+
+ GList *list = NULL;
+ GNUNET_CHAT_discourse_iterate_contacts(
+ info->discourse,
+ _append_contact_to_subscription_list,
+ &list
+ );
+
+ GList *sub = info->subscriptions;
+ MESSENGER_DiscourseSubscriptionInfo *sub_info;
+
+ GList *drop = NULL;
+
+ while (sub)
+ {
+ GList *link = sub;
+
+ sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (link->data);
+ sub = g_list_next(sub);
+
+ if (g_list_find(list, sub_info->contact))
+ {
+ list = g_list_remove(list, sub_info->contact);
+ continue;
+ }
+
+ link = g_list_remove_link(info->subscriptions, link);
+
+ if (!drop)
+ drop = link;
+ else
+ drop = g_list_concat(drop, link);
+ }
+
+ sub = drop;
+ while (sub)
+ {
+ sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data);
+ discourse_subscription_destroy_info(sub_info);
+ sub = g_list_next(sub);
+ }
+
+ if (drop)
+ g_list_free(drop);
+
+ sub = list;
+ while (sub)
+ {
+ sub_info = discourse_subscription_create_info(
+ info, (struct GNUNET_CHAT_Contact*) sub->data
+ );
+
+ if (sub_info)
+ info->subscriptions = g_list_append(
+ info->subscriptions, sub_info
+ );
+
+ sub = g_list_next(sub);
+ }
+
+ if (list)
+ g_list_free(list);
+}
+
+void
+discourse_stream_message(struct GNUNET_CHAT_Discourse *discourse,
+ const struct GNUNET_CHAT_Message *message)
+{
+ g_assert((discourse) && (message));
+
+ MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
+
+ if (!info)
+ return;
+
+ GList *sub = info->subscriptions;
+ MESSENGER_DiscourseSubscriptionInfo *sub_info = NULL;
+
+ while (sub)
+ {
+ sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data);
+ if (GNUNET_CHAT_message_get_sender(message) == sub_info->contact)
+ break;
+
+ sub = g_list_next(sub);
+ }
+
+ if (!sub_info)
+ return;
+
+ discourse_subscription_stream_message(sub_info, message);
+}
diff --git a/src/discourse.h b/src/discourse.h
@@ -27,14 +27,9 @@
#include "application.h"
+#include <glib-2.0/glib.h>
#include <gnunet/gnunet_chat_lib.h>
-typedef struct MESSENGER_DiscourseSubscriptionInfo
-{
- GstElement *stream_pipeline;
- GstElement *stream_src;
-} MESSENGER_DiscourseSubscriptionInfo;
-
typedef struct MESSENGER_DiscourseInfo
{
struct GNUNET_CHAT_Discourse *discourse;
@@ -44,12 +39,35 @@ typedef struct MESSENGER_DiscourseInfo
GstElement *mix_pipeline;
GstElement *mix_element;
+
+ GList *subscriptions;
} MESSENGER_DiscourseInfo;
+typedef struct MESSENGER_DiscourseSubscriptionInfo
+{
+ MESSENGER_DiscourseInfo *discourse;
+ struct GNUNET_CHAT_Contact *contact;
+
+ GstElement *stream_src;
+ GstElement *decoder;
+ GstElement *converter;
+
+ GstPad *mix_pad;
+
+ uint64_t position;
+} MESSENGER_DiscourseSubscriptionInfo;
+
enum GNUNET_GenericReturnValue
discourse_create_info(struct GNUNET_CHAT_Discourse *discourse);
void
discourse_destroy_info(struct GNUNET_CHAT_Discourse *discourse);
+void
+discourse_update_subscriptions(struct GNUNET_CHAT_Discourse *discourse);
+
+void
+discourse_stream_message(struct GNUNET_CHAT_Discourse *discourse,
+ const struct GNUNET_CHAT_Message *message);
+
#endif /* DISCOURSE_H_ */
diff --git a/src/event.c b/src/event.c
@@ -1025,6 +1025,9 @@ event_discourse(MESSENGER_Application *app,
msg
);
+ if (!discourse)
+ return;
+
if (GNUNET_YES == GNUNET_CHAT_message_is_sent(msg))
{
if (GNUNET_YES == GNUNET_CHAT_discourse_is_open(discourse))
@@ -1033,6 +1036,8 @@ event_discourse(MESSENGER_Application *app,
discourse_destroy_info(discourse);
}
+ discourse_update_subscriptions(discourse);
+
if (context == app->ui.discourse.context)
ui_discourse_window_update(&(app->ui.discourse), context);
}
@@ -1044,8 +1049,15 @@ event_discourse_data(MESSENGER_Application *app,
{
g_assert((app) && (context) && (msg));
+ struct GNUNET_CHAT_Discourse *discourse = GNUNET_CHAT_message_get_discourse(
+ msg
+ );
+
+ if (!discourse)
+ return;
+
if (GNUNET_YES == GNUNET_CHAT_message_is_sent(msg))
return;
- // TODO
+ discourse_stream_message(discourse, msg);
}