aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/plugin_datastore_mysql.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/plugin_datastore_mysql.c')
-rw-r--r--src/datastore/plugin_datastore_mysql.c434
1 files changed, 269 insertions, 165 deletions
diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c
index 1658aa51a..deef46af0 100644
--- a/src/datastore/plugin_datastore_mysql.c
+++ b/src/datastore/plugin_datastore_mysql.c
@@ -208,6 +208,8 @@ struct NextRequestClosure
208 unsigned int count; 208 unsigned int count;
209 209
210 int end_it; 210 int end_it;
211
212 int one_shot;
211}; 213};
212 214
213 215
@@ -284,9 +286,12 @@ struct Plugin
284#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?" 286#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
285 struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type; 287 struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
286 288
287#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=? LIMIT 1" 289#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
288 struct GNUNET_MysqlStatementHandle *update_entry; 290 struct GNUNET_MysqlStatementHandle *update_entry;
289 291
292#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
293 struct GNUNET_MysqlStatementHandle *dec_repl;
294
290#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090" 295#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
291 struct GNUNET_MysqlStatementHandle *get_size; 296 struct GNUNET_MysqlStatementHandle *get_size;
292 297
@@ -866,144 +871,6 @@ return_ok (void *cls,
866 871
867 872
868/** 873/**
869 * Continuation of "mysql_next_request".
870 *
871 * @param next_cls the next context
872 * @param tc the task context (unused)
873 */
874static void
875mysql_next_request_cont (void *next_cls,
876 const struct GNUNET_SCHEDULER_TaskContext *tc)
877{
878 struct NextRequestClosure *nrc = next_cls;
879 struct Plugin *plugin;
880 int ret;
881 unsigned int type;
882 unsigned int priority;
883 unsigned int anonymity;
884 unsigned long long exp;
885 unsigned long hashSize;
886 unsigned long size;
887 unsigned long long uid;
888 char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
889 GNUNET_HashCode key;
890 struct GNUNET_TIME_Absolute expiration;
891 MYSQL_BIND *rbind = nrc->rbind;
892
893 plugin = nrc->plugin;
894 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
895 plugin->next_task_nc = NULL;
896
897 if (GNUNET_YES == nrc->end_it)
898 goto END_SET;
899 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
900 nrc->now = GNUNET_TIME_absolute_get ();
901 hashSize = sizeof (GNUNET_HashCode);
902 memset (nrc->rbind, 0, sizeof (nrc->rbind));
903 rbind = nrc->rbind;
904 rbind[0].buffer_type = MYSQL_TYPE_LONG;
905 rbind[0].buffer = &type;
906 rbind[0].is_unsigned = 1;
907 rbind[1].buffer_type = MYSQL_TYPE_LONG;
908 rbind[1].buffer = &priority;
909 rbind[1].is_unsigned = 1;
910 rbind[2].buffer_type = MYSQL_TYPE_LONG;
911 rbind[2].buffer = &anonymity;
912 rbind[2].is_unsigned = 1;
913 rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
914 rbind[3].buffer = &exp;
915 rbind[3].is_unsigned = 1;
916 rbind[4].buffer_type = MYSQL_TYPE_BLOB;
917 rbind[4].buffer = &key;
918 rbind[4].buffer_length = hashSize;
919 rbind[4].length = &hashSize;
920 rbind[5].buffer_type = MYSQL_TYPE_BLOB;
921 rbind[5].buffer = value;
922 rbind[5].buffer_length = size = sizeof (value);
923 rbind[5].length = &size;
924 rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
925 rbind[6].buffer = &uid;
926 rbind[6].is_unsigned = 1;
927
928 if (GNUNET_OK != nrc->prep (nrc->prep_cls,
929 nrc))
930 goto END_SET;
931 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
932 GNUNET_assert (size <= sizeof(value));
933 if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
934 (hashSize != sizeof (GNUNET_HashCode)) )
935 {
936 GNUNET_break (0);
937 goto END_SET;
938 }
939#if DEBUG_MYSQL
940 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941 "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
942 (unsigned int) size,
943 GNUNET_h2s (&key),
944 priority,
945 anonymity,
946 exp);
947#endif
948 expiration.abs_value = exp;
949 ret = nrc->dviter (nrc->dviter_cls, (nrc->end_it == GNUNET_YES) ? NULL : nrc,
950 &key,
951 size, value,
952 type, priority, anonymity, expiration,
953 uid);
954 if (ret == GNUNET_SYSERR)
955 {
956 nrc->end_it = GNUNET_YES;
957 return;
958 }
959 if (ret == GNUNET_NO)
960 {
961 do_delete_entry (plugin, uid);
962 if (size != 0)
963 plugin->env->duc (plugin->env->cls,
964 - size);
965 }
966 return;
967 END_SET:
968 /* call dviter with "end of set" */
969 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
970 nrc->dviter (nrc->dviter_cls,
971 NULL, NULL, 0, NULL, 0, 0, 0,
972 GNUNET_TIME_UNIT_ZERO_ABS, 0);
973 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
974 nrc->prep (nrc->prep_cls, NULL);
975 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
976 GNUNET_free (nrc);
977}
978
979
980/**
981 * Function invoked on behalf of a "PluginIterator"
982 * asking the database plugin to call the iterator
983 * with the next item.
984 *
985 * @param next_cls whatever argument was given
986 * to the PluginIterator as "next_cls".
987 * @param end_it set to GNUNET_YES if we
988 * should terminate the iteration early
989 * (iterator should be still called once more
990 * to signal the end of the iteration).
991 */
992static void
993mysql_plugin_next_request (void *next_cls,
994 int end_it)
995{
996 struct NextRequestClosure *nrc = next_cls;
997
998 if (GNUNET_YES == end_it)
999 nrc->end_it = GNUNET_YES;
1000 nrc->plugin->next_task_nc = nrc;
1001 nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
1002 nrc);
1003}
1004
1005
1006/**
1007 * Get an estimate of how much space the database is 874 * Get an estimate of how much space the database is
1008 * currently using. 875 * currently using.
1009 * 876 *
@@ -1167,6 +1034,152 @@ mysql_plugin_update (void *cls,
1167} 1034}
1168 1035
1169 1036
1037
1038
1039/**
1040 * Continuation of "mysql_next_request".
1041 *
1042 * @param next_cls the next context
1043 * @param tc the task context (unused)
1044 */
1045static void
1046mysql_next_request_cont (void *next_cls,
1047 const struct GNUNET_SCHEDULER_TaskContext *tc)
1048{
1049 struct NextRequestClosure *nrc = next_cls;
1050 struct Plugin *plugin;
1051 int ret;
1052 unsigned int type;
1053 unsigned int priority;
1054 unsigned int anonymity;
1055 unsigned long long exp;
1056 unsigned long hashSize;
1057 unsigned long size;
1058 unsigned long long uid;
1059 char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
1060 GNUNET_HashCode key;
1061 struct GNUNET_TIME_Absolute expiration;
1062 MYSQL_BIND *rbind = nrc->rbind;
1063
1064 plugin = nrc->plugin;
1065 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1066 plugin->next_task_nc = NULL;
1067
1068 if (GNUNET_YES == nrc->end_it)
1069 goto END_SET;
1070 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1071 nrc->now = GNUNET_TIME_absolute_get ();
1072 hashSize = sizeof (GNUNET_HashCode);
1073 memset (nrc->rbind, 0, sizeof (nrc->rbind));
1074 rbind = nrc->rbind;
1075 rbind[0].buffer_type = MYSQL_TYPE_LONG;
1076 rbind[0].buffer = &type;
1077 rbind[0].is_unsigned = 1;
1078 rbind[1].buffer_type = MYSQL_TYPE_LONG;
1079 rbind[1].buffer = &priority;
1080 rbind[1].is_unsigned = 1;
1081 rbind[2].buffer_type = MYSQL_TYPE_LONG;
1082 rbind[2].buffer = &anonymity;
1083 rbind[2].is_unsigned = 1;
1084 rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
1085 rbind[3].buffer = &exp;
1086 rbind[3].is_unsigned = 1;
1087 rbind[4].buffer_type = MYSQL_TYPE_BLOB;
1088 rbind[4].buffer = &key;
1089 rbind[4].buffer_length = hashSize;
1090 rbind[4].length = &hashSize;
1091 rbind[5].buffer_type = MYSQL_TYPE_BLOB;
1092 rbind[5].buffer = value;
1093 rbind[5].buffer_length = size = sizeof (value);
1094 rbind[5].length = &size;
1095 rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
1096 rbind[6].buffer = &uid;
1097 rbind[6].is_unsigned = 1;
1098
1099 if (GNUNET_OK != nrc->prep (nrc->prep_cls,
1100 nrc))
1101 goto END_SET;
1102 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1103 GNUNET_assert (size <= sizeof(value));
1104 if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
1105 (hashSize != sizeof (GNUNET_HashCode)) )
1106 {
1107 GNUNET_break (0);
1108 goto END_SET;
1109 }
1110#if DEBUG_MYSQL
1111 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112 "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
1113 (unsigned int) size,
1114 GNUNET_h2s (&key),
1115 priority,
1116 anonymity,
1117 exp);
1118#endif
1119 expiration.abs_value = exp;
1120 ret = nrc->dviter (nrc->dviter_cls,
1121 (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
1122 &key,
1123 size, value,
1124 type, priority, anonymity, expiration,
1125 uid);
1126 if (ret == GNUNET_SYSERR)
1127 {
1128 nrc->end_it = GNUNET_YES;
1129 return;
1130 }
1131 if (ret == GNUNET_NO)
1132 {
1133 do_delete_entry (plugin, uid);
1134 if (size != 0)
1135 plugin->env->duc (plugin->env->cls,
1136 - size);
1137 }
1138 if (nrc->one_shot == GNUNET_YES)
1139 GNUNET_free (nrc);
1140 return;
1141 END_SET:
1142 /* call dviter with "end of set" */
1143 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1144 nrc->dviter (nrc->dviter_cls,
1145 NULL, NULL, 0, NULL, 0, 0, 0,
1146 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1147 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1148 nrc->prep (nrc->prep_cls, NULL);
1149 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1150 GNUNET_free (nrc);
1151}
1152
1153
1154/**
1155 * Function invoked on behalf of a "PluginIterator"
1156 * asking the database plugin to call the iterator
1157 * with the next item.
1158 *
1159 * @param next_cls whatever argument was given
1160 * to the PluginIterator as "next_cls".
1161 * @param end_it set to GNUNET_YES if we
1162 * should terminate the iteration early
1163 * (iterator should be still called once more
1164 * to signal the end of the iteration).
1165 */
1166static void
1167mysql_plugin_next_request (void *next_cls,
1168 int end_it)
1169{
1170 struct NextRequestClosure *nrc = next_cls;
1171
1172 if (GNUNET_YES == end_it)
1173 nrc->end_it = GNUNET_YES;
1174 nrc->plugin->next_task_nc = nrc;
1175 nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
1176 nrc);
1177}
1178
1179
1180/**
1181 * Context for 'get_statement_prepare'.
1182 */
1170struct GetContext 1183struct GetContext
1171{ 1184{
1172 GNUNET_HashCode key; 1185 GNUNET_HashCode key;
@@ -1466,7 +1479,6 @@ replication_prepare (void *cls,
1466{ 1479{
1467 struct Plugin *plugin = cls; 1480 struct Plugin *plugin = cls;
1468 1481
1469 nrc->end_it = GNUNET_YES;
1470 return prepared_statement_run_select (plugin, 1482 return prepared_statement_run_select (plugin,
1471 plugin->select_replication, 1483 plugin->select_replication,
1472 7, nrc->rbind, 1484 7, nrc->rbind,
@@ -1475,6 +1487,92 @@ replication_prepare (void *cls,
1475} 1487}
1476 1488
1477 1489
1490
1491/**
1492 * Context for 'repl_iter' function.
1493 */
1494struct ReplCtx
1495{
1496
1497 /**
1498 * Plugin handle.
1499 */
1500 struct Plugin *plugin;
1501
1502 /**
1503 * Function to call for the result (or the NULL).
1504 */
1505 PluginIterator iter;
1506
1507 /**
1508 * Closure for iter.
1509 */
1510 void *iter_cls;
1511};
1512
1513
1514/**
1515 * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
1516 * Decrements the replication counter and calls the original
1517 * iterator.
1518 *
1519 * @param cls closure
1520 * @param next_cls closure to pass to the "next" function.
1521 * @param key key for the content
1522 * @param size number of bytes in data
1523 * @param data content stored
1524 * @param type type of the content
1525 * @param priority priority of the content
1526 * @param anonymity anonymity-level for the content
1527 * @param expiration expiration time for the content
1528 * @param uid unique identifier for the datum;
1529 * maybe 0 if no unique identifier is available
1530 *
1531 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1532 * (continue on call to "next", of course),
1533 * GNUNET_NO to delete the item and continue (if supported)
1534 */
1535static int
1536repl_iter (void *cls,
1537 void *next_cls,
1538 const GNUNET_HashCode *key,
1539 uint32_t size,
1540 const void *data,
1541 enum GNUNET_BLOCK_Type type,
1542 uint32_t priority,
1543 uint32_t anonymity,
1544 struct GNUNET_TIME_Absolute expiration,
1545 uint64_t uid)
1546{
1547 struct ReplCtx *rc = cls;
1548 struct Plugin *plugin = rc->plugin;
1549 unsigned long long oid;
1550 int ret;
1551
1552 ret = rc->iter (rc->iter_cls,
1553 next_cls, key,
1554 size, data,
1555 type, priority, anonymity, expiration,
1556 uid);
1557 if (NULL != key)
1558 {
1559 oid = (unsigned long long) uid;
1560 ret = prepared_statement_run (plugin,
1561 plugin->dec_repl,
1562 NULL,
1563 MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES,
1564 -1);
1565 if (ret == GNUNET_SYSERR)
1566 {
1567 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1568 "Failed to reduce replication counter\n");
1569 return GNUNET_SYSERR;
1570 }
1571 }
1572 return ret;
1573}
1574
1575
1478/** 1576/**
1479 * Get a random item for replication. Returns a single, not expired, random item 1577 * Get a random item for replication. Returns a single, not expired, random item
1480 * from those with the highest replication counters. The item's 1578 * from those with the highest replication counters. The item's
@@ -1490,18 +1588,23 @@ mysql_plugin_replication_get (void *cls,
1490 PluginIterator iter, void *iter_cls) 1588 PluginIterator iter, void *iter_cls)
1491{ 1589{
1492 struct Plugin *plugin = cls; 1590 struct Plugin *plugin = cls;
1493 struct NextRequestClosure nrc; 1591 struct NextRequestClosure *nrc;
1494 1592 struct ReplCtx rc;
1495 memset (&nrc, 0, sizeof (nrc)); 1593
1496 nrc.plugin = plugin; 1594 rc.plugin = plugin;
1497 nrc.now = GNUNET_TIME_absolute_get (); 1595 rc.iter = iter;
1498 nrc.prep = &replication_prepare; 1596 rc.iter_cls = iter_cls;
1499 nrc.prep_cls = plugin; 1597 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1500 nrc.type = 0; 1598 nrc->plugin = plugin;
1501 nrc.dviter = iter; 1599 nrc->now = GNUNET_TIME_absolute_get ();
1502 nrc.dviter_cls = iter_cls; 1600 nrc->prep = &replication_prepare;
1503 nrc.end_it = GNUNET_NO; 1601 nrc->prep_cls = plugin;
1504 mysql_next_request_cont (&nrc, NULL); 1602 nrc->type = 0;
1603 nrc->dviter = &repl_iter;
1604 nrc->dviter_cls = &rc;
1605 nrc->end_it = GNUNET_NO;
1606 nrc->one_shot = GNUNET_YES;
1607 mysql_next_request_cont (nrc, NULL);
1505} 1608}
1506 1609
1507 1610
@@ -1522,7 +1625,6 @@ expiration_prepare (void *cls,
1522 1625
1523 if (NULL == nrc) 1626 if (NULL == nrc)
1524 return GNUNET_NO; 1627 return GNUNET_NO;
1525 nrc->end_it = GNUNET_YES;
1526 nt = (long long) nrc->now.abs_value; 1628 nt = (long long) nrc->now.abs_value;
1527 return prepared_statement_run_select 1629 return prepared_statement_run_select
1528 (plugin, 1630 (plugin,
@@ -1547,18 +1649,19 @@ mysql_plugin_expiration_get (void *cls,
1547 PluginIterator iter, void *iter_cls) 1649 PluginIterator iter, void *iter_cls)
1548{ 1650{
1549 struct Plugin *plugin = cls; 1651 struct Plugin *plugin = cls;
1550 struct NextRequestClosure nrc; 1652 struct NextRequestClosure *nrc;
1551 1653
1552 memset (&nrc, 0, sizeof (nrc)); 1654 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1553 nrc.plugin = plugin; 1655 nrc->plugin = plugin;
1554 nrc.now = GNUNET_TIME_absolute_get (); 1656 nrc->now = GNUNET_TIME_absolute_get ();
1555 nrc.prep = &expiration_prepare; 1657 nrc->prep = &expiration_prepare;
1556 nrc.prep_cls = plugin; 1658 nrc->prep_cls = plugin;
1557 nrc.type = 0; 1659 nrc->type = 0;
1558 nrc.dviter = iter; 1660 nrc->dviter = iter;
1559 nrc.dviter_cls = iter_cls; 1661 nrc->dviter_cls = iter_cls;
1560 nrc.end_it = GNUNET_NO; 1662 nrc->end_it = GNUNET_NO;
1561 mysql_next_request_cont (&nrc, NULL); 1663 nrc->one_shot = GNUNET_YES;
1664 mysql_next_request_cont (nrc, NULL);
1562} 1665}
1563 1666
1564 1667
@@ -1639,6 +1742,7 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
1639 || PINIT (plugin->count_entry_by_hash_vhash_and_type, 1742 || PINIT (plugin->count_entry_by_hash_vhash_and_type,
1640 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) 1743 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
1641 || PINIT (plugin->update_entry, UPDATE_ENTRY) 1744 || PINIT (plugin->update_entry, UPDATE_ENTRY)
1745 || PINIT (plugin->dec_repl, DEC_REPL)
1642 || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 1746 || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS)
1643 || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 1747 || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION)
1644 || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) ) 1748 || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )