commit 538f92836127c6f03fb22a1aee79461ea9222d54
parent 1b7c398a3040ea22b897f48c08e831d12f275b7b
Author: Jacki <jacki@thejackimonster.de>
Date: Thu, 11 Jul 2024 21:09:00 +0200
Support piping into discourse to avoid synchronization issues
Signed-off-by: Jacki <jacki@thejackimonster.de>
Diffstat:
5 files changed, 157 insertions(+), 1 deletion(-)
diff --git a/include/gnunet/gnunet_chat_lib.h b/include/gnunet/gnunet_chat_lib.h
@@ -1887,6 +1887,16 @@ GNUNET_CHAT_discourse_write (struct GNUNET_CHAT_Discourse *discourse,
uint64_t size);
/**
+ * Returns a file descriptor of a pipe to write data via IPC
+ * into the given chat <i>discourse</i>.
+ *
+ * @param[in] discourse Chat discourse
+ * @return File descriptor of pipe end or #GNUNET_SYSERR on failure
+ */
+int
+GNUNET_CHAT_discourse_get_fd (const struct GNUNET_CHAT_Discourse *discourse);
+
+/**
* Iterates through the subscribed chat contacts of a given chat <i>discourse</i>
* with a selected callback and custom closure.
*
diff --git a/src/gnunet_chat_discourse.c b/src/gnunet_chat_discourse.c
@@ -27,6 +27,10 @@
#include <gnunet/gnunet_scheduler_lib.h>
#include <gnunet/gnunet_time_lib.h>
+#include <unistd.h>
+
+#include "gnunet_chat_discourse_intern.c"
+
struct GNUNET_CHAT_Discourse*
discourse_create (struct GNUNET_CHAT_Context *context,
const struct GNUNET_ShortHashCode *id)
@@ -39,9 +43,19 @@ discourse_create (struct GNUNET_CHAT_Context *context,
GNUNET_memcpy(&(discourse->id), id, sizeof (struct GNUNET_ShortHashCode));
+ if (0 != pipe(discourse->pipe))
+ {
+ discourse->pipe[0] = -1;
+ discourse->pipe[1] = -1;
+ }
+
discourse->head = NULL;
discourse->tail = NULL;
+ discourse->pipe_task = GNUNET_SCHEDULER_add_now(
+ cb_reinit_discourse_pipe, discourse
+ );
+
discourse->user_pointer = NULL;
return discourse;
@@ -68,7 +82,22 @@ discourse_destroy (struct GNUNET_CHAT_Discourse *discourse)
GNUNET_assert(discourse);
while (discourse->head)
- discourse_remove_subscription (discourse->head);
+ {
+ struct GNUNET_CHAT_DiscourseSubscription *sub = discourse->head;
+
+ if (sub->task)
+ GNUNET_SCHEDULER_cancel(sub->task);
+
+ discourse_remove_subscription(sub);
+ }
+
+ if (discourse->pipe_task)
+ GNUNET_SCHEDULER_cancel(discourse->pipe_task);
+
+ if (-1 != discourse->pipe[0])
+ close(discourse->pipe[0]);
+ if (-1 != discourse->pipe[1])
+ close(discourse->pipe[1]);
GNUNET_free(discourse);
}
diff --git a/src/gnunet_chat_discourse.h b/src/gnunet_chat_discourse.h
@@ -54,10 +54,13 @@ struct GNUNET_CHAT_Discourse
struct GNUNET_CHAT_Context *context;
struct GNUNET_ShortHashCode id;
+ int pipe [2];
struct GNUNET_CHAT_DiscourseSubscription *head;
struct GNUNET_CHAT_DiscourseSubscription *tail;
+ struct GNUNET_SCHEDULER_Task *pipe_task;
+
void *user_pointer;
};
diff --git a/src/gnunet_chat_discourse_intern.c b/src/gnunet_chat_discourse_intern.c
@@ -0,0 +1,102 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2024 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/*
+ * @author Tobias Frisch
+ * @file gnunet_chat_discourse_intern.c
+ */
+
+#include "gnunet_chat_context.h"
+
+#define GNUNET_UNUSED __attribute__ ((unused))
+
+#define MAX_WRITE_SIZE ( \
+ GNUNET_MAX_MESSAGE_SIZE - \
+ GNUNET_MIN_MESSAGE_SIZE - \
+ sizeof (struct GNUNET_MESSENGER_Message))
+
+static void
+cb_read_discourse_pipe (void *cls);
+
+void
+cb_reinit_discourse_pipe (void *cls)
+{
+ struct GNUNET_CHAT_Discourse *discourse = cls;
+
+ GNUNET_assert(discourse);
+
+ discourse->pipe_task = NULL;
+
+ if (-1 == discourse->pipe[0])
+ return;
+
+ struct GNUNET_NETWORK_FDSet *rs = GNUNET_NETWORK_fdset_create();
+
+ GNUNET_NETWORK_fdset_set_native(rs, discourse->pipe[0]);
+
+ discourse->pipe_task = GNUNET_SCHEDULER_add_select(
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ rs,
+ NULL,
+ cb_read_discourse_pipe,
+ discourse
+ );
+
+ GNUNET_NETWORK_fdset_destroy(rs);
+}
+
+static void
+cb_read_discourse_pipe (void *cls)
+{
+ struct GNUNET_CHAT_Discourse *discourse = cls;
+
+ GNUNET_assert((discourse) && (-1 != discourse->pipe[0]));
+
+ discourse->pipe_task = NULL;
+
+ struct GNUNET_MESSENGER_Message msg;
+ memset(&msg, 0, sizeof(msg));
+
+ msg.header.kind = GNUNET_MESSENGER_KIND_TALK;
+
+ char data [MAX_WRITE_SIZE];
+ ssize_t len;
+
+ do
+ {
+ len = read(discourse->pipe[0], data, MAX_WRITE_SIZE);
+
+ if (len <= 0)
+ break;
+
+ msg.body.talk.data = data;
+ msg.body.talk.length = (uint16_t) len;
+
+ GNUNET_MESSENGER_send_message(discourse->context->room, &msg, NULL);
+ }
+ while (MAX_WRITE_SIZE == len);
+
+ if (len < 0)
+ return;
+
+ discourse->pipe_task = GNUNET_SCHEDULER_add_now(
+ cb_reinit_discourse_pipe, discourse
+ );
+}
diff --git a/src/gnunet_chat_lib.c b/src/gnunet_chat_lib.c
@@ -3007,6 +3007,18 @@ GNUNET_CHAT_discourse_write (struct GNUNET_CHAT_Discourse *discourse,
int
+GNUNET_CHAT_discourse_get_fd (const struct GNUNET_CHAT_Discourse *discourse)
+{
+ GNUNET_CHAT_VERSION_ASSERT();
+
+ if (! discourse)
+ return GNUNET_SYSERR;
+
+ return discourse->pipe[1];
+}
+
+
+int
GNUNET_CHAT_discourse_iterate_contacts (const struct GNUNET_CHAT_Discourse *discourse,
GNUNET_CHAT_DiscourseContactCallback callback,
void *cls)