aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/datastore/Makefile.am4
-rw-r--r--src/datastore/datastore_api.c51
-rw-r--r--src/datastore/gnunet-service-datastore.c89
-rw-r--r--src/datastore/perf_datastore_api.c4
-rw-r--r--src/datastore/perf_plugin_datastore.c76
-rw-r--r--src/datastore/plugin_datastore_sqlite.c706
-rw-r--r--src/datastore/plugin_datastore_template.c87
-rw-r--r--src/datastore/test_datastore_api.c68
-rw-r--r--src/datastore/test_datastore_api_management.c26
-rw-r--r--src/fs/gnunet-service-fs.c56
-rw-r--r--src/fs/gnunet-service-fs_pr.c34
-rw-r--r--src/fs/gnunet-service-fs_push.c6
-rw-r--r--src/fs/gnunet-service-fs_put.c12
-rw-r--r--src/include/gnunet_datastore_plugin.h64
-rw-r--r--src/include/gnunet_datastore_service.h88
15 files changed, 601 insertions, 770 deletions
diff --git a/src/datastore/Makefile.am b/src/datastore/Makefile.am
index 4cd80d090..ec0598819 100644
--- a/src/datastore/Makefile.am
+++ b/src/datastore/Makefile.am
@@ -37,6 +37,7 @@ gnunet_service_datastore_LDADD = \
37 $(GN_LIBINTL) 37 $(GN_LIBINTL)
38 38
39if HAVE_MYSQL 39if HAVE_MYSQL
40if HAVE_EXPERIMENTAL
40 MYSQL_PLUGIN = libgnunet_plugin_datastore_mysql.la 41 MYSQL_PLUGIN = libgnunet_plugin_datastore_mysql.la
41 MYSQL_TESTS = \ 42 MYSQL_TESTS = \
42 test_datastore_api_mysql \ 43 test_datastore_api_mysql \
@@ -44,6 +45,7 @@ if HAVE_MYSQL
44 perf_datastore_api_mysql \ 45 perf_datastore_api_mysql \
45 perf_plugin_datastore_mysql 46 perf_plugin_datastore_mysql
46endif 47endif
48endif
47if HAVE_SQLITE 49if HAVE_SQLITE
48 SQLITE_PLUGIN = libgnunet_plugin_datastore_sqlite.la 50 SQLITE_PLUGIN = libgnunet_plugin_datastore_sqlite.la
49 SQLITE_TESTS = \ 51 SQLITE_TESTS = \
@@ -53,6 +55,7 @@ if HAVE_SQLITE
53 perf_plugin_datastore_sqlite 55 perf_plugin_datastore_sqlite
54endif 56endif
55if HAVE_POSTGRES 57if HAVE_POSTGRES
58if HAVE_EXPERIMENTAL
56 POSTGRES_PLUGIN = libgnunet_plugin_datastore_postgres.la 59 POSTGRES_PLUGIN = libgnunet_plugin_datastore_postgres.la
57 POSTGRES_TESTS = \ 60 POSTGRES_TESTS = \
58 test_datastore_api_postgres \ 61 test_datastore_api_postgres \
@@ -60,6 +63,7 @@ if HAVE_POSTGRES
60 perf_datastore_api_postgres \ 63 perf_datastore_api_postgres \
61 perf_plugin_datastore_postgres 64 perf_plugin_datastore_postgres
62endif 65endif
66endif
63 67
64plugin_LTLIBRARIES = \ 68plugin_LTLIBRARIES = \
65 $(SQLITE_PLUGIN) \ 69 $(SQLITE_PLUGIN) \
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index 344a70842..dde45f24f 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors) 3 (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -305,8 +305,9 @@ transmit_drop (void *cls,
305 * @param h handle to the datastore 305 * @param h handle to the datastore
306 * @param drop set to GNUNET_YES to delete all data in datastore (!) 306 * @param drop set to GNUNET_YES to delete all data in datastore (!)
307 */ 307 */
308void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, 308void
309 int drop) 309GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
310 int drop)
310{ 311{
311 struct GNUNET_DATASTORE_QueueEntry *qe; 312 struct GNUNET_DATASTORE_QueueEntry *qe;
312 313
@@ -668,7 +669,7 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
668 * @param emsg error message 669 * @param emsg error message
669 */ 670 */
670static void 671static void
671drop_status_cont (void *cls, int result, const char *emsg) 672drop_status_cont (void *cls, int32_t result, const char *emsg)
672{ 673{
673 /* do nothing */ 674 /* do nothing */
674} 675}
@@ -806,7 +807,7 @@ process_status_message (void *cls,
806 */ 807 */
807struct GNUNET_DATASTORE_QueueEntry * 808struct GNUNET_DATASTORE_QueueEntry *
808GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, 809GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
809 int rid, 810 uint32_t rid,
810 const GNUNET_HashCode * key, 811 const GNUNET_HashCode * key,
811 size_t size, 812 size_t size,
812 const void *data, 813 const void *data,
@@ -959,7 +960,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
959 */ 960 */
960struct GNUNET_DATASTORE_QueueEntry * 961struct GNUNET_DATASTORE_QueueEntry *
961GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, 962GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
962 int rid, 963 uint32_t rid,
963 unsigned int queue_priority, 964 unsigned int queue_priority,
964 unsigned int max_queue_size, 965 unsigned int max_queue_size,
965 struct GNUNET_TIME_Relative timeout, 966 struct GNUNET_TIME_Relative timeout,
@@ -1022,7 +1023,7 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1022 */ 1023 */
1023struct GNUNET_DATASTORE_QueueEntry * 1024struct GNUNET_DATASTORE_QueueEntry *
1024GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, 1025GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1025 unsigned long long uid, 1026 uint64_t uid,
1026 uint32_t priority, 1027 uint32_t priority,
1027 struct GNUNET_TIME_Absolute expiration, 1028 struct GNUNET_TIME_Absolute expiration,
1028 unsigned int queue_priority, 1029 unsigned int queue_priority,
@@ -1250,7 +1251,7 @@ process_result_message (void *cls,
1250 do_disconnect (h); 1251 do_disconnect (h);
1251 return; 1252 return;
1252 } 1253 }
1253 GNUNET_DATASTORE_get_next (h); 1254 GNUNET_DATASTORE_iterate_get_next (h);
1254 return; 1255 return;
1255 } 1256 }
1256 dm = (const struct DataMessage*) msg; 1257 dm = (const struct DataMessage*) msg;
@@ -1355,13 +1356,13 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1355 * (or rather, will already have been invoked) 1356 * (or rather, will already have been invoked)
1356 */ 1357 */
1357struct GNUNET_DATASTORE_QueueEntry * 1358struct GNUNET_DATASTORE_QueueEntry *
1358GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, 1359GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1359 unsigned int queue_priority, 1360 unsigned int queue_priority,
1360 unsigned int max_queue_size, 1361 unsigned int max_queue_size,
1361 struct GNUNET_TIME_Relative timeout, 1362 struct GNUNET_TIME_Relative timeout,
1362 enum GNUNET_BLOCK_Type type, 1363 enum GNUNET_BLOCK_Type type,
1363 GNUNET_DATASTORE_Iterator iter, 1364 GNUNET_DATASTORE_Iterator iter,
1364 void *iter_cls) 1365 void *iter_cls)
1365{ 1366{
1366 struct GNUNET_DATASTORE_QueueEntry *qe; 1367 struct GNUNET_DATASTORE_QueueEntry *qe;
1367 struct GetZeroAnonymityMessage *m; 1368 struct GetZeroAnonymityMessage *m;
@@ -1404,7 +1405,7 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1404 * in the datastore. The iterator will only be called 1405 * in the datastore. The iterator will only be called
1405 * once initially; if the first call did contain a 1406 * once initially; if the first call did contain a
1406 * result, further results can be obtained by calling 1407 * result, further results can be obtained by calling
1407 * "GNUNET_DATASTORE_get_next" with the given argument. 1408 * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
1408 * 1409 *
1409 * @param h handle to the datastore 1410 * @param h handle to the datastore
1410 * @param key maybe NULL (to match all entries) 1411 * @param key maybe NULL (to match all entries)
@@ -1421,14 +1422,14 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1421 * (or rather, will already have been invoked) 1422 * (or rather, will already have been invoked)
1422 */ 1423 */
1423struct GNUNET_DATASTORE_QueueEntry * 1424struct GNUNET_DATASTORE_QueueEntry *
1424GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, 1425GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
1425 const GNUNET_HashCode * key, 1426 const GNUNET_HashCode * key,
1426 enum GNUNET_BLOCK_Type type, 1427 enum GNUNET_BLOCK_Type type,
1427 unsigned int queue_priority, 1428 unsigned int queue_priority,
1428 unsigned int max_queue_size, 1429 unsigned int max_queue_size,
1429 struct GNUNET_TIME_Relative timeout, 1430 struct GNUNET_TIME_Relative timeout,
1430 GNUNET_DATASTORE_Iterator iter, 1431 GNUNET_DATASTORE_Iterator iter,
1431 void *iter_cls) 1432 void *iter_cls)
1432{ 1433{
1433 struct GNUNET_DATASTORE_QueueEntry *qe; 1434 struct GNUNET_DATASTORE_QueueEntry *qe;
1434 struct GetMessage *gm; 1435 struct GetMessage *gm;
@@ -1482,7 +1483,7 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1482 * @param h handle to the datastore 1483 * @param h handle to the datastore
1483 */ 1484 */
1484void 1485void
1485GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h) 1486GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h)
1486{ 1487{
1487 struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head; 1488 struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1488 1489
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index 2538d5ef6..1fa2bbccb 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -42,6 +42,13 @@
42 */ 42 */
43#define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15) 43#define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44 44
45/**
46 * How fast are we allowed to query the database for deleting
47 * expired content? (1 item per second).
48 */
49#define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51
45#define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore") 52#define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
46 53
47/** 54/**
@@ -348,10 +355,12 @@ expired_processor (void *cls,
348 if (expiration.abs_value > now.abs_value) 355 if (expiration.abs_value > now.abs_value)
349 { 356 {
350 /* finished processing */ 357 /* finished processing */
351 plugin->api->next_request (next_cls, GNUNET_YES); 358 expired_kill_task
359 = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
360 &delete_expired,
361 NULL);
352 return GNUNET_SYSERR; 362 return GNUNET_SYSERR;
353 } 363 }
354 plugin->api->next_request (next_cls, GNUNET_NO);
355#if DEBUG_DATASTORE 364#if DEBUG_DATASTORE
356 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 365 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
357 "Deleting content `%s' of type %u that expired %llu ms ago\n", 366 "Deleting content `%s' of type %u that expired %llu ms ago\n",
@@ -365,7 +374,11 @@ expired_processor (void *cls,
365 GNUNET_YES); 374 GNUNET_YES);
366 GNUNET_CONTAINER_bloomfilter_remove (filter, 375 GNUNET_CONTAINER_bloomfilter_remove (filter,
367 key); 376 key);
368 return GNUNET_NO; /* delete */ 377 expired_kill_task
378 = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY,
379 &delete_expired,
380 NULL);
381 return GNUNET_NO;
369} 382}
370 383
371 384
@@ -383,15 +396,15 @@ delete_expired (void *cls,
383 const struct GNUNET_SCHEDULER_TaskContext *tc) 396 const struct GNUNET_SCHEDULER_TaskContext *tc)
384{ 397{
385 expired_kill_task = GNUNET_SCHEDULER_NO_TASK; 398 expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
386 plugin->api->iter_ascending_expiration (plugin->api->cls, 399 plugin->api->expiration_get (plugin->api->cls,
387 0, 400 &expired_processor,
388 &expired_processor, 401 NULL);
389 NULL);
390} 402}
391 403
392 404
393/** 405/**
394 * An iterator over a set of items stored in the datastore. 406 * An iterator over a set of items stored in the datastore
407 * that deletes until we're happy with respect to our quota.
395 * 408 *
396 * @param cls closure 409 * @param cls closure
397 * @param next_cls closure to pass to the "next" function. 410 * @param next_cls closure to pass to the "next" function.
@@ -410,31 +423,21 @@ delete_expired (void *cls,
410 * GNUNET_NO to delete the item and continue (if supported) 423 * GNUNET_NO to delete the item and continue (if supported)
411 */ 424 */
412static int 425static int
413manage (void *cls, 426quota_processor (void *cls,
414 void *next_cls, 427 void *next_cls,
415 const GNUNET_HashCode * key, 428 const GNUNET_HashCode * key,
416 uint32_t size, 429 uint32_t size,
417 const void *data, 430 const void *data,
418 enum GNUNET_BLOCK_Type type, 431 enum GNUNET_BLOCK_Type type,
419 uint32_t priority, 432 uint32_t priority,
420 uint32_t anonymity, 433 uint32_t anonymity,
421 struct GNUNET_TIME_Absolute 434 struct GNUNET_TIME_Absolute expiration,
422 expiration, 435 uint64_t uid)
423 uint64_t uid)
424{ 436{
425 unsigned long long *need = cls; 437 unsigned long long *need = cls;
426 438
427 if (NULL == key) 439 if (NULL == key)
428 { 440 return GNUNET_SYSERR;
429 GNUNET_free (need);
430 return GNUNET_SYSERR;
431 }
432 if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
433 *need = 0;
434 else
435 *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
436 plugin->api->next_request (next_cls,
437 (0 == *need) ? GNUNET_YES : GNUNET_NO);
438#if DEBUG_DATASTORE 441#if DEBUG_DATASTORE
439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
440 "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n", 443 "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
@@ -443,6 +446,10 @@ manage (void *cls,
443 type, 446 type,
444 *need); 447 *need);
445#endif 448#endif
449 if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
450 *need = 0;
451 else
452 *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
446 GNUNET_STATISTICS_update (stats, 453 GNUNET_STATISTICS_update (stats,
447 gettext_noop ("# bytes purged (low-priority)"), 454 gettext_noop ("# bytes purged (low-priority)"),
448 size, 455 size,
@@ -468,19 +475,22 @@ manage (void *cls,
468static void 475static void
469manage_space (unsigned long long need) 476manage_space (unsigned long long need)
470{ 477{
471 unsigned long long *n; 478 unsigned long long last;
472 479
473#if DEBUG_DATASTORE 480#if DEBUG_DATASTORE
474 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475 "Asked to free up %llu bytes of cache space\n", 482 "Asked to free up %llu bytes of cache space\n",
476 need); 483 need);
477#endif 484#endif
478 n = GNUNET_malloc (sizeof(unsigned long long)); 485 last = 0;
479 *n = need; 486 while ( (need > 0) &&
480 plugin->api->iter_low_priority (plugin->api->cls, 487 (last != need) )
481 0, 488 {
482 &manage, 489 last = need;
483 n); 490 plugin->api->expiration_get (plugin->api->cls,
491 &quota_processor,
492 &need);
493 }
484} 494}
485 495
486 496
@@ -1250,10 +1260,9 @@ handle_get_random (void *cls,
1250 1, 1260 1,
1251 GNUNET_NO); 1261 GNUNET_NO);
1252 GNUNET_SERVER_client_keep (client); 1262 GNUNET_SERVER_client_keep (client);
1253 plugin->api->iter_migration_order (plugin->api->cls, 1263 plugin->api->replication_get (plugin->api->cls,
1254 GNUNET_BLOCK_TYPE_ANY, 1264 &transmit_item,
1255 &transmit_item, 1265 client);
1256 client);
1257} 1266}
1258 1267
1259/** 1268/**
diff --git a/src/datastore/perf_datastore_api.c b/src/datastore/perf_datastore_api.c
index 00d91b4c7..3b89ad233 100644
--- a/src/datastore/perf_datastore_api.c
+++ b/src/datastore/perf_datastore_api.c
@@ -223,13 +223,13 @@ delete_value (void *cls,
223 stored_ops++; 223 stored_ops++;
224 if (stored_bytes < MAX_SIZE) 224 if (stored_bytes < MAX_SIZE)
225 { 225 {
226 GNUNET_DATASTORE_get_next (datastore); 226 GNUNET_DATASTORE_iterate_get_next (datastore);
227 return; 227 return;
228 } 228 }
229 crc->key = *key; 229 crc->key = *key;
230 crc->esize = size; 230 crc->esize = size;
231 memcpy (crc->data, data, size); 231 memcpy (crc->data, data, size);
232 GNUNET_DATASTORE_get_next (datastore); 232 GNUNET_DATASTORE_iterate_get_next (datastore);
233} 233}
234 234
235 235
diff --git a/src/datastore/perf_plugin_datastore.c b/src/datastore/perf_plugin_datastore.c
index cb25da46b..f7216a5a6 100644
--- a/src/datastore/perf_plugin_datastore.c
+++ b/src/datastore/perf_plugin_datastore.c
@@ -62,7 +62,6 @@ enum RunPhase
62 RP_LP_GET, 62 RP_LP_GET,
63 RP_AE_GET, 63 RP_AE_GET,
64 RP_ZA_GET, 64 RP_ZA_GET,
65 RP_MO_GET,
66 RP_AN_GET 65 RP_AN_GET
67 }; 66 };
68 67
@@ -183,8 +182,9 @@ iterateDummy (void *cls,
183 else 182 else
184 crc->phase = RP_PUT; 183 crc->phase = RP_PUT;
185 } 184 }
186 GNUNET_SCHEDULER_add_after (GNUNET_SCHEDULER_NO_TASK, 185 crc->cnt = 0;
187 &test, crc); 186 crc->start = GNUNET_TIME_absolute_get ();
187 GNUNET_SCHEDULER_add_now (&test, crc);
188 return GNUNET_OK; 188 return GNUNET_OK;
189 } 189 }
190#if VERBOSE 190#if VERBOSE
@@ -200,6 +200,37 @@ iterateDummy (void *cls,
200 200
201 201
202 202
203static int
204dummy_get (void *cls,
205 void *next_cls,
206 const GNUNET_HashCode * key,
207 uint32_t size,
208 const void *data,
209 enum GNUNET_BLOCK_Type type,
210 uint32_t priority,
211 uint32_t anonymity,
212 struct GNUNET_TIME_Absolute
213 expiration,
214 uint64_t uid)
215{
216 struct CpsRunContext *crc = cls;
217
218 crc->cnt++;
219 if (1000 == crc->cnt)
220 {
221 crc->end = GNUNET_TIME_absolute_get();
222 printf (crc->msg,
223 crc->i,
224 (unsigned long long) (crc->end.abs_value - crc->start.abs_value),
225 crc->cnt);
226 crc->phase++;
227 crc->cnt = 0;
228 crc->start = GNUNET_TIME_absolute_get ();
229 }
230 GNUNET_SCHEDULER_add_now (&test, crc);
231 return GNUNET_OK;
232}
233
203/** 234/**
204 * Function called when the service shuts 235 * Function called when the service shuts
205 * down. Unloads our datastore plugin. 236 * down. Unloads our datastore plugin.
@@ -265,46 +296,31 @@ test (void *cls,
265 (unsigned long long) (crc->end.abs_value - crc->start.abs_value), 296 (unsigned long long) (crc->end.abs_value - crc->start.abs_value),
266 (unsigned int) PUT_10); 297 (unsigned int) PUT_10);
267 crc->i++; 298 crc->i++;
299 crc->start = GNUNET_TIME_absolute_get ();
268 crc->phase = RP_LP_GET; 300 crc->phase = RP_LP_GET;
269 GNUNET_SCHEDULER_add_after (GNUNET_SCHEDULER_NO_TASK, 301 GNUNET_SCHEDULER_add_after (GNUNET_SCHEDULER_NO_TASK,
270 &test, crc); 302 &test, crc);
271 break; 303 break;
272 case RP_LP_GET: 304 case RP_LP_GET:
273 crc->cnt = 0; 305 crc->msg = "%3u replication iteration took %20llums for %u\n";
274 crc->start = GNUNET_TIME_absolute_get (); 306 crc->api->replication_get (crc->api->cls,
275 crc->msg = "%3u low priority iteration took %20llums for %u\n"; 307 &dummy_get,
276 crc->api->iter_low_priority (crc->api->cls, 0, 308 crc);
277 &iterateDummy,
278 crc);
279 break; 309 break;
280 case RP_AE_GET: 310 case RP_AE_GET:
281 crc->cnt = 0; 311 crc->msg = "%3u expiration iteration took %20llums for %u\n";
282 crc->start = GNUNET_TIME_absolute_get (); 312 crc->api->expiration_get (crc->api->cls,
283 crc->msg = "%3u ascending expiration iteration took %20llums for %u\n"; 313 &dummy_get,
284 crc->api->iter_ascending_expiration (crc->api->cls, 0, 314 crc);
285 &iterateDummy,
286 crc);
287 break; 315 break;
288 case RP_ZA_GET: 316 case RP_ZA_GET:
289 crc->cnt = 0; 317 crc->msg = "%3u zero anonymity iteration took %20llums for %u\n";
290 crc->start = GNUNET_TIME_absolute_get ();
291 crc->msg = "%3u zero anonymity iteration took %20llums for %u\n";
292 crc->api->iter_zero_anonymity (crc->api->cls, 0, 318 crc->api->iter_zero_anonymity (crc->api->cls, 0,
293 &iterateDummy, 319 &iterateDummy,
294 crc); 320 crc);
295 break; 321 break;
296 case RP_MO_GET:
297 crc->cnt = 0;
298 crc->start = GNUNET_TIME_absolute_get ();
299 crc->msg = "%3u migration order iteration took %20llums for %u\n";
300 crc->api->iter_migration_order (crc->api->cls, 0,
301 &iterateDummy,
302 crc);
303 break;
304 case RP_AN_GET: 322 case RP_AN_GET:
305 crc->cnt = 0; 323 crc->msg = "%3u all now iteration took %20llums for %u\n";
306 crc->start = GNUNET_TIME_absolute_get ();
307 crc->msg = "%3u all now iteration took %20llums for %u\n";
308 crc->api->iter_all_now (crc->api->cls, 0, 324 crc->api->iter_all_now (crc->api->cls, 0,
309 &iterateDummy, 325 &iterateDummy,
310 crc); 326 crc);
@@ -312,7 +328,7 @@ test (void *cls,
312 case RP_DONE: 328 case RP_DONE:
313 crc->api->drop (crc->api->cls); 329 crc->api->drop (crc->api->cls);
314 GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, 330 GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
315 &cleaning_task, crc); 331 &cleaning_task, crc);
316 break; 332 break;
317 } 333 }
318} 334}
diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c
index b8661f46d..b05a0a9c1 100644
--- a/src/datastore/plugin_datastore_sqlite.c
+++ b/src/datastore/plugin_datastore_sqlite.c
@@ -38,43 +38,25 @@
38 */ 38 */
39#define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0) 39#define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
40 40
41#define SELECT_IT_LOW_PRIORITY_1 \
42 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash > ?) "\
43 "ORDER BY hash ASC LIMIT 1"
44
45#define SELECT_IT_LOW_PRIORITY_2 \
46 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio > ?) "\
47 "ORDER BY prio ASC, hash ASC LIMIT 1"
48 41
49#define SELECT_IT_NON_ANONYMOUS_1 \ 42#define SELECT_IT_NON_ANONYMOUS_1 \
50 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\ 43 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ?1 AND expire > %llu AND anonLevel = 0 AND hash < ?2) "\
51 " ORDER BY hash DESC LIMIT 1" 44 " ORDER BY hash DESC LIMIT 1"
52 45
53#define SELECT_IT_NON_ANONYMOUS_2 \ 46#define SELECT_IT_NON_ANONYMOUS_2 \
54 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\ 47 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio < ?1 AND expire > %llu AND anonLevel = 0)"\
55 " ORDER BY prio DESC, hash DESC LIMIT 1" 48 " ORDER BY prio DESC, hash DESC LIMIT 1"
56 49
57#define SELECT_IT_EXPIRATION_TIME_1 \
58 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash > ?) "\
59 " ORDER BY hash ASC LIMIT 1"
60
61#define SELECT_IT_EXPIRATION_TIME_2 \
62 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\
63 " ORDER BY expire ASC, hash ASC LIMIT 1"
64
65#define SELECT_IT_MIGRATION_ORDER_1 \
66 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash < ?) "\
67 " ORDER BY hash DESC LIMIT 1"
68
69#define SELECT_IT_MIGRATION_ORDER_2 \
70 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ? AND expire > %llu) "\
71 " ORDER BY expire DESC, hash DESC LIMIT 1"
72
73 50
74#define SELECT_IT_REPLICATION_ORDER \ 51#define SELECT_IT_REPLICATION_ORDER \
75 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\ 52 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?1) "\
76 " ORDER BY repl DESC, Random() LIMIT 1" 53 " ORDER BY repl DESC, Random() LIMIT 1"
77 54
55#define SELECT_IT_EXPIRATION_ORDER \
56 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ?1) "\
57 " OR NOT EXISTS (SELECT 1 from gn090 WHERE (expire < ?1)) "\
58 " ORDER BY prio ASC LIMIT 1"
59
78 60
79/** 61/**
80 * After how many ms "busy" should a DB operation fail for good? 62 * After how many ms "busy" should a DB operation fail for good?
@@ -126,11 +108,16 @@ struct Plugin
126 sqlite3_stmt *updRepl; 108 sqlite3_stmt *updRepl;
127 109
128 /** 110 /**
129 * Precompiled SQL for replication decrement. 111 * Precompiled SQL for replication selection.
130 */ 112 */
131 sqlite3_stmt *selRepl; 113 sqlite3_stmt *selRepl;
132 114
133 /** 115 /**
116 * Precompiled SQL for expiration selection.
117 */
118 sqlite3_stmt *selExpi;
119
120 /**
134 * Precompiled SQL for insertion. 121 * Precompiled SQL for insertion.
135 */ 122 */
136 sqlite3_stmt *insertContent; 123 sqlite3_stmt *insertContent;
@@ -162,18 +149,23 @@ struct Plugin
162 * @return 0 on success 149 * @return 0 on success
163 */ 150 */
164static int 151static int
165sq_prepare (sqlite3 * dbh, const char *zSql, 152sq_prepare (sqlite3 * dbh,
153 const char *zSql,
166 sqlite3_stmt ** ppStmt) 154 sqlite3_stmt ** ppStmt)
167{ 155{
168 char *dummy; 156 char *dummy;
169 int result; 157 int result;
170 result = sqlite3_prepare_v2 (dbh, 158 result = sqlite3_prepare_v2 (dbh,
171 zSql, 159 zSql,
172 strlen (zSql), ppStmt, (const char **) &dummy); 160 strlen (zSql),
161 ppStmt,
162 (const char **) &dummy);
173#if DEBUG_SQLITE 163#if DEBUG_SQLITE
174 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 164 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
175 "sqlite", 165 "sqlite",
176 "Prepared %p: %d\n", *ppStmt, result); 166 "Prepared %p: %d\n",
167 *ppStmt,
168 result);
177#endif 169#endif
178 return result; 170 return result;
179} 171}
@@ -190,21 +182,15 @@ create_indices (sqlite3 * dbh)
190 /* create indices */ 182 /* create indices */
191 sqlite3_exec (dbh, 183 sqlite3_exec (dbh,
192 "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL); 184 "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL);
193 sqlite3_exec (dbh,
194 "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL,
195 NULL, NULL);
196 sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL, 185 sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL,
197 NULL); 186 NULL);
198 sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn090 (expire)", NULL, NULL, 187 sqlite3_exec (dbh, "CREATE INDEX idx_expire_prio ON gn090 (expire,prio)", NULL, NULL,
199 NULL); 188 NULL);
200 sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)", NULL, 189 sqlite3_exec (dbh,
190 "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL,
201 NULL, NULL); 191 NULL, NULL);
202 sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)", 192 sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (prio,expire,anonLevel,hash)",
203 NULL, NULL, NULL); 193 NULL, NULL, NULL);
204 sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)", NULL,
205 NULL, NULL);
206 sqlite3_exec (dbh, "CREATE INDEX idx_comb8 ON gn090 (expire)", NULL,
207 NULL, NULL);
208} 194}
209 195
210 196
@@ -358,6 +344,9 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
358 SELECT_IT_REPLICATION_ORDER, 344 SELECT_IT_REPLICATION_ORDER,
359 &plugin->selRepl) != SQLITE_OK) || 345 &plugin->selRepl) != SQLITE_OK) ||
360 (sq_prepare (plugin->dbh, 346 (sq_prepare (plugin->dbh,
347 SELECT_IT_EXPIRATION_ORDER,
348 &plugin->selExpi) != SQLITE_OK) ||
349 (sq_prepare (plugin->dbh,
361 "INSERT INTO gn090 (repl, type, prio, " 350 "INSERT INTO gn090 (repl, type, prio, "
362 "anonLevel, expire, hash, vhash, value) VALUES " 351 "anonLevel, expire, hash, vhash, value) VALUES "
363 "(?, ?, ?, ?, ?, ?, ?, ?)", 352 "(?, ?, ?, ?, ?, ?, ?, ?)",
@@ -396,6 +385,8 @@ database_shutdown (struct Plugin *plugin)
396 sqlite3_finalize (plugin->updRepl); 385 sqlite3_finalize (plugin->updRepl);
397 if (plugin->selRepl != NULL) 386 if (plugin->selRepl != NULL)
398 sqlite3_finalize (plugin->selRepl); 387 sqlite3_finalize (plugin->selRepl);
388 if (plugin->selExpi != NULL)
389 sqlite3_finalize (plugin->selExpi);
399 if (plugin->insertContent != NULL) 390 if (plugin->insertContent != NULL)
400 sqlite3_finalize (plugin->insertContent); 391 sqlite3_finalize (plugin->insertContent);
401 result = sqlite3_close(plugin->dbh); 392 result = sqlite3_close(plugin->dbh);
@@ -457,9 +448,9 @@ delete_by_rowid (struct Plugin* plugin,
457 return GNUNET_SYSERR; 448 return GNUNET_SYSERR;
458 } 449 }
459 if (SQLITE_OK != sqlite3_reset (plugin->delRow)) 450 if (SQLITE_OK != sqlite3_reset (plugin->delRow))
460 LOG_SQLITE (plugin, NULL, 451 LOG_SQLITE (plugin, NULL,
461 GNUNET_ERROR_TYPE_ERROR | 452 GNUNET_ERROR_TYPE_ERROR |
462 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); 453 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
463 return GNUNET_OK; 454 return GNUNET_OK;
464} 455}
465 456
@@ -532,11 +523,6 @@ struct NextContext
532 GNUNET_HashCode lastKey; 523 GNUNET_HashCode lastKey;
533 524
534 /** 525 /**
535 * Expiration time of the last value visited.
536 */
537 struct GNUNET_TIME_Absolute lastExpiration;
538
539 /**
540 * Priority of the last value visited. 526 * Priority of the last value visited.
541 */ 527 */
542 unsigned int lastPriority; 528 unsigned int lastPriority;
@@ -566,15 +552,14 @@ sqlite_next_request_cont (void *cls,
566 struct NextContext * nc = cls; 552 struct NextContext * nc = cls;
567 struct Plugin *plugin; 553 struct Plugin *plugin;
568 unsigned long long rowid; 554 unsigned long long rowid;
569 sqlite3_stmt *stmtd;
570 int ret; 555 int ret;
571 unsigned int type;
572 unsigned int size; 556 unsigned int size;
573 unsigned int priority; 557 uint32_t anonymity;
574 unsigned int anonymity; 558 uint32_t priority;
575 struct GNUNET_TIME_Absolute expiration; 559 enum GNUNET_BLOCK_Type type;
576 const GNUNET_HashCode *key; 560 const GNUNET_HashCode *key;
577 const void *data; 561 struct GNUNET_TIME_Absolute expiration;
562 char data[GNUNET_SERVER_MAX_MESSAGE_SIZE];
578 563
579 plugin = nc->plugin; 564 plugin = nc->plugin;
580 plugin->next_task = GNUNET_SCHEDULER_NO_TASK; 565 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
@@ -592,90 +577,72 @@ sqlite_next_request_cont (void *cls,
592 return; 577 return;
593 } 578 }
594 579
595 rowid = sqlite3_column_int64 (nc->stmt, 6);
596 nc->last_rowid = rowid;
597 type = sqlite3_column_int (nc->stmt, 0); 580 type = sqlite3_column_int (nc->stmt, 0);
581 priority = sqlite3_column_int (nc->stmt, 1);
582 anonymity = sqlite3_column_int (nc->stmt, 2);
583 expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
584 key = sqlite3_column_blob (nc->stmt, 4);
598 size = sqlite3_column_bytes (nc->stmt, 5); 585 size = sqlite3_column_bytes (nc->stmt, 5);
586 memcpy (data, sqlite3_column_blob (nc->stmt, 5), size);
587 rowid = sqlite3_column_int64 (nc->stmt, 6);
599 if (sqlite3_column_bytes (nc->stmt, 4) != sizeof (GNUNET_HashCode)) 588 if (sqlite3_column_bytes (nc->stmt, 4) != sizeof (GNUNET_HashCode))
600 { 589 {
601 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 590 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
602 "sqlite", 591 "sqlite",
603 _("Invalid data in database. Trying to fix (by deletion).\n")); 592 _("Invalid data in database. Trying to fix (by deletion).\n"));
604 if (SQLITE_OK != sqlite3_reset (nc->stmt)) 593 if (SQLITE_OK != sqlite3_reset (nc->stmt))
605 LOG_SQLITE (nc->plugin, NULL, 594 LOG_SQLITE (plugin, NULL,
606 GNUNET_ERROR_TYPE_ERROR | 595 GNUNET_ERROR_TYPE_ERROR |
607 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); 596 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
608 if (sq_prepare 597 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
609 (nc->plugin->dbh, 598 plugin->env->duc (plugin->env->cls,
610 "DELETE FROM gn090 WHERE NOT LENGTH(hash) = ?", 599 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
611 &stmtd) != SQLITE_OK)
612 {
613 LOG_SQLITE (nc->plugin, NULL,
614 GNUNET_ERROR_TYPE_ERROR |
615 GNUNET_ERROR_TYPE_BULK,
616 "sq_prepare");
617 goto END;
618 }
619
620 if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
621 LOG_SQLITE (nc->plugin, NULL,
622 GNUNET_ERROR_TYPE_ERROR |
623 GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
624 if (SQLITE_DONE != sqlite3_step (stmtd))
625 LOG_SQLITE (nc->plugin, NULL,
626 GNUNET_ERROR_TYPE_ERROR |
627 GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
628 if (SQLITE_OK != sqlite3_finalize (stmtd))
629 LOG_SQLITE (nc->plugin, NULL,
630 GNUNET_ERROR_TYPE_ERROR |
631 GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
632 goto END; 600 goto END;
633 } 601 }
634
635 priority = sqlite3_column_int (nc->stmt, 1);
636 anonymity = sqlite3_column_int (nc->stmt, 2);
637 expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
638 key = sqlite3_column_blob (nc->stmt, 4);
639 nc->lastPriority = priority;
640 nc->lastExpiration = expiration;
641 memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
642 data = sqlite3_column_blob (nc->stmt, 5);
643 nc->count++; 602 nc->count++;
644 ret = nc->iter (nc->iter_cls, 603 nc->last_rowid = rowid;
645 nc, 604 nc->lastPriority = priority;
605 nc->lastKey = *key;
606 if (SQLITE_OK != sqlite3_reset (nc->stmt))
607 LOG_SQLITE (plugin, NULL,
608 GNUNET_ERROR_TYPE_ERROR |
609 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
610 ret = nc->iter (nc->iter_cls, nc,
646 key, 611 key,
647 size, 612 size, data,
648 data, 613 type, priority,
649 type, 614 anonymity, expiration,
650 priority,
651 anonymity,
652 expiration,
653 rowid); 615 rowid);
654 if (ret == GNUNET_SYSERR) 616 switch (ret)
655 { 617 {
618 case GNUNET_SYSERR:
656 nc->end_it = GNUNET_YES; 619 nc->end_it = GNUNET_YES;
657 return; 620 break;
658 } 621 case GNUNET_NO:
659#if DEBUG_SQLITE
660 if (ret == GNUNET_NO)
661 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
662 "sqlite",
663 "Asked to remove entry %llu (%u bytes)\n",
664 (unsigned long long) rowid,
665 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
666#endif
667 if ( (ret == GNUNET_NO) &&
668 (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
669 {
670 plugin->env->duc (plugin->env->cls,
671 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
672#if DEBUG_SQLITE 622#if DEBUG_SQLITE
673 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 623 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
674 "sqlite", 624 "sqlite",
675 "Removed entry %llu (%u bytes)\n", 625 "Asked to remove entry %llu (%u bytes)\n",
676 (unsigned long long) rowid, 626 (unsigned long long) rowid,
677 size + GNUNET_DATASTORE_ENTRY_OVERHEAD); 627 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
678#endif 628#endif
629 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
630 {
631 plugin->env->duc (plugin->env->cls,
632 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
633#if DEBUG_SQLITE
634 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
635 "sqlite",
636 "Removed entry %llu (%u bytes)\n",
637 (unsigned long long) rowid,
638 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
639#endif
640 }
641 break;
642 case GNUNET_YES:
643 break;
644 default:
645 GNUNET_break (0);
679 } 646 }
680} 647}
681 648
@@ -723,7 +690,7 @@ sqlite_next_request (void *next_cls,
723 */ 690 */
724static int 691static int
725sqlite_plugin_put (void *cls, 692sqlite_plugin_put (void *cls,
726 const GNUNET_HashCode * key, 693 const GNUNET_HashCode *key,
727 uint32_t size, 694 uint32_t size,
728 const void *data, 695 const void *data,
729 enum GNUNET_BLOCK_Type type, 696 enum GNUNET_BLOCK_Type type,
@@ -774,37 +741,39 @@ sqlite_plugin_put (void *cls,
774 return GNUNET_SYSERR; 741 return GNUNET_SYSERR;
775 } 742 }
776 n = sqlite3_step (stmt); 743 n = sqlite3_step (stmt);
777 if (n != SQLITE_DONE) 744 switch (n)
778 { 745 {
779 if (n == SQLITE_BUSY) 746 case SQLITE_DONE:
780 { 747 if (SQLITE_OK != sqlite3_reset (stmt))
781 LOG_SQLITE (plugin, msg, 748 LOG_SQLITE (plugin, NULL,
782 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); 749 GNUNET_ERROR_TYPE_ERROR |
783 sqlite3_reset (stmt); 750 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
784 GNUNET_break (0); 751 plugin->env->duc (plugin->env->cls,
785 return GNUNET_NO; 752 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
786 } 753#if DEBUG_SQLITE
754 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
755 "sqlite",
756 "Stored new entry (%u bytes)\n",
757 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
758#endif
759 return GNUNET_OK;
760 case SQLITE_BUSY:
761 GNUNET_break (0);
762 LOG_SQLITE (plugin, msg,
763 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
764 "sqlite3_step");
765 sqlite3_reset (stmt);
766 return GNUNET_SYSERR;
767 default:
787 LOG_SQLITE (plugin, msg, 768 LOG_SQLITE (plugin, msg,
788 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); 769 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
770 "sqlite3_step");
789 sqlite3_reset (stmt); 771 sqlite3_reset (stmt);
790 database_shutdown (plugin); 772 database_shutdown (plugin);
791 database_setup (plugin->env->cfg, 773 database_setup (plugin->env->cfg,
792 plugin); 774 plugin);
793 return GNUNET_SYSERR; 775 return GNUNET_SYSERR;
794 } 776 }
795 if (SQLITE_OK != sqlite3_reset (stmt))
796 LOG_SQLITE (plugin, NULL,
797 GNUNET_ERROR_TYPE_ERROR |
798 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
799 plugin->env->duc (plugin->env->cls,
800 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
801#if DEBUG_SQLITE
802 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
803 "sqlite",
804 "Stored new entry (%u bytes)\n",
805 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
806#endif
807 return GNUNET_OK;
808} 777}
809 778
810 779
@@ -844,21 +813,27 @@ sqlite_plugin_update (void *cls,
844 sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value); 813 sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value);
845 sqlite3_bind_int64 (plugin->updPrio, 3, uid); 814 sqlite3_bind_int64 (plugin->updPrio, 3, uid);
846 n = sqlite3_step (plugin->updPrio); 815 n = sqlite3_step (plugin->updPrio);
847 if (n != SQLITE_DONE) 816 sqlite3_reset (plugin->updPrio);
848 LOG_SQLITE (plugin, msg, 817 switch (n)
849 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, 818 {
850 "sqlite3_step"); 819 case SQLITE_DONE:
851#if DEBUG_SQLITE 820#if DEBUG_SQLITE
852 else 821 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
853 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 822 "sqlite",
854 "sqlite", 823 "Block updated\n");
855 "Block updated\n");
856#endif 824#endif
857 sqlite3_reset (plugin->updPrio); 825 return GNUNET_OK;
858 826 case SQLITE_BUSY:
859 if (n == SQLITE_BUSY) 827 LOG_SQLITE (plugin, msg,
860 return GNUNET_NO; 828 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
861 return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR; 829 "sqlite3_step");
830 return GNUNET_NO;
831 default:
832 LOG_SQLITE (plugin, msg,
833 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
834 "sqlite3_step");
835 return GNUNET_SYSERR;
836 }
862} 837}
863 838
864 839
@@ -878,26 +853,6 @@ struct IterContext
878 sqlite3_stmt *stmt_2; 853 sqlite3_stmt *stmt_2;
879 854
880 /** 855 /**
881 * FIXME.
882 */
883 int is_asc;
884
885 /**
886 * FIXME.
887 */
888 int is_prio;
889
890 /**
891 * FIXME.
892 */
893 int is_migr;
894
895 /**
896 * FIXME.
897 */
898 int limit_nonanonymous;
899
900 /**
901 * Desired type for blocks returned by this iterator. 856 * Desired type for blocks returned by this iterator.
902 */ 857 */
903 enum GNUNET_BLOCK_Type type; 858 enum GNUNET_BLOCK_Type type;
@@ -934,26 +889,13 @@ iter_next_prepare (void *cls,
934 sqlite3_reset (ic->stmt_1); 889 sqlite3_reset (ic->stmt_1);
935 sqlite3_reset (ic->stmt_2); 890 sqlite3_reset (ic->stmt_2);
936 plugin = nc->plugin; 891 plugin = nc->plugin;
937 if (ic->is_prio)
938 {
939#if DEBUG_SQLITE
940 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941 "Restricting to results larger than the last priority %u\n",
942 nc->lastPriority);
943#endif
944 sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
945 sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
946 }
947 else
948 {
949#if DEBUG_SQLITE 892#if DEBUG_SQLITE
950 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 893 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
951 "Restricting to results larger than the last expiration %llu\n", 894 "Restricting to results larger than the last priority %u\n",
952 (unsigned long long) nc->lastExpiration.abs_value); 895 nc->lastPriority);
953#endif 896#endif
954 sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.abs_value); 897 sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
955 sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.abs_value); 898 sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
956 }
957#if DEBUG_SQLITE 899#if DEBUG_SQLITE
958 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 900 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959 "Restricting to results larger than the last key `%s'\n", 901 "Restricting to results larger than the last key `%s'\n",
@@ -1016,63 +958,56 @@ iter_next_prepare (void *cls,
1016 958
1017 959
1018/** 960/**
1019 * Call a method for each key in the database and 961 * Select a subset of the items in the datastore and call
1020 * call the callback method on it. 962 * the given iterator for each of them.
1021 * 963 *
1022 * @param plugin our plugin context 964 * @param cls our plugin context
1023 * @param type entries of which type should be considered? 965 * @param type entries of which type should be considered?
1024 * @param is_asc are we iterating in ascending order? 966 * Use 0 for any type.
1025 * @param is_prio are we iterating by priority (otherwise by expiration)
1026 * @param is_migr are we iterating in migration order?
1027 * @param limit_nonanonymous are we restricting results to those with anonymity
1028 * level zero?
1029 * @param stmt_str_1 first SQL statement to execute
1030 * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
1031 * @param iter function to call on each matching value; 967 * @param iter function to call on each matching value;
1032 * will be called once with a NULL value at the end 968 * will be called once with a NULL value at the end
1033 * @param iter_cls closure for iter 969 * @param iter_cls closure for iter
1034 */ 970 */
1035static void 971static void
1036basic_iter (struct Plugin *plugin, 972sqlite_plugin_iter_zero_anonymity (void *cls,
1037 enum GNUNET_BLOCK_Type type, 973 enum GNUNET_BLOCK_Type type,
1038 int is_asc, 974 PluginIterator iter,
1039 int is_prio, 975 void *iter_cls)
1040 int is_migr,
1041 int limit_nonanonymous,
1042 const char *stmt_str_1,
1043 const char *stmt_str_2,
1044 PluginIterator iter,
1045 void *iter_cls)
1046{ 976{
977 struct Plugin *plugin = cls;
978 struct GNUNET_TIME_Absolute now;
1047 struct NextContext *nc; 979 struct NextContext *nc;
1048 struct IterContext *ic; 980 struct IterContext *ic;
1049 sqlite3_stmt *stmt_1; 981 sqlite3_stmt *stmt_1;
1050 sqlite3_stmt *stmt_2; 982 sqlite3_stmt *stmt_2;
983 char *q;
1051 984
1052#if DEBUG_SQLITE 985 now = GNUNET_TIME_absolute_get ();
1053 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 986 GNUNET_asprintf (&q, SELECT_IT_NON_ANONYMOUS_1,
1054 "At %llu, using queries `%s' and `%s'\n", 987 (unsigned long long) now.abs_value);
1055 (unsigned long long) GNUNET_TIME_absolute_get ().abs_value, 988 if (sq_prepare (plugin->dbh, q, &stmt_1) != SQLITE_OK)
1056 stmt_str_1,
1057 stmt_str_2);
1058#endif
1059 if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
1060 { 989 {
1061 LOG_SQLITE (plugin, NULL, 990 LOG_SQLITE (plugin, NULL,
1062 GNUNET_ERROR_TYPE_ERROR | 991 GNUNET_ERROR_TYPE_ERROR |
1063 GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2"); 992 GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1064 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 993 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
994 GNUNET_free (q);
1065 return; 995 return;
1066 } 996 }
1067 if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK) 997 GNUNET_free (q);
998 GNUNET_asprintf (&q, SELECT_IT_NON_ANONYMOUS_2,
999 (unsigned long long) now.abs_value);
1000 if (sq_prepare (plugin->dbh, q, &stmt_2) != SQLITE_OK)
1068 { 1001 {
1069 LOG_SQLITE (plugin, NULL, 1002 LOG_SQLITE (plugin, NULL,
1070 GNUNET_ERROR_TYPE_ERROR | 1003 GNUNET_ERROR_TYPE_ERROR |
1071 GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2"); 1004 GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1072 sqlite3_finalize (stmt_1); 1005 sqlite3_finalize (stmt_1);
1073 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 1006 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1007 GNUNET_free (q);
1074 return; 1008 return;
1075 } 1009 }
1010 GNUNET_free (q);
1076 nc = GNUNET_malloc (sizeof(struct NextContext) + 1011 nc = GNUNET_malloc (sizeof(struct NextContext) +
1077 sizeof(struct IterContext)); 1012 sizeof(struct IterContext));
1078 nc->plugin = plugin; 1013 nc->plugin = plugin;
@@ -1083,166 +1018,15 @@ basic_iter (struct Plugin *plugin,
1083 ic->stmt_1 = stmt_1; 1018 ic->stmt_1 = stmt_1;
1084 ic->stmt_2 = stmt_2; 1019 ic->stmt_2 = stmt_2;
1085 ic->type = type; 1020 ic->type = type;
1086 ic->is_asc = is_asc;
1087 ic->is_prio = is_prio;
1088 ic->is_migr = is_migr;
1089 ic->limit_nonanonymous = limit_nonanonymous;
1090 nc->prep = &iter_next_prepare; 1021 nc->prep = &iter_next_prepare;
1091 nc->prep_cls = ic; 1022 nc->prep_cls = ic;
1092 if (is_asc) 1023 nc->lastPriority = 0x7FFFFFFF;
1093 { 1024 memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1094 nc->lastPriority = 0;
1095 nc->lastExpiration.abs_value = 0;
1096 memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1097 }
1098 else
1099 {
1100 nc->lastPriority = 0x7FFFFFFF;
1101 nc->lastExpiration.abs_value = 0x7FFFFFFFFFFFFFFFLL;
1102 memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1103 }
1104 sqlite_next_request (nc, GNUNET_NO); 1025 sqlite_next_request (nc, GNUNET_NO);
1105} 1026}
1106 1027
1107 1028
1108/** 1029/**
1109 * Select a subset of the items in the datastore and call
1110 * the given iterator for each of them.
1111 *
1112 * @param cls our plugin context
1113 * @param type entries of which type should be considered?
1114 * Use 0 for any type.
1115 * @param iter function to call on each matching value;
1116 * will be called once with a NULL value at the end
1117 * @param iter_cls closure for iter
1118 */
1119static void
1120sqlite_plugin_iter_low_priority (void *cls,
1121 enum GNUNET_BLOCK_Type type,
1122 PluginIterator iter,
1123 void *iter_cls)
1124{
1125 basic_iter (cls,
1126 type,
1127 GNUNET_YES, GNUNET_YES,
1128 GNUNET_NO, GNUNET_NO,
1129 SELECT_IT_LOW_PRIORITY_1,
1130 SELECT_IT_LOW_PRIORITY_2,
1131 iter, iter_cls);
1132}
1133
1134
1135/**
1136 * Select a subset of the items in the datastore and call
1137 * the given iterator for each of them.
1138 *
1139 * @param cls our plugin context
1140 * @param type entries of which type should be considered?
1141 * Use 0 for any type.
1142 * @param iter function to call on each matching value;
1143 * will be called once with a NULL value at the end
1144 * @param iter_cls closure for iter
1145 */
1146static void
1147sqlite_plugin_iter_zero_anonymity (void *cls,
1148 enum GNUNET_BLOCK_Type type,
1149 PluginIterator iter,
1150 void *iter_cls)
1151{
1152 struct GNUNET_TIME_Absolute now;
1153 char *q1;
1154 char *q2;
1155
1156 now = GNUNET_TIME_absolute_get ();
1157 GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1158 (unsigned long long) now.abs_value);
1159 GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1160 (unsigned long long) now.abs_value);
1161 basic_iter (cls,
1162 type,
1163 GNUNET_NO, GNUNET_YES,
1164 GNUNET_NO, GNUNET_YES,
1165 q1,
1166 q2,
1167 iter, iter_cls);
1168 GNUNET_free (q1);
1169 GNUNET_free (q2);
1170}
1171
1172
1173
1174/**
1175 * Select a subset of the items in the datastore and call
1176 * the given iterator for each of them.
1177 *
1178 * @param cls our plugin context
1179 * @param type entries of which type should be considered?
1180 * Use 0 for any type.
1181 * @param iter function to call on each matching value;
1182 * will be called once with a NULL value at the end
1183 * @param iter_cls closure for iter
1184 */
1185static void
1186sqlite_plugin_iter_ascending_expiration (void *cls,
1187 enum GNUNET_BLOCK_Type type,
1188 PluginIterator iter,
1189 void *iter_cls)
1190{
1191 struct GNUNET_TIME_Absolute now;
1192 char *q1;
1193 char *q2;
1194
1195 now = GNUNET_TIME_absolute_get ();
1196 GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1197 (unsigned long long) 0*now.abs_value);
1198 GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1199 (unsigned long long) 0*now.abs_value);
1200 basic_iter (cls,
1201 type,
1202 GNUNET_YES, GNUNET_NO,
1203 GNUNET_NO, GNUNET_NO,
1204 q1, q2,
1205 iter, iter_cls);
1206 GNUNET_free (q1);
1207 GNUNET_free (q2);
1208}
1209
1210
1211/**
1212 * Select a subset of the items in the datastore and call
1213 * the given iterator for each of them.
1214 *
1215 * @param cls our plugin context
1216 * @param type entries of which type should be considered?
1217 * Use 0 for any type.
1218 * @param iter function to call on each matching value;
1219 * will be called once with a NULL value at the end
1220 * @param iter_cls closure for iter
1221 */
1222static void
1223sqlite_plugin_iter_migration_order (void *cls,
1224 enum GNUNET_BLOCK_Type type,
1225 PluginIterator iter,
1226 void *iter_cls)
1227{
1228 struct GNUNET_TIME_Absolute now;
1229 char *q;
1230
1231 now = GNUNET_TIME_absolute_get ();
1232 GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1233 (unsigned long long) now.abs_value);
1234 basic_iter (cls,
1235 type,
1236 GNUNET_NO, GNUNET_NO,
1237 GNUNET_YES, GNUNET_NO,
1238 SELECT_IT_MIGRATION_ORDER_1,
1239 q,
1240 iter, iter_cls);
1241 GNUNET_free (q);
1242}
1243
1244
1245/**
1246 * Call sqlite using the already prepared query to get 1030 * Call sqlite using the already prepared query to get
1247 * the next result. 1031 * the next result.
1248 * 1032 *
@@ -1271,19 +1055,20 @@ all_next_prepare (void *cls,
1271 return GNUNET_SYSERR; 1055 return GNUNET_SYSERR;
1272 } 1056 }
1273 plugin = nc->plugin; 1057 plugin = nc->plugin;
1274 if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt))) 1058 ret = sqlite3_step (nc->stmt);
1275 { 1059 switch (ret)
1276 return GNUNET_OK;
1277 }
1278 if (ret != SQLITE_DONE)
1279 { 1060 {
1061 case SQLITE_ROW:
1062 return GNUNET_OK;
1063 case SQLITE_DONE:
1064 return GNUNET_NO;
1065 default:
1280 LOG_SQLITE (plugin, NULL, 1066 LOG_SQLITE (plugin, NULL,
1281 GNUNET_ERROR_TYPE_ERROR | 1067 GNUNET_ERROR_TYPE_ERROR |
1282 GNUNET_ERROR_TYPE_BULK, 1068 GNUNET_ERROR_TYPE_BULK,
1283 "sqlite3_step"); 1069 "sqlite3_step");
1284 return GNUNET_SYSERR; 1070 return GNUNET_SYSERR;
1285 } 1071 }
1286 return GNUNET_NO;
1287} 1072}
1288 1073
1289 1074
@@ -1466,7 +1251,7 @@ sqlite_plugin_get (void *cls,
1466 GNUNET_assert (iter != NULL); 1251 GNUNET_assert (iter != NULL);
1467 if (key == NULL) 1252 if (key == NULL)
1468 { 1253 {
1469 sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls); 1254 sqlite_plugin_iter_all_now (cls, type, iter, iter_cls);
1470 return; 1255 return;
1471 } 1256 }
1472 GNUNET_snprintf (scratch, sizeof (scratch), 1257 GNUNET_snprintf (scratch, sizeof (scratch),
@@ -1561,46 +1346,30 @@ sqlite_plugin_get (void *cls,
1561 1346
1562 1347
1563/** 1348/**
1564 * Get a random item for replication. Returns a single, not expired, random item 1349 * Execute statement that gets a row and call the iterator
1565 * from those with the highest replication counters. The item's 1350 * with the result. Resets the statement afterwards.
1566 * replication counter is decremented by one IF it was positive before.
1567 * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1568 * 1351 *
1569 * @param cls closure 1352 * @param plugin the plugin
1570 * @param iter function to call the value (once only). 1353 * @param stmt the statement
1571 * @param iter_cls closure for iter 1354 * @param iter iterator to call
1355 * @param iter_cls closure for 'iter'
1572 */ 1356 */
1573static void 1357static void
1574sqlite_plugin_replication_get (void *cls, 1358execute_get (struct Plugin *plugin,
1575 PluginIterator iter, void *iter_cls) 1359 sqlite3_stmt *stmt,
1360 PluginIterator iter, void *iter_cls)
1576{ 1361{
1577 struct Plugin *plugin = cls;
1578 int n; 1362 int n;
1579 sqlite3_stmt *stmt;
1580 struct GNUNET_TIME_Absolute expiration; 1363 struct GNUNET_TIME_Absolute expiration;
1581 unsigned long long rowid; 1364 unsigned long long rowid;
1365 unsigned int size;
1366 int ret;
1582 1367
1583#if DEBUG_SQLITE
1584 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1585 "sqlite",
1586 "Getting random block based on replication order.\n");
1587#endif
1588 stmt = plugin->selRepl;
1589 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, expiration.abs_value))
1590 {
1591 LOG_SQLITE (plugin, NULL,
1592 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1593 if (SQLITE_OK != sqlite3_reset (stmt))
1594 LOG_SQLITE (plugin, NULL,
1595 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1596 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
1597 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1598 return;
1599 }
1600 n = sqlite3_step (stmt); 1368 n = sqlite3_step (stmt);
1601 switch (n) 1369 switch (n)
1602 { 1370 {
1603 case SQLITE_ROW: 1371 case SQLITE_ROW:
1372 size = sqlite3_column_bytes (stmt, 5);
1604 rowid = sqlite3_column_int64 (stmt, 6); 1373 rowid = sqlite3_column_int64 (stmt, 6);
1605 if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode)) 1374 if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
1606 { 1375 {
@@ -1611,24 +1380,30 @@ sqlite_plugin_replication_get (void *cls,
1611 LOG_SQLITE (plugin, NULL, 1380 LOG_SQLITE (plugin, NULL,
1612 GNUNET_ERROR_TYPE_ERROR | 1381 GNUNET_ERROR_TYPE_ERROR |
1613 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); 1382 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1614 delete_by_rowid (plugin, rowid); 1383 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
1384 plugin->env->duc (plugin->env->cls,
1385 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
1615 break; 1386 break;
1616 } 1387 }
1617 expiration.abs_value = sqlite3_column_int64 (stmt, 3); 1388 expiration.abs_value = sqlite3_column_int64 (stmt, 3);
1618 (void) iter (iter_cls, 1389 ret = iter (iter_cls,
1619 NULL, 1390 NULL,
1620 sqlite3_column_blob (stmt, 4) /* key */, 1391 sqlite3_column_blob (stmt, 4) /* key */,
1621 sqlite3_column_bytes (stmt, 5) /* size of data */, 1392 size,
1622 sqlite3_column_blob (stmt, 5) /* data */, 1393 sqlite3_column_blob (stmt, 5) /* data */,
1623 sqlite3_column_int (stmt, 0) /* type */, 1394 sqlite3_column_int (stmt, 0) /* type */,
1624 sqlite3_column_int (stmt, 1) /* priority */, 1395 sqlite3_column_int (stmt, 1) /* priority */,
1625 sqlite3_column_int (stmt, 2) /* anonymity */, 1396 sqlite3_column_int (stmt, 2) /* anonymity */,
1626 expiration, 1397 expiration,
1627 rowid); 1398 rowid);
1628 if (SQLITE_OK != sqlite3_reset (stmt)) 1399 if (SQLITE_OK != sqlite3_reset (stmt))
1629 LOG_SQLITE (plugin, NULL, 1400 LOG_SQLITE (plugin, NULL,
1630 GNUNET_ERROR_TYPE_ERROR | 1401 GNUNET_ERROR_TYPE_ERROR |
1631 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); 1402 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1403 if ( (GNUNET_NO == ret) &&
1404 (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
1405 plugin->env->duc (plugin->env->cls,
1406 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
1632 return; 1407 return;
1633 case SQLITE_DONE: 1408 case SQLITE_DONE:
1634 /* database must be empty */ 1409 /* database must be empty */
@@ -1657,6 +1432,85 @@ sqlite_plugin_replication_get (void *cls,
1657 1432
1658 1433
1659/** 1434/**
1435 * Get a random item for replication. Returns a single, not expired, random item
1436 * from those with the highest replication counters. The item's
1437 * replication counter is decremented by one IF it was positive before.
1438 * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1439 *
1440 * @param cls closure
1441 * @param iter function to call the value (once only).
1442 * @param iter_cls closure for iter
1443 */
1444static void
1445sqlite_plugin_replication_get (void *cls,
1446 PluginIterator iter, void *iter_cls)
1447{
1448 struct Plugin *plugin = cls;
1449 sqlite3_stmt *stmt;
1450 struct GNUNET_TIME_Absolute now;
1451
1452#if DEBUG_SQLITE
1453 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1454 "sqlite",
1455 "Getting random block based on replication order.\n");
1456#endif
1457 stmt = plugin->selRepl;
1458 now = GNUNET_TIME_absolute_get ();
1459 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value))
1460 {
1461 LOG_SQLITE (plugin, NULL,
1462 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1463 if (SQLITE_OK != sqlite3_reset (stmt))
1464 LOG_SQLITE (plugin, NULL,
1465 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1466 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
1467 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1468 return;
1469 }
1470 execute_get (plugin, stmt, iter, iter_cls);
1471}
1472
1473
1474
1475/**
1476 * Get a random item that has expired or has low priority.
1477 * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1478 *
1479 * @param cls closure
1480 * @param iter function to call the value (once only).
1481 * @param iter_cls closure for iter
1482 */
1483static void
1484sqlite_plugin_expiration_get (void *cls,
1485 PluginIterator iter, void *iter_cls)
1486{
1487 struct Plugin *plugin = cls;
1488 sqlite3_stmt *stmt;
1489 struct GNUNET_TIME_Absolute now;
1490
1491#if DEBUG_SQLITE
1492 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1493 "sqlite",
1494 "Getting random block based on expiration and priority order.\n");
1495#endif
1496 now = GNUNET_TIME_absolute_get ();
1497 stmt = plugin->selExpi;
1498 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value))
1499 {
1500 LOG_SQLITE (plugin, NULL,
1501 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1502 if (SQLITE_OK != sqlite3_reset (stmt))
1503 LOG_SQLITE (plugin, NULL,
1504 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1505 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
1506 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1507 return;
1508 }
1509 execute_get (plugin, stmt, iter, iter_cls);
1510}
1511
1512
1513/**
1660 * Drop database. 1514 * Drop database.
1661 * 1515 *
1662 * @param cls our plugin context 1516 * @param cls our plugin context
@@ -1669,6 +1523,12 @@ sqlite_plugin_drop (void *cls)
1669} 1523}
1670 1524
1671 1525
1526/**
1527 * FIXME.
1528 *
1529 * @param cls the 'struct Plugin'
1530 * @return the size of the database on disk (estimate)
1531 */
1672static unsigned long long 1532static unsigned long long
1673sqlite_plugin_get_size (void *cls) 1533sqlite_plugin_get_size (void *cls)
1674{ 1534{
@@ -1749,11 +1609,9 @@ libgnunet_plugin_datastore_sqlite_init (void *cls)
1749 api->next_request = &sqlite_next_request; 1609 api->next_request = &sqlite_next_request;
1750 api->get = &sqlite_plugin_get; 1610 api->get = &sqlite_plugin_get;
1751 api->replication_get = &sqlite_plugin_replication_get; 1611 api->replication_get = &sqlite_plugin_replication_get;
1612 api->expiration_get = &sqlite_plugin_expiration_get;
1752 api->update = &sqlite_plugin_update; 1613 api->update = &sqlite_plugin_update;
1753 api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1754 api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity; 1614 api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1755 api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1756 api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1757 api->iter_all_now = &sqlite_plugin_iter_all_now; 1615 api->iter_all_now = &sqlite_plugin_iter_all_now;
1758 api->drop = &sqlite_plugin_drop; 1616 api->drop = &sqlite_plugin_drop;
1759 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, 1617 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
diff --git a/src/datastore/plugin_datastore_template.c b/src/datastore/plugin_datastore_template.c
index fc67f600e..41d92a117 100644
--- a/src/datastore/plugin_datastore_template.c
+++ b/src/datastore/plugin_datastore_template.c
@@ -155,6 +155,22 @@ template_plugin_replication_get (void *cls,
155 155
156 156
157/** 157/**
158 * Get a random item for expiration.
159 * Call 'iter' with all values ZERO or NULL if the datastore is empty.
160 *
161 * @param cls closure
162 * @param iter function to call the value (once only).
163 * @param iter_cls closure for iter
164 */
165static void
166template_plugin_expiration_get (void *cls,
167 PluginIterator iter, void *iter_cls)
168{
169 GNUNET_break (0);
170}
171
172
173/**
158 * Update the priority for a particular key in the datastore. If 174 * Update the priority for a particular key in the datastore. If
159 * the expiration time in value is different than the time found in 175 * the expiration time in value is different than the time found in
160 * the datastore, the higher value should be kept. For the 176 * the datastore, the higher value should be kept. For the
@@ -201,28 +217,6 @@ template_plugin_update (void *cls,
201 * @param iter_cls closure for iter 217 * @param iter_cls closure for iter
202 */ 218 */
203static void 219static void
204template_plugin_iter_low_priority (void *cls,
205 enum GNUNET_BLOCK_Type type,
206 PluginIterator iter,
207 void *iter_cls)
208{
209 GNUNET_break (0);
210}
211
212
213
214/**
215 * Select a subset of the items in the datastore and call
216 * the given iterator for each of them.
217 *
218 * @param cls our "struct Plugin*"
219 * @param type entries of which type should be considered?
220 * Use 0 for any type.
221 * @param iter function to call on each matching value;
222 * will be called once with a NULL value at the end
223 * @param iter_cls closure for iter
224 */
225static void
226template_plugin_iter_zero_anonymity (void *cls, 220template_plugin_iter_zero_anonymity (void *cls,
227 enum GNUNET_BLOCK_Type type, 221 enum GNUNET_BLOCK_Type type,
228 PluginIterator iter, 222 PluginIterator iter,
@@ -232,51 +226,6 @@ template_plugin_iter_zero_anonymity (void *cls,
232} 226}
233 227
234 228
235
236/**
237 * Select a subset of the items in the datastore and call
238 * the given iterator for each of them.
239 *
240 * @param cls our "struct Plugin*"
241 * @param type entries of which type should be considered?
242 * Use 0 for any type.
243 * @param iter function to call on each matching value;
244 * will be called once with a NULL value at the end
245 * @param iter_cls closure for iter
246 */
247static void
248template_plugin_iter_ascending_expiration (void *cls,
249 enum GNUNET_BLOCK_Type type,
250 PluginIterator iter,
251 void *iter_cls)
252{
253 GNUNET_break (0);
254}
255
256
257
258/**
259 * Select a subset of the items in the datastore and call
260 * the given iterator for each of them.
261 *
262 * @param cls our "struct Plugin*"
263 * @param type entries of which type should be considered?
264 * Use 0 for any type.
265 * @param iter function to call on each matching value;
266 * will be called once with a NULL value at the end
267 * @param iter_cls closure for iter
268 */
269static void
270template_plugin_iter_migration_order (void *cls,
271 enum GNUNET_BLOCK_Type type,
272 PluginIterator iter,
273 void *iter_cls)
274{
275 GNUNET_break (0);
276}
277
278
279
280/** 229/**
281 * Select a subset of the items in the datastore and call 230 * Select a subset of the items in the datastore and call
282 * the given iterator for each of them. 231 * the given iterator for each of them.
@@ -330,11 +279,9 @@ libgnunet_plugin_datastore_template_init (void *cls)
330 api->next_request = &template_plugin_next_request; 279 api->next_request = &template_plugin_next_request;
331 api->get = &template_plugin_get; 280 api->get = &template_plugin_get;
332 api->replication_get = &template_plugin_replication_get; 281 api->replication_get = &template_plugin_replication_get;
282 api->expiration_get = &template_plugin_expiration_get;
333 api->update = &template_plugin_update; 283 api->update = &template_plugin_update;
334 api->iter_low_priority = &template_plugin_iter_low_priority;
335 api->iter_zero_anonymity = &template_plugin_iter_zero_anonymity; 284 api->iter_zero_anonymity = &template_plugin_iter_zero_anonymity;
336 api->iter_ascending_expiration = &template_plugin_iter_ascending_expiration;
337 api->iter_migration_order = &template_plugin_iter_migration_order;
338 api->iter_all_now = &template_plugin_iter_all_now; 285 api->iter_all_now = &template_plugin_iter_all_now;
339 api->drop = &template_plugin_drop; 286 api->drop = &template_plugin_drop;
340 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, 287 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
diff --git a/src/datastore/test_datastore_api.c b/src/datastore/test_datastore_api.c
index 965f05687..6280907ad 100644
--- a/src/datastore/test_datastore_api.c
+++ b/src/datastore/test_datastore_api.c
@@ -210,7 +210,7 @@ check_value (void *cls,
210 GNUNET_assert (priority == get_priority (i)); 210 GNUNET_assert (priority == get_priority (i));
211 GNUNET_assert (anonymity == get_anonymity(i)); 211 GNUNET_assert (anonymity == get_anonymity(i));
212 GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value); 212 GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value);
213 GNUNET_DATASTORE_get_next (datastore); 213 GNUNET_DATASTORE_iterate_get_next (datastore);
214} 214}
215 215
216 216
@@ -249,7 +249,7 @@ delete_value (void *cls,
249 crc->key = *key; 249 crc->key = *key;
250 crc->data = GNUNET_malloc (size); 250 crc->data = GNUNET_malloc (size);
251 memcpy (crc->data, data, size); 251 memcpy (crc->data, data, size);
252 GNUNET_DATASTORE_get_next (datastore); 252 GNUNET_DATASTORE_iterate_get_next (datastore);
253} 253}
254 254
255 255
@@ -329,7 +329,7 @@ check_multiple (void *cls,
329#endif 329#endif
330 if (priority == get_priority (42)) 330 if (priority == get_priority (42))
331 crc->uid = uid; 331 crc->uid = uid;
332 GNUNET_DATASTORE_get_next (datastore); 332 GNUNET_DATASTORE_iterate_get_next (datastore);
333} 333}
334 334
335 335
@@ -370,7 +370,7 @@ check_update (void *cls,
370 } 370 }
371 else 371 else
372 GNUNET_assert (size == get_size (43)); 372 GNUNET_assert (size == get_size (43));
373 GNUNET_DATASTORE_get_next (datastore); 373 GNUNET_DATASTORE_iterate_get_next (datastore);
374} 374}
375 375
376 376
@@ -420,12 +420,12 @@ run_continuation (void *cls,
420 crc->i); 420 crc->i);
421#endif 421#endif
422 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 422 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
423 GNUNET_DATASTORE_get (datastore, 423 GNUNET_DATASTORE_iterate_key (datastore,
424 &crc->key, 424 &crc->key,
425 get_type (crc->i), 425 get_type (crc->i),
426 1, 1, TIMEOUT, 426 1, 1, TIMEOUT,
427 &check_value, 427 &check_value,
428 crc); 428 crc);
429 break; 429 break;
430 case RP_DEL: 430 case RP_DEL:
431 crc->i--; 431 crc->i--;
@@ -437,12 +437,12 @@ run_continuation (void *cls,
437#endif 437#endif
438 crc->data = NULL; 438 crc->data = NULL;
439 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 439 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
440 GNUNET_DATASTORE_get (datastore, 440 GNUNET_DATASTORE_iterate_key (datastore,
441 &crc->key, 441 &crc->key,
442 get_type (crc->i), 442 get_type (crc->i),
443 1, 1, TIMEOUT, 443 1, 1, TIMEOUT,
444 &delete_value, 444 &delete_value,
445 crc); 445 crc);
446 break; 446 break;
447 case RP_DO_DEL: 447 case RP_DO_DEL:
448#if VERBOSE 448#if VERBOSE
@@ -477,12 +477,12 @@ run_continuation (void *cls,
477 crc->i); 477 crc->i);
478#endif 478#endif
479 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 479 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
480 GNUNET_DATASTORE_get (datastore, 480 GNUNET_DATASTORE_iterate_key (datastore,
481 &crc->key, 481 &crc->key,
482 get_type (crc->i), 482 get_type (crc->i),
483 1, 1, TIMEOUT, 483 1, 1, TIMEOUT,
484 &check_nothing, 484 &check_nothing,
485 crc); 485 crc);
486 break; 486 break;
487 case RP_RESERVE: 487 case RP_RESERVE:
488 crc->phase = RP_PUT_MULTIPLE; 488 crc->phase = RP_PUT_MULTIPLE;
@@ -526,12 +526,12 @@ run_continuation (void *cls,
526 crc); 526 crc);
527 break; 527 break;
528 case RP_GET_MULTIPLE: 528 case RP_GET_MULTIPLE:
529 GNUNET_DATASTORE_get (datastore, 529 GNUNET_DATASTORE_iterate_key (datastore,
530 &crc->key, 530 &crc->key,
531 get_type (42), 531 get_type (42),
532 1, 1, TIMEOUT, 532 1, 1, TIMEOUT,
533 &check_multiple, 533 &check_multiple,
534 crc); 534 crc);
535 break; 535 break;
536 case RP_GET_MULTIPLE_NEXT: 536 case RP_GET_MULTIPLE_NEXT:
537 case RP_GET_MULTIPLE_DONE: 537 case RP_GET_MULTIPLE_DONE:
@@ -549,12 +549,12 @@ run_continuation (void *cls,
549 crc); 549 crc);
550 break; 550 break;
551 case RP_UPDATE_VALIDATE: 551 case RP_UPDATE_VALIDATE:
552 GNUNET_DATASTORE_get (datastore, 552 GNUNET_DATASTORE_iterate_key (datastore,
553 &crc->key, 553 &crc->key,
554 get_type (42), 554 get_type (42),
555 1, 1, TIMEOUT, 555 1, 1, TIMEOUT,
556 &check_update, 556 &check_update,
557 crc); 557 crc);
558 break; 558 break;
559 case RP_UPDATE_DONE: 559 case RP_UPDATE_DONE:
560 GNUNET_assert (0); 560 GNUNET_assert (0);
diff --git a/src/datastore/test_datastore_api_management.c b/src/datastore/test_datastore_api_management.c
index 50a426af6..5dfb5cea7 100644
--- a/src/datastore/test_datastore_api_management.c
+++ b/src/datastore/test_datastore_api_management.c
@@ -181,7 +181,7 @@ check_value (void *cls,
181 GNUNET_assert (priority == get_priority (i)); 181 GNUNET_assert (priority == get_priority (i));
182 GNUNET_assert (anonymity == get_anonymity(i)); 182 GNUNET_assert (anonymity == get_anonymity(i));
183 GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value); 183 GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value);
184 GNUNET_DATASTORE_get_next (datastore); 184 GNUNET_DATASTORE_iterate_get_next (datastore);
185} 185}
186 186
187 187
@@ -254,12 +254,12 @@ run_continuation (void *cls,
254 crc->i); 254 crc->i);
255#endif 255#endif
256 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 256 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
257 GNUNET_DATASTORE_get (datastore, 257 GNUNET_DATASTORE_iterate_key (datastore,
258 &crc->key, 258 &crc->key,
259 get_type (crc->i), 259 get_type (crc->i),
260 1, 1, TIMEOUT, 260 1, 1, TIMEOUT,
261 &check_value, 261 &check_value,
262 crc); 262 crc);
263 break; 263 break;
264 case RP_GET_FAIL: 264 case RP_GET_FAIL:
265#if VERBOSE 265#if VERBOSE
@@ -269,12 +269,12 @@ run_continuation (void *cls,
269 crc->i); 269 crc->i);
270#endif 270#endif
271 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 271 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
272 GNUNET_DATASTORE_get (datastore, 272 GNUNET_DATASTORE_iterate_key (datastore,
273 &crc->key, 273 &crc->key,
274 get_type (crc->i), 274 get_type (crc->i),
275 1, 1, TIMEOUT, 275 1, 1, TIMEOUT,
276 &check_nothing, 276 &check_nothing,
277 crc); 277 crc);
278 break; 278 break;
279 case RP_DONE: 279 case RP_DONE:
280 GNUNET_assert (0 == crc->i); 280 GNUNET_assert (0 == crc->i);
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index caf534140..35d89c50f 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -1297,7 +1297,7 @@ process_migration_content (void *cls,
1297 MIN_MIGRATION_CONTENT_LIFETIME.rel_value) 1297 MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
1298 { 1298 {
1299 /* content will expire soon, don't bother */ 1299 /* content will expire soon, don't bother */
1300 GNUNET_DATASTORE_get_next (dsh); 1300 GNUNET_DATASTORE_iterate_get_next (dsh);
1301 return; 1301 return;
1302 } 1302 }
1303 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 1303 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
@@ -1309,7 +1309,7 @@ process_migration_content (void *cls,
1309 &process_migration_content, 1309 &process_migration_content,
1310 NULL)) 1310 NULL))
1311 { 1311 {
1312 GNUNET_DATASTORE_get_next (dsh); 1312 GNUNET_DATASTORE_iterate_get_next (dsh);
1313 } 1313 }
1314 return; 1314 return;
1315 } 1315 }
@@ -1333,7 +1333,7 @@ process_migration_content (void *cls,
1333 GNUNET_CONTAINER_multihashmap_iterate (connected_peers, 1333 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1334 &consider_migration, 1334 &consider_migration,
1335 mb); 1335 mb);
1336 GNUNET_DATASTORE_get_next (dsh); 1336 GNUNET_DATASTORE_iterate_get_next (dsh);
1337} 1337}
1338 1338
1339 1339
@@ -1344,7 +1344,7 @@ static void
1344dht_put_continuation (void *cls, 1344dht_put_continuation (void *cls,
1345 const struct GNUNET_SCHEDULER_TaskContext *tc) 1345 const struct GNUNET_SCHEDULER_TaskContext *tc)
1346{ 1346{
1347 GNUNET_DATASTORE_get_next (dsh); 1347 GNUNET_DATASTORE_iterate_get_next (dsh);
1348} 1348}
1349 1349
1350 1350
@@ -1455,10 +1455,10 @@ gather_dht_put_blocks (void *cls,
1455 { 1455 {
1456 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 1456 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1457 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; 1457 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1458 dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX, 1458 dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (dsh, 0, UINT_MAX,
1459 GNUNET_TIME_UNIT_FOREVER_REL, 1459 GNUNET_TIME_UNIT_FOREVER_REL,
1460 dht_put_type++, 1460 dht_put_type++,
1461 &process_dht_put_content, NULL); 1461 &process_dht_put_content, NULL);
1462 GNUNET_assert (dht_qe != NULL); 1462 GNUNET_assert (dht_qe != NULL);
1463 } 1463 }
1464} 1464}
@@ -3991,7 +3991,7 @@ process_local_reply (void *cls,
3991 pr)) 3991 pr))
3992 if (pr->qe != NULL) 3992 if (pr->qe != NULL)
3993 { 3993 {
3994 GNUNET_DATASTORE_get_next (dsh); 3994 GNUNET_DATASTORE_iterate_get_next (dsh);
3995 } 3995 }
3996 return; 3996 return;
3997 } 3997 }
@@ -4014,7 +4014,7 @@ process_local_reply (void *cls,
4014 -1, -1, 4014 -1, -1,
4015 GNUNET_TIME_UNIT_FOREVER_REL, 4015 GNUNET_TIME_UNIT_FOREVER_REL,
4016 NULL, NULL); 4016 NULL, NULL);
4017 GNUNET_DATASTORE_get_next (dsh); 4017 GNUNET_DATASTORE_iterate_get_next (dsh);
4018 return; 4018 return;
4019 } 4019 }
4020 prq.type = type; 4020 prq.type = type;
@@ -4033,7 +4033,7 @@ process_local_reply (void *cls,
4033 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) 4033 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
4034 { 4034 {
4035 pr->local_only = GNUNET_YES; /* do not forward */ 4035 pr->local_only = GNUNET_YES; /* do not forward */
4036 GNUNET_DATASTORE_get_next (dsh); 4036 GNUNET_DATASTORE_iterate_get_next (dsh);
4037 return; 4037 return;
4038 } 4038 }
4039 if ( (pr->client_request_list == NULL) && 4039 if ( (pr->client_request_list == NULL) &&
@@ -4048,10 +4048,10 @@ process_local_reply (void *cls,
4048 gettext_noop ("# processing result set cut short due to load"), 4048 gettext_noop ("# processing result set cut short due to load"),
4049 1, 4049 1,
4050 GNUNET_NO); 4050 GNUNET_NO);
4051 GNUNET_DATASTORE_get_next (dsh); 4051 GNUNET_DATASTORE_iterate_get_next (dsh);
4052 return; 4052 return;
4053 } 4053 }
4054 GNUNET_DATASTORE_get_next (dsh); 4054 GNUNET_DATASTORE_iterate_get_next (dsh);
4055} 4055}
4056 4056
4057 4057
@@ -4412,14 +4412,14 @@ handle_p2p_get (void *cls,
4412 "Handing request for `%s' to datastore\n", 4412 "Handing request for `%s' to datastore\n",
4413 GNUNET_h2s (&gm->query)); 4413 GNUNET_h2s (&gm->query));
4414#endif 4414#endif
4415 pr->qe = GNUNET_DATASTORE_get (dsh, 4415 pr->qe = GNUNET_DATASTORE_iterate_key (dsh,
4416 &gm->query, 4416 &gm->query,
4417 type, 4417 type,
4418 pr->priority + 1, 4418 pr->priority + 1,
4419 MAX_DATASTORE_QUEUE, 4419 MAX_DATASTORE_QUEUE,
4420 timeout, 4420 timeout,
4421 &process_local_reply, 4421 &process_local_reply,
4422 pr); 4422 pr);
4423 if (NULL == pr->qe) 4423 if (NULL == pr->qe)
4424 { 4424 {
4425 GNUNET_STATISTICS_update (stats, 4425 GNUNET_STATISTICS_update (stats,
@@ -4617,13 +4617,13 @@ handle_start_search (void *cls,
4617 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 4617 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4618 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) 4618 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4619 type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */ 4619 type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4620 pr->qe = GNUNET_DATASTORE_get (dsh, 4620 pr->qe = GNUNET_DATASTORE_iterate_key (dsh,
4621 &sm->query, 4621 &sm->query,
4622 type, 4622 type,
4623 -3, -1, 4623 -3, -1,
4624 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 4624 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
4625 &process_local_reply, 4625 &process_local_reply,
4626 pr); 4626 pr);
4627} 4627}
4628 4628
4629 4629
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index c44a658df..16389e130 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -1047,7 +1047,7 @@ process_local_reply (void *cls,
1047 pr)) 1047 pr))
1048 { 1048 {
1049 if (pr->qe != NULL) 1049 if (pr->qe != NULL)
1050 GNUNET_DATASTORE_get_next (GSF_dsh); 1050 GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
1051 } 1051 }
1052 return; 1052 return;
1053 } 1053 }
@@ -1070,7 +1070,7 @@ process_local_reply (void *cls,
1070 -1, -1, 1070 -1, -1,
1071 GNUNET_TIME_UNIT_FOREVER_REL, 1071 GNUNET_TIME_UNIT_FOREVER_REL,
1072 NULL, NULL); 1072 NULL, NULL);
1073 GNUNET_DATASTORE_get_next (GSF_dsh); 1073 GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
1074 return; 1074 return;
1075 } 1075 }
1076 prq.type = type; 1076 prq.type = type;
@@ -1112,7 +1112,7 @@ process_local_reply (void *cls,
1112 } 1112 }
1113 return; 1113 return;
1114 } 1114 }
1115 GNUNET_DATASTORE_get_next (GSF_dsh); 1115 GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
1116} 1116}
1117 1117
1118 1118
@@ -1132,20 +1132,20 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1132 GNUNET_assert (NULL == pr->llc_cont); 1132 GNUNET_assert (NULL == pr->llc_cont);
1133 pr->llc_cont = cont; 1133 pr->llc_cont = cont;
1134 pr->llc_cont_cls = cont_cls; 1134 pr->llc_cont_cls = cont_cls;
1135 pr->qe = GNUNET_DATASTORE_get (GSF_dsh, 1135 pr->qe = GNUNET_DATASTORE_iterate_key (GSF_dsh,
1136 &pr->public_data.query, 1136 &pr->public_data.query,
1137 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 1137 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
1138 ? GNUNET_BLOCK_TYPE_ANY 1138 ? GNUNET_BLOCK_TYPE_ANY
1139 : pr->public_data.type, 1139 : pr->public_data.type,
1140 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) 1140 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1141 ? UINT_MAX 1141 ? UINT_MAX
1142 : 1 /* queue priority */, 1142 : 1 /* queue priority */,
1143 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) 1143 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1144 ? UINT_MAX 1144 ? UINT_MAX
1145 : 1 /* max queue size */, 1145 : 1 /* max queue size */,
1146 GNUNET_TIME_UNIT_FOREVER_REL, 1146 GNUNET_TIME_UNIT_FOREVER_REL,
1147 &process_local_reply, 1147 &process_local_reply,
1148 pr); 1148 pr);
1149} 1149}
1150 1150
1151 1151
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c
index 094489273..c08b57020 100644
--- a/src/fs/gnunet-service-fs_push.c
+++ b/src/fs/gnunet-service-fs_push.c
@@ -507,7 +507,7 @@ process_migration_content (void *cls,
507 MIN_MIGRATION_CONTENT_LIFETIME.rel_value) 507 MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
508 { 508 {
509 /* content will expire soon, don't bother */ 509 /* content will expire soon, don't bother */
510 GNUNET_DATASTORE_get_next (GSF_dsh); 510 GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
511 return; 511 return;
512 } 512 }
513 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 513 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
@@ -519,7 +519,7 @@ process_migration_content (void *cls,
519 &process_migration_content, 519 &process_migration_content,
520 NULL)) 520 NULL))
521 { 521 {
522 GNUNET_DATASTORE_get_next (GSF_dsh); 522 GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
523 } 523 }
524 return; 524 return;
525 } 525 }
@@ -556,7 +556,7 @@ process_migration_content (void *cls,
556 } 556 }
557 pos = pos->next; 557 pos = pos->next;
558 } 558 }
559 GNUNET_DATASTORE_get_next (GSF_dsh); 559 GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
560} 560}
561 561
562 562
diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c
index 5fd2ce81c..121a90bcd 100644
--- a/src/fs/gnunet-service-fs_put.c
+++ b/src/fs/gnunet-service-fs_put.c
@@ -109,7 +109,7 @@ static void
109dht_put_continuation (void *cls, 109dht_put_continuation (void *cls,
110 const struct GNUNET_SCHEDULER_TaskContext *tc) 110 const struct GNUNET_SCHEDULER_TaskContext *tc)
111{ 111{
112 GNUNET_DATASTORE_get_next (GSF_dsh); 112 GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
113} 113}
114 114
115 115
@@ -198,11 +198,11 @@ gather_dht_put_blocks (void *cls,
198 return; 198 return;
199 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 199 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
200 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; 200 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
201 dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, 201 dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (GSF_dsh,
202 0, UINT_MAX, 202 0, UINT_MAX,
203 GNUNET_TIME_UNIT_FOREVER_REL, 203 GNUNET_TIME_UNIT_FOREVER_REL,
204 dht_put_type++, 204 dht_put_type++,
205 &process_dht_put_content, NULL); 205 &process_dht_put_content, NULL);
206 GNUNET_assert (dht_qe != NULL); 206 GNUNET_assert (dht_qe != NULL);
207} 207}
208 208
diff --git a/src/include/gnunet_datastore_plugin.h b/src/include/gnunet_datastore_plugin.h
index c981ceb1d..34a659163 100644
--- a/src/include/gnunet_datastore_plugin.h
+++ b/src/include/gnunet_datastore_plugin.h
@@ -162,7 +162,7 @@ typedef int (*PluginPut) (void *cls,
162 uint32_t anonymity, 162 uint32_t anonymity,
163 uint32_t replication, 163 uint32_t replication,
164 struct GNUNET_TIME_Absolute expiration, 164 struct GNUNET_TIME_Absolute expiration,
165 char **msg); 165 char **msg);
166 166
167 167
168/** 168/**
@@ -187,26 +187,25 @@ typedef int (*PluginPut) (void *cls,
187 * @param iter_cls closure for iter 187 * @param iter_cls closure for iter
188 */ 188 */
189typedef void (*PluginGet) (void *cls, 189typedef void (*PluginGet) (void *cls,
190 const GNUNET_HashCode * key, 190 const GNUNET_HashCode *key,
191 const GNUNET_HashCode * vhash, 191 const GNUNET_HashCode *vhash,
192 enum GNUNET_BLOCK_Type type, 192 enum GNUNET_BLOCK_Type type,
193 PluginIterator iter, void *iter_cls); 193 PluginIterator iter, void *iter_cls);
194 194
195 195
196 196
197/** 197/**
198 * Get a random item for replication. Returns a single, 198 * Get a random item (additional constraints may apply depending on
199 * not expired, random item 199 * the specific implementation). Calls 'iter' with all values ZERO or
200 * from those with the highest replication counters. The item's 200 * NULL if no item applies, otherwise 'iter' is called once and only
201 * replication counter is decremented by one IF it was positive before. 201 * once with an item, with the 'next_cls' argument being NULL.
202 * Call 'iter' with all values ZERO or NULL if the datastore is empty.
203 * 202 *
204 * @param cls closure 203 * @param cls closure
205 * @param iter function to call the value (once only). 204 * @param iter function to call the value (once only).
206 * @param iter_cls closure for iter 205 * @param iter_cls closure for iter
207 */ 206 */
208typedef void (*PluginReplicationGet) (void *cls, 207typedef void (*PluginRandomGet) (void *cls,
209 PluginIterator iter, void *iter_cls); 208 PluginIterator iter, void *iter_cls);
210 209
211 210
212/** 211/**
@@ -234,13 +233,16 @@ typedef void (*PluginReplicationGet) (void *cls,
234 */ 233 */
235typedef int (*PluginUpdate) (void *cls, 234typedef int (*PluginUpdate) (void *cls,
236 uint64_t uid, 235 uint64_t uid,
237 int delta, struct GNUNET_TIME_Absolute expire, 236 int delta,
237 struct GNUNET_TIME_Absolute expire,
238 char **msg); 238 char **msg);
239 239
240 240
241/** 241/**
242 * Select a subset of the items in the datastore and call 242 * Select a subset of the items in the datastore and call the given
243 * the given iterator for each of them. 243 * iterator for the first item; then allow getting more items by
244 * calling the 'next_request' callback with the given 'next_cls'
245 * argument passed to 'iter'.
244 * 246 *
245 * @param cls closure 247 * @param cls closure
246 * @param type entries of which type should be considered? 248 * @param type entries of which type should be considered?
@@ -258,6 +260,7 @@ typedef void (*PluginSelector) (void *cls,
258 PluginIterator iter, 260 PluginIterator iter,
259 void *iter_cls); 261 void *iter_cls);
260 262
263
261/** 264/**
262 * Drop database. 265 * Drop database.
263 * 266 *
@@ -307,9 +310,18 @@ struct GNUNET_DATASTORE_PluginFunctions
307 310
308 /** 311 /**
309 * Function to get a random item with high replication score from 312 * Function to get a random item with high replication score from
310 * the database, lowering the item's replication score. 313 * the database, lowering the item's replication score. Returns a
314 * single, not expired, random item from those with the highest
315 * replication counters. The item's replication counter is
316 * decremented by one IF it was positive before.
317 */
318 PluginRandomGet replication_get;
319
320 /**
321 * Function to get a random expired item or, if none are expired, one
322 * with a low priority.
311 */ 323 */
312 PluginReplicationGet replication_get; 324 PluginRandomGet expiration_get;
313 325
314 /** 326 /**
315 * Update the priority for a particular key in the datastore. If 327 * Update the priority for a particular key in the datastore. If
@@ -322,31 +334,11 @@ struct GNUNET_DATASTORE_PluginFunctions
322 PluginUpdate update; 334 PluginUpdate update;
323 335
324 /** 336 /**
325 * Iterate over the items in the datastore in ascending 337 * Iterate over content with anonymity level zero.
326 * order of priority.
327 */
328 PluginSelector iter_low_priority;
329
330 /**
331 * Iterate over content with anonymity zero.
332 */ 338 */
333 PluginSelector iter_zero_anonymity; 339 PluginSelector iter_zero_anonymity;
334 340
335 /** 341 /**
336 * Iterate over the items in the datastore in ascending order of
337 * expiration time.
338 */
339 PluginSelector iter_ascending_expiration;
340
341 /**
342 * Iterate over the items in the datastore in migration
343 * order. Call the given function on the next item only
344 * (and then signal 'end' with a second call). This is
345 * a significant difference from all the other iterators!
346 */
347 PluginSelector iter_migration_order;
348
349 /**
350 * Iterate over all the items in the datastore 342 * Iterate over all the items in the datastore
351 * as fast as possible in a single transaction 343 * as fast as possible in a single transaction
352 * (can lock datastore while this happens, focus 344 * (can lock datastore while this happens, focus
diff --git a/src/include/gnunet_datastore_service.h b/src/include/gnunet_datastore_service.h
index 1e2e9e050..284d544f5 100644
--- a/src/include/gnunet_datastore_service.h
+++ b/src/include/gnunet_datastore_service.h
@@ -82,11 +82,11 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
82 * @param cls closure 82 * @param cls closure
83 * @param success GNUNET_SYSERR on failure, 83 * @param success GNUNET_SYSERR on failure,
84 * GNUNET_NO on timeout/queue drop 84 * GNUNET_NO on timeout/queue drop
85 * GNUNET_YES on success 85 * GNUNET_YES (or other positive value) on success
86 * @param msg NULL on success, otherwise an error message 86 * @param msg NULL on success, otherwise an error message
87 */ 87 */
88typedef void (*GNUNET_DATASTORE_ContinuationWithStatus)(void *cls, 88typedef void (*GNUNET_DATASTORE_ContinuationWithStatus)(void *cls,
89 int success, 89 int32_t success,
90 const char *msg); 90 const char *msg);
91 91
92 92
@@ -148,7 +148,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
148 */ 148 */
149struct GNUNET_DATASTORE_QueueEntry * 149struct GNUNET_DATASTORE_QueueEntry *
150GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, 150GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
151 int rid, 151 uint32_t rid,
152 const GNUNET_HashCode * key, 152 const GNUNET_HashCode * key,
153 size_t size, 153 size_t size,
154 const void *data, 154 const void *data,
@@ -187,7 +187,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
187 */ 187 */
188struct GNUNET_DATASTORE_QueueEntry * 188struct GNUNET_DATASTORE_QueueEntry *
189GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, 189GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
190 int rid, 190 uint32_t rid,
191 unsigned int queue_priority, 191 unsigned int queue_priority,
192 unsigned int max_queue_size, 192 unsigned int max_queue_size,
193 struct GNUNET_TIME_Relative timeout, 193 struct GNUNET_TIME_Relative timeout,
@@ -214,7 +214,7 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
214 */ 214 */
215struct GNUNET_DATASTORE_QueueEntry * 215struct GNUNET_DATASTORE_QueueEntry *
216GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, 216GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
217 unsigned long long uid, 217 uint64_t uid,
218 uint32_t priority, 218 uint32_t priority,
219 struct GNUNET_TIME_Absolute expiration, 219 struct GNUNET_TIME_Absolute expiration,
220 unsigned int queue_priority, 220 unsigned int queue_priority,
@@ -287,7 +287,7 @@ typedef void (*GNUNET_DATASTORE_Iterator) (void *cls,
287 * in the datastore. The iterator will only be called 287 * in the datastore. The iterator will only be called
288 * once initially; if the first call did contain a 288 * once initially; if the first call did contain a
289 * result, further results can be obtained by calling 289 * result, further results can be obtained by calling
290 * "GNUNET_DATASTORE_get_next" with the given argument. 290 * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
291 * 291 *
292 * @param h handle to the datastore 292 * @param h handle to the datastore
293 * @param key maybe NULL (to match all entries) 293 * @param key maybe NULL (to match all entries)
@@ -304,24 +304,54 @@ typedef void (*GNUNET_DATASTORE_Iterator) (void *cls,
304 * (or rather, will already have been invoked) 304 * (or rather, will already have been invoked)
305 */ 305 */
306struct GNUNET_DATASTORE_QueueEntry * 306struct GNUNET_DATASTORE_QueueEntry *
307GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, 307GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
308 const GNUNET_HashCode * key, 308 const GNUNET_HashCode * key,
309 enum GNUNET_BLOCK_Type type, 309 enum GNUNET_BLOCK_Type type,
310 unsigned int queue_priority, 310 unsigned int queue_priority,
311 unsigned int max_queue_size, 311 unsigned int max_queue_size,
312 struct GNUNET_TIME_Relative timeout, 312 struct GNUNET_TIME_Relative timeout,
313 GNUNET_DATASTORE_Iterator iter, 313 GNUNET_DATASTORE_Iterator iter,
314 void *iter_cls); 314 void *iter_cls);
315
316
317/**
318 * Get all zero-anonymity values from the datastore.
319 *
320 * @param h handle to the datastore
321 * @param queue_priority ranking of this request in the priority queue
322 * @param max_queue_size at what queue size should this request be dropped
323 * (if other requests of higher priority are in the queue)
324 * @param timeout how long to wait at most for a response
325 * @param type allowed type for the operation (ANY for 'all types')
326 * @param iter function to call on a random value; it
327 * will be called once with a value (if available)
328 * and always once with a value of NULL at the end.
329 * @param iter_cls closure for iter
330 * @return NULL if the entry was not queued, otherwise a handle that can be used to
331 * cancel; note that even if NULL is returned, the callback will be invoked
332 * (or rather, will already have been invoked)
333 */
334struct GNUNET_DATASTORE_QueueEntry *
335GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
336 unsigned int queue_priority,
337 unsigned int max_queue_size,
338 struct GNUNET_TIME_Relative timeout,
339 enum GNUNET_BLOCK_Type type,
340 GNUNET_DATASTORE_Iterator iter,
341 void *iter_cls);
315 342
316 343
317/** 344/**
318 * Function called to trigger obtaining the next result 345 * Function called to trigger obtaining the next result
319 * from the datastore. 346 * from the datastore. ONLY applies for 'GNUNET_DATASTORE_iterate_*'
347 * calls, not for 'get' calls. FIXME: how much mixing of iterate
348 * calls with other operations can we permit!? Should we pass
349 * the 'QueueEntry' instead of the datastore handle here instead?
320 * 350 *
321 * @param h handle to the datastore 351 * @param h handle to the datastore
322 */ 352 */
323void 353void
324GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h); 354GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h);
325 355
326 356
327/** 357/**
@@ -353,32 +383,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
353 void *iter_cls); 383 void *iter_cls);
354 384
355 385
356/**
357 * Get a zero-anonymity value from the datastore.
358 *
359 * @param h handle to the datastore
360 * @param queue_priority ranking of this request in the priority queue
361 * @param max_queue_size at what queue size should this request be dropped
362 * (if other requests of higher priority are in the queue)
363 * @param timeout how long to wait at most for a response
364 * @param type allowed type for the operation
365 * @param iter function to call on a random value; it
366 * will be called once with a value (if available)
367 * and always once with a value of NULL.
368 * @param iter_cls closure for iter
369 * @return NULL if the entry was not queued, otherwise a handle that can be used to
370 * cancel; note that even if NULL is returned, the callback will be invoked
371 * (or rather, will already have been invoked)
372 */
373struct GNUNET_DATASTORE_QueueEntry *
374GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
375 unsigned int queue_priority,
376 unsigned int max_queue_size,
377 struct GNUNET_TIME_Relative timeout,
378 enum GNUNET_BLOCK_Type type,
379 GNUNET_DATASTORE_Iterator iter,
380 void *iter_cls);
381
382 386
383/** 387/**
384 * Cancel a datastore operation. The final callback from the 388 * Cancel a datastore operation. The final callback from the