aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht_clients.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht/gnunet-service-dht_clients.c')
-rw-r--r--src/dht/gnunet-service-dht_clients.c491
1 files changed, 222 insertions, 269 deletions
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index 4b9eecc58..dde8c6d7a 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -53,7 +53,7 @@ struct PendingMessage
53 53
54 /** 54 /**
55 * Actual message to be sent, allocated at the end of the struct: 55 * Actual message to be sent, allocated at the end of the struct:
56 * // msg = (cast) &pm[1]; 56 * // msg = (cast) &pm[1];
57 * // memcpy (&pm[1], data, len); 57 * // memcpy (&pm[1], data, len);
58 */ 58 */
59 const struct GNUNET_MessageHeader *msg; 59 const struct GNUNET_MessageHeader *msg;
@@ -224,15 +224,13 @@ find_active_client (struct GNUNET_SERVER_Client *client)
224 } 224 }
225 ret = GNUNET_malloc (sizeof (struct ClientList)); 225 ret = GNUNET_malloc (sizeof (struct ClientList));
226 ret->client_handle = client; 226 ret->client_handle = client;
227 GNUNET_CONTAINER_DLL_insert (client_head, 227 GNUNET_CONTAINER_DLL_insert (client_head, client_tail, ret);
228 client_tail,
229 ret);
230 return ret; 228 return ret;
231} 229}
232 230
233 231
234/** 232/**
235 * Iterator over hash map entries that frees all entries 233 * Iterator over hash map entries that frees all entries
236 * associated with the given client. 234 * associated with the given client.
237 * 235 *
238 * @param cls client to search for in source routes 236 * @param cls client to search for in source routes
@@ -250,18 +248,15 @@ remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
250 return GNUNET_YES; 248 return GNUNET_YES;
251#if DEBUG_DHT 249#if DEBUG_DHT
252 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 250 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
253 "Removing client %p's record for key %s\n", 251 "Removing client %p's record for key %s\n", client,
254 client, 252 GNUNET_h2s (key));
255 GNUNET_h2s (key));
256#endif 253#endif
257 GNUNET_assert (GNUNET_YES == 254 GNUNET_assert (GNUNET_YES ==
258 GNUNET_CONTAINER_multihashmap_remove (forward_map, 255 GNUNET_CONTAINER_multihashmap_remove (forward_map, key,
259 key, record)); 256 record));
260 if (NULL != record->hnode) 257 if (NULL != record->hnode)
261 GNUNET_CONTAINER_heap_remove_node (record->hnode); 258 GNUNET_CONTAINER_heap_remove_node (record->hnode);
262 GNUNET_array_grow (record->seen_replies, 259 GNUNET_array_grow (record->seen_replies, record->seen_replies_count, 0);
263 record->seen_replies_count,
264 0);
265 GNUNET_free (record); 260 GNUNET_free (record);
266 return GNUNET_YES; 261 return GNUNET_YES;
267} 262}
@@ -276,37 +271,31 @@ remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
276 * for the last call when the server is destroyed 271 * for the last call when the server is destroyed
277 */ 272 */
278static void 273static void
279handle_client_disconnect (void *cls, 274handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
280 struct GNUNET_SERVER_Client *client)
281{ 275{
282 struct ClientList *pos; 276 struct ClientList *pos;
283 struct PendingMessage *reply; 277 struct PendingMessage *reply;
284 278
285#if DEBUG_DHT 279#if DEBUG_DHT
286 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 280 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", client);
287 "Local client %p disconnects\n",
288 client);
289#endif 281#endif
290 pos = find_active_client (client); 282 pos = find_active_client (client);
291 GNUNET_CONTAINER_DLL_remove (client_head, 283 GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos);
292 client_tail,
293 pos);
294 if (pos->transmit_handle != NULL) 284 if (pos->transmit_handle != NULL)
295 GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle); 285 GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle);
296 while (NULL != (reply = pos->pending_head)) 286 while (NULL != (reply = pos->pending_head))
297 { 287 {
298 GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, 288 GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, reply);
299 reply); 289 GNUNET_free (reply);
300 GNUNET_free (reply); 290 }
301 } 291 GNUNET_CONTAINER_multihashmap_iterate (forward_map, &remove_client_records,
302 GNUNET_CONTAINER_multihashmap_iterate (forward_map, 292 pos);
303 &remove_client_records, pos);
304 GNUNET_free (pos); 293 GNUNET_free (pos);
305} 294}
306 295
307 296
308/** 297/**
309 * Route the given request via the DHT. This includes updating 298 * Route the given request via the DHT. This includes updating
310 * the bloom filter and retransmission times, building the P2P 299 * the bloom filter and retransmission times, building the P2P
311 * message and initiating the routing operation. 300 * message and initiating the routing operation.
312 */ 301 */
@@ -318,33 +307,30 @@ transmit_request (struct ClientQueryRecord *cqr)
318 struct GNUNET_CONTAINER_BloomFilter *peer_bf; 307 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
319 308
320 GNUNET_STATISTICS_update (GDS_stats, 309 GNUNET_STATISTICS_update (GDS_stats,
321 gettext_noop ("# GET requests from clients injected"), 1, 310 gettext_noop
311 ("# GET requests from clients injected"), 1,
322 GNUNET_NO); 312 GNUNET_NO);
323 reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 313 reply_bf_mutator =
324 UINT32_MAX); 314 (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
325 reply_bf = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator, 315 UINT32_MAX);
326 cqr->seen_replies, 316 reply_bf =
327 cqr->seen_replies_count); 317 GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator, cqr->seen_replies,
328 peer_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 318 cqr->seen_replies_count);
329 DHT_BLOOM_SIZE, 319 peer_bf =
330 GNUNET_CONSTANTS_BLOOMFILTER_K); 320 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
331 GDS_NEIGHBOURS_handle_get (cqr->type, 321 GNUNET_CONSTANTS_BLOOMFILTER_K);
332 cqr->msg_options, 322 GDS_NEIGHBOURS_handle_get (cqr->type, cqr->msg_options, cqr->replication,
333 cqr->replication, 323 0 /* hop count */ ,
334 0 /* hop count */, 324 &cqr->key, cqr->xquery, cqr->xquery_size, reply_bf,
335 &cqr->key, 325 reply_bf_mutator, peer_bf);
336 cqr->xquery,
337 cqr->xquery_size,
338 reply_bf,
339 reply_bf_mutator,
340 peer_bf);
341 GNUNET_CONTAINER_bloomfilter_free (reply_bf); 326 GNUNET_CONTAINER_bloomfilter_free (reply_bf);
342 GNUNET_CONTAINER_bloomfilter_free (peer_bf); 327 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
343 328
344 /* exponential back-off for retries, max 1h */ 329 /* exponential back-off for retries, max 1h */
345 cqr->retry_frequency = 330 cqr->retry_frequency =
346 GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS, 331 GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS,
347 GNUNET_TIME_relative_multiply (cqr->retry_frequency, 2)); 332 GNUNET_TIME_relative_multiply
333 (cqr->retry_frequency, 2));
348 cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency); 334 cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
349} 335}
350 336
@@ -359,7 +345,7 @@ transmit_request (struct ClientQueryRecord *cqr)
359 */ 345 */
360static void 346static void
361transmit_next_request_task (void *cls, 347transmit_next_request_task (void *cls,
362 const struct GNUNET_SCHEDULER_TaskContext *tc) 348 const struct GNUNET_SCHEDULER_TaskContext *tc)
363{ 349{
364 struct ClientQueryRecord *cqr; 350 struct ClientQueryRecord *cqr;
365 struct GNUNET_TIME_Relative delay; 351 struct GNUNET_TIME_Relative delay;
@@ -368,22 +354,24 @@ transmit_next_request_task (void *cls,
368 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 354 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
369 return; 355 return;
370 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap))) 356 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
357 {
358 cqr->hnode = NULL;
359 delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
360 if (delay.rel_value > 0)
371 { 361 {
372 cqr->hnode = NULL; 362 cqr->hnode =
373 delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time); 363 GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
374 if (delay.rel_value > 0) 364 cqr->retry_time.abs_value);
375 { 365 retry_task =
376 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 366 GNUNET_SCHEDULER_add_delayed (delay, &transmit_next_request_task,
377 cqr->retry_time.abs_value); 367 NULL);
378 retry_task = GNUNET_SCHEDULER_add_delayed (delay, 368 return;
379 &transmit_next_request_task,
380 NULL);
381 return;
382 }
383 transmit_request (cqr);
384 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
385 cqr->retry_time.abs_value);
386 } 369 }
370 transmit_request (cqr);
371 cqr->hnode =
372 GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
373 cqr->retry_time.abs_value);
374 }
387} 375}
388 376
389 377
@@ -396,58 +384,52 @@ transmit_next_request_task (void *cls,
396 */ 384 */
397static void 385static void
398handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, 386handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
399 const struct GNUNET_MessageHeader *message) 387 const struct GNUNET_MessageHeader *message)
400{ 388{
401 const struct GNUNET_DHT_ClientPutMessage *dht_msg; 389 const struct GNUNET_DHT_ClientPutMessage *dht_msg;
402 struct GNUNET_CONTAINER_BloomFilter *peer_bf; 390 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
403 uint16_t size; 391 uint16_t size;
404 392
405 size = ntohs (message->size); 393 size = ntohs (message->size);
406 if (size < sizeof (struct GNUNET_DHT_ClientPutMessage)) 394 if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
407 { 395 {
408 GNUNET_break (0); 396 GNUNET_break (0);
409 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 397 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
410 return; 398 return;
411 } 399 }
412 GNUNET_STATISTICS_update (GDS_stats, 400 GNUNET_STATISTICS_update (GDS_stats,
413 gettext_noop ("# PUT requests received from clients"), 1, 401 gettext_noop
402 ("# PUT requests received from clients"), 1,
414 GNUNET_NO); 403 GNUNET_NO);
415 dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; 404 dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
416 /* give to local clients */ 405 /* give to local clients */
417#if DEBUG_DHT 406#if DEBUG_DHT
418 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 407 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
419 "Handling local PUT of %u-bytes for query %s\n", 408 "Handling local PUT of %u-bytes for query %s\n",
420 size - sizeof (struct GNUNET_DHT_ClientPutMessage), 409 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
421 GNUNET_h2s (&dht_msg->key)); 410 GNUNET_h2s (&dht_msg->key));
422#endif 411#endif
423 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration), 412 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
424 &dht_msg->key, 413 &dht_msg->key, 0, NULL, 0, NULL,
425 0, NULL, 414 ntohl (dht_msg->type),
426 0, NULL, 415 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
427 ntohl (dht_msg->type), 416 &dht_msg[1]);
428 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
429 &dht_msg[1]);
430 /* store locally */ 417 /* store locally */
431 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration), 418 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
432 &dht_msg->key, 419 &dht_msg->key, 0, NULL, ntohl (dht_msg->type),
433 0, NULL, 420 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
434 ntohl (dht_msg->type), 421 &dht_msg[1]);
435 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
436 &dht_msg[1]);
437 /* route to other peers */ 422 /* route to other peers */
438 peer_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 423 peer_bf =
439 DHT_BLOOM_SIZE, 424 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
440 GNUNET_CONSTANTS_BLOOMFILTER_K); 425 GNUNET_CONSTANTS_BLOOMFILTER_K);
441 GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), 426 GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), ntohl (dht_msg->options),
442 ntohl (dht_msg->options), 427 ntohl (dht_msg->desired_replication_level),
443 ntohl (dht_msg->desired_replication_level), 428 GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
444 GNUNET_TIME_absolute_ntoh (dht_msg->expiration), 429 0 /* hop count */ ,
445 0 /* hop count */, 430 peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1],
446 peer_bf, 431 size -
447 &dht_msg->key, 432 sizeof (struct GNUNET_DHT_ClientPutMessage));
448 0, NULL,
449 &dht_msg[1],
450 size - sizeof (struct GNUNET_DHT_ClientPutMessage));
451 GNUNET_CONTAINER_bloomfilter_free (peer_bf); 433 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
452 GNUNET_SERVER_receive_done (client, GNUNET_OK); 434 GNUNET_SERVER_receive_done (client, GNUNET_OK);
453} 435}
@@ -464,58 +446,55 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
464 */ 446 */
465static void 447static void
466handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, 448handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
467 const struct GNUNET_MessageHeader *message) 449 const struct GNUNET_MessageHeader *message)
468{ 450{
469 const struct GNUNET_DHT_ClientGetMessage *get; 451 const struct GNUNET_DHT_ClientGetMessage *get;
470 struct ClientQueryRecord *cqr; 452 struct ClientQueryRecord *cqr;
471 size_t xquery_size; 453 size_t xquery_size;
472 const char* xquery; 454 const char *xquery;
473 uint16_t size; 455 uint16_t size;
474 456
475 size = ntohs (message->size); 457 size = ntohs (message->size);
476 if (size < sizeof (struct GNUNET_DHT_ClientGetMessage)) 458 if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
477 { 459 {
478 GNUNET_break (0); 460 GNUNET_break (0);
479 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 461 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
480 return; 462 return;
481 } 463 }
482 xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage); 464 xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
483 get = (const struct GNUNET_DHT_ClientGetMessage *) message; 465 get = (const struct GNUNET_DHT_ClientGetMessage *) message;
484 xquery = (const char*) &get[1]; 466 xquery = (const char *) &get[1];
485 GNUNET_STATISTICS_update (GDS_stats, 467 GNUNET_STATISTICS_update (GDS_stats,
486 gettext_noop ("# GET requests received from clients"), 1, 468 gettext_noop
469 ("# GET requests received from clients"), 1,
487 GNUNET_NO); 470 GNUNET_NO);
488#if DEBUG_DHT 471#if DEBUG_DHT
489 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 472 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
490 "Received request for %s from local client %p\n", 473 "Received request for %s from local client %p\n",
491 GNUNET_h2s (&get->key), 474 GNUNET_h2s (&get->key), client);
492 client);
493#endif 475#endif
494 cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); 476 cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
495 cqr->key = get->key; 477 cqr->key = get->key;
496 cqr->client = find_active_client (client); 478 cqr->client = find_active_client (client);
497 cqr->xquery = (void*) &cqr[1]; 479 cqr->xquery = (void *) &cqr[1];
498 memcpy (&cqr[1], xquery, xquery_size); 480 memcpy (&cqr[1], xquery, xquery_size);
499 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); 481 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0);
500 cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS; 482 cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS;
501 cqr->retry_time = GNUNET_TIME_absolute_get (); 483 cqr->retry_time = GNUNET_TIME_absolute_get ();
502 cqr->unique_id = get->unique_id; 484 cqr->unique_id = get->unique_id;
503 cqr->xquery_size = xquery_size; 485 cqr->xquery_size = xquery_size;
504 cqr->replication = ntohl (get->desired_replication_level); 486 cqr->replication = ntohl (get->desired_replication_level);
505 cqr->msg_options = ntohl (get->options); 487 cqr->msg_options = ntohl (get->options);
506 cqr->type = ntohl (get->type); 488 cqr->type = ntohl (get->type);
507 GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr, 489 GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
508 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 490 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
509 /* start remote requests */ 491 /* start remote requests */
510 if (GNUNET_SCHEDULER_NO_TASK != retry_task) 492 if (GNUNET_SCHEDULER_NO_TASK != retry_task)
511 GNUNET_SCHEDULER_cancel (retry_task); 493 GNUNET_SCHEDULER_cancel (retry_task);
512 retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL); 494 retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
513 /* perform local lookup */ 495 /* perform local lookup */
514 GDS_DATACACHE_handle_get (&get->key, 496 GDS_DATACACHE_handle_get (&get->key, cqr->type, cqr->xquery, xquery_size,
515 cqr->type, 497 NULL, 0);
516 cqr->xquery,
517 xquery_size,
518 NULL, 0);
519 GNUNET_SERVER_receive_done (client, GNUNET_OK); 498 GNUNET_SERVER_receive_done (client, GNUNET_OK);
520} 499}
521 500
@@ -538,7 +517,7 @@ struct RemoveByUniqueIdContext
538 517
539 518
540/** 519/**
541 * Iterator over hash map entries that frees all entries 520 * Iterator over hash map entries that frees all entries
542 * that match the given client and unique ID. 521 * that match the given client and unique ID.
543 * 522 *
544 * @param cls unique ID and client to search for in source routes 523 * @param cls unique ID and client to search for in source routes
@@ -556,9 +535,8 @@ remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
556 return GNUNET_YES; 535 return GNUNET_YES;
557#if DEBUG_DHT 536#if DEBUG_DHT
558 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 537 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
559 "Removing client %p's record for key %s (by unique id)\n", 538 "Removing client %p's record for key %s (by unique id)\n",
560 ctx->client->client_handle, 539 ctx->client->client_handle, GNUNET_h2s (key));
561 GNUNET_h2s (key));
562#endif 540#endif
563 return remove_client_records (ctx->client, key, record); 541 return remove_client_records (ctx->client, key, record);
564} 542}
@@ -575,27 +553,24 @@ remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
575 */ 553 */
576static void 554static void
577handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client, 555handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
578 const struct GNUNET_MessageHeader *message) 556 const struct GNUNET_MessageHeader *message)
579{ 557{
580 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg = 558 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
581 (const struct GNUNET_DHT_ClientGetStopMessage *) message; 559 (const struct GNUNET_DHT_ClientGetStopMessage *) message;
582 struct RemoveByUniqueIdContext ctx; 560 struct RemoveByUniqueIdContext ctx;
583 561
584 GNUNET_STATISTICS_update (GDS_stats, 562 GNUNET_STATISTICS_update (GDS_stats,
585 gettext_noop ("# GET STOP requests received from clients"), 1, 563 gettext_noop
564 ("# GET STOP requests received from clients"), 1,
586 GNUNET_NO); 565 GNUNET_NO);
587#if DEBUG_DHT 566#if DEBUG_DHT
588 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 567 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p stopped request for key %s\n",
589 "Client %p stopped request for key %s\n", 568 client, GNUNET_h2s (&dht_stop_msg->key));
590 client,
591 GNUNET_h2s (&dht_stop_msg->key));
592#endif 569#endif
593 ctx.client = find_active_client (client); 570 ctx.client = find_active_client (client);
594 ctx.unique_id = dht_stop_msg->unique_id; 571 ctx.unique_id = dht_stop_msg->unique_id;
595 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, 572 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key,
596 &dht_stop_msg->key, 573 &remove_by_unique_id, &ctx);
597 &remove_by_unique_id,
598 &ctx);
599 GNUNET_SERVER_receive_done (client, GNUNET_OK); 574 GNUNET_SERVER_receive_done (client, GNUNET_OK);
600} 575}
601 576
@@ -636,8 +611,8 @@ send_reply_to_client (void *cls, size_t size, void *buf)
636 /* client disconnected */ 611 /* client disconnected */
637#if DEBUG_DHT 612#if DEBUG_DHT
638 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 613 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
639 "Client %p disconnected, pending messages will be discarded\n", 614 "Client %p disconnected, pending messages will be discarded\n",
640 client->client_handle); 615 client->client_handle);
641#endif 616#endif
642 return 0; 617 return 0;
643 } 618 }
@@ -650,20 +625,15 @@ send_reply_to_client (void *cls, size_t size, void *buf)
650 memcpy (&cbuf[off], reply->msg, msize); 625 memcpy (&cbuf[off], reply->msg, msize);
651 GNUNET_free (reply); 626 GNUNET_free (reply);
652#if DEBUG_DHT 627#if DEBUG_DHT
653 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 628 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client %p\n",
654 "Transmitting %u bytes to client %p\n", 629 msize, client->client_handle);
655 msize,
656 client->client_handle);
657#endif 630#endif
658 off += msize; 631 off += msize;
659 } 632 }
660 process_pending_messages (client); 633 process_pending_messages (client);
661#if DEBUG_DHT 634#if DEBUG_DHT
662 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 635 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client %p\n",
663 "Transmitted %u/%u bytes to client %p\n", 636 (unsigned int) off, (unsigned int) size, client->client_handle);
664 (unsigned int) off,
665 (unsigned int) size,
666 client->client_handle);
667#endif 637#endif
668 return off; 638 return off;
669} 639}
@@ -681,20 +651,17 @@ process_pending_messages (struct ClientList *client)
681 { 651 {
682#if DEBUG_DHT 652#if DEBUG_DHT
683 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 653 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
684 "Not asking for transmission to %p now: %s\n", 654 "Not asking for transmission to %p now: %s\n",
685 client->client_handle, 655 client->client_handle,
686 client->pending_head == NULL 656 client->pending_head ==
687 ? "no more messages" 657 NULL ? "no more messages" : "request already pending");
688 : "request already pending");
689#endif 658#endif
690 return; 659 return;
691 } 660 }
692#if DEBUG_DHT 661#if DEBUG_DHT
693 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 662 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
694 "Asking for transmission of %u bytes to client %p\n", 663 "Asking for transmission of %u bytes to client %p\n",
695 ntohs (client->pending_head-> 664 ntohs (client->pending_head->msg->size), client->client_handle);
696 msg->size),
697 client->client_handle);
698#endif 665#endif
699 client->transmit_handle = 666 client->transmit_handle =
700 GNUNET_SERVER_notify_transmit_ready (client->client_handle, 667 GNUNET_SERVER_notify_transmit_ready (client->client_handle,
@@ -716,7 +683,7 @@ add_pending_message (struct ClientList *client,
716 struct PendingMessage *pending_message) 683 struct PendingMessage *pending_message)
717{ 684{
718 GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail, 685 GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
719 pending_message); 686 pending_message);
720 process_pending_messages (client); 687 process_pending_messages (client);
721} 688}
722 689
@@ -728,7 +695,7 @@ struct ForwardReplyContext
728{ 695{
729 696
730 /** 697 /**
731 * Actual message to send to matching clients. 698 * Actual message to send to matching clients.
732 */ 699 */
733 struct PendingMessage *pm; 700 struct PendingMessage *pm;
734 701
@@ -777,51 +744,43 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
777 int do_free; 744 int do_free;
778 GNUNET_HashCode ch; 745 GNUNET_HashCode ch;
779 unsigned int i; 746 unsigned int i;
780 747
781 if ( (record->type != GNUNET_BLOCK_TYPE_ANY) && 748 if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type))
782 (record->type != frc->type) ) 749 {
750#if DEBUG_DHT
751 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
752 "Record type missmatch, not passing request for key %s to local client\n",
753 GNUNET_h2s (key));
754#endif
755 GNUNET_STATISTICS_update (GDS_stats,
756 gettext_noop
757 ("# Key match, type mismatches in REPLY to CLIENT"),
758 1, GNUNET_NO);
759 return GNUNET_YES; /* type mismatch */
760 }
761 GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
762 for (i = 0; i < record->seen_replies_count; i++)
763 if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode)))
783 { 764 {
784#if DEBUG_DHT 765#if DEBUG_DHT
785 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 766 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786 "Record type missmatch, not passing request for key %s to local client\n", 767 "Duplicate reply, not passing request for key %s to local client\n",
787 GNUNET_h2s (key)); 768 GNUNET_h2s (key));
788#endif 769#endif
789 GNUNET_STATISTICS_update (GDS_stats, 770 GNUNET_STATISTICS_update (GDS_stats,
790 gettext_noop ("# Key match, type mismatches in REPLY to CLIENT"), 1, 771 gettext_noop
791 GNUNET_NO); 772 ("# Duplicate REPLIES to CLIENT request dropped"),
792 return GNUNET_YES; /* type mismatch */ 773 1, GNUNET_NO);
774 return GNUNET_YES; /* duplicate */
793 } 775 }
794 GNUNET_CRYPTO_hash (frc->data,
795 frc->data_size,
796 &ch);
797 for (i=0;i<record->seen_replies_count;i++)
798 if (0 == memcmp (&record->seen_replies[i],
799 &ch,
800 sizeof (GNUNET_HashCode)))
801 {
802#if DEBUG_DHT
803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804 "Duplicate reply, not passing request for key %s to local client\n",
805 GNUNET_h2s (key));
806#endif
807 GNUNET_STATISTICS_update (GDS_stats,
808 gettext_noop ("# Duplicate REPLIES to CLIENT request dropped"), 1,
809 GNUNET_NO);
810 return GNUNET_YES; /* duplicate */
811 }
812 eval = 776 eval =
813 GNUNET_BLOCK_evaluate (GDS_block_context, 777 GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0,
814 record->type, key, 778 record->xquery, record->xquery_size, frc->data,
815 NULL, 0, 779 frc->data_size);
816 record->xquery,
817 record->xquery_size,
818 frc->data,
819 frc->data_size);
820#if DEBUG_DHT 780#if DEBUG_DHT
821 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 781 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822 "Evaluation result is %d for key %s for local client's query\n", 782 "Evaluation result is %d for key %s for local client's query\n",
823 (int) eval, 783 (int) eval, GNUNET_h2s (key));
824 GNUNET_h2s (key));
825#endif 784#endif
826 switch (eval) 785 switch (eval)
827 { 786 {
@@ -829,9 +788,7 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
829 do_free = GNUNET_YES; 788 do_free = GNUNET_YES;
830 break; 789 break;
831 case GNUNET_BLOCK_EVALUATION_OK_MORE: 790 case GNUNET_BLOCK_EVALUATION_OK_MORE:
832 GNUNET_array_append (record->seen_replies, 791 GNUNET_array_append (record->seen_replies, record->seen_replies_count, ch);
833 record->seen_replies_count,
834 ch);
835 do_free = GNUNET_NO; 792 do_free = GNUNET_NO;
836 break; 793 break;
837 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: 794 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
@@ -849,38 +806,36 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
849 return GNUNET_NO; 806 return GNUNET_NO;
850 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: 807 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
851 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 808 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
852 _("Unsupported block type (%u) in request!\n"), 809 _("Unsupported block type (%u) in request!\n"), record->type);
853 record->type);
854 return GNUNET_NO; 810 return GNUNET_NO;
855 default: 811 default:
856 GNUNET_break (0); 812 GNUNET_break (0);
857 return GNUNET_NO; 813 return GNUNET_NO;
858 } 814 }
859 if (GNUNET_NO == frc->do_copy) 815 if (GNUNET_NO == frc->do_copy)
860 { 816 {
861 /* first time, we can use the original data */ 817 /* first time, we can use the original data */
862 pm = frc->pm; 818 pm = frc->pm;
863 frc->do_copy = GNUNET_YES; 819 frc->do_copy = GNUNET_YES;
864 } 820 }
865 else 821 else
866 { 822 {
867 /* two clients waiting for same reply, must copy for queueing */ 823 /* two clients waiting for same reply, must copy for queueing */
868 pm = GNUNET_malloc (sizeof (struct PendingMessage) + 824 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
869 ntohs (frc->pm->msg->size)); 825 ntohs (frc->pm->msg->size));
870 memcpy (pm, frc->pm, 826 memcpy (pm, frc->pm,
871 sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size)); 827 sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
872 pm->next = pm->prev = NULL; 828 pm->next = pm->prev = NULL;
873 } 829 }
874 GNUNET_STATISTICS_update (GDS_stats, 830 GNUNET_STATISTICS_update (GDS_stats,
875 gettext_noop ("# RESULTS queued for clients"), 1, 831 gettext_noop ("# RESULTS queued for clients"), 1,
876 GNUNET_NO); 832 GNUNET_NO);
877 reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1]; 833 reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1];
878 reply->unique_id = record->unique_id; 834 reply->unique_id = record->unique_id;
879#if DEBUG_DHT 835#if DEBUG_DHT
880 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 836 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881 "Queueing reply to query %s for client %p\n", 837 "Queueing reply to query %s for client %p\n", GNUNET_h2s (key),
882 GNUNET_h2s (key), 838 record->client->client_handle);
883 record->client->client_handle);
884#endif 839#endif
885 add_pending_message (record->client, pm); 840 add_pending_message (record->client, pm);
886 if (GNUNET_YES == do_free) 841 if (GNUNET_YES == do_free)
@@ -906,14 +861,13 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
906 */ 861 */
907void 862void
908GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, 863GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
909 const GNUNET_HashCode *key, 864 const GNUNET_HashCode * key,
910 unsigned int get_path_length, 865 unsigned int get_path_length,
911 const struct GNUNET_PeerIdentity *get_path, 866 const struct GNUNET_PeerIdentity *get_path,
912 unsigned int put_path_length, 867 unsigned int put_path_length,
913 const struct GNUNET_PeerIdentity *put_path, 868 const struct GNUNET_PeerIdentity *put_path,
914 enum GNUNET_BLOCK_Type type, 869 enum GNUNET_BLOCK_Type type, size_t data_size,
915 size_t data_size, 870 const void *data)
916 const void *data)
917{ 871{
918 struct ForwardReplyContext frc; 872 struct ForwardReplyContext frc;
919 struct PendingMessage *pm; 873 struct PendingMessage *pm;
@@ -921,57 +875,57 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
921 struct GNUNET_PeerIdentity *paths; 875 struct GNUNET_PeerIdentity *paths;
922 size_t msize; 876 size_t msize;
923 877
924 if (NULL == 878 if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map, key))
925 GNUNET_CONTAINER_multihashmap_get (forward_map, key))
926 { 879 {
927 GNUNET_STATISTICS_update (GDS_stats, 880 GNUNET_STATISTICS_update (GDS_stats,
928 gettext_noop ("# REPLIES ignored for CLIENTS (no match)"), 1, 881 gettext_noop
929 GNUNET_NO); 882 ("# REPLIES ignored for CLIENTS (no match)"), 1,
930 return; /* no matching request, fast exit! */ 883 GNUNET_NO);
884 return; /* no matching request, fast exit! */
931 } 885 }
932 msize = sizeof(struct GNUNET_DHT_ClientResultMessage) + data_size + 886 msize =
933 (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity); 887 sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size +
888 (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
934 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 889 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
935 { 890 {
936 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 891 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
937 _("Could not pass reply to client, message too big!\n")); 892 _("Could not pass reply to client, message too big!\n"));
938 return; 893 return;
939 } 894 }
940 pm = (struct PendingMessage *) GNUNET_malloc (msize + sizeof (struct PendingMessage)); 895 pm = (struct PendingMessage *) GNUNET_malloc (msize +
941 reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1]; 896 sizeof (struct PendingMessage));
897 reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1];
942 pm->msg = &reply->header; 898 pm->msg = &reply->header;
943 reply->header.size = htons ((uint16_t) msize); 899 reply->header.size = htons ((uint16_t) msize);
944 reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT); 900 reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
945 reply->type = htonl (type); 901 reply->type = htonl (type);
946 reply->get_path_length = htonl (get_path_length); 902 reply->get_path_length = htonl (get_path_length);
947 reply->put_path_length = htonl (put_path_length); 903 reply->put_path_length = htonl (put_path_length);
948 reply->unique_id = 0; /* filled in later */ 904 reply->unique_id = 0; /* filled in later */
949 reply->expiration = GNUNET_TIME_absolute_hton (expiration); 905 reply->expiration = GNUNET_TIME_absolute_hton (expiration);
950 reply->key = *key; 906 reply->key = *key;
951 paths = (struct GNUNET_PeerIdentity*) &reply[1]; 907 paths = (struct GNUNET_PeerIdentity *) &reply[1];
952 memcpy (paths, put_path, 908 memcpy (paths, put_path,
953 sizeof (struct GNUNET_PeerIdentity) * put_path_length); 909 sizeof (struct GNUNET_PeerIdentity) * put_path_length);
954 memcpy (&paths[put_path_length], 910 memcpy (&paths[put_path_length], get_path,
955 get_path, sizeof (struct GNUNET_PeerIdentity) * get_path_length); 911 sizeof (struct GNUNET_PeerIdentity) * get_path_length);
956 memcpy (&paths[get_path_length + put_path_length], 912 memcpy (&paths[get_path_length + put_path_length], data, data_size);
957 data,
958 data_size);
959 frc.do_copy = GNUNET_NO; 913 frc.do_copy = GNUNET_NO;
960 frc.pm = pm; 914 frc.pm = pm;
961 frc.data = data; 915 frc.data = data;
962 frc.data_size = data_size; 916 frc.data_size = data_size;
963 frc.type = type; 917 frc.type = type;
964 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key, 918 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key, &forward_reply,
965 &forward_reply, 919 &frc);
966 &frc);
967 if (GNUNET_NO == frc.do_copy) 920 if (GNUNET_NO == frc.do_copy)
968 { 921 {
969 /* did not match any of the requests, free! */ 922 /* did not match any of the requests, free! */
970 GNUNET_STATISTICS_update (GDS_stats, 923 GNUNET_STATISTICS_update (GDS_stats,
971 gettext_noop ("# REPLIES ignored for CLIENTS (no match)"), 1, 924 gettext_noop
972 GNUNET_NO); 925 ("# REPLIES ignored for CLIENTS (no match)"), 1,
973 GNUNET_free (pm); 926 GNUNET_NO);
974 } 927 GNUNET_free (pm);
928 }
975} 929}
976 930
977 931
@@ -980,17 +934,17 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
980 * 934 *
981 * @param server the initialized server 935 * @param server the initialized server
982 */ 936 */
983void 937void
984GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server) 938GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
985{ 939{
986 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { 940 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
987 {&handle_dht_local_put, NULL, 941 {&handle_dht_local_put, NULL,
988 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0}, 942 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0},
989 {&handle_dht_local_get, NULL, 943 {&handle_dht_local_get, NULL,
990 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0}, 944 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
991 {&handle_dht_local_get_stop, NULL, 945 {&handle_dht_local_get_stop, NULL,
992 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, 946 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP,
993 sizeof (struct GNUNET_DHT_ClientGetStopMessage) }, 947 sizeof (struct GNUNET_DHT_ClientGetStopMessage)},
994 {NULL, NULL, 0, 0} 948 {NULL, NULL, 0, 0}
995 }; 949 };
996 forward_map = GNUNET_CONTAINER_multihashmap_create (1024); 950 forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
@@ -1009,10 +963,10 @@ GDS_CLIENTS_done ()
1009 GNUNET_assert (client_head == NULL); 963 GNUNET_assert (client_head == NULL);
1010 GNUNET_assert (client_tail == NULL); 964 GNUNET_assert (client_tail == NULL);
1011 if (GNUNET_SCHEDULER_NO_TASK != retry_task) 965 if (GNUNET_SCHEDULER_NO_TASK != retry_task)
1012 { 966 {
1013 GNUNET_SCHEDULER_cancel (retry_task); 967 GNUNET_SCHEDULER_cancel (retry_task);
1014 retry_task = GNUNET_SCHEDULER_NO_TASK; 968 retry_task = GNUNET_SCHEDULER_NO_TASK;
1015 } 969 }
1016 GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap)); 970 GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1017 GNUNET_CONTAINER_heap_destroy (retry_heap); 971 GNUNET_CONTAINER_heap_destroy (retry_heap);
1018 retry_heap = NULL; 972 retry_heap = NULL;
@@ -1022,4 +976,3 @@ GDS_CLIENTS_done ()
1022} 976}
1023 977
1024/* end of gnunet-service-dht_clients.c */ 978/* end of gnunet-service-dht_clients.c */
1025