aboutsummaryrefslogtreecommitdiff
path: root/src/dht
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2012-11-10 21:21:01 +0000
committerChristian Grothoff <christian@grothoff.org>2012-11-10 21:21:01 +0000
commit3b2b7374d2d31f7b2638c82711f40f7113e9f7f0 (patch)
tree9dbdac6a4cabe0367bffd864c69c0ac8358a2c45 /src/dht
parent5e756a09598021c6bb22cdbe1b0ac1011679ec0a (diff)
downloadgnunet-3b2b7374d2d31f7b2638c82711f40f7113e9f7f0.tar.gz
gnunet-3b2b7374d2d31f7b2638c82711f40f7113e9f7f0.zip
-implementing #2435
Diffstat (limited to 'src/dht')
-rw-r--r--src/dht/dht.h35
-rw-r--r--src/dht/dht_api.c131
-rw-r--r--src/dht/gnunet-service-dht_clients.c103
3 files changed, 259 insertions, 10 deletions
diff --git a/src/dht/dht.h b/src/dht/dht.h
index 772471a7c..f736c8d75 100644
--- a/src/dht/dht.h
+++ b/src/dht/dht.h
@@ -109,6 +109,39 @@ struct GNUNET_DHT_ClientGetMessage
109 109
110 110
111/** 111/**
112 * DHT GET RESULTS KNOWN message sent from clients to service. Indicates that a GET
113 * request should exclude certain results which are already known.
114 */
115struct GNUNET_DHT_ClientGetResultSeenMessage
116{
117 /**
118 * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN
119 */
120 struct GNUNET_MessageHeader header;
121
122 /**
123 * Reserved, always 0.
124 */
125 uint32_t reserved GNUNET_PACKED;
126
127 /**
128 * The key we are searching for (to make it easy to find the corresponding
129 * GET inside the service).
130 */
131 struct GNUNET_HashCode key;
132
133 /**
134 * Unique ID identifying this request.
135 */
136 uint64_t unique_id GNUNET_PACKED;
137
138 /* Followed by an array of the hash codes of known results */
139
140};
141
142
143
144/**
112 * Reply to a GET send from the service to a client. 145 * Reply to a GET send from the service to a client.
113 */ 146 */
114struct GNUNET_DHT_ClientResultMessage 147struct GNUNET_DHT_ClientResultMessage
@@ -325,7 +358,7 @@ struct GNUNET_DHT_MonitorStartStopMessage
325struct GNUNET_DHT_MonitorGetMessage 358struct GNUNET_DHT_MonitorGetMessage
326{ 359{
327 /** 360 /**
328 * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT 361 * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET
329 */ 362 */
330 struct GNUNET_MessageHeader header; 363 struct GNUNET_MessageHeader header;
331 364
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 7964ba98f..f46900778 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -173,15 +173,42 @@ struct GNUNET_DHT_GetHandle
173 struct PendingMessage *message; 173 struct PendingMessage *message;
174 174
175 /** 175 /**
176 * Array of hash codes over the results that we have already
177 * seen.
178 */
179 struct GNUNET_HashCode *seen_results;
180
181 /**
176 * Key that this get request is for 182 * Key that this get request is for
177 */ 183 */
178 struct GNUNET_HashCode key; 184 struct GNUNET_HashCode key;
179 185
180 /** 186 /**
181 * Unique identifier for this request (for key collisions). 187 * Unique identifier for this request (for key collisions).
182 */ 188 */
183 uint64_t unique_id; 189 uint64_t unique_id;
184 190
191 /**
192 * Size of the 'seen_results' array. Note that not
193 * all positions might be used (as we over-allocate).
194 */
195 unsigned int seen_results_size;
196
197 /**
198 * Offset into the 'seen_results' array marking the
199 * end of the positions that are actually used.
200 */
201 unsigned int seen_results_end;
202
203 /**
204 * Offset into the 'seen_results' array marking the
205 * position up to where we've send the hash codes to
206 * the DHT for blocking (needed as we might not be
207 * able to send all hash codes at once).
208 */
209 unsigned int seen_results_transmission_offset;
210
211
185}; 212};
186 213
187 214
@@ -353,6 +380,50 @@ try_connect (struct GNUNET_DHT_Handle *handle)
353 380
354 381
355/** 382/**
383 * Queue messages to DHT to block certain results from the result set.
384 *
385 * @param get_handle GET to generate messages for.
386 */
387static void
388queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle)
389{
390 struct PendingMessage *pm;
391 struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
392 uint16_t msize;
393 unsigned int delta;
394 unsigned int max;
395
396 while (get_handle->seen_results_transmission_offset < get_handle->seen_results_end)
397 {
398 delta = get_handle->seen_results_end - get_handle->seen_results_transmission_offset;
399 max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
400 if (delta > max)
401 delta = max;
402 msize = sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + delta * sizeof (struct GNUNET_HashCode);
403
404 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
405 msg = (struct GNUNET_DHT_ClientGetResultSeenMessage *) &pm[1];
406 pm->msg = &msg->header;
407 pm->handle = get_handle->dht_handle;
408 pm->unique_id = get_handle->unique_id;
409 pm->free_on_send = GNUNET_YES;
410 pm->in_pending_queue = GNUNET_YES;
411 msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
412 msg->header.size = htons (msize);
413 msg->key = get_handle->key;
414 msg->unique_id = get_handle->unique_id;
415 memcpy (&msg[1],
416 &get_handle->seen_results[get_handle->seen_results_transmission_offset],
417 sizeof (struct GNUNET_HashCode) * delta);
418 get_handle->seen_results_transmission_offset += delta;
419 GNUNET_CONTAINER_DLL_insert_tail (get_handle->dht_handle->pending_head,
420 get_handle->dht_handle->pending_tail,
421 pm);
422 }
423}
424
425
426/**
356 * Add the request corresponding to the given route handle 427 * Add the request corresponding to the given route handle
357 * to the pending queue (if it is not already in there). 428 * to the pending queue (if it is not already in there).
358 * 429 *
@@ -365,16 +436,18 @@ static int
365add_request_to_pending (void *cls, const struct GNUNET_HashCode * key, void *value) 436add_request_to_pending (void *cls, const struct GNUNET_HashCode * key, void *value)
366{ 437{
367 struct GNUNET_DHT_Handle *handle = cls; 438 struct GNUNET_DHT_Handle *handle = cls;
368 struct GNUNET_DHT_GetHandle *rh = value; 439 struct GNUNET_DHT_GetHandle *get_handle = value;
369 440
370 if (GNUNET_NO == rh->message->in_pending_queue) 441 if (GNUNET_NO == get_handle->message->in_pending_queue)
371 { 442 {
372 LOG (GNUNET_ERROR_TYPE_DEBUG, 443 LOG (GNUNET_ERROR_TYPE_DEBUG,
373 "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key), 444 "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
374 handle); 445 handle);
446 get_handle->seen_results_transmission_offset = 0;
375 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, 447 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
376 rh->message); 448 get_handle->message);
377 rh->message->in_pending_queue = GNUNET_YES; 449 queue_filter_messages (get_handle);
450 get_handle->message->in_pending_queue = GNUNET_YES;
378 } 451 }
379 return GNUNET_YES; 452 return GNUNET_YES;
380} 453}
@@ -578,6 +651,7 @@ process_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
578 struct GNUNET_DHT_GetHandle *get_handle = value; 651 struct GNUNET_DHT_GetHandle *get_handle = value;
579 const struct GNUNET_PeerIdentity *put_path; 652 const struct GNUNET_PeerIdentity *put_path;
580 const struct GNUNET_PeerIdentity *get_path; 653 const struct GNUNET_PeerIdentity *get_path;
654 struct GNUNET_HashCode hc;
581 uint32_t put_path_length; 655 uint32_t put_path_length;
582 uint32_t get_path_length; 656 uint32_t get_path_length;
583 size_t data_length; 657 size_t data_length;
@@ -614,6 +688,17 @@ process_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
614 put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1]; 688 put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
615 get_path = &put_path[put_path_length]; 689 get_path = &put_path[put_path_length];
616 data = &get_path[get_path_length]; 690 data = &get_path[get_path_length];
691 /* remember that we've seen this result */
692 GNUNET_CRYPTO_hash (data, data_length, &hc);
693 if (get_handle->seen_results_size == get_handle->seen_results_end)
694 GNUNET_array_grow (get_handle->seen_results,
695 get_handle->seen_results_size,
696 get_handle->seen_results_size * 2 + 1);
697 GNUNET_assert (get_handle->seen_results_end == get_handle->seen_results_transmission_offset);
698 get_handle->seen_results[get_handle->seen_results_end++] = hc;
699 /* no need to block it explicitly, service already knows about it! */
700 get_handle->seen_results_transmission_offset++;
701
617 get_handle->iter (get_handle->iter_cls, 702 get_handle->iter (get_handle->iter_cls,
618 GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key, 703 GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
619 get_path, get_path_length, put_path, put_path_length, 704 get_path, get_path_length, put_path, put_path_length,
@@ -1194,6 +1279,38 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1194} 1279}
1195 1280
1196 1281
1282
1283/**
1284 * Tell the DHT not to return any of the following known results
1285 * to this client.
1286 *
1287 * @param get_handle get operation for which results should be filtered
1288 * @param num_results number of results to be blocked that are
1289 * provided in this call (size of the 'results' array)
1290 * @param results array of hash codes over the 'data' of the results
1291 * to be blocked
1292 */
1293void
1294GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
1295 unsigned int num_results,
1296 const struct GNUNET_HashCode *results)
1297{
1298 unsigned int needed;
1299
1300 needed = get_handle->seen_results_end + num_results;
1301 if (needed > get_handle->seen_results_size)
1302 GNUNET_array_grow (get_handle->seen_results,
1303 get_handle->seen_results_size,
1304 needed);
1305 memcpy (&get_handle->seen_results[get_handle->seen_results_end],
1306 results,
1307 num_results * sizeof (struct GNUNET_HashCode));
1308 get_handle->seen_results_end += num_results;
1309 queue_filter_messages (get_handle);
1310 process_pending_messages (get_handle->dht_handle);
1311}
1312
1313
1197/** 1314/**
1198 * Stop async DHT-get. 1315 * Stop async DHT-get.
1199 * 1316 *
@@ -1242,8 +1359,10 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1242 get_handle->message->in_pending_queue = GNUNET_NO; 1359 get_handle->message->in_pending_queue = GNUNET_NO;
1243 } 1360 }
1244 GNUNET_free (get_handle->message); 1361 GNUNET_free (get_handle->message);
1362 GNUNET_array_grow (get_handle->seen_results,
1363 get_handle->seen_results_end,
1364 0);
1245 GNUNET_free (get_handle); 1365 GNUNET_free (get_handle);
1246
1247 process_pending_messages (handle); 1366 process_pending_messages (handle);
1248} 1367}
1249 1368
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index 1d2e1e9bb..593674569 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -551,9 +551,7 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
551 551
552 552
553/** 553/**
554 * Handler for any generic DHT messages, calls the appropriate handler 554 * Handler for DHT GET messages from the client.
555 * depending on message type, sends confirmation if responses aren't otherwise
556 * expected.
557 * 555 *
558 * @param cls closure for the service 556 * @param cls closure for the service
559 * @param client the client we received this message from 557 * @param client the client we received this message from
@@ -621,6 +619,103 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
621 619
622 620
623/** 621/**
622 * Closure for 'find_by_unique_id'.
623 */
624struct FindByUniqueIdContext
625{
626 /**
627 * Where to store the result, if found.
628 */
629 struct ClientQueryRecord *cqr;
630
631 uint64_t unique_id;
632};
633
634
635/**
636 * Function called for each existing DHT record for the given
637 * query. Checks if it matches the UID given in the closure
638 * and if so returns the entry as a result.
639 *
640 * @param cls the search context
641 * @param key query for the lookup (not used)
642 * @param value the 'struct ClientQueryRecord'
643 * @return GNUNET_YES to continue iteration (result not yet found)
644 */
645static int
646find_by_unique_id (void *cls,
647 const struct GNUNET_HashCode *key,
648 void *value)
649{
650 struct FindByUniqueIdContext *fui_ctx = cls;
651 struct ClientQueryRecord *cqr = value;
652
653 if (cqr->unique_id != fui_ctx->unique_id)
654 return GNUNET_YES;
655 fui_ctx->cqr = cqr;
656 return GNUNET_NO;
657}
658
659
660/**
661 * Handler for "GET result seen" messages from the client.
662 *
663 * @param cls closure for the service
664 * @param client the client we received this message from
665 * @param message the actual message received
666 */
667static void
668handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client,
669 const struct GNUNET_MessageHeader *message)
670{
671 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen;
672 uint16_t size;
673 unsigned int hash_count;
674 unsigned int old_count;
675 const struct GNUNET_HashCode *hc;
676 struct FindByUniqueIdContext fui_ctx;
677 struct ClientQueryRecord *cqr;
678
679 size = ntohs (message->size);
680 if (size < sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage))
681 {
682 GNUNET_break (0);
683 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
684 return;
685 }
686 seen = (const struct GNUNET_DHT_ClientGetResultSeenMessage *) message;
687 hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
688 if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
689 {
690 GNUNET_break (0);
691 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
692 return;
693 }
694 hc = (const struct GNUNET_HashCode*) &seen[1];
695 fui_ctx.unique_id = seen->unique_id;
696 fui_ctx.cqr = NULL;
697 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
698 &seen->key,
699 &find_by_unique_id,
700 &fui_ctx);
701 if (NULL == (cqr = fui_ctx.cqr))
702 {
703 GNUNET_break (0);
704 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
705 return;
706 }
707 /* finally, update 'seen' list */
708 old_count = cqr->seen_replies_count;
709 GNUNET_array_grow (cqr->seen_replies,
710 cqr->seen_replies_count,
711 cqr->seen_replies_count + hash_count);
712 memcpy (&cqr->seen_replies[old_count],
713 hc,
714 sizeof (struct GNUNET_HashCode) * hash_count);
715}
716
717
718/**
624 * Closure for 'remove_by_unique_id'. 719 * Closure for 'remove_by_unique_id'.
625 */ 720 */
626struct RemoveByUniqueIdContext 721struct RemoveByUniqueIdContext
@@ -1350,6 +1445,8 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
1350 {&handle_dht_local_monitor_stop, NULL, 1445 {&handle_dht_local_monitor_stop, NULL,
1351 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, 1446 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP,
1352 sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, 1447 sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
1448 {&handle_dht_local_get_result_seen, NULL,
1449 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, 0},
1353 {NULL, NULL, 0, 0} 1450 {NULL, NULL, 0, 0}
1354 }; 1451 };
1355 forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO); 1452 forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO);