aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/core_api.c171
-rw-r--r--src/core/gnunet-service-core_clients.c34
-rw-r--r--src/core/gnunet-service-core_clients.h11
-rw-r--r--src/core/gnunet-service-core_sessions.c64
-rw-r--r--src/core/test_core_api_reliability.c58
5 files changed, 143 insertions, 195 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c
index 2d7d71c48..7338672d6 100644
--- a/src/core/core_api.c
+++ b/src/core/core_api.c
@@ -63,9 +63,15 @@ struct GNUNET_CORE_TransmitHandle
63 void *get_message_cls; 63 void *get_message_cls;
64 64
65 /** 65 /**
66 * Timeout for this handle. 66 * Deadline for the transmission (the request does not get cancelled
67 * at this time, this is merely how soon the application wants this out).
67 */ 68 */
68 struct GNUNET_TIME_Absolute timeout; 69 struct GNUNET_TIME_Absolute deadline;
70
71 /**
72 * When did this request get queued?
73 */
74 struct GNUNET_TIME_Absolute request_time;
69 75
70 /** 76 /**
71 * How important is this message? 77 * How important is this message?
@@ -127,15 +133,9 @@ struct PeerRecord
127 struct GNUNET_PeerIdentity peer; 133 struct GNUNET_PeerIdentity peer;
128 134
129 /** 135 /**
130 * ID of timeout task for the 'pending_head' handle 136 * ID of task to run #next_request_transmission().
131 * which is the one with the smallest timeout.
132 */
133 struct GNUNET_SCHEDULER_Task * timeout_task;
134
135 /**
136 * ID of task to run 'next_request_transmission'.
137 */ 137 */
138 struct GNUNET_SCHEDULER_Task * ntr_task; 138 struct GNUNET_SCHEDULER_Task *ntr_task;
139 139
140 /** 140 /**
141 * SendMessageRequest ID generator for this peer. 141 * SendMessageRequest ID generator for this peer.
@@ -369,11 +369,6 @@ disconnect_and_free_peer_entry (void *cls,
369 struct GNUNET_CORE_TransmitHandle *th; 369 struct GNUNET_CORE_TransmitHandle *th;
370 struct PeerRecord *pr = value; 370 struct PeerRecord *pr = value;
371 371
372 if (NULL != pr->timeout_task)
373 {
374 GNUNET_SCHEDULER_cancel (pr->timeout_task);
375 pr->timeout_task = NULL;
376 }
377 if (NULL != pr->ntr_task) 372 if (NULL != pr->ntr_task)
378 { 373 {
379 GNUNET_SCHEDULER_cancel (pr->ntr_task); 374 GNUNET_SCHEDULER_cancel (pr->ntr_task);
@@ -401,7 +396,6 @@ disconnect_and_free_peer_entry (void *cls,
401 GNUNET_assert (GNUNET_YES == 396 GNUNET_assert (GNUNET_YES ==
402 GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr)); 397 GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr));
403 GNUNET_assert (pr->ch == h); 398 GNUNET_assert (pr->ch == h);
404 GNUNET_assert (NULL == pr->timeout_task);
405 GNUNET_assert (NULL == pr->ntr_task); 399 GNUNET_assert (NULL == pr->ntr_task);
406 GNUNET_free (pr); 400 GNUNET_free (pr);
407 return GNUNET_YES; 401 return GNUNET_YES;
@@ -471,18 +465,6 @@ trigger_next_request (struct GNUNET_CORE_Handle *h,
471 465
472 466
473/** 467/**
474 * The given request hit its timeout. Remove from the
475 * doubly-linked list and call the respective continuation.
476 *
477 * @param cls the transmit handle of the request that timed out
478 * @param tc context, can be NULL (!)
479 */
480static void
481transmission_timeout (void *cls,
482 const struct GNUNET_SCHEDULER_TaskContext *tc);
483
484
485/**
486 * Send a control message to the peer asking for transmission 468 * Send a control message to the peer asking for transmission
487 * of the message in the given peer record. 469 * of the message in the given peer record.
488 * 470 *
@@ -496,25 +478,16 @@ request_next_transmission (struct PeerRecord *pr)
496 struct SendMessageRequest *smr; 478 struct SendMessageRequest *smr;
497 struct GNUNET_CORE_TransmitHandle *th; 479 struct GNUNET_CORE_TransmitHandle *th;
498 480
499 if (pr->timeout_task != NULL)
500 {
501 GNUNET_SCHEDULER_cancel (pr->timeout_task);
502 pr->timeout_task = NULL;
503 }
504 th = &pr->th; 481 th = &pr->th;
505 if (NULL == th->peer) 482 if (NULL == th->peer)
506 { 483 {
507 trigger_next_request (h, GNUNET_NO); 484 trigger_next_request (h, GNUNET_NO);
508 return; 485 return;
509 } 486 }
510 if (th->cm != NULL) 487 if (NULL != th->cm)
511 return; /* already done */ 488 return; /* already done */
512 GNUNET_assert (pr->prev == NULL); 489 GNUNET_assert (NULL == pr->prev);
513 GNUNET_assert (pr->next == NULL); 490 GNUNET_assert (NULL == pr->next);
514 pr->timeout_task
515 = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout),
516 &transmission_timeout,
517 pr);
518 cm = GNUNET_malloc (sizeof (struct ControlMessage) + 491 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
519 sizeof (struct SendMessageRequest)); 492 sizeof (struct SendMessageRequest));
520 th->cm = cm; 493 th->cm = cm;
@@ -523,7 +496,7 @@ request_next_transmission (struct PeerRecord *pr)
523 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); 496 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
524 smr->header.size = htons (sizeof (struct SendMessageRequest)); 497 smr->header.size = htons (sizeof (struct SendMessageRequest));
525 smr->priority = htonl ((uint32_t) th->priority); 498 smr->priority = htonl ((uint32_t) th->priority);
526 smr->deadline = GNUNET_TIME_absolute_hton (th->timeout); 499 smr->deadline = GNUNET_TIME_absolute_hton (th->deadline);
527 smr->peer = pr->peer; 500 smr->peer = pr->peer;
528 smr->reserved = htonl (0); 501 smr->reserved = htonl (0);
529 smr->size = htons (th->msize); 502 smr->size = htons (th->msize);
@@ -538,56 +511,6 @@ request_next_transmission (struct PeerRecord *pr)
538 511
539 512
540/** 513/**
541 * The given request hit its timeout. Remove from the
542 * doubly-linked list and call the respective continuation.
543 *
544 * @param cls the transmit handle of the request that timed out
545 * @param tc context, can be NULL (!)
546 */
547static void
548transmission_timeout (void *cls,
549 const struct GNUNET_SCHEDULER_TaskContext *tc)
550{
551 struct PeerRecord *pr = cls;
552 struct GNUNET_CORE_Handle *h = pr->ch;
553 struct GNUNET_CORE_TransmitHandle *th;
554
555 pr->timeout_task = NULL;
556 if (NULL != pr->ntr_task)
557 {
558 GNUNET_SCHEDULER_cancel (pr->ntr_task);
559 pr->ntr_task = NULL;
560 }
561 th = &pr->th;
562 th->peer = NULL;
563 if ( (NULL != pr->prev) ||
564 (NULL != pr->next) ||
565 (pr == h->ready_peer_head) )
566 {
567 /* the request that was 'approved' by core was
568 * canceled before it could be transmitted; remove
569 * us from the 'ready' list */
570 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
571 h->ready_peer_tail,
572 pr);
573 }
574 if (NULL != th->cm)
575 {
576 /* we're currently in the control queue, remove */
577 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
578 h->control_pending_tail, th->cm);
579 GNUNET_free (th->cm);
580 }
581 LOG (GNUNET_ERROR_TYPE_DEBUG,
582 "Signalling timeout of request for transmission to peer `%s' via CORE\n",
583 GNUNET_i2s (&pr->peer));
584 trigger_next_request (h, GNUNET_NO);
585
586 GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
587}
588
589
590/**
591 * Transmit the next message to the core service. 514 * Transmit the next message to the core service.
592 * 515 *
593 * @param cls closure with the `struct GNUNET_CORE_Handle` 516 * @param cls closure with the `struct GNUNET_CORE_Handle`
@@ -603,6 +526,8 @@ transmit_message (void *cls,
603 struct GNUNET_CORE_Handle *h = cls; 526 struct GNUNET_CORE_Handle *h = cls;
604 struct ControlMessage *cm; 527 struct ControlMessage *cm;
605 struct GNUNET_CORE_TransmitHandle *th; 528 struct GNUNET_CORE_TransmitHandle *th;
529 struct GNUNET_TIME_Relative delay;
530 struct GNUNET_TIME_Relative overdue;
606 struct PeerRecord *pr; 531 struct PeerRecord *pr;
607 struct SendMessage *sm; 532 struct SendMessage *sm;
608 const struct GNUNET_MessageHeader *hdr; 533 const struct GNUNET_MessageHeader *hdr;
@@ -657,11 +582,6 @@ transmit_message (void *cls,
657 h->ready_peer_tail, 582 h->ready_peer_tail,
658 pr); 583 pr);
659 th->peer = NULL; 584 th->peer = NULL;
660 if (NULL != pr->timeout_task)
661 {
662 GNUNET_SCHEDULER_cancel (pr->timeout_task);
663 pr->timeout_task = NULL;
664 }
665 LOG (GNUNET_ERROR_TYPE_DEBUG, 585 LOG (GNUNET_ERROR_TYPE_DEBUG,
666 "Transmitting SEND request to `%s' with %u bytes.\n", 586 "Transmitting SEND request to `%s' with %u bytes.\n",
667 GNUNET_i2s (&pr->peer), 587 GNUNET_i2s (&pr->peer),
@@ -669,7 +589,7 @@ transmit_message (void *cls,
669 sm = (struct SendMessage *) buf; 589 sm = (struct SendMessage *) buf;
670 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); 590 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
671 sm->priority = htonl ((uint32_t) th->priority); 591 sm->priority = htonl ((uint32_t) th->priority);
672 sm->deadline = GNUNET_TIME_absolute_hton (th->timeout); 592 sm->deadline = GNUNET_TIME_absolute_hton (th->deadline);
673 sm->peer = pr->peer; 593 sm->peer = pr->peer;
674 sm->cork = htonl ((uint32_t) th->cork); 594 sm->cork = htonl ((uint32_t) th->cork);
675 sm->reserved = htonl (0); 595 sm->reserved = htonl (0);
@@ -677,24 +597,43 @@ transmit_message (void *cls,
677 th->get_message (th->get_message_cls, 597 th->get_message (th->get_message_cls,
678 size - sizeof (struct SendMessage), 598 size - sizeof (struct SendMessage),
679 &sm[1]); 599 &sm[1]);
680 600 delay = GNUNET_TIME_absolute_get_duration (th->request_time);
681 LOG (GNUNET_ERROR_TYPE_DEBUG, 601 overdue = GNUNET_TIME_absolute_get_duration (th->deadline);
682 "Transmitting SEND request to `%s' yielded %u bytes.\n", 602 if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
683 GNUNET_i2s (&pr->peer), 603 LOG (GNUNET_ERROR_TYPE_WARNING,
684 ret); 604 "Transmitting overdue %u bytes to `%s' at priority %u with %s delay%s\n",
685 if (0 == ret) 605 ret,
606 GNUNET_i2s (&pr->peer),
607 (unsigned int) th->priority,
608 GNUNET_STRINGS_relative_time_to_string (delay,
609 GNUNET_YES),
610 (th->cork) ? " (corked)" : "");
611 else
612 LOG (GNUNET_ERROR_TYPE_DEBUG,
613 "Transmitting %u bytes to `%s' at priority %u with %s delay%s\n",
614 ret,
615 GNUNET_i2s (&pr->peer),
616 (unsigned int) th->priority,
617 GNUNET_STRINGS_relative_time_to_string (delay,
618 GNUNET_YES),
619 (th->cork) ? " (corked)" : "");
620 if ( (0 == ret) &&
621 (GNUNET_CORE_PRIO_BACKGROUND == th->priority) )
686 { 622 {
623 /* client decided to send nothing; as the priority was
624 BACKGROUND, we can just not send anything to core.
625 For higher-priority messages, we must give an
626 empty message to CORE so that it knows that this
627 message is no longer pending. */
687 LOG (GNUNET_ERROR_TYPE_DEBUG, 628 LOG (GNUNET_ERROR_TYPE_DEBUG,
688 "Size of clients message to peer %s is 0!\n", 629 "Size of clients message to peer %s is 0!\n",
689 GNUNET_i2s (&pr->peer)); 630 GNUNET_i2s (&pr->peer));
690 /* client decided to send nothing! */
691 request_next_transmission (pr); 631 request_next_transmission (pr);
692 return 0; 632 return 0;
693 } 633 }
694 LOG (GNUNET_ERROR_TYPE_DEBUG, 634 LOG (GNUNET_ERROR_TYPE_DEBUG,
695 "Produced SEND message to core with %u bytes payload\n", 635 "Produced SEND message to core with %u bytes payload\n",
696 (unsigned int) ret); 636 (unsigned int) ret);
697 GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
698 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 637 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
699 { 638 {
700 GNUNET_break (0); 639 GNUNET_break (0);
@@ -932,7 +871,7 @@ main_notify_handler (void *cls,
932 } 871 }
933 em = (const struct GNUNET_MessageHeader *) &ntm[1]; 872 em = (const struct GNUNET_MessageHeader *) &ntm[1];
934 LOG (GNUNET_ERROR_TYPE_DEBUG, 873 LOG (GNUNET_ERROR_TYPE_DEBUG,
935 "Received message of type %u and size %u from peer `%4s'\n", 874 "Received message of type %u and size %u from peer `%s'\n",
936 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer)); 875 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
937 if ((GNUNET_NO == h->inbound_hdr_only) && 876 if ((GNUNET_NO == h->inbound_hdr_only) &&
938 (msize != 877 (msize !=
@@ -951,7 +890,7 @@ main_notify_handler (void *cls,
951 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0)) 890 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
952 { 891 {
953 LOG (GNUNET_ERROR_TYPE_ERROR, 892 LOG (GNUNET_ERROR_TYPE_ERROR,
954 "Unexpected message size %u for message of type %u from peer `%4s'\n", 893 "Unexpected message size %u for message of type %u from peer `%s'\n",
955 htons (em->size), mh->type, GNUNET_i2s (&ntm->peer)); 894 htons (em->size), mh->type, GNUNET_i2s (&ntm->peer));
956 GNUNET_break_op (0); 895 GNUNET_break_op (0);
957 continue; 896 continue;
@@ -1208,7 +1147,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1208 h->currently_down = GNUNET_YES; 1147 h->currently_down = GNUNET_YES;
1209 h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); 1148 h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
1210 if (NULL != handlers) 1149 if (NULL != handlers)
1211 while (handlers[h->hcnt].callback != NULL) 1150 while (NULL != handlers[h->hcnt].callback)
1212 h->hcnt++; 1151 h->hcnt++;
1213 GNUNET_assert (h->hcnt < 1152 GNUNET_assert (h->hcnt <
1214 (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1153 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
@@ -1259,22 +1198,22 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1259 GNUNET_CONTAINER_multipeermap_iterate (handle->peers, 1198 GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
1260 &disconnect_and_free_peer_entry, 1199 &disconnect_and_free_peer_entry,
1261 handle); 1200 handle);
1262 if (handle->reconnect_task != NULL) 1201 if (NULL != handle->reconnect_task)
1263 { 1202 {
1264 GNUNET_SCHEDULER_cancel (handle->reconnect_task); 1203 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1265 handle->reconnect_task = NULL; 1204 handle->reconnect_task = NULL;
1266 } 1205 }
1267 GNUNET_CONTAINER_multipeermap_destroy (handle->peers); 1206 GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
1268 handle->peers = NULL; 1207 handle->peers = NULL;
1269 GNUNET_break (handle->ready_peer_head == NULL); 1208 GNUNET_break (NULL == handle->ready_peer_head);
1270 GNUNET_free (handle); 1209 GNUNET_free (handle);
1271} 1210}
1272 1211
1273 1212
1274/** 1213/**
1275 * Task that calls 'request_next_transmission'. 1214 * Task that calls #request_next_transmission().
1276 * 1215 *
1277 * @param cls the 'struct PeerRecord *' 1216 * @param cls the `struct PeerRecord *`
1278 * @param tc scheduler context 1217 * @param tc scheduler context
1279 */ 1218 */
1280static void 1219static void
@@ -1357,7 +1296,11 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1357 th->peer = pr; 1296 th->peer = pr;
1358 th->get_message = notify; 1297 th->get_message = notify;
1359 th->get_message_cls = notify_cls; 1298 th->get_message_cls = notify_cls;
1360 th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); 1299 th->request_time = GNUNET_TIME_absolute_get ();
1300 if (GNUNET_YES == cork)
1301 th->deadline = GNUNET_TIME_relative_to_absolute (maxdelay);
1302 else
1303 th->deadline = th->request_time;
1361 th->priority = priority; 1304 th->priority = priority;
1362 th->msize = notify_size; 1305 th->msize = notify_size;
1363 th->cork = cork; 1306 th->cork = cork;
@@ -1373,7 +1316,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1373/** 1316/**
1374 * Cancel the specified transmission-ready notification. 1317 * Cancel the specified transmission-ready notification.
1375 * 1318 *
1376 * @param th handle that was returned by "notify_transmit_ready". 1319 * @param th handle that was returned by #GNUNET_CORE_notify_transmit_ready().
1377 */ 1320 */
1378void 1321void
1379GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) 1322GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
diff --git a/src/core/gnunet-service-core_clients.c b/src/core/gnunet-service-core_clients.c
index 57a5d5826..3dff68528 100644
--- a/src/core/gnunet-service-core_clients.c
+++ b/src/core/gnunet-service-core_clients.c
@@ -462,10 +462,10 @@ handle_client_send (void *cls,
462 struct TokenizerContext tc; 462 struct TokenizerContext tc;
463 uint16_t msize; 463 uint16_t msize;
464 struct GNUNET_TIME_Relative delay; 464 struct GNUNET_TIME_Relative delay;
465 struct GNUNET_TIME_Relative overdue;
465 466
466 msize = ntohs (message->size); 467 msize = ntohs (message->size);
467 if (msize < 468 if (msize < sizeof (struct SendMessage))
468 sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
469 { 469 {
470 GNUNET_break (0); 470 GNUNET_break (0);
471 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 471 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
@@ -501,9 +501,10 @@ handle_client_send (void *cls,
501 return; 501 return;
502 } 502 }
503 delay = GNUNET_TIME_absolute_get_duration (tc.car->received_time); 503 delay = GNUNET_TIME_absolute_get_duration (tc.car->received_time);
504 overdue = GNUNET_TIME_absolute_get_duration (tc.car->deadline);
504 tc.cork = ntohl (sm->cork); 505 tc.cork = ntohl (sm->cork);
505 tc.priority = (enum GNUNET_CORE_Priority) ntohl (sm->priority); 506 tc.priority = (enum GNUNET_CORE_Priority) ntohl (sm->priority);
506 if (delay.rel_value_us > GNUNET_TIME_UNIT_SECONDS.rel_value_us) 507 if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
507 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 508 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
508 "Client waited %s for transmission of %u bytes to `%s'%s, CORE queue is %u entries\n", 509 "Client waited %s for transmission of %u bytes to `%s'%s, CORE queue is %u entries\n",
509 GNUNET_STRINGS_relative_time_to_string (delay, 510 GNUNET_STRINGS_relative_time_to_string (delay,
@@ -531,11 +532,7 @@ handle_client_send (void *cls,
531 msize, 532 msize,
532 GNUNET_YES, 533 GNUNET_YES,
533 GNUNET_NO); 534 GNUNET_NO);
534 if (0 != 535 GSC_SESSIONS_dequeue_request (tc.car);
535 memcmp (&tc.car->target,
536 &GSC_my_identity,
537 sizeof (struct GNUNET_PeerIdentity)))
538 GSC_SESSIONS_dequeue_request (tc.car);
539 GNUNET_free (tc.car); 536 GNUNET_free (tc.car);
540 GNUNET_SERVER_receive_done (client, 537 GNUNET_SERVER_receive_done (client,
541 GNUNET_OK); 538 GNUNET_OK);
@@ -702,13 +699,13 @@ GSC_CLIENTS_solicit_request (struct GSC_ClientActiveRequest *car)
702 &GSC_my_identity, 699 &GSC_my_identity,
703 sizeof (struct GNUNET_PeerIdentity))); 700 sizeof (struct GNUNET_PeerIdentity)));
704 GSC_SESSIONS_dequeue_request (car); 701 GSC_SESSIONS_dequeue_request (car);
705 GSC_CLIENTS_reject_request (car); 702 GSC_CLIENTS_reject_request (car,
703 GNUNET_NO);
706 return; 704 return;
707 } 705 }
708 delay = GNUNET_TIME_absolute_get_duration (car->received_time); 706 delay = GNUNET_TIME_absolute_get_duration (car->received_time);
709 left = GNUNET_TIME_absolute_get_remaining (car->deadline); 707 left = GNUNET_TIME_absolute_get_duration (car->deadline);
710 if ( (delay.rel_value_us > GNUNET_TIME_UNIT_SECONDS.rel_value_us) || 708 if (left.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
711 (0 == left.rel_value_us) )
712 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 709 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
713 "Client waited %s for permission to transmit to `%s'%s (priority %u)\n", 710 "Client waited %s for permission to transmit to `%s'%s (priority %u)\n",
714 GNUNET_STRINGS_relative_time_to_string (delay, 711 GNUNET_STRINGS_relative_time_to_string (delay,
@@ -728,21 +725,28 @@ GSC_CLIENTS_solicit_request (struct GSC_ClientActiveRequest *car)
728 725
729 726
730/** 727/**
731 * Tell a client that we will never be ready to receive the 728 * We will never be ready to transmit the given message in (disconnect
732 * given message in time (disconnect or timeout). 729 * or invalid request). Frees resources associated with @a car. We
730 * don't explicitly tell the client, he'll learn with the disconnect
731 * (or violated the protocol).
733 * 732 *
734 * @param car request that now permanently failed; the 733 * @param car request that now permanently failed; the
735 * responsibility for the handle is now returned 734 * responsibility for the handle is now returned
736 * to CLIENTS (SESSIONS is done with it). 735 * to CLIENTS (SESSIONS is done with it).
736 * @param drop_client #GNUNET_YES if the client violated the protocol
737 * and we should thus drop the connection
737 */ 738 */
738void 739void
739GSC_CLIENTS_reject_request (struct GSC_ClientActiveRequest *car) 740GSC_CLIENTS_reject_request (struct GSC_ClientActiveRequest *car,
741 int drop_client)
740{ 742{
741 GNUNET_assert (GNUNET_YES == 743 GNUNET_assert (GNUNET_YES ==
742 GNUNET_CONTAINER_multipeermap_remove (car-> 744 GNUNET_CONTAINER_multipeermap_remove (car->
743 client_handle->requests, 745 client_handle->requests,
744 &car->target, 746 &car->target,
745 car)); 747 car));
748 if (GNUNET_YES == drop_client)
749 GNUNET_SERVER_client_disconnect (car->client_handle->client_handle);
746 GNUNET_free (car); 750 GNUNET_free (car);
747} 751}
748 752
diff --git a/src/core/gnunet-service-core_clients.h b/src/core/gnunet-service-core_clients.h
index e39ce616f..a9d712442 100644
--- a/src/core/gnunet-service-core_clients.h
+++ b/src/core/gnunet-service-core_clients.h
@@ -107,15 +107,20 @@ GSC_CLIENTS_solicit_request (struct GSC_ClientActiveRequest *car);
107 107
108 108
109/** 109/**
110 * Tell a client that we will never be ready to receive the 110 * We will never be ready to transmit the given message in (disconnect
111 * given message in time (disconnect or timeout). 111 * or invalid request). Frees resources associated with @a car. We
112 * don't explicitly tell the client, he'll learn with the disconnect
113 * (or violated the protocol).
112 * 114 *
113 * @param car request that now permanently failed; the 115 * @param car request that now permanently failed; the
114 * responsibility for the handle is now returned 116 * responsibility for the handle is now returned
115 * to CLIENTS (SESSIONS is done with it). 117 * to CLIENTS (SESSIONS is done with it).
118 * @param drop_client #GNUNET_YES if the client violated the protocol
119 * and we should thus drop the connection
116 */ 120 */
117void 121void
118GSC_CLIENTS_reject_request (struct GSC_ClientActiveRequest *car); 122GSC_CLIENTS_reject_request (struct GSC_ClientActiveRequest *car,
123 int drop_client);
119 124
120 125
121/** 126/**
diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c
index bbabf8866..b99fee272 100644
--- a/src/core/gnunet-service-core_sessions.c
+++ b/src/core/gnunet-service-core_sessions.c
@@ -229,7 +229,8 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
229 { 229 {
230 GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, 230 GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
231 session->active_client_request_tail, car); 231 session->active_client_request_tail, car);
232 GSC_CLIENTS_reject_request (car); 232 GSC_CLIENTS_reject_request (car,
233 GNUNET_NO);
233 } 234 }
234 while (NULL != (sme = session->sme_head)) 235 while (NULL != (sme = session->sme_head))
235 { 236 {
@@ -486,13 +487,15 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 487 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
487 "Dropped client request for transmission (am disconnected)\n"); 488 "Dropped client request for transmission (am disconnected)\n");
488 GNUNET_break (0); /* should have been rejected earlier */ 489 GNUNET_break (0); /* should have been rejected earlier */
489 GSC_CLIENTS_reject_request (car); 490 GSC_CLIENTS_reject_request (car,
491 GNUNET_NO);
490 return; 492 return;
491 } 493 }
492 if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) 494 if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
493 { 495 {
494 GNUNET_break (0); 496 GNUNET_break (0);
495 GSC_CLIENTS_reject_request (car); 497 GSC_CLIENTS_reject_request (car,
498 GNUNET_YES);
496 return; 499 return;
497 } 500 }
498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 501 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -516,7 +519,8 @@ GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
516 struct Session *session; 519 struct Session *session;
517 520
518 if (0 == 521 if (0 ==
519 memcmp (&car->target, &GSC_my_identity, 522 memcmp (&car->target,
523 &GSC_my_identity,
520 sizeof (struct GNUNET_PeerIdentity))) 524 sizeof (struct GNUNET_PeerIdentity)))
521 return; 525 return;
522 session = find_session (&car->target); 526 session = find_session (&car->target);
@@ -524,41 +528,10 @@ GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
524 GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, 528 GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
525 session->active_client_request_tail, 529 session->active_client_request_tail,
526 car); 530 car);
527} 531 /* dequeueing of 'high' priority messages may unblock
528 532 transmission for lower-priority messages, so we also
529 533 need to try in this case. */
530/** 534 try_transmission (session);
531 * Discard all expired active transmission requests from clients.
532 *
533 * @param session session to clean up
534 */
535static void
536discard_expired_requests (struct Session *session)
537{
538 struct GSC_ClientActiveRequest *pos;
539 struct GSC_ClientActiveRequest *nxt;
540 struct GNUNET_TIME_Absolute now;
541
542 now = GNUNET_TIME_absolute_get ();
543 pos = NULL;
544 nxt = session->active_client_request_head;
545 while (NULL != nxt)
546 {
547 pos = nxt;
548 nxt = pos->next;
549 if ( (pos->deadline.abs_value_us < now.abs_value_us) &&
550 (GNUNET_YES != pos->was_solicited) )
551 {
552 GNUNET_STATISTICS_update (GSC_stats,
553 gettext_noop
554 ("# messages discarded (expired prior to transmission)"),
555 1, GNUNET_NO);
556 GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
557 session->active_client_request_tail,
558 pos);
559 GSC_CLIENTS_reject_request (pos);
560 }
561 }
562} 535}
563 536
564 537
@@ -578,7 +551,6 @@ solicit_messages (struct Session *session,
578 size_t so_size; 551 size_t so_size;
579 enum GNUNET_CORE_Priority pmax; 552 enum GNUNET_CORE_Priority pmax;
580 553
581 discard_expired_requests (session);
582 so_size = msize; 554 so_size = msize;
583 pmax = GNUNET_CORE_PRIO_BACKGROUND; 555 pmax = GNUNET_CORE_PRIO_BACKGROUND;
584 for (car = session->active_client_request_head; NULL != car; car = car->next) 556 for (car = session->active_client_request_head; NULL != car; car = car->next)
@@ -599,6 +571,9 @@ solicit_messages (struct Session *session,
599 if (GNUNET_YES == car->was_solicited) 571 if (GNUNET_YES == car->was_solicited)
600 continue; 572 continue;
601 car->was_solicited = GNUNET_YES; 573 car->was_solicited = GNUNET_YES;
574 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
575 "Soliciting message with priority %u\n",
576 car->priority);
602 GSC_CLIENTS_solicit_request (car); 577 GSC_CLIENTS_solicit_request (car);
603 } 578 }
604} 579}
@@ -642,7 +617,11 @@ try_transmission (struct Session *session)
642 int excess; 617 int excess;
643 618
644 if (GNUNET_YES != session->ready_to_transmit) 619 if (GNUNET_YES != session->ready_to_transmit)
620 {
621 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622 "Already ready to transmit, not evaluating queue\n");
645 return; 623 return;
624 }
646 msize = 0; 625 msize = 0;
647 min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; 626 min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
648 /* if the peer has excess bandwidth, background traffic is allowed, 627 /* if the peer has excess bandwidth, background traffic is allowed,
@@ -745,6 +724,11 @@ try_transmission (struct Session *session)
745 &pop_cork_task, 724 &pop_cork_task,
746 session); 725 session);
747 } 726 }
727 else
728 {
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "Queue empty, waiting for solicitations\n");
731 }
748 return; 732 return;
749 } 733 }
750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
diff --git a/src/core/test_core_api_reliability.c b/src/core/test_core_api_reliability.c
index f2fadb59e..bd55565e3 100644
--- a/src/core/test_core_api_reliability.c
+++ b/src/core/test_core_api_reliability.c
@@ -41,7 +41,7 @@
41/** 41/**
42 * How long until we give up on transmitting the message? 42 * How long until we give up on transmitting the message?
43 */ 43 */
44#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 6000) 44#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 600)
45 45
46/** 46/**
47 * What delay do we request from the core service for transmission? 47 * What delay do we request from the core service for transmission?
@@ -55,9 +55,9 @@ static unsigned long long total_bytes;
55 55
56static struct GNUNET_TIME_Absolute start_time; 56static struct GNUNET_TIME_Absolute start_time;
57 57
58static struct GNUNET_SCHEDULER_Task * err_task; 58static struct GNUNET_SCHEDULER_Task *err_task;
59 59
60static struct GNUNET_SCHEDULER_Task * connect_task; 60static struct GNUNET_SCHEDULER_Task *connect_task;
61 61
62 62
63struct PeerContext 63struct PeerContext
@@ -185,9 +185,9 @@ transmit_ready (void *cls, size_t size, void *buf)
185 unsigned int ret; 185 unsigned int ret;
186 186
187 GNUNET_assert (size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); 187 GNUNET_assert (size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
188 if (buf == NULL) 188 if (NULL == buf)
189 { 189 {
190 if (p1.ch != NULL) 190 if (NULL != p1.ch)
191 GNUNET_break (NULL != 191 GNUNET_break (NULL !=
192 GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO, 192 GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO,
193 GNUNET_CORE_PRIO_BEST_EFFORT, 193 GNUNET_CORE_PRIO_BEST_EFFORT,
@@ -221,7 +221,8 @@ transmit_ready (void *cls, size_t size, void *buf)
221 while (size - ret >= s); 221 while (size - ret >= s);
222 GNUNET_SCHEDULER_cancel (err_task); 222 GNUNET_SCHEDULER_cancel (err_task);
223 err_task = 223 err_task =
224 GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL); 224 GNUNET_SCHEDULER_add_delayed (TIMEOUT,
225 &terminate_task_error, NULL);
225 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 226 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
226 "Returning total message block of size %u\n", ret); 227 "Returning total message block of size %u\n", ret);
227 total_bytes += ret; 228 total_bytes += ret;
@@ -241,10 +242,10 @@ connect_notify (void *cls, const struct GNUNET_PeerIdentity *peer)
241 if (pc == &p1) 242 if (pc == &p1)
242 { 243 {
243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 244 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244 "Encrypted connection established to peer `%4s'\n", 245 "Encrypted connection established to peer `%s'\n",
245 GNUNET_i2s (peer)); 246 GNUNET_i2s (peer));
246 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 247 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
247 "Asking core (1) for transmission to peer `%4s'\n", 248 "Asking core (1) for transmission to peer `%s'\n",
248 GNUNET_i2s (&p2.id)); 249 GNUNET_i2s (&p2.id));
249 GNUNET_SCHEDULER_cancel (err_task); 250 GNUNET_SCHEDULER_cancel (err_task);
250 err_task = 251 err_task =
@@ -268,7 +269,7 @@ disconnect_notify (void *cls, const struct GNUNET_PeerIdentity *peer)
268 if (0 == memcmp (&pc->id, peer, sizeof (struct GNUNET_PeerIdentity))) 269 if (0 == memcmp (&pc->id, peer, sizeof (struct GNUNET_PeerIdentity)))
269 return; 270 return;
270 pc->connect_status = 0; 271 pc->connect_status = 0;
271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Encrypted connection to `%4s' cut\n", 272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Encrypted connection to `%s' cut\n",
272 GNUNET_i2s (peer)); 273 GNUNET_i2s (peer));
273} 274}
274 275
@@ -278,7 +279,7 @@ inbound_notify (void *cls, const struct GNUNET_PeerIdentity *other,
278 const struct GNUNET_MessageHeader *message) 279 const struct GNUNET_MessageHeader *message)
279{ 280{
280 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
281 "Core provides inbound data from `%4s'.\n", GNUNET_i2s (other)); 282 "Core provides inbound data from `%s'.\n", GNUNET_i2s (other));
282 return GNUNET_OK; 283 return GNUNET_OK;
283} 284}
284 285
@@ -288,7 +289,7 @@ outbound_notify (void *cls, const struct GNUNET_PeerIdentity *other,
288 const struct GNUNET_MessageHeader *message) 289 const struct GNUNET_MessageHeader *message)
289{ 290{
290 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 291 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
291 "Core notifies about outbound data for `%4s'.\n", 292 "Core notifies about outbound data for `%s'.\n",
292 GNUNET_i2s (other)); 293 GNUNET_i2s (other));
293 return GNUNET_OK; 294 return GNUNET_OK;
294} 295}
@@ -299,7 +300,8 @@ transmit_ready (void *cls, size_t size, void *buf);
299 300
300 301
301static int 302static int
302process_mtype (void *cls, const struct GNUNET_PeerIdentity *peer, 303process_mtype (void *cls,
304 const struct GNUNET_PeerIdentity *peer,
303 const struct GNUNET_MessageHeader *message) 305 const struct GNUNET_MessageHeader *message)
304{ 306{
305 static int n; 307 static int n;
@@ -342,9 +344,11 @@ process_mtype (void *cls, const struct GNUNET_PeerIdentity *peer,
342 { 344 {
343 if (n == tr_n) 345 if (n == tr_n)
344 GNUNET_break (NULL != 346 GNUNET_break (NULL !=
345 GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO, 347 GNUNET_CORE_notify_transmit_ready (p1.ch,
348 GNUNET_NO /* no cork */,
346 GNUNET_CORE_PRIO_BEST_EFFORT, 349 GNUNET_CORE_PRIO_BEST_EFFORT,
347 FAST_TIMEOUT, &p2.id, 350 FAST_TIMEOUT /* ignored! */,
351 &p2.id,
348 get_size (tr_n), 352 get_size (tr_n),
349 &transmit_ready, &p1)); 353 &transmit_ready, &p1));
350 } 354 }
@@ -365,7 +369,7 @@ init_notify (void *cls,
365 struct PeerContext *p = cls; 369 struct PeerContext *p = cls;
366 370
367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 371 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368 "Connection to CORE service of `%4s' established\n", 372 "Connection to CORE service of `%s' established\n",
369 GNUNET_i2s (my_identity)); 373 GNUNET_i2s (my_identity));
370 p->id = *my_identity; 374 p->id = *my_identity;
371 if (cls == &p1) 375 if (cls == &p1)
@@ -373,9 +377,13 @@ init_notify (void *cls,
373 GNUNET_assert (ok == 2); 377 GNUNET_assert (ok == 2);
374 OKPP; 378 OKPP;
375 /* connect p2 */ 379 /* connect p2 */
376 GNUNET_assert (NULL != (p2.ch = GNUNET_CORE_connect (p2.cfg, &p2, &init_notify, &connect_notify, 380 GNUNET_assert (NULL != (p2.ch = GNUNET_CORE_connect (p2.cfg, &p2,
377 &disconnect_notify, &inbound_notify, GNUNET_YES, 381 &init_notify,
378 &outbound_notify, GNUNET_YES, handlers))); 382 &connect_notify,
383 &disconnect_notify,
384 &inbound_notify, GNUNET_YES,
385 &outbound_notify, GNUNET_YES,
386 handlers)));
379 } 387 }
380 else 388 else
381 { 389 {
@@ -383,7 +391,7 @@ init_notify (void *cls,
383 OKPP; 391 OKPP;
384 GNUNET_assert (cls == &p2); 392 GNUNET_assert (cls == &p2);
385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 393 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386 "Asking transport (1) to connect to peer `%4s'\n", 394 "Asking transport (1) to connect to peer `%s'\n",
387 GNUNET_i2s (&p2.id)); 395 GNUNET_i2s (&p2.id));
388 connect_task = GNUNET_SCHEDULER_add_now (&try_connect, NULL); 396 connect_task = GNUNET_SCHEDULER_add_now (&try_connect, NULL);
389 } 397 }
@@ -443,9 +451,13 @@ run (void *cls, char *const *args, const char *cfgfile,
443 err_task = 451 err_task =
444 GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL); 452 GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL);
445 453
446 GNUNET_assert (NULL != (p1.ch = GNUNET_CORE_connect (p1.cfg, &p1, &init_notify, &connect_notify, 454 GNUNET_assert (NULL != (p1.ch = GNUNET_CORE_connect (p1.cfg, &p1,
447 &disconnect_notify, &inbound_notify, GNUNET_YES, 455 &init_notify,
448 &outbound_notify, GNUNET_YES, handlers))); 456 &connect_notify,
457 &disconnect_notify,
458 &inbound_notify, GNUNET_YES,
459 &outbound_notify, GNUNET_YES,
460 handlers)));
449} 461}
450 462
451 463
@@ -476,7 +488,7 @@ main (int argc, char *argv1[])
476 GNUNET_GETOPT_OPTION_END 488 GNUNET_GETOPT_OPTION_END
477 }; 489 };
478 ok = 1; 490 ok = 1;
479 GNUNET_log_setup ("test-core-api", 491 GNUNET_log_setup ("test-core-api-reliability",
480 "WARNING", 492 "WARNING",
481 NULL); 493 NULL);
482 GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, argv, 494 GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, argv,