diff options
author | Christian Grothoff <christian@grothoff.org> | 2012-11-10 21:21:01 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2012-11-10 21:21:01 +0000 |
commit | 3b2b7374d2d31f7b2638c82711f40f7113e9f7f0 (patch) | |
tree | 9dbdac6a4cabe0367bffd864c69c0ac8358a2c45 /src/dht | |
parent | 5e756a09598021c6bb22cdbe1b0ac1011679ec0a (diff) | |
download | gnunet-3b2b7374d2d31f7b2638c82711f40f7113e9f7f0.tar.gz gnunet-3b2b7374d2d31f7b2638c82711f40f7113e9f7f0.zip |
-implementing #2435
Diffstat (limited to 'src/dht')
-rw-r--r-- | src/dht/dht.h | 35 | ||||
-rw-r--r-- | src/dht/dht_api.c | 131 | ||||
-rw-r--r-- | src/dht/gnunet-service-dht_clients.c | 103 |
3 files changed, 259 insertions, 10 deletions
diff --git a/src/dht/dht.h b/src/dht/dht.h index 772471a7c..f736c8d75 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h | |||
@@ -109,6 +109,39 @@ struct GNUNET_DHT_ClientGetMessage | |||
109 | 109 | ||
110 | 110 | ||
111 | /** | 111 | /** |
112 | * DHT GET RESULTS KNOWN message sent from clients to service. Indicates that a GET | ||
113 | * request should exclude certain results which are already known. | ||
114 | */ | ||
115 | struct GNUNET_DHT_ClientGetResultSeenMessage | ||
116 | { | ||
117 | /** | ||
118 | * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN | ||
119 | */ | ||
120 | struct GNUNET_MessageHeader header; | ||
121 | |||
122 | /** | ||
123 | * Reserved, always 0. | ||
124 | */ | ||
125 | uint32_t reserved GNUNET_PACKED; | ||
126 | |||
127 | /** | ||
128 | * The key we are searching for (to make it easy to find the corresponding | ||
129 | * GET inside the service). | ||
130 | */ | ||
131 | struct GNUNET_HashCode key; | ||
132 | |||
133 | /** | ||
134 | * Unique ID identifying this request. | ||
135 | */ | ||
136 | uint64_t unique_id GNUNET_PACKED; | ||
137 | |||
138 | /* Followed by an array of the hash codes of known results */ | ||
139 | |||
140 | }; | ||
141 | |||
142 | |||
143 | |||
144 | /** | ||
112 | * Reply to a GET send from the service to a client. | 145 | * Reply to a GET send from the service to a client. |
113 | */ | 146 | */ |
114 | struct GNUNET_DHT_ClientResultMessage | 147 | struct GNUNET_DHT_ClientResultMessage |
@@ -325,7 +358,7 @@ struct GNUNET_DHT_MonitorStartStopMessage | |||
325 | struct GNUNET_DHT_MonitorGetMessage | 358 | struct GNUNET_DHT_MonitorGetMessage |
326 | { | 359 | { |
327 | /** | 360 | /** |
328 | * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT | 361 | * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET |
329 | */ | 362 | */ |
330 | struct GNUNET_MessageHeader header; | 363 | struct GNUNET_MessageHeader header; |
331 | 364 | ||
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 7964ba98f..f46900778 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c | |||
@@ -173,15 +173,42 @@ struct GNUNET_DHT_GetHandle | |||
173 | struct PendingMessage *message; | 173 | struct PendingMessage *message; |
174 | 174 | ||
175 | /** | 175 | /** |
176 | * Array of hash codes over the results that we have already | ||
177 | * seen. | ||
178 | */ | ||
179 | struct GNUNET_HashCode *seen_results; | ||
180 | |||
181 | /** | ||
176 | * Key that this get request is for | 182 | * Key that this get request is for |
177 | */ | 183 | */ |
178 | struct GNUNET_HashCode key; | 184 | struct GNUNET_HashCode key; |
179 | 185 | ||
180 | /** | 186 | /** |
181 | * Unique identifier for this request (for key collisions). | 187 | * Unique identifier for this request (for key collisions). |
182 | */ | 188 | */ |
183 | uint64_t unique_id; | 189 | uint64_t unique_id; |
184 | 190 | ||
191 | /** | ||
192 | * Size of the 'seen_results' array. Note that not | ||
193 | * all positions might be used (as we over-allocate). | ||
194 | */ | ||
195 | unsigned int seen_results_size; | ||
196 | |||
197 | /** | ||
198 | * Offset into the 'seen_results' array marking the | ||
199 | * end of the positions that are actually used. | ||
200 | */ | ||
201 | unsigned int seen_results_end; | ||
202 | |||
203 | /** | ||
204 | * Offset into the 'seen_results' array marking the | ||
205 | * position up to where we've send the hash codes to | ||
206 | * the DHT for blocking (needed as we might not be | ||
207 | * able to send all hash codes at once). | ||
208 | */ | ||
209 | unsigned int seen_results_transmission_offset; | ||
210 | |||
211 | |||
185 | }; | 212 | }; |
186 | 213 | ||
187 | 214 | ||
@@ -353,6 +380,50 @@ try_connect (struct GNUNET_DHT_Handle *handle) | |||
353 | 380 | ||
354 | 381 | ||
355 | /** | 382 | /** |
383 | * Queue messages to DHT to block certain results from the result set. | ||
384 | * | ||
385 | * @param get_handle GET to generate messages for. | ||
386 | */ | ||
387 | static void | ||
388 | queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle) | ||
389 | { | ||
390 | struct PendingMessage *pm; | ||
391 | struct GNUNET_DHT_ClientGetResultSeenMessage *msg; | ||
392 | uint16_t msize; | ||
393 | unsigned int delta; | ||
394 | unsigned int max; | ||
395 | |||
396 | while (get_handle->seen_results_transmission_offset < get_handle->seen_results_end) | ||
397 | { | ||
398 | delta = get_handle->seen_results_end - get_handle->seen_results_transmission_offset; | ||
399 | max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode); | ||
400 | if (delta > max) | ||
401 | delta = max; | ||
402 | msize = sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + delta * sizeof (struct GNUNET_HashCode); | ||
403 | |||
404 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); | ||
405 | msg = (struct GNUNET_DHT_ClientGetResultSeenMessage *) &pm[1]; | ||
406 | pm->msg = &msg->header; | ||
407 | pm->handle = get_handle->dht_handle; | ||
408 | pm->unique_id = get_handle->unique_id; | ||
409 | pm->free_on_send = GNUNET_YES; | ||
410 | pm->in_pending_queue = GNUNET_YES; | ||
411 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN); | ||
412 | msg->header.size = htons (msize); | ||
413 | msg->key = get_handle->key; | ||
414 | msg->unique_id = get_handle->unique_id; | ||
415 | memcpy (&msg[1], | ||
416 | &get_handle->seen_results[get_handle->seen_results_transmission_offset], | ||
417 | sizeof (struct GNUNET_HashCode) * delta); | ||
418 | get_handle->seen_results_transmission_offset += delta; | ||
419 | GNUNET_CONTAINER_DLL_insert_tail (get_handle->dht_handle->pending_head, | ||
420 | get_handle->dht_handle->pending_tail, | ||
421 | pm); | ||
422 | } | ||
423 | } | ||
424 | |||
425 | |||
426 | /** | ||
356 | * Add the request corresponding to the given route handle | 427 | * Add the request corresponding to the given route handle |
357 | * to the pending queue (if it is not already in there). | 428 | * to the pending queue (if it is not already in there). |
358 | * | 429 | * |
@@ -365,16 +436,18 @@ static int | |||
365 | add_request_to_pending (void *cls, const struct GNUNET_HashCode * key, void *value) | 436 | add_request_to_pending (void *cls, const struct GNUNET_HashCode * key, void *value) |
366 | { | 437 | { |
367 | struct GNUNET_DHT_Handle *handle = cls; | 438 | struct GNUNET_DHT_Handle *handle = cls; |
368 | struct GNUNET_DHT_GetHandle *rh = value; | 439 | struct GNUNET_DHT_GetHandle *get_handle = value; |
369 | 440 | ||
370 | if (GNUNET_NO == rh->message->in_pending_queue) | 441 | if (GNUNET_NO == get_handle->message->in_pending_queue) |
371 | { | 442 | { |
372 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 443 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
373 | "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key), | 444 | "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key), |
374 | handle); | 445 | handle); |
446 | get_handle->seen_results_transmission_offset = 0; | ||
375 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, | 447 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, |
376 | rh->message); | 448 | get_handle->message); |
377 | rh->message->in_pending_queue = GNUNET_YES; | 449 | queue_filter_messages (get_handle); |
450 | get_handle->message->in_pending_queue = GNUNET_YES; | ||
378 | } | 451 | } |
379 | return GNUNET_YES; | 452 | return GNUNET_YES; |
380 | } | 453 | } |
@@ -578,6 +651,7 @@ process_reply (void *cls, const struct GNUNET_HashCode * key, void *value) | |||
578 | struct GNUNET_DHT_GetHandle *get_handle = value; | 651 | struct GNUNET_DHT_GetHandle *get_handle = value; |
579 | const struct GNUNET_PeerIdentity *put_path; | 652 | const struct GNUNET_PeerIdentity *put_path; |
580 | const struct GNUNET_PeerIdentity *get_path; | 653 | const struct GNUNET_PeerIdentity *get_path; |
654 | struct GNUNET_HashCode hc; | ||
581 | uint32_t put_path_length; | 655 | uint32_t put_path_length; |
582 | uint32_t get_path_length; | 656 | uint32_t get_path_length; |
583 | size_t data_length; | 657 | size_t data_length; |
@@ -614,6 +688,17 @@ process_reply (void *cls, const struct GNUNET_HashCode * key, void *value) | |||
614 | put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1]; | 688 | put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1]; |
615 | get_path = &put_path[put_path_length]; | 689 | get_path = &put_path[put_path_length]; |
616 | data = &get_path[get_path_length]; | 690 | data = &get_path[get_path_length]; |
691 | /* remember that we've seen this result */ | ||
692 | GNUNET_CRYPTO_hash (data, data_length, &hc); | ||
693 | if (get_handle->seen_results_size == get_handle->seen_results_end) | ||
694 | GNUNET_array_grow (get_handle->seen_results, | ||
695 | get_handle->seen_results_size, | ||
696 | get_handle->seen_results_size * 2 + 1); | ||
697 | GNUNET_assert (get_handle->seen_results_end == get_handle->seen_results_transmission_offset); | ||
698 | get_handle->seen_results[get_handle->seen_results_end++] = hc; | ||
699 | /* no need to block it explicitly, service already knows about it! */ | ||
700 | get_handle->seen_results_transmission_offset++; | ||
701 | |||
617 | get_handle->iter (get_handle->iter_cls, | 702 | get_handle->iter (get_handle->iter_cls, |
618 | GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key, | 703 | GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key, |
619 | get_path, get_path_length, put_path, put_path_length, | 704 | get_path, get_path_length, put_path, put_path_length, |
@@ -1194,6 +1279,38 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, | |||
1194 | } | 1279 | } |
1195 | 1280 | ||
1196 | 1281 | ||
1282 | |||
1283 | /** | ||
1284 | * Tell the DHT not to return any of the following known results | ||
1285 | * to this client. | ||
1286 | * | ||
1287 | * @param get_handle get operation for which results should be filtered | ||
1288 | * @param num_results number of results to be blocked that are | ||
1289 | * provided in this call (size of the 'results' array) | ||
1290 | * @param results array of hash codes over the 'data' of the results | ||
1291 | * to be blocked | ||
1292 | */ | ||
1293 | void | ||
1294 | GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle, | ||
1295 | unsigned int num_results, | ||
1296 | const struct GNUNET_HashCode *results) | ||
1297 | { | ||
1298 | unsigned int needed; | ||
1299 | |||
1300 | needed = get_handle->seen_results_end + num_results; | ||
1301 | if (needed > get_handle->seen_results_size) | ||
1302 | GNUNET_array_grow (get_handle->seen_results, | ||
1303 | get_handle->seen_results_size, | ||
1304 | needed); | ||
1305 | memcpy (&get_handle->seen_results[get_handle->seen_results_end], | ||
1306 | results, | ||
1307 | num_results * sizeof (struct GNUNET_HashCode)); | ||
1308 | get_handle->seen_results_end += num_results; | ||
1309 | queue_filter_messages (get_handle); | ||
1310 | process_pending_messages (get_handle->dht_handle); | ||
1311 | } | ||
1312 | |||
1313 | |||
1197 | /** | 1314 | /** |
1198 | * Stop async DHT-get. | 1315 | * Stop async DHT-get. |
1199 | * | 1316 | * |
@@ -1242,8 +1359,10 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) | |||
1242 | get_handle->message->in_pending_queue = GNUNET_NO; | 1359 | get_handle->message->in_pending_queue = GNUNET_NO; |
1243 | } | 1360 | } |
1244 | GNUNET_free (get_handle->message); | 1361 | GNUNET_free (get_handle->message); |
1362 | GNUNET_array_grow (get_handle->seen_results, | ||
1363 | get_handle->seen_results_end, | ||
1364 | 0); | ||
1245 | GNUNET_free (get_handle); | 1365 | GNUNET_free (get_handle); |
1246 | |||
1247 | process_pending_messages (handle); | 1366 | process_pending_messages (handle); |
1248 | } | 1367 | } |
1249 | 1368 | ||
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index 1d2e1e9bb..593674569 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c | |||
@@ -551,9 +551,7 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, | |||
551 | 551 | ||
552 | 552 | ||
553 | /** | 553 | /** |
554 | * Handler for any generic DHT messages, calls the appropriate handler | 554 | * Handler for DHT GET messages from the client. |
555 | * depending on message type, sends confirmation if responses aren't otherwise | ||
556 | * expected. | ||
557 | * | 555 | * |
558 | * @param cls closure for the service | 556 | * @param cls closure for the service |
559 | * @param client the client we received this message from | 557 | * @param client the client we received this message from |
@@ -621,6 +619,103 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, | |||
621 | 619 | ||
622 | 620 | ||
623 | /** | 621 | /** |
622 | * Closure for 'find_by_unique_id'. | ||
623 | */ | ||
624 | struct FindByUniqueIdContext | ||
625 | { | ||
626 | /** | ||
627 | * Where to store the result, if found. | ||
628 | */ | ||
629 | struct ClientQueryRecord *cqr; | ||
630 | |||
631 | uint64_t unique_id; | ||
632 | }; | ||
633 | |||
634 | |||
635 | /** | ||
636 | * Function called for each existing DHT record for the given | ||
637 | * query. Checks if it matches the UID given in the closure | ||
638 | * and if so returns the entry as a result. | ||
639 | * | ||
640 | * @param cls the search context | ||
641 | * @param key query for the lookup (not used) | ||
642 | * @param value the 'struct ClientQueryRecord' | ||
643 | * @return GNUNET_YES to continue iteration (result not yet found) | ||
644 | */ | ||
645 | static int | ||
646 | find_by_unique_id (void *cls, | ||
647 | const struct GNUNET_HashCode *key, | ||
648 | void *value) | ||
649 | { | ||
650 | struct FindByUniqueIdContext *fui_ctx = cls; | ||
651 | struct ClientQueryRecord *cqr = value; | ||
652 | |||
653 | if (cqr->unique_id != fui_ctx->unique_id) | ||
654 | return GNUNET_YES; | ||
655 | fui_ctx->cqr = cqr; | ||
656 | return GNUNET_NO; | ||
657 | } | ||
658 | |||
659 | |||
660 | /** | ||
661 | * Handler for "GET result seen" messages from the client. | ||
662 | * | ||
663 | * @param cls closure for the service | ||
664 | * @param client the client we received this message from | ||
665 | * @param message the actual message received | ||
666 | */ | ||
667 | static void | ||
668 | handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client, | ||
669 | const struct GNUNET_MessageHeader *message) | ||
670 | { | ||
671 | const struct GNUNET_DHT_ClientGetResultSeenMessage *seen; | ||
672 | uint16_t size; | ||
673 | unsigned int hash_count; | ||
674 | unsigned int old_count; | ||
675 | const struct GNUNET_HashCode *hc; | ||
676 | struct FindByUniqueIdContext fui_ctx; | ||
677 | struct ClientQueryRecord *cqr; | ||
678 | |||
679 | size = ntohs (message->size); | ||
680 | if (size < sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) | ||
681 | { | ||
682 | GNUNET_break (0); | ||
683 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
684 | return; | ||
685 | } | ||
686 | seen = (const struct GNUNET_DHT_ClientGetResultSeenMessage *) message; | ||
687 | hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode); | ||
688 | if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode)) | ||
689 | { | ||
690 | GNUNET_break (0); | ||
691 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
692 | return; | ||
693 | } | ||
694 | hc = (const struct GNUNET_HashCode*) &seen[1]; | ||
695 | fui_ctx.unique_id = seen->unique_id; | ||
696 | fui_ctx.cqr = NULL; | ||
697 | GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, | ||
698 | &seen->key, | ||
699 | &find_by_unique_id, | ||
700 | &fui_ctx); | ||
701 | if (NULL == (cqr = fui_ctx.cqr)) | ||
702 | { | ||
703 | GNUNET_break (0); | ||
704 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
705 | return; | ||
706 | } | ||
707 | /* finally, update 'seen' list */ | ||
708 | old_count = cqr->seen_replies_count; | ||
709 | GNUNET_array_grow (cqr->seen_replies, | ||
710 | cqr->seen_replies_count, | ||
711 | cqr->seen_replies_count + hash_count); | ||
712 | memcpy (&cqr->seen_replies[old_count], | ||
713 | hc, | ||
714 | sizeof (struct GNUNET_HashCode) * hash_count); | ||
715 | } | ||
716 | |||
717 | |||
718 | /** | ||
624 | * Closure for 'remove_by_unique_id'. | 719 | * Closure for 'remove_by_unique_id'. |
625 | */ | 720 | */ |
626 | struct RemoveByUniqueIdContext | 721 | struct RemoveByUniqueIdContext |
@@ -1350,6 +1445,8 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server) | |||
1350 | {&handle_dht_local_monitor_stop, NULL, | 1445 | {&handle_dht_local_monitor_stop, NULL, |
1351 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, | 1446 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, |
1352 | sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, | 1447 | sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, |
1448 | {&handle_dht_local_get_result_seen, NULL, | ||
1449 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, 0}, | ||
1353 | {NULL, NULL, 0, 0} | 1450 | {NULL, NULL, 0, 0} |
1354 | }; | 1451 | }; |
1355 | forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO); | 1452 | forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO); |