diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-04-17 22:15:43 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-04-17 22:15:53 +0200 |
commit | bd337cf7c6993eb8e97976ec0b088b317c57da0e (patch) | |
tree | 9d2274df3e8f9d4ec5ea0a86a558b942b0f258f4 /src/transport | |
parent | 936ab104abbf037af9405e397829a11467e1baa9 (diff) | |
download | gnunet-bd337cf7c6993eb8e97976ec0b088b317c57da0e.tar.gz gnunet-bd337cf7c6993eb8e97976ec0b088b317c57da0e.zip |
only notify core about validated queues
Diffstat (limited to 'src/transport')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 382 |
1 files changed, 298 insertions, 84 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 29bf3bf95..f3874724a 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -35,7 +35,25 @@ | |||
35 | * Implement next: | 35 | * Implement next: |
36 | * - route_message() implementation, including using DV data structures | 36 | * - route_message() implementation, including using DV data structures |
37 | * (but not when routing certain message types, like DV learn, | 37 | * (but not when routing certain message types, like DV learn, |
38 | * MUST pay attention to content here -- or pass extra flags?) | 38 | * looks like now like we need two flags (DV/no-DV, confirmed-only, |
39 | * unconfirmed OK) | ||
40 | * + NOTE: do NOT use PendingMessage for route_message(), as that is | ||
41 | * for fragmentation/reliability and ultimately core flow control! | ||
42 | * => route_message() should pick the queue | ||
43 | * => in case of DV routing, route_message should BOX the message, too. | ||
44 | * - We currently do NEVER tell CORE also about DV-connections (core_visible | ||
45 | * of `struct DistanceVector` is simply never set!) | ||
46 | * + When? Easy if we initiated the DV and got the challenge; do that NOW | ||
47 | * BUT what we passively learned DV (unconfirmed freshness) | ||
48 | * => Do we trigger Challenge->Response there as well, or 'wait' for | ||
49 | * our own DV initiations to discover? | ||
50 | * => What about DV routes that expire? Do we also only count on | ||
51 | * our own DV initiations for maintenance here, or do we | ||
52 | * try to specifically re-confirm the existence of a particular path? | ||
53 | * => OPITMIZATION-FIXME! | ||
54 | * + Where do we track what we told core? Careful: need to check | ||
55 | * the "core_visible' flag in both neighbours and DV before | ||
56 | * sending out notifications to CORE! | ||
39 | * - retransmission logic | 57 | * - retransmission logic |
40 | * - track RTT, distance, loss, etc. => requires extra data structures! | 58 | * - track RTT, distance, loss, etc. => requires extra data structures! |
41 | * | 59 | * |
@@ -55,11 +73,16 @@ | |||
55 | * FIXME (without marks in the code!): | 73 | * FIXME (without marks in the code!): |
56 | * - proper use/initialization of timestamps in messages exchanged | 74 | * - proper use/initialization of timestamps in messages exchanged |
57 | * during DV learning | 75 | * during DV learning |
76 | * - persistence of monotonic time obtained from other peers | ||
77 | * in PEERSTORE (by message type) | ||
58 | * | 78 | * |
59 | * Optimizations: | 79 | * Optimizations: |
60 | * - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs | 80 | * - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs |
61 | * against our pending message queue (requires additional per neighbour | 81 | * against our pending message queue (requires additional per neighbour |
62 | * hash map to be maintained, avoids possible linear scan on pending msgs) | 82 | * hash map to be maintained, avoids possible linear scan on pending msgs) |
83 | * - queue_send_msg and route_message both by API design have to make copies | ||
84 | * of the payload, and route_message on top of that requires a malloc/free. | ||
85 | * Change design to approximate "zero" copy better... | ||
63 | * | 86 | * |
64 | * Design realizations / discussion: | 87 | * Design realizations / discussion: |
65 | * - communicators do flow control by calling MQ "notify sent" | 88 | * - communicators do flow control by calling MQ "notify sent" |
@@ -1060,6 +1083,13 @@ struct DistanceVector | |||
1060 | * Task scheduled to purge expired paths from @e dv_head MDLL. | 1083 | * Task scheduled to purge expired paths from @e dv_head MDLL. |
1061 | */ | 1084 | */ |
1062 | struct GNUNET_SCHEDULER_Task *timeout_task; | 1085 | struct GNUNET_SCHEDULER_Task *timeout_task; |
1086 | |||
1087 | /** | ||
1088 | * Is one of the DV paths in this struct 'confirmed' and thus | ||
1089 | * the cause for CORE to see this peer as connected? (Note that | ||
1090 | * the same may apply to a `struct Neighbour` at the same time.) | ||
1091 | */ | ||
1092 | int core_visible; | ||
1063 | }; | 1093 | }; |
1064 | 1094 | ||
1065 | 1095 | ||
@@ -1162,11 +1192,26 @@ struct Queue | |||
1162 | struct GNUNET_SCHEDULER_Task *transmit_task; | 1192 | struct GNUNET_SCHEDULER_Task *transmit_task; |
1163 | 1193 | ||
1164 | /** | 1194 | /** |
1195 | * Task scheduled to possibly notfiy core that this queue is no longer | ||
1196 | * counting as confirmed. Runs the #core_queue_visibility_check(). | ||
1197 | */ | ||
1198 | struct GNUNET_SCHEDULER_Task *visibility_task; | ||
1199 | |||
1200 | /** | ||
1165 | * Our current RTT estimate for this queue. | 1201 | * Our current RTT estimate for this queue. |
1166 | */ | 1202 | */ |
1167 | struct GNUNET_TIME_Relative rtt; | 1203 | struct GNUNET_TIME_Relative rtt; |
1168 | 1204 | ||
1169 | /** | 1205 | /** |
1206 | * How long do *we* consider this @e address to be valid? In the past or | ||
1207 | * zero if we have not yet validated it. Can be updated based on | ||
1208 | * challenge-response validations (via address validation logic), or when we | ||
1209 | * receive ACKs that we can definitively map to transmissions via this | ||
1210 | * queue. | ||
1211 | */ | ||
1212 | struct GNUNET_TIME_Absolute validated_until; | ||
1213 | |||
1214 | /** | ||
1170 | * Message ID generator for transmissions on this queue. | 1215 | * Message ID generator for transmissions on this queue. |
1171 | */ | 1216 | */ |
1172 | uint64_t mid_gen; | 1217 | uint64_t mid_gen; |
@@ -1397,6 +1442,11 @@ struct Neighbour | |||
1397 | */ | 1442 | */ |
1398 | struct GNUNET_TIME_Absolute earliest_timeout; | 1443 | struct GNUNET_TIME_Absolute earliest_timeout; |
1399 | 1444 | ||
1445 | /** | ||
1446 | * Do we have a confirmed working queue and are thus visible to | ||
1447 | * CORE? | ||
1448 | */ | ||
1449 | int core_visible; | ||
1400 | }; | 1450 | }; |
1401 | 1451 | ||
1402 | 1452 | ||
@@ -2514,6 +2564,16 @@ schedule_transmit_on_queue (struct Queue *queue) | |||
2514 | 2564 | ||
2515 | 2565 | ||
2516 | /** | 2566 | /** |
2567 | * Check whether the CORE visibility of @a n changed. If so, | ||
2568 | * check whether we need to notify CORE. | ||
2569 | * | ||
2570 | * @param n neighbour to perform the check for | ||
2571 | */ | ||
2572 | static void | ||
2573 | update_neighbour_core_visibility (struct Neighbour *n); | ||
2574 | |||
2575 | |||
2576 | /** | ||
2517 | * Free @a queue. | 2577 | * Free @a queue. |
2518 | * | 2578 | * |
2519 | * @param queue the queue to free | 2579 | * @param queue the queue to free |
@@ -2535,6 +2595,11 @@ free_queue (struct Queue *queue) | |||
2535 | GNUNET_SCHEDULER_cancel (queue->transmit_task); | 2595 | GNUNET_SCHEDULER_cancel (queue->transmit_task); |
2536 | queue->transmit_task = NULL; | 2596 | queue->transmit_task = NULL; |
2537 | } | 2597 | } |
2598 | if (NULL != queue->visibility_task) | ||
2599 | { | ||
2600 | GNUNET_SCHEDULER_cancel (queue->visibility_task); | ||
2601 | queue->visibility_task = NULL; | ||
2602 | } | ||
2538 | GNUNET_CONTAINER_MDLL_remove (neighbour, | 2603 | GNUNET_CONTAINER_MDLL_remove (neighbour, |
2539 | neighbour->queue_head, | 2604 | neighbour->queue_head, |
2540 | neighbour->queue_tail, | 2605 | neighbour->queue_tail, |
@@ -2574,9 +2639,12 @@ free_queue (struct Queue *queue) | |||
2574 | GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); | 2639 | GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); |
2575 | GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out); | 2640 | GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out); |
2576 | GNUNET_free (queue); | 2641 | GNUNET_free (queue); |
2642 | |||
2643 | update_neighbour_core_visibility (neighbour); | ||
2644 | cores_send_disconnect_info (&neighbour->pid); | ||
2645 | |||
2577 | if (NULL == neighbour->queue_head) | 2646 | if (NULL == neighbour->queue_head) |
2578 | { | 2647 | { |
2579 | cores_send_disconnect_info (&neighbour->pid); | ||
2580 | free_neighbour (neighbour); | 2648 | free_neighbour (neighbour); |
2581 | } | 2649 | } |
2582 | } | 2650 | } |
@@ -3209,16 +3277,89 @@ lookup_ephemeral (const struct GNUNET_PeerIdentity *pid, | |||
3209 | 3277 | ||
3210 | 3278 | ||
3211 | /** | 3279 | /** |
3280 | * Send the control message @a payload on @a queue. | ||
3281 | * | ||
3282 | * @param queue the queue to use for transmission | ||
3283 | * @param pm pending message to update once transmission is done, may be NULL! | ||
3284 | * @param payload the payload to send (encapsulated in a | ||
3285 | * #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG). | ||
3286 | * @param payload_size number of bytes in @a payload | ||
3287 | */ | ||
3288 | static void | ||
3289 | queue_send_msg (struct Queue *queue, | ||
3290 | struct PendingMessage *pm, | ||
3291 | const void *payload, | ||
3292 | size_t payload_size) | ||
3293 | { | ||
3294 | struct Neighbour *n = queue->neighbour; | ||
3295 | struct GNUNET_TRANSPORT_SendMessageTo *smt; | ||
3296 | struct GNUNET_MQ_Envelope *env; | ||
3297 | |||
3298 | env = GNUNET_MQ_msg_extra (smt, | ||
3299 | payload_size, | ||
3300 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); | ||
3301 | smt->qid = queue->qid; | ||
3302 | smt->mid = queue->mid_gen; | ||
3303 | smt->receiver = n->pid; | ||
3304 | memcpy (&smt[1], | ||
3305 | payload, | ||
3306 | payload_size); | ||
3307 | { | ||
3308 | /* Pass the env to the communicator of queue for transmission. */ | ||
3309 | struct QueueEntry *qe; | ||
3310 | |||
3311 | qe = GNUNET_new (struct QueueEntry); | ||
3312 | qe->mid = queue->mid_gen++; | ||
3313 | qe->queue = queue; | ||
3314 | // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'! | ||
3315 | // (also, note that pm may be NULL!) | ||
3316 | GNUNET_CONTAINER_DLL_insert (queue->queue_head, | ||
3317 | queue->queue_tail, | ||
3318 | qe); | ||
3319 | GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); | ||
3320 | queue->queue_length++; | ||
3321 | queue->tc->details.communicator.total_queue_length++; | ||
3322 | GNUNET_MQ_send (queue->tc->mq, | ||
3323 | env); | ||
3324 | } | ||
3325 | } | ||
3326 | |||
3327 | |||
3328 | /** | ||
3212 | * We need to transmit @a hdr to @a target. If necessary, this may | 3329 | * We need to transmit @a hdr to @a target. If necessary, this may |
3213 | * involve DV routing or even broadcasting and fragmentation. | 3330 | * involve DV routing or even broadcasting and fragmentation. |
3214 | * | 3331 | * |
3215 | * @param target peer to receive @a hdr | 3332 | * @param target peer to receive @a hdr |
3216 | * @param hdr header of the message to route | 3333 | * @param hdr header of the message to route and #GNUNET_free() |
3217 | */ | 3334 | */ |
3218 | static void | 3335 | static void |
3219 | route_message (const struct GNUNET_PeerIdentity *target, | 3336 | route_message (const struct GNUNET_PeerIdentity *target, |
3220 | struct GNUNET_MessageHeader *hdr) | 3337 | struct GNUNET_MessageHeader *hdr) |
3221 | { | 3338 | { |
3339 | // Cases: | ||
3340 | // 1: called to transmit backchannel message we initiated | ||
3341 | // 2: called to transmit fragment ack | ||
3342 | // 3: called to transmit reliability box | ||
3343 | // 4: called to forward backchannel message | ||
3344 | // 5: called to forward DV learn message (caller already picked random neighbour(s))! | ||
3345 | // 6: called to forward DV Box message | ||
3346 | // 7: called to forward valdiation response | ||
3347 | |||
3348 | // Choices: | ||
3349 | // a) Send ONLY to a *confirmed* direct neighbour | ||
3350 | // b) Send allowed to *unconfirmed* direct neighbour | ||
3351 | // c) Route also via *confirmed* DV to target | ||
3352 | // c) Route allowed via *unconfirmed DV to target | ||
3353 | // => One BIT "dv allowed or not", plus one BIT "confirmed/unconfirmed" might do! | ||
3354 | |||
3355 | // Case analysis: | ||
3356 | // 1 2 3 4 5 6 7 | ||
3357 | // a X X X X X X X | ||
3358 | // b X X | ||
3359 | // c X X X X X | ||
3360 | // d X | ||
3361 | // | ||
3362 | |||
3222 | // FIXME: this one is tricky: | 3363 | // FIXME: this one is tricky: |
3223 | // - we could try a direct, reliable channel | 3364 | // - we could try a direct, reliable channel |
3224 | // - if that is unavailable / for load balancing, we may try: | 3365 | // - if that is unavailable / for load balancing, we may try: |
@@ -4305,6 +4446,8 @@ check_backchannel_encapsulation (void *cls, | |||
4305 | 4446 | ||
4306 | /** | 4447 | /** |
4307 | * Communicator gave us a backchannel encapsulation. Process the request. | 4448 | * Communicator gave us a backchannel encapsulation. Process the request. |
4449 | * (We are not the origin of the backchannel here, the communicator simply | ||
4450 | * received a backchannel message and we are expected to forward it.) | ||
4308 | * | 4451 | * |
4309 | * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) | 4452 | * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) |
4310 | * @param be the message that was received | 4453 | * @param be the message that was received |
@@ -4811,6 +4954,10 @@ handle_dv_learn (void *cls, | |||
4811 | ilat = GNUNET_TIME_relative_multiply (network_latency, | 4954 | ilat = GNUNET_TIME_relative_multiply (network_latency, |
4812 | i); | 4955 | i); |
4813 | path[i] = hops[i-1].hop; | 4956 | path[i] = hops[i-1].hop; |
4957 | // FIXME: mark ALL of these as *confirmed* (with what timeout?) | ||
4958 | // -- and schedule a job for the confirmation to time out! -- | ||
4959 | // and possibly do #cores_send_connect_info() if | ||
4960 | // the respective neighbour is NOT confirmed yet! | ||
4814 | learn_dv_path (path, | 4961 | learn_dv_path (path, |
4815 | i, | 4962 | i, |
4816 | ilat); | 4963 | ilat); |
@@ -5263,6 +5410,107 @@ update_next_challenge_time (struct ValidationState *vs, | |||
5263 | 5410 | ||
5264 | 5411 | ||
5265 | /** | 5412 | /** |
5413 | * Find the queue matching @a pid and @a address. | ||
5414 | * | ||
5415 | * @param pid peer the queue must go to | ||
5416 | * @param address address the queue must use | ||
5417 | * @return NULL if no such queue exists | ||
5418 | */ | ||
5419 | static struct Queue * | ||
5420 | find_queue (const struct GNUNET_PeerIdentity *pid, | ||
5421 | const char *address) | ||
5422 | { | ||
5423 | struct Neighbour *n; | ||
5424 | |||
5425 | n = GNUNET_CONTAINER_multipeermap_get (neighbours, | ||
5426 | pid); | ||
5427 | if (NULL == n) | ||
5428 | return NULL; | ||
5429 | for (struct Queue *pos = n->queue_head; | ||
5430 | NULL != pos; | ||
5431 | pos = pos->next_neighbour) | ||
5432 | { | ||
5433 | if (0 == strcmp (pos->address, | ||
5434 | address)) | ||
5435 | return pos; | ||
5436 | } | ||
5437 | return NULL; | ||
5438 | } | ||
5439 | |||
5440 | |||
5441 | /** | ||
5442 | * Task run periodically to check whether the validity of the given queue has | ||
5443 | * run its course. If so, finds either another queue to take over, or clears | ||
5444 | * the neighbour's `core_visible` flag. In the latter case, gives DV routes a | ||
5445 | * chance to take over, and if that fails, notifies CORE about the disconnect. | ||
5446 | * | ||
5447 | * @param cls a `struct Queue` | ||
5448 | */ | ||
5449 | static void | ||
5450 | core_queue_visibility_check (void *cls) | ||
5451 | { | ||
5452 | struct Queue *q = cls; | ||
5453 | |||
5454 | q->visibility_task = NULL; | ||
5455 | if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) | ||
5456 | { | ||
5457 | q->visibility_task | ||
5458 | = GNUNET_SCHEDULER_add_at (q->validated_until, | ||
5459 | &core_queue_visibility_check, | ||
5460 | q); | ||
5461 | return; | ||
5462 | } | ||
5463 | update_neighbour_core_visibility (q->neighbour); | ||
5464 | } | ||
5465 | |||
5466 | |||
5467 | /** | ||
5468 | * Check whether the CORE visibility of @a n should change. Finds either a | ||
5469 | * queue to preserve the visibility, or clears the neighbour's `core_visible` | ||
5470 | * flag. In the latter case, gives DV routes a chance to take over, and if | ||
5471 | * that fails, notifies CORE about the disconnect. If so, check whether we | ||
5472 | * need to notify CORE. | ||
5473 | * | ||
5474 | * @param n neighbour to perform the check for | ||
5475 | */ | ||
5476 | static void | ||
5477 | update_neighbour_core_visibility (struct Neighbour *n) | ||
5478 | { | ||
5479 | struct DistanceVector *dv; | ||
5480 | |||
5481 | GNUNET_assert (GNUNET_YES == n->core_visible); | ||
5482 | /* Check if _any_ queue of this neighbour is still valid, if so, schedule | ||
5483 | the #core_queue_visibility_check() task for that queue */ | ||
5484 | for (struct Queue *q = n->queue_head; | ||
5485 | NULL != q; | ||
5486 | q = q->next_neighbour) | ||
5487 | { | ||
5488 | if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) | ||
5489 | { | ||
5490 | /* found a valid queue, use this one */ | ||
5491 | q->visibility_task | ||
5492 | = GNUNET_SCHEDULER_add_at (q->validated_until, | ||
5493 | &core_queue_visibility_check, | ||
5494 | q); | ||
5495 | return; | ||
5496 | } | ||
5497 | } | ||
5498 | n->core_visible = GNUNET_NO; | ||
5499 | |||
5500 | /* Check if _any_ DV route to this neighbour is currently | ||
5501 | valid, if so, do NOT tell core about the loss of direct | ||
5502 | connectivity (DV still counts!) */ | ||
5503 | dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, | ||
5504 | &n->pid); | ||
5505 | if (GNUNET_YES == dv->core_visible) | ||
5506 | return; | ||
5507 | /* Nothing works anymore, need to tell CORE about the loss of | ||
5508 | connectivity! */ | ||
5509 | cores_send_disconnect_info (&n->pid); | ||
5510 | } | ||
5511 | |||
5512 | |||
5513 | /** | ||
5266 | * Communicator gave us a transport address validation response. Process the request. | 5514 | * Communicator gave us a transport address validation response. Process the request. |
5267 | * | 5515 | * |
5268 | * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) | 5516 | * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) |
@@ -5279,6 +5527,8 @@ handle_validation_response (void *cls, | |||
5279 | .vs = NULL | 5527 | .vs = NULL |
5280 | }; | 5528 | }; |
5281 | struct GNUNET_TIME_Absolute origin_time; | 5529 | struct GNUNET_TIME_Absolute origin_time; |
5530 | struct Queue *q; | ||
5531 | struct DistanceVector *dv; | ||
5282 | 5532 | ||
5283 | /* check this is one of our challenges */ | 5533 | /* check this is one of our challenges */ |
5284 | (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, | 5534 | (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, |
@@ -5357,8 +5607,39 @@ handle_validation_response (void *cls, | |||
5357 | GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, | 5607 | GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, |
5358 | &peerstore_store_validation_cb, | 5608 | &peerstore_store_validation_cb, |
5359 | vs); | 5609 | vs); |
5360 | // FIXME: should we find the matching queue and update the RTT? | ||
5361 | finish_cmc_handling (cmc); | 5610 | finish_cmc_handling (cmc); |
5611 | |||
5612 | /* Finally, we now possibly have a confirmed (!) working queue, | ||
5613 | update queue status (if queue still is around) */ | ||
5614 | q = find_queue (&vs->pid, | ||
5615 | vs->address); | ||
5616 | if (NULL == q) | ||
5617 | { | ||
5618 | GNUNET_STATISTICS_update (GST_stats, | ||
5619 | "# Queues lost at time of successful validation", | ||
5620 | 1, | ||
5621 | GNUNET_NO); | ||
5622 | return; | ||
5623 | } | ||
5624 | q->validated_until = vs->validated_until; | ||
5625 | q->rtt = vs->validation_rtt; | ||
5626 | if (GNUNET_NO != q->neighbour->core_visible) | ||
5627 | return; /* nothing changed, we are done here */ | ||
5628 | q->neighbour->core_visible = GNUNET_YES; | ||
5629 | q->visibility_task | ||
5630 | = GNUNET_SCHEDULER_add_at (q->validated_until, | ||
5631 | &core_queue_visibility_check, | ||
5632 | q); | ||
5633 | /* Check if _any_ DV route to this neighbour is | ||
5634 | currently valid, if so, do NOT tell core anything! */ | ||
5635 | dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, | ||
5636 | &q->neighbour->pid); | ||
5637 | if (GNUNET_YES == dv->core_visible) | ||
5638 | return; /* nothing changed, done */ | ||
5639 | /* We lacked a confirmed connection to the neighbour | ||
5640 | before, so tell CORE about it (finally!) */ | ||
5641 | cores_send_connect_info (&q->neighbour->pid, | ||
5642 | GNUNET_BANDWIDTH_ZERO); | ||
5362 | } | 5643 | } |
5363 | 5644 | ||
5364 | 5645 | ||
@@ -5640,19 +5921,19 @@ reliability_box_message (struct PendingMessage *pm) | |||
5640 | /* failed hard */ | 5921 | /* failed hard */ |
5641 | GNUNET_break (0); | 5922 | GNUNET_break (0); |
5642 | client_send_response (pm, | 5923 | client_send_response (pm, |
5643 | GNUNET_NO, | 5924 | GNUNET_NO, |
5644 | 0); | 5925 | 0); |
5645 | return NULL; | 5926 | return NULL; |
5646 | } | 5927 | } |
5647 | bpm = GNUNET_malloc (sizeof (struct PendingMessage) + | 5928 | bpm = GNUNET_malloc (sizeof (struct PendingMessage) + |
5648 | sizeof (rbox) + | 5929 | sizeof (rbox) + |
5649 | pm->bytes_msg); | 5930 | pm->bytes_msg); |
5650 | bpm->target = pm->target; | 5931 | bpm->target = pm->target; |
5651 | bpm->frag_parent = pm; | 5932 | bpm->frag_parent = pm; |
5652 | GNUNET_CONTAINER_MDLL_insert (frag, | 5933 | GNUNET_CONTAINER_MDLL_insert (frag, |
5653 | pm->head_frag, | 5934 | pm->head_frag, |
5654 | pm->tail_frag, | 5935 | pm->tail_frag, |
5655 | bpm); | 5936 | bpm); |
5656 | bpm->timeout = pm->timeout; | 5937 | bpm->timeout = pm->timeout; |
5657 | bpm->pmt = PMT_RELIABILITY_BOX; | 5938 | bpm->pmt = PMT_RELIABILITY_BOX; |
5658 | bpm->bytes_msg = pm->bytes_msg + sizeof (rbox); | 5939 | bpm->bytes_msg = pm->bytes_msg + sizeof (rbox); |
@@ -5663,66 +5944,17 @@ reliability_box_message (struct PendingMessage *pm) | |||
5663 | rbox.msg_uuid = pm->msg_uuid; | 5944 | rbox.msg_uuid = pm->msg_uuid; |
5664 | msg = (char *) &bpm[1]; | 5945 | msg = (char *) &bpm[1]; |
5665 | memcpy (msg, | 5946 | memcpy (msg, |
5666 | &rbox, | 5947 | &rbox, |
5667 | sizeof (rbox)); | 5948 | sizeof (rbox)); |
5668 | memcpy (&msg[sizeof (rbox)], | 5949 | memcpy (&msg[sizeof (rbox)], |
5669 | &pm[1], | 5950 | &pm[1], |
5670 | pm->bytes_msg); | 5951 | pm->bytes_msg); |
5671 | pm->bpm = bpm; | 5952 | pm->bpm = bpm; |
5672 | return bpm; | 5953 | return bpm; |
5673 | } | 5954 | } |
5674 | 5955 | ||
5675 | 5956 | ||
5676 | /** | 5957 | /** |
5677 | * Send the control message @a payload on @a queue. | ||
5678 | * | ||
5679 | * @param queue the queue to use for transmission | ||
5680 | * @param pm pending message to update once transmission is done, may be NULL! | ||
5681 | * @param payload the payload to send (encapsulated in a | ||
5682 | * #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG). | ||
5683 | * @param payload_size number of bytes in @a payload | ||
5684 | */ | ||
5685 | static void | ||
5686 | queue_send_msg (struct Queue *queue, | ||
5687 | struct PendingMessage *pm, | ||
5688 | const void *payload, | ||
5689 | size_t payload_size) | ||
5690 | { | ||
5691 | struct Neighbour *n = queue->neighbour; | ||
5692 | struct GNUNET_TRANSPORT_SendMessageTo *smt; | ||
5693 | struct GNUNET_MQ_Envelope *env; | ||
5694 | |||
5695 | env = GNUNET_MQ_msg_extra (smt, | ||
5696 | payload_size, | ||
5697 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); | ||
5698 | smt->qid = queue->qid; | ||
5699 | smt->mid = queue->mid_gen; | ||
5700 | smt->receiver = n->pid; | ||
5701 | memcpy (&smt[1], | ||
5702 | payload, | ||
5703 | payload_size); | ||
5704 | { | ||
5705 | /* Pass the env to the communicator of queue for transmission. */ | ||
5706 | struct QueueEntry *qe; | ||
5707 | |||
5708 | qe = GNUNET_new (struct QueueEntry); | ||
5709 | qe->mid = queue->mid_gen++; | ||
5710 | qe->queue = queue; | ||
5711 | // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'! | ||
5712 | // (also, note that pm may be NULL!) | ||
5713 | GNUNET_CONTAINER_DLL_insert (queue->queue_head, | ||
5714 | queue->queue_tail, | ||
5715 | qe); | ||
5716 | GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); | ||
5717 | queue->queue_length++; | ||
5718 | queue->tc->details.communicator.total_queue_length++; | ||
5719 | GNUNET_MQ_send (queue->tc->mq, | ||
5720 | env); | ||
5721 | } | ||
5722 | } | ||
5723 | |||
5724 | |||
5725 | /** | ||
5726 | * We believe we are ready to transmit a message on a queue. Double-checks | 5958 | * We believe we are ready to transmit a message on a queue. Double-checks |
5727 | * with the queue's "tracker_out" and then gives the message to the | 5959 | * with the queue's "tracker_out" and then gives the message to the |
5728 | * communicator for transmission (updating the tracker, and re-scheduling | 5960 | * communicator for transmission (updating the tracker, and re-scheduling |
@@ -6268,7 +6500,6 @@ static void | |||
6268 | validation_start_cb (void *cls) | 6500 | validation_start_cb (void *cls) |
6269 | { | 6501 | { |
6270 | struct ValidationState *vs; | 6502 | struct ValidationState *vs; |
6271 | struct Neighbour *n; | ||
6272 | struct Queue *q; | 6503 | struct Queue *q; |
6273 | 6504 | ||
6274 | (void) cls; | 6505 | (void) cls; |
@@ -6284,23 +6515,8 @@ validation_start_cb (void *cls) | |||
6284 | if (NULL == vs) | 6515 | if (NULL == vs) |
6285 | return; /* woopsie, no more addresses known, should only | 6516 | return; /* woopsie, no more addresses known, should only |
6286 | happen if we're really a lonely peer */ | 6517 | happen if we're really a lonely peer */ |
6287 | n = GNUNET_CONTAINER_multipeermap_get (neighbours, | 6518 | q = find_queue (&vs->pid, |
6288 | &vs->pid); | 6519 | vs->address); |
6289 | q = NULL; | ||
6290 | if (NULL != n) | ||
6291 | { | ||
6292 | for (struct Queue *pos = n->queue_head; | ||
6293 | NULL != pos; | ||
6294 | pos = pos->next_neighbour) | ||
6295 | { | ||
6296 | if (0 == strcmp (pos->address, | ||
6297 | vs->address)) | ||
6298 | { | ||
6299 | q = pos; | ||
6300 | break; | ||
6301 | } | ||
6302 | } | ||
6303 | } | ||
6304 | if (NULL == q) | 6520 | if (NULL == q) |
6305 | { | 6521 | { |
6306 | vs->awaiting_queue = GNUNET_YES; | 6522 | vs->awaiting_queue = GNUNET_YES; |
@@ -6570,8 +6786,6 @@ handle_add_queue_message (void *cls, | |||
6570 | &neighbour->pid, | 6786 | &neighbour->pid, |
6571 | neighbour, | 6787 | neighbour, |
6572 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 6788 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
6573 | cores_send_connect_info (&neighbour->pid, | ||
6574 | GNUNET_BANDWIDTH_ZERO); | ||
6575 | } | 6789 | } |
6576 | addr_len = ntohs (aqm->header.size) - sizeof (*aqm); | 6790 | addr_len = ntohs (aqm->header.size) - sizeof (*aqm); |
6577 | addr = (const char *) &aqm[1]; | 6791 | addr = (const char *) &aqm[1]; |