aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-04-17 22:15:43 +0200
committerChristian Grothoff <christian@grothoff.org>2019-04-17 22:15:53 +0200
commitbd337cf7c6993eb8e97976ec0b088b317c57da0e (patch)
tree9d2274df3e8f9d4ec5ea0a86a558b942b0f258f4 /src/transport
parent936ab104abbf037af9405e397829a11467e1baa9 (diff)
downloadgnunet-bd337cf7c6993eb8e97976ec0b088b317c57da0e.tar.gz
gnunet-bd337cf7c6993eb8e97976ec0b088b317c57da0e.zip
only notify core about validated queues
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/gnunet-service-tng.c382
1 files changed, 298 insertions, 84 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 29bf3bf95..f3874724a 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -35,7 +35,25 @@
35 * Implement next: 35 * Implement next:
36 * - route_message() implementation, including using DV data structures 36 * - route_message() implementation, including using DV data structures
37 * (but not when routing certain message types, like DV learn, 37 * (but not when routing certain message types, like DV learn,
38 * MUST pay attention to content here -- or pass extra flags?) 38 * looks like now like we need two flags (DV/no-DV, confirmed-only,
39 * unconfirmed OK)
40 * + NOTE: do NOT use PendingMessage for route_message(), as that is
41 * for fragmentation/reliability and ultimately core flow control!
42 * => route_message() should pick the queue
43 * => in case of DV routing, route_message should BOX the message, too.
44 * - We currently do NEVER tell CORE also about DV-connections (core_visible
45 * of `struct DistanceVector` is simply never set!)
46 * + When? Easy if we initiated the DV and got the challenge; do that NOW
47 * BUT what we passively learned DV (unconfirmed freshness)
48 * => Do we trigger Challenge->Response there as well, or 'wait' for
49 * our own DV initiations to discover?
50 * => What about DV routes that expire? Do we also only count on
51 * our own DV initiations for maintenance here, or do we
52 * try to specifically re-confirm the existence of a particular path?
53 * => OPITMIZATION-FIXME!
54 * + Where do we track what we told core? Careful: need to check
55 * the "core_visible' flag in both neighbours and DV before
56 * sending out notifications to CORE!
39 * - retransmission logic 57 * - retransmission logic
40 * - track RTT, distance, loss, etc. => requires extra data structures! 58 * - track RTT, distance, loss, etc. => requires extra data structures!
41 * 59 *
@@ -55,11 +73,16 @@
55 * FIXME (without marks in the code!): 73 * FIXME (without marks in the code!):
56 * - proper use/initialization of timestamps in messages exchanged 74 * - proper use/initialization of timestamps in messages exchanged
57 * during DV learning 75 * during DV learning
76 * - persistence of monotonic time obtained from other peers
77 * in PEERSTORE (by message type)
58 * 78 *
59 * Optimizations: 79 * Optimizations:
60 * - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs 80 * - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs
61 * against our pending message queue (requires additional per neighbour 81 * against our pending message queue (requires additional per neighbour
62 * hash map to be maintained, avoids possible linear scan on pending msgs) 82 * hash map to be maintained, avoids possible linear scan on pending msgs)
83 * - queue_send_msg and route_message both by API design have to make copies
84 * of the payload, and route_message on top of that requires a malloc/free.
85 * Change design to approximate "zero" copy better...
63 * 86 *
64 * Design realizations / discussion: 87 * Design realizations / discussion:
65 * - communicators do flow control by calling MQ "notify sent" 88 * - communicators do flow control by calling MQ "notify sent"
@@ -1060,6 +1083,13 @@ struct DistanceVector
1060 * Task scheduled to purge expired paths from @e dv_head MDLL. 1083 * Task scheduled to purge expired paths from @e dv_head MDLL.
1061 */ 1084 */
1062 struct GNUNET_SCHEDULER_Task *timeout_task; 1085 struct GNUNET_SCHEDULER_Task *timeout_task;
1086
1087 /**
1088 * Is one of the DV paths in this struct 'confirmed' and thus
1089 * the cause for CORE to see this peer as connected? (Note that
1090 * the same may apply to a `struct Neighbour` at the same time.)
1091 */
1092 int core_visible;
1063}; 1093};
1064 1094
1065 1095
@@ -1162,11 +1192,26 @@ struct Queue
1162 struct GNUNET_SCHEDULER_Task *transmit_task; 1192 struct GNUNET_SCHEDULER_Task *transmit_task;
1163 1193
1164 /** 1194 /**
1195 * Task scheduled to possibly notfiy core that this queue is no longer
1196 * counting as confirmed. Runs the #core_queue_visibility_check().
1197 */
1198 struct GNUNET_SCHEDULER_Task *visibility_task;
1199
1200 /**
1165 * Our current RTT estimate for this queue. 1201 * Our current RTT estimate for this queue.
1166 */ 1202 */
1167 struct GNUNET_TIME_Relative rtt; 1203 struct GNUNET_TIME_Relative rtt;
1168 1204
1169 /** 1205 /**
1206 * How long do *we* consider this @e address to be valid? In the past or
1207 * zero if we have not yet validated it. Can be updated based on
1208 * challenge-response validations (via address validation logic), or when we
1209 * receive ACKs that we can definitively map to transmissions via this
1210 * queue.
1211 */
1212 struct GNUNET_TIME_Absolute validated_until;
1213
1214 /**
1170 * Message ID generator for transmissions on this queue. 1215 * Message ID generator for transmissions on this queue.
1171 */ 1216 */
1172 uint64_t mid_gen; 1217 uint64_t mid_gen;
@@ -1397,6 +1442,11 @@ struct Neighbour
1397 */ 1442 */
1398 struct GNUNET_TIME_Absolute earliest_timeout; 1443 struct GNUNET_TIME_Absolute earliest_timeout;
1399 1444
1445 /**
1446 * Do we have a confirmed working queue and are thus visible to
1447 * CORE?
1448 */
1449 int core_visible;
1400}; 1450};
1401 1451
1402 1452
@@ -2514,6 +2564,16 @@ schedule_transmit_on_queue (struct Queue *queue)
2514 2564
2515 2565
2516/** 2566/**
2567 * Check whether the CORE visibility of @a n changed. If so,
2568 * check whether we need to notify CORE.
2569 *
2570 * @param n neighbour to perform the check for
2571 */
2572static void
2573update_neighbour_core_visibility (struct Neighbour *n);
2574
2575
2576/**
2517 * Free @a queue. 2577 * Free @a queue.
2518 * 2578 *
2519 * @param queue the queue to free 2579 * @param queue the queue to free
@@ -2535,6 +2595,11 @@ free_queue (struct Queue *queue)
2535 GNUNET_SCHEDULER_cancel (queue->transmit_task); 2595 GNUNET_SCHEDULER_cancel (queue->transmit_task);
2536 queue->transmit_task = NULL; 2596 queue->transmit_task = NULL;
2537 } 2597 }
2598 if (NULL != queue->visibility_task)
2599 {
2600 GNUNET_SCHEDULER_cancel (queue->visibility_task);
2601 queue->visibility_task = NULL;
2602 }
2538 GNUNET_CONTAINER_MDLL_remove (neighbour, 2603 GNUNET_CONTAINER_MDLL_remove (neighbour,
2539 neighbour->queue_head, 2604 neighbour->queue_head,
2540 neighbour->queue_tail, 2605 neighbour->queue_tail,
@@ -2574,9 +2639,12 @@ free_queue (struct Queue *queue)
2574 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); 2639 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
2575 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out); 2640 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
2576 GNUNET_free (queue); 2641 GNUNET_free (queue);
2642
2643 update_neighbour_core_visibility (neighbour);
2644 cores_send_disconnect_info (&neighbour->pid);
2645
2577 if (NULL == neighbour->queue_head) 2646 if (NULL == neighbour->queue_head)
2578 { 2647 {
2579 cores_send_disconnect_info (&neighbour->pid);
2580 free_neighbour (neighbour); 2648 free_neighbour (neighbour);
2581 } 2649 }
2582} 2650}
@@ -3209,16 +3277,89 @@ lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
3209 3277
3210 3278
3211/** 3279/**
3280 * Send the control message @a payload on @a queue.
3281 *
3282 * @param queue the queue to use for transmission
3283 * @param pm pending message to update once transmission is done, may be NULL!
3284 * @param payload the payload to send (encapsulated in a
3285 * #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG).
3286 * @param payload_size number of bytes in @a payload
3287 */
3288static void
3289queue_send_msg (struct Queue *queue,
3290 struct PendingMessage *pm,
3291 const void *payload,
3292 size_t payload_size)
3293{
3294 struct Neighbour *n = queue->neighbour;
3295 struct GNUNET_TRANSPORT_SendMessageTo *smt;
3296 struct GNUNET_MQ_Envelope *env;
3297
3298 env = GNUNET_MQ_msg_extra (smt,
3299 payload_size,
3300 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
3301 smt->qid = queue->qid;
3302 smt->mid = queue->mid_gen;
3303 smt->receiver = n->pid;
3304 memcpy (&smt[1],
3305 payload,
3306 payload_size);
3307 {
3308 /* Pass the env to the communicator of queue for transmission. */
3309 struct QueueEntry *qe;
3310
3311 qe = GNUNET_new (struct QueueEntry);
3312 qe->mid = queue->mid_gen++;
3313 qe->queue = queue;
3314 // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'!
3315 // (also, note that pm may be NULL!)
3316 GNUNET_CONTAINER_DLL_insert (queue->queue_head,
3317 queue->queue_tail,
3318 qe);
3319 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
3320 queue->queue_length++;
3321 queue->tc->details.communicator.total_queue_length++;
3322 GNUNET_MQ_send (queue->tc->mq,
3323 env);
3324 }
3325}
3326
3327
3328/**
3212 * We need to transmit @a hdr to @a target. If necessary, this may 3329 * We need to transmit @a hdr to @a target. If necessary, this may
3213 * involve DV routing or even broadcasting and fragmentation. 3330 * involve DV routing or even broadcasting and fragmentation.
3214 * 3331 *
3215 * @param target peer to receive @a hdr 3332 * @param target peer to receive @a hdr
3216 * @param hdr header of the message to route 3333 * @param hdr header of the message to route and #GNUNET_free()
3217 */ 3334 */
3218static void 3335static void
3219route_message (const struct GNUNET_PeerIdentity *target, 3336route_message (const struct GNUNET_PeerIdentity *target,
3220 struct GNUNET_MessageHeader *hdr) 3337 struct GNUNET_MessageHeader *hdr)
3221{ 3338{
3339 // Cases:
3340 // 1: called to transmit backchannel message we initiated
3341 // 2: called to transmit fragment ack
3342 // 3: called to transmit reliability box
3343 // 4: called to forward backchannel message
3344 // 5: called to forward DV learn message (caller already picked random neighbour(s))!
3345 // 6: called to forward DV Box message
3346 // 7: called to forward valdiation response
3347
3348 // Choices:
3349 // a) Send ONLY to a *confirmed* direct neighbour
3350 // b) Send allowed to *unconfirmed* direct neighbour
3351 // c) Route also via *confirmed* DV to target
3352 // c) Route allowed via *unconfirmed DV to target
3353 // => One BIT "dv allowed or not", plus one BIT "confirmed/unconfirmed" might do!
3354
3355 // Case analysis:
3356 // 1 2 3 4 5 6 7
3357 // a X X X X X X X
3358 // b X X
3359 // c X X X X X
3360 // d X
3361 //
3362
3222 // FIXME: this one is tricky: 3363 // FIXME: this one is tricky:
3223 // - we could try a direct, reliable channel 3364 // - we could try a direct, reliable channel
3224 // - if that is unavailable / for load balancing, we may try: 3365 // - if that is unavailable / for load balancing, we may try:
@@ -4305,6 +4446,8 @@ check_backchannel_encapsulation (void *cls,
4305 4446
4306/** 4447/**
4307 * Communicator gave us a backchannel encapsulation. Process the request. 4448 * Communicator gave us a backchannel encapsulation. Process the request.
4449 * (We are not the origin of the backchannel here, the communicator simply
4450 * received a backchannel message and we are expected to forward it.)
4308 * 4451 *
4309 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) 4452 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4310 * @param be the message that was received 4453 * @param be the message that was received
@@ -4811,6 +4954,10 @@ handle_dv_learn (void *cls,
4811 ilat = GNUNET_TIME_relative_multiply (network_latency, 4954 ilat = GNUNET_TIME_relative_multiply (network_latency,
4812 i); 4955 i);
4813 path[i] = hops[i-1].hop; 4956 path[i] = hops[i-1].hop;
4957 // FIXME: mark ALL of these as *confirmed* (with what timeout?)
4958 // -- and schedule a job for the confirmation to time out! --
4959 // and possibly do #cores_send_connect_info() if
4960 // the respective neighbour is NOT confirmed yet!
4814 learn_dv_path (path, 4961 learn_dv_path (path,
4815 i, 4962 i,
4816 ilat); 4963 ilat);
@@ -5263,6 +5410,107 @@ update_next_challenge_time (struct ValidationState *vs,
5263 5410
5264 5411
5265/** 5412/**
5413 * Find the queue matching @a pid and @a address.
5414 *
5415 * @param pid peer the queue must go to
5416 * @param address address the queue must use
5417 * @return NULL if no such queue exists
5418 */
5419static struct Queue *
5420find_queue (const struct GNUNET_PeerIdentity *pid,
5421 const char *address)
5422{
5423 struct Neighbour *n;
5424
5425 n = GNUNET_CONTAINER_multipeermap_get (neighbours,
5426 pid);
5427 if (NULL == n)
5428 return NULL;
5429 for (struct Queue *pos = n->queue_head;
5430 NULL != pos;
5431 pos = pos->next_neighbour)
5432 {
5433 if (0 == strcmp (pos->address,
5434 address))
5435 return pos;
5436 }
5437 return NULL;
5438}
5439
5440
5441/**
5442 * Task run periodically to check whether the validity of the given queue has
5443 * run its course. If so, finds either another queue to take over, or clears
5444 * the neighbour's `core_visible` flag. In the latter case, gives DV routes a
5445 * chance to take over, and if that fails, notifies CORE about the disconnect.
5446 *
5447 * @param cls a `struct Queue`
5448 */
5449static void
5450core_queue_visibility_check (void *cls)
5451{
5452 struct Queue *q = cls;
5453
5454 q->visibility_task = NULL;
5455 if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
5456 {
5457 q->visibility_task
5458 = GNUNET_SCHEDULER_add_at (q->validated_until,
5459 &core_queue_visibility_check,
5460 q);
5461 return;
5462 }
5463 update_neighbour_core_visibility (q->neighbour);
5464}
5465
5466
5467/**
5468 * Check whether the CORE visibility of @a n should change. Finds either a
5469 * queue to preserve the visibility, or clears the neighbour's `core_visible`
5470 * flag. In the latter case, gives DV routes a chance to take over, and if
5471 * that fails, notifies CORE about the disconnect. If so, check whether we
5472 * need to notify CORE.
5473 *
5474 * @param n neighbour to perform the check for
5475 */
5476static void
5477update_neighbour_core_visibility (struct Neighbour *n)
5478{
5479 struct DistanceVector *dv;
5480
5481 GNUNET_assert (GNUNET_YES == n->core_visible);
5482 /* Check if _any_ queue of this neighbour is still valid, if so, schedule
5483 the #core_queue_visibility_check() task for that queue */
5484 for (struct Queue *q = n->queue_head;
5485 NULL != q;
5486 q = q->next_neighbour)
5487 {
5488 if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
5489 {
5490 /* found a valid queue, use this one */
5491 q->visibility_task
5492 = GNUNET_SCHEDULER_add_at (q->validated_until,
5493 &core_queue_visibility_check,
5494 q);
5495 return;
5496 }
5497 }
5498 n->core_visible = GNUNET_NO;
5499
5500 /* Check if _any_ DV route to this neighbour is currently
5501 valid, if so, do NOT tell core about the loss of direct
5502 connectivity (DV still counts!) */
5503 dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
5504 &n->pid);
5505 if (GNUNET_YES == dv->core_visible)
5506 return;
5507 /* Nothing works anymore, need to tell CORE about the loss of
5508 connectivity! */
5509 cores_send_disconnect_info (&n->pid);
5510}
5511
5512
5513/**
5266 * Communicator gave us a transport address validation response. Process the request. 5514 * Communicator gave us a transport address validation response. Process the request.
5267 * 5515 *
5268 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) 5516 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
@@ -5279,6 +5527,8 @@ handle_validation_response (void *cls,
5279 .vs = NULL 5527 .vs = NULL
5280 }; 5528 };
5281 struct GNUNET_TIME_Absolute origin_time; 5529 struct GNUNET_TIME_Absolute origin_time;
5530 struct Queue *q;
5531 struct DistanceVector *dv;
5282 5532
5283 /* check this is one of our challenges */ 5533 /* check this is one of our challenges */
5284 (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, 5534 (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
@@ -5357,8 +5607,39 @@ handle_validation_response (void *cls,
5357 GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, 5607 GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
5358 &peerstore_store_validation_cb, 5608 &peerstore_store_validation_cb,
5359 vs); 5609 vs);
5360 // FIXME: should we find the matching queue and update the RTT?
5361 finish_cmc_handling (cmc); 5610 finish_cmc_handling (cmc);
5611
5612 /* Finally, we now possibly have a confirmed (!) working queue,
5613 update queue status (if queue still is around) */
5614 q = find_queue (&vs->pid,
5615 vs->address);
5616 if (NULL == q)
5617 {
5618 GNUNET_STATISTICS_update (GST_stats,
5619 "# Queues lost at time of successful validation",
5620 1,
5621 GNUNET_NO);
5622 return;
5623 }
5624 q->validated_until = vs->validated_until;
5625 q->rtt = vs->validation_rtt;
5626 if (GNUNET_NO != q->neighbour->core_visible)
5627 return; /* nothing changed, we are done here */
5628 q->neighbour->core_visible = GNUNET_YES;
5629 q->visibility_task
5630 = GNUNET_SCHEDULER_add_at (q->validated_until,
5631 &core_queue_visibility_check,
5632 q);
5633 /* Check if _any_ DV route to this neighbour is
5634 currently valid, if so, do NOT tell core anything! */
5635 dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
5636 &q->neighbour->pid);
5637 if (GNUNET_YES == dv->core_visible)
5638 return; /* nothing changed, done */
5639 /* We lacked a confirmed connection to the neighbour
5640 before, so tell CORE about it (finally!) */
5641 cores_send_connect_info (&q->neighbour->pid,
5642 GNUNET_BANDWIDTH_ZERO);
5362} 5643}
5363 5644
5364 5645
@@ -5640,19 +5921,19 @@ reliability_box_message (struct PendingMessage *pm)
5640 /* failed hard */ 5921 /* failed hard */
5641 GNUNET_break (0); 5922 GNUNET_break (0);
5642 client_send_response (pm, 5923 client_send_response (pm,
5643 GNUNET_NO, 5924 GNUNET_NO,
5644 0); 5925 0);
5645 return NULL; 5926 return NULL;
5646 } 5927 }
5647 bpm = GNUNET_malloc (sizeof (struct PendingMessage) + 5928 bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
5648 sizeof (rbox) + 5929 sizeof (rbox) +
5649 pm->bytes_msg); 5930 pm->bytes_msg);
5650 bpm->target = pm->target; 5931 bpm->target = pm->target;
5651 bpm->frag_parent = pm; 5932 bpm->frag_parent = pm;
5652 GNUNET_CONTAINER_MDLL_insert (frag, 5933 GNUNET_CONTAINER_MDLL_insert (frag,
5653 pm->head_frag, 5934 pm->head_frag,
5654 pm->tail_frag, 5935 pm->tail_frag,
5655 bpm); 5936 bpm);
5656 bpm->timeout = pm->timeout; 5937 bpm->timeout = pm->timeout;
5657 bpm->pmt = PMT_RELIABILITY_BOX; 5938 bpm->pmt = PMT_RELIABILITY_BOX;
5658 bpm->bytes_msg = pm->bytes_msg + sizeof (rbox); 5939 bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
@@ -5663,66 +5944,17 @@ reliability_box_message (struct PendingMessage *pm)
5663 rbox.msg_uuid = pm->msg_uuid; 5944 rbox.msg_uuid = pm->msg_uuid;
5664 msg = (char *) &bpm[1]; 5945 msg = (char *) &bpm[1];
5665 memcpy (msg, 5946 memcpy (msg,
5666 &rbox, 5947 &rbox,
5667 sizeof (rbox)); 5948 sizeof (rbox));
5668 memcpy (&msg[sizeof (rbox)], 5949 memcpy (&msg[sizeof (rbox)],
5669 &pm[1], 5950 &pm[1],
5670 pm->bytes_msg); 5951 pm->bytes_msg);
5671 pm->bpm = bpm; 5952 pm->bpm = bpm;
5672 return bpm; 5953 return bpm;
5673} 5954}
5674 5955
5675 5956
5676/** 5957/**
5677 * Send the control message @a payload on @a queue.
5678 *
5679 * @param queue the queue to use for transmission
5680 * @param pm pending message to update once transmission is done, may be NULL!
5681 * @param payload the payload to send (encapsulated in a
5682 * #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG).
5683 * @param payload_size number of bytes in @a payload
5684 */
5685static void
5686queue_send_msg (struct Queue *queue,
5687 struct PendingMessage *pm,
5688 const void *payload,
5689 size_t payload_size)
5690{
5691 struct Neighbour *n = queue->neighbour;
5692 struct GNUNET_TRANSPORT_SendMessageTo *smt;
5693 struct GNUNET_MQ_Envelope *env;
5694
5695 env = GNUNET_MQ_msg_extra (smt,
5696 payload_size,
5697 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
5698 smt->qid = queue->qid;
5699 smt->mid = queue->mid_gen;
5700 smt->receiver = n->pid;
5701 memcpy (&smt[1],
5702 payload,
5703 payload_size);
5704 {
5705 /* Pass the env to the communicator of queue for transmission. */
5706 struct QueueEntry *qe;
5707
5708 qe = GNUNET_new (struct QueueEntry);
5709 qe->mid = queue->mid_gen++;
5710 qe->queue = queue;
5711 // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'!
5712 // (also, note that pm may be NULL!)
5713 GNUNET_CONTAINER_DLL_insert (queue->queue_head,
5714 queue->queue_tail,
5715 qe);
5716 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
5717 queue->queue_length++;
5718 queue->tc->details.communicator.total_queue_length++;
5719 GNUNET_MQ_send (queue->tc->mq,
5720 env);
5721 }
5722}
5723
5724
5725/**
5726 * We believe we are ready to transmit a message on a queue. Double-checks 5958 * We believe we are ready to transmit a message on a queue. Double-checks
5727 * with the queue's "tracker_out" and then gives the message to the 5959 * with the queue's "tracker_out" and then gives the message to the
5728 * communicator for transmission (updating the tracker, and re-scheduling 5960 * communicator for transmission (updating the tracker, and re-scheduling
@@ -6268,7 +6500,6 @@ static void
6268validation_start_cb (void *cls) 6500validation_start_cb (void *cls)
6269{ 6501{
6270 struct ValidationState *vs; 6502 struct ValidationState *vs;
6271 struct Neighbour *n;
6272 struct Queue *q; 6503 struct Queue *q;
6273 6504
6274 (void) cls; 6505 (void) cls;
@@ -6284,23 +6515,8 @@ validation_start_cb (void *cls)
6284 if (NULL == vs) 6515 if (NULL == vs)
6285 return; /* woopsie, no more addresses known, should only 6516 return; /* woopsie, no more addresses known, should only
6286 happen if we're really a lonely peer */ 6517 happen if we're really a lonely peer */
6287 n = GNUNET_CONTAINER_multipeermap_get (neighbours, 6518 q = find_queue (&vs->pid,
6288 &vs->pid); 6519 vs->address);
6289 q = NULL;
6290 if (NULL != n)
6291 {
6292 for (struct Queue *pos = n->queue_head;
6293 NULL != pos;
6294 pos = pos->next_neighbour)
6295 {
6296 if (0 == strcmp (pos->address,
6297 vs->address))
6298 {
6299 q = pos;
6300 break;
6301 }
6302 }
6303 }
6304 if (NULL == q) 6520 if (NULL == q)
6305 { 6521 {
6306 vs->awaiting_queue = GNUNET_YES; 6522 vs->awaiting_queue = GNUNET_YES;
@@ -6570,8 +6786,6 @@ handle_add_queue_message (void *cls,
6570 &neighbour->pid, 6786 &neighbour->pid,
6571 neighbour, 6787 neighbour,
6572 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 6788 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6573 cores_send_connect_info (&neighbour->pid,
6574 GNUNET_BANDWIDTH_ZERO);
6575 } 6789 }
6576 addr_len = ntohs (aqm->header.size) - sizeof (*aqm); 6790 addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
6577 addr = (const char *) &aqm[1]; 6791 addr = (const char *) &aqm[1];