schedule.c (7796B)
1 /* 2 This file is part of GNUnet. 3 Copyright (C) 2024 GNUnet e.V. 4 5 GNUnet is free software: you can redistribute it and/or modify it 6 under the terms of the GNU Affero General Public License as published 7 by the Free Software Foundation, either version 3 of the License, 8 or (at your option) any later version. 9 10 GNUnet is distributed in the hope that it will be useful, but 11 WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 Affero General Public License for more details. 14 15 You should have received a copy of the GNU Affero General Public License 16 along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18 SPDX-License-Identifier: AGPL3.0-or-later 19 */ 20 /* 21 * @author Tobias Frisch 22 * @file schedule.c 23 */ 24 25 #include "schedule.h" 26 27 #include <glib-2.0/glib-unix.h> 28 #include <glib-2.0/glib.h> 29 #include <pthread.h> 30 31 #ifndef MESSENGER_APPLICATION_NO_EVENT_FD 32 #include <sys/eventfd.h> 33 #endif 34 35 static void 36 semaphore_init(MESSENGER_Semaphore *semaphore, 37 unsigned int val) 38 { 39 g_assert(semaphore); 40 41 g_assert(0 == pthread_mutex_init(&(semaphore->mutex), NULL)); 42 g_assert(0 == pthread_cond_init(&(semaphore->condition), NULL)); 43 44 semaphore->counter = val; 45 } 46 47 static void 48 semaphore_destroy(MESSENGER_Semaphore *semaphore) 49 { 50 g_assert(0 == pthread_cond_destroy(&(semaphore->condition))); 51 g_assert(0 == pthread_mutex_destroy(&(semaphore->mutex))); 52 } 53 54 static void 55 semaphore_down(MESSENGER_Semaphore *semaphore) 56 { 57 g_assert(semaphore); 58 59 g_assert(0 == pthread_mutex_lock(&(semaphore->mutex))); 60 while (0 == semaphore->counter) 61 pthread_cond_wait(&(semaphore->condition), &(semaphore->mutex)); 62 semaphore->counter--; 63 g_assert(0 == pthread_mutex_unlock(&(semaphore->mutex))); 64 } 65 66 static void 67 semaphore_up(MESSENGER_Semaphore *semaphore) 68 { 69 g_assert(semaphore); 70 71 g_assert(0 == pthread_mutex_lock(&(semaphore->mutex))); 72 semaphore->counter++; 73 g_assert(0 == pthread_mutex_unlock(&(semaphore->mutex))); 74 75 pthread_cond_signal(&(semaphore->condition)); 76 } 77 78 static void 79 signal_init(MESSENGER_SignalHandle *handle) 80 { 81 g_assert(handle); 82 83 #ifdef MESSENGER_APPLICATION_NO_EVENT_FD 84 g_assert(0 == pipe(handle->pipe_fds)); 85 #else 86 handle->event_fd = eventfd(0, 0); 87 g_assert(-1 != handle->event_fd); 88 #endif 89 } 90 91 static void 92 signal_destroy(MESSENGER_SignalHandle *handle) 93 { 94 g_assert(handle); 95 96 #ifdef MESSENGER_APPLICATION_NO_EVENT_FD 97 close(handle->pipe_fds[0]); 98 close(handle->pipe_fds[1]); 99 #else 100 close(handle->event_fd); 101 #endif 102 } 103 104 static int 105 signal_fd(MESSENGER_SignalHandle *handle, 106 unsigned char index) 107 { 108 g_assert(handle); 109 110 int fd; 111 112 #ifdef MESSENGER_APPLICATION_NO_EVENT_FD 113 fd = handle->pipe_fds[index]; 114 #else 115 fd = handle->event_fd; 116 #endif 117 118 g_assert(-1 != fd); 119 return fd; 120 } 121 122 static MESSENGER_ScheduleSignal 123 signal_read(MESSENGER_SignalHandle *handle) 124 { 125 const int fd = signal_fd(handle, 0); 126 127 #ifdef MESSENGER_APPLICATION_NO_EVENT_FD 128 unsigned char data; 129 #else 130 unsigned long data; 131 #endif 132 133 g_assert(sizeof(data) == read(fd, &data, sizeof(data))); 134 return (MESSENGER_ScheduleSignal) data; 135 } 136 137 static void 138 signal_write(MESSENGER_SignalHandle *handle, 139 MESSENGER_ScheduleSignal val) 140 { 141 const int fd = signal_fd(handle, 1); 142 143 #ifdef MESSENGER_APPLICATION_NO_EVENT_FD 144 const unsigned char data = (unsigned char) val; 145 #else 146 const unsigned long data = (unsigned long) val; 147 #endif 148 149 g_assert(sizeof(data) == write(fd, &data, sizeof(data))); 150 } 151 152 void 153 schedule_init(MESSENGER_Schedule *schedule) 154 { 155 g_assert(schedule); 156 memset(schedule, 0, sizeof(MESSENGER_Schedule)); 157 158 signal_init(&(schedule->push_signal)); 159 160 semaphore_init(&(schedule->push_sem), 0); 161 semaphore_init(&(schedule->sync_sem), 0); 162 } 163 164 static gboolean 165 __schedule_pushed_handling(MESSENGER_Schedule *schedule, 166 MESSENGER_ScheduleSignal val) 167 { 168 g_assert(schedule); 169 170 gboolean keep; 171 172 switch (val) 173 { 174 case MESSENGER_SCHEDULE_SIGNAL_RUN: 175 g_assert(schedule->function); 176 177 keep = schedule->function(schedule->data); 178 179 schedule->function = NULL; 180 schedule->data = NULL; 181 break; 182 case MESSENGER_SCHEDULE_SIGNAL_LOCK: 183 g_assert(!(schedule->function)); 184 185 keep = TRUE; 186 187 semaphore_up(&(schedule->sync_sem)); 188 semaphore_down(&(schedule->push_sem)); 189 break; 190 default: 191 return FALSE; 192 } 193 194 return keep; 195 } 196 197 static void 198 __schedule_exit_handling(MESSENGER_Schedule *schedule, 199 MESSENGER_ScheduleSignal val) 200 { 201 g_assert(schedule); 202 203 semaphore_up(&(schedule->sync_sem)); 204 } 205 206 static void 207 __schedule_setup_push_task(MESSENGER_Schedule *schedule); 208 209 static void 210 __schedule_pushed_task(void *cls) 211 { 212 MESSENGER_Schedule *schedule = cls; 213 MESSENGER_ScheduleSignal val; 214 215 g_assert(schedule); 216 schedule->task = NULL; 217 218 val = signal_read(&(schedule->push_signal)); 219 220 if (__schedule_pushed_handling(schedule, val)) 221 __schedule_setup_push_task(schedule); 222 223 __schedule_exit_handling(schedule, val); 224 } 225 226 static void 227 __schedule_setup_push_task(MESSENGER_Schedule *schedule) 228 { 229 struct GNUNET_NETWORK_FDSet *fd = GNUNET_NETWORK_fdset_create (); 230 GNUNET_NETWORK_fdset_set_native( 231 fd, 232 signal_fd(&(schedule->push_signal), 0) 233 ); 234 235 schedule->task = GNUNET_SCHEDULER_add_select( 236 GNUNET_SCHEDULER_PRIORITY_DEFAULT, 237 GNUNET_TIME_relative_get_forever_(), 238 fd, 239 NULL, 240 __schedule_pushed_task, 241 schedule 242 ); 243 244 GNUNET_NETWORK_fdset_destroy(fd); 245 } 246 247 void 248 schedule_load_gnunet(MESSENGER_Schedule *schedule) 249 { 250 g_assert(schedule); 251 __schedule_setup_push_task(schedule); 252 } 253 254 static gboolean 255 __schedule_pushed(gint fd, 256 GIOCondition condition, 257 gpointer user_data) 258 { 259 MESSENGER_Schedule *schedule = user_data; 260 MESSENGER_ScheduleSignal val; 261 gboolean keep; 262 guint task; 263 264 g_assert(schedule); 265 task = schedule->poll; 266 schedule->poll = 0; 267 268 val = signal_read(&(schedule->push_signal)); 269 270 keep = __schedule_pushed_handling(schedule, val); 271 272 if (keep) 273 schedule->poll = task; 274 275 __schedule_exit_handling(schedule, val); 276 return keep; 277 } 278 279 void 280 schedule_load_glib(MESSENGER_Schedule *schedule) 281 { 282 g_assert(schedule); 283 schedule->poll = g_unix_fd_add( 284 signal_fd(&(schedule->push_signal), 0), 285 G_IO_IN, 286 __schedule_pushed, 287 schedule 288 ); 289 } 290 291 void 292 schedule_cleanup(MESSENGER_Schedule *schedule) 293 { 294 g_assert(schedule); 295 296 if (schedule->task) 297 GNUNET_SCHEDULER_cancel(schedule->task); 298 if (schedule->poll) 299 g_source_remove(schedule->poll); 300 301 semaphore_destroy(&(schedule->push_sem)); 302 semaphore_destroy(&(schedule->sync_sem)); 303 304 signal_destroy(&(schedule->push_signal)); 305 306 memset(schedule, 0, sizeof(MESSENGER_Schedule)); 307 } 308 309 void 310 schedule_sync_run(MESSENGER_Schedule *schedule, 311 GSourceFunc function, 312 gpointer data) 313 { 314 g_assert( 315 (schedule) && 316 (!(schedule->locked)) && 317 (!(schedule->function)) && 318 (function) 319 ); 320 321 schedule->function = function; 322 schedule->data = data; 323 324 const MESSENGER_ScheduleSignal push = MESSENGER_SCHEDULE_SIGNAL_RUN; 325 326 signal_write(&(schedule->push_signal), push); 327 semaphore_down(&(schedule->sync_sem)); 328 } 329 330 void 331 schedule_sync_lock(MESSENGER_Schedule *schedule) 332 { 333 g_assert( 334 (schedule) && 335 (!(schedule->locked)) && 336 (!(schedule->function)) 337 ); 338 339 const MESSENGER_ScheduleSignal push = MESSENGER_SCHEDULE_SIGNAL_LOCK; 340 341 signal_write(&(schedule->push_signal), push); 342 semaphore_down(&(schedule->sync_sem)); 343 344 schedule->locked = TRUE; 345 } 346 347 void 348 schedule_sync_unlock(MESSENGER_Schedule *schedule) 349 { 350 g_assert( 351 (schedule) && 352 (schedule->locked) && 353 (!(schedule->function)) 354 ); 355 356 semaphore_up(&(schedule->push_sem)); 357 semaphore_down(&(schedule->sync_sem)); 358 359 schedule->locked = FALSE; 360 }