aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorDavid Barksdale <amatus@amat.us>2017-03-19 15:55:32 -0500
committerDavid Barksdale <amatus@amat.us>2017-03-19 17:38:36 -0500
commit2dde0202c5590eeb051c1346f2b66293d83b87ce (patch)
tree7997191912ee4c70959934d6c9783a0c9f450fec /src/fs
parentd17d833dfd93a81f3540d472d1be4dfb7e9cbd03 (diff)
downloadgnunet-2dde0202c5590eeb051c1346f2b66293d83b87ce.tar.gz
gnunet-2dde0202c5590eeb051c1346f2b66293d83b87ce.zip
[datastore] Fix #3743
This change adds support for key == NULL to the datastore plugins and replaces the offset argument with a next_uid and random arguments to increase performance in the key == NULL case. With the offset argument a datastore plugin would have to count all matching keys before fetching the key at the right offset, which would iterate over the entire database in the case of key == NULL. The offset argument was used in two ways: to iterate over a set of matching values and to start iteration at a random matching value. The new API seperates these into two arguments: if random is true it will return a random matching value, otherwise next_uid can be set to uid + 1 to return the next matching value. The random argument was not added to get_zero_anonymity. This function is used to periodically insert zero anonymity values into the DHT. I don't think it's necessary to randomize this.
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/fs_api.h10
-rw-r--r--src/fs/fs_unindex.c58
-rw-r--r--src/fs/gnunet-service-fs_cadet_server.c13
-rw-r--r--src/fs/gnunet-service-fs_pr.c403
-rw-r--r--src/fs/gnunet-service-fs_put.c44
5 files changed, 251 insertions, 277 deletions
diff --git a/src/fs/fs_api.h b/src/fs/fs_api.h
index e85de94a7..be22ea73e 100644
--- a/src/fs/fs_api.h
+++ b/src/fs/fs_api.h
@@ -1464,21 +1464,11 @@ struct GNUNET_FS_UnindexContext
1464 struct GNUNET_CRYPTO_FileHashContext *fhc; 1464 struct GNUNET_CRYPTO_FileHashContext *fhc;
1465 1465
1466 /** 1466 /**
1467 * Which values have we seen already?
1468 */
1469 struct GNUNET_CONTAINER_MultiHashMap *seen_dh;
1470
1471 /**
1472 * Overall size of the file. 1467 * Overall size of the file.
1473 */ 1468 */
1474 uint64_t file_size; 1469 uint64_t file_size;
1475 1470
1476 /** 1471 /**
1477 * Random offset given to #GNUNET_DATASTORE_get_key.
1478 */
1479 uint64_t roff;
1480
1481 /**
1482 * When did we start? 1472 * When did we start?
1483 */ 1473 */
1484 struct GNUNET_TIME_Absolute start_time; 1474 struct GNUNET_TIME_Absolute start_time;
diff --git a/src/fs/fs_unindex.c b/src/fs/fs_unindex.c
index ad1499f00..e1c7ea535 100644
--- a/src/fs/fs_unindex.c
+++ b/src/fs/fs_unindex.c
@@ -312,8 +312,6 @@ unindex_finish (struct GNUNET_FS_UnindexContext *uc)
312 uc->fh = NULL; 312 uc->fh = NULL;
313 GNUNET_DATASTORE_disconnect (uc->dsh, GNUNET_NO); 313 GNUNET_DATASTORE_disconnect (uc->dsh, GNUNET_NO);
314 uc->dsh = NULL; 314 uc->dsh = NULL;
315 GNUNET_CONTAINER_multihashmap_destroy (uc->seen_dh);
316 uc->seen_dh = NULL;
317 uc->state = UNINDEX_STATE_FS_NOTIFY; 315 uc->state = UNINDEX_STATE_FS_NOTIFY;
318 GNUNET_FS_unindex_sync_ (uc); 316 GNUNET_FS_unindex_sync_ (uc);
319 uc->mq = GNUNET_CLIENT_connect (uc->h->cfg, 317 uc->mq = GNUNET_CLIENT_connect (uc->h->cfg,
@@ -444,7 +442,6 @@ continue_after_remove (void *cls,
444 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 442 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
445 _("Failed to remove UBlock: %s\n"), 443 _("Failed to remove UBlock: %s\n"),
446 msg); 444 msg);
447 GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh);
448 uc->ksk_offset++; 445 uc->ksk_offset++;
449 GNUNET_FS_unindex_do_remove_kblocks_ (uc); 446 GNUNET_FS_unindex_do_remove_kblocks_ (uc);
450} 447}
@@ -486,34 +483,15 @@ process_kblock_for_unindex (void *cls,
486 const struct UBlock *ub; 483 const struct UBlock *ub;
487 struct GNUNET_FS_Uri *chk_uri; 484 struct GNUNET_FS_Uri *chk_uri;
488 struct GNUNET_HashCode query; 485 struct GNUNET_HashCode query;
489 struct GNUNET_HashCode dh;
490 486
491 uc->dqe = NULL; 487 uc->dqe = NULL;
492 if (NULL == data) 488 if (NULL == data)
493 { 489 {
494 /* no result */ 490 /* no result */
495 GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh);
496 uc->ksk_offset++; 491 uc->ksk_offset++;
497 GNUNET_FS_unindex_do_remove_kblocks_ (uc); 492 GNUNET_FS_unindex_do_remove_kblocks_ (uc);
498 return; 493 return;
499 } 494 }
500 GNUNET_CRYPTO_hash (data,
501 size,
502 &dh);
503 if (GNUNET_YES ==
504 GNUNET_CONTAINER_multihashmap_contains (uc->seen_dh,
505 &dh))
506 {
507 GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh);
508 uc->ksk_offset++;
509 GNUNET_FS_unindex_do_remove_kblocks_ (uc);
510 return;
511 }
512 GNUNET_assert (GNUNET_OK ==
513 GNUNET_CONTAINER_multihashmap_put (uc->seen_dh,
514 &dh,
515 uc,
516 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
517 GNUNET_assert (GNUNET_BLOCK_TYPE_FS_UBLOCK == type); 495 GNUNET_assert (GNUNET_BLOCK_TYPE_FS_UBLOCK == type);
518 if (size < sizeof (struct UBlock)) 496 if (size < sizeof (struct UBlock))
519 { 497 {
@@ -566,23 +544,24 @@ process_kblock_for_unindex (void *cls,
566 GNUNET_FS_uri_destroy (chk_uri); 544 GNUNET_FS_uri_destroy (chk_uri);
567 /* matches! */ 545 /* matches! */
568 uc->dqe = GNUNET_DATASTORE_remove (uc->dsh, 546 uc->dqe = GNUNET_DATASTORE_remove (uc->dsh,
569 key, 547 key,
570 size, 548 size,
571 data, 549 data,
572 0 /* priority */, 550 0 /* priority */,
573 1 /* queue size */, 551 1 /* queue size */,
574 &continue_after_remove, 552 &continue_after_remove,
575 uc); 553 uc);
576 return; 554 return;
577 get_next: 555 get_next:
578 uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh, 556 uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh,
579 uc->roff++, 557 uid + 1 /* next_uid */,
580 &uc->uquery, 558 false /* random */,
581 GNUNET_BLOCK_TYPE_FS_UBLOCK, 559 &uc->uquery,
582 0 /* priority */, 560 GNUNET_BLOCK_TYPE_FS_UBLOCK,
561 0 /* priority */,
583 1 /* queue size */, 562 1 /* queue size */,
584 &process_kblock_for_unindex, 563 &process_kblock_for_unindex,
585 uc); 564 uc);
586} 565}
587 566
588 567
@@ -627,13 +606,14 @@ GNUNET_FS_unindex_do_remove_kblocks_ (struct GNUNET_FS_UnindexContext *uc)
627 sizeof (dpub), 606 sizeof (dpub),
628 &uc->uquery); 607 &uc->uquery);
629 uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh, 608 uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh,
630 uc->roff++, 609 0 /* next_uid */,
631 &uc->uquery, 610 false /* random */,
632 GNUNET_BLOCK_TYPE_FS_UBLOCK, 611 &uc->uquery,
633 0 /* priority */, 612 GNUNET_BLOCK_TYPE_FS_UBLOCK,
613 0 /* priority */,
634 1 /* queue size */, 614 1 /* queue size */,
635 &process_kblock_for_unindex, 615 &process_kblock_for_unindex,
636 uc); 616 uc);
637} 617}
638 618
639 619
@@ -826,8 +806,6 @@ GNUNET_FS_unindex_start (struct GNUNET_FS_Handle *h,
826 uc->start_time = GNUNET_TIME_absolute_get (); 806 uc->start_time = GNUNET_TIME_absolute_get ();
827 uc->file_size = size; 807 uc->file_size = size;
828 uc->client_info = cctx; 808 uc->client_info = cctx;
829 uc->seen_dh = GNUNET_CONTAINER_multihashmap_create (4,
830 GNUNET_NO);
831 GNUNET_FS_unindex_sync_ (uc); 809 GNUNET_FS_unindex_sync_ (uc);
832 pi.status = GNUNET_FS_STATUS_UNINDEX_START; 810 pi.status = GNUNET_FS_STATUS_UNINDEX_START;
833 pi.value.unindex.eta = GNUNET_TIME_UNIT_FOREVER_REL; 811 pi.value.unindex.eta = GNUNET_TIME_UNIT_FOREVER_REL;
diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c
index b1a098175..f8619b812 100644
--- a/src/fs/gnunet-service-fs_cadet_server.c
+++ b/src/fs/gnunet-service-fs_cadet_server.c
@@ -345,12 +345,13 @@ handle_request (void *cls,
345 GNUNET_NO); 345 GNUNET_NO);
346 refresh_timeout_task (sc); 346 refresh_timeout_task (sc);
347 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 347 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
348 0, 348 0 /* next_uid */,
349 &sqm->query, 349 false /* random */,
350 ntohl (sqm->type), 350 &sqm->query,
351 0 /* priority */, 351 ntohl (sqm->type),
352 GSF_datastore_queue_size, 352 0 /* priority */,
353 &handle_datastore_reply, 353 GSF_datastore_queue_size,
354 &handle_datastore_reply,
354 sc); 355 sc);
355 if (NULL == sc->qe) 356 if (NULL == sc->qe)
356 { 357 {
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index b0fda24b5..b736b49c2 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -160,20 +160,27 @@ struct GSF_PendingRequest
160 struct GNUNET_SCHEDULER_Task * warn_task; 160 struct GNUNET_SCHEDULER_Task * warn_task;
161 161
162 /** 162 /**
163 * Current offset for querying our local datastore for results. 163 * Do we have a first UID yet?
164 * Starts at a random value, incremented until we get the same 164 */
165 * UID again (detected using 'first_uid'), which is then used 165 bool have_first_uid;
166 * to termiante the iteration. 166
167 /**
168 * Have we seen a NULL result yet?
167 */ 169 */
168 uint64_t local_result_offset; 170 bool seen_null;
169 171
170 /** 172 /**
171 * Unique ID of the first result from the local datastore; 173 * Unique ID of the first result from the local datastore;
172 * used to detect wrap-around of the offset. 174 * used to terminate the loop.
173 */ 175 */
174 uint64_t first_uid; 176 uint64_t first_uid;
175 177
176 /** 178 /**
179 * Result count.
180 */
181 size_t result_count;
182
183 /**
177 * How often have we retried this request via 'cadet'? 184 * How often have we retried this request via 'cadet'?
178 * (used to bound overall retries). 185 * (used to bound overall retries).
179 */ 186 */
@@ -189,11 +196,6 @@ struct GSF_PendingRequest
189 */ 196 */
190 unsigned int replies_seen_size; 197 unsigned int replies_seen_size;
191 198
192 /**
193 * Do we have a first UID yet?
194 */
195 unsigned int have_first_uid;
196
197}; 199};
198 200
199 201
@@ -332,8 +334,6 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
332 if (NULL != target) 334 if (NULL != target)
333 extra += sizeof (struct GNUNET_PeerIdentity); 335 extra += sizeof (struct GNUNET_PeerIdentity);
334 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra); 336 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra);
335 pr->local_result_offset =
336 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
337 pr->public_data.query = *query; 337 pr->public_data.query = *query;
338 eptr = (struct GNUNET_HashCode *) &pr[1]; 338 eptr = (struct GNUNET_HashCode *) &pr[1];
339 if (NULL != target) 339 if (NULL != target)
@@ -1340,6 +1340,123 @@ odc_warn_delay_task (void *cls)
1340} 1340}
1341 1341
1342 1342
1343/* Call our continuation (if we have any) */
1344static void
1345call_continuation (struct GSF_PendingRequest *pr)
1346{
1347 GSF_LocalLookupContinuation cont = pr->llc_cont;
1348
1349 GNUNET_assert (NULL == pr->qe);
1350 if (NULL != pr->warn_task)
1351 {
1352 GNUNET_SCHEDULER_cancel (pr->warn_task);
1353 pr->warn_task = NULL;
1354 }
1355 if (NULL == cont)
1356 return; /* no continuation */
1357 pr->llc_cont = NULL;
1358 if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1359 {
1360 if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result)
1361 {
1362 /* Signal that we are done and that there won't be any
1363 additional results to allow client to clean up state. */
1364 pr->rh (pr->rh_cls,
1365 GNUNET_BLOCK_EVALUATION_OK_LAST,
1366 pr,
1367 UINT32_MAX,
1368 GNUNET_TIME_UNIT_ZERO_ABS,
1369 GNUNET_TIME_UNIT_FOREVER_ABS,
1370 GNUNET_BLOCK_TYPE_ANY,
1371 NULL,
1372 0);
1373 }
1374 /* Finally, call our continuation to signal that we are
1375 done with local processing of this request; i.e. to
1376 start reading again from the client. */
1377 cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
1378 return;
1379 }
1380
1381 cont (pr->llc_cont_cls, pr, pr->local_result);
1382}
1383
1384
1385/* Update stats and call continuation */
1386static void
1387no_more_local_results (struct GSF_PendingRequest *pr)
1388{
1389 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1390 "No further local responses available.\n");
1391#if INSANE_STATISTICS
1392 if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == pr->public_data.type) ||
1393 (GNUNET_BLOCK_TYPE_FS_IBLOCK == pr->public_data.type) )
1394 GNUNET_STATISTICS_update (GSF_stats,
1395 gettext_noop ("# requested DBLOCK or IBLOCK not found"),
1396 1,
1397 GNUNET_NO);
1398#endif
1399 call_continuation (pr);
1400}
1401
1402
1403/* forward declaration */
1404static void
1405process_local_reply (void *cls,
1406 const struct GNUNET_HashCode *key,
1407 size_t size,
1408 const void *data,
1409 enum GNUNET_BLOCK_Type type,
1410 uint32_t priority,
1411 uint32_t anonymity,
1412 struct GNUNET_TIME_Absolute expiration,
1413 uint64_t uid);
1414
1415
1416/* Start a local query */
1417static void
1418start_local_query (struct GSF_PendingRequest *pr,
1419 uint64_t next_uid,
1420 bool random)
1421{
1422 pr->qe_start = GNUNET_TIME_absolute_get ();
1423 pr->warn_task =
1424 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1425 &warn_delay_task,
1426 pr);
1427 pr->qe =
1428 GNUNET_DATASTORE_get_key (GSF_dsh,
1429 next_uid,
1430 random,
1431 &pr->public_data.query,
1432 pr->public_data.type ==
1433 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1434 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1435 (0 !=
1436 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1437 public_data.options)) ? UINT_MAX : 1
1438 /* queue priority */ ,
1439 (0 !=
1440 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1441 public_data.options)) ? UINT_MAX :
1442 GSF_datastore_queue_size
1443 /* max queue size */ ,
1444 &process_local_reply, pr);
1445 if (NULL != pr->qe)
1446 return;
1447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1448 "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n",
1449 GNUNET_h2s (&pr->public_data.query),
1450 pr->public_data.type,
1451 (unsigned long long) next_uid);
1452 GNUNET_STATISTICS_update (GSF_stats,
1453 gettext_noop ("# Datastore lookups concluded (error queueing)"),
1454 1,
1455 GNUNET_NO);
1456 call_continuation (pr);
1457}
1458
1459
1343/** 1460/**
1344 * We're processing (local) results for a search request 1461 * We're processing (local) results for a search request
1345 * from another peer. Pass applicable results to the 1462 * from another peer. Pass applicable results to the
@@ -1369,69 +1486,71 @@ process_local_reply (void *cls,
1369 uint64_t uid) 1486 uint64_t uid)
1370{ 1487{
1371 struct GSF_PendingRequest *pr = cls; 1488 struct GSF_PendingRequest *pr = cls;
1372 GSF_LocalLookupContinuation cont;
1373 struct ProcessReplyClosure prq; 1489 struct ProcessReplyClosure prq;
1374 struct GNUNET_HashCode query; 1490 struct GNUNET_HashCode query;
1375 unsigned int old_rf; 1491 unsigned int old_rf;
1376 1492
1377 GNUNET_SCHEDULER_cancel (pr->warn_task); 1493 GNUNET_SCHEDULER_cancel (pr->warn_task);
1378 pr->warn_task = NULL; 1494 pr->warn_task = NULL;
1379 if (NULL != pr->qe) 1495 if (NULL == pr->qe)
1496 goto called_from_on_demand;
1497 pr->qe = NULL;
1498 if ( (NULL == key) &&
1499 pr->seen_null &&
1500 !pr->have_first_uid) /* We have hit the end for the 2nd time with no results */
1380 { 1501 {
1381 pr->qe = NULL; 1502 /* No results */
1382 if (NULL == key)
1383 {
1384#if INSANE_STATISTICS 1503#if INSANE_STATISTICS
1385 GNUNET_STATISTICS_update (GSF_stats, 1504 GNUNET_STATISTICS_update (GSF_stats,
1386 gettext_noop 1505 gettext_noop
1387 ("# Datastore lookups concluded (no results)"), 1506 ("# Datastore lookups concluded (no results)"),
1388 1, GNUNET_NO); 1507 1, GNUNET_NO);
1389#endif 1508#endif
1390 } 1509 no_more_local_results (pr);
1391 if (GNUNET_NO == pr->have_first_uid) 1510 return;
1392 { 1511 }
1393 pr->first_uid = uid; 1512 if ( ( (NULL == key) &&
1394 pr->have_first_uid = 1; 1513 pr->seen_null ) || /* We have hit the end for the 2nd time OR */
1395 } 1514 ( pr->seen_null &&
1396 else 1515 pr->have_first_uid &&
1397 { 1516 (uid >= pr->first_uid) ) ) /* We have hit the end and past first UID */
1398 if ((uid == pr->first_uid) && (key != NULL)) 1517 {
1399 { 1518 /* Seen all results */
1400 GNUNET_STATISTICS_update (GSF_stats, 1519 GNUNET_STATISTICS_update (GSF_stats,
1401 gettext_noop 1520 gettext_noop
1402 ("# Datastore lookups concluded (seen all)"), 1521 ("# Datastore lookups concluded (seen all)"),
1403 1, GNUNET_NO); 1522 1, GNUNET_NO);
1404 key = NULL; /* all replies seen! */ 1523 no_more_local_results (pr);
1405 } 1524 return;
1406 pr->have_first_uid++;
1407 if ((pr->have_first_uid > MAX_RESULTS) && (key != NULL))
1408 {
1409 GNUNET_STATISTICS_update (GSF_stats,
1410 gettext_noop
1411 ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1412 1, GNUNET_NO);
1413 key = NULL; /* all replies seen! */
1414 }
1415 }
1416 } 1525 }
1417 if (NULL == key) 1526 if (NULL == key)
1418 { 1527 {
1419 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, 1528 GNUNET_assert (!pr->seen_null);
1420 "No further local responses available.\n"); 1529 pr->seen_null = true;
1421#if INSANE_STATISTICS 1530 start_local_query (pr,
1422 if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) || 1531 0 /* next_uid */,
1423 (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_IBLOCK)) 1532 false /* random */);
1424 GNUNET_STATISTICS_update (GSF_stats, 1533 return;
1425 gettext_noop 1534 }
1426 ("# requested DBLOCK or IBLOCK not found"), 1, 1535 if (!pr->have_first_uid)
1427 GNUNET_NO); 1536 {
1428#endif 1537 pr->first_uid = uid;
1429 goto check_error_and_continue; 1538 pr->have_first_uid = true;
1539 }
1540 pr->result_count++;
1541 if (pr->result_count > MAX_RESULTS)
1542 {
1543 GNUNET_STATISTICS_update (GSF_stats,
1544 gettext_noop
1545 ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1546 1, GNUNET_NO);
1547 no_more_local_results (pr);
1548 return;
1430 } 1549 }
1431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1432 "Received reply for `%s' of type %d with UID %llu from datastore.\n", 1551 "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1433 GNUNET_h2s (key), type, (unsigned long long) uid); 1552 GNUNET_h2s (key), type, (unsigned long long) uid);
1434 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 1553 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1435 { 1554 {
1436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1555 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1437 "Found ONDEMAND block, performing on-demand encoding\n"); 1556 "Found ONDEMAND block, performing on-demand encoding\n");
@@ -1458,33 +1577,12 @@ process_local_reply (void *cls,
1458 gettext_noop ("# on-demand lookups failed"), 1, 1577 gettext_noop ("# on-demand lookups failed"), 1,
1459 GNUNET_NO); 1578 GNUNET_NO);
1460 GNUNET_SCHEDULER_cancel (pr->warn_task); 1579 GNUNET_SCHEDULER_cancel (pr->warn_task);
1461 pr->warn_task = 1580 start_local_query (pr,
1462 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1581 uid + 1 /* next_uid */,
1463 &warn_delay_task, pr); 1582 false /* random */);
1464 pr->qe = 1583 return;
1465 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1,
1466 &pr->public_data.query,
1467 pr->public_data.type ==
1468 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1469 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1470 (0 !=
1471 (GSF_PRO_PRIORITY_UNLIMITED &
1472 pr->public_data.options)) ? UINT_MAX : 1
1473 /* queue priority */ ,
1474 (0 !=
1475 (GSF_PRO_PRIORITY_UNLIMITED &
1476 pr->public_data.options)) ? UINT_MAX :
1477 GSF_datastore_queue_size
1478 /* max queue size */ ,
1479 &process_local_reply, pr);
1480 if (NULL != pr->qe)
1481 return; /* we're done */
1482 GNUNET_STATISTICS_update (GSF_stats,
1483 gettext_noop
1484 ("# Datastore lookups concluded (error queueing)"),
1485 1, GNUNET_NO);
1486 goto check_error_and_continue;
1487 } 1584 }
1585called_from_on_demand:
1488 old_rf = pr->public_data.results_found; 1586 old_rf = pr->public_data.results_found;
1489 memset (&prq, 0, sizeof (prq)); 1587 memset (&prq, 0, sizeof (prq));
1490 prq.data = data; 1588 prq.data = data;
@@ -1496,34 +1594,9 @@ process_local_reply (void *cls,
1496 GNUNET_break (0); 1594 GNUNET_break (0);
1497 GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1, 1595 GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1,
1498 NULL, NULL); 1596 NULL, NULL);
1499 pr->qe_start = GNUNET_TIME_absolute_get (); 1597 start_local_query (pr,
1500 pr->warn_task = 1598 uid + 1 /* next_uid */,
1501 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1599 false /* random */);
1502 &warn_delay_task, pr);
1503 pr->qe =
1504 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1,
1505 &pr->public_data.query,
1506 pr->public_data.type ==
1507 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1508 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1509 (0 !=
1510 (GSF_PRO_PRIORITY_UNLIMITED &
1511 pr->public_data.options)) ? UINT_MAX : 1
1512 /* queue priority */ ,
1513 (0 !=
1514 (GSF_PRO_PRIORITY_UNLIMITED &
1515 pr->public_data.options)) ? UINT_MAX :
1516 GSF_datastore_queue_size
1517 /* max queue size */ ,
1518 &process_local_reply, pr);
1519 if (NULL == pr->qe)
1520 {
1521 GNUNET_STATISTICS_update (GSF_stats,
1522 gettext_noop
1523 ("# Datastore lookups concluded (error queueing)"),
1524 1, GNUNET_NO);
1525 goto check_error_and_continue;
1526 }
1527 return; 1600 return;
1528 } 1601 }
1529 prq.type = type; 1602 prq.type = type;
@@ -1535,14 +1608,15 @@ process_local_reply (void *cls,
1535 prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO; 1608 prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO;
1536 process_reply (&prq, key, pr); 1609 process_reply (&prq, key, pr);
1537 pr->local_result = prq.eval; 1610 pr->local_result = prq.eval;
1538 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) 1611 if (GNUNET_BLOCK_EVALUATION_OK_LAST == prq.eval)
1539 { 1612 {
1540 GNUNET_STATISTICS_update (GSF_stats, 1613 GNUNET_STATISTICS_update (GSF_stats,
1541 gettext_noop 1614 gettext_noop
1542 ("# Datastore lookups concluded (found last result)"), 1615 ("# Datastore lookups concluded (found last result)"),
1543 1, 1616 1,
1544 GNUNET_NO); 1617 GNUNET_NO);
1545 goto check_error_and_continue; 1618 call_continuation (pr);
1619 return;
1546 } 1620 }
1547 if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && 1621 if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1548 ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) || 1622 ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
@@ -1554,66 +1628,12 @@ process_local_reply (void *cls,
1554 gettext_noop ("# Datastore lookups concluded (load too high)"), 1628 gettext_noop ("# Datastore lookups concluded (load too high)"),
1555 1, 1629 1,
1556 GNUNET_NO); 1630 GNUNET_NO);
1557 goto check_error_and_continue; 1631 call_continuation (pr);
1558 }
1559 pr->qe_start = GNUNET_TIME_absolute_get ();
1560 pr->warn_task =
1561 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1562 &warn_delay_task,
1563 pr);
1564 pr->qe =
1565 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++,
1566 &pr->public_data.query,
1567 pr->public_data.type ==
1568 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1569 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1570 (0 !=
1571 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1572 public_data.options)) ? UINT_MAX : 1
1573 /* queue priority */ ,
1574 (0 !=
1575 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1576 public_data.options)) ? UINT_MAX :
1577 GSF_datastore_queue_size
1578 /* max queue size */ ,
1579 &process_local_reply, pr);
1580 /* check if we successfully queued another datastore request;
1581 * if so, return, otherwise call our continuation (if we have
1582 * any) */
1583check_error_and_continue:
1584 if (NULL != pr->qe)
1585 return; 1632 return;
1586 if (NULL != pr->warn_task)
1587 {
1588 GNUNET_SCHEDULER_cancel (pr->warn_task);
1589 pr->warn_task = NULL;
1590 } 1633 }
1591 if (NULL == (cont = pr->llc_cont)) 1634 start_local_query (pr,
1592 return; /* no continuation */ 1635 uid + 1 /* next_uid */,
1593 pr->llc_cont = NULL; 1636 false /* random */);
1594 if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1595 {
1596 if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result)
1597 {
1598 /* Signal that we are done and that there won't be any
1599 additional results to allow client to clean up state. */
1600 pr->rh (pr->rh_cls,
1601 GNUNET_BLOCK_EVALUATION_OK_LAST,
1602 pr,
1603 UINT32_MAX,
1604 GNUNET_TIME_UNIT_ZERO_ABS,
1605 GNUNET_TIME_UNIT_FOREVER_ABS,
1606 GNUNET_BLOCK_TYPE_ANY,
1607 NULL, 0);
1608 }
1609 /* Finally, call our continuation to signal that we are
1610 done with local processing of this request; i.e. to
1611 start reading again from the client. */
1612 cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
1613 return;
1614 }
1615
1616 cont (pr->llc_cont_cls, pr, pr->local_result);
1617} 1637}
1618 1638
1619 1639
@@ -1657,43 +1677,14 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1657 GNUNET_assert (NULL == pr->llc_cont); 1677 GNUNET_assert (NULL == pr->llc_cont);
1658 pr->llc_cont = cont; 1678 pr->llc_cont = cont;
1659 pr->llc_cont_cls = cont_cls; 1679 pr->llc_cont_cls = cont_cls;
1660 pr->qe_start = GNUNET_TIME_absolute_get ();
1661 pr->warn_task =
1662 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1663 &warn_delay_task,
1664 pr);
1665#if INSANE_STATISTICS 1680#if INSANE_STATISTICS
1666 GNUNET_STATISTICS_update (GSF_stats, 1681 GNUNET_STATISTICS_update (GSF_stats,
1667 gettext_noop ("# Datastore lookups initiated"), 1, 1682 gettext_noop ("# Datastore lookups initiated"), 1,
1668 GNUNET_NO); 1683 GNUNET_NO);
1669#endif 1684#endif
1670 pr->qe = 1685 start_local_query(pr,
1671 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++, 1686 0 /* next_uid */,
1672 &pr->public_data.query, 1687 true /* random */);
1673 pr->public_data.type ==
1674 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1675 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1676 (0 !=
1677 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1678 public_data.options)) ? UINT_MAX : 1
1679 /* queue priority */ ,
1680 (0 !=
1681 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1682 public_data.options)) ? UINT_MAX :
1683 GSF_datastore_queue_size
1684 /* max queue size */ ,
1685 &process_local_reply, pr);
1686 if (NULL != pr->qe)
1687 return;
1688 GNUNET_STATISTICS_update (GSF_stats,
1689 gettext_noop
1690 ("# Datastore lookups concluded (error queueing)"),
1691 1, GNUNET_NO);
1692 GNUNET_SCHEDULER_cancel (pr->warn_task);
1693 pr->warn_task = NULL;
1694 pr->llc_cont = NULL;
1695 if (NULL != cont)
1696 cont (cont_cls, pr, pr->local_result);
1697} 1688}
1698 1689
1699 1690
diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c
index bb4cb4ecb..cd062bf2b 100644
--- a/src/fs/gnunet-service-fs_put.c
+++ b/src/fs/gnunet-service-fs_put.c
@@ -72,9 +72,14 @@ struct PutOperator
72 uint64_t zero_anonymity_count_estimate; 72 uint64_t zero_anonymity_count_estimate;
73 73
74 /** 74 /**
75 * Current offset when iterating the database. 75 * Count of results received from the database.
76 */ 76 */
77 uint64_t current_offset; 77 uint64_t result_count;
78
79 /**
80 * Next UID to request when iterating the database.
81 */
82 uint64_t next_uid;
78}; 83};
79 84
80 85
@@ -177,37 +182,43 @@ delay_dht_put_task (void *cls)
177 */ 182 */
178static void 183static void
179process_dht_put_content (void *cls, 184process_dht_put_content (void *cls,
180 const struct GNUNET_HashCode * key, 185 const struct GNUNET_HashCode * key,
181 size_t size, 186 size_t size,
182 const void *data, 187 const void *data,
183 enum GNUNET_BLOCK_Type type, 188 enum GNUNET_BLOCK_Type type,
184 uint32_t priority, uint32_t anonymity, 189 uint32_t priority,
185 struct GNUNET_TIME_Absolute expiration, uint64_t uid) 190 uint32_t anonymity,
191 struct GNUNET_TIME_Absolute expiration,
192 uint64_t uid)
186{ 193{
187 struct PutOperator *po = cls; 194 struct PutOperator *po = cls;
188 195
189 po->dht_qe = NULL; 196 po->dht_qe = NULL;
190 if (key == NULL) 197 if (key == NULL)
191 { 198 {
192 po->zero_anonymity_count_estimate = po->current_offset - 1; 199 po->zero_anonymity_count_estimate = po->result_count;
193 po->current_offset = 0; 200 po->result_count = 0;
201 po->next_uid = 0;
194 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); 202 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
195 return; 203 return;
196 } 204 }
205 po->result_count++;
206 po->next_uid = uid + 1;
197 po->zero_anonymity_count_estimate = 207 po->zero_anonymity_count_estimate =
198 GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate); 208 GNUNET_MAX (po->result_count, po->zero_anonymity_count_estimate);
199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
200 "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key), 210 "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key),
201 type); 211 type);
202 po->dht_put = GNUNET_DHT_put (GSF_dht, 212 po->dht_put = GNUNET_DHT_put (GSF_dht,
203 key, 213 key,
204 DEFAULT_PUT_REPLICATION, 214 DEFAULT_PUT_REPLICATION,
205 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, 215 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
206 type, 216 type,
207 size, 217 size,
208 data, 218 data,
209 expiration, 219 expiration,
210 &delay_dht_put_blocks, po); 220 &delay_dht_put_blocks,
221 po);
211} 222}
212 223
213 224
@@ -223,10 +234,13 @@ gather_dht_put_blocks (void *cls)
223 234
224 po->dht_task = NULL; 235 po->dht_task = NULL;
225 po->dht_qe = 236 po->dht_qe =
226 GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0, 237 GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh,
238 po->next_uid,
239 0,
227 UINT_MAX, 240 UINT_MAX,
228 po->dht_put_type, 241 po->dht_put_type,
229 &process_dht_put_content, po); 242 &process_dht_put_content,
243 po);
230 if (NULL == po->dht_qe) 244 if (NULL == po->dht_qe)
231 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); 245 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
232} 246}