diff options
author | David Barksdale <amatus@amat.us> | 2017-03-19 15:55:32 -0500 |
---|---|---|
committer | David Barksdale <amatus@amat.us> | 2017-03-19 17:38:36 -0500 |
commit | 2dde0202c5590eeb051c1346f2b66293d83b87ce (patch) | |
tree | 7997191912ee4c70959934d6c9783a0c9f450fec /src/fs | |
parent | d17d833dfd93a81f3540d472d1be4dfb7e9cbd03 (diff) | |
download | gnunet-2dde0202c5590eeb051c1346f2b66293d83b87ce.tar.gz gnunet-2dde0202c5590eeb051c1346f2b66293d83b87ce.zip |
[datastore] Fix #3743
This change adds support for key == NULL to the datastore plugins
and replaces the offset argument with a next_uid and random arguments to
increase performance in the key == NULL case.
With the offset argument a datastore plugin would have to count all
matching keys before fetching the key at the right offset, which would
iterate over the entire database in the case of key == NULL.
The offset argument was used in two ways: to iterate over a set of
matching values and to start iteration at a random matching value. The new API
seperates these into two arguments: if random is true it will return a
random matching value, otherwise next_uid can be set to uid + 1 to return the
next matching value.
The random argument was not added to get_zero_anonymity. This function
is used to periodically insert zero anonymity values into the DHT. I
don't think it's necessary to randomize this.
Diffstat (limited to 'src/fs')
-rw-r--r-- | src/fs/fs_api.h | 10 | ||||
-rw-r--r-- | src/fs/fs_unindex.c | 58 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_cadet_server.c | 13 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_pr.c | 403 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_put.c | 44 |
5 files changed, 251 insertions, 277 deletions
diff --git a/src/fs/fs_api.h b/src/fs/fs_api.h index e85de94a7..be22ea73e 100644 --- a/src/fs/fs_api.h +++ b/src/fs/fs_api.h | |||
@@ -1464,21 +1464,11 @@ struct GNUNET_FS_UnindexContext | |||
1464 | struct GNUNET_CRYPTO_FileHashContext *fhc; | 1464 | struct GNUNET_CRYPTO_FileHashContext *fhc; |
1465 | 1465 | ||
1466 | /** | 1466 | /** |
1467 | * Which values have we seen already? | ||
1468 | */ | ||
1469 | struct GNUNET_CONTAINER_MultiHashMap *seen_dh; | ||
1470 | |||
1471 | /** | ||
1472 | * Overall size of the file. | 1467 | * Overall size of the file. |
1473 | */ | 1468 | */ |
1474 | uint64_t file_size; | 1469 | uint64_t file_size; |
1475 | 1470 | ||
1476 | /** | 1471 | /** |
1477 | * Random offset given to #GNUNET_DATASTORE_get_key. | ||
1478 | */ | ||
1479 | uint64_t roff; | ||
1480 | |||
1481 | /** | ||
1482 | * When did we start? | 1472 | * When did we start? |
1483 | */ | 1473 | */ |
1484 | struct GNUNET_TIME_Absolute start_time; | 1474 | struct GNUNET_TIME_Absolute start_time; |
diff --git a/src/fs/fs_unindex.c b/src/fs/fs_unindex.c index ad1499f00..e1c7ea535 100644 --- a/src/fs/fs_unindex.c +++ b/src/fs/fs_unindex.c | |||
@@ -312,8 +312,6 @@ unindex_finish (struct GNUNET_FS_UnindexContext *uc) | |||
312 | uc->fh = NULL; | 312 | uc->fh = NULL; |
313 | GNUNET_DATASTORE_disconnect (uc->dsh, GNUNET_NO); | 313 | GNUNET_DATASTORE_disconnect (uc->dsh, GNUNET_NO); |
314 | uc->dsh = NULL; | 314 | uc->dsh = NULL; |
315 | GNUNET_CONTAINER_multihashmap_destroy (uc->seen_dh); | ||
316 | uc->seen_dh = NULL; | ||
317 | uc->state = UNINDEX_STATE_FS_NOTIFY; | 315 | uc->state = UNINDEX_STATE_FS_NOTIFY; |
318 | GNUNET_FS_unindex_sync_ (uc); | 316 | GNUNET_FS_unindex_sync_ (uc); |
319 | uc->mq = GNUNET_CLIENT_connect (uc->h->cfg, | 317 | uc->mq = GNUNET_CLIENT_connect (uc->h->cfg, |
@@ -444,7 +442,6 @@ continue_after_remove (void *cls, | |||
444 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 442 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
445 | _("Failed to remove UBlock: %s\n"), | 443 | _("Failed to remove UBlock: %s\n"), |
446 | msg); | 444 | msg); |
447 | GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh); | ||
448 | uc->ksk_offset++; | 445 | uc->ksk_offset++; |
449 | GNUNET_FS_unindex_do_remove_kblocks_ (uc); | 446 | GNUNET_FS_unindex_do_remove_kblocks_ (uc); |
450 | } | 447 | } |
@@ -486,34 +483,15 @@ process_kblock_for_unindex (void *cls, | |||
486 | const struct UBlock *ub; | 483 | const struct UBlock *ub; |
487 | struct GNUNET_FS_Uri *chk_uri; | 484 | struct GNUNET_FS_Uri *chk_uri; |
488 | struct GNUNET_HashCode query; | 485 | struct GNUNET_HashCode query; |
489 | struct GNUNET_HashCode dh; | ||
490 | 486 | ||
491 | uc->dqe = NULL; | 487 | uc->dqe = NULL; |
492 | if (NULL == data) | 488 | if (NULL == data) |
493 | { | 489 | { |
494 | /* no result */ | 490 | /* no result */ |
495 | GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh); | ||
496 | uc->ksk_offset++; | 491 | uc->ksk_offset++; |
497 | GNUNET_FS_unindex_do_remove_kblocks_ (uc); | 492 | GNUNET_FS_unindex_do_remove_kblocks_ (uc); |
498 | return; | 493 | return; |
499 | } | 494 | } |
500 | GNUNET_CRYPTO_hash (data, | ||
501 | size, | ||
502 | &dh); | ||
503 | if (GNUNET_YES == | ||
504 | GNUNET_CONTAINER_multihashmap_contains (uc->seen_dh, | ||
505 | &dh)) | ||
506 | { | ||
507 | GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh); | ||
508 | uc->ksk_offset++; | ||
509 | GNUNET_FS_unindex_do_remove_kblocks_ (uc); | ||
510 | return; | ||
511 | } | ||
512 | GNUNET_assert (GNUNET_OK == | ||
513 | GNUNET_CONTAINER_multihashmap_put (uc->seen_dh, | ||
514 | &dh, | ||
515 | uc, | ||
516 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
517 | GNUNET_assert (GNUNET_BLOCK_TYPE_FS_UBLOCK == type); | 495 | GNUNET_assert (GNUNET_BLOCK_TYPE_FS_UBLOCK == type); |
518 | if (size < sizeof (struct UBlock)) | 496 | if (size < sizeof (struct UBlock)) |
519 | { | 497 | { |
@@ -566,23 +544,24 @@ process_kblock_for_unindex (void *cls, | |||
566 | GNUNET_FS_uri_destroy (chk_uri); | 544 | GNUNET_FS_uri_destroy (chk_uri); |
567 | /* matches! */ | 545 | /* matches! */ |
568 | uc->dqe = GNUNET_DATASTORE_remove (uc->dsh, | 546 | uc->dqe = GNUNET_DATASTORE_remove (uc->dsh, |
569 | key, | 547 | key, |
570 | size, | 548 | size, |
571 | data, | 549 | data, |
572 | 0 /* priority */, | 550 | 0 /* priority */, |
573 | 1 /* queue size */, | 551 | 1 /* queue size */, |
574 | &continue_after_remove, | 552 | &continue_after_remove, |
575 | uc); | 553 | uc); |
576 | return; | 554 | return; |
577 | get_next: | 555 | get_next: |
578 | uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh, | 556 | uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh, |
579 | uc->roff++, | 557 | uid + 1 /* next_uid */, |
580 | &uc->uquery, | 558 | false /* random */, |
581 | GNUNET_BLOCK_TYPE_FS_UBLOCK, | 559 | &uc->uquery, |
582 | 0 /* priority */, | 560 | GNUNET_BLOCK_TYPE_FS_UBLOCK, |
561 | 0 /* priority */, | ||
583 | 1 /* queue size */, | 562 | 1 /* queue size */, |
584 | &process_kblock_for_unindex, | 563 | &process_kblock_for_unindex, |
585 | uc); | 564 | uc); |
586 | } | 565 | } |
587 | 566 | ||
588 | 567 | ||
@@ -627,13 +606,14 @@ GNUNET_FS_unindex_do_remove_kblocks_ (struct GNUNET_FS_UnindexContext *uc) | |||
627 | sizeof (dpub), | 606 | sizeof (dpub), |
628 | &uc->uquery); | 607 | &uc->uquery); |
629 | uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh, | 608 | uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh, |
630 | uc->roff++, | 609 | 0 /* next_uid */, |
631 | &uc->uquery, | 610 | false /* random */, |
632 | GNUNET_BLOCK_TYPE_FS_UBLOCK, | 611 | &uc->uquery, |
633 | 0 /* priority */, | 612 | GNUNET_BLOCK_TYPE_FS_UBLOCK, |
613 | 0 /* priority */, | ||
634 | 1 /* queue size */, | 614 | 1 /* queue size */, |
635 | &process_kblock_for_unindex, | 615 | &process_kblock_for_unindex, |
636 | uc); | 616 | uc); |
637 | } | 617 | } |
638 | 618 | ||
639 | 619 | ||
@@ -826,8 +806,6 @@ GNUNET_FS_unindex_start (struct GNUNET_FS_Handle *h, | |||
826 | uc->start_time = GNUNET_TIME_absolute_get (); | 806 | uc->start_time = GNUNET_TIME_absolute_get (); |
827 | uc->file_size = size; | 807 | uc->file_size = size; |
828 | uc->client_info = cctx; | 808 | uc->client_info = cctx; |
829 | uc->seen_dh = GNUNET_CONTAINER_multihashmap_create (4, | ||
830 | GNUNET_NO); | ||
831 | GNUNET_FS_unindex_sync_ (uc); | 809 | GNUNET_FS_unindex_sync_ (uc); |
832 | pi.status = GNUNET_FS_STATUS_UNINDEX_START; | 810 | pi.status = GNUNET_FS_STATUS_UNINDEX_START; |
833 | pi.value.unindex.eta = GNUNET_TIME_UNIT_FOREVER_REL; | 811 | pi.value.unindex.eta = GNUNET_TIME_UNIT_FOREVER_REL; |
diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c index b1a098175..f8619b812 100644 --- a/src/fs/gnunet-service-fs_cadet_server.c +++ b/src/fs/gnunet-service-fs_cadet_server.c | |||
@@ -345,12 +345,13 @@ handle_request (void *cls, | |||
345 | GNUNET_NO); | 345 | GNUNET_NO); |
346 | refresh_timeout_task (sc); | 346 | refresh_timeout_task (sc); |
347 | sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, | 347 | sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, |
348 | 0, | 348 | 0 /* next_uid */, |
349 | &sqm->query, | 349 | false /* random */, |
350 | ntohl (sqm->type), | 350 | &sqm->query, |
351 | 0 /* priority */, | 351 | ntohl (sqm->type), |
352 | GSF_datastore_queue_size, | 352 | 0 /* priority */, |
353 | &handle_datastore_reply, | 353 | GSF_datastore_queue_size, |
354 | &handle_datastore_reply, | ||
354 | sc); | 355 | sc); |
355 | if (NULL == sc->qe) | 356 | if (NULL == sc->qe) |
356 | { | 357 | { |
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index b0fda24b5..b736b49c2 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c | |||
@@ -160,20 +160,27 @@ struct GSF_PendingRequest | |||
160 | struct GNUNET_SCHEDULER_Task * warn_task; | 160 | struct GNUNET_SCHEDULER_Task * warn_task; |
161 | 161 | ||
162 | /** | 162 | /** |
163 | * Current offset for querying our local datastore for results. | 163 | * Do we have a first UID yet? |
164 | * Starts at a random value, incremented until we get the same | 164 | */ |
165 | * UID again (detected using 'first_uid'), which is then used | 165 | bool have_first_uid; |
166 | * to termiante the iteration. | 166 | |
167 | /** | ||
168 | * Have we seen a NULL result yet? | ||
167 | */ | 169 | */ |
168 | uint64_t local_result_offset; | 170 | bool seen_null; |
169 | 171 | ||
170 | /** | 172 | /** |
171 | * Unique ID of the first result from the local datastore; | 173 | * Unique ID of the first result from the local datastore; |
172 | * used to detect wrap-around of the offset. | 174 | * used to terminate the loop. |
173 | */ | 175 | */ |
174 | uint64_t first_uid; | 176 | uint64_t first_uid; |
175 | 177 | ||
176 | /** | 178 | /** |
179 | * Result count. | ||
180 | */ | ||
181 | size_t result_count; | ||
182 | |||
183 | /** | ||
177 | * How often have we retried this request via 'cadet'? | 184 | * How often have we retried this request via 'cadet'? |
178 | * (used to bound overall retries). | 185 | * (used to bound overall retries). |
179 | */ | 186 | */ |
@@ -189,11 +196,6 @@ struct GSF_PendingRequest | |||
189 | */ | 196 | */ |
190 | unsigned int replies_seen_size; | 197 | unsigned int replies_seen_size; |
191 | 198 | ||
192 | /** | ||
193 | * Do we have a first UID yet? | ||
194 | */ | ||
195 | unsigned int have_first_uid; | ||
196 | |||
197 | }; | 199 | }; |
198 | 200 | ||
199 | 201 | ||
@@ -332,8 +334,6 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, | |||
332 | if (NULL != target) | 334 | if (NULL != target) |
333 | extra += sizeof (struct GNUNET_PeerIdentity); | 335 | extra += sizeof (struct GNUNET_PeerIdentity); |
334 | pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra); | 336 | pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra); |
335 | pr->local_result_offset = | ||
336 | GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); | ||
337 | pr->public_data.query = *query; | 337 | pr->public_data.query = *query; |
338 | eptr = (struct GNUNET_HashCode *) &pr[1]; | 338 | eptr = (struct GNUNET_HashCode *) &pr[1]; |
339 | if (NULL != target) | 339 | if (NULL != target) |
@@ -1340,6 +1340,123 @@ odc_warn_delay_task (void *cls) | |||
1340 | } | 1340 | } |
1341 | 1341 | ||
1342 | 1342 | ||
1343 | /* Call our continuation (if we have any) */ | ||
1344 | static void | ||
1345 | call_continuation (struct GSF_PendingRequest *pr) | ||
1346 | { | ||
1347 | GSF_LocalLookupContinuation cont = pr->llc_cont; | ||
1348 | |||
1349 | GNUNET_assert (NULL == pr->qe); | ||
1350 | if (NULL != pr->warn_task) | ||
1351 | { | ||
1352 | GNUNET_SCHEDULER_cancel (pr->warn_task); | ||
1353 | pr->warn_task = NULL; | ||
1354 | } | ||
1355 | if (NULL == cont) | ||
1356 | return; /* no continuation */ | ||
1357 | pr->llc_cont = NULL; | ||
1358 | if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options)) | ||
1359 | { | ||
1360 | if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result) | ||
1361 | { | ||
1362 | /* Signal that we are done and that there won't be any | ||
1363 | additional results to allow client to clean up state. */ | ||
1364 | pr->rh (pr->rh_cls, | ||
1365 | GNUNET_BLOCK_EVALUATION_OK_LAST, | ||
1366 | pr, | ||
1367 | UINT32_MAX, | ||
1368 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
1369 | GNUNET_TIME_UNIT_FOREVER_ABS, | ||
1370 | GNUNET_BLOCK_TYPE_ANY, | ||
1371 | NULL, | ||
1372 | 0); | ||
1373 | } | ||
1374 | /* Finally, call our continuation to signal that we are | ||
1375 | done with local processing of this request; i.e. to | ||
1376 | start reading again from the client. */ | ||
1377 | cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST); | ||
1378 | return; | ||
1379 | } | ||
1380 | |||
1381 | cont (pr->llc_cont_cls, pr, pr->local_result); | ||
1382 | } | ||
1383 | |||
1384 | |||
1385 | /* Update stats and call continuation */ | ||
1386 | static void | ||
1387 | no_more_local_results (struct GSF_PendingRequest *pr) | ||
1388 | { | ||
1389 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, | ||
1390 | "No further local responses available.\n"); | ||
1391 | #if INSANE_STATISTICS | ||
1392 | if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == pr->public_data.type) || | ||
1393 | (GNUNET_BLOCK_TYPE_FS_IBLOCK == pr->public_data.type) ) | ||
1394 | GNUNET_STATISTICS_update (GSF_stats, | ||
1395 | gettext_noop ("# requested DBLOCK or IBLOCK not found"), | ||
1396 | 1, | ||
1397 | GNUNET_NO); | ||
1398 | #endif | ||
1399 | call_continuation (pr); | ||
1400 | } | ||
1401 | |||
1402 | |||
1403 | /* forward declaration */ | ||
1404 | static void | ||
1405 | process_local_reply (void *cls, | ||
1406 | const struct GNUNET_HashCode *key, | ||
1407 | size_t size, | ||
1408 | const void *data, | ||
1409 | enum GNUNET_BLOCK_Type type, | ||
1410 | uint32_t priority, | ||
1411 | uint32_t anonymity, | ||
1412 | struct GNUNET_TIME_Absolute expiration, | ||
1413 | uint64_t uid); | ||
1414 | |||
1415 | |||
1416 | /* Start a local query */ | ||
1417 | static void | ||
1418 | start_local_query (struct GSF_PendingRequest *pr, | ||
1419 | uint64_t next_uid, | ||
1420 | bool random) | ||
1421 | { | ||
1422 | pr->qe_start = GNUNET_TIME_absolute_get (); | ||
1423 | pr->warn_task = | ||
1424 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, | ||
1425 | &warn_delay_task, | ||
1426 | pr); | ||
1427 | pr->qe = | ||
1428 | GNUNET_DATASTORE_get_key (GSF_dsh, | ||
1429 | next_uid, | ||
1430 | random, | ||
1431 | &pr->public_data.query, | ||
1432 | pr->public_data.type == | ||
1433 | GNUNET_BLOCK_TYPE_FS_DBLOCK ? | ||
1434 | GNUNET_BLOCK_TYPE_ANY : pr->public_data.type, | ||
1435 | (0 != | ||
1436 | (GSF_PRO_PRIORITY_UNLIMITED & pr-> | ||
1437 | public_data.options)) ? UINT_MAX : 1 | ||
1438 | /* queue priority */ , | ||
1439 | (0 != | ||
1440 | (GSF_PRO_PRIORITY_UNLIMITED & pr-> | ||
1441 | public_data.options)) ? UINT_MAX : | ||
1442 | GSF_datastore_queue_size | ||
1443 | /* max queue size */ , | ||
1444 | &process_local_reply, pr); | ||
1445 | if (NULL != pr->qe) | ||
1446 | return; | ||
1447 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1448 | "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n", | ||
1449 | GNUNET_h2s (&pr->public_data.query), | ||
1450 | pr->public_data.type, | ||
1451 | (unsigned long long) next_uid); | ||
1452 | GNUNET_STATISTICS_update (GSF_stats, | ||
1453 | gettext_noop ("# Datastore lookups concluded (error queueing)"), | ||
1454 | 1, | ||
1455 | GNUNET_NO); | ||
1456 | call_continuation (pr); | ||
1457 | } | ||
1458 | |||
1459 | |||
1343 | /** | 1460 | /** |
1344 | * We're processing (local) results for a search request | 1461 | * We're processing (local) results for a search request |
1345 | * from another peer. Pass applicable results to the | 1462 | * from another peer. Pass applicable results to the |
@@ -1369,69 +1486,71 @@ process_local_reply (void *cls, | |||
1369 | uint64_t uid) | 1486 | uint64_t uid) |
1370 | { | 1487 | { |
1371 | struct GSF_PendingRequest *pr = cls; | 1488 | struct GSF_PendingRequest *pr = cls; |
1372 | GSF_LocalLookupContinuation cont; | ||
1373 | struct ProcessReplyClosure prq; | 1489 | struct ProcessReplyClosure prq; |
1374 | struct GNUNET_HashCode query; | 1490 | struct GNUNET_HashCode query; |
1375 | unsigned int old_rf; | 1491 | unsigned int old_rf; |
1376 | 1492 | ||
1377 | GNUNET_SCHEDULER_cancel (pr->warn_task); | 1493 | GNUNET_SCHEDULER_cancel (pr->warn_task); |
1378 | pr->warn_task = NULL; | 1494 | pr->warn_task = NULL; |
1379 | if (NULL != pr->qe) | 1495 | if (NULL == pr->qe) |
1496 | goto called_from_on_demand; | ||
1497 | pr->qe = NULL; | ||
1498 | if ( (NULL == key) && | ||
1499 | pr->seen_null && | ||
1500 | !pr->have_first_uid) /* We have hit the end for the 2nd time with no results */ | ||
1380 | { | 1501 | { |
1381 | pr->qe = NULL; | 1502 | /* No results */ |
1382 | if (NULL == key) | ||
1383 | { | ||
1384 | #if INSANE_STATISTICS | 1503 | #if INSANE_STATISTICS |
1385 | GNUNET_STATISTICS_update (GSF_stats, | 1504 | GNUNET_STATISTICS_update (GSF_stats, |
1386 | gettext_noop | 1505 | gettext_noop |
1387 | ("# Datastore lookups concluded (no results)"), | 1506 | ("# Datastore lookups concluded (no results)"), |
1388 | 1, GNUNET_NO); | 1507 | 1, GNUNET_NO); |
1389 | #endif | 1508 | #endif |
1390 | } | 1509 | no_more_local_results (pr); |
1391 | if (GNUNET_NO == pr->have_first_uid) | 1510 | return; |
1392 | { | 1511 | } |
1393 | pr->first_uid = uid; | 1512 | if ( ( (NULL == key) && |
1394 | pr->have_first_uid = 1; | 1513 | pr->seen_null ) || /* We have hit the end for the 2nd time OR */ |
1395 | } | 1514 | ( pr->seen_null && |
1396 | else | 1515 | pr->have_first_uid && |
1397 | { | 1516 | (uid >= pr->first_uid) ) ) /* We have hit the end and past first UID */ |
1398 | if ((uid == pr->first_uid) && (key != NULL)) | 1517 | { |
1399 | { | 1518 | /* Seen all results */ |
1400 | GNUNET_STATISTICS_update (GSF_stats, | 1519 | GNUNET_STATISTICS_update (GSF_stats, |
1401 | gettext_noop | 1520 | gettext_noop |
1402 | ("# Datastore lookups concluded (seen all)"), | 1521 | ("# Datastore lookups concluded (seen all)"), |
1403 | 1, GNUNET_NO); | 1522 | 1, GNUNET_NO); |
1404 | key = NULL; /* all replies seen! */ | 1523 | no_more_local_results (pr); |
1405 | } | 1524 | return; |
1406 | pr->have_first_uid++; | ||
1407 | if ((pr->have_first_uid > MAX_RESULTS) && (key != NULL)) | ||
1408 | { | ||
1409 | GNUNET_STATISTICS_update (GSF_stats, | ||
1410 | gettext_noop | ||
1411 | ("# Datastore lookups aborted (more than MAX_RESULTS)"), | ||
1412 | 1, GNUNET_NO); | ||
1413 | key = NULL; /* all replies seen! */ | ||
1414 | } | ||
1415 | } | ||
1416 | } | 1525 | } |
1417 | if (NULL == key) | 1526 | if (NULL == key) |
1418 | { | 1527 | { |
1419 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, | 1528 | GNUNET_assert (!pr->seen_null); |
1420 | "No further local responses available.\n"); | 1529 | pr->seen_null = true; |
1421 | #if INSANE_STATISTICS | 1530 | start_local_query (pr, |
1422 | if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) || | 1531 | 0 /* next_uid */, |
1423 | (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_IBLOCK)) | 1532 | false /* random */); |
1424 | GNUNET_STATISTICS_update (GSF_stats, | 1533 | return; |
1425 | gettext_noop | 1534 | } |
1426 | ("# requested DBLOCK or IBLOCK not found"), 1, | 1535 | if (!pr->have_first_uid) |
1427 | GNUNET_NO); | 1536 | { |
1428 | #endif | 1537 | pr->first_uid = uid; |
1429 | goto check_error_and_continue; | 1538 | pr->have_first_uid = true; |
1539 | } | ||
1540 | pr->result_count++; | ||
1541 | if (pr->result_count > MAX_RESULTS) | ||
1542 | { | ||
1543 | GNUNET_STATISTICS_update (GSF_stats, | ||
1544 | gettext_noop | ||
1545 | ("# Datastore lookups aborted (more than MAX_RESULTS)"), | ||
1546 | 1, GNUNET_NO); | ||
1547 | no_more_local_results (pr); | ||
1548 | return; | ||
1430 | } | 1549 | } |
1431 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1550 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1432 | "Received reply for `%s' of type %d with UID %llu from datastore.\n", | 1551 | "Received reply for `%s' of type %d with UID %llu from datastore.\n", |
1433 | GNUNET_h2s (key), type, (unsigned long long) uid); | 1552 | GNUNET_h2s (key), type, (unsigned long long) uid); |
1434 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) | 1553 | if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) |
1435 | { | 1554 | { |
1436 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1555 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1437 | "Found ONDEMAND block, performing on-demand encoding\n"); | 1556 | "Found ONDEMAND block, performing on-demand encoding\n"); |
@@ -1458,33 +1577,12 @@ process_local_reply (void *cls, | |||
1458 | gettext_noop ("# on-demand lookups failed"), 1, | 1577 | gettext_noop ("# on-demand lookups failed"), 1, |
1459 | GNUNET_NO); | 1578 | GNUNET_NO); |
1460 | GNUNET_SCHEDULER_cancel (pr->warn_task); | 1579 | GNUNET_SCHEDULER_cancel (pr->warn_task); |
1461 | pr->warn_task = | 1580 | start_local_query (pr, |
1462 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, | 1581 | uid + 1 /* next_uid */, |
1463 | &warn_delay_task, pr); | 1582 | false /* random */); |
1464 | pr->qe = | 1583 | return; |
1465 | GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1, | ||
1466 | &pr->public_data.query, | ||
1467 | pr->public_data.type == | ||
1468 | GNUNET_BLOCK_TYPE_FS_DBLOCK ? | ||
1469 | GNUNET_BLOCK_TYPE_ANY : pr->public_data.type, | ||
1470 | (0 != | ||
1471 | (GSF_PRO_PRIORITY_UNLIMITED & | ||
1472 | pr->public_data.options)) ? UINT_MAX : 1 | ||
1473 | /* queue priority */ , | ||
1474 | (0 != | ||
1475 | (GSF_PRO_PRIORITY_UNLIMITED & | ||
1476 | pr->public_data.options)) ? UINT_MAX : | ||
1477 | GSF_datastore_queue_size | ||
1478 | /* max queue size */ , | ||
1479 | &process_local_reply, pr); | ||
1480 | if (NULL != pr->qe) | ||
1481 | return; /* we're done */ | ||
1482 | GNUNET_STATISTICS_update (GSF_stats, | ||
1483 | gettext_noop | ||
1484 | ("# Datastore lookups concluded (error queueing)"), | ||
1485 | 1, GNUNET_NO); | ||
1486 | goto check_error_and_continue; | ||
1487 | } | 1584 | } |
1585 | called_from_on_demand: | ||
1488 | old_rf = pr->public_data.results_found; | 1586 | old_rf = pr->public_data.results_found; |
1489 | memset (&prq, 0, sizeof (prq)); | 1587 | memset (&prq, 0, sizeof (prq)); |
1490 | prq.data = data; | 1588 | prq.data = data; |
@@ -1496,34 +1594,9 @@ process_local_reply (void *cls, | |||
1496 | GNUNET_break (0); | 1594 | GNUNET_break (0); |
1497 | GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1, | 1595 | GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1, |
1498 | NULL, NULL); | 1596 | NULL, NULL); |
1499 | pr->qe_start = GNUNET_TIME_absolute_get (); | 1597 | start_local_query (pr, |
1500 | pr->warn_task = | 1598 | uid + 1 /* next_uid */, |
1501 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, | 1599 | false /* random */); |
1502 | &warn_delay_task, pr); | ||
1503 | pr->qe = | ||
1504 | GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1, | ||
1505 | &pr->public_data.query, | ||
1506 | pr->public_data.type == | ||
1507 | GNUNET_BLOCK_TYPE_FS_DBLOCK ? | ||
1508 | GNUNET_BLOCK_TYPE_ANY : pr->public_data.type, | ||
1509 | (0 != | ||
1510 | (GSF_PRO_PRIORITY_UNLIMITED & | ||
1511 | pr->public_data.options)) ? UINT_MAX : 1 | ||
1512 | /* queue priority */ , | ||
1513 | (0 != | ||
1514 | (GSF_PRO_PRIORITY_UNLIMITED & | ||
1515 | pr->public_data.options)) ? UINT_MAX : | ||
1516 | GSF_datastore_queue_size | ||
1517 | /* max queue size */ , | ||
1518 | &process_local_reply, pr); | ||
1519 | if (NULL == pr->qe) | ||
1520 | { | ||
1521 | GNUNET_STATISTICS_update (GSF_stats, | ||
1522 | gettext_noop | ||
1523 | ("# Datastore lookups concluded (error queueing)"), | ||
1524 | 1, GNUNET_NO); | ||
1525 | goto check_error_and_continue; | ||
1526 | } | ||
1527 | return; | 1600 | return; |
1528 | } | 1601 | } |
1529 | prq.type = type; | 1602 | prq.type = type; |
@@ -1535,14 +1608,15 @@ process_local_reply (void *cls, | |||
1535 | prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO; | 1608 | prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO; |
1536 | process_reply (&prq, key, pr); | 1609 | process_reply (&prq, key, pr); |
1537 | pr->local_result = prq.eval; | 1610 | pr->local_result = prq.eval; |
1538 | if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) | 1611 | if (GNUNET_BLOCK_EVALUATION_OK_LAST == prq.eval) |
1539 | { | 1612 | { |
1540 | GNUNET_STATISTICS_update (GSF_stats, | 1613 | GNUNET_STATISTICS_update (GSF_stats, |
1541 | gettext_noop | 1614 | gettext_noop |
1542 | ("# Datastore lookups concluded (found last result)"), | 1615 | ("# Datastore lookups concluded (found last result)"), |
1543 | 1, | 1616 | 1, |
1544 | GNUNET_NO); | 1617 | GNUNET_NO); |
1545 | goto check_error_and_continue; | 1618 | call_continuation (pr); |
1619 | return; | ||
1546 | } | 1620 | } |
1547 | if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && | 1621 | if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && |
1548 | ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) || | 1622 | ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) || |
@@ -1554,66 +1628,12 @@ process_local_reply (void *cls, | |||
1554 | gettext_noop ("# Datastore lookups concluded (load too high)"), | 1628 | gettext_noop ("# Datastore lookups concluded (load too high)"), |
1555 | 1, | 1629 | 1, |
1556 | GNUNET_NO); | 1630 | GNUNET_NO); |
1557 | goto check_error_and_continue; | 1631 | call_continuation (pr); |
1558 | } | ||
1559 | pr->qe_start = GNUNET_TIME_absolute_get (); | ||
1560 | pr->warn_task = | ||
1561 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, | ||
1562 | &warn_delay_task, | ||
1563 | pr); | ||
1564 | pr->qe = | ||
1565 | GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++, | ||
1566 | &pr->public_data.query, | ||
1567 | pr->public_data.type == | ||
1568 | GNUNET_BLOCK_TYPE_FS_DBLOCK ? | ||
1569 | GNUNET_BLOCK_TYPE_ANY : pr->public_data.type, | ||
1570 | (0 != | ||
1571 | (GSF_PRO_PRIORITY_UNLIMITED & pr-> | ||
1572 | public_data.options)) ? UINT_MAX : 1 | ||
1573 | /* queue priority */ , | ||
1574 | (0 != | ||
1575 | (GSF_PRO_PRIORITY_UNLIMITED & pr-> | ||
1576 | public_data.options)) ? UINT_MAX : | ||
1577 | GSF_datastore_queue_size | ||
1578 | /* max queue size */ , | ||
1579 | &process_local_reply, pr); | ||
1580 | /* check if we successfully queued another datastore request; | ||
1581 | * if so, return, otherwise call our continuation (if we have | ||
1582 | * any) */ | ||
1583 | check_error_and_continue: | ||
1584 | if (NULL != pr->qe) | ||
1585 | return; | 1632 | return; |
1586 | if (NULL != pr->warn_task) | ||
1587 | { | ||
1588 | GNUNET_SCHEDULER_cancel (pr->warn_task); | ||
1589 | pr->warn_task = NULL; | ||
1590 | } | 1633 | } |
1591 | if (NULL == (cont = pr->llc_cont)) | 1634 | start_local_query (pr, |
1592 | return; /* no continuation */ | 1635 | uid + 1 /* next_uid */, |
1593 | pr->llc_cont = NULL; | 1636 | false /* random */); |
1594 | if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options)) | ||
1595 | { | ||
1596 | if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result) | ||
1597 | { | ||
1598 | /* Signal that we are done and that there won't be any | ||
1599 | additional results to allow client to clean up state. */ | ||
1600 | pr->rh (pr->rh_cls, | ||
1601 | GNUNET_BLOCK_EVALUATION_OK_LAST, | ||
1602 | pr, | ||
1603 | UINT32_MAX, | ||
1604 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
1605 | GNUNET_TIME_UNIT_FOREVER_ABS, | ||
1606 | GNUNET_BLOCK_TYPE_ANY, | ||
1607 | NULL, 0); | ||
1608 | } | ||
1609 | /* Finally, call our continuation to signal that we are | ||
1610 | done with local processing of this request; i.e. to | ||
1611 | start reading again from the client. */ | ||
1612 | cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST); | ||
1613 | return; | ||
1614 | } | ||
1615 | |||
1616 | cont (pr->llc_cont_cls, pr, pr->local_result); | ||
1617 | } | 1637 | } |
1618 | 1638 | ||
1619 | 1639 | ||
@@ -1657,43 +1677,14 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, | |||
1657 | GNUNET_assert (NULL == pr->llc_cont); | 1677 | GNUNET_assert (NULL == pr->llc_cont); |
1658 | pr->llc_cont = cont; | 1678 | pr->llc_cont = cont; |
1659 | pr->llc_cont_cls = cont_cls; | 1679 | pr->llc_cont_cls = cont_cls; |
1660 | pr->qe_start = GNUNET_TIME_absolute_get (); | ||
1661 | pr->warn_task = | ||
1662 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, | ||
1663 | &warn_delay_task, | ||
1664 | pr); | ||
1665 | #if INSANE_STATISTICS | 1680 | #if INSANE_STATISTICS |
1666 | GNUNET_STATISTICS_update (GSF_stats, | 1681 | GNUNET_STATISTICS_update (GSF_stats, |
1667 | gettext_noop ("# Datastore lookups initiated"), 1, | 1682 | gettext_noop ("# Datastore lookups initiated"), 1, |
1668 | GNUNET_NO); | 1683 | GNUNET_NO); |
1669 | #endif | 1684 | #endif |
1670 | pr->qe = | 1685 | start_local_query(pr, |
1671 | GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++, | 1686 | 0 /* next_uid */, |
1672 | &pr->public_data.query, | 1687 | true /* random */); |
1673 | pr->public_data.type == | ||
1674 | GNUNET_BLOCK_TYPE_FS_DBLOCK ? | ||
1675 | GNUNET_BLOCK_TYPE_ANY : pr->public_data.type, | ||
1676 | (0 != | ||
1677 | (GSF_PRO_PRIORITY_UNLIMITED & pr-> | ||
1678 | public_data.options)) ? UINT_MAX : 1 | ||
1679 | /* queue priority */ , | ||
1680 | (0 != | ||
1681 | (GSF_PRO_PRIORITY_UNLIMITED & pr-> | ||
1682 | public_data.options)) ? UINT_MAX : | ||
1683 | GSF_datastore_queue_size | ||
1684 | /* max queue size */ , | ||
1685 | &process_local_reply, pr); | ||
1686 | if (NULL != pr->qe) | ||
1687 | return; | ||
1688 | GNUNET_STATISTICS_update (GSF_stats, | ||
1689 | gettext_noop | ||
1690 | ("# Datastore lookups concluded (error queueing)"), | ||
1691 | 1, GNUNET_NO); | ||
1692 | GNUNET_SCHEDULER_cancel (pr->warn_task); | ||
1693 | pr->warn_task = NULL; | ||
1694 | pr->llc_cont = NULL; | ||
1695 | if (NULL != cont) | ||
1696 | cont (cont_cls, pr, pr->local_result); | ||
1697 | } | 1688 | } |
1698 | 1689 | ||
1699 | 1690 | ||
diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c index bb4cb4ecb..cd062bf2b 100644 --- a/src/fs/gnunet-service-fs_put.c +++ b/src/fs/gnunet-service-fs_put.c | |||
@@ -72,9 +72,14 @@ struct PutOperator | |||
72 | uint64_t zero_anonymity_count_estimate; | 72 | uint64_t zero_anonymity_count_estimate; |
73 | 73 | ||
74 | /** | 74 | /** |
75 | * Current offset when iterating the database. | 75 | * Count of results received from the database. |
76 | */ | 76 | */ |
77 | uint64_t current_offset; | 77 | uint64_t result_count; |
78 | |||
79 | /** | ||
80 | * Next UID to request when iterating the database. | ||
81 | */ | ||
82 | uint64_t next_uid; | ||
78 | }; | 83 | }; |
79 | 84 | ||
80 | 85 | ||
@@ -177,37 +182,43 @@ delay_dht_put_task (void *cls) | |||
177 | */ | 182 | */ |
178 | static void | 183 | static void |
179 | process_dht_put_content (void *cls, | 184 | process_dht_put_content (void *cls, |
180 | const struct GNUNET_HashCode * key, | 185 | const struct GNUNET_HashCode * key, |
181 | size_t size, | 186 | size_t size, |
182 | const void *data, | 187 | const void *data, |
183 | enum GNUNET_BLOCK_Type type, | 188 | enum GNUNET_BLOCK_Type type, |
184 | uint32_t priority, uint32_t anonymity, | 189 | uint32_t priority, |
185 | struct GNUNET_TIME_Absolute expiration, uint64_t uid) | 190 | uint32_t anonymity, |
191 | struct GNUNET_TIME_Absolute expiration, | ||
192 | uint64_t uid) | ||
186 | { | 193 | { |
187 | struct PutOperator *po = cls; | 194 | struct PutOperator *po = cls; |
188 | 195 | ||
189 | po->dht_qe = NULL; | 196 | po->dht_qe = NULL; |
190 | if (key == NULL) | 197 | if (key == NULL) |
191 | { | 198 | { |
192 | po->zero_anonymity_count_estimate = po->current_offset - 1; | 199 | po->zero_anonymity_count_estimate = po->result_count; |
193 | po->current_offset = 0; | 200 | po->result_count = 0; |
201 | po->next_uid = 0; | ||
194 | po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); | 202 | po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); |
195 | return; | 203 | return; |
196 | } | 204 | } |
205 | po->result_count++; | ||
206 | po->next_uid = uid + 1; | ||
197 | po->zero_anonymity_count_estimate = | 207 | po->zero_anonymity_count_estimate = |
198 | GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate); | 208 | GNUNET_MAX (po->result_count, po->zero_anonymity_count_estimate); |
199 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 209 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
200 | "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key), | 210 | "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key), |
201 | type); | 211 | type); |
202 | po->dht_put = GNUNET_DHT_put (GSF_dht, | 212 | po->dht_put = GNUNET_DHT_put (GSF_dht, |
203 | key, | 213 | key, |
204 | DEFAULT_PUT_REPLICATION, | 214 | DEFAULT_PUT_REPLICATION, |
205 | GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, | 215 | GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, |
206 | type, | 216 | type, |
207 | size, | 217 | size, |
208 | data, | 218 | data, |
209 | expiration, | 219 | expiration, |
210 | &delay_dht_put_blocks, po); | 220 | &delay_dht_put_blocks, |
221 | po); | ||
211 | } | 222 | } |
212 | 223 | ||
213 | 224 | ||
@@ -223,10 +234,13 @@ gather_dht_put_blocks (void *cls) | |||
223 | 234 | ||
224 | po->dht_task = NULL; | 235 | po->dht_task = NULL; |
225 | po->dht_qe = | 236 | po->dht_qe = |
226 | GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0, | 237 | GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, |
238 | po->next_uid, | ||
239 | 0, | ||
227 | UINT_MAX, | 240 | UINT_MAX, |
228 | po->dht_put_type, | 241 | po->dht_put_type, |
229 | &process_dht_put_content, po); | 242 | &process_dht_put_content, |
243 | po); | ||
230 | if (NULL == po->dht_qe) | 244 | if (NULL == po->dht_qe) |
231 | po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); | 245 | po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); |
232 | } | 246 | } |