diff options
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 575 |
1 files changed, 259 insertions, 316 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index c2922dd7e..825d45522 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -24,6 +24,11 @@ | |||
24 | * | 24 | * |
25 | * TODO: | 25 | * TODO: |
26 | * Implement next: | 26 | * Implement next: |
27 | * - complete flow control push back from CORE via TRANSPORT to communicators: | ||
28 | * + resume communicators in handle_client_recv_ok (see FIXME) | ||
29 | * + count transmissions to CORE and suspend communicators if window is full | ||
30 | * - check flow control push back from TRANSPROT to CORE: | ||
31 | * + check when to send ACKs | ||
27 | * - change transport-core API to provide proper flow control in both | 32 | * - change transport-core API to provide proper flow control in both |
28 | * directions, allow multiple messages per peer simultaneously (tag | 33 | * directions, allow multiple messages per peer simultaneously (tag |
29 | * confirmations with unique message ID), and replace quota-out with | 34 | * confirmations with unique message ID), and replace quota-out with |
@@ -114,6 +119,16 @@ | |||
114 | #define MAX_DV_DISCOVERY_SELECTION 16 | 119 | #define MAX_DV_DISCOVERY_SELECTION 16 |
115 | 120 | ||
116 | /** | 121 | /** |
122 | * Window size. How many messages to the same target do we pass | ||
123 | * to CORE without a RECV_OK in between? Small values limit | ||
124 | * thoughput, large values will increase latency. | ||
125 | * | ||
126 | * FIXME-OPTIMIZE: find out what good values are experimentally, | ||
127 | * maybe set adaptively (i.e. to observed available bandwidth). | ||
128 | */ | ||
129 | #define RECV_WINDOW_SIZE 4 | ||
130 | |||
131 | /** | ||
117 | * Minimum number of hops we should forward DV learn messages | 132 | * Minimum number of hops we should forward DV learn messages |
118 | * even if they are NOT useful for us in hope of looping | 133 | * even if they are NOT useful for us in hope of looping |
119 | * back to the initiator? | 134 | * back to the initiator? |
@@ -1100,6 +1115,48 @@ struct PendingMessage; | |||
1100 | */ | 1115 | */ |
1101 | struct DistanceVectorHop; | 1116 | struct DistanceVectorHop; |
1102 | 1117 | ||
1118 | /** | ||
1119 | * A virtual link is another reachable peer that is known to CORE. It | ||
1120 | * can be either a `struct Neighbour` with at least one confirmed | ||
1121 | * `struct Queue`, or a `struct DistanceVector` with at least one | ||
1122 | * confirmed `struct DistanceVectorHop`. With a virtual link we track | ||
1123 | * data that is per neighbour that is not specific to how the | ||
1124 | * connectivity is established. | ||
1125 | */ | ||
1126 | struct VirtualLink | ||
1127 | { | ||
1128 | /** | ||
1129 | * Identity of the peer at the other end of the link. | ||
1130 | */ | ||
1131 | struct GNUNET_PeerIdentity target; | ||
1132 | |||
1133 | /** | ||
1134 | * Task scheduled to possibly notfiy core that this peer is no | ||
1135 | * longer counting as confirmed. Runs the #core_visibility_check(), | ||
1136 | * which checks that some DV-path or a queue exists that is still | ||
1137 | * considered confirmed. | ||
1138 | */ | ||
1139 | struct GNUNET_SCHEDULER_Task *visibility_task; | ||
1140 | |||
1141 | /** | ||
1142 | * Neighbour used by this virtual link, NULL if @e dv is used. | ||
1143 | */ | ||
1144 | struct Neighbour *n; | ||
1145 | |||
1146 | /** | ||
1147 | * Distance vector used by this virtual link, NULL if @e n is used. | ||
1148 | */ | ||
1149 | struct DistanceVector *dv; | ||
1150 | |||
1151 | /** | ||
1152 | * How many more messages can we send to core before we exhaust | ||
1153 | * the receive window of CORE for this peer? If this hits zero, | ||
1154 | * we must tell communicators to stop providing us more messages | ||
1155 | * for this peer. | ||
1156 | */ | ||
1157 | unsigned int core_recv_window; | ||
1158 | }; | ||
1159 | |||
1103 | 1160 | ||
1104 | /** | 1161 | /** |
1105 | * Data structure kept when we are waiting for an acknowledgement. | 1162 | * Data structure kept when we are waiting for an acknowledgement. |
@@ -1316,31 +1373,10 @@ struct DistanceVector | |||
1316 | struct GNUNET_SCHEDULER_Task *timeout_task; | 1373 | struct GNUNET_SCHEDULER_Task *timeout_task; |
1317 | 1374 | ||
1318 | /** | 1375 | /** |
1319 | * Task scheduled to possibly notfiy core that this queue is no longer | 1376 | * Do we have a confirmed working queue and are thus visible to |
1320 | * counting as confirmed. Runs the #core_queue_visibility_check(). | 1377 | * CORE? If so, this is the virtual link, otherwise NULL. |
1321 | */ | ||
1322 | struct GNUNET_SCHEDULER_Task *visibility_task; | ||
1323 | |||
1324 | /** | ||
1325 | * Quota at which CORE is allowed to transmit to this peer | ||
1326 | * (note that the value CORE should actually be told is this | ||
1327 | * value plus the respective value in `struct Neighbour`). | ||
1328 | * Should match the sum of the quotas of all of the paths. | ||
1329 | * | ||
1330 | * FIXME: not yet set, tricky to get right given multiple paths, | ||
1331 | * many of which may be inactive! (=> Idea: measure???) | ||
1332 | * FIXME: how do we set this value initially when we tell CORE? | ||
1333 | * Options: start at a minimum value or at literally zero? | ||
1334 | * (=> Current thought: clean would be zero!) | ||
1335 | */ | ||
1336 | struct GNUNET_BANDWIDTH_Value32NBO quota_out; | ||
1337 | |||
1338 | /** | ||
1339 | * Is one of the DV paths in this struct 'confirmed' and thus | ||
1340 | * the cause for CORE to see this peer as connected? (Note that | ||
1341 | * the same may apply to a `struct Neighbour` at the same time.) | ||
1342 | */ | 1378 | */ |
1343 | int core_visible; | 1379 | struct VirtualLink *link; |
1344 | }; | 1380 | }; |
1345 | 1381 | ||
1346 | 1382 | ||
@@ -1451,12 +1487,6 @@ struct Queue | |||
1451 | struct GNUNET_SCHEDULER_Task *transmit_task; | 1487 | struct GNUNET_SCHEDULER_Task *transmit_task; |
1452 | 1488 | ||
1453 | /** | 1489 | /** |
1454 | * Task scheduled to possibly notfiy core that this queue is no longer | ||
1455 | * counting as confirmed. Runs the #core_queue_visibility_check(). | ||
1456 | */ | ||
1457 | struct GNUNET_SCHEDULER_Task *visibility_task; | ||
1458 | |||
1459 | /** | ||
1460 | * How long do *we* consider this @e address to be valid? In the past or | 1490 | * How long do *we* consider this @e address to be valid? In the past or |
1461 | * zero if we have not yet validated it. Can be updated based on | 1491 | * zero if we have not yet validated it. Can be updated based on |
1462 | * challenge-response validations (via address validation logic), or when we | 1492 | * challenge-response validations (via address validation logic), or when we |
@@ -1643,11 +1673,6 @@ struct Neighbour | |||
1643 | struct Queue *queue_tail; | 1673 | struct Queue *queue_tail; |
1644 | 1674 | ||
1645 | /** | 1675 | /** |
1646 | * Task run to cleanup pending messages that have exceeded their timeout. | ||
1647 | */ | ||
1648 | struct GNUNET_SCHEDULER_Task *timeout_task; | ||
1649 | |||
1650 | /** | ||
1651 | * Handle for an operation to fetch @e last_dv_learn_monotime information from | 1676 | * Handle for an operation to fetch @e last_dv_learn_monotime information from |
1652 | * the PEERSTORE, or NULL. | 1677 | * the PEERSTORE, or NULL. |
1653 | */ | 1678 | */ |
@@ -1660,18 +1685,10 @@ struct Neighbour | |||
1660 | struct GNUNET_PEERSTORE_StoreContext *sc; | 1685 | struct GNUNET_PEERSTORE_StoreContext *sc; |
1661 | 1686 | ||
1662 | /** | 1687 | /** |
1663 | * Quota at which CORE is allowed to transmit to this peer | 1688 | * Do we have a confirmed working queue and are thus visible to |
1664 | * (note that the value CORE should actually be told is this | 1689 | * CORE? If so, this is the virtual link, otherwise NULL. |
1665 | * value plus the respective value in `struct DistanceVector`). | ||
1666 | * Should match the sum of the quotas of all of the queues. | ||
1667 | * | ||
1668 | * FIXME: not yet set, tricky to get right given multiple queues! | ||
1669 | * (=> Idea: measure???) | ||
1670 | * FIXME: how do we set this value initially when we tell CORE? | ||
1671 | * Options: start at a minimum value or at literally zero? | ||
1672 | * (=> Current thought: clean would be zero!) | ||
1673 | */ | 1690 | */ |
1674 | struct GNUNET_BANDWIDTH_Value32NBO quota_out; | 1691 | struct VirtualLink *link; |
1675 | 1692 | ||
1676 | /** | 1693 | /** |
1677 | * Latest DVLearn monotonic time seen from this peer. Initialized only | 1694 | * Latest DVLearn monotonic time seen from this peer. Initialized only |
@@ -1680,17 +1697,6 @@ struct Neighbour | |||
1680 | struct GNUNET_TIME_Absolute last_dv_learn_monotime; | 1697 | struct GNUNET_TIME_Absolute last_dv_learn_monotime; |
1681 | 1698 | ||
1682 | /** | 1699 | /** |
1683 | * What is the earliest timeout of any message in @e pending_msg_tail? | ||
1684 | */ | ||
1685 | struct GNUNET_TIME_Absolute earliest_timeout; | ||
1686 | |||
1687 | /** | ||
1688 | * Do we have a confirmed working queue and are thus visible to | ||
1689 | * CORE? | ||
1690 | */ | ||
1691 | int core_visible; | ||
1692 | |||
1693 | /** | ||
1694 | * Do we have the lastest value for @e last_dv_learn_monotime from | 1700 | * Do we have the lastest value for @e last_dv_learn_monotime from |
1695 | * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE? | 1701 | * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE? |
1696 | */ | 1702 | */ |
@@ -2417,6 +2423,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes; | |||
2417 | static struct GNUNET_CONTAINER_MultiPeerMap *validation_map; | 2423 | static struct GNUNET_CONTAINER_MultiPeerMap *validation_map; |
2418 | 2424 | ||
2419 | /** | 2425 | /** |
2426 | * Map from PIDs to `struct VirtualLink` entries describing | ||
2427 | * links CORE knows to exist. | ||
2428 | */ | ||
2429 | static struct GNUNET_CONTAINER_MultiPeerMap *links; | ||
2430 | |||
2431 | /** | ||
2420 | * Map from challenges to `struct LearnLaunchEntry` values. | 2432 | * Map from challenges to `struct LearnLaunchEntry` values. |
2421 | */ | 2433 | */ |
2422 | static struct GNUNET_CONTAINER_MultiShortmap *dvlearn_map; | 2434 | static struct GNUNET_CONTAINER_MultiShortmap *dvlearn_map; |
@@ -2564,6 +2576,26 @@ free_ephemeral (struct EphemeralCacheEntry *ece) | |||
2564 | 2576 | ||
2565 | 2577 | ||
2566 | /** | 2578 | /** |
2579 | * Free virtual link. | ||
2580 | * | ||
2581 | * @param vl link data to free | ||
2582 | */ | ||
2583 | static void | ||
2584 | free_virtual_link (struct VirtualLink *vl) | ||
2585 | { | ||
2586 | GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl); | ||
2587 | if (NULL != vl->visibility_task) | ||
2588 | { | ||
2589 | GNUNET_SCHEDULER_cancel (vl->visibility_task); | ||
2590 | vl->visibility_task = NULL; | ||
2591 | } | ||
2592 | GNUNET_break (NULL == vl->n); | ||
2593 | GNUNET_break (NULL == vl->dv); | ||
2594 | GNUNET_free (vl); | ||
2595 | } | ||
2596 | |||
2597 | |||
2598 | /** | ||
2567 | * Free validation state. | 2599 | * Free validation state. |
2568 | * | 2600 | * |
2569 | * @param vs validation state to free | 2601 | * @param vs validation state to free |
@@ -2684,8 +2716,6 @@ free_dv_route (struct DistanceVector *dv) | |||
2684 | GNUNET_assert ( | 2716 | GNUNET_assert ( |
2685 | GNUNET_YES == | 2717 | GNUNET_YES == |
2686 | GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv)); | 2718 | GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv)); |
2687 | if (NULL != dv->visibility_task) | ||
2688 | GNUNET_SCHEDULER_cancel (dv->visibility_task); | ||
2689 | if (NULL != dv->timeout_task) | 2719 | if (NULL != dv->timeout_task) |
2690 | GNUNET_SCHEDULER_cancel (dv->timeout_task); | 2720 | GNUNET_SCHEDULER_cancel (dv->timeout_task); |
2691 | GNUNET_free (dv); | 2721 | GNUNET_free (dv); |
@@ -2873,8 +2903,6 @@ free_neighbour (struct Neighbour *neighbour) | |||
2873 | GNUNET_CONTAINER_multipeermap_remove (neighbours, | 2903 | GNUNET_CONTAINER_multipeermap_remove (neighbours, |
2874 | &neighbour->pid, | 2904 | &neighbour->pid, |
2875 | neighbour)); | 2905 | neighbour)); |
2876 | if (NULL != neighbour->timeout_task) | ||
2877 | GNUNET_SCHEDULER_cancel (neighbour->timeout_task); | ||
2878 | if (NULL != neighbour->reassembly_map) | 2906 | if (NULL != neighbour->reassembly_map) |
2879 | { | 2907 | { |
2880 | GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map, | 2908 | GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map, |
@@ -2917,19 +2945,16 @@ free_neighbour (struct Neighbour *neighbour) | |||
2917 | * | 2945 | * |
2918 | * @param tc client to inform (must be CORE client) | 2946 | * @param tc client to inform (must be CORE client) |
2919 | * @param pid peer the connection is for | 2947 | * @param pid peer the connection is for |
2920 | * @param quota_out current quota for the peer | ||
2921 | */ | 2948 | */ |
2922 | static void | 2949 | static void |
2923 | core_send_connect_info (struct TransportClient *tc, | 2950 | core_send_connect_info (struct TransportClient *tc, |
2924 | const struct GNUNET_PeerIdentity *pid, | 2951 | const struct GNUNET_PeerIdentity *pid) |
2925 | struct GNUNET_BANDWIDTH_Value32NBO quota_out) | ||
2926 | { | 2952 | { |
2927 | struct GNUNET_MQ_Envelope *env; | 2953 | struct GNUNET_MQ_Envelope *env; |
2928 | struct ConnectInfoMessage *cim; | 2954 | struct ConnectInfoMessage *cim; |
2929 | 2955 | ||
2930 | GNUNET_assert (CT_CORE == tc->type); | 2956 | GNUNET_assert (CT_CORE == tc->type); |
2931 | env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); | 2957 | env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); |
2932 | cim->quota_out = quota_out; | ||
2933 | cim->id = *pid; | 2958 | cim->id = *pid; |
2934 | GNUNET_MQ_send (tc->mq, env); | 2959 | GNUNET_MQ_send (tc->mq, env); |
2935 | } | 2960 | } |
@@ -2939,11 +2964,9 @@ core_send_connect_info (struct TransportClient *tc, | |||
2939 | * Send message to CORE clients that we gained a connection | 2964 | * Send message to CORE clients that we gained a connection |
2940 | * | 2965 | * |
2941 | * @param pid peer the queue was for | 2966 | * @param pid peer the queue was for |
2942 | * @param quota_out current quota for the peer | ||
2943 | */ | 2967 | */ |
2944 | static void | 2968 | static void |
2945 | cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, | 2969 | cores_send_connect_info (const struct GNUNET_PeerIdentity *pid) |
2946 | struct GNUNET_BANDWIDTH_Value32NBO quota_out) | ||
2947 | { | 2970 | { |
2948 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2971 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2949 | "Informing CORE clients about connection to %s\n", | 2972 | "Informing CORE clients about connection to %s\n", |
@@ -2952,7 +2975,7 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, | |||
2952 | { | 2975 | { |
2953 | if (CT_CORE != tc->type) | 2976 | if (CT_CORE != tc->type) |
2954 | continue; | 2977 | continue; |
2955 | core_send_connect_info (tc, pid, quota_out); | 2978 | core_send_connect_info (tc, pid); |
2956 | } | 2979 | } |
2957 | } | 2980 | } |
2958 | 2981 | ||
@@ -3059,13 +3082,43 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job) | |||
3059 | 3082 | ||
3060 | 3083 | ||
3061 | /** | 3084 | /** |
3062 | * Check whether the CORE visibility of @a n changed. If so, | 3085 | * Task run to check whether the hops of the @a cls still |
3063 | * check whether we need to notify CORE. | 3086 | * are validated, or if we need to core about disconnection. |
3064 | * | 3087 | * |
3065 | * @param n neighbour to perform the check for | 3088 | * @param cls a `struct VirtualLink` |
3066 | */ | 3089 | */ |
3067 | static void | 3090 | static void |
3068 | update_neighbour_core_visibility (struct Neighbour *n); | 3091 | check_link_down (void *cls) |
3092 | { | ||
3093 | struct VirtualLink *vl = cls; | ||
3094 | struct DistanceVector *dv = vl->dv; | ||
3095 | struct Neighbour *n = vl->n; | ||
3096 | struct GNUNET_TIME_Absolute dvh_timeout; | ||
3097 | struct GNUNET_TIME_Absolute q_timeout; | ||
3098 | |||
3099 | vl->visibility_task = NULL; | ||
3100 | dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS; | ||
3101 | for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; | ||
3102 | pos = pos->next_dv) | ||
3103 | dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until); | ||
3104 | if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) | ||
3105 | vl->dv = NULL; | ||
3106 | q_timeout = GNUNET_TIME_UNIT_ZERO_ABS; | ||
3107 | for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) | ||
3108 | q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until); | ||
3109 | if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) | ||
3110 | vl->n = NULL; | ||
3111 | if ((NULL == vl->n) && (NULL == vl->dv)) | ||
3112 | { | ||
3113 | cores_send_disconnect_info (&dv->target); | ||
3114 | free_virtual_link (vl); | ||
3115 | return; | ||
3116 | } | ||
3117 | vl->visibility_task = | ||
3118 | GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout), | ||
3119 | &check_link_down, | ||
3120 | vl); | ||
3121 | } | ||
3069 | 3122 | ||
3070 | 3123 | ||
3071 | /** | 3124 | /** |
@@ -3083,17 +3136,13 @@ free_queue (struct Queue *queue) | |||
3083 | struct QueueEntry *qe; | 3136 | struct QueueEntry *qe; |
3084 | int maxxed; | 3137 | int maxxed; |
3085 | struct PendingAcknowledgement *pa; | 3138 | struct PendingAcknowledgement *pa; |
3139 | struct VirtualLink *vl; | ||
3086 | 3140 | ||
3087 | if (NULL != queue->transmit_task) | 3141 | if (NULL != queue->transmit_task) |
3088 | { | 3142 | { |
3089 | GNUNET_SCHEDULER_cancel (queue->transmit_task); | 3143 | GNUNET_SCHEDULER_cancel (queue->transmit_task); |
3090 | queue->transmit_task = NULL; | 3144 | queue->transmit_task = NULL; |
3091 | } | 3145 | } |
3092 | if (NULL != queue->visibility_task) | ||
3093 | { | ||
3094 | GNUNET_SCHEDULER_cancel (queue->visibility_task); | ||
3095 | queue->visibility_task = NULL; | ||
3096 | } | ||
3097 | while (NULL != (pa = queue->pa_head)) | 3146 | while (NULL != (pa = queue->pa_head)) |
3098 | { | 3147 | { |
3099 | GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa); | 3148 | GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa); |
@@ -3139,9 +3188,12 @@ free_queue (struct Queue *queue) | |||
3139 | notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); | 3188 | notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); |
3140 | GNUNET_free (queue); | 3189 | GNUNET_free (queue); |
3141 | 3190 | ||
3142 | update_neighbour_core_visibility (neighbour); | 3191 | vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid); |
3143 | cores_send_disconnect_info (&neighbour->pid); | 3192 | if ((NULL != vl) && (neighbour == vl->n)) |
3144 | 3193 | { | |
3194 | GNUNET_SCHEDULER_cancel (vl->visibility_task); | ||
3195 | check_link_down (vl); | ||
3196 | } | ||
3145 | if (NULL == neighbour->queue_head) | 3197 | if (NULL == neighbour->queue_head) |
3146 | { | 3198 | { |
3147 | free_neighbour (neighbour); | 3199 | free_neighbour (neighbour); |
@@ -3281,12 +3333,12 @@ notify_client_connect_info (void *cls, | |||
3281 | void *value) | 3333 | void *value) |
3282 | { | 3334 | { |
3283 | struct TransportClient *tc = cls; | 3335 | struct TransportClient *tc = cls; |
3284 | struct Neighbour *neighbour = value; | ||
3285 | 3336 | ||
3337 | (void) value; | ||
3286 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 3338 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
3287 | "Telling new CORE client about existing connection to %s\n", | 3339 | "Telling new CORE client about existing connection to %s\n", |
3288 | GNUNET_i2s (pid)); | 3340 | GNUNET_i2s (pid)); |
3289 | core_send_connect_info (tc, pid, neighbour->quota_out); | 3341 | core_send_connect_info (tc, pid); |
3290 | return GNUNET_OK; | 3342 | return GNUNET_OK; |
3291 | } | 3343 | } |
3292 | 3344 | ||
@@ -3469,9 +3521,6 @@ client_send_response (struct PendingMessage *pm, | |||
3469 | if (NULL != tc) | 3521 | if (NULL != tc) |
3470 | { | 3522 | { |
3471 | env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); | 3523 | env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); |
3472 | som->success = htonl ((uint32_t) success); | ||
3473 | som->bytes_msg = htons (pm->bytes_msg); | ||
3474 | som->bytes_physical = htonl (bytes_physical); | ||
3475 | som->peer = target->pid; | 3524 | som->peer = target->pid; |
3476 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 3525 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
3477 | "Confirming %s transmission of %u/%u bytes to %s\n", | 3526 | "Confirming %s transmission of %u/%u bytes to %s\n", |
@@ -3486,45 +3535,6 @@ client_send_response (struct PendingMessage *pm, | |||
3486 | 3535 | ||
3487 | 3536 | ||
3488 | /** | 3537 | /** |
3489 | * Checks the message queue for a neighbour for messages that have timed | ||
3490 | * out and purges them. | ||
3491 | * | ||
3492 | * @param cls a `struct Neighbour` | ||
3493 | */ | ||
3494 | static void | ||
3495 | check_queue_timeouts (void *cls) | ||
3496 | { | ||
3497 | struct Neighbour *n = cls; | ||
3498 | struct PendingMessage *pm; | ||
3499 | struct GNUNET_TIME_Absolute now; | ||
3500 | struct GNUNET_TIME_Absolute earliest_timeout; | ||
3501 | |||
3502 | n->timeout_task = NULL; | ||
3503 | earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; | ||
3504 | now = GNUNET_TIME_absolute_get (); | ||
3505 | for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos; pos = pm) | ||
3506 | { | ||
3507 | pm = pos->next_neighbour; | ||
3508 | if (pos->timeout.abs_value_us <= now.abs_value_us) | ||
3509 | { | ||
3510 | GNUNET_STATISTICS_update (GST_stats, | ||
3511 | "# messages dropped (timeout before confirmation)", | ||
3512 | 1, | ||
3513 | GNUNET_NO); | ||
3514 | client_send_response (pm, GNUNET_NO, 0); | ||
3515 | continue; | ||
3516 | } | ||
3517 | earliest_timeout = | ||
3518 | GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout); | ||
3519 | } | ||
3520 | n->earliest_timeout = earliest_timeout; | ||
3521 | if (NULL != n->pending_msg_head) | ||
3522 | n->timeout_task = | ||
3523 | GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n); | ||
3524 | } | ||
3525 | |||
3526 | |||
3527 | /** | ||
3528 | * Create a DV Box message. | 3538 | * Create a DV Box message. |
3529 | * | 3539 | * |
3530 | * @param total_hops how many hops did the message take so far | 3540 | * @param total_hops how many hops did the message take so far |
@@ -3689,30 +3699,18 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) | |||
3689 | const void *payload; | 3699 | const void *payload; |
3690 | size_t payload_size; | 3700 | size_t payload_size; |
3691 | struct TransportDVBoxMessage *dvb; | 3701 | struct TransportDVBoxMessage *dvb; |
3702 | struct VirtualLink *vl; | ||
3692 | 3703 | ||
3693 | GNUNET_assert (CT_CORE == tc->type); | 3704 | GNUNET_assert (CT_CORE == tc->type); |
3694 | obmm = (const struct GNUNET_MessageHeader *) &obm[1]; | 3705 | obmm = (const struct GNUNET_MessageHeader *) &obm[1]; |
3695 | bytes_msg = ntohs (obmm->size); | 3706 | bytes_msg = ntohs (obmm->size); |
3696 | target = lookup_neighbour (&obm->peer); | 3707 | vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer); |
3697 | if (NULL == target) | 3708 | if (NULL == vl) |
3698 | dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer); | ||
3699 | else | ||
3700 | dv = NULL; | ||
3701 | if ((NULL == target) && ((NULL == dv) || (GNUNET_NO == dv->core_visible))) | ||
3702 | { | 3709 | { |
3703 | /* Failure: don't have this peer as a neighbour (anymore). | 3710 | /* Failure: don't have this peer as a neighbour (anymore). |
3704 | Might have gone down asynchronously, so this is NOT | 3711 | Might have gone down asynchronously, so this is NOT |
3705 | a protocol violation by CORE. Still count the event, | 3712 | a protocol violation by CORE. Still count the event, |
3706 | as this should be rare. */ | 3713 | as this should be rare. */ |
3707 | struct GNUNET_MQ_Envelope *env; | ||
3708 | struct SendOkMessage *som; | ||
3709 | |||
3710 | env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); | ||
3711 | som->success = htonl (GNUNET_SYSERR); | ||
3712 | som->bytes_msg = htonl (bytes_msg); | ||
3713 | som->bytes_physical = htonl (0); | ||
3714 | som->peer = obm->peer; | ||
3715 | GNUNET_MQ_send (tc->mq, env); | ||
3716 | GNUNET_SERVICE_client_continue (tc->client); | 3714 | GNUNET_SERVICE_client_continue (tc->client); |
3717 | GNUNET_STATISTICS_update (GST_stats, | 3715 | GNUNET_STATISTICS_update (GST_stats, |
3718 | "# messages dropped (neighbour unknown)", | 3716 | "# messages dropped (neighbour unknown)", |
@@ -3720,6 +3718,12 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) | |||
3720 | GNUNET_NO); | 3718 | GNUNET_NO); |
3721 | return; | 3719 | return; |
3722 | } | 3720 | } |
3721 | target = lookup_neighbour (&obm->peer); | ||
3722 | if (NULL == target) | ||
3723 | dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer); | ||
3724 | else | ||
3725 | dv = NULL; | ||
3726 | GNUNET_assert ((NULL != target) || (NULL != dv)); | ||
3723 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 3727 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
3724 | "Sending %u bytes to %s using %s\n", | 3728 | "Sending %u bytes to %s using %s\n", |
3725 | bytes_msg, | 3729 | bytes_msg, |
@@ -3756,8 +3760,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) | |||
3756 | pm->client = tc; | 3760 | pm->client = tc; |
3757 | pm->target = target; | 3761 | pm->target = target; |
3758 | pm->bytes_msg = payload_size; | 3762 | pm->bytes_msg = payload_size; |
3759 | pm->timeout = | ||
3760 | GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout)); | ||
3761 | memcpy (&pm[1], payload, payload_size); | 3763 | memcpy (&pm[1], payload, payload_size); |
3762 | GNUNET_free_non_null (dvb); | 3764 | GNUNET_free_non_null (dvb); |
3763 | dvb = NULL; | 3765 | dvb = NULL; |
@@ -3777,15 +3779,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) | |||
3777 | tc->details.core.pending_msg_head, | 3779 | tc->details.core.pending_msg_head, |
3778 | tc->details.core.pending_msg_tail, | 3780 | tc->details.core.pending_msg_tail, |
3779 | pm); | 3781 | pm); |
3780 | if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us) | ||
3781 | { | ||
3782 | target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us; | ||
3783 | if (NULL != target->timeout_task) | ||
3784 | GNUNET_SCHEDULER_cancel (target->timeout_task); | ||
3785 | target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout, | ||
3786 | &check_queue_timeouts, | ||
3787 | target); | ||
3788 | } | ||
3789 | if (! was_empty) | 3782 | if (! was_empty) |
3790 | return; /* all queues must already be busy */ | 3783 | return; /* all queues must already be busy */ |
3791 | for (struct Queue *queue = target->queue_head; NULL != queue; | 3784 | for (struct Queue *queue = target->queue_head; NULL != queue; |
@@ -3834,6 +3827,47 @@ check_communicator_available ( | |||
3834 | 3827 | ||
3835 | 3828 | ||
3836 | /** | 3829 | /** |
3830 | * Client confirms that it is done handling message(s) to a particular | ||
3831 | * peer. We may now provide more messages to CORE for this peer. | ||
3832 | * | ||
3833 | * Notifies the respective queues that more messages can now be received. | ||
3834 | * | ||
3835 | * @param cls the client | ||
3836 | * @param rom the message that was sent | ||
3837 | */ | ||
3838 | static void | ||
3839 | handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom) | ||
3840 | { | ||
3841 | struct TransportClient *tc = cls; | ||
3842 | struct VirtualLink *vl; | ||
3843 | uint32_t delta; | ||
3844 | |||
3845 | if (CT_CORE != tc->type) | ||
3846 | { | ||
3847 | GNUNET_break (0); | ||
3848 | GNUNET_SERVICE_client_drop (tc->client); | ||
3849 | return; | ||
3850 | } | ||
3851 | vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer); | ||
3852 | if (NULL == vl) | ||
3853 | { | ||
3854 | GNUNET_STATISTICS_update (GST_stats, | ||
3855 | "# RECV_OK dropped: virtual link unknown", | ||
3856 | 1, | ||
3857 | GNUNET_NO); | ||
3858 | GNUNET_SERVICE_client_continue (tc->client); | ||
3859 | return; | ||
3860 | } | ||
3861 | delta = ntohl (rom->increase_window_delta); | ||
3862 | vl->core_recv_window += delta; | ||
3863 | if (delta == vl->core_recv_window) | ||
3864 | { | ||
3865 | // FIXME: resume communicators! | ||
3866 | } | ||
3867 | } | ||
3868 | |||
3869 | |||
3870 | /** | ||
3837 | * Communicator started. Process the request. | 3871 | * Communicator started. Process the request. |
3838 | * | 3872 | * |
3839 | * @param cls the client | 3873 | * @param cls the client |
@@ -4090,20 +4124,18 @@ route_via_neighbour (const struct Neighbour *n, | |||
4090 | for (struct Queue *pos = n->queue_head; NULL != pos; | 4124 | for (struct Queue *pos = n->queue_head; NULL != pos; |
4091 | pos = pos->next_neighbour) | 4125 | pos = pos->next_neighbour) |
4092 | { | 4126 | { |
4093 | /* Count the queue with the visibility task in all cases, as | ||
4094 | otherwise we may end up with no queues just because the | ||
4095 | time for the visibility task just expired but the scheduler | ||
4096 | just ran this task first */ | ||
4097 | if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) || | 4127 | if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) || |
4098 | (pos->validated_until.abs_value_us > now.abs_value_us) || | 4128 | (pos->validated_until.abs_value_us > now.abs_value_us)) |
4099 | (NULL != pos->visibility_task)) | ||
4100 | candidates++; | 4129 | candidates++; |
4101 | } | 4130 | } |
4102 | if (0 == candidates) | 4131 | if (0 == candidates) |
4103 | { | 4132 | { |
4104 | /* Given that we above check for pos->visibility task, | 4133 | /* This can happen rarely if the last confirmed queue timed |
4105 | this should be strictly impossible. */ | 4134 | out just as we were beginning to process this message. */ |
4106 | GNUNET_break (0); | 4135 | GNUNET_STATISTICS_update (GST_stats, |
4136 | "# route selection failed (all no valid queue)", | ||
4137 | 1, | ||
4138 | GNUNET_NO); | ||
4107 | return; | 4139 | return; |
4108 | } | 4140 | } |
4109 | sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates); | 4141 | sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates); |
@@ -4115,12 +4147,8 @@ route_via_neighbour (const struct Neighbour *n, | |||
4115 | for (struct Queue *pos = n->queue_head; NULL != pos; | 4147 | for (struct Queue *pos = n->queue_head; NULL != pos; |
4116 | pos = pos->next_neighbour) | 4148 | pos = pos->next_neighbour) |
4117 | { | 4149 | { |
4118 | /* Count the queue with the visibility task in all cases, as | 4150 | if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) || |
4119 | otherwise we may end up with no queues just because the | 4151 | (pos->validated_until.abs_value_us > now.abs_value_us)) |
4120 | time for the visibility task just expired but the scheduler | ||
4121 | just ran this task first */ | ||
4122 | if ((pos->validated_until.abs_value_us > now.abs_value_us) || | ||
4123 | (NULL != pos->visibility_task)) | ||
4124 | { | 4152 | { |
4125 | if ((sel1 == candidates) || (sel2 == candidates)) | 4153 | if ((sel1 == candidates) || (sel2 == candidates)) |
4126 | queue_send_msg (pos, NULL, hdr, ntohs (hdr->size)); | 4154 | queue_send_msg (pos, NULL, hdr, ntohs (hdr->size)); |
@@ -4197,21 +4225,21 @@ route_message (const struct GNUNET_PeerIdentity *target, | |||
4197 | struct GNUNET_MessageHeader *hdr, | 4225 | struct GNUNET_MessageHeader *hdr, |
4198 | enum RouteMessageOptions options) | 4226 | enum RouteMessageOptions options) |
4199 | { | 4227 | { |
4228 | struct VirtualLink *vl; | ||
4200 | struct Neighbour *n; | 4229 | struct Neighbour *n; |
4201 | struct DistanceVector *dv; | 4230 | struct DistanceVector *dv; |
4202 | 4231 | ||
4203 | n = lookup_neighbour (target); | 4232 | vl = GNUNET_CONTAINER_multipeermap_get (links, target); |
4204 | dv = (0 != (options & RMO_DV_ALLOWED)) | 4233 | n = vl->n; |
4205 | ? GNUNET_CONTAINER_multipeermap_get (dv_routes, target) | 4234 | dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL; |
4206 | : NULL; | ||
4207 | if (0 == (options & RMO_UNCONFIRMED_ALLOWED)) | 4235 | if (0 == (options & RMO_UNCONFIRMED_ALLOWED)) |
4208 | { | 4236 | { |
4209 | /* if confirmed is required, and we do not have anything | 4237 | /* if confirmed is required, and we do not have anything |
4210 | confirmed, drop respective options */ | 4238 | confirmed, drop respective options */ |
4211 | if ((NULL != n) && (GNUNET_NO == n->core_visible)) | 4239 | if (NULL == n) |
4212 | n = NULL; | 4240 | n = lookup_neighbour (target); |
4213 | if ((NULL != dv) && (GNUNET_NO == dv->core_visible)) | 4241 | if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED))) |
4214 | dv = NULL; | 4242 | dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target); |
4215 | } | 4243 | } |
4216 | if ((NULL == n) && (NULL == dv)) | 4244 | if ((NULL == n) && (NULL == dv)) |
4217 | { | 4245 | { |
@@ -5758,40 +5786,6 @@ path_cleanup_cb (void *cls) | |||
5758 | GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv); | 5786 | GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv); |
5759 | } | 5787 | } |
5760 | 5788 | ||
5761 | /** | ||
5762 | * Task run to check whether the hops of the @a cls still | ||
5763 | * are validated, or if we need to core about disconnection. | ||
5764 | * | ||
5765 | * @param cls a `struct DistanceVector` (with core_visible set!) | ||
5766 | */ | ||
5767 | static void | ||
5768 | check_dv_path_down (void *cls) | ||
5769 | { | ||
5770 | struct DistanceVector *dv = cls; | ||
5771 | struct Neighbour *n; | ||
5772 | |||
5773 | dv->visibility_task = NULL; | ||
5774 | GNUNET_assert (GNUNET_YES == dv->core_visible); | ||
5775 | for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; | ||
5776 | pos = pos->next_dv) | ||
5777 | { | ||
5778 | if (0 < | ||
5779 | GNUNET_TIME_absolute_get_remaining (pos->path_valid_until).rel_value_us) | ||
5780 | { | ||
5781 | dv->visibility_task = GNUNET_SCHEDULER_add_at (pos->path_valid_until, | ||
5782 | &check_dv_path_down, | ||
5783 | dv); | ||
5784 | return; | ||
5785 | } | ||
5786 | } | ||
5787 | /* all paths invalid, make dv core-invisible */ | ||
5788 | dv->core_visible = GNUNET_NO; | ||
5789 | n = lookup_neighbour (&dv->target); | ||
5790 | if ((NULL != n) && (GNUNET_YES == n->core_visible)) | ||
5791 | return; /* no need to tell core, connection still up! */ | ||
5792 | cores_send_disconnect_info (&dv->target); | ||
5793 | } | ||
5794 | |||
5795 | 5789 | ||
5796 | /** | 5790 | /** |
5797 | * The @a hop is a validated path to the respective target | 5791 | * The @a hop is a validated path to the respective target |
@@ -5804,22 +5798,30 @@ static void | |||
5804 | activate_core_visible_dv_path (struct DistanceVectorHop *hop) | 5798 | activate_core_visible_dv_path (struct DistanceVectorHop *hop) |
5805 | { | 5799 | { |
5806 | struct DistanceVector *dv = hop->dv; | 5800 | struct DistanceVector *dv = hop->dv; |
5807 | struct Neighbour *n; | 5801 | struct VirtualLink *vl; |
5808 | |||
5809 | GNUNET_assert (GNUNET_NO == dv->core_visible); | ||
5810 | GNUNET_assert (NULL == dv->visibility_task); | ||
5811 | 5802 | ||
5812 | dv->core_visible = GNUNET_YES; | 5803 | vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target); |
5813 | dv->visibility_task = | 5804 | if (NULL != vl) |
5814 | GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_dv_path_down, dv); | 5805 | { |
5815 | n = lookup_neighbour (&dv->target); | 5806 | /* Link was already up, remember dv is also now available and we are done */ |
5816 | if ((NULL != n) && (GNUNET_YES == n->core_visible)) | 5807 | vl->dv = dv; |
5817 | return; /* no need to tell core, connection already up! */ | 5808 | return; |
5818 | cores_send_connect_info (&dv->target, | 5809 | } |
5819 | (NULL != n) | 5810 | vl = GNUNET_new (struct VirtualLink); |
5820 | ? GNUNET_BANDWIDTH_value_sum (n->quota_out, | 5811 | vl->target = dv->target; |
5821 | dv->quota_out) | 5812 | vl->dv = dv; |
5822 | : dv->quota_out); | 5813 | vl->core_recv_window = RECV_WINDOW_SIZE; |
5814 | vl->visibility_task = | ||
5815 | GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); | ||
5816 | GNUNET_break (GNUNET_YES == | ||
5817 | GNUNET_CONTAINER_multipeermap_put ( | ||
5818 | links, | ||
5819 | &vl->target, | ||
5820 | vl, | ||
5821 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
5822 | /* We lacked a confirmed connection to the target | ||
5823 | before, so tell CORE about it (finally!) */ | ||
5824 | cores_send_connect_info (&dv->target); | ||
5823 | } | 5825 | } |
5824 | 5826 | ||
5825 | 5827 | ||
@@ -5934,9 +5936,8 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path, | |||
5934 | GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until); | 5936 | GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until); |
5935 | GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos); | 5937 | GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos); |
5936 | GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos); | 5938 | GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos); |
5937 | if ((GNUNET_NO == dv->core_visible) && | 5939 | if (0 < |
5938 | (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until) | 5940 | GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us) |
5939 | .rel_value_us)) | ||
5940 | activate_core_visible_dv_path (pos); | 5941 | activate_core_visible_dv_path (pos); |
5941 | if (last_timeout.rel_value_us < | 5942 | if (last_timeout.rel_value_us < |
5942 | GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT, | 5943 | GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT, |
@@ -5976,8 +5977,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path, | |||
5976 | next_hop->dv_head, | 5977 | next_hop->dv_head, |
5977 | next_hop->dv_tail, | 5978 | next_hop->dv_tail, |
5978 | hop); | 5979 | hop); |
5979 | if ((GNUNET_NO == dv->core_visible) && | 5980 | if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us) |
5980 | (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)) | ||
5981 | activate_core_visible_dv_path (hop); | 5981 | activate_core_visible_dv_path (hop); |
5982 | return GNUNET_YES; | 5982 | return GNUNET_YES; |
5983 | } | 5983 | } |
@@ -6943,75 +6943,6 @@ find_queue (const struct GNUNET_PeerIdentity *pid, const char *address) | |||
6943 | 6943 | ||
6944 | 6944 | ||
6945 | /** | 6945 | /** |
6946 | * Task run periodically to check whether the validity of the given queue has | ||
6947 | * run its course. If so, finds either another queue to take over, or clears | ||
6948 | * the neighbour's `core_visible` flag. In the latter case, gives DV routes a | ||
6949 | * chance to take over, and if that fails, notifies CORE about the disconnect. | ||
6950 | * | ||
6951 | * @param cls a `struct Queue` | ||
6952 | */ | ||
6953 | static void | ||
6954 | core_queue_visibility_check (void *cls) | ||
6955 | { | ||
6956 | struct Queue *q = cls; | ||
6957 | |||
6958 | q->visibility_task = NULL; | ||
6959 | if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) | ||
6960 | { | ||
6961 | q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, | ||
6962 | &core_queue_visibility_check, | ||
6963 | q); | ||
6964 | return; | ||
6965 | } | ||
6966 | update_neighbour_core_visibility (q->neighbour); | ||
6967 | } | ||
6968 | |||
6969 | |||
6970 | /** | ||
6971 | * Check whether the CORE visibility of @a n should change. Finds either a | ||
6972 | * queue to preserve the visibility, or clears the neighbour's `core_visible` | ||
6973 | * flag. In the latter case, gives DV routes a chance to take over, and if | ||
6974 | * that fails, notifies CORE about the disconnect. If so, check whether we | ||
6975 | * need to notify CORE. | ||
6976 | * | ||
6977 | * @param n neighbour to perform the check for | ||
6978 | */ | ||
6979 | static void | ||
6980 | update_neighbour_core_visibility (struct Neighbour *n) | ||
6981 | { | ||
6982 | struct DistanceVector *dv; | ||
6983 | |||
6984 | GNUNET_assert (GNUNET_YES == n->core_visible); | ||
6985 | /* Check if _any_ queue of this neighbour is still valid, if so, schedule | ||
6986 | the #core_queue_visibility_check() task for that queue */ | ||
6987 | for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) | ||
6988 | { | ||
6989 | if (0 != | ||
6990 | GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) | ||
6991 | { | ||
6992 | /* found a valid queue, use this one */ | ||
6993 | q->visibility_task = | ||
6994 | GNUNET_SCHEDULER_add_at (q->validated_until, | ||
6995 | &core_queue_visibility_check, | ||
6996 | q); | ||
6997 | return; | ||
6998 | } | ||
6999 | } | ||
7000 | n->core_visible = GNUNET_NO; | ||
7001 | |||
7002 | /* Check if _any_ DV route to this neighbour is currently | ||
7003 | valid, if so, do NOT tell core about the loss of direct | ||
7004 | connectivity (DV still counts!) */ | ||
7005 | dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid); | ||
7006 | if (GNUNET_YES == dv->core_visible) | ||
7007 | return; | ||
7008 | /* Nothing works anymore, need to tell CORE about the loss of | ||
7009 | connectivity! */ | ||
7010 | cores_send_disconnect_info (&n->pid); | ||
7011 | } | ||
7012 | |||
7013 | |||
7014 | /** | ||
7015 | * Communicator gave us a transport address validation response. Process the | 6946 | * Communicator gave us a transport address validation response. Process the |
7016 | * request. | 6947 | * request. |
7017 | * | 6948 | * |
@@ -7030,8 +6961,8 @@ handle_validation_response ( | |||
7030 | .vs = NULL}; | 6961 | .vs = NULL}; |
7031 | struct GNUNET_TIME_Absolute origin_time; | 6962 | struct GNUNET_TIME_Absolute origin_time; |
7032 | struct Queue *q; | 6963 | struct Queue *q; |
7033 | struct DistanceVector *dv; | ||
7034 | struct Neighbour *n; | 6964 | struct Neighbour *n; |
6965 | struct VirtualLink *vl; | ||
7035 | 6966 | ||
7036 | /* check this is one of our challenges */ | 6967 | /* check this is one of our challenges */ |
7037 | (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, | 6968 | (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, |
@@ -7129,24 +7060,28 @@ handle_validation_response ( | |||
7129 | q->validated_until = vs->validated_until; | 7060 | q->validated_until = vs->validated_until; |
7130 | q->pd.aged_rtt = vs->validation_rtt; | 7061 | q->pd.aged_rtt = vs->validation_rtt; |
7131 | n = q->neighbour; | 7062 | n = q->neighbour; |
7132 | if (GNUNET_NO != n->core_visible) | 7063 | vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid); |
7133 | return; /* nothing changed, we are done here */ | 7064 | if (NULL != vl) |
7134 | n->core_visible = GNUNET_YES; | 7065 | { |
7135 | q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, | 7066 | /* Link was already up, remember n is also now available and we are done */ |
7136 | &core_queue_visibility_check, | 7067 | vl->n = n; |
7137 | q); | 7068 | return; |
7138 | /* Check if _any_ DV route to this neighbour is | 7069 | } |
7139 | currently valid, if so, do NOT tell core anything! */ | 7070 | vl = GNUNET_new (struct VirtualLink); |
7140 | dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid); | 7071 | vl->target = n->pid; |
7141 | if ((NULL != dv) && (GNUNET_YES == dv->core_visible)) | 7072 | vl->n = n; |
7142 | return; /* nothing changed, done */ | 7073 | vl->core_recv_window = RECV_WINDOW_SIZE; |
7143 | /* We lacked a confirmed connection to the neighbour | 7074 | vl->visibility_task = |
7075 | GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); | ||
7076 | GNUNET_break (GNUNET_YES == | ||
7077 | GNUNET_CONTAINER_multipeermap_put ( | ||
7078 | links, | ||
7079 | &vl->target, | ||
7080 | vl, | ||
7081 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
7082 | /* We lacked a confirmed connection to the target | ||
7144 | before, so tell CORE about it (finally!) */ | 7083 | before, so tell CORE about it (finally!) */ |
7145 | cores_send_connect_info (&n->pid, | 7084 | cores_send_connect_info (&n->pid); |
7146 | (NULL != dv) | ||
7147 | ? GNUNET_BANDWIDTH_value_sum (dv->quota_out, | ||
7148 | n->quota_out) | ||
7149 | : n->quota_out); | ||
7150 | } | 7085 | } |
7151 | 7086 | ||
7152 | 7087 | ||
@@ -8256,7 +8191,6 @@ handle_add_queue_message (void *cls, | |||
8256 | if (NULL == neighbour) | 8191 | if (NULL == neighbour) |
8257 | { | 8192 | { |
8258 | neighbour = GNUNET_new (struct Neighbour); | 8193 | neighbour = GNUNET_new (struct Neighbour); |
8259 | neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; | ||
8260 | neighbour->pid = aqm->receiver; | 8194 | neighbour->pid = aqm->receiver; |
8261 | GNUNET_assert (GNUNET_OK == | 8195 | GNUNET_assert (GNUNET_OK == |
8262 | GNUNET_CONTAINER_multipeermap_put ( | 8196 | GNUNET_CONTAINER_multipeermap_put ( |
@@ -8872,8 +8806,12 @@ do_shutdown (void *cls) | |||
8872 | NULL); | 8806 | NULL); |
8873 | GNUNET_CONTAINER_multishortmap_destroy (pending_acks); | 8807 | GNUNET_CONTAINER_multishortmap_destroy (pending_acks); |
8874 | pending_acks = NULL; | 8808 | pending_acks = NULL; |
8809 | GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours)); | ||
8875 | GNUNET_CONTAINER_multipeermap_destroy (neighbours); | 8810 | GNUNET_CONTAINER_multipeermap_destroy (neighbours); |
8876 | neighbours = NULL; | 8811 | neighbours = NULL; |
8812 | GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (links)); | ||
8813 | GNUNET_CONTAINER_multipeermap_destroy (links); | ||
8814 | links = NULL; | ||
8877 | GNUNET_CONTAINER_multipeermap_iterate (backtalkers, | 8815 | GNUNET_CONTAINER_multipeermap_iterate (backtalkers, |
8878 | &free_backtalker_cb, | 8816 | &free_backtalker_cb, |
8879 | NULL); | 8817 | NULL); |
@@ -8926,6 +8864,7 @@ run (void *cls, | |||
8926 | pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES); | 8864 | pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES); |
8927 | ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES); | 8865 | ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES); |
8928 | neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); | 8866 | neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); |
8867 | links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES); | ||
8929 | dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); | 8868 | dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); |
8930 | ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES); | 8869 | ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES); |
8931 | ephemeral_heap = | 8870 | ephemeral_heap = |
@@ -8995,6 +8934,10 @@ GNUNET_SERVICE_MAIN ( | |||
8995 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, | 8934 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, |
8996 | struct OutboundMessage, | 8935 | struct OutboundMessage, |
8997 | NULL), | 8936 | NULL), |
8937 | GNUNET_MQ_hd_fixed_size (client_recv_ok, | ||
8938 | GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK, | ||
8939 | struct RecvOkMessage, | ||
8940 | NULL), | ||
8998 | /* communication with communicators */ | 8941 | /* communication with communicators */ |
8999 | GNUNET_MQ_hd_var_size (communicator_available, | 8942 | GNUNET_MQ_hd_var_size (communicator_available, |
9000 | GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR, | 8943 | GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR, |