aboutsummaryrefslogtreecommitdiff
path: root/src/datastore
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-04-15 13:08:23 +0000
committerChristian Grothoff <christian@grothoff.org>2011-04-15 13:08:23 +0000
commit37a05b2c7724de81f963d45a6bf929608ee00bd5 (patch)
tree58b40b99d1ee97a19fe47c9590b352473736e17d /src/datastore
parenteed12834ada93e5b4218774148534d537a175a89 (diff)
downloadgnunet-37a05b2c7724de81f963d45a6bf929608ee00bd5.tar.gz
gnunet-37a05b2c7724de81f963d45a6bf929608ee00bd5.zip
fixes
Diffstat (limited to 'src/datastore')
-rw-r--r--src/datastore/plugin_datastore_mysql.c434
-rw-r--r--src/datastore/plugin_datastore_postgres.c611
2 files changed, 555 insertions, 490 deletions
diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c
index 1658aa51a..deef46af0 100644
--- a/src/datastore/plugin_datastore_mysql.c
+++ b/src/datastore/plugin_datastore_mysql.c
@@ -208,6 +208,8 @@ struct NextRequestClosure
208 unsigned int count; 208 unsigned int count;
209 209
210 int end_it; 210 int end_it;
211
212 int one_shot;
211}; 213};
212 214
213 215
@@ -284,9 +286,12 @@ struct Plugin
284#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?" 286#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
285 struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type; 287 struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
286 288
287#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=? LIMIT 1" 289#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
288 struct GNUNET_MysqlStatementHandle *update_entry; 290 struct GNUNET_MysqlStatementHandle *update_entry;
289 291
292#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
293 struct GNUNET_MysqlStatementHandle *dec_repl;
294
290#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090" 295#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
291 struct GNUNET_MysqlStatementHandle *get_size; 296 struct GNUNET_MysqlStatementHandle *get_size;
292 297
@@ -866,144 +871,6 @@ return_ok (void *cls,
866 871
867 872
868/** 873/**
869 * Continuation of "mysql_next_request".
870 *
871 * @param next_cls the next context
872 * @param tc the task context (unused)
873 */
874static void
875mysql_next_request_cont (void *next_cls,
876 const struct GNUNET_SCHEDULER_TaskContext *tc)
877{
878 struct NextRequestClosure *nrc = next_cls;
879 struct Plugin *plugin;
880 int ret;
881 unsigned int type;
882 unsigned int priority;
883 unsigned int anonymity;
884 unsigned long long exp;
885 unsigned long hashSize;
886 unsigned long size;
887 unsigned long long uid;
888 char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
889 GNUNET_HashCode key;
890 struct GNUNET_TIME_Absolute expiration;
891 MYSQL_BIND *rbind = nrc->rbind;
892
893 plugin = nrc->plugin;
894 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
895 plugin->next_task_nc = NULL;
896
897 if (GNUNET_YES == nrc->end_it)
898 goto END_SET;
899 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
900 nrc->now = GNUNET_TIME_absolute_get ();
901 hashSize = sizeof (GNUNET_HashCode);
902 memset (nrc->rbind, 0, sizeof (nrc->rbind));
903 rbind = nrc->rbind;
904 rbind[0].buffer_type = MYSQL_TYPE_LONG;
905 rbind[0].buffer = &type;
906 rbind[0].is_unsigned = 1;
907 rbind[1].buffer_type = MYSQL_TYPE_LONG;
908 rbind[1].buffer = &priority;
909 rbind[1].is_unsigned = 1;
910 rbind[2].buffer_type = MYSQL_TYPE_LONG;
911 rbind[2].buffer = &anonymity;
912 rbind[2].is_unsigned = 1;
913 rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
914 rbind[3].buffer = &exp;
915 rbind[3].is_unsigned = 1;
916 rbind[4].buffer_type = MYSQL_TYPE_BLOB;
917 rbind[4].buffer = &key;
918 rbind[4].buffer_length = hashSize;
919 rbind[4].length = &hashSize;
920 rbind[5].buffer_type = MYSQL_TYPE_BLOB;
921 rbind[5].buffer = value;
922 rbind[5].buffer_length = size = sizeof (value);
923 rbind[5].length = &size;
924 rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
925 rbind[6].buffer = &uid;
926 rbind[6].is_unsigned = 1;
927
928 if (GNUNET_OK != nrc->prep (nrc->prep_cls,
929 nrc))
930 goto END_SET;
931 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
932 GNUNET_assert (size <= sizeof(value));
933 if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
934 (hashSize != sizeof (GNUNET_HashCode)) )
935 {
936 GNUNET_break (0);
937 goto END_SET;
938 }
939#if DEBUG_MYSQL
940 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941 "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
942 (unsigned int) size,
943 GNUNET_h2s (&key),
944 priority,
945 anonymity,
946 exp);
947#endif
948 expiration.abs_value = exp;
949 ret = nrc->dviter (nrc->dviter_cls, (nrc->end_it == GNUNET_YES) ? NULL : nrc,
950 &key,
951 size, value,
952 type, priority, anonymity, expiration,
953 uid);
954 if (ret == GNUNET_SYSERR)
955 {
956 nrc->end_it = GNUNET_YES;
957 return;
958 }
959 if (ret == GNUNET_NO)
960 {
961 do_delete_entry (plugin, uid);
962 if (size != 0)
963 plugin->env->duc (plugin->env->cls,
964 - size);
965 }
966 return;
967 END_SET:
968 /* call dviter with "end of set" */
969 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
970 nrc->dviter (nrc->dviter_cls,
971 NULL, NULL, 0, NULL, 0, 0, 0,
972 GNUNET_TIME_UNIT_ZERO_ABS, 0);
973 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
974 nrc->prep (nrc->prep_cls, NULL);
975 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
976 GNUNET_free (nrc);
977}
978
979
980/**
981 * Function invoked on behalf of a "PluginIterator"
982 * asking the database plugin to call the iterator
983 * with the next item.
984 *
985 * @param next_cls whatever argument was given
986 * to the PluginIterator as "next_cls".
987 * @param end_it set to GNUNET_YES if we
988 * should terminate the iteration early
989 * (iterator should be still called once more
990 * to signal the end of the iteration).
991 */
992static void
993mysql_plugin_next_request (void *next_cls,
994 int end_it)
995{
996 struct NextRequestClosure *nrc = next_cls;
997
998 if (GNUNET_YES == end_it)
999 nrc->end_it = GNUNET_YES;
1000 nrc->plugin->next_task_nc = nrc;
1001 nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
1002 nrc);
1003}
1004
1005
1006/**
1007 * Get an estimate of how much space the database is 874 * Get an estimate of how much space the database is
1008 * currently using. 875 * currently using.
1009 * 876 *
@@ -1167,6 +1034,152 @@ mysql_plugin_update (void *cls,
1167} 1034}
1168 1035
1169 1036
1037
1038
1039/**
1040 * Continuation of "mysql_next_request".
1041 *
1042 * @param next_cls the next context
1043 * @param tc the task context (unused)
1044 */
1045static void
1046mysql_next_request_cont (void *next_cls,
1047 const struct GNUNET_SCHEDULER_TaskContext *tc)
1048{
1049 struct NextRequestClosure *nrc = next_cls;
1050 struct Plugin *plugin;
1051 int ret;
1052 unsigned int type;
1053 unsigned int priority;
1054 unsigned int anonymity;
1055 unsigned long long exp;
1056 unsigned long hashSize;
1057 unsigned long size;
1058 unsigned long long uid;
1059 char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
1060 GNUNET_HashCode key;
1061 struct GNUNET_TIME_Absolute expiration;
1062 MYSQL_BIND *rbind = nrc->rbind;
1063
1064 plugin = nrc->plugin;
1065 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1066 plugin->next_task_nc = NULL;
1067
1068 if (GNUNET_YES == nrc->end_it)
1069 goto END_SET;
1070 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1071 nrc->now = GNUNET_TIME_absolute_get ();
1072 hashSize = sizeof (GNUNET_HashCode);
1073 memset (nrc->rbind, 0, sizeof (nrc->rbind));
1074 rbind = nrc->rbind;
1075 rbind[0].buffer_type = MYSQL_TYPE_LONG;
1076 rbind[0].buffer = &type;
1077 rbind[0].is_unsigned = 1;
1078 rbind[1].buffer_type = MYSQL_TYPE_LONG;
1079 rbind[1].buffer = &priority;
1080 rbind[1].is_unsigned = 1;
1081 rbind[2].buffer_type = MYSQL_TYPE_LONG;
1082 rbind[2].buffer = &anonymity;
1083 rbind[2].is_unsigned = 1;
1084 rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
1085 rbind[3].buffer = &exp;
1086 rbind[3].is_unsigned = 1;
1087 rbind[4].buffer_type = MYSQL_TYPE_BLOB;
1088 rbind[4].buffer = &key;
1089 rbind[4].buffer_length = hashSize;
1090 rbind[4].length = &hashSize;
1091 rbind[5].buffer_type = MYSQL_TYPE_BLOB;
1092 rbind[5].buffer = value;
1093 rbind[5].buffer_length = size = sizeof (value);
1094 rbind[5].length = &size;
1095 rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
1096 rbind[6].buffer = &uid;
1097 rbind[6].is_unsigned = 1;
1098
1099 if (GNUNET_OK != nrc->prep (nrc->prep_cls,
1100 nrc))
1101 goto END_SET;
1102 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1103 GNUNET_assert (size <= sizeof(value));
1104 if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
1105 (hashSize != sizeof (GNUNET_HashCode)) )
1106 {
1107 GNUNET_break (0);
1108 goto END_SET;
1109 }
1110#if DEBUG_MYSQL
1111 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112 "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
1113 (unsigned int) size,
1114 GNUNET_h2s (&key),
1115 priority,
1116 anonymity,
1117 exp);
1118#endif
1119 expiration.abs_value = exp;
1120 ret = nrc->dviter (nrc->dviter_cls,
1121 (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
1122 &key,
1123 size, value,
1124 type, priority, anonymity, expiration,
1125 uid);
1126 if (ret == GNUNET_SYSERR)
1127 {
1128 nrc->end_it = GNUNET_YES;
1129 return;
1130 }
1131 if (ret == GNUNET_NO)
1132 {
1133 do_delete_entry (plugin, uid);
1134 if (size != 0)
1135 plugin->env->duc (plugin->env->cls,
1136 - size);
1137 }
1138 if (nrc->one_shot == GNUNET_YES)
1139 GNUNET_free (nrc);
1140 return;
1141 END_SET:
1142 /* call dviter with "end of set" */
1143 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1144 nrc->dviter (nrc->dviter_cls,
1145 NULL, NULL, 0, NULL, 0, 0, 0,
1146 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1147 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1148 nrc->prep (nrc->prep_cls, NULL);
1149 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1150 GNUNET_free (nrc);
1151}
1152
1153
1154/**
1155 * Function invoked on behalf of a "PluginIterator"
1156 * asking the database plugin to call the iterator
1157 * with the next item.
1158 *
1159 * @param next_cls whatever argument was given
1160 * to the PluginIterator as "next_cls".
1161 * @param end_it set to GNUNET_YES if we
1162 * should terminate the iteration early
1163 * (iterator should be still called once more
1164 * to signal the end of the iteration).
1165 */
1166static void
1167mysql_plugin_next_request (void *next_cls,
1168 int end_it)
1169{
1170 struct NextRequestClosure *nrc = next_cls;
1171
1172 if (GNUNET_YES == end_it)
1173 nrc->end_it = GNUNET_YES;
1174 nrc->plugin->next_task_nc = nrc;
1175 nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
1176 nrc);
1177}
1178
1179
1180/**
1181 * Context for 'get_statement_prepare'.
1182 */
1170struct GetContext 1183struct GetContext
1171{ 1184{
1172 GNUNET_HashCode key; 1185 GNUNET_HashCode key;
@@ -1466,7 +1479,6 @@ replication_prepare (void *cls,
1466{ 1479{
1467 struct Plugin *plugin = cls; 1480 struct Plugin *plugin = cls;
1468 1481
1469 nrc->end_it = GNUNET_YES;
1470 return prepared_statement_run_select (plugin, 1482 return prepared_statement_run_select (plugin,
1471 plugin->select_replication, 1483 plugin->select_replication,
1472 7, nrc->rbind, 1484 7, nrc->rbind,
@@ -1475,6 +1487,92 @@ replication_prepare (void *cls,
1475} 1487}
1476 1488
1477 1489
1490
1491/**
1492 * Context for 'repl_iter' function.
1493 */
1494struct ReplCtx
1495{
1496
1497 /**
1498 * Plugin handle.
1499 */
1500 struct Plugin *plugin;
1501
1502 /**
1503 * Function to call for the result (or the NULL).
1504 */
1505 PluginIterator iter;
1506
1507 /**
1508 * Closure for iter.
1509 */
1510 void *iter_cls;
1511};
1512
1513
1514/**
1515 * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
1516 * Decrements the replication counter and calls the original
1517 * iterator.
1518 *
1519 * @param cls closure
1520 * @param next_cls closure to pass to the "next" function.
1521 * @param key key for the content
1522 * @param size number of bytes in data
1523 * @param data content stored
1524 * @param type type of the content
1525 * @param priority priority of the content
1526 * @param anonymity anonymity-level for the content
1527 * @param expiration expiration time for the content
1528 * @param uid unique identifier for the datum;
1529 * maybe 0 if no unique identifier is available
1530 *
1531 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1532 * (continue on call to "next", of course),
1533 * GNUNET_NO to delete the item and continue (if supported)
1534 */
1535static int
1536repl_iter (void *cls,
1537 void *next_cls,
1538 const GNUNET_HashCode *key,
1539 uint32_t size,
1540 const void *data,
1541 enum GNUNET_BLOCK_Type type,
1542 uint32_t priority,
1543 uint32_t anonymity,
1544 struct GNUNET_TIME_Absolute expiration,
1545 uint64_t uid)
1546{
1547 struct ReplCtx *rc = cls;
1548 struct Plugin *plugin = rc->plugin;
1549 unsigned long long oid;
1550 int ret;
1551
1552 ret = rc->iter (rc->iter_cls,
1553 next_cls, key,
1554 size, data,
1555 type, priority, anonymity, expiration,
1556 uid);
1557 if (NULL != key)
1558 {
1559 oid = (unsigned long long) uid;
1560 ret = prepared_statement_run (plugin,
1561 plugin->dec_repl,
1562 NULL,
1563 MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES,
1564 -1);
1565 if (ret == GNUNET_SYSERR)
1566 {
1567 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1568 "Failed to reduce replication counter\n");
1569 return GNUNET_SYSERR;
1570 }
1571 }
1572 return ret;
1573}
1574
1575
1478/** 1576/**
1479 * Get a random item for replication. Returns a single, not expired, random item 1577 * Get a random item for replication. Returns a single, not expired, random item
1480 * from those with the highest replication counters. The item's 1578 * from those with the highest replication counters. The item's
@@ -1490,18 +1588,23 @@ mysql_plugin_replication_get (void *cls,
1490 PluginIterator iter, void *iter_cls) 1588 PluginIterator iter, void *iter_cls)
1491{ 1589{
1492 struct Plugin *plugin = cls; 1590 struct Plugin *plugin = cls;
1493 struct NextRequestClosure nrc; 1591 struct NextRequestClosure *nrc;
1494 1592 struct ReplCtx rc;
1495 memset (&nrc, 0, sizeof (nrc)); 1593
1496 nrc.plugin = plugin; 1594 rc.plugin = plugin;
1497 nrc.now = GNUNET_TIME_absolute_get (); 1595 rc.iter = iter;
1498 nrc.prep = &replication_prepare; 1596 rc.iter_cls = iter_cls;
1499 nrc.prep_cls = plugin; 1597 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1500 nrc.type = 0; 1598 nrc->plugin = plugin;
1501 nrc.dviter = iter; 1599 nrc->now = GNUNET_TIME_absolute_get ();
1502 nrc.dviter_cls = iter_cls; 1600 nrc->prep = &replication_prepare;
1503 nrc.end_it = GNUNET_NO; 1601 nrc->prep_cls = plugin;
1504 mysql_next_request_cont (&nrc, NULL); 1602 nrc->type = 0;
1603 nrc->dviter = &repl_iter;
1604 nrc->dviter_cls = &rc;
1605 nrc->end_it = GNUNET_NO;
1606 nrc->one_shot = GNUNET_YES;
1607 mysql_next_request_cont (nrc, NULL);
1505} 1608}
1506 1609
1507 1610
@@ -1522,7 +1625,6 @@ expiration_prepare (void *cls,
1522 1625
1523 if (NULL == nrc) 1626 if (NULL == nrc)
1524 return GNUNET_NO; 1627 return GNUNET_NO;
1525 nrc->end_it = GNUNET_YES;
1526 nt = (long long) nrc->now.abs_value; 1628 nt = (long long) nrc->now.abs_value;
1527 return prepared_statement_run_select 1629 return prepared_statement_run_select
1528 (plugin, 1630 (plugin,
@@ -1547,18 +1649,19 @@ mysql_plugin_expiration_get (void *cls,
1547 PluginIterator iter, void *iter_cls) 1649 PluginIterator iter, void *iter_cls)
1548{ 1650{
1549 struct Plugin *plugin = cls; 1651 struct Plugin *plugin = cls;
1550 struct NextRequestClosure nrc; 1652 struct NextRequestClosure *nrc;
1551 1653
1552 memset (&nrc, 0, sizeof (nrc)); 1654 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1553 nrc.plugin = plugin; 1655 nrc->plugin = plugin;
1554 nrc.now = GNUNET_TIME_absolute_get (); 1656 nrc->now = GNUNET_TIME_absolute_get ();
1555 nrc.prep = &expiration_prepare; 1657 nrc->prep = &expiration_prepare;
1556 nrc.prep_cls = plugin; 1658 nrc->prep_cls = plugin;
1557 nrc.type = 0; 1659 nrc->type = 0;
1558 nrc.dviter = iter; 1660 nrc->dviter = iter;
1559 nrc.dviter_cls = iter_cls; 1661 nrc->dviter_cls = iter_cls;
1560 nrc.end_it = GNUNET_NO; 1662 nrc->end_it = GNUNET_NO;
1561 mysql_next_request_cont (&nrc, NULL); 1663 nrc->one_shot = GNUNET_YES;
1664 mysql_next_request_cont (nrc, NULL);
1562} 1665}
1563 1666
1564 1667
@@ -1639,6 +1742,7 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
1639 || PINIT (plugin->count_entry_by_hash_vhash_and_type, 1742 || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1640 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) 1743 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1641 || PINIT (plugin->update_entry, UPDATE_ENTRY) 1744 || PINIT (plugin->update_entry, UPDATE_ENTRY)
1745 || PINIT (plugin->dec_repl, DEC_REPL)
1642 || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 1746 || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS)
1643 || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 1747 || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION)
1644 || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) ) 1748 || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
index 1ff56da31..2cecfa9a1 100644
--- a/src/datastore/plugin_datastore_postgres.c
+++ b/src/datastore/plugin_datastore_postgres.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors) 3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -30,45 +30,6 @@
30 30
31#define DEBUG_POSTGRES GNUNET_NO 31#define DEBUG_POSTGRES GNUNET_NO
32 32
33#define SELECT_IT_LOW_PRIORITY "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
34 "WHERE (prio = $1 AND oid > $2) " \
35 "ORDER BY prio ASC,oid ASC LIMIT 1) "\
36 "UNION "\
37 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
38 "WHERE (prio > $1 AND oid != $2)"\
39 "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40 "ORDER BY prio ASC,oid ASC LIMIT 1"
41
42#define SELECT_IT_NON_ANONYMOUS "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
43 "WHERE (prio = $1 AND oid < $2)"\
44 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
45 "UNION "\
46 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
47 "WHERE (prio < $1 AND oid != $2)"\
48 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49 "ORDER BY prio DESC,oid DESC LIMIT 1"
50
51#define SELECT_IT_EXPIRATION_TIME "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
52 "WHERE (expire = $1 AND oid > $2) "\
53 "ORDER BY expire ASC,oid ASC LIMIT 1) "\
54 "UNION "\
55 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
56 "WHERE (expire > $1 AND oid != $2) " \
57 "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58 "ORDER BY expire ASC,oid ASC LIMIT 1"
59
60
61#define SELECT_IT_MIGRATION_ORDER "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
62 "WHERE (expire = $1 AND oid < $2)"\
63 " AND expire > $3 AND type!=3"\
64 " ORDER BY expire DESC,oid DESC LIMIT 1) "\
65 "UNION "\
66 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
67 "WHERE (expire < $1 AND oid != $2)" \
68 " AND expire > $3 AND type!=3"\
69 " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70 "ORDER BY expire DESC,oid DESC LIMIT 1"
71
72/** 33/**
73 * After how many ms "busy" should a DB operation fail for good? 34 * After how many ms "busy" should a DB operation fail for good?
74 * A low value makes sure that we are more responsive to requests 35 * A low value makes sure that we are more responsive to requests
@@ -140,7 +101,7 @@ struct NextRequestClosure
140 /** 101 /**
141 * Number of entries found so far 102 * Number of entries found so far
142 */ 103 */
143 long long count; 104 unsigned long long count;
144 105
145 /** 106 /**
146 * Offset this iteration starts at. 107 * Offset this iteration starts at.
@@ -153,24 +114,14 @@ struct NextRequestClosure
153 uint64_t blimit_off; 114 uint64_t blimit_off;
154 115
155 /** 116 /**
156 * Overall number of matching entries. 117 * Current total number of entries found so far, big-endian.
157 */
158 unsigned long long total;
159
160 /**
161 * Expiration value of previous result (possible parameter), big-endian.
162 */ 118 */
163 uint64_t blast_expire; 119 uint64_t bcount;
164 120
165 /** 121 /**
166 * Row ID of last result (possible paramter), big-endian. 122 * Overall number of matching entries.
167 */
168 uint32_t blast_rowid;
169
170 /**
171 * Priority of last result (possible parameter), big-endian.
172 */ 123 */
173 uint32_t blast_prio; 124 unsigned long long total;
174 125
175 /** 126 /**
176 * Type of block (possible paramter), big-endian. 127 * Type of block (possible paramter), big-endian.
@@ -181,6 +132,11 @@ struct NextRequestClosure
181 * Flag set to GNUNET_YES to stop iteration. 132 * Flag set to GNUNET_YES to stop iteration.
182 */ 133 */
183 int end_it; 134 int end_it;
135
136 /**
137 * Flag to indicate that there should only be one result.
138 */
139 int one_shot;
184}; 140};
185 141
186 142
@@ -336,6 +292,7 @@ init_connection (struct Plugin *plugin)
336 GNUNET_free_non_null (conninfo); 292 GNUNET_free_non_null (conninfo);
337 ret = PQexec (plugin->dbh, 293 ret = PQexec (plugin->dbh,
338 "CREATE TABLE gn090 (" 294 "CREATE TABLE gn090 ("
295 " repl INTEGER NOT NULL DEFAULT 0,"
339 " type INTEGER NOT NULL DEFAULT 0," 296 " type INTEGER NOT NULL DEFAULT 0,"
340 " prio INTEGER NOT NULL DEFAULT 0," 297 " prio INTEGER NOT NULL DEFAULT 0,"
341 " anonLevel INTEGER NOT NULL DEFAULT 0," 298 " anonLevel INTEGER NOT NULL DEFAULT 0,"
@@ -385,7 +342,6 @@ init_connection (struct Plugin *plugin)
385 } 342 }
386 } 343 }
387 PQclear (ret); 344 PQclear (ret);
388#if 1
389 ret = PQexec (plugin->dbh, 345 ret = PQexec (plugin->dbh,
390 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"); 346 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
391 if (GNUNET_OK != 347 if (GNUNET_OK !=
@@ -421,44 +377,43 @@ init_connection (struct Plugin *plugin)
421 return GNUNET_SYSERR; 377 return GNUNET_SYSERR;
422 } 378 }
423 PQclear (ret); 379 PQclear (ret);
424#endif
425 if ((GNUNET_OK != 380 if ((GNUNET_OK !=
426 pq_prepare (plugin, 381 pq_prepare (plugin,
427 "getvt", 382 "getvt",
428 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 383 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
429 "WHERE hash=$1 AND vhash=$2 AND type=$3 " 384 "WHERE hash=$1 AND vhash=$2 AND type=$3 "
430 "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5", 385 "ORDER BY oid ASC LIMIT 1 OFFSET $4",
431 5, 386 4,
432 __LINE__)) || 387 __LINE__)) ||
433 (GNUNET_OK != 388 (GNUNET_OK !=
434 pq_prepare (plugin, 389 pq_prepare (plugin,
435 "gett", 390 "gett",
436 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 391 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
437 "WHERE hash=$1 AND type=$2" 392 "WHERE hash=$1 AND type=$2 "
438 "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4", 393 "ORDER BY oid ASC LIMIT 1 OFFSET $3",
439 4, 394 3,
440 __LINE__)) || 395 __LINE__)) ||
441 (GNUNET_OK != 396 (GNUNET_OK !=
442 pq_prepare (plugin, 397 pq_prepare (plugin,
443 "getv", 398 "getv",
444 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 399 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
445 "WHERE hash=$1 AND vhash=$2" 400 "WHERE hash=$1 AND vhash=$2 "
446 "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4", 401 "ORDER BY oid ASC LIMIT 1 OFFSET $3",
447 4, 402 3,
448 __LINE__)) || 403 __LINE__)) ||
449 (GNUNET_OK != 404 (GNUNET_OK !=
450 pq_prepare (plugin, 405 pq_prepare (plugin,
451 "get", 406 "get",
452 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 407 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
453 "WHERE hash=$1" 408 "WHERE hash=$1 "
454 "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3", 409 "ORDER BY oid ASC LIMIT 1 OFFSET $2",
455 3, 410 2,
456 __LINE__)) || 411 __LINE__)) ||
457 (GNUNET_OK != 412 (GNUNET_OK !=
458 pq_prepare (plugin, 413 pq_prepare (plugin,
459 "put", 414 "put",
460 "INSERT INTO gn090 (type, prio, anonLevel, expire, hash, vhash, value) " 415 "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, hash, vhash, value) "
461 "VALUES ($1, $2, $3, $4, $5, $6, $7)", 416 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
462 8, 417 8,
463 __LINE__)) || 418 __LINE__)) ||
464 (GNUNET_OK != 419 (GNUNET_OK !=
@@ -470,32 +425,42 @@ init_connection (struct Plugin *plugin)
470 __LINE__)) || 425 __LINE__)) ||
471 (GNUNET_OK != 426 (GNUNET_OK !=
472 pq_prepare (plugin, 427 pq_prepare (plugin,
473 "select_low_priority", 428 "decrepl",
474 SELECT_IT_LOW_PRIORITY, 429 "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
475 2, 430 "WHERE oid = $1",
431 1,
476 __LINE__)) || 432 __LINE__)) ||
477 (GNUNET_OK != 433 (GNUNET_OK !=
478 pq_prepare (plugin, 434 pq_prepare (plugin,
479 "select_non_anonymous", 435 "select_non_anonymous",
480 SELECT_IT_NON_ANONYMOUS, 436 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
481 2, 437 "WHERE anonLevel = 0 ORDER BY oid DESC LIMIT 1 OFFSET $1",
438 1,
482 __LINE__)) || 439 __LINE__)) ||
483 (GNUNET_OK != 440 (GNUNET_OK !=
484 pq_prepare (plugin, 441 pq_prepare (plugin,
485 "select_expiration_time", 442 "select_expiration_order",
486 SELECT_IT_EXPIRATION_TIME, 443 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
487 2, 444 "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
445 "UNION "
446 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
447 "ORDER BY prio ASC LIMIT 1) "
448 "ORDER BY expire ASC LIMIT 1",
449 1,
488 __LINE__)) || 450 __LINE__)) ||
489 (GNUNET_OK != 451 (GNUNET_OK !=
490 pq_prepare (plugin, 452 pq_prepare (plugin,
491 "select_migration_order", 453 "select_replication_order",
492 SELECT_IT_MIGRATION_ORDER, 454 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " \
493 3, 455 "ORDER BY repl DESC,RANDOM() LIMIT 1",
456 0,
494 __LINE__)) || 457 __LINE__)) ||
495 (GNUNET_OK != 458 (GNUNET_OK !=
496 pq_prepare (plugin, 459 pq_prepare (plugin,
497 "delrow", 460 "delrow",
498 "DELETE FROM gn090 " "WHERE oid=$1", 1, __LINE__))) 461 "DELETE FROM gn090 " "WHERE oid=$1",
462 1,
463 __LINE__)))
499 { 464 {
500 PQfinish (plugin->dbh); 465 PQfinish (plugin->dbh);
501 plugin->dbh = NULL; 466 plugin->dbh = NULL;
@@ -610,8 +575,10 @@ postgres_plugin_put (void *cls,
610 uint32_t btype = htonl (type); 575 uint32_t btype = htonl (type);
611 uint32_t bprio = htonl (priority); 576 uint32_t bprio = htonl (priority);
612 uint32_t banon = htonl (anonymity); 577 uint32_t banon = htonl (anonymity);
578 uint32_t brepl = htonl (replication);
613 uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__; 579 uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
614 const char *paramValues[] = { 580 const char *paramValues[] = {
581 (const char *) &brepl,
615 (const char *) &btype, 582 (const char *) &btype,
616 (const char *) &bprio, 583 (const char *) &bprio,
617 (const char *) &banon, 584 (const char *) &banon,
@@ -621,6 +588,7 @@ postgres_plugin_put (void *cls,
621 (const char *) data 588 (const char *) data
622 }; 589 };
623 int paramLengths[] = { 590 int paramLengths[] = {
591 sizeof (brepl),
624 sizeof (btype), 592 sizeof (btype),
625 sizeof (bprio), 593 sizeof (bprio),
626 sizeof (banon), 594 sizeof (banon),
@@ -629,11 +597,11 @@ postgres_plugin_put (void *cls,
629 sizeof (GNUNET_HashCode), 597 sizeof (GNUNET_HashCode),
630 size 598 size
631 }; 599 };
632 const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1 }; 600 const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
633 601
634 GNUNET_CRYPTO_hash (data, size, &vhash); 602 GNUNET_CRYPTO_hash (data, size, &vhash);
635 ret = PQexecPrepared (plugin->dbh, 603 ret = PQexecPrepared (plugin->dbh,
636 "put", 7, paramValues, paramLengths, paramFormats, 1); 604 "put", 8, paramValues, paramLengths, paramFormats, 1);
637 if (GNUNET_OK != check_result (plugin, ret, 605 if (GNUNET_OK != check_result (plugin, ret,
638 PGRES_COMMAND_OK, 606 PGRES_COMMAND_OK,
639 "PQexecPrepared", "put", __LINE__)) 607 "PQexecPrepared", "put", __LINE__))
@@ -649,6 +617,7 @@ postgres_plugin_put (void *cls,
649 return GNUNET_OK; 617 return GNUNET_OK;
650} 618}
651 619
620
652/** 621/**
653 * Function invoked on behalf of a "PluginIterator" 622 * Function invoked on behalf of a "PluginIterator"
654 * asking the database plugin to call the iterator 623 * asking the database plugin to call the iterator
@@ -690,15 +659,11 @@ postgres_next_request_cont (void *next_cls,
690 GNUNET_TIME_UNIT_ZERO_ABS, 0); 659 GNUNET_TIME_UNIT_ZERO_ABS, 0);
691 GNUNET_free (nrc); 660 GNUNET_free (nrc);
692 return; 661 return;
693 } 662 }
694 663 if (nrc->off == nrc->total)
695 if (nrc->count == 0) 664 nrc->off = 0;
696 nrc->blimit_off = GNUNET_htonll (nrc->off); 665 nrc->blimit_off = GNUNET_htonll (nrc->off);
697 else 666 nrc->bcount = GNUNET_htonll ((uint64_t) nrc->count);
698 nrc->blimit_off = GNUNET_htonll (0);
699 if (nrc->count + nrc->off == nrc->total)
700 nrc->blast_rowid = htonl (0); /* back to start */
701
702 res = PQexecPrepared (plugin->dbh, 667 res = PQexecPrepared (plugin->dbh,
703 nrc->pname, 668 nrc->pname,
704 nrc->nparams, 669 nrc->nparams,
@@ -773,14 +738,10 @@ postgres_next_request_cont (void *next_cls,
773 priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1)); 738 priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
774 anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2)); 739 anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
775 expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3)); 740 expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
776 memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode)); 741 memcpy (&key,
742 PQgetvalue (res, 0, 4),
743 sizeof (GNUNET_HashCode));
777 size = PQgetlength (res, 0, 5); 744 size = PQgetlength (res, 0, 5);
778
779 nrc->blast_prio = htonl (priority);
780 nrc->blast_expire = GNUNET_htonll (expiration_time.abs_value);
781 nrc->blast_rowid = htonl (rowid);
782 nrc->count++;
783
784#if DEBUG_POSTGRES 745#if DEBUG_POSTGRES
785 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 746 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
786 "datastore-postgres", 747 "datastore-postgres",
@@ -789,7 +750,7 @@ postgres_next_request_cont (void *next_cls,
789 (unsigned int) type); 750 (unsigned int) type);
790#endif 751#endif
791 iret = nrc->iter (nrc->iter_cls, 752 iret = nrc->iter (nrc->iter_cls,
792 nrc, 753 (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
793 &key, 754 &key,
794 size, 755 size,
795 PQgetvalue (res, 0, 5), 756 PQgetvalue (res, 0, 5),
@@ -799,6 +760,11 @@ postgres_next_request_cont (void *next_cls,
799 expiration_time, 760 expiration_time,
800 rowid); 761 rowid);
801 PQclear (res); 762 PQclear (res);
763 if (iret != GNUNET_NO)
764 {
765 nrc->count++;
766 nrc->off++;
767 }
802 if (iret == GNUNET_SYSERR) 768 if (iret == GNUNET_SYSERR)
803 { 769 {
804#if DEBUG_POSTGRES 770#if DEBUG_POSTGRES
@@ -828,6 +794,8 @@ postgres_next_request_cont (void *next_cls,
828#endif 794#endif
829 } 795 }
830 } 796 }
797 if (nrc->one_shot == GNUNET_YES)
798 GNUNET_free (nrc);
831} 799}
832 800
833 801
@@ -858,183 +826,6 @@ postgres_plugin_next_request (void *next_cls,
858 826
859 827
860/** 828/**
861 * Update the priority for a particular key in the datastore. If
862 * the expiration time in value is different than the time found in
863 * the datastore, the higher value should be kept. For the
864 * anonymity level, the lower value is to be used. The specified
865 * priority should be added to the existing priority, ignoring the
866 * priority in value.
867 *
868 * Note that it is possible for multiple values to match this put.
869 * In that case, all of the respective values are updated.
870 *
871 * @param cls our "struct Plugin*"
872 * @param uid unique identifier of the datum
873 * @param delta by how much should the priority
874 * change? If priority + delta < 0 the
875 * priority should be set to 0 (never go
876 * negative).
877 * @param expire new expiration time should be the
878 * MAX of any existing expiration time and
879 * this value
880 * @param msg set to error message
881 * @return GNUNET_OK on success
882 */
883static int
884postgres_plugin_update (void *cls,
885 uint64_t uid,
886 int delta, struct GNUNET_TIME_Absolute expire,
887 char **msg)
888{
889 struct Plugin *plugin = cls;
890 PGresult *ret;
891 int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
892 uint32_t boid = htonl ( (uint32_t) uid);
893 uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
894 const char *paramValues[] = {
895 (const char *) &bdelta,
896 (const char *) &bexpire,
897 (const char *) &boid,
898 };
899 int paramLengths[] = {
900 sizeof (bdelta),
901 sizeof (bexpire),
902 sizeof (boid),
903 };
904 const int paramFormats[] = { 1, 1, 1 };
905
906 ret = PQexecPrepared (plugin->dbh,
907 "update",
908 3, paramValues, paramLengths, paramFormats, 1);
909 if (GNUNET_OK != check_result (plugin,
910 ret,
911 PGRES_COMMAND_OK,
912 "PQexecPrepared", "update", __LINE__))
913 return GNUNET_SYSERR;
914 PQclear (ret);
915 return GNUNET_OK;
916}
917
918
919/**
920 * Call a method for each key in the database and
921 * call the callback method on it.
922 *
923 * @param plugin global context
924 * @param type entries of which type should be considered?
925 * @param is_asc ascending or descending iteration?
926 * @param iter_select which SELECT method should be used?
927 * @param iter maybe NULL (to just count); iter
928 * should return GNUNET_SYSERR to abort the
929 * iteration, GNUNET_NO to delete the entry and
930 * continue and GNUNET_OK to continue iterating
931 * @param iter_cls closure for 'iter'
932 */
933static void
934postgres_iterate (struct Plugin *plugin,
935 unsigned int type,
936 int is_asc,
937 unsigned int iter_select,
938 PluginIterator iter, void *iter_cls)
939{
940 struct NextRequestClosure *nrc;
941
942 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
943 nrc->count = UINT32_MAX;
944 nrc->plugin = plugin;
945 nrc->iter = iter;
946 nrc->iter_cls = iter_cls;
947 if (is_asc)
948 {
949 nrc->blast_prio = htonl (0);
950 nrc->blast_rowid = htonl (0);
951 nrc->blast_expire = htonl (0);
952 }
953 else
954 {
955 nrc->blast_prio = htonl (0x7FFFFFFFL);
956 nrc->blast_rowid = htonl (0xFFFFFFFF);
957 nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
958 }
959 switch (iter_select)
960 {
961 case 0:
962 nrc->pname = "select_low_priority";
963 nrc->nparams = 2;
964 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
965 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
966 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
967 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
968 break;
969 case 1:
970 nrc->pname = "select_non_anonymous";
971 nrc->nparams = 2;
972 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
973 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
974 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
975 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
976 break;
977 case 2:
978 nrc->pname = "select_expiration_time";
979 nrc->nparams = 2;
980 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
981 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
982 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
983 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
984 break;
985 case 3:
986 nrc->pname = "select_migration_order";
987 nrc->nparams = 3;
988 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
989 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
990 nrc->paramValues[2] = (const char *) &nrc->bnow;
991 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
992 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
993 nrc->paramLengths[2] = sizeof (nrc->bnow);
994 break;
995 default:
996 GNUNET_break (0);
997 iter (iter_cls,
998 NULL, NULL, 0, NULL, 0, 0, 0,
999 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1000 GNUNET_free (nrc);
1001 return;
1002 }
1003 nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).abs_value__;
1004 postgres_plugin_next_request (nrc,
1005 GNUNET_NO);
1006}
1007
1008
1009/**
1010 * Select a subset of the items in the datastore and call
1011 * the given iterator for each of them.
1012 *
1013 * @param cls our "struct Plugin*"
1014 * @param type entries of which type should be considered?
1015 * Use 0 for any type.
1016 * @param iter function to call on each matching value;
1017 * will be called once with a NULL value at the end
1018 * @param iter_cls closure for iter
1019 */
1020static void
1021postgres_plugin_iter_low_priority (void *cls,
1022 enum GNUNET_BLOCK_Type type,
1023 PluginIterator iter,
1024 void *iter_cls)
1025{
1026 struct Plugin *plugin = cls;
1027
1028 postgres_iterate (plugin,
1029 type,
1030 GNUNET_YES, 0,
1031 iter, iter_cls);
1032}
1033
1034
1035
1036
1037/**
1038 * Iterate over the results for a particular key 829 * Iterate over the results for a particular key
1039 * in the datastore. 830 * in the datastore.
1040 * 831 *
@@ -1063,12 +854,7 @@ postgres_plugin_get (void *cls,
1063 const int paramFormats[] = { 1, 1, 1, 1, 1 }; 854 const int paramFormats[] = { 1, 1, 1, 1, 1 };
1064 PGresult *ret; 855 PGresult *ret;
1065 856
1066 if (key == NULL) 857 GNUNET_assert (key != NULL);
1067 {
1068 postgres_plugin_iter_low_priority (plugin, type,
1069 iter, iter_cls);
1070 return;
1071 }
1072 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); 858 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1073 nrc->plugin = plugin; 859 nrc->plugin = plugin;
1074 nrc->iter = iter; 860 nrc->iter = iter;
@@ -1087,11 +873,9 @@ postgres_plugin_get (void *cls,
1087 nrc->paramLengths[1] = sizeof (nrc->vhash); 873 nrc->paramLengths[1] = sizeof (nrc->vhash);
1088 nrc->paramValues[2] = (const char *) &nrc->btype; 874 nrc->paramValues[2] = (const char *) &nrc->btype;
1089 nrc->paramLengths[2] = sizeof (nrc->btype); 875 nrc->paramLengths[2] = sizeof (nrc->btype);
1090 nrc->paramValues[3] = (const char *) &nrc->blast_rowid; 876 nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1091 nrc->paramLengths[3] = sizeof (nrc->blast_rowid); 877 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1092 nrc->paramValues[4] = (const char *) &nrc->blimit_off; 878 nrc->nparams = 4;
1093 nrc->paramLengths[4] = sizeof (nrc->blimit_off);
1094 nrc->nparams = 5;
1095 nrc->pname = "getvt"; 879 nrc->pname = "getvt";
1096 ret = PQexecParams (plugin->dbh, 880 ret = PQexecParams (plugin->dbh,
1097 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", 881 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
@@ -1105,11 +889,9 @@ postgres_plugin_get (void *cls,
1105 { 889 {
1106 nrc->paramValues[1] = (const char *) &nrc->btype; 890 nrc->paramValues[1] = (const char *) &nrc->btype;
1107 nrc->paramLengths[1] = sizeof (nrc->btype); 891 nrc->paramLengths[1] = sizeof (nrc->btype);
1108 nrc->paramValues[2] = (const char *) &nrc->blast_rowid; 892 nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1109 nrc->paramLengths[2] = sizeof (nrc->blast_rowid); 893 nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1110 nrc->paramValues[3] = (const char *) &nrc->blimit_off; 894 nrc->nparams = 3;
1111 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1112 nrc->nparams = 4;
1113 nrc->pname = "gett"; 895 nrc->pname = "gett";
1114 ret = PQexecParams (plugin->dbh, 896 ret = PQexecParams (plugin->dbh,
1115 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", 897 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
@@ -1126,11 +908,9 @@ postgres_plugin_get (void *cls,
1126 { 908 {
1127 nrc->paramValues[1] = (const char *) &nrc->vhash; 909 nrc->paramValues[1] = (const char *) &nrc->vhash;
1128 nrc->paramLengths[1] = sizeof (nrc->vhash); 910 nrc->paramLengths[1] = sizeof (nrc->vhash);
1129 nrc->paramValues[2] = (const char *) &nrc->blast_rowid; 911 nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1130 nrc->paramLengths[2] = sizeof (nrc->blast_rowid); 912 nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1131 nrc->paramValues[3] = (const char *) &nrc->blimit_off; 913 nrc->nparams = 3;
1132 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1133 nrc->nparams = 4;
1134 nrc->pname = "getv"; 914 nrc->pname = "getv";
1135 ret = PQexecParams (plugin->dbh, 915 ret = PQexecParams (plugin->dbh,
1136 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", 916 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
@@ -1142,11 +922,9 @@ postgres_plugin_get (void *cls,
1142 } 922 }
1143 else 923 else
1144 { 924 {
1145 nrc->paramValues[1] = (const char *) &nrc->blast_rowid; 925 nrc->paramValues[1] = (const char *) &nrc->blimit_off;
1146 nrc->paramLengths[1] = sizeof (nrc->blast_rowid); 926 nrc->paramLengths[1] = sizeof (nrc->blimit_off);
1147 nrc->paramValues[2] = (const char *) &nrc->blimit_off; 927 nrc->nparams = 2;
1148 nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1149 nrc->nparams = 3;
1150 nrc->pname = "get"; 928 nrc->pname = "get";
1151 ret = PQexecParams (plugin->dbh, 929 ret = PQexecParams (plugin->dbh,
1152 "SELECT count(*) FROM gn090 WHERE hash=$1", 930 "SELECT count(*) FROM gn090 WHERE hash=$1",
@@ -1200,6 +978,131 @@ postgres_plugin_get (void *cls,
1200 978
1201 979
1202/** 980/**
981 * Select a subset of the items in the datastore and call
982 * the given iterator for each of them.
983 *
984 * @param cls our "struct Plugin*"
985 * @param type entries of which type should be considered?
986 * Use 0 for any type.
987 * @param iter function to call on each matching value;
988 * will be called once with a NULL value at the end
989 * @param iter_cls closure for iter
990 */
991static void
992postgres_plugin_iter_zero_anonymity (void *cls,
993 enum GNUNET_BLOCK_Type type,
994 PluginIterator iter,
995 void *iter_cls)
996{
997 struct Plugin *plugin = cls;
998 struct NextRequestClosure *nrc;
999
1000 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1001 nrc->btype = htonl ((uint32_t) type);
1002 nrc->plugin = plugin;
1003 nrc->iter = iter;
1004 nrc->iter_cls = iter_cls;
1005 nrc->pname = "select_non_anonymous";
1006 nrc->nparams = 1;
1007 nrc->paramLengths[0] = sizeof (nrc->bcount);
1008 nrc->paramValues[0] = (const char*) &nrc->bcount;
1009 postgres_plugin_next_request (nrc,
1010 GNUNET_NO);
1011}
1012
1013/**
1014 * Context for 'repl_iter' function.
1015 */
1016struct ReplCtx
1017{
1018
1019 /**
1020 * Plugin handle.
1021 */
1022 struct Plugin *plugin;
1023
1024 /**
1025 * Function to call for the result (or the NULL).
1026 */
1027 PluginIterator iter;
1028
1029 /**
1030 * Closure for iter.
1031 */
1032 void *iter_cls;
1033};
1034
1035
1036/**
1037 * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
1038 * Decrements the replication counter and calls the original
1039 * iterator.
1040 *
1041 * @param cls closure
1042 * @param next_cls closure to pass to the "next" function.
1043 * @param key key for the content
1044 * @param size number of bytes in data
1045 * @param data content stored
1046 * @param type type of the content
1047 * @param priority priority of the content
1048 * @param anonymity anonymity-level for the content
1049 * @param expiration expiration time for the content
1050 * @param uid unique identifier for the datum;
1051 * maybe 0 if no unique identifier is available
1052 *
1053 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1054 * (continue on call to "next", of course),
1055 * GNUNET_NO to delete the item and continue (if supported)
1056 */
1057static int
1058repl_iter (void *cls,
1059 void *next_cls,
1060 const GNUNET_HashCode *key,
1061 uint32_t size,
1062 const void *data,
1063 enum GNUNET_BLOCK_Type type,
1064 uint32_t priority,
1065 uint32_t anonymity,
1066 struct GNUNET_TIME_Absolute expiration,
1067 uint64_t uid)
1068{
1069 struct ReplCtx *rc = cls;
1070 struct Plugin *plugin = rc->plugin;
1071 int ret;
1072 PGresult *qret;
1073 uint32_t boid;
1074
1075 ret = rc->iter (rc->iter_cls,
1076 next_cls, key,
1077 size, data,
1078 type, priority, anonymity, expiration,
1079 uid);
1080 if (NULL != key)
1081 {
1082 boid = htonl ( (uint32_t) uid);
1083 const char *paramValues[] = {
1084 (const char *) &boid,
1085 };
1086 int paramLengths[] = {
1087 sizeof (boid),
1088 };
1089 const int paramFormats[] = { 1 };
1090 qret = PQexecPrepared (plugin->dbh,
1091 "decrepl",
1092 1, paramValues, paramLengths, paramFormats, 1);
1093 if (GNUNET_OK != check_result (plugin,
1094 qret,
1095 PGRES_COMMAND_OK,
1096 "PQexecPrepared",
1097 "decrepl", __LINE__))
1098 return GNUNET_SYSERR;
1099 PQclear (qret);
1100 }
1101 return ret;
1102}
1103
1104
1105/**
1203 * Get a random item for replication. Returns a single, not expired, random item 1106 * Get a random item for replication. Returns a single, not expired, random item
1204 * from those with the highest replication counters. The item's 1107 * from those with the highest replication counters. The item's
1205 * replication counter is decremented by one IF it was positive before. 1108 * replication counter is decremented by one IF it was positive before.
@@ -1213,9 +1116,21 @@ static void
1213postgres_plugin_replication_get (void *cls, 1116postgres_plugin_replication_get (void *cls,
1214 PluginIterator iter, void *iter_cls) 1117 PluginIterator iter, void *iter_cls)
1215{ 1118{
1216 /* FIXME: not implemented! */ 1119 struct Plugin *plugin = cls;
1217 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 1120 struct NextRequestClosure *nrc;
1218 GNUNET_TIME_UNIT_ZERO_ABS, 0); 1121 struct ReplCtx rc;
1122
1123 rc.plugin = plugin;
1124 rc.iter = iter;
1125 rc.iter_cls = iter_cls;
1126 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1127 nrc->one_shot = GNUNET_YES;
1128 nrc->plugin = plugin;
1129 nrc->iter = &repl_iter;
1130 nrc->iter_cls = &rc;
1131 nrc->pname = "select_replication_order";
1132 nrc->nparams = 0;
1133 postgres_next_request_cont (nrc, NULL);
1219} 1134}
1220 1135
1221 1136
@@ -1231,34 +1146,80 @@ static void
1231postgres_plugin_expiration_get (void *cls, 1146postgres_plugin_expiration_get (void *cls,
1232 PluginIterator iter, void *iter_cls) 1147 PluginIterator iter, void *iter_cls)
1233{ 1148{
1234 /* FIXME: not implemented! */ 1149 struct Plugin *plugin = cls;
1235 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 1150 struct NextRequestClosure *nrc;
1236 GNUNET_TIME_UNIT_ZERO_ABS, 0); 1151 uint64_t btime;
1152
1153 btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
1154 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1155 nrc->one_shot = GNUNET_YES;
1156 nrc->plugin = plugin;
1157 nrc->iter = iter;
1158 nrc->iter_cls = iter_cls;
1159 nrc->pname = "select_expiration_order";
1160 nrc->nparams = 1;
1161 nrc->paramValues[0] = (const char *) &btime;
1162 nrc->paramLengths[0] = sizeof (btime);
1163 postgres_next_request_cont (nrc, NULL);
1237} 1164}
1238 1165
1239 1166
1240/** 1167/**
1241 * Select a subset of the items in the datastore and call 1168 * Update the priority for a particular key in the datastore. If
1242 * the given iterator for each of them. 1169 * the expiration time in value is different than the time found in
1170 * the datastore, the higher value should be kept. For the
1171 * anonymity level, the lower value is to be used. The specified
1172 * priority should be added to the existing priority, ignoring the
1173 * priority in value.
1174 *
1175 * Note that it is possible for multiple values to match this put.
1176 * In that case, all of the respective values are updated.
1243 * 1177 *
1244 * @param cls our "struct Plugin*" 1178 * @param cls our "struct Plugin*"
1245 * @param type entries of which type should be considered? 1179 * @param uid unique identifier of the datum
1246 * Use 0 for any type. 1180 * @param delta by how much should the priority
1247 * @param iter function to call on each matching value; 1181 * change? If priority + delta < 0 the
1248 * will be called once with a NULL value at the end 1182 * priority should be set to 0 (never go
1249 * @param iter_cls closure for iter 1183 * negative).
1184 * @param expire new expiration time should be the
1185 * MAX of any existing expiration time and
1186 * this value
1187 * @param msg set to error message
1188 * @return GNUNET_OK on success
1250 */ 1189 */
1251static void 1190static int
1252postgres_plugin_iter_zero_anonymity (void *cls, 1191postgres_plugin_update (void *cls,
1253 enum GNUNET_BLOCK_Type type, 1192 uint64_t uid,
1254 PluginIterator iter, 1193 int delta, struct GNUNET_TIME_Absolute expire,
1255 void *iter_cls) 1194 char **msg)
1256{ 1195{
1257 struct Plugin *plugin = cls; 1196 struct Plugin *plugin = cls;
1197 PGresult *ret;
1198 int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
1199 uint32_t boid = htonl ( (uint32_t) uid);
1200 uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
1201 const char *paramValues[] = {
1202 (const char *) &bdelta,
1203 (const char *) &bexpire,
1204 (const char *) &boid,
1205 };
1206 int paramLengths[] = {
1207 sizeof (bdelta),
1208 sizeof (bexpire),
1209 sizeof (boid),
1210 };
1211 const int paramFormats[] = { 1, 1, 1 };
1258 1212
1259 postgres_iterate (plugin, 1213 ret = PQexecPrepared (plugin->dbh,
1260 type, GNUNET_NO, 1, 1214 "update",
1261 iter, iter_cls); 1215 3, paramValues, paramLengths, paramFormats, 1);
1216 if (GNUNET_OK != check_result (plugin,
1217 ret,
1218 PGRES_COMMAND_OK,
1219 "PQexecPrepared", "update", __LINE__))
1220 return GNUNET_SYSERR;
1221 PQclear (ret);
1222 return GNUNET_OK;
1262} 1223}
1263 1224
1264 1225