diff options
author | Christian Grothoff <christian@grothoff.org> | 2013-03-14 16:03:30 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2013-03-14 16:03:30 +0000 |
commit | 20b65432c8df1b2f84fcc0dac3b2f5d689f5c888 (patch) | |
tree | 6183cf957cffc5e9b415d35d119f8faa897513f4 /src/dv | |
parent | 38144d2375191205912027c97a5af6c7bbe87bfe (diff) | |
download | gnunet-20b65432c8df1b2f84fcc0dac3b2f5d689f5c888.tar.gz gnunet-20b65432c8df1b2f84fcc0dac3b2f5d689f5c888.zip |
-generate and process ACKs
Diffstat (limited to 'src/dv')
-rw-r--r-- | src/dv/dv.h | 2 | ||||
-rw-r--r-- | src/dv/dv_api.c | 91 | ||||
-rw-r--r-- | src/dv/gnunet-service-dv.c | 34 |
3 files changed, 109 insertions, 18 deletions
diff --git a/src/dv/dv.h b/src/dv/dv.h index ea5215a10..bcc586a0e 100644 --- a/src/dv/dv.h +++ b/src/dv/dv.h | |||
@@ -121,7 +121,7 @@ struct GNUNET_DV_SendMessage | |||
121 | struct GNUNET_MessageHeader header; | 121 | struct GNUNET_MessageHeader header; |
122 | 122 | ||
123 | /** | 123 | /** |
124 | * Unique ID for this message, for confirm callback. | 124 | * Unique ID for this message, for confirm callback, must never be zero. |
125 | */ | 125 | */ |
126 | uint32_t uid GNUNET_PACKED; | 126 | uint32_t uid GNUNET_PACKED; |
127 | 127 | ||
diff --git a/src/dv/dv_api.c b/src/dv/dv_api.c index 4b6cc1a5c..fff0896b3 100644 --- a/src/dv/dv_api.c +++ b/src/dv/dv_api.c | |||
@@ -74,6 +74,11 @@ struct GNUNET_DV_TransmitHandle | |||
74 | */ | 74 | */ |
75 | struct GNUNET_PeerIdentity target; | 75 | struct GNUNET_PeerIdentity target; |
76 | 76 | ||
77 | /** | ||
78 | * UID of our message, if any. | ||
79 | */ | ||
80 | uint32_t uid; | ||
81 | |||
77 | }; | 82 | }; |
78 | 83 | ||
79 | 84 | ||
@@ -184,10 +189,17 @@ transmit_pending (void *cls, size_t size, void *buf) | |||
184 | th); | 189 | th); |
185 | memcpy (&cbuf[ret], th->msg, tsize); | 190 | memcpy (&cbuf[ret], th->msg, tsize); |
186 | ret += tsize; | 191 | ret += tsize; |
187 | (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks, | 192 | if (NULL != th->cb) |
188 | &th->target.hashPubKey, | 193 | { |
189 | th, | 194 | (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks, |
190 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 195 | &th->target.hashPubKey, |
196 | th, | ||
197 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
198 | } | ||
199 | else | ||
200 | { | ||
201 | GNUNET_free (th); | ||
202 | } | ||
191 | } | 203 | } |
192 | return ret; | 204 | return ret; |
193 | } | 205 | } |
@@ -215,6 +227,54 @@ start_transmit (struct GNUNET_DV_ServiceHandle *sh) | |||
215 | 227 | ||
216 | 228 | ||
217 | /** | 229 | /** |
230 | * Closure for 'process_ack'. | ||
231 | */ | ||
232 | struct AckContext | ||
233 | { | ||
234 | /** | ||
235 | * The ACK message. | ||
236 | */ | ||
237 | const struct GNUNET_DV_AckMessage *ack; | ||
238 | |||
239 | /** | ||
240 | * Our service handle. | ||
241 | */ | ||
242 | struct GNUNET_DV_ServiceHandle *sh; | ||
243 | }; | ||
244 | |||
245 | |||
246 | /** | ||
247 | * We got an ACK. Check if it matches the given transmit handle, and if | ||
248 | * so call the continuation. | ||
249 | * | ||
250 | * @param cls the 'struct AckContext' | ||
251 | * @param key peer identity | ||
252 | * @param value the 'struct GNUNET_DV_TransmitHandle' | ||
253 | * @return GNUNET_OK if the ACK did not match (continue to iterate) | ||
254 | */ | ||
255 | static int | ||
256 | process_ack (void *cls, | ||
257 | const struct GNUNET_HashCode *key, | ||
258 | void *value) | ||
259 | { | ||
260 | struct AckContext *ctx = cls; | ||
261 | struct GNUNET_DV_TransmitHandle *th = value; | ||
262 | |||
263 | if (th->uid != ntohl (ctx->ack->uid)) | ||
264 | return GNUNET_OK; | ||
265 | GNUNET_assert (GNUNET_YES == | ||
266 | GNUNET_CONTAINER_multihashmap_remove (ctx->sh->send_callbacks, | ||
267 | key, | ||
268 | th)); | ||
269 | /* FIXME: should distinguish between success and failure here... */ | ||
270 | th->cb (th->cb_cls, | ||
271 | GNUNET_OK); | ||
272 | GNUNET_free (th); | ||
273 | return GNUNET_NO; | ||
274 | } | ||
275 | |||
276 | |||
277 | /** | ||
218 | * Handles a message sent from the DV service to us. | 278 | * Handles a message sent from the DV service to us. |
219 | * Parse it out and give it to the plugin. | 279 | * Parse it out and give it to the plugin. |
220 | * | 280 | * |
@@ -230,7 +290,9 @@ handle_message_receipt (void *cls, | |||
230 | const struct GNUNET_DV_DisconnectMessage *dm; | 290 | const struct GNUNET_DV_DisconnectMessage *dm; |
231 | const struct GNUNET_DV_ReceivedMessage *rm; | 291 | const struct GNUNET_DV_ReceivedMessage *rm; |
232 | const struct GNUNET_MessageHeader *payload; | 292 | const struct GNUNET_MessageHeader *payload; |
233 | 293 | const struct GNUNET_DV_AckMessage *ack; | |
294 | struct AckContext ctx; | ||
295 | |||
234 | if (NULL == msg) | 296 | if (NULL == msg) |
235 | { | 297 | { |
236 | /* Connection closed */ | 298 | /* Connection closed */ |
@@ -282,6 +344,21 @@ handle_message_receipt (void *cls, | |||
282 | ntohl (rm->distance), | 344 | ntohl (rm->distance), |
283 | payload); | 345 | payload); |
284 | break; | 346 | break; |
347 | case GNUNET_MESSAGE_TYPE_DV_SEND_ACK: | ||
348 | if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage)) | ||
349 | { | ||
350 | GNUNET_break (0); | ||
351 | reconnect (sh); | ||
352 | return; | ||
353 | } | ||
354 | ack = (const struct GNUNET_DV_AckMessage *) msg; | ||
355 | ctx.ack = ack; | ||
356 | ctx.sh = sh; | ||
357 | GNUNET_CONTAINER_multihashmap_get_multiple (sh->send_callbacks, | ||
358 | &ack->target.hashPubKey, | ||
359 | &process_ack, | ||
360 | &ctx); | ||
361 | return; | ||
285 | default: | 362 | default: |
286 | reconnect (sh); | 363 | reconnect (sh); |
287 | break; | 364 | break; |
@@ -495,6 +572,10 @@ GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh, | |||
495 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND); | 572 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND); |
496 | sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) + | 573 | sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) + |
497 | ntohs (msg->size)); | 574 | ntohs (msg->size)); |
575 | if (0 == sh->uid_gen) | ||
576 | sh->uid_gen = 1; | ||
577 | th->uid = sh->uid_gen; | ||
578 | sm->uid = htonl (sh->uid_gen++); | ||
498 | /* use memcpy here as 'target' may not be sufficiently aligned */ | 579 | /* use memcpy here as 'target' may not be sufficiently aligned */ |
499 | memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity)); | 580 | memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity)); |
500 | memcpy (&sm[1], msg, ntohs (msg->size)); | 581 | memcpy (&sm[1], msg, ntohs (msg->size)); |
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index c42d9344e..e1709be3d 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c | |||
@@ -28,9 +28,6 @@ | |||
28 | * @author Nathan Evans | 28 | * @author Nathan Evans |
29 | * | 29 | * |
30 | * TODO: | 30 | * TODO: |
31 | * - even _local_ flow control (send ACK only after core took our message) is | ||
32 | * not implemented, but should be (easy fix, but needs adjustments to data | ||
33 | * structures) | ||
34 | * - distance updates are not properly communicate to US by core, | 31 | * - distance updates are not properly communicate to US by core, |
35 | * and conversely we don't give distance updates properly to the plugin yet | 32 | * and conversely we don't give distance updates properly to the plugin yet |
36 | * - we send 'ACK' even if a message was dropped due to no route (may | 33 | * - we send 'ACK' even if a message was dropped due to no route (may |
@@ -420,12 +417,12 @@ transmit_to_plugin (void *cls, size_t size, void *buf) | |||
420 | * Forward a message from another peer to the plugin. | 417 | * Forward a message from another peer to the plugin. |
421 | * | 418 | * |
422 | * @param message the message to send to the plugin | 419 | * @param message the message to send to the plugin |
423 | * @param distant_neighbor the original sender of the message | 420 | * @param origin the original sender of the message |
424 | * @param distnace distance to the original sender of the message | 421 | * @param distnace distance to the original sender of the message |
425 | */ | 422 | */ |
426 | static void | 423 | static void |
427 | send_data_to_plugin (const struct GNUNET_MessageHeader *message, | 424 | send_data_to_plugin (const struct GNUNET_MessageHeader *message, |
428 | const struct GNUNET_PeerIdentity *distant_neighbor, | 425 | const struct GNUNET_PeerIdentity *origin, |
429 | uint32_t distance) | 426 | uint32_t distance) |
430 | { | 427 | { |
431 | struct GNUNET_DV_ReceivedMessage *received_msg; | 428 | struct GNUNET_DV_ReceivedMessage *received_msg; |
@@ -443,7 +440,7 @@ send_data_to_plugin (const struct GNUNET_MessageHeader *message, | |||
443 | } | 440 | } |
444 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 441 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
445 | "Delivering message from peer `%s'\n", | 442 | "Delivering message from peer `%s'\n", |
446 | GNUNET_i2s (distant_neighbor)); | 443 | GNUNET_i2s (origin)); |
447 | size = sizeof (struct GNUNET_DV_ReceivedMessage) + | 444 | size = sizeof (struct GNUNET_DV_ReceivedMessage) + |
448 | ntohs (message->size); | 445 | ntohs (message->size); |
449 | if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | 446 | if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) |
@@ -456,7 +453,7 @@ send_data_to_plugin (const struct GNUNET_MessageHeader *message, | |||
456 | received_msg->header.size = htons (size); | 453 | received_msg->header.size = htons (size); |
457 | received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV); | 454 | received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV); |
458 | received_msg->distance = htonl (distance); | 455 | received_msg->distance = htonl (distance); |
459 | received_msg->sender = *distant_neighbor; | 456 | received_msg->sender = *origin; |
460 | memcpy (&received_msg[1], message, ntohs (message->size)); | 457 | memcpy (&received_msg[1], message, ntohs (message->size)); |
461 | GNUNET_CONTAINER_DLL_insert_tail (plugin_pending_head, | 458 | GNUNET_CONTAINER_DLL_insert_tail (plugin_pending_head, |
462 | plugin_pending_tail, | 459 | plugin_pending_tail, |
@@ -611,8 +608,9 @@ core_transmit_notify (void *cls, size_t size, void *buf) | |||
611 | dn->pm_tail, | 608 | dn->pm_tail, |
612 | pending); | 609 | pending); |
613 | memcpy (&cbuf[off], pending->msg, msize); | 610 | memcpy (&cbuf[off], pending->msg, msize); |
614 | send_ack_to_plugin (&pending->ultimate_target, | 611 | if (0 != pending->uid) |
615 | pending->uid); | 612 | send_ack_to_plugin (&pending->ultimate_target, |
613 | pending->uid); | ||
616 | GNUNET_free (pending); | 614 | GNUNET_free (pending); |
617 | off += msize; | 615 | off += msize; |
618 | } | 616 | } |
@@ -633,6 +631,8 @@ core_transmit_notify (void *cls, size_t size, void *buf) | |||
633 | * Forward the given payload to the given target. | 631 | * Forward the given payload to the given target. |
634 | * | 632 | * |
635 | * @param target where to send the message | 633 | * @param target where to send the message |
634 | * @param uid unique ID for the message | ||
635 | * @param ultimate_target ultimate recipient for the message | ||
636 | * @param distance expected (remaining) distance to the target | 636 | * @param distance expected (remaining) distance to the target |
637 | * @param sender original sender of the message | 637 | * @param sender original sender of the message |
638 | * @param payload payload of the message | 638 | * @param payload payload of the message |
@@ -640,7 +640,9 @@ core_transmit_notify (void *cls, size_t size, void *buf) | |||
640 | static void | 640 | static void |
641 | forward_payload (struct DirectNeighbor *target, | 641 | forward_payload (struct DirectNeighbor *target, |
642 | uint32_t distance, | 642 | uint32_t distance, |
643 | uint32_t uid, | ||
643 | const struct GNUNET_PeerIdentity *sender, | 644 | const struct GNUNET_PeerIdentity *sender, |
645 | const struct GNUNET_PeerIdentity *ultimate_target, | ||
644 | const struct GNUNET_MessageHeader *payload) | 646 | const struct GNUNET_MessageHeader *payload) |
645 | { | 647 | { |
646 | struct PendingMessage *pm; | 648 | struct PendingMessage *pm; |
@@ -651,7 +653,10 @@ forward_payload (struct DirectNeighbor *target, | |||
651 | (0 != memcmp (sender, | 653 | (0 != memcmp (sender, |
652 | &my_identity, | 654 | &my_identity, |
653 | sizeof (struct GNUNET_PeerIdentity))) ) | 655 | sizeof (struct GNUNET_PeerIdentity))) ) |
656 | { | ||
657 | GNUNET_break (0 == uid); | ||
654 | return; | 658 | return; |
659 | } | ||
655 | msize = sizeof (struct RouteMessage) + ntohs (payload->size); | 660 | msize = sizeof (struct RouteMessage) + ntohs (payload->size); |
656 | if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | 661 | if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) |
657 | { | 662 | { |
@@ -659,6 +664,8 @@ forward_payload (struct DirectNeighbor *target, | |||
659 | return; | 664 | return; |
660 | } | 665 | } |
661 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); | 666 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); |
667 | pm->ultimate_target = *ultimate_target; | ||
668 | pm->uid = uid; | ||
662 | pm->msg = (const struct GNUNET_MessageHeader *) &pm[1]; | 669 | pm->msg = (const struct GNUNET_MessageHeader *) &pm[1]; |
663 | rm = (struct RouteMessage *) &pm[1]; | 670 | rm = (struct RouteMessage *) &pm[1]; |
664 | rm->header.size = htons ((uint16_t) msize); | 671 | rm->header.size = htons ((uint16_t) msize); |
@@ -1271,6 +1278,8 @@ handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer, | |||
1271 | } | 1278 | } |
1272 | forward_payload (route->next_hop, | 1279 | forward_payload (route->next_hop, |
1273 | ntohl (route->target.distance), | 1280 | ntohl (route->target.distance), |
1281 | 0, | ||
1282 | &rm->target, | ||
1274 | &rm->sender, | 1283 | &rm->sender, |
1275 | payload); | 1284 | payload); |
1276 | return GNUNET_OK; | 1285 | return GNUNET_OK; |
@@ -1300,6 +1309,7 @@ handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
1300 | return; | 1309 | return; |
1301 | } | 1310 | } |
1302 | msg = (const struct GNUNET_DV_SendMessage *) message; | 1311 | msg = (const struct GNUNET_DV_SendMessage *) message; |
1312 | GNUNET_break (0 != ntohl (msg->uid)); | ||
1303 | payload = (const struct GNUNET_MessageHeader *) &msg[1]; | 1313 | payload = (const struct GNUNET_MessageHeader *) &msg[1]; |
1304 | if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size)) | 1314 | if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size)) |
1305 | { | 1315 | { |
@@ -1316,14 +1326,14 @@ handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
1316 | GNUNET_STATISTICS_update (stats, | 1326 | GNUNET_STATISTICS_update (stats, |
1317 | "# local messages discarded (no route)", | 1327 | "# local messages discarded (no route)", |
1318 | 1, GNUNET_NO); | 1328 | 1, GNUNET_NO); |
1319 | send_ack_to_plugin (&msg->target, htonl (msg->uid)); | 1329 | send_ack_to_plugin (&msg->target, ntohl (msg->uid)); |
1320 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1330 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1321 | return; | 1331 | return; |
1322 | } | 1332 | } |
1323 | // FIXME: flow control (send ACK only once message has left the queue...) | ||
1324 | send_ack_to_plugin (&msg->target, htonl (msg->uid)); | ||
1325 | forward_payload (route->next_hop, | 1333 | forward_payload (route->next_hop, |
1326 | ntohl (route->target.distance), | 1334 | ntohl (route->target.distance), |
1335 | htonl (msg->uid), | ||
1336 | &msg->target, | ||
1327 | &my_identity, | 1337 | &my_identity, |
1328 | payload); | 1338 | payload); |
1329 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1339 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |