diff options
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 207 |
1 files changed, 186 insertions, 21 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index a7e2a8c04..a90bef3b5 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -1736,6 +1736,11 @@ struct Queue | |||
1736 | const char *address; | 1736 | const char *address; |
1737 | 1737 | ||
1738 | /** | 1738 | /** |
1739 | * Is this queue of unlimited length. | ||
1740 | */ | ||
1741 | unsigned int unlimited_length; | ||
1742 | |||
1743 | /** | ||
1739 | * Task scheduled for the time when this queue can (likely) transmit the | 1744 | * Task scheduled for the time when this queue can (likely) transmit the |
1740 | * next message. | 1745 | * next message. |
1741 | */ | 1746 | */ |
@@ -1787,6 +1792,11 @@ struct Queue | |||
1787 | unsigned int queue_length; | 1792 | unsigned int queue_length; |
1788 | 1793 | ||
1789 | /** | 1794 | /** |
1795 | * Capacity of the queue. | ||
1796 | */ | ||
1797 | uint64_t q_capacity; | ||
1798 | |||
1799 | /** | ||
1790 | * Queue priority | 1800 | * Queue priority |
1791 | */ | 1801 | */ |
1792 | uint32_t priority; | 1802 | uint32_t priority; |
@@ -3446,6 +3456,35 @@ transmit_on_queue (void *cls); | |||
3446 | 3456 | ||
3447 | 3457 | ||
3448 | /** | 3458 | /** |
3459 | * Check if the communicator has another queue with higher prio ready for sending. | ||
3460 | */ | ||
3461 | static unsigned int | ||
3462 | check_for_queue_with_higher_prio (struct Queue *queue, struct Queue *queue_head) | ||
3463 | { | ||
3464 | for (struct Queue *s = queue_head; NULL != s; | ||
3465 | s = s->next_client) | ||
3466 | { | ||
3467 | if (s->tc->details.communicator.address_prefix != | ||
3468 | queue->tc->details.communicator.address_prefix) | ||
3469 | { | ||
3470 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3471 | "queue address %s qid %u compare with queue: address %s qid %u\n", | ||
3472 | queue->address, | ||
3473 | queue->qid, | ||
3474 | s->address, | ||
3475 | s->qid); | ||
3476 | if ((s->priority > queue->priority) && (0 < s->q_capacity) && | ||
3477 | (QUEUE_LENGTH_LIMIT > s->queue_length) ) | ||
3478 | return GNUNET_YES; | ||
3479 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3480 | "Lower prio\n"); | ||
3481 | } | ||
3482 | } | ||
3483 | return GNUNET_NO; | ||
3484 | } | ||
3485 | |||
3486 | |||
3487 | /** | ||
3449 | * Called whenever something changed that might effect when we | 3488 | * Called whenever something changed that might effect when we |
3450 | * try to do the next transmission on @a queue using #transmit_on_queue(). | 3489 | * try to do the next transmission on @a queue using #transmit_on_queue(). |
3451 | * | 3490 | * |
@@ -3456,6 +3495,11 @@ static void | |||
3456 | schedule_transmit_on_queue (struct Queue *queue, | 3495 | schedule_transmit_on_queue (struct Queue *queue, |
3457 | enum GNUNET_SCHEDULER_Priority p) | 3496 | enum GNUNET_SCHEDULER_Priority p) |
3458 | { | 3497 | { |
3498 | if (check_for_queue_with_higher_prio (queue, | ||
3499 | queue->tc->details.communicator. | ||
3500 | queue_head)) | ||
3501 | return; | ||
3502 | |||
3459 | if (queue->tc->details.communicator.total_queue_length >= | 3503 | if (queue->tc->details.communicator.total_queue_length >= |
3460 | COMMUNICATOR_TOTAL_QUEUE_LIMIT) | 3504 | COMMUNICATOR_TOTAL_QUEUE_LIMIT) |
3461 | { | 3505 | { |
@@ -3480,6 +3524,19 @@ schedule_transmit_on_queue (struct Queue *queue, | |||
3480 | queue->idle = GNUNET_NO; | 3524 | queue->idle = GNUNET_NO; |
3481 | return; | 3525 | return; |
3482 | } | 3526 | } |
3527 | if (0 == queue->q_capacity) | ||
3528 | { | ||
3529 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3530 | "Transmission throttled due to communicator message queue qid %u has capacity %lu.\n", | ||
3531 | queue->qid, | ||
3532 | queue->q_capacity); | ||
3533 | GNUNET_STATISTICS_update (GST_stats, | ||
3534 | "# Transmission throttled due to message queue capacity", | ||
3535 | 1, | ||
3536 | GNUNET_NO); | ||
3537 | queue->idle = GNUNET_NO; | ||
3538 | return; | ||
3539 | } | ||
3483 | /* queue might indeed be ready, schedule it */ | 3540 | /* queue might indeed be ready, schedule it */ |
3484 | if (NULL != queue->transmit_task) | 3541 | if (NULL != queue->transmit_task) |
3485 | GNUNET_SCHEDULER_cancel (queue->transmit_task); | 3542 | GNUNET_SCHEDULER_cancel (queue->transmit_task); |
@@ -3582,8 +3639,9 @@ free_queue (struct Queue *queue) | |||
3582 | tc->details.communicator.queue_head, | 3639 | tc->details.communicator.queue_head, |
3583 | tc->details.communicator.queue_tail, | 3640 | tc->details.communicator.queue_tail, |
3584 | queue); | 3641 | queue); |
3585 | maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= | 3642 | maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT <= |
3586 | tc->details.communicator.total_queue_length); | 3643 | tc->details.communicator. |
3644 | total_queue_length); | ||
3587 | while (NULL != (qe = queue->queue_head)) | 3645 | while (NULL != (qe = queue->queue_head)) |
3588 | { | 3646 | { |
3589 | GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe); | 3647 | GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe); |
@@ -3597,7 +3655,7 @@ free_queue (struct Queue *queue) | |||
3597 | GNUNET_free (qe); | 3655 | GNUNET_free (qe); |
3598 | } | 3656 | } |
3599 | GNUNET_assert (0 == queue->queue_length); | 3657 | GNUNET_assert (0 == queue->queue_length); |
3600 | if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT < | 3658 | if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT > |
3601 | tc->details.communicator.total_queue_length)) | 3659 | tc->details.communicator.total_queue_length)) |
3602 | { | 3660 | { |
3603 | /* Communicator dropped below threshold, resume all _other_ queues */ | 3661 | /* Communicator dropped below threshold, resume all _other_ queues */ |
@@ -4223,18 +4281,36 @@ queue_send_msg (struct Queue *queue, | |||
4223 | if (NULL != pm) | 4281 | if (NULL != pm) |
4224 | { | 4282 | { |
4225 | qe->pm = pm; | 4283 | qe->pm = pm; |
4226 | GNUNET_assert (NULL == pm->qe); | 4284 | // TODO Why do we have a retransmission. When we know, make decision if we still want this. |
4285 | // GNUNET_assert (NULL == pm->qe); | ||
4286 | /*if (NULL != pm->qe) | ||
4287 | { | ||
4288 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4289 | "Retransmitting message <%llu> remove pm from qe with MID: %llu \n", | ||
4290 | pm->logging_uuid, | ||
4291 | (unsigned long long) pm->qe->mid); | ||
4292 | pm->qe->pm = NULL; | ||
4293 | }*/ | ||
4227 | pm->qe = qe; | 4294 | pm->qe = qe; |
4228 | } | 4295 | } |
4229 | GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe); | 4296 | GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe); |
4230 | GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); | 4297 | GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); |
4231 | queue->queue_length++; | 4298 | queue->queue_length++; |
4232 | queue->tc->details.communicator.total_queue_length++; | 4299 | queue->tc->details.communicator.total_queue_length++; |
4300 | if (GNUNET_NO == queue->unlimited_length) | ||
4301 | queue->q_capacity--; | ||
4302 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4303 | "Queue %s with qid %u has capacity %lu\n", | ||
4304 | queue->address, | ||
4305 | queue->qid, | ||
4306 | queue->q_capacity); | ||
4233 | if (COMMUNICATOR_TOTAL_QUEUE_LIMIT == | 4307 | if (COMMUNICATOR_TOTAL_QUEUE_LIMIT == |
4234 | queue->tc->details.communicator.total_queue_length) | 4308 | queue->tc->details.communicator.total_queue_length) |
4235 | queue->idle = GNUNET_NO; | 4309 | queue->idle = GNUNET_NO; |
4236 | if (QUEUE_LENGTH_LIMIT == queue->queue_length) | 4310 | if (QUEUE_LENGTH_LIMIT == queue->queue_length) |
4237 | queue->idle = GNUNET_NO; | 4311 | queue->idle = GNUNET_NO; |
4312 | if (0 == queue->q_capacity) | ||
4313 | queue->idle = GNUNET_NO; | ||
4238 | GNUNET_MQ_send (queue->tc->mq, env); | 4314 | GNUNET_MQ_send (queue->tc->mq, env); |
4239 | } | 4315 | } |
4240 | } | 4316 | } |
@@ -4672,8 +4748,16 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target, | |||
4672 | struct GNUNET_TIME_Relative rtt1; | 4748 | struct GNUNET_TIME_Relative rtt1; |
4673 | struct GNUNET_TIME_Relative rtt2; | 4749 | struct GNUNET_TIME_Relative rtt2; |
4674 | 4750 | ||
4751 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4752 | "Trying to route message of type %u to %s without fc\n", | ||
4753 | ntohs (hdr->type), | ||
4754 | GNUNET_i2s (target)); | ||
4755 | |||
4756 | // TODO Do this elsewhere. vl should be given as parameter to method. | ||
4675 | vl = lookup_virtual_link (target); | 4757 | vl = lookup_virtual_link (target); |
4676 | GNUNET_assert (NULL != vl); | 4758 | GNUNET_assert (NULL != vl); |
4759 | if (NULL == vl) | ||
4760 | return GNUNET_TIME_UNIT_FOREVER_REL; | ||
4677 | n = vl->n; | 4761 | n = vl->n; |
4678 | dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL; | 4762 | dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL; |
4679 | if (0 == (options & RMO_UNCONFIRMED_ALLOWED)) | 4763 | if (0 == (options & RMO_UNCONFIRMED_ALLOWED)) |
@@ -4718,6 +4802,10 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target, | |||
4718 | rtt2 = GNUNET_TIME_UNIT_FOREVER_REL; | 4802 | rtt2 = GNUNET_TIME_UNIT_FOREVER_REL; |
4719 | if (NULL != n) | 4803 | if (NULL != n) |
4720 | { | 4804 | { |
4805 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4806 | "Try to route message of type %u to %s without fc via neighbour\n", | ||
4807 | ntohs (hdr->type), | ||
4808 | GNUNET_i2s (target)); | ||
4721 | rtt1 = route_via_neighbour (n, hdr, options); | 4809 | rtt1 = route_via_neighbour (n, hdr, options); |
4722 | } | 4810 | } |
4723 | if (NULL != dv) | 4811 | if (NULL != dv) |
@@ -4889,7 +4977,9 @@ check_vl_transmission (struct VirtualLink *vl) | |||
4889 | schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); | 4977 | schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); |
4890 | else | 4978 | else |
4891 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 4979 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
4892 | "Queue busy or invalid\n"); | 4980 | "Neighbour Queue QID: %u (%u) busy or invalid\n", |
4981 | queue->qid, | ||
4982 | queue->idle); | ||
4893 | } | 4983 | } |
4894 | } | 4984 | } |
4895 | /* Notify queues via DV that we are interested */ | 4985 | /* Notify queues via DV that we are interested */ |
@@ -4910,6 +5000,11 @@ check_vl_transmission (struct VirtualLink *vl) | |||
4910 | (queue->validated_until.abs_value_us > now.abs_value_us)) | 5000 | (queue->validated_until.abs_value_us > now.abs_value_us)) |
4911 | schedule_transmit_on_queue (queue, | 5001 | schedule_transmit_on_queue (queue, |
4912 | GNUNET_SCHEDULER_PRIORITY_BACKGROUND); | 5002 | GNUNET_SCHEDULER_PRIORITY_BACKGROUND); |
5003 | else | ||
5004 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
5005 | "DV Queue QID: %u (%u) busy or invalid\n", | ||
5006 | queue->qid, | ||
5007 | queue->idle); | ||
4913 | } | 5008 | } |
4914 | } | 5009 | } |
4915 | } | 5010 | } |
@@ -4996,8 +5091,9 @@ handle_communicator_backchannel ( | |||
4996 | (const struct GNUNET_MessageHeader *) &cb[1]; | 5091 | (const struct GNUNET_MessageHeader *) &cb[1]; |
4997 | uint16_t isize = ntohs (inbox->size); | 5092 | uint16_t isize = ntohs (inbox->size); |
4998 | const char *is = ((const char *) &cb[1]) + isize; | 5093 | const char *is = ((const char *) &cb[1]) + isize; |
5094 | size_t slen = strlen (is) + 1; | ||
4999 | char | 5095 | char |
5000 | mbuf[isize | 5096 | mbuf[slen + isize |
5001 | + sizeof(struct | 5097 | + sizeof(struct |
5002 | TransportBackchannelEncapsulationMessage)] GNUNET_ALIGN; | 5098 | TransportBackchannelEncapsulationMessage)] GNUNET_ALIGN; |
5003 | struct TransportBackchannelEncapsulationMessage *be = | 5099 | struct TransportBackchannelEncapsulationMessage *be = |
@@ -5006,9 +5102,10 @@ handle_communicator_backchannel ( | |||
5006 | /* 0-termination of 'is' was checked already in | 5102 | /* 0-termination of 'is' was checked already in |
5007 | #check_communicator_backchannel() */ | 5103 | #check_communicator_backchannel() */ |
5008 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 5104 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
5009 | "Preparing backchannel transmission to %s:%s of type %u\n", | 5105 | "Preparing backchannel transmission to %s:%s of type %u and size %u\n", |
5010 | GNUNET_i2s (&cb->pid), | 5106 | GNUNET_i2s (&cb->pid), |
5011 | is, | 5107 | is, |
5108 | ntohs (inbox->type), | ||
5012 | ntohs (inbox->size)); | 5109 | ntohs (inbox->size)); |
5013 | /* encapsulate and encrypt message */ | 5110 | /* encapsulate and encrypt message */ |
5014 | be->header.type = | 5111 | be->header.type = |
@@ -5264,6 +5361,11 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) | |||
5264 | uint16_t size = ntohs (mh->size); | 5361 | uint16_t size = ntohs (mh->size); |
5265 | int have_core; | 5362 | int have_core; |
5266 | 5363 | ||
5364 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
5365 | "Handling message of type %u with %u bytes\n", | ||
5366 | (unsigned int) ntohs (mh->type), | ||
5367 | (unsigned int) ntohs (mh->size)); | ||
5368 | |||
5267 | if ((size > UINT16_MAX - sizeof(struct InboundMessage)) || | 5369 | if ((size > UINT16_MAX - sizeof(struct InboundMessage)) || |
5268 | (size < sizeof(struct GNUNET_MessageHeader))) | 5370 | (size < sizeof(struct GNUNET_MessageHeader))) |
5269 | { | 5371 | { |
@@ -5290,6 +5392,10 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) | |||
5290 | 1, | 5392 | 1, |
5291 | GNUNET_NO); | 5393 | GNUNET_NO); |
5292 | 5394 | ||
5395 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
5396 | "CORE messages of type %u with %u bytes dropped (virtual link still down)\n", | ||
5397 | (unsigned int) ntohs (mh->type), | ||
5398 | (unsigned int) ntohs (mh->size)); | ||
5293 | finish_cmc_handling (cmc); | 5399 | finish_cmc_handling (cmc); |
5294 | return; | 5400 | return; |
5295 | } | 5401 | } |
@@ -5299,7 +5405,10 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) | |||
5299 | "# CORE messages dropped (FC arithmetic overflow)", | 5405 | "# CORE messages dropped (FC arithmetic overflow)", |
5300 | 1, | 5406 | 1, |
5301 | GNUNET_NO); | 5407 | GNUNET_NO); |
5302 | 5408 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |
5409 | "CORE messages of type %u with %u bytes dropped (FC arithmetic overflow)\n", | ||
5410 | (unsigned int) ntohs (mh->type), | ||
5411 | (unsigned int) ntohs (mh->size)); | ||
5303 | finish_cmc_handling (cmc); | 5412 | finish_cmc_handling (cmc); |
5304 | return; | 5413 | return; |
5305 | } | 5414 | } |
@@ -5309,6 +5418,10 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) | |||
5309 | "# CORE messages dropped (FC window overflow)", | 5418 | "# CORE messages dropped (FC window overflow)", |
5310 | 1, | 5419 | 1, |
5311 | GNUNET_NO); | 5420 | GNUNET_NO); |
5421 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
5422 | "CORE messages of type %u with %u bytes dropped (FC window overflow)\n", | ||
5423 | (unsigned int) ntohs (mh->type), | ||
5424 | (unsigned int) ntohs (mh->size)); | ||
5312 | finish_cmc_handling (cmc); | 5425 | finish_cmc_handling (cmc); |
5313 | return; | 5426 | return; |
5314 | } | 5427 | } |
@@ -5345,6 +5458,10 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) | |||
5345 | perspective of the other peer! */ | 5458 | perspective of the other peer! */ |
5346 | vl->incoming_fc_window_size_used += size; | 5459 | vl->incoming_fc_window_size_used += size; |
5347 | /* TODO-M1 */ | 5460 | /* TODO-M1 */ |
5461 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
5462 | "Dropped message of type %u with %u bytes to CORE: no CORE client connected!", | ||
5463 | (unsigned int) ntohs (mh->type), | ||
5464 | (unsigned int) ntohs (mh->size)); | ||
5348 | finish_cmc_handling (cmc); | 5465 | finish_cmc_handling (cmc); |
5349 | return; | 5466 | return; |
5350 | } | 5467 | } |
@@ -6074,6 +6191,15 @@ handle_backchannel_encapsulation ( | |||
6074 | (const struct GNUNET_MessageHeader *) &be[1]; | 6191 | (const struct GNUNET_MessageHeader *) &be[1]; |
6075 | uint16_t isize = ntohs (inbox->size); | 6192 | uint16_t isize = ntohs (inbox->size); |
6076 | const char *target_communicator = ((const char *) inbox) + isize; | 6193 | const char *target_communicator = ((const char *) inbox) + isize; |
6194 | char *sender; | ||
6195 | char *self; | ||
6196 | |||
6197 | GNUNET_asprintf (&sender, | ||
6198 | "%s", | ||
6199 | GNUNET_i2s (&cmc->im.sender)); | ||
6200 | GNUNET_asprintf (&self, | ||
6201 | "%s", | ||
6202 | GNUNET_i2s (&GST_my_identity)); | ||
6077 | 6203 | ||
6078 | /* Find client providing this communicator */ | 6204 | /* Find client providing this communicator */ |
6079 | for (tc = clients_head; NULL != tc; tc = tc->next) | 6205 | for (tc = clients_head; NULL != tc; tc = tc->next) |
@@ -6095,8 +6221,9 @@ handle_backchannel_encapsulation ( | |||
6095 | } | 6221 | } |
6096 | /* Finally, deliver backchannel message to communicator */ | 6222 | /* Finally, deliver backchannel message to communicator */ |
6097 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 6223 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
6098 | "Delivering backchannel message from %s of type %u to %s\n", | 6224 | "Delivering backchannel message from %s to %s of type %u to %s\n", |
6099 | GNUNET_i2s (&cmc->im.sender), | 6225 | sender, |
6226 | self, | ||
6100 | ntohs (inbox->type), | 6227 | ntohs (inbox->type), |
6101 | target_communicator); | 6228 | target_communicator); |
6102 | env = GNUNET_MQ_msg_extra ( | 6229 | env = GNUNET_MQ_msg_extra ( |
@@ -8407,14 +8534,15 @@ fragment_message (struct Queue *queue, | |||
8407 | struct PendingMessage *ff; | 8534 | struct PendingMessage *ff; |
8408 | uint16_t mtu; | 8535 | uint16_t mtu; |
8409 | 8536 | ||
8410 | mtu = (0 == queue->mtu) | 8537 | mtu = (UINT16_MAX == queue->mtu) |
8411 | ? UINT16_MAX - sizeof(struct GNUNET_TRANSPORT_SendMessageTo) | 8538 | ? UINT16_MAX - sizeof(struct GNUNET_TRANSPORT_SendMessageTo) |
8412 | : queue->mtu; | 8539 | : queue->mtu; |
8413 | set_pending_message_uuid (pm); | 8540 | set_pending_message_uuid (pm); |
8414 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 8541 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
8415 | "Fragmenting message %llu <%llu> to %s for MTU %u\n", | 8542 | "Fragmenting message %llu <%llu> with size %u to %s for MTU %u\n", |
8416 | (unsigned long long) pm->msg_uuid.uuid, | 8543 | (unsigned long long) pm->msg_uuid.uuid, |
8417 | pm->logging_uuid, | 8544 | pm->logging_uuid, |
8545 | pm->bytes_msg, | ||
8418 | GNUNET_i2s (&pm->vl->target), | 8546 | GNUNET_i2s (&pm->vl->target), |
8419 | (unsigned int) mtu); | 8547 | (unsigned int) mtu); |
8420 | pa = prepare_pending_acknowledgement (queue, dvh, pm); | 8548 | pa = prepare_pending_acknowledgement (queue, dvh, pm); |
@@ -8700,7 +8828,7 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc, | |||
8700 | GNUNET_TRANSPORT_SendMessageTo)) | 8828 | GNUNET_TRANSPORT_SendMessageTo)) |
8701 | || | 8829 | || |
8702 | (NULL != pos->head_frag /* fragments already exist, should | 8830 | (NULL != pos->head_frag /* fragments already exist, should |
8703 | respect that even if MTU is 0 for | 8831 | respect that even if MTU is UINT16_MAX for |
8704 | this queue */)) | 8832 | this queue */)) |
8705 | { | 8833 | { |
8706 | frag = GNUNET_YES; | 8834 | frag = GNUNET_YES; |
@@ -9069,12 +9197,17 @@ handle_send_message_ack (void *cls, | |||
9069 | qep = qep->next) | 9197 | qep = qep->next) |
9070 | { | 9198 | { |
9071 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 9199 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
9072 | "QueueEntry MID: %llu, Ack MID: %llu\n", | 9200 | "QueueEntry MID: %llu on queue QID: %llu, Ack MID: %llu\n", |
9073 | (unsigned long long) qep->mid, | 9201 | (unsigned long long) qep->mid, |
9202 | (unsigned long long) queue->qid, | ||
9074 | (unsigned long long) sma->mid); | 9203 | (unsigned long long) sma->mid); |
9075 | if (qep->mid != sma->mid) | 9204 | if (qep->mid != sma->mid) |
9076 | continue; | 9205 | continue; |
9077 | qe = qep; | 9206 | qe = qep; |
9207 | if ((NULL != qe->pm)&&(qe->pm->qe != qe)) | ||
9208 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
9209 | "For pending message %llu we had retransmissions.\n", | ||
9210 | qe->pm->logging_uuid); | ||
9078 | break; | 9211 | break; |
9079 | } | 9212 | } |
9080 | } | 9213 | } |
@@ -9125,12 +9258,26 @@ handle_send_message_ack (void *cls, | |||
9125 | GNUNET_NO); | 9258 | GNUNET_NO); |
9126 | schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); | 9259 | schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); |
9127 | } | 9260 | } |
9261 | else if (1 == qe->queue->q_capacity) | ||
9262 | { | ||
9263 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
9264 | "Transmission rescheduled due to communicator message queue with qid %u has capacity %lu.\n", | ||
9265 | qe->queue->qid, | ||
9266 | qe->queue->q_capacity); | ||
9267 | /* message queue has capacity; only resume this one queue */ | ||
9268 | /* queue dropped below threshold; only resume this one queue */ | ||
9269 | GNUNET_STATISTICS_update (GST_stats, | ||
9270 | "# Transmission throttled due to message queue capacity", | ||
9271 | -1, | ||
9272 | GNUNET_NO); | ||
9273 | schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); | ||
9274 | } | ||
9128 | 9275 | ||
9129 | if (NULL != (pm = qe->pm)) | 9276 | if (NULL != (pm = qe->pm)) |
9130 | { | 9277 | { |
9131 | struct VirtualLink *vl; | 9278 | struct VirtualLink *vl; |
9132 | 9279 | ||
9133 | GNUNET_assert (qe == pm->qe); | 9280 | // GNUNET_assert (qe == pm->qe); |
9134 | pm->qe = NULL; | 9281 | pm->qe = NULL; |
9135 | /* If waiting for this communicator may have blocked transmission | 9282 | /* If waiting for this communicator may have blocked transmission |
9136 | of pm on other queues for this neighbour, force schedule | 9283 | of pm on other queues for this neighbour, force schedule |
@@ -9671,16 +9818,20 @@ handle_add_queue_message (void *cls, | |||
9671 | addr_len = ntohs (aqm->header.size) - sizeof(*aqm); | 9818 | addr_len = ntohs (aqm->header.size) - sizeof(*aqm); |
9672 | addr = (const char *) &aqm[1]; | 9819 | addr = (const char *) &aqm[1]; |
9673 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 9820 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
9674 | "New queue %s to %s available with QID %llu\n", | 9821 | "New queue %s to %s available with QID %llu and q_len %lu \n", |
9675 | addr, | 9822 | addr, |
9676 | GNUNET_i2s (&aqm->receiver), | 9823 | GNUNET_i2s (&aqm->receiver), |
9677 | (unsigned long long) aqm->qid); | 9824 | (unsigned long long) aqm->qid, |
9825 | GNUNET_ntohll (aqm->q_len)); | ||
9678 | queue = GNUNET_malloc (sizeof(struct Queue) + addr_len); | 9826 | queue = GNUNET_malloc (sizeof(struct Queue) + addr_len); |
9679 | queue->tc = tc; | 9827 | queue->tc = tc; |
9680 | queue->address = (const char *) &queue[1]; | 9828 | queue->address = (const char *) &queue[1]; |
9681 | queue->pd.aged_rtt = GNUNET_TIME_UNIT_FOREVER_REL; | 9829 | queue->pd.aged_rtt = GNUNET_TIME_UNIT_FOREVER_REL; |
9682 | queue->qid = aqm->qid; | 9830 | queue->qid = aqm->qid; |
9683 | queue->neighbour = neighbour; | 9831 | queue->neighbour = neighbour; |
9832 | if (GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED == GNUNET_ntohll (aqm->q_len)) | ||
9833 | queue->unlimited_length = GNUNET_YES; | ||
9834 | queue->q_capacity = GNUNET_ntohll (aqm->q_len); | ||
9684 | memcpy (&queue[1], addr, addr_len); | 9835 | memcpy (&queue[1], addr, addr_len); |
9685 | /* notify monitors about new queue */ | 9836 | /* notify monitors about new queue */ |
9686 | { | 9837 | { |
@@ -9752,10 +9903,14 @@ handle_update_queue_message (void *cls, | |||
9752 | target_queue->mtu = ntohl (msg->mtu); | 9903 | target_queue->mtu = ntohl (msg->mtu); |
9753 | target_queue->cs = msg->cs; | 9904 | target_queue->cs = msg->cs; |
9754 | target_queue->priority = ntohl (msg->priority); | 9905 | target_queue->priority = ntohl (msg->priority); |
9755 | /* The update message indicates how many _additional_ | 9906 | /* The update message indicates how many messages |
9756 | * messages the queue should be able to handle | 9907 | * the queue should be able to handle. |
9757 | */ | 9908 | */ |
9758 | target_queue->queue_length += GNUNET_ntohll (msg->q_len); | 9909 | if (GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED == GNUNET_ntohll (msg->q_len)) |
9910 | target_queue->unlimited_length = GNUNET_YES; | ||
9911 | else | ||
9912 | target_queue->unlimited_length = GNUNET_NO; | ||
9913 | target_queue->q_capacity = GNUNET_ntohll (msg->q_len); | ||
9759 | GNUNET_SERVICE_client_continue (tc->client); | 9914 | GNUNET_SERVICE_client_continue (tc->client); |
9760 | } | 9915 | } |
9761 | 9916 | ||
@@ -10179,8 +10334,18 @@ static void | |||
10179 | shutdown_task (void *cls) | 10334 | shutdown_task (void *cls) |
10180 | { | 10335 | { |
10181 | in_shutdown = GNUNET_YES; | 10336 | in_shutdown = GNUNET_YES; |
10337 | |||
10182 | if (NULL == clients_head) | 10338 | if (NULL == clients_head) |
10183 | do_shutdown (cls); | 10339 | { |
10340 | for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) | ||
10341 | { | ||
10342 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
10343 | "client still connected: %u\n", | ||
10344 | tc->type); | ||
10345 | } | ||
10346 | } | ||
10347 | do_shutdown (cls); | ||
10348 | |||
10184 | } | 10349 | } |
10185 | 10350 | ||
10186 | 10351 | ||