messenger-gtk

Gtk+3 graphical user interfaces for GNUnet Messenger
Log | Files | Refs | Submodules | README | LICENSE

discourse.c (28734B)


      1 /*
      2    This file is part of GNUnet.
      3    Copyright (C) 2024 GNUnet e.V.
      4 
      5    GNUnet is free software: you can redistribute it and/or modify it
      6    under the terms of the GNU Affero General Public License as published
      7    by the Free Software Foundation, either version 3 of the License,
      8    or (at your option) any later version.
      9 
     10    GNUnet is distributed in the hope that it will be useful, but
     11    WITHOUT ANY WARRANTY; without even the implied warranty of
     12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     13    Affero General Public License for more details.
     14 
     15    You should have received a copy of the GNU Affero General Public License
     16    along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 
     18    SPDX-License-Identifier: AGPL3.0-or-later
     19  */
     20 /*
     21  * @author Tobias Frisch
     22  * @file discourse.c
     23  */
     24 
     25 #include "discourse.h"
     26 
     27 #include <glib-2.0/glib.h>
     28 #include <gnunet/gnunet_common.h>
     29 #include <gnunet/gnunet_chat_lib.h>
     30 #include <gstreamer-1.0/gst/gst.h>
     31 #include <gstreamer-1.0/gst/rtp/rtp.h>
     32 #include <pthread.h>
     33 #include <stdlib.h>
     34 
     35 const struct GNUNET_CHAT_DiscourseId*
     36 get_voice_discourse_id()
     37 {
     38   static enum GNUNET_GenericReturnValue init = GNUNET_NO;
     39   static struct GNUNET_CHAT_DiscourseId id;
     40 
     41   if (GNUNET_YES != init)
     42   {
     43     memset(&id, 0, sizeof(id));
     44     init = GNUNET_YES;
     45   }
     46 
     47   return &id;
     48 }
     49 
     50 const struct GNUNET_CHAT_DiscourseId*
     51 get_video_discourse_id()
     52 {
     53   static enum GNUNET_GenericReturnValue init = GNUNET_NO;
     54   static struct GNUNET_CHAT_DiscourseId id;
     55 
     56   if (GNUNET_YES != init)
     57   {
     58     memset(&id, 1, sizeof(id));
     59     init = GNUNET_YES;
     60   }
     61 
     62   return &id;
     63 }
     64 
     65 static void
     66 error_cb(GstBus *bus,
     67          GstMessage *msg,
     68          gpointer data)
     69 {
     70   GError *err;
     71   gchar *debug_info;
     72 
     73   gst_message_parse_error (msg, &err, &debug_info);
     74   g_printerr ("Error received from element %s: %s\n", GST_OBJECT_NAME (msg->src), err->message);
     75   g_printerr ("Debugging information: %s\n", debug_info ? debug_info : "none");
     76   g_clear_error (&err);
     77   g_free (debug_info);
     78 }
     79 
     80 static void
     81 _setup_audio_gst_pipelines_of_subscription(MESSENGER_DiscourseSubscriptionInfo *info)
     82 {
     83   g_assert(info);
     84 
     85   info->audio_stream_source = gst_element_factory_make("appsrc", NULL);
     86   info->audio_jitter_buffer = gst_element_factory_make("rtpjitterbuffer", NULL);
     87   info->audio_depay = gst_element_factory_make("rtpL16depay", NULL);
     88   info->audio_converter = gst_element_factory_make("audioconvert", NULL);
     89 
     90   gst_element_set_state(info->discourse->audio_mix_pipeline, GST_STATE_NULL);
     91 
     92   gst_bin_add_many(
     93     GST_BIN(info->discourse->audio_mix_pipeline),
     94     info->audio_stream_source,
     95     info->audio_jitter_buffer,
     96     info->audio_depay,
     97     info->audio_converter,
     98     NULL
     99   );
    100 
    101   gst_element_link_many(
    102     info->audio_stream_source,
    103     info->audio_jitter_buffer,
    104     info->audio_depay,
    105     info->audio_converter,
    106     NULL
    107   );
    108 
    109   {
    110     GstCaps *caps = gst_caps_new_simple (
    111       "application/x-rtp",
    112       "media", G_TYPE_STRING, "audio",
    113       "encoding-name", G_TYPE_STRING, "L16",
    114       "payload", G_TYPE_INT, 11,
    115       "clock-rate", G_TYPE_INT, 44100,
    116       NULL
    117     );
    118 
    119     g_object_set(
    120       info->audio_stream_source,
    121       "format", GST_FORMAT_TIME,
    122       "caps", caps,
    123       "is-live", TRUE,
    124       NULL
    125     );
    126 
    127     gst_caps_unref(caps);
    128   }
    129 
    130   info->audio_mix_pad = gst_element_request_pad_simple(
    131     info->discourse->audio_mix_element, "sink_%u"
    132   );
    133 
    134   {
    135     GstPad *pad = gst_element_get_static_pad(
    136       info->audio_converter, "src"
    137     );
    138 
    139     g_object_set(info->audio_mix_pad, "mute", FALSE, "volume", 1.0, NULL);
    140     gst_pad_link(pad, info->audio_mix_pad);
    141   }
    142 
    143   gst_element_set_state(info->discourse->audio_mix_pipeline, GST_STATE_PLAYING);
    144 }
    145 
    146 static void
    147 _setup_video_gst_pipelines_of_subscription(MESSENGER_DiscourseSubscriptionInfo *info)
    148 {
    149   g_assert(info);
    150 
    151   info->video_stream_pipeline = gst_parse_launch(
    152     "appsrc name=source ! rtpjitterbuffer ! rtph264depay ! avdec_h264 ! videoconvert ! "
    153     "gtksink name=sink sync=false",
    154     NULL
    155   );
    156 
    157   info->video_stream_source = gst_bin_get_by_name(
    158     GST_BIN(info->video_stream_pipeline), "source"
    159   );
    160 
    161   info->video_stream_sink = gst_bin_get_by_name(
    162     GST_BIN(info->video_stream_pipeline), "sink"
    163   );
    164 
    165   {
    166     GstBus *bus = gst_element_get_bus(info->video_stream_pipeline);
    167     gst_bus_add_signal_watch(bus);
    168     g_signal_connect(G_OBJECT(bus), "message::error", (GCallback)error_cb, info);
    169     gst_object_unref(bus);
    170 
    171     GstCaps *caps = gst_caps_new_simple (
    172       "application/x-rtp",
    173       "media", G_TYPE_STRING, "video",
    174       "payload", G_TYPE_INT, 96,
    175       "clock-rate", G_TYPE_INT, 90000,
    176       "encoding-name", G_TYPE_STRING, "H264",
    177       NULL
    178     );
    179 
    180     g_object_set(
    181       info->video_stream_source,
    182       "format", GST_FORMAT_TIME,
    183       "caps", caps,
    184       "is-live", TRUE,
    185       NULL
    186     );
    187 
    188     gst_caps_unref(caps);
    189 
    190     gst_element_set_state(info->video_stream_pipeline, GST_STATE_NULL);
    191   }
    192 }
    193 
    194 static MESSENGER_DiscourseSubscriptionInfo*
    195 discourse_subscription_create_info(MESSENGER_DiscourseInfo *discourse,
    196                                    struct GNUNET_CHAT_Contact *contact)
    197 {
    198   g_assert((discourse) && (contact));
    199 
    200   MESSENGER_DiscourseSubscriptionInfo* info = g_malloc(
    201     sizeof(MESSENGER_DiscourseSubscriptionInfo)
    202   );
    203 
    204   if (!info)
    205     return NULL;
    206 
    207   info->discourse = discourse;
    208   info->contact = contact;
    209 
    210   info->audio_stream_source = NULL;
    211   info->audio_jitter_buffer = NULL;
    212   info->audio_depay = NULL;
    213   info->audio_converter = NULL;
    214 
    215   info->video_stream_pipeline = NULL;
    216   info->video_stream_source = NULL;
    217   info->video_stream_sink = NULL;
    218 
    219   info->audio_mix_pad = NULL;
    220   info->buffers = NULL;
    221 
    222   info->position = 0;
    223   info->last_timestamp = 0;
    224 
    225   pthread_mutex_init(&(info->mutex), NULL);
    226 
    227   info->end_datetime = NULL;
    228 
    229   const struct GNUNET_CHAT_DiscourseId *id = GNUNET_CHAT_discourse_get_id(
    230     info->discourse->discourse
    231   );
    232 
    233   if (0 == GNUNET_memcmp(id, get_voice_discourse_id()))
    234     _setup_audio_gst_pipelines_of_subscription(info);
    235   else if (0 == GNUNET_memcmp(id, get_video_discourse_id()))
    236     _setup_video_gst_pipelines_of_subscription(info);
    237 
    238   return info;
    239 }
    240 
    241 static void
    242 discourse_subscription_destroy_info(MESSENGER_DiscourseSubscriptionInfo *info)
    243 {
    244   g_assert(info);
    245 
    246   GList *buf = info->buffers;
    247   while (buf)
    248   {
    249     GstBuffer *buffer = (GstBuffer*) buf->data;
    250 
    251     if (buffer)
    252       gst_buffer_unref(buffer);
    253 
    254     buf = g_list_next(buf);
    255   }
    256 
    257   if (info->buffers)
    258     g_list_free(info->buffers);
    259 
    260   if ((info->audio_stream_source) || (info->audio_jitter_buffer) || 
    261       (info->audio_depay) || (info->audio_converter))
    262     gst_element_set_state(info->discourse->audio_mix_pipeline, GST_STATE_NULL);
    263 
    264   if (info->video_stream_pipeline)
    265   {
    266     gst_element_set_state(info->video_stream_pipeline, GST_STATE_NULL);
    267     gst_object_unref(GST_OBJECT(info->video_stream_pipeline));
    268   }
    269 
    270   if (info->audio_mix_pad)
    271   {
    272     GstPad *pad = gst_element_get_static_pad(
    273       info->audio_converter, "src"
    274     );
    275 
    276     gst_pad_unlink(pad, info->audio_mix_pad);
    277 
    278     gst_element_release_request_pad(info->discourse->audio_mix_element, info->audio_mix_pad);
    279     gst_object_unref(GST_OBJECT(info->audio_mix_pad));
    280   }
    281 
    282   if ((info->audio_stream_source) || (info->audio_jitter_buffer) || 
    283       (info->audio_depay) || (info->audio_converter))
    284   {
    285     gst_element_unlink_many(
    286       info->audio_stream_source,
    287       info->audio_jitter_buffer,
    288       info->audio_depay,
    289       info->audio_converter,
    290       NULL
    291     );
    292 
    293     gst_bin_remove_many(
    294       GST_BIN(info->discourse->audio_mix_pipeline),
    295       info->audio_stream_source,
    296       info->audio_jitter_buffer,
    297       info->audio_depay,
    298       info->audio_converter,
    299       NULL
    300     );
    301 
    302     gst_element_set_state(info->discourse->audio_mix_pipeline, GST_STATE_PLAYING);
    303   }
    304 
    305   pthread_mutex_lock(&(info->mutex));
    306 
    307   if (info->end_datetime)
    308   {
    309     g_date_time_unref(info->end_datetime);
    310     info->end_datetime = NULL;
    311   }
    312 
    313   pthread_mutex_unlock(&(info->mutex));
    314   pthread_mutex_destroy(&(info->mutex));
    315 
    316   g_free(info);
    317 }
    318 
    319 static void
    320 discourse_subscription_stream_message(MESSENGER_DiscourseSubscriptionInfo *info,
    321                                       const struct GNUNET_CHAT_Message *message)
    322 {
    323   g_assert((info) && (message));
    324 
    325   const uint64_t available = GNUNET_CHAT_message_available(message);
    326 
    327   if (!available)
    328     return;
    329 
    330   const struct GNUNET_CHAT_DiscourseId *id = GNUNET_CHAT_discourse_get_id(
    331     info->discourse->discourse
    332   );
    333 
    334   uint64_t clockrate = 0;
    335   GstElement *appsrc = NULL;
    336 
    337   if (0 == GNUNET_memcmp(id, get_voice_discourse_id()))
    338   {
    339     if (GNUNET_YES == GNUNET_CHAT_message_is_sent(message))
    340       return;
    341 
    342     clockrate = 44100;
    343     appsrc = info->audio_stream_source;
    344   }
    345   else if (0 == GNUNET_memcmp(id, get_video_discourse_id()))
    346   {
    347     clockrate = 90000;
    348     appsrc = info->video_stream_source;
    349   }
    350   else
    351     return;
    352 
    353   GstBuffer *buffer = gst_buffer_new_and_alloc(available);
    354   GstFlowReturn ret = GST_FLOW_ERROR;
    355 
    356   if (!buffer)
    357     return;
    358 
    359   GstMapInfo mapping;
    360   if (gst_buffer_map(buffer, &mapping, GST_MAP_WRITE))
    361   {
    362     if (mapping.size)
    363     {
    364       if (GNUNET_OK != GNUNET_CHAT_message_read(message, (char*) mapping.data, mapping.size))
    365         memset(mapping.data, 0, mapping.size);
    366     }
    367 
    368     gst_buffer_unmap(buffer, &mapping);
    369   }
    370   else
    371     goto skip_buffer;
    372 
    373   uint64_t timestamp = info->last_timestamp;
    374   uint32_t payload_len = 0;
    375 
    376   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
    377   if (gst_rtp_buffer_map(buffer, GST_MAP_READ, &rtp))
    378   {
    379     const uint32_t rtp_timestamp = gst_rtp_buffer_get_timestamp(&rtp);
    380     payload_len = gst_rtp_buffer_get_payload_len(&rtp);
    381 
    382     timestamp = gst_rtp_buffer_ext_timestamp(&timestamp, rtp_timestamp);
    383     if (!timestamp)
    384       timestamp = rtp_timestamp;
    385 
    386     gst_rtp_buffer_unmap(&rtp);
    387   }
    388 
    389   if (payload_len)
    390     info->buffers = g_list_append(info->buffers, buffer);
    391   
    392   buffer = NULL;
    393 
    394   GDateTime *dt = g_date_time_new_now_local();
    395 
    396   if ((!payload_len) || (info->last_timestamp == timestamp) ||
    397       ((!(info->last_timestamp)) && (!(info->position))))
    398     goto skip_buffer;
    399 
    400   if (info->buffers)
    401     buffer = gst_buffer_new();
    402   
    403   GList *buf = info->buffers;
    404   while (buf)
    405   {
    406     GstBuffer *sub_buffer = (GstBuffer*) buf->data;
    407 
    408     if (sub_buffer)
    409     {
    410       gst_buffer_append_memory(
    411         buffer,
    412         gst_buffer_get_memory(sub_buffer, 0)
    413       );
    414 
    415       gst_buffer_unref(sub_buffer);
    416     }
    417 
    418     buf = g_list_next(buf);
    419   }
    420 
    421   if (info->buffers)
    422   {
    423     g_list_free(info->buffers);
    424     info->buffers = NULL;
    425   }
    426 
    427   const uint64_t duration = timestamp - info->last_timestamp;
    428 
    429   if (buffer)
    430   {
    431     GST_BUFFER_TIMESTAMP(buffer) = gst_util_uint64_scale(info->position, GST_SECOND, clockrate);
    432     GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale(duration, GST_SECOND, clockrate);
    433 
    434     g_signal_emit_by_name(appsrc, "push-buffer", buffer, &ret);
    435   }
    436 
    437   if ((appsrc) && (!(info->position)))
    438     gst_element_set_state(appsrc, GST_STATE_PLAYING);
    439 
    440   info->position += duration;
    441 
    442   pthread_mutex_lock(&(info->mutex));
    443 
    444   if (info->end_datetime)
    445   {
    446     g_date_time_unref(info->end_datetime);
    447     info->end_datetime = NULL;
    448   }
    449 
    450   if (dt)
    451     info->end_datetime = g_date_time_add_seconds(dt, 0.1 + (gdouble) duration / clockrate);
    452 
    453   pthread_mutex_unlock(&(info->mutex));
    454 
    455 skip_buffer:
    456   if (payload_len)
    457     info->last_timestamp = timestamp;
    458   else if (dt)
    459   {
    460     pthread_mutex_lock(&(info->mutex));
    461 
    462     if (info->end_datetime)
    463       g_date_time_unref(info->end_datetime);
    464 
    465     info->end_datetime = g_date_time_add_seconds(dt, 0.1);
    466 
    467     pthread_mutex_unlock(&(info->mutex));
    468   }
    469 
    470   if (dt)
    471     g_date_time_unref(dt);
    472 
    473   if (buffer)
    474     gst_buffer_unref(buffer);
    475 
    476   if (GST_FLOW_OK != ret)
    477     return;
    478 }
    479 
    480 static gboolean
    481 _discourse_video_heartbeat(gpointer user_data)
    482 {
    483   MESSENGER_DiscourseInfo *info = (MESSENGER_DiscourseInfo*) user_data;
    484 
    485   info->heartbeat = 0;
    486 
    487   GstBuffer *buffer = gst_buffer_new();
    488 
    489   if (!buffer)
    490     return FALSE;
    491 
    492   GstFlowReturn ret = GST_FLOW_ERROR;
    493   
    494   gst_rtp_buffer_allocate_data(
    495     buffer,
    496     0,
    497     0,
    498     0
    499   );
    500 
    501   g_signal_emit_by_name(
    502     info->video_heartbeat_source,
    503     "push-buffer",
    504     buffer,
    505     &ret
    506   );
    507 
    508   if (buffer)
    509     gst_buffer_unref(buffer);
    510 
    511   info->heartbeat = util_timeout_add(
    512     100,
    513     G_SOURCE_FUNC(_discourse_video_heartbeat),
    514     info
    515   );
    516 
    517   return FALSE;
    518 }
    519 
    520 static gboolean
    521 discourse_subscription_link_widget(MESSENGER_DiscourseSubscriptionInfo *info,
    522                                    GtkContainer *container)
    523 {
    524   g_assert(info);
    525 
    526   GtkWidget *widget;
    527   if (info->video_stream_sink)
    528     g_object_get(info->video_stream_sink, "widget", &widget, NULL);
    529   else
    530     widget = NULL;
    531 
    532   if (!widget)
    533     return FALSE;
    534 
    535   GtkWidget *parent = gtk_widget_get_parent(widget);
    536   if (parent)
    537   {
    538     GtkContainer *current = GTK_CONTAINER(parent);
    539 
    540     if (current == container)
    541     {
    542       g_object_unref(widget);
    543       return TRUE;
    544     }
    545 
    546     gst_element_set_state(info->video_stream_pipeline, GST_STATE_NULL);
    547 
    548     gtk_widget_hide(widget);
    549     gtk_widget_unrealize(widget);
    550 
    551     gtk_container_remove(
    552       current,
    553       widget
    554     );
    555   }
    556 
    557   if (container)
    558   {
    559     gtk_box_pack_start(
    560       GTK_BOX(container),
    561       widget,
    562       true,
    563       true,
    564       0
    565     );
    566   }
    567 
    568   g_object_unref(widget);
    569   
    570   if (container)
    571   {
    572     gtk_widget_realize(widget);
    573     gtk_widget_show_all(GTK_WIDGET(container));
    574 
    575     gst_element_set_state(info->video_stream_pipeline, GST_STATE_PLAYING);
    576   }
    577 
    578   return TRUE;
    579 }
    580 
    581 static void
    582 _setup_audio_gst_pipelines(MESSENGER_DiscourseInfo *info)
    583 {
    584   g_assert(info);
    585 
    586   info->audio_record_pipeline = gst_parse_launch(
    587     "autoaudiosrc ! audioconvert ! audio/x-raw,format=S16BE,layout=interleaved,rate=44100,channels=1 ! "
    588     "rtpL16pay ! capsfilter name=filter ! fdsink name=sink",
    589     NULL
    590   );
    591 
    592   info->audio_record_sink = gst_bin_get_by_name(
    593     GST_BIN(info->audio_record_pipeline), "sink"
    594   );
    595 
    596   GstElement *filter = gst_bin_get_by_name(
    597     GST_BIN(info->audio_record_pipeline), "filter"
    598   );
    599 
    600   {
    601     GstBus *bus = gst_element_get_bus(info->audio_record_pipeline);
    602     gst_bus_add_signal_watch(bus);
    603     g_signal_connect(G_OBJECT(bus), "message::error", (GCallback)error_cb, info);
    604     gst_object_unref(bus);
    605 
    606     GstCaps *caps = gst_caps_new_simple (
    607       "application/x-rtp",
    608       "media", G_TYPE_STRING, "audio",
    609       "encoding-name", G_TYPE_STRING, "L16",
    610       "payload", G_TYPE_INT, 11,
    611       "clock-rate", G_TYPE_INT, 44100,
    612       NULL
    613     );
    614 
    615     g_object_set(filter, "caps", caps, NULL);
    616     gst_caps_unref(caps);
    617 
    618     const int fd = GNUNET_CHAT_discourse_get_fd(info->discourse);
    619     if (-1 != fd)
    620       g_object_set(info->audio_record_sink, "fd", fd, NULL);
    621 
    622     gst_element_set_state(info->audio_record_pipeline, GST_STATE_PLAYING);
    623   }
    624 
    625   info->audio_mix_pipeline = gst_parse_launch(
    626     "audiomixer name=mixer ! volume name=control ! autoaudiosink",
    627     NULL
    628   );
    629 
    630   info->audio_mix_element = gst_bin_get_by_name(
    631     GST_BIN(info->audio_mix_pipeline), "mixer"
    632   );
    633 
    634   info->audio_volume_element = gst_bin_get_by_name(
    635     GST_BIN(info->audio_mix_pipeline), "control"
    636   );
    637 
    638   {
    639     GstBus *bus = gst_element_get_bus(info->audio_mix_pipeline);
    640     gst_bus_add_signal_watch(bus);
    641     g_signal_connect(G_OBJECT(bus), "message::error", (GCallback)error_cb, info);
    642     gst_object_unref(bus);
    643 
    644     gst_element_set_state(info->audio_mix_pipeline, GST_STATE_PLAYING);
    645   }
    646 }
    647 
    648 static void
    649 _setup_video_gst_pipelines(MESSENGER_DiscourseInfo *info)
    650 {
    651   g_assert(info);
    652 
    653   info->video_record_pipeline = gst_parse_launch(
    654     "pipewiresrc name=source ! "
    655     "video/x-raw,framerate={ [ 0/1, 30/1 ] } ! "
    656     "videoscale ! video/x-raw,height=[1,1280],width=[1,1280] ! "
    657     "videoconvert ! video/x-raw,format=I420 ! "
    658     "x264enc bitrate=1000 speed-preset=fast bframes=0 key-int-max=30 tune=zerolatency byte-stream=true ! "
    659     "video/x-h264,profile=baseline ! rtph264pay aggregate-mode=zero-latency mtu=45000 ! "
    660     "tee ! queue ! rtpmux name=mux ! capsfilter name=filter ! fdsink name=sink",
    661     NULL
    662   );
    663 
    664   info->video_record_source = gst_bin_get_by_name(
    665     GST_BIN(info->video_record_pipeline), "source"
    666   );
    667 
    668   info->video_record_sink = gst_bin_get_by_name(
    669     GST_BIN(info->video_record_pipeline), "sink"
    670   );
    671 
    672   GstElement *mux = gst_bin_get_by_name(
    673     GST_BIN(info->video_record_pipeline),
    674     "mux"
    675   );
    676 
    677   info->video_heartbeat_source = gst_element_factory_make("appsrc", NULL);
    678 
    679   {
    680     gst_bin_add(
    681       GST_BIN(info->video_record_pipeline),
    682       info->video_heartbeat_source
    683     );
    684 
    685     GstPad *mux_pad = gst_element_request_pad_simple(mux, "sink_%u");
    686 
    687     GstPad *pad = gst_element_get_static_pad(
    688       info->video_heartbeat_source, "src"
    689     );
    690 
    691     gst_pad_link(pad, mux_pad);
    692   }
    693 
    694   GstElement *filter = gst_bin_get_by_name(
    695     GST_BIN(info->video_record_pipeline), "filter"
    696   );
    697 
    698   {
    699     GstBus *bus = gst_element_get_bus(info->video_record_pipeline);
    700     gst_bus_add_signal_watch(bus);
    701     g_signal_connect(G_OBJECT(bus), "message::error", (GCallback)error_cb, info);
    702     gst_object_unref(bus);
    703 
    704     GstCaps *caps = gst_caps_new_simple (
    705       "application/x-rtp",
    706       "media", G_TYPE_STRING, "video",
    707       "payload", G_TYPE_INT, 96,
    708       "clock-rate", G_TYPE_INT, 90000,
    709       "encoding-name", G_TYPE_STRING, "H264",
    710       NULL
    711     );
    712 
    713     g_object_set(info->video_heartbeat_source, "caps", caps, NULL);
    714     g_object_set(filter, "caps", caps, NULL);
    715     gst_caps_unref(caps);
    716 
    717     const int fd = GNUNET_CHAT_discourse_get_fd(info->discourse);
    718     if (-1 != fd)
    719       g_object_set(info->video_record_sink, "fd", fd, NULL);
    720 
    721     gst_element_set_state(info->video_record_pipeline, GST_STATE_NULL);
    722   }
    723 }
    724 
    725 enum GNUNET_GenericReturnValue
    726 discourse_create_info(struct GNUNET_CHAT_Discourse *discourse)
    727 {
    728   if ((!discourse) || (GNUNET_CHAT_discourse_get_user_pointer(discourse)))
    729     return GNUNET_NO;
    730 
    731   MESSENGER_DiscourseInfo* info = g_malloc(sizeof(MESSENGER_DiscourseInfo));
    732 
    733   if (!info)
    734     return GNUNET_NO;
    735 
    736   info->discourse = discourse;
    737 
    738   info->audio_record_pipeline = NULL;
    739   info->audio_record_sink = NULL;
    740 
    741   info->video_record_pipeline = NULL;
    742   info->video_record_source = NULL;
    743   info->video_record_sink = NULL;
    744   info->video_heartbeat_source = NULL;
    745 
    746   info->audio_mix_pipeline = NULL;
    747   info->audio_mix_element = NULL;
    748   info->audio_volume_element = NULL;
    749 
    750   pthread_mutex_init(&(info->mutex), NULL);
    751 
    752   info->heartbeat = 0;
    753   info->subscriptions = NULL;
    754 
    755   const struct GNUNET_CHAT_DiscourseId *id = GNUNET_CHAT_discourse_get_id(
    756     info->discourse
    757   );
    758 
    759   if (0 == GNUNET_memcmp(id, get_voice_discourse_id()))
    760      _setup_audio_gst_pipelines(info);
    761   else if (0 == GNUNET_memcmp(id, get_video_discourse_id()))
    762     _setup_video_gst_pipelines(info);
    763 
    764   GNUNET_CHAT_discourse_set_user_pointer(discourse, info);
    765   return GNUNET_YES;
    766 }
    767 
    768 void
    769 discourse_destroy_info(struct GNUNET_CHAT_Discourse *discourse)
    770 {
    771   g_assert(discourse);
    772 
    773   MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
    774 
    775   if (!info)
    776     return;
    777 
    778   pthread_mutex_lock(&(info->mutex));
    779 
    780   if (info->subscriptions)
    781   {
    782     MESSENGER_DiscourseSubscriptionInfo *sub_info;
    783     GList *sub = info->subscriptions;
    784     while (sub)
    785     {
    786       sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data);
    787       discourse_subscription_destroy_info(sub_info);
    788       sub = g_list_next(sub);
    789     }
    790 
    791     g_list_free(info->subscriptions);
    792   }
    793 
    794   pthread_mutex_unlock(&(info->mutex));
    795 
    796   if (info->heartbeat)
    797     util_source_remove(info->heartbeat);
    798 
    799   if (info->video_record_pipeline)
    800   {
    801     gst_element_set_state(info->video_record_pipeline, GST_STATE_NULL);
    802 
    803     GstElement *mux = gst_bin_get_by_name(
    804       GST_BIN(info->video_record_pipeline),
    805       "mux"
    806     );
    807 
    808     GstPad *mux_pad = gst_element_request_pad_simple(mux, "sink_%u");
    809     GstPad *pad = gst_element_get_static_pad(
    810       info->video_heartbeat_source, "src"
    811     );
    812 
    813     gst_pad_unlink(pad, mux_pad);
    814 
    815     gst_bin_remove(
    816       GST_BIN(info->video_record_pipeline),
    817       info->video_heartbeat_source
    818     );
    819 
    820     gst_object_unref(GST_OBJECT(info->video_record_pipeline));
    821   }
    822 
    823   if (info->audio_mix_pipeline)
    824   {
    825     gst_element_set_state(info->audio_mix_pipeline, GST_STATE_NULL);
    826     gst_object_unref(GST_OBJECT(info->audio_mix_pipeline));
    827   }
    828 
    829   if (info->audio_record_pipeline)
    830   {
    831     gst_element_set_state(info->audio_record_pipeline, GST_STATE_NULL);
    832     gst_object_unref(GST_OBJECT(info->audio_record_pipeline));
    833   }
    834 
    835   pthread_mutex_destroy(&(info->mutex));
    836 
    837   g_free(info);
    838 
    839   GNUNET_CHAT_discourse_set_user_pointer(discourse, NULL);
    840 }
    841 
    842 static enum GNUNET_GenericReturnValue
    843 _append_contact_to_subscription_list(void *cls,
    844                                      struct GNUNET_CHAT_Discourse *discourse,
    845                                      struct GNUNET_CHAT_Contact *contact)
    846 {
    847   g_assert((cls) && (discourse) && (contact));
    848 
    849   GList **list = cls;
    850   *list = g_list_append(*list, contact);
    851   return GNUNET_YES;
    852 }
    853 
    854 void
    855 discourse_update_subscriptions(struct GNUNET_CHAT_Discourse *discourse)
    856 {
    857   g_assert(discourse);
    858 
    859   MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
    860 
    861   if (!info)
    862     return;
    863 
    864   GList *list = NULL;
    865   GNUNET_CHAT_discourse_iterate_contacts(
    866     info->discourse,
    867     _append_contact_to_subscription_list,
    868     &list
    869   );
    870 
    871   pthread_mutex_lock(&(info->mutex));
    872 
    873   GList *sub = info->subscriptions;
    874   MESSENGER_DiscourseSubscriptionInfo *sub_info;
    875 
    876   GList *drop = NULL;
    877 
    878   while (sub)
    879   {
    880     GList *link = sub;
    881 
    882     sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (link->data);
    883     sub = g_list_next(sub);
    884 
    885     if (g_list_find(list, sub_info->contact))
    886     {
    887       list = g_list_remove(list, sub_info->contact);
    888       continue;
    889     }
    890 
    891     GList *rest = g_list_remove_link(info->subscriptions, link);
    892 
    893     if (!drop)
    894       drop = link;
    895     else
    896       drop = g_list_concat(drop, link);
    897 
    898     if (!rest)
    899       break;
    900   }
    901 
    902   sub = drop;
    903   while (sub)
    904   {
    905     sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data);
    906 
    907     discourse_subscription_destroy_info(sub_info);
    908     sub = g_list_next(sub);
    909   }
    910 
    911   if (drop)
    912     g_list_free(drop);
    913 
    914   sub = list;
    915   while (sub)
    916   {
    917     sub_info = discourse_subscription_create_info(
    918       info, (struct GNUNET_CHAT_Contact*) sub->data
    919     );
    920 
    921     if (sub_info)
    922       info->subscriptions = g_list_append(
    923         info->subscriptions, sub_info
    924       );
    925     
    926     sub = g_list_next(sub);
    927   }
    928 
    929   pthread_mutex_unlock(&(info->mutex));
    930 
    931   if (list)
    932     g_list_free(list);
    933 }
    934 
    935 void
    936 discourse_stream_message(struct GNUNET_CHAT_Discourse *discourse,
    937                          const struct GNUNET_CHAT_Message *message)
    938 {
    939   g_assert((discourse) && (message));
    940 
    941   MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
    942 
    943   if (!info)
    944     return;
    945 
    946   pthread_mutex_lock(&(info->mutex));
    947 
    948   GList *sub = info->subscriptions;
    949   MESSENGER_DiscourseSubscriptionInfo *sub_info = NULL;
    950 
    951   while (sub)
    952   {
    953     sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data);
    954     if (GNUNET_CHAT_message_get_sender(message) == sub_info->contact)
    955       break;
    956 
    957     sub = g_list_next(sub);
    958   }
    959 
    960   if (sub_info)
    961     discourse_subscription_stream_message(sub_info, message);
    962 
    963   pthread_mutex_unlock(&(info->mutex));
    964 }
    965 
    966 bool
    967 discourse_has_controls(struct GNUNET_CHAT_Discourse *discourse,
    968                        MESSENGER_DiscourseControl control)
    969 {
    970   MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
    971 
    972   if (!info)
    973     return FALSE;
    974 
    975   switch (control)
    976   {
    977     case MESSENGER_DISCOURSE_CTRL_MICROPHONE:
    978       return (info->audio_record_pipeline? TRUE : FALSE);
    979     case MESSENGER_DISCOURSE_CTRL_SPEAKERS:
    980       return (info->audio_mix_pipeline? TRUE : FALSE);
    981     case MESSENGER_DISCOURSE_CTRL_WEBCAM:
    982       return (info->video_record_pipeline? TRUE : FALSE);
    983     case MESSENGER_DISCOURSE_CTRL_SCREEN_CAPTURE:
    984       return (info->video_record_pipeline? TRUE : FALSE);
    985     default:
    986       return FALSE;
    987   }
    988 }
    989 
    990 void
    991 discourse_set_volume(struct GNUNET_CHAT_Discourse *discourse,
    992                      double volume)
    993 {
    994   MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
    995 
    996   if ((!info) || (!(info->audio_mix_pipeline)) || (!(info->audio_volume_element)))
    997     return;
    998 
    999   g_object_set(info->audio_volume_element, "volume", volume, NULL);
   1000 }
   1001 
   1002 double
   1003 discourse_get_volume(struct GNUNET_CHAT_Discourse *discourse)
   1004 {
   1005   MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
   1006 
   1007   if ((!info) || (!(info->audio_mix_pipeline)) || (!(info->audio_volume_element)))
   1008     return 0.0;
   1009 
   1010   gdouble volume;
   1011   g_object_get(info->audio_volume_element, "volume", &volume, NULL);
   1012 
   1013   return volume;
   1014 }
   1015 
   1016 void
   1017 discourse_set_mute(struct GNUNET_CHAT_Discourse *discourse,
   1018                    bool mute)
   1019 {
   1020   MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
   1021 
   1022   if (!info)
   1023     return;
   1024 
   1025   if ((mute) && (info->heartbeat))
   1026   {
   1027     util_source_remove(info->heartbeat);
   1028     info->heartbeat = 0;
   1029   }
   1030 
   1031   const GstState state = mute? GST_STATE_NULL : GST_STATE_PLAYING;
   1032 
   1033   if (info->audio_record_pipeline)
   1034     gst_element_set_state(info->audio_record_pipeline, state);
   1035 
   1036   if (info->video_record_pipeline)
   1037   {
   1038     gst_element_set_state(info->video_record_pipeline, state);
   1039 
   1040     if ((!mute) && (!(info->heartbeat)))
   1041       info->heartbeat = util_idle_add(
   1042         G_SOURCE_FUNC(_discourse_video_heartbeat),
   1043         info
   1044       );
   1045   }
   1046 }
   1047 
   1048 bool
   1049 discourse_is_mute(const struct GNUNET_CHAT_Discourse *discourse)
   1050 {
   1051   MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
   1052 
   1053   if (!info)
   1054     return TRUE;
   1055 
   1056   GstState state = GST_STATE_NULL;
   1057 
   1058   if (info->audio_record_pipeline)
   1059     gst_element_get_state(
   1060       info->audio_record_pipeline,
   1061       &state,
   1062       NULL,
   1063       GST_CLOCK_TIME_NONE
   1064     );
   1065   
   1066   if (info->video_record_pipeline)
   1067     gst_element_get_state(
   1068       info->video_record_pipeline,
   1069       &state,
   1070       NULL,
   1071       GST_CLOCK_TIME_NONE
   1072     );
   1073 
   1074   return (GST_STATE_PLAYING != state);
   1075 }
   1076 
   1077 void
   1078 discourse_set_target(struct GNUNET_CHAT_Discourse *discourse,
   1079                      const char *name)
   1080 {
   1081   MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
   1082 
   1083   if (!info)
   1084     return;
   1085 
   1086   if (info->video_record_source)
   1087     g_object_set(
   1088       G_OBJECT(info->video_record_source),
   1089       "target-object",
   1090       name,
   1091       NULL
   1092     );
   1093 }
   1094 
   1095 gboolean
   1096 discourse_link_widget(const struct GNUNET_CHAT_Discourse *discourse,
   1097                       const struct GNUNET_CHAT_Contact *contact,
   1098                       GtkContainer *container)
   1099 {
   1100   MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
   1101 
   1102   if (!info)
   1103     return FALSE;
   1104 
   1105   pthread_mutex_lock(&(info->mutex));
   1106 
   1107   GList *sub = info->subscriptions;
   1108   MESSENGER_DiscourseSubscriptionInfo *sub_info = NULL;
   1109 
   1110   while (sub)
   1111   {
   1112     sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data);
   1113     if ((sub_info) && (contact == sub_info->contact))
   1114       break;
   1115 
   1116     sub_info = NULL;
   1117     sub = g_list_next(sub);
   1118   }
   1119 
   1120   gboolean linked = FALSE;
   1121   if (sub_info)
   1122     linked = discourse_subscription_link_widget(sub_info, container);
   1123 
   1124   pthread_mutex_unlock(&(info->mutex));
   1125   return linked;
   1126 }
   1127 
   1128 gboolean
   1129 discourse_is_active(const struct GNUNET_CHAT_Discourse *discourse,
   1130                     const struct GNUNET_CHAT_Contact *contact)
   1131 {
   1132   MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse);
   1133 
   1134   if (!info)
   1135     return FALSE;
   1136 
   1137   pthread_mutex_lock(&(info->mutex));
   1138 
   1139   GList *sub = info->subscriptions;
   1140   MESSENGER_DiscourseSubscriptionInfo *sub_info = NULL;
   1141 
   1142   while (sub)
   1143   {
   1144     sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data);
   1145     if ((sub_info) && (contact == sub_info->contact))
   1146       break;
   1147 
   1148     sub_info = NULL;
   1149     sub = g_list_next(sub);
   1150   }
   1151 
   1152   gboolean active = FALSE;
   1153   if (!sub_info)
   1154     goto unlock_info_mutex;
   1155 
   1156   GstState state = GST_STATE_NULL;
   1157 
   1158   if (sub_info->audio_stream_source)
   1159     gst_element_get_state(
   1160       sub_info->audio_stream_source,
   1161       &state,
   1162       NULL,
   1163       GST_CLOCK_TIME_NONE
   1164     );
   1165   
   1166   if (sub_info->video_stream_source)
   1167     gst_element_get_state(
   1168       sub_info->video_stream_source,
   1169       &state,
   1170       NULL,
   1171       GST_CLOCK_TIME_NONE
   1172     );
   1173 
   1174   if (GST_STATE_PLAYING != state)
   1175     goto unlock_info_mutex;
   1176 
   1177   pthread_mutex_lock(&(sub_info->mutex));
   1178 
   1179   if (!(sub_info->end_datetime))
   1180     goto unlock_sub_info_mutex;
   1181 
   1182   GDateTime *dt = g_date_time_new_now_local();
   1183   if (dt)
   1184   {
   1185     GTimeSpan ts = g_date_time_difference(sub_info->end_datetime, dt);
   1186     g_date_time_unref(dt);
   1187     active = (ts >= 0);
   1188   }
   1189 
   1190 unlock_sub_info_mutex:
   1191   pthread_mutex_unlock(&(sub_info->mutex));
   1192 
   1193 unlock_info_mutex:
   1194   pthread_mutex_unlock(&(info->mutex));
   1195   return active;
   1196 }