aboutsummaryrefslogtreecommitdiff
path: root/src/dv
diff options
context:
space:
mode:
authorNathan S. Evans <evans@in.tum.de>2010-06-25 09:49:00 +0000
committerNathan S. Evans <evans@in.tum.de>2010-06-25 09:49:00 +0000
commitcc9d16c9a38f3516fcfc3ddf62c4b9775ad0b220 (patch)
tree6cdda0a8084a085505a82d029a4b962535a7e5ce /src/dv
parent7f4dddcdc1004b8699c55fc23f15249a6f9e40b2 (diff)
downloadgnunet-cc9d16c9a38f3516fcfc3ddf62c4b9775ad0b220.tar.gz
gnunet-cc9d16c9a38f3516fcfc3ddf62c4b9775ad0b220.zip
trying message queues on receiver side
Diffstat (limited to 'src/dv')
-rw-r--r--src/dv/gnunet-service-dv.c125
1 files changed, 79 insertions, 46 deletions
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c
index 00de78aae..fdff7e666 100644
--- a/src/dv/gnunet-service-dv.c
+++ b/src/dv/gnunet-service-dv.c
@@ -243,6 +243,38 @@ struct NeighborUpdateInfo
243}; 243};
244 244
245/** 245/**
246 * Struct to store a single message received with
247 * an unknown sender.
248 */
249struct UnknownSenderMessage
250{
251 /**
252 * Message sender (immediate)
253 */
254 struct GNUNET_PeerIdentity sender;
255
256 /**
257 * The actual message received
258 */
259 struct GNUNET_MessageHeader *message;
260
261 /**
262 * Latency of connection
263 */
264 struct GNUNET_TIME_Relative latency;
265
266 /**
267 * Distance to destination
268 */
269 uint32_t distance;
270
271 /**
272 * Unknown sender id
273 */
274 uint32_t sender_id;
275};
276
277/**
246 * Struct where actual neighbor information is stored, 278 * Struct where actual neighbor information is stored,
247 * referenced by min_heap and max_heap. Freeing dealt 279 * referenced by min_heap and max_heap. Freeing dealt
248 * with when items removed from hashmap. 280 * with when items removed from hashmap.
@@ -279,6 +311,13 @@ struct DirectNeighbor
279 * from DV? 311 * from DV?
280 */ 312 */
281 int hidden; 313 int hidden;
314
315 /**
316 * Save a single message from a direct neighbor from a peer
317 * we don't know on the chance that it will be gossiped about
318 * and we can deliver the message.
319 */
320 struct UnknownSenderMessage pending_message;
282}; 321};
283 322
284 323
@@ -1226,6 +1265,7 @@ void tokenized_message_handler (void *cls,
1226 } 1265 }
1227} 1266}
1228 1267
1268#if DELAY_FORWARDS
1229struct DelayedMessageContext 1269struct DelayedMessageContext
1230{ 1270{
1231 struct GNUNET_PeerIdentity dest; 1271 struct GNUNET_PeerIdentity dest;
@@ -1249,10 +1289,12 @@ void send_message_delayed (void *cls,
1249 default_dv_priority, 1289 default_dv_priority,
1250 msg_ctx->uid, 1290 msg_ctx->uid,
1251 GNUNET_TIME_relative_get_forever()); 1291 GNUNET_TIME_relative_get_forever());
1292 GNUNET_free(msg_ctx->message);
1293 GNUNET_free(msg_ctx);
1252 } 1294 }
1253 GNUNET_free(msg_ctx->message);
1254 GNUNET_free(msg_ctx);
1255} 1295}
1296#endif
1297
1256 1298
1257/** 1299/**
1258 * Core handler for dv data messages. Whatever this message 1300 * Core handler for dv data messages. Whatever this message
@@ -1267,10 +1309,10 @@ void send_message_delayed (void *cls,
1267 * @param distance the distance to the immediate peer 1309 * @param distance the distance to the immediate peer
1268 */ 1310 */
1269static int handle_dv_data_message (void *cls, 1311static int handle_dv_data_message (void *cls,
1270 const struct GNUNET_PeerIdentity * peer, 1312 const struct GNUNET_PeerIdentity * peer,
1271 const struct GNUNET_MessageHeader * message, 1313 const struct GNUNET_MessageHeader * message,
1272 struct GNUNET_TIME_Relative latency, 1314 struct GNUNET_TIME_Relative latency,
1273 uint32_t distance) 1315 uint32_t distance)
1274{ 1316{
1275 const p2p_dv_MESSAGE_Data *incoming = (const p2p_dv_MESSAGE_Data *) message; 1317 const p2p_dv_MESSAGE_Data *incoming = (const p2p_dv_MESSAGE_Data *) message;
1276 const struct GNUNET_MessageHeader *packed_message; 1318 const struct GNUNET_MessageHeader *packed_message;
@@ -1282,7 +1324,9 @@ static int handle_dv_data_message (void *cls,
1282 struct GNUNET_PeerIdentity *destination; 1324 struct GNUNET_PeerIdentity *destination;
1283 struct FindDestinationContext fdc; 1325 struct FindDestinationContext fdc;
1284 struct TokenizedMessageContext tkm_ctx; 1326 struct TokenizedMessageContext tkm_ctx;
1327#if DELAY_FORWARDS
1285 struct DelayedMessageContext *delayed_context; 1328 struct DelayedMessageContext *delayed_context;
1329#endif
1286#if USE_PEER_ID 1330#if USE_PEER_ID
1287 struct CheckPeerContext checkPeerCtx; 1331 struct CheckPeerContext checkPeerCtx;
1288#endif 1332#endif
@@ -1313,15 +1357,10 @@ static int handle_dv_data_message (void *cls,
1313 } 1357 }
1314 1358
1315 dn = GNUNET_CONTAINER_multihashmap_get (direct_neighbors, 1359 dn = GNUNET_CONTAINER_multihashmap_get (direct_neighbors,
1316 &peer->hashPubKey); 1360 &peer->hashPubKey);
1317 if (dn == NULL) 1361 if (dn == NULL)
1318 { 1362 return GNUNET_OK;
1319#if DEBUG_DV 1363
1320 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1321 "%s: dn NULL!\n", "dv");
1322#endif
1323 return GNUNET_OK;
1324 }
1325 sid = ntohl (incoming->sender); 1364 sid = ntohl (incoming->sender);
1326#if USE_PEER_ID 1365#if USE_PEER_ID
1327 if (sid != 0) 1366 if (sid != 0)
@@ -1344,11 +1383,10 @@ static int handle_dv_data_message (void *cls,
1344 1383
1345 if (pos == NULL) 1384 if (pos == NULL)
1346 { 1385 {
1347 direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity));
1348#if DEBUG_DV_MESSAGES 1386#if DEBUG_DV_MESSAGES
1387 direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity));
1349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1388 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1350 "%s: unknown sender (%u), Message uid %llu from %s!\n", GNUNET_i2s(&my_identity), ntohl(incoming->sender), ntohl(incoming->uid), direct_id); 1389 "%s: unknown sender (%u), Message uid %llu from %s!\n", GNUNET_i2s(&my_identity), ntohl(incoming->sender), ntohl(incoming->uid), direct_id);
1351#endif
1352 GNUNET_free(direct_id); 1390 GNUNET_free(direct_id);
1353 pos = dn->referee_head; 1391 pos = dn->referee_head;
1354 while ((NULL != pos) && (pos->referrer_id != sid)) 1392 while ((NULL != pos) && (pos->referrer_id != sid))
@@ -1358,6 +1396,18 @@ static int handle_dv_data_message (void *cls,
1358 GNUNET_free(sender_id); 1396 GNUNET_free(sender_id);
1359 pos = pos->next; 1397 pos = pos->next;
1360 } 1398 }
1399#endif
1400 if (dn->pending_message.sender_id != 0)
1401 {
1402 GNUNET_free(dn->pending_message.message);
1403 }
1404
1405 dn->pending_message.message = GNUNET_malloc(ntohs (message->size));
1406 memcpy(dn->pending_message.message, message, ntohs(message->size));
1407 dn->pending_message.distance = distance;
1408 dn->pending_message.latency = latency;
1409 memcpy(&dn->pending_message.sender, peer, sizeof(struct GNUNET_PeerIdentity));
1410 dn->pending_message.sender_id = sid;
1361 1411
1362#if DEBUG_MESSAGE_DROP 1412#if DEBUG_MESSAGE_DROP
1363 direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity)); 1413 direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity));
@@ -1388,27 +1438,6 @@ static int handle_dv_data_message (void *cls,
1388 GNUNET_break_op(0); 1438 GNUNET_break_op(0);
1389 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: %s Received corrupt data, discarding!", my_short_id, "DV SERVICE"); 1439 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: %s Received corrupt data, discarding!", my_short_id, "DV SERVICE");
1390 } 1440 }
1391#if NO_MST
1392 offset = 0;
1393 while(offset < packed_message_size)
1394 {
1395 packed_message = (struct GNUNET_MessageHeader *)&cbuf[offset];
1396
1397 GNUNET_break_op (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP);
1398 GNUNET_break_op (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA);
1399 if ( (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP) &&
1400 (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA) )
1401 {
1402#if DEBUG_DV_MESSAGES
1403 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1404 "%s: Receives %s message(s) for me, uid %u, total size %d cost %u from %s!\n", my_short_id, "DV DATA", ntohl(incoming->uid), ntohs(packed_message->size), pos->cost, GNUNET_i2s(&pos->identity));
1405#endif
1406 GNUNET_assert(memcmp(peer, &pos->identity, sizeof(struct GNUNET_PeerIdentity)) != 0);
1407 send_to_plugin(peer, packed_message, ntohs(packed_message->size), &pos->identity, pos->cost);
1408 }
1409 offset += ntohs(packed_message->size);
1410 }
1411#endif
1412 return GNUNET_OK; 1441 return GNUNET_OK;
1413 } 1442 }
1414 else 1443 else
@@ -1436,7 +1465,7 @@ static int handle_dv_data_message (void *cls,
1436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1465 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1437 "%s: Receives %s message uid %u for someone we don't know (id %u)!\n", my_short_id, "DV DATA", ntohl(incoming->uid), tid); 1466 "%s: Receives %s message uid %u for someone we don't know (id %u)!\n", my_short_id, "DV DATA", ntohl(incoming->uid), tid);
1438#endif 1467#endif
1439 return GNUNET_OK; 1468 return GNUNET_OK;
1440 } 1469 }
1441 destination = &fdc.dest->identity; 1470 destination = &fdc.dest->identity;
1442 1471
@@ -1458,18 +1487,12 @@ static int handle_dv_data_message (void *cls,
1458 /* At this point we have a message, and we need to forward it on to the 1487 /* At this point we have a message, and we need to forward it on to the
1459 * next DV hop. 1488 * next DV hop.
1460 */ 1489 */
1461 /* FIXME: Can't send message on, we have to behave.
1462 * We have to tell core we have a message for the next peer, and let
1463 * transport do transport selection on how to get this message to 'em */
1464 /*ret = send_message (&destination,
1465 &original_sender,
1466 packed_message, DV_PRIORITY, DV_DELAY);*/
1467
1468#if DEBUG_DV_MESSAGES 1490#if DEBUG_DV_MESSAGES
1469 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1470 "%s: FORWARD %s message for %s, uid %u, size %d type %d, cost %u!\n", my_short_id, "DV DATA", GNUNET_i2s(destination), ntohl(incoming->uid), ntohs(packed_message->size), ntohs(packed_message->type), pos->cost); 1492 "%s: FORWARD %s message for %s, uid %u, size %d type %d, cost %u!\n", my_short_id, "DV DATA", GNUNET_i2s(destination), ntohl(incoming->uid), ntohs(packed_message->size), ntohs(packed_message->type), pos->cost);
1471#endif 1493#endif
1472 1494
1495#if DELAY_FORWARDS
1473 if (GNUNET_TIME_absolute_get_duration(pos->last_gossip).value < GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 2).value) 1496 if (GNUNET_TIME_absolute_get_duration(pos->last_gossip).value < GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 2).value)
1474 { 1497 {
1475 delayed_context = GNUNET_malloc(sizeof(struct DelayedMessageContext)); 1498 delayed_context = GNUNET_malloc(sizeof(struct DelayedMessageContext));
@@ -1480,10 +1503,10 @@ static int handle_dv_data_message (void *cls,
1480 delayed_context->message_size = packed_message_size; 1503 delayed_context->message_size = packed_message_size;
1481 delayed_context->uid = ntohl(incoming->uid); 1504 delayed_context->uid = ntohl(incoming->uid);
1482 GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 2500), &send_message_delayed, delayed_context); 1505 GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 2500), &send_message_delayed, delayed_context);
1483 //GNUNET_SCHEDULER_add_now(sched, &send_message_delayed, delayed_context);
1484 return GNUNET_OK; 1506 return GNUNET_OK;
1485 } 1507 }
1486 else 1508 else
1509#endif
1487 { 1510 {
1488 ret = send_message(destination, 1511 ret = send_message(destination,
1489 original_sender, 1512 original_sender,
@@ -2425,6 +2448,16 @@ addUpdateNeighbor (const struct GNUNET_PeerIdentity * peer, struct GNUNET_CRYPTO
2425 neighbor, 2448 neighbor,
2426 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 2449 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2427 2450
2451 if ((referrer_peer_id != 0) && (referrer->pending_message.sender_id == referrer_peer_id)) /* We have a queued message from just learned about peer! */
2452 {
2453#if DEBUG_DV_MESSAGES
2454 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: learned about peer %llu from which we have a previous unknown message, processing!\n", my_short_id, referrer_peer_id);
2455#endif
2456 handle_dv_data_message(NULL, &referrer->pending_message.sender, referrer->pending_message.message, referrer->pending_message.latency, referrer->pending_message.distance);
2457 GNUNET_free(referrer->pending_message.message);
2458 referrer->pending_message.sender_id = 0;
2459 }
2460
2428 if (cost != DIRECT_NEIGHBOR_COST) 2461 if (cost != DIRECT_NEIGHBOR_COST)
2429 { 2462 {
2430 /* Added neighbor, now send HELLO to transport */ 2463 /* Added neighbor, now send HELLO to transport */