diff options
author | Florian Dold <florian.dold@gmail.com> | 2016-10-16 22:07:40 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2016-10-16 22:07:40 +0000 |
commit | b368ebf988178ed83775a94b604885aa89e25406 (patch) | |
tree | afc8e92ef333627d708a22e2311db4a343e10dbd /src/util | |
parent | 903a1d69865a112bc2f5e015aaf4f7c172d39556 (diff) | |
download | gnunet-b368ebf988178ed83775a94b604885aa89e25406.tar.gz gnunet-b368ebf988178ed83775a94b604885aa89e25406.zip |
implement impl_in_flight API for MQ, replacing evacuation
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/mq.c | 133 |
1 files changed, 79 insertions, 54 deletions
diff --git a/src/util/mq.c b/src/util/mq.c index da6c0b86f..6d3517dae 100644 --- a/src/util/mq.c +++ b/src/util/mq.c | |||
@@ -145,14 +145,21 @@ struct GNUNET_MQ_Handle | |||
145 | struct GNUNET_MQ_Envelope *current_envelope; | 145 | struct GNUNET_MQ_Envelope *current_envelope; |
146 | 146 | ||
147 | /** | 147 | /** |
148 | * GNUNET_YES if the sent notification was called | ||
149 | * for the current envelope. | ||
150 | */ | ||
151 | int send_notification_called; | ||
152 | |||
153 | /** | ||
148 | * Map of associations, lazily allocated | 154 | * Map of associations, lazily allocated |
149 | */ | 155 | */ |
150 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; | 156 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; |
151 | 157 | ||
152 | /** | 158 | /** |
153 | * Task scheduled during #GNUNET_MQ_impl_send_continue. | 159 | * Task scheduled during #GNUNET_MQ_impl_send_continue |
160 | * or #GNUNET_MQ_impl_send_in_flight | ||
154 | */ | 161 | */ |
155 | struct GNUNET_SCHEDULER_Task *continue_task; | 162 | struct GNUNET_SCHEDULER_Task *send_task; |
156 | 163 | ||
157 | /** | 164 | /** |
158 | * Functions to call on queue destruction; kept in a DLL. | 165 | * Functions to call on queue destruction; kept in a DLL. |
@@ -344,8 +351,7 @@ void | |||
344 | GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev) | 351 | GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev) |
345 | { | 352 | { |
346 | GNUNET_assert (NULL == ev->parent_queue); | 353 | GNUNET_assert (NULL == ev->parent_queue); |
347 | /* also frees ev */ | 354 | GNUNET_free (ev); |
348 | GNUNET_free (ev->mh); | ||
349 | } | 355 | } |
350 | 356 | ||
351 | 357 | ||
@@ -421,6 +427,34 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq, | |||
421 | } | 427 | } |
422 | 428 | ||
423 | 429 | ||
430 | /** | ||
431 | * Task run to call the send notification for the next queued | ||
432 | * message, if any. Only useful for implementing message queues, | ||
433 | * results in undefined behavior if not used carefully. | ||
434 | * | ||
435 | * @param cls message queue to send the next message with | ||
436 | */ | ||
437 | static void | ||
438 | impl_send_in_flight (void *cls) | ||
439 | { | ||
440 | struct GNUNET_MQ_Handle *mq = cls; | ||
441 | struct GNUNET_MQ_Envelope *current_envelope; | ||
442 | |||
443 | mq->send_task = NULL; | ||
444 | /* call is only valid if we're actually currently sending | ||
445 | * a message */ | ||
446 | current_envelope = mq->current_envelope; | ||
447 | GNUNET_assert (NULL != current_envelope); | ||
448 | /* can't call cancel from now on anymore */ | ||
449 | current_envelope->parent_queue = NULL; | ||
450 | if ( (GNUNET_NO == mq->send_notification_called) && | ||
451 | (NULL != current_envelope->sent_cb) ) | ||
452 | { | ||
453 | current_envelope->sent_cb (current_envelope->sent_cls); | ||
454 | } | ||
455 | mq->send_notification_called = GNUNET_YES; | ||
456 | } | ||
457 | |||
424 | 458 | ||
425 | /** | 459 | /** |
426 | * Task run to call the send implementation for the next queued | 460 | * Task run to call the send implementation for the next queued |
@@ -435,12 +469,12 @@ impl_send_continue (void *cls) | |||
435 | struct GNUNET_MQ_Handle *mq = cls; | 469 | struct GNUNET_MQ_Handle *mq = cls; |
436 | struct GNUNET_MQ_Envelope *current_envelope; | 470 | struct GNUNET_MQ_Envelope *current_envelope; |
437 | 471 | ||
438 | mq->continue_task = NULL; | 472 | mq->send_task = NULL; |
439 | /* call is only valid if we're actually currently sending | 473 | /* call is only valid if we're actually currently sending |
440 | * a message */ | 474 | * a message */ |
441 | current_envelope = mq->current_envelope; | 475 | current_envelope = mq->current_envelope; |
442 | GNUNET_assert (NULL != current_envelope); | 476 | GNUNET_assert (NULL != current_envelope); |
443 | current_envelope->parent_queue = NULL; | 477 | impl_send_in_flight (mq); |
444 | GNUNET_assert (0 < mq->queue_length); | 478 | GNUNET_assert (0 < mq->queue_length); |
445 | mq->queue_length--; | 479 | mq->queue_length--; |
446 | if (NULL == mq->envelope_head) | 480 | if (NULL == mq->envelope_head) |
@@ -453,14 +487,12 @@ impl_send_continue (void *cls) | |||
453 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | 487 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, |
454 | mq->envelope_tail, | 488 | mq->envelope_tail, |
455 | mq->current_envelope); | 489 | mq->current_envelope); |
490 | mq->send_notification_called = GNUNET_NO; | ||
456 | mq->send_impl (mq, | 491 | mq->send_impl (mq, |
457 | mq->current_envelope->mh, | 492 | mq->current_envelope->mh, |
458 | mq->impl_state); | 493 | mq->impl_state); |
459 | } | 494 | } |
460 | if (NULL != current_envelope->sent_cb) | 495 | GNUNET_free (current_envelope); |
461 | current_envelope->sent_cb (current_envelope->sent_cls); | ||
462 | /* also frees current_envelope */ | ||
463 | GNUNET_free (current_envelope->mh); | ||
464 | } | 496 | } |
465 | 497 | ||
466 | 498 | ||
@@ -474,9 +506,32 @@ impl_send_continue (void *cls) | |||
474 | void | 506 | void |
475 | GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) | 507 | GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) |
476 | { | 508 | { |
477 | GNUNET_assert (NULL == mq->continue_task); | 509 | /* maybe #GNUNET_MQ_impl_send_in_flight was called? */ |
478 | mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, | 510 | if (NULL != mq->send_task) |
479 | mq); | 511 | { |
512 | GNUNET_SCHEDULER_cancel (mq->send_task); | ||
513 | } | ||
514 | mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, | ||
515 | mq); | ||
516 | } | ||
517 | |||
518 | |||
519 | /** | ||
520 | * Call the send notification for the current message, but do not | ||
521 | * try to send the message until #gnunet_mq_impl_send_continue | ||
522 | * is called. | ||
523 | * | ||
524 | * only useful for implementing message queues, results in undefined | ||
525 | * behavior if not used carefully. | ||
526 | * | ||
527 | * @param mq message queue to send the next message with | ||
528 | */ | ||
529 | void | ||
530 | GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq) | ||
531 | { | ||
532 | GNUNET_assert (NULL == mq->send_task); | ||
533 | mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight, | ||
534 | mq); | ||
480 | } | 535 | } |
481 | 536 | ||
482 | 537 | ||
@@ -592,11 +647,9 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, | |||
592 | uint16_t type) | 647 | uint16_t type) |
593 | { | 648 | { |
594 | struct GNUNET_MQ_Envelope *ev; | 649 | struct GNUNET_MQ_Envelope *ev; |
595 | void *mem; | ||
596 | 650 | ||
597 | mem = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope)); | 651 | ev = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope)); |
598 | ev = mem + size; | 652 | ev->mh = (struct GNUNET_MessageHeader *) &ev[1]; |
599 | ev->mh = mem; | ||
600 | ev->mh->size = htons (size); | 653 | ev->mh->size = htons (size); |
601 | ev->mh->type = htons (type); | 654 | ev->mh->type = htons (type); |
602 | if (NULL != mhp) | 655 | if (NULL != mhp) |
@@ -867,10 +920,10 @@ connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq, | |||
867 | GNUNET_CLIENT_notify_transmit_ready_cancel (state->th); | 920 | GNUNET_CLIENT_notify_transmit_ready_cancel (state->th); |
868 | state->th = NULL; | 921 | state->th = NULL; |
869 | } | 922 | } |
870 | else if (NULL != mq->continue_task) | 923 | else if (NULL != mq->send_task) |
871 | { | 924 | { |
872 | GNUNET_SCHEDULER_cancel (mq->continue_task); | 925 | GNUNET_SCHEDULER_cancel (mq->send_task); |
873 | mq->continue_task = NULL; | 926 | mq->send_task = NULL; |
874 | } | 927 | } |
875 | else | 928 | else |
876 | GNUNET_assert (0); | 929 | GNUNET_assert (0); |
@@ -1028,10 +1081,10 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) | |||
1028 | { | 1081 | { |
1029 | mq->destroy_impl (mq, mq->impl_state); | 1082 | mq->destroy_impl (mq, mq->impl_state); |
1030 | } | 1083 | } |
1031 | if (NULL != mq->continue_task) | 1084 | if (NULL != mq->send_task) |
1032 | { | 1085 | { |
1033 | GNUNET_SCHEDULER_cancel (mq->continue_task); | 1086 | GNUNET_SCHEDULER_cancel (mq->send_task); |
1034 | mq->continue_task = NULL; | 1087 | mq->send_task = NULL; |
1035 | } | 1088 | } |
1036 | while (NULL != mq->envelope_head) | 1089 | while (NULL != mq->envelope_head) |
1037 | { | 1090 | { |
@@ -1136,6 +1189,7 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) | |||
1136 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, | 1189 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, |
1137 | mq->envelope_tail, | 1190 | mq->envelope_tail, |
1138 | mq->current_envelope); | 1191 | mq->current_envelope); |
1192 | mq->send_notification_called = GNUNET_NO; | ||
1139 | mq->send_impl (mq, | 1193 | mq->send_impl (mq, |
1140 | mq->current_envelope->mh, | 1194 | mq->current_envelope->mh, |
1141 | mq->impl_state); | 1195 | mq->impl_state); |
@@ -1154,8 +1208,9 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) | |||
1154 | if (GNUNET_YES != mq->evacuate_called) | 1208 | if (GNUNET_YES != mq->evacuate_called) |
1155 | { | 1209 | { |
1156 | ev->parent_queue = NULL; | 1210 | ev->parent_queue = NULL; |
1211 | ev->mh = NULL; | ||
1157 | /* also frees ev */ | 1212 | /* also frees ev */ |
1158 | GNUNET_free (ev->mh); | 1213 | GNUNET_free (ev); |
1159 | } | 1214 | } |
1160 | } | 1215 | } |
1161 | 1216 | ||
@@ -1299,34 +1354,4 @@ GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh | |||
1299 | } | 1354 | } |
1300 | 1355 | ||
1301 | 1356 | ||
1302 | /** | ||
1303 | * Get the message that is currently being sent when cancellation of that | ||
1304 | * message is requested. The returned buffer must be freed by the caller. | ||
1305 | * | ||
1306 | * This function may be called at most once in the cancel_impl | ||
1307 | * function of a message queue. | ||
1308 | * | ||
1309 | * Use this function to avoid copying a half-sent message. | ||
1310 | * | ||
1311 | * @param mq message queue | ||
1312 | * @return pointer to store the message being canceled, | ||
1313 | * must be freed by the caller | ||
1314 | */ | ||
1315 | struct GNUNET_MessageHeader * | ||
1316 | GNUNET_MQ_impl_cancel_evacuate (struct GNUNET_MQ_Handle *mq) | ||
1317 | { | ||
1318 | struct GNUNET_MessageHeader *mh; | ||
1319 | |||
1320 | GNUNET_assert (GNUNET_NO == mq->evacuate_called); | ||
1321 | GNUNET_assert (NULL != mq->current_envelope); | ||
1322 | |||
1323 | mq->evacuate_called = GNUNET_YES; | ||
1324 | mh = mq->current_envelope->mh; | ||
1325 | mq->current_envelope->parent_queue = NULL; | ||
1326 | mq->current_envelope = NULL; | ||
1327 | |||
1328 | return mh; | ||
1329 | } | ||
1330 | |||
1331 | |||
1332 | /* end of mq.c */ | 1357 | /* end of mq.c */ |