aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/datastore/datastore.h22
-rw-r--r--src/datastore/datastore_api.c27
-rw-r--r--src/datastore/gnunet-datastore.c2
-rw-r--r--src/datastore/gnunet-service-datastore.c20
-rw-r--r--src/datastore/plugin_datastore_heap.c207
-rw-r--r--src/datastore/plugin_datastore_mysql.c201
-rw-r--r--src/datastore/plugin_datastore_postgres.c233
-rw-r--r--src/datastore/plugin_datastore_sqlite.c261
-rw-r--r--src/datastore/plugin_datastore_template.c17
-rw-r--r--src/datastore/test_datastore_api.c43
-rw-r--r--src/datastore/test_datastore_api_management.c20
-rw-r--r--src/datastore/test_plugin_datastore.c4
-rw-r--r--src/fs/fs_api.h10
-rw-r--r--src/fs/fs_unindex.c58
-rw-r--r--src/fs/gnunet-service-fs_cadet_server.c13
-rw-r--r--src/fs/gnunet-service-fs_pr.c403
-rw-r--r--src/fs/gnunet-service-fs_put.c44
-rw-r--r--src/include/gnunet_datastore_plugin.h41
-rw-r--r--src/include/gnunet_datastore_service.h20
-rw-r--r--src/include/platform.h1
20 files changed, 594 insertions, 1053 deletions
diff --git a/src/datastore/datastore.h b/src/datastore/datastore.h
index 9de72f064..5fd360161 100644
--- a/src/datastore/datastore.h
+++ b/src/datastore/datastore.h
@@ -119,9 +119,14 @@ struct GetKeyMessage
119 uint32_t type GNUNET_PACKED; 119 uint32_t type GNUNET_PACKED;
120 120
121 /** 121 /**
122 * Offset of the result. 122 * UID at which to start the search
123 */ 123 */
124 uint64_t offset GNUNET_PACKED; 124 uint64_t next_uid GNUNET_PACKED;
125
126 /**
127 * If true return a random result
128 */
129 uint32_t random GNUNET_PACKED;
125 130
126 /** 131 /**
127 * Desired key. 132 * Desired key.
@@ -148,9 +153,14 @@ struct GetMessage
148 uint32_t type GNUNET_PACKED; 153 uint32_t type GNUNET_PACKED;
149 154
150 /** 155 /**
151 * Offset of the result. 156 * UID at which to start the search
157 */
158 uint64_t next_uid GNUNET_PACKED;
159
160 /**
161 * If true return a random result
152 */ 162 */
153 uint64_t offset GNUNET_PACKED; 163 uint32_t random GNUNET_PACKED;
154 164
155}; 165};
156 166
@@ -172,9 +182,9 @@ struct GetZeroAnonymityMessage
172 uint32_t type GNUNET_PACKED; 182 uint32_t type GNUNET_PACKED;
173 183
174 /** 184 /**
175 * Offset of the result. 185 * UID at which to start the search
176 */ 186 */
177 uint64_t offset GNUNET_PACKED; 187 uint64_t next_uid GNUNET_PACKED;
178 188
179}; 189};
180 190
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index c677654aa..26e1e501d 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -1325,10 +1325,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1325 * Get a single zero-anonymity value from the datastore. 1325 * Get a single zero-anonymity value from the datastore.
1326 * 1326 *
1327 * @param h handle to the datastore 1327 * @param h handle to the datastore
1328 * @param offset offset of the result (modulo num-results); set to 1328 * @param next_uid return the result with lowest uid >= next_uid
1329 * a random 64-bit value initially; then increment by
1330 * one each time; detect that all results have been found by uid
1331 * being again the first uid ever returned.
1332 * @param queue_priority ranking of this request in the priority queue 1329 * @param queue_priority ranking of this request in the priority queue
1333 * @param max_queue_size at what queue size should this request be dropped 1330 * @param max_queue_size at what queue size should this request be dropped
1334 * (if other requests of higher priority are in the queue) 1331 * (if other requests of higher priority are in the queue)
@@ -1342,7 +1339,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1342 */ 1339 */
1343struct GNUNET_DATASTORE_QueueEntry * 1340struct GNUNET_DATASTORE_QueueEntry *
1344GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, 1341GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1345 uint64_t offset, 1342 uint64_t next_uid,
1346 unsigned int queue_priority, 1343 unsigned int queue_priority,
1347 unsigned int max_queue_size, 1344 unsigned int max_queue_size,
1348 enum GNUNET_BLOCK_Type type, 1345 enum GNUNET_BLOCK_Type type,
@@ -1357,13 +1354,12 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1357 GNUNET_assert (NULL != proc); 1354 GNUNET_assert (NULL != proc);
1358 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); 1355 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1359 LOG (GNUNET_ERROR_TYPE_DEBUG, 1356 LOG (GNUNET_ERROR_TYPE_DEBUG,
1360 "Asked to get %llu-th zero-anonymity entry of type %d\n", 1357 "Asked to get a zero-anonymity entry of type %d\n",
1361 (unsigned long long) offset,
1362 type); 1358 type);
1363 env = GNUNET_MQ_msg (m, 1359 env = GNUNET_MQ_msg (m,
1364 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); 1360 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1365 m->type = htonl ((uint32_t) type); 1361 m->type = htonl ((uint32_t) type);
1366 m->offset = GNUNET_htonll (offset); 1362 m->next_uid = GNUNET_htonll (next_uid);
1367 qc.rc.proc = proc; 1363 qc.rc.proc = proc;
1368 qc.rc.proc_cls = proc_cls; 1364 qc.rc.proc_cls = proc_cls;
1369 qe = make_queue_entry (h, 1365 qe = make_queue_entry (h,
@@ -1392,10 +1388,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1392 * will only be called once. 1388 * will only be called once.
1393 * 1389 *
1394 * @param h handle to the datastore 1390 * @param h handle to the datastore
1395 * @param offset offset of the result (modulo num-results); set to 1391 * @param next_uid return the result with lowest uid >= next_uid
1396 * a random 64-bit value initially; then increment by 1392 * @param random if true, return a random result instead of using next_uid
1397 * one each time; detect that all results have been found by uid
1398 * being again the first uid ever returned.
1399 * @param key maybe NULL (to match all entries) 1393 * @param key maybe NULL (to match all entries)
1400 * @param type desired type, 0 for any 1394 * @param type desired type, 0 for any
1401 * @param queue_priority ranking of this request in the priority queue 1395 * @param queue_priority ranking of this request in the priority queue
@@ -1409,7 +1403,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1409 */ 1403 */
1410struct GNUNET_DATASTORE_QueueEntry * 1404struct GNUNET_DATASTORE_QueueEntry *
1411GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, 1405GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1412 uint64_t offset, 1406 uint64_t next_uid,
1407 bool random,
1413 const struct GNUNET_HashCode *key, 1408 const struct GNUNET_HashCode *key,
1414 enum GNUNET_BLOCK_Type type, 1409 enum GNUNET_BLOCK_Type type,
1415 unsigned int queue_priority, 1410 unsigned int queue_priority,
@@ -1433,14 +1428,16 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1433 env = GNUNET_MQ_msg (gm, 1428 env = GNUNET_MQ_msg (gm,
1434 GNUNET_MESSAGE_TYPE_DATASTORE_GET); 1429 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1435 gm->type = htonl (type); 1430 gm->type = htonl (type);
1436 gm->offset = GNUNET_htonll (offset); 1431 gm->next_uid = GNUNET_htonll (next_uid);
1432 gm->random = random;
1437 } 1433 }
1438 else 1434 else
1439 { 1435 {
1440 env = GNUNET_MQ_msg (gkm, 1436 env = GNUNET_MQ_msg (gkm,
1441 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); 1437 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1442 gkm->type = htonl (type); 1438 gkm->type = htonl (type);
1443 gkm->offset = GNUNET_htonll (offset); 1439 gkm->next_uid = GNUNET_htonll (next_uid);
1440 gkm->random = random;
1444 gkm->key = *key; 1441 gkm->key = *key;
1445 } 1442 }
1446 qc.rc.proc = proc; 1443 qc.rc.proc = proc;
diff --git a/src/datastore/gnunet-datastore.c b/src/datastore/gnunet-datastore.c
index 509c7f8b1..c93bc8dd3 100644
--- a/src/datastore/gnunet-datastore.c
+++ b/src/datastore/gnunet-datastore.c
@@ -171,7 +171,7 @@ static void
171do_get () 171do_get ()
172{ 172{
173 qe = GNUNET_DATASTORE_get_key (db_src, 173 qe = GNUNET_DATASTORE_get_key (db_src,
174 offset, 174 0, false,
175 NULL, GNUNET_BLOCK_TYPE_ANY, 175 NULL, GNUNET_BLOCK_TYPE_ANY,
176 0, 1, 176 0, 1,
177 &do_put, NULL); 177 &do_put, NULL);
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index dabec3d6d..af33c4412 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -984,12 +984,13 @@ handle_put (void *cls,
984 size, 984 size,
985 &vhash); 985 &vhash);
986 plugin->api->get_key (plugin->api->cls, 986 plugin->api->get_key (plugin->api->cls,
987 0, 987 0,
988 &dm->key, 988 false,
989 &vhash, 989 &dm->key,
990 &vhash,
990 ntohl (dm->type), 991 ntohl (dm->type),
991 &check_present, 992 &check_present,
992 pc); 993 pc);
993 GNUNET_SERVICE_client_continue (client); 994 GNUNET_SERVICE_client_continue (client);
994 return; 995 return;
995 } 996 }
@@ -1018,7 +1019,8 @@ handle_get (void *cls,
1018 1, 1019 1,
1019 GNUNET_NO); 1020 GNUNET_NO);
1020 plugin->api->get_key (plugin->api->cls, 1021 plugin->api->get_key (plugin->api->cls,
1021 GNUNET_ntohll (msg->offset), 1022 GNUNET_ntohll (msg->next_uid),
1023 msg->random,
1022 NULL, 1024 NULL,
1023 NULL, 1025 NULL,
1024 ntohl (msg->type), 1026 ntohl (msg->type),
@@ -1069,7 +1071,8 @@ handle_get_key (void *cls,
1069 return; 1071 return;
1070 } 1072 }
1071 plugin->api->get_key (plugin->api->cls, 1073 plugin->api->get_key (plugin->api->cls,
1072 GNUNET_ntohll (msg->offset), 1074 GNUNET_ntohll (msg->next_uid),
1075 msg->random,
1073 &msg->key, 1076 &msg->key,
1074 NULL, 1077 NULL,
1075 ntohl (msg->type), 1078 ntohl (msg->type),
@@ -1131,7 +1134,7 @@ handle_get_zero_anonymity (void *cls,
1131 1, 1134 1,
1132 GNUNET_NO); 1135 GNUNET_NO);
1133 plugin->api->get_zero_anonymity (plugin->api->cls, 1136 plugin->api->get_zero_anonymity (plugin->api->cls,
1134 GNUNET_ntohll (msg->offset), 1137 GNUNET_ntohll (msg->next_uid),
1135 type, 1138 type,
1136 &transmit_item, 1139 &transmit_item,
1137 client); 1140 client);
@@ -1241,6 +1244,7 @@ handle_remove (void *cls,
1241 (uint32_t) ntohl (dm->type)); 1244 (uint32_t) ntohl (dm->type));
1242 plugin->api->get_key (plugin->api->cls, 1245 plugin->api->get_key (plugin->api->cls,
1243 0, 1246 0,
1247 false,
1244 &dm->key, 1248 &dm->key,
1245 &vhash, 1249 &vhash,
1246 (enum GNUNET_BLOCK_Type) ntohl (dm->type), 1250 (enum GNUNET_BLOCK_Type) ntohl (dm->type),
diff --git a/src/datastore/plugin_datastore_heap.c b/src/datastore/plugin_datastore_heap.c
index 199c03a50..e15cacb5b 100644
--- a/src/datastore/plugin_datastore_heap.c
+++ b/src/datastore/plugin_datastore_heap.c
@@ -323,19 +323,19 @@ struct GetContext
323{ 323{
324 324
325 /** 325 /**
326 * Desired result offset / number of results. 326 * Lowest uid to consider.
327 */ 327 */
328 uint64_t offset; 328 uint64_t next_uid;
329 329
330 /** 330 /**
331 * The plugin. 331 * Value with lowest uid >= next_uid found so far.
332 */ 332 */
333 struct Plugin *plugin; 333 struct Value *value;
334 334
335 /** 335 /**
336 * Requested value hash. 336 * Requested value hash.
337 */ 337 */
338 const struct GNUNET_HashCode * vhash; 338 const struct GNUNET_HashCode *vhash;
339 339
340 /** 340 /**
341 * Requested type. 341 * Requested type.
@@ -343,68 +343,15 @@ struct GetContext
343 enum GNUNET_BLOCK_Type type; 343 enum GNUNET_BLOCK_Type type;
344 344
345 /** 345 /**
346 * Function to call with the result. 346 * If true, return a random value
347 */ 347 */
348 PluginDatumProcessor proc; 348 bool random;
349 349
350 /**
351 * Closure for 'proc'.
352 */
353 void *proc_cls;
354}; 350};
355 351
356 352
357/** 353/**
358 * Test if a value matches the specification from the 'get' context 354 * Obtain the matching value with the lowest uid >= next_uid.
359 *
360 * @param gc query
361 * @param value the value to check against the query
362 * @return GNUNET_YES if the value matches
363 */
364static int
365match (const struct GetContext *gc,
366 struct Value *value)
367{
368 struct GNUNET_HashCode vh;
369
370 if ( (gc->type != GNUNET_BLOCK_TYPE_ANY) &&
371 (gc->type != value->type) )
372 return GNUNET_NO;
373 if (NULL != gc->vhash)
374 {
375 GNUNET_CRYPTO_hash (&value[1], value->size, &vh);
376 if (0 != memcmp (&vh, gc->vhash, sizeof (struct GNUNET_HashCode)))
377 return GNUNET_NO;
378 }
379 return GNUNET_YES;
380}
381
382
383/**
384 * Count number of matching values.
385 *
386 * @param cls the 'struct GetContext'
387 * @param key unused
388 * @param val the 'struct Value'
389 * @return GNUNET_YES (continue iteration)
390 */
391static int
392count_iterator (void *cls,
393 const struct GNUNET_HashCode *key,
394 void *val)
395{
396 struct GetContext *gc = cls;
397 struct Value *value = val;
398
399 if (GNUNET_NO == match (gc, value))
400 return GNUNET_OK;
401 gc->offset++;
402 return GNUNET_OK;
403}
404
405
406/**
407 * Obtain matching value at 'offset'.
408 * 355 *
409 * @param cls the 'struct GetContext' 356 * @param cls the 'struct GetContext'
410 * @param key unused 357 * @param key unused
@@ -418,23 +365,29 @@ get_iterator (void *cls,
418{ 365{
419 struct GetContext *gc = cls; 366 struct GetContext *gc = cls;
420 struct Value *value = val; 367 struct Value *value = val;
368 struct GNUNET_HashCode vh;
421 369
422 if (GNUNET_NO == match (gc, value)) 370 if ( (gc->type != GNUNET_BLOCK_TYPE_ANY) &&
371 (gc->type != value->type) )
423 return GNUNET_OK; 372 return GNUNET_OK;
424 if (0 != gc->offset--) 373 if (NULL != gc->vhash)
374 {
375 GNUNET_CRYPTO_hash (&value[1], value->size, &vh);
376 if (0 != memcmp (&vh, gc->vhash, sizeof (struct GNUNET_HashCode)))
377 return GNUNET_OK;
378 }
379 if (gc->random)
380 {
381 gc->value = value;
382 return GNUNET_NO;
383 }
384 if ( (uint64_t) (intptr_t) value < gc->next_uid)
425 return GNUNET_OK; 385 return GNUNET_OK;
426 if (GNUNET_NO == 386 if ( (NULL != gc->value) &&
427 gc->proc (gc->proc_cls, 387 (value > gc->value) )
428 key, 388 return GNUNET_OK;
429 value->size, 389 gc->value = value;
430 &value[1], 390 return GNUNET_OK;
431 value->type,
432 value->priority,
433 value->anonymity,
434 value->expiration,
435 (uint64_t) (long) value))
436 delete_value (gc->plugin, value);
437 return GNUNET_NO;
438} 391}
439 392
440 393
@@ -442,8 +395,8 @@ get_iterator (void *cls,
442 * Get one of the results for a particular key in the datastore. 395 * Get one of the results for a particular key in the datastore.
443 * 396 *
444 * @param cls closure 397 * @param cls closure
445 * @param offset offset of the result (modulo num-results); 398 * @param next_uid return the result with lowest uid >= next_uid
446 * specific ordering does not matter for the offset 399 * @param random if true, return a random result instead of using next_uid
447 * @param key maybe NULL (to match all entries) 400 * @param key maybe NULL (to match all entries)
448 * @param vhash hash of the value, maybe NULL (to 401 * @param vhash hash of the value, maybe NULL (to
449 * match all values that have the right key). 402 * match all values that have the right key).
@@ -457,7 +410,7 @@ get_iterator (void *cls,
457 * @param proc_cls closure for proc 410 * @param proc_cls closure for proc
458 */ 411 */
459static void 412static void
460heap_plugin_get_key (void *cls, uint64_t offset, 413heap_plugin_get_key (void *cls, uint64_t next_uid, bool random,
461 const struct GNUNET_HashCode *key, 414 const struct GNUNET_HashCode *key,
462 const struct GNUNET_HashCode *vhash, 415 const struct GNUNET_HashCode *vhash,
463 enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc, 416 enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
@@ -466,25 +419,14 @@ heap_plugin_get_key (void *cls, uint64_t offset,
466 struct Plugin *plugin = cls; 419 struct Plugin *plugin = cls;
467 struct GetContext gc; 420 struct GetContext gc;
468 421
469 gc.plugin = plugin; 422 gc.value = NULL;
470 gc.offset = 0; 423 gc.next_uid = next_uid;
424 gc.random = random;
471 gc.vhash = vhash; 425 gc.vhash = vhash;
472 gc.type = type; 426 gc.type = type;
473 gc.proc = proc;
474 gc.proc_cls = proc_cls;
475 if (NULL == key) 427 if (NULL == key)
476 { 428 {
477 GNUNET_CONTAINER_multihashmap_iterate (plugin->keyvalue, 429 GNUNET_CONTAINER_multihashmap_iterate (plugin->keyvalue,
478 &count_iterator,
479 &gc);
480 if (0 == gc.offset)
481 {
482 proc (proc_cls,
483 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
484 return;
485 }
486 gc.offset = offset % gc.offset;
487 GNUNET_CONTAINER_multihashmap_iterate (plugin->keyvalue,
488 &get_iterator, 430 &get_iterator,
489 &gc); 431 &gc);
490 } 432 }
@@ -492,20 +434,27 @@ heap_plugin_get_key (void *cls, uint64_t offset,
492 { 434 {
493 GNUNET_CONTAINER_multihashmap_get_multiple (plugin->keyvalue, 435 GNUNET_CONTAINER_multihashmap_get_multiple (plugin->keyvalue,
494 key, 436 key,
495 &count_iterator,
496 &gc);
497 if (0 == gc.offset)
498 {
499 proc (proc_cls,
500 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
501 return;
502 }
503 gc.offset = offset % gc.offset;
504 GNUNET_CONTAINER_multihashmap_get_multiple (plugin->keyvalue,
505 key,
506 &get_iterator, 437 &get_iterator,
507 &gc); 438 &gc);
508 } 439 }
440 if (NULL == gc.value)
441 {
442 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
443 return;
444 }
445 if (GNUNET_NO ==
446 proc (proc_cls,
447 &gc.value->key,
448 gc.value->size,
449 &gc.value[1],
450 gc.value->type,
451 gc.value->priority,
452 gc.value->anonymity,
453 gc.value->expiration,
454 (uint64_t) (intptr_t) gc.value))
455 {
456 delete_value (plugin, gc.value);
457 }
509} 458}
510 459
511 460
@@ -559,7 +508,7 @@ heap_plugin_get_replication (void *cls,
559 value->priority, 508 value->priority,
560 value->anonymity, 509 value->anonymity,
561 value->expiration, 510 value->expiration,
562 (uint64_t) (long) value)) 511 (uint64_t) (intptr_t) value))
563 delete_value (plugin, value); 512 delete_value (plugin, value);
564} 513}
565 514
@@ -595,7 +544,7 @@ heap_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
595 value->priority, 544 value->priority,
596 value->anonymity, 545 value->anonymity,
597 value->expiration, 546 value->expiration,
598 (uint64_t) (long) value)) 547 (uint64_t) (intptr_t) value))
599 delete_value (plugin, value); 548 delete_value (plugin, value);
600} 549}
601 550
@@ -628,7 +577,7 @@ heap_plugin_update (void *cls,
628{ 577{
629 struct Value *value; 578 struct Value *value;
630 579
631 value = (struct Value*) (long) uid; 580 value = (struct Value*) (intptr_t) uid;
632 GNUNET_assert (NULL != value); 581 GNUNET_assert (NULL != value);
633 if (value->expiration.abs_value_us != expire.abs_value_us) 582 if (value->expiration.abs_value_us != expire.abs_value_us)
634 { 583 {
@@ -649,53 +598,43 @@ heap_plugin_update (void *cls,
649 * Call the given processor on an item with zero anonymity. 598 * Call the given processor on an item with zero anonymity.
650 * 599 *
651 * @param cls our "struct Plugin*" 600 * @param cls our "struct Plugin*"
652 * @param offset offset of the result (modulo num-results); 601 * @param next_uid return the result with lowest uid >= next_uid
653 * specific ordering does not matter for the offset
654 * @param type entries of which type should be considered? 602 * @param type entries of which type should be considered?
655 * Use 0 for any type. 603 * Must not be zero (ANY).
656 * @param proc function to call on each matching value; 604 * @param proc function to call on each matching value;
657 * will be called with NULL if no value matches 605 * will be called with NULL if no value matches
658 * @param proc_cls closure for proc 606 * @param proc_cls closure for proc
659 */ 607 */
660static void 608static void
661heap_plugin_get_zero_anonymity (void *cls, uint64_t offset, 609heap_plugin_get_zero_anonymity (void *cls, uint64_t next_uid,
662 enum GNUNET_BLOCK_Type type, 610 enum GNUNET_BLOCK_Type type,
663 PluginDatumProcessor proc, void *proc_cls) 611 PluginDatumProcessor proc, void *proc_cls)
664{ 612{
665 struct Plugin *plugin = cls; 613 struct Plugin *plugin = cls;
666 struct ZeroAnonByType *zabt; 614 struct ZeroAnonByType *zabt;
667 struct Value *value; 615 struct Value *value = NULL;
668 uint64_t count;
669 616
670 count = 0;
671 for (zabt = plugin->zero_head; NULL != zabt; zabt = zabt->next) 617 for (zabt = plugin->zero_head; NULL != zabt; zabt = zabt->next)
672 { 618 {
673 if ( (type != GNUNET_BLOCK_TYPE_ANY) && 619 if ( (type != GNUNET_BLOCK_TYPE_ANY) &&
674 (type != zabt->type) ) 620 (type != zabt->type) )
675 continue; 621 continue;
676 count += zabt->array_pos; 622 for (int i = 0; i < zabt->array_pos; ++i)
623 {
624 if ( (uint64_t) (intptr_t) zabt->array[i] < next_uid)
625 continue;
626 if ( (NULL != value) &&
627 (zabt->array[i] > value) )
628 continue;
629 value = zabt->array[i];
630 }
677 } 631 }
678 if (0 == count) 632 if (NULL == value)
679 { 633 {
680 proc (proc_cls, 634 proc (proc_cls,
681 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 635 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
682 return; 636 return;
683 } 637 }
684 offset = offset % count;
685 for (zabt = plugin->zero_head; NULL != zabt; zabt = zabt->next)
686 {
687 if ( (type != GNUNET_BLOCK_TYPE_ANY) &&
688 (type != zabt->type) )
689 continue;
690 if (offset >= zabt->array_pos)
691 {
692 offset -= zabt->array_pos;
693 continue;
694 }
695 break;
696 }
697 GNUNET_assert (NULL != zabt);
698 value = zabt->array[offset];
699 if (GNUNET_NO == 638 if (GNUNET_NO ==
700 proc (proc_cls, 639 proc (proc_cls,
701 &value->key, 640 &value->key,
@@ -705,7 +644,7 @@ heap_plugin_get_zero_anonymity (void *cls, uint64_t offset,
705 value->priority, 644 value->priority,
706 value->anonymity, 645 value->anonymity,
707 value->expiration, 646 value->expiration,
708 (uint64_t) (long) value)) 647 (uint64_t) (intptr_t) value))
709 delete_value (plugin, value); 648 delete_value (plugin, value);
710} 649}
711 650
diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c
index 1067064aa..5ae4485cb 100644
--- a/src/datastore/plugin_datastore_mysql.c
+++ b/src/datastore/plugin_datastore_mysql.c
@@ -150,28 +150,19 @@ struct Plugin
150#define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?" 150#define DELETE_ENTRY_BY_UID "DELETE FROM gn090 WHERE uid=?"
151 struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid; 151 struct GNUNET_MYSQL_StatementHandle *delete_entry_by_uid;
152 152
153#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash) WHERE hash=?" 153#define SELECT_ENTRY "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE uid >= ? AND (rvalue >= ? OR 0 = ?) ORDER BY uid LIMIT 1"
154 struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash; 154 struct GNUNET_MYSQL_StatementHandle *select_entry;
155 155
156#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? ORDER BY uid LIMIT 1 OFFSET ?" 156#define SELECT_ENTRY_BY_HASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash) WHERE hash=? AND uid >= ? AND (rvalue >= ? OR 0 = ?) ORDER BY uid LIMIT 1"
157 struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash; 157 struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash;
158 158
159#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=?" 159#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND uid >= ? AND (rvalue >= ? OR 0 = ?) ORDER BY uid LIMIT 1"
160 struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_vhash;
161
162#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? ORDER BY uid LIMIT 1 OFFSET ?"
163 struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash; 160 struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_vhash;
164 161
165#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=?" 162#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? AND uid >= ? AND (rvalue >= ? OR 0 = ?) ORDER BY uid LIMIT 1"
166 struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_and_type;
167
168#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_type_uid) WHERE hash=? AND type=? ORDER BY uid LIMIT 1 OFFSET ?"
169 struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type; 163 struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_and_type;
170 164
171#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=?" 165#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? AND uid >= ? AND (rvalue >= ? OR 0 = ?) ORDER BY uid LIMIT 1"
172 struct GNUNET_MYSQL_StatementHandle *count_entry_by_hash_vhash_and_type;
173
174#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
175 struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type; 166 struct GNUNET_MYSQL_StatementHandle *select_entry_by_hash_vhash_and_type;
176 167
177#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?" 168#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
@@ -185,10 +176,8 @@ struct Plugin
185 176
186#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\ 177#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
187 "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\ 178 "FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) "\
188 "WHERE anonLevel=0 AND type=? AND "\ 179 "WHERE anonLevel=0 AND type=? AND uid >= ? "\
189 "(rvalue >= ? OR"\ 180 "ORDER BY uid LIMIT 1"
190 " NOT EXISTS (SELECT 1 FROM gn090 FORCE INDEX (idx_anonLevel_type_rvalue) WHERE anonLevel=0 AND type=? AND rvalue>=?)) "\
191 "ORDER BY rvalue ASC LIMIT 1"
192 struct GNUNET_MYSQL_StatementHandle *zero_iter; 181 struct GNUNET_MYSQL_StatementHandle *zero_iter;
193 182
194#define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1" 183#define SELECT_IT_EXPIRATION "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_expire) WHERE expire < ? ORDER BY expire ASC LIMIT 1"
@@ -541,8 +530,8 @@ execute_select (struct Plugin *plugin,
541 * Get one of the results for a particular key in the datastore. 530 * Get one of the results for a particular key in the datastore.
542 * 531 *
543 * @param cls closure 532 * @param cls closure
544 * @param offset offset of the result (modulo num-results); 533 * @param next_uid return the result with lowest uid >= next_uid
545 * specific ordering does not matter for the offset 534 * @param random if true, return a random result instead of using next_uid
546 * @param key key to match, never NULL 535 * @param key key to match, never NULL
547 * @param vhash hash of the value, maybe NULL (to 536 * @param vhash hash of the value, maybe NULL (to
548 * match all values that have the right key). 537 * match all values that have the right key).
@@ -557,7 +546,8 @@ execute_select (struct Plugin *plugin,
557 */ 546 */
558static void 547static void
559mysql_plugin_get_key (void *cls, 548mysql_plugin_get_key (void *cls,
560 uint64_t offset, 549 uint64_t next_uid,
550 bool random,
561 const struct GNUNET_HashCode *key, 551 const struct GNUNET_HashCode *key,
562 const struct GNUNET_HashCode *vhash, 552 const struct GNUNET_HashCode *vhash,
563 enum GNUNET_BLOCK_Type type, 553 enum GNUNET_BLOCK_Type type,
@@ -565,121 +555,33 @@ mysql_plugin_get_key (void *cls,
565 void *proc_cls) 555 void *proc_cls)
566{ 556{
567 struct Plugin *plugin = cls; 557 struct Plugin *plugin = cls;
568 int ret; 558 uint64_t rvalue;
569 uint64_t total;
570 struct GNUNET_MY_ResultSpec results_get[] = {
571 GNUNET_MY_result_spec_uint64 (&total),
572 GNUNET_MY_result_spec_end
573 };
574 559
575 total = UINT64_MAX; 560 if (random)
576 if (0 != type)
577 { 561 {
578 if (NULL != vhash) 562 rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
579 { 563 UINT64_MAX);
580 struct GNUNET_MY_QueryParam params_get[] = { 564 next_uid = 0;
581 GNUNET_MY_query_param_auto_from_type (key),
582 GNUNET_MY_query_param_auto_from_type (vhash),
583 GNUNET_MY_query_param_uint32 (&type),
584 GNUNET_MY_query_param_end
585 };
586
587 ret =
588 GNUNET_MY_exec_prepared (plugin->mc,
589 plugin->count_entry_by_hash_vhash_and_type,
590 params_get);
591 GNUNET_break (GNUNET_OK == ret);
592 if (GNUNET_OK == ret)
593 ret =
594 GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
595 results_get);
596 if (GNUNET_OK == ret)
597 GNUNET_break (GNUNET_NO ==
598 GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
599 NULL));
600 }
601 else
602 {
603 struct GNUNET_MY_QueryParam params_get[] = {
604 GNUNET_MY_query_param_auto_from_type (key),
605 GNUNET_MY_query_param_uint32 (&type),
606 GNUNET_MY_query_param_end
607 };
608
609 ret =
610 GNUNET_MY_exec_prepared (plugin->mc,
611 plugin->count_entry_by_hash_and_type,
612 params_get);
613 GNUNET_break (GNUNET_OK == ret);
614 if (GNUNET_OK == ret)
615 ret =
616 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
617 results_get);
618 if (GNUNET_OK == ret)
619 GNUNET_break (GNUNET_NO ==
620 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
621 NULL));
622 }
623 } 565 }
624 else 566 else
625 { 567 rvalue = 0;
626 if (NULL != vhash)
627 {
628 struct GNUNET_MY_QueryParam params_get[] = {
629 GNUNET_MY_query_param_auto_from_type (key),
630 GNUNET_MY_query_param_auto_from_type (vhash),
631 GNUNET_MY_query_param_end
632 };
633 568
634 ret = 569 if (NULL == key)
635 GNUNET_MY_exec_prepared (plugin->mc,
636 plugin->count_entry_by_hash_and_vhash,
637 params_get);
638 GNUNET_break (GNUNET_OK == ret);
639 if (GNUNET_OK == ret)
640 ret =
641 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
642 results_get);
643 if (GNUNET_OK == ret)
644 GNUNET_break (GNUNET_NO ==
645 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
646 NULL));
647 }
648 else
649 {
650 struct GNUNET_MY_QueryParam params_get[] = {
651 GNUNET_MY_query_param_auto_from_type (key),
652 GNUNET_MY_query_param_end
653 };
654
655 ret =
656 GNUNET_MY_exec_prepared (plugin->mc,
657 plugin->count_entry_by_hash,
658 params_get);
659 GNUNET_break (GNUNET_OK == ret);
660 if (GNUNET_OK == ret)
661 ret =
662 GNUNET_MY_extract_result (plugin->count_entry_by_hash,
663 results_get);
664 if (GNUNET_OK == ret)
665 GNUNET_break (GNUNET_NO ==
666 GNUNET_MY_extract_result (plugin->count_entry_by_hash,
667 NULL));
668 }
669 }
670 if ( (GNUNET_OK != ret) ||
671 (0 >= total) )
672 { 570 {
673 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 571 struct GNUNET_MY_QueryParam params_select[] = {
674 return; 572 GNUNET_MY_query_param_uint64 (&next_uid),
573 GNUNET_MY_query_param_uint64 (&rvalue),
574 GNUNET_MY_query_param_uint64 (&rvalue),
575 GNUNET_MY_query_param_end
576 };
577
578 execute_select (plugin,
579 plugin->select_entry,
580 proc,
581 proc_cls,
582 params_select);
675 } 583 }
676 offset = offset % total; 584 else if (type != GNUNET_BLOCK_TYPE_ANY)
677 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
678 "Obtaining %llu/%lld result for GET `%s'\n",
679 (unsigned long long) offset,
680 (unsigned long long) total,
681 GNUNET_h2s (key));
682 if (type != GNUNET_BLOCK_TYPE_ANY)
683 { 585 {
684 if (NULL != vhash) 586 if (NULL != vhash)
685 { 587 {
@@ -687,7 +589,9 @@ mysql_plugin_get_key (void *cls,
687 GNUNET_MY_query_param_auto_from_type (key), 589 GNUNET_MY_query_param_auto_from_type (key),
688 GNUNET_MY_query_param_auto_from_type (vhash), 590 GNUNET_MY_query_param_auto_from_type (vhash),
689 GNUNET_MY_query_param_uint32 (&type), 591 GNUNET_MY_query_param_uint32 (&type),
690 GNUNET_MY_query_param_uint64 (&offset), 592 GNUNET_MY_query_param_uint64 (&next_uid),
593 GNUNET_MY_query_param_uint64 (&rvalue),
594 GNUNET_MY_query_param_uint64 (&rvalue),
691 GNUNET_MY_query_param_end 595 GNUNET_MY_query_param_end
692 }; 596 };
693 597
@@ -702,7 +606,9 @@ mysql_plugin_get_key (void *cls,
702 struct GNUNET_MY_QueryParam params_select[] = { 606 struct GNUNET_MY_QueryParam params_select[] = {
703 GNUNET_MY_query_param_auto_from_type (key), 607 GNUNET_MY_query_param_auto_from_type (key),
704 GNUNET_MY_query_param_uint32 (&type), 608 GNUNET_MY_query_param_uint32 (&type),
705 GNUNET_MY_query_param_uint64 (&offset), 609 GNUNET_MY_query_param_uint64 (&next_uid),
610 GNUNET_MY_query_param_uint64 (&rvalue),
611 GNUNET_MY_query_param_uint64 (&rvalue),
706 GNUNET_MY_query_param_end 612 GNUNET_MY_query_param_end
707 }; 613 };
708 614
@@ -720,7 +626,9 @@ mysql_plugin_get_key (void *cls,
720 struct GNUNET_MY_QueryParam params_select[] = { 626 struct GNUNET_MY_QueryParam params_select[] = {
721 GNUNET_MY_query_param_auto_from_type (key), 627 GNUNET_MY_query_param_auto_from_type (key),
722 GNUNET_MY_query_param_auto_from_type (vhash), 628 GNUNET_MY_query_param_auto_from_type (vhash),
723 GNUNET_MY_query_param_uint64 (&offset), 629 GNUNET_MY_query_param_uint64 (&next_uid),
630 GNUNET_MY_query_param_uint64 (&rvalue),
631 GNUNET_MY_query_param_uint64 (&rvalue),
724 GNUNET_MY_query_param_end 632 GNUNET_MY_query_param_end
725 }; 633 };
726 634
@@ -734,7 +642,9 @@ mysql_plugin_get_key (void *cls,
734 { 642 {
735 struct GNUNET_MY_QueryParam params_select[] = { 643 struct GNUNET_MY_QueryParam params_select[] = {
736 GNUNET_MY_query_param_auto_from_type (key), 644 GNUNET_MY_query_param_auto_from_type (key),
737 GNUNET_MY_query_param_uint64 (&offset), 645 GNUNET_MY_query_param_uint64 (&next_uid),
646 GNUNET_MY_query_param_uint64 (&rvalue),
647 GNUNET_MY_query_param_uint64 (&rvalue),
738 GNUNET_MY_query_param_end 648 GNUNET_MY_query_param_end
739 }; 649 };
740 650
@@ -753,28 +663,26 @@ mysql_plugin_get_key (void *cls,
753 * Get a zero-anonymity datum from the datastore. 663 * Get a zero-anonymity datum from the datastore.
754 * 664 *
755 * @param cls our `struct Plugin *` 665 * @param cls our `struct Plugin *`
756 * @param offset offset of the result 666 * @param next_uid return the result with lowest uid >= next_uid
757 * @param type entries of which type should be considered? 667 * @param type entries of which type should be considered?
758 * Use 0 for any type. 668 * Must not be zero (ANY).
759 * @param proc function to call on a matching value or NULL 669 * @param proc function to call on a matching value;
670 * will be called with NULL if no value matches
760 * @param proc_cls closure for @a proc 671 * @param proc_cls closure for @a proc
761 */ 672 */
762static void 673static void
763mysql_plugin_get_zero_anonymity (void *cls, 674mysql_plugin_get_zero_anonymity (void *cls,
764 uint64_t offset, 675 uint64_t next_uid,
765 enum GNUNET_BLOCK_Type type, 676 enum GNUNET_BLOCK_Type type,
766 PluginDatumProcessor proc, 677 PluginDatumProcessor proc,
767 void *proc_cls) 678 void *proc_cls)
768{ 679{
769 struct Plugin *plugin = cls; 680 struct Plugin *plugin = cls;
770 uint32_t typei = (uint32_t) type; 681 uint32_t typei = (uint32_t) type;
771 uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, 682
772 UINT64_MAX);
773 struct GNUNET_MY_QueryParam params_zero_iter[] = { 683 struct GNUNET_MY_QueryParam params_zero_iter[] = {
774 GNUNET_MY_query_param_uint32 (&typei), 684 GNUNET_MY_query_param_uint32 (&typei),
775 GNUNET_MY_query_param_uint64 (&rvalue), 685 GNUNET_MY_query_param_uint64 (&next_uid),
776 GNUNET_MY_query_param_uint32 (&typei),
777 GNUNET_MY_query_param_uint64 (&rvalue),
778 GNUNET_MY_query_param_end 686 GNUNET_MY_query_param_end
779 }; 687 };
780 688
@@ -1209,6 +1117,7 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
1209 ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") || 1117 ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1") ||
1210 PINIT (plugin->insert_entry, INSERT_ENTRY) || 1118 PINIT (plugin->insert_entry, INSERT_ENTRY) ||
1211 PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) || 1119 PINIT (plugin->delete_entry_by_uid, DELETE_ENTRY_BY_UID) ||
1120 PINIT (plugin->select_entry, SELECT_ENTRY) ||
1212 PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) || 1121 PINIT (plugin->select_entry_by_hash, SELECT_ENTRY_BY_HASH) ||
1213 PINIT (plugin->select_entry_by_hash_and_vhash, 1122 PINIT (plugin->select_entry_by_hash_and_vhash,
1214 SELECT_ENTRY_BY_HASH_AND_VHASH) || 1123 SELECT_ENTRY_BY_HASH_AND_VHASH) ||
@@ -1216,13 +1125,7 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
1216 SELECT_ENTRY_BY_HASH_AND_TYPE) || 1125 SELECT_ENTRY_BY_HASH_AND_TYPE) ||
1217 PINIT (plugin->select_entry_by_hash_vhash_and_type, 1126 PINIT (plugin->select_entry_by_hash_vhash_and_type,
1218 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) || 1127 SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1219 PINIT (plugin->count_entry_by_hash, COUNT_ENTRY_BY_HASH) ||
1220 PINIT (plugin->get_size, SELECT_SIZE) || 1128 PINIT (plugin->get_size, SELECT_SIZE) ||
1221 PINIT (plugin->count_entry_by_hash_and_vhash,
1222 COUNT_ENTRY_BY_HASH_AND_VHASH) ||
1223 PINIT (plugin->count_entry_by_hash_and_type, COUNT_ENTRY_BY_HASH_AND_TYPE)
1224 || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1225 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) ||
1226 PINIT (plugin->update_entry, UPDATE_ENTRY) || 1129 PINIT (plugin->update_entry, UPDATE_ENTRY) ||
1227 PINIT (plugin->dec_repl, DEC_REPL) || 1130 PINIT (plugin->dec_repl, DEC_REPL) ||
1228 PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) || 1131 PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) ||
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
index 8b8737935..0376ebb6c 100644
--- a/src/datastore/plugin_datastore_postgres.c
+++ b/src/datastore/plugin_datastore_postgres.c
@@ -80,6 +80,7 @@ init_connection (struct Plugin *plugin)
80 * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel 80 * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
81 * we do math or inequality tests, so we can't handle the entire range of uint32_t. 81 * we do math or inequality tests, so we can't handle the entire range of uint32_t.
82 * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC. 82 * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
83 * PostgreSQL also recommends against using WITH OIDS.
83 */ 84 */
84 ret = 85 ret =
85 PQexec (plugin->dbh, 86 PQexec (plugin->dbh,
@@ -176,40 +177,18 @@ init_connection (struct Plugin *plugin)
176 } 177 }
177 PQclear (ret); 178 PQclear (ret);
178 if ((GNUNET_OK != 179 if ((GNUNET_OK !=
179 GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
180 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
181 "WHERE hash=$1 AND vhash=$2 AND type=$3 "
182 "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
183 (GNUNET_OK !=
184 GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
185 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
186 "WHERE hash=$1 AND type=$2 "
187 "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
188 (GNUNET_OK !=
189 GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
190 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
191 "WHERE hash=$1 AND vhash=$2 "
192 "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
193 (GNUNET_OK !=
194 GNUNET_POSTGRES_prepare (plugin->dbh, "get", 180 GNUNET_POSTGRES_prepare (plugin->dbh, "get",
195 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 181 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
196 "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) || 182 "WHERE oid >= $1::bigint AND "
197 (GNUNET_OK != 183 "(rvalue >= $2 OR 0 = $3::smallint) AND "
198 GNUNET_POSTGRES_prepare (plugin->dbh, "count_getvt", 184 "(hash = $4 OR 0 = $5::smallint) AND "
199 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", 3)) || 185 "(vhash = $6 OR 0 = $7::smallint) AND "
200 (GNUNET_OK != 186 "(type = $8 OR 0 = $9::smallint) "
201 GNUNET_POSTGRES_prepare (plugin->dbh, "count_gett", 187 "ORDER BY oid ASC LIMIT 1", 9)) ||
202 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", 2)) ||
203 (GNUNET_OK !=
204 GNUNET_POSTGRES_prepare (plugin->dbh, "count_getv",
205 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", 2)) ||
206 (GNUNET_OK !=
207 GNUNET_POSTGRES_prepare (plugin->dbh, "count_get",
208 "SELECT count(*) FROM gn090 WHERE hash=$1", 1)) ||
209 (GNUNET_OK != 188 (GNUNET_OK !=
210 GNUNET_POSTGRES_prepare (plugin->dbh, "put", 189 GNUNET_POSTGRES_prepare (plugin->dbh, "put",
211 "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) " 190 "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
212 "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) || 191 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", 9)) ||
213 (GNUNET_OK != 192 (GNUNET_OK !=
214 GNUNET_POSTGRES_prepare (plugin->dbh, "update", 193 GNUNET_POSTGRES_prepare (plugin->dbh, "update",
215 "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END " 194 "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
@@ -221,8 +200,9 @@ init_connection (struct Plugin *plugin)
221 (GNUNET_OK != 200 (GNUNET_OK !=
222 GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous", 201 GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
223 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 202 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
224 "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2", 203 "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
225 1)) || 204 "ORDER BY oid ASC LIMIT 1",
205 2)) ||
226 (GNUNET_OK != 206 (GNUNET_OK !=
227 GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order", 207 GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
228 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 208 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
@@ -328,6 +308,8 @@ postgres_plugin_put (void *cls,
328 struct Plugin *plugin = cls; 308 struct Plugin *plugin = cls;
329 uint32_t utype = type; 309 uint32_t utype = type;
330 struct GNUNET_HashCode vhash; 310 struct GNUNET_HashCode vhash;
311 uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
312 UINT64_MAX);
331 PGresult *ret; 313 PGresult *ret;
332 struct GNUNET_PQ_QueryParam params[] = { 314 struct GNUNET_PQ_QueryParam params[] = {
333 GNUNET_PQ_query_param_uint32 (&replication), 315 GNUNET_PQ_query_param_uint32 (&replication),
@@ -335,6 +317,7 @@ postgres_plugin_put (void *cls,
335 GNUNET_PQ_query_param_uint32 (&priority), 317 GNUNET_PQ_query_param_uint32 (&priority),
336 GNUNET_PQ_query_param_uint32 (&anonymity), 318 GNUNET_PQ_query_param_uint32 (&anonymity),
337 GNUNET_PQ_query_param_absolute_time (&expiration), 319 GNUNET_PQ_query_param_absolute_time (&expiration),
320 GNUNET_PQ_query_param_uint64 (&rvalue),
338 GNUNET_PQ_query_param_auto_from_type (key), 321 GNUNET_PQ_query_param_auto_from_type (key),
339 GNUNET_PQ_query_param_auto_from_type (&vhash), 322 GNUNET_PQ_query_param_auto_from_type (&vhash),
340 GNUNET_PQ_query_param_fixed_size (data, size), 323 GNUNET_PQ_query_param_fixed_size (data, size),
@@ -495,12 +478,11 @@ process_result (struct Plugin *plugin,
495 478
496 479
497/** 480/**
498 * Iterate over the results for a particular key 481 * Get one of the results for a particular key in the datastore.
499 * in the datastore.
500 * 482 *
501 * @param cls closure with the 'struct Plugin' 483 * @param cls closure with the 'struct Plugin'
502 * @param offset offset of the result (modulo num-results); 484 * @param next_uid return the result with lowest uid >= next_uid
503 * specific ordering does not matter for the offset 485 * @param random if true, return a random result instead of using next_uid
504 * @param key maybe NULL (to match all entries) 486 * @param key maybe NULL (to match all entries)
505 * @param vhash hash of the value, maybe NULL (to 487 * @param vhash hash of the value, maybe NULL (to
506 * match all values that have the right key). 488 * match all values that have the right key).
@@ -510,160 +492,52 @@ process_result (struct Plugin *plugin,
510 * @param type entries of which type are relevant? 492 * @param type entries of which type are relevant?
511 * Use 0 for any type. 493 * Use 0 for any type.
512 * @param proc function to call on the matching value; 494 * @param proc function to call on the matching value;
513 * will be called once with a NULL if no value matches 495 * will be called with NULL if nothing matches
514 * @param proc_cls closure for iter 496 * @param proc_cls closure for @a proc
515 */ 497 */
516static void 498static void
517postgres_plugin_get_key (void *cls, 499postgres_plugin_get_key (void *cls,
518 uint64_t offset, 500 uint64_t next_uid,
501 bool random,
519 const struct GNUNET_HashCode *key, 502 const struct GNUNET_HashCode *key,
520 const struct GNUNET_HashCode *vhash, 503 const struct GNUNET_HashCode *vhash,
521 enum GNUNET_BLOCK_Type type, 504 enum GNUNET_BLOCK_Type type,
522 PluginDatumProcessor proc, 505 PluginDatumProcessor proc,
523 void *proc_cls) 506 void *proc_cls)
524{ 507{
525 struct Plugin *plugin = cls; 508 struct Plugin *plugin = cls;
526 uint32_t utype = type; 509 uint32_t utype = type;
510 uint16_t use_rvalue = random;
511 uint16_t use_key = NULL != key;
512 uint16_t use_vhash = NULL != vhash;
513 uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
514 uint64_t rvalue;
515 struct GNUNET_PQ_QueryParam params[] = {
516 GNUNET_PQ_query_param_uint64 (&next_uid),
517 GNUNET_PQ_query_param_uint64 (&rvalue),
518 GNUNET_PQ_query_param_uint16 (&use_rvalue),
519 GNUNET_PQ_query_param_auto_from_type (key),
520 GNUNET_PQ_query_param_uint16 (&use_key),
521 GNUNET_PQ_query_param_auto_from_type (vhash),
522 GNUNET_PQ_query_param_uint16 (&use_vhash),
523 GNUNET_PQ_query_param_uint32 (&utype),
524 GNUNET_PQ_query_param_uint16 (&use_type),
525 GNUNET_PQ_query_param_end
526 };
527 PGresult *ret; 527 PGresult *ret;
528 uint64_t total;
529 uint64_t limit_off;
530 528
531 if (0 != type) 529 if (random)
532 { 530 {
533 if (NULL != vhash) 531 rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
534 { 532 UINT64_MAX);
535 struct GNUNET_PQ_QueryParam params[] = { 533 next_uid = 0;
536 GNUNET_PQ_query_param_auto_from_type (key),
537 GNUNET_PQ_query_param_auto_from_type (vhash),
538 GNUNET_PQ_query_param_uint32 (&utype),
539 GNUNET_PQ_query_param_end
540 };
541 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
542 "count_getvt",
543 params);
544 }
545 else
546 {
547 struct GNUNET_PQ_QueryParam params[] = {
548 GNUNET_PQ_query_param_auto_from_type (key),
549 GNUNET_PQ_query_param_uint32 (&utype),
550 GNUNET_PQ_query_param_end
551 };
552 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
553 "count_gett",
554 params);
555 }
556 } 534 }
557 else 535 else
558 { 536 rvalue = 0;
559 if (NULL != vhash)
560 {
561 struct GNUNET_PQ_QueryParam params[] = {
562 GNUNET_PQ_query_param_auto_from_type (key),
563 GNUNET_PQ_query_param_auto_from_type (vhash),
564 GNUNET_PQ_query_param_end
565 };
566 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
567 "count_getv",
568 params);
569 }
570 else
571 {
572 struct GNUNET_PQ_QueryParam params[] = {
573 GNUNET_PQ_query_param_auto_from_type (key),
574 GNUNET_PQ_query_param_end
575 };
576 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
577 "count_get",
578 params);
579 }
580 }
581 537
582 if (GNUNET_OK != 538 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
583 GNUNET_POSTGRES_check_result (plugin->dbh, 539 "get",
584 ret, 540 params);
585 PGRES_TUPLES_OK,
586 "PQexecParams",
587 "count"))
588 {
589 proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
590 GNUNET_TIME_UNIT_ZERO_ABS, 0);
591 return;
592 }
593 if ( (PQntuples (ret) != 1) ||
594 (PQnfields (ret) != 1) ||
595 (PQgetlength (ret, 0, 0) != sizeof (uint64_t)))
596 {
597 GNUNET_break (0);
598 PQclear (ret);
599 proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
600 GNUNET_TIME_UNIT_ZERO_ABS, 0);
601 return;
602 }
603 total = GNUNET_ntohll (*(const uint64_t *) PQgetvalue (ret, 0, 0));
604 PQclear (ret);
605 if (0 == total)
606 {
607 proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
608 GNUNET_TIME_UNIT_ZERO_ABS, 0);
609 return;
610 }
611 limit_off = offset % total;
612
613 if (0 != type)
614 {
615 if (NULL != vhash)
616 {
617 struct GNUNET_PQ_QueryParam params[] = {
618 GNUNET_PQ_query_param_auto_from_type (key),
619 GNUNET_PQ_query_param_auto_from_type (vhash),
620 GNUNET_PQ_query_param_uint32 (&utype),
621 GNUNET_PQ_query_param_uint64 (&limit_off),
622 GNUNET_PQ_query_param_end
623 };
624 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
625 "getvt",
626 params);
627 }
628 else
629 {
630 struct GNUNET_PQ_QueryParam params[] = {
631 GNUNET_PQ_query_param_auto_from_type (key),
632 GNUNET_PQ_query_param_uint32 (&utype),
633 GNUNET_PQ_query_param_uint64 (&limit_off),
634 GNUNET_PQ_query_param_end
635 };
636 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
637 "gett",
638 params);
639 }
640 }
641 else
642 {
643 if (NULL != vhash)
644 {
645 struct GNUNET_PQ_QueryParam params[] = {
646 GNUNET_PQ_query_param_auto_from_type (key),
647 GNUNET_PQ_query_param_auto_from_type (vhash),
648 GNUNET_PQ_query_param_uint64 (&limit_off),
649 GNUNET_PQ_query_param_end
650 };
651 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
652 "getv",
653 params);
654 }
655 else
656 {
657 struct GNUNET_PQ_QueryParam params[] = {
658 GNUNET_PQ_query_param_auto_from_type (key),
659 GNUNET_PQ_query_param_uint64 (&limit_off),
660 GNUNET_PQ_query_param_end
661 };
662 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
663 "get",
664 params);
665 }
666 }
667 process_result (plugin, 541 process_result (plugin,
668 proc, 542 proc,
669 proc_cls, 543 proc_cls,
@@ -677,26 +551,25 @@ postgres_plugin_get_key (void *cls,
677 * the given iterator for each of them. 551 * the given iterator for each of them.
678 * 552 *
679 * @param cls our `struct Plugin *` 553 * @param cls our `struct Plugin *`
680 * @param offset offset of the result (modulo num-results); 554 * @param next_uid return the result with lowest uid >= next_uid
681 * specific ordering does not matter for the offset
682 * @param type entries of which type should be considered? 555 * @param type entries of which type should be considered?
683 * Use 0 for any type. 556 * Must not be zero (ANY).
684 * @param proc function to call on the matching value; 557 * @param proc function to call on the matching value;
685 * will be called with a NULL if no value matches 558 * will be called with NULL if no value matches
686 * @param proc_cls closure for @a proc 559 * @param proc_cls closure for @a proc
687 */ 560 */
688static void 561static void
689postgres_plugin_get_zero_anonymity (void *cls, 562postgres_plugin_get_zero_anonymity (void *cls,
690 uint64_t offset, 563 uint64_t next_uid,
691 enum GNUNET_BLOCK_Type type, 564 enum GNUNET_BLOCK_Type type,
692 PluginDatumProcessor proc, 565 PluginDatumProcessor proc,
693 void *proc_cls) 566 void *proc_cls)
694{ 567{
695 struct Plugin *plugin = cls; 568 struct Plugin *plugin = cls;
696 uint32_t utype = type; 569 uint32_t utype = type;
697 struct GNUNET_PQ_QueryParam params[] = { 570 struct GNUNET_PQ_QueryParam params[] = {
698 GNUNET_PQ_query_param_uint32 (&utype), 571 GNUNET_PQ_query_param_uint32 (&utype),
699 GNUNET_PQ_query_param_uint64 (&offset), 572 GNUNET_PQ_query_param_uint64 (&next_uid),
700 GNUNET_PQ_query_param_end 573 GNUNET_PQ_query_param_end
701 }; 574 };
702 PGresult *ret; 575 PGresult *ret;
diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c
index 9ca8f056a..76f791ad4 100644
--- a/src/datastore/plugin_datastore_sqlite.c
+++ b/src/datastore/plugin_datastore_sqlite.c
@@ -130,42 +130,7 @@ struct Plugin
130 /** 130 /**
131 * Precompiled SQL for selection 131 * Precompiled SQL for selection
132 */ 132 */
133 sqlite3_stmt *count_key; 133 sqlite3_stmt *get;
134
135 /**
136 * Precompiled SQL for selection
137 */
138 sqlite3_stmt *count_key_vhash;
139
140 /**
141 * Precompiled SQL for selection
142 */
143 sqlite3_stmt *count_key_type;
144
145 /**
146 * Precompiled SQL for selection
147 */
148 sqlite3_stmt *count_key_vhash_type;
149
150 /**
151 * Precompiled SQL for selection
152 */
153 sqlite3_stmt *get_key;
154
155 /**
156 * Precompiled SQL for selection
157 */
158 sqlite3_stmt *get_key_vhash;
159
160 /**
161 * Precompiled SQL for selection
162 */
163 sqlite3_stmt *get_key_type;
164
165 /**
166 * Precompiled SQL for selection
167 */
168 sqlite3_stmt *get_key_vhash_type;
169 134
170 /** 135 /**
171 * Should the database be dropped on shutdown? 136 * Should the database be dropped on shutdown?
@@ -430,8 +395,10 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
430#if SQLITE_VERSION_NUMBER >= 3007000 395#if SQLITE_VERSION_NUMBER >= 3007000
431 "INDEXED BY idx_anon_type_hash " 396 "INDEXED BY idx_anon_type_hash "
432#endif 397#endif
433 "WHERE (anonLevel = 0 AND type=?1) " 398 "WHERE _ROWID_ >= ? AND "
434 "ORDER BY hash DESC LIMIT 1 OFFSET ?2", 399 "anonLevel = 0 AND "
400 "type = ? "
401 "ORDER BY _ROWID_ ASC LIMIT 1",
435 &plugin->selZeroAnon)) || 402 &plugin->selZeroAnon)) ||
436 (SQLITE_OK != 403 (SQLITE_OK !=
437 sq_prepare (plugin->dbh, 404 sq_prepare (plugin->dbh,
@@ -440,44 +407,14 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
440 &plugin->insertContent)) || 407 &plugin->insertContent)) ||
441 (SQLITE_OK != 408 (SQLITE_OK !=
442 sq_prepare (plugin->dbh, 409 sq_prepare (plugin->dbh,
443 "SELECT count(*) FROM gn090 WHERE hash=?",
444 &plugin->count_key)) ||
445 (SQLITE_OK !=
446 sq_prepare (plugin->dbh,
447 "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=?",
448 &plugin->count_key_vhash)) ||
449 (SQLITE_OK !=
450 sq_prepare (plugin->dbh,
451 "SELECT count(*) FROM gn090 WHERE hash=? AND type=?",
452 &plugin->count_key_type)) ||
453 (SQLITE_OK !=
454 sq_prepare (plugin->dbh,
455 "SELECT count(*) FROM gn090 WHERE hash=? AND vhash=? AND type=?",
456 &plugin->count_key_vhash_type)) ||
457 (SQLITE_OK !=
458 sq_prepare (plugin->dbh,
459 "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ FROM gn090 "
460 "WHERE hash=?"
461 "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
462 &plugin->get_key)) ||
463 (SQLITE_OK !=
464 sq_prepare (plugin->dbh,
465 "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ FROM gn090 "
466 "WHERE hash=? AND vhash=?"
467 "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
468 &plugin->get_key_vhash)) ||
469 (SQLITE_OK !=
470 sq_prepare (plugin->dbh,
471 "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ FROM gn090 " 410 "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ FROM gn090 "
472 "WHERE hash=? AND type=?" 411 "WHERE _ROWID_ >= ? AND "
473 "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?", 412 "(rvalue >= ? OR 0 = ?) AND "
474 &plugin->get_key_type)) || 413 "(hash = ? OR 0 = ?) AND "
475 (SQLITE_OK != 414 "(vhash = ? OR 0 = ?) AND "
476 sq_prepare (plugin->dbh, 415 "(type = ? OR 0 = ?) "
477 "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ FROM gn090 " 416 "ORDER BY _ROWID_ ASC LIMIT 1",
478 "WHERE hash=? AND vhash=? AND type=?" 417 &plugin->get)) ||
479 "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
480 &plugin->get_key_vhash_type)) ||
481 (SQLITE_OK != 418 (SQLITE_OK !=
482 sq_prepare (plugin->dbh, 419 sq_prepare (plugin->dbh,
483 "DELETE FROM gn090 WHERE _ROWID_ = ?", 420 "DELETE FROM gn090 WHERE _ROWID_ = ?",
@@ -523,22 +460,8 @@ database_shutdown (struct Plugin *plugin)
523 sqlite3_finalize (plugin->selZeroAnon); 460 sqlite3_finalize (plugin->selZeroAnon);
524 if (NULL != plugin->insertContent) 461 if (NULL != plugin->insertContent)
525 sqlite3_finalize (plugin->insertContent); 462 sqlite3_finalize (plugin->insertContent);
526 if (NULL != plugin->count_key) 463 if (NULL != plugin->get)
527 sqlite3_finalize (plugin->count_key); 464 sqlite3_finalize (plugin->get);
528 if (NULL != plugin->count_key_vhash)
529 sqlite3_finalize (plugin->count_key_vhash);
530 if (NULL != plugin->count_key_type)
531 sqlite3_finalize (plugin->count_key_type);
532 if (NULL != plugin->count_key_vhash_type)
533 sqlite3_finalize (plugin->count_key_vhash_type);
534 if (NULL != plugin->count_key)
535 sqlite3_finalize (plugin->get_key);
536 if (NULL != plugin->count_key_vhash)
537 sqlite3_finalize (plugin->get_key_vhash);
538 if (NULL != plugin->count_key_type)
539 sqlite3_finalize (plugin->get_key_type);
540 if (NULL != plugin->count_key_vhash_type)
541 sqlite3_finalize (plugin->get_key_vhash_type);
542 result = sqlite3_close (plugin->dbh); 465 result = sqlite3_close (plugin->dbh);
543#if SQLITE_VERSION_NUMBER >= 3007000 466#if SQLITE_VERSION_NUMBER >= 3007000
544 if (result == SQLITE_BUSY) 467 if (result == SQLITE_BUSY)
@@ -895,38 +818,36 @@ execute_get (struct Plugin *plugin,
895 * the given processor for the item. 818 * the given processor for the item.
896 * 819 *
897 * @param cls our plugin context 820 * @param cls our plugin context
898 * @param offset offset of the result (modulo num-results); 821 * @param next_uid return the result with lowest uid >= next_uid
899 * specific ordering does not matter for the offset
900 * @param type entries of which type should be considered? 822 * @param type entries of which type should be considered?
901 * Use 0 for any type. 823 * Must not be zero (ANY).
902 * @param proc function to call on each matching value; 824 * @param proc function to call on the matching value;
903 * will be called once with a NULL value at the end 825 * will be called with NULL if no value matches
904 * @param proc_cls closure for @a proc 826 * @param proc_cls closure for @a proc
905 */ 827 */
906static void 828static void
907sqlite_plugin_get_zero_anonymity (void *cls, 829sqlite_plugin_get_zero_anonymity (void *cls,
908 uint64_t offset, 830 uint64_t next_uid,
909 enum GNUNET_BLOCK_Type type, 831 enum GNUNET_BLOCK_Type type,
910 PluginDatumProcessor proc, 832 PluginDatumProcessor proc,
911 void *proc_cls) 833 void *proc_cls)
912{ 834{
913 struct Plugin *plugin = cls; 835 struct Plugin *plugin = cls;
914 struct GNUNET_SQ_QueryParam params[] = { 836 struct GNUNET_SQ_QueryParam params[] = {
837 GNUNET_SQ_query_param_uint64 (&next_uid),
915 GNUNET_SQ_query_param_uint32 (&type), 838 GNUNET_SQ_query_param_uint32 (&type),
916 GNUNET_SQ_query_param_uint64 (&offset),
917 GNUNET_SQ_query_param_end 839 GNUNET_SQ_query_param_end
918 }; 840 };
919 sqlite3_stmt *stmt = plugin->selZeroAnon;
920 841
921 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); 842 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
922 if (GNUNET_OK != 843 if (GNUNET_OK !=
923 GNUNET_SQ_bind (stmt, 844 GNUNET_SQ_bind (plugin->selZeroAnon,
924 params)) 845 params))
925 { 846 {
926 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 847 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
927 return; 848 return;
928 } 849 }
929 execute_get (plugin, stmt, proc, proc_cls); 850 execute_get (plugin, plugin->selZeroAnon, proc, proc_cls);
930} 851}
931 852
932 853
@@ -934,8 +855,9 @@ sqlite_plugin_get_zero_anonymity (void *cls,
934 * Get results for a particular key in the datastore. 855 * Get results for a particular key in the datastore.
935 * 856 *
936 * @param cls closure 857 * @param cls closure
937 * @param offset offset (mod count). 858 * @param next_uid return the result with lowest uid >= next_uid
938 * @param key key to match, never NULL 859 * @param random if true, return a random result instead of using next_uid
860 * @param key maybe NULL (to match all entries)
939 * @param vhash hash of the value, maybe NULL (to 861 * @param vhash hash of the value, maybe NULL (to
940 * match all values that have the right key). 862 * match all values that have the right key).
941 * Note that for DBlocks there is no difference 863 * Note that for DBlocks there is no difference
@@ -949,7 +871,8 @@ sqlite_plugin_get_zero_anonymity (void *cls,
949 */ 871 */
950static void 872static void
951sqlite_plugin_get_key (void *cls, 873sqlite_plugin_get_key (void *cls,
952 uint64_t offset, 874 uint64_t next_uid,
875 bool random,
953 const struct GNUNET_HashCode *key, 876 const struct GNUNET_HashCode *key,
954 const struct GNUNET_HashCode *vhash, 877 const struct GNUNET_HashCode *vhash,
955 enum GNUNET_BLOCK_Type type, 878 enum GNUNET_BLOCK_Type type,
@@ -957,133 +880,45 @@ sqlite_plugin_get_key (void *cls,
957 void *proc_cls) 880 void *proc_cls)
958{ 881{
959 struct Plugin *plugin = cls; 882 struct Plugin *plugin = cls;
883 uint64_t rvalue;
884 uint16_t use_rvalue = random;
960 uint32_t type32 = (uint32_t) type; 885 uint32_t type32 = (uint32_t) type;
961 int ret; 886 uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
962 int total; 887 uint16_t use_key = NULL != key;
963 uint32_t limit_off; 888 uint16_t use_vhash = NULL != vhash;
964 struct GNUNET_SQ_QueryParam count_params_key[] = { 889 struct GNUNET_SQ_QueryParam params[] = {
965 GNUNET_SQ_query_param_auto_from_type (key), 890 GNUNET_SQ_query_param_uint64 (&next_uid),
966 GNUNET_SQ_query_param_end 891 GNUNET_SQ_query_param_uint64 (&rvalue),
967 }; 892 GNUNET_SQ_query_param_uint16 (&use_rvalue),
968 struct GNUNET_SQ_QueryParam count_params_key_vhash[] = {
969 GNUNET_SQ_query_param_auto_from_type (key),
970 GNUNET_SQ_query_param_auto_from_type (vhash),
971 GNUNET_SQ_query_param_end
972 };
973 struct GNUNET_SQ_QueryParam count_params_key_type[] = {
974 GNUNET_SQ_query_param_auto_from_type (key),
975 GNUNET_SQ_query_param_uint32 (&type32),
976 GNUNET_SQ_query_param_end
977 };
978 struct GNUNET_SQ_QueryParam count_params_key_vhash_type[] = {
979 GNUNET_SQ_query_param_auto_from_type (key),
980 GNUNET_SQ_query_param_auto_from_type (vhash),
981 GNUNET_SQ_query_param_uint32 (&type32),
982 GNUNET_SQ_query_param_end
983 };
984 struct GNUNET_SQ_QueryParam get_params_key[] = {
985 GNUNET_SQ_query_param_auto_from_type (key),
986 GNUNET_SQ_query_param_uint32 (&limit_off),
987 GNUNET_SQ_query_param_end
988 };
989 struct GNUNET_SQ_QueryParam get_params_key_vhash[] = {
990 GNUNET_SQ_query_param_auto_from_type (key),
991 GNUNET_SQ_query_param_auto_from_type (vhash),
992 GNUNET_SQ_query_param_uint32 (&limit_off),
993 GNUNET_SQ_query_param_end
994 };
995 struct GNUNET_SQ_QueryParam get_params_key_type[] = {
996 GNUNET_SQ_query_param_auto_from_type (key),
997 GNUNET_SQ_query_param_uint32 (&type32),
998 GNUNET_SQ_query_param_uint32 (&limit_off),
999 GNUNET_SQ_query_param_end
1000 };
1001 struct GNUNET_SQ_QueryParam get_params_key_vhash_type[] = {
1002 GNUNET_SQ_query_param_auto_from_type (key), 893 GNUNET_SQ_query_param_auto_from_type (key),
894 GNUNET_SQ_query_param_uint16 (&use_key),
1003 GNUNET_SQ_query_param_auto_from_type (vhash), 895 GNUNET_SQ_query_param_auto_from_type (vhash),
896 GNUNET_SQ_query_param_uint16 (&use_vhash),
1004 GNUNET_SQ_query_param_uint32 (&type32), 897 GNUNET_SQ_query_param_uint32 (&type32),
1005 GNUNET_SQ_query_param_uint32 (&limit_off), 898 GNUNET_SQ_query_param_uint16 (&use_type),
1006 GNUNET_SQ_query_param_end 899 GNUNET_SQ_query_param_end
1007 }; 900 };
1008 struct GNUNET_SQ_QueryParam *count_params;
1009 sqlite3_stmt *count_stmt;
1010 struct GNUNET_SQ_QueryParam *get_params;
1011 sqlite3_stmt *get_stmt;
1012 901
1013 if (NULL == vhash) 902 if (random)
1014 { 903 {
1015 if (GNUNET_BLOCK_TYPE_ANY == type) 904 rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1016 { 905 UINT64_MAX);
1017 count_params = count_params_key; 906 next_uid = 0;
1018 count_stmt = plugin->count_key;
1019 get_params = get_params_key;
1020 get_stmt = plugin->get_key;
1021 }
1022 else
1023 {
1024 count_params = count_params_key_type;
1025 count_stmt = plugin->count_key_type;
1026 get_params = get_params_key_type;
1027 get_stmt = plugin->get_key_type;
1028 }
1029 } 907 }
1030 else 908 else
1031 { 909 rvalue = 0;
1032 if (GNUNET_BLOCK_TYPE_ANY == type) 910
1033 {
1034 count_params = count_params_key_vhash;
1035 count_stmt = plugin->count_key_vhash;
1036 get_params = get_params_key_vhash;
1037 get_stmt = plugin->get_key_vhash;
1038 }
1039 else
1040 {
1041 count_params = count_params_key_vhash_type;
1042 count_stmt = plugin->count_key_vhash_type;
1043 get_params = get_params_key_vhash_type;
1044 get_stmt = plugin->get_key_vhash_type;
1045 }
1046 }
1047 if (GNUNET_OK !=
1048 GNUNET_SQ_bind (count_stmt,
1049 count_params))
1050 {
1051 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1052 return;
1053 }
1054 ret = sqlite3_step (count_stmt);
1055 if (ret != SQLITE_ROW)
1056 {
1057 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1058 "sqlite_step");
1059 GNUNET_SQ_reset (plugin->dbh,
1060 count_stmt);
1061 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1062 return;
1063 }
1064 total = sqlite3_column_int (count_stmt,
1065 0);
1066 GNUNET_SQ_reset (plugin->dbh,
1067 count_stmt);
1068 if (0 == total)
1069 {
1070 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1071 return;
1072 }
1073 limit_off = (uint32_t) (offset % total);
1074 if (GNUNET_OK != 911 if (GNUNET_OK !=
1075 GNUNET_SQ_bind (get_stmt, 912 GNUNET_SQ_bind (plugin->get,
1076 get_params)) 913 params))
1077 { 914 {
1078 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 915 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1079 return; 916 return;
1080 } 917 }
1081 execute_get (plugin, 918 execute_get (plugin,
1082 get_stmt, 919 plugin->get,
1083 proc, 920 proc,
1084 proc_cls); 921 proc_cls);
1085 GNUNET_SQ_reset (plugin->dbh,
1086 get_stmt);
1087} 922}
1088 923
1089 924
diff --git a/src/datastore/plugin_datastore_template.c b/src/datastore/plugin_datastore_template.c
index a1e03e8ee..187221798 100644
--- a/src/datastore/plugin_datastore_template.c
+++ b/src/datastore/plugin_datastore_template.c
@@ -89,8 +89,8 @@ template_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t siz
89 * Get one of the results for a particular key in the datastore. 89 * Get one of the results for a particular key in the datastore.
90 * 90 *
91 * @param cls closure 91 * @param cls closure
92 * @param offset offset of the result (modulo num-results); 92 * @param next_uid return the result with lowest uid >= next_uid
93 * specific ordering does not matter for the offset 93 * @param random if true, return a random result instead of using next_uid
94 * @param key maybe NULL (to match all entries) 94 * @param key maybe NULL (to match all entries)
95 * @param vhash hash of the value, maybe NULL (to 95 * @param vhash hash of the value, maybe NULL (to
96 * match all values that have the right key). 96 * match all values that have the right key).
@@ -104,7 +104,7 @@ template_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t siz
104 * @param proc_cls closure for proc 104 * @param proc_cls closure for proc
105 */ 105 */
106static void 106static void
107template_plugin_get_key (void *cls, uint64_t offset, 107template_plugin_get_key (void *cls, uint64_t next_uid, bool random,
108 const struct GNUNET_HashCode * key, 108 const struct GNUNET_HashCode * key,
109 const struct GNUNET_HashCode * vhash, 109 const struct GNUNET_HashCode * vhash,
110 enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc, 110 enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
@@ -185,16 +185,15 @@ template_plugin_update (void *cls, uint64_t uid, uint32_t delta,
185 * Call the given processor on an item with zero anonymity. 185 * Call the given processor on an item with zero anonymity.
186 * 186 *
187 * @param cls our "struct Plugin*" 187 * @param cls our "struct Plugin*"
188 * @param offset offset of the result (modulo num-results); 188 * @param next_uid return the result with lowest uid >= next_uid
189 * specific ordering does not matter for the offset
190 * @param type entries of which type should be considered? 189 * @param type entries of which type should be considered?
191 * Use 0 for any type. 190 * Must not be zero (ANY).
192 * @param proc function to call on each matching value; 191 * @param proc function to call on the matching value;
193 * will be called with NULL if no value matches 192 * will be called with NULL if no value matches
194 * @param proc_cls closure for proc 193 * @param proc_cls closure for proc
195 */ 194 */
196static void 195static void
197template_plugin_get_zero_anonymity (void *cls, uint64_t offset, 196template_plugin_get_zero_anonymity (void *cls, uint64_t next_uid,
198 enum GNUNET_BLOCK_Type type, 197 enum GNUNET_BLOCK_Type type,
199 PluginDatumProcessor proc, void *proc_cls) 198 PluginDatumProcessor proc, void *proc_cls)
200{ 199{
diff --git a/src/datastore/test_datastore_api.c b/src/datastore/test_datastore_api.c
index a99668240..0da68b266 100644
--- a/src/datastore/test_datastore_api.c
+++ b/src/datastore/test_datastore_api.c
@@ -156,8 +156,6 @@ struct CpsRunContext
156 void *data; 156 void *data;
157 size_t size; 157 size_t size;
158 158
159 uint64_t uid;
160 uint64_t offset;
161 uint64_t first_uid; 159 uint64_t first_uid;
162}; 160};
163 161
@@ -267,7 +265,6 @@ check_value (void *cls,
267 GNUNET_assert (priority == get_priority (i)); 265 GNUNET_assert (priority == get_priority (i));
268 GNUNET_assert (anonymity == get_anonymity (i)); 266 GNUNET_assert (anonymity == get_anonymity (i));
269 GNUNET_assert (expiration.abs_value_us == get_expiration (i).abs_value_us); 267 GNUNET_assert (expiration.abs_value_us == get_expiration (i).abs_value_us);
270 crc->offset++;
271 if (crc->i == 0) 268 if (crc->i == 0)
272 { 269 {
273 crc->phase = RP_DEL; 270 crc->phase = RP_DEL;
@@ -343,7 +340,6 @@ check_multiple (void *cls,
343 case RP_GET_MULTIPLE: 340 case RP_GET_MULTIPLE:
344 crc->phase = RP_GET_MULTIPLE_NEXT; 341 crc->phase = RP_GET_MULTIPLE_NEXT;
345 crc->first_uid = uid; 342 crc->first_uid = uid;
346 crc->offset++;
347 break; 343 break;
348 case RP_GET_MULTIPLE_NEXT: 344 case RP_GET_MULTIPLE_NEXT:
349 GNUNET_assert (uid != crc->first_uid); 345 GNUNET_assert (uid != crc->first_uid);
@@ -354,8 +350,6 @@ check_multiple (void *cls,
354 crc->phase = RP_ERROR; 350 crc->phase = RP_ERROR;
355 break; 351 break;
356 } 352 }
357 if (priority == get_priority (42))
358 crc->uid = uid;
359 GNUNET_SCHEDULER_add_now (&run_continuation, crc); 353 GNUNET_SCHEDULER_add_now (&run_continuation, crc);
360} 354}
361 355
@@ -400,7 +394,8 @@ run_continuation (void *cls)
400 sizeof (int), 394 sizeof (int),
401 &crc->key); 395 &crc->key);
402 GNUNET_DATASTORE_get_key (datastore, 396 GNUNET_DATASTORE_get_key (datastore,
403 crc->offset, 397 0,
398 false,
404 &crc->key, 399 &crc->key,
405 get_type (crc->i), 400 get_type (crc->i),
406 1, 401 1,
@@ -417,7 +412,8 @@ run_continuation (void *cls)
417 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 412 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
418 GNUNET_assert (NULL != 413 GNUNET_assert (NULL !=
419 GNUNET_DATASTORE_get_key (datastore, 414 GNUNET_DATASTORE_get_key (datastore,
420 crc->offset, 415 0,
416 false,
421 &crc->key, 417 &crc->key,
422 get_type (crc->i), 418 get_type (crc->i),
423 1, 419 1,
@@ -450,9 +446,15 @@ run_continuation (void *cls)
450 crc->i); 446 crc->i);
451 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 447 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
452 GNUNET_assert (NULL != 448 GNUNET_assert (NULL !=
453 GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key, 449 GNUNET_DATASTORE_get_key (datastore,
454 get_type (crc->i), 1, 1, 450 0,
455 &check_nothing, crc)); 451 false,
452 &crc->key,
453 get_type (crc->i),
454 1,
455 1,
456 &check_nothing,
457 crc));
456 break; 458 break;
457 case RP_RESERVE: 459 case RP_RESERVE:
458 crc->phase = RP_PUT_MULTIPLE; 460 crc->phase = RP_PUT_MULTIPLE;
@@ -483,19 +485,26 @@ run_continuation (void *cls)
483 case RP_GET_MULTIPLE: 485 case RP_GET_MULTIPLE:
484 GNUNET_assert (NULL != 486 GNUNET_assert (NULL !=
485 GNUNET_DATASTORE_get_key (datastore, 487 GNUNET_DATASTORE_get_key (datastore,
486 crc->offset, 488 0,
489 false,
487 &crc->key, 490 &crc->key,
488 get_type (42), 1, 1, 491 get_type (42),
489 &check_multiple, crc)); 492 1,
493 1,
494 &check_multiple,
495 crc));
490 break; 496 break;
491 case RP_GET_MULTIPLE_NEXT: 497 case RP_GET_MULTIPLE_NEXT:
492 GNUNET_assert (NULL != 498 GNUNET_assert (NULL !=
493 GNUNET_DATASTORE_get_key (datastore, 499 GNUNET_DATASTORE_get_key (datastore,
494 crc->offset, 500 crc->first_uid + 1,
501 false,
495 &crc->key, 502 &crc->key,
496 get_type (42), 503 get_type (42),
497 1, 1, 504 1,
498 &check_multiple, crc)); 505 1,
506 &check_multiple,
507 crc));
499 break; 508 break;
500 case RP_DONE: 509 case RP_DONE:
501 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 510 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
diff --git a/src/datastore/test_datastore_api_management.c b/src/datastore/test_datastore_api_management.c
index 9a3e5446b..de4dc657f 100644
--- a/src/datastore/test_datastore_api_management.c
+++ b/src/datastore/test_datastore_api_management.c
@@ -58,7 +58,6 @@ struct CpsRunContext
58 const struct GNUNET_CONFIGURATION_Handle *cfg; 58 const struct GNUNET_CONFIGURATION_Handle *cfg;
59 void *data; 59 void *data;
60 enum RunPhase phase; 60 enum RunPhase phase;
61 uint64_t offset;
62}; 61};
63 62
64 63
@@ -159,7 +158,6 @@ check_value (void *cls, const struct GNUNET_HashCode * key, size_t size,
159 GNUNET_assert (priority == get_priority (i)); 158 GNUNET_assert (priority == get_priority (i));
160 GNUNET_assert (anonymity == get_anonymity (i)); 159 GNUNET_assert (anonymity == get_anonymity (i));
161 GNUNET_assert (expiration.abs_value_us == get_expiration (i).abs_value_us); 160 GNUNET_assert (expiration.abs_value_us == get_expiration (i).abs_value_us);
162 crc->offset++;
163 crc->i--; 161 crc->i--;
164 if (crc->i == 0) 162 if (crc->i == 0)
165 crc->phase = RP_DONE; 163 crc->phase = RP_DONE;
@@ -221,8 +219,13 @@ run_continuation (void *cls)
221 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing `%s' number %u\n", "GET", 219 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing `%s' number %u\n", "GET",
222 crc->i); 220 crc->i);
223 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 221 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
224 GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key, 222 GNUNET_DATASTORE_get_key (datastore,
225 get_type (crc->i), 1, 1, 223 0,
224 false,
225 &crc->key,
226 get_type (crc->i),
227 1,
228 1,
226 &check_value, 229 &check_value,
227 crc); 230 crc);
228 break; 231 break;
@@ -230,8 +233,13 @@ run_continuation (void *cls)
230 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing `%s' number %u\n", "GET(f)", 233 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing `%s' number %u\n", "GET(f)",
231 crc->i); 234 crc->i);
232 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 235 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
233 GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key, 236 GNUNET_DATASTORE_get_key (datastore,
234 get_type (crc->i), 1, 1, 237 0,
238 false,
239 &crc->key,
240 get_type (crc->i),
241 1,
242 1,
235 &check_nothing, 243 &check_nothing,
236 crc); 244 crc);
237 break; 245 break;
diff --git a/src/datastore/test_plugin_datastore.c b/src/datastore/test_plugin_datastore.c
index 9b85d57da..94d93aac6 100644
--- a/src/datastore/test_plugin_datastore.c
+++ b/src/datastore/test_plugin_datastore.c
@@ -64,7 +64,6 @@ struct CpsRunContext
64 enum RunPhase phase; 64 enum RunPhase phase;
65 unsigned int cnt; 65 unsigned int cnt;
66 unsigned int i; 66 unsigned int i;
67 uint64_t offset;
68}; 67};
69 68
70 69
@@ -308,7 +307,8 @@ test (void *cls)
308 "Looking for %s\n", 307 "Looking for %s\n",
309 GNUNET_h2s (&key)); 308 GNUNET_h2s (&key));
310 crc->api->get_key (crc->api->cls, 309 crc->api->get_key (crc->api->cls,
311 crc->offset++, 310 0,
311 false,
312 &key, 312 &key,
313 NULL, 313 NULL,
314 GNUNET_BLOCK_TYPE_ANY, 314 GNUNET_BLOCK_TYPE_ANY,
diff --git a/src/fs/fs_api.h b/src/fs/fs_api.h
index e85de94a7..be22ea73e 100644
--- a/src/fs/fs_api.h
+++ b/src/fs/fs_api.h
@@ -1464,21 +1464,11 @@ struct GNUNET_FS_UnindexContext
1464 struct GNUNET_CRYPTO_FileHashContext *fhc; 1464 struct GNUNET_CRYPTO_FileHashContext *fhc;
1465 1465
1466 /** 1466 /**
1467 * Which values have we seen already?
1468 */
1469 struct GNUNET_CONTAINER_MultiHashMap *seen_dh;
1470
1471 /**
1472 * Overall size of the file. 1467 * Overall size of the file.
1473 */ 1468 */
1474 uint64_t file_size; 1469 uint64_t file_size;
1475 1470
1476 /** 1471 /**
1477 * Random offset given to #GNUNET_DATASTORE_get_key.
1478 */
1479 uint64_t roff;
1480
1481 /**
1482 * When did we start? 1472 * When did we start?
1483 */ 1473 */
1484 struct GNUNET_TIME_Absolute start_time; 1474 struct GNUNET_TIME_Absolute start_time;
diff --git a/src/fs/fs_unindex.c b/src/fs/fs_unindex.c
index ad1499f00..e1c7ea535 100644
--- a/src/fs/fs_unindex.c
+++ b/src/fs/fs_unindex.c
@@ -312,8 +312,6 @@ unindex_finish (struct GNUNET_FS_UnindexContext *uc)
312 uc->fh = NULL; 312 uc->fh = NULL;
313 GNUNET_DATASTORE_disconnect (uc->dsh, GNUNET_NO); 313 GNUNET_DATASTORE_disconnect (uc->dsh, GNUNET_NO);
314 uc->dsh = NULL; 314 uc->dsh = NULL;
315 GNUNET_CONTAINER_multihashmap_destroy (uc->seen_dh);
316 uc->seen_dh = NULL;
317 uc->state = UNINDEX_STATE_FS_NOTIFY; 315 uc->state = UNINDEX_STATE_FS_NOTIFY;
318 GNUNET_FS_unindex_sync_ (uc); 316 GNUNET_FS_unindex_sync_ (uc);
319 uc->mq = GNUNET_CLIENT_connect (uc->h->cfg, 317 uc->mq = GNUNET_CLIENT_connect (uc->h->cfg,
@@ -444,7 +442,6 @@ continue_after_remove (void *cls,
444 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 442 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
445 _("Failed to remove UBlock: %s\n"), 443 _("Failed to remove UBlock: %s\n"),
446 msg); 444 msg);
447 GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh);
448 uc->ksk_offset++; 445 uc->ksk_offset++;
449 GNUNET_FS_unindex_do_remove_kblocks_ (uc); 446 GNUNET_FS_unindex_do_remove_kblocks_ (uc);
450} 447}
@@ -486,34 +483,15 @@ process_kblock_for_unindex (void *cls,
486 const struct UBlock *ub; 483 const struct UBlock *ub;
487 struct GNUNET_FS_Uri *chk_uri; 484 struct GNUNET_FS_Uri *chk_uri;
488 struct GNUNET_HashCode query; 485 struct GNUNET_HashCode query;
489 struct GNUNET_HashCode dh;
490 486
491 uc->dqe = NULL; 487 uc->dqe = NULL;
492 if (NULL == data) 488 if (NULL == data)
493 { 489 {
494 /* no result */ 490 /* no result */
495 GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh);
496 uc->ksk_offset++; 491 uc->ksk_offset++;
497 GNUNET_FS_unindex_do_remove_kblocks_ (uc); 492 GNUNET_FS_unindex_do_remove_kblocks_ (uc);
498 return; 493 return;
499 } 494 }
500 GNUNET_CRYPTO_hash (data,
501 size,
502 &dh);
503 if (GNUNET_YES ==
504 GNUNET_CONTAINER_multihashmap_contains (uc->seen_dh,
505 &dh))
506 {
507 GNUNET_CONTAINER_multihashmap_clear (uc->seen_dh);
508 uc->ksk_offset++;
509 GNUNET_FS_unindex_do_remove_kblocks_ (uc);
510 return;
511 }
512 GNUNET_assert (GNUNET_OK ==
513 GNUNET_CONTAINER_multihashmap_put (uc->seen_dh,
514 &dh,
515 uc,
516 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
517 GNUNET_assert (GNUNET_BLOCK_TYPE_FS_UBLOCK == type); 495 GNUNET_assert (GNUNET_BLOCK_TYPE_FS_UBLOCK == type);
518 if (size < sizeof (struct UBlock)) 496 if (size < sizeof (struct UBlock))
519 { 497 {
@@ -566,23 +544,24 @@ process_kblock_for_unindex (void *cls,
566 GNUNET_FS_uri_destroy (chk_uri); 544 GNUNET_FS_uri_destroy (chk_uri);
567 /* matches! */ 545 /* matches! */
568 uc->dqe = GNUNET_DATASTORE_remove (uc->dsh, 546 uc->dqe = GNUNET_DATASTORE_remove (uc->dsh,
569 key, 547 key,
570 size, 548 size,
571 data, 549 data,
572 0 /* priority */, 550 0 /* priority */,
573 1 /* queue size */, 551 1 /* queue size */,
574 &continue_after_remove, 552 &continue_after_remove,
575 uc); 553 uc);
576 return; 554 return;
577 get_next: 555 get_next:
578 uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh, 556 uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh,
579 uc->roff++, 557 uid + 1 /* next_uid */,
580 &uc->uquery, 558 false /* random */,
581 GNUNET_BLOCK_TYPE_FS_UBLOCK, 559 &uc->uquery,
582 0 /* priority */, 560 GNUNET_BLOCK_TYPE_FS_UBLOCK,
561 0 /* priority */,
583 1 /* queue size */, 562 1 /* queue size */,
584 &process_kblock_for_unindex, 563 &process_kblock_for_unindex,
585 uc); 564 uc);
586} 565}
587 566
588 567
@@ -627,13 +606,14 @@ GNUNET_FS_unindex_do_remove_kblocks_ (struct GNUNET_FS_UnindexContext *uc)
627 sizeof (dpub), 606 sizeof (dpub),
628 &uc->uquery); 607 &uc->uquery);
629 uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh, 608 uc->dqe = GNUNET_DATASTORE_get_key (uc->dsh,
630 uc->roff++, 609 0 /* next_uid */,
631 &uc->uquery, 610 false /* random */,
632 GNUNET_BLOCK_TYPE_FS_UBLOCK, 611 &uc->uquery,
633 0 /* priority */, 612 GNUNET_BLOCK_TYPE_FS_UBLOCK,
613 0 /* priority */,
634 1 /* queue size */, 614 1 /* queue size */,
635 &process_kblock_for_unindex, 615 &process_kblock_for_unindex,
636 uc); 616 uc);
637} 617}
638 618
639 619
@@ -826,8 +806,6 @@ GNUNET_FS_unindex_start (struct GNUNET_FS_Handle *h,
826 uc->start_time = GNUNET_TIME_absolute_get (); 806 uc->start_time = GNUNET_TIME_absolute_get ();
827 uc->file_size = size; 807 uc->file_size = size;
828 uc->client_info = cctx; 808 uc->client_info = cctx;
829 uc->seen_dh = GNUNET_CONTAINER_multihashmap_create (4,
830 GNUNET_NO);
831 GNUNET_FS_unindex_sync_ (uc); 809 GNUNET_FS_unindex_sync_ (uc);
832 pi.status = GNUNET_FS_STATUS_UNINDEX_START; 810 pi.status = GNUNET_FS_STATUS_UNINDEX_START;
833 pi.value.unindex.eta = GNUNET_TIME_UNIT_FOREVER_REL; 811 pi.value.unindex.eta = GNUNET_TIME_UNIT_FOREVER_REL;
diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c
index b1a098175..f8619b812 100644
--- a/src/fs/gnunet-service-fs_cadet_server.c
+++ b/src/fs/gnunet-service-fs_cadet_server.c
@@ -345,12 +345,13 @@ handle_request (void *cls,
345 GNUNET_NO); 345 GNUNET_NO);
346 refresh_timeout_task (sc); 346 refresh_timeout_task (sc);
347 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 347 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
348 0, 348 0 /* next_uid */,
349 &sqm->query, 349 false /* random */,
350 ntohl (sqm->type), 350 &sqm->query,
351 0 /* priority */, 351 ntohl (sqm->type),
352 GSF_datastore_queue_size, 352 0 /* priority */,
353 &handle_datastore_reply, 353 GSF_datastore_queue_size,
354 &handle_datastore_reply,
354 sc); 355 sc);
355 if (NULL == sc->qe) 356 if (NULL == sc->qe)
356 { 357 {
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index b0fda24b5..b736b49c2 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -160,20 +160,27 @@ struct GSF_PendingRequest
160 struct GNUNET_SCHEDULER_Task * warn_task; 160 struct GNUNET_SCHEDULER_Task * warn_task;
161 161
162 /** 162 /**
163 * Current offset for querying our local datastore for results. 163 * Do we have a first UID yet?
164 * Starts at a random value, incremented until we get the same 164 */
165 * UID again (detected using 'first_uid'), which is then used 165 bool have_first_uid;
166 * to termiante the iteration. 166
167 /**
168 * Have we seen a NULL result yet?
167 */ 169 */
168 uint64_t local_result_offset; 170 bool seen_null;
169 171
170 /** 172 /**
171 * Unique ID of the first result from the local datastore; 173 * Unique ID of the first result from the local datastore;
172 * used to detect wrap-around of the offset. 174 * used to terminate the loop.
173 */ 175 */
174 uint64_t first_uid; 176 uint64_t first_uid;
175 177
176 /** 178 /**
179 * Result count.
180 */
181 size_t result_count;
182
183 /**
177 * How often have we retried this request via 'cadet'? 184 * How often have we retried this request via 'cadet'?
178 * (used to bound overall retries). 185 * (used to bound overall retries).
179 */ 186 */
@@ -189,11 +196,6 @@ struct GSF_PendingRequest
189 */ 196 */
190 unsigned int replies_seen_size; 197 unsigned int replies_seen_size;
191 198
192 /**
193 * Do we have a first UID yet?
194 */
195 unsigned int have_first_uid;
196
197}; 199};
198 200
199 201
@@ -332,8 +334,6 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
332 if (NULL != target) 334 if (NULL != target)
333 extra += sizeof (struct GNUNET_PeerIdentity); 335 extra += sizeof (struct GNUNET_PeerIdentity);
334 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra); 336 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra);
335 pr->local_result_offset =
336 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
337 pr->public_data.query = *query; 337 pr->public_data.query = *query;
338 eptr = (struct GNUNET_HashCode *) &pr[1]; 338 eptr = (struct GNUNET_HashCode *) &pr[1];
339 if (NULL != target) 339 if (NULL != target)
@@ -1340,6 +1340,123 @@ odc_warn_delay_task (void *cls)
1340} 1340}
1341 1341
1342 1342
1343/* Call our continuation (if we have any) */
1344static void
1345call_continuation (struct GSF_PendingRequest *pr)
1346{
1347 GSF_LocalLookupContinuation cont = pr->llc_cont;
1348
1349 GNUNET_assert (NULL == pr->qe);
1350 if (NULL != pr->warn_task)
1351 {
1352 GNUNET_SCHEDULER_cancel (pr->warn_task);
1353 pr->warn_task = NULL;
1354 }
1355 if (NULL == cont)
1356 return; /* no continuation */
1357 pr->llc_cont = NULL;
1358 if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1359 {
1360 if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result)
1361 {
1362 /* Signal that we are done and that there won't be any
1363 additional results to allow client to clean up state. */
1364 pr->rh (pr->rh_cls,
1365 GNUNET_BLOCK_EVALUATION_OK_LAST,
1366 pr,
1367 UINT32_MAX,
1368 GNUNET_TIME_UNIT_ZERO_ABS,
1369 GNUNET_TIME_UNIT_FOREVER_ABS,
1370 GNUNET_BLOCK_TYPE_ANY,
1371 NULL,
1372 0);
1373 }
1374 /* Finally, call our continuation to signal that we are
1375 done with local processing of this request; i.e. to
1376 start reading again from the client. */
1377 cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
1378 return;
1379 }
1380
1381 cont (pr->llc_cont_cls, pr, pr->local_result);
1382}
1383
1384
1385/* Update stats and call continuation */
1386static void
1387no_more_local_results (struct GSF_PendingRequest *pr)
1388{
1389 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1390 "No further local responses available.\n");
1391#if INSANE_STATISTICS
1392 if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == pr->public_data.type) ||
1393 (GNUNET_BLOCK_TYPE_FS_IBLOCK == pr->public_data.type) )
1394 GNUNET_STATISTICS_update (GSF_stats,
1395 gettext_noop ("# requested DBLOCK or IBLOCK not found"),
1396 1,
1397 GNUNET_NO);
1398#endif
1399 call_continuation (pr);
1400}
1401
1402
1403/* forward declaration */
1404static void
1405process_local_reply (void *cls,
1406 const struct GNUNET_HashCode *key,
1407 size_t size,
1408 const void *data,
1409 enum GNUNET_BLOCK_Type type,
1410 uint32_t priority,
1411 uint32_t anonymity,
1412 struct GNUNET_TIME_Absolute expiration,
1413 uint64_t uid);
1414
1415
1416/* Start a local query */
1417static void
1418start_local_query (struct GSF_PendingRequest *pr,
1419 uint64_t next_uid,
1420 bool random)
1421{
1422 pr->qe_start = GNUNET_TIME_absolute_get ();
1423 pr->warn_task =
1424 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1425 &warn_delay_task,
1426 pr);
1427 pr->qe =
1428 GNUNET_DATASTORE_get_key (GSF_dsh,
1429 next_uid,
1430 random,
1431 &pr->public_data.query,
1432 pr->public_data.type ==
1433 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1434 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1435 (0 !=
1436 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1437 public_data.options)) ? UINT_MAX : 1
1438 /* queue priority */ ,
1439 (0 !=
1440 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1441 public_data.options)) ? UINT_MAX :
1442 GSF_datastore_queue_size
1443 /* max queue size */ ,
1444 &process_local_reply, pr);
1445 if (NULL != pr->qe)
1446 return;
1447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1448 "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n",
1449 GNUNET_h2s (&pr->public_data.query),
1450 pr->public_data.type,
1451 (unsigned long long) next_uid);
1452 GNUNET_STATISTICS_update (GSF_stats,
1453 gettext_noop ("# Datastore lookups concluded (error queueing)"),
1454 1,
1455 GNUNET_NO);
1456 call_continuation (pr);
1457}
1458
1459
1343/** 1460/**
1344 * We're processing (local) results for a search request 1461 * We're processing (local) results for a search request
1345 * from another peer. Pass applicable results to the 1462 * from another peer. Pass applicable results to the
@@ -1369,69 +1486,71 @@ process_local_reply (void *cls,
1369 uint64_t uid) 1486 uint64_t uid)
1370{ 1487{
1371 struct GSF_PendingRequest *pr = cls; 1488 struct GSF_PendingRequest *pr = cls;
1372 GSF_LocalLookupContinuation cont;
1373 struct ProcessReplyClosure prq; 1489 struct ProcessReplyClosure prq;
1374 struct GNUNET_HashCode query; 1490 struct GNUNET_HashCode query;
1375 unsigned int old_rf; 1491 unsigned int old_rf;
1376 1492
1377 GNUNET_SCHEDULER_cancel (pr->warn_task); 1493 GNUNET_SCHEDULER_cancel (pr->warn_task);
1378 pr->warn_task = NULL; 1494 pr->warn_task = NULL;
1379 if (NULL != pr->qe) 1495 if (NULL == pr->qe)
1496 goto called_from_on_demand;
1497 pr->qe = NULL;
1498 if ( (NULL == key) &&
1499 pr->seen_null &&
1500 !pr->have_first_uid) /* We have hit the end for the 2nd time with no results */
1380 { 1501 {
1381 pr->qe = NULL; 1502 /* No results */
1382 if (NULL == key)
1383 {
1384#if INSANE_STATISTICS 1503#if INSANE_STATISTICS
1385 GNUNET_STATISTICS_update (GSF_stats, 1504 GNUNET_STATISTICS_update (GSF_stats,
1386 gettext_noop 1505 gettext_noop
1387 ("# Datastore lookups concluded (no results)"), 1506 ("# Datastore lookups concluded (no results)"),
1388 1, GNUNET_NO); 1507 1, GNUNET_NO);
1389#endif 1508#endif
1390 } 1509 no_more_local_results (pr);
1391 if (GNUNET_NO == pr->have_first_uid) 1510 return;
1392 { 1511 }
1393 pr->first_uid = uid; 1512 if ( ( (NULL == key) &&
1394 pr->have_first_uid = 1; 1513 pr->seen_null ) || /* We have hit the end for the 2nd time OR */
1395 } 1514 ( pr->seen_null &&
1396 else 1515 pr->have_first_uid &&
1397 { 1516 (uid >= pr->first_uid) ) ) /* We have hit the end and past first UID */
1398 if ((uid == pr->first_uid) && (key != NULL)) 1517 {
1399 { 1518 /* Seen all results */
1400 GNUNET_STATISTICS_update (GSF_stats, 1519 GNUNET_STATISTICS_update (GSF_stats,
1401 gettext_noop 1520 gettext_noop
1402 ("# Datastore lookups concluded (seen all)"), 1521 ("# Datastore lookups concluded (seen all)"),
1403 1, GNUNET_NO); 1522 1, GNUNET_NO);
1404 key = NULL; /* all replies seen! */ 1523 no_more_local_results (pr);
1405 } 1524 return;
1406 pr->have_first_uid++;
1407 if ((pr->have_first_uid > MAX_RESULTS) && (key != NULL))
1408 {
1409 GNUNET_STATISTICS_update (GSF_stats,
1410 gettext_noop
1411 ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1412 1, GNUNET_NO);
1413 key = NULL; /* all replies seen! */
1414 }
1415 }
1416 } 1525 }
1417 if (NULL == key) 1526 if (NULL == key)
1418 { 1527 {
1419 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, 1528 GNUNET_assert (!pr->seen_null);
1420 "No further local responses available.\n"); 1529 pr->seen_null = true;
1421#if INSANE_STATISTICS 1530 start_local_query (pr,
1422 if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) || 1531 0 /* next_uid */,
1423 (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_IBLOCK)) 1532 false /* random */);
1424 GNUNET_STATISTICS_update (GSF_stats, 1533 return;
1425 gettext_noop 1534 }
1426 ("# requested DBLOCK or IBLOCK not found"), 1, 1535 if (!pr->have_first_uid)
1427 GNUNET_NO); 1536 {
1428#endif 1537 pr->first_uid = uid;
1429 goto check_error_and_continue; 1538 pr->have_first_uid = true;
1539 }
1540 pr->result_count++;
1541 if (pr->result_count > MAX_RESULTS)
1542 {
1543 GNUNET_STATISTICS_update (GSF_stats,
1544 gettext_noop
1545 ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1546 1, GNUNET_NO);
1547 no_more_local_results (pr);
1548 return;
1430 } 1549 }
1431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1432 "Received reply for `%s' of type %d with UID %llu from datastore.\n", 1551 "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1433 GNUNET_h2s (key), type, (unsigned long long) uid); 1552 GNUNET_h2s (key), type, (unsigned long long) uid);
1434 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 1553 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1435 { 1554 {
1436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1555 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1437 "Found ONDEMAND block, performing on-demand encoding\n"); 1556 "Found ONDEMAND block, performing on-demand encoding\n");
@@ -1458,33 +1577,12 @@ process_local_reply (void *cls,
1458 gettext_noop ("# on-demand lookups failed"), 1, 1577 gettext_noop ("# on-demand lookups failed"), 1,
1459 GNUNET_NO); 1578 GNUNET_NO);
1460 GNUNET_SCHEDULER_cancel (pr->warn_task); 1579 GNUNET_SCHEDULER_cancel (pr->warn_task);
1461 pr->warn_task = 1580 start_local_query (pr,
1462 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1581 uid + 1 /* next_uid */,
1463 &warn_delay_task, pr); 1582 false /* random */);
1464 pr->qe = 1583 return;
1465 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1,
1466 &pr->public_data.query,
1467 pr->public_data.type ==
1468 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1469 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1470 (0 !=
1471 (GSF_PRO_PRIORITY_UNLIMITED &
1472 pr->public_data.options)) ? UINT_MAX : 1
1473 /* queue priority */ ,
1474 (0 !=
1475 (GSF_PRO_PRIORITY_UNLIMITED &
1476 pr->public_data.options)) ? UINT_MAX :
1477 GSF_datastore_queue_size
1478 /* max queue size */ ,
1479 &process_local_reply, pr);
1480 if (NULL != pr->qe)
1481 return; /* we're done */
1482 GNUNET_STATISTICS_update (GSF_stats,
1483 gettext_noop
1484 ("# Datastore lookups concluded (error queueing)"),
1485 1, GNUNET_NO);
1486 goto check_error_and_continue;
1487 } 1584 }
1585called_from_on_demand:
1488 old_rf = pr->public_data.results_found; 1586 old_rf = pr->public_data.results_found;
1489 memset (&prq, 0, sizeof (prq)); 1587 memset (&prq, 0, sizeof (prq));
1490 prq.data = data; 1588 prq.data = data;
@@ -1496,34 +1594,9 @@ process_local_reply (void *cls,
1496 GNUNET_break (0); 1594 GNUNET_break (0);
1497 GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1, 1595 GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1,
1498 NULL, NULL); 1596 NULL, NULL);
1499 pr->qe_start = GNUNET_TIME_absolute_get (); 1597 start_local_query (pr,
1500 pr->warn_task = 1598 uid + 1 /* next_uid */,
1501 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1599 false /* random */);
1502 &warn_delay_task, pr);
1503 pr->qe =
1504 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1,
1505 &pr->public_data.query,
1506 pr->public_data.type ==
1507 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1508 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1509 (0 !=
1510 (GSF_PRO_PRIORITY_UNLIMITED &
1511 pr->public_data.options)) ? UINT_MAX : 1
1512 /* queue priority */ ,
1513 (0 !=
1514 (GSF_PRO_PRIORITY_UNLIMITED &
1515 pr->public_data.options)) ? UINT_MAX :
1516 GSF_datastore_queue_size
1517 /* max queue size */ ,
1518 &process_local_reply, pr);
1519 if (NULL == pr->qe)
1520 {
1521 GNUNET_STATISTICS_update (GSF_stats,
1522 gettext_noop
1523 ("# Datastore lookups concluded (error queueing)"),
1524 1, GNUNET_NO);
1525 goto check_error_and_continue;
1526 }
1527 return; 1600 return;
1528 } 1601 }
1529 prq.type = type; 1602 prq.type = type;
@@ -1535,14 +1608,15 @@ process_local_reply (void *cls,
1535 prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO; 1608 prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO;
1536 process_reply (&prq, key, pr); 1609 process_reply (&prq, key, pr);
1537 pr->local_result = prq.eval; 1610 pr->local_result = prq.eval;
1538 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) 1611 if (GNUNET_BLOCK_EVALUATION_OK_LAST == prq.eval)
1539 { 1612 {
1540 GNUNET_STATISTICS_update (GSF_stats, 1613 GNUNET_STATISTICS_update (GSF_stats,
1541 gettext_noop 1614 gettext_noop
1542 ("# Datastore lookups concluded (found last result)"), 1615 ("# Datastore lookups concluded (found last result)"),
1543 1, 1616 1,
1544 GNUNET_NO); 1617 GNUNET_NO);
1545 goto check_error_and_continue; 1618 call_continuation (pr);
1619 return;
1546 } 1620 }
1547 if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && 1621 if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1548 ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) || 1622 ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
@@ -1554,66 +1628,12 @@ process_local_reply (void *cls,
1554 gettext_noop ("# Datastore lookups concluded (load too high)"), 1628 gettext_noop ("# Datastore lookups concluded (load too high)"),
1555 1, 1629 1,
1556 GNUNET_NO); 1630 GNUNET_NO);
1557 goto check_error_and_continue; 1631 call_continuation (pr);
1558 }
1559 pr->qe_start = GNUNET_TIME_absolute_get ();
1560 pr->warn_task =
1561 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1562 &warn_delay_task,
1563 pr);
1564 pr->qe =
1565 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++,
1566 &pr->public_data.query,
1567 pr->public_data.type ==
1568 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1569 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1570 (0 !=
1571 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1572 public_data.options)) ? UINT_MAX : 1
1573 /* queue priority */ ,
1574 (0 !=
1575 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1576 public_data.options)) ? UINT_MAX :
1577 GSF_datastore_queue_size
1578 /* max queue size */ ,
1579 &process_local_reply, pr);
1580 /* check if we successfully queued another datastore request;
1581 * if so, return, otherwise call our continuation (if we have
1582 * any) */
1583check_error_and_continue:
1584 if (NULL != pr->qe)
1585 return; 1632 return;
1586 if (NULL != pr->warn_task)
1587 {
1588 GNUNET_SCHEDULER_cancel (pr->warn_task);
1589 pr->warn_task = NULL;
1590 } 1633 }
1591 if (NULL == (cont = pr->llc_cont)) 1634 start_local_query (pr,
1592 return; /* no continuation */ 1635 uid + 1 /* next_uid */,
1593 pr->llc_cont = NULL; 1636 false /* random */);
1594 if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1595 {
1596 if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result)
1597 {
1598 /* Signal that we are done and that there won't be any
1599 additional results to allow client to clean up state. */
1600 pr->rh (pr->rh_cls,
1601 GNUNET_BLOCK_EVALUATION_OK_LAST,
1602 pr,
1603 UINT32_MAX,
1604 GNUNET_TIME_UNIT_ZERO_ABS,
1605 GNUNET_TIME_UNIT_FOREVER_ABS,
1606 GNUNET_BLOCK_TYPE_ANY,
1607 NULL, 0);
1608 }
1609 /* Finally, call our continuation to signal that we are
1610 done with local processing of this request; i.e. to
1611 start reading again from the client. */
1612 cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
1613 return;
1614 }
1615
1616 cont (pr->llc_cont_cls, pr, pr->local_result);
1617} 1637}
1618 1638
1619 1639
@@ -1657,43 +1677,14 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1657 GNUNET_assert (NULL == pr->llc_cont); 1677 GNUNET_assert (NULL == pr->llc_cont);
1658 pr->llc_cont = cont; 1678 pr->llc_cont = cont;
1659 pr->llc_cont_cls = cont_cls; 1679 pr->llc_cont_cls = cont_cls;
1660 pr->qe_start = GNUNET_TIME_absolute_get ();
1661 pr->warn_task =
1662 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1663 &warn_delay_task,
1664 pr);
1665#if INSANE_STATISTICS 1680#if INSANE_STATISTICS
1666 GNUNET_STATISTICS_update (GSF_stats, 1681 GNUNET_STATISTICS_update (GSF_stats,
1667 gettext_noop ("# Datastore lookups initiated"), 1, 1682 gettext_noop ("# Datastore lookups initiated"), 1,
1668 GNUNET_NO); 1683 GNUNET_NO);
1669#endif 1684#endif
1670 pr->qe = 1685 start_local_query(pr,
1671 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++, 1686 0 /* next_uid */,
1672 &pr->public_data.query, 1687 true /* random */);
1673 pr->public_data.type ==
1674 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1675 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1676 (0 !=
1677 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1678 public_data.options)) ? UINT_MAX : 1
1679 /* queue priority */ ,
1680 (0 !=
1681 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1682 public_data.options)) ? UINT_MAX :
1683 GSF_datastore_queue_size
1684 /* max queue size */ ,
1685 &process_local_reply, pr);
1686 if (NULL != pr->qe)
1687 return;
1688 GNUNET_STATISTICS_update (GSF_stats,
1689 gettext_noop
1690 ("# Datastore lookups concluded (error queueing)"),
1691 1, GNUNET_NO);
1692 GNUNET_SCHEDULER_cancel (pr->warn_task);
1693 pr->warn_task = NULL;
1694 pr->llc_cont = NULL;
1695 if (NULL != cont)
1696 cont (cont_cls, pr, pr->local_result);
1697} 1688}
1698 1689
1699 1690
diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c
index bb4cb4ecb..cd062bf2b 100644
--- a/src/fs/gnunet-service-fs_put.c
+++ b/src/fs/gnunet-service-fs_put.c
@@ -72,9 +72,14 @@ struct PutOperator
72 uint64_t zero_anonymity_count_estimate; 72 uint64_t zero_anonymity_count_estimate;
73 73
74 /** 74 /**
75 * Current offset when iterating the database. 75 * Count of results received from the database.
76 */ 76 */
77 uint64_t current_offset; 77 uint64_t result_count;
78
79 /**
80 * Next UID to request when iterating the database.
81 */
82 uint64_t next_uid;
78}; 83};
79 84
80 85
@@ -177,37 +182,43 @@ delay_dht_put_task (void *cls)
177 */ 182 */
178static void 183static void
179process_dht_put_content (void *cls, 184process_dht_put_content (void *cls,
180 const struct GNUNET_HashCode * key, 185 const struct GNUNET_HashCode * key,
181 size_t size, 186 size_t size,
182 const void *data, 187 const void *data,
183 enum GNUNET_BLOCK_Type type, 188 enum GNUNET_BLOCK_Type type,
184 uint32_t priority, uint32_t anonymity, 189 uint32_t priority,
185 struct GNUNET_TIME_Absolute expiration, uint64_t uid) 190 uint32_t anonymity,
191 struct GNUNET_TIME_Absolute expiration,
192 uint64_t uid)
186{ 193{
187 struct PutOperator *po = cls; 194 struct PutOperator *po = cls;
188 195
189 po->dht_qe = NULL; 196 po->dht_qe = NULL;
190 if (key == NULL) 197 if (key == NULL)
191 { 198 {
192 po->zero_anonymity_count_estimate = po->current_offset - 1; 199 po->zero_anonymity_count_estimate = po->result_count;
193 po->current_offset = 0; 200 po->result_count = 0;
201 po->next_uid = 0;
194 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); 202 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
195 return; 203 return;
196 } 204 }
205 po->result_count++;
206 po->next_uid = uid + 1;
197 po->zero_anonymity_count_estimate = 207 po->zero_anonymity_count_estimate =
198 GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate); 208 GNUNET_MAX (po->result_count, po->zero_anonymity_count_estimate);
199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
200 "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key), 210 "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key),
201 type); 211 type);
202 po->dht_put = GNUNET_DHT_put (GSF_dht, 212 po->dht_put = GNUNET_DHT_put (GSF_dht,
203 key, 213 key,
204 DEFAULT_PUT_REPLICATION, 214 DEFAULT_PUT_REPLICATION,
205 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, 215 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
206 type, 216 type,
207 size, 217 size,
208 data, 218 data,
209 expiration, 219 expiration,
210 &delay_dht_put_blocks, po); 220 &delay_dht_put_blocks,
221 po);
211} 222}
212 223
213 224
@@ -223,10 +234,13 @@ gather_dht_put_blocks (void *cls)
223 234
224 po->dht_task = NULL; 235 po->dht_task = NULL;
225 po->dht_qe = 236 po->dht_qe =
226 GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0, 237 GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh,
238 po->next_uid,
239 0,
227 UINT_MAX, 240 UINT_MAX,
228 po->dht_put_type, 241 po->dht_put_type,
229 &process_dht_put_content, po); 242 &process_dht_put_content,
243 po);
230 if (NULL == po->dht_qe) 244 if (NULL == po->dht_qe)
231 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); 245 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
232} 246}
diff --git a/src/include/gnunet_datastore_plugin.h b/src/include/gnunet_datastore_plugin.h
index 2295d4e72..b1c9cb7c3 100644
--- a/src/include/gnunet_datastore_plugin.h
+++ b/src/include/gnunet_datastore_plugin.h
@@ -204,9 +204,9 @@ typedef void
204 * Get one of the results for a particular key in the datastore. 204 * Get one of the results for a particular key in the datastore.
205 * 205 *
206 * @param cls closure 206 * @param cls closure
207 * @param offset offset of the result (modulo num-results); 207 * @param next_uid return the result with lowest uid >= next_uid
208 * specific ordering does not matter for the offset 208 * @param random if true, return a random result instead of using next_uid
209 * @param key key to match, never NULL 209 * @param key maybe NULL (to match all entries)
210 * @param vhash hash of the value, maybe NULL (to 210 * @param vhash hash of the value, maybe NULL (to
211 * match all values that have the right key). 211 * match all values that have the right key).
212 * Note that for DBlocks there is no difference 212 * Note that for DBlocks there is no difference
@@ -215,17 +215,18 @@ typedef void
215 * @param type entries of which type are relevant? 215 * @param type entries of which type are relevant?
216 * Use 0 for any type. 216 * Use 0 for any type.
217 * @param proc function to call on the matching value; 217 * @param proc function to call on the matching value;
218 * proc should be called with NULL if there is no result 218 * will be called with NULL if nothing matches
219 * @param proc_cls closure for @a proc 219 * @param proc_cls closure for @a proc
220 */ 220 */
221typedef void 221typedef void
222(*PluginGetKey) (void *cls, 222(*PluginGetKey) (void *cls,
223 uint64_t offset, 223 uint64_t next_uid,
224 const struct GNUNET_HashCode *key, 224 bool random,
225 const struct GNUNET_HashCode *vhash, 225 const struct GNUNET_HashCode *key,
226 enum GNUNET_BLOCK_Type type, 226 const struct GNUNET_HashCode *vhash,
227 PluginDatumProcessor proc, 227 enum GNUNET_BLOCK_Type type,
228 void *proc_cls); 228 PluginDatumProcessor proc,
229 void *proc_cls);
229 230
230 231
231/** 232/**
@@ -285,23 +286,22 @@ typedef void
285 286
286 287
287/** 288/**
288 * Select a single item from the datastore at the specified offset 289 * Select a single item from the datastore (among those applicable).
289 * (among those applicable).
290 * 290 *
291 * @param cls closure 291 * @param cls closure
292 * @param offset offset of the result (modulo num-results); 292 * @param next_uid return the result with lowest uid >= next_uid
293 * specific ordering does not matter for the offset
294 * @param type entries of which type should be considered? 293 * @param type entries of which type should be considered?
295 * Must not be zero (ANY). 294 * Must not be zero (ANY).
296 * @param proc function to call on the matching value 295 * @param proc function to call on the matching value;
296 * will be called with NULL if no value matches
297 * @param proc_cls closure for @a proc 297 * @param proc_cls closure for @a proc
298 */ 298 */
299typedef void 299typedef void
300(*PluginGetType) (void *cls, 300(*PluginGetType) (void *cls,
301 uint64_t offset, 301 uint64_t next_uid,
302 enum GNUNET_BLOCK_Type type, 302 enum GNUNET_BLOCK_Type type,
303 PluginDatumProcessor proc, 303 PluginDatumProcessor proc,
304 void *proc_cls); 304 void *proc_cls);
305 305
306 306
307/** 307/**
@@ -354,9 +354,6 @@ struct GNUNET_DATASTORE_PluginFunctions
354 354
355 /** 355 /**
356 * Get datum (of the specified type) with anonymity level zero. 356 * Get datum (of the specified type) with anonymity level zero.
357 * This function is allowed to ignore the 'offset' argument
358 * and instead return a random result (with zero anonymity of
359 * the correct type) if implementing an offset is expensive.
360 */ 357 */
361 PluginGetType get_zero_anonymity; 358 PluginGetType get_zero_anonymity;
362 359
diff --git a/src/include/gnunet_datastore_service.h b/src/include/gnunet_datastore_service.h
index 233598667..830e7da86 100644
--- a/src/include/gnunet_datastore_service.h
+++ b/src/include/gnunet_datastore_service.h
@@ -261,10 +261,8 @@ typedef void
261 * will only be called once. 261 * will only be called once.
262 * 262 *
263 * @param h handle to the datastore 263 * @param h handle to the datastore
264 * @param offset offset of the result (modulo num-results); set to 264 * @param next_uid return the result with lowest uid >= next_uid
265 * a random 64-bit value initially; then increment by 265 * @param random if true, return a random result instead of using next_uid
266 * one each time; detect that all results have been found by uid
267 * being again the first uid ever returned.
268 * @param key maybe NULL (to match all entries) 266 * @param key maybe NULL (to match all entries)
269 * @param type desired type, 0 for any 267 * @param type desired type, 0 for any
270 * @param queue_priority ranking of this request in the priority queue 268 * @param queue_priority ranking of this request in the priority queue
@@ -278,7 +276,8 @@ typedef void
278 */ 276 */
279struct GNUNET_DATASTORE_QueueEntry * 277struct GNUNET_DATASTORE_QueueEntry *
280GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, 278GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
281 uint64_t offset, 279 uint64_t next_uid,
280 bool random,
282 const struct GNUNET_HashCode *key, 281 const struct GNUNET_HashCode *key,
283 enum GNUNET_BLOCK_Type type, 282 enum GNUNET_BLOCK_Type type,
284 unsigned int queue_priority, 283 unsigned int queue_priority,
@@ -289,16 +288,9 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
289 288
290/** 289/**
291 * Get a single zero-anonymity value from the datastore. 290 * Get a single zero-anonymity value from the datastore.
292 * Note that some implementations can ignore the 'offset' and
293 * instead return a random zero-anonymity value. In that case,
294 * detecting the wrap-around based on a repeating UID is at best
295 * probabilistic.
296 * 291 *
297 * @param h handle to the datastore 292 * @param h handle to the datastore
298 * @param offset offset of the result (modulo num-results); set to 293 * @param next_uid return the result with lowest uid >= next_uid
299 * a random 64-bit value initially; then increment by
300 * one each time; detect that all results have been found by uid
301 * being again the first uid ever returned.
302 * @param queue_priority ranking of this request in the priority queue 294 * @param queue_priority ranking of this request in the priority queue
303 * @param max_queue_size at what queue size should this request be dropped 295 * @param max_queue_size at what queue size should this request be dropped
304 * (if other requests of higher priority are in the queue) 296 * (if other requests of higher priority are in the queue)
@@ -312,7 +304,7 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
312 */ 304 */
313struct GNUNET_DATASTORE_QueueEntry * 305struct GNUNET_DATASTORE_QueueEntry *
314GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, 306GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
315 uint64_t offset, 307 uint64_t next_uid,
316 unsigned int queue_priority, 308 unsigned int queue_priority,
317 unsigned int max_queue_size, 309 unsigned int max_queue_size,
318 enum GNUNET_BLOCK_Type type, 310 enum GNUNET_BLOCK_Type type,
diff --git a/src/include/platform.h b/src/include/platform.h
index add58821f..6095d0258 100644
--- a/src/include/platform.h
+++ b/src/include/platform.h
@@ -110,6 +110,7 @@
110#include <stdlib.h> 110#include <stdlib.h>
111#include <stdint.h> 111#include <stdint.h>
112#include <stdarg.h> 112#include <stdarg.h>
113#include <stdbool.h>
113#include <errno.h> 114#include <errno.h>
114#include <signal.h> 115#include <signal.h>
115#include <libgen.h> 116#include <libgen.h>