aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-09-27 21:04:34 +0000
committerGabor X Toth <*@tg-x.net>2015-09-27 21:04:34 +0000
commitcab1b047ddcac497e14515fc11f097c4aac8ee27 (patch)
tree7f4e14a8c77d76bef07cb4bbf6b94adcce44d53c /src/psycstore
parent51f530b98232f7a9947f29008d161ed0d8e23af4 (diff)
downloadgnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.tar.gz
gnunet-cab1b047ddcac497e14515fc11f097c4aac8ee27.zip
multicast/psyc/social: message acks & scheduling
Diffstat (limited to 'src/psycstore')
-rw-r--r--src/psycstore/psyc_util_lib.c41
1 files changed, 27 insertions, 14 deletions
diff --git a/src/psycstore/psyc_util_lib.c b/src/psycstore/psyc_util_lib.c
index 80e84f29c..9e6b1b770 100644
--- a/src/psycstore/psyc_util_lib.c
+++ b/src/psycstore/psyc_util_lib.c
@@ -101,6 +101,12 @@ struct GNUNET_PSYC_TransmitHandle
101 * Are we currently transmitting a message? 101 * Are we currently transmitting a message?
102 */ 102 */
103 uint8_t in_transmit; 103 uint8_t in_transmit;
104
105 /**
106 * Notify callback is currently being called.
107 */
108 uint8_t in_notify;
109
104}; 110};
105 111
106 112
@@ -334,20 +340,20 @@ GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
334 * Transmission handle. 340 * Transmission handle.
335 * @param msg 341 * @param msg
336 * Message part, or NULL. 342 * Message part, or NULL.
337 * @param end 343 * @param tmit_now
338 * End of message? 344 * Transmit message now, or wait for buffer to fill up?
339 * #GNUNET_YES or #GNUNET_NO. 345 * #GNUNET_YES or #GNUNET_NO.
340 */ 346 */
341static void 347static void
342transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, 348transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
343 const struct GNUNET_MessageHeader *msg, 349 const struct GNUNET_MessageHeader *msg,
344 uint8_t end) 350 uint8_t tmit_now)
345{ 351{
346 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0; 352 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
347 353
348 LOG (GNUNET_ERROR_TYPE_DEBUG, 354 LOG (GNUNET_ERROR_TYPE_DEBUG,
349 "Queueing message part of type %u and size %u (end: %u)).\n", 355 "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
350 NULL != msg ? ntohs (msg->type) : 0, size, end); 356 NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
351 357
352 if (NULL != tmit->msg) 358 if (NULL != tmit->msg)
353 { 359 {
@@ -380,7 +386,7 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
380 } 386 }
381 387
382 if (NULL != tmit->msg 388 if (NULL != tmit->msg
383 && (GNUNET_YES == end 389 && (GNUNET_YES == tmit_now
384 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD 390 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
385 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) 391 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
386 { 392 {
@@ -391,9 +397,6 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
391 tmit->msg = NULL; 397 tmit->msg = NULL;
392 tmit->acks_pending++; 398 tmit->acks_pending++;
393 } 399 }
394
395 if (GNUNET_YES == end)
396 tmit->in_transmit = GNUNET_NO;
397} 400}
398 401
399 402
@@ -414,7 +417,9 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
414 if (NULL != tmit->notify_data) 417 if (NULL != tmit->notify_data)
415 { 418 {
416 data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; 419 data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
420 tmit->in_notify = GNUNET_YES;
417 notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); 421 notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
422 tmit->in_notify = GNUNET_NO;
418 } 423 }
419 LOG (GNUNET_ERROR_TYPE_DEBUG, 424 LOG (GNUNET_ERROR_TYPE_DEBUG,
420 "transmit_data (ret: %d, size: %u): %.*s\n", 425 "transmit_data (ret: %d, size: %u): %.*s\n",
@@ -442,6 +447,7 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
442 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 447 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
443 msg->size = htons (sizeof (*msg)); 448 msg->size = htons (sizeof (*msg));
444 transmit_queue_insert (tmit, msg, GNUNET_YES); 449 transmit_queue_insert (tmit, msg, GNUNET_YES);
450 tmit->in_transmit = GNUNET_NO;
445 return; 451 return;
446 } 452 }
447 453
@@ -458,6 +464,8 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
458 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); 464 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
459 msg->size = htons (sizeof (*msg)); 465 msg->size = htons (sizeof (*msg));
460 transmit_queue_insert (tmit, msg, GNUNET_YES); 466 transmit_queue_insert (tmit, msg, GNUNET_YES);
467 /* FIXME: wait for ACK before setting in_transmit to no */
468 tmit->in_transmit = GNUNET_NO;
461 } 469 }
462} 470}
463 471
@@ -489,8 +497,10 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
489 { 497 {
490 max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; 498 max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
491 data_size = max_data_size; 499 data_size = max_data_size;
500 tmit->in_notify = GNUNET_YES;
492 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], 501 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
493 &mod->oper, &mod->value_size); 502 &mod->oper, &mod->value_size);
503 tmit->in_notify = GNUNET_NO;
494 } 504 }
495 505
496 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; 506 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
@@ -520,8 +530,10 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
520 { 530 {
521 max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; 531 max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
522 data_size = max_data_size; 532 data_size = max_data_size;
533 tmit->in_notify = GNUNET_YES;
523 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, 534 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
524 &data_size, &msg[1], NULL, NULL); 535 &data_size, &msg[1], NULL, NULL);
536 tmit->in_notify = GNUNET_NO;
525 } 537 }
526 tmit->mod_value_remaining -= data_size; 538 tmit->mod_value_remaining -= data_size;
527 LOG (GNUNET_ERROR_TYPE_DEBUG, 539 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -558,8 +570,8 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
558 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; 570 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
559 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 571 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
560 msg->size = htons (sizeof (*msg)); 572 msg->size = htons (sizeof (*msg));
561
562 transmit_queue_insert (tmit, msg, GNUNET_YES); 573 transmit_queue_insert (tmit, msg, GNUNET_YES);
574 tmit->in_transmit = GNUNET_NO;
563 return; 575 return;
564 } 576 }
565 577
@@ -748,6 +760,9 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
748void 760void
749GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit) 761GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
750{ 762{
763 if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
764 return;
765
751 if (0 == tmit->acks_pending) 766 if (0 == tmit->acks_pending)
752 { 767 {
753 tmit->paused = GNUNET_NO; 768 tmit->paused = GNUNET_NO;
@@ -800,13 +815,11 @@ GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
800 { 815 {
801 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: 816 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
802 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: 817 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
803 if (GNUNET_NO == tmit->paused) 818 transmit_mod (tmit);
804 transmit_mod (tmit);
805 break; 819 break;
806 820
807 case GNUNET_PSYC_MESSAGE_STATE_DATA: 821 case GNUNET_PSYC_MESSAGE_STATE_DATA:
808 if (GNUNET_NO == tmit->paused) 822 transmit_data (tmit);
809 transmit_data (tmit);
810 break; 823 break;
811 824
812 case GNUNET_PSYC_MESSAGE_STATE_END: 825 case GNUNET_PSYC_MESSAGE_STATE_END: