discourse.c (28734B)
1 /* 2 This file is part of GNUnet. 3 Copyright (C) 2024 GNUnet e.V. 4 5 GNUnet is free software: you can redistribute it and/or modify it 6 under the terms of the GNU Affero General Public License as published 7 by the Free Software Foundation, either version 3 of the License, 8 or (at your option) any later version. 9 10 GNUnet is distributed in the hope that it will be useful, but 11 WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 Affero General Public License for more details. 14 15 You should have received a copy of the GNU Affero General Public License 16 along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18 SPDX-License-Identifier: AGPL3.0-or-later 19 */ 20 /* 21 * @author Tobias Frisch 22 * @file discourse.c 23 */ 24 25 #include "discourse.h" 26 27 #include <glib-2.0/glib.h> 28 #include <gnunet/gnunet_common.h> 29 #include <gnunet/gnunet_chat_lib.h> 30 #include <gstreamer-1.0/gst/gst.h> 31 #include <gstreamer-1.0/gst/rtp/rtp.h> 32 #include <pthread.h> 33 #include <stdlib.h> 34 35 const struct GNUNET_CHAT_DiscourseId* 36 get_voice_discourse_id() 37 { 38 static enum GNUNET_GenericReturnValue init = GNUNET_NO; 39 static struct GNUNET_CHAT_DiscourseId id; 40 41 if (GNUNET_YES != init) 42 { 43 memset(&id, 0, sizeof(id)); 44 init = GNUNET_YES; 45 } 46 47 return &id; 48 } 49 50 const struct GNUNET_CHAT_DiscourseId* 51 get_video_discourse_id() 52 { 53 static enum GNUNET_GenericReturnValue init = GNUNET_NO; 54 static struct GNUNET_CHAT_DiscourseId id; 55 56 if (GNUNET_YES != init) 57 { 58 memset(&id, 1, sizeof(id)); 59 init = GNUNET_YES; 60 } 61 62 return &id; 63 } 64 65 static void 66 error_cb(GstBus *bus, 67 GstMessage *msg, 68 gpointer data) 69 { 70 GError *err; 71 gchar *debug_info; 72 73 gst_message_parse_error (msg, &err, &debug_info); 74 g_printerr ("Error received from element %s: %s\n", GST_OBJECT_NAME (msg->src), err->message); 75 g_printerr ("Debugging information: %s\n", debug_info ? debug_info : "none"); 76 g_clear_error (&err); 77 g_free (debug_info); 78 } 79 80 static void 81 _setup_audio_gst_pipelines_of_subscription(MESSENGER_DiscourseSubscriptionInfo *info) 82 { 83 g_assert(info); 84 85 info->audio_stream_source = gst_element_factory_make("appsrc", NULL); 86 info->audio_jitter_buffer = gst_element_factory_make("rtpjitterbuffer", NULL); 87 info->audio_depay = gst_element_factory_make("rtpL16depay", NULL); 88 info->audio_converter = gst_element_factory_make("audioconvert", NULL); 89 90 gst_element_set_state(info->discourse->audio_mix_pipeline, GST_STATE_NULL); 91 92 gst_bin_add_many( 93 GST_BIN(info->discourse->audio_mix_pipeline), 94 info->audio_stream_source, 95 info->audio_jitter_buffer, 96 info->audio_depay, 97 info->audio_converter, 98 NULL 99 ); 100 101 gst_element_link_many( 102 info->audio_stream_source, 103 info->audio_jitter_buffer, 104 info->audio_depay, 105 info->audio_converter, 106 NULL 107 ); 108 109 { 110 GstCaps *caps = gst_caps_new_simple ( 111 "application/x-rtp", 112 "media", G_TYPE_STRING, "audio", 113 "encoding-name", G_TYPE_STRING, "L16", 114 "payload", G_TYPE_INT, 11, 115 "clock-rate", G_TYPE_INT, 44100, 116 NULL 117 ); 118 119 g_object_set( 120 info->audio_stream_source, 121 "format", GST_FORMAT_TIME, 122 "caps", caps, 123 "is-live", TRUE, 124 NULL 125 ); 126 127 gst_caps_unref(caps); 128 } 129 130 info->audio_mix_pad = gst_element_request_pad_simple( 131 info->discourse->audio_mix_element, "sink_%u" 132 ); 133 134 { 135 GstPad *pad = gst_element_get_static_pad( 136 info->audio_converter, "src" 137 ); 138 139 g_object_set(info->audio_mix_pad, "mute", FALSE, "volume", 1.0, NULL); 140 gst_pad_link(pad, info->audio_mix_pad); 141 } 142 143 gst_element_set_state(info->discourse->audio_mix_pipeline, GST_STATE_PLAYING); 144 } 145 146 static void 147 _setup_video_gst_pipelines_of_subscription(MESSENGER_DiscourseSubscriptionInfo *info) 148 { 149 g_assert(info); 150 151 info->video_stream_pipeline = gst_parse_launch( 152 "appsrc name=source ! rtpjitterbuffer ! rtph264depay ! avdec_h264 ! videoconvert ! " 153 "gtksink name=sink sync=false", 154 NULL 155 ); 156 157 info->video_stream_source = gst_bin_get_by_name( 158 GST_BIN(info->video_stream_pipeline), "source" 159 ); 160 161 info->video_stream_sink = gst_bin_get_by_name( 162 GST_BIN(info->video_stream_pipeline), "sink" 163 ); 164 165 { 166 GstBus *bus = gst_element_get_bus(info->video_stream_pipeline); 167 gst_bus_add_signal_watch(bus); 168 g_signal_connect(G_OBJECT(bus), "message::error", (GCallback)error_cb, info); 169 gst_object_unref(bus); 170 171 GstCaps *caps = gst_caps_new_simple ( 172 "application/x-rtp", 173 "media", G_TYPE_STRING, "video", 174 "payload", G_TYPE_INT, 96, 175 "clock-rate", G_TYPE_INT, 90000, 176 "encoding-name", G_TYPE_STRING, "H264", 177 NULL 178 ); 179 180 g_object_set( 181 info->video_stream_source, 182 "format", GST_FORMAT_TIME, 183 "caps", caps, 184 "is-live", TRUE, 185 NULL 186 ); 187 188 gst_caps_unref(caps); 189 190 gst_element_set_state(info->video_stream_pipeline, GST_STATE_NULL); 191 } 192 } 193 194 static MESSENGER_DiscourseSubscriptionInfo* 195 discourse_subscription_create_info(MESSENGER_DiscourseInfo *discourse, 196 struct GNUNET_CHAT_Contact *contact) 197 { 198 g_assert((discourse) && (contact)); 199 200 MESSENGER_DiscourseSubscriptionInfo* info = g_malloc( 201 sizeof(MESSENGER_DiscourseSubscriptionInfo) 202 ); 203 204 if (!info) 205 return NULL; 206 207 info->discourse = discourse; 208 info->contact = contact; 209 210 info->audio_stream_source = NULL; 211 info->audio_jitter_buffer = NULL; 212 info->audio_depay = NULL; 213 info->audio_converter = NULL; 214 215 info->video_stream_pipeline = NULL; 216 info->video_stream_source = NULL; 217 info->video_stream_sink = NULL; 218 219 info->audio_mix_pad = NULL; 220 info->buffers = NULL; 221 222 info->position = 0; 223 info->last_timestamp = 0; 224 225 pthread_mutex_init(&(info->mutex), NULL); 226 227 info->end_datetime = NULL; 228 229 const struct GNUNET_CHAT_DiscourseId *id = GNUNET_CHAT_discourse_get_id( 230 info->discourse->discourse 231 ); 232 233 if (0 == GNUNET_memcmp(id, get_voice_discourse_id())) 234 _setup_audio_gst_pipelines_of_subscription(info); 235 else if (0 == GNUNET_memcmp(id, get_video_discourse_id())) 236 _setup_video_gst_pipelines_of_subscription(info); 237 238 return info; 239 } 240 241 static void 242 discourse_subscription_destroy_info(MESSENGER_DiscourseSubscriptionInfo *info) 243 { 244 g_assert(info); 245 246 GList *buf = info->buffers; 247 while (buf) 248 { 249 GstBuffer *buffer = (GstBuffer*) buf->data; 250 251 if (buffer) 252 gst_buffer_unref(buffer); 253 254 buf = g_list_next(buf); 255 } 256 257 if (info->buffers) 258 g_list_free(info->buffers); 259 260 if ((info->audio_stream_source) || (info->audio_jitter_buffer) || 261 (info->audio_depay) || (info->audio_converter)) 262 gst_element_set_state(info->discourse->audio_mix_pipeline, GST_STATE_NULL); 263 264 if (info->video_stream_pipeline) 265 { 266 gst_element_set_state(info->video_stream_pipeline, GST_STATE_NULL); 267 gst_object_unref(GST_OBJECT(info->video_stream_pipeline)); 268 } 269 270 if (info->audio_mix_pad) 271 { 272 GstPad *pad = gst_element_get_static_pad( 273 info->audio_converter, "src" 274 ); 275 276 gst_pad_unlink(pad, info->audio_mix_pad); 277 278 gst_element_release_request_pad(info->discourse->audio_mix_element, info->audio_mix_pad); 279 gst_object_unref(GST_OBJECT(info->audio_mix_pad)); 280 } 281 282 if ((info->audio_stream_source) || (info->audio_jitter_buffer) || 283 (info->audio_depay) || (info->audio_converter)) 284 { 285 gst_element_unlink_many( 286 info->audio_stream_source, 287 info->audio_jitter_buffer, 288 info->audio_depay, 289 info->audio_converter, 290 NULL 291 ); 292 293 gst_bin_remove_many( 294 GST_BIN(info->discourse->audio_mix_pipeline), 295 info->audio_stream_source, 296 info->audio_jitter_buffer, 297 info->audio_depay, 298 info->audio_converter, 299 NULL 300 ); 301 302 gst_element_set_state(info->discourse->audio_mix_pipeline, GST_STATE_PLAYING); 303 } 304 305 pthread_mutex_lock(&(info->mutex)); 306 307 if (info->end_datetime) 308 { 309 g_date_time_unref(info->end_datetime); 310 info->end_datetime = NULL; 311 } 312 313 pthread_mutex_unlock(&(info->mutex)); 314 pthread_mutex_destroy(&(info->mutex)); 315 316 g_free(info); 317 } 318 319 static void 320 discourse_subscription_stream_message(MESSENGER_DiscourseSubscriptionInfo *info, 321 const struct GNUNET_CHAT_Message *message) 322 { 323 g_assert((info) && (message)); 324 325 const uint64_t available = GNUNET_CHAT_message_available(message); 326 327 if (!available) 328 return; 329 330 const struct GNUNET_CHAT_DiscourseId *id = GNUNET_CHAT_discourse_get_id( 331 info->discourse->discourse 332 ); 333 334 uint64_t clockrate = 0; 335 GstElement *appsrc = NULL; 336 337 if (0 == GNUNET_memcmp(id, get_voice_discourse_id())) 338 { 339 if (GNUNET_YES == GNUNET_CHAT_message_is_sent(message)) 340 return; 341 342 clockrate = 44100; 343 appsrc = info->audio_stream_source; 344 } 345 else if (0 == GNUNET_memcmp(id, get_video_discourse_id())) 346 { 347 clockrate = 90000; 348 appsrc = info->video_stream_source; 349 } 350 else 351 return; 352 353 GstBuffer *buffer = gst_buffer_new_and_alloc(available); 354 GstFlowReturn ret = GST_FLOW_ERROR; 355 356 if (!buffer) 357 return; 358 359 GstMapInfo mapping; 360 if (gst_buffer_map(buffer, &mapping, GST_MAP_WRITE)) 361 { 362 if (mapping.size) 363 { 364 if (GNUNET_OK != GNUNET_CHAT_message_read(message, (char*) mapping.data, mapping.size)) 365 memset(mapping.data, 0, mapping.size); 366 } 367 368 gst_buffer_unmap(buffer, &mapping); 369 } 370 else 371 goto skip_buffer; 372 373 uint64_t timestamp = info->last_timestamp; 374 uint32_t payload_len = 0; 375 376 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; 377 if (gst_rtp_buffer_map(buffer, GST_MAP_READ, &rtp)) 378 { 379 const uint32_t rtp_timestamp = gst_rtp_buffer_get_timestamp(&rtp); 380 payload_len = gst_rtp_buffer_get_payload_len(&rtp); 381 382 timestamp = gst_rtp_buffer_ext_timestamp(×tamp, rtp_timestamp); 383 if (!timestamp) 384 timestamp = rtp_timestamp; 385 386 gst_rtp_buffer_unmap(&rtp); 387 } 388 389 if (payload_len) 390 info->buffers = g_list_append(info->buffers, buffer); 391 392 buffer = NULL; 393 394 GDateTime *dt = g_date_time_new_now_local(); 395 396 if ((!payload_len) || (info->last_timestamp == timestamp) || 397 ((!(info->last_timestamp)) && (!(info->position)))) 398 goto skip_buffer; 399 400 if (info->buffers) 401 buffer = gst_buffer_new(); 402 403 GList *buf = info->buffers; 404 while (buf) 405 { 406 GstBuffer *sub_buffer = (GstBuffer*) buf->data; 407 408 if (sub_buffer) 409 { 410 gst_buffer_append_memory( 411 buffer, 412 gst_buffer_get_memory(sub_buffer, 0) 413 ); 414 415 gst_buffer_unref(sub_buffer); 416 } 417 418 buf = g_list_next(buf); 419 } 420 421 if (info->buffers) 422 { 423 g_list_free(info->buffers); 424 info->buffers = NULL; 425 } 426 427 const uint64_t duration = timestamp - info->last_timestamp; 428 429 if (buffer) 430 { 431 GST_BUFFER_TIMESTAMP(buffer) = gst_util_uint64_scale(info->position, GST_SECOND, clockrate); 432 GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale(duration, GST_SECOND, clockrate); 433 434 g_signal_emit_by_name(appsrc, "push-buffer", buffer, &ret); 435 } 436 437 if ((appsrc) && (!(info->position))) 438 gst_element_set_state(appsrc, GST_STATE_PLAYING); 439 440 info->position += duration; 441 442 pthread_mutex_lock(&(info->mutex)); 443 444 if (info->end_datetime) 445 { 446 g_date_time_unref(info->end_datetime); 447 info->end_datetime = NULL; 448 } 449 450 if (dt) 451 info->end_datetime = g_date_time_add_seconds(dt, 0.1 + (gdouble) duration / clockrate); 452 453 pthread_mutex_unlock(&(info->mutex)); 454 455 skip_buffer: 456 if (payload_len) 457 info->last_timestamp = timestamp; 458 else if (dt) 459 { 460 pthread_mutex_lock(&(info->mutex)); 461 462 if (info->end_datetime) 463 g_date_time_unref(info->end_datetime); 464 465 info->end_datetime = g_date_time_add_seconds(dt, 0.1); 466 467 pthread_mutex_unlock(&(info->mutex)); 468 } 469 470 if (dt) 471 g_date_time_unref(dt); 472 473 if (buffer) 474 gst_buffer_unref(buffer); 475 476 if (GST_FLOW_OK != ret) 477 return; 478 } 479 480 static gboolean 481 _discourse_video_heartbeat(gpointer user_data) 482 { 483 MESSENGER_DiscourseInfo *info = (MESSENGER_DiscourseInfo*) user_data; 484 485 info->heartbeat = 0; 486 487 GstBuffer *buffer = gst_buffer_new(); 488 489 if (!buffer) 490 return FALSE; 491 492 GstFlowReturn ret = GST_FLOW_ERROR; 493 494 gst_rtp_buffer_allocate_data( 495 buffer, 496 0, 497 0, 498 0 499 ); 500 501 g_signal_emit_by_name( 502 info->video_heartbeat_source, 503 "push-buffer", 504 buffer, 505 &ret 506 ); 507 508 if (buffer) 509 gst_buffer_unref(buffer); 510 511 info->heartbeat = util_timeout_add( 512 100, 513 G_SOURCE_FUNC(_discourse_video_heartbeat), 514 info 515 ); 516 517 return FALSE; 518 } 519 520 static gboolean 521 discourse_subscription_link_widget(MESSENGER_DiscourseSubscriptionInfo *info, 522 GtkContainer *container) 523 { 524 g_assert(info); 525 526 GtkWidget *widget; 527 if (info->video_stream_sink) 528 g_object_get(info->video_stream_sink, "widget", &widget, NULL); 529 else 530 widget = NULL; 531 532 if (!widget) 533 return FALSE; 534 535 GtkWidget *parent = gtk_widget_get_parent(widget); 536 if (parent) 537 { 538 GtkContainer *current = GTK_CONTAINER(parent); 539 540 if (current == container) 541 { 542 g_object_unref(widget); 543 return TRUE; 544 } 545 546 gst_element_set_state(info->video_stream_pipeline, GST_STATE_NULL); 547 548 gtk_widget_hide(widget); 549 gtk_widget_unrealize(widget); 550 551 gtk_container_remove( 552 current, 553 widget 554 ); 555 } 556 557 if (container) 558 { 559 gtk_box_pack_start( 560 GTK_BOX(container), 561 widget, 562 true, 563 true, 564 0 565 ); 566 } 567 568 g_object_unref(widget); 569 570 if (container) 571 { 572 gtk_widget_realize(widget); 573 gtk_widget_show_all(GTK_WIDGET(container)); 574 575 gst_element_set_state(info->video_stream_pipeline, GST_STATE_PLAYING); 576 } 577 578 return TRUE; 579 } 580 581 static void 582 _setup_audio_gst_pipelines(MESSENGER_DiscourseInfo *info) 583 { 584 g_assert(info); 585 586 info->audio_record_pipeline = gst_parse_launch( 587 "autoaudiosrc ! audioconvert ! audio/x-raw,format=S16BE,layout=interleaved,rate=44100,channels=1 ! " 588 "rtpL16pay ! capsfilter name=filter ! fdsink name=sink", 589 NULL 590 ); 591 592 info->audio_record_sink = gst_bin_get_by_name( 593 GST_BIN(info->audio_record_pipeline), "sink" 594 ); 595 596 GstElement *filter = gst_bin_get_by_name( 597 GST_BIN(info->audio_record_pipeline), "filter" 598 ); 599 600 { 601 GstBus *bus = gst_element_get_bus(info->audio_record_pipeline); 602 gst_bus_add_signal_watch(bus); 603 g_signal_connect(G_OBJECT(bus), "message::error", (GCallback)error_cb, info); 604 gst_object_unref(bus); 605 606 GstCaps *caps = gst_caps_new_simple ( 607 "application/x-rtp", 608 "media", G_TYPE_STRING, "audio", 609 "encoding-name", G_TYPE_STRING, "L16", 610 "payload", G_TYPE_INT, 11, 611 "clock-rate", G_TYPE_INT, 44100, 612 NULL 613 ); 614 615 g_object_set(filter, "caps", caps, NULL); 616 gst_caps_unref(caps); 617 618 const int fd = GNUNET_CHAT_discourse_get_fd(info->discourse); 619 if (-1 != fd) 620 g_object_set(info->audio_record_sink, "fd", fd, NULL); 621 622 gst_element_set_state(info->audio_record_pipeline, GST_STATE_PLAYING); 623 } 624 625 info->audio_mix_pipeline = gst_parse_launch( 626 "audiomixer name=mixer ! volume name=control ! autoaudiosink", 627 NULL 628 ); 629 630 info->audio_mix_element = gst_bin_get_by_name( 631 GST_BIN(info->audio_mix_pipeline), "mixer" 632 ); 633 634 info->audio_volume_element = gst_bin_get_by_name( 635 GST_BIN(info->audio_mix_pipeline), "control" 636 ); 637 638 { 639 GstBus *bus = gst_element_get_bus(info->audio_mix_pipeline); 640 gst_bus_add_signal_watch(bus); 641 g_signal_connect(G_OBJECT(bus), "message::error", (GCallback)error_cb, info); 642 gst_object_unref(bus); 643 644 gst_element_set_state(info->audio_mix_pipeline, GST_STATE_PLAYING); 645 } 646 } 647 648 static void 649 _setup_video_gst_pipelines(MESSENGER_DiscourseInfo *info) 650 { 651 g_assert(info); 652 653 info->video_record_pipeline = gst_parse_launch( 654 "pipewiresrc name=source ! " 655 "video/x-raw,framerate={ [ 0/1, 30/1 ] } ! " 656 "videoscale ! video/x-raw,height=[1,1280],width=[1,1280] ! " 657 "videoconvert ! video/x-raw,format=I420 ! " 658 "x264enc bitrate=1000 speed-preset=fast bframes=0 key-int-max=30 tune=zerolatency byte-stream=true ! " 659 "video/x-h264,profile=baseline ! rtph264pay aggregate-mode=zero-latency mtu=45000 ! " 660 "tee ! queue ! rtpmux name=mux ! capsfilter name=filter ! fdsink name=sink", 661 NULL 662 ); 663 664 info->video_record_source = gst_bin_get_by_name( 665 GST_BIN(info->video_record_pipeline), "source" 666 ); 667 668 info->video_record_sink = gst_bin_get_by_name( 669 GST_BIN(info->video_record_pipeline), "sink" 670 ); 671 672 GstElement *mux = gst_bin_get_by_name( 673 GST_BIN(info->video_record_pipeline), 674 "mux" 675 ); 676 677 info->video_heartbeat_source = gst_element_factory_make("appsrc", NULL); 678 679 { 680 gst_bin_add( 681 GST_BIN(info->video_record_pipeline), 682 info->video_heartbeat_source 683 ); 684 685 GstPad *mux_pad = gst_element_request_pad_simple(mux, "sink_%u"); 686 687 GstPad *pad = gst_element_get_static_pad( 688 info->video_heartbeat_source, "src" 689 ); 690 691 gst_pad_link(pad, mux_pad); 692 } 693 694 GstElement *filter = gst_bin_get_by_name( 695 GST_BIN(info->video_record_pipeline), "filter" 696 ); 697 698 { 699 GstBus *bus = gst_element_get_bus(info->video_record_pipeline); 700 gst_bus_add_signal_watch(bus); 701 g_signal_connect(G_OBJECT(bus), "message::error", (GCallback)error_cb, info); 702 gst_object_unref(bus); 703 704 GstCaps *caps = gst_caps_new_simple ( 705 "application/x-rtp", 706 "media", G_TYPE_STRING, "video", 707 "payload", G_TYPE_INT, 96, 708 "clock-rate", G_TYPE_INT, 90000, 709 "encoding-name", G_TYPE_STRING, "H264", 710 NULL 711 ); 712 713 g_object_set(info->video_heartbeat_source, "caps", caps, NULL); 714 g_object_set(filter, "caps", caps, NULL); 715 gst_caps_unref(caps); 716 717 const int fd = GNUNET_CHAT_discourse_get_fd(info->discourse); 718 if (-1 != fd) 719 g_object_set(info->video_record_sink, "fd", fd, NULL); 720 721 gst_element_set_state(info->video_record_pipeline, GST_STATE_NULL); 722 } 723 } 724 725 enum GNUNET_GenericReturnValue 726 discourse_create_info(struct GNUNET_CHAT_Discourse *discourse) 727 { 728 if ((!discourse) || (GNUNET_CHAT_discourse_get_user_pointer(discourse))) 729 return GNUNET_NO; 730 731 MESSENGER_DiscourseInfo* info = g_malloc(sizeof(MESSENGER_DiscourseInfo)); 732 733 if (!info) 734 return GNUNET_NO; 735 736 info->discourse = discourse; 737 738 info->audio_record_pipeline = NULL; 739 info->audio_record_sink = NULL; 740 741 info->video_record_pipeline = NULL; 742 info->video_record_source = NULL; 743 info->video_record_sink = NULL; 744 info->video_heartbeat_source = NULL; 745 746 info->audio_mix_pipeline = NULL; 747 info->audio_mix_element = NULL; 748 info->audio_volume_element = NULL; 749 750 pthread_mutex_init(&(info->mutex), NULL); 751 752 info->heartbeat = 0; 753 info->subscriptions = NULL; 754 755 const struct GNUNET_CHAT_DiscourseId *id = GNUNET_CHAT_discourse_get_id( 756 info->discourse 757 ); 758 759 if (0 == GNUNET_memcmp(id, get_voice_discourse_id())) 760 _setup_audio_gst_pipelines(info); 761 else if (0 == GNUNET_memcmp(id, get_video_discourse_id())) 762 _setup_video_gst_pipelines(info); 763 764 GNUNET_CHAT_discourse_set_user_pointer(discourse, info); 765 return GNUNET_YES; 766 } 767 768 void 769 discourse_destroy_info(struct GNUNET_CHAT_Discourse *discourse) 770 { 771 g_assert(discourse); 772 773 MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse); 774 775 if (!info) 776 return; 777 778 pthread_mutex_lock(&(info->mutex)); 779 780 if (info->subscriptions) 781 { 782 MESSENGER_DiscourseSubscriptionInfo *sub_info; 783 GList *sub = info->subscriptions; 784 while (sub) 785 { 786 sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data); 787 discourse_subscription_destroy_info(sub_info); 788 sub = g_list_next(sub); 789 } 790 791 g_list_free(info->subscriptions); 792 } 793 794 pthread_mutex_unlock(&(info->mutex)); 795 796 if (info->heartbeat) 797 util_source_remove(info->heartbeat); 798 799 if (info->video_record_pipeline) 800 { 801 gst_element_set_state(info->video_record_pipeline, GST_STATE_NULL); 802 803 GstElement *mux = gst_bin_get_by_name( 804 GST_BIN(info->video_record_pipeline), 805 "mux" 806 ); 807 808 GstPad *mux_pad = gst_element_request_pad_simple(mux, "sink_%u"); 809 GstPad *pad = gst_element_get_static_pad( 810 info->video_heartbeat_source, "src" 811 ); 812 813 gst_pad_unlink(pad, mux_pad); 814 815 gst_bin_remove( 816 GST_BIN(info->video_record_pipeline), 817 info->video_heartbeat_source 818 ); 819 820 gst_object_unref(GST_OBJECT(info->video_record_pipeline)); 821 } 822 823 if (info->audio_mix_pipeline) 824 { 825 gst_element_set_state(info->audio_mix_pipeline, GST_STATE_NULL); 826 gst_object_unref(GST_OBJECT(info->audio_mix_pipeline)); 827 } 828 829 if (info->audio_record_pipeline) 830 { 831 gst_element_set_state(info->audio_record_pipeline, GST_STATE_NULL); 832 gst_object_unref(GST_OBJECT(info->audio_record_pipeline)); 833 } 834 835 pthread_mutex_destroy(&(info->mutex)); 836 837 g_free(info); 838 839 GNUNET_CHAT_discourse_set_user_pointer(discourse, NULL); 840 } 841 842 static enum GNUNET_GenericReturnValue 843 _append_contact_to_subscription_list(void *cls, 844 struct GNUNET_CHAT_Discourse *discourse, 845 struct GNUNET_CHAT_Contact *contact) 846 { 847 g_assert((cls) && (discourse) && (contact)); 848 849 GList **list = cls; 850 *list = g_list_append(*list, contact); 851 return GNUNET_YES; 852 } 853 854 void 855 discourse_update_subscriptions(struct GNUNET_CHAT_Discourse *discourse) 856 { 857 g_assert(discourse); 858 859 MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse); 860 861 if (!info) 862 return; 863 864 GList *list = NULL; 865 GNUNET_CHAT_discourse_iterate_contacts( 866 info->discourse, 867 _append_contact_to_subscription_list, 868 &list 869 ); 870 871 pthread_mutex_lock(&(info->mutex)); 872 873 GList *sub = info->subscriptions; 874 MESSENGER_DiscourseSubscriptionInfo *sub_info; 875 876 GList *drop = NULL; 877 878 while (sub) 879 { 880 GList *link = sub; 881 882 sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (link->data); 883 sub = g_list_next(sub); 884 885 if (g_list_find(list, sub_info->contact)) 886 { 887 list = g_list_remove(list, sub_info->contact); 888 continue; 889 } 890 891 GList *rest = g_list_remove_link(info->subscriptions, link); 892 893 if (!drop) 894 drop = link; 895 else 896 drop = g_list_concat(drop, link); 897 898 if (!rest) 899 break; 900 } 901 902 sub = drop; 903 while (sub) 904 { 905 sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data); 906 907 discourse_subscription_destroy_info(sub_info); 908 sub = g_list_next(sub); 909 } 910 911 if (drop) 912 g_list_free(drop); 913 914 sub = list; 915 while (sub) 916 { 917 sub_info = discourse_subscription_create_info( 918 info, (struct GNUNET_CHAT_Contact*) sub->data 919 ); 920 921 if (sub_info) 922 info->subscriptions = g_list_append( 923 info->subscriptions, sub_info 924 ); 925 926 sub = g_list_next(sub); 927 } 928 929 pthread_mutex_unlock(&(info->mutex)); 930 931 if (list) 932 g_list_free(list); 933 } 934 935 void 936 discourse_stream_message(struct GNUNET_CHAT_Discourse *discourse, 937 const struct GNUNET_CHAT_Message *message) 938 { 939 g_assert((discourse) && (message)); 940 941 MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse); 942 943 if (!info) 944 return; 945 946 pthread_mutex_lock(&(info->mutex)); 947 948 GList *sub = info->subscriptions; 949 MESSENGER_DiscourseSubscriptionInfo *sub_info = NULL; 950 951 while (sub) 952 { 953 sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data); 954 if (GNUNET_CHAT_message_get_sender(message) == sub_info->contact) 955 break; 956 957 sub = g_list_next(sub); 958 } 959 960 if (sub_info) 961 discourse_subscription_stream_message(sub_info, message); 962 963 pthread_mutex_unlock(&(info->mutex)); 964 } 965 966 bool 967 discourse_has_controls(struct GNUNET_CHAT_Discourse *discourse, 968 MESSENGER_DiscourseControl control) 969 { 970 MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse); 971 972 if (!info) 973 return FALSE; 974 975 switch (control) 976 { 977 case MESSENGER_DISCOURSE_CTRL_MICROPHONE: 978 return (info->audio_record_pipeline? TRUE : FALSE); 979 case MESSENGER_DISCOURSE_CTRL_SPEAKERS: 980 return (info->audio_mix_pipeline? TRUE : FALSE); 981 case MESSENGER_DISCOURSE_CTRL_WEBCAM: 982 return (info->video_record_pipeline? TRUE : FALSE); 983 case MESSENGER_DISCOURSE_CTRL_SCREEN_CAPTURE: 984 return (info->video_record_pipeline? TRUE : FALSE); 985 default: 986 return FALSE; 987 } 988 } 989 990 void 991 discourse_set_volume(struct GNUNET_CHAT_Discourse *discourse, 992 double volume) 993 { 994 MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse); 995 996 if ((!info) || (!(info->audio_mix_pipeline)) || (!(info->audio_volume_element))) 997 return; 998 999 g_object_set(info->audio_volume_element, "volume", volume, NULL); 1000 } 1001 1002 double 1003 discourse_get_volume(struct GNUNET_CHAT_Discourse *discourse) 1004 { 1005 MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse); 1006 1007 if ((!info) || (!(info->audio_mix_pipeline)) || (!(info->audio_volume_element))) 1008 return 0.0; 1009 1010 gdouble volume; 1011 g_object_get(info->audio_volume_element, "volume", &volume, NULL); 1012 1013 return volume; 1014 } 1015 1016 void 1017 discourse_set_mute(struct GNUNET_CHAT_Discourse *discourse, 1018 bool mute) 1019 { 1020 MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse); 1021 1022 if (!info) 1023 return; 1024 1025 if ((mute) && (info->heartbeat)) 1026 { 1027 util_source_remove(info->heartbeat); 1028 info->heartbeat = 0; 1029 } 1030 1031 const GstState state = mute? GST_STATE_NULL : GST_STATE_PLAYING; 1032 1033 if (info->audio_record_pipeline) 1034 gst_element_set_state(info->audio_record_pipeline, state); 1035 1036 if (info->video_record_pipeline) 1037 { 1038 gst_element_set_state(info->video_record_pipeline, state); 1039 1040 if ((!mute) && (!(info->heartbeat))) 1041 info->heartbeat = util_idle_add( 1042 G_SOURCE_FUNC(_discourse_video_heartbeat), 1043 info 1044 ); 1045 } 1046 } 1047 1048 bool 1049 discourse_is_mute(const struct GNUNET_CHAT_Discourse *discourse) 1050 { 1051 MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse); 1052 1053 if (!info) 1054 return TRUE; 1055 1056 GstState state = GST_STATE_NULL; 1057 1058 if (info->audio_record_pipeline) 1059 gst_element_get_state( 1060 info->audio_record_pipeline, 1061 &state, 1062 NULL, 1063 GST_CLOCK_TIME_NONE 1064 ); 1065 1066 if (info->video_record_pipeline) 1067 gst_element_get_state( 1068 info->video_record_pipeline, 1069 &state, 1070 NULL, 1071 GST_CLOCK_TIME_NONE 1072 ); 1073 1074 return (GST_STATE_PLAYING != state); 1075 } 1076 1077 void 1078 discourse_set_target(struct GNUNET_CHAT_Discourse *discourse, 1079 const char *name) 1080 { 1081 MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse); 1082 1083 if (!info) 1084 return; 1085 1086 if (info->video_record_source) 1087 g_object_set( 1088 G_OBJECT(info->video_record_source), 1089 "target-object", 1090 name, 1091 NULL 1092 ); 1093 } 1094 1095 gboolean 1096 discourse_link_widget(const struct GNUNET_CHAT_Discourse *discourse, 1097 const struct GNUNET_CHAT_Contact *contact, 1098 GtkContainer *container) 1099 { 1100 MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse); 1101 1102 if (!info) 1103 return FALSE; 1104 1105 pthread_mutex_lock(&(info->mutex)); 1106 1107 GList *sub = info->subscriptions; 1108 MESSENGER_DiscourseSubscriptionInfo *sub_info = NULL; 1109 1110 while (sub) 1111 { 1112 sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data); 1113 if ((sub_info) && (contact == sub_info->contact)) 1114 break; 1115 1116 sub_info = NULL; 1117 sub = g_list_next(sub); 1118 } 1119 1120 gboolean linked = FALSE; 1121 if (sub_info) 1122 linked = discourse_subscription_link_widget(sub_info, container); 1123 1124 pthread_mutex_unlock(&(info->mutex)); 1125 return linked; 1126 } 1127 1128 gboolean 1129 discourse_is_active(const struct GNUNET_CHAT_Discourse *discourse, 1130 const struct GNUNET_CHAT_Contact *contact) 1131 { 1132 MESSENGER_DiscourseInfo* info = GNUNET_CHAT_discourse_get_user_pointer(discourse); 1133 1134 if (!info) 1135 return FALSE; 1136 1137 pthread_mutex_lock(&(info->mutex)); 1138 1139 GList *sub = info->subscriptions; 1140 MESSENGER_DiscourseSubscriptionInfo *sub_info = NULL; 1141 1142 while (sub) 1143 { 1144 sub_info = (MESSENGER_DiscourseSubscriptionInfo*) (sub->data); 1145 if ((sub_info) && (contact == sub_info->contact)) 1146 break; 1147 1148 sub_info = NULL; 1149 sub = g_list_next(sub); 1150 } 1151 1152 gboolean active = FALSE; 1153 if (!sub_info) 1154 goto unlock_info_mutex; 1155 1156 GstState state = GST_STATE_NULL; 1157 1158 if (sub_info->audio_stream_source) 1159 gst_element_get_state( 1160 sub_info->audio_stream_source, 1161 &state, 1162 NULL, 1163 GST_CLOCK_TIME_NONE 1164 ); 1165 1166 if (sub_info->video_stream_source) 1167 gst_element_get_state( 1168 sub_info->video_stream_source, 1169 &state, 1170 NULL, 1171 GST_CLOCK_TIME_NONE 1172 ); 1173 1174 if (GST_STATE_PLAYING != state) 1175 goto unlock_info_mutex; 1176 1177 pthread_mutex_lock(&(sub_info->mutex)); 1178 1179 if (!(sub_info->end_datetime)) 1180 goto unlock_sub_info_mutex; 1181 1182 GDateTime *dt = g_date_time_new_now_local(); 1183 if (dt) 1184 { 1185 GTimeSpan ts = g_date_time_difference(sub_info->end_datetime, dt); 1186 g_date_time_unref(dt); 1187 active = (ts >= 0); 1188 } 1189 1190 unlock_sub_info_mutex: 1191 pthread_mutex_unlock(&(sub_info->mutex)); 1192 1193 unlock_info_mutex: 1194 pthread_mutex_unlock(&(info->mutex)); 1195 return active; 1196 }