aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht_clients.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2022-01-02 14:53:35 +0100
committerChristian Grothoff <christian@grothoff.org>2022-01-02 14:53:43 +0100
commit329f0458fa1fce45ce1c31e86771ffefb22e941e (patch)
tree19e39124cf5c817cd32df308f23d4ea26ef5dbaa /src/dht/gnunet-service-dht_clients.c
parentbb686c48354853aa725e493e85edce0602ed85e2 (diff)
downloadgnunet-329f0458fa1fce45ce1c31e86771ffefb22e941e.tar.gz
gnunet-329f0458fa1fce45ce1c31e86771ffefb22e941e.zip
clean up am_closest_peer and other functions
Diffstat (limited to 'src/dht/gnunet-service-dht_clients.c')
-rw-r--r--src/dht/gnunet-service-dht_clients.c469
1 files changed, 240 insertions, 229 deletions
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index feccde8a8..aa41f519c 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011, 2016, 2017 GNUnet e.V. 3 Copyright (C) 2009, 2010, 2011, 2016, 2017, 2022 GNUnet e.V.
4 4
5 GNUnet is free software: you can redistribute it and/or modify it 5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published 6 under the terms of the GNU Affero General Public License as published
@@ -83,7 +83,7 @@ struct ClientQueryRecord
83 const void *xquery; 83 const void *xquery;
84 84
85 /** 85 /**
86 * Replies we have already seen for this request. 86 * Array of (hashes of) replies we have already seen for this request.
87 */ 87 */
88 struct GNUNET_HashCode *seen_replies; 88 struct GNUNET_HashCode *seen_replies;
89 89
@@ -114,7 +114,7 @@ struct ClientQueryRecord
114 size_t xquery_size; 114 size_t xquery_size;
115 115
116 /** 116 /**
117 * Number of entries in 'seen_replies'. 117 * Number of entries in @e seen_replies.
118 */ 118 */
119 unsigned int seen_replies_count; 119 unsigned int seen_replies_count;
120 120
@@ -125,10 +125,8 @@ struct ClientQueryRecord
125 125
126 /** 126 /**
127 * Any message options for this request 127 * Any message options for this request
128 *
129 * FIXME: why uint32_t instead of enum?
130 */ 128 */
131 uint32_t msg_options; 129 enum GNUNET_DHT_RouteOption msg_options;
132 130
133 /** 131 /**
134 * The type for the data for the GET request. 132 * The type for the data for the GET request.
@@ -153,14 +151,19 @@ struct ClientMonitorRecord
153 struct ClientMonitorRecord *prev; 151 struct ClientMonitorRecord *prev;
154 152
155 /** 153 /**
156 * Type of blocks that are of interest 154 * Client to notify of these requests.
157 */ 155 */
158 enum GNUNET_BLOCK_Type type; 156 struct ClientHandle *ch;
157
158 /**
159 * Key of data of interest. All bits zero for 'all'.
160 */
161 struct GNUNET_HashCode key;
159 162
160 /** 163 /**
161 * Key of data of interest, NULL for all. 164 * Type of blocks that are of interest
162 */ 165 */
163 struct GNUNET_HashCode *key; 166 enum GNUNET_BLOCK_Type type;
164 167
165 /** 168 /**
166 * Flag whether to notify about GET messages. 169 * Flag whether to notify about GET messages.
@@ -177,10 +180,6 @@ struct ClientMonitorRecord
177 */ 180 */
178 uint16_t put; 181 uint16_t put;
179 182
180 /**
181 * Client to notify of these requests.
182 */
183 struct ClientHandle *ch;
184}; 183};
185 184
186 185
@@ -212,6 +211,7 @@ struct ClientHandle
212 struct GNUNET_MQ_Handle *mq; 211 struct GNUNET_MQ_Handle *mq;
213}; 212};
214 213
214
215/** 215/**
216 * Our handle to the BLOCK library. 216 * Our handle to the BLOCK library.
217 */ 217 */
@@ -264,7 +264,7 @@ static struct GNUNET_SCHEDULER_Task *retry_task;
264 * @param record record to remove 264 * @param record record to remove
265 */ 265 */
266static void 266static void
267remove_client_record (struct ClientQueryRecord *record) 267remove_client_query_record (struct ClientQueryRecord *record)
268{ 268{
269 struct ClientHandle *ch = record->ch; 269 struct ClientHandle *ch = record->ch;
270 270
@@ -300,6 +300,7 @@ client_connect_cb (void *cls,
300{ 300{
301 struct ClientHandle *ch; 301 struct ClientHandle *ch;
302 302
303 (void) cls;
303 ch = GNUNET_new (struct ClientHandle); 304 ch = GNUNET_new (struct ClientHandle);
304 ch->client = client; 305 ch->client = client;
305 ch->mq = mq; 306 ch->mq = mq;
@@ -321,34 +322,35 @@ client_disconnect_cb (void *cls,
321 void *app_ctx) 322 void *app_ctx)
322{ 323{
323 struct ClientHandle *ch = app_ctx; 324 struct ClientHandle *ch = app_ctx;
324 struct ClientQueryRecord *cqr;
325 struct ClientMonitorRecord *monitor;
326 325
326 (void) cls;
327 (void) client;
327 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
328 "Local client %p disconnects\n", 329 "Local client %p disconnects\n",
329 ch); 330 ch);
330 monitor = monitor_head;
331 while (NULL != monitor)
332 { 331 {
333 if (monitor->ch == ch) 332 struct ClientMonitorRecord *next;
334 {
335 struct ClientMonitorRecord *next;
336 333
334 for (struct ClientMonitorRecord *monitor = monitor_head;
335 NULL != monitor;
336 monitor = next)
337 {
337 next = monitor->next; 338 next = monitor->next;
338 GNUNET_free (monitor->key); 339 if (monitor->ch != ch)
340 continue;
339 GNUNET_CONTAINER_DLL_remove (monitor_head, 341 GNUNET_CONTAINER_DLL_remove (monitor_head,
340 monitor_tail, 342 monitor_tail,
341 monitor); 343 monitor);
342 GNUNET_free (monitor); 344 GNUNET_free (monitor);
343 monitor = next;
344 }
345 else
346 {
347 monitor = monitor->next;
348 } 345 }
349 } 346 }
350 while (NULL != (cqr = ch->cqr_head)) 347
351 remove_client_record (cqr); 348 {
349 struct ClientQueryRecord *cqr;
350
351 while (NULL != (cqr = ch->cqr_head))
352 remove_client_query_record (cqr);
353 }
352 GNUNET_free (ch); 354 GNUNET_free (ch);
353} 355}
354 356
@@ -357,6 +359,8 @@ client_disconnect_cb (void *cls,
357 * Route the given request via the DHT. This includes updating 359 * Route the given request via the DHT. This includes updating
358 * the bloom filter and retransmission times, building the P2P 360 * the bloom filter and retransmission times, building the P2P
359 * message and initiating the routing operation. 361 * message and initiating the routing operation.
362 *
363 * @param cqr request to transmit
360 */ 364 */
361static void 365static void
362transmit_request (struct ClientQueryRecord *cqr) 366transmit_request (struct ClientQueryRecord *cqr)
@@ -365,8 +369,7 @@ transmit_request (struct ClientQueryRecord *cqr)
365 struct GNUNET_CONTAINER_BloomFilter *peer_bf; 369 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
366 370
367 GNUNET_STATISTICS_update (GDS_stats, 371 GNUNET_STATISTICS_update (GDS_stats,
368 gettext_noop ( 372 "# GET requests from clients injected",
369 "# GET requests from clients injected"),
370 1, 373 1,
371 GNUNET_NO); 374 GNUNET_NO);
372 bg = GNUNET_BLOCK_group_create (GDS_block_context, 375 bg = GNUNET_BLOCK_group_create (GDS_block_context,
@@ -374,8 +377,8 @@ transmit_request (struct ClientQueryRecord *cqr)
374 GNUNET_CRYPTO_random_u32 ( 377 GNUNET_CRYPTO_random_u32 (
375 GNUNET_CRYPTO_QUALITY_WEAK, 378 GNUNET_CRYPTO_QUALITY_WEAK,
376 UINT32_MAX), 379 UINT32_MAX),
377 NULL, 380 NULL, /* raw data */
378 0, 381 0, /* raw data size */
379 "seen-set-size", 382 "seen-set-size",
380 cqr->seen_replies_count, 383 cqr->seen_replies_count,
381 NULL); 384 NULL);
@@ -403,8 +406,8 @@ transmit_request (struct ClientQueryRecord *cqr)
403 GNUNET_BLOCK_group_destroy (bg); 406 GNUNET_BLOCK_group_destroy (bg);
404 GNUNET_CONTAINER_bloomfilter_free (peer_bf); 407 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
405 408
406 /* exponential back-off for retries. 409 /* Exponential back-off for retries.
407 * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */ 410 * max. is #GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
408 cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency); 411 cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
409 cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency); 412 cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
410} 413}
@@ -421,14 +424,12 @@ static void
421transmit_next_request_task (void *cls) 424transmit_next_request_task (void *cls)
422{ 425{
423 struct ClientQueryRecord *cqr; 426 struct ClientQueryRecord *cqr;
424 struct GNUNET_TIME_Relative delay;
425 427
426 retry_task = NULL; 428 retry_task = NULL;
427 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap))) 429 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
428 { 430 {
429 cqr->hnode = NULL; 431 cqr->hnode = NULL;
430 delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time); 432 if (! GNUNET_TIME_absolute_is_past (cqr->retry_time))
431 if (delay.rel_value_us > 0)
432 { 433 {
433 cqr->hnode 434 cqr->hnode
434 = GNUNET_CONTAINER_heap_insert (retry_heap, 435 = GNUNET_CONTAINER_heap_insert (retry_heap,
@@ -456,11 +457,19 @@ transmit_next_request_task (void *cls)
456 * @param dht_msg the actual message received 457 * @param dht_msg the actual message received
457 * @return #GNUNET_OK (always) 458 * @return #GNUNET_OK (always)
458 */ 459 */
459static int 460static enum GNUNET_GenericReturnValue
460check_dht_local_put (void *cls, 461check_dht_local_put (void *cls,
461 const struct GNUNET_DHT_ClientPutMessage *dht_msg) 462 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
462{ 463{
463 /* always well-formed */ 464 uint32_t replication_level;
465
466 (void) cls;
467 replication_level = ntohl (dht_msg->desired_replication_level);
468 if (replication_level > GNUNET_DHT_MAXIMUM_REPLICATION_LEVEL)
469 {
470 GNUNET_break_op (0);
471 return GNUNET_SYSERR;
472 }
464 return GNUNET_OK; 473 return GNUNET_OK;
465} 474}
466 475
@@ -476,89 +485,91 @@ handle_dht_local_put (void *cls,
476 const struct GNUNET_DHT_ClientPutMessage *dht_msg) 485 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
477{ 486{
478 struct ClientHandle *ch = cls; 487 struct ClientHandle *ch = cls;
479 struct GNUNET_CONTAINER_BloomFilter *peer_bf; 488 uint16_t size = ntohs (dht_msg->header.size);
480 uint16_t size; 489 uint32_t type = ntohl (dht_msg->type);
490 struct GNUNET_TIME_Absolute expiration
491 = GNUNET_TIME_absolute_ntoh (dht_msg->expiration);
492 enum GNUNET_DHT_RouteOption options
493 = (enum GNUNET_DHT_RouteOption) ntohl (dht_msg->options);
494 uint32_t replication_level
495 = ntohl (dht_msg->desired_replication_level);
481 496
482 size = ntohs (dht_msg->header.size); 497 LOG (GNUNET_ERROR_TYPE_DEBUG,
498 "Handling local PUT of %lu-bytes for query %s of type %u\n",
499 (unsigned long) (size - sizeof(struct GNUNET_DHT_ClientPutMessage)),
500 GNUNET_h2s (&dht_msg->key),
501 (unsigned int) type);
483 GNUNET_STATISTICS_update (GDS_stats, 502 GNUNET_STATISTICS_update (GDS_stats,
484 gettext_noop ( 503 "# PUT requests received from clients",
485 "# PUT requests received from clients"),
486 1, 504 1,
487 GNUNET_NO); 505 GNUNET_NO);
488 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, 506 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
489 "CLIENT-PUT %s\n", 507 "CLIENT-PUT %s\n",
490 GNUNET_h2s_full (&dht_msg->key)); 508 GNUNET_h2s_full (&dht_msg->key));
491 /* give to local clients */ 509 /* give to local clients */
492 LOG (GNUNET_ERROR_TYPE_DEBUG, 510 GDS_CLIENTS_handle_reply (expiration,
493 "Handling local PUT of %lu-bytes for query %s\n",
494 (unsigned long) (size - sizeof(struct GNUNET_DHT_ClientPutMessage)),
495 GNUNET_h2s (&dht_msg->key));
496 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
497 &dht_msg->key,
498 &dht_msg->key, 511 &dht_msg->key,
499 0,
500 NULL,
501 0,
502 NULL,
503 ntohl (dht_msg->type),
504 size - sizeof(struct GNUNET_DHT_ClientPutMessage),
505 &dht_msg[1]);
506 /* store locally */
507 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
508 &dht_msg->key, 512 &dht_msg->key,
509 0, 513 0, NULL, /* get path */
510 NULL, 514 0, NULL, /* put path */
511 ntohl (dht_msg->type), 515 type,
512 size - sizeof(struct GNUNET_DHT_ClientPutMessage), 516 size - sizeof(struct GNUNET_DHT_ClientPutMessage),
513 &dht_msg[1]); 517 &dht_msg[1]);
514 /* route to other peers */ 518 {
515 peer_bf 519 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
516 = GNUNET_CONTAINER_bloomfilter_init (NULL, 520
517 DHT_BLOOM_SIZE, 521 peer_bf
518 GNUNET_CONSTANTS_BLOOMFILTER_K); 522 = GNUNET_CONTAINER_bloomfilter_init (NULL,
519 GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), 523 DHT_BLOOM_SIZE,
520 ntohl (dht_msg->options), 524 GNUNET_CONSTANTS_BLOOMFILTER_K);
521 ntohl (dht_msg->desired_replication_level), 525 /* store locally */
522 GNUNET_TIME_absolute_ntoh (dht_msg->expiration), 526 if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
523 0 /* hop count */, 527 (GDS_am_closest_peer (&dht_msg->key,
524 peer_bf, 528 peer_bf)))
525 &dht_msg->key, 529 GDS_DATACACHE_handle_put (
526 0, 530 expiration,
527 NULL, 531 &dht_msg->key,
528 &dht_msg[1], 532 0, NULL, /* put path */
529 size - sizeof(struct GNUNET_DHT_ClientPutMessage)); 533 type,
530 GDS_CLIENTS_process_put (ntohl (dht_msg->options), 534 size - sizeof(struct GNUNET_DHT_ClientPutMessage),
531 ntohl (dht_msg->type), 535 &dht_msg[1]);
532 0, 536 /* route to other peers */
533 ntohl (dht_msg->desired_replication_level), 537 if (GNUNET_OK !=
534 1, 538 GDS_NEIGHBOURS_handle_put (
535 GDS_NEIGHBOURS_get_id (), 539 type,
536 GNUNET_TIME_absolute_ntoh (dht_msg->expiration), 540 options,
537 &dht_msg->key, 541 replication_level,
538 &dht_msg[1], 542 expiration,
539 size - sizeof(struct GNUNET_DHT_ClientPutMessage)); 543 0 /* hop count */,
540 GNUNET_CONTAINER_bloomfilter_free (peer_bf); 544 peer_bf,
545 &dht_msg->key,
546 0, NULL, /* put path */
547 &dht_msg[1],
548 size - sizeof(struct GNUNET_DHT_ClientPutMessage)))
549 {
550 GNUNET_STATISTICS_update (GDS_stats,
551 "# Local PUT requests not routed",
552 1,
553 GNUNET_NO);
554 }
555 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
556 }
557 GDS_CLIENTS_process_put (
558 options,
559 type,
560 0, /* hop count */
561 replication_level,
562 1, /* path length */
563 GDS_NEIGHBOURS_get_id (),
564 expiration,
565 &dht_msg->key,
566 &dht_msg[1],
567 size - sizeof(struct GNUNET_DHT_ClientPutMessage));
541 GNUNET_SERVICE_client_continue (ch->client); 568 GNUNET_SERVICE_client_continue (ch->client);
542} 569}
543 570
544 571
545/** 572/**
546 * Check DHT GET messages from the client.
547 *
548 * @param cls the client we received this message from
549 * @param message the actual message received
550 * @return #GNUNET_OK (always)
551 */
552static int
553check_dht_local_get (void *cls,
554 const struct GNUNET_DHT_ClientGetMessage *get)
555{
556 /* always well-formed */
557 return GNUNET_OK;
558}
559
560
561/**
562 * Handle a result from local datacache for a GET operation. 573 * Handle a result from local datacache for a GET operation.
563 * 574 *
564 * @param cls the `struct ClientHandle` of the client doing the query 575 * @param cls the `struct ClientHandle` of the client doing the query
@@ -584,7 +595,7 @@ handle_local_result (void *cls,
584 const void *data, 595 const void *data,
585 size_t data_size) 596 size_t data_size)
586{ 597{
587 // FIXME: this needs some clean up: inline the function, 598 // FIXME: this may deserve some clean up: inline the function,
588 // possibly avoid even looking up the client! 599 // possibly avoid even looking up the client!
589 GDS_CLIENTS_handle_reply (expiration_time, 600 GDS_CLIENTS_handle_reply (expiration_time,
590 key, 601 key,
@@ -597,6 +608,24 @@ handle_local_result (void *cls,
597 608
598 609
599/** 610/**
611 * Check DHT GET messages from the client.
612 *
613 * @param cls the client we received this message from
614 * @param message the actual message received
615 * @return #GNUNET_OK (always)
616 */
617static enum GNUNET_GenericReturnValue
618check_dht_local_get (void *cls,
619 const struct GNUNET_DHT_ClientGetMessage *get)
620{
621 (void) cls;
622 (void) get;
623 /* always well-formed */
624 return GNUNET_OK;
625}
626
627
628/**
600 * Handler for DHT GET messages from the client. 629 * Handler for DHT GET messages from the client.
601 * 630 *
602 * @param cls the client we received this message from 631 * @param cls the client we received this message from
@@ -608,23 +637,20 @@ handle_dht_local_get (void *cls,
608{ 637{
609 struct ClientHandle *ch = cls; 638 struct ClientHandle *ch = cls;
610 struct ClientQueryRecord *cqr; 639 struct ClientQueryRecord *cqr;
611 size_t xquery_size; 640 uint16_t size = ntohs (get->header.size);
612 const char *xquery; 641 const char *xquery = (const char *) &get[1];
613 uint16_t size; 642 size_t xquery_size = size - sizeof(struct GNUNET_DHT_ClientGetMessage);
614 643
615 size = ntohs (get->header.size);
616 xquery_size = size - sizeof(struct GNUNET_DHT_ClientGetMessage);
617 xquery = (const char *) &get[1];
618 GNUNET_STATISTICS_update (GDS_stats,
619 gettext_noop
620 ("# GET requests received from clients"), 1,
621 GNUNET_NO);
622 LOG (GNUNET_ERROR_TYPE_DEBUG, 644 LOG (GNUNET_ERROR_TYPE_DEBUG,
623 "Received GET request for %s from local client %p, xq: %.*s\n", 645 "Received GET request for %s from local client %p, xq: %.*s\n",
624 GNUNET_h2s (&get->key), 646 GNUNET_h2s (&get->key),
625 ch->client, 647 ch->client,
626 (int) xquery_size, 648 (int) xquery_size,
627 xquery); 649 xquery);
650 GNUNET_STATISTICS_update (GDS_stats,
651 "# GET requests received from clients",
652 1,
653 GNUNET_NO);
628 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, 654 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
629 "CLIENT-GET %s\n", 655 "CLIENT-GET %s\n",
630 GNUNET_h2s_full (&get->key)); 656 GNUNET_h2s_full (&get->key));
@@ -632,15 +658,19 @@ handle_dht_local_get (void *cls,
632 cqr = GNUNET_malloc (sizeof(struct ClientQueryRecord) + xquery_size); 658 cqr = GNUNET_malloc (sizeof(struct ClientQueryRecord) + xquery_size);
633 cqr->key = get->key; 659 cqr->key = get->key;
634 cqr->ch = ch; 660 cqr->ch = ch;
635 cqr->xquery = (void *) &cqr[1]; 661 cqr->xquery = (const void *) &cqr[1];
636 GNUNET_memcpy (&cqr[1], xquery, xquery_size); 662 GNUNET_memcpy (&cqr[1],
637 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); 663 xquery,
664 xquery_size);
665 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap,
666 cqr,
667 0);
638 cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS; 668 cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
639 cqr->retry_time = GNUNET_TIME_absolute_get (); 669 cqr->retry_time = GNUNET_TIME_absolute_get ();
640 cqr->unique_id = get->unique_id; 670 cqr->unique_id = get->unique_id;
641 cqr->xquery_size = xquery_size; 671 cqr->xquery_size = xquery_size;
642 cqr->replication = ntohl (get->desired_replication_level); 672 cqr->replication = ntohl (get->desired_replication_level);
643 cqr->msg_options = ntohl (get->options); 673 cqr->msg_options = (enum GNUNET_DHT_RouteOption) ntohl (get->options);
644 cqr->type = ntohl (get->type); 674 cqr->type = ntohl (get->type);
645 GNUNET_CONTAINER_DLL_insert (ch->cqr_head, 675 GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
646 ch->cqr_tail, 676 ch->cqr_tail,
@@ -649,11 +679,11 @@ handle_dht_local_get (void *cls,
649 &cqr->key, 679 &cqr->key,
650 cqr, 680 cqr,
651 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 681 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
652 GDS_CLIENTS_process_get (ntohl (get->options), 682 GDS_CLIENTS_process_get (cqr->msg_options,
653 ntohl (get->type), 683 cqr->type,
654 0, 684 0, /* hop count */
655 ntohl (get->desired_replication_level), 685 cqr->replication,
656 1, 686 1, /* path length */
657 GDS_NEIGHBOURS_get_id (), 687 GDS_NEIGHBOURS_get_id (),
658 &get->key); 688 &get->key);
659 /* start remote requests */ 689 /* start remote requests */
@@ -697,7 +727,7 @@ struct FindByUniqueIdContext
697 * @param value the `struct ClientQueryRecord` 727 * @param value the `struct ClientQueryRecord`
698 * @return #GNUNET_YES to continue iteration (result not yet found) 728 * @return #GNUNET_YES to continue iteration (result not yet found)
699 */ 729 */
700static int 730static enum GNUNET_GenericReturnValue
701find_by_unique_id (void *cls, 731find_by_unique_id (void *cls,
702 const struct GNUNET_HashCode *key, 732 const struct GNUNET_HashCode *key,
703 void *value) 733 void *value)
@@ -719,19 +749,17 @@ find_by_unique_id (void *cls,
719 * @param message the actual message received 749 * @param message the actual message received
720 * @return #GNUNET_OK if @a seen is well-formed 750 * @return #GNUNET_OK if @a seen is well-formed
721 */ 751 */
722static int 752static enum GNUNET_GenericReturnValue
723check_dht_local_get_result_seen (void *cls, 753check_dht_local_get_result_seen (
724 const struct 754 void *cls,
725 GNUNET_DHT_ClientGetResultSeenMessage *seen) 755 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
726{ 756{
727 uint16_t size; 757 uint16_t size = ntohs (seen->header.size);
728 unsigned int hash_count; 758 unsigned int hash_count =
729 759 (size - sizeof(*seen))
730 size = ntohs (seen->header.size); 760 / sizeof(struct GNUNET_HashCode);
731 hash_count = (size - sizeof(struct GNUNET_DHT_ClientGetResultSeenMessage)) 761
732 / sizeof(struct GNUNET_HashCode); 762 if (size != sizeof(*seen) + hash_count * sizeof(struct GNUNET_HashCode))
733 if (size != sizeof(struct GNUNET_DHT_ClientGetResultSeenMessage)
734 + hash_count * sizeof(struct GNUNET_HashCode))
735 { 763 {
736 GNUNET_break (0); 764 GNUNET_break (0);
737 return GNUNET_SYSERR; 765 return GNUNET_SYSERR;
@@ -747,24 +775,21 @@ check_dht_local_get_result_seen (void *cls,
747 * @param message the actual message received 775 * @param message the actual message received
748 */ 776 */
749static void 777static void
750handle_dht_local_get_result_seen (void *cls, 778handle_dht_local_get_result_seen (
751 const struct 779 void *cls,
752 GNUNET_DHT_ClientGetResultSeenMessage *seen) 780 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
753{ 781{
754 struct ClientHandle *ch = cls; 782 struct ClientHandle *ch = cls;
755 uint16_t size; 783 uint16_t size = ntohs (seen->header.size);
756 unsigned int hash_count; 784 unsigned int hash_count = (size - sizeof(*seen))
785 / sizeof(struct GNUNET_HashCode);
786 const struct GNUNET_HashCode *hc = (const struct GNUNET_HashCode*) &seen[1];
787 struct FindByUniqueIdContext fui_ctx = {
788 .unique_id = seen->unique_id
789 };
757 unsigned int old_count; 790 unsigned int old_count;
758 const struct GNUNET_HashCode *hc;
759 struct FindByUniqueIdContext fui_ctx;
760 struct ClientQueryRecord *cqr; 791 struct ClientQueryRecord *cqr;
761 792
762 size = ntohs (seen->header.size);
763 hash_count = (size - sizeof(struct GNUNET_DHT_ClientGetResultSeenMessage))
764 / sizeof(struct GNUNET_HashCode);
765 hc = (const struct GNUNET_HashCode*) &seen[1];
766 fui_ctx.unique_id = seen->unique_id;
767 fui_ctx.cqr = NULL;
768 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, 793 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
769 &seen->key, 794 &seen->key,
770 &find_by_unique_id, 795 &find_by_unique_id,
@@ -812,7 +837,7 @@ struct RemoveByUniqueIdContext
812 * @param value value in the hash map, a ClientQueryRecord 837 * @param value value in the hash map, a ClientQueryRecord
813 * @return #GNUNET_YES (we should continue to iterate) 838 * @return #GNUNET_YES (we should continue to iterate)
814 */ 839 */
815static int 840static enum GNUNET_GenericReturnValue
816remove_by_unique_id (void *cls, 841remove_by_unique_id (void *cls,
817 const struct GNUNET_HashCode *key, 842 const struct GNUNET_HashCode *key,
818 void *value) 843 void *value)
@@ -826,7 +851,7 @@ remove_by_unique_id (void *cls,
826 "Removing client %p's record for key %s (by unique id)\n", 851 "Removing client %p's record for key %s (by unique id)\n",
827 ctx->ch->client, 852 ctx->ch->client,
828 GNUNET_h2s (key)); 853 GNUNET_h2s (key));
829 remove_client_record (cqr); 854 remove_client_query_record (cqr);
830 return GNUNET_YES; 855 return GNUNET_YES;
831} 856}
832 857
@@ -840,9 +865,9 @@ remove_by_unique_id (void *cls,
840 * 865 *
841 */ 866 */
842static void 867static void
843handle_dht_local_get_stop (void *cls, 868handle_dht_local_get_stop (
844 const struct 869 void *cls,
845 GNUNET_DHT_ClientGetStopMessage *dht_stop_msg) 870 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
846{ 871{
847 struct ClientHandle *ch = cls; 872 struct ClientHandle *ch = cls;
848 struct RemoveByUniqueIdContext ctx; 873 struct RemoveByUniqueIdContext ctx;
@@ -885,17 +910,8 @@ handle_dht_local_monitor (void *cls,
885 r->get = ntohs (msg->get); 910 r->get = ntohs (msg->get);
886 r->get_resp = ntohs (msg->get_resp); 911 r->get_resp = ntohs (msg->get_resp);
887 r->put = ntohs (msg->put); 912 r->put = ntohs (msg->put);
888 if (0 == ntohs (msg->filter_key)) 913 if (0 != ntohs (msg->filter_key))
889 { 914 r->key = msg->key;
890 r->key = NULL;
891 }
892 else
893 {
894 r->key = GNUNET_new (struct GNUNET_HashCode);
895 GNUNET_memcpy (r->key,
896 &msg->key,
897 sizeof(struct GNUNET_HashCode));
898 }
899 GNUNET_CONTAINER_DLL_insert (monitor_head, 915 GNUNET_CONTAINER_DLL_insert (monitor_head,
900 monitor_tail, 916 monitor_tail,
901 r); 917 r);
@@ -910,39 +926,35 @@ handle_dht_local_monitor (void *cls,
910 * @param msg the actual message received 926 * @param msg the actual message received
911 */ 927 */
912static void 928static void
913handle_dht_local_monitor_stop (void *cls, 929handle_dht_local_monitor_stop (
914 const struct 930 void *cls,
915 GNUNET_DHT_MonitorStartStopMessage *msg) 931 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
916{ 932{
917 struct ClientHandle *ch = cls; 933 struct ClientHandle *ch = cls;
918 struct ClientMonitorRecord *r;
919 int keys_match;
920 934
921 GNUNET_SERVICE_client_continue (ch->client); 935 GNUNET_SERVICE_client_continue (ch->client);
922 for (r = monitor_head; NULL != r; r = r->next) 936 for (struct ClientMonitorRecord *r = monitor_head;
937 NULL != r;
938 r = r->next)
923 { 939 {
924 if (NULL == r->key) 940 bool keys_match;
925 { 941
926 keys_match = (0 == ntohs (msg->filter_key)); 942 keys_match =
927 } 943 (GNUNET_is_zero (&r->key))
928 else 944 ? (0 == ntohs (msg->filter_key))
929 { 945 : ( (0 != ntohs (msg->filter_key)) &&
930 keys_match = ((0 != ntohs (msg->filter_key)) && 946 (! GNUNET_memcmp (&r->key,
931 (! memcmp (r->key, 947 &msg->key)) );
932 &msg->key, 948 if ( (ch == r->ch) &&
933 sizeof(struct GNUNET_HashCode)))); 949 (ntohl (msg->type) == r->type) &&
934 } 950 (r->get == msg->get) &&
935 if ((ch == r->ch) && 951 (r->get_resp == msg->get_resp) &&
936 (ntohl (msg->type) == r->type) && 952 (r->put == msg->put) &&
937 (r->get == msg->get) && 953 keys_match)
938 (r->get_resp == msg->get_resp) &&
939 (r->put == msg->put) &&
940 keys_match)
941 { 954 {
942 GNUNET_CONTAINER_DLL_remove (monitor_head, 955 GNUNET_CONTAINER_DLL_remove (monitor_head,
943 monitor_tail, 956 monitor_tail,
944 r); 957 r);
945 GNUNET_free (r->key);
946 GNUNET_free (r); 958 GNUNET_free (r);
947 return; /* Delete only ONE entry */ 959 return; /* Delete only ONE entry */
948 } 960 }
@@ -1140,7 +1152,7 @@ forward_reply (void *cls,
1140 GNUNET_MQ_send (record->ch->mq, 1152 GNUNET_MQ_send (record->ch->mq,
1141 env); 1153 env);
1142 if (GNUNET_YES == do_free) 1154 if (GNUNET_YES == do_free)
1143 remove_client_record (record); 1155 remove_client_query_record (record);
1144 return GNUNET_YES; 1156 return GNUNET_YES;
1145} 1157}
1146 1158
@@ -1215,7 +1227,7 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
1215 1227
1216/** 1228/**
1217 * Check if some client is monitoring GET messages and notify 1229 * Check if some client is monitoring GET messages and notify
1218 * them in that case. 1230 * them in that case. If tracked, @a path should include the local peer.
1219 * 1231 *
1220 * @param options Options, for instance RecordRoute, DemultiplexEverywhere. 1232 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1221 * @param type The type of data in the request. 1233 * @param type The type of data in the request.
@@ -1234,20 +1246,19 @@ GDS_CLIENTS_process_get (uint32_t options,
1234 const struct GNUNET_PeerIdentity *path, 1246 const struct GNUNET_PeerIdentity *path,
1235 const struct GNUNET_HashCode *key) 1247 const struct GNUNET_HashCode *key)
1236{ 1248{
1237 struct ClientMonitorRecord *m; 1249 struct ClientHandle **cl = NULL;
1238 struct ClientHandle **cl; 1250 unsigned int cl_size = 0;
1239 unsigned int cl_size;
1240 1251
1241 cl = NULL; 1252 for (struct ClientMonitorRecord *m = monitor_head;
1242 cl_size = 0; 1253 NULL != m;
1243 for (m = monitor_head; NULL != m; m = m->next) 1254 m = m->next)
1244 { 1255 {
1245 if (((GNUNET_BLOCK_TYPE_ANY == m->type) || 1256 if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1246 (m->type == type)) && 1257 (m->type == type)) &&
1247 ((NULL == m->key) || 1258 ( (GNUNET_is_zero (&m->key)) ||
1248 (0 == memcmp (key, 1259 (0 ==
1249 m->key, 1260 GNUNET_memcmp (key,
1250 sizeof(struct GNUNET_HashCode))))) 1261 &m->key))))
1251 { 1262 {
1252 struct GNUNET_MQ_Envelope *env; 1263 struct GNUNET_MQ_Envelope *env;
1253 struct GNUNET_DHT_MonitorGetMessage *mmsg; 1264 struct GNUNET_DHT_MonitorGetMessage *mmsg;
@@ -1264,7 +1275,6 @@ GDS_CLIENTS_process_get (uint32_t options,
1264 GNUNET_array_append (cl, 1275 GNUNET_array_append (cl,
1265 cl_size, 1276 cl_size,
1266 m->ch); 1277 m->ch);
1267
1268 msize = path_length * sizeof(struct GNUNET_PeerIdentity); 1278 msize = path_length * sizeof(struct GNUNET_PeerIdentity);
1269 env = GNUNET_MQ_msg_extra (mmsg, 1279 env = GNUNET_MQ_msg_extra (mmsg,
1270 msize, 1280 msize,
@@ -1298,17 +1308,18 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1298 const void *data, 1308 const void *data,
1299 size_t size) 1309 size_t size)
1300{ 1310{
1301 struct ClientMonitorRecord *m; 1311 struct ClientHandle **cl = NULL;
1302 struct ClientHandle **cl; 1312 unsigned int cl_size = 0;
1303 unsigned int cl_size;
1304 1313
1305 cl = NULL; 1314 for (struct ClientMonitorRecord *m = monitor_head;
1306 cl_size = 0; 1315 NULL != m;
1307 for (m = monitor_head; NULL != m; m = m->next) 1316 m = m->next)
1308 { 1317 {
1309 if (((GNUNET_BLOCK_TYPE_ANY == m->type) || (m->type == type) ) && 1318 if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1310 ((NULL == m->key) || 1319 (m->type == type) ) &&
1311 (memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0) )) 1320 ( (GNUNET_is_zero (&m->key)) ||
1321 (0 == GNUNET_memcmp (key,
1322 &m->key)) ) )
1312 { 1323 {
1313 struct GNUNET_MQ_Envelope *env; 1324 struct GNUNET_MQ_Envelope *env;
1314 struct GNUNET_DHT_MonitorGetRespMessage *mmsg; 1325 struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
@@ -1325,7 +1336,6 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1325 GNUNET_array_append (cl, 1336 GNUNET_array_append (cl,
1326 cl_size, 1337 cl_size,
1327 m->ch); 1338 m->ch);
1328
1329 msize = size; 1339 msize = size;
1330 msize += (get_path_length + put_path_length) 1340 msize += (get_path_length + put_path_length)
1331 * sizeof(struct GNUNET_PeerIdentity); 1341 * sizeof(struct GNUNET_PeerIdentity);
@@ -1357,7 +1367,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1357 1367
1358/** 1368/**
1359 * Check if some client is monitoring PUT messages and notify 1369 * Check if some client is monitoring PUT messages and notify
1360 * them in that case. 1370 * them in that case. The @a path should include our own peer ID.
1361 * 1371 *
1362 * @param options Options, for instance RecordRoute, DemultiplexEverywhere. 1372 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1363 * @param type The type of data in the request. 1373 * @param type The type of data in the request.
@@ -1382,17 +1392,19 @@ GDS_CLIENTS_process_put (uint32_t options,
1382 const void *data, 1392 const void *data,
1383 size_t size) 1393 size_t size)
1384{ 1394{
1385 struct ClientMonitorRecord *m; 1395 struct ClientHandle **cl = NULL;
1386 struct ClientHandle **cl; 1396 unsigned int cl_size = 0;
1387 unsigned int cl_size;
1388 1397
1389 cl = NULL; 1398 for (struct ClientMonitorRecord *m = monitor_head;
1390 cl_size = 0; 1399 NULL != m;
1391 for (m = monitor_head; NULL != m; m = m->next) 1400 m = m->next)
1392 { 1401 {
1393 if (((GNUNET_BLOCK_TYPE_ANY == m->type) || (m->type == type) ) && 1402 if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1394 ((NULL == m->key) || 1403 (m->type == type) ) &&
1395 (memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0) )) 1404 ( (GNUNET_is_zero (&m->key)) ||
1405 (0 ==
1406 GNUNET_memcmp (key,
1407 &m->key)) ) )
1396 { 1408 {
1397 struct GNUNET_MQ_Envelope *env; 1409 struct GNUNET_MQ_Envelope *env;
1398 struct GNUNET_DHT_MonitorPutMessage *mmsg; 1410 struct GNUNET_DHT_MonitorPutMessage *mmsg;
@@ -1409,7 +1421,6 @@ GDS_CLIENTS_process_put (uint32_t options,
1409 GNUNET_array_append (cl, 1421 GNUNET_array_append (cl,
1410 cl_size, 1422 cl_size,
1411 m->ch); 1423 m->ch);
1412
1413 msize = size; 1424 msize = size;
1414 msize += path_length * sizeof(struct GNUNET_PeerIdentity); 1425 msize += path_length * sizeof(struct GNUNET_PeerIdentity);
1415 env = GNUNET_MQ_msg_extra (mmsg, 1426 env = GNUNET_MQ_msg_extra (mmsg,