diff options
author | Gabor X Toth <*@tg-x.net> | 2015-09-27 21:04:34 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2015-09-27 21:04:34 +0000 |
commit | cab1b047ddcac497e14515fc11f097c4aac8ee27 (patch) | |
tree | 7f4e14a8c77d76bef07cb4bbf6b94adcce44d53c /src/psycstore | |
parent | 51f530b98232f7a9947f29008d161ed0d8e23af4 (diff) | |
download | gnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.tar.gz gnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.zip |
multicast/psyc/social: message acks & scheduling
Diffstat (limited to 'src/psycstore')
-rw-r--r-- | src/psycstore/psyc_util_lib.c | 41 |
1 files changed, 27 insertions, 14 deletions
diff --git a/src/psycstore/psyc_util_lib.c b/src/psycstore/psyc_util_lib.c index 80e84f29c..9e6b1b770 100644 --- a/src/psycstore/psyc_util_lib.c +++ b/src/psycstore/psyc_util_lib.c | |||
@@ -101,6 +101,12 @@ struct GNUNET_PSYC_TransmitHandle | |||
101 | * Are we currently transmitting a message? | 101 | * Are we currently transmitting a message? |
102 | */ | 102 | */ |
103 | uint8_t in_transmit; | 103 | uint8_t in_transmit; |
104 | |||
105 | /** | ||
106 | * Notify callback is currently being called. | ||
107 | */ | ||
108 | uint8_t in_notify; | ||
109 | |||
104 | }; | 110 | }; |
105 | 111 | ||
106 | 112 | ||
@@ -334,20 +340,20 @@ GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
334 | * Transmission handle. | 340 | * Transmission handle. |
335 | * @param msg | 341 | * @param msg |
336 | * Message part, or NULL. | 342 | * Message part, or NULL. |
337 | * @param end | 343 | * @param tmit_now |
338 | * End of message? | 344 | * Transmit message now, or wait for buffer to fill up? |
339 | * #GNUNET_YES or #GNUNET_NO. | 345 | * #GNUNET_YES or #GNUNET_NO. |
340 | */ | 346 | */ |
341 | static void | 347 | static void |
342 | transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | 348 | transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, |
343 | const struct GNUNET_MessageHeader *msg, | 349 | const struct GNUNET_MessageHeader *msg, |
344 | uint8_t end) | 350 | uint8_t tmit_now) |
345 | { | 351 | { |
346 | uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0; | 352 | uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0; |
347 | 353 | ||
348 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 354 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
349 | "Queueing message part of type %u and size %u (end: %u)).\n", | 355 | "Queueing message part of type %u and size %u (tmit_now: %u)).\n", |
350 | NULL != msg ? ntohs (msg->type) : 0, size, end); | 356 | NULL != msg ? ntohs (msg->type) : 0, size, tmit_now); |
351 | 357 | ||
352 | if (NULL != tmit->msg) | 358 | if (NULL != tmit->msg) |
353 | { | 359 | { |
@@ -380,7 +386,7 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
380 | } | 386 | } |
381 | 387 | ||
382 | if (NULL != tmit->msg | 388 | if (NULL != tmit->msg |
383 | && (GNUNET_YES == end | 389 | && (GNUNET_YES == tmit_now |
384 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD | 390 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD |
385 | < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) | 391 | < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) |
386 | { | 392 | { |
@@ -391,9 +397,6 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
391 | tmit->msg = NULL; | 397 | tmit->msg = NULL; |
392 | tmit->acks_pending++; | 398 | tmit->acks_pending++; |
393 | } | 399 | } |
394 | |||
395 | if (GNUNET_YES == end) | ||
396 | tmit->in_transmit = GNUNET_NO; | ||
397 | } | 400 | } |
398 | 401 | ||
399 | 402 | ||
@@ -414,7 +417,9 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
414 | if (NULL != tmit->notify_data) | 417 | if (NULL != tmit->notify_data) |
415 | { | 418 | { |
416 | data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | 419 | data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; |
420 | tmit->in_notify = GNUNET_YES; | ||
417 | notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); | 421 | notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); |
422 | tmit->in_notify = GNUNET_NO; | ||
418 | } | 423 | } |
419 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 424 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
420 | "transmit_data (ret: %d, size: %u): %.*s\n", | 425 | "transmit_data (ret: %d, size: %u): %.*s\n", |
@@ -442,6 +447,7 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
442 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 447 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
443 | msg->size = htons (sizeof (*msg)); | 448 | msg->size = htons (sizeof (*msg)); |
444 | transmit_queue_insert (tmit, msg, GNUNET_YES); | 449 | transmit_queue_insert (tmit, msg, GNUNET_YES); |
450 | tmit->in_transmit = GNUNET_NO; | ||
445 | return; | 451 | return; |
446 | } | 452 | } |
447 | 453 | ||
@@ -458,6 +464,8 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
458 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | 464 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); |
459 | msg->size = htons (sizeof (*msg)); | 465 | msg->size = htons (sizeof (*msg)); |
460 | transmit_queue_insert (tmit, msg, GNUNET_YES); | 466 | transmit_queue_insert (tmit, msg, GNUNET_YES); |
467 | /* FIXME: wait for ACK before setting in_transmit to no */ | ||
468 | tmit->in_transmit = GNUNET_NO; | ||
461 | } | 469 | } |
462 | } | 470 | } |
463 | 471 | ||
@@ -489,8 +497,10 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
489 | { | 497 | { |
490 | max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; | 498 | max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; |
491 | data_size = max_data_size; | 499 | data_size = max_data_size; |
500 | tmit->in_notify = GNUNET_YES; | ||
492 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], | 501 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], |
493 | &mod->oper, &mod->value_size); | 502 | &mod->oper, &mod->value_size); |
503 | tmit->in_notify = GNUNET_NO; | ||
494 | } | 504 | } |
495 | 505 | ||
496 | mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; | 506 | mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; |
@@ -520,8 +530,10 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
520 | { | 530 | { |
521 | max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | 531 | max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; |
522 | data_size = max_data_size; | 532 | data_size = max_data_size; |
533 | tmit->in_notify = GNUNET_YES; | ||
523 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, | 534 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, |
524 | &data_size, &msg[1], NULL, NULL); | 535 | &data_size, &msg[1], NULL, NULL); |
536 | tmit->in_notify = GNUNET_NO; | ||
525 | } | 537 | } |
526 | tmit->mod_value_remaining -= data_size; | 538 | tmit->mod_value_remaining -= data_size; |
527 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 539 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -558,8 +570,8 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
558 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; | 570 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; |
559 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 571 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
560 | msg->size = htons (sizeof (*msg)); | 572 | msg->size = htons (sizeof (*msg)); |
561 | |||
562 | transmit_queue_insert (tmit, msg, GNUNET_YES); | 573 | transmit_queue_insert (tmit, msg, GNUNET_YES); |
574 | tmit->in_transmit = GNUNET_NO; | ||
563 | return; | 575 | return; |
564 | } | 576 | } |
565 | 577 | ||
@@ -748,6 +760,9 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
748 | void | 760 | void |
749 | GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit) | 761 | GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit) |
750 | { | 762 | { |
763 | if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify) | ||
764 | return; | ||
765 | |||
751 | if (0 == tmit->acks_pending) | 766 | if (0 == tmit->acks_pending) |
752 | { | 767 | { |
753 | tmit->paused = GNUNET_NO; | 768 | tmit->paused = GNUNET_NO; |
@@ -800,13 +815,11 @@ GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
800 | { | 815 | { |
801 | case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: | 816 | case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: |
802 | case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: | 817 | case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: |
803 | if (GNUNET_NO == tmit->paused) | 818 | transmit_mod (tmit); |
804 | transmit_mod (tmit); | ||
805 | break; | 819 | break; |
806 | 820 | ||
807 | case GNUNET_PSYC_MESSAGE_STATE_DATA: | 821 | case GNUNET_PSYC_MESSAGE_STATE_DATA: |
808 | if (GNUNET_NO == tmit->paused) | 822 | transmit_data (tmit); |
809 | transmit_data (tmit); | ||
810 | break; | 823 | break; |
811 | 824 | ||
812 | case GNUNET_PSYC_MESSAGE_STATE_END: | 825 | case GNUNET_PSYC_MESSAGE_STATE_END: |