aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2016-10-16 22:07:40 +0000
committerFlorian Dold <florian.dold@gmail.com>2016-10-16 22:07:40 +0000
commitb368ebf988178ed83775a94b604885aa89e25406 (patch)
treeafc8e92ef333627d708a22e2311db4a343e10dbd /src/util
parent903a1d69865a112bc2f5e015aaf4f7c172d39556 (diff)
downloadgnunet-b368ebf988178ed83775a94b604885aa89e25406.tar.gz
gnunet-b368ebf988178ed83775a94b604885aa89e25406.zip
implement impl_in_flight API for MQ, replacing evacuation
Diffstat (limited to 'src/util')
-rw-r--r--src/util/mq.c133
1 files changed, 79 insertions, 54 deletions
diff --git a/src/util/mq.c b/src/util/mq.c
index da6c0b86f..6d3517dae 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -145,14 +145,21 @@ struct GNUNET_MQ_Handle
145 struct GNUNET_MQ_Envelope *current_envelope; 145 struct GNUNET_MQ_Envelope *current_envelope;
146 146
147 /** 147 /**
148 * GNUNET_YES if the sent notification was called
149 * for the current envelope.
150 */
151 int send_notification_called;
152
153 /**
148 * Map of associations, lazily allocated 154 * Map of associations, lazily allocated
149 */ 155 */
150 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; 156 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
151 157
152 /** 158 /**
153 * Task scheduled during #GNUNET_MQ_impl_send_continue. 159 * Task scheduled during #GNUNET_MQ_impl_send_continue
160 * or #GNUNET_MQ_impl_send_in_flight
154 */ 161 */
155 struct GNUNET_SCHEDULER_Task *continue_task; 162 struct GNUNET_SCHEDULER_Task *send_task;
156 163
157 /** 164 /**
158 * Functions to call on queue destruction; kept in a DLL. 165 * Functions to call on queue destruction; kept in a DLL.
@@ -344,8 +351,7 @@ void
344GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev) 351GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
345{ 352{
346 GNUNET_assert (NULL == ev->parent_queue); 353 GNUNET_assert (NULL == ev->parent_queue);
347 /* also frees ev */ 354 GNUNET_free (ev);
348 GNUNET_free (ev->mh);
349} 355}
350 356
351 357
@@ -421,6 +427,34 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
421} 427}
422 428
423 429
430/**
431 * Task run to call the send notification for the next queued
432 * message, if any. Only useful for implementing message queues,
433 * results in undefined behavior if not used carefully.
434 *
435 * @param cls message queue to send the next message with
436 */
437static void
438impl_send_in_flight (void *cls)
439{
440 struct GNUNET_MQ_Handle *mq = cls;
441 struct GNUNET_MQ_Envelope *current_envelope;
442
443 mq->send_task = NULL;
444 /* call is only valid if we're actually currently sending
445 * a message */
446 current_envelope = mq->current_envelope;
447 GNUNET_assert (NULL != current_envelope);
448 /* can't call cancel from now on anymore */
449 current_envelope->parent_queue = NULL;
450 if ( (GNUNET_NO == mq->send_notification_called) &&
451 (NULL != current_envelope->sent_cb) )
452 {
453 current_envelope->sent_cb (current_envelope->sent_cls);
454 }
455 mq->send_notification_called = GNUNET_YES;
456}
457
424 458
425/** 459/**
426 * Task run to call the send implementation for the next queued 460 * Task run to call the send implementation for the next queued
@@ -435,12 +469,12 @@ impl_send_continue (void *cls)
435 struct GNUNET_MQ_Handle *mq = cls; 469 struct GNUNET_MQ_Handle *mq = cls;
436 struct GNUNET_MQ_Envelope *current_envelope; 470 struct GNUNET_MQ_Envelope *current_envelope;
437 471
438 mq->continue_task = NULL; 472 mq->send_task = NULL;
439 /* call is only valid if we're actually currently sending 473 /* call is only valid if we're actually currently sending
440 * a message */ 474 * a message */
441 current_envelope = mq->current_envelope; 475 current_envelope = mq->current_envelope;
442 GNUNET_assert (NULL != current_envelope); 476 GNUNET_assert (NULL != current_envelope);
443 current_envelope->parent_queue = NULL; 477 impl_send_in_flight (mq);
444 GNUNET_assert (0 < mq->queue_length); 478 GNUNET_assert (0 < mq->queue_length);
445 mq->queue_length--; 479 mq->queue_length--;
446 if (NULL == mq->envelope_head) 480 if (NULL == mq->envelope_head)
@@ -453,14 +487,12 @@ impl_send_continue (void *cls)
453 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 487 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
454 mq->envelope_tail, 488 mq->envelope_tail,
455 mq->current_envelope); 489 mq->current_envelope);
490 mq->send_notification_called = GNUNET_NO;
456 mq->send_impl (mq, 491 mq->send_impl (mq,
457 mq->current_envelope->mh, 492 mq->current_envelope->mh,
458 mq->impl_state); 493 mq->impl_state);
459 } 494 }
460 if (NULL != current_envelope->sent_cb) 495 GNUNET_free (current_envelope);
461 current_envelope->sent_cb (current_envelope->sent_cls);
462 /* also frees current_envelope */
463 GNUNET_free (current_envelope->mh);
464} 496}
465 497
466 498
@@ -474,9 +506,32 @@ impl_send_continue (void *cls)
474void 506void
475GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) 507GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
476{ 508{
477 GNUNET_assert (NULL == mq->continue_task); 509 /* maybe #GNUNET_MQ_impl_send_in_flight was called? */
478 mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, 510 if (NULL != mq->send_task)
479 mq); 511 {
512 GNUNET_SCHEDULER_cancel (mq->send_task);
513 }
514 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
515 mq);
516}
517
518
519/**
520 * Call the send notification for the current message, but do not
521 * try to send the message until #gnunet_mq_impl_send_continue
522 * is called.
523 *
524 * only useful for implementing message queues, results in undefined
525 * behavior if not used carefully.
526 *
527 * @param mq message queue to send the next message with
528 */
529void
530GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
531{
532 GNUNET_assert (NULL == mq->send_task);
533 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight,
534 mq);
480} 535}
481 536
482 537
@@ -592,11 +647,9 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
592 uint16_t type) 647 uint16_t type)
593{ 648{
594 struct GNUNET_MQ_Envelope *ev; 649 struct GNUNET_MQ_Envelope *ev;
595 void *mem;
596 650
597 mem = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope)); 651 ev = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope));
598 ev = mem + size; 652 ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
599 ev->mh = mem;
600 ev->mh->size = htons (size); 653 ev->mh->size = htons (size);
601 ev->mh->type = htons (type); 654 ev->mh->type = htons (type);
602 if (NULL != mhp) 655 if (NULL != mhp)
@@ -867,10 +920,10 @@ connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
867 GNUNET_CLIENT_notify_transmit_ready_cancel (state->th); 920 GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
868 state->th = NULL; 921 state->th = NULL;
869 } 922 }
870 else if (NULL != mq->continue_task) 923 else if (NULL != mq->send_task)
871 { 924 {
872 GNUNET_SCHEDULER_cancel (mq->continue_task); 925 GNUNET_SCHEDULER_cancel (mq->send_task);
873 mq->continue_task = NULL; 926 mq->send_task = NULL;
874 } 927 }
875 else 928 else
876 GNUNET_assert (0); 929 GNUNET_assert (0);
@@ -1028,10 +1081,10 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
1028 { 1081 {
1029 mq->destroy_impl (mq, mq->impl_state); 1082 mq->destroy_impl (mq, mq->impl_state);
1030 } 1083 }
1031 if (NULL != mq->continue_task) 1084 if (NULL != mq->send_task)
1032 { 1085 {
1033 GNUNET_SCHEDULER_cancel (mq->continue_task); 1086 GNUNET_SCHEDULER_cancel (mq->send_task);
1034 mq->continue_task = NULL; 1087 mq->send_task = NULL;
1035 } 1088 }
1036 while (NULL != mq->envelope_head) 1089 while (NULL != mq->envelope_head)
1037 { 1090 {
@@ -1136,6 +1189,7 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
1136 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 1189 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1137 mq->envelope_tail, 1190 mq->envelope_tail,
1138 mq->current_envelope); 1191 mq->current_envelope);
1192 mq->send_notification_called = GNUNET_NO;
1139 mq->send_impl (mq, 1193 mq->send_impl (mq,
1140 mq->current_envelope->mh, 1194 mq->current_envelope->mh,
1141 mq->impl_state); 1195 mq->impl_state);
@@ -1154,8 +1208,9 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
1154 if (GNUNET_YES != mq->evacuate_called) 1208 if (GNUNET_YES != mq->evacuate_called)
1155 { 1209 {
1156 ev->parent_queue = NULL; 1210 ev->parent_queue = NULL;
1211 ev->mh = NULL;
1157 /* also frees ev */ 1212 /* also frees ev */
1158 GNUNET_free (ev->mh); 1213 GNUNET_free (ev);
1159 } 1214 }
1160} 1215}
1161 1216
@@ -1299,34 +1354,4 @@ GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh
1299} 1354}
1300 1355
1301 1356
1302/**
1303 * Get the message that is currently being sent when cancellation of that
1304 * message is requested. The returned buffer must be freed by the caller.
1305 *
1306 * This function may be called at most once in the cancel_impl
1307 * function of a message queue.
1308 *
1309 * Use this function to avoid copying a half-sent message.
1310 *
1311 * @param mq message queue
1312 * @return pointer to store the message being canceled,
1313 * must be freed by the caller
1314 */
1315struct GNUNET_MessageHeader *
1316GNUNET_MQ_impl_cancel_evacuate (struct GNUNET_MQ_Handle *mq)
1317{
1318 struct GNUNET_MessageHeader *mh;
1319
1320 GNUNET_assert (GNUNET_NO == mq->evacuate_called);
1321 GNUNET_assert (NULL != mq->current_envelope);
1322
1323 mq->evacuate_called = GNUNET_YES;
1324 mh = mq->current_envelope->mh;
1325 mq->current_envelope->parent_queue = NULL;
1326 mq->current_envelope = NULL;
1327
1328 return mh;
1329}
1330
1331
1332/* end of mq.c */ 1357/* end of mq.c */