aboutsummaryrefslogtreecommitdiff
path: root/src/core/core_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/core_api.c')
-rw-r--r--src/core/core_api.c171
1 files changed, 57 insertions, 114 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c
index 2d7d71c48..7338672d6 100644
--- a/src/core/core_api.c
+++ b/src/core/core_api.c
@@ -63,9 +63,15 @@ struct GNUNET_CORE_TransmitHandle
63 void *get_message_cls; 63 void *get_message_cls;
64 64
65 /** 65 /**
66 * Timeout for this handle. 66 * Deadline for the transmission (the request does not get cancelled
67 * at this time, this is merely how soon the application wants this out).
67 */ 68 */
68 struct GNUNET_TIME_Absolute timeout; 69 struct GNUNET_TIME_Absolute deadline;
70
71 /**
72 * When did this request get queued?
73 */
74 struct GNUNET_TIME_Absolute request_time;
69 75
70 /** 76 /**
71 * How important is this message? 77 * How important is this message?
@@ -127,15 +133,9 @@ struct PeerRecord
127 struct GNUNET_PeerIdentity peer; 133 struct GNUNET_PeerIdentity peer;
128 134
129 /** 135 /**
130 * ID of timeout task for the 'pending_head' handle 136 * ID of task to run #next_request_transmission().
131 * which is the one with the smallest timeout.
132 */
133 struct GNUNET_SCHEDULER_Task * timeout_task;
134
135 /**
136 * ID of task to run 'next_request_transmission'.
137 */ 137 */
138 struct GNUNET_SCHEDULER_Task * ntr_task; 138 struct GNUNET_SCHEDULER_Task *ntr_task;
139 139
140 /** 140 /**
141 * SendMessageRequest ID generator for this peer. 141 * SendMessageRequest ID generator for this peer.
@@ -369,11 +369,6 @@ disconnect_and_free_peer_entry (void *cls,
369 struct GNUNET_CORE_TransmitHandle *th; 369 struct GNUNET_CORE_TransmitHandle *th;
370 struct PeerRecord *pr = value; 370 struct PeerRecord *pr = value;
371 371
372 if (NULL != pr->timeout_task)
373 {
374 GNUNET_SCHEDULER_cancel (pr->timeout_task);
375 pr->timeout_task = NULL;
376 }
377 if (NULL != pr->ntr_task) 372 if (NULL != pr->ntr_task)
378 { 373 {
379 GNUNET_SCHEDULER_cancel (pr->ntr_task); 374 GNUNET_SCHEDULER_cancel (pr->ntr_task);
@@ -401,7 +396,6 @@ disconnect_and_free_peer_entry (void *cls,
401 GNUNET_assert (GNUNET_YES == 396 GNUNET_assert (GNUNET_YES ==
402 GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr)); 397 GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr));
403 GNUNET_assert (pr->ch == h); 398 GNUNET_assert (pr->ch == h);
404 GNUNET_assert (NULL == pr->timeout_task);
405 GNUNET_assert (NULL == pr->ntr_task); 399 GNUNET_assert (NULL == pr->ntr_task);
406 GNUNET_free (pr); 400 GNUNET_free (pr);
407 return GNUNET_YES; 401 return GNUNET_YES;
@@ -471,18 +465,6 @@ trigger_next_request (struct GNUNET_CORE_Handle *h,
471 465
472 466
473/** 467/**
474 * The given request hit its timeout. Remove from the
475 * doubly-linked list and call the respective continuation.
476 *
477 * @param cls the transmit handle of the request that timed out
478 * @param tc context, can be NULL (!)
479 */
480static void
481transmission_timeout (void *cls,
482 const struct GNUNET_SCHEDULER_TaskContext *tc);
483
484
485/**
486 * Send a control message to the peer asking for transmission 468 * Send a control message to the peer asking for transmission
487 * of the message in the given peer record. 469 * of the message in the given peer record.
488 * 470 *
@@ -496,25 +478,16 @@ request_next_transmission (struct PeerRecord *pr)
496 struct SendMessageRequest *smr; 478 struct SendMessageRequest *smr;
497 struct GNUNET_CORE_TransmitHandle *th; 479 struct GNUNET_CORE_TransmitHandle *th;
498 480
499 if (pr->timeout_task != NULL)
500 {
501 GNUNET_SCHEDULER_cancel (pr->timeout_task);
502 pr->timeout_task = NULL;
503 }
504 th = &pr->th; 481 th = &pr->th;
505 if (NULL == th->peer) 482 if (NULL == th->peer)
506 { 483 {
507 trigger_next_request (h, GNUNET_NO); 484 trigger_next_request (h, GNUNET_NO);
508 return; 485 return;
509 } 486 }
510 if (th->cm != NULL) 487 if (NULL != th->cm)
511 return; /* already done */ 488 return; /* already done */
512 GNUNET_assert (pr->prev == NULL); 489 GNUNET_assert (NULL == pr->prev);
513 GNUNET_assert (pr->next == NULL); 490 GNUNET_assert (NULL == pr->next);
514 pr->timeout_task
515 = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout),
516 &transmission_timeout,
517 pr);
518 cm = GNUNET_malloc (sizeof (struct ControlMessage) + 491 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
519 sizeof (struct SendMessageRequest)); 492 sizeof (struct SendMessageRequest));
520 th->cm = cm; 493 th->cm = cm;
@@ -523,7 +496,7 @@ request_next_transmission (struct PeerRecord *pr)
523 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); 496 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
524 smr->header.size = htons (sizeof (struct SendMessageRequest)); 497 smr->header.size = htons (sizeof (struct SendMessageRequest));
525 smr->priority = htonl ((uint32_t) th->priority); 498 smr->priority = htonl ((uint32_t) th->priority);
526 smr->deadline = GNUNET_TIME_absolute_hton (th->timeout); 499 smr->deadline = GNUNET_TIME_absolute_hton (th->deadline);
527 smr->peer = pr->peer; 500 smr->peer = pr->peer;
528 smr->reserved = htonl (0); 501 smr->reserved = htonl (0);
529 smr->size = htons (th->msize); 502 smr->size = htons (th->msize);
@@ -538,56 +511,6 @@ request_next_transmission (struct PeerRecord *pr)
538 511
539 512
540/** 513/**
541 * The given request hit its timeout. Remove from the
542 * doubly-linked list and call the respective continuation.
543 *
544 * @param cls the transmit handle of the request that timed out
545 * @param tc context, can be NULL (!)
546 */
547static void
548transmission_timeout (void *cls,
549 const struct GNUNET_SCHEDULER_TaskContext *tc)
550{
551 struct PeerRecord *pr = cls;
552 struct GNUNET_CORE_Handle *h = pr->ch;
553 struct GNUNET_CORE_TransmitHandle *th;
554
555 pr->timeout_task = NULL;
556 if (NULL != pr->ntr_task)
557 {
558 GNUNET_SCHEDULER_cancel (pr->ntr_task);
559 pr->ntr_task = NULL;
560 }
561 th = &pr->th;
562 th->peer = NULL;
563 if ( (NULL != pr->prev) ||
564 (NULL != pr->next) ||
565 (pr == h->ready_peer_head) )
566 {
567 /* the request that was 'approved' by core was
568 * canceled before it could be transmitted; remove
569 * us from the 'ready' list */
570 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
571 h->ready_peer_tail,
572 pr);
573 }
574 if (NULL != th->cm)
575 {
576 /* we're currently in the control queue, remove */
577 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
578 h->control_pending_tail, th->cm);
579 GNUNET_free (th->cm);
580 }
581 LOG (GNUNET_ERROR_TYPE_DEBUG,
582 "Signalling timeout of request for transmission to peer `%s' via CORE\n",
583 GNUNET_i2s (&pr->peer));
584 trigger_next_request (h, GNUNET_NO);
585
586 GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
587}
588
589
590/**
591 * Transmit the next message to the core service. 514 * Transmit the next message to the core service.
592 * 515 *
593 * @param cls closure with the `struct GNUNET_CORE_Handle` 516 * @param cls closure with the `struct GNUNET_CORE_Handle`
@@ -603,6 +526,8 @@ transmit_message (void *cls,
603 struct GNUNET_CORE_Handle *h = cls; 526 struct GNUNET_CORE_Handle *h = cls;
604 struct ControlMessage *cm; 527 struct ControlMessage *cm;
605 struct GNUNET_CORE_TransmitHandle *th; 528 struct GNUNET_CORE_TransmitHandle *th;
529 struct GNUNET_TIME_Relative delay;
530 struct GNUNET_TIME_Relative overdue;
606 struct PeerRecord *pr; 531 struct PeerRecord *pr;
607 struct SendMessage *sm; 532 struct SendMessage *sm;
608 const struct GNUNET_MessageHeader *hdr; 533 const struct GNUNET_MessageHeader *hdr;
@@ -657,11 +582,6 @@ transmit_message (void *cls,
657 h->ready_peer_tail, 582 h->ready_peer_tail,
658 pr); 583 pr);
659 th->peer = NULL; 584 th->peer = NULL;
660 if (NULL != pr->timeout_task)
661 {
662 GNUNET_SCHEDULER_cancel (pr->timeout_task);
663 pr->timeout_task = NULL;
664 }
665 LOG (GNUNET_ERROR_TYPE_DEBUG, 585 LOG (GNUNET_ERROR_TYPE_DEBUG,
666 "Transmitting SEND request to `%s' with %u bytes.\n", 586 "Transmitting SEND request to `%s' with %u bytes.\n",
667 GNUNET_i2s (&pr->peer), 587 GNUNET_i2s (&pr->peer),
@@ -669,7 +589,7 @@ transmit_message (void *cls,
669 sm = (struct SendMessage *) buf; 589 sm = (struct SendMessage *) buf;
670 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); 590 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
671 sm->priority = htonl ((uint32_t) th->priority); 591 sm->priority = htonl ((uint32_t) th->priority);
672 sm->deadline = GNUNET_TIME_absolute_hton (th->timeout); 592 sm->deadline = GNUNET_TIME_absolute_hton (th->deadline);
673 sm->peer = pr->peer; 593 sm->peer = pr->peer;
674 sm->cork = htonl ((uint32_t) th->cork); 594 sm->cork = htonl ((uint32_t) th->cork);
675 sm->reserved = htonl (0); 595 sm->reserved = htonl (0);
@@ -677,24 +597,43 @@ transmit_message (void *cls,
677 th->get_message (th->get_message_cls, 597 th->get_message (th->get_message_cls,
678 size - sizeof (struct SendMessage), 598 size - sizeof (struct SendMessage),
679 &sm[1]); 599 &sm[1]);
680 600 delay = GNUNET_TIME_absolute_get_duration (th->request_time);
681 LOG (GNUNET_ERROR_TYPE_DEBUG, 601 overdue = GNUNET_TIME_absolute_get_duration (th->deadline);
682 "Transmitting SEND request to `%s' yielded %u bytes.\n", 602 if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
683 GNUNET_i2s (&pr->peer), 603 LOG (GNUNET_ERROR_TYPE_WARNING,
684 ret); 604 "Transmitting overdue %u bytes to `%s' at priority %u with %s delay%s\n",
685 if (0 == ret) 605 ret,
606 GNUNET_i2s (&pr->peer),
607 (unsigned int) th->priority,
608 GNUNET_STRINGS_relative_time_to_string (delay,
609 GNUNET_YES),
610 (th->cork) ? " (corked)" : "");
611 else
612 LOG (GNUNET_ERROR_TYPE_DEBUG,
613 "Transmitting %u bytes to `%s' at priority %u with %s delay%s\n",
614 ret,
615 GNUNET_i2s (&pr->peer),
616 (unsigned int) th->priority,
617 GNUNET_STRINGS_relative_time_to_string (delay,
618 GNUNET_YES),
619 (th->cork) ? " (corked)" : "");
620 if ( (0 == ret) &&
621 (GNUNET_CORE_PRIO_BACKGROUND == th->priority) )
686 { 622 {
623 /* client decided to send nothing; as the priority was
624 BACKGROUND, we can just not send anything to core.
625 For higher-priority messages, we must give an
626 empty message to CORE so that it knows that this
627 message is no longer pending. */
687 LOG (GNUNET_ERROR_TYPE_DEBUG, 628 LOG (GNUNET_ERROR_TYPE_DEBUG,
688 "Size of clients message to peer %s is 0!\n", 629 "Size of clients message to peer %s is 0!\n",
689 GNUNET_i2s (&pr->peer)); 630 GNUNET_i2s (&pr->peer));
690 /* client decided to send nothing! */
691 request_next_transmission (pr); 631 request_next_transmission (pr);
692 return 0; 632 return 0;
693 } 633 }
694 LOG (GNUNET_ERROR_TYPE_DEBUG, 634 LOG (GNUNET_ERROR_TYPE_DEBUG,
695 "Produced SEND message to core with %u bytes payload\n", 635 "Produced SEND message to core with %u bytes payload\n",
696 (unsigned int) ret); 636 (unsigned int) ret);
697 GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
698 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 637 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
699 { 638 {
700 GNUNET_break (0); 639 GNUNET_break (0);
@@ -932,7 +871,7 @@ main_notify_handler (void *cls,
932 } 871 }
933 em = (const struct GNUNET_MessageHeader *) &ntm[1]; 872 em = (const struct GNUNET_MessageHeader *) &ntm[1];
934 LOG (GNUNET_ERROR_TYPE_DEBUG, 873 LOG (GNUNET_ERROR_TYPE_DEBUG,
935 "Received message of type %u and size %u from peer `%4s'\n", 874 "Received message of type %u and size %u from peer `%s'\n",
936 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer)); 875 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
937 if ((GNUNET_NO == h->inbound_hdr_only) && 876 if ((GNUNET_NO == h->inbound_hdr_only) &&
938 (msize != 877 (msize !=
@@ -951,7 +890,7 @@ main_notify_handler (void *cls,
951 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0)) 890 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
952 { 891 {
953 LOG (GNUNET_ERROR_TYPE_ERROR, 892 LOG (GNUNET_ERROR_TYPE_ERROR,
954 "Unexpected message size %u for message of type %u from peer `%4s'\n", 893 "Unexpected message size %u for message of type %u from peer `%s'\n",
955 htons (em->size), mh->type, GNUNET_i2s (&ntm->peer)); 894 htons (em->size), mh->type, GNUNET_i2s (&ntm->peer));
956 GNUNET_break_op (0); 895 GNUNET_break_op (0);
957 continue; 896 continue;
@@ -1208,7 +1147,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1208 h->currently_down = GNUNET_YES; 1147 h->currently_down = GNUNET_YES;
1209 h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); 1148 h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
1210 if (NULL != handlers) 1149 if (NULL != handlers)
1211 while (handlers[h->hcnt].callback != NULL) 1150 while (NULL != handlers[h->hcnt].callback)
1212 h->hcnt++; 1151 h->hcnt++;
1213 GNUNET_assert (h->hcnt < 1152 GNUNET_assert (h->hcnt <
1214 (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1153 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
@@ -1259,22 +1198,22 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1259 GNUNET_CONTAINER_multipeermap_iterate (handle->peers, 1198 GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
1260 &disconnect_and_free_peer_entry, 1199 &disconnect_and_free_peer_entry,
1261 handle); 1200 handle);
1262 if (handle->reconnect_task != NULL) 1201 if (NULL != handle->reconnect_task)
1263 { 1202 {
1264 GNUNET_SCHEDULER_cancel (handle->reconnect_task); 1203 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1265 handle->reconnect_task = NULL; 1204 handle->reconnect_task = NULL;
1266 } 1205 }
1267 GNUNET_CONTAINER_multipeermap_destroy (handle->peers); 1206 GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
1268 handle->peers = NULL; 1207 handle->peers = NULL;
1269 GNUNET_break (handle->ready_peer_head == NULL); 1208 GNUNET_break (NULL == handle->ready_peer_head);
1270 GNUNET_free (handle); 1209 GNUNET_free (handle);
1271} 1210}
1272 1211
1273 1212
1274/** 1213/**
1275 * Task that calls 'request_next_transmission'. 1214 * Task that calls #request_next_transmission().
1276 * 1215 *
1277 * @param cls the 'struct PeerRecord *' 1216 * @param cls the `struct PeerRecord *`
1278 * @param tc scheduler context 1217 * @param tc scheduler context
1279 */ 1218 */
1280static void 1219static void
@@ -1357,7 +1296,11 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1357 th->peer = pr; 1296 th->peer = pr;
1358 th->get_message = notify; 1297 th->get_message = notify;
1359 th->get_message_cls = notify_cls; 1298 th->get_message_cls = notify_cls;
1360 th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); 1299 th->request_time = GNUNET_TIME_absolute_get ();
1300 if (GNUNET_YES == cork)
1301 th->deadline = GNUNET_TIME_relative_to_absolute (maxdelay);
1302 else
1303 th->deadline = th->request_time;
1361 th->priority = priority; 1304 th->priority = priority;
1362 th->msize = notify_size; 1305 th->msize = notify_size;
1363 th->cork = cork; 1306 th->cork = cork;
@@ -1373,7 +1316,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1373/** 1316/**
1374 * Cancel the specified transmission-ready notification. 1317 * Cancel the specified transmission-ready notification.
1375 * 1318 *
1376 * @param th handle that was returned by "notify_transmit_ready". 1319 * @param th handle that was returned by #GNUNET_CORE_notify_transmit_ready().
1377 */ 1320 */
1378void 1321void
1379GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) 1322GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)