aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-10-08 17:09:42 +0000
committerChristian Grothoff <christian@grothoff.org>2015-10-08 17:09:42 +0000
commitb910380795760d26a7cc38510c1f313bb1a72ea7 (patch)
treeadcb256486cf4b97284ccb21850197037a802ba7 /src/core
parent0f420b095bd839165613d0bd681f5a8b1614e24e (diff)
downloadgnunet-b910380795760d26a7cc38510c1f313bb1a72ea7.tar.gz
gnunet-b910380795760d26a7cc38510c1f313bb1a72ea7.zip
redefining core message timeout semantics and core-core-API messaging to address (hypothetical?) stalls which may explain #3863. This kind-of breaks the API in that the timeout now has a different semantic -- it's an advisory as to when the app would like the message transmitted, the transmission will no longer actually time out. However, that was the documented semantics before, just not the actual behavior. Most applications didn't rely on it, and tests still pass, so I didn't break too much...
Diffstat (limited to 'src/core')
-rw-r--r--src/core/core_api.c171
-rw-r--r--src/core/gnunet-service-core_clients.c34
-rw-r--r--src/core/gnunet-service-core_clients.h11
-rw-r--r--src/core/gnunet-service-core_sessions.c64
-rw-r--r--src/core/test_core_api_reliability.c58
5 files changed, 143 insertions, 195 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c
index 2d7d71c48..7338672d6 100644
--- a/src/core/core_api.c
+++ b/src/core/core_api.c
@@ -63,9 +63,15 @@ struct GNUNET_CORE_TransmitHandle
63 void *get_message_cls; 63 void *get_message_cls;
64 64
65 /** 65 /**
66 * Timeout for this handle. 66 * Deadline for the transmission (the request does not get cancelled
67 * at this time, this is merely how soon the application wants this out).
67 */ 68 */
68 struct GNUNET_TIME_Absolute timeout; 69 struct GNUNET_TIME_Absolute deadline;
70
71 /**
72 * When did this request get queued?
73 */
74 struct GNUNET_TIME_Absolute request_time;
69 75
70 /** 76 /**
71 * How important is this message? 77 * How important is this message?
@@ -127,15 +133,9 @@ struct PeerRecord
127 struct GNUNET_PeerIdentity peer; 133 struct GNUNET_PeerIdentity peer;
128 134
129 /** 135 /**
130 * ID of timeout task for the 'pending_head' handle 136 * ID of task to run #next_request_transmission().
131 * which is the one with the smallest timeout.
132 */
133 struct GNUNET_SCHEDULER_Task * timeout_task;
134
135 /**
136 * ID of task to run 'next_request_transmission'.
137 */ 137 */
138 struct GNUNET_SCHEDULER_Task * ntr_task; 138 struct GNUNET_SCHEDULER_Task *ntr_task;
139 139
140 /** 140 /**
141 * SendMessageRequest ID generator for this peer. 141 * SendMessageRequest ID generator for this peer.
@@ -369,11 +369,6 @@ disconnect_and_free_peer_entry (void *cls,
369 struct GNUNET_CORE_TransmitHandle *th; 369 struct GNUNET_CORE_TransmitHandle *th;
370 struct PeerRecord *pr = value; 370 struct PeerRecord *pr = value;
371 371
372 if (NULL != pr->timeout_task)
373 {
374 GNUNET_SCHEDULER_cancel (pr->timeout_task);
375 pr->timeout_task = NULL;
376 }
377 if (NULL != pr->ntr_task) 372 if (NULL != pr->ntr_task)
378 { 373 {
379 GNUNET_SCHEDULER_cancel (pr->ntr_task); 374 GNUNET_SCHEDULER_cancel (pr->ntr_task);
@@ -401,7 +396,6 @@ disconnect_and_free_peer_entry (void *cls,
401 GNUNET_assert (GNUNET_YES == 396 GNUNET_assert (GNUNET_YES ==
402 GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr)); 397 GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr));
403 GNUNET_assert (pr->ch == h); 398 GNUNET_assert (pr->ch == h);
404 GNUNET_assert (NULL == pr->timeout_task);
405 GNUNET_assert (NULL == pr->ntr_task); 399 GNUNET_assert (NULL == pr->ntr_task);
406 GNUNET_free (pr); 400 GNUNET_free (pr);
407 return GNUNET_YES; 401 return GNUNET_YES;
@@ -471,18 +465,6 @@ trigger_next_request (struct GNUNET_CORE_Handle *h,
471 465
472 466
473/** 467/**
474 * The given request hit its timeout. Remove from the
475 * doubly-linked list and call the respective continuation.
476 *
477 * @param cls the transmit handle of the request that timed out
478 * @param tc context, can be NULL (!)
479 */
480static void
481transmission_timeout (void *cls,
482 const struct GNUNET_SCHEDULER_TaskContext *tc);
483
484
485/**
486 * Send a control message to the peer asking for transmission 468 * Send a control message to the peer asking for transmission
487 * of the message in the given peer record. 469 * of the message in the given peer record.
488 * 470 *
@@ -496,25 +478,16 @@ request_next_transmission (struct PeerRecord *pr)
496 struct SendMessageRequest *smr; 478 struct SendMessageRequest *smr;
497 struct GNUNET_CORE_TransmitHandle *th; 479 struct GNUNET_CORE_TransmitHandle *th;
498 480
499 if (pr->timeout_task != NULL)
500 {
501 GNUNET_SCHEDULER_cancel (pr->timeout_task);
502 pr->timeout_task = NULL;
503 }
504 th = &pr->th; 481 th = &pr->th;
505 if (NULL == th->peer) 482 if (NULL == th->peer)
506 { 483 {
507 trigger_next_request (h, GNUNET_NO); 484 trigger_next_request (h, GNUNET_NO);
508 return; 485 return;
509 } 486 }
510 if (th->cm != NULL) 487 if (NULL != th->cm)
511 return; /* already done */ 488 return; /* already done */
512 GNUNET_assert (pr->prev == NULL); 489 GNUNET_assert (NULL == pr->prev);
513 GNUNET_assert (pr->next == NULL); 490 GNUNET_assert (NULL == pr->next);
514 pr->timeout_task
515 = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout),
516 &transmission_timeout,
517 pr);
518 cm = GNUNET_malloc (sizeof (struct ControlMessage) + 491 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
519 sizeof (struct SendMessageRequest)); 492 sizeof (struct SendMessageRequest));
520 th->cm = cm; 493 th->cm = cm;
@@ -523,7 +496,7 @@ request_next_transmission (struct PeerRecord *pr)
523 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); 496 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
524 smr->header.size = htons (sizeof (struct SendMessageRequest)); 497 smr->header.size = htons (sizeof (struct SendMessageRequest));
525 smr->priority = htonl ((uint32_t) th->priority); 498 smr->priority = htonl ((uint32_t) th->priority);
526 smr->deadline = GNUNET_TIME_absolute_hton (th->timeout); 499 smr->deadline = GNUNET_TIME_absolute_hton (th->deadline);
527 smr->peer = pr->peer; 500 smr->peer = pr->peer;
528 smr->reserved = htonl (0); 501 smr->reserved = htonl (0);
529 smr->size = htons (th->msize); 502 smr->size = htons (th->msize);
@@ -538,56 +511,6 @@ request_next_transmission (struct PeerRecord *pr)
538 511
539 512
540/** 513/**
541 * The given request hit its timeout. Remove from the
542 * doubly-linked list and call the respective continuation.
543 *
544 * @param cls the transmit handle of the request that timed out
545 * @param tc context, can be NULL (!)
546 */
547static void
548transmission_timeout (void *cls,
549 const struct GNUNET_SCHEDULER_TaskContext *tc)
550{
551 struct PeerRecord *pr = cls;
552 struct GNUNET_CORE_Handle *h = pr->ch;
553 struct GNUNET_CORE_TransmitHandle *th;
554
555 pr->timeout_task = NULL;
556 if (NULL != pr->ntr_task)
557 {
558 GNUNET_SCHEDULER_cancel (pr->ntr_task);
559 pr->ntr_task = NULL;
560 }
561 th = &pr->th;
562 th->peer = NULL;
563 if ( (NULL != pr->prev) ||
564 (NULL != pr->next) ||
565 (pr == h->ready_peer_head) )
566 {
567 /* the request that was 'approved' by core was
568 * canceled before it could be transmitted; remove
569 * us from the 'ready' list */
570 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
571 h->ready_peer_tail,
572 pr);
573 }
574 if (NULL != th->cm)
575 {
576 /* we're currently in the control queue, remove */
577 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
578 h->control_pending_tail, th->cm);
579 GNUNET_free (th->cm);
580 }
581 LOG (GNUNET_ERROR_TYPE_DEBUG,
582 "Signalling timeout of request for transmission to peer `%s' via CORE\n",
583 GNUNET_i2s (&pr->peer));
584 trigger_next_request (h, GNUNET_NO);
585
586 GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
587}
588
589
590/**
591 * Transmit the next message to the core service. 514 * Transmit the next message to the core service.
592 * 515 *
593 * @param cls closure with the `struct GNUNET_CORE_Handle` 516 * @param cls closure with the `struct GNUNET_CORE_Handle`
@@ -603,6 +526,8 @@ transmit_message (void *cls,
603 struct GNUNET_CORE_Handle *h = cls; 526 struct GNUNET_CORE_Handle *h = cls;
604 struct ControlMessage *cm; 527 struct ControlMessage *cm;
605 struct GNUNET_CORE_TransmitHandle *th; 528 struct GNUNET_CORE_TransmitHandle *th;
529 struct GNUNET_TIME_Relative delay;
530 struct GNUNET_TIME_Relative overdue;
606 struct PeerRecord *pr; 531 struct PeerRecord *pr;
607 struct SendMessage *sm; 532 struct SendMessage *sm;
608 const struct GNUNET_MessageHeader *hdr; 533 const struct GNUNET_MessageHeader *hdr;
@@ -657,11 +582,6 @@ transmit_message (void *cls,
657 h->ready_peer_tail, 582 h->ready_peer_tail,
658 pr); 583 pr);
659 th->peer = NULL; 584 th->peer = NULL;
660 if (NULL != pr->timeout_task)
661 {
662 GNUNET_SCHEDULER_cancel (pr->timeout_task);
663 pr->timeout_task = NULL;
664 }
665 LOG (GNUNET_ERROR_TYPE_DEBUG, 585 LOG (GNUNET_ERROR_TYPE_DEBUG,
666 "Transmitting SEND request to `%s' with %u bytes.\n", 586 "Transmitting SEND request to `%s' with %u bytes.\n",
667 GNUNET_i2s (&pr->peer), 587 GNUNET_i2s (&pr->peer),
@@ -669,7 +589,7 @@ transmit_message (void *cls,
669 sm = (struct SendMessage *) buf; 589 sm = (struct SendMessage *) buf;
670 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); 590 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
671 sm->priority = htonl ((uint32_t) th->priority); 591 sm->priority = htonl ((uint32_t) th->priority);
672 sm->deadline = GNUNET_TIME_absolute_hton (th->timeout); 592 sm->deadline = GNUNET_TIME_absolute_hton (th->deadline);
673 sm->peer = pr->peer; 593 sm->peer = pr->peer;
674 sm->cork = htonl ((uint32_t) th->cork); 594 sm->cork = htonl ((uint32_t) th->cork);
675 sm->reserved = htonl (0); 595 sm->reserved = htonl (0);
@@ -677,24 +597,43 @@ transmit_message (void *cls,
677 th->get_message (th->get_message_cls, 597 th->get_message (th->get_message_cls,
678 size - sizeof (struct SendMessage), 598 size - sizeof (struct SendMessage),
679 &sm[1]); 599 &sm[1]);
680 600 delay = GNUNET_TIME_absolute_get_duration (th->request_time);
681 LOG (GNUNET_ERROR_TYPE_DEBUG, 601 overdue = GNUNET_TIME_absolute_get_duration (th->deadline);
682 "Transmitting SEND request to `%s' yielded %u bytes.\n", 602 if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
683 GNUNET_i2s (&pr->peer), 603 LOG (GNUNET_ERROR_TYPE_WARNING,
684 ret); 604 "Transmitting overdue %u bytes to `%s' at priority %u with %s delay%s\n",
685 if (0 == ret) 605 ret,
606 GNUNET_i2s (&pr->peer),
607 (unsigned int) th->priority,
608 GNUNET_STRINGS_relative_time_to_string (delay,
609 GNUNET_YES),
610 (th->cork) ? " (corked)" : "");
611 else
612 LOG (GNUNET_ERROR_TYPE_DEBUG,
613 "Transmitting %u bytes to `%s' at priority %u with %s delay%s\n",
614 ret,
615 GNUNET_i2s (&pr->peer),
616 (unsigned int) th->priority,
617 GNUNET_STRINGS_relative_time_to_string (delay,
618 GNUNET_YES),
619 (th->cork) ? " (corked)" : "");
620 if ( (0 == ret) &&
621 (GNUNET_CORE_PRIO_BACKGROUND == th->priority) )
686 { 622 {
623 /* client decided to send nothing; as the priority was
624 BACKGROUND, we can just not send anything to core.
625 For higher-priority messages, we must give an
626 empty message to CORE so that it knows that this
627 message is no longer pending. */
687 LOG (GNUNET_ERROR_TYPE_DEBUG, 628 LOG (GNUNET_ERROR_TYPE_DEBUG,
688 "Size of clients message to peer %s is 0!\n", 629 "Size of clients message to peer %s is 0!\n",
689 GNUNET_i2s (&pr->peer)); 630 GNUNET_i2s (&pr->peer));
690 /* client decided to send nothing! */
691 request_next_transmission (pr); 631 request_next_transmission (pr);
692 return 0; 632 return 0;
693 } 633 }
694 LOG (GNUNET_ERROR_TYPE_DEBUG, 634 LOG (GNUNET_ERROR_TYPE_DEBUG,
695 "Produced SEND message to core with %u bytes payload\n", 635 "Produced SEND message to core with %u bytes payload\n",
696 (unsigned int) ret); 636 (unsigned int) ret);
697 GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
698 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 637 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
699 { 638 {
700 GNUNET_break (0); 639 GNUNET_break (0);
@@ -932,7 +871,7 @@ main_notify_handler (void *cls,
932 } 871 }
933 em = (const struct GNUNET_MessageHeader *) &ntm[1]; 872 em = (const struct GNUNET_MessageHeader *) &ntm[1];
934 LOG (GNUNET_ERROR_TYPE_DEBUG, 873 LOG (GNUNET_ERROR_TYPE_DEBUG,
935 "Received message of type %u and size %u from peer `%4s'\n", 874 "Received message of type %u and size %u from peer `%s'\n",
936 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer)); 875 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
937 if ((GNUNET_NO == h->inbound_hdr_only) && 876 if ((GNUNET_NO == h->inbound_hdr_only) &&
938 (msize != 877 (msize !=
@@ -951,7 +890,7 @@ main_notify_handler (void *cls,
951 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0)) 890 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
952 { 891 {
953 LOG (GNUNET_ERROR_TYPE_ERROR, 892 LOG (GNUNET_ERROR_TYPE_ERROR,
954 "Unexpected message size %u for message of type %u from peer `%4s'\n", 893 "Unexpected message size %u for message of type %u from peer `%s'\n",
955 htons (em->size), mh->type, GNUNET_i2s (&ntm->peer)); 894 htons (em->size), mh->type, GNUNET_i2s (&ntm->peer));
956 GNUNET_break_op (0); 895 GNUNET_break_op (0);
957 continue; 896 continue;
@@ -1208,7 +1147,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1208 h->currently_down = GNUNET_YES; 1147 h->currently_down = GNUNET_YES;
1209 h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); 1148 h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
1210 if (NULL != handlers) 1149 if (NULL != handlers)
1211 while (handlers[h->hcnt].callback != NULL) 1150 while (NULL != handlers[h->hcnt].callback)
1212 h->hcnt++; 1151 h->hcnt++;
1213 GNUNET_assert (h->hcnt < 1152 GNUNET_assert (h->hcnt <
1214 (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1153 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
@@ -1259,22 +1198,22 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1259 GNUNET_CONTAINER_multipeermap_iterate (handle->peers, 1198 GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
1260 &disconnect_and_free_peer_entry, 1199 &disconnect_and_free_peer_entry,
1261 handle); 1200 handle);
1262 if (handle->reconnect_task != NULL) 1201 if (NULL != handle->reconnect_task)
1263 { 1202 {
1264 GNUNET_SCHEDULER_cancel (handle->reconnect_task); 1203 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1265 handle->reconnect_task = NULL; 1204 handle->reconnect_task = NULL;
1266 } 1205 }
1267 GNUNET_CONTAINER_multipeermap_destroy (handle->peers); 1206 GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
1268 handle->peers = NULL; 1207 handle->peers = NULL;
1269 GNUNET_break (handle->ready_peer_head == NULL); 1208 GNUNET_break (NULL == handle->ready_peer_head);
1270 GNUNET_free (handle); 1209 GNUNET_free (handle);
1271} 1210}
1272 1211
1273 1212
1274/** 1213/**
1275 * Task that calls 'request_next_transmission'. 1214 * Task that calls #request_next_transmission().
1276 * 1215 *
1277 * @param cls the 'struct PeerRecord *' 1216 * @param cls the `struct PeerRecord *`
1278 * @param tc scheduler context 1217 * @param tc scheduler context
1279 */ 1218 */
1280static void 1219static void
@@ -1357,7 +1296,11 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1357 th->peer = pr; 1296 th->peer = pr;
1358 th->get_message = notify; 1297 th->get_message = notify;
1359 th->get_message_cls = notify_cls; 1298 th->get_message_cls = notify_cls;
1360 th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); 1299 th->request_time = GNUNET_TIME_absolute_get ();
1300 if (GNUNET_YES == cork)
1301 th->deadline = GNUNET_TIME_relative_to_absolute (maxdelay);
1302 else
1303 th->deadline = th->request_time;
1361 th->priority = priority; 1304 th->priority = priority;
1362 th->msize = notify_size; 1305 th->msize = notify_size;
1363 th->cork = cork; 1306 th->cork = cork;
@@ -1373,7 +1316,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1373/** 1316/**
1374 * Cancel the specified transmission-ready notification. 1317 * Cancel the specified transmission-ready notification.
1375 * 1318 *
1376 * @param th handle that was returned by "notify_transmit_ready". 1319 * @param th handle that was returned by #GNUNET_CORE_notify_transmit_ready().
1377 */ 1320 */
1378void 1321void
1379GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) 1322GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
diff --git a/src/core/gnunet-service-core_clients.c b/src/core/gnunet-service-core_clients.c
index 57a5d5826..3dff68528 100644
--- a/src/core/gnunet-service-core_clients.c
+++ b/src/core/gnunet-service-core_clients.c
@@ -462,10 +462,10 @@ handle_client_send (void *cls,
462 struct TokenizerContext tc; 462 struct TokenizerContext tc;
463 uint16_t msize; 463 uint16_t msize;
464 struct GNUNET_TIME_Relative delay; 464 struct GNUNET_TIME_Relative delay;
465 struct GNUNET_TIME_Relative overdue;
465 466
466 msize = ntohs (message->size); 467 msize = ntohs (message->size);
467 if (msize < 468 if (msize < sizeof (struct SendMessage))
468 sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
469 { 469 {
470 GNUNET_break (0); 470 GNUNET_break (0);
471 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 471 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
@@ -501,9 +501,10 @@ handle_client_send (void *cls,
501 return; 501 return;
502 } 502 }
503 delay = GNUNET_TIME_absolute_get_duration (tc.car->received_time); 503 delay = GNUNET_TIME_absolute_get_duration (tc.car->received_time);
504 overdue = GNUNET_TIME_absolute_get_duration (tc.car->deadline);
504 tc.cork = ntohl (sm->cork); 505 tc.cork = ntohl (sm->cork);
505 tc.priority = (enum GNUNET_CORE_Priority) ntohl (sm->priority); 506 tc.priority = (enum GNUNET_CORE_Priority) ntohl (sm->priority);
506 if (delay.rel_value_us > GNUNET_TIME_UNIT_SECONDS.rel_value_us) 507 if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
507 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 508 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
508 "Client waited %s for transmission of %u bytes to `%s'%s, CORE queue is %u entries\n", 509 "Client waited %s for transmission of %u bytes to `%s'%s, CORE queue is %u entries\n",
509 GNUNET_STRINGS_relative_time_to_string (delay, 510 GNUNET_STRINGS_relative_time_to_string (delay,
@@ -531,11 +532,7 @@ handle_client_send (void *cls,
531 msize, 532 msize,
532 GNUNET_YES, 533 GNUNET_YES,
533 GNUNET_NO); 534 GNUNET_NO);
534 if (0 != 535 GSC_SESSIONS_dequeue_request (tc.car);
535 memcmp (&tc.car->target,
536 &GSC_my_identity,
537 sizeof (struct GNUNET_PeerIdentity)))
538 GSC_SESSIONS_dequeue_request (tc.car);
539 GNUNET_free (tc.car); 536 GNUNET_free (tc.car);
540 GNUNET_SERVER_receive_done (client, 537 GNUNET_SERVER_receive_done (client,
541 GNUNET_OK); 538 GNUNET_OK);
@@ -702,13 +699,13 @@ GSC_CLIENTS_solicit_request (struct GSC_ClientActiveRequest *car)
702 &GSC_my_identity, 699 &GSC_my_identity,
703 sizeof (struct GNUNET_PeerIdentity))); 700 sizeof (struct GNUNET_PeerIdentity)));
704 GSC_SESSIONS_dequeue_request (car); 701 GSC_SESSIONS_dequeue_request (car);
705 GSC_CLIENTS_reject_request (car); 702 GSC_CLIENTS_reject_request (car,
703 GNUNET_NO);
706 return; 704 return;
707 } 705 }
708 delay = GNUNET_TIME_absolute_get_duration (car->received_time); 706 delay = GNUNET_TIME_absolute_get_duration (car->received_time);
709 left = GNUNET_TIME_absolute_get_remaining (car->deadline); 707 left = GNUNET_TIME_absolute_get_duration (car->deadline);
710 if ( (delay.rel_value_us > GNUNET_TIME_UNIT_SECONDS.rel_value_us) || 708 if (left.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
711 (0 == left.rel_value_us) )
712 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 709 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
713 "Client waited %s for permission to transmit to `%s'%s (priority %u)\n", 710 "Client waited %s for permission to transmit to `%s'%s (priority %u)\n",
714 GNUNET_STRINGS_relative_time_to_string (delay, 711 GNUNET_STRINGS_relative_time_to_string (delay,
@@ -728,21 +725,28 @@ GSC_CLIENTS_solicit_request (struct GSC_ClientActiveRequest *car)
728 725
729 726
730/** 727/**
731 * Tell a client that we will never be ready to receive the 728 * We will never be ready to transmit the given message in (disconnect
732 * given message in time (disconnect or timeout). 729 * or invalid request). Frees resources associated with @a car. We
730 * don't explicitly tell the client, he'll learn with the disconnect
731 * (or violated the protocol).
733 * 732 *
734 * @param car request that now permanently failed; the 733 * @param car request that now permanently failed; the
735 * responsibility for the handle is now returned 734 * responsibility for the handle is now returned
736 * to CLIENTS (SESSIONS is done with it). 735 * to CLIENTS (SESSIONS is done with it).
736 * @param drop_client #GNUNET_YES if the client violated the protocol
737 * and we should thus drop the connection
737 */ 738 */
738void 739void
739GSC_CLIENTS_reject_request (struct GSC_ClientActiveRequest *car) 740GSC_CLIENTS_reject_request (struct GSC_ClientActiveRequest *car,
741 int drop_client)
740{ 742{
741 GNUNET_assert (GNUNET_YES == 743 GNUNET_assert (GNUNET_YES ==
742 GNUNET_CONTAINER_multipeermap_remove (car-> 744 GNUNET_CONTAINER_multipeermap_remove (car->
743 client_handle->requests, 745 client_handle->requests,
744 &car->target, 746 &car->target,
745 car)); 747 car));
748 if (GNUNET_YES == drop_client)
749 GNUNET_SERVER_client_disconnect (car->client_handle->client_handle);
746 GNUNET_free (car); 750 GNUNET_free (car);
747} 751}
748 752
diff --git a/src/core/gnunet-service-core_clients.h b/src/core/gnunet-service-core_clients.h
index e39ce616f..a9d712442 100644
--- a/src/core/gnunet-service-core_clients.h
+++ b/src/core/gnunet-service-core_clients.h
@@ -107,15 +107,20 @@ GSC_CLIENTS_solicit_request (struct GSC_ClientActiveRequest *car);
107 107
108 108
109/** 109/**
110 * Tell a client that we will never be ready to receive the 110 * We will never be ready to transmit the given message in (disconnect
111 * given message in time (disconnect or timeout). 111 * or invalid request). Frees resources associated with @a car. We
112 * don't explicitly tell the client, he'll learn with the disconnect
113 * (or violated the protocol).
112 * 114 *
113 * @param car request that now permanently failed; the 115 * @param car request that now permanently failed; the
114 * responsibility for the handle is now returned 116 * responsibility for the handle is now returned
115 * to CLIENTS (SESSIONS is done with it). 117 * to CLIENTS (SESSIONS is done with it).
118 * @param drop_client #GNUNET_YES if the client violated the protocol
119 * and we should thus drop the connection
116 */ 120 */
117void 121void
118GSC_CLIENTS_reject_request (struct GSC_ClientActiveRequest *car); 122GSC_CLIENTS_reject_request (struct GSC_ClientActiveRequest *car,
123 int drop_client);
119 124
120 125
121/** 126/**
diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c
index bbabf8866..b99fee272 100644
--- a/src/core/gnunet-service-core_sessions.c
+++ b/src/core/gnunet-service-core_sessions.c
@@ -229,7 +229,8 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
229 { 229 {
230 GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, 230 GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
231 session->active_client_request_tail, car); 231 session->active_client_request_tail, car);
232 GSC_CLIENTS_reject_request (car); 232 GSC_CLIENTS_reject_request (car,
233 GNUNET_NO);
233 } 234 }
234 while (NULL != (sme = session->sme_head)) 235 while (NULL != (sme = session->sme_head))
235 { 236 {
@@ -486,13 +487,15 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 487 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
487 "Dropped client request for transmission (am disconnected)\n"); 488 "Dropped client request for transmission (am disconnected)\n");
488 GNUNET_break (0); /* should have been rejected earlier */ 489 GNUNET_break (0); /* should have been rejected earlier */
489 GSC_CLIENTS_reject_request (car); 490 GSC_CLIENTS_reject_request (car,
491 GNUNET_NO);
490 return; 492 return;
491 } 493 }
492 if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) 494 if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
493 { 495 {
494 GNUNET_break (0); 496 GNUNET_break (0);
495 GSC_CLIENTS_reject_request (car); 497 GSC_CLIENTS_reject_request (car,
498 GNUNET_YES);
496 return; 499 return;
497 } 500 }
498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 501 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -516,7 +519,8 @@ GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
516 struct Session *session; 519 struct Session *session;
517 520
518 if (0 == 521 if (0 ==
519 memcmp (&car->target, &GSC_my_identity, 522 memcmp (&car->target,
523 &GSC_my_identity,
520 sizeof (struct GNUNET_PeerIdentity))) 524 sizeof (struct GNUNET_PeerIdentity)))
521 return; 525 return;
522 session = find_session (&car->target); 526 session = find_session (&car->target);
@@ -524,41 +528,10 @@ GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
524 GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, 528 GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
525 session->active_client_request_tail, 529 session->active_client_request_tail,
526 car); 530 car);
527} 531 /* dequeueing of 'high' priority messages may unblock
528 532 transmission for lower-priority messages, so we also
529 533 need to try in this case. */
530/** 534 try_transmission (session);
531 * Discard all expired active transmission requests from clients.
532 *
533 * @param session session to clean up
534 */
535static void
536discard_expired_requests (struct Session *session)
537{
538 struct GSC_ClientActiveRequest *pos;
539 struct GSC_ClientActiveRequest *nxt;
540 struct GNUNET_TIME_Absolute now;
541
542 now = GNUNET_TIME_absolute_get ();
543 pos = NULL;
544 nxt = session->active_client_request_head;
545 while (NULL != nxt)
546 {
547 pos = nxt;
548 nxt = pos->next;
549 if ( (pos->deadline.abs_value_us < now.abs_value_us) &&
550 (GNUNET_YES != pos->was_solicited) )
551 {
552 GNUNET_STATISTICS_update (GSC_stats,
553 gettext_noop
554 ("# messages discarded (expired prior to transmission)"),
555 1, GNUNET_NO);
556 GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
557 session->active_client_request_tail,
558 pos);
559 GSC_CLIENTS_reject_request (pos);
560 }
561 }
562} 535}
563 536
564 537
@@ -578,7 +551,6 @@ solicit_messages (struct Session *session,
578 size_t so_size; 551 size_t so_size;
579 enum GNUNET_CORE_Priority pmax; 552 enum GNUNET_CORE_Priority pmax;
580 553
581 discard_expired_requests (session);
582 so_size = msize; 554 so_size = msize;
583 pmax = GNUNET_CORE_PRIO_BACKGROUND; 555 pmax = GNUNET_CORE_PRIO_BACKGROUND;
584 for (car = session->active_client_request_head; NULL != car; car = car->next) 556 for (car = session->active_client_request_head; NULL != car; car = car->next)
@@ -599,6 +571,9 @@ solicit_messages (struct Session *session,
599 if (GNUNET_YES == car->was_solicited) 571 if (GNUNET_YES == car->was_solicited)
600 continue; 572 continue;
601 car->was_solicited = GNUNET_YES; 573 car->was_solicited = GNUNET_YES;
574 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
575 "Soliciting message with priority %u\n",
576 car->priority);
602 GSC_CLIENTS_solicit_request (car); 577 GSC_CLIENTS_solicit_request (car);
603 } 578 }
604} 579}
@@ -642,7 +617,11 @@ try_transmission (struct Session *session)
642 int excess; 617 int excess;
643 618
644 if (GNUNET_YES != session->ready_to_transmit) 619 if (GNUNET_YES != session->ready_to_transmit)
620 {
621 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622 "Already ready to transmit, not evaluating queue\n");
645 return; 623 return;
624 }
646 msize = 0; 625 msize = 0;
647 min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; 626 min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
648 /* if the peer has excess bandwidth, background traffic is allowed, 627 /* if the peer has excess bandwidth, background traffic is allowed,
@@ -745,6 +724,11 @@ try_transmission (struct Session *session)
745 &pop_cork_task, 724 &pop_cork_task,
746 session); 725 session);
747 } 726 }
727 else
728 {
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "Queue empty, waiting for solicitations\n");
731 }
748 return; 732 return;
749 } 733 }
750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
diff --git a/src/core/test_core_api_reliability.c b/src/core/test_core_api_reliability.c
index f2fadb59e..bd55565e3 100644
--- a/src/core/test_core_api_reliability.c
+++ b/src/core/test_core_api_reliability.c
@@ -41,7 +41,7 @@
41/** 41/**
42 * How long until we give up on transmitting the message? 42 * How long until we give up on transmitting the message?
43 */ 43 */
44#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 6000) 44#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 600)
45 45
46/** 46/**
47 * What delay do we request from the core service for transmission? 47 * What delay do we request from the core service for transmission?
@@ -55,9 +55,9 @@ static unsigned long long total_bytes;
55 55
56static struct GNUNET_TIME_Absolute start_time; 56static struct GNUNET_TIME_Absolute start_time;
57 57
58static struct GNUNET_SCHEDULER_Task * err_task; 58static struct GNUNET_SCHEDULER_Task *err_task;
59 59
60static struct GNUNET_SCHEDULER_Task * connect_task; 60static struct GNUNET_SCHEDULER_Task *connect_task;
61 61
62 62
63struct PeerContext 63struct PeerContext
@@ -185,9 +185,9 @@ transmit_ready (void *cls, size_t size, void *buf)
185 unsigned int ret; 185 unsigned int ret;
186 186
187 GNUNET_assert (size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); 187 GNUNET_assert (size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
188 if (buf == NULL) 188 if (NULL == buf)
189 { 189 {
190 if (p1.ch != NULL) 190 if (NULL != p1.ch)
191 GNUNET_break (NULL != 191 GNUNET_break (NULL !=
192 GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO, 192 GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO,
193 GNUNET_CORE_PRIO_BEST_EFFORT, 193 GNUNET_CORE_PRIO_BEST_EFFORT,
@@ -221,7 +221,8 @@ transmit_ready (void *cls, size_t size, void *buf)
221 while (size - ret >= s); 221 while (size - ret >= s);
222 GNUNET_SCHEDULER_cancel (err_task); 222 GNUNET_SCHEDULER_cancel (err_task);
223 err_task = 223 err_task =
224 GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL); 224 GNUNET_SCHEDULER_add_delayed (TIMEOUT,
225 &terminate_task_error, NULL);
225 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 226 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
226 "Returning total message block of size %u\n", ret); 227 "Returning total message block of size %u\n", ret);
227 total_bytes += ret; 228 total_bytes += ret;
@@ -241,10 +242,10 @@ connect_notify (void *cls, const struct GNUNET_PeerIdentity *peer)
241 if (pc == &p1) 242 if (pc == &p1)
242 { 243 {
243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 244 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244 "Encrypted connection established to peer `%4s'\n", 245 "Encrypted connection established to peer `%s'\n",
245 GNUNET_i2s (peer)); 246 GNUNET_i2s (peer));
246 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 247 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
247 "Asking core (1) for transmission to peer `%4s'\n", 248 "Asking core (1) for transmission to peer `%s'\n",
248 GNUNET_i2s (&p2.id)); 249 GNUNET_i2s (&p2.id));
249 GNUNET_SCHEDULER_cancel (err_task); 250 GNUNET_SCHEDULER_cancel (err_task);
250 err_task = 251 err_task =
@@ -268,7 +269,7 @@ disconnect_notify (void *cls, const struct GNUNET_PeerIdentity *peer)
268 if (0 == memcmp (&pc->id, peer, sizeof (struct GNUNET_PeerIdentity))) 269 if (0 == memcmp (&pc->id, peer, sizeof (struct GNUNET_PeerIdentity)))
269 return; 270 return;
270 pc->connect_status = 0; 271 pc->connect_status = 0;
271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Encrypted connection to `%4s' cut\n", 272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Encrypted connection to `%s' cut\n",
272 GNUNET_i2s (peer)); 273 GNUNET_i2s (peer));
273} 274}
274 275
@@ -278,7 +279,7 @@ inbound_notify (void *cls, const struct GNUNET_PeerIdentity *other,
278 const struct GNUNET_MessageHeader *message) 279 const struct GNUNET_MessageHeader *message)
279{ 280{
280 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
281 "Core provides inbound data from `%4s'.\n", GNUNET_i2s (other)); 282 "Core provides inbound data from `%s'.\n", GNUNET_i2s (other));
282 return GNUNET_OK; 283 return GNUNET_OK;
283} 284}
284 285
@@ -288,7 +289,7 @@ outbound_notify (void *cls, const struct GNUNET_PeerIdentity *other,
288 const struct GNUNET_MessageHeader *message) 289 const struct GNUNET_MessageHeader *message)
289{ 290{
290 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 291 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
291 "Core notifies about outbound data for `%4s'.\n", 292 "Core notifies about outbound data for `%s'.\n",
292 GNUNET_i2s (other)); 293 GNUNET_i2s (other));
293 return GNUNET_OK; 294 return GNUNET_OK;
294} 295}
@@ -299,7 +300,8 @@ transmit_ready (void *cls, size_t size, void *buf);
299 300
300 301
301static int 302static int
302process_mtype (void *cls, const struct GNUNET_PeerIdentity *peer, 303process_mtype (void *cls,
304 const struct GNUNET_PeerIdentity *peer,
303 const struct GNUNET_MessageHeader *message) 305 const struct GNUNET_MessageHeader *message)
304{ 306{
305 static int n; 307 static int n;
@@ -342,9 +344,11 @@ process_mtype (void *cls, const struct GNUNET_PeerIdentity *peer,
342 { 344 {
343 if (n == tr_n) 345 if (n == tr_n)
344 GNUNET_break (NULL != 346 GNUNET_break (NULL !=
345 GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO, 347 GNUNET_CORE_notify_transmit_ready (p1.ch,
348 GNUNET_NO /* no cork */,
346 GNUNET_CORE_PRIO_BEST_EFFORT, 349 GNUNET_CORE_PRIO_BEST_EFFORT,
347 FAST_TIMEOUT, &p2.id, 350 FAST_TIMEOUT /* ignored! */,
351 &p2.id,
348 get_size (tr_n), 352 get_size (tr_n),
349 &transmit_ready, &p1)); 353 &transmit_ready, &p1));
350 } 354 }
@@ -365,7 +369,7 @@ init_notify (void *cls,
365 struct PeerContext *p = cls; 369 struct PeerContext *p = cls;
366 370
367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 371 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368 "Connection to CORE service of `%4s' established\n", 372 "Connection to CORE service of `%s' established\n",
369 GNUNET_i2s (my_identity)); 373 GNUNET_i2s (my_identity));
370 p->id = *my_identity; 374 p->id = *my_identity;
371 if (cls == &p1) 375 if (cls == &p1)
@@ -373,9 +377,13 @@ init_notify (void *cls,
373 GNUNET_assert (ok == 2); 377 GNUNET_assert (ok == 2);
374 OKPP; 378 OKPP;
375 /* connect p2 */ 379 /* connect p2 */
376 GNUNET_assert (NULL != (p2.ch = GNUNET_CORE_connect (p2.cfg, &p2, &init_notify, &connect_notify, 380 GNUNET_assert (NULL != (p2.ch = GNUNET_CORE_connect (p2.cfg, &p2,
377 &disconnect_notify, &inbound_notify, GNUNET_YES, 381 &init_notify,
378 &outbound_notify, GNUNET_YES, handlers))); 382 &connect_notify,
383 &disconnect_notify,
384 &inbound_notify, GNUNET_YES,
385 &outbound_notify, GNUNET_YES,
386 handlers)));
379 } 387 }
380 else 388 else
381 { 389 {
@@ -383,7 +391,7 @@ init_notify (void *cls,
383 OKPP; 391 OKPP;
384 GNUNET_assert (cls == &p2); 392 GNUNET_assert (cls == &p2);
385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 393 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386 "Asking transport (1) to connect to peer `%4s'\n", 394 "Asking transport (1) to connect to peer `%s'\n",
387 GNUNET_i2s (&p2.id)); 395 GNUNET_i2s (&p2.id));
388 connect_task = GNUNET_SCHEDULER_add_now (&try_connect, NULL); 396 connect_task = GNUNET_SCHEDULER_add_now (&try_connect, NULL);
389 } 397 }
@@ -443,9 +451,13 @@ run (void *cls, char *const *args, const char *cfgfile,
443 err_task = 451 err_task =
444 GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL); 452 GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL);
445 453
446 GNUNET_assert (NULL != (p1.ch = GNUNET_CORE_connect (p1.cfg, &p1, &init_notify, &connect_notify, 454 GNUNET_assert (NULL != (p1.ch = GNUNET_CORE_connect (p1.cfg, &p1,
447 &disconnect_notify, &inbound_notify, GNUNET_YES, 455 &init_notify,
448 &outbound_notify, GNUNET_YES, handlers))); 456 &connect_notify,
457 &disconnect_notify,
458 &inbound_notify, GNUNET_YES,
459 &outbound_notify, GNUNET_YES,
460 handlers)));
449} 461}
450 462
451 463
@@ -476,7 +488,7 @@ main (int argc, char *argv1[])
476 GNUNET_GETOPT_OPTION_END 488 GNUNET_GETOPT_OPTION_END
477 }; 489 };
478 ok = 1; 490 ok = 1;
479 GNUNET_log_setup ("test-core-api", 491 GNUNET_log_setup ("test-core-api-reliability",
480 "WARNING", 492 "WARNING",
481 NULL); 493 NULL);
482 GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, argv, 494 GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, argv,