diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-02-11 20:39:36 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-02-11 20:39:36 +0100 |
commit | 1f59e703d82b47f3aeaf432045a2633c2841169b (patch) | |
tree | 6af5609b388cf1906a29b5d572bec2dd8fb2ae1c /src/psycstore/plugin_psycstore_mysql.c | |
download | gnunet-secushare-1f59e703d82b47f3aeaf432045a2633c2841169b.tar.gz gnunet-secushare-1f59e703d82b47f3aeaf432045a2633c2841169b.zip |
initial import from gnunet.git
Diffstat (limited to 'src/psycstore/plugin_psycstore_mysql.c')
-rw-r--r-- | src/psycstore/plugin_psycstore_mysql.c | 1960 |
1 files changed, 1960 insertions, 0 deletions
diff --git a/src/psycstore/plugin_psycstore_mysql.c b/src/psycstore/plugin_psycstore_mysql.c new file mode 100644 index 0000000..c36b6f7 --- /dev/null +++ b/src/psycstore/plugin_psycstore_mysql.c | |||
@@ -0,0 +1,1960 @@ | |||
1 | /* | ||
2 | * This file is part of GNUnet | ||
3 | * Copyright (C) 2013 GNUnet e.V. | ||
4 | * | ||
5 | * GNUnet is free software: you can redistribute it and/or modify it | ||
6 | * under the terms of the GNU Affero General Public License as published | ||
7 | * by the Free Software Foundation, either version 3 of the License, | ||
8 | * or (at your option) any later version. | ||
9 | * | ||
10 | * GNUnet is distributed in the hope that it will be useful, but | ||
11 | * WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | * Affero General Public License for more details. | ||
14 | * | ||
15 | * You should have received a copy of the GNU Affero General Public License | ||
16 | * along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file psycstore/plugin_psycstore_mysql.c | ||
23 | * @brief mysql-based psycstore backend | ||
24 | * @author Gabor X Toth | ||
25 | * @author Christian Grothoff | ||
26 | * @author Christophe Genevey | ||
27 | */ | ||
28 | |||
29 | #include "platform.h" | ||
30 | #include "gnunet_psycstore_plugin.h" | ||
31 | #include "gnunet_psycstore_service.h" | ||
32 | #include "gnunet_multicast_service.h" | ||
33 | #include "gnunet_crypto_lib.h" | ||
34 | #include "gnunet_psyc_util_lib.h" | ||
35 | #include "psycstore.h" | ||
36 | #include "gnunet_my_lib.h" | ||
37 | #include "gnunet_mysql_lib.h" | ||
38 | #include <mysql/mysql.h> | ||
39 | |||
40 | /** | ||
41 | * After how many ms "busy" should a DB operation fail for good? A | ||
42 | * low value makes sure that we are more responsive to requests | ||
43 | * (especially PUTs). A high value guarantees a higher success rate | ||
44 | * (SELECTs in iterate can take several seconds despite LIMIT=1). | ||
45 | * | ||
46 | * The default value of 1s should ensure that users do not experience | ||
47 | * huge latencies while at the same time allowing operations to | ||
48 | * succeed with reasonable probability. | ||
49 | */ | ||
50 | #define BUSY_TIMEOUT_MS 1000 | ||
51 | |||
52 | #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING | ||
53 | |||
54 | /** | ||
55 | * Log an error message at log-level 'level' that indicates | ||
56 | * a failure of the command 'cmd' on file 'filename' | ||
57 | * with the message given by strerror(errno). | ||
58 | */ | ||
59 | #define LOG_MYSQL(db, level, cmd, stmt) \ | ||
60 | do { \ | ||
61 | GNUNET_log_from (level, "psycstore-mysql", \ | ||
62 | _("`%s' failed at %s:%d with error: %s\n"), \ | ||
63 | cmd, __FILE__, __LINE__, \ | ||
64 | mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \ | ||
65 | } while (0) | ||
66 | |||
67 | #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__) | ||
68 | |||
69 | enum Transactions { | ||
70 | TRANSACTION_NONE = 0, | ||
71 | TRANSACTION_STATE_MODIFY, | ||
72 | TRANSACTION_STATE_SYNC, | ||
73 | }; | ||
74 | |||
75 | /** | ||
76 | * Context for all functions in this plugin. | ||
77 | */ | ||
78 | struct Plugin | ||
79 | { | ||
80 | |||
81 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
82 | |||
83 | /** | ||
84 | * MySQL context. | ||
85 | */ | ||
86 | struct GNUNET_MYSQL_Context *mc; | ||
87 | |||
88 | /** | ||
89 | * Current transaction. | ||
90 | */ | ||
91 | enum Transactions transaction; | ||
92 | |||
93 | /** | ||
94 | * Precompiled SQL for channel_key_store() | ||
95 | */ | ||
96 | struct GNUNET_MYSQL_StatementHandle *insert_channel_key; | ||
97 | |||
98 | /** | ||
99 | * Precompiled SQL for slave_key_store() | ||
100 | */ | ||
101 | struct GNUNET_MYSQL_StatementHandle *insert_slave_key; | ||
102 | |||
103 | /** | ||
104 | * Precompiled SQL for membership_store() | ||
105 | */ | ||
106 | struct GNUNET_MYSQL_StatementHandle *insert_membership; | ||
107 | |||
108 | /** | ||
109 | * Precompiled SQL for membership_test() | ||
110 | */ | ||
111 | struct GNUNET_MYSQL_StatementHandle *select_membership; | ||
112 | |||
113 | /** | ||
114 | * Precompiled SQL for fragment_store() | ||
115 | */ | ||
116 | struct GNUNET_MYSQL_StatementHandle *insert_fragment; | ||
117 | |||
118 | /** | ||
119 | * Precompiled SQL for message_add_flags() | ||
120 | */ | ||
121 | struct GNUNET_MYSQL_StatementHandle *update_message_flags; | ||
122 | |||
123 | /** | ||
124 | * Precompiled SQL for fragment_get() | ||
125 | */ | ||
126 | struct GNUNET_MYSQL_StatementHandle *select_fragments; | ||
127 | |||
128 | /** | ||
129 | * Precompiled SQL for fragment_get() | ||
130 | */ | ||
131 | struct GNUNET_MYSQL_StatementHandle *select_latest_fragments; | ||
132 | |||
133 | /** | ||
134 | * Precompiled SQL for message_get() | ||
135 | */ | ||
136 | struct GNUNET_MYSQL_StatementHandle *select_messages; | ||
137 | |||
138 | /** | ||
139 | * Precompiled SQL for message_get() | ||
140 | */ | ||
141 | struct GNUNET_MYSQL_StatementHandle *select_latest_messages; | ||
142 | |||
143 | /** | ||
144 | * Precompiled SQL for message_get_fragment() | ||
145 | */ | ||
146 | struct GNUNET_MYSQL_StatementHandle *select_message_fragment; | ||
147 | |||
148 | /** | ||
149 | * Precompiled SQL for counters_get_message() | ||
150 | */ | ||
151 | struct GNUNET_MYSQL_StatementHandle *select_counters_message; | ||
152 | |||
153 | /** | ||
154 | * Precompiled SQL for counters_get_state() | ||
155 | */ | ||
156 | struct GNUNET_MYSQL_StatementHandle *select_counters_state; | ||
157 | |||
158 | /** | ||
159 | * Precompiled SQL for state_modify_end() | ||
160 | */ | ||
161 | struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id; | ||
162 | |||
163 | /** | ||
164 | * Precompiled SQL for state_sync_end() | ||
165 | */ | ||
166 | struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id; | ||
167 | |||
168 | /** | ||
169 | * Precompiled SQL for state_modify_op() | ||
170 | */ | ||
171 | struct GNUNET_MYSQL_StatementHandle *insert_state_current; | ||
172 | |||
173 | /** | ||
174 | * Precompiled SQL for state_modify_end() | ||
175 | */ | ||
176 | struct GNUNET_MYSQL_StatementHandle *delete_state_empty; | ||
177 | |||
178 | /** | ||
179 | * Precompiled SQL for state_set_signed() | ||
180 | */ | ||
181 | struct GNUNET_MYSQL_StatementHandle *update_state_signed; | ||
182 | |||
183 | /** | ||
184 | * Precompiled SQL for state_sync() | ||
185 | */ | ||
186 | struct GNUNET_MYSQL_StatementHandle *insert_state_sync; | ||
187 | |||
188 | /** | ||
189 | * Precompiled SQL for state_sync() | ||
190 | */ | ||
191 | struct GNUNET_MYSQL_StatementHandle *delete_state; | ||
192 | |||
193 | /** | ||
194 | * Precompiled SQL for state_sync() | ||
195 | */ | ||
196 | struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync; | ||
197 | |||
198 | /** | ||
199 | * Precompiled SQL for state_sync() | ||
200 | */ | ||
201 | struct GNUNET_MYSQL_StatementHandle *delete_state_sync; | ||
202 | |||
203 | /** | ||
204 | * Precompiled SQL for state_get_signed() | ||
205 | */ | ||
206 | struct GNUNET_MYSQL_StatementHandle *select_state_signed; | ||
207 | |||
208 | /** | ||
209 | * Precompiled SQL for state_get() | ||
210 | */ | ||
211 | struct GNUNET_MYSQL_StatementHandle *select_state_one; | ||
212 | |||
213 | /** | ||
214 | * Precompiled SQL for state_get_prefix() | ||
215 | */ | ||
216 | struct GNUNET_MYSQL_StatementHandle *select_state_prefix; | ||
217 | |||
218 | }; | ||
219 | |||
220 | #if DEBUG_PSYCSTORE | ||
221 | |||
222 | static void | ||
223 | mysql_trace (void *cls, const char *sql) | ||
224 | { | ||
225 | LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql); | ||
226 | } | ||
227 | |||
228 | #endif | ||
229 | |||
230 | |||
231 | /** | ||
232 | * @brief Prepare a SQL statement | ||
233 | * | ||
234 | * @param dbh handle to the database | ||
235 | * @param sql SQL statement, UTF-8 encoded | ||
236 | * @param stmt set to the prepared statement | ||
237 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure | ||
238 | */ | ||
239 | static int | ||
240 | mysql_prepare (struct GNUNET_MYSQL_Context *mc, | ||
241 | const char *sql, | ||
242 | struct GNUNET_MYSQL_StatementHandle **stmt) | ||
243 | { | ||
244 | *stmt = GNUNET_MYSQL_statement_prepare (mc, | ||
245 | sql); | ||
246 | |||
247 | if (NULL == *stmt) | ||
248 | { | ||
249 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
250 | _("Error preparing SQL query: %s\n %s\n"), | ||
251 | mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)), | ||
252 | sql); | ||
253 | return GNUNET_SYSERR; | ||
254 | } | ||
255 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
256 | "Prepared `%s' / %p\n", | ||
257 | sql, | ||
258 | stmt); | ||
259 | return GNUNET_OK; | ||
260 | } | ||
261 | |||
262 | |||
263 | /** | ||
264 | * Initialize the database connections and associated | ||
265 | * data structures (create tables and indices | ||
266 | * as needed as well). | ||
267 | * | ||
268 | * @param plugin the plugin context (state for this module) | ||
269 | * @return #GNUNET_OK on success | ||
270 | */ | ||
271 | static int | ||
272 | database_setup (struct Plugin *plugin) | ||
273 | { | ||
274 | /* Open database and precompile statements */ | ||
275 | plugin->mc = GNUNET_MYSQL_context_create (plugin->cfg, | ||
276 | "psycstore-mysql"); | ||
277 | |||
278 | if (NULL == plugin->mc) | ||
279 | { | ||
280 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
281 | _("Unable to initialize Mysql.\n")); | ||
282 | return GNUNET_SYSERR; | ||
283 | } | ||
284 | |||
285 | #define STMT_RUN(sql) \ | ||
286 | if (GNUNET_OK != \ | ||
287 | GNUNET_MYSQL_statement_run (plugin->mc, \ | ||
288 | sql)) \ | ||
289 | { \ | ||
290 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \ | ||
291 | _("Failed to run SQL statement `%s'\n"), \ | ||
292 | sql); \ | ||
293 | return GNUNET_SYSERR; \ | ||
294 | } | ||
295 | |||
296 | /* Create tables */ | ||
297 | STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n" | ||
298 | " id BIGINT UNSIGNED AUTO_INCREMENT,\n" | ||
299 | " pub_key BLOB(32),\n" | ||
300 | " max_state_message_id BIGINT UNSIGNED,\n" | ||
301 | " state_hash_message_id BIGINT UNSIGNED,\n" | ||
302 | " PRIMARY KEY(id),\n" | ||
303 | " UNIQUE KEY(pub_key(32))\n" | ||
304 | ");"); | ||
305 | |||
306 | STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n" | ||
307 | " id BIGINT UNSIGNED AUTO_INCREMENT,\n" | ||
308 | " pub_key BLOB(32),\n" | ||
309 | " PRIMARY KEY(id),\n" | ||
310 | " UNIQUE KEY(pub_key(32))\n" | ||
311 | ");"); | ||
312 | |||
313 | STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n" | ||
314 | " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n" | ||
315 | " slave_id BIGINT UNSIGNED NOT NULL REFERENCES slaves(id),\n" | ||
316 | " did_join TINYINT NOT NULL,\n" | ||
317 | " announced_at BIGINT UNSIGNED NOT NULL,\n" | ||
318 | " effective_since BIGINT UNSIGNED NOT NULL,\n" | ||
319 | " group_generation BIGINT UNSIGNED NOT NULL\n" | ||
320 | ");"); | ||
321 | |||
322 | /*** FIX because IF NOT EXISTS doesn't work ***/ | ||
323 | GNUNET_MYSQL_statement_run (plugin->mc, | ||
324 | "CREATE INDEX idx_membership_channel_id_slave_id " | ||
325 | "ON membership (channel_id, slave_id);"); | ||
326 | |||
327 | /** @todo messages table: add method_name column */ | ||
328 | STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n" | ||
329 | " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n" | ||
330 | " hop_counter BIGINT UNSIGNED NOT NULL,\n" | ||
331 | " signature BLOB,\n" | ||
332 | " purpose BLOB,\n" | ||
333 | " fragment_id BIGINT UNSIGNED NOT NULL,\n" | ||
334 | " fragment_offset BIGINT UNSIGNED NOT NULL,\n" | ||
335 | " message_id BIGINT UNSIGNED NOT NULL,\n" | ||
336 | " group_generation BIGINT UNSIGNED NOT NULL,\n" | ||
337 | " multicast_flags BIGINT UNSIGNED NOT NULL,\n" | ||
338 | " psycstore_flags BIGINT UNSIGNED NOT NULL,\n" | ||
339 | " data BLOB,\n" | ||
340 | " PRIMARY KEY (channel_id, fragment_id),\n" | ||
341 | " UNIQUE KEY(channel_id, message_id, fragment_offset)\n" | ||
342 | ");"); | ||
343 | |||
344 | STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n" | ||
345 | " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n" | ||
346 | " name TEXT NOT NULL,\n" | ||
347 | " value_current BLOB,\n" | ||
348 | " value_signed BLOB\n" | ||
349 | //" PRIMARY KEY (channel_id, name(255))\n" | ||
350 | ");"); | ||
351 | |||
352 | STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n" | ||
353 | " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n" | ||
354 | " name TEXT NOT NULL,\n" | ||
355 | " value BLOB\n" | ||
356 | //" PRIMARY KEY (channel_id, name(255))\n" | ||
357 | ");"); | ||
358 | #undef STMT_RUN | ||
359 | |||
360 | /* Prepare statements */ | ||
361 | #define PREP(stmt,handle) \ | ||
362 | if (GNUNET_OK != mysql_prepare (plugin->mc, stmt, handle)) \ | ||
363 | { \ | ||
364 | GNUNET_break (0); \ | ||
365 | return GNUNET_SYSERR; \ | ||
366 | } | ||
367 | PREP ("INSERT IGNORE INTO channels (pub_key) VALUES (?);", | ||
368 | &plugin->insert_channel_key); | ||
369 | PREP ("INSERT IGNORE INTO slaves (pub_key) VALUES (?);", | ||
370 | &plugin->insert_slave_key); | ||
371 | PREP ("INSERT INTO membership\n" | ||
372 | " (channel_id, slave_id, did_join, announced_at,\n" | ||
373 | " effective_since, group_generation)\n" | ||
374 | "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n" | ||
375 | " (SELECT id FROM slaves WHERE pub_key = ?),\n" | ||
376 | " ?, ?, ?, ?);", | ||
377 | &plugin->insert_membership); | ||
378 | PREP ("SELECT did_join FROM membership\n" | ||
379 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
380 | " AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n" | ||
381 | " AND effective_since <= ? AND did_join = 1\n" | ||
382 | "ORDER BY announced_at DESC LIMIT 1;", | ||
383 | &plugin->select_membership); | ||
384 | |||
385 | PREP ("INSERT IGNORE INTO messages\n" | ||
386 | " (channel_id, hop_counter, signature, purpose,\n" | ||
387 | " fragment_id, fragment_offset, message_id,\n" | ||
388 | " group_generation, multicast_flags, psycstore_flags, data)\n" | ||
389 | "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n" | ||
390 | " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", | ||
391 | &plugin->insert_fragment); | ||
392 | |||
393 | PREP ("UPDATE messages\n" | ||
394 | "SET psycstore_flags = psycstore_flags | ?\n" | ||
395 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
396 | " AND message_id = ? AND fragment_offset = 0;", | ||
397 | &plugin->update_message_flags); | ||
398 | |||
399 | PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n" | ||
400 | " fragment_offset, message_id, group_generation,\n" | ||
401 | " multicast_flags, psycstore_flags, data\n" | ||
402 | "FROM messages\n" | ||
403 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
404 | " AND ? <= fragment_id AND fragment_id <= ? LIMIT 1;", | ||
405 | &plugin->select_fragments); | ||
406 | |||
407 | /** @todo select_messages: add method_prefix filter */ | ||
408 | PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n" | ||
409 | " fragment_offset, message_id, group_generation,\n" | ||
410 | " multicast_flags, psycstore_flags, data\n" | ||
411 | "FROM messages\n" | ||
412 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
413 | " AND ? <= message_id AND message_id <= ?\n" | ||
414 | "LIMIT ?;", | ||
415 | &plugin->select_messages); | ||
416 | |||
417 | PREP ("SELECT * FROM\n" | ||
418 | "(SELECT hop_counter, signature, purpose, fragment_id,\n" | ||
419 | " fragment_offset, message_id, group_generation,\n" | ||
420 | " multicast_flags, psycstore_flags, data\n" | ||
421 | " FROM messages\n" | ||
422 | " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
423 | " ORDER BY fragment_id DESC\n" | ||
424 | " LIMIT ?)\n" | ||
425 | "ORDER BY fragment_id;", | ||
426 | &plugin->select_latest_fragments); | ||
427 | |||
428 | /** @todo select_latest_messages: add method_prefix filter */ | ||
429 | PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n" | ||
430 | " fragment_offset, message_id, group_generation,\n" | ||
431 | " multicast_flags, psycstore_flags, data\n" | ||
432 | "FROM messages\n" | ||
433 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
434 | " AND message_id IN\n" | ||
435 | " (SELECT message_id\n" | ||
436 | " FROM messages\n" | ||
437 | " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
438 | " GROUP BY message_id\n" | ||
439 | " ORDER BY message_id\n" | ||
440 | " DESC LIMIT ?)\n" | ||
441 | "ORDER BY fragment_id;", | ||
442 | &plugin->select_latest_messages); | ||
443 | |||
444 | PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n" | ||
445 | " fragment_offset, message_id, group_generation,\n" | ||
446 | " multicast_flags, psycstore_flags, data\n" | ||
447 | "FROM messages\n" | ||
448 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
449 | " AND message_id = ? AND fragment_offset = ?;", | ||
450 | &plugin->select_message_fragment); | ||
451 | |||
452 | PREP ("SELECT fragment_id, message_id, group_generation\n" | ||
453 | "FROM messages\n" | ||
454 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
455 | "ORDER BY fragment_id DESC LIMIT 1;", | ||
456 | &plugin->select_counters_message); | ||
457 | |||
458 | PREP ("SELECT max_state_message_id\n" | ||
459 | "FROM channels\n" | ||
460 | "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;", | ||
461 | &plugin->select_counters_state); | ||
462 | |||
463 | PREP ("UPDATE channels\n" | ||
464 | "SET max_state_message_id = ?\n" | ||
465 | "WHERE pub_key = ?;", | ||
466 | &plugin->update_max_state_message_id); | ||
467 | |||
468 | PREP ("UPDATE channels\n" | ||
469 | "SET state_hash_message_id = ?\n" | ||
470 | "WHERE pub_key = ?;", | ||
471 | &plugin->update_state_hash_message_id); | ||
472 | |||
473 | PREP ("REPLACE INTO state\n" | ||
474 | " (channel_id, name, value_current, value_signed)\n" | ||
475 | "SELECT new.channel_id, new.name, new.value_current, old.value_signed\n" | ||
476 | "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?) AS channel_id,\n" | ||
477 | " (SELECT ?) AS name,\n" | ||
478 | " (SELECT ?) AS value_current\n" | ||
479 | " ) AS new\n" | ||
480 | "LEFT JOIN (SELECT channel_id, name, value_signed\n" | ||
481 | " FROM state) AS old\n" | ||
482 | "ON new.channel_id = old.channel_id AND new.name = old.name;", | ||
483 | &plugin->insert_state_current); | ||
484 | |||
485 | PREP ("DELETE FROM state\n" | ||
486 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
487 | " AND (value_current IS NULL OR length(value_current) = 0)\n" | ||
488 | " AND (value_signed IS NULL OR length(value_signed) = 0);", | ||
489 | &plugin->delete_state_empty); | ||
490 | |||
491 | PREP ("UPDATE state\n" | ||
492 | "SET value_signed = value_current\n" | ||
493 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", | ||
494 | &plugin->update_state_signed); | ||
495 | |||
496 | PREP ("DELETE FROM state\n" | ||
497 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", | ||
498 | &plugin->delete_state); | ||
499 | |||
500 | PREP ("INSERT INTO state_sync (channel_id, name, value)\n" | ||
501 | "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);", | ||
502 | &plugin->insert_state_sync); | ||
503 | |||
504 | PREP ("INSERT INTO state\n" | ||
505 | " (channel_id, name, value_current, value_signed)\n" | ||
506 | "SELECT channel_id, name, value, value\n" | ||
507 | "FROM state_sync\n" | ||
508 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", | ||
509 | &plugin->insert_state_from_sync); | ||
510 | |||
511 | PREP ("DELETE FROM state_sync\n" | ||
512 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", | ||
513 | &plugin->delete_state_sync); | ||
514 | |||
515 | PREP ("SELECT value_current\n" | ||
516 | "FROM state\n" | ||
517 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
518 | " AND name = ?;", | ||
519 | &plugin->select_state_one); | ||
520 | |||
521 | PREP ("SELECT name, value_current\n" | ||
522 | "FROM state\n" | ||
523 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | ||
524 | " AND (name = ? OR substr(name, 1, ?) = ?);", | ||
525 | &plugin->select_state_prefix); | ||
526 | |||
527 | PREP ("SELECT name, value_signed\n" | ||
528 | "FROM state\n" | ||
529 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)" | ||
530 | " AND value_signed IS NOT NULL;", | ||
531 | &plugin->select_state_signed); | ||
532 | #undef PREP | ||
533 | |||
534 | return GNUNET_OK; | ||
535 | } | ||
536 | |||
537 | |||
538 | /** | ||
539 | * Shutdown database connection and associate data | ||
540 | * structures. | ||
541 | * @param plugin the plugin context (state for this module) | ||
542 | */ | ||
543 | static void | ||
544 | database_shutdown (struct Plugin *plugin) | ||
545 | { | ||
546 | GNUNET_MYSQL_context_destroy (plugin->mc); | ||
547 | } | ||
548 | |||
549 | |||
550 | /** | ||
551 | * Execute a prepared statement with a @a channel_key argument. | ||
552 | * | ||
553 | * @param plugin Plugin handle. | ||
554 | * @param stmt Statement to execute. | ||
555 | * @param channel_key Public key of the channel. | ||
556 | * | ||
557 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
558 | */ | ||
559 | static int | ||
560 | exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt, | ||
561 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | ||
562 | { | ||
563 | struct GNUNET_MY_QueryParam params[] = { | ||
564 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
565 | GNUNET_MY_query_param_end | ||
566 | }; | ||
567 | |||
568 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params)) | ||
569 | { | ||
570 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
571 | "mysql exec_channel", stmt); | ||
572 | } | ||
573 | |||
574 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
575 | { | ||
576 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
577 | "mysql_stmt_reset", stmt); | ||
578 | return GNUNET_SYSERR; | ||
579 | } | ||
580 | |||
581 | return GNUNET_OK; | ||
582 | } | ||
583 | |||
584 | |||
585 | /** | ||
586 | * Begin a transaction. | ||
587 | */ | ||
588 | static int | ||
589 | transaction_begin (struct Plugin *plugin, enum Transactions transaction) | ||
590 | { | ||
591 | if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "BEGIN")) | ||
592 | { | ||
593 | LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_begin failed"); | ||
594 | return GNUNET_SYSERR; | ||
595 | } | ||
596 | |||
597 | plugin->transaction = transaction; | ||
598 | return GNUNET_OK; | ||
599 | } | ||
600 | |||
601 | |||
602 | /** | ||
603 | * Commit current transaction. | ||
604 | */ | ||
605 | static int | ||
606 | transaction_commit (struct Plugin *plugin) | ||
607 | { | ||
608 | if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "COMMIT")) | ||
609 | { | ||
610 | LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_commit failed"); | ||
611 | return GNUNET_SYSERR; | ||
612 | } | ||
613 | |||
614 | plugin->transaction = TRANSACTION_NONE; | ||
615 | return GNUNET_OK; | ||
616 | } | ||
617 | |||
618 | |||
619 | /** | ||
620 | * Roll back current transaction. | ||
621 | */ | ||
622 | static int | ||
623 | transaction_rollback (struct Plugin *plugin) | ||
624 | { | ||
625 | if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "ROLLBACK")) | ||
626 | { | ||
627 | LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_rollback failed"); | ||
628 | return GNUNET_SYSERR; | ||
629 | } | ||
630 | |||
631 | plugin->transaction = TRANSACTION_NONE; | ||
632 | return GNUNET_OK; | ||
633 | } | ||
634 | |||
635 | |||
636 | static int | ||
637 | channel_key_store (struct Plugin *plugin, | ||
638 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | ||
639 | { | ||
640 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key; | ||
641 | |||
642 | struct GNUNET_MY_QueryParam params[] = { | ||
643 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
644 | GNUNET_MY_query_param_end | ||
645 | }; | ||
646 | |||
647 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params)) | ||
648 | { | ||
649 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
650 | "mysql exec_prepared", stmt); | ||
651 | return GNUNET_SYSERR; | ||
652 | } | ||
653 | |||
654 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
655 | { | ||
656 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
657 | "mysql_stmt_reset", stmt); | ||
658 | return GNUNET_SYSERR; | ||
659 | } | ||
660 | |||
661 | return GNUNET_OK; | ||
662 | } | ||
663 | |||
664 | |||
665 | static int | ||
666 | slave_key_store (struct Plugin *plugin, | ||
667 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key) | ||
668 | { | ||
669 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key; | ||
670 | |||
671 | struct GNUNET_MY_QueryParam params[] = { | ||
672 | GNUNET_MY_query_param_auto_from_type (slave_key), | ||
673 | GNUNET_MY_query_param_end | ||
674 | }; | ||
675 | |||
676 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params)) | ||
677 | { | ||
678 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
679 | "mysql exec_prepared", stmt); | ||
680 | return GNUNET_SYSERR; | ||
681 | } | ||
682 | |||
683 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
684 | { | ||
685 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
686 | "mysql_stmt_reset", stmt); | ||
687 | return GNUNET_SYSERR; | ||
688 | } | ||
689 | |||
690 | return GNUNET_OK; | ||
691 | } | ||
692 | |||
693 | |||
694 | /** | ||
695 | * Store join/leave events for a PSYC channel in order to be able to answer | ||
696 | * membership test queries later. | ||
697 | * | ||
698 | * @see GNUNET_PSYCSTORE_membership_store() | ||
699 | * | ||
700 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
701 | */ | ||
702 | static int | ||
703 | mysql_membership_store (void *cls, | ||
704 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
705 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | ||
706 | int did_join, | ||
707 | uint64_t announced_at, | ||
708 | uint64_t effective_since, | ||
709 | uint64_t group_generation) | ||
710 | { | ||
711 | struct Plugin *plugin = cls; | ||
712 | |||
713 | uint32_t idid_join = (uint32_t)did_join; | ||
714 | |||
715 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership; | ||
716 | |||
717 | GNUNET_assert (TRANSACTION_NONE == plugin->transaction); | ||
718 | |||
719 | if (announced_at > INT64_MAX || | ||
720 | effective_since > INT64_MAX || | ||
721 | group_generation > INT64_MAX) | ||
722 | { | ||
723 | GNUNET_break (0); | ||
724 | return GNUNET_SYSERR; | ||
725 | } | ||
726 | |||
727 | if (GNUNET_OK != channel_key_store (plugin, channel_key) | ||
728 | || GNUNET_OK != slave_key_store (plugin, slave_key)) | ||
729 | return GNUNET_SYSERR; | ||
730 | |||
731 | struct GNUNET_MY_QueryParam params[] = { | ||
732 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
733 | GNUNET_MY_query_param_auto_from_type (slave_key), | ||
734 | GNUNET_MY_query_param_uint32 (&idid_join), | ||
735 | GNUNET_MY_query_param_uint64 (&announced_at), | ||
736 | GNUNET_MY_query_param_uint64 (&effective_since), | ||
737 | GNUNET_MY_query_param_uint64 (&group_generation), | ||
738 | GNUNET_MY_query_param_end | ||
739 | }; | ||
740 | |||
741 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params)) | ||
742 | { | ||
743 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
744 | "mysql exec_prepared", stmt); | ||
745 | return GNUNET_SYSERR; | ||
746 | } | ||
747 | |||
748 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
749 | { | ||
750 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
751 | "mysql_stmt_reset", stmt); | ||
752 | return GNUNET_SYSERR; | ||
753 | } | ||
754 | return GNUNET_OK; | ||
755 | } | ||
756 | |||
757 | /** | ||
758 | * Test if a member was admitted to the channel at the given message ID. | ||
759 | * | ||
760 | * @see GNUNET_PSYCSTORE_membership_test() | ||
761 | * | ||
762 | * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not, | ||
763 | * #GNUNET_SYSERR if there was en error. | ||
764 | */ | ||
765 | static int | ||
766 | membership_test (void *cls, | ||
767 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
768 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | ||
769 | uint64_t message_id) | ||
770 | { | ||
771 | struct Plugin *plugin = cls; | ||
772 | |||
773 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership; | ||
774 | |||
775 | uint32_t did_join = 0; | ||
776 | |||
777 | int ret = GNUNET_SYSERR; | ||
778 | |||
779 | struct GNUNET_MY_QueryParam params_select[] = { | ||
780 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
781 | GNUNET_MY_query_param_auto_from_type (slave_key), | ||
782 | GNUNET_MY_query_param_uint64 (&message_id), | ||
783 | GNUNET_MY_query_param_end | ||
784 | }; | ||
785 | |||
786 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) | ||
787 | { | ||
788 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
789 | "mysql execute prepared", stmt); | ||
790 | return GNUNET_SYSERR; | ||
791 | } | ||
792 | |||
793 | struct GNUNET_MY_ResultSpec results_select[] = { | ||
794 | GNUNET_MY_result_spec_uint32 (&did_join), | ||
795 | GNUNET_MY_result_spec_end | ||
796 | }; | ||
797 | |||
798 | switch (GNUNET_MY_extract_result (stmt, results_select)) | ||
799 | { | ||
800 | case GNUNET_NO: | ||
801 | ret = GNUNET_NO; | ||
802 | break; | ||
803 | case GNUNET_OK: | ||
804 | ret = GNUNET_YES; | ||
805 | break; | ||
806 | default: | ||
807 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
808 | "mysql extract_result", stmt); | ||
809 | return GNUNET_SYSERR; | ||
810 | } | ||
811 | |||
812 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
813 | { | ||
814 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
815 | "mysql_stmt_reset", stmt); | ||
816 | return GNUNET_SYSERR; | ||
817 | } | ||
818 | |||
819 | return ret; | ||
820 | } | ||
821 | |||
822 | /** | ||
823 | * Store a message fragment sent to a channel. | ||
824 | * | ||
825 | * @see GNUNET_PSYCSTORE_fragment_store() | ||
826 | * | ||
827 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
828 | */ | ||
829 | static int | ||
830 | fragment_store (void *cls, | ||
831 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
832 | const struct GNUNET_MULTICAST_MessageHeader *msg, | ||
833 | uint32_t psycstore_flags) | ||
834 | { | ||
835 | struct Plugin *plugin = cls; | ||
836 | |||
837 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment; | ||
838 | |||
839 | GNUNET_assert (TRANSACTION_NONE == plugin->transaction); | ||
840 | |||
841 | uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id); | ||
842 | |||
843 | uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset); | ||
844 | uint64_t message_id = GNUNET_ntohll (msg->message_id); | ||
845 | uint64_t group_generation = GNUNET_ntohll (msg->group_generation); | ||
846 | |||
847 | uint64_t hop_counter = ntohl(msg->hop_counter); | ||
848 | uint64_t flags = ntohl(msg->flags); | ||
849 | |||
850 | if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX || | ||
851 | message_id > INT64_MAX || group_generation > INT64_MAX) | ||
852 | { | ||
853 | LOG(GNUNET_ERROR_TYPE_ERROR, | ||
854 | "Tried to store fragment with a field > INT64_MAX: " | ||
855 | "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset, | ||
856 | message_id, group_generation); | ||
857 | GNUNET_break (0); | ||
858 | return GNUNET_SYSERR; | ||
859 | } | ||
860 | |||
861 | if (GNUNET_OK != channel_key_store (plugin, channel_key)) | ||
862 | return GNUNET_SYSERR; | ||
863 | |||
864 | struct GNUNET_MY_QueryParam params_insert[] = { | ||
865 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
866 | GNUNET_MY_query_param_uint64 (&hop_counter), | ||
867 | GNUNET_MY_query_param_auto_from_type (&msg->signature), | ||
868 | GNUNET_MY_query_param_auto_from_type (&msg->purpose), | ||
869 | GNUNET_MY_query_param_uint64 (&fragment_id), | ||
870 | GNUNET_MY_query_param_uint64 (&fragment_offset), | ||
871 | GNUNET_MY_query_param_uint64 (&message_id), | ||
872 | GNUNET_MY_query_param_uint64 (&group_generation), | ||
873 | GNUNET_MY_query_param_uint64 (&flags), | ||
874 | GNUNET_MY_query_param_uint32 (&psycstore_flags), | ||
875 | GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size) | ||
876 | - sizeof (*msg)), | ||
877 | GNUNET_MY_query_param_end | ||
878 | }; | ||
879 | |||
880 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert)) | ||
881 | { | ||
882 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
883 | "mysql execute prepared", stmt); | ||
884 | return GNUNET_SYSERR; | ||
885 | } | ||
886 | |||
887 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
888 | { | ||
889 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
890 | "mysql_stmt_reset", stmt); | ||
891 | return GNUNET_SYSERR; | ||
892 | } | ||
893 | |||
894 | return GNUNET_OK; | ||
895 | } | ||
896 | |||
897 | /** | ||
898 | * Set additional flags for a given message. | ||
899 | * | ||
900 | * They are OR'd with any existing flags set. | ||
901 | * | ||
902 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
903 | */ | ||
904 | static int | ||
905 | message_add_flags (void *cls, | ||
906 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
907 | uint64_t message_id, | ||
908 | uint32_t psycstore_flags) | ||
909 | { | ||
910 | struct Plugin *plugin = cls; | ||
911 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags; | ||
912 | |||
913 | int sql_ret; | ||
914 | int ret = GNUNET_SYSERR; | ||
915 | |||
916 | struct GNUNET_MY_QueryParam params_update[] = { | ||
917 | GNUNET_MY_query_param_uint32 (&psycstore_flags), | ||
918 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
919 | GNUNET_MY_query_param_uint64 (&message_id), | ||
920 | GNUNET_MY_query_param_end | ||
921 | }; | ||
922 | |||
923 | sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update); | ||
924 | switch (sql_ret) | ||
925 | { | ||
926 | case GNUNET_OK: | ||
927 | ret = GNUNET_OK; | ||
928 | break; | ||
929 | |||
930 | default: | ||
931 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
932 | "mysql execute prepared", stmt); | ||
933 | } | ||
934 | |||
935 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
936 | { | ||
937 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
938 | "mysql_stmt_reset", stmt); | ||
939 | return GNUNET_SYSERR; | ||
940 | } | ||
941 | |||
942 | return ret; | ||
943 | } | ||
944 | |||
945 | |||
946 | static int | ||
947 | fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt, | ||
948 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
949 | void *cb_cls, | ||
950 | uint64_t *returned_fragments) | ||
951 | { | ||
952 | |||
953 | uint32_t hop_counter; | ||
954 | void *signature = NULL; | ||
955 | void *purpose = NULL; | ||
956 | size_t signature_size; | ||
957 | size_t purpose_size; | ||
958 | uint64_t fragment_id; | ||
959 | uint64_t fragment_offset; | ||
960 | uint64_t message_id; | ||
961 | uint64_t group_generation; | ||
962 | uint64_t flags; | ||
963 | void *buf; | ||
964 | size_t buf_size; | ||
965 | int ret = GNUNET_SYSERR; | ||
966 | int sql_ret; | ||
967 | struct GNUNET_MULTICAST_MessageHeader *mp; | ||
968 | uint64_t msg_flags; | ||
969 | struct GNUNET_MY_ResultSpec results[] = { | ||
970 | GNUNET_MY_result_spec_uint32 (&hop_counter), | ||
971 | GNUNET_MY_result_spec_variable_size (&signature, &signature_size), | ||
972 | GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size), | ||
973 | GNUNET_MY_result_spec_uint64 (&fragment_id), | ||
974 | GNUNET_MY_result_spec_uint64 (&fragment_offset), | ||
975 | GNUNET_MY_result_spec_uint64 (&message_id), | ||
976 | GNUNET_MY_result_spec_uint64 (&group_generation), | ||
977 | GNUNET_MY_result_spec_uint64 (&msg_flags), | ||
978 | GNUNET_MY_result_spec_uint64 (&flags), | ||
979 | GNUNET_MY_result_spec_variable_size (&buf, | ||
980 | &buf_size), | ||
981 | GNUNET_MY_result_spec_end | ||
982 | }; | ||
983 | |||
984 | do | ||
985 | { | ||
986 | sql_ret = GNUNET_MY_extract_result (stmt, results); | ||
987 | switch (sql_ret) | ||
988 | { | ||
989 | case GNUNET_NO: | ||
990 | if (ret != GNUNET_YES) | ||
991 | ret = GNUNET_NO; | ||
992 | break; | ||
993 | |||
994 | case GNUNET_YES: | ||
995 | mp = GNUNET_malloc (sizeof (*mp) + buf_size); | ||
996 | |||
997 | mp->header.size = htons (sizeof (*mp) + buf_size); | ||
998 | mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); | ||
999 | mp->hop_counter = htonl (hop_counter); | ||
1000 | GNUNET_memcpy (&mp->signature, | ||
1001 | signature, | ||
1002 | signature_size); | ||
1003 | GNUNET_memcpy (&mp->purpose, | ||
1004 | purpose, | ||
1005 | purpose_size); | ||
1006 | mp->fragment_id = GNUNET_htonll (fragment_id); | ||
1007 | mp->fragment_offset = GNUNET_htonll (fragment_offset); | ||
1008 | mp->message_id = GNUNET_htonll (message_id); | ||
1009 | mp->group_generation = GNUNET_htonll (group_generation); | ||
1010 | mp->flags = htonl(msg_flags); | ||
1011 | |||
1012 | GNUNET_memcpy (&mp[1], | ||
1013 | buf, | ||
1014 | buf_size); | ||
1015 | ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags); | ||
1016 | if (NULL != returned_fragments) | ||
1017 | (*returned_fragments)++; | ||
1018 | GNUNET_MY_cleanup_result (results); | ||
1019 | break; | ||
1020 | |||
1021 | default: | ||
1022 | LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1023 | "mysql extract_result", stmt); | ||
1024 | } | ||
1025 | } | ||
1026 | while (GNUNET_YES == sql_ret); | ||
1027 | |||
1028 | // for debugging | ||
1029 | if (GNUNET_NO == ret) | ||
1030 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, | ||
1031 | "Empty result set\n"); | ||
1032 | |||
1033 | return ret; | ||
1034 | } | ||
1035 | |||
1036 | |||
1037 | static int | ||
1038 | fragment_select (struct Plugin *plugin, | ||
1039 | struct GNUNET_MYSQL_StatementHandle *stmt, | ||
1040 | struct GNUNET_MY_QueryParam *params, | ||
1041 | uint64_t *returned_fragments, | ||
1042 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
1043 | void *cb_cls) | ||
1044 | { | ||
1045 | int ret = GNUNET_SYSERR; | ||
1046 | int sql_ret; | ||
1047 | |||
1048 | sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params); | ||
1049 | switch (sql_ret) | ||
1050 | { | ||
1051 | case GNUNET_NO: | ||
1052 | if (ret != GNUNET_YES) | ||
1053 | ret = GNUNET_NO; | ||
1054 | break; | ||
1055 | |||
1056 | case GNUNET_YES: | ||
1057 | ret = fragment_row (stmt, cb, cb_cls, returned_fragments); | ||
1058 | break; | ||
1059 | |||
1060 | default: | ||
1061 | LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1062 | "mysql exec_prepared", stmt); | ||
1063 | } | ||
1064 | return ret; | ||
1065 | } | ||
1066 | |||
1067 | |||
1068 | /** | ||
1069 | * Retrieve a message fragment range by fragment ID. | ||
1070 | * | ||
1071 | * @see GNUNET_PSYCSTORE_fragment_get() | ||
1072 | * | ||
1073 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1074 | */ | ||
1075 | static int | ||
1076 | fragment_get (void *cls, | ||
1077 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1078 | uint64_t first_fragment_id, | ||
1079 | uint64_t last_fragment_id, | ||
1080 | uint64_t *returned_fragments, | ||
1081 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
1082 | void *cb_cls) | ||
1083 | { | ||
1084 | struct Plugin *plugin = cls; | ||
1085 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments; | ||
1086 | int ret = GNUNET_SYSERR; | ||
1087 | struct GNUNET_MY_QueryParam params_select[] = { | ||
1088 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1089 | GNUNET_MY_query_param_uint64 (&first_fragment_id), | ||
1090 | GNUNET_MY_query_param_uint64 (&last_fragment_id), | ||
1091 | GNUNET_MY_query_param_end | ||
1092 | }; | ||
1093 | |||
1094 | *returned_fragments = 0; | ||
1095 | ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls); | ||
1096 | |||
1097 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1098 | { | ||
1099 | LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1100 | "mysql_stmt_reset", stmt); | ||
1101 | return GNUNET_SYSERR; | ||
1102 | } | ||
1103 | |||
1104 | return ret; | ||
1105 | } | ||
1106 | |||
1107 | |||
1108 | /** | ||
1109 | * Retrieve a message fragment range by fragment ID. | ||
1110 | * | ||
1111 | * @see GNUNET_PSYCSTORE_fragment_get_latest() | ||
1112 | * | ||
1113 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1114 | */ | ||
1115 | static int | ||
1116 | fragment_get_latest (void *cls, | ||
1117 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1118 | uint64_t fragment_limit, | ||
1119 | uint64_t *returned_fragments, | ||
1120 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
1121 | void *cb_cls) | ||
1122 | { | ||
1123 | struct Plugin *plugin = cls; | ||
1124 | |||
1125 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments; | ||
1126 | |||
1127 | int ret = GNUNET_SYSERR; | ||
1128 | *returned_fragments = 0; | ||
1129 | |||
1130 | struct GNUNET_MY_QueryParam params_select[] = { | ||
1131 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1132 | GNUNET_MY_query_param_uint64 (&fragment_limit), | ||
1133 | GNUNET_MY_query_param_end | ||
1134 | }; | ||
1135 | |||
1136 | ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls); | ||
1137 | |||
1138 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1139 | { | ||
1140 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1141 | "mysql_stmt_reset", stmt); | ||
1142 | return GNUNET_SYSERR; | ||
1143 | } | ||
1144 | |||
1145 | return ret; | ||
1146 | } | ||
1147 | |||
1148 | |||
1149 | /** | ||
1150 | * Retrieve all fragments of a message ID range. | ||
1151 | * | ||
1152 | * @see GNUNET_PSYCSTORE_message_get() | ||
1153 | * | ||
1154 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1155 | */ | ||
1156 | static int | ||
1157 | message_get (void *cls, | ||
1158 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1159 | uint64_t first_message_id, | ||
1160 | uint64_t last_message_id, | ||
1161 | uint64_t fragment_limit, | ||
1162 | uint64_t *returned_fragments, | ||
1163 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
1164 | void *cb_cls) | ||
1165 | { | ||
1166 | struct Plugin *plugin = cls; | ||
1167 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages; | ||
1168 | int ret; | ||
1169 | |||
1170 | if (0 == fragment_limit) | ||
1171 | fragment_limit = UINT64_MAX; | ||
1172 | |||
1173 | struct GNUNET_MY_QueryParam params_select[] = { | ||
1174 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1175 | GNUNET_MY_query_param_uint64 (&first_message_id), | ||
1176 | GNUNET_MY_query_param_uint64 (&last_message_id), | ||
1177 | GNUNET_MY_query_param_uint64 (&fragment_limit), | ||
1178 | GNUNET_MY_query_param_end | ||
1179 | }; | ||
1180 | |||
1181 | *returned_fragments = 0; | ||
1182 | ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls); | ||
1183 | |||
1184 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1185 | { | ||
1186 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1187 | "mysql_stmt_reset", stmt); | ||
1188 | return GNUNET_SYSERR; | ||
1189 | } | ||
1190 | |||
1191 | return ret; | ||
1192 | } | ||
1193 | |||
1194 | |||
1195 | /** | ||
1196 | * Retrieve all fragments of the latest messages. | ||
1197 | * | ||
1198 | * @see GNUNET_PSYCSTORE_message_get_latest() | ||
1199 | * | ||
1200 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1201 | */ | ||
1202 | static int | ||
1203 | message_get_latest (void *cls, | ||
1204 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1205 | uint64_t message_limit, | ||
1206 | uint64_t *returned_fragments, | ||
1207 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
1208 | void *cb_cls) | ||
1209 | { | ||
1210 | struct Plugin *plugin = cls; | ||
1211 | |||
1212 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages; | ||
1213 | |||
1214 | int ret = GNUNET_SYSERR; | ||
1215 | *returned_fragments = 0; | ||
1216 | |||
1217 | struct GNUNET_MY_QueryParam params_select[] = { | ||
1218 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1219 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1220 | GNUNET_MY_query_param_uint64 (&message_limit), | ||
1221 | GNUNET_MY_query_param_end | ||
1222 | }; | ||
1223 | |||
1224 | ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls); | ||
1225 | |||
1226 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1227 | { | ||
1228 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1229 | "mysql_stmt_reset", stmt); | ||
1230 | return GNUNET_SYSERR; | ||
1231 | } | ||
1232 | |||
1233 | return ret; | ||
1234 | } | ||
1235 | |||
1236 | |||
1237 | /** | ||
1238 | * Retrieve a fragment of message specified by its message ID and fragment | ||
1239 | * offset. | ||
1240 | * | ||
1241 | * @see GNUNET_PSYCSTORE_message_get_fragment() | ||
1242 | * | ||
1243 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1244 | */ | ||
1245 | static int | ||
1246 | message_get_fragment (void *cls, | ||
1247 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1248 | uint64_t message_id, | ||
1249 | uint64_t fragment_offset, | ||
1250 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
1251 | void *cb_cls) | ||
1252 | { | ||
1253 | struct Plugin *plugin = cls; | ||
1254 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment; | ||
1255 | int sql_ret; | ||
1256 | int ret = GNUNET_SYSERR; | ||
1257 | |||
1258 | struct GNUNET_MY_QueryParam params_select[] = { | ||
1259 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1260 | GNUNET_MY_query_param_uint64 (&message_id), | ||
1261 | GNUNET_MY_query_param_uint64 (&fragment_offset), | ||
1262 | GNUNET_MY_query_param_end | ||
1263 | }; | ||
1264 | |||
1265 | sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select); | ||
1266 | switch (sql_ret) | ||
1267 | { | ||
1268 | case GNUNET_NO: | ||
1269 | ret = GNUNET_NO; | ||
1270 | break; | ||
1271 | |||
1272 | case GNUNET_OK: | ||
1273 | ret = fragment_row (stmt, cb, cb_cls, NULL); | ||
1274 | break; | ||
1275 | |||
1276 | default: | ||
1277 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1278 | "mysql execute prepared", stmt); | ||
1279 | } | ||
1280 | |||
1281 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1282 | { | ||
1283 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1284 | "mysql_stmt_reset", stmt); | ||
1285 | return GNUNET_SYSERR; | ||
1286 | } | ||
1287 | |||
1288 | return ret; | ||
1289 | } | ||
1290 | |||
1291 | /** | ||
1292 | * Retrieve the max. values of message counters for a channel. | ||
1293 | * | ||
1294 | * @see GNUNET_PSYCSTORE_counters_get() | ||
1295 | * | ||
1296 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1297 | */ | ||
1298 | static int | ||
1299 | counters_message_get (void *cls, | ||
1300 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1301 | uint64_t *max_fragment_id, | ||
1302 | uint64_t *max_message_id, | ||
1303 | uint64_t *max_group_generation) | ||
1304 | { | ||
1305 | struct Plugin *plugin = cls; | ||
1306 | |||
1307 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message; | ||
1308 | |||
1309 | int ret = GNUNET_SYSERR; | ||
1310 | |||
1311 | struct GNUNET_MY_QueryParam params_select[] = { | ||
1312 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1313 | GNUNET_MY_query_param_end | ||
1314 | }; | ||
1315 | |||
1316 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) | ||
1317 | { | ||
1318 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1319 | "mysql execute prepared", stmt); | ||
1320 | return GNUNET_SYSERR; | ||
1321 | } | ||
1322 | |||
1323 | struct GNUNET_MY_ResultSpec results_select[] = { | ||
1324 | GNUNET_MY_result_spec_uint64 (max_fragment_id), | ||
1325 | GNUNET_MY_result_spec_uint64 (max_message_id), | ||
1326 | GNUNET_MY_result_spec_uint64 (max_group_generation), | ||
1327 | GNUNET_MY_result_spec_end | ||
1328 | }; | ||
1329 | |||
1330 | ret = GNUNET_MY_extract_result (stmt, results_select); | ||
1331 | |||
1332 | if (GNUNET_OK != ret) | ||
1333 | { | ||
1334 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1335 | "mysql extract_result", stmt); | ||
1336 | return GNUNET_SYSERR; | ||
1337 | } | ||
1338 | |||
1339 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1340 | { | ||
1341 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1342 | "mysql_stmt_reset", stmt); | ||
1343 | return GNUNET_SYSERR; | ||
1344 | } | ||
1345 | |||
1346 | return ret; | ||
1347 | } | ||
1348 | |||
1349 | /** | ||
1350 | * Retrieve the max. values of state counters for a channel. | ||
1351 | * | ||
1352 | * @see GNUNET_PSYCSTORE_counters_get() | ||
1353 | * | ||
1354 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1355 | */ | ||
1356 | static int | ||
1357 | counters_state_get (void *cls, | ||
1358 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1359 | uint64_t *max_state_message_id) | ||
1360 | { | ||
1361 | struct Plugin *plugin = cls; | ||
1362 | |||
1363 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state; | ||
1364 | |||
1365 | int ret = GNUNET_SYSERR; | ||
1366 | |||
1367 | struct GNUNET_MY_QueryParam params_select[] = { | ||
1368 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1369 | GNUNET_MY_query_param_end | ||
1370 | }; | ||
1371 | |||
1372 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) | ||
1373 | { | ||
1374 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1375 | "mysql execute prepared", stmt); | ||
1376 | return GNUNET_SYSERR; | ||
1377 | } | ||
1378 | |||
1379 | struct GNUNET_MY_ResultSpec results_select[] = { | ||
1380 | GNUNET_MY_result_spec_uint64 (max_state_message_id), | ||
1381 | GNUNET_MY_result_spec_end | ||
1382 | }; | ||
1383 | |||
1384 | ret = GNUNET_MY_extract_result (stmt, results_select); | ||
1385 | |||
1386 | if (GNUNET_OK != ret) | ||
1387 | { | ||
1388 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1389 | "mysql extract_result", stmt); | ||
1390 | return GNUNET_SYSERR; | ||
1391 | } | ||
1392 | |||
1393 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1394 | { | ||
1395 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1396 | "mysql_stmt_reset", stmt); | ||
1397 | return GNUNET_SYSERR; | ||
1398 | } | ||
1399 | |||
1400 | return ret; | ||
1401 | } | ||
1402 | |||
1403 | |||
1404 | /** | ||
1405 | * Assign a value to a state variable. | ||
1406 | * | ||
1407 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1408 | */ | ||
1409 | static int | ||
1410 | state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt, | ||
1411 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1412 | const char *name, const void *value, size_t value_size) | ||
1413 | { | ||
1414 | int ret = GNUNET_SYSERR; | ||
1415 | |||
1416 | struct GNUNET_MY_QueryParam params[] = { | ||
1417 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1418 | GNUNET_MY_query_param_string (name), | ||
1419 | GNUNET_MY_query_param_fixed_size(value, value_size), | ||
1420 | GNUNET_MY_query_param_end | ||
1421 | }; | ||
1422 | |||
1423 | ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params); | ||
1424 | if (GNUNET_OK != ret) | ||
1425 | { | ||
1426 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1427 | "mysql exec_prepared", stmt); | ||
1428 | return GNUNET_SYSERR; | ||
1429 | } | ||
1430 | |||
1431 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1432 | { | ||
1433 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1434 | "mysql_stmt_reset", stmt); | ||
1435 | return GNUNET_SYSERR; | ||
1436 | } | ||
1437 | |||
1438 | return ret; | ||
1439 | } | ||
1440 | |||
1441 | |||
1442 | static int | ||
1443 | update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt, | ||
1444 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1445 | uint64_t message_id) | ||
1446 | { | ||
1447 | struct GNUNET_MY_QueryParam params[] = { | ||
1448 | GNUNET_MY_query_param_uint64 (&message_id), | ||
1449 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1450 | GNUNET_MY_query_param_end | ||
1451 | }; | ||
1452 | |||
1453 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, | ||
1454 | stmt, | ||
1455 | params)) | ||
1456 | { | ||
1457 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1458 | "mysql execute prepared", stmt); | ||
1459 | return GNUNET_SYSERR; | ||
1460 | } | ||
1461 | |||
1462 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1463 | { | ||
1464 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1465 | "mysql_stmt_reset", stmt); | ||
1466 | return GNUNET_SYSERR; | ||
1467 | } | ||
1468 | |||
1469 | return GNUNET_OK; | ||
1470 | } | ||
1471 | |||
1472 | |||
1473 | /** | ||
1474 | * Begin modifying current state. | ||
1475 | */ | ||
1476 | static int | ||
1477 | state_modify_begin (void *cls, | ||
1478 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1479 | uint64_t message_id, uint64_t state_delta) | ||
1480 | { | ||
1481 | struct Plugin *plugin = cls; | ||
1482 | |||
1483 | if (state_delta > 0) | ||
1484 | { | ||
1485 | /** | ||
1486 | * We can only apply state modifiers in the current message if modifiers in | ||
1487 | * the previous stateful message (message_id - state_delta) were already | ||
1488 | * applied. | ||
1489 | */ | ||
1490 | |||
1491 | uint64_t max_state_message_id = 0; | ||
1492 | int ret = counters_state_get (plugin, channel_key, &max_state_message_id); | ||
1493 | switch (ret) | ||
1494 | { | ||
1495 | case GNUNET_OK: | ||
1496 | case GNUNET_NO: // no state yet | ||
1497 | ret = GNUNET_OK; | ||
1498 | break; | ||
1499 | default: | ||
1500 | return ret; | ||
1501 | } | ||
1502 | |||
1503 | if (max_state_message_id < message_id - state_delta) | ||
1504 | return GNUNET_NO; /* some stateful messages not yet applied */ | ||
1505 | else if (message_id - state_delta < max_state_message_id) | ||
1506 | return GNUNET_NO; /* changes already applied */ | ||
1507 | } | ||
1508 | |||
1509 | if (TRANSACTION_NONE != plugin->transaction) | ||
1510 | { | ||
1511 | /** @todo FIXME: wait for other transaction to finish */ | ||
1512 | return GNUNET_SYSERR; | ||
1513 | } | ||
1514 | return transaction_begin (plugin, TRANSACTION_STATE_MODIFY); | ||
1515 | } | ||
1516 | |||
1517 | |||
1518 | /** | ||
1519 | * Set the current value of state variable. | ||
1520 | * | ||
1521 | * @see GNUNET_PSYCSTORE_state_modify() | ||
1522 | * | ||
1523 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1524 | */ | ||
1525 | static int | ||
1526 | state_modify_op (void *cls, | ||
1527 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1528 | enum GNUNET_PSYC_Operator op, | ||
1529 | const char *name, const void *value, size_t value_size) | ||
1530 | { | ||
1531 | struct Plugin *plugin = cls; | ||
1532 | GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction); | ||
1533 | |||
1534 | switch (op) | ||
1535 | { | ||
1536 | case GNUNET_PSYC_OP_ASSIGN: | ||
1537 | return state_assign (plugin, plugin->insert_state_current, | ||
1538 | channel_key, name, value, value_size); | ||
1539 | |||
1540 | default: /** @todo implement more state operations */ | ||
1541 | GNUNET_break (0); | ||
1542 | return GNUNET_SYSERR; | ||
1543 | } | ||
1544 | } | ||
1545 | |||
1546 | |||
1547 | /** | ||
1548 | * End modifying current state. | ||
1549 | */ | ||
1550 | static int | ||
1551 | state_modify_end (void *cls, | ||
1552 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1553 | uint64_t message_id) | ||
1554 | { | ||
1555 | struct Plugin *plugin = cls; | ||
1556 | GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction); | ||
1557 | |||
1558 | return | ||
1559 | GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key) | ||
1560 | && GNUNET_OK == update_message_id (plugin, | ||
1561 | plugin->update_max_state_message_id, | ||
1562 | channel_key, message_id) | ||
1563 | && GNUNET_OK == transaction_commit (plugin) | ||
1564 | ? GNUNET_OK : GNUNET_SYSERR; | ||
1565 | } | ||
1566 | |||
1567 | |||
1568 | /** | ||
1569 | * Begin state synchronization. | ||
1570 | */ | ||
1571 | static int | ||
1572 | state_sync_begin (void *cls, | ||
1573 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | ||
1574 | { | ||
1575 | struct Plugin *plugin = cls; | ||
1576 | return exec_channel (plugin, plugin->delete_state_sync, channel_key); | ||
1577 | } | ||
1578 | |||
1579 | |||
1580 | /** | ||
1581 | * Assign current value of a state variable. | ||
1582 | * | ||
1583 | * @see GNUNET_PSYCSTORE_state_modify() | ||
1584 | * | ||
1585 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1586 | */ | ||
1587 | static int | ||
1588 | state_sync_assign (void *cls, | ||
1589 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1590 | const char *name, const void *value, size_t value_size) | ||
1591 | { | ||
1592 | struct Plugin *plugin = cls; | ||
1593 | return state_assign (cls, plugin->insert_state_sync, | ||
1594 | channel_key, name, value, value_size); | ||
1595 | } | ||
1596 | |||
1597 | |||
1598 | /** | ||
1599 | * End modifying current state. | ||
1600 | */ | ||
1601 | static int | ||
1602 | state_sync_end (void *cls, | ||
1603 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1604 | uint64_t max_state_message_id, | ||
1605 | uint64_t state_hash_message_id) | ||
1606 | { | ||
1607 | struct Plugin *plugin = cls; | ||
1608 | int ret = GNUNET_SYSERR; | ||
1609 | |||
1610 | if (TRANSACTION_NONE != plugin->transaction) | ||
1611 | { | ||
1612 | /** @todo FIXME: wait for other transaction to finish */ | ||
1613 | return GNUNET_SYSERR; | ||
1614 | } | ||
1615 | |||
1616 | GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC) | ||
1617 | && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key) | ||
1618 | && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync, | ||
1619 | channel_key) | ||
1620 | && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync, | ||
1621 | channel_key) | ||
1622 | && GNUNET_OK == update_message_id (plugin, | ||
1623 | plugin->update_state_hash_message_id, | ||
1624 | channel_key, state_hash_message_id) | ||
1625 | && GNUNET_OK == update_message_id (plugin, | ||
1626 | plugin->update_max_state_message_id, | ||
1627 | channel_key, max_state_message_id) | ||
1628 | && GNUNET_OK == transaction_commit (plugin) | ||
1629 | ? ret = GNUNET_OK | ||
1630 | : transaction_rollback (plugin); | ||
1631 | return ret; | ||
1632 | } | ||
1633 | |||
1634 | |||
1635 | /** | ||
1636 | * Delete the whole state. | ||
1637 | * | ||
1638 | * @see GNUNET_PSYCSTORE_state_reset() | ||
1639 | * | ||
1640 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1641 | */ | ||
1642 | static int | ||
1643 | state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | ||
1644 | { | ||
1645 | struct Plugin *plugin = cls; | ||
1646 | return exec_channel (plugin, plugin->delete_state, channel_key); | ||
1647 | } | ||
1648 | |||
1649 | |||
1650 | /** | ||
1651 | * Update signed values of state variables in the state store. | ||
1652 | * | ||
1653 | * @see GNUNET_PSYCSTORE_state_hash_update() | ||
1654 | * | ||
1655 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1656 | */ | ||
1657 | static int | ||
1658 | state_update_signed (void *cls, | ||
1659 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | ||
1660 | { | ||
1661 | struct Plugin *plugin = cls; | ||
1662 | return exec_channel (plugin, plugin->update_state_signed, channel_key); | ||
1663 | } | ||
1664 | |||
1665 | |||
1666 | /** | ||
1667 | * Retrieve a state variable by name. | ||
1668 | * | ||
1669 | * @see GNUNET_PSYCSTORE_state_get() | ||
1670 | * | ||
1671 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1672 | */ | ||
1673 | static int | ||
1674 | state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1675 | const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) | ||
1676 | { | ||
1677 | struct Plugin *plugin = cls; | ||
1678 | int ret = GNUNET_SYSERR; | ||
1679 | int sql_ret ; | ||
1680 | |||
1681 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one; | ||
1682 | |||
1683 | struct GNUNET_MY_QueryParam params_select[] = { | ||
1684 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1685 | GNUNET_MY_query_param_string (name), | ||
1686 | GNUNET_MY_query_param_end | ||
1687 | }; | ||
1688 | |||
1689 | void *value_current = NULL; | ||
1690 | size_t value_size = 0; | ||
1691 | |||
1692 | struct GNUNET_MY_ResultSpec results[] = { | ||
1693 | GNUNET_MY_result_spec_variable_size (&value_current, &value_size), | ||
1694 | GNUNET_MY_result_spec_end | ||
1695 | }; | ||
1696 | |||
1697 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) | ||
1698 | { | ||
1699 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1700 | "mysql exec_prepared", stmt); | ||
1701 | } | ||
1702 | else | ||
1703 | { | ||
1704 | sql_ret = GNUNET_MY_extract_result (stmt, results); | ||
1705 | switch (sql_ret) | ||
1706 | { | ||
1707 | case GNUNET_NO: | ||
1708 | ret = GNUNET_NO; | ||
1709 | break; | ||
1710 | |||
1711 | case GNUNET_YES: | ||
1712 | ret = cb (cb_cls, name, value_current, value_size); | ||
1713 | break; | ||
1714 | |||
1715 | default: | ||
1716 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1717 | "mysql extract_result", stmt); | ||
1718 | } | ||
1719 | } | ||
1720 | |||
1721 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1722 | { | ||
1723 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1724 | "mysql_stmt_reset", stmt); | ||
1725 | return GNUNET_SYSERR; | ||
1726 | } | ||
1727 | |||
1728 | return ret; | ||
1729 | } | ||
1730 | |||
1731 | |||
1732 | /** | ||
1733 | * Retrieve all state variables for a channel with the given prefix. | ||
1734 | * | ||
1735 | * @see GNUNET_PSYCSTORE_state_get_prefix() | ||
1736 | * | ||
1737 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1738 | */ | ||
1739 | static int | ||
1740 | state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1741 | const char *name, GNUNET_PSYCSTORE_StateCallback cb, | ||
1742 | void *cb_cls) | ||
1743 | { | ||
1744 | struct Plugin *plugin = cls; | ||
1745 | int ret = GNUNET_SYSERR; | ||
1746 | |||
1747 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix; | ||
1748 | |||
1749 | uint32_t name_len = (uint32_t) strlen (name); | ||
1750 | |||
1751 | struct GNUNET_MY_QueryParam params_select[] = { | ||
1752 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1753 | GNUNET_MY_query_param_string (name), | ||
1754 | GNUNET_MY_query_param_uint32 (&name_len), | ||
1755 | GNUNET_MY_query_param_string (name), | ||
1756 | GNUNET_MY_query_param_end | ||
1757 | }; | ||
1758 | |||
1759 | char *name2 = ""; | ||
1760 | void *value_current = NULL; | ||
1761 | size_t value_size = 0; | ||
1762 | |||
1763 | struct GNUNET_MY_ResultSpec results[] = { | ||
1764 | GNUNET_MY_result_spec_string (&name2), | ||
1765 | GNUNET_MY_result_spec_variable_size (&value_current, &value_size), | ||
1766 | GNUNET_MY_result_spec_end | ||
1767 | };; | ||
1768 | |||
1769 | int sql_ret; | ||
1770 | |||
1771 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) | ||
1772 | { | ||
1773 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1774 | "mysql exec_prepared", stmt); | ||
1775 | return GNUNET_SYSERR; | ||
1776 | } | ||
1777 | |||
1778 | do | ||
1779 | { | ||
1780 | sql_ret = GNUNET_MY_extract_result (stmt, results); | ||
1781 | switch (sql_ret) | ||
1782 | { | ||
1783 | case GNUNET_NO: | ||
1784 | if (ret != GNUNET_YES) | ||
1785 | ret = GNUNET_NO; | ||
1786 | break; | ||
1787 | |||
1788 | case GNUNET_YES: | ||
1789 | ret = cb (cb_cls, (const char *) name2, value_current, value_size); | ||
1790 | |||
1791 | if (ret != GNUNET_YES) | ||
1792 | sql_ret = GNUNET_NO; | ||
1793 | break; | ||
1794 | |||
1795 | default: | ||
1796 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1797 | "mysql extract_result", stmt); | ||
1798 | } | ||
1799 | } | ||
1800 | while (sql_ret == GNUNET_YES); | ||
1801 | |||
1802 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1803 | { | ||
1804 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1805 | "mysql_stmt_reset", stmt); | ||
1806 | return GNUNET_SYSERR; | ||
1807 | } | ||
1808 | |||
1809 | return ret; | ||
1810 | } | ||
1811 | |||
1812 | |||
1813 | /** | ||
1814 | * Retrieve all signed state variables for a channel. | ||
1815 | * | ||
1816 | * @see GNUNET_PSYCSTORE_state_get_signed() | ||
1817 | * | ||
1818 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1819 | */ | ||
1820 | static int | ||
1821 | state_get_signed (void *cls, | ||
1822 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1823 | GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) | ||
1824 | { | ||
1825 | struct Plugin *plugin = cls; | ||
1826 | int ret = GNUNET_SYSERR; | ||
1827 | |||
1828 | struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed; | ||
1829 | |||
1830 | struct GNUNET_MY_QueryParam params_select[] = { | ||
1831 | GNUNET_MY_query_param_auto_from_type (channel_key), | ||
1832 | GNUNET_MY_query_param_end | ||
1833 | }; | ||
1834 | |||
1835 | int sql_ret; | ||
1836 | |||
1837 | char *name = ""; | ||
1838 | void *value_signed = NULL; | ||
1839 | size_t value_size = 0; | ||
1840 | |||
1841 | struct GNUNET_MY_ResultSpec results[] = { | ||
1842 | GNUNET_MY_result_spec_string (&name), | ||
1843 | GNUNET_MY_result_spec_variable_size (&value_signed, &value_size), | ||
1844 | GNUNET_MY_result_spec_end | ||
1845 | }; | ||
1846 | |||
1847 | if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) | ||
1848 | { | ||
1849 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1850 | "mysql exec_prepared", stmt); | ||
1851 | return GNUNET_SYSERR; | ||
1852 | } | ||
1853 | |||
1854 | do | ||
1855 | { | ||
1856 | sql_ret = GNUNET_MY_extract_result (stmt, results); | ||
1857 | switch (sql_ret) | ||
1858 | { | ||
1859 | case GNUNET_NO: | ||
1860 | if (ret != GNUNET_YES) | ||
1861 | ret = GNUNET_NO; | ||
1862 | break; | ||
1863 | |||
1864 | case GNUNET_YES: | ||
1865 | ret = cb (cb_cls, (const char *) name, value_signed, value_size); | ||
1866 | |||
1867 | if (ret != GNUNET_YES) | ||
1868 | sql_ret = GNUNET_NO; | ||
1869 | break; | ||
1870 | |||
1871 | default: | ||
1872 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1873 | "mysql extract_result", stmt); | ||
1874 | } | ||
1875 | } | ||
1876 | while (sql_ret == GNUNET_YES); | ||
1877 | |||
1878 | if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) | ||
1879 | { | ||
1880 | LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | ||
1881 | "mysql_stmt_reset", stmt); | ||
1882 | return GNUNET_SYSERR; | ||
1883 | } | ||
1884 | |||
1885 | return ret; | ||
1886 | } | ||
1887 | |||
1888 | |||
1889 | /** | ||
1890 | * Entry point for the plugin. | ||
1891 | * | ||
1892 | * @param cls The struct GNUNET_CONFIGURATION_Handle. | ||
1893 | * @return NULL on error, otherwise the plugin context | ||
1894 | */ | ||
1895 | void * | ||
1896 | libgnunet_plugin_psycstore_mysql_init (void *cls) | ||
1897 | { | ||
1898 | static struct Plugin plugin; | ||
1899 | const struct GNUNET_CONFIGURATION_Handle *cfg = cls; | ||
1900 | struct GNUNET_PSYCSTORE_PluginFunctions *api; | ||
1901 | |||
1902 | if (NULL != plugin.cfg) | ||
1903 | return NULL; /* can only initialize once! */ | ||
1904 | memset (&plugin, 0, sizeof (struct Plugin)); | ||
1905 | plugin.cfg = cfg; | ||
1906 | if (GNUNET_OK != database_setup (&plugin)) | ||
1907 | { | ||
1908 | database_shutdown (&plugin); | ||
1909 | return NULL; | ||
1910 | } | ||
1911 | api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions); | ||
1912 | api->cls = &plugin; | ||
1913 | api->membership_store = &mysql_membership_store; | ||
1914 | api->membership_test = &membership_test; | ||
1915 | api->fragment_store = &fragment_store; | ||
1916 | api->message_add_flags = &message_add_flags; | ||
1917 | api->fragment_get = &fragment_get; | ||
1918 | api->fragment_get_latest = &fragment_get_latest; | ||
1919 | api->message_get = &message_get; | ||
1920 | api->message_get_latest = &message_get_latest; | ||
1921 | api->message_get_fragment = &message_get_fragment; | ||
1922 | api->counters_message_get = &counters_message_get; | ||
1923 | api->counters_state_get = &counters_state_get; | ||
1924 | api->state_modify_begin = &state_modify_begin; | ||
1925 | api->state_modify_op = &state_modify_op; | ||
1926 | api->state_modify_end = &state_modify_end; | ||
1927 | api->state_sync_begin = &state_sync_begin; | ||
1928 | api->state_sync_assign = &state_sync_assign; | ||
1929 | api->state_sync_end = &state_sync_end; | ||
1930 | api->state_reset = &state_reset; | ||
1931 | api->state_update_signed = &state_update_signed; | ||
1932 | api->state_get = &state_get; | ||
1933 | api->state_get_prefix = &state_get_prefix; | ||
1934 | api->state_get_signed = &state_get_signed; | ||
1935 | |||
1936 | LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n")); | ||
1937 | return api; | ||
1938 | } | ||
1939 | |||
1940 | |||
1941 | /** | ||
1942 | * Exit point from the plugin. | ||
1943 | * | ||
1944 | * @param cls The plugin context (as returned by "init") | ||
1945 | * @return Always NULL | ||
1946 | */ | ||
1947 | void * | ||
1948 | libgnunet_plugin_psycstore_mysql_done (void *cls) | ||
1949 | { | ||
1950 | struct GNUNET_PSYCSTORE_PluginFunctions *api = cls; | ||
1951 | struct Plugin *plugin = api->cls; | ||
1952 | |||
1953 | database_shutdown (plugin); | ||
1954 | plugin->cfg = NULL; | ||
1955 | GNUNET_free (api); | ||
1956 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n"); | ||
1957 | return NULL; | ||
1958 | } | ||
1959 | |||
1960 | /* end of plugin_psycstore_mysql.c */ | ||