aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/plugin_datastore_sqlite.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/plugin_datastore_sqlite.c')
-rw-r--r--src/datastore/plugin_datastore_sqlite.c918
1 files changed, 183 insertions, 735 deletions
diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c
index 6e77ec364..3710b7eb7 100644
--- a/src/datastore/plugin_datastore_sqlite.c
+++ b/src/datastore/plugin_datastore_sqlite.c
@@ -108,19 +108,14 @@ struct Plugin
108 sqlite3_stmt *selExpi; 108 sqlite3_stmt *selExpi;
109 109
110 /** 110 /**
111 * Precompiled SQL for insertion. 111 * Precompiled SQL for expiration selection.
112 */
113 sqlite3_stmt *insertContent;
114
115 /**
116 * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
117 */ 112 */
118 struct NextContext *next_task_nc; 113 sqlite3_stmt *selZeroAnon;
119 114
120 /** 115 /**
121 * Pending task with scheduler for running the next request. 116 * Precompiled SQL for insertion.
122 */ 117 */
123 GNUNET_SCHEDULER_TaskIdentifier next_task; 118 sqlite3_stmt *insertContent;
124 119
125 /** 120 /**
126 * Should the database be dropped on shutdown? 121 * Should the database be dropped on shutdown?
@@ -326,6 +321,11 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
326 " WHERE NOT EXISTS (SELECT 1 FROM gn090 WHERE expire < ?1 LIMIT 1) OR expire < ?1 " 321 " WHERE NOT EXISTS (SELECT 1 FROM gn090 WHERE expire < ?1 LIMIT 1) OR expire < ?1 "
327 " ORDER BY prio ASC LIMIT 1", 322 " ORDER BY prio ASC LIMIT 1",
328 &plugin->selExpi) != SQLITE_OK) || 323 &plugin->selExpi) != SQLITE_OK) ||
324 (sq_prepare (plugin->dbh,
325 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
326 "WHERE (anonLevel = 0 AND type=?1) "
327 "ORDER BY hash DESC LIMIT 1 OFFSET ?2",
328 &plugin->selZeroAnon) != SQLITE_OK) ||
329 (sq_prepare (plugin->dbh, 329 (sq_prepare (plugin->dbh,
330 "INSERT INTO gn090 (repl, type, prio, " 330 "INSERT INTO gn090 (repl, type, prio, "
331 "anonLevel, expire, hash, vhash, value) " 331 "anonLevel, expire, hash, vhash, value) "
@@ -367,6 +367,8 @@ database_shutdown (struct Plugin *plugin)
367 sqlite3_finalize (plugin->selRepl); 367 sqlite3_finalize (plugin->selRepl);
368 if (plugin->selExpi != NULL) 368 if (plugin->selExpi != NULL)
369 sqlite3_finalize (plugin->selExpi); 369 sqlite3_finalize (plugin->selExpi);
370 if (plugin->selZeroAnon != NULL)
371 sqlite3_finalize (plugin->selZeroAnon);
370 if (plugin->insertContent != NULL) 372 if (plugin->insertContent != NULL)
371 sqlite3_finalize (plugin->insertContent); 373 sqlite3_finalize (plugin->insertContent);
372 result = sqlite3_close(plugin->dbh); 374 result = sqlite3_close(plugin->dbh);
@@ -436,247 +438,6 @@ delete_by_rowid (struct Plugin* plugin,
436 438
437 439
438/** 440/**
439 * Context for the universal iterator.
440 */
441struct NextContext;
442
443/**
444 * Type of a function that will prepare
445 * the next iteration.
446 *
447 * @param cls closure
448 * @param nc the next context; NULL for the last
449 * call which gives the callback a chance to
450 * clean up the closure
451 * @return GNUNET_OK on success, GNUNET_NO if there are
452 * no more values, GNUNET_SYSERR on error
453 */
454typedef int (*PrepareFunction)(void *cls,
455 struct NextContext *nc);
456
457
458/**
459 * Context we keep for the "next request" callback.
460 */
461struct NextContext
462{
463 /**
464 * Internal state.
465 */
466 struct Plugin *plugin;
467
468 /**
469 * Function to call on the next value.
470 */
471 PluginIterator iter;
472
473 /**
474 * Closure for iter.
475 */
476 void *iter_cls;
477
478 /**
479 * Function to call to prepare the next
480 * iteration.
481 */
482 PrepareFunction prep;
483
484 /**
485 * Closure for prep.
486 */
487 void *prep_cls;
488
489 /**
490 * Statement that the iterator will get the data
491 * from (updated or set by prep).
492 */
493 sqlite3_stmt *stmt;
494
495 /**
496 * Row ID of the last result.
497 */
498 unsigned long long last_rowid;
499
500 /**
501 * Key of the last result.
502 */
503 GNUNET_HashCode lastKey;
504
505 /**
506 * Priority of the last value visited.
507 */
508 unsigned int lastPriority;
509
510 /**
511 * Number of results processed so far.
512 */
513 unsigned int count;
514
515 /**
516 * Set to GNUNET_YES if we must stop now.
517 */
518 int end_it;
519};
520
521
522/**
523 * Continuation of "sqlite_next_request".
524 *
525 * @param cls the 'struct NextContext*'
526 * @param tc the task context (unused)
527 */
528static void
529sqlite_next_request_cont (void *cls,
530 const struct GNUNET_SCHEDULER_TaskContext *tc)
531{
532 struct NextContext * nc = cls;
533 struct Plugin *plugin;
534 unsigned long long rowid;
535 int ret;
536 unsigned int size;
537 unsigned int hsize;
538 uint32_t anonymity;
539 uint32_t priority;
540 enum GNUNET_BLOCK_Type type;
541 const GNUNET_HashCode *key;
542 struct GNUNET_TIME_Absolute expiration;
543
544 plugin = nc->plugin;
545 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
546 plugin->next_task_nc = NULL;
547 if ( (GNUNET_YES == nc->end_it) ||
548 (GNUNET_OK != (nc->prep(nc->prep_cls,
549 nc))) )
550 {
551#if DEBUG_SQLITE
552 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
553 "sqlite",
554 "Iteration completes after %u results\n",
555 nc->count);
556#endif
557 END:
558 nc->iter (nc->iter_cls,
559 NULL, NULL, 0, NULL, 0, 0, 0,
560 GNUNET_TIME_UNIT_ZERO_ABS, 0);
561 nc->prep (nc->prep_cls, NULL);
562 GNUNET_free (nc);
563 return;
564 }
565
566 type = sqlite3_column_int (nc->stmt, 0);
567 priority = sqlite3_column_int (nc->stmt, 1);
568 anonymity = sqlite3_column_int (nc->stmt, 2);
569 expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
570 hsize = sqlite3_column_bytes (nc->stmt, 4);
571 key = sqlite3_column_blob (nc->stmt, 4);
572 size = sqlite3_column_bytes (nc->stmt, 5);
573 rowid = sqlite3_column_int64 (nc->stmt, 6);
574 if (hsize != sizeof (GNUNET_HashCode))
575 {
576 GNUNET_break (0);
577 if (SQLITE_OK != sqlite3_reset (nc->stmt))
578 LOG_SQLITE (plugin, NULL,
579 GNUNET_ERROR_TYPE_ERROR |
580 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
581 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
582 plugin->env->duc (plugin->env->cls,
583 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
584 goto END;
585 }
586#if DEBUG_SQLITE
587 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
588 "sqlite",
589 "Iterator returns value with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
590 type,
591 GNUNET_h2s(key),
592 priority,
593 (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
594 (long long) expiration.abs_value);
595#endif
596 if (size > MAX_ITEM_SIZE)
597 {
598 GNUNET_break (0);
599 if (SQLITE_OK != sqlite3_reset (nc->stmt))
600 LOG_SQLITE (plugin, NULL,
601 GNUNET_ERROR_TYPE_ERROR |
602 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
603 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
604 plugin->env->duc (plugin->env->cls,
605 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
606 goto END;
607 }
608 {
609 char data[size];
610
611 memcpy (data, sqlite3_column_blob (nc->stmt, 5), size);
612 nc->count++;
613 nc->last_rowid = rowid;
614 nc->lastPriority = priority;
615 nc->lastKey = *key;
616 if (SQLITE_OK != sqlite3_reset (nc->stmt))
617 LOG_SQLITE (plugin, NULL,
618 GNUNET_ERROR_TYPE_ERROR |
619 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
620 ret = nc->iter (nc->iter_cls, nc,
621 &nc->lastKey,
622 size, data,
623 type, priority,
624 anonymity, expiration,
625 rowid);
626 }
627 switch (ret)
628 {
629 case GNUNET_SYSERR:
630 nc->end_it = GNUNET_YES;
631 break;
632 case GNUNET_NO:
633 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
634 {
635#if DEBUG_SQLITE
636 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
637 "sqlite",
638 "Removed entry %llu (%u bytes)\n",
639 (unsigned long long) rowid,
640 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
641#endif
642 plugin->env->duc (plugin->env->cls,
643 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
644 }
645 break;
646 case GNUNET_YES:
647 break;
648 default:
649 GNUNET_break (0);
650 }
651}
652
653
654/**
655 * Function invoked on behalf of a "PluginIterator" asking the
656 * database plugin to call the iterator with the next item.
657 *
658 * @param next_cls whatever argument was given
659 * to the PluginIterator as "next_cls".
660 * @param end_it set to GNUNET_YES if we
661 * should terminate the iteration early
662 * (iterator should be still called once more
663 * to signal the end of the iteration).
664 */
665static void
666sqlite_next_request (void *next_cls,
667 int end_it)
668{
669 struct NextContext * nc= next_cls;
670
671 if (GNUNET_YES == end_it)
672 nc->end_it = GNUNET_YES;
673 nc->plugin->next_task_nc = nc;
674 nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont,
675 nc);
676}
677
678
679/**
680 * Store an item in the datastore. 441 * Store an item in the datastore.
681 * 442 *
682 * @param cls closure 443 * @param cls closure
@@ -849,355 +610,147 @@ sqlite_plugin_update (void *cls,
849 610
850 611
851/** 612/**
852 * Internal context for an iteration. 613 * Execute statement that gets a row and call the callback
853 */ 614 * with the result. Resets the statement afterwards.
854struct ZeroIterContext
855{
856 /**
857 * First iterator statement for zero-anonymity iteration.
858 */
859 sqlite3_stmt *stmt_1;
860
861 /**
862 * Second iterator statement for zero-anonymity iteration.
863 */
864 sqlite3_stmt *stmt_2;
865
866 /**
867 * Desired type for blocks returned by this iterator.
868 */
869 enum GNUNET_BLOCK_Type type;
870};
871
872
873/**
874 * Prepare our SQL query to obtain the next record from the database.
875 * 615 *
876 * @param cls our "struct ZeroIterContext" 616 * @param plugin the plugin
877 * @param nc NULL to terminate the iteration, otherwise our context for 617 * @param stmt the statement
878 * getting the next result. 618 * @param proc processor to call
879 * @return GNUNET_OK on success, GNUNET_NO if there are no more results, 619 * @param proc_cls closure for 'proc'
880 * GNUNET_SYSERR on error (or end of iteration)
881 */ 620 */
882static int 621static void
883zero_iter_next_prepare (void *cls, 622execute_get (struct Plugin *plugin,
884 struct NextContext *nc) 623 sqlite3_stmt *stmt,
624 PluginDatumProcessor proc, void *proc_cls)
885{ 625{
886 struct ZeroIterContext *ic = cls; 626 int n;
887 struct Plugin *plugin; 627 struct GNUNET_TIME_Absolute expiration;
628 unsigned long long rowid;
629 unsigned int size;
888 int ret; 630 int ret;
889 631
890 if (nc == NULL) 632 n = sqlite3_step (stmt);
891 { 633 switch (n)
892#if DEBUG_SQLITE
893 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
894 "Asked to clean up iterator state.\n");
895#endif
896 sqlite3_finalize (ic->stmt_1);
897 sqlite3_finalize (ic->stmt_2);
898 return GNUNET_SYSERR;
899 }
900 plugin = nc->plugin;
901
902 /* first try iter 1 */
903#if DEBUG_SQLITE
904 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
905 "Restricting to results larger than the last priority %u and key `%s'\n",
906 nc->lastPriority,
907 GNUNET_h2s (&nc->lastKey));
908#endif
909 if ( (SQLITE_OK != sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority)) ||
910 (SQLITE_OK != sqlite3_bind_blob (ic->stmt_1, 2,
911 &nc->lastKey,
912 sizeof (GNUNET_HashCode),
913 SQLITE_TRANSIENT)) )
914 { 634 {
915 LOG_SQLITE (plugin, NULL, 635 case SQLITE_ROW:
916 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX"); 636 size = sqlite3_column_bytes (stmt, 5);
917 if (SQLITE_OK != sqlite3_reset (ic->stmt_1)) 637 rowid = sqlite3_column_int64 (stmt, 6);
638 if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
639 {
640 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
641 "sqlite",
642 _("Invalid data in database. Trying to fix (by deletion).\n"));
643 if (SQLITE_OK != sqlite3_reset (stmt))
644 LOG_SQLITE (plugin, NULL,
645 GNUNET_ERROR_TYPE_ERROR |
646 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
647 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
648 plugin->env->duc (plugin->env->cls,
649 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
650 break;
651 }
652 expiration.abs_value = sqlite3_column_int64 (stmt, 3);
653 ret = proc (proc_cls,
654 sqlite3_column_blob (stmt, 4) /* key */,
655 size,
656 sqlite3_column_blob (stmt, 5) /* data */,
657 sqlite3_column_int (stmt, 0) /* type */,
658 sqlite3_column_int (stmt, 1) /* priority */,
659 sqlite3_column_int (stmt, 2) /* anonymity */,
660 expiration,
661 rowid);
662 if (SQLITE_OK != sqlite3_reset (stmt))
918 LOG_SQLITE (plugin, NULL, 663 LOG_SQLITE (plugin, NULL,
919 GNUNET_ERROR_TYPE_ERROR | 664 GNUNET_ERROR_TYPE_ERROR |
920 GNUNET_ERROR_TYPE_BULK, 665 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
921 "sqlite3_reset"); 666 if ( (GNUNET_NO == ret) &&
922 return GNUNET_SYSERR; 667 (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
923 } 668 plugin->env->duc (plugin->env->cls,
924 if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1))) 669 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
925 { 670 return;
926#if DEBUG_SQLITE 671 case SQLITE_DONE:
927 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 672 /* database must be empty */
928 "Result found using iterator 1\n"); 673 if (SQLITE_OK != sqlite3_reset (stmt))
929#endif
930 nc->stmt = ic->stmt_1;
931 return GNUNET_OK;
932 }
933 if (ret != SQLITE_DONE)
934 {
935 LOG_SQLITE (plugin, NULL,
936 GNUNET_ERROR_TYPE_ERROR |
937 GNUNET_ERROR_TYPE_BULK,
938 "sqlite3_step");
939 if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
940 LOG_SQLITE (plugin, NULL, 674 LOG_SQLITE (plugin, NULL,
941 GNUNET_ERROR_TYPE_ERROR | 675 GNUNET_ERROR_TYPE_ERROR |
942 GNUNET_ERROR_TYPE_BULK, 676 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
943 "sqlite3_reset"); 677 break;
944 return GNUNET_SYSERR; 678 case SQLITE_BUSY:
945 } 679 case SQLITE_ERROR:
946 if (SQLITE_OK != sqlite3_reset (ic->stmt_1)) 680 case SQLITE_MISUSE:
947 LOG_SQLITE (plugin, NULL, 681 default:
948 GNUNET_ERROR_TYPE_ERROR |
949 GNUNET_ERROR_TYPE_BULK,
950 "sqlite3_reset");
951
952 /* now try iter 2 */
953 if (SQLITE_OK != sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority))
954 {
955 LOG_SQLITE (plugin, NULL,
956 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
957 return GNUNET_SYSERR;
958 }
959 if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2)))
960 {
961#if DEBUG_SQLITE
962 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
963 "Result found using iterator 2\n");
964#endif
965 nc->stmt = ic->stmt_2;
966 return GNUNET_OK;
967 }
968 if (ret != SQLITE_DONE)
969 {
970 LOG_SQLITE (plugin, NULL, 682 LOG_SQLITE (plugin, NULL,
971 GNUNET_ERROR_TYPE_ERROR | 683 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
972 GNUNET_ERROR_TYPE_BULK,
973 "sqlite3_step"); 684 "sqlite3_step");
974 if (SQLITE_OK != sqlite3_reset (ic->stmt_2)) 685 if (SQLITE_OK != sqlite3_reset (stmt))
975 LOG_SQLITE (plugin, NULL, 686 LOG_SQLITE (plugin, NULL,
976 GNUNET_ERROR_TYPE_ERROR | 687 GNUNET_ERROR_TYPE_ERROR |
977 GNUNET_ERROR_TYPE_BULK, 688 GNUNET_ERROR_TYPE_BULK,
978 "sqlite3_reset"); 689 "sqlite3_reset");
979 return GNUNET_SYSERR; 690 GNUNET_break (0);
691 database_shutdown (plugin);
692 database_setup (plugin->env->cfg,
693 plugin);
694 break;
980 } 695 }
981 if (SQLITE_OK != sqlite3_reset (ic->stmt_2)) 696 if (SQLITE_OK != sqlite3_reset (stmt))
982 LOG_SQLITE (plugin, NULL, 697 LOG_SQLITE (plugin, NULL,
983 GNUNET_ERROR_TYPE_ERROR | 698 GNUNET_ERROR_TYPE_ERROR |
984 GNUNET_ERROR_TYPE_BULK, 699 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
985 "sqlite3_reset"); 700 proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
986#if DEBUG_SQLITE 701 GNUNET_TIME_UNIT_ZERO_ABS, 0);
987 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
988 "No result found using either iterator\n");
989#endif
990 return GNUNET_NO;
991} 702}
992 703
993 704
705
994/** 706/**
995 * Select a subset of the items in the datastore and call 707 * Select a subset of the items in the datastore and call
996 * the given iterator for each of them. 708 * the given processor for the item.
997 * 709 *
998 * @param cls our plugin context 710 * @param cls our plugin context
999 * @param type entries of which type should be considered? 711 * @param type entries of which type should be considered?
1000 * Use 0 for any type. 712 * Use 0 for any type.
1001 * @param iter function to call on each matching value; 713 * @param proc function to call on each matching value;
1002 * will be called once with a NULL value at the end 714 * will be called once with a NULL value at the end
1003 * @param iter_cls closure for iter 715 * @param proc_cls closure for proc
1004 */ 716 */
1005static void 717static void
1006sqlite_plugin_iter_zero_anonymity (void *cls, 718sqlite_plugin_get_zero_anonymity (void *cls,
1007 enum GNUNET_BLOCK_Type type, 719 uint64_t offset,
1008 PluginIterator iter, 720 enum GNUNET_BLOCK_Type type,
1009 void *iter_cls) 721 PluginDatumProcessor proc,
722 void *proc_cls)
1010{ 723{
1011 struct Plugin *plugin = cls; 724 struct Plugin *plugin = cls;
1012 struct GNUNET_TIME_Absolute now; 725 sqlite3_stmt *stmt;
1013 struct NextContext *nc;
1014 struct ZeroIterContext *ic;
1015 sqlite3_stmt *stmt_1;
1016 sqlite3_stmt *stmt_2;
1017 char *q;
1018 726
1019 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); 727 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1020 now = GNUNET_TIME_absolute_get (); 728 stmt = plugin->selZeroAnon;
1021 GNUNET_asprintf (&q, 729 if ( (SQLITE_OK != sqlite3_bind_int (stmt, 1, type)) ||
1022 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 " 730 (SQLITE_OK != sqlite3_bind_int64 (stmt, 2, offset)) )
1023 "WHERE (anonLevel = 0 AND expire > %llu AND prio = ?1 AND type=%d AND hash < ?2) "
1024 "ORDER BY hash DESC LIMIT 1",
1025 (unsigned long long) now.abs_value,
1026 type);
1027 if (sq_prepare (plugin->dbh, q, &stmt_1) != SQLITE_OK)
1028 { 731 {
1029 LOG_SQLITE (plugin, NULL, 732 LOG_SQLITE (plugin, NULL,
1030 GNUNET_ERROR_TYPE_ERROR | 733 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1031 GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2"); 734 "sqlite3_bind_XXXX");
1032 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 735 if (SQLITE_OK != sqlite3_reset (stmt))
1033 GNUNET_free (q); 736 LOG_SQLITE (plugin, NULL,
1034 return; 737 GNUNET_ERROR_TYPE_ERROR |
1035 } 738 GNUNET_ERROR_TYPE_BULK,
1036 GNUNET_free (q); 739 "sqlite3_reset");
1037 GNUNET_asprintf (&q, 740 proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
1038 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 " 741 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1039 "WHERE (anonLevel = 0 AND expire > %llu AND prio < ?1 AND type=%d) "
1040 "ORDER BY prio DESC, hash DESC LIMIT 1",
1041 (unsigned long long) now.abs_value,
1042 type);
1043 if (sq_prepare (plugin->dbh, q, &stmt_2) != SQLITE_OK)
1044 {
1045 LOG_SQLITE (plugin, NULL,
1046 GNUNET_ERROR_TYPE_ERROR |
1047 GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1048 sqlite3_finalize (stmt_1);
1049 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1050 GNUNET_free (q);
1051 return; 742 return;
1052 } 743 }
1053 GNUNET_free (q); 744 execute_get (plugin, stmt, proc, proc_cls);
1054 nc = GNUNET_malloc (sizeof(struct NextContext) +
1055 sizeof(struct ZeroIterContext));
1056 nc->plugin = plugin;
1057 nc->iter = iter;
1058 nc->iter_cls = iter_cls;
1059 nc->stmt = NULL;
1060 ic = (struct ZeroIterContext*) &nc[1];
1061 ic->stmt_1 = stmt_1;
1062 ic->stmt_2 = stmt_2;
1063 ic->type = type;
1064 nc->prep = &zero_iter_next_prepare;
1065 nc->prep_cls = ic;
1066 nc->lastPriority = INT32_MAX;
1067 memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1068 sqlite_next_request (nc, GNUNET_NO);
1069} 745}
1070 746
1071 747
1072/**
1073 * Context for get_next_prepare.
1074 */
1075struct GetNextContext
1076{
1077
1078 /**
1079 * Our prepared statement.
1080 */
1081 sqlite3_stmt *stmt;
1082
1083 /**
1084 * Plugin handle.
1085 */
1086 struct Plugin *plugin;
1087
1088 /**
1089 * Key for the query.
1090 */
1091 GNUNET_HashCode key;
1092
1093 /**
1094 * Vhash for the query.
1095 */
1096 GNUNET_HashCode vhash;
1097
1098 /**
1099 * Expected total number of results.
1100 */
1101 unsigned int total;
1102
1103 /**
1104 * Offset to add for the selected result.
1105 */
1106 unsigned int off;
1107
1108 /**
1109 * Is vhash set?
1110 */
1111 int have_vhash;
1112
1113 /**
1114 * Desired block type.
1115 */
1116 enum GNUNET_BLOCK_Type type;
1117
1118};
1119
1120
1121/**
1122 * Prepare the stmt in 'nc' for the next round of execution, selecting the
1123 * next return value.
1124 *
1125 * @param cls our "struct GetNextContext*"
1126 * @param nc the general context
1127 * @return GNUNET_YES if there are more results,
1128 * GNUNET_NO if there are no more results,
1129 * GNUNET_SYSERR on internal error
1130 */
1131static int
1132get_next_prepare (void *cls,
1133 struct NextContext *nc)
1134{
1135 struct GetNextContext *gnc = cls;
1136 int ret;
1137 int limit_off;
1138 unsigned int sqoff;
1139
1140 if (nc == NULL)
1141 {
1142 sqlite3_finalize (gnc->stmt);
1143 return GNUNET_SYSERR;
1144 }
1145 if (nc->count == gnc->total)
1146 return GNUNET_NO;
1147 if (nc->count + gnc->off == gnc->total)
1148 nc->last_rowid = 0;
1149 if (nc->count == 0)
1150 limit_off = gnc->off;
1151 else
1152 limit_off = 0;
1153 sqlite3_reset (nc->stmt);
1154 sqoff = 1;
1155 ret = sqlite3_bind_blob (nc->stmt,
1156 sqoff++,
1157 &gnc->key,
1158 sizeof (GNUNET_HashCode),
1159 SQLITE_TRANSIENT);
1160 if ((gnc->have_vhash) && (ret == SQLITE_OK))
1161 ret = sqlite3_bind_blob (nc->stmt,
1162 sqoff++,
1163 &gnc->vhash,
1164 sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1165 if ((gnc->type != 0) && (ret == SQLITE_OK))
1166 ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1167 if (ret == SQLITE_OK)
1168 ret = sqlite3_bind_int64 (nc->stmt, sqoff++, limit_off);
1169 if (ret != SQLITE_OK)
1170 return GNUNET_SYSERR;
1171#if DEBUG_SQLITE
1172 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1173 "sqlite",
1174 "Preparing to GET for key `%s' with type %d at offset %u\n",
1175 GNUNET_h2s (&gnc->key),
1176 gnc->type,
1177 limit_off);
1178#endif
1179 ret = sqlite3_step (nc->stmt);
1180 switch (ret)
1181 {
1182 case SQLITE_ROW:
1183 return GNUNET_OK;
1184 case SQLITE_DONE:
1185 return GNUNET_NO;
1186 default:
1187 LOG_SQLITE (gnc->plugin, NULL,
1188 GNUNET_ERROR_TYPE_ERROR |
1189 GNUNET_ERROR_TYPE_BULK,
1190 "sqlite3_step");
1191 return GNUNET_SYSERR;
1192 }
1193}
1194
1195 748
1196/** 749/**
1197 * Iterate over the results for a particular key 750 * Get results for a particular key in the datastore.
1198 * in the datastore.
1199 * 751 *
1200 * @param cls closure 752 * @param cls closure
753 * @param offset offset (mod count).
1201 * @param key key to match, never NULL 754 * @param key key to match, never NULL
1202 * @param vhash hash of the value, maybe NULL (to 755 * @param vhash hash of the value, maybe NULL (to
1203 * match all values that have the right key). 756 * match all values that have the right key).
@@ -1206,27 +759,27 @@ get_next_prepare (void *cls,
1206 * there may be! 759 * there may be!
1207 * @param type entries of which type are relevant? 760 * @param type entries of which type are relevant?
1208 * Use 0 for any type. 761 * Use 0 for any type.
1209 * @param iter function to call on each matching value; 762 * @param proc function to call on each matching value;
1210 * will be called once with a NULL value at the end 763 * will be called once with a NULL value at the end
1211 * @param iter_cls closure for iter 764 * @param proc_cls closure for proc
1212 */ 765 */
1213static void 766static void
1214sqlite_plugin_get (void *cls, 767sqlite_plugin_get_key (void *cls,
1215 const GNUNET_HashCode *key, 768 uint64_t offset,
1216 const GNUNET_HashCode *vhash, 769 const GNUNET_HashCode *key,
1217 enum GNUNET_BLOCK_Type type, 770 const GNUNET_HashCode *vhash,
1218 PluginIterator iter, void *iter_cls) 771 enum GNUNET_BLOCK_Type type,
772 PluginDatumProcessor proc, void *proc_cls)
1219{ 773{
1220 struct Plugin *plugin = cls; 774 struct Plugin *plugin = cls;
1221 struct GetNextContext *gnc;
1222 struct NextContext *nc;
1223 int ret; 775 int ret;
1224 int total; 776 int total;
777 int limit_off;
778 unsigned int sqoff;
1225 sqlite3_stmt *stmt; 779 sqlite3_stmt *stmt;
1226 char scratch[256]; 780 char scratch[256];
1227 unsigned int sqoff;
1228 781
1229 GNUNET_assert (iter != NULL); 782 GNUNET_assert (proc != NULL);
1230 GNUNET_assert (key != NULL); 783 GNUNET_assert (key != NULL);
1231 GNUNET_snprintf (scratch, sizeof (scratch), 784 GNUNET_snprintf (scratch, sizeof (scratch),
1232 "SELECT count(*) FROM gn090 WHERE hash=?%s%s", 785 "SELECT count(*) FROM gn090 WHERE hash=?%s%s",
@@ -1236,7 +789,7 @@ sqlite_plugin_get (void *cls,
1236 { 789 {
1237 LOG_SQLITE (plugin, NULL, 790 LOG_SQLITE (plugin, NULL,
1238 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare"); 791 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1239 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 792 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1240 return; 793 return;
1241 } 794 }
1242 sqoff = 1; 795 sqoff = 1;
@@ -1253,7 +806,7 @@ sqlite_plugin_get (void *cls,
1253 LOG_SQLITE (plugin, NULL, 806 LOG_SQLITE (plugin, NULL,
1254 GNUNET_ERROR_TYPE_ERROR, "sqlite_bind"); 807 GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1255 sqlite3_finalize (stmt); 808 sqlite3_finalize (stmt);
1256 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 809 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1257 return; 810 return;
1258 } 811 }
1259 ret = sqlite3_step (stmt); 812 ret = sqlite3_step (stmt);
@@ -1263,147 +816,64 @@ sqlite_plugin_get (void *cls,
1263 GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 816 GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK,
1264 "sqlite_step"); 817 "sqlite_step");
1265 sqlite3_finalize (stmt); 818 sqlite3_finalize (stmt);
1266 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 819 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1267 return; 820 return;
1268 } 821 }
1269 total = sqlite3_column_int (stmt, 0); 822 total = sqlite3_column_int (stmt, 0);
1270 sqlite3_finalize (stmt); 823 sqlite3_finalize (stmt);
1271 if (0 == total) 824 if (0 == total)
1272 { 825 {
1273 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 826 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1274 return; 827 return;
1275 } 828 }
829 limit_off = (int) (offset % total);
830 if (limit_off < 0)
831 limit_off += total;
1276 GNUNET_snprintf (scratch, sizeof (scratch), 832 GNUNET_snprintf (scratch, sizeof (scratch),
1277 "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ " 833 "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ "
1278 "FROM gn090 WHERE hash=?%s%s " 834 "FROM gn090 WHERE hash=?%s%s "
1279 "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?", 835 "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
1280 vhash == NULL ? "" : " AND vhash=?", 836 vhash == NULL ? "" : " AND vhash=?",
1281 type == 0 ? "" : " AND type=?"); 837 type == 0 ? "" : " AND type=?");
1282
1283 if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK) 838 if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1284 { 839 {
1285 LOG_SQLITE (plugin, NULL, 840 LOG_SQLITE (plugin, NULL,
1286 GNUNET_ERROR_TYPE_ERROR | 841 GNUNET_ERROR_TYPE_ERROR |
1287 GNUNET_ERROR_TYPE_BULK, "sqlite_prepare"); 842 GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1288 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 843 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1289 return; 844 return;
1290 } 845 }
1291 nc = GNUNET_malloc (sizeof(struct NextContext) + 846 sqoff = 1;
1292 sizeof(struct GetNextContext)); 847 ret = sqlite3_bind_blob (stmt,
1293 nc->plugin = plugin; 848 sqoff++,
1294 nc->iter = iter; 849 key,
1295 nc->iter_cls = iter_cls; 850 sizeof (GNUNET_HashCode),
1296 nc->stmt = stmt; 851 SQLITE_TRANSIENT);
1297 gnc = (struct GetNextContext*) &nc[1]; 852 if ((vhash != NULL) && (ret == SQLITE_OK))
1298 gnc->total = total; 853 ret = sqlite3_bind_blob (stmt,
1299 gnc->type = type; 854 sqoff++,
1300 gnc->key = *key; 855 vhash,
1301 gnc->plugin = plugin; 856 sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1302 gnc->stmt = stmt; /* alias used for freeing at the end! */ 857 if ((type != 0) && (ret == SQLITE_OK))
1303 if (NULL != vhash) 858 ret = sqlite3_bind_int (stmt, sqoff++, type);
1304 { 859 if (ret == SQLITE_OK)
1305 gnc->have_vhash = GNUNET_YES; 860 ret = sqlite3_bind_int64 (stmt, sqoff++, limit_off);
1306 gnc->vhash = *vhash; 861 if (ret != SQLITE_OK)
1307 }
1308 gnc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1309 nc->prep = &get_next_prepare;
1310 nc->prep_cls = gnc;
1311 sqlite_next_request (nc, GNUNET_NO);
1312}
1313
1314
1315/**
1316 * Execute statement that gets a row and call the callback
1317 * with the result. Resets the statement afterwards.
1318 *
1319 * @param plugin the plugin
1320 * @param stmt the statement
1321 * @param iter iterator to call
1322 * @param iter_cls closure for 'iter'
1323 */
1324static void
1325execute_get (struct Plugin *plugin,
1326 sqlite3_stmt *stmt,
1327 PluginIterator iter, void *iter_cls)
1328{
1329 int n;
1330 struct GNUNET_TIME_Absolute expiration;
1331 unsigned long long rowid;
1332 unsigned int size;
1333 int ret;
1334
1335 n = sqlite3_step (stmt);
1336 switch (n)
1337 { 862 {
1338 case SQLITE_ROW:
1339 size = sqlite3_column_bytes (stmt, 5);
1340 rowid = sqlite3_column_int64 (stmt, 6);
1341 if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
1342 {
1343 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1344 "sqlite",
1345 _("Invalid data in database. Trying to fix (by deletion).\n"));
1346 if (SQLITE_OK != sqlite3_reset (stmt))
1347 LOG_SQLITE (plugin, NULL,
1348 GNUNET_ERROR_TYPE_ERROR |
1349 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1350 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
1351 plugin->env->duc (plugin->env->cls,
1352 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
1353 break;
1354 }
1355 expiration.abs_value = sqlite3_column_int64 (stmt, 3);
1356 ret = iter (iter_cls,
1357 NULL,
1358 sqlite3_column_blob (stmt, 4) /* key */,
1359 size,
1360 sqlite3_column_blob (stmt, 5) /* data */,
1361 sqlite3_column_int (stmt, 0) /* type */,
1362 sqlite3_column_int (stmt, 1) /* priority */,
1363 sqlite3_column_int (stmt, 2) /* anonymity */,
1364 expiration,
1365 rowid);
1366 if (SQLITE_OK != sqlite3_reset (stmt))
1367 LOG_SQLITE (plugin, NULL,
1368 GNUNET_ERROR_TYPE_ERROR |
1369 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1370 if ( (GNUNET_NO == ret) &&
1371 (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
1372 plugin->env->duc (plugin->env->cls,
1373 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
1374 return;
1375 case SQLITE_DONE:
1376 /* database must be empty */
1377 if (SQLITE_OK != sqlite3_reset (stmt))
1378 LOG_SQLITE (plugin, NULL,
1379 GNUNET_ERROR_TYPE_ERROR |
1380 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1381 break;
1382 case SQLITE_BUSY:
1383 case SQLITE_ERROR:
1384 case SQLITE_MISUSE:
1385 default:
1386 LOG_SQLITE (plugin, NULL, 863 LOG_SQLITE (plugin, NULL,
1387 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 864 GNUNET_ERROR_TYPE_ERROR |
1388 "sqlite3_step"); 865 GNUNET_ERROR_TYPE_BULK, "sqlite_bind");
1389 if (SQLITE_OK != sqlite3_reset (stmt)) 866 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1390 LOG_SQLITE (plugin, NULL, 867 return;
1391 GNUNET_ERROR_TYPE_ERROR |
1392 GNUNET_ERROR_TYPE_BULK,
1393 "sqlite3_reset");
1394 GNUNET_break (0);
1395 database_shutdown (plugin);
1396 database_setup (plugin->env->cfg,
1397 plugin);
1398 break;
1399 } 868 }
1400 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 869 execute_get (plugin, stmt, proc, proc_cls);
1401 GNUNET_TIME_UNIT_ZERO_ABS, 0); 870 sqlite3_finalize (stmt);
1402} 871}
1403 872
1404 873
874
1405/** 875/**
1406 * Context for 'repl_iter' function. 876 * Context for 'repl_proc' function.
1407 */ 877 */
1408struct ReplCtx 878struct ReplCtx
1409{ 879{
@@ -1416,22 +886,21 @@ struct ReplCtx
1416 /** 886 /**
1417 * Function to call for the result (or the NULL). 887 * Function to call for the result (or the NULL).
1418 */ 888 */
1419 PluginIterator iter; 889 PluginDatumProcessor proc;
1420 890
1421 /** 891 /**
1422 * Closure for iter. 892 * Closure for proc.
1423 */ 893 */
1424 void *iter_cls; 894 void *proc_cls;
1425}; 895};
1426 896
1427 897
1428/** 898/**
1429 * Wrapper for the iterator for 'sqlite_plugin_replication_get'. 899 * Wrapper for the processor for 'sqlite_plugin_replication_get'.
1430 * Decrements the replication counter and calls the original 900 * Decrements the replication counter and calls the original
1431 * iterator. 901 * processor.
1432 * 902 *
1433 * @param cls closure 903 * @param cls closure
1434 * @param next_cls closure to pass to the "next" function.
1435 * @param key key for the content 904 * @param key key for the content
1436 * @param size number of bytes in data 905 * @param size number of bytes in data
1437 * @param data content stored 906 * @param data content stored
@@ -1442,13 +911,11 @@ struct ReplCtx
1442 * @param uid unique identifier for the datum; 911 * @param uid unique identifier for the datum;
1443 * maybe 0 if no unique identifier is available 912 * maybe 0 if no unique identifier is available
1444 * 913 *
1445 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue 914 * @return GNUNET_OK for normal return,
1446 * (continue on call to "next", of course), 915 * GNUNET_NO to delete the item
1447 * GNUNET_NO to delete the item and continue (if supported)
1448 */ 916 */
1449static int 917static int
1450repl_iter (void *cls, 918repl_proc (void *cls,
1451 void *next_cls,
1452 const GNUNET_HashCode *key, 919 const GNUNET_HashCode *key,
1453 uint32_t size, 920 uint32_t size,
1454 const void *data, 921 const void *data,
@@ -1462,8 +929,8 @@ repl_iter (void *cls,
1462 struct Plugin *plugin = rc->plugin; 929 struct Plugin *plugin = rc->plugin;
1463 int ret; 930 int ret;
1464 931
1465 ret = rc->iter (rc->iter_cls, 932 ret = rc->proc (rc->proc_cls,
1466 next_cls, key, 933 key,
1467 size, data, 934 size, data,
1468 type, priority, anonymity, expiration, 935 type, priority, anonymity, expiration,
1469 uid); 936 uid);
@@ -1494,15 +961,15 @@ repl_iter (void *cls,
1494 * Get a random item for replication. Returns a single random item 961 * Get a random item for replication. Returns a single random item
1495 * from those with the highest replication counters. The item's 962 * from those with the highest replication counters. The item's
1496 * replication counter is decremented by one IF it was positive before. 963 * replication counter is decremented by one IF it was positive before.
1497 * Call 'iter' with all values ZERO or NULL if the datastore is empty. 964 * Call 'proc' with all values ZERO or NULL if the datastore is empty.
1498 * 965 *
1499 * @param cls closure 966 * @param cls closure
1500 * @param iter function to call the value (once only). 967 * @param proc function to call the value (once only).
1501 * @param iter_cls closure for iter 968 * @param proc_cls closure for proc
1502 */ 969 */
1503static void 970static void
1504sqlite_plugin_replication_get (void *cls, 971sqlite_plugin_get_replication (void *cls,
1505 PluginIterator iter, void *iter_cls) 972 PluginDatumProcessor proc, void *proc_cls)
1506{ 973{
1507 struct Plugin *plugin = cls; 974 struct Plugin *plugin = cls;
1508 struct ReplCtx rc; 975 struct ReplCtx rc;
@@ -1513,24 +980,24 @@ sqlite_plugin_replication_get (void *cls,
1513 "Getting random block based on replication order.\n"); 980 "Getting random block based on replication order.\n");
1514#endif 981#endif
1515 rc.plugin = plugin; 982 rc.plugin = plugin;
1516 rc.iter = iter; 983 rc.proc = proc;
1517 rc.iter_cls = iter_cls; 984 rc.proc_cls = proc_cls;
1518 execute_get (plugin, plugin->selRepl, &repl_iter, &rc); 985 execute_get (plugin, plugin->selRepl, &repl_proc, &rc);
1519} 986}
1520 987
1521 988
1522 989
1523/** 990/**
1524 * Get a random item that has expired or has low priority. 991 * Get a random item that has expired or has low priority.
1525 * Call 'iter' with all values ZERO or NULL if the datastore is empty. 992 * Call 'proc' with all values ZERO or NULL if the datastore is empty.
1526 * 993 *
1527 * @param cls closure 994 * @param cls closure
1528 * @param iter function to call the value (once only). 995 * @param proc function to call the value (once only).
1529 * @param iter_cls closure for iter 996 * @param proc_cls closure for proc
1530 */ 997 */
1531static void 998static void
1532sqlite_plugin_expiration_get (void *cls, 999sqlite_plugin_get_expiration (void *cls,
1533 PluginIterator iter, void *iter_cls) 1000 PluginDatumProcessor proc, void *proc_cls)
1534{ 1001{
1535 struct Plugin *plugin = cls; 1002 struct Plugin *plugin = cls;
1536 sqlite3_stmt *stmt; 1003 sqlite3_stmt *stmt;
@@ -1550,11 +1017,11 @@ sqlite_plugin_expiration_get (void *cls,
1550 if (SQLITE_OK != sqlite3_reset (stmt)) 1017 if (SQLITE_OK != sqlite3_reset (stmt))
1551 LOG_SQLITE (plugin, NULL, 1018 LOG_SQLITE (plugin, NULL,
1552 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); 1019 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1553 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 1020 proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
1554 GNUNET_TIME_UNIT_ZERO_ABS, 0); 1021 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1555 return; 1022 return;
1556 } 1023 }
1557 execute_get (plugin, stmt, iter, iter_cls); 1024 execute_get (plugin, stmt, proc, proc_cls);
1558} 1025}
1559 1026
1560 1027
@@ -1579,7 +1046,7 @@ sqlite_plugin_drop (void *cls)
1579 * @return the size of the database on disk (estimate) 1046 * @return the size of the database on disk (estimate)
1580 */ 1047 */
1581static unsigned long long 1048static unsigned long long
1582sqlite_plugin_get_size (void *cls) 1049sqlite_plugin_estimate_size (void *cls)
1583{ 1050{
1584 struct Plugin *plugin = cls; 1051 struct Plugin *plugin = cls;
1585 sqlite3_stmt *stmt; 1052 sqlite3_stmt *stmt;
@@ -1653,14 +1120,13 @@ libgnunet_plugin_datastore_sqlite_init (void *cls)
1653 } 1120 }
1654 api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions)); 1121 api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1655 api->cls = &plugin; 1122 api->cls = &plugin;
1656 api->get_size = &sqlite_plugin_get_size; 1123 api->estimate_size = &sqlite_plugin_estimate_size;
1657 api->put = &sqlite_plugin_put; 1124 api->put = &sqlite_plugin_put;
1658 api->next_request = &sqlite_next_request;
1659 api->get = &sqlite_plugin_get;
1660 api->replication_get = &sqlite_plugin_replication_get;
1661 api->expiration_get = &sqlite_plugin_expiration_get;
1662 api->update = &sqlite_plugin_update; 1125 api->update = &sqlite_plugin_update;
1663 api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity; 1126 api->get_key = &sqlite_plugin_get_key;
1127 api->get_replication = &sqlite_plugin_get_replication;
1128 api->get_expiration = &sqlite_plugin_get_expiration;
1129 api->get_zero_anonymity = &sqlite_plugin_get_zero_anonymity;
1664 api->drop = &sqlite_plugin_drop; 1130 api->drop = &sqlite_plugin_drop;
1665 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, 1131 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1666 "sqlite", _("Sqlite database running\n")); 1132 "sqlite", _("Sqlite database running\n"));
@@ -1684,27 +1150,9 @@ libgnunet_plugin_datastore_sqlite_done (void *cls)
1684#if DEBUG_SQLITE 1150#if DEBUG_SQLITE
1685 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 1151 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1686 "sqlite", 1152 "sqlite",
1687 "sqlite plugin is doneing\n"); 1153 "sqlite plugin is done\n");
1688#endif 1154#endif
1689 1155
1690 if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1691 {
1692#if DEBUG_SQLITE
1693 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1694 "sqlite",
1695 "Canceling next task\n");
1696#endif
1697 GNUNET_SCHEDULER_cancel (plugin->next_task);
1698 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1699#if DEBUG_SQLITE
1700 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1701 "sqlite",
1702 "Prep'ing next task\n");
1703#endif
1704 plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1705 GNUNET_free (plugin->next_task_nc);
1706 plugin->next_task_nc = NULL;
1707 }
1708 fn = NULL; 1156 fn = NULL;
1709 if (plugin->drop_on_shutdown) 1157 if (plugin->drop_on_shutdown)
1710 fn = GNUNET_strdup (plugin->fn); 1158 fn = GNUNET_strdup (plugin->fn);