aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore/plugin_psycstore_sqlite.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/psycstore/plugin_psycstore_sqlite.c')
-rw-r--r--src/psycstore/plugin_psycstore_sqlite.c238
1 files changed, 188 insertions, 50 deletions
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c
index 86b969c5d..cb6c5c437 100644
--- a/src/psycstore/plugin_psycstore_sqlite.c
+++ b/src/psycstore/plugin_psycstore_sqlite.c
@@ -130,12 +130,22 @@ struct Plugin
130 /** 130 /**
131 * Precompiled SQL for fragment_get() 131 * Precompiled SQL for fragment_get()
132 */ 132 */
133 sqlite3_stmt *select_fragment; 133 sqlite3_stmt *select_fragments;
134
135 /**
136 * Precompiled SQL for fragment_get()
137 */
138 sqlite3_stmt *select_latest_fragments;
134 139
135 /** 140 /**
136 * Precompiled SQL for message_get() 141 * Precompiled SQL for message_get()
137 */ 142 */
138 sqlite3_stmt *select_message; 143 sqlite3_stmt *select_messages;
144
145 /**
146 * Precompiled SQL for message_get()
147 */
148 sqlite3_stmt *select_latest_messages;
139 149
140 /** 150 /**
141 * Precompiled SQL for message_get_fragment() 151 * Precompiled SQL for message_get_fragment()
@@ -456,8 +466,8 @@ database_setup (struct Plugin *plugin)
456 " multicast_flags, psycstore_flags, data\n" 466 " multicast_flags, psycstore_flags, data\n"
457 "FROM messages\n" 467 "FROM messages\n"
458 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 468 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
459 " AND fragment_id = ?;", 469 " AND ? <= fragment_id AND fragment_id <= ?;",
460 &plugin->select_fragment); 470 &plugin->select_fragments);
461 471
462 sql_prepare (plugin->dbh, 472 sql_prepare (plugin->dbh,
463 "SELECT hop_counter, signature, purpose, fragment_id,\n" 473 "SELECT hop_counter, signature, purpose, fragment_id,\n"
@@ -465,8 +475,35 @@ database_setup (struct Plugin *plugin)
465 " multicast_flags, psycstore_flags, data\n" 475 " multicast_flags, psycstore_flags, data\n"
466 "FROM messages\n" 476 "FROM messages\n"
467 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 477 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
468 " AND message_id = ? AND fragment_offset = ?;", 478 " AND ? <= message_id AND message_id <= ?;",
469 &plugin->select_message_fragment); 479 &plugin->select_messages);
480
481 sql_prepare (plugin->dbh,
482 "SELECT * FROM\n"
483 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
484 " fragment_offset, message_id, group_generation,\n"
485 " multicast_flags, psycstore_flags, data\n"
486 " FROM messages\n"
487 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
488 " ORDER BY fragment_id DESC\n"
489 " LIMIT ?)\n"
490 "ORDER BY fragment_id;",
491 &plugin->select_latest_fragments);
492
493 sql_prepare (plugin->dbh,
494 "SELECT hop_counter, signature, purpose, fragment_id,\n"
495 " fragment_offset, message_id, group_generation,\n"
496 " multicast_flags, psycstore_flags, data\n"
497 "FROM messages\n"
498 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
499 " AND message_id IN\n"
500 " (SELECT message_id\n"
501 " FROM messages\n"
502 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
503 " ORDER BY message_id\n"
504 " DESC LIMIT ?)\n"
505 "ORDER BY fragment_id;",
506 &plugin->select_latest_messages);
470 507
471 sql_prepare (plugin->dbh, 508 sql_prepare (plugin->dbh,
472 "SELECT hop_counter, signature, purpose, fragment_id,\n" 509 "SELECT hop_counter, signature, purpose, fragment_id,\n"
@@ -474,8 +511,8 @@ database_setup (struct Plugin *plugin)
474 " multicast_flags, psycstore_flags, data\n" 511 " multicast_flags, psycstore_flags, data\n"
475 "FROM messages\n" 512 "FROM messages\n"
476 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 513 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
477 " AND message_id = ?;", 514 " AND message_id = ? AND fragment_offset = ?;",
478 &plugin->select_message); 515 &plugin->select_message_fragment);
479 516
480 sql_prepare (plugin->dbh, 517 sql_prepare (plugin->dbh,
481 "SELECT fragment_id, message_id, group_generation\n" 518 "SELECT fragment_id, message_id, group_generation\n"
@@ -1036,8 +1073,42 @@ fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1036 return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8)); 1073 return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1037} 1074}
1038 1075
1076
1077static int
1078fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1079 uint64_t *returned_fragments,
1080 GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1081{
1082 int ret = GNUNET_SYSERR;
1083 int sql_ret;
1084
1085 do
1086 {
1087 sql_ret = sqlite3_step (stmt);
1088 switch (sql_ret)
1089 {
1090 case SQLITE_DONE:
1091 if (ret != GNUNET_OK)
1092 ret = GNUNET_NO;
1093 break;
1094 case SQLITE_ROW:
1095 ret = fragment_row (stmt, cb, cb_cls);
1096 (*returned_fragments)++;
1097 if (ret != GNUNET_YES)
1098 sql_ret = SQLITE_DONE;
1099 break;
1100 default:
1101 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1102 "sqlite3_step");
1103 }
1104 }
1105 while (sql_ret == SQLITE_ROW);
1106
1107 return ret;
1108}
1109
1039/** 1110/**
1040 * Retrieve a message fragment by fragment ID. 1111 * Retrieve a message fragment range by fragment ID.
1041 * 1112 *
1042 * @see GNUNET_PSYCSTORE_fragment_get() 1113 * @see GNUNET_PSYCSTORE_fragment_get()
1043 * 1114 *
@@ -1046,36 +1117,29 @@ fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1046static int 1117static int
1047fragment_get (void *cls, 1118fragment_get (void *cls,
1048 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1119 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1049 uint64_t fragment_id, 1120 uint64_t first_fragment_id,
1121 uint64_t last_fragment_id,
1122 uint64_t *returned_fragments,
1050 GNUNET_PSYCSTORE_FragmentCallback cb, 1123 GNUNET_PSYCSTORE_FragmentCallback cb,
1051 void *cb_cls) 1124 void *cb_cls)
1052{ 1125{
1053 struct Plugin *plugin = cls; 1126 struct Plugin *plugin = cls;
1054 sqlite3_stmt *stmt = plugin->select_fragment; 1127 sqlite3_stmt *stmt = plugin->select_fragments;
1055 int ret = GNUNET_SYSERR; 1128 int ret = GNUNET_SYSERR;
1129 *returned_fragments = 0;
1056 1130
1057 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1131 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1058 sizeof (*channel_key), 1132 sizeof (*channel_key),
1059 SQLITE_STATIC) 1133 SQLITE_STATIC)
1060 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_id)) 1134 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1135 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1061 { 1136 {
1062 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1137 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1063 "sqlite3_bind"); 1138 "sqlite3_bind");
1064 } 1139 }
1065 else 1140 else
1066 { 1141 {
1067 switch (sqlite3_step (stmt)) 1142 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1068 {
1069 case SQLITE_DONE:
1070 ret = GNUNET_NO;
1071 break;
1072 case SQLITE_ROW:
1073 ret = fragment_row (stmt, cb, cb_cls);
1074 break;
1075 default:
1076 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1077 "sqlite3_step");
1078 }
1079 } 1143 }
1080 1144
1081 if (SQLITE_OK != sqlite3_reset (stmt)) 1145 if (SQLITE_OK != sqlite3_reset (stmt))
@@ -1087,8 +1151,52 @@ fragment_get (void *cls,
1087 return ret; 1151 return ret;
1088} 1152}
1089 1153
1154
1090/** 1155/**
1091 * Retrieve all fragments of a message. 1156 * Retrieve a message fragment range by fragment ID.
1157 *
1158 * @see GNUNET_PSYCSTORE_fragment_get_latest()
1159 *
1160 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1161 */
1162static int
1163fragment_get_latest (void *cls,
1164 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1165 uint64_t fragment_limit,
1166 uint64_t *returned_fragments,
1167 GNUNET_PSYCSTORE_FragmentCallback cb,
1168 void *cb_cls)
1169{
1170 struct Plugin *plugin = cls;
1171 sqlite3_stmt *stmt = plugin->select_latest_fragments;
1172 int ret = GNUNET_SYSERR;
1173 *returned_fragments = 0;
1174
1175 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1176 sizeof (*channel_key),
1177 SQLITE_STATIC)
1178 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1179 {
1180 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1181 "sqlite3_bind");
1182 }
1183 else
1184 {
1185 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1186 }
1187
1188 if (SQLITE_OK != sqlite3_reset (stmt))
1189 {
1190 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1191 "sqlite3_reset");
1192 }
1193
1194 return ret;
1195}
1196
1197
1198/**
1199 * Retrieve all fragments of a message ID range.
1092 * 1200 *
1093 * @see GNUNET_PSYCSTORE_message_get() 1201 * @see GNUNET_PSYCSTORE_message_get()
1094 * 1202 *
@@ -1097,48 +1205,29 @@ fragment_get (void *cls,
1097static int 1205static int
1098message_get (void *cls, 1206message_get (void *cls,
1099 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1207 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1100 uint64_t message_id, 1208 uint64_t first_message_id,
1209 uint64_t last_message_id,
1101 uint64_t *returned_fragments, 1210 uint64_t *returned_fragments,
1102 GNUNET_PSYCSTORE_FragmentCallback cb, 1211 GNUNET_PSYCSTORE_FragmentCallback cb,
1103 void *cb_cls) 1212 void *cb_cls)
1104{ 1213{
1105 struct Plugin *plugin = cls; 1214 struct Plugin *plugin = cls;
1106 sqlite3_stmt *stmt = plugin->select_message; 1215 sqlite3_stmt *stmt = plugin->select_messages;
1107 int ret = GNUNET_SYSERR; 1216 int ret = GNUNET_SYSERR;
1108 *returned_fragments = 0; 1217 *returned_fragments = 0;
1109 1218
1110 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1219 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1111 sizeof (*channel_key), 1220 sizeof (*channel_key),
1112 SQLITE_STATIC) 1221 SQLITE_STATIC)
1113 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)) 1222 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1223 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id))
1114 { 1224 {
1115 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1225 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1116 "sqlite3_bind"); 1226 "sqlite3_bind");
1117 } 1227 }
1118 else 1228 else
1119 { 1229 {
1120 int sql_ret; 1230 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1121 do
1122 {
1123 sql_ret = sqlite3_step (stmt);
1124 switch (sql_ret)
1125 {
1126 case SQLITE_DONE:
1127 if (ret != GNUNET_OK)
1128 ret = GNUNET_NO;
1129 break;
1130 case SQLITE_ROW:
1131 ret = fragment_row (stmt, cb, cb_cls);
1132 (*returned_fragments)++;
1133 if (ret != GNUNET_YES)
1134 sql_ret = SQLITE_DONE;
1135 break;
1136 default:
1137 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1138 "sqlite3_step");
1139 }
1140 }
1141 while (sql_ret == SQLITE_ROW);
1142 } 1231 }
1143 1232
1144 if (SQLITE_OK != sqlite3_reset (stmt)) 1233 if (SQLITE_OK != sqlite3_reset (stmt))
@@ -1150,6 +1239,53 @@ message_get (void *cls,
1150 return ret; 1239 return ret;
1151} 1240}
1152 1241
1242
1243/**
1244 * Retrieve all fragments of the latest messages.
1245 *
1246 * @see GNUNET_PSYCSTORE_message_get_latest()
1247 *
1248 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1249 */
1250static int
1251message_get_latest (void *cls,
1252 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1253 uint64_t message_limit,
1254 uint64_t *returned_fragments,
1255 GNUNET_PSYCSTORE_FragmentCallback cb,
1256 void *cb_cls)
1257{
1258 struct Plugin *plugin = cls;
1259 sqlite3_stmt *stmt = plugin->select_latest_messages;
1260 int ret = GNUNET_SYSERR;
1261 *returned_fragments = 0;
1262
1263 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1264 sizeof (*channel_key),
1265 SQLITE_STATIC)
1266 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1267 sizeof (*channel_key),
1268 SQLITE_STATIC)
1269 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1270 {
1271 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1272 "sqlite3_bind");
1273 }
1274 else
1275 {
1276 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1277 }
1278
1279 if (SQLITE_OK != sqlite3_reset (stmt))
1280 {
1281 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1282 "sqlite3_reset");
1283 }
1284
1285 return ret;
1286}
1287
1288
1153/** 1289/**
1154 * Retrieve a fragment of message specified by its message ID and fragment 1290 * Retrieve a fragment of message specified by its message ID and fragment
1155 * offset. 1291 * offset.
@@ -1777,7 +1913,9 @@ libgnunet_plugin_psycstore_sqlite_init (void *cls)
1777 api->fragment_store = &fragment_store; 1913 api->fragment_store = &fragment_store;
1778 api->message_add_flags = &message_add_flags; 1914 api->message_add_flags = &message_add_flags;
1779 api->fragment_get = &fragment_get; 1915 api->fragment_get = &fragment_get;
1916 api->fragment_get_latest = &fragment_get_latest;
1780 api->message_get = &message_get; 1917 api->message_get = &message_get;
1918 api->message_get_latest = &message_get_latest;
1781 api->message_get_fragment = &message_get_fragment; 1919 api->message_get_fragment = &message_get_fragment;
1782 api->counters_message_get = &counters_message_get; 1920 api->counters_message_get = &counters_message_get;
1783 api->counters_state_get = &counters_state_get; 1921 api->counters_state_get = &counters_state_get;