aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-04-28 19:32:10 +0200
committerChristian Grothoff <christian@grothoff.org>2019-04-28 19:32:20 +0200
commit3f945e6798d8d736ceb104b59ea1269a7abdfe8a (patch)
treeb93e3dc99deda0987e85cb256b3903de8bd74853 /src/transport
parent1227fc30369a55b82e77d35d8d128090e37dd437 (diff)
downloadgnunet-3f945e6798d8d736ceb104b59ea1269a7abdfe8a.tar.gz
gnunet-3f945e6798d8d736ceb104b59ea1269a7abdfe8a.zip
towards flow control in TNG
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/gnunet-service-tng.c575
-rw-r--r--src/transport/gnunet-transport-profiler.c232
-rw-r--r--src/transport/gnunet-transport.c571
-rw-r--r--src/transport/transport-testing.h201
-rw-r--r--src/transport/transport.h56
-rw-r--r--src/transport/transport_api2_core.c506
-rw-r--r--src/transport/transport_api_core.c260
7 files changed, 1055 insertions, 1346 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index c2922dd7e..825d45522 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -24,6 +24,11 @@
24 * 24 *
25 * TODO: 25 * TODO:
26 * Implement next: 26 * Implement next:
27 * - complete flow control push back from CORE via TRANSPORT to communicators:
28 * + resume communicators in handle_client_recv_ok (see FIXME)
29 * + count transmissions to CORE and suspend communicators if window is full
30 * - check flow control push back from TRANSPROT to CORE:
31 * + check when to send ACKs
27 * - change transport-core API to provide proper flow control in both 32 * - change transport-core API to provide proper flow control in both
28 * directions, allow multiple messages per peer simultaneously (tag 33 * directions, allow multiple messages per peer simultaneously (tag
29 * confirmations with unique message ID), and replace quota-out with 34 * confirmations with unique message ID), and replace quota-out with
@@ -114,6 +119,16 @@
114#define MAX_DV_DISCOVERY_SELECTION 16 119#define MAX_DV_DISCOVERY_SELECTION 16
115 120
116/** 121/**
122 * Window size. How many messages to the same target do we pass
123 * to CORE without a RECV_OK in between? Small values limit
124 * thoughput, large values will increase latency.
125 *
126 * FIXME-OPTIMIZE: find out what good values are experimentally,
127 * maybe set adaptively (i.e. to observed available bandwidth).
128 */
129#define RECV_WINDOW_SIZE 4
130
131/**
117 * Minimum number of hops we should forward DV learn messages 132 * Minimum number of hops we should forward DV learn messages
118 * even if they are NOT useful for us in hope of looping 133 * even if they are NOT useful for us in hope of looping
119 * back to the initiator? 134 * back to the initiator?
@@ -1100,6 +1115,48 @@ struct PendingMessage;
1100 */ 1115 */
1101struct DistanceVectorHop; 1116struct DistanceVectorHop;
1102 1117
1118/**
1119 * A virtual link is another reachable peer that is known to CORE. It
1120 * can be either a `struct Neighbour` with at least one confirmed
1121 * `struct Queue`, or a `struct DistanceVector` with at least one
1122 * confirmed `struct DistanceVectorHop`. With a virtual link we track
1123 * data that is per neighbour that is not specific to how the
1124 * connectivity is established.
1125 */
1126struct VirtualLink
1127{
1128 /**
1129 * Identity of the peer at the other end of the link.
1130 */
1131 struct GNUNET_PeerIdentity target;
1132
1133 /**
1134 * Task scheduled to possibly notfiy core that this peer is no
1135 * longer counting as confirmed. Runs the #core_visibility_check(),
1136 * which checks that some DV-path or a queue exists that is still
1137 * considered confirmed.
1138 */
1139 struct GNUNET_SCHEDULER_Task *visibility_task;
1140
1141 /**
1142 * Neighbour used by this virtual link, NULL if @e dv is used.
1143 */
1144 struct Neighbour *n;
1145
1146 /**
1147 * Distance vector used by this virtual link, NULL if @e n is used.
1148 */
1149 struct DistanceVector *dv;
1150
1151 /**
1152 * How many more messages can we send to core before we exhaust
1153 * the receive window of CORE for this peer? If this hits zero,
1154 * we must tell communicators to stop providing us more messages
1155 * for this peer.
1156 */
1157 unsigned int core_recv_window;
1158};
1159
1103 1160
1104/** 1161/**
1105 * Data structure kept when we are waiting for an acknowledgement. 1162 * Data structure kept when we are waiting for an acknowledgement.
@@ -1316,31 +1373,10 @@ struct DistanceVector
1316 struct GNUNET_SCHEDULER_Task *timeout_task; 1373 struct GNUNET_SCHEDULER_Task *timeout_task;
1317 1374
1318 /** 1375 /**
1319 * Task scheduled to possibly notfiy core that this queue is no longer 1376 * Do we have a confirmed working queue and are thus visible to
1320 * counting as confirmed. Runs the #core_queue_visibility_check(). 1377 * CORE? If so, this is the virtual link, otherwise NULL.
1321 */
1322 struct GNUNET_SCHEDULER_Task *visibility_task;
1323
1324 /**
1325 * Quota at which CORE is allowed to transmit to this peer
1326 * (note that the value CORE should actually be told is this
1327 * value plus the respective value in `struct Neighbour`).
1328 * Should match the sum of the quotas of all of the paths.
1329 *
1330 * FIXME: not yet set, tricky to get right given multiple paths,
1331 * many of which may be inactive! (=> Idea: measure???)
1332 * FIXME: how do we set this value initially when we tell CORE?
1333 * Options: start at a minimum value or at literally zero?
1334 * (=> Current thought: clean would be zero!)
1335 */
1336 struct GNUNET_BANDWIDTH_Value32NBO quota_out;
1337
1338 /**
1339 * Is one of the DV paths in this struct 'confirmed' and thus
1340 * the cause for CORE to see this peer as connected? (Note that
1341 * the same may apply to a `struct Neighbour` at the same time.)
1342 */ 1378 */
1343 int core_visible; 1379 struct VirtualLink *link;
1344}; 1380};
1345 1381
1346 1382
@@ -1451,12 +1487,6 @@ struct Queue
1451 struct GNUNET_SCHEDULER_Task *transmit_task; 1487 struct GNUNET_SCHEDULER_Task *transmit_task;
1452 1488
1453 /** 1489 /**
1454 * Task scheduled to possibly notfiy core that this queue is no longer
1455 * counting as confirmed. Runs the #core_queue_visibility_check().
1456 */
1457 struct GNUNET_SCHEDULER_Task *visibility_task;
1458
1459 /**
1460 * How long do *we* consider this @e address to be valid? In the past or 1490 * How long do *we* consider this @e address to be valid? In the past or
1461 * zero if we have not yet validated it. Can be updated based on 1491 * zero if we have not yet validated it. Can be updated based on
1462 * challenge-response validations (via address validation logic), or when we 1492 * challenge-response validations (via address validation logic), or when we
@@ -1643,11 +1673,6 @@ struct Neighbour
1643 struct Queue *queue_tail; 1673 struct Queue *queue_tail;
1644 1674
1645 /** 1675 /**
1646 * Task run to cleanup pending messages that have exceeded their timeout.
1647 */
1648 struct GNUNET_SCHEDULER_Task *timeout_task;
1649
1650 /**
1651 * Handle for an operation to fetch @e last_dv_learn_monotime information from 1676 * Handle for an operation to fetch @e last_dv_learn_monotime information from
1652 * the PEERSTORE, or NULL. 1677 * the PEERSTORE, or NULL.
1653 */ 1678 */
@@ -1660,18 +1685,10 @@ struct Neighbour
1660 struct GNUNET_PEERSTORE_StoreContext *sc; 1685 struct GNUNET_PEERSTORE_StoreContext *sc;
1661 1686
1662 /** 1687 /**
1663 * Quota at which CORE is allowed to transmit to this peer 1688 * Do we have a confirmed working queue and are thus visible to
1664 * (note that the value CORE should actually be told is this 1689 * CORE? If so, this is the virtual link, otherwise NULL.
1665 * value plus the respective value in `struct DistanceVector`).
1666 * Should match the sum of the quotas of all of the queues.
1667 *
1668 * FIXME: not yet set, tricky to get right given multiple queues!
1669 * (=> Idea: measure???)
1670 * FIXME: how do we set this value initially when we tell CORE?
1671 * Options: start at a minimum value or at literally zero?
1672 * (=> Current thought: clean would be zero!)
1673 */ 1690 */
1674 struct GNUNET_BANDWIDTH_Value32NBO quota_out; 1691 struct VirtualLink *link;
1675 1692
1676 /** 1693 /**
1677 * Latest DVLearn monotonic time seen from this peer. Initialized only 1694 * Latest DVLearn monotonic time seen from this peer. Initialized only
@@ -1680,17 +1697,6 @@ struct Neighbour
1680 struct GNUNET_TIME_Absolute last_dv_learn_monotime; 1697 struct GNUNET_TIME_Absolute last_dv_learn_monotime;
1681 1698
1682 /** 1699 /**
1683 * What is the earliest timeout of any message in @e pending_msg_tail?
1684 */
1685 struct GNUNET_TIME_Absolute earliest_timeout;
1686
1687 /**
1688 * Do we have a confirmed working queue and are thus visible to
1689 * CORE?
1690 */
1691 int core_visible;
1692
1693 /**
1694 * Do we have the lastest value for @e last_dv_learn_monotime from 1700 * Do we have the lastest value for @e last_dv_learn_monotime from
1695 * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE? 1701 * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE?
1696 */ 1702 */
@@ -2417,6 +2423,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
2417static struct GNUNET_CONTAINER_MultiPeerMap *validation_map; 2423static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
2418 2424
2419/** 2425/**
2426 * Map from PIDs to `struct VirtualLink` entries describing
2427 * links CORE knows to exist.
2428 */
2429static struct GNUNET_CONTAINER_MultiPeerMap *links;
2430
2431/**
2420 * Map from challenges to `struct LearnLaunchEntry` values. 2432 * Map from challenges to `struct LearnLaunchEntry` values.
2421 */ 2433 */
2422static struct GNUNET_CONTAINER_MultiShortmap *dvlearn_map; 2434static struct GNUNET_CONTAINER_MultiShortmap *dvlearn_map;
@@ -2564,6 +2576,26 @@ free_ephemeral (struct EphemeralCacheEntry *ece)
2564 2576
2565 2577
2566/** 2578/**
2579 * Free virtual link.
2580 *
2581 * @param vl link data to free
2582 */
2583static void
2584free_virtual_link (struct VirtualLink *vl)
2585{
2586 GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl);
2587 if (NULL != vl->visibility_task)
2588 {
2589 GNUNET_SCHEDULER_cancel (vl->visibility_task);
2590 vl->visibility_task = NULL;
2591 }
2592 GNUNET_break (NULL == vl->n);
2593 GNUNET_break (NULL == vl->dv);
2594 GNUNET_free (vl);
2595}
2596
2597
2598/**
2567 * Free validation state. 2599 * Free validation state.
2568 * 2600 *
2569 * @param vs validation state to free 2601 * @param vs validation state to free
@@ -2684,8 +2716,6 @@ free_dv_route (struct DistanceVector *dv)
2684 GNUNET_assert ( 2716 GNUNET_assert (
2685 GNUNET_YES == 2717 GNUNET_YES ==
2686 GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv)); 2718 GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
2687 if (NULL != dv->visibility_task)
2688 GNUNET_SCHEDULER_cancel (dv->visibility_task);
2689 if (NULL != dv->timeout_task) 2719 if (NULL != dv->timeout_task)
2690 GNUNET_SCHEDULER_cancel (dv->timeout_task); 2720 GNUNET_SCHEDULER_cancel (dv->timeout_task);
2691 GNUNET_free (dv); 2721 GNUNET_free (dv);
@@ -2873,8 +2903,6 @@ free_neighbour (struct Neighbour *neighbour)
2873 GNUNET_CONTAINER_multipeermap_remove (neighbours, 2903 GNUNET_CONTAINER_multipeermap_remove (neighbours,
2874 &neighbour->pid, 2904 &neighbour->pid,
2875 neighbour)); 2905 neighbour));
2876 if (NULL != neighbour->timeout_task)
2877 GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
2878 if (NULL != neighbour->reassembly_map) 2906 if (NULL != neighbour->reassembly_map)
2879 { 2907 {
2880 GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map, 2908 GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map,
@@ -2917,19 +2945,16 @@ free_neighbour (struct Neighbour *neighbour)
2917 * 2945 *
2918 * @param tc client to inform (must be CORE client) 2946 * @param tc client to inform (must be CORE client)
2919 * @param pid peer the connection is for 2947 * @param pid peer the connection is for
2920 * @param quota_out current quota for the peer
2921 */ 2948 */
2922static void 2949static void
2923core_send_connect_info (struct TransportClient *tc, 2950core_send_connect_info (struct TransportClient *tc,
2924 const struct GNUNET_PeerIdentity *pid, 2951 const struct GNUNET_PeerIdentity *pid)
2925 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
2926{ 2952{
2927 struct GNUNET_MQ_Envelope *env; 2953 struct GNUNET_MQ_Envelope *env;
2928 struct ConnectInfoMessage *cim; 2954 struct ConnectInfoMessage *cim;
2929 2955
2930 GNUNET_assert (CT_CORE == tc->type); 2956 GNUNET_assert (CT_CORE == tc->type);
2931 env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); 2957 env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2932 cim->quota_out = quota_out;
2933 cim->id = *pid; 2958 cim->id = *pid;
2934 GNUNET_MQ_send (tc->mq, env); 2959 GNUNET_MQ_send (tc->mq, env);
2935} 2960}
@@ -2939,11 +2964,9 @@ core_send_connect_info (struct TransportClient *tc,
2939 * Send message to CORE clients that we gained a connection 2964 * Send message to CORE clients that we gained a connection
2940 * 2965 *
2941 * @param pid peer the queue was for 2966 * @param pid peer the queue was for
2942 * @param quota_out current quota for the peer
2943 */ 2967 */
2944static void 2968static void
2945cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, 2969cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
2946 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
2947{ 2970{
2948 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2971 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2949 "Informing CORE clients about connection to %s\n", 2972 "Informing CORE clients about connection to %s\n",
@@ -2952,7 +2975,7 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
2952 { 2975 {
2953 if (CT_CORE != tc->type) 2976 if (CT_CORE != tc->type)
2954 continue; 2977 continue;
2955 core_send_connect_info (tc, pid, quota_out); 2978 core_send_connect_info (tc, pid);
2956 } 2979 }
2957} 2980}
2958 2981
@@ -3059,13 +3082,43 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
3059 3082
3060 3083
3061/** 3084/**
3062 * Check whether the CORE visibility of @a n changed. If so, 3085 * Task run to check whether the hops of the @a cls still
3063 * check whether we need to notify CORE. 3086 * are validated, or if we need to core about disconnection.
3064 * 3087 *
3065 * @param n neighbour to perform the check for 3088 * @param cls a `struct VirtualLink`
3066 */ 3089 */
3067static void 3090static void
3068update_neighbour_core_visibility (struct Neighbour *n); 3091check_link_down (void *cls)
3092{
3093 struct VirtualLink *vl = cls;
3094 struct DistanceVector *dv = vl->dv;
3095 struct Neighbour *n = vl->n;
3096 struct GNUNET_TIME_Absolute dvh_timeout;
3097 struct GNUNET_TIME_Absolute q_timeout;
3098
3099 vl->visibility_task = NULL;
3100 dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3101 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3102 pos = pos->next_dv)
3103 dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
3104 if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3105 vl->dv = NULL;
3106 q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3107 for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
3108 q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
3109 if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3110 vl->n = NULL;
3111 if ((NULL == vl->n) && (NULL == vl->dv))
3112 {
3113 cores_send_disconnect_info (&dv->target);
3114 free_virtual_link (vl);
3115 return;
3116 }
3117 vl->visibility_task =
3118 GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
3119 &check_link_down,
3120 vl);
3121}
3069 3122
3070 3123
3071/** 3124/**
@@ -3083,17 +3136,13 @@ free_queue (struct Queue *queue)
3083 struct QueueEntry *qe; 3136 struct QueueEntry *qe;
3084 int maxxed; 3137 int maxxed;
3085 struct PendingAcknowledgement *pa; 3138 struct PendingAcknowledgement *pa;
3139 struct VirtualLink *vl;
3086 3140
3087 if (NULL != queue->transmit_task) 3141 if (NULL != queue->transmit_task)
3088 { 3142 {
3089 GNUNET_SCHEDULER_cancel (queue->transmit_task); 3143 GNUNET_SCHEDULER_cancel (queue->transmit_task);
3090 queue->transmit_task = NULL; 3144 queue->transmit_task = NULL;
3091 } 3145 }
3092 if (NULL != queue->visibility_task)
3093 {
3094 GNUNET_SCHEDULER_cancel (queue->visibility_task);
3095 queue->visibility_task = NULL;
3096 }
3097 while (NULL != (pa = queue->pa_head)) 3146 while (NULL != (pa = queue->pa_head))
3098 { 3147 {
3099 GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa); 3148 GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
@@ -3139,9 +3188,12 @@ free_queue (struct Queue *queue)
3139 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); 3188 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
3140 GNUNET_free (queue); 3189 GNUNET_free (queue);
3141 3190
3142 update_neighbour_core_visibility (neighbour); 3191 vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid);
3143 cores_send_disconnect_info (&neighbour->pid); 3192 if ((NULL != vl) && (neighbour == vl->n))
3144 3193 {
3194 GNUNET_SCHEDULER_cancel (vl->visibility_task);
3195 check_link_down (vl);
3196 }
3145 if (NULL == neighbour->queue_head) 3197 if (NULL == neighbour->queue_head)
3146 { 3198 {
3147 free_neighbour (neighbour); 3199 free_neighbour (neighbour);
@@ -3281,12 +3333,12 @@ notify_client_connect_info (void *cls,
3281 void *value) 3333 void *value)
3282{ 3334{
3283 struct TransportClient *tc = cls; 3335 struct TransportClient *tc = cls;
3284 struct Neighbour *neighbour = value;
3285 3336
3337 (void) value;
3286 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3338 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3287 "Telling new CORE client about existing connection to %s\n", 3339 "Telling new CORE client about existing connection to %s\n",
3288 GNUNET_i2s (pid)); 3340 GNUNET_i2s (pid));
3289 core_send_connect_info (tc, pid, neighbour->quota_out); 3341 core_send_connect_info (tc, pid);
3290 return GNUNET_OK; 3342 return GNUNET_OK;
3291} 3343}
3292 3344
@@ -3469,9 +3521,6 @@ client_send_response (struct PendingMessage *pm,
3469 if (NULL != tc) 3521 if (NULL != tc)
3470 { 3522 {
3471 env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); 3523 env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
3472 som->success = htonl ((uint32_t) success);
3473 som->bytes_msg = htons (pm->bytes_msg);
3474 som->bytes_physical = htonl (bytes_physical);
3475 som->peer = target->pid; 3524 som->peer = target->pid;
3476 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3525 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3477 "Confirming %s transmission of %u/%u bytes to %s\n", 3526 "Confirming %s transmission of %u/%u bytes to %s\n",
@@ -3486,45 +3535,6 @@ client_send_response (struct PendingMessage *pm,
3486 3535
3487 3536
3488/** 3537/**
3489 * Checks the message queue for a neighbour for messages that have timed
3490 * out and purges them.
3491 *
3492 * @param cls a `struct Neighbour`
3493 */
3494static void
3495check_queue_timeouts (void *cls)
3496{
3497 struct Neighbour *n = cls;
3498 struct PendingMessage *pm;
3499 struct GNUNET_TIME_Absolute now;
3500 struct GNUNET_TIME_Absolute earliest_timeout;
3501
3502 n->timeout_task = NULL;
3503 earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
3504 now = GNUNET_TIME_absolute_get ();
3505 for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos; pos = pm)
3506 {
3507 pm = pos->next_neighbour;
3508 if (pos->timeout.abs_value_us <= now.abs_value_us)
3509 {
3510 GNUNET_STATISTICS_update (GST_stats,
3511 "# messages dropped (timeout before confirmation)",
3512 1,
3513 GNUNET_NO);
3514 client_send_response (pm, GNUNET_NO, 0);
3515 continue;
3516 }
3517 earliest_timeout =
3518 GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout);
3519 }
3520 n->earliest_timeout = earliest_timeout;
3521 if (NULL != n->pending_msg_head)
3522 n->timeout_task =
3523 GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n);
3524}
3525
3526
3527/**
3528 * Create a DV Box message. 3538 * Create a DV Box message.
3529 * 3539 *
3530 * @param total_hops how many hops did the message take so far 3540 * @param total_hops how many hops did the message take so far
@@ -3689,30 +3699,18 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
3689 const void *payload; 3699 const void *payload;
3690 size_t payload_size; 3700 size_t payload_size;
3691 struct TransportDVBoxMessage *dvb; 3701 struct TransportDVBoxMessage *dvb;
3702 struct VirtualLink *vl;
3692 3703
3693 GNUNET_assert (CT_CORE == tc->type); 3704 GNUNET_assert (CT_CORE == tc->type);
3694 obmm = (const struct GNUNET_MessageHeader *) &obm[1]; 3705 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
3695 bytes_msg = ntohs (obmm->size); 3706 bytes_msg = ntohs (obmm->size);
3696 target = lookup_neighbour (&obm->peer); 3707 vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer);
3697 if (NULL == target) 3708 if (NULL == vl)
3698 dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
3699 else
3700 dv = NULL;
3701 if ((NULL == target) && ((NULL == dv) || (GNUNET_NO == dv->core_visible)))
3702 { 3709 {
3703 /* Failure: don't have this peer as a neighbour (anymore). 3710 /* Failure: don't have this peer as a neighbour (anymore).
3704 Might have gone down asynchronously, so this is NOT 3711 Might have gone down asynchronously, so this is NOT
3705 a protocol violation by CORE. Still count the event, 3712 a protocol violation by CORE. Still count the event,
3706 as this should be rare. */ 3713 as this should be rare. */
3707 struct GNUNET_MQ_Envelope *env;
3708 struct SendOkMessage *som;
3709
3710 env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
3711 som->success = htonl (GNUNET_SYSERR);
3712 som->bytes_msg = htonl (bytes_msg);
3713 som->bytes_physical = htonl (0);
3714 som->peer = obm->peer;
3715 GNUNET_MQ_send (tc->mq, env);
3716 GNUNET_SERVICE_client_continue (tc->client); 3714 GNUNET_SERVICE_client_continue (tc->client);
3717 GNUNET_STATISTICS_update (GST_stats, 3715 GNUNET_STATISTICS_update (GST_stats,
3718 "# messages dropped (neighbour unknown)", 3716 "# messages dropped (neighbour unknown)",
@@ -3720,6 +3718,12 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
3720 GNUNET_NO); 3718 GNUNET_NO);
3721 return; 3719 return;
3722 } 3720 }
3721 target = lookup_neighbour (&obm->peer);
3722 if (NULL == target)
3723 dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
3724 else
3725 dv = NULL;
3726 GNUNET_assert ((NULL != target) || (NULL != dv));
3723 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3727 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3724 "Sending %u bytes to %s using %s\n", 3728 "Sending %u bytes to %s using %s\n",
3725 bytes_msg, 3729 bytes_msg,
@@ -3756,8 +3760,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
3756 pm->client = tc; 3760 pm->client = tc;
3757 pm->target = target; 3761 pm->target = target;
3758 pm->bytes_msg = payload_size; 3762 pm->bytes_msg = payload_size;
3759 pm->timeout =
3760 GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
3761 memcpy (&pm[1], payload, payload_size); 3763 memcpy (&pm[1], payload, payload_size);
3762 GNUNET_free_non_null (dvb); 3764 GNUNET_free_non_null (dvb);
3763 dvb = NULL; 3765 dvb = NULL;
@@ -3777,15 +3779,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
3777 tc->details.core.pending_msg_head, 3779 tc->details.core.pending_msg_head,
3778 tc->details.core.pending_msg_tail, 3780 tc->details.core.pending_msg_tail,
3779 pm); 3781 pm);
3780 if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
3781 {
3782 target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
3783 if (NULL != target->timeout_task)
3784 GNUNET_SCHEDULER_cancel (target->timeout_task);
3785 target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
3786 &check_queue_timeouts,
3787 target);
3788 }
3789 if (! was_empty) 3782 if (! was_empty)
3790 return; /* all queues must already be busy */ 3783 return; /* all queues must already be busy */
3791 for (struct Queue *queue = target->queue_head; NULL != queue; 3784 for (struct Queue *queue = target->queue_head; NULL != queue;
@@ -3834,6 +3827,47 @@ check_communicator_available (
3834 3827
3835 3828
3836/** 3829/**
3830 * Client confirms that it is done handling message(s) to a particular
3831 * peer. We may now provide more messages to CORE for this peer.
3832 *
3833 * Notifies the respective queues that more messages can now be received.
3834 *
3835 * @param cls the client
3836 * @param rom the message that was sent
3837 */
3838static void
3839handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
3840{
3841 struct TransportClient *tc = cls;
3842 struct VirtualLink *vl;
3843 uint32_t delta;
3844
3845 if (CT_CORE != tc->type)
3846 {
3847 GNUNET_break (0);
3848 GNUNET_SERVICE_client_drop (tc->client);
3849 return;
3850 }
3851 vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer);
3852 if (NULL == vl)
3853 {
3854 GNUNET_STATISTICS_update (GST_stats,
3855 "# RECV_OK dropped: virtual link unknown",
3856 1,
3857 GNUNET_NO);
3858 GNUNET_SERVICE_client_continue (tc->client);
3859 return;
3860 }
3861 delta = ntohl (rom->increase_window_delta);
3862 vl->core_recv_window += delta;
3863 if (delta == vl->core_recv_window)
3864 {
3865 // FIXME: resume communicators!
3866 }
3867}
3868
3869
3870/**
3837 * Communicator started. Process the request. 3871 * Communicator started. Process the request.
3838 * 3872 *
3839 * @param cls the client 3873 * @param cls the client
@@ -4090,20 +4124,18 @@ route_via_neighbour (const struct Neighbour *n,
4090 for (struct Queue *pos = n->queue_head; NULL != pos; 4124 for (struct Queue *pos = n->queue_head; NULL != pos;
4091 pos = pos->next_neighbour) 4125 pos = pos->next_neighbour)
4092 { 4126 {
4093 /* Count the queue with the visibility task in all cases, as
4094 otherwise we may end up with no queues just because the
4095 time for the visibility task just expired but the scheduler
4096 just ran this task first */
4097 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) || 4127 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
4098 (pos->validated_until.abs_value_us > now.abs_value_us) || 4128 (pos->validated_until.abs_value_us > now.abs_value_us))
4099 (NULL != pos->visibility_task))
4100 candidates++; 4129 candidates++;
4101 } 4130 }
4102 if (0 == candidates) 4131 if (0 == candidates)
4103 { 4132 {
4104 /* Given that we above check for pos->visibility task, 4133 /* This can happen rarely if the last confirmed queue timed
4105 this should be strictly impossible. */ 4134 out just as we were beginning to process this message. */
4106 GNUNET_break (0); 4135 GNUNET_STATISTICS_update (GST_stats,
4136 "# route selection failed (all no valid queue)",
4137 1,
4138 GNUNET_NO);
4107 return; 4139 return;
4108 } 4140 }
4109 sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates); 4141 sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
@@ -4115,12 +4147,8 @@ route_via_neighbour (const struct Neighbour *n,
4115 for (struct Queue *pos = n->queue_head; NULL != pos; 4147 for (struct Queue *pos = n->queue_head; NULL != pos;
4116 pos = pos->next_neighbour) 4148 pos = pos->next_neighbour)
4117 { 4149 {
4118 /* Count the queue with the visibility task in all cases, as 4150 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
4119 otherwise we may end up with no queues just because the 4151 (pos->validated_until.abs_value_us > now.abs_value_us))
4120 time for the visibility task just expired but the scheduler
4121 just ran this task first */
4122 if ((pos->validated_until.abs_value_us > now.abs_value_us) ||
4123 (NULL != pos->visibility_task))
4124 { 4152 {
4125 if ((sel1 == candidates) || (sel2 == candidates)) 4153 if ((sel1 == candidates) || (sel2 == candidates))
4126 queue_send_msg (pos, NULL, hdr, ntohs (hdr->size)); 4154 queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
@@ -4197,21 +4225,21 @@ route_message (const struct GNUNET_PeerIdentity *target,
4197 struct GNUNET_MessageHeader *hdr, 4225 struct GNUNET_MessageHeader *hdr,
4198 enum RouteMessageOptions options) 4226 enum RouteMessageOptions options)
4199{ 4227{
4228 struct VirtualLink *vl;
4200 struct Neighbour *n; 4229 struct Neighbour *n;
4201 struct DistanceVector *dv; 4230 struct DistanceVector *dv;
4202 4231
4203 n = lookup_neighbour (target); 4232 vl = GNUNET_CONTAINER_multipeermap_get (links, target);
4204 dv = (0 != (options & RMO_DV_ALLOWED)) 4233 n = vl->n;
4205 ? GNUNET_CONTAINER_multipeermap_get (dv_routes, target) 4234 dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
4206 : NULL;
4207 if (0 == (options & RMO_UNCONFIRMED_ALLOWED)) 4235 if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
4208 { 4236 {
4209 /* if confirmed is required, and we do not have anything 4237 /* if confirmed is required, and we do not have anything
4210 confirmed, drop respective options */ 4238 confirmed, drop respective options */
4211 if ((NULL != n) && (GNUNET_NO == n->core_visible)) 4239 if (NULL == n)
4212 n = NULL; 4240 n = lookup_neighbour (target);
4213 if ((NULL != dv) && (GNUNET_NO == dv->core_visible)) 4241 if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
4214 dv = NULL; 4242 dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
4215 } 4243 }
4216 if ((NULL == n) && (NULL == dv)) 4244 if ((NULL == n) && (NULL == dv))
4217 { 4245 {
@@ -5758,40 +5786,6 @@ path_cleanup_cb (void *cls)
5758 GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv); 5786 GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
5759} 5787}
5760 5788
5761/**
5762 * Task run to check whether the hops of the @a cls still
5763 * are validated, or if we need to core about disconnection.
5764 *
5765 * @param cls a `struct DistanceVector` (with core_visible set!)
5766 */
5767static void
5768check_dv_path_down (void *cls)
5769{
5770 struct DistanceVector *dv = cls;
5771 struct Neighbour *n;
5772
5773 dv->visibility_task = NULL;
5774 GNUNET_assert (GNUNET_YES == dv->core_visible);
5775 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
5776 pos = pos->next_dv)
5777 {
5778 if (0 <
5779 GNUNET_TIME_absolute_get_remaining (pos->path_valid_until).rel_value_us)
5780 {
5781 dv->visibility_task = GNUNET_SCHEDULER_add_at (pos->path_valid_until,
5782 &check_dv_path_down,
5783 dv);
5784 return;
5785 }
5786 }
5787 /* all paths invalid, make dv core-invisible */
5788 dv->core_visible = GNUNET_NO;
5789 n = lookup_neighbour (&dv->target);
5790 if ((NULL != n) && (GNUNET_YES == n->core_visible))
5791 return; /* no need to tell core, connection still up! */
5792 cores_send_disconnect_info (&dv->target);
5793}
5794
5795 5789
5796/** 5790/**
5797 * The @a hop is a validated path to the respective target 5791 * The @a hop is a validated path to the respective target
@@ -5804,22 +5798,30 @@ static void
5804activate_core_visible_dv_path (struct DistanceVectorHop *hop) 5798activate_core_visible_dv_path (struct DistanceVectorHop *hop)
5805{ 5799{
5806 struct DistanceVector *dv = hop->dv; 5800 struct DistanceVector *dv = hop->dv;
5807 struct Neighbour *n; 5801 struct VirtualLink *vl;
5808
5809 GNUNET_assert (GNUNET_NO == dv->core_visible);
5810 GNUNET_assert (NULL == dv->visibility_task);
5811 5802
5812 dv->core_visible = GNUNET_YES; 5803 vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target);
5813 dv->visibility_task = 5804 if (NULL != vl)
5814 GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_dv_path_down, dv); 5805 {
5815 n = lookup_neighbour (&dv->target); 5806 /* Link was already up, remember dv is also now available and we are done */
5816 if ((NULL != n) && (GNUNET_YES == n->core_visible)) 5807 vl->dv = dv;
5817 return; /* no need to tell core, connection already up! */ 5808 return;
5818 cores_send_connect_info (&dv->target, 5809 }
5819 (NULL != n) 5810 vl = GNUNET_new (struct VirtualLink);
5820 ? GNUNET_BANDWIDTH_value_sum (n->quota_out, 5811 vl->target = dv->target;
5821 dv->quota_out) 5812 vl->dv = dv;
5822 : dv->quota_out); 5813 vl->core_recv_window = RECV_WINDOW_SIZE;
5814 vl->visibility_task =
5815 GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
5816 GNUNET_break (GNUNET_YES ==
5817 GNUNET_CONTAINER_multipeermap_put (
5818 links,
5819 &vl->target,
5820 vl,
5821 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
5822 /* We lacked a confirmed connection to the target
5823 before, so tell CORE about it (finally!) */
5824 cores_send_connect_info (&dv->target);
5823} 5825}
5824 5826
5825 5827
@@ -5934,9 +5936,8 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
5934 GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until); 5936 GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
5935 GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos); 5937 GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
5936 GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos); 5938 GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
5937 if ((GNUNET_NO == dv->core_visible) && 5939 if (0 <
5938 (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until) 5940 GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
5939 .rel_value_us))
5940 activate_core_visible_dv_path (pos); 5941 activate_core_visible_dv_path (pos);
5941 if (last_timeout.rel_value_us < 5942 if (last_timeout.rel_value_us <
5942 GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT, 5943 GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
@@ -5976,8 +5977,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
5976 next_hop->dv_head, 5977 next_hop->dv_head,
5977 next_hop->dv_tail, 5978 next_hop->dv_tail,
5978 hop); 5979 hop);
5979 if ((GNUNET_NO == dv->core_visible) && 5980 if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
5980 (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us))
5981 activate_core_visible_dv_path (hop); 5981 activate_core_visible_dv_path (hop);
5982 return GNUNET_YES; 5982 return GNUNET_YES;
5983} 5983}
@@ -6943,75 +6943,6 @@ find_queue (const struct GNUNET_PeerIdentity *pid, const char *address)
6943 6943
6944 6944
6945/** 6945/**
6946 * Task run periodically to check whether the validity of the given queue has
6947 * run its course. If so, finds either another queue to take over, or clears
6948 * the neighbour's `core_visible` flag. In the latter case, gives DV routes a
6949 * chance to take over, and if that fails, notifies CORE about the disconnect.
6950 *
6951 * @param cls a `struct Queue`
6952 */
6953static void
6954core_queue_visibility_check (void *cls)
6955{
6956 struct Queue *q = cls;
6957
6958 q->visibility_task = NULL;
6959 if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
6960 {
6961 q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until,
6962 &core_queue_visibility_check,
6963 q);
6964 return;
6965 }
6966 update_neighbour_core_visibility (q->neighbour);
6967}
6968
6969
6970/**
6971 * Check whether the CORE visibility of @a n should change. Finds either a
6972 * queue to preserve the visibility, or clears the neighbour's `core_visible`
6973 * flag. In the latter case, gives DV routes a chance to take over, and if
6974 * that fails, notifies CORE about the disconnect. If so, check whether we
6975 * need to notify CORE.
6976 *
6977 * @param n neighbour to perform the check for
6978 */
6979static void
6980update_neighbour_core_visibility (struct Neighbour *n)
6981{
6982 struct DistanceVector *dv;
6983
6984 GNUNET_assert (GNUNET_YES == n->core_visible);
6985 /* Check if _any_ queue of this neighbour is still valid, if so, schedule
6986 the #core_queue_visibility_check() task for that queue */
6987 for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
6988 {
6989 if (0 !=
6990 GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
6991 {
6992 /* found a valid queue, use this one */
6993 q->visibility_task =
6994 GNUNET_SCHEDULER_add_at (q->validated_until,
6995 &core_queue_visibility_check,
6996 q);
6997 return;
6998 }
6999 }
7000 n->core_visible = GNUNET_NO;
7001
7002 /* Check if _any_ DV route to this neighbour is currently
7003 valid, if so, do NOT tell core about the loss of direct
7004 connectivity (DV still counts!) */
7005 dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid);
7006 if (GNUNET_YES == dv->core_visible)
7007 return;
7008 /* Nothing works anymore, need to tell CORE about the loss of
7009 connectivity! */
7010 cores_send_disconnect_info (&n->pid);
7011}
7012
7013
7014/**
7015 * Communicator gave us a transport address validation response. Process the 6946 * Communicator gave us a transport address validation response. Process the
7016 * request. 6947 * request.
7017 * 6948 *
@@ -7030,8 +6961,8 @@ handle_validation_response (
7030 .vs = NULL}; 6961 .vs = NULL};
7031 struct GNUNET_TIME_Absolute origin_time; 6962 struct GNUNET_TIME_Absolute origin_time;
7032 struct Queue *q; 6963 struct Queue *q;
7033 struct DistanceVector *dv;
7034 struct Neighbour *n; 6964 struct Neighbour *n;
6965 struct VirtualLink *vl;
7035 6966
7036 /* check this is one of our challenges */ 6967 /* check this is one of our challenges */
7037 (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, 6968 (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
@@ -7129,24 +7060,28 @@ handle_validation_response (
7129 q->validated_until = vs->validated_until; 7060 q->validated_until = vs->validated_until;
7130 q->pd.aged_rtt = vs->validation_rtt; 7061 q->pd.aged_rtt = vs->validation_rtt;
7131 n = q->neighbour; 7062 n = q->neighbour;
7132 if (GNUNET_NO != n->core_visible) 7063 vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid);
7133 return; /* nothing changed, we are done here */ 7064 if (NULL != vl)
7134 n->core_visible = GNUNET_YES; 7065 {
7135 q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, 7066 /* Link was already up, remember n is also now available and we are done */
7136 &core_queue_visibility_check, 7067 vl->n = n;
7137 q); 7068 return;
7138 /* Check if _any_ DV route to this neighbour is 7069 }
7139 currently valid, if so, do NOT tell core anything! */ 7070 vl = GNUNET_new (struct VirtualLink);
7140 dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid); 7071 vl->target = n->pid;
7141 if ((NULL != dv) && (GNUNET_YES == dv->core_visible)) 7072 vl->n = n;
7142 return; /* nothing changed, done */ 7073 vl->core_recv_window = RECV_WINDOW_SIZE;
7143 /* We lacked a confirmed connection to the neighbour 7074 vl->visibility_task =
7075 GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
7076 GNUNET_break (GNUNET_YES ==
7077 GNUNET_CONTAINER_multipeermap_put (
7078 links,
7079 &vl->target,
7080 vl,
7081 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
7082 /* We lacked a confirmed connection to the target
7144 before, so tell CORE about it (finally!) */ 7083 before, so tell CORE about it (finally!) */
7145 cores_send_connect_info (&n->pid, 7084 cores_send_connect_info (&n->pid);
7146 (NULL != dv)
7147 ? GNUNET_BANDWIDTH_value_sum (dv->quota_out,
7148 n->quota_out)
7149 : n->quota_out);
7150} 7085}
7151 7086
7152 7087
@@ -8256,7 +8191,6 @@ handle_add_queue_message (void *cls,
8256 if (NULL == neighbour) 8191 if (NULL == neighbour)
8257 { 8192 {
8258 neighbour = GNUNET_new (struct Neighbour); 8193 neighbour = GNUNET_new (struct Neighbour);
8259 neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
8260 neighbour->pid = aqm->receiver; 8194 neighbour->pid = aqm->receiver;
8261 GNUNET_assert (GNUNET_OK == 8195 GNUNET_assert (GNUNET_OK ==
8262 GNUNET_CONTAINER_multipeermap_put ( 8196 GNUNET_CONTAINER_multipeermap_put (
@@ -8872,8 +8806,12 @@ do_shutdown (void *cls)
8872 NULL); 8806 NULL);
8873 GNUNET_CONTAINER_multishortmap_destroy (pending_acks); 8807 GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
8874 pending_acks = NULL; 8808 pending_acks = NULL;
8809 GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours));
8875 GNUNET_CONTAINER_multipeermap_destroy (neighbours); 8810 GNUNET_CONTAINER_multipeermap_destroy (neighbours);
8876 neighbours = NULL; 8811 neighbours = NULL;
8812 GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (links));
8813 GNUNET_CONTAINER_multipeermap_destroy (links);
8814 links = NULL;
8877 GNUNET_CONTAINER_multipeermap_iterate (backtalkers, 8815 GNUNET_CONTAINER_multipeermap_iterate (backtalkers,
8878 &free_backtalker_cb, 8816 &free_backtalker_cb,
8879 NULL); 8817 NULL);
@@ -8926,6 +8864,7 @@ run (void *cls,
8926 pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES); 8864 pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
8927 ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES); 8865 ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
8928 neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); 8866 neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
8867 links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES);
8929 dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); 8868 dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
8930 ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES); 8869 ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);
8931 ephemeral_heap = 8870 ephemeral_heap =
@@ -8995,6 +8934,10 @@ GNUNET_SERVICE_MAIN (
8995 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 8934 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
8996 struct OutboundMessage, 8935 struct OutboundMessage,
8997 NULL), 8936 NULL),
8937 GNUNET_MQ_hd_fixed_size (client_recv_ok,
8938 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK,
8939 struct RecvOkMessage,
8940 NULL),
8998 /* communication with communicators */ 8941 /* communication with communicators */
8999 GNUNET_MQ_hd_var_size (communicator_available, 8942 GNUNET_MQ_hd_var_size (communicator_available,
9000 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR, 8943 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
diff --git a/src/transport/gnunet-transport-profiler.c b/src/transport/gnunet-transport-profiler.c
index 9160a78b2..89f5b4108 100644
--- a/src/transport/gnunet-transport-profiler.c
+++ b/src/transport/gnunet-transport-profiler.c
@@ -32,7 +32,6 @@
32#include "gnunet_protocols.h" 32#include "gnunet_protocols.h"
33#include "gnunet_ats_service.h" 33#include "gnunet_ats_service.h"
34#include "gnunet_transport_service.h" 34#include "gnunet_transport_service.h"
35#include "gnunet_transport_core_service.h"
36 35
37 36
38struct Iteration 37struct Iteration
@@ -54,7 +53,8 @@ struct Iteration
54/** 53/**
55 * Timeout for a connections 54 * Timeout for a connections
56 */ 55 */
57#define CONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) 56#define CONNECT_TIMEOUT \
57 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58 58
59/** 59/**
60 * Benchmarking block size in bye 60 * Benchmarking block size in bye
@@ -214,15 +214,16 @@ shutdown_task (void *cls)
214 { 214 {
215 inext = icur->next; 215 inext = icur->next;
216 icur->rate = ((benchmark_count * benchmark_size) / 1024) / 216 icur->rate = ((benchmark_count * benchmark_size) / 1024) /
217 ((float) icur->dur.rel_value_us / (1000 * 1000)); 217 ((float) icur->dur.rel_value_us / (1000 * 1000));
218 if (verbosity > 0) 218 if (verbosity > 0)
219 FPRINTF (stdout, _("%llu B in %llu ms == %.2f KB/s!\n"), 219 FPRINTF (stdout,
220 ((long long unsigned int) benchmark_count * benchmark_size), 220 _ ("%llu B in %llu ms == %.2f KB/s!\n"),
221 ((long long unsigned int) icur->dur.rel_value_us / 1000), 221 ((long long unsigned int) benchmark_count * benchmark_size),
222 (float) icur->rate); 222 ((long long unsigned int) icur->dur.rel_value_us / 1000),
223 (float) icur->rate);
223 224
224 avg_duration += icur->dur.rel_value_us / (1000); 225 avg_duration += icur->dur.rel_value_us / (1000);
225 avg_rate += icur->rate; 226 avg_rate += icur->rate;
226 iterations++; 227 iterations++;
227 } 228 }
228 if (0 == iterations) 229 if (0 == iterations)
@@ -238,19 +239,17 @@ shutdown_task (void *cls)
238 while (NULL != (icur = inext)) 239 while (NULL != (icur = inext))
239 { 240 {
240 inext = icur->next; 241 inext = icur->next;
241 stddev_rate += ((icur->rate-avg_rate) * 242 stddev_rate += ((icur->rate - avg_rate) * (icur->rate - avg_rate));
242 (icur->rate-avg_rate));
243 stddev_duration += (((icur->dur.rel_value_us / 1000) - avg_duration) * 243 stddev_duration += (((icur->dur.rel_value_us / 1000) - avg_duration) *
244 ((icur->dur.rel_value_us / 1000) - avg_duration)); 244 ((icur->dur.rel_value_us / 1000) - avg_duration));
245
246 } 245 }
247 /* Calculate standard deviation rate */ 246 /* Calculate standard deviation rate */
248 stddev_rate = stddev_rate / iterations; 247 stddev_rate = stddev_rate / iterations;
249 stddev_rate = sqrtf(stddev_rate); 248 stddev_rate = sqrtf (stddev_rate);
250 249
251 /* Calculate standard deviation duration */ 250 /* Calculate standard deviation duration */
252 stddev_duration = stddev_duration / iterations; 251 stddev_duration = stddev_duration / iterations;
253 stddev_duration = sqrtf(stddev_duration); 252 stddev_duration = sqrtf (stddev_duration);
254 253
255 /* Output */ 254 /* Output */
256 FPRINTF (stdout, 255 FPRINTF (stdout,
@@ -266,9 +265,7 @@ shutdown_task (void *cls)
266 while (NULL != (icur = inext)) 265 while (NULL != (icur = inext))
267 { 266 {
268 inext = icur->next; 267 inext = icur->next;
269 GNUNET_CONTAINER_DLL_remove (ihead, 268 GNUNET_CONTAINER_DLL_remove (ihead, itail, icur);
270 itail,
271 icur);
272 269
273 FPRINTF (stdout, 270 FPRINTF (stdout,
274 ";%llu;%.2f", 271 ";%llu;%.2f",
@@ -316,27 +313,19 @@ send_msg (void *cls)
316 313
317 if (NULL == mq) 314 if (NULL == mq)
318 return; 315 return;
319 env = GNUNET_MQ_msg_extra (m, 316 env = GNUNET_MQ_msg_extra (m, benchmark_size, GNUNET_MESSAGE_TYPE_DUMMY);
320 benchmark_size, 317 memset (&m[1], 52, benchmark_size - sizeof (struct GNUNET_MessageHeader));
321 GNUNET_MESSAGE_TYPE_DUMMY); 318
322 memset (&m[1],
323 52,
324 benchmark_size - sizeof(struct GNUNET_MessageHeader));
325
326 if (itail->msgs_sent < benchmark_count) 319 if (itail->msgs_sent < benchmark_count)
327 { 320 {
328 GNUNET_MQ_notify_sent (env, 321 GNUNET_MQ_notify_sent (env, &send_msg, NULL);
329 &send_msg,
330 NULL);
331 } 322 }
332 else 323 else
333 { 324 {
334 iteration_done (); 325 iteration_done ();
335 } 326 }
336 GNUNET_MQ_send (mq, 327 GNUNET_MQ_send (mq, env);
337 env); 328 if ((verbosity > 0) && (0 == itail->msgs_sent % 10))
338 if ( (verbosity > 0) &&
339 (0 == itail->msgs_sent % 10) )
340 FPRINTF (stdout, "."); 329 FPRINTF (stdout, ".");
341} 330}
342 331
@@ -351,15 +340,14 @@ iteration_start ()
351 return; 340 return;
352 benchmark_running = GNUNET_YES; 341 benchmark_running = GNUNET_YES;
353 icur = GNUNET_new (struct Iteration); 342 icur = GNUNET_new (struct Iteration);
354 GNUNET_CONTAINER_DLL_insert_tail (ihead, 343 GNUNET_CONTAINER_DLL_insert_tail (ihead, itail, icur);
355 itail, 344 icur->start = GNUNET_TIME_absolute_get ();
356 icur);
357 icur->start = GNUNET_TIME_absolute_get();
358 if (verbosity > 0) 345 if (verbosity > 0)
359 FPRINTF (stdout, 346 FPRINTF (
360 "\nStarting benchmark, starting to send %u messages in %u byte blocks\n", 347 stdout,
361 benchmark_count, 348 "\nStarting benchmark, starting to send %u messages in %u byte blocks\n",
362 benchmark_size); 349 benchmark_count,
350 benchmark_size);
363 send_msg (NULL); 351 send_msg (NULL);
364} 352}
365 353
@@ -393,22 +381,16 @@ iteration_done ()
393static void * 381static void *
394notify_connect (void *cls, 382notify_connect (void *cls,
395 const struct GNUNET_PeerIdentity *peer, 383 const struct GNUNET_PeerIdentity *peer,
396 struct GNUNET_MQ_Handle *m) 384 struct GNUNET_MQ_Handle *m)
397{ 385{
398 if (0 != memcmp (&pid, 386 if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity)))
399 peer,
400 sizeof(struct GNUNET_PeerIdentity)))
401 { 387 {
402 FPRINTF (stdout, 388 FPRINTF (stdout, "Connected to different peer `%s'\n", GNUNET_i2s (&pid));
403 "Connected to different peer `%s'\n",
404 GNUNET_i2s (&pid));
405 return NULL; 389 return NULL;
406 } 390 }
407 391
408 if (verbosity > 0) 392 if (verbosity > 0)
409 FPRINTF (stdout, 393 FPRINTF (stdout, "Successfully connected to `%s'\n", GNUNET_i2s (&pid));
410 "Successfully connected to `%s'\n",
411 GNUNET_i2s (&pid));
412 mq = m; 394 mq = m;
413 iteration_start (); 395 iteration_start ();
414 return NULL; 396 return NULL;
@@ -426,18 +408,16 @@ notify_connect (void *cls,
426static void 408static void
427notify_disconnect (void *cls, 409notify_disconnect (void *cls,
428 const struct GNUNET_PeerIdentity *peer, 410 const struct GNUNET_PeerIdentity *peer,
429 void *internal_cls) 411 void *internal_cls)
430{ 412{
431 if (0 != memcmp (&pid, 413 if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity)))
432 peer,
433 sizeof(struct GNUNET_PeerIdentity)))
434 return; 414 return;
435 mq = NULL; 415 mq = NULL;
436 if (GNUNET_YES == benchmark_running) 416 if (GNUNET_YES == benchmark_running)
437 { 417 {
438 FPRINTF (stdout, 418 FPRINTF (stdout,
439 "Disconnected from peer `%s' while benchmarking\n", 419 "Disconnected from peer `%s' while benchmarking\n",
440 GNUNET_i2s (&pid)); 420 GNUNET_i2s (&pid));
441 return; 421 return;
442 } 422 }
443} 423}
@@ -451,8 +431,7 @@ notify_disconnect (void *cls,
451 * @return #GNUNET_OK 431 * @return #GNUNET_OK
452 */ 432 */
453static int 433static int
454check_dummy (void *cls, 434check_dummy (void *cls, const struct GNUNET_MessageHeader *message)
455 const struct GNUNET_MessageHeader *message)
456{ 435{
457 return GNUNET_OK; /* all messages are fine */ 436 return GNUNET_OK; /* all messages are fine */
458} 437}
@@ -465,30 +444,24 @@ check_dummy (void *cls,
465 * @param message the message 444 * @param message the message
466 */ 445 */
467static void 446static void
468handle_dummy (void *cls, 447handle_dummy (void *cls, const struct GNUNET_MessageHeader *message)
469 const struct GNUNET_MessageHeader *message)
470{ 448{
471 if (! benchmark_receive) 449 if (! benchmark_receive)
472 return; 450 return;
473 if (verbosity > 0) 451 if (verbosity > 0)
474 FPRINTF (stdout, 452 FPRINTF (stdout,
475 "Received %u bytes\n", 453 "Received %u bytes\n",
476 (unsigned int) ntohs (message->size)); 454 (unsigned int) ntohs (message->size));
477} 455}
478 456
479 457
480static int 458static int
481blacklist_cb (void *cls, 459blacklist_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
482 const struct GNUNET_PeerIdentity *peer)
483{ 460{
484 if (0 != memcmp (&pid, 461 if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity)))
485 peer,
486 sizeof(struct GNUNET_PeerIdentity)))
487 { 462 {
488 if (verbosity > 0) 463 if (verbosity > 0)
489 FPRINTF (stdout, 464 FPRINTF (stdout, "Denying connection to `%s'\n", GNUNET_i2s (peer));
490 "Denying connection to `%s'\n",
491 GNUNET_i2s (peer));
492 return GNUNET_SYSERR; 465 return GNUNET_SYSERR;
493 } 466 }
494 return GNUNET_OK; 467 return GNUNET_OK;
@@ -509,38 +482,32 @@ run (void *cls,
509 const char *cfgfile, 482 const char *cfgfile,
510 const struct GNUNET_CONFIGURATION_Handle *mycfg) 483 const struct GNUNET_CONFIGURATION_Handle *mycfg)
511{ 484{
512 struct GNUNET_MQ_MessageHandler handlers[] = { 485 struct GNUNET_MQ_MessageHandler handlers[] =
513 GNUNET_MQ_hd_var_size (dummy, 486 {GNUNET_MQ_hd_var_size (dummy,
514 GNUNET_MESSAGE_TYPE_DUMMY, 487 GNUNET_MESSAGE_TYPE_DUMMY,
515 struct GNUNET_MessageHeader, 488 struct GNUNET_MessageHeader,
516 NULL), 489 NULL),
517 GNUNET_MQ_handler_end () 490 GNUNET_MQ_handler_end ()};
518 }; 491
519
520 cfg = (struct GNUNET_CONFIGURATION_Handle *) mycfg; 492 cfg = (struct GNUNET_CONFIGURATION_Handle *) mycfg;
521 493
522 ret = 1; 494 ret = 1;
523 if (GNUNET_MAX_MESSAGE_SIZE <= benchmark_size) 495 if (GNUNET_MAX_MESSAGE_SIZE <= benchmark_size)
524 { 496 {
525 FPRINTF (stderr, 497 FPRINTF (stderr, "Message size too big!\n");
526 "Message size too big!\n");
527 return; 498 return;
528 } 499 }
529 500
530 if (NULL == cpid) 501 if (NULL == cpid)
531 { 502 {
532 FPRINTF (stderr, 503 FPRINTF (stderr, "No peer identity given\n");
533 "No peer identity given\n");
534 return; 504 return;
535 } 505 }
536 if (GNUNET_OK != 506 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_public_key_from_string (cpid,
537 GNUNET_CRYPTO_eddsa_public_key_from_string (cpid, 507 strlen (cpid),
538 strlen (cpid), 508 &pid.public_key))
539 &pid.public_key))
540 { 509 {
541 FPRINTF (stderr, 510 FPRINTF (stderr, "Failed to parse peer identity `%s'\n", cpid);
542 "Failed to parse peer identity `%s'\n",
543 cpid);
544 return; 511 return;
545 } 512 }
546 if (1 == benchmark_send) 513 if (1 == benchmark_send)
@@ -548,7 +515,8 @@ run (void *cls,
548 if (verbosity > 0) 515 if (verbosity > 0)
549 FPRINTF (stderr, 516 FPRINTF (stderr,
550 "Trying to send %u messages with size %u to peer `%s'\n", 517 "Trying to send %u messages with size %u to peer `%s'\n",
551 benchmark_count, benchmark_size, 518 benchmark_count,
519 benchmark_size,
552 GNUNET_i2s (&pid)); 520 GNUNET_i2s (&pid));
553 } 521 }
554 else if (1 == benchmark_receive) 522 else if (1 == benchmark_receive)
@@ -559,50 +527,42 @@ run (void *cls,
559 } 527 }
560 else 528 else
561 { 529 {
562 FPRINTF (stderr, 530 FPRINTF (stderr, "No operation given\n");
563 "No operation given\n");
564 return; 531 return;
565 } 532 }
566 533
567 ats = GNUNET_ATS_connectivity_init (cfg); 534 ats = GNUNET_ATS_connectivity_init (cfg);
568 if (NULL == ats) 535 if (NULL == ats)
569 { 536 {
570 FPRINTF (stderr, 537 FPRINTF (stderr, "Failed to connect to ATS service\n");
571 "Failed to connect to ATS service\n");
572 ret = 1; 538 ret = 1;
573 return; 539 return;
574 } 540 }
575 541
576 handle = GNUNET_TRANSPORT_core_connect (cfg, 542 handle = GNUNET_TRANSPORT_core_connect (cfg,
577 NULL, 543 NULL,
578 handlers, 544 handlers,
579 NULL, 545 NULL,
580 &notify_connect, 546 &notify_connect,
581 &notify_disconnect, 547 &notify_disconnect,
582 NULL); 548 NULL);
583 if (NULL == handle) 549 if (NULL == handle)
584 { 550 {
585 FPRINTF (stderr, 551 FPRINTF (stderr, "Failed to connect to transport service\n");
586 "Failed to connect to transport service\n");
587 GNUNET_ATS_connectivity_done (ats); 552 GNUNET_ATS_connectivity_done (ats);
588 ats = NULL; 553 ats = NULL;
589 ret = 1; 554 ret = 1;
590 return; 555 return;
591 } 556 }
592 557
593 bl_handle = GNUNET_TRANSPORT_blacklist (cfg, 558 bl_handle = GNUNET_TRANSPORT_blacklist (cfg, &blacklist_cb, NULL);
594 &blacklist_cb, 559 ats_sh = GNUNET_ATS_connectivity_suggest (ats, &pid, 1);
595 NULL); 560 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
596 ats_sh = GNUNET_ATS_connectivity_suggest (ats,
597 &pid,
598 1);
599 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
600 NULL);
601} 561}
602 562
603 563
604int 564int
605main (int argc, char * const *argv) 565main (int argc, char *const *argv)
606{ 566{
607 int res; 567 int res;
608 benchmark_count = DEFAULT_MESSAGE_COUNT; 568 benchmark_count = DEFAULT_MESSAGE_COUNT;
@@ -613,46 +573,48 @@ main (int argc, char * const *argv)
613 struct GNUNET_GETOPT_CommandLineOption options[] = { 573 struct GNUNET_GETOPT_CommandLineOption options[] = {
614 574
615 GNUNET_GETOPT_option_flag ('s', 575 GNUNET_GETOPT_option_flag ('s',
616 "send", 576 "send",
617 gettext_noop ("send data to peer"), 577 gettext_noop ("send data to peer"),
618 &benchmark_send), 578 &benchmark_send),
619 GNUNET_GETOPT_option_flag ('r', 579 GNUNET_GETOPT_option_flag ('r',
620 "receive", 580 "receive",
621 gettext_noop ("receive data from peer"), 581 gettext_noop ("receive data from peer"),
622 &benchmark_receive), 582 &benchmark_receive),
623 GNUNET_GETOPT_option_uint ('i', 583 GNUNET_GETOPT_option_uint ('i',
624 "iterations", 584 "iterations",
625 NULL, 585 NULL,
626 gettext_noop ("iterations"), 586 gettext_noop ("iterations"),
627 &benchmark_iterations), 587 &benchmark_iterations),
628 GNUNET_GETOPT_option_uint ('n', 588 GNUNET_GETOPT_option_uint ('n',
629 "number", 589 "number",
630 NULL, 590 NULL,
631 gettext_noop ("number of messages to send"), 591 gettext_noop ("number of messages to send"),
632 &benchmark_count), 592 &benchmark_count),
633 GNUNET_GETOPT_option_uint ('m', 593 GNUNET_GETOPT_option_uint ('m',
634 "messagesize", 594 "messagesize",
635 NULL, 595 NULL,
636 gettext_noop ("message size to use"), 596 gettext_noop ("message size to use"),
637 &benchmark_size), 597 &benchmark_size),
638 GNUNET_GETOPT_option_string ('p', 598 GNUNET_GETOPT_option_string ('p',
639 "peer", 599 "peer",
640 "PEER", 600 "PEER",
641 gettext_noop ("peer identity"), 601 gettext_noop ("peer identity"),
642 &cpid), 602 &cpid),
643 GNUNET_GETOPT_option_verbose (&verbosity), 603 GNUNET_GETOPT_option_verbose (&verbosity),
644 GNUNET_GETOPT_OPTION_END 604 GNUNET_GETOPT_OPTION_END};
645 };
646 605
647 if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) 606 if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv))
648 return 2; 607 return 2;
649 608
650 res = GNUNET_PROGRAM_run (argc, argv, 609 res =
651 "gnunet-transport", 610 GNUNET_PROGRAM_run (argc,
652 gettext_noop ("Direct access to transport service."), 611 argv,
653 options, 612 "gnunet-transport",
654 &run, NULL); 613 gettext_noop ("Direct access to transport service."),
655 GNUNET_free((void *) argv); 614 options,
615 &run,
616 NULL);
617 GNUNET_free ((void *) argv);
656 if (GNUNET_OK == res) 618 if (GNUNET_OK == res)
657 return ret; 619 return ret;
658 return 1; 620 return 1;
diff --git a/src/transport/gnunet-transport.c b/src/transport/gnunet-transport.c
index c3c1afc38..36c8fc451 100644
--- a/src/transport/gnunet-transport.c
+++ b/src/transport/gnunet-transport.c
@@ -29,12 +29,12 @@
29#include "gnunet_resolver_service.h" 29#include "gnunet_resolver_service.h"
30#include "gnunet_protocols.h" 30#include "gnunet_protocols.h"
31#include "gnunet_transport_service.h" 31#include "gnunet_transport_service.h"
32#include "gnunet_transport_core_service.h"
33 32
34/** 33/**
35 * Timeout for a name resolution 34 * Timeout for a name resolution
36 */ 35 */
37#define RESOLUTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) 36#define RESOLUTION_TIMEOUT \
37 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
38 38
39/** 39/**
40 * Timeout for an operation 40 * Timeout for an operation
@@ -332,16 +332,13 @@ static struct PeerResolutionContext *rc_tail;
332 * @return #GNUNET_OK (continue to iterate) 332 * @return #GNUNET_OK (continue to iterate)
333 */ 333 */
334static int 334static int
335destroy_it (void *cls, 335destroy_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
336 const struct GNUNET_PeerIdentity *key,
337 void *value)
338{ 336{
339 struct MonitoredPeer *m = value; 337 struct MonitoredPeer *m = value;
340 338
341 GNUNET_assert (GNUNET_OK == 339 GNUNET_assert (
342 GNUNET_CONTAINER_multipeermap_remove (monitored_peers, 340 GNUNET_OK ==
343 key, 341 GNUNET_CONTAINER_multipeermap_remove (monitored_peers, key, value));
344 value));
345 GNUNET_free_non_null (m->address); 342 GNUNET_free_non_null (m->address);
346 GNUNET_free (value); 343 GNUNET_free (value);
347 return GNUNET_OK; 344 return GNUNET_OK;
@@ -384,18 +381,14 @@ shutdown_task (void *cls)
384 next = cur->next; 381 next = cur->next;
385 382
386 GNUNET_TRANSPORT_address_to_string_cancel (cur->asc); 383 GNUNET_TRANSPORT_address_to_string_cancel (cur->asc);
387 GNUNET_CONTAINER_DLL_remove (vc_head, 384 GNUNET_CONTAINER_DLL_remove (vc_head, vc_tail, cur);
388 vc_tail,
389 cur);
390 GNUNET_free (cur->transport); 385 GNUNET_free (cur->transport);
391 GNUNET_HELLO_address_free (cur->addrcp); 386 GNUNET_HELLO_address_free (cur->addrcp);
392 GNUNET_free (cur); 387 GNUNET_free (cur);
393 } 388 }
394 while (NULL != (rc = rc_head)) 389 while (NULL != (rc = rc_head))
395 { 390 {
396 GNUNET_CONTAINER_DLL_remove (rc_head, 391 GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
397 rc_tail,
398 rc);
399 GNUNET_TRANSPORT_address_to_string_cancel (rc->asc); 392 GNUNET_TRANSPORT_address_to_string_cancel (rc->asc);
400 GNUNET_free (rc->transport); 393 GNUNET_free (rc->transport);
401 GNUNET_free (rc->addrcp); 394 GNUNET_free (rc->addrcp);
@@ -410,35 +403,30 @@ shutdown_task (void *cls)
410 { 403 {
411 duration = GNUNET_TIME_absolute_get_duration (start_time); 404 duration = GNUNET_TIME_absolute_get_duration (start_time);
412 FPRINTF (stdout, 405 FPRINTF (stdout,
413 _("Transmitted %llu bytes/s (%llu bytes in %s)\n"), 406 _ ("Transmitted %llu bytes/s (%llu bytes in %s)\n"),
414 1000LL * 1000LL * traffic_sent / (1 + duration.rel_value_us), 407 1000LL * 1000LL * traffic_sent / (1 + duration.rel_value_us),
415 traffic_sent, 408 traffic_sent,
416 GNUNET_STRINGS_relative_time_to_string (duration, 409 GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_YES));
417 GNUNET_YES));
418 } 410 }
419 if (benchmark_receive) 411 if (benchmark_receive)
420 { 412 {
421 duration = GNUNET_TIME_absolute_get_duration (start_time); 413 duration = GNUNET_TIME_absolute_get_duration (start_time);
422 FPRINTF (stdout, 414 FPRINTF (stdout,
423 _("Received %llu bytes/s (%llu bytes in %s)\n"), 415 _ ("Received %llu bytes/s (%llu bytes in %s)\n"),
424 1000LL * 1000LL * traffic_received / (1 + duration.rel_value_us), 416 1000LL * 1000LL * traffic_received / (1 + duration.rel_value_us),
425 traffic_received, 417 traffic_received,
426 GNUNET_STRINGS_relative_time_to_string (duration, 418 GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_YES));
427 GNUNET_YES));
428 } 419 }
429 420
430 if (NULL != monitored_peers) 421 if (NULL != monitored_peers)
431 { 422 {
432 GNUNET_CONTAINER_multipeermap_iterate (monitored_peers, 423 GNUNET_CONTAINER_multipeermap_iterate (monitored_peers, &destroy_it, NULL);
433 &destroy_it,
434 NULL);
435 GNUNET_CONTAINER_multipeermap_destroy (monitored_peers); 424 GNUNET_CONTAINER_multipeermap_destroy (monitored_peers);
436 monitored_peers = NULL; 425 monitored_peers = NULL;
437 } 426 }
438 if (NULL != monitored_plugins) 427 if (NULL != monitored_plugins)
439 { 428 {
440 GNUNET_break (0 == 429 GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (monitored_plugins));
441 GNUNET_CONTAINER_multipeermap_size (monitored_plugins));
442 GNUNET_CONTAINER_multipeermap_destroy (monitored_plugins); 430 GNUNET_CONTAINER_multipeermap_destroy (monitored_plugins);
443 monitored_plugins = NULL; 431 monitored_plugins = NULL;
444 } 432 }
@@ -463,9 +451,7 @@ operation_timeout (void *cls)
463 op_timeout = NULL; 451 op_timeout = NULL;
464 if ((benchmark_send) || (benchmark_receive)) 452 if ((benchmark_send) || (benchmark_receive))
465 { 453 {
466 FPRINTF (stdout, 454 FPRINTF (stdout, _ ("Failed to connect to `%s'\n"), GNUNET_i2s_full (&pid));
467 _("Failed to connect to `%s'\n"),
468 GNUNET_i2s_full (&pid));
469 GNUNET_SCHEDULER_shutdown (); 455 GNUNET_SCHEDULER_shutdown ();
470 ret = 1; 456 ret = 1;
471 return; 457 return;
@@ -477,21 +463,18 @@ operation_timeout (void *cls)
477 { 463 {
478 next = cur->next; 464 next = cur->next;
479 FPRINTF (stdout, 465 FPRINTF (stdout,
480 _("Failed to resolve address for peer `%s'\n"), 466 _ ("Failed to resolve address for peer `%s'\n"),
481 GNUNET_i2s (&cur->addrcp->peer)); 467 GNUNET_i2s (&cur->addrcp->peer));
482 468
483 GNUNET_CONTAINER_DLL_remove(rc_head, 469 GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, cur);
484 rc_tail,
485 cur);
486 GNUNET_TRANSPORT_address_to_string_cancel (cur->asc); 470 GNUNET_TRANSPORT_address_to_string_cancel (cur->asc);
487 GNUNET_free (cur->transport); 471 GNUNET_free (cur->transport);
488 GNUNET_free (cur->addrcp); 472 GNUNET_free (cur->addrcp);
489 GNUNET_free (cur); 473 GNUNET_free (cur);
490
491 } 474 }
492 FPRINTF (stdout, 475 FPRINTF (stdout,
493 "%s", 476 "%s",
494 _("Failed to list connections, timeout occurred\n")); 477 _ ("Failed to list connections, timeout occurred\n"));
495 GNUNET_SCHEDULER_shutdown (); 478 GNUNET_SCHEDULER_shutdown ();
496 ret = 1; 479 ret = 1;
497 return; 480 return;
@@ -512,22 +495,15 @@ do_send (void *cls)
512 struct GNUNET_MessageHeader *m; 495 struct GNUNET_MessageHeader *m;
513 struct GNUNET_MQ_Envelope *env; 496 struct GNUNET_MQ_Envelope *env;
514 497
515 env = GNUNET_MQ_msg_extra (m, 498 env = GNUNET_MQ_msg_extra (m, BLOCKSIZE * 1024, GNUNET_MESSAGE_TYPE_DUMMY);
516 BLOCKSIZE * 1024, 499 memset (&m[1], 52, BLOCKSIZE * 1024 - sizeof (struct GNUNET_MessageHeader));
517 GNUNET_MESSAGE_TYPE_DUMMY);
518 memset (&m[1],
519 52,
520 BLOCKSIZE * 1024 - sizeof(struct GNUNET_MessageHeader));
521 traffic_sent += BLOCKSIZE * 1024; 500 traffic_sent += BLOCKSIZE * 1024;
522 GNUNET_MQ_notify_sent (env, 501 GNUNET_MQ_notify_sent (env, &do_send, mq);
523 &do_send,
524 mq);
525 if (verbosity > 0) 502 if (verbosity > 0)
526 FPRINTF (stdout, 503 FPRINTF (stdout,
527 _("Transmitting %u bytes\n"), 504 _ ("Transmitting %u bytes\n"),
528 (unsigned int) BLOCKSIZE * 1024); 505 (unsigned int) BLOCKSIZE * 1024);
529 GNUNET_MQ_send (mq, 506 GNUNET_MQ_send (mq, env);
530 env);
531} 507}
532 508
533 509
@@ -542,11 +518,9 @@ do_send (void *cls)
542static void * 518static void *
543notify_connect (void *cls, 519notify_connect (void *cls,
544 const struct GNUNET_PeerIdentity *peer, 520 const struct GNUNET_PeerIdentity *peer,
545 struct GNUNET_MQ_Handle *mq) 521 struct GNUNET_MQ_Handle *mq)
546{ 522{
547 if (0 != memcmp (&pid, 523 if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity)))
548 peer,
549 sizeof(struct GNUNET_PeerIdentity)))
550 return NULL; 524 return NULL;
551 ret = 0; 525 ret = 0;
552 if (! benchmark_send) 526 if (! benchmark_send)
@@ -557,10 +531,12 @@ notify_connect (void *cls,
557 op_timeout = NULL; 531 op_timeout = NULL;
558 } 532 }
559 if (verbosity > 0) 533 if (verbosity > 0)
560 FPRINTF (stdout, 534 FPRINTF (
561 _("Successfully connected to `%s', starting to send benchmark data in %u Kb blocks\n"), 535 stdout,
562 GNUNET_i2s (peer), 536 _ (
563 BLOCKSIZE); 537 "Successfully connected to `%s', starting to send benchmark data in %u Kb blocks\n"),
538 GNUNET_i2s (peer),
539 BLOCKSIZE);
564 start_time = GNUNET_TIME_absolute_get (); 540 start_time = GNUNET_TIME_absolute_get ();
565 do_send (mq); 541 do_send (mq);
566 return mq; 542 return mq;
@@ -578,19 +554,17 @@ notify_connect (void *cls,
578static void 554static void
579notify_disconnect (void *cls, 555notify_disconnect (void *cls,
580 const struct GNUNET_PeerIdentity *peer, 556 const struct GNUNET_PeerIdentity *peer,
581 void *internal_cls) 557 void *internal_cls)
582{ 558{
583 if (0 != memcmp (&pid, 559 if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity)))
584 peer,
585 sizeof(struct GNUNET_PeerIdentity)))
586 return; 560 return;
587 if (NULL == internal_cls) 561 if (NULL == internal_cls)
588 return; /* not about target peer */ 562 return; /* not about target peer */
589 if (! benchmark_send) 563 if (! benchmark_send)
590 return; /* not transmitting */ 564 return; /* not transmitting */
591 FPRINTF (stdout, 565 FPRINTF (stdout,
592 _("Disconnected from peer `%s' while benchmarking\n"), 566 _ ("Disconnected from peer `%s' while benchmarking\n"),
593 GNUNET_i2s (&pid)); 567 GNUNET_i2s (&pid));
594} 568}
595 569
596 570
@@ -606,16 +580,16 @@ notify_disconnect (void *cls,
606static void * 580static void *
607monitor_notify_connect (void *cls, 581monitor_notify_connect (void *cls,
608 const struct GNUNET_PeerIdentity *peer, 582 const struct GNUNET_PeerIdentity *peer,
609 struct GNUNET_MQ_Handle *mq) 583 struct GNUNET_MQ_Handle *mq)
610{ 584{
611 struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); 585 struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
612 const char *now_str = GNUNET_STRINGS_absolute_time_to_string (now); 586 const char *now_str = GNUNET_STRINGS_absolute_time_to_string (now);
613 587
614 monitor_connect_counter++; 588 monitor_connect_counter++;
615 FPRINTF (stdout, 589 FPRINTF (stdout,
616 _("%24s: %-17s %4s (%u connections in total)\n"), 590 _ ("%24s: %-17s %4s (%u connections in total)\n"),
617 now_str, 591 now_str,
618 _("Connected to"), 592 _ ("Connected to"),
619 GNUNET_i2s (peer), 593 GNUNET_i2s (peer),
620 monitor_connect_counter); 594 monitor_connect_counter);
621 return NULL; 595 return NULL;
@@ -633,18 +607,18 @@ monitor_notify_connect (void *cls,
633static void 607static void
634monitor_notify_disconnect (void *cls, 608monitor_notify_disconnect (void *cls,
635 const struct GNUNET_PeerIdentity *peer, 609 const struct GNUNET_PeerIdentity *peer,
636 void *internal_cls) 610 void *internal_cls)
637{ 611{
638 struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); 612 struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
639 const char *now_str = GNUNET_STRINGS_absolute_time_to_string (now); 613 const char *now_str = GNUNET_STRINGS_absolute_time_to_string (now);
640 614
641 GNUNET_assert(monitor_connect_counter > 0); 615 GNUNET_assert (monitor_connect_counter > 0);
642 monitor_connect_counter--; 616 monitor_connect_counter--;
643 617
644 FPRINTF (stdout, 618 FPRINTF (stdout,
645 _("%24s: %-17s %4s (%u connections in total)\n"), 619 _ ("%24s: %-17s %4s (%u connections in total)\n"),
646 now_str, 620 now_str,
647 _("Disconnected from"), 621 _ ("Disconnected from"),
648 GNUNET_i2s (peer), 622 GNUNET_i2s (peer),
649 monitor_connect_counter); 623 monitor_connect_counter);
650} 624}
@@ -658,8 +632,7 @@ monitor_notify_disconnect (void *cls,
658 * @return #GNUNET_OK 632 * @return #GNUNET_OK
659 */ 633 */
660static int 634static int
661check_dummy (void *cls, 635check_dummy (void *cls, const struct GNUNET_MessageHeader *message)
662 const struct GNUNET_MessageHeader *message)
663{ 636{
664 return GNUNET_OK; /* all messages are fine */ 637 return GNUNET_OK; /* all messages are fine */
665} 638}
@@ -672,15 +645,14 @@ check_dummy (void *cls,
672 * @param message the message 645 * @param message the message
673 */ 646 */
674static void 647static void
675handle_dummy (void *cls, 648handle_dummy (void *cls, const struct GNUNET_MessageHeader *message)
676 const struct GNUNET_MessageHeader *message)
677{ 649{
678 if (! benchmark_receive) 650 if (! benchmark_receive)
679 return; 651 return;
680 if (verbosity > 0) 652 if (verbosity > 0)
681 FPRINTF (stdout, 653 FPRINTF (stdout,
682 _("Received %u bytes\n"), 654 _ ("Received %u bytes\n"),
683 (unsigned int) ntohs (message->size)); 655 (unsigned int) ntohs (message->size));
684 if (0 == traffic_received) 656 if (0 == traffic_received)
685 start_time = GNUNET_TIME_absolute_get (); 657 start_time = GNUNET_TIME_absolute_get ();
686 traffic_received += ntohs (message->size); 658 traffic_received += ntohs (message->size);
@@ -711,24 +683,23 @@ print_info (const struct GNUNET_PeerIdentity *id,
711 struct GNUNET_TIME_Absolute state_timeout) 683 struct GNUNET_TIME_Absolute state_timeout)
712{ 684{
713 685
714 if ( ((GNUNET_YES == iterate_connections) && 686 if (((GNUNET_YES == iterate_connections) && (GNUNET_YES == iterate_all)) ||
715 (GNUNET_YES == iterate_all)) || 687 (GNUNET_YES == monitor_connections))
716 (GNUNET_YES == monitor_connections))
717 { 688 {
718 FPRINTF (stdout, 689 FPRINTF (stdout,
719 _("Peer `%s': %s %s in state `%s' until %s\n"), 690 _ ("Peer `%s': %s %s in state `%s' until %s\n"),
720 GNUNET_i2s (id), 691 GNUNET_i2s (id),
721 (NULL == transport) ? "<none>" : transport, 692 (NULL == transport) ? "<none>" : transport,
722 (NULL == transport) ? "<none>" : addr, 693 (NULL == transport) ? "<none>" : addr,
723 GNUNET_TRANSPORT_ps2s (state), 694 GNUNET_TRANSPORT_ps2s (state),
724 GNUNET_STRINGS_absolute_time_to_string (state_timeout)); 695 GNUNET_STRINGS_absolute_time_to_string (state_timeout));
725 } 696 }
726 else if ( (GNUNET_YES == iterate_connections) && 697 else if ((GNUNET_YES == iterate_connections) &&
727 (GNUNET_TRANSPORT_is_connected(state)) ) 698 (GNUNET_TRANSPORT_is_connected (state)))
728 { 699 {
729 /* Only connected peers, skip state */ 700 /* Only connected peers, skip state */
730 FPRINTF (stdout, 701 FPRINTF (stdout,
731 _("Peer `%s': %s %s\n"), 702 _ ("Peer `%s': %s %s\n"),
732 GNUNET_i2s (id), 703 GNUNET_i2s (id),
733 transport, 704 transport,
734 addr); 705 addr);
@@ -753,9 +724,7 @@ print_info (const struct GNUNET_PeerIdentity *id,
753 * if #GNUNET_SYSERR: communication error (IPC error) 724 * if #GNUNET_SYSERR: communication error (IPC error)
754 */ 725 */
755static void 726static void
756process_peer_string (void *cls, 727process_peer_string (void *cls, const char *address, int res)
757 const char *address,
758 int res)
759{ 728{
760 struct PeerResolutionContext *rc = cls; 729 struct PeerResolutionContext *rc = cls;
761 730
@@ -763,11 +732,12 @@ process_peer_string (void *cls,
763 { 732 {
764 if (GNUNET_SYSERR == res) 733 if (GNUNET_SYSERR == res)
765 { 734 {
766 FPRINTF (stderr, 735 FPRINTF (
767 "Failed to convert address for peer `%s' plugin `%s' length %u to string \n", 736 stderr,
768 GNUNET_i2s (&rc->addrcp->peer), 737 "Failed to convert address for peer `%s' plugin `%s' length %u to string \n",
769 rc->addrcp->transport_name, 738 GNUNET_i2s (&rc->addrcp->peer),
770 (unsigned int) rc->addrcp->address_length); 739 rc->addrcp->transport_name,
740 (unsigned int) rc->addrcp->address_length);
771 print_info (&rc->addrcp->peer, 741 print_info (&rc->addrcp->peer,
772 rc->transport, 742 rc->transport,
773 NULL, 743 NULL,
@@ -818,9 +788,7 @@ process_peer_string (void *cls,
818 } 788 }
819 GNUNET_free (rc->transport); 789 GNUNET_free (rc->transport);
820 GNUNET_free (rc->addrcp); 790 GNUNET_free (rc->addrcp);
821 GNUNET_CONTAINER_DLL_remove (rc_head, 791 GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
822 rc_tail,
823 rc);
824 GNUNET_free (rc); 792 GNUNET_free (rc);
825 if ((0 == address_resolutions) && (iterate_connections)) 793 if ((0 == address_resolutions) && (iterate_connections))
826 { 794 {
@@ -854,9 +822,7 @@ resolve_peer_address (const struct GNUNET_HELLO_Address *address,
854 struct PeerResolutionContext *rc; 822 struct PeerResolutionContext *rc;
855 823
856 rc = GNUNET_new (struct PeerResolutionContext); 824 rc = GNUNET_new (struct PeerResolutionContext);
857 GNUNET_CONTAINER_DLL_insert (rc_head, 825 GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
858 rc_tail,
859 rc);
860 address_resolutions++; 826 address_resolutions++;
861 rc->transport = GNUNET_strdup (address->transport_name); 827 rc->transport = GNUNET_strdup (address->transport_name);
862 rc->addrcp = GNUNET_HELLO_address_copy (address); 828 rc->addrcp = GNUNET_HELLO_address_copy (address);
@@ -869,7 +835,7 @@ resolve_peer_address (const struct GNUNET_HELLO_Address *address,
869 numeric, 835 numeric,
870 RESOLUTION_TIMEOUT, 836 RESOLUTION_TIMEOUT,
871 &process_peer_string, 837 &process_peer_string,
872 rc); 838 rc);
873} 839}
874 840
875 841
@@ -897,15 +863,14 @@ process_peer_iteration_cb (void *cls,
897 return; 863 return;
898 } 864 }
899 865
900 if ( (GNUNET_NO == iterate_all) && 866 if ((GNUNET_NO == iterate_all) &&
901 (GNUNET_NO == GNUNET_TRANSPORT_is_connected(state))) 867 (GNUNET_NO == GNUNET_TRANSPORT_is_connected (state)))
902 return; /* Display only connected peers */ 868 return; /* Display only connected peers */
903 869
904 if (NULL != op_timeout) 870 if (NULL != op_timeout)
905 GNUNET_SCHEDULER_cancel (op_timeout); 871 GNUNET_SCHEDULER_cancel (op_timeout);
906 op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, 872 op_timeout =
907 &operation_timeout, 873 GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL);
908 NULL);
909 874
910 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 875 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
911 "Received address for peer `%s': %s\n", 876 "Received address for peer `%s': %s\n",
@@ -913,16 +878,9 @@ process_peer_iteration_cb (void *cls,
913 address ? address->transport_name : ""); 878 address ? address->transport_name : "");
914 879
915 if (NULL != address) 880 if (NULL != address)
916 resolve_peer_address (address, 881 resolve_peer_address (address, numeric, state, state_timeout);
917 numeric,
918 state,
919 state_timeout);
920 else 882 else
921 print_info (peer, 883 print_info (peer, NULL, NULL, state, state_timeout);
922 NULL,
923 NULL,
924 state,
925 state_timeout);
926} 884}
927 885
928 886
@@ -958,7 +916,7 @@ struct PluginMonitorAddress
958 */ 916 */
959static void 917static void
960print_plugin_event_info (struct PluginMonitorAddress *addr, 918print_plugin_event_info (struct PluginMonitorAddress *addr,
961 const struct GNUNET_TRANSPORT_SessionInfo *info) 919 const struct GNUNET_TRANSPORT_SessionInfo *info)
962{ 920{
963 const char *state; 921 const char *state;
964 922
@@ -987,20 +945,22 @@ print_plugin_event_info (struct PluginMonitorAddress *addr,
987 "%s: state %s timeout in %s @ %s%s\n", 945 "%s: state %s timeout in %s @ %s%s\n",
988 GNUNET_i2s (&info->address->peer), 946 GNUNET_i2s (&info->address->peer),
989 state, 947 state,
990 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (info->session_timeout), 948 GNUNET_STRINGS_relative_time_to_string (
991 GNUNET_YES), 949 GNUNET_TIME_absolute_get_remaining (info->session_timeout),
992 addr->str, 950 GNUNET_YES),
951 addr->str,
993 (info->is_inbound == GNUNET_YES) ? " (INBOUND)" : ""); 952 (info->is_inbound == GNUNET_YES) ? " (INBOUND)" : "");
994 fprintf (stdout, 953 fprintf (stdout,
995 "%s: queue has %3u messages and %6u bytes\n", 954 "%s: queue has %3u messages and %6u bytes\n",
996 GNUNET_i2s (&info->address->peer), 955 GNUNET_i2s (&info->address->peer),
997 info->num_msg_pending, 956 info->num_msg_pending,
998 info->num_bytes_pending); 957 info->num_bytes_pending);
999 if (0 != GNUNET_TIME_absolute_get_remaining (info->receive_delay).rel_value_us) 958 if (0 !=
959 GNUNET_TIME_absolute_get_remaining (info->receive_delay).rel_value_us)
1000 fprintf (stdout, 960 fprintf (stdout,
1001 "%s: receiving blocked until %s\n", 961 "%s: receiving blocked until %s\n",
1002 GNUNET_i2s (&info->address->peer), 962 GNUNET_i2s (&info->address->peer),
1003 GNUNET_STRINGS_absolute_time_to_string (info->receive_delay)); 963 GNUNET_STRINGS_absolute_time_to_string (info->receive_delay));
1004} 964}
1005 965
1006 966
@@ -1021,9 +981,7 @@ print_plugin_event_info (struct PluginMonitorAddress *addr,
1021 * if #GNUNET_SYSERR: communication error (IPC error) 981 * if #GNUNET_SYSERR: communication error (IPC error)
1022 */ 982 */
1023static void 983static void
1024address_cb (void *cls, 984address_cb (void *cls, const char *address, int res)
1025 const char *address,
1026 int res)
1027{ 985{
1028 struct PluginMonitorAddress *addr = cls; 986 struct PluginMonitorAddress *addr = cls;
1029 987
@@ -1035,8 +993,7 @@ address_cb (void *cls,
1035 if (NULL != addr->str) 993 if (NULL != addr->str)
1036 return; 994 return;
1037 addr->str = GNUNET_strdup (address); 995 addr->str = GNUNET_strdup (address);
1038 print_plugin_event_info (addr, 996 print_plugin_event_info (addr, &addr->si);
1039 &addr->si);
1040} 997}
1041 998
1042 999
@@ -1065,8 +1022,7 @@ plugin_monitoring_cb (void *cls,
1065{ 1022{
1066 struct PluginMonitorAddress *addr; 1023 struct PluginMonitorAddress *addr;
1067 1024
1068 if ( (NULL == info) && 1025 if ((NULL == info) && (NULL == session))
1069 (NULL == session) )
1070 return; /* in sync with transport service */ 1026 return; /* in sync with transport service */
1071 addr = *session_ctx; 1027 addr = *session_ctx;
1072 if (NULL == info) 1028 if (NULL == info)
@@ -1084,26 +1040,25 @@ plugin_monitoring_cb (void *cls,
1084 } 1040 }
1085 return; /* shutdown */ 1041 return; /* shutdown */
1086 } 1042 }
1087 if (0 != memcmp (&info->address->peer, 1043 if (0 !=
1088 &pid, 1044 memcmp (&info->address->peer, &pid, sizeof (struct GNUNET_PeerIdentity)))
1089 sizeof (struct GNUNET_PeerIdentity)))
1090 return; /* filtered */ 1045 return; /* filtered */
1091 if (NULL == addr) 1046 if (NULL == addr)
1092 { 1047 {
1093 addr = GNUNET_new (struct PluginMonitorAddress); 1048 addr = GNUNET_new (struct PluginMonitorAddress);
1094 addr->asc = GNUNET_TRANSPORT_address_to_string (cfg, 1049 addr->asc =
1095 info->address, 1050 GNUNET_TRANSPORT_address_to_string (cfg,
1096 numeric, 1051 info->address,
1097 GNUNET_TIME_UNIT_FOREVER_REL, 1052 numeric,
1098 &address_cb, 1053 GNUNET_TIME_UNIT_FOREVER_REL,
1099 addr); 1054 &address_cb,
1055 addr);
1100 *session_ctx = addr; 1056 *session_ctx = addr;
1101 } 1057 }
1102 if (NULL == addr->str) 1058 if (NULL == addr->str)
1103 addr->si = *info; 1059 addr->si = *info;
1104 else 1060 else
1105 print_plugin_event_info (addr, 1061 print_plugin_event_info (addr, info);
1106 info);
1107 if (GNUNET_TRANSPORT_SS_DONE == info->state) 1062 if (GNUNET_TRANSPORT_SS_DONE == info->state)
1108 { 1063 {
1109 if (NULL != addr->asc) 1064 if (NULL != addr->asc)
@@ -1141,38 +1096,35 @@ process_peer_monitoring_cb (void *cls,
1141 { 1096 {
1142 FPRINTF (stdout, 1097 FPRINTF (stdout,
1143 "%s", 1098 "%s",
1144 _("Monitor disconnected from transport service. Reconnecting.\n")); 1099 _ (
1100 "Monitor disconnected from transport service. Reconnecting.\n"));
1145 return; 1101 return;
1146 } 1102 }
1147 1103
1148 if (NULL != op_timeout) 1104 if (NULL != op_timeout)
1149 GNUNET_SCHEDULER_cancel (op_timeout); 1105 GNUNET_SCHEDULER_cancel (op_timeout);
1150 op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, 1106 op_timeout =
1151 &operation_timeout, 1107 GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL);
1152 NULL);
1153 1108
1154 if (NULL == (m = GNUNET_CONTAINER_multipeermap_get (monitored_peers, 1109 if (NULL == (m = GNUNET_CONTAINER_multipeermap_get (monitored_peers, peer)))
1155 peer)))
1156 { 1110 {
1157 m = GNUNET_new (struct MonitoredPeer); 1111 m = GNUNET_new (struct MonitoredPeer);
1158 GNUNET_CONTAINER_multipeermap_put (monitored_peers, 1112 GNUNET_CONTAINER_multipeermap_put (
1159 peer, 1113 monitored_peers,
1160 m, 1114 peer,
1161 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); 1115 m,
1116 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1162 } 1117 }
1163 else 1118 else
1164 { 1119 {
1165 if ( (m->state == state) && 1120 if ((m->state == state) &&
1166 (m->state_timeout.abs_value_us == state_timeout.abs_value_us) && 1121 (m->state_timeout.abs_value_us == state_timeout.abs_value_us) &&
1167 (NULL == address) && 1122 (NULL == address) && (NULL == m->address))
1168 (NULL == m->address) )
1169 { 1123 {
1170 return; /* No real change */ 1124 return; /* No real change */
1171 } 1125 }
1172 if ( (m->state == state) && 1126 if ((m->state == state) && (NULL != address) && (NULL != m->address) &&
1173 (NULL != address) && 1127 (0 == GNUNET_HELLO_address_cmp (m->address, address)))
1174 (NULL != m->address) &&
1175 (0 == GNUNET_HELLO_address_cmp(m->address, address)) )
1176 return; /* No real change */ 1128 return; /* No real change */
1177 } 1129 }
1178 1130
@@ -1187,16 +1139,9 @@ process_peer_monitoring_cb (void *cls,
1187 m->state_timeout = state_timeout; 1139 m->state_timeout = state_timeout;
1188 1140
1189 if (NULL != address) 1141 if (NULL != address)
1190 resolve_peer_address (m->address, 1142 resolve_peer_address (m->address, numeric, m->state, m->state_timeout);
1191 numeric,
1192 m->state,
1193 m->state_timeout);
1194 else 1143 else
1195 print_info (peer, 1144 print_info (peer, NULL, NULL, m->state, m->state_timeout);
1196 NULL,
1197 NULL,
1198 m->state,
1199 m->state_timeout);
1200} 1145}
1201 1146
1202 1147
@@ -1210,12 +1155,9 @@ process_peer_monitoring_cb (void *cls,
1210 * @return #GNUNET_OK if the connection is allowed, #GNUNET_SYSERR if not 1155 * @return #GNUNET_OK if the connection is allowed, #GNUNET_SYSERR if not
1211 */ 1156 */
1212static int 1157static int
1213blacklist_cb (void *cls, 1158blacklist_cb (void *cls, const struct GNUNET_PeerIdentity *cpid)
1214 const struct GNUNET_PeerIdentity *cpid)
1215{ 1159{
1216 if (0 == memcmp (cpid, 1160 if (0 == memcmp (cpid, &pid, sizeof (struct GNUNET_PeerIdentity)))
1217 &pid,
1218 sizeof (struct GNUNET_PeerIdentity)))
1219 return GNUNET_SYSERR; 1161 return GNUNET_SYSERR;
1220 return GNUNET_OK; 1162 return GNUNET_OK;
1221} 1163}
@@ -1231,7 +1173,7 @@ blacklist_cb (void *cls,
1231 */ 1173 */
1232static void 1174static void
1233run (void *cls, 1175run (void *cls,
1234 char * const *args, 1176 char *const *args,
1235 const char *cfgfile, 1177 const char *cfgfile,
1236 const struct GNUNET_CONFIGURATION_Handle *mycfg) 1178 const struct GNUNET_CONFIGURATION_Handle *mycfg)
1237{ 1179{
@@ -1241,127 +1183,119 @@ run (void *cls,
1241 1183
1242 cfg = (struct GNUNET_CONFIGURATION_Handle *) mycfg; 1184 cfg = (struct GNUNET_CONFIGURATION_Handle *) mycfg;
1243 1185
1244 counter = benchmark_send + benchmark_receive + iterate_connections 1186 counter = benchmark_send + benchmark_receive + iterate_connections +
1245 + monitor_connections + monitor_connects + do_disconnect + 1187 monitor_connections + monitor_connects + do_disconnect +
1246 monitor_plugins; 1188 monitor_plugins;
1247 1189
1248 if (1 < counter) 1190 if (1 < counter)
1249 { 1191 {
1250 FPRINTF (stderr, 1192 FPRINTF (
1251 _("Multiple operations given. Please choose only one operation: %s, %s, %s, %s, %s, %s %s\n"), 1193 stderr,
1252 "disconnect", 1194 _ (
1253 "benchmark send", 1195 "Multiple operations given. Please choose only one operation: %s, %s, %s, %s, %s, %s %s\n"),
1254 "benchmark receive", 1196 "disconnect",
1255 "information", 1197 "benchmark send",
1256 "monitor", 1198 "benchmark receive",
1257 "events", 1199 "information",
1258 "plugins"); 1200 "monitor",
1201 "events",
1202 "plugins");
1259 return; 1203 return;
1260 } 1204 }
1261 if (0 == counter) 1205 if (0 == counter)
1262 { 1206 {
1263 FPRINTF (stderr, 1207 FPRINTF (
1264 _("No operation given. Please choose one operation: %s, %s, %s, %s, %s, %s, %s\n"), 1208 stderr,
1265 "disconnect", 1209 _ (
1266 "benchmark send", 1210 "No operation given. Please choose one operation: %s, %s, %s, %s, %s, %s, %s\n"),
1267 "benchmark receive", 1211 "disconnect",
1268 "information", 1212 "benchmark send",
1269 "monitor", 1213 "benchmark receive",
1270 "events", 1214 "information",
1271 "plugins"); 1215 "monitor",
1216 "events",
1217 "plugins");
1272 return; 1218 return;
1273 } 1219 }
1274 1220
1275 if (do_disconnect) /* -D: Disconnect from peer */ 1221 if (do_disconnect) /* -D: Disconnect from peer */
1276 { 1222 {
1277 if (0 == memcmp (&zero_pid, 1223 if (0 == memcmp (&zero_pid, &pid, sizeof (pid)))
1278 &pid,
1279 sizeof (pid)))
1280 { 1224 {
1281 FPRINTF (stderr, 1225 FPRINTF (stderr,
1282 _("Option `%s' makes no sense without option `%s'.\n"), 1226 _ ("Option `%s' makes no sense without option `%s'.\n"),
1283 "-D", "-p"); 1227 "-D",
1228 "-p");
1284 ret = 1; 1229 ret = 1;
1285 return; 1230 return;
1286 } 1231 }
1287 blacklist = GNUNET_TRANSPORT_blacklist (cfg, 1232 blacklist = GNUNET_TRANSPORT_blacklist (cfg, &blacklist_cb, NULL);
1288 &blacklist_cb,
1289 NULL);
1290 if (NULL == blacklist) 1233 if (NULL == blacklist)
1291 { 1234 {
1292 FPRINTF (stderr, 1235 FPRINTF (stderr,
1293 "%s", 1236 "%s",
1294 _("Failed to connect to transport service for disconnection\n")); 1237 _ (
1238 "Failed to connect to transport service for disconnection\n"));
1295 ret = 1; 1239 ret = 1;
1296 return; 1240 return;
1297 } 1241 }
1298 FPRINTF (stdout, 1242 FPRINTF (stdout,
1299 "%s", 1243 "%s",
1300 _("Blacklisting request in place, stop with CTRL-C\n")); 1244 _ ("Blacklisting request in place, stop with CTRL-C\n"));
1301 } 1245 }
1302 else if (benchmark_send) /* -s: Benchmark sending */ 1246 else if (benchmark_send) /* -s: Benchmark sending */
1303 { 1247 {
1304 if (0 == memcmp (&zero_pid, 1248 if (0 == memcmp (&zero_pid, &pid, sizeof (pid)))
1305 &pid,
1306 sizeof (pid)))
1307 { 1249 {
1308 FPRINTF (stderr, 1250 FPRINTF (stderr,
1309 _("Option `%s' makes no sense without option `%s'.\n"), 1251 _ ("Option `%s' makes no sense without option `%s'.\n"),
1310 "-s", "-p"); 1252 "-s",
1253 "-p");
1311 ret = 1; 1254 ret = 1;
1312 return; 1255 return;
1313 } 1256 }
1314 handle = GNUNET_TRANSPORT_core_connect (cfg, 1257 handle = GNUNET_TRANSPORT_core_connect (cfg,
1315 NULL, 1258 NULL,
1316 NULL, 1259 NULL,
1317 NULL, 1260 NULL,
1318 &notify_connect, 1261 &notify_connect,
1319 &notify_disconnect, 1262 &notify_disconnect,
1320 NULL); 1263 NULL);
1321 if (NULL == handle) 1264 if (NULL == handle)
1322 { 1265 {
1323 FPRINTF (stderr, 1266 FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n"));
1324 "%s",
1325 _("Failed to connect to transport service\n"));
1326 ret = 1; 1267 ret = 1;
1327 return; 1268 return;
1328 } 1269 }
1329 start_time = GNUNET_TIME_absolute_get (); 1270 start_time = GNUNET_TIME_absolute_get ();
1330 op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, 1271 op_timeout =
1331 &operation_timeout, 1272 GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL);
1332 NULL);
1333 } 1273 }
1334 else if (benchmark_receive) /* -b: Benchmark receiving */ 1274 else if (benchmark_receive) /* -b: Benchmark receiving */
1335 { 1275 {
1336 struct GNUNET_MQ_MessageHandler handlers[] = { 1276 struct GNUNET_MQ_MessageHandler handlers[] =
1337 GNUNET_MQ_hd_var_size (dummy, 1277 {GNUNET_MQ_hd_var_size (dummy,
1338 GNUNET_MESSAGE_TYPE_DUMMY, 1278 GNUNET_MESSAGE_TYPE_DUMMY,
1339 struct GNUNET_MessageHeader, 1279 struct GNUNET_MessageHeader,
1340 NULL), 1280 NULL),
1341 GNUNET_MQ_handler_end () 1281 GNUNET_MQ_handler_end ()};
1342 };
1343 1282
1344 handle = GNUNET_TRANSPORT_core_connect (cfg, 1283 handle = GNUNET_TRANSPORT_core_connect (cfg,
1345 NULL, 1284 NULL,
1346 handlers, 1285 handlers,
1347 NULL, 1286 NULL,
1348 NULL, 1287 NULL,
1349 NULL, 1288 NULL,
1350 NULL); 1289 NULL);
1351 if (NULL == handle) 1290 if (NULL == handle)
1352 { 1291 {
1353 FPRINTF (stderr, 1292 FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n"));
1354 "%s",
1355 _("Failed to connect to transport service\n"));
1356 ret = 1; 1293 ret = 1;
1357 return; 1294 return;
1358 } 1295 }
1359 if (verbosity > 0) 1296 if (verbosity > 0)
1360 FPRINTF (stdout, 1297 FPRINTF (stdout, "%s", _ ("Starting to receive benchmark data\n"));
1361 "%s",
1362 _("Starting to receive benchmark data\n"));
1363 start_time = GNUNET_TIME_absolute_get (); 1298 start_time = GNUNET_TIME_absolute_get ();
1364
1365 } 1299 }
1366 else if (iterate_connections) /* -i: List information about peers once */ 1300 else if (iterate_connections) /* -i: List information about peers once */
1367 { 1301 {
@@ -1370,42 +1304,38 @@ run (void *cls,
1370 GNUNET_YES, 1304 GNUNET_YES,
1371 &process_peer_iteration_cb, 1305 &process_peer_iteration_cb,
1372 (void *) cfg); 1306 (void *) cfg);
1373 op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, 1307 op_timeout =
1374 &operation_timeout, 1308 GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL);
1375 NULL);
1376 } 1309 }
1377 else if (monitor_connections) /* -m: List information about peers continuously */ 1310 else if (monitor_connections) /* -m: List information about peers continuously
1311 */
1378 { 1312 {
1379 monitored_peers = GNUNET_CONTAINER_multipeermap_create (10, 1313 monitored_peers = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1380 GNUNET_NO);
1381 pic = GNUNET_TRANSPORT_monitor_peers (cfg, 1314 pic = GNUNET_TRANSPORT_monitor_peers (cfg,
1382 &pid, 1315 &pid,
1383 GNUNET_NO, 1316 GNUNET_NO,
1384 &process_peer_monitoring_cb, 1317 &process_peer_monitoring_cb,
1385 NULL); 1318 NULL);
1386 } 1319 }
1387 else if (monitor_plugins) /* -P: List information about plugins continuously */ 1320 else if (monitor_plugins) /* -P: List information about plugins continuously
1321 */
1388 { 1322 {
1389 monitored_plugins = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); 1323 monitored_plugins = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1390 pm = GNUNET_TRANSPORT_monitor_plugins (cfg, 1324 pm = GNUNET_TRANSPORT_monitor_plugins (cfg, &plugin_monitoring_cb, NULL);
1391 &plugin_monitoring_cb,
1392 NULL);
1393 } 1325 }
1394 else if (monitor_connects) /* -e : Monitor (dis)connect events continuously */ 1326 else if (monitor_connects) /* -e : Monitor (dis)connect events continuously */
1395 { 1327 {
1396 monitor_connect_counter = 0; 1328 monitor_connect_counter = 0;
1397 handle = GNUNET_TRANSPORT_core_connect (cfg, 1329 handle = GNUNET_TRANSPORT_core_connect (cfg,
1398 NULL, 1330 NULL,
1399 NULL, 1331 NULL,
1400 NULL, 1332 NULL,
1401 &monitor_notify_connect, 1333 &monitor_notify_connect,
1402 &monitor_notify_disconnect, 1334 &monitor_notify_disconnect,
1403 NULL); 1335 NULL);
1404 if (NULL == handle) 1336 if (NULL == handle)
1405 { 1337 {
1406 FPRINTF (stderr, 1338 FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n"));
1407 "%s",
1408 _("Failed to connect to transport service\n"));
1409 ret = 1; 1339 ret = 1;
1410 return; 1340 return;
1411 } 1341 }
@@ -1413,75 +1343,86 @@ run (void *cls,
1413 } 1343 }
1414 else 1344 else
1415 { 1345 {
1416 GNUNET_break(0); 1346 GNUNET_break (0);
1417 return; 1347 return;
1418 } 1348 }
1419 1349
1420 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 1350 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
1421 NULL);
1422} 1351}
1423 1352
1424 1353
1425int 1354int
1426main (int argc, 1355main (int argc, char *const *argv)
1427 char * const *argv)
1428{ 1356{
1429 int res; 1357 int res;
1430 struct GNUNET_GETOPT_CommandLineOption options[] = { 1358 struct GNUNET_GETOPT_CommandLineOption options[] =
1431 GNUNET_GETOPT_option_flag ('a', 1359 {GNUNET_GETOPT_option_flag (
1432 "all", 1360 'a',
1433 gettext_noop ("print information for all peers (instead of only connected peers)"), 1361 "all",
1434 &iterate_all), 1362 gettext_noop (
1435 GNUNET_GETOPT_option_flag ('b', 1363 "print information for all peers (instead of only connected peers)"),
1436 "benchmark", 1364 &iterate_all),
1437 gettext_noop ("measure how fast we are receiving data from all peers (until CTRL-C)"), 1365 GNUNET_GETOPT_option_flag (
1438 &benchmark_receive), 1366 'b',
1439 GNUNET_GETOPT_option_flag ('D', 1367 "benchmark",
1440 "disconnect", 1368 gettext_noop (
1441 gettext_noop ("disconnect from a peer"), 1369 "measure how fast we are receiving data from all peers (until CTRL-C)"),
1442 &do_disconnect), 1370 &benchmark_receive),
1443 GNUNET_GETOPT_option_flag ('i', 1371 GNUNET_GETOPT_option_flag ('D',
1444 "information", 1372 "disconnect",
1445 gettext_noop ("provide information about all current connections (once)"), 1373 gettext_noop ("disconnect from a peer"),
1446 &iterate_connections), 1374 &do_disconnect),
1447 GNUNET_GETOPT_option_flag ('m', 1375 GNUNET_GETOPT_option_flag (
1448 "monitor", 1376 'i',
1449 gettext_noop ("provide information about all current connections (continuously)"), 1377 "information",
1450 &monitor_connections), 1378 gettext_noop (
1451 GNUNET_GETOPT_option_flag ('e', 1379 "provide information about all current connections (once)"),
1452 "events", 1380 &iterate_connections),
1453 gettext_noop ("provide information about all connects and disconnect events (continuously)"), 1381 GNUNET_GETOPT_option_flag (
1454 &monitor_connects), 1382 'm',
1455 GNUNET_GETOPT_option_flag ('n', 1383 "monitor",
1456 "numeric", 1384 gettext_noop (
1457 gettext_noop ("do not resolve hostnames"), 1385 "provide information about all current connections (continuously)"),
1458 &numeric), 1386 &monitor_connections),
1459 GNUNET_GETOPT_option_base32_auto ('p', 1387 GNUNET_GETOPT_option_flag (
1460 "peer", 1388 'e',
1461 "PEER", 1389 "events",
1462 gettext_noop ("peer identity"), 1390 gettext_noop (
1463 &pid), 1391 "provide information about all connects and disconnect events (continuously)"),
1464 GNUNET_GETOPT_option_flag ('P', 1392 &monitor_connects),
1465 "plugins", 1393 GNUNET_GETOPT_option_flag ('n',
1466 gettext_noop ("monitor plugin sessions"), 1394 "numeric",
1467 &monitor_plugins), 1395 gettext_noop ("do not resolve hostnames"),
1468 GNUNET_GETOPT_option_flag ('s', 1396 &numeric),
1469 "send", 1397 GNUNET_GETOPT_option_base32_auto ('p',
1470 gettext_noop 1398 "peer",
1471 ("send data for benchmarking to the other peer (until CTRL-C)"), 1399 "PEER",
1472 &benchmark_send), 1400 gettext_noop ("peer identity"),
1473 GNUNET_GETOPT_option_verbose (&verbosity), 1401 &pid),
1474 GNUNET_GETOPT_OPTION_END 1402 GNUNET_GETOPT_option_flag ('P',
1475 }; 1403 "plugins",
1404 gettext_noop ("monitor plugin sessions"),
1405 &monitor_plugins),
1406 GNUNET_GETOPT_option_flag (
1407 's',
1408 "send",
1409 gettext_noop (
1410 "send data for benchmarking to the other peer (until CTRL-C)"),
1411 &benchmark_send),
1412 GNUNET_GETOPT_option_verbose (&verbosity),
1413 GNUNET_GETOPT_OPTION_END};
1476 1414
1477 if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) 1415 if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv))
1478 return 2; 1416 return 2;
1479 1417
1480 res = GNUNET_PROGRAM_run (argc, argv, 1418 res =
1481 "gnunet-transport", 1419 GNUNET_PROGRAM_run (argc,
1482 gettext_noop ("Direct access to transport service."), 1420 argv,
1483 options, 1421 "gnunet-transport",
1484 &run, NULL); 1422 gettext_noop ("Direct access to transport service."),
1423 options,
1424 &run,
1425 NULL);
1485 GNUNET_free ((void *) argv); 1426 GNUNET_free ((void *) argv);
1486 if (GNUNET_OK == res) 1427 if (GNUNET_OK == res)
1487 return ret; 1428 return ret;
diff --git a/src/transport/transport-testing.h b/src/transport/transport-testing.h
index 4629d6125..83bbf277b 100644
--- a/src/transport/transport-testing.h
+++ b/src/transport/transport-testing.h
@@ -11,7 +11,7 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
@@ -30,7 +30,6 @@
30#include "gnunet_util_lib.h" 30#include "gnunet_util_lib.h"
31#include "gnunet_hello_lib.h" 31#include "gnunet_hello_lib.h"
32#include "gnunet_transport_service.h" 32#include "gnunet_transport_service.h"
33#include "gnunet_transport_core_service.h"
34#include "gnunet_transport_hello_service.h" 33#include "gnunet_transport_hello_service.h"
35#include "gnunet_transport_manipulation_service.h" 34#include "gnunet_transport_manipulation_service.h"
36#include "gnunet_testing_lib.h" 35#include "gnunet_testing_lib.h"
@@ -143,7 +142,7 @@ struct GNUNET_TRANSPORT_TESTING_PeerContext
143 * Closure for @e start_cb. 142 * Closure for @e start_cb.
144 */ 143 */
145 void *start_cb_cls; 144 void *start_cb_cls;
146 145
147 /** 146 /**
148 * An unique number to identify the peer 147 * An unique number to identify the peer
149 */ 148 */
@@ -207,12 +206,12 @@ struct GNUNET_TRANSPORT_TESTING_ConnectRequest
207 */ 206 */
208 struct GNUNET_MQ_Handle *mq; 207 struct GNUNET_MQ_Handle *mq;
209 208
210 /** 209 /**
211 * Set if peer1 says the connection is up to peer2. 210 * Set if peer1 says the connection is up to peer2.
212 */ 211 */
213 int p1_c; 212 int p1_c;
214 213
215 /** 214 /**
216 * Set if peer2 says the connection is up to peer1. 215 * Set if peer2 says the connection is up to peer1.
217 */ 216 */
218 int p2_c; 217 int p2_c;
@@ -289,15 +288,16 @@ GNUNET_TRANSPORT_TESTING_done (struct GNUNET_TRANSPORT_TESTING_Handle *tth);
289 * @return the peer context 288 * @return the peer context
290 */ 289 */
291struct GNUNET_TRANSPORT_TESTING_PeerContext * 290struct GNUNET_TRANSPORT_TESTING_PeerContext *
292GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_Handle *tth, 291GNUNET_TRANSPORT_TESTING_start_peer (
293 const char *cfgname, 292 struct GNUNET_TRANSPORT_TESTING_Handle *tth,
294 int peer_id, 293 const char *cfgname,
295 const struct GNUNET_MQ_MessageHandler *handlers, 294 int peer_id,
296 GNUNET_TRANSPORT_NotifyConnect nc, 295 const struct GNUNET_MQ_MessageHandler *handlers,
297 GNUNET_TRANSPORT_NotifyDisconnect nd, 296 GNUNET_TRANSPORT_NotifyConnect nc,
298 void *cb_cls, 297 GNUNET_TRANSPORT_NotifyDisconnect nd,
299 GNUNET_SCHEDULER_TaskCallback start_cb, 298 void *cb_cls,
300 void *start_cb_cls); 299 GNUNET_SCHEDULER_TaskCallback start_cb,
300 void *start_cb_cls);
301 301
302 302
303/** 303/**
@@ -306,7 +306,8 @@ GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_Handle *tth
306 * @param p the peer 306 * @param p the peer
307 */ 307 */
308void 308void
309GNUNET_TRANSPORT_TESTING_stop_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext *pc); 309GNUNET_TRANSPORT_TESTING_stop_peer (
310 struct GNUNET_TRANSPORT_TESTING_PeerContext *pc);
310 311
311 312
312/** 313/**
@@ -318,10 +319,10 @@ GNUNET_TRANSPORT_TESTING_stop_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext
318 * @return #GNUNET_OK in success otherwise #GNUNET_SYSERR 319 * @return #GNUNET_OK in success otherwise #GNUNET_SYSERR
319 */ 320 */
320int 321int
321GNUNET_TRANSPORT_TESTING_restart_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext *p, 322GNUNET_TRANSPORT_TESTING_restart_peer (
322 GNUNET_SCHEDULER_TaskCallback restart_cb, 323 struct GNUNET_TRANSPORT_TESTING_PeerContext *p,
323 void *restart_cb_cls); 324 GNUNET_SCHEDULER_TaskCallback restart_cb,
324 325 void *restart_cb_cls);
325 326
326 327
327/** 328/**
@@ -331,15 +332,17 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct GNUNET_TRANSPORT_TESTING_PeerConte
331 * 332 *
332 * @param p1 peer 1 333 * @param p1 peer 1
333 * @param p2 peer 2 334 * @param p2 peer 2
334 * @param cb the callback to call when both peers notified that they are connected 335 * @param cb the callback to call when both peers notified that they are
336 * connected
335 * @param cls callback cls 337 * @param cls callback cls
336 * @return a connect request handle 338 * @return a connect request handle
337 */ 339 */
338struct GNUNET_TRANSPORT_TESTING_ConnectRequest * 340struct GNUNET_TRANSPORT_TESTING_ConnectRequest *
339GNUNET_TRANSPORT_TESTING_connect_peers (struct GNUNET_TRANSPORT_TESTING_PeerContext *p1, 341GNUNET_TRANSPORT_TESTING_connect_peers (
340 struct GNUNET_TRANSPORT_TESTING_PeerContext *p2, 342 struct GNUNET_TRANSPORT_TESTING_PeerContext *p1,
341 GNUNET_SCHEDULER_TaskCallback cb, 343 struct GNUNET_TRANSPORT_TESTING_PeerContext *p2,
342 void *cls); 344 GNUNET_SCHEDULER_TaskCallback cb,
345 void *cls);
343 346
344 347
345/** 348/**
@@ -350,7 +353,8 @@ GNUNET_TRANSPORT_TESTING_connect_peers (struct GNUNET_TRANSPORT_TESTING_PeerCont
350 * @param cc a connect request handle 353 * @param cc a connect request handle
351 */ 354 */
352void 355void
353GNUNET_TRANSPORT_TESTING_connect_peers_cancel (struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc); 356GNUNET_TRANSPORT_TESTING_connect_peers_cancel (
357 struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc);
354 358
355 359
356/** 360/**
@@ -359,9 +363,9 @@ GNUNET_TRANSPORT_TESTING_connect_peers_cancel (struct GNUNET_TRANSPORT_TESTING_C
359 * @param cls closure 363 * @param cls closure
360 * @param cc request matching the query 364 * @param cc request matching the query
361 */ 365 */
362typedef void 366typedef void (*GNUNET_TRANSPORT_TESTING_ConnectContextCallback) (
363(*GNUNET_TRANSPORT_TESTING_ConnectContextCallback)(void *cls, 367 void *cls,
364 struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc); 368 struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc);
365 369
366 370
367/** 371/**
@@ -369,14 +373,15 @@ typedef void
369 * 373 *
370 * @param p1 first peer 374 * @param p1 first peer
371 * @param p2 second peer 375 * @param p2 second peer
372 * @param cb function to call 376 * @param cb function to call
373 * @param cb_cls closure for @a cb 377 * @param cb_cls closure for @a cb
374 */ 378 */
375void 379void
376GNUNET_TRANSPORT_TESTING_find_connecting_context (struct GNUNET_TRANSPORT_TESTING_PeerContext *p1, 380GNUNET_TRANSPORT_TESTING_find_connecting_context (
377 struct GNUNET_TRANSPORT_TESTING_PeerContext *p2, 381 struct GNUNET_TRANSPORT_TESTING_PeerContext *p1,
378 GNUNET_TRANSPORT_TESTING_ConnectContextCallback cb, 382 struct GNUNET_TRANSPORT_TESTING_PeerContext *p2,
379 void *cb_cls); 383 GNUNET_TRANSPORT_TESTING_ConnectContextCallback cb,
384 void *cb_cls);
380 385
381 386
382/* ********************** high-level process functions *************** */ 387/* ********************** high-level process functions *************** */
@@ -390,10 +395,10 @@ GNUNET_TRANSPORT_TESTING_find_connecting_context (struct GNUNET_TRANSPORT_TESTIN
390 * @param num_peers size of the @a p array 395 * @param num_peers size of the @a p array
391 * @param p the peers that were launched 396 * @param p the peers that were launched
392 */ 397 */
393typedef void 398typedef void (*GNUNET_TRANSPORT_TESTING_ConnectContinuation) (
394(*GNUNET_TRANSPORT_TESTING_ConnectContinuation)(void *cls, 399 void *cls,
395 unsigned int num_peers, 400 unsigned int num_peers,
396 struct GNUNET_TRANSPORT_TESTING_PeerContext *p[]); 401 struct GNUNET_TRANSPORT_TESTING_PeerContext *p[]);
397 402
398 403
399/** 404/**
@@ -423,7 +428,6 @@ struct GNUNET_TRANSPORT_TESTING_TestMessage
423GNUNET_NETWORK_STRUCT_END 428GNUNET_NETWORK_STRUCT_END
424 429
425 430
426
427/** 431/**
428 * Function called by the transport for each received message. 432 * Function called by the transport for each received message.
429 * 433 *
@@ -432,11 +436,11 @@ GNUNET_NETWORK_STRUCT_END
432 * @param sender sender of the message 436 * @param sender sender of the message
433 * @param message the message 437 * @param message the message
434 */ 438 */
435typedef void 439typedef void (*GNUNET_TRANSPORT_TESTING_ReceiveCallback) (
436(*GNUNET_TRANSPORT_TESTING_ReceiveCallback) (void *cls, 440 void *cls,
437 struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver, 441 struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver,
438 const struct GNUNET_PeerIdentity *sender, 442 const struct GNUNET_PeerIdentity *sender,
439 const struct GNUNET_TRANSPORT_TESTING_TestMessage *message); 443 const struct GNUNET_TRANSPORT_TESTING_TestMessage *message);
440 444
441 445
442/** 446/**
@@ -447,10 +451,10 @@ typedef void
447 * @param me peer experiencing the event 451 * @param me peer experiencing the event
448 * @param other peer that connected to @a me 452 * @param other peer that connected to @a me
449 */ 453 */
450typedef void 454typedef void (*GNUNET_TRANSPORT_TESTING_NotifyConnect) (
451(*GNUNET_TRANSPORT_TESTING_NotifyConnect) (void *cls, 455 void *cls,
452 struct GNUNET_TRANSPORT_TESTING_PeerContext *me, 456 struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
453 const struct GNUNET_PeerIdentity *other); 457 const struct GNUNET_PeerIdentity *other);
454 458
455 459
456/** 460/**
@@ -461,10 +465,10 @@ typedef void
461 * @param me peer experiencing the event 465 * @param me peer experiencing the event
462 * @param other peer that disconnected from @a me 466 * @param other peer that disconnected from @a me
463 */ 467 */
464typedef void 468typedef void (*GNUNET_TRANSPORT_TESTING_NotifyDisconnect) (
465(*GNUNET_TRANSPORT_TESTING_NotifyDisconnect) (void *cls, 469 void *cls,
466 struct GNUNET_TRANSPORT_TESTING_PeerContext *me, 470 struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
467 const struct GNUNET_PeerIdentity *other); 471 const struct GNUNET_PeerIdentity *other);
468 472
469 473
470/** 474/**
@@ -593,7 +597,7 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext
593 * message. 597 * message.
594 */ 598 */
595 uint32_t send_num_gen; 599 uint32_t send_num_gen;
596 600
597 /* ******* internal state, clients should not mess with this **** */ 601 /* ******* internal state, clients should not mess with this **** */
598 602
599 /** 603 /**
@@ -625,7 +629,6 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext
625 * Array with @e num_peers entries. 629 * Array with @e num_peers entries.
626 */ 630 */
627 struct GNUNET_TRANSPORT_TESTING_InternalPeerContext *ip; 631 struct GNUNET_TRANSPORT_TESTING_InternalPeerContext *ip;
628
629}; 632};
630 633
631 634
@@ -637,8 +640,9 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext
637 * @return NULL if @a peer was not found 640 * @return NULL if @a peer was not found
638 */ 641 */
639struct GNUNET_TRANSPORT_TESTING_PeerContext * 642struct GNUNET_TRANSPORT_TESTING_PeerContext *
640GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext *ccc, 643GNUNET_TRANSPORT_TESTING_find_peer (
641 const struct GNUNET_PeerIdentity *peer); 644 struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext *ccc,
645 const struct GNUNET_PeerIdentity *peer);
642 646
643 647
644/** 648/**
@@ -648,7 +652,8 @@ GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheck
648 * abort the test, and a shutdown handler to clean up properly 652 * abort the test, and a shutdown handler to clean up properly
649 * on exit. 653 * on exit.
650 * 654 *
651 * @param cls closure of type `struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext` 655 * @param cls closure of type `struct
656 * GNUNET_TRANSPORT_TESTING_ConnectCheckContext`
652 * @param tth_ initialized testing handle 657 * @param tth_ initialized testing handle
653 * @param test_plugin_ name of the plugin 658 * @param test_plugin_ name of the plugin
654 * @param test_name_ name of the test 659 * @param test_name_ name of the test
@@ -657,12 +662,13 @@ GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheck
657 * @return #GNUNET_SYSERR on error 662 * @return #GNUNET_SYSERR on error
658 */ 663 */
659int 664int
660GNUNET_TRANSPORT_TESTING_connect_check (void *cls, 665GNUNET_TRANSPORT_TESTING_connect_check (
661 struct GNUNET_TRANSPORT_TESTING_Handle *tth_, 666 void *cls,
662 const char *test_plugin_, 667 struct GNUNET_TRANSPORT_TESTING_Handle *tth_,
663 const char *test_name_, 668 const char *test_plugin_,
664 unsigned int num_peers, 669 const char *test_name_,
665 char *cfg_files[]); 670 unsigned int num_peers,
671 char *cfg_files[]);
666 672
667 673
668/** 674/**
@@ -677,13 +683,13 @@ GNUNET_TRANSPORT_TESTING_connect_check (void *cls,
677 * @param cfg_files array of names of configuration files for the peers 683 * @param cfg_files array of names of configuration files for the peers
678 * @return #GNUNET_SYSERR on error 684 * @return #GNUNET_SYSERR on error
679 */ 685 */
680typedef int 686typedef int (*GNUNET_TRANSPORT_TESTING_CheckCallback) (
681(*GNUNET_TRANSPORT_TESTING_CheckCallback)(void *cls, 687 void *cls,
682 struct GNUNET_TRANSPORT_TESTING_Handle *tth_, 688 struct GNUNET_TRANSPORT_TESTING_Handle *tth_,
683 const char *test_plugin_, 689 const char *test_plugin_,
684 const char *test_name_, 690 const char *test_name_,
685 unsigned int num_peers, 691 unsigned int num_peers,
686 char *cfg_files[]); 692 char *cfg_files[]);
687 693
688 694
689/** 695/**
@@ -712,8 +718,12 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0,
712 * @param check_cls closure for @a check 718 * @param check_cls closure for @a check
713 * @return #GNUNET_OK on success 719 * @return #GNUNET_OK on success
714 */ 720 */
715#define GNUNET_TRANSPORT_TESTING_main(num_peers,check,check_cls) \ 721#define GNUNET_TRANSPORT_TESTING_main(num_peers, check, check_cls) \
716 GNUNET_TRANSPORT_TESTING_main_ (argv[0], __FILE__, num_peers, check, check_cls) 722 GNUNET_TRANSPORT_TESTING_main_ (argv[0], \
723 __FILE__, \
724 num_peers, \
725 check, \
726 check_cls)
717 727
718/* ***************** Convenience functions for sending ********* */ 728/* ***************** Convenience functions for sending ********* */
719 729
@@ -725,7 +735,8 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0,
725 * @param sender the sending peer 735 * @param sender the sending peer
726 * @param receiver the receiving peer 736 * @param receiver the receiving peer
727 * @param mtype message type to use 737 * @param mtype message type to use
728 * @param msize size of the message, at least `sizeof (struct GNUNET_TRANSPORT_TESTING_TestMessage)` 738 * @param msize size of the message, at least `sizeof (struct
739 * GNUNET_TRANSPORT_TESTING_TestMessage)`
729 * @param num unique message number 740 * @param num unique message number
730 * @param cont continuation to call after transmission 741 * @param cont continuation to call after transmission
731 * @param cont_cls closure for @a cont 742 * @param cont_cls closure for @a cont
@@ -734,13 +745,14 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0,
734 * #GNUNET_SYSERR if @a msize is illegal 745 * #GNUNET_SYSERR if @a msize is illegal
735 */ 746 */
736int 747int
737GNUNET_TRANSPORT_TESTING_send (struct GNUNET_TRANSPORT_TESTING_PeerContext *sender, 748GNUNET_TRANSPORT_TESTING_send (
738 struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver, 749 struct GNUNET_TRANSPORT_TESTING_PeerContext *sender,
739 uint16_t mtype, 750 struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver,
740 uint16_t msize, 751 uint16_t mtype,
741 uint32_t num, 752 uint16_t msize,
742 GNUNET_SCHEDULER_TaskCallback cont, 753 uint32_t num,
743 void *cont_cls); 754 GNUNET_SCHEDULER_TaskCallback cont,
755 void *cont_cls);
744 756
745 757
746/** 758/**
@@ -771,14 +783,14 @@ struct GNUNET_TRANSPORT_TESTING_SendClosure
771 * the message size, can be NULL in which case the message 783 * the message size, can be NULL in which case the message
772 * size is the default. 784 * size is the default.
773 */ 785 */
774 size_t (*get_size_cb)(unsigned int n); 786 size_t (*get_size_cb) (unsigned int n);
775 787
776 /** 788 /**
777 * Number of messages to be transmitted in a loop. 789 * Number of messages to be transmitted in a loop.
778 * Use zero for "forever" (until external shutdown). 790 * Use zero for "forever" (until external shutdown).
779 */ 791 */
780 unsigned int num_messages; 792 unsigned int num_messages;
781 793
782 /** 794 /**
783 * Function to call after all transmissions, can be NULL. 795 * Function to call after all transmissions, can be NULL.
784 */ 796 */
@@ -788,12 +800,11 @@ struct GNUNET_TRANSPORT_TESTING_SendClosure
788 * Closure for @e cont. 800 * Closure for @e cont.
789 */ 801 */
790 void *cont_cls; 802 void *cont_cls;
791
792}; 803};
793 804
794 805
795/** 806/**
796 * Task that sends a minimalistic test message from the 807 * Task that sends a minimalistic test message from the
797 * first peer to the second peer. 808 * first peer to the second peer.
798 * 809 *
799 * @param cls the `struct GNUNET_TRANSPORT_TESTING_SendClosure` 810 * @param cls the `struct GNUNET_TRANSPORT_TESTING_SendClosure`
@@ -804,14 +815,14 @@ void
804GNUNET_TRANSPORT_TESTING_simple_send (void *cls); 815GNUNET_TRANSPORT_TESTING_simple_send (void *cls);
805 816
806/** 817/**
807 * Size of a message sent with 818 * Size of a message sent with
808 * #GNUNET_TRANSPORT_TESTING_large_send(). Big enough 819 * #GNUNET_TRANSPORT_TESTING_large_send(). Big enough
809 * to usually force defragmentation. 820 * to usually force defragmentation.
810 */ 821 */
811#define GNUNET_TRANSPORT_TESTING_LARGE_MESSAGE_SIZE 2600 822#define GNUNET_TRANSPORT_TESTING_LARGE_MESSAGE_SIZE 2600
812 823
813/** 824/**
814 * Task that sends a large test message from the 825 * Task that sends a large test message from the
815 * first peer to the second peer. 826 * first peer to the second peer.
816 * 827 *
817 * @param cls the `struct GNUNET_TRANSPORT_TESTING_SendClosure` 828 * @param cls the `struct GNUNET_TRANSPORT_TESTING_SendClosure`
@@ -833,9 +844,10 @@ GNUNET_TRANSPORT_TESTING_large_send (void *cls);
833 * @param other peer that connected. 844 * @param other peer that connected.
834 */ 845 */
835void 846void
836GNUNET_TRANSPORT_TESTING_log_connect (void *cls, 847GNUNET_TRANSPORT_TESTING_log_connect (
837 struct GNUNET_TRANSPORT_TESTING_PeerContext *me, 848 void *cls,
838 const struct GNUNET_PeerIdentity *other); 849 struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
850 const struct GNUNET_PeerIdentity *other);
839 851
840 852
841/** 853/**
@@ -846,10 +858,10 @@ GNUNET_TRANSPORT_TESTING_log_connect (void *cls,
846 * @param other peer that disconnected. 858 * @param other peer that disconnected.
847 */ 859 */
848void 860void
849GNUNET_TRANSPORT_TESTING_log_disconnect (void *cls, 861GNUNET_TRANSPORT_TESTING_log_disconnect (
850 struct GNUNET_TRANSPORT_TESTING_PeerContext *me, 862 void *cls,
851 const struct GNUNET_PeerIdentity *other); 863 struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
852 864 const struct GNUNET_PeerIdentity *other);
853 865
854 866
855/* ********************** low-level filename functions *************** */ 867/* ********************** low-level filename functions *************** */
@@ -875,8 +887,7 @@ GNUNET_TRANSPORT_TESTING_get_test_name (const char *file);
875 * @return configuration name to use 887 * @return configuration name to use
876 */ 888 */
877char * 889char *
878GNUNET_TRANSPORT_TESTING_get_config_name (const char *file, 890GNUNET_TRANSPORT_TESTING_get_config_name (const char *file, int count);
879 int count);
880 891
881 892
882/** 893/**
diff --git a/src/transport/transport.h b/src/transport/transport.h
index d2a3a262b..ed89940cc 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -123,10 +123,21 @@ struct ConnectInfoMessage
123 */ 123 */
124 struct GNUNET_MessageHeader header; 124 struct GNUNET_MessageHeader header;
125 125
126#if (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \
127 defined(GNUNET_TRANSPORT_CORE_VERSION))
128
129 /**
130 * Always zero, for alignment.
131 */
132 uint32_t reserved GNUNET_PACKED;
133
134#else
135
126 /** 136 /**
127 * Current outbound quota for this peer 137 * Current outbound quota for this peer
128 */ 138 */
129 struct GNUNET_BANDWIDTH_Value32NBO quota_out; 139 struct GNUNET_BANDWIDTH_Value32NBO quota_out;
140#endif
130 141
131 /** 142 /**
132 * Identity of the new neighbour. 143 * Identity of the new neighbour.
@@ -163,6 +174,8 @@ struct DisconnectInfoMessage
163 * Message used to set a particular bandwidth quota. Sent TO the 174 * Message used to set a particular bandwidth quota. Sent TO the
164 * service to set an incoming quota, sent FROM the service to update 175 * service to set an incoming quota, sent FROM the service to update
165 * an outgoing quota. 176 * an outgoing quota.
177 *
178 * NOTE: no longer used in TNG!
166 */ 179 */
167struct QuotaSetMessage 180struct QuotaSetMessage
168{ 181{
@@ -215,6 +228,13 @@ struct SendOkMessage
215 */ 228 */
216 struct GNUNET_MessageHeader header; 229 struct GNUNET_MessageHeader header;
217 230
231#if (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \
232 defined(GNUNET_TRANSPORT_CORE_VERSION))
233
234 uint32_t reserved GNUNET_PACKED;
235
236#else
237
218 /** 238 /**
219 * #GNUNET_OK if the transmission succeeded, 239 * #GNUNET_OK if the transmission succeeded,
220 * #GNUNET_SYSERR if it failed (i.e. network disconnect); 240 * #GNUNET_SYSERR if it failed (i.e. network disconnect);
@@ -229,11 +249,13 @@ struct SendOkMessage
229 uint16_t bytes_msg GNUNET_PACKED; 249 uint16_t bytes_msg GNUNET_PACKED;
230 250
231 /** 251 /**
232 * Size of message sent over wire 252 * Size of message sent over wire.
233 * Includes plugin and protocol specific overhead 253 * Includes plugin and protocol specific overheads.
234 */ 254 */
235 uint32_t bytes_physical GNUNET_PACKED; 255 uint32_t bytes_physical GNUNET_PACKED;
236 256
257#endif
258
237 /** 259 /**
238 * Which peer can send more now? 260 * Which peer can send more now?
239 */ 261 */
@@ -242,6 +264,32 @@ struct SendOkMessage
242 264
243 265
244/** 266/**
267 * Message used to notify the transport API that it can
268 * send another message to the transport service.
269 * (Used to implement flow control.)
270 */
271struct RecvOkMessage
272{
273
274 /**
275 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK
276 */
277 struct GNUNET_MessageHeader header;
278
279 /**
280 * Number of messages by which to increase the window, greater or
281 * equal to one.
282 */
283 uint32_t increase_window_delta GNUNET_PACKED;
284
285 /**
286 * Which peer can CORE handle more from now?
287 */
288 struct GNUNET_PeerIdentity peer;
289};
290
291
292/**
245 * Message used to notify the transport service about a message 293 * Message used to notify the transport service about a message
246 * to be transmitted to another peer. The actual message follows. 294 * to be transmitted to another peer. The actual message follows.
247 */ 295 */
@@ -258,10 +306,14 @@ struct OutboundMessage
258 */ 306 */
259 uint32_t reserved GNUNET_PACKED; 307 uint32_t reserved GNUNET_PACKED;
260 308
309#if ! (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \
310 defined(GNUNET_TRANSPORT_CORE_VERSION))
311
261 /** 312 /**
262 * Allowed delay. 313 * Allowed delay.
263 */ 314 */
264 struct GNUNET_TIME_RelativeNBO timeout; 315 struct GNUNET_TIME_RelativeNBO timeout;
316#endif
265 317
266 /** 318 /**
267 * Which peer should receive the message? 319 * Which peer should receive the message?
diff --git a/src/transport/transport_api2_core.c b/src/transport/transport_api2_core.c
index f00d00a44..a3c49e94f 100644
--- a/src/transport/transport_api2_core.c
+++ b/src/transport/transport_api2_core.c
@@ -32,13 +32,23 @@
32#include "gnunet_transport_core_service.h" 32#include "gnunet_transport_core_service.h"
33#include "transport.h" 33#include "transport.h"
34 34
35#define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__) 35#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__)
36 36
37/** 37/**
38 * How large to start with for the hashmap of neighbours. 38 * How large to start with for the hashmap of neighbours.
39 */ 39 */
40#define STARTING_NEIGHBOURS_SIZE 16 40#define STARTING_NEIGHBOURS_SIZE 16
41 41
42/**
43 * Window size. How many messages to the same target do we pass
44 * to TRANSPORT without a SEND_OK in between? Small values limit
45 * thoughput, large values will increase latency.
46 *
47 * FIXME-OPTIMIZE: find out what good values are experimentally,
48 * maybe set adaptively (i.e. to observed available bandwidth).
49 */
50#define SEND_WINDOW_SIZE 4
51
42 52
43/** 53/**
44 * Entry in hash table of all of our current (connected) neighbours. 54 * Entry in hash table of all of our current (connected) neighbours.
@@ -72,46 +82,27 @@ struct Neighbour
72 void *handlers_cls; 82 void *handlers_cls;
73 83
74 /** 84 /**
75 * Entry in our readyness heap (which is sorted by @e next_ready 85 * How many messages can we still send to this peer before we should
76 * value). NULL if there is no pending transmission request for 86 * throttle?
77 * this neighbour or if we're waiting for @e is_ready to become
78 * true AFTER the @e out_tracker suggested that this peer's quota
79 * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
80 * we should immediately go back into the heap).
81 */ 87 */
82 struct GNUNET_CONTAINER_HeapNode *hn; 88 unsigned int ready_window;
83 89
84 /** 90 /**
85 * Task to trigger MQ when we have enough bandwidth for the 91 * Used to indicate our status if @e env is non-NULL. Set to
86 * next transmission. 92 * #GNUNET_YES if we did pass a message to the MQ and are waiting
93 * for the call to #notify_send_done(). Set to #GNUNET_NO if the @e
94 * ready_window is 0 and @e env is waiting for a
95 * #GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK?
87 */ 96 */
88 struct GNUNET_SCHEDULER_Task *timeout_task; 97 int16_t awaiting_done;
89
90 /**
91 * Outbound bandwidh tracker.
92 */
93 struct GNUNET_BANDWIDTH_Tracker out_tracker;
94
95 /**
96 * Sending consumed more bytes on wire than payload was announced
97 * This overhead is added to the delay of next sending operation
98 */
99 unsigned long long traffic_overhead;
100
101 /**
102 * Is this peer currently ready to receive a message?
103 */
104 int is_ready;
105 98
106 /** 99 /**
107 * Size of the message in @e env. 100 * Size of the message in @e env.
108 */ 101 */
109 uint16_t env_size; 102 uint16_t env_size;
110
111}; 103};
112 104
113 105
114
115/** 106/**
116 * Handle for the transport service (includes all of the 107 * Handle for the transport service (includes all of the
117 * state for the transport service). 108 * state for the transport service).
@@ -141,11 +132,6 @@ struct GNUNET_TRANSPORT_CoreHandle
141 GNUNET_TRANSPORT_NotifyDisconnect nd_cb; 132 GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
142 133
143 /** 134 /**
144 * function to call on excess bandwidth events
145 */
146 GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
147
148 /**
149 * My client connection to the transport service. 135 * My client connection to the transport service.
150 */ 136 */
151 struct GNUNET_MQ_Handle *mq; 137 struct GNUNET_MQ_Handle *mq;
@@ -181,7 +167,6 @@ struct GNUNET_TRANSPORT_CoreHandle
181 * (if #GNUNET_NO, then @e self is all zeros!). 167 * (if #GNUNET_NO, then @e self is all zeros!).
182 */ 168 */
183 int check_self; 169 int check_self;
184
185}; 170};
186 171
187 172
@@ -206,31 +191,7 @@ static struct Neighbour *
206neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h, 191neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
207 const struct GNUNET_PeerIdentity *peer) 192 const struct GNUNET_PeerIdentity *peer)
208{ 193{
209 return GNUNET_CONTAINER_multipeermap_get (h->neighbours, 194 return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer);
210 peer);
211}
212
213
214/**
215 * Function called by the bandwidth tracker if we have excess
216 * bandwidth.
217 *
218 * @param cls the `struct Neighbour` that has excess bandwidth
219 */
220static void
221notify_excess_cb (void *cls)
222{
223 struct Neighbour *n = cls;
224 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
225
226 LOG (GNUNET_ERROR_TYPE_DEBUG,
227 "Notifying CORE that more bandwidth is available for %s\n",
228 GNUNET_i2s (&n->id));
229
230 if (NULL != h->neb_cb)
231 h->neb_cb (h->cls,
232 &n->id,
233 n->handlers_cls);
234} 195}
235 196
236 197
@@ -245,9 +206,7 @@ notify_excess_cb (void *cls)
245 * #GNUNET_NO if not. 206 * #GNUNET_NO if not.
246 */ 207 */
247static int 208static int
248neighbour_delete (void *cls, 209neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
249 const struct GNUNET_PeerIdentity *key,
250 void *value)
251{ 210{
252 struct GNUNET_TRANSPORT_CoreHandle *handle = cls; 211 struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
253 struct Neighbour *n = value; 212 struct Neighbour *n = value;
@@ -255,16 +214,8 @@ neighbour_delete (void *cls,
255 LOG (GNUNET_ERROR_TYPE_DEBUG, 214 LOG (GNUNET_ERROR_TYPE_DEBUG,
256 "Dropping entry for neighbour `%s'.\n", 215 "Dropping entry for neighbour `%s'.\n",
257 GNUNET_i2s (key)); 216 GNUNET_i2s (key));
258 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
259 if (NULL != handle->nd_cb) 217 if (NULL != handle->nd_cb)
260 handle->nd_cb (handle->cls, 218 handle->nd_cb (handle->cls, &n->id, n->handlers_cls);
261 &n->id,
262 n->handlers_cls);
263 if (NULL != n->timeout_task)
264 {
265 GNUNET_SCHEDULER_cancel (n->timeout_task);
266 n->timeout_task = NULL;
267 }
268 if (NULL != n->env) 219 if (NULL != n->env)
269 { 220 {
270 GNUNET_MQ_send_cancel (n->env); 221 GNUNET_MQ_send_cancel (n->env);
@@ -272,10 +223,9 @@ neighbour_delete (void *cls,
272 } 223 }
273 GNUNET_MQ_destroy (n->mq); 224 GNUNET_MQ_destroy (n->mq);
274 GNUNET_assert (NULL == n->mq); 225 GNUNET_assert (NULL == n->mq);
275 GNUNET_assert (GNUNET_YES == 226 GNUNET_assert (
276 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, 227 GNUNET_YES ==
277 key, 228 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n));
278 n));
279 GNUNET_free (n); 229 GNUNET_free (n);
280 return GNUNET_YES; 230 return GNUNET_YES;
281} 231}
@@ -291,8 +241,7 @@ neighbour_delete (void *cls,
291 * @param error error code 241 * @param error error code
292 */ 242 */
293static void 243static void
294mq_error_handler (void *cls, 244mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
295 enum GNUNET_MQ_Error error)
296{ 245{
297 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 246 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
298 247
@@ -306,57 +255,42 @@ mq_error_handler (void *cls,
306 * A message from the handler's message queue to a neighbour was 255 * A message from the handler's message queue to a neighbour was
307 * transmitted. Now trigger (possibly delayed) notification of the 256 * transmitted. Now trigger (possibly delayed) notification of the
308 * neighbour's message queue that we are done and thus ready for 257 * neighbour's message queue that we are done and thus ready for
309 * the next message. 258 * the next message. Note that the MQ being ready is independent
259 * of the send window, as we may queue many messages and simply
260 * not pass them to TRANSPORT if the send window is insufficient.
310 * 261 *
311 * @param cls the `struct Neighbour` where the message was sent 262 * @param cls the `struct Neighbour` where the message was sent
312 */ 263 */
313static void 264static void
314notify_send_done_fin (void *cls) 265notify_send_done (void *cls)
315{ 266{
316 struct Neighbour *n = cls; 267 struct Neighbour *n = cls;
317 268
318 n->timeout_task = NULL; 269 n->awaiting_done = GNUNET_NO;
319 n->is_ready = GNUNET_YES; 270 n->env = NULL;
320 GNUNET_MQ_impl_send_continue (n->mq); 271 GNUNET_MQ_impl_send_continue (n->mq);
321} 272}
322 273
323 274
324/** 275/**
325 * A message from the handler's message queue to a neighbour was 276 * We have an envelope waiting for transmission at @a n, and
326 * transmitted. Now trigger (possibly delayed) notification of the 277 * our transmission window is positive. Perform the transmission.
327 * neighbour's message queue that we are done and thus ready for
328 * the next message.
329 * 278 *
330 * @param cls the `struct Neighbour` where the message was sent 279 * @param n neighbour to perform transmission for
331 */ 280 */
332static void 281static void
333notify_send_done (void *cls) 282do_send (struct Neighbour *n)
334{ 283{
335 struct Neighbour *n = cls; 284 GNUNET_assert (0 < n->ready_window);
336 struct GNUNET_TIME_Relative delay; 285 GNUNET_assert (NULL != n->env);
337 286 n->ready_window--;
338 n->timeout_task = NULL; 287 n->awaiting_done = GNUNET_YES;
339 if (NULL != n->env) 288 GNUNET_MQ_notify_sent (n->env, &notify_send_done, n);
340 { 289 GNUNET_MQ_send (n->h->mq, n->env);
341 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, 290 LOG (GNUNET_ERROR_TYPE_DEBUG,
342 n->env_size + n->traffic_overhead); 291 "Passed message of type %u for neighbour `%s' to TRANSPORT.\n",
343 n->env = NULL; 292 ntohs (GNUNET_MQ_env_get_msg (n->env)->type),
344 n->traffic_overhead = 0; 293 GNUNET_i2s (&n->id));
345 }
346 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
347 128);
348 if (0 == delay.rel_value_us)
349 {
350 n->is_ready = GNUNET_YES;
351 GNUNET_MQ_impl_send_continue (n->mq);
352 return;
353 }
354 GNUNET_MQ_impl_send_in_flight (n->mq);
355 /* cannot send even a small message without violating
356 quota, wait a before allowing MQ to send next message */
357 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
358 &notify_send_done_fin,
359 n);
360} 294}
361 295
362 296
@@ -376,11 +310,9 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
376 void *impl_state) 310 void *impl_state)
377{ 311{
378 struct Neighbour *n = impl_state; 312 struct Neighbour *n = impl_state;
379 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
380 struct OutboundMessage *obm; 313 struct OutboundMessage *obm;
381 uint16_t msize; 314 uint16_t msize;
382 315
383 GNUNET_assert (GNUNET_YES == n->is_ready);
384 msize = ntohs (msg->size); 316 msize = ntohs (msg->size);
385 if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm)) 317 if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm))
386 { 318 {
@@ -388,25 +320,24 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
388 GNUNET_MQ_impl_send_continue (mq); 320 GNUNET_MQ_impl_send_continue (mq);
389 return; 321 return;
390 } 322 }
391 GNUNET_assert (NULL == n->env);
392 n->env = GNUNET_MQ_msg_nested_mh (obm,
393 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
394 msg);
395 obm->reserved = htonl (0);
396 obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
397 obm->peer = n->id;
398 GNUNET_assert (NULL == n->timeout_task);
399 n->is_ready = GNUNET_NO;
400 n->env_size = ntohs (msg->size);
401 GNUNET_MQ_notify_sent (n->env,
402 &notify_send_done,
403 n);
404 GNUNET_MQ_send (h->mq,
405 n->env);
406 LOG (GNUNET_ERROR_TYPE_DEBUG, 323 LOG (GNUNET_ERROR_TYPE_DEBUG,
407 "Queued message of type %u for neighbour `%s'.\n", 324 "CORE requested transmission of message of type %u to neighbour `%s'.\n",
408 ntohs (msg->type), 325 ntohs (msg->type),
409 GNUNET_i2s (&n->id)); 326 GNUNET_i2s (&n->id));
327
328 GNUNET_assert (NULL == n->env);
329 n->env =
330 GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg);
331 n->env_size = ntohs (msg->size);
332 obm->reserved = htonl (0);
333 obm->peer = n->id;
334 if (0 == n->ready_window)
335 {
336 LOG (GNUNET_ERROR_TYPE_DEBUG,
337 "Flow control delays transmission to CORE until we see SEND_OK.\n");
338 return; /* can't send yet, need to wait for SEND_OK */
339 }
340 do_send (n);
410} 341}
411 342
412 343
@@ -418,8 +349,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
418 * @param impl_state state of the implementation 349 * @param impl_state state of the implementation
419 */ 350 */
420static void 351static void
421mq_destroy_impl (struct GNUNET_MQ_Handle *mq, 352mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
422 void *impl_state)
423{ 353{
424 struct Neighbour *n = impl_state; 354 struct Neighbour *n = impl_state;
425 355
@@ -436,19 +366,22 @@ mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
436 * @param impl_state state specific to the implementation 366 * @param impl_state state specific to the implementation
437 */ 367 */
438static void 368static void
439mq_cancel_impl (struct GNUNET_MQ_Handle *mq, 369mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
440 void *impl_state)
441{ 370{
442 struct Neighbour *n = impl_state; 371 struct Neighbour *n = impl_state;
443 372
444 GNUNET_assert (GNUNET_NO == n->is_ready); 373 n->ready_window++;
445 if (NULL != n->env) 374 if (GNUNET_YES == n->awaiting_done)
446 { 375 {
447 GNUNET_MQ_send_cancel (n->env); 376 GNUNET_MQ_send_cancel (n->env);
448 n->env = NULL; 377 n->env = NULL;
378 n->awaiting_done = GNUNET_NO;
379 }
380 else
381 {
382 GNUNET_assert (0 == n->ready_window);
383 n->env = NULL;
449 } 384 }
450
451 n->is_ready = GNUNET_YES;
452} 385}
453 386
454 387
@@ -461,8 +394,7 @@ mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
461 * @param error error code 394 * @param error error code
462 */ 395 */
463static void 396static void
464peer_mq_error_handler (void *cls, 397peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
465 enum GNUNET_MQ_Error error)
466{ 398{
467 /* struct Neighbour *n = cls; */ 399 /* struct Neighbour *n = cls; */
468 400
@@ -471,47 +403,21 @@ peer_mq_error_handler (void *cls,
471 403
472 404
473/** 405/**
474 * The outbound quota has changed in a way that may require
475 * us to reset the timeout. Update the timeout.
476 *
477 * @param cls the `struct Neighbour` for which the timeout changed
478 */
479static void
480outbound_bw_tracker_update (void *cls)
481{
482 struct Neighbour *n = cls;
483 struct GNUNET_TIME_Relative delay;
484
485 if (NULL == n->timeout_task)
486 return;
487 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
488 128);
489 GNUNET_SCHEDULER_cancel (n->timeout_task);
490 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
491 &notify_send_done,
492 n);
493}
494
495
496/**
497 * Function we use for handling incoming connect messages. 406 * Function we use for handling incoming connect messages.
498 * 407 *
499 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` 408 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
500 * @param cim message received 409 * @param cim message received
501 */ 410 */
502static void 411static void
503handle_connect (void *cls, 412handle_connect (void *cls, const struct ConnectInfoMessage *cim)
504 const struct ConnectInfoMessage *cim)
505{ 413{
506 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 414 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
507 struct Neighbour *n; 415 struct Neighbour *n;
508 416
509 LOG (GNUNET_ERROR_TYPE_DEBUG, 417 LOG (GNUNET_ERROR_TYPE_DEBUG,
510 "Receiving CONNECT message for `%s' with quota %u\n", 418 "Receiving CONNECT message for `%s'\n",
511 GNUNET_i2s (&cim->id), 419 GNUNET_i2s (&cim->id));
512 ntohl (cim->quota_out.value__)); 420 n = neighbour_find (h, &cim->id);
513 n = neighbour_find (h,
514 &cim->id);
515 if (NULL != n) 421 if (NULL != n)
516 { 422 {
517 GNUNET_break (0); 423 GNUNET_break (0);
@@ -521,23 +427,14 @@ handle_connect (void *cls,
521 n = GNUNET_new (struct Neighbour); 427 n = GNUNET_new (struct Neighbour);
522 n->id = cim->id; 428 n->id = cim->id;
523 n->h = h; 429 n->h = h;
524 n->is_ready = GNUNET_YES; 430 n->ready_window = SEND_WINDOW_SIZE;
525 n->traffic_overhead = 0;
526 GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
527 &outbound_bw_tracker_update,
528 n,
529 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
530 MAX_BANDWIDTH_CARRY_S,
531 &notify_excess_cb,
532 n);
533 GNUNET_assert (GNUNET_OK == 431 GNUNET_assert (GNUNET_OK ==
534 GNUNET_CONTAINER_multipeermap_put (h->neighbours, 432 GNUNET_CONTAINER_multipeermap_put (
535 &n->id, 433 h->neighbours,
536 n, 434 &n->id,
537 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 435 n,
436 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
538 437
539 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
540 cim->quota_out);
541 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl, 438 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
542 &mq_destroy_impl, 439 &mq_destroy_impl,
543 &mq_cancel_impl, 440 &mq_cancel_impl,
@@ -547,11 +444,8 @@ handle_connect (void *cls,
547 n); 444 n);
548 if (NULL != h->nc_cb) 445 if (NULL != h->nc_cb)
549 { 446 {
550 n->handlers_cls = h->nc_cb (h->cls, 447 n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq);
551 &n->id, 448 GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls);
552 n->mq);
553 GNUNET_MQ_set_handlers_closure (n->mq,
554 n->handlers_cls);
555 } 449 }
556} 450}
557 451
@@ -563,8 +457,7 @@ handle_connect (void *cls,
563 * @param dim message received 457 * @param dim message received
564 */ 458 */
565static void 459static void
566handle_disconnect (void *cls, 460handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim)
567 const struct DisconnectInfoMessage *dim)
568{ 461{
569 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 462 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
570 struct Neighbour *n; 463 struct Neighbour *n;
@@ -573,18 +466,14 @@ handle_disconnect (void *cls,
573 LOG (GNUNET_ERROR_TYPE_DEBUG, 466 LOG (GNUNET_ERROR_TYPE_DEBUG,
574 "Receiving DISCONNECT message for `%s'.\n", 467 "Receiving DISCONNECT message for `%s'.\n",
575 GNUNET_i2s (&dim->peer)); 468 GNUNET_i2s (&dim->peer));
576 n = neighbour_find (h, 469 n = neighbour_find (h, &dim->peer);
577 &dim->peer);
578 if (NULL == n) 470 if (NULL == n)
579 { 471 {
580 GNUNET_break (0); 472 GNUNET_break (0);
581 disconnect_and_schedule_reconnect (h); 473 disconnect_and_schedule_reconnect (h);
582 return; 474 return;
583 } 475 }
584 GNUNET_assert (GNUNET_YES == 476 GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n));
585 neighbour_delete (h,
586 &dim->peer,
587 n));
588} 477}
589 478
590 479
@@ -595,24 +484,15 @@ handle_disconnect (void *cls,
595 * @param okm message received 484 * @param okm message received
596 */ 485 */
597static void 486static void
598handle_send_ok (void *cls, 487handle_send_ok (void *cls, const struct SendOkMessage *okm)
599 const struct SendOkMessage *okm)
600{ 488{
601 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 489 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
602 struct Neighbour *n; 490 struct Neighbour *n;
603 uint16_t bytes_msg;
604 uint32_t bytes_physical;
605 491
606 bytes_msg = ntohs (okm->bytes_msg);
607 bytes_physical = ntohl (okm->bytes_physical);
608 LOG (GNUNET_ERROR_TYPE_DEBUG, 492 LOG (GNUNET_ERROR_TYPE_DEBUG,
609 "Receiving SEND_OK message, transmission to %s %s.\n", 493 "Receiving SEND_OK message for transmission to %s\n",
610 GNUNET_i2s (&okm->peer), 494 GNUNET_i2s (&okm->peer));
611 (GNUNET_OK == ntohs (okm->success)) 495 n = neighbour_find (h, &okm->peer);
612 ? "succeeded"
613 : "failed");
614 n = neighbour_find (h,
615 &okm->peer);
616 if (NULL == n) 496 if (NULL == n)
617 { 497 {
618 /* We should never get a 'SEND_OK' for a peer that we are not 498 /* We should never get a 'SEND_OK' for a peer that we are not
@@ -621,14 +501,9 @@ handle_send_ok (void *cls,
621 disconnect_and_schedule_reconnect (h); 501 disconnect_and_schedule_reconnect (h);
622 return; 502 return;
623 } 503 }
624 if (bytes_physical > bytes_msg) 504 n->ready_window++;
625 { 505 if ((NULL != n->env) && (1 == n->ready_window))
626 LOG (GNUNET_ERROR_TYPE_DEBUG, 506 do_send (n);
627 "Overhead for %u byte message was %u\n",
628 (unsigned int) bytes_msg,
629 (unsigned int) (bytes_physical - bytes_msg));
630 n->traffic_overhead += bytes_physical - bytes_msg;
631 }
632} 507}
633 508
634 509
@@ -639,8 +514,7 @@ handle_send_ok (void *cls,
639 * @param im message received 514 * @param im message received
640 */ 515 */
641static int 516static int
642check_recv (void *cls, 517check_recv (void *cls, const struct InboundMessage *im)
643 const struct InboundMessage *im)
644{ 518{
645 const struct GNUNET_MessageHeader *imm; 519 const struct GNUNET_MessageHeader *imm;
646 uint16_t size; 520 uint16_t size;
@@ -668,12 +542,11 @@ check_recv (void *cls,
668 * @param im message received 542 * @param im message received
669 */ 543 */
670static void 544static void
671handle_recv (void *cls, 545handle_recv (void *cls, const struct InboundMessage *im)
672 const struct InboundMessage *im)
673{ 546{
674 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 547 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
675 const struct GNUNET_MessageHeader *imm 548 const struct GNUNET_MessageHeader *imm =
676 = (const struct GNUNET_MessageHeader *) &im[1]; 549 (const struct GNUNET_MessageHeader *) &im[1];
677 struct Neighbour *n; 550 struct Neighbour *n;
678 551
679 LOG (GNUNET_ERROR_TYPE_DEBUG, 552 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -681,46 +554,14 @@ handle_recv (void *cls,
681 (unsigned int) ntohs (imm->type), 554 (unsigned int) ntohs (imm->type),
682 (unsigned int) ntohs (imm->size), 555 (unsigned int) ntohs (imm->size),
683 GNUNET_i2s (&im->peer)); 556 GNUNET_i2s (&im->peer));
684 n = neighbour_find (h, 557 n = neighbour_find (h, &im->peer);
685 &im->peer);
686 if (NULL == n)
687 {
688 GNUNET_break (0);
689 disconnect_and_schedule_reconnect (h);
690 return;
691 }
692 GNUNET_MQ_inject_message (n->mq,
693 imm);
694}
695
696
697/**
698 * Function we use for handling incoming set quota messages.
699 *
700 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
701 * @param msg message received
702 */
703static void
704handle_set_quota (void *cls,
705 const struct QuotaSetMessage *qm)
706{
707 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
708 struct Neighbour *n;
709
710 n = neighbour_find (h,
711 &qm->peer);
712 if (NULL == n) 558 if (NULL == n)
713 { 559 {
714 GNUNET_break (0); 560 GNUNET_break (0);
715 disconnect_and_schedule_reconnect (h); 561 disconnect_and_schedule_reconnect (h);
716 return; 562 return;
717 } 563 }
718 LOG (GNUNET_ERROR_TYPE_DEBUG, 564 GNUNET_MQ_inject_message (n->mq, imm);
719 "Receiving SET_QUOTA message for `%s' with quota %u\n",
720 GNUNET_i2s (&qm->peer),
721 (unsigned int) ntohl (qm->quota.value__));
722 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
723 qm->quota);
724} 565}
725 566
726 567
@@ -733,46 +574,36 @@ static void
733reconnect (void *cls) 574reconnect (void *cls)
734{ 575{
735 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 576 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
736 struct GNUNET_MQ_MessageHandler handlers[] = { 577 struct GNUNET_MQ_MessageHandler handlers[] =
737 GNUNET_MQ_hd_fixed_size (connect, 578 {GNUNET_MQ_hd_fixed_size (connect,
738 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, 579 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
739 struct ConnectInfoMessage, 580 struct ConnectInfoMessage,
740 h), 581 h),
741 GNUNET_MQ_hd_fixed_size (disconnect, 582 GNUNET_MQ_hd_fixed_size (disconnect,
742 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, 583 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
743 struct DisconnectInfoMessage, 584 struct DisconnectInfoMessage,
744 h), 585 h),
745 GNUNET_MQ_hd_fixed_size (send_ok, 586 GNUNET_MQ_hd_fixed_size (send_ok,
746 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, 587 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
747 struct SendOkMessage, 588 struct SendOkMessage,
748 h), 589 h),
749 GNUNET_MQ_hd_var_size (recv, 590 GNUNET_MQ_hd_var_size (recv,
750 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, 591 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
751 struct InboundMessage, 592 struct InboundMessage,
752 h), 593 h),
753 GNUNET_MQ_hd_fixed_size (set_quota, 594 GNUNET_MQ_handler_end ()};
754 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
755 struct QuotaSetMessage,
756 h),
757 GNUNET_MQ_handler_end ()
758 };
759 struct GNUNET_MQ_Envelope *env; 595 struct GNUNET_MQ_Envelope *env;
760 struct StartMessage *s; 596 struct StartMessage *s;
761 uint32_t options; 597 uint32_t options;
762 598
763 h->reconnect_task = NULL; 599 h->reconnect_task = NULL;
764 LOG (GNUNET_ERROR_TYPE_DEBUG, 600 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
765 "Connecting to transport service.\n");
766 GNUNET_assert (NULL == h->mq); 601 GNUNET_assert (NULL == h->mq);
767 h->mq = GNUNET_CLIENT_connect (h->cfg, 602 h->mq =
768 "transport", 603 GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h);
769 handlers,
770 &mq_error_handler,
771 h);
772 if (NULL == h->mq) 604 if (NULL == h->mq)
773 return; 605 return;
774 env = GNUNET_MQ_msg (s, 606 env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START);
775 GNUNET_MESSAGE_TYPE_TRANSPORT_START);
776 options = 0; 607 options = 0;
777 if (h->check_self) 608 if (h->check_self)
778 options |= 1; 609 options |= 1;
@@ -780,8 +611,7 @@ reconnect (void *cls)
780 options |= 2; 611 options |= 2;
781 s->options = htonl (options); 612 s->options = htonl (options);
782 s->self = h->self; 613 s->self = h->self;
783 GNUNET_MQ_send (h->mq, 614 GNUNET_MQ_send (h->mq, env);
784 env);
785} 615}
786 616
787 617
@@ -793,9 +623,7 @@ reconnect (void *cls)
793static void 623static void
794disconnect (struct GNUNET_TRANSPORT_CoreHandle *h) 624disconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
795{ 625{
796 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, 626 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h);
797 &neighbour_delete,
798 h);
799 if (NULL != h->mq) 627 if (NULL != h->mq)
800 { 628 {
801 GNUNET_MQ_destroy (h->mq); 629 GNUNET_MQ_destroy (h->mq);
@@ -817,12 +645,9 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
817 disconnect (h); 645 disconnect (h);
818 LOG (GNUNET_ERROR_TYPE_DEBUG, 646 LOG (GNUNET_ERROR_TYPE_DEBUG,
819 "Scheduling task to reconnect to transport service in %s.\n", 647 "Scheduling task to reconnect to transport service in %s.\n",
820 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, 648 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
821 GNUNET_YES));
822 h->reconnect_task = 649 h->reconnect_task =
823 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, 650 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
824 &reconnect,
825 h);
826 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); 651 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
827} 652}
828 653
@@ -840,8 +665,7 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
840{ 665{
841 struct Neighbour *n; 666 struct Neighbour *n;
842 667
843 n = neighbour_find (handle, 668 n = neighbour_find (handle, peer);
844 peer);
845 if (NULL == n) 669 if (NULL == n)
846 return NULL; 670 return NULL;
847 return n->mq; 671 return n->mq;
@@ -849,6 +673,45 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
849 673
850 674
851/** 675/**
676 * Notification from the CORE service to the TRANSPORT service
677 * that the CORE service has finished processing a message from
678 * TRANSPORT (via the @code{handlers} of #GNUNET_TRANSPORT_core_connect())
679 * and that it is thus now OK for TRANSPORT to send more messages
680 * for @a pid.
681 *
682 * Used to provide flow control, this is our equivalent to
683 * #GNUNET_SERVICE_client_continue() of an ordinary service.
684 *
685 * Note that due to the use of a window, TRANSPORT may send multiple
686 * messages destined for the same peer even without an intermediate
687 * call to this function. However, CORE must still call this function
688 * once per message received, as otherwise eventually the window will
689 * be full and TRANSPORT will stop providing messages to CORE for @a
690 * pid.
691 *
692 * @param ch core handle
693 * @param pid which peer was the message from that was fully processed by CORE
694 */
695void
696GNUNET_TRANSPORT_core_receive_continue (struct GNUNET_TRANSPORT_CoreHandle *ch,
697 const struct GNUNET_PeerIdentity *pid)
698{
699 struct GNUNET_MQ_Envelope *env;
700 struct RecvOkMessage *rok;
701
702 LOG (GNUNET_ERROR_TYPE_DEBUG,
703 "Message for %s finished CORE processing, sending RECV_OK.\n",
704 GNUNET_i2s (pid));
705 if (NULL == ch->mq)
706 return;
707 env = GNUNET_MQ_msg (rok, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK);
708 rok->increase_window_delta = htonl (1);
709 rok->peer = *pid;
710 GNUNET_MQ_send (ch->mq, env);
711}
712
713
714/**
852 * Connect to the transport service. Note that the connection may 715 * Connect to the transport service. Note that the connection may
853 * complete (or fail) asynchronously. 716 * complete (or fail) asynchronously.
854 * 717 *
@@ -859,17 +722,15 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
859 * @param rec receive function to call 722 * @param rec receive function to call
860 * @param nc function to call on connect events 723 * @param nc function to call on connect events
861 * @param nd function to call on disconnect events 724 * @param nd function to call on disconnect events
862 * @param neb function to call if we have excess bandwidth to a peer
863 * @return NULL on error 725 * @return NULL on error
864 */ 726 */
865struct GNUNET_TRANSPORT_CoreHandle * 727struct GNUNET_TRANSPORT_CoreHandle *
866GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, 728GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
867 const struct GNUNET_PeerIdentity *self, 729 const struct GNUNET_PeerIdentity *self,
868 const struct GNUNET_MQ_MessageHandler *handlers, 730 const struct GNUNET_MQ_MessageHandler *handlers,
869 void *cls, 731 void *cls,
870 GNUNET_TRANSPORT_NotifyConnect nc, 732 GNUNET_TRANSPORT_NotifyConnect nc,
871 GNUNET_TRANSPORT_NotifyDisconnect nd, 733 GNUNET_TRANSPORT_NotifyDisconnect nd)
872 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
873{ 734{
874 struct GNUNET_TRANSPORT_CoreHandle *h; 735 struct GNUNET_TRANSPORT_CoreHandle *h;
875 unsigned int i; 736 unsigned int i;
@@ -884,19 +745,17 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
884 h->cls = cls; 745 h->cls = cls;
885 h->nc_cb = nc; 746 h->nc_cb = nc;
886 h->nd_cb = nd; 747 h->nd_cb = nd;
887 h->neb_cb = neb;
888 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 748 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
889 if (NULL != handlers) 749 if (NULL != handlers)
890 { 750 {
891 for (i=0;NULL != handlers[i].cb; i++) ; 751 for (i = 0; NULL != handlers[i].cb; i++)
892 h->handlers = GNUNET_new_array (i + 1, 752 ;
893 struct GNUNET_MQ_MessageHandler); 753 h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler);
894 GNUNET_memcpy (h->handlers, 754 GNUNET_memcpy (h->handlers,
895 handlers, 755 handlers,
896 i * sizeof (struct GNUNET_MQ_MessageHandler)); 756 i * sizeof (struct GNUNET_MQ_MessageHandler));
897 } 757 }
898 LOG (GNUNET_ERROR_TYPE_DEBUG, 758 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n");
899 "Connecting to transport service\n");
900 reconnect (h); 759 reconnect (h);
901 if (NULL == h->mq) 760 if (NULL == h->mq)
902 { 761 {
@@ -905,8 +764,7 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
905 return NULL; 764 return NULL;
906 } 765 }
907 h->neighbours = 766 h->neighbours =
908 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, 767 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
909 GNUNET_YES);
910 return h; 768 return h;
911} 769}
912 770
@@ -914,13 +772,13 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
914/** 772/**
915 * Disconnect from the transport service. 773 * Disconnect from the transport service.
916 * 774 *
917 * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect() 775 * @param handle handle to the service as returned from
776 * #GNUNET_TRANSPORT_core_connect()
918 */ 777 */
919void 778void
920GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle) 779GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
921{ 780{
922 LOG (GNUNET_ERROR_TYPE_DEBUG, 781 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
923 "Transport disconnect called!\n");
924 /* this disconnects all neighbours... */ 782 /* this disconnects all neighbours... */
925 disconnect (handle); 783 disconnect (handle);
926 /* and now we stop trying to connect again... */ 784 /* and now we stop trying to connect again... */
diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c
index e86499173..a163d7ccf 100644
--- a/src/transport/transport_api_core.c
+++ b/src/transport/transport_api_core.c
@@ -29,11 +29,10 @@
29#include "gnunet_arm_service.h" 29#include "gnunet_arm_service.h"
30#include "gnunet_hello_lib.h" 30#include "gnunet_hello_lib.h"
31#include "gnunet_protocols.h" 31#include "gnunet_protocols.h"
32#include "gnunet_transport_core_service.h"
33#include "gnunet_transport_service.h" 32#include "gnunet_transport_service.h"
34#include "transport.h" 33#include "transport.h"
35 34
36#define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__) 35#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__)
37 36
38/** 37/**
39 * If we could not send any payload to a peer for this amount of 38 * If we could not send any payload to a peer for this amount of
@@ -113,11 +112,9 @@ struct Neighbour
113 * Size of the message in @e env. 112 * Size of the message in @e env.
114 */ 113 */
115 uint16_t env_size; 114 uint16_t env_size;
116
117}; 115};
118 116
119 117
120
121/** 118/**
122 * Handle for the transport service (includes all of the 119 * Handle for the transport service (includes all of the
123 * state for the transport service). 120 * state for the transport service).
@@ -187,7 +184,6 @@ struct GNUNET_TRANSPORT_CoreHandle
187 * (if #GNUNET_NO, then @e self is all zeros!). 184 * (if #GNUNET_NO, then @e self is all zeros!).
188 */ 185 */
189 int check_self; 186 int check_self;
190
191}; 187};
192 188
193 189
@@ -212,8 +208,7 @@ static struct Neighbour *
212neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h, 208neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
213 const struct GNUNET_PeerIdentity *peer) 209 const struct GNUNET_PeerIdentity *peer)
214{ 210{
215 return GNUNET_CONTAINER_multipeermap_get (h->neighbours, 211 return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer);
216 peer);
217} 212}
218 213
219 214
@@ -234,9 +229,7 @@ notify_excess_cb (void *cls)
234 GNUNET_i2s (&n->id)); 229 GNUNET_i2s (&n->id));
235 230
236 if (NULL != h->neb_cb) 231 if (NULL != h->neb_cb)
237 h->neb_cb (h->cls, 232 h->neb_cb (h->cls, &n->id, n->handlers_cls);
238 &n->id,
239 n->handlers_cls);
240} 233}
241 234
242 235
@@ -251,9 +244,7 @@ notify_excess_cb (void *cls)
251 * #GNUNET_NO if not. 244 * #GNUNET_NO if not.
252 */ 245 */
253static int 246static int
254neighbour_delete (void *cls, 247neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
255 const struct GNUNET_PeerIdentity *key,
256 void *value)
257{ 248{
258 struct GNUNET_TRANSPORT_CoreHandle *handle = cls; 249 struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
259 struct Neighbour *n = value; 250 struct Neighbour *n = value;
@@ -263,9 +254,7 @@ neighbour_delete (void *cls,
263 GNUNET_i2s (key)); 254 GNUNET_i2s (key));
264 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker); 255 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
265 if (NULL != handle->nd_cb) 256 if (NULL != handle->nd_cb)
266 handle->nd_cb (handle->cls, 257 handle->nd_cb (handle->cls, &n->id, n->handlers_cls);
267 &n->id,
268 n->handlers_cls);
269 if (NULL != n->timeout_task) 258 if (NULL != n->timeout_task)
270 { 259 {
271 GNUNET_SCHEDULER_cancel (n->timeout_task); 260 GNUNET_SCHEDULER_cancel (n->timeout_task);
@@ -278,10 +267,9 @@ neighbour_delete (void *cls,
278 } 267 }
279 GNUNET_MQ_destroy (n->mq); 268 GNUNET_MQ_destroy (n->mq);
280 GNUNET_assert (NULL == n->mq); 269 GNUNET_assert (NULL == n->mq);
281 GNUNET_assert (GNUNET_YES == 270 GNUNET_assert (
282 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, 271 GNUNET_YES ==
283 key, 272 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n));
284 n));
285 GNUNET_free (n); 273 GNUNET_free (n);
286 return GNUNET_YES; 274 return GNUNET_YES;
287} 275}
@@ -297,8 +285,7 @@ neighbour_delete (void *cls,
297 * @param error error code 285 * @param error error code
298 */ 286 */
299static void 287static void
300mq_error_handler (void *cls, 288mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
301 enum GNUNET_MQ_Error error)
302{ 289{
303 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 290 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
304 291
@@ -317,14 +304,12 @@ mq_error_handler (void *cls,
317 * @return #GNUNET_OK if message is well-formed 304 * @return #GNUNET_OK if message is well-formed
318 */ 305 */
319static int 306static int
320check_hello (void *cls, 307check_hello (void *cls, const struct GNUNET_MessageHeader *msg)
321 const struct GNUNET_MessageHeader *msg)
322{ 308{
323 struct GNUNET_PeerIdentity me; 309 struct GNUNET_PeerIdentity me;
324 310
325 if (GNUNET_OK != 311 if (GNUNET_OK !=
326 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, 312 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, &me))
327 &me))
328 { 313 {
329 GNUNET_break (0); 314 GNUNET_break (0);
330 return GNUNET_SYSERR; 315 return GNUNET_SYSERR;
@@ -340,8 +325,7 @@ check_hello (void *cls,
340 * @param msg message received 325 * @param msg message received
341 */ 326 */
342static void 327static void
343handle_hello (void *cls, 328handle_hello (void *cls, const struct GNUNET_MessageHeader *msg)
344 const struct GNUNET_MessageHeader *msg)
345{ 329{
346 /* we do not care => FIXME: signal in options to NEVER send HELLOs! */ 330 /* we do not care => FIXME: signal in options to NEVER send HELLOs! */
347} 331}
@@ -388,8 +372,7 @@ notify_send_done (void *cls)
388 n->env = NULL; 372 n->env = NULL;
389 n->traffic_overhead = 0; 373 n->traffic_overhead = 0;
390 } 374 }
391 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 375 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128);
392 128);
393 if (0 == delay.rel_value_us) 376 if (0 == delay.rel_value_us)
394 { 377 {
395 n->is_ready = GNUNET_YES; 378 n->is_ready = GNUNET_YES;
@@ -399,9 +382,8 @@ notify_send_done (void *cls)
399 GNUNET_MQ_impl_send_in_flight (n->mq); 382 GNUNET_MQ_impl_send_in_flight (n->mq);
400 /* cannot send even a small message without violating 383 /* cannot send even a small message without violating
401 quota, wait a before allowing MQ to send next message */ 384 quota, wait a before allowing MQ to send next message */
402 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, 385 n->timeout_task =
403 &notify_send_done_fin, 386 GNUNET_SCHEDULER_add_delayed (delay, &notify_send_done_fin, n);
404 n);
405} 387}
406 388
407 389
@@ -434,20 +416,17 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
434 return; 416 return;
435 } 417 }
436 GNUNET_assert (NULL == n->env); 418 GNUNET_assert (NULL == n->env);
437 n->env = GNUNET_MQ_msg_nested_mh (obm, 419 n->env =
438 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 420 GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg);
439 msg);
440 obm->reserved = htonl (0); 421 obm->reserved = htonl (0);
441 obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */ 422 obm->timeout = GNUNET_TIME_relative_hton (
423 GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
442 obm->peer = n->id; 424 obm->peer = n->id;
443 GNUNET_assert (NULL == n->timeout_task); 425 GNUNET_assert (NULL == n->timeout_task);
444 n->is_ready = GNUNET_NO; 426 n->is_ready = GNUNET_NO;
445 n->env_size = ntohs (msg->size); 427 n->env_size = ntohs (msg->size);
446 GNUNET_MQ_notify_sent (n->env, 428 GNUNET_MQ_notify_sent (n->env, &notify_send_done, n);
447 &notify_send_done, 429 GNUNET_MQ_send (h->mq, n->env);
448 n);
449 GNUNET_MQ_send (h->mq,
450 n->env);
451 LOG (GNUNET_ERROR_TYPE_DEBUG, 430 LOG (GNUNET_ERROR_TYPE_DEBUG,
452 "Queued message of type %u for neighbour `%s'.\n", 431 "Queued message of type %u for neighbour `%s'.\n",
453 ntohs (msg->type), 432 ntohs (msg->type),
@@ -463,8 +442,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
463 * @param impl_state state of the implementation 442 * @param impl_state state of the implementation
464 */ 443 */
465static void 444static void
466mq_destroy_impl (struct GNUNET_MQ_Handle *mq, 445mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
467 void *impl_state)
468{ 446{
469 struct Neighbour *n = impl_state; 447 struct Neighbour *n = impl_state;
470 448
@@ -481,8 +459,7 @@ mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
481 * @param impl_state state specific to the implementation 459 * @param impl_state state specific to the implementation
482 */ 460 */
483static void 461static void
484mq_cancel_impl (struct GNUNET_MQ_Handle *mq, 462mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
485 void *impl_state)
486{ 463{
487 struct Neighbour *n = impl_state; 464 struct Neighbour *n = impl_state;
488 465
@@ -506,8 +483,7 @@ mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
506 * @param error error code 483 * @param error error code
507 */ 484 */
508static void 485static void
509peer_mq_error_handler (void *cls, 486peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
510 enum GNUNET_MQ_Error error)
511{ 487{
512 /* struct Neighbour *n = cls; */ 488 /* struct Neighbour *n = cls; */
513 489
@@ -529,12 +505,9 @@ outbound_bw_tracker_update (void *cls)
529 505
530 if (NULL == n->timeout_task) 506 if (NULL == n->timeout_task)
531 return; 507 return;
532 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 508 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128);
533 128);
534 GNUNET_SCHEDULER_cancel (n->timeout_task); 509 GNUNET_SCHEDULER_cancel (n->timeout_task);
535 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, 510 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, &notify_send_done, n);
536 &notify_send_done,
537 n);
538} 511}
539 512
540 513
@@ -545,8 +518,7 @@ outbound_bw_tracker_update (void *cls)
545 * @param cim message received 518 * @param cim message received
546 */ 519 */
547static void 520static void
548handle_connect (void *cls, 521handle_connect (void *cls, const struct ConnectInfoMessage *cim)
549 const struct ConnectInfoMessage *cim)
550{ 522{
551 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 523 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
552 struct Neighbour *n; 524 struct Neighbour *n;
@@ -555,8 +527,7 @@ handle_connect (void *cls,
555 "Receiving CONNECT message for `%s' with quota %u\n", 527 "Receiving CONNECT message for `%s' with quota %u\n",
556 GNUNET_i2s (&cim->id), 528 GNUNET_i2s (&cim->id),
557 ntohl (cim->quota_out.value__)); 529 ntohl (cim->quota_out.value__));
558 n = neighbour_find (h, 530 n = neighbour_find (h, &cim->id);
559 &cim->id);
560 if (NULL != n) 531 if (NULL != n)
561 { 532 {
562 GNUNET_break (0); /* FIXME: this assertion seems to fail sometimes!? */ 533 GNUNET_break (0); /* FIXME: this assertion seems to fail sometimes!? */
@@ -576,13 +547,13 @@ handle_connect (void *cls,
576 &notify_excess_cb, 547 &notify_excess_cb,
577 n); 548 n);
578 GNUNET_assert (GNUNET_OK == 549 GNUNET_assert (GNUNET_OK ==
579 GNUNET_CONTAINER_multipeermap_put (h->neighbours, 550 GNUNET_CONTAINER_multipeermap_put (
580 &n->id, 551 h->neighbours,
581 n, 552 &n->id,
582 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 553 n,
554 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
583 555
584 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, 556 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, cim->quota_out);
585 cim->quota_out);
586 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl, 557 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
587 &mq_destroy_impl, 558 &mq_destroy_impl,
588 &mq_cancel_impl, 559 &mq_cancel_impl,
@@ -592,11 +563,8 @@ handle_connect (void *cls,
592 n); 563 n);
593 if (NULL != h->nc_cb) 564 if (NULL != h->nc_cb)
594 { 565 {
595 n->handlers_cls = h->nc_cb (h->cls, 566 n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq);
596 &n->id, 567 GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls);
597 n->mq);
598 GNUNET_MQ_set_handlers_closure (n->mq,
599 n->handlers_cls);
600 } 568 }
601} 569}
602 570
@@ -608,8 +576,7 @@ handle_connect (void *cls,
608 * @param dim message received 576 * @param dim message received
609 */ 577 */
610static void 578static void
611handle_disconnect (void *cls, 579handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim)
612 const struct DisconnectInfoMessage *dim)
613{ 580{
614 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 581 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
615 struct Neighbour *n; 582 struct Neighbour *n;
@@ -625,10 +592,7 @@ handle_disconnect (void *cls,
625 disconnect_and_schedule_reconnect (h); 592 disconnect_and_schedule_reconnect (h);
626 return; 593 return;
627 } 594 }
628 GNUNET_assert (GNUNET_YES == 595 GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n));
629 neighbour_delete (h,
630 &dim->peer,
631 n));
632} 596}
633 597
634 598
@@ -639,8 +603,7 @@ handle_disconnect (void *cls,
639 * @param okm message received 603 * @param okm message received
640 */ 604 */
641static void 605static void
642handle_send_ok (void *cls, 606handle_send_ok (void *cls, const struct SendOkMessage *okm)
643 const struct SendOkMessage *okm)
644{ 607{
645 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 608 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
646 struct Neighbour *n; 609 struct Neighbour *n;
@@ -653,8 +616,7 @@ handle_send_ok (void *cls,
653 "Receiving SEND_OK message, transmission to %s %s.\n", 616 "Receiving SEND_OK message, transmission to %s %s.\n",
654 GNUNET_i2s (&okm->peer), 617 GNUNET_i2s (&okm->peer),
655 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); 618 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
656 n = neighbour_find (h, 619 n = neighbour_find (h, &okm->peer);
657 &okm->peer);
658 if (NULL == n) 620 if (NULL == n)
659 { 621 {
660 /* We should never get a 'SEND_OK' for a peer that we are not 622 /* We should never get a 'SEND_OK' for a peer that we are not
@@ -681,8 +643,7 @@ handle_send_ok (void *cls,
681 * @param im message received 643 * @param im message received
682 */ 644 */
683static int 645static int
684check_recv (void *cls, 646check_recv (void *cls, const struct InboundMessage *im)
685 const struct InboundMessage *im)
686{ 647{
687 const struct GNUNET_MessageHeader *imm; 648 const struct GNUNET_MessageHeader *imm;
688 uint16_t size; 649 uint16_t size;
@@ -710,12 +671,11 @@ check_recv (void *cls,
710 * @param im message received 671 * @param im message received
711 */ 672 */
712static void 673static void
713handle_recv (void *cls, 674handle_recv (void *cls, const struct InboundMessage *im)
714 const struct InboundMessage *im)
715{ 675{
716 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 676 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
717 const struct GNUNET_MessageHeader *imm 677 const struct GNUNET_MessageHeader *imm =
718 = (const struct GNUNET_MessageHeader *) &im[1]; 678 (const struct GNUNET_MessageHeader *) &im[1];
719 struct Neighbour *n; 679 struct Neighbour *n;
720 680
721 LOG (GNUNET_ERROR_TYPE_DEBUG, 681 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -730,8 +690,7 @@ handle_recv (void *cls,
730 disconnect_and_schedule_reconnect (h); 690 disconnect_and_schedule_reconnect (h);
731 return; 691 return;
732 } 692 }
733 GNUNET_MQ_inject_message (n->mq, 693 GNUNET_MQ_inject_message (n->mq, imm);
734 imm);
735} 694}
736 695
737 696
@@ -742,8 +701,7 @@ handle_recv (void *cls,
742 * @param msg message received 701 * @param msg message received
743 */ 702 */
744static void 703static void
745handle_set_quota (void *cls, 704handle_set_quota (void *cls, const struct QuotaSetMessage *qm)
746 const struct QuotaSetMessage *qm)
747{ 705{
748 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 706 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
749 struct Neighbour *n; 707 struct Neighbour *n;
@@ -752,16 +710,15 @@ handle_set_quota (void *cls,
752 "Receiving SET_QUOTA message for `%s' with quota %u\n", 710 "Receiving SET_QUOTA message for `%s' with quota %u\n",
753 GNUNET_i2s (&qm->peer), 711 GNUNET_i2s (&qm->peer),
754 ntohl (qm->quota.value__)); 712 ntohl (qm->quota.value__));
755 n = neighbour_find (h, 713 n = neighbour_find (h, &qm->peer);
756 &qm->peer);
757 if (NULL == n) 714 if (NULL == n)
758 { 715 {
759 GNUNET_break (0); /* FIXME: julius reports this assertion fails sometimes? */ 716 GNUNET_break (
717 0); /* FIXME: julius reports this assertion fails sometimes? */
760 disconnect_and_schedule_reconnect (h); 718 disconnect_and_schedule_reconnect (h);
761 return; 719 return;
762 } 720 }
763 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, 721 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, qm->quota);
764 qm->quota);
765} 722}
766 723
767 724
@@ -774,50 +731,44 @@ static void
774reconnect (void *cls) 731reconnect (void *cls)
775{ 732{
776 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 733 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
777 struct GNUNET_MQ_MessageHandler handlers[] = { 734 struct GNUNET_MQ_MessageHandler handlers[] =
778 GNUNET_MQ_hd_var_size (hello, 735 {GNUNET_MQ_hd_var_size (hello,
779 GNUNET_MESSAGE_TYPE_HELLO, 736 GNUNET_MESSAGE_TYPE_HELLO,
780 struct GNUNET_MessageHeader, 737 struct GNUNET_MessageHeader,
781 h), 738 h),
782 GNUNET_MQ_hd_fixed_size (connect, 739 GNUNET_MQ_hd_fixed_size (connect,
783 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, 740 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
784 struct ConnectInfoMessage, 741 struct ConnectInfoMessage,
785 h), 742 h),
786 GNUNET_MQ_hd_fixed_size (disconnect, 743 GNUNET_MQ_hd_fixed_size (disconnect,
787 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, 744 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
788 struct DisconnectInfoMessage, 745 struct DisconnectInfoMessage,
789 h), 746 h),
790 GNUNET_MQ_hd_fixed_size (send_ok, 747 GNUNET_MQ_hd_fixed_size (send_ok,
791 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, 748 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
792 struct SendOkMessage, 749 struct SendOkMessage,
793 h), 750 h),
794 GNUNET_MQ_hd_var_size (recv, 751 GNUNET_MQ_hd_var_size (recv,
795 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, 752 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
796 struct InboundMessage, 753 struct InboundMessage,
797 h), 754 h),
798 GNUNET_MQ_hd_fixed_size (set_quota, 755 GNUNET_MQ_hd_fixed_size (set_quota,
799 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, 756 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
800 struct QuotaSetMessage, 757 struct QuotaSetMessage,
801 h), 758 h),
802 GNUNET_MQ_handler_end () 759 GNUNET_MQ_handler_end ()};
803 };
804 struct GNUNET_MQ_Envelope *env; 760 struct GNUNET_MQ_Envelope *env;
805 struct StartMessage *s; 761 struct StartMessage *s;
806 uint32_t options; 762 uint32_t options;
807 763
808 h->reconnect_task = NULL; 764 h->reconnect_task = NULL;
809 LOG (GNUNET_ERROR_TYPE_DEBUG, 765 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
810 "Connecting to transport service.\n");
811 GNUNET_assert (NULL == h->mq); 766 GNUNET_assert (NULL == h->mq);
812 h->mq = GNUNET_CLIENT_connect (h->cfg, 767 h->mq =
813 "transport", 768 GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h);
814 handlers,
815 &mq_error_handler,
816 h);
817 if (NULL == h->mq) 769 if (NULL == h->mq)
818 return; 770 return;
819 env = GNUNET_MQ_msg (s, 771 env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START);
820 GNUNET_MESSAGE_TYPE_TRANSPORT_START);
821 options = 0; 772 options = 0;
822 if (h->check_self) 773 if (h->check_self)
823 options |= 1; 774 options |= 1;
@@ -825,8 +776,7 @@ reconnect (void *cls)
825 options |= 2; 776 options |= 2;
826 s->options = htonl (options); 777 s->options = htonl (options);
827 s->self = h->self; 778 s->self = h->self;
828 GNUNET_MQ_send (h->mq, 779 GNUNET_MQ_send (h->mq, env);
829 env);
830} 780}
831 781
832 782
@@ -841,9 +791,7 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
841{ 791{
842 GNUNET_assert (NULL == h->reconnect_task); 792 GNUNET_assert (NULL == h->reconnect_task);
843 /* Forget about all neighbours that we used to be connected to */ 793 /* Forget about all neighbours that we used to be connected to */
844 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, 794 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h);
845 &neighbour_delete,
846 h);
847 if (NULL != h->mq) 795 if (NULL != h->mq)
848 { 796 {
849 GNUNET_MQ_destroy (h->mq); 797 GNUNET_MQ_destroy (h->mq);
@@ -851,12 +799,9 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
851 } 799 }
852 LOG (GNUNET_ERROR_TYPE_DEBUG, 800 LOG (GNUNET_ERROR_TYPE_DEBUG,
853 "Scheduling task to reconnect to transport service in %s.\n", 801 "Scheduling task to reconnect to transport service in %s.\n",
854 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, 802 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
855 GNUNET_YES));
856 h->reconnect_task = 803 h->reconnect_task =
857 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, 804 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
858 &reconnect,
859 h);
860 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); 805 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
861} 806}
862 807
@@ -874,8 +819,7 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
874{ 819{
875 struct Neighbour *n; 820 struct Neighbour *n;
876 821
877 n = neighbour_find (handle, 822 n = neighbour_find (handle, peer);
878 peer);
879 if (NULL == n) 823 if (NULL == n)
880 return NULL; 824 return NULL;
881 return n->mq; 825 return n->mq;
@@ -898,12 +842,12 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
898 */ 842 */
899struct GNUNET_TRANSPORT_CoreHandle * 843struct GNUNET_TRANSPORT_CoreHandle *
900GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, 844GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
901 const struct GNUNET_PeerIdentity *self, 845 const struct GNUNET_PeerIdentity *self,
902 const struct GNUNET_MQ_MessageHandler *handlers, 846 const struct GNUNET_MQ_MessageHandler *handlers,
903 void *cls, 847 void *cls,
904 GNUNET_TRANSPORT_NotifyConnect nc, 848 GNUNET_TRANSPORT_NotifyConnect nc,
905 GNUNET_TRANSPORT_NotifyDisconnect nd, 849 GNUNET_TRANSPORT_NotifyDisconnect nd,
906 GNUNET_TRANSPORT_NotifyExcessBandwidth neb) 850 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
907{ 851{
908 struct GNUNET_TRANSPORT_CoreHandle *h; 852 struct GNUNET_TRANSPORT_CoreHandle *h;
909 unsigned int i; 853 unsigned int i;
@@ -922,15 +866,14 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
922 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 866 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
923 if (NULL != handlers) 867 if (NULL != handlers)
924 { 868 {
925 for (i=0;NULL != handlers[i].cb; i++) ; 869 for (i = 0; NULL != handlers[i].cb; i++)
926 h->handlers = GNUNET_new_array (i + 1, 870 ;
927 struct GNUNET_MQ_MessageHandler); 871 h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler);
928 GNUNET_memcpy (h->handlers, 872 GNUNET_memcpy (h->handlers,
929 handlers, 873 handlers,
930 i * sizeof (struct GNUNET_MQ_MessageHandler)); 874 i * sizeof (struct GNUNET_MQ_MessageHandler));
931 } 875 }
932 LOG (GNUNET_ERROR_TYPE_DEBUG, 876 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n");
933 "Connecting to transport service\n");
934 reconnect (h); 877 reconnect (h);
935 if (NULL == h->mq) 878 if (NULL == h->mq)
936 { 879 {
@@ -939,8 +882,7 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
939 return NULL; 882 return NULL;
940 } 883 }
941 h->neighbours = 884 h->neighbours =
942 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, 885 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
943 GNUNET_YES);
944 return h; 886 return h;
945} 887}
946 888
@@ -948,13 +890,13 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
948/** 890/**
949 * Disconnect from the transport service. 891 * Disconnect from the transport service.
950 * 892 *
951 * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect() 893 * @param handle handle to the service as returned from
894 * #GNUNET_TRANSPORT_core_connect()
952 */ 895 */
953void 896void
954GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle) 897GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
955{ 898{
956 LOG (GNUNET_ERROR_TYPE_DEBUG, 899 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
957 "Transport disconnect called!\n");
958 /* this disconnects all neighbours... */ 900 /* this disconnects all neighbours... */
959 if (NULL == handle->reconnect_task) 901 if (NULL == handle->reconnect_task)
960 disconnect_and_schedule_reconnect (handle); 902 disconnect_and_schedule_reconnect (handle);