aboutsummaryrefslogtreecommitdiff
path: root/src/dv
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2013-03-14 16:03:30 +0000
committerChristian Grothoff <christian@grothoff.org>2013-03-14 16:03:30 +0000
commit20b65432c8df1b2f84fcc0dac3b2f5d689f5c888 (patch)
tree6183cf957cffc5e9b415d35d119f8faa897513f4 /src/dv
parent38144d2375191205912027c97a5af6c7bbe87bfe (diff)
downloadgnunet-20b65432c8df1b2f84fcc0dac3b2f5d689f5c888.tar.gz
gnunet-20b65432c8df1b2f84fcc0dac3b2f5d689f5c888.zip
-generate and process ACKs
Diffstat (limited to 'src/dv')
-rw-r--r--src/dv/dv.h2
-rw-r--r--src/dv/dv_api.c91
-rw-r--r--src/dv/gnunet-service-dv.c34
3 files changed, 109 insertions, 18 deletions
diff --git a/src/dv/dv.h b/src/dv/dv.h
index ea5215a10..bcc586a0e 100644
--- a/src/dv/dv.h
+++ b/src/dv/dv.h
@@ -121,7 +121,7 @@ struct GNUNET_DV_SendMessage
121 struct GNUNET_MessageHeader header; 121 struct GNUNET_MessageHeader header;
122 122
123 /** 123 /**
124 * Unique ID for this message, for confirm callback. 124 * Unique ID for this message, for confirm callback, must never be zero.
125 */ 125 */
126 uint32_t uid GNUNET_PACKED; 126 uint32_t uid GNUNET_PACKED;
127 127
diff --git a/src/dv/dv_api.c b/src/dv/dv_api.c
index 4b6cc1a5c..fff0896b3 100644
--- a/src/dv/dv_api.c
+++ b/src/dv/dv_api.c
@@ -74,6 +74,11 @@ struct GNUNET_DV_TransmitHandle
74 */ 74 */
75 struct GNUNET_PeerIdentity target; 75 struct GNUNET_PeerIdentity target;
76 76
77 /**
78 * UID of our message, if any.
79 */
80 uint32_t uid;
81
77}; 82};
78 83
79 84
@@ -184,10 +189,17 @@ transmit_pending (void *cls, size_t size, void *buf)
184 th); 189 th);
185 memcpy (&cbuf[ret], th->msg, tsize); 190 memcpy (&cbuf[ret], th->msg, tsize);
186 ret += tsize; 191 ret += tsize;
187 (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks, 192 if (NULL != th->cb)
188 &th->target.hashPubKey, 193 {
189 th, 194 (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks,
190 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 195 &th->target.hashPubKey,
196 th,
197 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
198 }
199 else
200 {
201 GNUNET_free (th);
202 }
191 } 203 }
192 return ret; 204 return ret;
193} 205}
@@ -215,6 +227,54 @@ start_transmit (struct GNUNET_DV_ServiceHandle *sh)
215 227
216 228
217/** 229/**
230 * Closure for 'process_ack'.
231 */
232struct AckContext
233{
234 /**
235 * The ACK message.
236 */
237 const struct GNUNET_DV_AckMessage *ack;
238
239 /**
240 * Our service handle.
241 */
242 struct GNUNET_DV_ServiceHandle *sh;
243};
244
245
246/**
247 * We got an ACK. Check if it matches the given transmit handle, and if
248 * so call the continuation.
249 *
250 * @param cls the 'struct AckContext'
251 * @param key peer identity
252 * @param value the 'struct GNUNET_DV_TransmitHandle'
253 * @return GNUNET_OK if the ACK did not match (continue to iterate)
254 */
255static int
256process_ack (void *cls,
257 const struct GNUNET_HashCode *key,
258 void *value)
259{
260 struct AckContext *ctx = cls;
261 struct GNUNET_DV_TransmitHandle *th = value;
262
263 if (th->uid != ntohl (ctx->ack->uid))
264 return GNUNET_OK;
265 GNUNET_assert (GNUNET_YES ==
266 GNUNET_CONTAINER_multihashmap_remove (ctx->sh->send_callbacks,
267 key,
268 th));
269 /* FIXME: should distinguish between success and failure here... */
270 th->cb (th->cb_cls,
271 GNUNET_OK);
272 GNUNET_free (th);
273 return GNUNET_NO;
274}
275
276
277/**
218 * Handles a message sent from the DV service to us. 278 * Handles a message sent from the DV service to us.
219 * Parse it out and give it to the plugin. 279 * Parse it out and give it to the plugin.
220 * 280 *
@@ -230,7 +290,9 @@ handle_message_receipt (void *cls,
230 const struct GNUNET_DV_DisconnectMessage *dm; 290 const struct GNUNET_DV_DisconnectMessage *dm;
231 const struct GNUNET_DV_ReceivedMessage *rm; 291 const struct GNUNET_DV_ReceivedMessage *rm;
232 const struct GNUNET_MessageHeader *payload; 292 const struct GNUNET_MessageHeader *payload;
233 293 const struct GNUNET_DV_AckMessage *ack;
294 struct AckContext ctx;
295
234 if (NULL == msg) 296 if (NULL == msg)
235 { 297 {
236 /* Connection closed */ 298 /* Connection closed */
@@ -282,6 +344,21 @@ handle_message_receipt (void *cls,
282 ntohl (rm->distance), 344 ntohl (rm->distance),
283 payload); 345 payload);
284 break; 346 break;
347 case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
348 if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
349 {
350 GNUNET_break (0);
351 reconnect (sh);
352 return;
353 }
354 ack = (const struct GNUNET_DV_AckMessage *) msg;
355 ctx.ack = ack;
356 ctx.sh = sh;
357 GNUNET_CONTAINER_multihashmap_get_multiple (sh->send_callbacks,
358 &ack->target.hashPubKey,
359 &process_ack,
360 &ctx);
361 return;
285 default: 362 default:
286 reconnect (sh); 363 reconnect (sh);
287 break; 364 break;
@@ -495,6 +572,10 @@ GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh,
495 sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND); 572 sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
496 sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) + 573 sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
497 ntohs (msg->size)); 574 ntohs (msg->size));
575 if (0 == sh->uid_gen)
576 sh->uid_gen = 1;
577 th->uid = sh->uid_gen;
578 sm->uid = htonl (sh->uid_gen++);
498 /* use memcpy here as 'target' may not be sufficiently aligned */ 579 /* use memcpy here as 'target' may not be sufficiently aligned */
499 memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity)); 580 memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
500 memcpy (&sm[1], msg, ntohs (msg->size)); 581 memcpy (&sm[1], msg, ntohs (msg->size));
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c
index c42d9344e..e1709be3d 100644
--- a/src/dv/gnunet-service-dv.c
+++ b/src/dv/gnunet-service-dv.c
@@ -28,9 +28,6 @@
28 * @author Nathan Evans 28 * @author Nathan Evans
29 * 29 *
30 * TODO: 30 * TODO:
31 * - even _local_ flow control (send ACK only after core took our message) is
32 * not implemented, but should be (easy fix, but needs adjustments to data
33 * structures)
34 * - distance updates are not properly communicate to US by core, 31 * - distance updates are not properly communicate to US by core,
35 * and conversely we don't give distance updates properly to the plugin yet 32 * and conversely we don't give distance updates properly to the plugin yet
36 * - we send 'ACK' even if a message was dropped due to no route (may 33 * - we send 'ACK' even if a message was dropped due to no route (may
@@ -420,12 +417,12 @@ transmit_to_plugin (void *cls, size_t size, void *buf)
420 * Forward a message from another peer to the plugin. 417 * Forward a message from another peer to the plugin.
421 * 418 *
422 * @param message the message to send to the plugin 419 * @param message the message to send to the plugin
423 * @param distant_neighbor the original sender of the message 420 * @param origin the original sender of the message
424 * @param distnace distance to the original sender of the message 421 * @param distnace distance to the original sender of the message
425 */ 422 */
426static void 423static void
427send_data_to_plugin (const struct GNUNET_MessageHeader *message, 424send_data_to_plugin (const struct GNUNET_MessageHeader *message,
428 const struct GNUNET_PeerIdentity *distant_neighbor, 425 const struct GNUNET_PeerIdentity *origin,
429 uint32_t distance) 426 uint32_t distance)
430{ 427{
431 struct GNUNET_DV_ReceivedMessage *received_msg; 428 struct GNUNET_DV_ReceivedMessage *received_msg;
@@ -443,7 +440,7 @@ send_data_to_plugin (const struct GNUNET_MessageHeader *message,
443 } 440 }
444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 441 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
445 "Delivering message from peer `%s'\n", 442 "Delivering message from peer `%s'\n",
446 GNUNET_i2s (distant_neighbor)); 443 GNUNET_i2s (origin));
447 size = sizeof (struct GNUNET_DV_ReceivedMessage) + 444 size = sizeof (struct GNUNET_DV_ReceivedMessage) +
448 ntohs (message->size); 445 ntohs (message->size);
449 if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 446 if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
@@ -456,7 +453,7 @@ send_data_to_plugin (const struct GNUNET_MessageHeader *message,
456 received_msg->header.size = htons (size); 453 received_msg->header.size = htons (size);
457 received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV); 454 received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV);
458 received_msg->distance = htonl (distance); 455 received_msg->distance = htonl (distance);
459 received_msg->sender = *distant_neighbor; 456 received_msg->sender = *origin;
460 memcpy (&received_msg[1], message, ntohs (message->size)); 457 memcpy (&received_msg[1], message, ntohs (message->size));
461 GNUNET_CONTAINER_DLL_insert_tail (plugin_pending_head, 458 GNUNET_CONTAINER_DLL_insert_tail (plugin_pending_head,
462 plugin_pending_tail, 459 plugin_pending_tail,
@@ -611,8 +608,9 @@ core_transmit_notify (void *cls, size_t size, void *buf)
611 dn->pm_tail, 608 dn->pm_tail,
612 pending); 609 pending);
613 memcpy (&cbuf[off], pending->msg, msize); 610 memcpy (&cbuf[off], pending->msg, msize);
614 send_ack_to_plugin (&pending->ultimate_target, 611 if (0 != pending->uid)
615 pending->uid); 612 send_ack_to_plugin (&pending->ultimate_target,
613 pending->uid);
616 GNUNET_free (pending); 614 GNUNET_free (pending);
617 off += msize; 615 off += msize;
618 } 616 }
@@ -633,6 +631,8 @@ core_transmit_notify (void *cls, size_t size, void *buf)
633 * Forward the given payload to the given target. 631 * Forward the given payload to the given target.
634 * 632 *
635 * @param target where to send the message 633 * @param target where to send the message
634 * @param uid unique ID for the message
635 * @param ultimate_target ultimate recipient for the message
636 * @param distance expected (remaining) distance to the target 636 * @param distance expected (remaining) distance to the target
637 * @param sender original sender of the message 637 * @param sender original sender of the message
638 * @param payload payload of the message 638 * @param payload payload of the message
@@ -640,7 +640,9 @@ core_transmit_notify (void *cls, size_t size, void *buf)
640static void 640static void
641forward_payload (struct DirectNeighbor *target, 641forward_payload (struct DirectNeighbor *target,
642 uint32_t distance, 642 uint32_t distance,
643 uint32_t uid,
643 const struct GNUNET_PeerIdentity *sender, 644 const struct GNUNET_PeerIdentity *sender,
645 const struct GNUNET_PeerIdentity *ultimate_target,
644 const struct GNUNET_MessageHeader *payload) 646 const struct GNUNET_MessageHeader *payload)
645{ 647{
646 struct PendingMessage *pm; 648 struct PendingMessage *pm;
@@ -651,7 +653,10 @@ forward_payload (struct DirectNeighbor *target,
651 (0 != memcmp (sender, 653 (0 != memcmp (sender,
652 &my_identity, 654 &my_identity,
653 sizeof (struct GNUNET_PeerIdentity))) ) 655 sizeof (struct GNUNET_PeerIdentity))) )
656 {
657 GNUNET_break (0 == uid);
654 return; 658 return;
659 }
655 msize = sizeof (struct RouteMessage) + ntohs (payload->size); 660 msize = sizeof (struct RouteMessage) + ntohs (payload->size);
656 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 661 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
657 { 662 {
@@ -659,6 +664,8 @@ forward_payload (struct DirectNeighbor *target,
659 return; 664 return;
660 } 665 }
661 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); 666 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
667 pm->ultimate_target = *ultimate_target;
668 pm->uid = uid;
662 pm->msg = (const struct GNUNET_MessageHeader *) &pm[1]; 669 pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
663 rm = (struct RouteMessage *) &pm[1]; 670 rm = (struct RouteMessage *) &pm[1];
664 rm->header.size = htons ((uint16_t) msize); 671 rm->header.size = htons ((uint16_t) msize);
@@ -1271,6 +1278,8 @@ handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer,
1271 } 1278 }
1272 forward_payload (route->next_hop, 1279 forward_payload (route->next_hop,
1273 ntohl (route->target.distance), 1280 ntohl (route->target.distance),
1281 0,
1282 &rm->target,
1274 &rm->sender, 1283 &rm->sender,
1275 payload); 1284 payload);
1276 return GNUNET_OK; 1285 return GNUNET_OK;
@@ -1300,6 +1309,7 @@ handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client,
1300 return; 1309 return;
1301 } 1310 }
1302 msg = (const struct GNUNET_DV_SendMessage *) message; 1311 msg = (const struct GNUNET_DV_SendMessage *) message;
1312 GNUNET_break (0 != ntohl (msg->uid));
1303 payload = (const struct GNUNET_MessageHeader *) &msg[1]; 1313 payload = (const struct GNUNET_MessageHeader *) &msg[1];
1304 if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size)) 1314 if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size))
1305 { 1315 {
@@ -1316,14 +1326,14 @@ handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client,
1316 GNUNET_STATISTICS_update (stats, 1326 GNUNET_STATISTICS_update (stats,
1317 "# local messages discarded (no route)", 1327 "# local messages discarded (no route)",
1318 1, GNUNET_NO); 1328 1, GNUNET_NO);
1319 send_ack_to_plugin (&msg->target, htonl (msg->uid)); 1329 send_ack_to_plugin (&msg->target, ntohl (msg->uid));
1320 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1330 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1321 return; 1331 return;
1322 } 1332 }
1323 // FIXME: flow control (send ACK only once message has left the queue...)
1324 send_ack_to_plugin (&msg->target, htonl (msg->uid));
1325 forward_payload (route->next_hop, 1333 forward_payload (route->next_hop,
1326 ntohl (route->target.distance), 1334 ntohl (route->target.distance),
1335 htonl (msg->uid),
1336 &msg->target,
1327 &my_identity, 1337 &my_identity,
1328 payload); 1338 payload);
1329 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1339 GNUNET_SERVER_receive_done (client, GNUNET_OK);