aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c207
1 files changed, 186 insertions, 21 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index a7e2a8c04..a90bef3b5 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -1736,6 +1736,11 @@ struct Queue
1736 const char *address; 1736 const char *address;
1737 1737
1738 /** 1738 /**
1739 * Is this queue of unlimited length.
1740 */
1741 unsigned int unlimited_length;
1742
1743 /**
1739 * Task scheduled for the time when this queue can (likely) transmit the 1744 * Task scheduled for the time when this queue can (likely) transmit the
1740 * next message. 1745 * next message.
1741 */ 1746 */
@@ -1787,6 +1792,11 @@ struct Queue
1787 unsigned int queue_length; 1792 unsigned int queue_length;
1788 1793
1789 /** 1794 /**
1795 * Capacity of the queue.
1796 */
1797 uint64_t q_capacity;
1798
1799 /**
1790 * Queue priority 1800 * Queue priority
1791 */ 1801 */
1792 uint32_t priority; 1802 uint32_t priority;
@@ -3446,6 +3456,35 @@ transmit_on_queue (void *cls);
3446 3456
3447 3457
3448/** 3458/**
3459 * Check if the communicator has another queue with higher prio ready for sending.
3460 */
3461static unsigned int
3462check_for_queue_with_higher_prio (struct Queue *queue, struct Queue *queue_head)
3463{
3464 for (struct Queue *s = queue_head; NULL != s;
3465 s = s->next_client)
3466 {
3467 if (s->tc->details.communicator.address_prefix !=
3468 queue->tc->details.communicator.address_prefix)
3469 {
3470 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3471 "queue address %s qid %u compare with queue: address %s qid %u\n",
3472 queue->address,
3473 queue->qid,
3474 s->address,
3475 s->qid);
3476 if ((s->priority > queue->priority) && (0 < s->q_capacity) &&
3477 (QUEUE_LENGTH_LIMIT > s->queue_length) )
3478 return GNUNET_YES;
3479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3480 "Lower prio\n");
3481 }
3482 }
3483 return GNUNET_NO;
3484}
3485
3486
3487/**
3449 * Called whenever something changed that might effect when we 3488 * Called whenever something changed that might effect when we
3450 * try to do the next transmission on @a queue using #transmit_on_queue(). 3489 * try to do the next transmission on @a queue using #transmit_on_queue().
3451 * 3490 *
@@ -3456,6 +3495,11 @@ static void
3456schedule_transmit_on_queue (struct Queue *queue, 3495schedule_transmit_on_queue (struct Queue *queue,
3457 enum GNUNET_SCHEDULER_Priority p) 3496 enum GNUNET_SCHEDULER_Priority p)
3458{ 3497{
3498 if (check_for_queue_with_higher_prio (queue,
3499 queue->tc->details.communicator.
3500 queue_head))
3501 return;
3502
3459 if (queue->tc->details.communicator.total_queue_length >= 3503 if (queue->tc->details.communicator.total_queue_length >=
3460 COMMUNICATOR_TOTAL_QUEUE_LIMIT) 3504 COMMUNICATOR_TOTAL_QUEUE_LIMIT)
3461 { 3505 {
@@ -3480,6 +3524,19 @@ schedule_transmit_on_queue (struct Queue *queue,
3480 queue->idle = GNUNET_NO; 3524 queue->idle = GNUNET_NO;
3481 return; 3525 return;
3482 } 3526 }
3527 if (0 == queue->q_capacity)
3528 {
3529 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3530 "Transmission throttled due to communicator message queue qid %u has capacity %lu.\n",
3531 queue->qid,
3532 queue->q_capacity);
3533 GNUNET_STATISTICS_update (GST_stats,
3534 "# Transmission throttled due to message queue capacity",
3535 1,
3536 GNUNET_NO);
3537 queue->idle = GNUNET_NO;
3538 return;
3539 }
3483 /* queue might indeed be ready, schedule it */ 3540 /* queue might indeed be ready, schedule it */
3484 if (NULL != queue->transmit_task) 3541 if (NULL != queue->transmit_task)
3485 GNUNET_SCHEDULER_cancel (queue->transmit_task); 3542 GNUNET_SCHEDULER_cancel (queue->transmit_task);
@@ -3582,8 +3639,9 @@ free_queue (struct Queue *queue)
3582 tc->details.communicator.queue_head, 3639 tc->details.communicator.queue_head,
3583 tc->details.communicator.queue_tail, 3640 tc->details.communicator.queue_tail,
3584 queue); 3641 queue);
3585 maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= 3642 maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT <=
3586 tc->details.communicator.total_queue_length); 3643 tc->details.communicator.
3644 total_queue_length);
3587 while (NULL != (qe = queue->queue_head)) 3645 while (NULL != (qe = queue->queue_head))
3588 { 3646 {
3589 GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe); 3647 GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
@@ -3597,7 +3655,7 @@ free_queue (struct Queue *queue)
3597 GNUNET_free (qe); 3655 GNUNET_free (qe);
3598 } 3656 }
3599 GNUNET_assert (0 == queue->queue_length); 3657 GNUNET_assert (0 == queue->queue_length);
3600 if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT < 3658 if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT >
3601 tc->details.communicator.total_queue_length)) 3659 tc->details.communicator.total_queue_length))
3602 { 3660 {
3603 /* Communicator dropped below threshold, resume all _other_ queues */ 3661 /* Communicator dropped below threshold, resume all _other_ queues */
@@ -4223,18 +4281,36 @@ queue_send_msg (struct Queue *queue,
4223 if (NULL != pm) 4281 if (NULL != pm)
4224 { 4282 {
4225 qe->pm = pm; 4283 qe->pm = pm;
4226 GNUNET_assert (NULL == pm->qe); 4284 // TODO Why do we have a retransmission. When we know, make decision if we still want this.
4285 // GNUNET_assert (NULL == pm->qe);
4286 /*if (NULL != pm->qe)
4287 {
4288 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4289 "Retransmitting message <%llu> remove pm from qe with MID: %llu \n",
4290 pm->logging_uuid,
4291 (unsigned long long) pm->qe->mid);
4292 pm->qe->pm = NULL;
4293 }*/
4227 pm->qe = qe; 4294 pm->qe = qe;
4228 } 4295 }
4229 GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe); 4296 GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
4230 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); 4297 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
4231 queue->queue_length++; 4298 queue->queue_length++;
4232 queue->tc->details.communicator.total_queue_length++; 4299 queue->tc->details.communicator.total_queue_length++;
4300 if (GNUNET_NO == queue->unlimited_length)
4301 queue->q_capacity--;
4302 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4303 "Queue %s with qid %u has capacity %lu\n",
4304 queue->address,
4305 queue->qid,
4306 queue->q_capacity);
4233 if (COMMUNICATOR_TOTAL_QUEUE_LIMIT == 4307 if (COMMUNICATOR_TOTAL_QUEUE_LIMIT ==
4234 queue->tc->details.communicator.total_queue_length) 4308 queue->tc->details.communicator.total_queue_length)
4235 queue->idle = GNUNET_NO; 4309 queue->idle = GNUNET_NO;
4236 if (QUEUE_LENGTH_LIMIT == queue->queue_length) 4310 if (QUEUE_LENGTH_LIMIT == queue->queue_length)
4237 queue->idle = GNUNET_NO; 4311 queue->idle = GNUNET_NO;
4312 if (0 == queue->q_capacity)
4313 queue->idle = GNUNET_NO;
4238 GNUNET_MQ_send (queue->tc->mq, env); 4314 GNUNET_MQ_send (queue->tc->mq, env);
4239 } 4315 }
4240} 4316}
@@ -4672,8 +4748,16 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
4672 struct GNUNET_TIME_Relative rtt1; 4748 struct GNUNET_TIME_Relative rtt1;
4673 struct GNUNET_TIME_Relative rtt2; 4749 struct GNUNET_TIME_Relative rtt2;
4674 4750
4751 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4752 "Trying to route message of type %u to %s without fc\n",
4753 ntohs (hdr->type),
4754 GNUNET_i2s (target));
4755
4756 // TODO Do this elsewhere. vl should be given as parameter to method.
4675 vl = lookup_virtual_link (target); 4757 vl = lookup_virtual_link (target);
4676 GNUNET_assert (NULL != vl); 4758 GNUNET_assert (NULL != vl);
4759 if (NULL == vl)
4760 return GNUNET_TIME_UNIT_FOREVER_REL;
4677 n = vl->n; 4761 n = vl->n;
4678 dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL; 4762 dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
4679 if (0 == (options & RMO_UNCONFIRMED_ALLOWED)) 4763 if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
@@ -4718,6 +4802,10 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
4718 rtt2 = GNUNET_TIME_UNIT_FOREVER_REL; 4802 rtt2 = GNUNET_TIME_UNIT_FOREVER_REL;
4719 if (NULL != n) 4803 if (NULL != n)
4720 { 4804 {
4805 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4806 "Try to route message of type %u to %s without fc via neighbour\n",
4807 ntohs (hdr->type),
4808 GNUNET_i2s (target));
4721 rtt1 = route_via_neighbour (n, hdr, options); 4809 rtt1 = route_via_neighbour (n, hdr, options);
4722 } 4810 }
4723 if (NULL != dv) 4811 if (NULL != dv)
@@ -4889,7 +4977,9 @@ check_vl_transmission (struct VirtualLink *vl)
4889 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); 4977 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
4890 else 4978 else
4891 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 4979 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4892 "Queue busy or invalid\n"); 4980 "Neighbour Queue QID: %u (%u) busy or invalid\n",
4981 queue->qid,
4982 queue->idle);
4893 } 4983 }
4894 } 4984 }
4895 /* Notify queues via DV that we are interested */ 4985 /* Notify queues via DV that we are interested */
@@ -4910,6 +5000,11 @@ check_vl_transmission (struct VirtualLink *vl)
4910 (queue->validated_until.abs_value_us > now.abs_value_us)) 5000 (queue->validated_until.abs_value_us > now.abs_value_us))
4911 schedule_transmit_on_queue (queue, 5001 schedule_transmit_on_queue (queue,
4912 GNUNET_SCHEDULER_PRIORITY_BACKGROUND); 5002 GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
5003 else
5004 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5005 "DV Queue QID: %u (%u) busy or invalid\n",
5006 queue->qid,
5007 queue->idle);
4913 } 5008 }
4914 } 5009 }
4915} 5010}
@@ -4996,8 +5091,9 @@ handle_communicator_backchannel (
4996 (const struct GNUNET_MessageHeader *) &cb[1]; 5091 (const struct GNUNET_MessageHeader *) &cb[1];
4997 uint16_t isize = ntohs (inbox->size); 5092 uint16_t isize = ntohs (inbox->size);
4998 const char *is = ((const char *) &cb[1]) + isize; 5093 const char *is = ((const char *) &cb[1]) + isize;
5094 size_t slen = strlen (is) + 1;
4999 char 5095 char
5000 mbuf[isize 5096 mbuf[slen + isize
5001 + sizeof(struct 5097 + sizeof(struct
5002 TransportBackchannelEncapsulationMessage)] GNUNET_ALIGN; 5098 TransportBackchannelEncapsulationMessage)] GNUNET_ALIGN;
5003 struct TransportBackchannelEncapsulationMessage *be = 5099 struct TransportBackchannelEncapsulationMessage *be =
@@ -5006,9 +5102,10 @@ handle_communicator_backchannel (
5006 /* 0-termination of 'is' was checked already in 5102 /* 0-termination of 'is' was checked already in
5007 #check_communicator_backchannel() */ 5103 #check_communicator_backchannel() */
5008 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 5104 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5009 "Preparing backchannel transmission to %s:%s of type %u\n", 5105 "Preparing backchannel transmission to %s:%s of type %u and size %u\n",
5010 GNUNET_i2s (&cb->pid), 5106 GNUNET_i2s (&cb->pid),
5011 is, 5107 is,
5108 ntohs (inbox->type),
5012 ntohs (inbox->size)); 5109 ntohs (inbox->size));
5013 /* encapsulate and encrypt message */ 5110 /* encapsulate and encrypt message */
5014 be->header.type = 5111 be->header.type =
@@ -5264,6 +5361,11 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
5264 uint16_t size = ntohs (mh->size); 5361 uint16_t size = ntohs (mh->size);
5265 int have_core; 5362 int have_core;
5266 5363
5364 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5365 "Handling message of type %u with %u bytes\n",
5366 (unsigned int) ntohs (mh->type),
5367 (unsigned int) ntohs (mh->size));
5368
5267 if ((size > UINT16_MAX - sizeof(struct InboundMessage)) || 5369 if ((size > UINT16_MAX - sizeof(struct InboundMessage)) ||
5268 (size < sizeof(struct GNUNET_MessageHeader))) 5370 (size < sizeof(struct GNUNET_MessageHeader)))
5269 { 5371 {
@@ -5290,6 +5392,10 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
5290 1, 5392 1,
5291 GNUNET_NO); 5393 GNUNET_NO);
5292 5394
5395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5396 "CORE messages of type %u with %u bytes dropped (virtual link still down)\n",
5397 (unsigned int) ntohs (mh->type),
5398 (unsigned int) ntohs (mh->size));
5293 finish_cmc_handling (cmc); 5399 finish_cmc_handling (cmc);
5294 return; 5400 return;
5295 } 5401 }
@@ -5299,7 +5405,10 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
5299 "# CORE messages dropped (FC arithmetic overflow)", 5405 "# CORE messages dropped (FC arithmetic overflow)",
5300 1, 5406 1,
5301 GNUNET_NO); 5407 GNUNET_NO);
5302 5408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5409 "CORE messages of type %u with %u bytes dropped (FC arithmetic overflow)\n",
5410 (unsigned int) ntohs (mh->type),
5411 (unsigned int) ntohs (mh->size));
5303 finish_cmc_handling (cmc); 5412 finish_cmc_handling (cmc);
5304 return; 5413 return;
5305 } 5414 }
@@ -5309,6 +5418,10 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
5309 "# CORE messages dropped (FC window overflow)", 5418 "# CORE messages dropped (FC window overflow)",
5310 1, 5419 1,
5311 GNUNET_NO); 5420 GNUNET_NO);
5421 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5422 "CORE messages of type %u with %u bytes dropped (FC window overflow)\n",
5423 (unsigned int) ntohs (mh->type),
5424 (unsigned int) ntohs (mh->size));
5312 finish_cmc_handling (cmc); 5425 finish_cmc_handling (cmc);
5313 return; 5426 return;
5314 } 5427 }
@@ -5345,6 +5458,10 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
5345 perspective of the other peer! */ 5458 perspective of the other peer! */
5346 vl->incoming_fc_window_size_used += size; 5459 vl->incoming_fc_window_size_used += size;
5347 /* TODO-M1 */ 5460 /* TODO-M1 */
5461 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5462 "Dropped message of type %u with %u bytes to CORE: no CORE client connected!",
5463 (unsigned int) ntohs (mh->type),
5464 (unsigned int) ntohs (mh->size));
5348 finish_cmc_handling (cmc); 5465 finish_cmc_handling (cmc);
5349 return; 5466 return;
5350 } 5467 }
@@ -6074,6 +6191,15 @@ handle_backchannel_encapsulation (
6074 (const struct GNUNET_MessageHeader *) &be[1]; 6191 (const struct GNUNET_MessageHeader *) &be[1];
6075 uint16_t isize = ntohs (inbox->size); 6192 uint16_t isize = ntohs (inbox->size);
6076 const char *target_communicator = ((const char *) inbox) + isize; 6193 const char *target_communicator = ((const char *) inbox) + isize;
6194 char *sender;
6195 char *self;
6196
6197 GNUNET_asprintf (&sender,
6198 "%s",
6199 GNUNET_i2s (&cmc->im.sender));
6200 GNUNET_asprintf (&self,
6201 "%s",
6202 GNUNET_i2s (&GST_my_identity));
6077 6203
6078 /* Find client providing this communicator */ 6204 /* Find client providing this communicator */
6079 for (tc = clients_head; NULL != tc; tc = tc->next) 6205 for (tc = clients_head; NULL != tc; tc = tc->next)
@@ -6095,8 +6221,9 @@ handle_backchannel_encapsulation (
6095 } 6221 }
6096 /* Finally, deliver backchannel message to communicator */ 6222 /* Finally, deliver backchannel message to communicator */
6097 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 6223 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6098 "Delivering backchannel message from %s of type %u to %s\n", 6224 "Delivering backchannel message from %s to %s of type %u to %s\n",
6099 GNUNET_i2s (&cmc->im.sender), 6225 sender,
6226 self,
6100 ntohs (inbox->type), 6227 ntohs (inbox->type),
6101 target_communicator); 6228 target_communicator);
6102 env = GNUNET_MQ_msg_extra ( 6229 env = GNUNET_MQ_msg_extra (
@@ -8407,14 +8534,15 @@ fragment_message (struct Queue *queue,
8407 struct PendingMessage *ff; 8534 struct PendingMessage *ff;
8408 uint16_t mtu; 8535 uint16_t mtu;
8409 8536
8410 mtu = (0 == queue->mtu) 8537 mtu = (UINT16_MAX == queue->mtu)
8411 ? UINT16_MAX - sizeof(struct GNUNET_TRANSPORT_SendMessageTo) 8538 ? UINT16_MAX - sizeof(struct GNUNET_TRANSPORT_SendMessageTo)
8412 : queue->mtu; 8539 : queue->mtu;
8413 set_pending_message_uuid (pm); 8540 set_pending_message_uuid (pm);
8414 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 8541 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
8415 "Fragmenting message %llu <%llu> to %s for MTU %u\n", 8542 "Fragmenting message %llu <%llu> with size %u to %s for MTU %u\n",
8416 (unsigned long long) pm->msg_uuid.uuid, 8543 (unsigned long long) pm->msg_uuid.uuid,
8417 pm->logging_uuid, 8544 pm->logging_uuid,
8545 pm->bytes_msg,
8418 GNUNET_i2s (&pm->vl->target), 8546 GNUNET_i2s (&pm->vl->target),
8419 (unsigned int) mtu); 8547 (unsigned int) mtu);
8420 pa = prepare_pending_acknowledgement (queue, dvh, pm); 8548 pa = prepare_pending_acknowledgement (queue, dvh, pm);
@@ -8700,7 +8828,7 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
8700 GNUNET_TRANSPORT_SendMessageTo)) 8828 GNUNET_TRANSPORT_SendMessageTo))
8701 || 8829 ||
8702 (NULL != pos->head_frag /* fragments already exist, should 8830 (NULL != pos->head_frag /* fragments already exist, should
8703 respect that even if MTU is 0 for 8831 respect that even if MTU is UINT16_MAX for
8704 this queue */)) 8832 this queue */))
8705 { 8833 {
8706 frag = GNUNET_YES; 8834 frag = GNUNET_YES;
@@ -9069,12 +9197,17 @@ handle_send_message_ack (void *cls,
9069 qep = qep->next) 9197 qep = qep->next)
9070 { 9198 {
9071 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 9199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9072 "QueueEntry MID: %llu, Ack MID: %llu\n", 9200 "QueueEntry MID: %llu on queue QID: %llu, Ack MID: %llu\n",
9073 (unsigned long long) qep->mid, 9201 (unsigned long long) qep->mid,
9202 (unsigned long long) queue->qid,
9074 (unsigned long long) sma->mid); 9203 (unsigned long long) sma->mid);
9075 if (qep->mid != sma->mid) 9204 if (qep->mid != sma->mid)
9076 continue; 9205 continue;
9077 qe = qep; 9206 qe = qep;
9207 if ((NULL != qe->pm)&&(qe->pm->qe != qe))
9208 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9209 "For pending message %llu we had retransmissions.\n",
9210 qe->pm->logging_uuid);
9078 break; 9211 break;
9079 } 9212 }
9080 } 9213 }
@@ -9125,12 +9258,26 @@ handle_send_message_ack (void *cls,
9125 GNUNET_NO); 9258 GNUNET_NO);
9126 schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); 9259 schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
9127 } 9260 }
9261 else if (1 == qe->queue->q_capacity)
9262 {
9263 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9264 "Transmission rescheduled due to communicator message queue with qid %u has capacity %lu.\n",
9265 qe->queue->qid,
9266 qe->queue->q_capacity);
9267 /* message queue has capacity; only resume this one queue */
9268 /* queue dropped below threshold; only resume this one queue */
9269 GNUNET_STATISTICS_update (GST_stats,
9270 "# Transmission throttled due to message queue capacity",
9271 -1,
9272 GNUNET_NO);
9273 schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
9274 }
9128 9275
9129 if (NULL != (pm = qe->pm)) 9276 if (NULL != (pm = qe->pm))
9130 { 9277 {
9131 struct VirtualLink *vl; 9278 struct VirtualLink *vl;
9132 9279
9133 GNUNET_assert (qe == pm->qe); 9280 // GNUNET_assert (qe == pm->qe);
9134 pm->qe = NULL; 9281 pm->qe = NULL;
9135 /* If waiting for this communicator may have blocked transmission 9282 /* If waiting for this communicator may have blocked transmission
9136 of pm on other queues for this neighbour, force schedule 9283 of pm on other queues for this neighbour, force schedule
@@ -9671,16 +9818,20 @@ handle_add_queue_message (void *cls,
9671 addr_len = ntohs (aqm->header.size) - sizeof(*aqm); 9818 addr_len = ntohs (aqm->header.size) - sizeof(*aqm);
9672 addr = (const char *) &aqm[1]; 9819 addr = (const char *) &aqm[1];
9673 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 9820 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9674 "New queue %s to %s available with QID %llu\n", 9821 "New queue %s to %s available with QID %llu and q_len %lu \n",
9675 addr, 9822 addr,
9676 GNUNET_i2s (&aqm->receiver), 9823 GNUNET_i2s (&aqm->receiver),
9677 (unsigned long long) aqm->qid); 9824 (unsigned long long) aqm->qid,
9825 GNUNET_ntohll (aqm->q_len));
9678 queue = GNUNET_malloc (sizeof(struct Queue) + addr_len); 9826 queue = GNUNET_malloc (sizeof(struct Queue) + addr_len);
9679 queue->tc = tc; 9827 queue->tc = tc;
9680 queue->address = (const char *) &queue[1]; 9828 queue->address = (const char *) &queue[1];
9681 queue->pd.aged_rtt = GNUNET_TIME_UNIT_FOREVER_REL; 9829 queue->pd.aged_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
9682 queue->qid = aqm->qid; 9830 queue->qid = aqm->qid;
9683 queue->neighbour = neighbour; 9831 queue->neighbour = neighbour;
9832 if (GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED == GNUNET_ntohll (aqm->q_len))
9833 queue->unlimited_length = GNUNET_YES;
9834 queue->q_capacity = GNUNET_ntohll (aqm->q_len);
9684 memcpy (&queue[1], addr, addr_len); 9835 memcpy (&queue[1], addr, addr_len);
9685 /* notify monitors about new queue */ 9836 /* notify monitors about new queue */
9686 { 9837 {
@@ -9752,10 +9903,14 @@ handle_update_queue_message (void *cls,
9752 target_queue->mtu = ntohl (msg->mtu); 9903 target_queue->mtu = ntohl (msg->mtu);
9753 target_queue->cs = msg->cs; 9904 target_queue->cs = msg->cs;
9754 target_queue->priority = ntohl (msg->priority); 9905 target_queue->priority = ntohl (msg->priority);
9755 /* The update message indicates how many _additional_ 9906 /* The update message indicates how many messages
9756 * messages the queue should be able to handle 9907 * the queue should be able to handle.
9757 */ 9908 */
9758 target_queue->queue_length += GNUNET_ntohll (msg->q_len); 9909 if (GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED == GNUNET_ntohll (msg->q_len))
9910 target_queue->unlimited_length = GNUNET_YES;
9911 else
9912 target_queue->unlimited_length = GNUNET_NO;
9913 target_queue->q_capacity = GNUNET_ntohll (msg->q_len);
9759 GNUNET_SERVICE_client_continue (tc->client); 9914 GNUNET_SERVICE_client_continue (tc->client);
9760} 9915}
9761 9916
@@ -10179,8 +10334,18 @@ static void
10179shutdown_task (void *cls) 10334shutdown_task (void *cls)
10180{ 10335{
10181 in_shutdown = GNUNET_YES; 10336 in_shutdown = GNUNET_YES;
10337
10182 if (NULL == clients_head) 10338 if (NULL == clients_head)
10183 do_shutdown (cls); 10339 {
10340 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
10341 {
10342 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
10343 "client still connected: %u\n",
10344 tc->type);
10345 }
10346 }
10347 do_shutdown (cls);
10348
10184} 10349}
10185 10350
10186 10351