diff options
author | Christian Grothoff <christian@grothoff.org> | 2015-10-08 17:09:42 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2015-10-08 17:09:42 +0000 |
commit | b910380795760d26a7cc38510c1f313bb1a72ea7 (patch) | |
tree | adcb256486cf4b97284ccb21850197037a802ba7 /src/core/core_api.c | |
parent | 0f420b095bd839165613d0bd681f5a8b1614e24e (diff) | |
download | gnunet-b910380795760d26a7cc38510c1f313bb1a72ea7.tar.gz gnunet-b910380795760d26a7cc38510c1f313bb1a72ea7.zip |
redefining core message timeout semantics and core-core-API messaging to address (hypothetical?) stalls which may explain #3863. This kind-of breaks the API in that the timeout now has a different semantic -- it's an advisory as to when the app would like the message transmitted, the transmission will no longer actually time out. However, that was the documented semantics before, just not the actual behavior. Most applications didn't rely on it, and tests still pass, so I didn't break too much...
Diffstat (limited to 'src/core/core_api.c')
-rw-r--r-- | src/core/core_api.c | 171 |
1 files changed, 57 insertions, 114 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c index 2d7d71c48..7338672d6 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c | |||
@@ -63,9 +63,15 @@ struct GNUNET_CORE_TransmitHandle | |||
63 | void *get_message_cls; | 63 | void *get_message_cls; |
64 | 64 | ||
65 | /** | 65 | /** |
66 | * Timeout for this handle. | 66 | * Deadline for the transmission (the request does not get cancelled |
67 | * at this time, this is merely how soon the application wants this out). | ||
67 | */ | 68 | */ |
68 | struct GNUNET_TIME_Absolute timeout; | 69 | struct GNUNET_TIME_Absolute deadline; |
70 | |||
71 | /** | ||
72 | * When did this request get queued? | ||
73 | */ | ||
74 | struct GNUNET_TIME_Absolute request_time; | ||
69 | 75 | ||
70 | /** | 76 | /** |
71 | * How important is this message? | 77 | * How important is this message? |
@@ -127,15 +133,9 @@ struct PeerRecord | |||
127 | struct GNUNET_PeerIdentity peer; | 133 | struct GNUNET_PeerIdentity peer; |
128 | 134 | ||
129 | /** | 135 | /** |
130 | * ID of timeout task for the 'pending_head' handle | 136 | * ID of task to run #next_request_transmission(). |
131 | * which is the one with the smallest timeout. | ||
132 | */ | ||
133 | struct GNUNET_SCHEDULER_Task * timeout_task; | ||
134 | |||
135 | /** | ||
136 | * ID of task to run 'next_request_transmission'. | ||
137 | */ | 137 | */ |
138 | struct GNUNET_SCHEDULER_Task * ntr_task; | 138 | struct GNUNET_SCHEDULER_Task *ntr_task; |
139 | 139 | ||
140 | /** | 140 | /** |
141 | * SendMessageRequest ID generator for this peer. | 141 | * SendMessageRequest ID generator for this peer. |
@@ -369,11 +369,6 @@ disconnect_and_free_peer_entry (void *cls, | |||
369 | struct GNUNET_CORE_TransmitHandle *th; | 369 | struct GNUNET_CORE_TransmitHandle *th; |
370 | struct PeerRecord *pr = value; | 370 | struct PeerRecord *pr = value; |
371 | 371 | ||
372 | if (NULL != pr->timeout_task) | ||
373 | { | ||
374 | GNUNET_SCHEDULER_cancel (pr->timeout_task); | ||
375 | pr->timeout_task = NULL; | ||
376 | } | ||
377 | if (NULL != pr->ntr_task) | 372 | if (NULL != pr->ntr_task) |
378 | { | 373 | { |
379 | GNUNET_SCHEDULER_cancel (pr->ntr_task); | 374 | GNUNET_SCHEDULER_cancel (pr->ntr_task); |
@@ -401,7 +396,6 @@ disconnect_and_free_peer_entry (void *cls, | |||
401 | GNUNET_assert (GNUNET_YES == | 396 | GNUNET_assert (GNUNET_YES == |
402 | GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr)); | 397 | GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr)); |
403 | GNUNET_assert (pr->ch == h); | 398 | GNUNET_assert (pr->ch == h); |
404 | GNUNET_assert (NULL == pr->timeout_task); | ||
405 | GNUNET_assert (NULL == pr->ntr_task); | 399 | GNUNET_assert (NULL == pr->ntr_task); |
406 | GNUNET_free (pr); | 400 | GNUNET_free (pr); |
407 | return GNUNET_YES; | 401 | return GNUNET_YES; |
@@ -471,18 +465,6 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, | |||
471 | 465 | ||
472 | 466 | ||
473 | /** | 467 | /** |
474 | * The given request hit its timeout. Remove from the | ||
475 | * doubly-linked list and call the respective continuation. | ||
476 | * | ||
477 | * @param cls the transmit handle of the request that timed out | ||
478 | * @param tc context, can be NULL (!) | ||
479 | */ | ||
480 | static void | ||
481 | transmission_timeout (void *cls, | ||
482 | const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
483 | |||
484 | |||
485 | /** | ||
486 | * Send a control message to the peer asking for transmission | 468 | * Send a control message to the peer asking for transmission |
487 | * of the message in the given peer record. | 469 | * of the message in the given peer record. |
488 | * | 470 | * |
@@ -496,25 +478,16 @@ request_next_transmission (struct PeerRecord *pr) | |||
496 | struct SendMessageRequest *smr; | 478 | struct SendMessageRequest *smr; |
497 | struct GNUNET_CORE_TransmitHandle *th; | 479 | struct GNUNET_CORE_TransmitHandle *th; |
498 | 480 | ||
499 | if (pr->timeout_task != NULL) | ||
500 | { | ||
501 | GNUNET_SCHEDULER_cancel (pr->timeout_task); | ||
502 | pr->timeout_task = NULL; | ||
503 | } | ||
504 | th = &pr->th; | 481 | th = &pr->th; |
505 | if (NULL == th->peer) | 482 | if (NULL == th->peer) |
506 | { | 483 | { |
507 | trigger_next_request (h, GNUNET_NO); | 484 | trigger_next_request (h, GNUNET_NO); |
508 | return; | 485 | return; |
509 | } | 486 | } |
510 | if (th->cm != NULL) | 487 | if (NULL != th->cm) |
511 | return; /* already done */ | 488 | return; /* already done */ |
512 | GNUNET_assert (pr->prev == NULL); | 489 | GNUNET_assert (NULL == pr->prev); |
513 | GNUNET_assert (pr->next == NULL); | 490 | GNUNET_assert (NULL == pr->next); |
514 | pr->timeout_task | ||
515 | = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout), | ||
516 | &transmission_timeout, | ||
517 | pr); | ||
518 | cm = GNUNET_malloc (sizeof (struct ControlMessage) + | 491 | cm = GNUNET_malloc (sizeof (struct ControlMessage) + |
519 | sizeof (struct SendMessageRequest)); | 492 | sizeof (struct SendMessageRequest)); |
520 | th->cm = cm; | 493 | th->cm = cm; |
@@ -523,7 +496,7 @@ request_next_transmission (struct PeerRecord *pr) | |||
523 | smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); | 496 | smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); |
524 | smr->header.size = htons (sizeof (struct SendMessageRequest)); | 497 | smr->header.size = htons (sizeof (struct SendMessageRequest)); |
525 | smr->priority = htonl ((uint32_t) th->priority); | 498 | smr->priority = htonl ((uint32_t) th->priority); |
526 | smr->deadline = GNUNET_TIME_absolute_hton (th->timeout); | 499 | smr->deadline = GNUNET_TIME_absolute_hton (th->deadline); |
527 | smr->peer = pr->peer; | 500 | smr->peer = pr->peer; |
528 | smr->reserved = htonl (0); | 501 | smr->reserved = htonl (0); |
529 | smr->size = htons (th->msize); | 502 | smr->size = htons (th->msize); |
@@ -538,56 +511,6 @@ request_next_transmission (struct PeerRecord *pr) | |||
538 | 511 | ||
539 | 512 | ||
540 | /** | 513 | /** |
541 | * The given request hit its timeout. Remove from the | ||
542 | * doubly-linked list and call the respective continuation. | ||
543 | * | ||
544 | * @param cls the transmit handle of the request that timed out | ||
545 | * @param tc context, can be NULL (!) | ||
546 | */ | ||
547 | static void | ||
548 | transmission_timeout (void *cls, | ||
549 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
550 | { | ||
551 | struct PeerRecord *pr = cls; | ||
552 | struct GNUNET_CORE_Handle *h = pr->ch; | ||
553 | struct GNUNET_CORE_TransmitHandle *th; | ||
554 | |||
555 | pr->timeout_task = NULL; | ||
556 | if (NULL != pr->ntr_task) | ||
557 | { | ||
558 | GNUNET_SCHEDULER_cancel (pr->ntr_task); | ||
559 | pr->ntr_task = NULL; | ||
560 | } | ||
561 | th = &pr->th; | ||
562 | th->peer = NULL; | ||
563 | if ( (NULL != pr->prev) || | ||
564 | (NULL != pr->next) || | ||
565 | (pr == h->ready_peer_head) ) | ||
566 | { | ||
567 | /* the request that was 'approved' by core was | ||
568 | * canceled before it could be transmitted; remove | ||
569 | * us from the 'ready' list */ | ||
570 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, | ||
571 | h->ready_peer_tail, | ||
572 | pr); | ||
573 | } | ||
574 | if (NULL != th->cm) | ||
575 | { | ||
576 | /* we're currently in the control queue, remove */ | ||
577 | GNUNET_CONTAINER_DLL_remove (h->control_pending_head, | ||
578 | h->control_pending_tail, th->cm); | ||
579 | GNUNET_free (th->cm); | ||
580 | } | ||
581 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
582 | "Signalling timeout of request for transmission to peer `%s' via CORE\n", | ||
583 | GNUNET_i2s (&pr->peer)); | ||
584 | trigger_next_request (h, GNUNET_NO); | ||
585 | |||
586 | GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL)); | ||
587 | } | ||
588 | |||
589 | |||
590 | /** | ||
591 | * Transmit the next message to the core service. | 514 | * Transmit the next message to the core service. |
592 | * | 515 | * |
593 | * @param cls closure with the `struct GNUNET_CORE_Handle` | 516 | * @param cls closure with the `struct GNUNET_CORE_Handle` |
@@ -603,6 +526,8 @@ transmit_message (void *cls, | |||
603 | struct GNUNET_CORE_Handle *h = cls; | 526 | struct GNUNET_CORE_Handle *h = cls; |
604 | struct ControlMessage *cm; | 527 | struct ControlMessage *cm; |
605 | struct GNUNET_CORE_TransmitHandle *th; | 528 | struct GNUNET_CORE_TransmitHandle *th; |
529 | struct GNUNET_TIME_Relative delay; | ||
530 | struct GNUNET_TIME_Relative overdue; | ||
606 | struct PeerRecord *pr; | 531 | struct PeerRecord *pr; |
607 | struct SendMessage *sm; | 532 | struct SendMessage *sm; |
608 | const struct GNUNET_MessageHeader *hdr; | 533 | const struct GNUNET_MessageHeader *hdr; |
@@ -657,11 +582,6 @@ transmit_message (void *cls, | |||
657 | h->ready_peer_tail, | 582 | h->ready_peer_tail, |
658 | pr); | 583 | pr); |
659 | th->peer = NULL; | 584 | th->peer = NULL; |
660 | if (NULL != pr->timeout_task) | ||
661 | { | ||
662 | GNUNET_SCHEDULER_cancel (pr->timeout_task); | ||
663 | pr->timeout_task = NULL; | ||
664 | } | ||
665 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 585 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
666 | "Transmitting SEND request to `%s' with %u bytes.\n", | 586 | "Transmitting SEND request to `%s' with %u bytes.\n", |
667 | GNUNET_i2s (&pr->peer), | 587 | GNUNET_i2s (&pr->peer), |
@@ -669,7 +589,7 @@ transmit_message (void *cls, | |||
669 | sm = (struct SendMessage *) buf; | 589 | sm = (struct SendMessage *) buf; |
670 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); | 590 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); |
671 | sm->priority = htonl ((uint32_t) th->priority); | 591 | sm->priority = htonl ((uint32_t) th->priority); |
672 | sm->deadline = GNUNET_TIME_absolute_hton (th->timeout); | 592 | sm->deadline = GNUNET_TIME_absolute_hton (th->deadline); |
673 | sm->peer = pr->peer; | 593 | sm->peer = pr->peer; |
674 | sm->cork = htonl ((uint32_t) th->cork); | 594 | sm->cork = htonl ((uint32_t) th->cork); |
675 | sm->reserved = htonl (0); | 595 | sm->reserved = htonl (0); |
@@ -677,24 +597,43 @@ transmit_message (void *cls, | |||
677 | th->get_message (th->get_message_cls, | 597 | th->get_message (th->get_message_cls, |
678 | size - sizeof (struct SendMessage), | 598 | size - sizeof (struct SendMessage), |
679 | &sm[1]); | 599 | &sm[1]); |
680 | 600 | delay = GNUNET_TIME_absolute_get_duration (th->request_time); | |
681 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 601 | overdue = GNUNET_TIME_absolute_get_duration (th->deadline); |
682 | "Transmitting SEND request to `%s' yielded %u bytes.\n", | 602 | if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) |
683 | GNUNET_i2s (&pr->peer), | 603 | LOG (GNUNET_ERROR_TYPE_WARNING, |
684 | ret); | 604 | "Transmitting overdue %u bytes to `%s' at priority %u with %s delay%s\n", |
685 | if (0 == ret) | 605 | ret, |
606 | GNUNET_i2s (&pr->peer), | ||
607 | (unsigned int) th->priority, | ||
608 | GNUNET_STRINGS_relative_time_to_string (delay, | ||
609 | GNUNET_YES), | ||
610 | (th->cork) ? " (corked)" : ""); | ||
611 | else | ||
612 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
613 | "Transmitting %u bytes to `%s' at priority %u with %s delay%s\n", | ||
614 | ret, | ||
615 | GNUNET_i2s (&pr->peer), | ||
616 | (unsigned int) th->priority, | ||
617 | GNUNET_STRINGS_relative_time_to_string (delay, | ||
618 | GNUNET_YES), | ||
619 | (th->cork) ? " (corked)" : ""); | ||
620 | if ( (0 == ret) && | ||
621 | (GNUNET_CORE_PRIO_BACKGROUND == th->priority) ) | ||
686 | { | 622 | { |
623 | /* client decided to send nothing; as the priority was | ||
624 | BACKGROUND, we can just not send anything to core. | ||
625 | For higher-priority messages, we must give an | ||
626 | empty message to CORE so that it knows that this | ||
627 | message is no longer pending. */ | ||
687 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 628 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
688 | "Size of clients message to peer %s is 0!\n", | 629 | "Size of clients message to peer %s is 0!\n", |
689 | GNUNET_i2s (&pr->peer)); | 630 | GNUNET_i2s (&pr->peer)); |
690 | /* client decided to send nothing! */ | ||
691 | request_next_transmission (pr); | 631 | request_next_transmission (pr); |
692 | return 0; | 632 | return 0; |
693 | } | 633 | } |
694 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 634 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
695 | "Produced SEND message to core with %u bytes payload\n", | 635 | "Produced SEND message to core with %u bytes payload\n", |
696 | (unsigned int) ret); | 636 | (unsigned int) ret); |
697 | GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader)); | ||
698 | if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | 637 | if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) |
699 | { | 638 | { |
700 | GNUNET_break (0); | 639 | GNUNET_break (0); |
@@ -932,7 +871,7 @@ main_notify_handler (void *cls, | |||
932 | } | 871 | } |
933 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; | 872 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; |
934 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 873 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
935 | "Received message of type %u and size %u from peer `%4s'\n", | 874 | "Received message of type %u and size %u from peer `%s'\n", |
936 | ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer)); | 875 | ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer)); |
937 | if ((GNUNET_NO == h->inbound_hdr_only) && | 876 | if ((GNUNET_NO == h->inbound_hdr_only) && |
938 | (msize != | 877 | (msize != |
@@ -951,7 +890,7 @@ main_notify_handler (void *cls, | |||
951 | if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0)) | 890 | if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0)) |
952 | { | 891 | { |
953 | LOG (GNUNET_ERROR_TYPE_ERROR, | 892 | LOG (GNUNET_ERROR_TYPE_ERROR, |
954 | "Unexpected message size %u for message of type %u from peer `%4s'\n", | 893 | "Unexpected message size %u for message of type %u from peer `%s'\n", |
955 | htons (em->size), mh->type, GNUNET_i2s (&ntm->peer)); | 894 | htons (em->size), mh->type, GNUNET_i2s (&ntm->peer)); |
956 | GNUNET_break_op (0); | 895 | GNUNET_break_op (0); |
957 | continue; | 896 | continue; |
@@ -1208,7 +1147,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1208 | h->currently_down = GNUNET_YES; | 1147 | h->currently_down = GNUNET_YES; |
1209 | h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); | 1148 | h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); |
1210 | if (NULL != handlers) | 1149 | if (NULL != handlers) |
1211 | while (handlers[h->hcnt].callback != NULL) | 1150 | while (NULL != handlers[h->hcnt].callback) |
1212 | h->hcnt++; | 1151 | h->hcnt++; |
1213 | GNUNET_assert (h->hcnt < | 1152 | GNUNET_assert (h->hcnt < |
1214 | (GNUNET_SERVER_MAX_MESSAGE_SIZE - | 1153 | (GNUNET_SERVER_MAX_MESSAGE_SIZE - |
@@ -1259,22 +1198,22 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) | |||
1259 | GNUNET_CONTAINER_multipeermap_iterate (handle->peers, | 1198 | GNUNET_CONTAINER_multipeermap_iterate (handle->peers, |
1260 | &disconnect_and_free_peer_entry, | 1199 | &disconnect_and_free_peer_entry, |
1261 | handle); | 1200 | handle); |
1262 | if (handle->reconnect_task != NULL) | 1201 | if (NULL != handle->reconnect_task) |
1263 | { | 1202 | { |
1264 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); | 1203 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); |
1265 | handle->reconnect_task = NULL; | 1204 | handle->reconnect_task = NULL; |
1266 | } | 1205 | } |
1267 | GNUNET_CONTAINER_multipeermap_destroy (handle->peers); | 1206 | GNUNET_CONTAINER_multipeermap_destroy (handle->peers); |
1268 | handle->peers = NULL; | 1207 | handle->peers = NULL; |
1269 | GNUNET_break (handle->ready_peer_head == NULL); | 1208 | GNUNET_break (NULL == handle->ready_peer_head); |
1270 | GNUNET_free (handle); | 1209 | GNUNET_free (handle); |
1271 | } | 1210 | } |
1272 | 1211 | ||
1273 | 1212 | ||
1274 | /** | 1213 | /** |
1275 | * Task that calls 'request_next_transmission'. | 1214 | * Task that calls #request_next_transmission(). |
1276 | * | 1215 | * |
1277 | * @param cls the 'struct PeerRecord *' | 1216 | * @param cls the `struct PeerRecord *` |
1278 | * @param tc scheduler context | 1217 | * @param tc scheduler context |
1279 | */ | 1218 | */ |
1280 | static void | 1219 | static void |
@@ -1357,7 +1296,11 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, | |||
1357 | th->peer = pr; | 1296 | th->peer = pr; |
1358 | th->get_message = notify; | 1297 | th->get_message = notify; |
1359 | th->get_message_cls = notify_cls; | 1298 | th->get_message_cls = notify_cls; |
1360 | th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); | 1299 | th->request_time = GNUNET_TIME_absolute_get (); |
1300 | if (GNUNET_YES == cork) | ||
1301 | th->deadline = GNUNET_TIME_relative_to_absolute (maxdelay); | ||
1302 | else | ||
1303 | th->deadline = th->request_time; | ||
1361 | th->priority = priority; | 1304 | th->priority = priority; |
1362 | th->msize = notify_size; | 1305 | th->msize = notify_size; |
1363 | th->cork = cork; | 1306 | th->cork = cork; |
@@ -1373,7 +1316,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, | |||
1373 | /** | 1316 | /** |
1374 | * Cancel the specified transmission-ready notification. | 1317 | * Cancel the specified transmission-ready notification. |
1375 | * | 1318 | * |
1376 | * @param th handle that was returned by "notify_transmit_ready". | 1319 | * @param th handle that was returned by #GNUNET_CORE_notify_transmit_ready(). |
1377 | */ | 1320 | */ |
1378 | void | 1321 | void |
1379 | GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) | 1322 | GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) |