commit a45f482e6d2786575a2a84a9a7ec55c2ddfc4c91
parent f65fb1f6abae3dcd9adfb3fae0b90ab1c4d660f5
Author: Jacki <jacki@thejackimonster.de>
Date: Mon, 29 Jul 2024 15:02:28 +0200
Fix undefined behavior of mutexes via semaphores and optimize scheduling via eventfd
Signed-off-by: Jacki <jacki@thejackimonster.de>
Diffstat:
| M | src/schedule.c | | | 179 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------- |
| M | src/schedule.h | | | 22 | ++++++++++++++++++---- |
2 files changed, 168 insertions(+), 33 deletions(-)
diff --git a/src/schedule.c b/src/schedule.c
@@ -28,17 +28,138 @@
#include <glib-2.0/glib.h>
#include <pthread.h>
+#ifndef MESSENGER_APPLICATION_NO_EVENT_FD
+#include <sys/eventfd.h>
+#endif
+
+static void
+semaphore_init(MESSENGER_Semaphore *semaphore,
+ unsigned int val)
+{
+ g_assert(semaphore);
+
+ g_assert(0 == pthread_mutex_init(&(semaphore->mutex), NULL));
+ g_assert(0 == pthread_cond_init(&(semaphore->condition), NULL));
+
+ semaphore->counter = val;
+}
+
+static void
+semaphore_destroy(MESSENGER_Semaphore *semaphore)
+{
+ g_assert(0 == pthread_cond_destroy(&(semaphore->condition)));
+ g_assert(0 == pthread_mutex_destroy(&(semaphore->mutex)));
+}
+
+static void
+semaphore_down(MESSENGER_Semaphore *semaphore)
+{
+ g_assert(semaphore);
+
+ g_assert(0 == pthread_mutex_lock(&(semaphore->mutex)));
+ while (0 == semaphore->counter)
+ pthread_cond_wait(&(semaphore->condition), &(semaphore->mutex));
+ semaphore->counter--;
+ g_assert(0 == pthread_mutex_unlock(&(semaphore->mutex)));
+}
+
+static void
+semaphore_up(MESSENGER_Semaphore *semaphore)
+{
+ g_assert(semaphore);
+
+ g_assert(0 == pthread_mutex_lock(&(semaphore->mutex)));
+ semaphore->counter++;
+ g_assert(0 == pthread_mutex_unlock(&(semaphore->mutex)));
+
+ pthread_cond_signal(&(semaphore->condition));
+}
+
+static void
+signal_init(MESSENGER_SignalHandle *handle)
+{
+ g_assert(handle);
+
+#ifdef MESSENGER_APPLICATION_NO_EVENT_FD
+ g_assert(0 == pipe(handle->pipe_fds));
+#else
+ handle->event_fd = eventfd(0, 0);
+ g_assert(-1 != handle->event_fd);
+#endif
+}
+
+static void
+signal_destroy(MESSENGER_SignalHandle *handle)
+{
+ g_assert(handle);
+
+#ifdef MESSENGER_APPLICATION_NO_EVENT_FD
+ close(handle->pipe_fds[0]);
+ close(handle->pipe_fds[1]);
+#else
+ close(handle->event_fd);
+#endif
+}
+
+static int
+signal_fd(MESSENGER_SignalHandle *handle,
+ unsigned char index)
+{
+ g_assert(handle);
+
+ int fd;
+
+#ifdef MESSENGER_APPLICATION_NO_EVENT_FD
+ fd = handle->pipe_fds[index];
+#else
+ fd = handle->event_fd;
+#endif
+
+ g_assert(-1 != fd);
+ return fd;
+}
+
+static MESSENGER_ScheduleSignal
+signal_read(MESSENGER_SignalHandle *handle)
+{
+ const int fd = signal_fd(handle, 0);
+
+#ifdef MESSENGER_APPLICATION_NO_EVENT_FD
+ unsigned char data;
+#else
+ unsigned long data;
+#endif
+
+ g_assert(sizeof(data) == read(fd, &data, sizeof(data)));
+ return (MESSENGER_ScheduleSignal) data;
+}
+
+static void
+signal_write(MESSENGER_SignalHandle *handle,
+ MESSENGER_ScheduleSignal val)
+{
+ const int fd = signal_fd(handle, 1);
+
+#ifdef MESSENGER_APPLICATION_NO_EVENT_FD
+ const unsigned char data = (unsigned char) val;
+#else
+ const unsigned long data = (unsigned long) val;
+#endif
+
+ g_assert(sizeof(data) == write(fd, &data, sizeof(data)));
+}
+
void
schedule_init(MESSENGER_Schedule *schedule)
{
g_assert(schedule);
memset(schedule, 0, sizeof(MESSENGER_Schedule));
- g_assert(0 == pipe(schedule->push_pipe));
- g_assert(0 == pipe(schedule->sync_pipe));
+ signal_init(&(schedule->push_signal));
+ signal_init(&(schedule->sync_signal));
- pthread_mutex_init(&(schedule->push_mutex), NULL);
- pthread_mutex_init(&(schedule->sync_mutex), NULL);
+ semaphore_init(&(schedule->push_sem), 0);
+ semaphore_init(&(schedule->sync_sem), 0);
}
static gboolean
@@ -64,9 +185,9 @@ __schedule_pushed_handling(MESSENGER_Schedule *schedule,
keep = TRUE;
- pthread_mutex_unlock(&(schedule->push_mutex));
- pthread_mutex_lock(&(schedule->sync_mutex));
- pthread_mutex_unlock(&(schedule->sync_mutex));
+ semaphore_down(&(schedule->push_sem));
+ semaphore_up(&(schedule->sync_sem));
+ semaphore_down(&(schedule->sync_sem));
break;
default:
return FALSE;
@@ -87,19 +208,22 @@ __schedule_pushed_task(void *cls)
g_assert(schedule);
schedule->task = NULL;
- g_assert(sizeof(val) == read(schedule->push_pipe[0], &val, sizeof(val)));
+ val = signal_read(&(schedule->push_signal));
if (__schedule_pushed_handling(schedule, val))
__schedule_setup_push_task(schedule);
- g_assert(sizeof(val) == write(schedule->sync_pipe[1], &val, sizeof(val)));
+ signal_write(&(schedule->sync_signal), val);
}
static void
__schedule_setup_push_task(MESSENGER_Schedule *schedule)
{
struct GNUNET_NETWORK_FDSet *fd = GNUNET_NETWORK_fdset_create ();
- GNUNET_NETWORK_fdset_set_native(fd, schedule->push_pipe[0]);
+ GNUNET_NETWORK_fdset_set_native(
+ fd,
+ signal_fd(&(schedule->push_signal), 0)
+ );
schedule->task = GNUNET_SCHEDULER_add_select(
GNUNET_SCHEDULER_PRIORITY_DEFAULT,
@@ -134,14 +258,14 @@ __schedule_pushed(gint fd,
task = schedule->poll;
schedule->poll = 0;
- g_assert(sizeof(val) == read(schedule->push_pipe[0], &val, sizeof(val)));
+ val = signal_read(&(schedule->push_signal));
keep = __schedule_pushed_handling(schedule, val);
if (keep)
schedule->poll = task;
- g_assert(sizeof(val) == write(schedule->sync_pipe[1], &val, sizeof(val)));
+ signal_write(&(schedule->sync_signal), val);
return keep;
}
@@ -150,7 +274,7 @@ schedule_load_glib(MESSENGER_Schedule *schedule)
{
g_assert(schedule);
schedule->poll = g_unix_fd_add(
- schedule->push_pipe[0],
+ signal_fd(&(schedule->push_signal), 0),
G_IO_IN,
__schedule_pushed,
schedule
@@ -167,14 +291,11 @@ schedule_cleanup(MESSENGER_Schedule *schedule)
if (schedule->poll)
g_source_remove(schedule->poll);
- pthread_mutex_destroy(&(schedule->push_mutex));
- pthread_mutex_destroy(&(schedule->sync_mutex));
-
- close(schedule->push_pipe[0]);
- close(schedule->push_pipe[1]);
+ semaphore_destroy(&(schedule->push_sem));
+ semaphore_destroy(&(schedule->sync_sem));
- close(schedule->sync_pipe[0]);
- close(schedule->sync_pipe[1]);
+ signal_destroy(&(schedule->push_signal));
+ signal_destroy(&(schedule->sync_signal));
memset(schedule, 0, sizeof(MESSENGER_Schedule));
}
@@ -197,8 +318,8 @@ schedule_sync_run(MESSENGER_Schedule *schedule,
const MESSENGER_ScheduleSignal push = MESSENGER_SCHEDULE_SIGNAL_RUN;
MESSENGER_ScheduleSignal sync;
- g_assert(sizeof(push) == write(schedule->push_pipe[1], &push, sizeof(push)));
- g_assert(sizeof(sync) == read(schedule->sync_pipe[0], &sync, sizeof(sync)));
+ signal_write(&(schedule->push_signal), push);
+ sync = signal_read(&(schedule->sync_signal));
g_assert(push == sync);
}
@@ -213,13 +334,13 @@ schedule_sync_lock(MESSENGER_Schedule *schedule)
const MESSENGER_ScheduleSignal push = MESSENGER_SCHEDULE_SIGNAL_LOCK;
- pthread_mutex_lock(&(schedule->push_mutex));
- pthread_mutex_lock(&(schedule->sync_mutex));
+ semaphore_up(&(schedule->push_sem));
+ semaphore_up(&(schedule->sync_sem));
- g_assert(sizeof(push) == write(schedule->push_pipe[1], &push, sizeof(push)));
+ signal_write(&(schedule->push_signal), push);
- pthread_mutex_lock(&(schedule->push_mutex));
- pthread_mutex_unlock(&(schedule->push_mutex));
+ semaphore_up(&(schedule->push_sem));
+ semaphore_down(&(schedule->push_sem));
schedule->locked = TRUE;
}
@@ -235,9 +356,9 @@ schedule_sync_unlock(MESSENGER_Schedule *schedule)
MESSENGER_ScheduleSignal sync;
- pthread_mutex_unlock(&(schedule->sync_mutex));
+ semaphore_down(&(schedule->sync_sem));
- g_assert(sizeof(sync) == read(schedule->sync_pipe[0], &sync, sizeof(sync)));
+ sync = signal_read(&(schedule->sync_signal));
g_assert(MESSENGER_SCHEDULE_SIGNAL_LOCK == sync);
schedule->locked = FALSE;
diff --git a/src/schedule.h b/src/schedule.h
@@ -29,17 +29,31 @@
#include <gnunet/gnunet_util_lib.h>
#include <pthread.h>
+typedef struct MESSENGER_Semaphore {
+ pthread_mutex_t mutex;
+ pthread_cond_t condition;
+ unsigned int counter;
+} MESSENGER_Semaphore;
+
+typedef struct MESSENGER_SignalHandle {
+#ifdef MESSENGER_APPLICATION_NO_EVENT_FD
+ int pipe_fds [2];
+#else
+ int event_fd;
+#endif
+} MESSENGER_SignalHandle;
+
typedef enum MESSENGER_ScheduleSignal : unsigned char {
MESSENGER_SCHEDULE_SIGNAL_RUN = 1,
MESSENGER_SCHEDULE_SIGNAL_LOCK = 2,
} MESSENGER_ScheduleSignal;
typedef struct MESSENGER_Schedule {
- int push_pipe [2];
- int sync_pipe [2];
+ MESSENGER_SignalHandle push_signal;
+ MESSENGER_SignalHandle sync_signal;
- pthread_mutex_t push_mutex;
- pthread_mutex_t sync_mutex;
+ MESSENGER_Semaphore push_sem;
+ MESSENGER_Semaphore sync_sem;
gboolean locked;
GSourceFunc function;