aboutsummaryrefslogtreecommitdiff
path: root/src/core/core_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-10-08 17:09:42 +0000
committerChristian Grothoff <christian@grothoff.org>2015-10-08 17:09:42 +0000
commitb910380795760d26a7cc38510c1f313bb1a72ea7 (patch)
treeadcb256486cf4b97284ccb21850197037a802ba7 /src/core/core_api.c
parent0f420b095bd839165613d0bd681f5a8b1614e24e (diff)
downloadgnunet-b910380795760d26a7cc38510c1f313bb1a72ea7.tar.gz
gnunet-b910380795760d26a7cc38510c1f313bb1a72ea7.zip
redefining core message timeout semantics and core-core-API messaging to address (hypothetical?) stalls which may explain #3863. This kind-of breaks the API in that the timeout now has a different semantic -- it's an advisory as to when the app would like the message transmitted, the transmission will no longer actually time out. However, that was the documented semantics before, just not the actual behavior. Most applications didn't rely on it, and tests still pass, so I didn't break too much...
Diffstat (limited to 'src/core/core_api.c')
-rw-r--r--src/core/core_api.c171
1 files changed, 57 insertions, 114 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c
index 2d7d71c48..7338672d6 100644
--- a/src/core/core_api.c
+++ b/src/core/core_api.c
@@ -63,9 +63,15 @@ struct GNUNET_CORE_TransmitHandle
63 void *get_message_cls; 63 void *get_message_cls;
64 64
65 /** 65 /**
66 * Timeout for this handle. 66 * Deadline for the transmission (the request does not get cancelled
67 * at this time, this is merely how soon the application wants this out).
67 */ 68 */
68 struct GNUNET_TIME_Absolute timeout; 69 struct GNUNET_TIME_Absolute deadline;
70
71 /**
72 * When did this request get queued?
73 */
74 struct GNUNET_TIME_Absolute request_time;
69 75
70 /** 76 /**
71 * How important is this message? 77 * How important is this message?
@@ -127,15 +133,9 @@ struct PeerRecord
127 struct GNUNET_PeerIdentity peer; 133 struct GNUNET_PeerIdentity peer;
128 134
129 /** 135 /**
130 * ID of timeout task for the 'pending_head' handle 136 * ID of task to run #next_request_transmission().
131 * which is the one with the smallest timeout.
132 */
133 struct GNUNET_SCHEDULER_Task * timeout_task;
134
135 /**
136 * ID of task to run 'next_request_transmission'.
137 */ 137 */
138 struct GNUNET_SCHEDULER_Task * ntr_task; 138 struct GNUNET_SCHEDULER_Task *ntr_task;
139 139
140 /** 140 /**
141 * SendMessageRequest ID generator for this peer. 141 * SendMessageRequest ID generator for this peer.
@@ -369,11 +369,6 @@ disconnect_and_free_peer_entry (void *cls,
369 struct GNUNET_CORE_TransmitHandle *th; 369 struct GNUNET_CORE_TransmitHandle *th;
370 struct PeerRecord *pr = value; 370 struct PeerRecord *pr = value;
371 371
372 if (NULL != pr->timeout_task)
373 {
374 GNUNET_SCHEDULER_cancel (pr->timeout_task);
375 pr->timeout_task = NULL;
376 }
377 if (NULL != pr->ntr_task) 372 if (NULL != pr->ntr_task)
378 { 373 {
379 GNUNET_SCHEDULER_cancel (pr->ntr_task); 374 GNUNET_SCHEDULER_cancel (pr->ntr_task);
@@ -401,7 +396,6 @@ disconnect_and_free_peer_entry (void *cls,
401 GNUNET_assert (GNUNET_YES == 396 GNUNET_assert (GNUNET_YES ==
402 GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr)); 397 GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr));
403 GNUNET_assert (pr->ch == h); 398 GNUNET_assert (pr->ch == h);
404 GNUNET_assert (NULL == pr->timeout_task);
405 GNUNET_assert (NULL == pr->ntr_task); 399 GNUNET_assert (NULL == pr->ntr_task);
406 GNUNET_free (pr); 400 GNUNET_free (pr);
407 return GNUNET_YES; 401 return GNUNET_YES;
@@ -471,18 +465,6 @@ trigger_next_request (struct GNUNET_CORE_Handle *h,
471 465
472 466
473/** 467/**
474 * The given request hit its timeout. Remove from the
475 * doubly-linked list and call the respective continuation.
476 *
477 * @param cls the transmit handle of the request that timed out
478 * @param tc context, can be NULL (!)
479 */
480static void
481transmission_timeout (void *cls,
482 const struct GNUNET_SCHEDULER_TaskContext *tc);
483
484
485/**
486 * Send a control message to the peer asking for transmission 468 * Send a control message to the peer asking for transmission
487 * of the message in the given peer record. 469 * of the message in the given peer record.
488 * 470 *
@@ -496,25 +478,16 @@ request_next_transmission (struct PeerRecord *pr)
496 struct SendMessageRequest *smr; 478 struct SendMessageRequest *smr;
497 struct GNUNET_CORE_TransmitHandle *th; 479 struct GNUNET_CORE_TransmitHandle *th;
498 480
499 if (pr->timeout_task != NULL)
500 {
501 GNUNET_SCHEDULER_cancel (pr->timeout_task);
502 pr->timeout_task = NULL;
503 }
504 th = &pr->th; 481 th = &pr->th;
505 if (NULL == th->peer) 482 if (NULL == th->peer)
506 { 483 {
507 trigger_next_request (h, GNUNET_NO); 484 trigger_next_request (h, GNUNET_NO);
508 return; 485 return;
509 } 486 }
510 if (th->cm != NULL) 487 if (NULL != th->cm)
511 return; /* already done */ 488 return; /* already done */
512 GNUNET_assert (pr->prev == NULL); 489 GNUNET_assert (NULL == pr->prev);
513 GNUNET_assert (pr->next == NULL); 490 GNUNET_assert (NULL == pr->next);
514 pr->timeout_task
515 = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout),
516 &transmission_timeout,
517 pr);
518 cm = GNUNET_malloc (sizeof (struct ControlMessage) + 491 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
519 sizeof (struct SendMessageRequest)); 492 sizeof (struct SendMessageRequest));
520 th->cm = cm; 493 th->cm = cm;
@@ -523,7 +496,7 @@ request_next_transmission (struct PeerRecord *pr)
523 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); 496 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
524 smr->header.size = htons (sizeof (struct SendMessageRequest)); 497 smr->header.size = htons (sizeof (struct SendMessageRequest));
525 smr->priority = htonl ((uint32_t) th->priority); 498 smr->priority = htonl ((uint32_t) th->priority);
526 smr->deadline = GNUNET_TIME_absolute_hton (th->timeout); 499 smr->deadline = GNUNET_TIME_absolute_hton (th->deadline);
527 smr->peer = pr->peer; 500 smr->peer = pr->peer;
528 smr->reserved = htonl (0); 501 smr->reserved = htonl (0);
529 smr->size = htons (th->msize); 502 smr->size = htons (th->msize);
@@ -538,56 +511,6 @@ request_next_transmission (struct PeerRecord *pr)
538 511
539 512
540/** 513/**
541 * The given request hit its timeout. Remove from the
542 * doubly-linked list and call the respective continuation.
543 *
544 * @param cls the transmit handle of the request that timed out
545 * @param tc context, can be NULL (!)
546 */
547static void
548transmission_timeout (void *cls,
549 const struct GNUNET_SCHEDULER_TaskContext *tc)
550{
551 struct PeerRecord *pr = cls;
552 struct GNUNET_CORE_Handle *h = pr->ch;
553 struct GNUNET_CORE_TransmitHandle *th;
554
555 pr->timeout_task = NULL;
556 if (NULL != pr->ntr_task)
557 {
558 GNUNET_SCHEDULER_cancel (pr->ntr_task);
559 pr->ntr_task = NULL;
560 }
561 th = &pr->th;
562 th->peer = NULL;
563 if ( (NULL != pr->prev) ||
564 (NULL != pr->next) ||
565 (pr == h->ready_peer_head) )
566 {
567 /* the request that was 'approved' by core was
568 * canceled before it could be transmitted; remove
569 * us from the 'ready' list */
570 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
571 h->ready_peer_tail,
572 pr);
573 }
574 if (NULL != th->cm)
575 {
576 /* we're currently in the control queue, remove */
577 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
578 h->control_pending_tail, th->cm);
579 GNUNET_free (th->cm);
580 }
581 LOG (GNUNET_ERROR_TYPE_DEBUG,
582 "Signalling timeout of request for transmission to peer `%s' via CORE\n",
583 GNUNET_i2s (&pr->peer));
584 trigger_next_request (h, GNUNET_NO);
585
586 GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
587}
588
589
590/**
591 * Transmit the next message to the core service. 514 * Transmit the next message to the core service.
592 * 515 *
593 * @param cls closure with the `struct GNUNET_CORE_Handle` 516 * @param cls closure with the `struct GNUNET_CORE_Handle`
@@ -603,6 +526,8 @@ transmit_message (void *cls,
603 struct GNUNET_CORE_Handle *h = cls; 526 struct GNUNET_CORE_Handle *h = cls;
604 struct ControlMessage *cm; 527 struct ControlMessage *cm;
605 struct GNUNET_CORE_TransmitHandle *th; 528 struct GNUNET_CORE_TransmitHandle *th;
529 struct GNUNET_TIME_Relative delay;
530 struct GNUNET_TIME_Relative overdue;
606 struct PeerRecord *pr; 531 struct PeerRecord *pr;
607 struct SendMessage *sm; 532 struct SendMessage *sm;
608 const struct GNUNET_MessageHeader *hdr; 533 const struct GNUNET_MessageHeader *hdr;
@@ -657,11 +582,6 @@ transmit_message (void *cls,
657 h->ready_peer_tail, 582 h->ready_peer_tail,
658 pr); 583 pr);
659 th->peer = NULL; 584 th->peer = NULL;
660 if (NULL != pr->timeout_task)
661 {
662 GNUNET_SCHEDULER_cancel (pr->timeout_task);
663 pr->timeout_task = NULL;
664 }
665 LOG (GNUNET_ERROR_TYPE_DEBUG, 585 LOG (GNUNET_ERROR_TYPE_DEBUG,
666 "Transmitting SEND request to `%s' with %u bytes.\n", 586 "Transmitting SEND request to `%s' with %u bytes.\n",
667 GNUNET_i2s (&pr->peer), 587 GNUNET_i2s (&pr->peer),
@@ -669,7 +589,7 @@ transmit_message (void *cls,
669 sm = (struct SendMessage *) buf; 589 sm = (struct SendMessage *) buf;
670 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); 590 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
671 sm->priority = htonl ((uint32_t) th->priority); 591 sm->priority = htonl ((uint32_t) th->priority);
672 sm->deadline = GNUNET_TIME_absolute_hton (th->timeout); 592 sm->deadline = GNUNET_TIME_absolute_hton (th->deadline);
673 sm->peer = pr->peer; 593 sm->peer = pr->peer;
674 sm->cork = htonl ((uint32_t) th->cork); 594 sm->cork = htonl ((uint32_t) th->cork);
675 sm->reserved = htonl (0); 595 sm->reserved = htonl (0);
@@ -677,24 +597,43 @@ transmit_message (void *cls,
677 th->get_message (th->get_message_cls, 597 th->get_message (th->get_message_cls,
678 size - sizeof (struct SendMessage), 598 size - sizeof (struct SendMessage),
679 &sm[1]); 599 &sm[1]);
680 600 delay = GNUNET_TIME_absolute_get_duration (th->request_time);
681 LOG (GNUNET_ERROR_TYPE_DEBUG, 601 overdue = GNUNET_TIME_absolute_get_duration (th->deadline);
682 "Transmitting SEND request to `%s' yielded %u bytes.\n", 602 if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
683 GNUNET_i2s (&pr->peer), 603 LOG (GNUNET_ERROR_TYPE_WARNING,
684 ret); 604 "Transmitting overdue %u bytes to `%s' at priority %u with %s delay%s\n",
685 if (0 == ret) 605 ret,
606 GNUNET_i2s (&pr->peer),
607 (unsigned int) th->priority,
608 GNUNET_STRINGS_relative_time_to_string (delay,
609 GNUNET_YES),
610 (th->cork) ? " (corked)" : "");
611 else
612 LOG (GNUNET_ERROR_TYPE_DEBUG,
613 "Transmitting %u bytes to `%s' at priority %u with %s delay%s\n",
614 ret,
615 GNUNET_i2s (&pr->peer),
616 (unsigned int) th->priority,
617 GNUNET_STRINGS_relative_time_to_string (delay,
618 GNUNET_YES),
619 (th->cork) ? " (corked)" : "");
620 if ( (0 == ret) &&
621 (GNUNET_CORE_PRIO_BACKGROUND == th->priority) )
686 { 622 {
623 /* client decided to send nothing; as the priority was
624 BACKGROUND, we can just not send anything to core.
625 For higher-priority messages, we must give an
626 empty message to CORE so that it knows that this
627 message is no longer pending. */
687 LOG (GNUNET_ERROR_TYPE_DEBUG, 628 LOG (GNUNET_ERROR_TYPE_DEBUG,
688 "Size of clients message to peer %s is 0!\n", 629 "Size of clients message to peer %s is 0!\n",
689 GNUNET_i2s (&pr->peer)); 630 GNUNET_i2s (&pr->peer));
690 /* client decided to send nothing! */
691 request_next_transmission (pr); 631 request_next_transmission (pr);
692 return 0; 632 return 0;
693 } 633 }
694 LOG (GNUNET_ERROR_TYPE_DEBUG, 634 LOG (GNUNET_ERROR_TYPE_DEBUG,
695 "Produced SEND message to core with %u bytes payload\n", 635 "Produced SEND message to core with %u bytes payload\n",
696 (unsigned int) ret); 636 (unsigned int) ret);
697 GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
698 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 637 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
699 { 638 {
700 GNUNET_break (0); 639 GNUNET_break (0);
@@ -932,7 +871,7 @@ main_notify_handler (void *cls,
932 } 871 }
933 em = (const struct GNUNET_MessageHeader *) &ntm[1]; 872 em = (const struct GNUNET_MessageHeader *) &ntm[1];
934 LOG (GNUNET_ERROR_TYPE_DEBUG, 873 LOG (GNUNET_ERROR_TYPE_DEBUG,
935 "Received message of type %u and size %u from peer `%4s'\n", 874 "Received message of type %u and size %u from peer `%s'\n",
936 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer)); 875 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
937 if ((GNUNET_NO == h->inbound_hdr_only) && 876 if ((GNUNET_NO == h->inbound_hdr_only) &&
938 (msize != 877 (msize !=
@@ -951,7 +890,7 @@ main_notify_handler (void *cls,
951 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0)) 890 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
952 { 891 {
953 LOG (GNUNET_ERROR_TYPE_ERROR, 892 LOG (GNUNET_ERROR_TYPE_ERROR,
954 "Unexpected message size %u for message of type %u from peer `%4s'\n", 893 "Unexpected message size %u for message of type %u from peer `%s'\n",
955 htons (em->size), mh->type, GNUNET_i2s (&ntm->peer)); 894 htons (em->size), mh->type, GNUNET_i2s (&ntm->peer));
956 GNUNET_break_op (0); 895 GNUNET_break_op (0);
957 continue; 896 continue;
@@ -1208,7 +1147,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1208 h->currently_down = GNUNET_YES; 1147 h->currently_down = GNUNET_YES;
1209 h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); 1148 h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
1210 if (NULL != handlers) 1149 if (NULL != handlers)
1211 while (handlers[h->hcnt].callback != NULL) 1150 while (NULL != handlers[h->hcnt].callback)
1212 h->hcnt++; 1151 h->hcnt++;
1213 GNUNET_assert (h->hcnt < 1152 GNUNET_assert (h->hcnt <
1214 (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1153 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
@@ -1259,22 +1198,22 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1259 GNUNET_CONTAINER_multipeermap_iterate (handle->peers, 1198 GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
1260 &disconnect_and_free_peer_entry, 1199 &disconnect_and_free_peer_entry,
1261 handle); 1200 handle);
1262 if (handle->reconnect_task != NULL) 1201 if (NULL != handle->reconnect_task)
1263 { 1202 {
1264 GNUNET_SCHEDULER_cancel (handle->reconnect_task); 1203 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1265 handle->reconnect_task = NULL; 1204 handle->reconnect_task = NULL;
1266 } 1205 }
1267 GNUNET_CONTAINER_multipeermap_destroy (handle->peers); 1206 GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
1268 handle->peers = NULL; 1207 handle->peers = NULL;
1269 GNUNET_break (handle->ready_peer_head == NULL); 1208 GNUNET_break (NULL == handle->ready_peer_head);
1270 GNUNET_free (handle); 1209 GNUNET_free (handle);
1271} 1210}
1272 1211
1273 1212
1274/** 1213/**
1275 * Task that calls 'request_next_transmission'. 1214 * Task that calls #request_next_transmission().
1276 * 1215 *
1277 * @param cls the 'struct PeerRecord *' 1216 * @param cls the `struct PeerRecord *`
1278 * @param tc scheduler context 1217 * @param tc scheduler context
1279 */ 1218 */
1280static void 1219static void
@@ -1357,7 +1296,11 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1357 th->peer = pr; 1296 th->peer = pr;
1358 th->get_message = notify; 1297 th->get_message = notify;
1359 th->get_message_cls = notify_cls; 1298 th->get_message_cls = notify_cls;
1360 th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); 1299 th->request_time = GNUNET_TIME_absolute_get ();
1300 if (GNUNET_YES == cork)
1301 th->deadline = GNUNET_TIME_relative_to_absolute (maxdelay);
1302 else
1303 th->deadline = th->request_time;
1361 th->priority = priority; 1304 th->priority = priority;
1362 th->msize = notify_size; 1305 th->msize = notify_size;
1363 th->cork = cork; 1306 th->cork = cork;
@@ -1373,7 +1316,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1373/** 1316/**
1374 * Cancel the specified transmission-ready notification. 1317 * Cancel the specified transmission-ready notification.
1375 * 1318 *
1376 * @param th handle that was returned by "notify_transmit_ready". 1319 * @param th handle that was returned by #GNUNET_CORE_notify_transmit_ready().
1377 */ 1320 */
1378void 1321void
1379GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) 1322GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)