diff options
author | Nathan S. Evans <evans@in.tum.de> | 2010-06-25 09:49:00 +0000 |
---|---|---|
committer | Nathan S. Evans <evans@in.tum.de> | 2010-06-25 09:49:00 +0000 |
commit | cc9d16c9a38f3516fcfc3ddf62c4b9775ad0b220 (patch) | |
tree | 6cdda0a8084a085505a82d029a4b962535a7e5ce /src/dv | |
parent | 7f4dddcdc1004b8699c55fc23f15249a6f9e40b2 (diff) | |
download | gnunet-cc9d16c9a38f3516fcfc3ddf62c4b9775ad0b220.tar.gz gnunet-cc9d16c9a38f3516fcfc3ddf62c4b9775ad0b220.zip |
trying message queues on receiver side
Diffstat (limited to 'src/dv')
-rw-r--r-- | src/dv/gnunet-service-dv.c | 125 |
1 files changed, 79 insertions, 46 deletions
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index 00de78aae..fdff7e666 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c | |||
@@ -243,6 +243,38 @@ struct NeighborUpdateInfo | |||
243 | }; | 243 | }; |
244 | 244 | ||
245 | /** | 245 | /** |
246 | * Struct to store a single message received with | ||
247 | * an unknown sender. | ||
248 | */ | ||
249 | struct UnknownSenderMessage | ||
250 | { | ||
251 | /** | ||
252 | * Message sender (immediate) | ||
253 | */ | ||
254 | struct GNUNET_PeerIdentity sender; | ||
255 | |||
256 | /** | ||
257 | * The actual message received | ||
258 | */ | ||
259 | struct GNUNET_MessageHeader *message; | ||
260 | |||
261 | /** | ||
262 | * Latency of connection | ||
263 | */ | ||
264 | struct GNUNET_TIME_Relative latency; | ||
265 | |||
266 | /** | ||
267 | * Distance to destination | ||
268 | */ | ||
269 | uint32_t distance; | ||
270 | |||
271 | /** | ||
272 | * Unknown sender id | ||
273 | */ | ||
274 | uint32_t sender_id; | ||
275 | }; | ||
276 | |||
277 | /** | ||
246 | * Struct where actual neighbor information is stored, | 278 | * Struct where actual neighbor information is stored, |
247 | * referenced by min_heap and max_heap. Freeing dealt | 279 | * referenced by min_heap and max_heap. Freeing dealt |
248 | * with when items removed from hashmap. | 280 | * with when items removed from hashmap. |
@@ -279,6 +311,13 @@ struct DirectNeighbor | |||
279 | * from DV? | 311 | * from DV? |
280 | */ | 312 | */ |
281 | int hidden; | 313 | int hidden; |
314 | |||
315 | /** | ||
316 | * Save a single message from a direct neighbor from a peer | ||
317 | * we don't know on the chance that it will be gossiped about | ||
318 | * and we can deliver the message. | ||
319 | */ | ||
320 | struct UnknownSenderMessage pending_message; | ||
282 | }; | 321 | }; |
283 | 322 | ||
284 | 323 | ||
@@ -1226,6 +1265,7 @@ void tokenized_message_handler (void *cls, | |||
1226 | } | 1265 | } |
1227 | } | 1266 | } |
1228 | 1267 | ||
1268 | #if DELAY_FORWARDS | ||
1229 | struct DelayedMessageContext | 1269 | struct DelayedMessageContext |
1230 | { | 1270 | { |
1231 | struct GNUNET_PeerIdentity dest; | 1271 | struct GNUNET_PeerIdentity dest; |
@@ -1249,10 +1289,12 @@ void send_message_delayed (void *cls, | |||
1249 | default_dv_priority, | 1289 | default_dv_priority, |
1250 | msg_ctx->uid, | 1290 | msg_ctx->uid, |
1251 | GNUNET_TIME_relative_get_forever()); | 1291 | GNUNET_TIME_relative_get_forever()); |
1292 | GNUNET_free(msg_ctx->message); | ||
1293 | GNUNET_free(msg_ctx); | ||
1252 | } | 1294 | } |
1253 | GNUNET_free(msg_ctx->message); | ||
1254 | GNUNET_free(msg_ctx); | ||
1255 | } | 1295 | } |
1296 | #endif | ||
1297 | |||
1256 | 1298 | ||
1257 | /** | 1299 | /** |
1258 | * Core handler for dv data messages. Whatever this message | 1300 | * Core handler for dv data messages. Whatever this message |
@@ -1267,10 +1309,10 @@ void send_message_delayed (void *cls, | |||
1267 | * @param distance the distance to the immediate peer | 1309 | * @param distance the distance to the immediate peer |
1268 | */ | 1310 | */ |
1269 | static int handle_dv_data_message (void *cls, | 1311 | static int handle_dv_data_message (void *cls, |
1270 | const struct GNUNET_PeerIdentity * peer, | 1312 | const struct GNUNET_PeerIdentity * peer, |
1271 | const struct GNUNET_MessageHeader * message, | 1313 | const struct GNUNET_MessageHeader * message, |
1272 | struct GNUNET_TIME_Relative latency, | 1314 | struct GNUNET_TIME_Relative latency, |
1273 | uint32_t distance) | 1315 | uint32_t distance) |
1274 | { | 1316 | { |
1275 | const p2p_dv_MESSAGE_Data *incoming = (const p2p_dv_MESSAGE_Data *) message; | 1317 | const p2p_dv_MESSAGE_Data *incoming = (const p2p_dv_MESSAGE_Data *) message; |
1276 | const struct GNUNET_MessageHeader *packed_message; | 1318 | const struct GNUNET_MessageHeader *packed_message; |
@@ -1282,7 +1324,9 @@ static int handle_dv_data_message (void *cls, | |||
1282 | struct GNUNET_PeerIdentity *destination; | 1324 | struct GNUNET_PeerIdentity *destination; |
1283 | struct FindDestinationContext fdc; | 1325 | struct FindDestinationContext fdc; |
1284 | struct TokenizedMessageContext tkm_ctx; | 1326 | struct TokenizedMessageContext tkm_ctx; |
1327 | #if DELAY_FORWARDS | ||
1285 | struct DelayedMessageContext *delayed_context; | 1328 | struct DelayedMessageContext *delayed_context; |
1329 | #endif | ||
1286 | #if USE_PEER_ID | 1330 | #if USE_PEER_ID |
1287 | struct CheckPeerContext checkPeerCtx; | 1331 | struct CheckPeerContext checkPeerCtx; |
1288 | #endif | 1332 | #endif |
@@ -1313,15 +1357,10 @@ static int handle_dv_data_message (void *cls, | |||
1313 | } | 1357 | } |
1314 | 1358 | ||
1315 | dn = GNUNET_CONTAINER_multihashmap_get (direct_neighbors, | 1359 | dn = GNUNET_CONTAINER_multihashmap_get (direct_neighbors, |
1316 | &peer->hashPubKey); | 1360 | &peer->hashPubKey); |
1317 | if (dn == NULL) | 1361 | if (dn == NULL) |
1318 | { | 1362 | return GNUNET_OK; |
1319 | #if DEBUG_DV | 1363 | |
1320 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1321 | "%s: dn NULL!\n", "dv"); | ||
1322 | #endif | ||
1323 | return GNUNET_OK; | ||
1324 | } | ||
1325 | sid = ntohl (incoming->sender); | 1364 | sid = ntohl (incoming->sender); |
1326 | #if USE_PEER_ID | 1365 | #if USE_PEER_ID |
1327 | if (sid != 0) | 1366 | if (sid != 0) |
@@ -1344,11 +1383,10 @@ static int handle_dv_data_message (void *cls, | |||
1344 | 1383 | ||
1345 | if (pos == NULL) | 1384 | if (pos == NULL) |
1346 | { | 1385 | { |
1347 | direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity)); | ||
1348 | #if DEBUG_DV_MESSAGES | 1386 | #if DEBUG_DV_MESSAGES |
1387 | direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity)); | ||
1349 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1388 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1350 | "%s: unknown sender (%u), Message uid %llu from %s!\n", GNUNET_i2s(&my_identity), ntohl(incoming->sender), ntohl(incoming->uid), direct_id); | 1389 | "%s: unknown sender (%u), Message uid %llu from %s!\n", GNUNET_i2s(&my_identity), ntohl(incoming->sender), ntohl(incoming->uid), direct_id); |
1351 | #endif | ||
1352 | GNUNET_free(direct_id); | 1390 | GNUNET_free(direct_id); |
1353 | pos = dn->referee_head; | 1391 | pos = dn->referee_head; |
1354 | while ((NULL != pos) && (pos->referrer_id != sid)) | 1392 | while ((NULL != pos) && (pos->referrer_id != sid)) |
@@ -1358,6 +1396,18 @@ static int handle_dv_data_message (void *cls, | |||
1358 | GNUNET_free(sender_id); | 1396 | GNUNET_free(sender_id); |
1359 | pos = pos->next; | 1397 | pos = pos->next; |
1360 | } | 1398 | } |
1399 | #endif | ||
1400 | if (dn->pending_message.sender_id != 0) | ||
1401 | { | ||
1402 | GNUNET_free(dn->pending_message.message); | ||
1403 | } | ||
1404 | |||
1405 | dn->pending_message.message = GNUNET_malloc(ntohs (message->size)); | ||
1406 | memcpy(dn->pending_message.message, message, ntohs(message->size)); | ||
1407 | dn->pending_message.distance = distance; | ||
1408 | dn->pending_message.latency = latency; | ||
1409 | memcpy(&dn->pending_message.sender, peer, sizeof(struct GNUNET_PeerIdentity)); | ||
1410 | dn->pending_message.sender_id = sid; | ||
1361 | 1411 | ||
1362 | #if DEBUG_MESSAGE_DROP | 1412 | #if DEBUG_MESSAGE_DROP |
1363 | direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity)); | 1413 | direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity)); |
@@ -1388,27 +1438,6 @@ static int handle_dv_data_message (void *cls, | |||
1388 | GNUNET_break_op(0); | 1438 | GNUNET_break_op(0); |
1389 | GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: %s Received corrupt data, discarding!", my_short_id, "DV SERVICE"); | 1439 | GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: %s Received corrupt data, discarding!", my_short_id, "DV SERVICE"); |
1390 | } | 1440 | } |
1391 | #if NO_MST | ||
1392 | offset = 0; | ||
1393 | while(offset < packed_message_size) | ||
1394 | { | ||
1395 | packed_message = (struct GNUNET_MessageHeader *)&cbuf[offset]; | ||
1396 | |||
1397 | GNUNET_break_op (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP); | ||
1398 | GNUNET_break_op (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA); | ||
1399 | if ( (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP) && | ||
1400 | (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA) ) | ||
1401 | { | ||
1402 | #if DEBUG_DV_MESSAGES | ||
1403 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1404 | "%s: Receives %s message(s) for me, uid %u, total size %d cost %u from %s!\n", my_short_id, "DV DATA", ntohl(incoming->uid), ntohs(packed_message->size), pos->cost, GNUNET_i2s(&pos->identity)); | ||
1405 | #endif | ||
1406 | GNUNET_assert(memcmp(peer, &pos->identity, sizeof(struct GNUNET_PeerIdentity)) != 0); | ||
1407 | send_to_plugin(peer, packed_message, ntohs(packed_message->size), &pos->identity, pos->cost); | ||
1408 | } | ||
1409 | offset += ntohs(packed_message->size); | ||
1410 | } | ||
1411 | #endif | ||
1412 | return GNUNET_OK; | 1441 | return GNUNET_OK; |
1413 | } | 1442 | } |
1414 | else | 1443 | else |
@@ -1436,7 +1465,7 @@ static int handle_dv_data_message (void *cls, | |||
1436 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1465 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1437 | "%s: Receives %s message uid %u for someone we don't know (id %u)!\n", my_short_id, "DV DATA", ntohl(incoming->uid), tid); | 1466 | "%s: Receives %s message uid %u for someone we don't know (id %u)!\n", my_short_id, "DV DATA", ntohl(incoming->uid), tid); |
1438 | #endif | 1467 | #endif |
1439 | return GNUNET_OK; | 1468 | return GNUNET_OK; |
1440 | } | 1469 | } |
1441 | destination = &fdc.dest->identity; | 1470 | destination = &fdc.dest->identity; |
1442 | 1471 | ||
@@ -1458,18 +1487,12 @@ static int handle_dv_data_message (void *cls, | |||
1458 | /* At this point we have a message, and we need to forward it on to the | 1487 | /* At this point we have a message, and we need to forward it on to the |
1459 | * next DV hop. | 1488 | * next DV hop. |
1460 | */ | 1489 | */ |
1461 | /* FIXME: Can't send message on, we have to behave. | ||
1462 | * We have to tell core we have a message for the next peer, and let | ||
1463 | * transport do transport selection on how to get this message to 'em */ | ||
1464 | /*ret = send_message (&destination, | ||
1465 | &original_sender, | ||
1466 | packed_message, DV_PRIORITY, DV_DELAY);*/ | ||
1467 | |||
1468 | #if DEBUG_DV_MESSAGES | 1490 | #if DEBUG_DV_MESSAGES |
1469 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1491 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1470 | "%s: FORWARD %s message for %s, uid %u, size %d type %d, cost %u!\n", my_short_id, "DV DATA", GNUNET_i2s(destination), ntohl(incoming->uid), ntohs(packed_message->size), ntohs(packed_message->type), pos->cost); | 1492 | "%s: FORWARD %s message for %s, uid %u, size %d type %d, cost %u!\n", my_short_id, "DV DATA", GNUNET_i2s(destination), ntohl(incoming->uid), ntohs(packed_message->size), ntohs(packed_message->type), pos->cost); |
1471 | #endif | 1493 | #endif |
1472 | 1494 | ||
1495 | #if DELAY_FORWARDS | ||
1473 | if (GNUNET_TIME_absolute_get_duration(pos->last_gossip).value < GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 2).value) | 1496 | if (GNUNET_TIME_absolute_get_duration(pos->last_gossip).value < GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 2).value) |
1474 | { | 1497 | { |
1475 | delayed_context = GNUNET_malloc(sizeof(struct DelayedMessageContext)); | 1498 | delayed_context = GNUNET_malloc(sizeof(struct DelayedMessageContext)); |
@@ -1480,10 +1503,10 @@ static int handle_dv_data_message (void *cls, | |||
1480 | delayed_context->message_size = packed_message_size; | 1503 | delayed_context->message_size = packed_message_size; |
1481 | delayed_context->uid = ntohl(incoming->uid); | 1504 | delayed_context->uid = ntohl(incoming->uid); |
1482 | GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 2500), &send_message_delayed, delayed_context); | 1505 | GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 2500), &send_message_delayed, delayed_context); |
1483 | //GNUNET_SCHEDULER_add_now(sched, &send_message_delayed, delayed_context); | ||
1484 | return GNUNET_OK; | 1506 | return GNUNET_OK; |
1485 | } | 1507 | } |
1486 | else | 1508 | else |
1509 | #endif | ||
1487 | { | 1510 | { |
1488 | ret = send_message(destination, | 1511 | ret = send_message(destination, |
1489 | original_sender, | 1512 | original_sender, |
@@ -2425,6 +2448,16 @@ addUpdateNeighbor (const struct GNUNET_PeerIdentity * peer, struct GNUNET_CRYPTO | |||
2425 | neighbor, | 2448 | neighbor, |
2426 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 2449 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
2427 | 2450 | ||
2451 | if ((referrer_peer_id != 0) && (referrer->pending_message.sender_id == referrer_peer_id)) /* We have a queued message from just learned about peer! */ | ||
2452 | { | ||
2453 | #if DEBUG_DV_MESSAGES | ||
2454 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: learned about peer %llu from which we have a previous unknown message, processing!\n", my_short_id, referrer_peer_id); | ||
2455 | #endif | ||
2456 | handle_dv_data_message(NULL, &referrer->pending_message.sender, referrer->pending_message.message, referrer->pending_message.latency, referrer->pending_message.distance); | ||
2457 | GNUNET_free(referrer->pending_message.message); | ||
2458 | referrer->pending_message.sender_id = 0; | ||
2459 | } | ||
2460 | |||
2428 | if (cost != DIRECT_NEIGHBOR_COST) | 2461 | if (cost != DIRECT_NEIGHBOR_COST) |
2429 | { | 2462 | { |
2430 | /* Added neighbor, now send HELLO to transport */ | 2463 | /* Added neighbor, now send HELLO to transport */ |