aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore
diff options
context:
space:
mode:
authorChristophe Genevey Metat <genevey.christophe@gmail.com>2016-07-04 15:25:44 +0000
committerChristophe Genevey Metat <genevey.christophe@gmail.com>2016-07-04 15:25:44 +0000
commitbed74ba115e1ff8501482dcb7f94fc0214e647b8 (patch)
tree4684857df3e12f46b55cdf1220a524e2a5a5410e /src/psycstore
parent278eb59cd0a1c1483de7270ae46e5b6b57604aa8 (diff)
downloadgnunet-bed74ba115e1ff8501482dcb7f94fc0214e647b8.tar.gz
gnunet-bed74ba115e1ff8501482dcb7f94fc0214e647b8.zip
psycstore mysql plugin
Diffstat (limited to 'src/psycstore')
-rw-r--r--src/psycstore/plugin_psycstore_mysql.c2110
1 files changed, 2110 insertions, 0 deletions
diff --git a/src/psycstore/plugin_psycstore_mysql.c b/src/psycstore/plugin_psycstore_mysql.c
new file mode 100644
index 000000000..c766e1a6c
--- /dev/null
+++ b/src/psycstore/plugin_psycstore_mysql.c
@@ -0,0 +1,2110 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
4 *
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20
21/**
22 * @file psycstore/plugin_psycstore_mysql.c
23 * @brief sqlite-based psycstore backend
24 * @author Gabor X Toth
25 * @author Christian Grothoff
26 * @author Christophe Genevey
27 */
28
29/*
30 * FIXME: SQLite3 only supports signed 64-bit integers natively,
31 * thus it can only store 63 bits of the uint64_t's.
32 */
33
34#include "platform.h"
35#include "gnunet_psycstore_plugin.h"
36#include "gnunet_psycstore_service.h"
37#include "gnunet_multicast_service.h"
38#include "gnunet_crypto_lib.h"
39#include "gnunet_psyc_util_lib.h"
40#include "psycstore.h"
41#include <sqlite3.h>
42
43/**
44 * After how many ms "busy" should a DB operation fail for good? A
45 * low value makes sure that we are more responsive to requests
46 * (especially PUTs). A high value guarantees a higher success rate
47 * (SELECTs in iterate can take several seconds despite LIMIT=1).
48 *
49 * The default value of 1s should ensure that users do not experience
50 * huge latencies while at the same time allowing operations to
51 * succeed with reasonable probability.
52 */
53#define BUSY_TIMEOUT_MS 1000
54
55#define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
56
57/**
58 * Log an error message at log-level 'level' that indicates
59 * a failure of the command 'cmd' on file 'filename'
60 * with the message given by strerror(errno).
61 */
62#define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0)
63
64#define LOG_MYSQL(db, level, cmd, stmt) do { GNUNET_log_from (level, "psycstore-mysql", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_stmt_error (stmt)); } while(0)
65
66#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
67
68enum Transactions {
69 TRANSACTION_NONE = 0,
70 TRANSACTION_STATE_MODIFY,
71 TRANSACTION_STATE_SYNC,
72};
73
74/**
75 * Context for all functions in this plugin.
76 */
77struct Plugin
78{
79
80 const struct GNUNET_CONFIGURATION_Handle *cfg;
81
82 /**
83 * Database filename.
84 */
85 char *fn;
86
87 /**
88 *Handle to talk to Mysql
89 */
90 struct GNUNET_MYSQL_Context *mc;
91
92 /**
93 * Current transaction.
94 */
95 enum Transactions transaction;
96
97 GNUNET_MYSQL_StatementHandle *transaction_begin;
98
99 GNUNET_MYSQL_StatementHandle *transaction_commit;
100
101 GNUNET_MYSQL_StatementHandle *transaction_rollback;
102
103 /**
104 * Precompiled SQL for channel_key_store()
105 */
106 GNUNET_MYSQL_StatementHandle *insert_channel_key;
107
108
109 /**
110 * Precompiled SQL for slave_key_store()
111 */
112 GNUNET_MYSQL_StatementHandle *insert_slave_key;
113
114 /**
115 * Precompiled SQL for membership_store()
116 */
117 GNUNET_MYSQL_StatementHandle *insert_membership;
118
119 /**
120 * Precompiled SQL for membership_test()
121 */
122 GNUNET_MYSQL_StatementHandle *select_membership;
123
124 /**
125 * Precompiled SQL for fragment_store()
126 */
127 GNUNET_MYSQL_StatementHandle *insert_fragment;
128
129 /**
130 * Precompiled SQL for message_add_flags()
131 */
132 GNUNET_MYSQL_StatementHandle *update_message_flags;
133
134 /**
135 * Precompiled SQL for fragment_get()
136 */
137 GNUNET_MYSQL_StatementHandle *select_fragments;
138
139 /**
140 * Precompiled SQL for fragment_get()
141 */
142 GNUNET_MYSQL_StatementHandle *select_latest_fragments;
143
144 /**
145 * Precompiled SQL for message_get()
146 */
147 GNUNET_MYSQL_StatementHandle *select_messages;
148
149 /**
150 * Precompiled SQL for message_get()
151 */
152 GNUNET_MYSQL_StatementHandle *select_latest_messages;
153
154 /**
155 * Precompiled SQL for message_get_fragment()
156 */
157 GNUNET_MYSQL_StatementHandle *select_message_fragment;
158
159 /**
160 * Precompiled SQL for counters_get_message()
161 */
162 GNUNET_MYSQL_StatementHandle *select_counters_message;
163
164 /**
165 * Precompiled SQL for counters_get_state()
166 */
167 GNUNET_MYSQL_StatementHandle *select_counters_state;
168
169 /**
170 * Precompiled SQL for state_modify_end()
171 */
172 GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
173
174 /**
175 * Precompiled SQL for state_sync_end()
176 */
177 GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
178
179 /**
180 * Precompiled SQL for state_modify_op()
181 */
182 GNUNET_MYSQL_StatementHandle *insert_state_current;
183
184 /**
185 * Precompiled SQL for state_modify_end()
186 */
187 GNUNET_MYSQL_StatementHandle *delete_state_empty;
188
189 /**
190 * Precompiled SQL for state_set_signed()
191 */
192 GNUNET_MYSQL_StatementHandle *update_state_signed;
193
194 /**
195 * Precompiled SQL for state_sync()
196 */
197 GNUNET_MYSQL_StatementHandle *insert_state_sync;
198
199 /**
200 * Precompiled SQL for state_sync()
201 */
202 GNUNET_MYSQL_StatementHandle *delete_state;
203
204 /**
205 * Precompiled SQL for state_sync()
206 */
207 GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
208
209 /**
210 * Precompiled SQL for state_sync()
211 */
212 GNUNET_MYSQL_StatementHandle *delete_state_sync;
213
214 /**
215 * Precompiled SQL for state_get_signed()
216 */
217 GNUNET_MYSQL_StatementHandle *select_state_signed;
218
219 /**
220 * Precompiled SQL for state_get()
221 */
222 GNUNET_MYSQL_StatementHandle *select_state_one;
223
224 /**
225 * Precompiled SQL for state_get_prefix()
226 */
227 GNUNET_MYSQL_StatementHandle *select_state_prefix;
228
229};
230
231#if DEBUG_PSYCSTORE
232
233static void
234sql_trace (void *cls, const char *sql)
235{
236 LOG (GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
237}
238
239#endif
240
241
242/**
243 * @brief Prepare a SQL statement
244 *
245 * @param dbh handle to the database
246 * @param sql SQL statement, UTF-8 encoded
247 * @param stmt set to the prepared statement
248 * @return 0 on success
249 */
250static int
251mysql_prepare (struct GNUNET_MYSQL_Context *mc,
252 const char *sql,
253 struct GNUNET_MYSQL_StatementHandle *stmt)
254{
255 stmt = GNUNET_MYSQL_statement_prepare (mc,
256 sql);
257
258 LOG (GNUNET_ERROR_TYPE_DEBUG,
259 "Prepared `%s' / %p\n", sql, stmt);
260 if(NULL == stmt)
261 LOG (GNUNET_ERROR_TYPE_ERROR,
262 _("Error preparing SQL query: %s\n %s\n"),
263 mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (stmt)), sql);
264
265 return 1;
266}
267
268
269/**
270 * @brief Prepare a SQL statement
271 *
272 * @param dbh handle to the database
273 * @param sql SQL statement, UTF-8 encoded
274 * @return 0 on success
275 */
276static int
277mysql_exec (struct GNUNET_MYSQL_Context *mc,
278 struct GNUNET_MYSQL_Statement *sh,
279 struct GNUNET_MY_QueryParam *qp)
280{
281 int result;
282
283 result = GNUNET_MY_exec_prepared (mc, sh, qp);
284 LOG (GNUNET_ERROR_TYPE_DEBUG,
285 "Executed `%s' / %d\n", sql, result);
286 if (GNUNET_OK != result)
287 LOG (GNUNET_ERROR_TYPE_ERROR,
288 _("Error executing SQL query: %s\n %s\n"),
289 mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (sh)), sql);
290 return result;
291}
292
293
294/**
295 * Initialize the database connections and associated
296 * data structures (create tables and indices
297 * as needed as well).
298 *
299 * @param plugin the plugin context (state for this module)
300 * @return GNUNET_OK on success
301 */
302static int
303database_setup (struct Plugin *plugin)
304{
305 char *filename;
306
307 if (GNUNET_OK !=
308 GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-mysql",
309 "FILENAME", &filename))
310 {
311 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
312 "psycstore-sqlite", "FILENAME");
313 return GNUNET_SYSERR;
314 }
315 if (GNUNET_OK != GNUNET_DISK_file_test (filename))
316 {
317 if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename))
318 {
319 GNUNET_break (0);
320 GNUNET_free (filename);
321 return GNUNET_SYSERR;
322 }
323 }
324 /* filename should be UTF-8-encoded. If it isn't, it's a bug */
325 plugin->fn = filename;
326
327 /* Open database and precompile statements */
328 plugin->mc = GNUNET_MYSQL_context_create(plugin->cfg, "psycstore-mysql");
329
330 if (NULL == plugin->mc)
331 {
332 LOG (GNUNET_ERROR_TYPE_ERROR,
333 _("Unable to initialize SQLite: %s.\n"),
334 sqlite3_errmsg (plugin->dbh));
335 return GNUNET_SYSERR;
336 }
337
338/*
339#if DEBUG_PSYCSTORE
340 sqlite3_trace (plugin->dbh, &sql_trace, NULL);
341#endif
342
343 sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY");
344 sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL");
345 sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
346 sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
347 sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
348#if ! DEBUG_PSYCSTORE
349 sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
350#endif
351 sql_exec (plugin->dbh, "PRAGMA page_size=4096");
352
353 sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS);
354
355*/
356 /* Create tables */
357
358 GNUNET_MYSQL_statement_run (plugin->mc,
359 "CREATE TABLE IF NOT EXISTS channels (\n"
360 " id INT PRIMARY KEY,\n"
361 " pub_key BLOB UNIQUE,\n"
362 " max_state_message_id INT,\n" // last applied state message ID
363 " state_hash_message_id INT\n" // last message ID with a state hash
364 ");");
365
366 GNUNET_MYSQL_statement_run (plugin->mc,
367 "CREATE TABLE IF NOT EXISTS slaves (\n"
368 " id INT PRIMARY KEY,\n"
369 " pub_key BLOB UNIQUE\n"
370 ");");
371
372 GNUNET_MYSQL_statement_run (plugin->mc,
373 "CREATE TABLE IF NOT EXISTS membership (\n"
374 " channel_id INT NOT NULL REFERENCES channels(id),\n"
375 " slave_id INT NOT NULL REFERENCES slaves(id),\n"
376 " did_join INT NOT NULL,\n"
377 " announced_at INT NOT NULL,\n"
378 " effective_since INT NOT NULL,\n"
379 " group_generation INT NOT NULL\n"
380 ");");
381
382 GNUNET_MYSQL_statement_run (plugin->mc,
383 "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
384 "ON membership (channel_id, slave_id);");
385
386 /** @todo messages table: add method_name column */
387 GNUNET_MYSQL_statement_run (plugin->mc,
388 "CREATE TABLE IF NOT EXISTS messages (\n"
389 " channel_id INT NOT NULL REFERENCES channels(id),\n"
390 " hop_counter INT NOT NULL,\n"
391 " signature BLOB,\n"
392 " purpose BLOB,\n"
393 " fragment_id INT NOT NULL,\n"
394 " fragment_offset INT NOT NULL,\n"
395 " message_id INT NOT NULL,\n"
396 " group_generation INT NOT NULL,\n"
397 " multicast_flags INT NOT NULL,\n"
398 " psycstore_flags INT NOT NULL,\n"
399 " data BLOB,\n"
400 " PRIMARY KEY (channel_id, fragment_id),\n"
401 " UNIQUE (channel_id, message_id, fragment_offset)\n"
402 ");");
403
404 GNUNET_MYSQL_statement_run (plugin->mc,
405 "CREATE TABLE IF NOT EXISTS state (\n"
406 " channel_id INT NOT NULL REFERENCES channels(id),\n"
407 " name TEXT NOT NULL,\n"
408 " value_current BLOB,\n"
409 " value_signed BLOB,\n"
410 " PRIMARY KEY (channel_id, name)\n"
411 ");");
412
413 GNUNET_MYSQL_statement_run (plugin->mc,
414 "CREATE TABLE IF NOT EXISTS state_sync (\n"
415 " channel_id INT NOT NULL REFERENCES channels(id),\n"
416 " name TEXT NOT NULL,\n"
417 " value BLOB,\n"
418 " PRIMARY KEY (channel_id, name)\n"
419 ");");
420
421 /* Prepare statements */
422 mysql_prepare (plugin->mc,
423 "BEGIN",
424 plugin->transaction_begin);
425
426 mysql_prepare (plugin->mc,
427 "COMMIT",
428 plugin->transaction_commit);
429
430 mysql_prepare (plugin->mc,
431 "ROLLBACK;",
432 plugin->transaction_rollback);
433
434 mysql_prepare (plugin->mc,
435 "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
436 plugin->insert_channel_key);
437
438 mysql_prepare (plugin->mc,
439 "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);",
440 plugin->insert_slave_key);
441
442 mysql_prepare (plugin->mc,
443 "INSERT INTO membership\n"
444 " (channel_id, slave_id, did_join, announced_at,\n"
445 " effective_since, group_generation)\n"
446 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
447 " (SELECT id FROM slaves WHERE pub_key = ?),\n"
448 " ?, ?, ?, ?);",
449 plugin->insert_membership);
450
451 mysql_prepare (plugin->mc,
452 "SELECT did_join FROM membership\n"
453 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
454 " AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
455 " AND effective_since <= ? AND did_join = 1\n"
456 "ORDER BY announced_at DESC LIMIT 1;",
457 plugin->select_membership);
458
459 mysql_prepare (plugin->mc,
460 "INSERT OR IGNORE INTO messages\n"
461 " (channel_id, hop_counter, signature, purpose,\n"
462 " fragment_id, fragment_offset, message_id,\n"
463 " group_generation, multicast_flags, psycstore_flags, data)\n"
464 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
465 " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
466 plugin->insert_fragment);
467
468 mysql_prepare (plugin->mc,
469 "UPDATE messages\n"
470 "SET psycstore_flags = psycstore_flags | ?\n"
471 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
472 " AND message_id = ? AND fragment_offset = 0;",
473 plugin->update_message_flags);
474
475 mysql_prepare (plugin->mc,
476 "SELECT hop_counter, signature, purpose, fragment_id,\n"
477 " fragment_offset, message_id, group_generation,\n"
478 " multicast_flags, psycstore_flags, data\n"
479 "FROM messages\n"
480 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
481 " AND ? <= fragment_id AND fragment_id <= ?;",
482 plugin->select_fragments);
483
484 /** @todo select_messages: add method_prefix filter */
485 mysql_prepare (plugin->mc,
486 "SELECT hop_counter, signature, purpose, fragment_id,\n"
487 " fragment_offset, message_id, group_generation,\n"
488 " multicast_flags, psycstore_flags, data\n"
489 "FROM messages\n"
490 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
491 " AND ? <= message_id AND message_id <= ?"
492 "LIMIT ?;",
493 plugin->select_messages);
494
495 mysql_prepare (plugin->mc,
496 "SELECT * FROM\n"
497 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
498 " fragment_offset, message_id, group_generation,\n"
499 " multicast_flags, psycstore_flags, data\n"
500 " FROM messages\n"
501 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
502 " ORDER BY fragment_id DESC\n"
503 " LIMIT ?)\n"
504 "ORDER BY fragment_id;",
505 plugin->select_latest_fragments);
506
507 /** @todo select_latest_messages: add method_prefix filter */
508 mysql_prepare (plugin->mc,
509 "SELECT hop_counter, signature, purpose, fragment_id,\n"
510 " fragment_offset, message_id, group_generation,\n"
511 " multicast_flags, psycstore_flags, data\n"
512 "FROM messages\n"
513 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
514 " AND message_id IN\n"
515 " (SELECT message_id\n"
516 " FROM messages\n"
517 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
518 " GROUP BY message_id\n"
519 " ORDER BY message_id\n"
520 " DESC LIMIT ?)\n"
521 "ORDER BY fragment_id;",
522 plugin->select_latest_messages);
523
524 mysql_prepare (plugin->mc,
525 "SELECT hop_counter, signature, purpose, fragment_id,\n"
526 " fragment_offset, message_id, group_generation,\n"
527 " multicast_flags, psycstore_flags, data\n"
528 "FROM messages\n"
529 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
530 " AND message_id = ? AND fragment_offset = ?;",
531 plugin->select_message_fragment);
532
533 mysql_prepare (plugin->mc,
534 "SELECT fragment_id, message_id, group_generation\n"
535 "FROM messages\n"
536 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
537 "ORDER BY fragment_id DESC LIMIT 1;",
538 plugin->select_counters_message);
539
540 mysql_prepare (plugin->mc,
541 "SELECT max_state_message_id\n"
542 "FROM channels\n"
543 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
544 plugin->select_counters_state);
545
546 mysql_prepare (plugin->mc,
547 "UPDATE channels\n"
548 "SET max_state_message_id = ?\n"
549 "WHERE pub_key = ?;",
550 plugin->update_max_state_message_id);
551
552 mysql_prepare (plugin->mc,
553 "UPDATE channels\n"
554 "SET state_hash_message_id = ?\n"
555 "WHERE pub_key = ?;",
556 plugin->update_state_hash_message_id);
557
558 mysql_prepare (plugin->mc,
559 "INSERT OR REPLACE INTO state\n"
560 " (channel_id, name, value_current, value_signed)\n"
561 "SELECT new.channel_id, new.name,\n"
562 " new.value_current, old.value_signed\n"
563 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
564 " AS channel_id,\n"
565 " ? AS name, ? AS value_current) AS new\n"
566 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
567 " FROM state) AS old\n"
568 "ON new.channel_id = old.channel_id AND new.name = old.name;",
569 plugin->insert_state_current);
570
571 mysql_prepare (plugin->mc,
572 "DELETE FROM state\n"
573 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
574 " AND (value_current IS NULL OR length(value_current) = 0)\n"
575 " AND (value_signed IS NULL OR length(value_signed) = 0);",
576 plugin->delete_state_empty);
577
578 mysql_prepare (plugin->mc,
579 "UPDATE state\n"
580 "SET value_signed = value_current\n"
581 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
582 plugin->update_state_signed);
583
584 mysql_prepare (plugin->mc,
585 "DELETE FROM state\n"
586 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
587 plugin->delete_state);
588
589 mysql_prepare (plugin->mc,
590 "INSERT INTO state_sync (channel_id, name, value)\n"
591 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
592 plugin->insert_state_sync);
593
594 mysql_prepare (plugin->mc,
595 "INSERT INTO state\n"
596 " (channel_id, name, value_current, value_signed)\n"
597 "SELECT channel_id, name, value, value\n"
598 "FROM state_sync\n"
599 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
600 plugin->insert_state_from_sync);
601
602 mysql_prepare (plugin->mc,
603 "DELETE FROM state_sync\n"
604 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
605 plugin->delete_state_sync);
606
607 mysql_prepare (plugin->mc,
608 "SELECT value_current\n"
609 "FROM state\n"
610 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
611 " AND name = ?;",
612 plugin->select_state_one);
613
614 mysql_prepare (plugin->mc,
615 "SELECT name, value_current\n"
616 "FROM state\n"
617 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
618 " AND (name = ? OR substr(name, 1, ?) = ? || '_');",
619 plugin->select_state_prefix);
620
621 mysql_prepare (plugin->mc,
622 "SELECT name, value_signed\n"
623 "FROM state\n"
624 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
625 " AND value_signed IS NOT NULL;",
626 plugin->select_state_signed);
627
628 return GNUNET_OK;
629}
630
631
632/**
633 * Shutdown database connection and associate data
634 * structures.
635 * @param plugin the plugin context (state for this module)
636 */
637static void
638database_shutdown (struct Plugin *plugin)
639{
640 int result;
641 sqlite3_stmt *stmt;
642 while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
643 {
644 result = sqlite3_finalize (stmt);
645 if (SQLITE_OK != result)
646 LOG (GNUNET_ERROR_TYPE_WARNING,
647 "Failed to close statement %p: %d\n", stmt, result);
648 }
649 if (SQLITE_OK != sqlite3_close (plugin->dbh))
650 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
651
652 GNUNET_free_non_null (plugin->fn);
653}
654
655
656/**
657 * Execute a prepared statement with a @a channel_key argument.
658 *
659 * @param plugin Plugin handle.
660 * @param stmt Statement to execute.
661 * @param channel_key Public key of the channel.
662 *
663 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
664 */
665static int
666exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
667 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
668{
669 MYSQL_STMT * statement = NULL;
670 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
671
672 if (NULL == statement)
673 {
674 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
675 "mysql statement invalide", statement);
676 return GNUNET_SYSERR;
677 }
678
679 struct GNUNET_MY_QueryParam params[] = {
680 GNUNET_MY_query_param_auto_from_type (channel_key),
681 GNUNET_MY_query_param_end
682 };
683
684 if(GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
685 stmt,
686 results)
687 {
688 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
689 "mysql exec_channel", stmt);
690 }
691
692 if (0 != mysql_stmt_reset (statement))
693 {
694 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
695 "mysql_stmt_reset", statement);
696 return GNUNET_SYSERR;
697 }
698
699 return GNUNET_OK;
700}
701
702
703/**
704 * Begin a transaction.
705 */
706static int
707transaction_begin (struct Plugin *plugin, enum Transactions transaction)
708{
709 GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_begin;
710 MYSQL_STMT * statement = NULL;
711
712 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
713
714 if (NULL == statement)
715 {
716 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
717 "mysql statement invalide", statement);
718 return GNUNET_SYSERR;
719 }
720
721 struct GNUNET_MY_QueryParam params[] = {
722 GNUNET_MY_query_param_end
723 };
724
725 if (GNUNET_OK != GNUNET_MY_extract_result (plugin->mc,
726 stmt,
727 results))
728 {
729 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
730 "mysql extract_result", statement);
731 return GNUNET_SYSERR;
732 }
733
734 if (0 != mysql_stmt_reset (statement))
735 {
736 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
737 "mysql_stmt_reset", statement);
738 return GNUNET_SYSERR;
739 }
740
741 plugin->transaction = transaction;
742 return GNUNET_OK;
743}
744
745
746/**
747 * Commit current transaction.
748 */
749static int
750transaction_commit (struct Plugin *plugin)
751{
752 GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_commit;
753 MYSQL_STMT *statement = NULL;
754
755 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
756
757 if (NULL == statement)
758 {
759 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
760 "mysql statement invalide", statement);
761 return GNUNET_SYSERR;
762 }
763
764 struct GNUNET_MY_QueryParam params[] = {
765 GNUNET_MY_query_param_end
766 };
767
768 if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
769 stmt,
770 results))
771 {
772 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
773 "mysql extract_result", statement);
774 return GNUNET_SYSERR;
775 }
776
777 if (0 != mysql_stmt_reset (stmt))
778 {
779 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
780 "mysql_stmt_reset", statement);
781 return GNUNET_SYSERR;
782 }
783
784 plugin->transaction = TRANSACTION_NONE;
785 return GNUNET_OK;
786}
787
788
789/**
790 * Roll back current transaction.
791 */
792static int
793transaction_rollback (struct Plugin *plugin)
794{
795 GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_rollback;
796 MYSQL_STMT* statement = NULL;
797
798 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
799 if (NULL == statement)
800 {
801 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
802 "mysql statement invalide", statement);
803 return GNUNET_SYSERR;
804 }
805
806 struct GNUNET_MY_QueryParam params[] = {
807 GNUNET_MY_query_param_end
808 };
809
810 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
811 stmt,
812 results))
813 {
814 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
815 "mysql extract_result", statement);
816 return GNUNET_SYSERR;
817 }
818
819 if (0 != mysql_stmt_reset (stmt))
820 {
821 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
822 "mysql_stmt_reset", statement);
823 return GNUNET_SYSERR;
824 }
825
826 plugin->transaction = TRANSACTION_NONE;
827 return GNUNET_OK;
828}
829
830
831static int
832channel_key_store (struct Plugin *plugin,
833 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
834{
835 GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
836
837 MYSQL_STMT *statement = NULL;
838 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
839
840 if(NULL == statement)
841 {
842 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
843 "mysql statement invalide", statement);
844 return GNUNET_SYSERR;
845 }
846
847 struct GNUNET_MY_QueryParam params[] = {
848 GNUNET_MY_query_param_auto_from_type (channel_key),
849 GNUNET_MY_query_param_end
850 };
851
852 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
853 stmt,
854 results))
855 {
856 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
857 "mysql extract_result", statement);
858 return GNUNET_SYSERR;
859 }
860
861 return GNUNET_OK;
862}
863
864
865static int
866slave_key_store (struct Plugin *plugin,
867 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
868{
869 GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
870
871 MYSQL_STMT *statement = NULL;
872 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
873
874 if(NULL == statement)
875 {
876 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
877 "mysql statement invalide", statement);
878 return GNUNET_SYSERR;
879 }
880
881 struct GNUNET_MY_QueryParam params[] = {
882 GNUNET_MY_query_param_auto_from_type (slave_key),
883 GNUNET_MY_query_param_end
884 };
885
886 if (GNUNET_OK != GNUNET_MY_exec_prepared( plugin->mc,
887 stmt,
888 results))
889 {
890 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
891 "mysql extract_result", statement);
892 return GNUNET_SYSERR;
893 }
894
895 if (0 != mysql_stmt_reset (stmt))
896 {
897 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
898 "mysql_stmt_reset", statement);
899 return GNUNET_SYSERR;
900 }
901
902 return GNUNET_OK;
903}
904
905
906/**
907 * Store join/leave events for a PSYC channel in order to be able to answer
908 * membership test queries later.
909 *
910 * @see GNUNET_PSYCSTORE_membership_store()
911 *
912 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
913 */
914static int
915sqlite_membership_store (void *cls,
916 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
917 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
918 int did_join,
919 uint64_t announced_at,
920 uint64_t effective_since,
921 uint64_t group_generation)
922{
923 struct Plugin *plugin = cls;
924// sqlite3_stmt *stmt = plugin->insert_membership;
925
926 uint32_t idid_join = (uint32_t)did_join;
927 uint64_t iannounced_at = (uint64_t)announced_at;
928 uint64_t ieffective_since = (uint64_t)effective_since;
929 uint64_t igroup_generation = (uint64_t)group_generation;
930
931 GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
932
933
934 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
935
936 if (announced_at > INT64_MAX ||
937 effective_since > INT64_MAX ||
938 group_generation > INT64_MAX)
939 {
940 GNUNET_break (0);
941 return GNUNET_SYSERR;
942 }
943
944 if (GNUNET_OK != channel_key_store (plugin, channel_key)
945 || GNUNET_OK != slave_key_store (plugin, slave_key))
946 return GNUNET_SYSERR;
947
948 struct GNUNET_MY_QueryParam params[] = {
949 GNUNET_MY_query_param_auto_from_type (channel_key),
950 GNUNET_MY_query_param_auto_from_type (slave_key),
951 GNUNET_MY_query_param_uint32 (&idid_join),
952 GNUNET_MY_query_param_uint64 (&iannounced_at),
953 GNUNET_MY_query_param_uint64 (&ieffective_since),
954 GNUNET_MY_query_param_uint64 (&igroup_generation),
955 GNUNET_MY_query_param_end
956 };
957
958 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
959 stmt,
960 params))
961 {
962 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
963 "mysql extract_result", statement);
964 return GNUNET_SYSERR;
965 }
966
967 if (0 != mysql_stmt_reset (stmt))
968 {
969 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
970 "mysql_stmt_reset", statement);
971 return GNUNET_SYSERR;
972 }
973 return GNUNET_OK;
974}
975
976/**
977 * Test if a member was admitted to the channel at the given message ID.
978 *
979 * @see GNUNET_PSYCSTORE_membership_test()
980 *
981 * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
982 * #GNUNET_SYSERR if there was en error.
983 */
984static int
985membership_test (void *cls,
986 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
987 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
988 uint64_t message_id)
989{
990 struct Plugin *plugin = cls;
991 //sqlite3_stmt *stmt = plugin->select_membership;
992 GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
993 MYSQL_STMT *statement = NULL;
994
995 uint32_t did_join = 0;
996
997 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
998
999 if(NULL == statement)
1000 {
1001 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1002 "mysql statement invalide", statement);
1003 return GNUNET_SYSERR;
1004 }
1005
1006 int ret = GNUNET_SYSERR;
1007
1008 struct GNUNET_MY_QueryParam params_select[] = {
1009 GNUNET_MY_query_param_auto_from_type (channel_key),
1010 GNUNET_MY_query_param_auto_from_type (slave_key),
1011 GNUNET_MY_query_param_uint64 (&message_id),
1012 GNUNET_MY_query_param_end
1013 };
1014
1015 if (GNUNET_MY_exec_prepared (plugin->mc,
1016 stmt,
1017 params_select))
1018 {
1019 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1020 "mysql execute prepared", statement);
1021 return GNUNET_SYSERR;
1022 }
1023
1024 struct GNUNET_MY_ResultSpec results_select[] = {
1025 GNUNET_MY_result_spec_uint32 (&did_join),
1026 GNUNET_MY_result_spec_end
1027 };
1028
1029 if (GNUNET_OK != GNUNET_MY_extract_result (stmt,
1030 results_select))
1031 {
1032 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1033 "mysql extract_result", statement);
1034 return GNUNET_SYSERR;
1035 }
1036
1037 if(0 != did_join)
1038 {
1039 ret = GNUNET_YES;
1040 }
1041 else
1042 {
1043 ret = GNUNET_NO;
1044 }
1045
1046 if (0 != mysql_stmt_reset (stmt))
1047 {
1048 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1049 "mysql_stmt_reset", statement);
1050 return GNUNET_SYSERR;
1051 }
1052
1053 return ret;
1054}
1055
1056/**
1057 * Store a message fragment sent to a channel.
1058 *
1059 * @see GNUNET_PSYCSTORE_fragment_store()
1060 *
1061 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1062 */
1063static int
1064fragment_store (void *cls,
1065 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1066 const struct GNUNET_MULTICAST_MessageHeader *msg,
1067 uint32_t psycstore_flags)
1068{
1069 struct Plugin *plugin = cls;
1070
1071 GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
1072
1073
1074 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
1075
1076 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
1077 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
1078 uint64_t message_id = GNUNET_ntohll (msg->message_id);
1079 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
1080
1081 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
1082 message_id > INT64_MAX || group_generation > INT64_MAX)
1083 {
1084 LOG (GNUNET_ERROR_TYPE_ERROR,
1085 "Tried to store fragment with a field > INT64_MAX: "
1086 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
1087 message_id, group_generation);
1088 GNUNET_break (0);
1089 return GNUNET_SYSERR;
1090 }
1091
1092 if (GNUNET_OK != channel_key_store (plugin, channel_key))
1093 return GNUNET_SYSERR;
1094
1095 struct GNUNET_MY_QueryParam params_insert[] = {
1096 GNUNET_MY_query_param_auto_from_type (channel_key),
1097 GNUNET_MY_query_param_uint64 (&msg->hop_counter),
1098 GNUNET_MY_query_param_auto_from_type (msg->signature),
1099 GNUNET_MY_query_param_auto_from_type (msg->purpose),
1100 GNUNET_MY_query_param_uint64 (&fragment_id),
1101 GNUNET_MY_query_param_uint64 (&fragment_offset),
1102 GNUNET_MY_query_param_uint64 (&message_id),
1103 GNUNET_MY_query_param_uint64 (&group_generation),
1104 GNUNET_MY_query_param_uint64 (&msg->flags),
1105 GNUNET_MY_query_param_uint64 (&psycstore_flags),
1106 GNUNET_MY_query_param_auto_from_type (msg[1]),
1107 GNUNET_MY_query_param_end
1108 };
1109
1110 if (GNUNET_MY_exec_prepared (plugin->mc,
1111 stmt,
1112 params_insert))
1113 {
1114 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1115 "mysql execute prepared", statement);
1116 return GNUNET_SYSERR;
1117 }
1118
1119 if (0 != mysql_stmt_reset (stmt))
1120 {
1121 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1122 "mysql_stmt_reset", statement);
1123 return GNUNET_SYSERR;
1124 }
1125
1126 return GNUNET_OK;
1127}
1128
1129/**
1130 * Set additional flags for a given message.
1131 *
1132 * They are OR'd with any existing flags set.
1133 *
1134 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1135 */
1136static int
1137message_add_flags (void *cls,
1138 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1139 uint64_t message_id,
1140 uint64_t psycstore_flags)
1141{
1142 struct Plugin *plugin = cls;
1143
1144 GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
1145 MYSQL_STMT *statement = NULL;
1146
1147 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1148
1149 int ret = GNUNET_SYSERR;
1150
1151 struct GNUNET_MY_QueryParam params_update[] = {
1152 GNUNET_MY_query_param_uint64 (&psycstore_flags),
1153 GNUNET_MY_query_param_auto_from_type (channel_key),
1154 GNUNET_MY_query_param_uint64 (&message_id),
1155 GNUNET_MY_query_param_end
1156 };
1157
1158 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1159 stmt,
1160 params_update))
1161 {
1162 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1163 "mysql execute prepared", statement);
1164 return GNUNET_SYSERR;
1165 }
1166
1167 if (0 != mysql_stmt_reset (stmt))
1168 {
1169 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1170 "mysql_stmt_reset", statement);
1171 return GNUNET_SYSERR;
1172 }
1173
1174 return ret;
1175}
1176
1177/** Extract result from statement **/
1178static int
1179fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1180 void *cb_cls)
1181{
1182 int data_size = sqlite3_column_bytes (stmt, 9);
1183 struct GNUNET_MULTICAST_MessageHeader *msg
1184 = GNUNET_malloc (sizeof (*msg) + data_size);
1185
1186 msg->header.size = htons (sizeof (*msg) + data_size);
1187 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
1188 msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
1189 memcpy (&msg->signature,
1190 sqlite3_column_blob (stmt, 1),
1191 sqlite3_column_bytes (stmt, 1));
1192 memcpy (&msg->purpose,
1193 sqlite3_column_blob (stmt, 2),
1194 sqlite3_column_bytes (stmt, 2));
1195 msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
1196 msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
1197 msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
1198 msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
1199 msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
1200 memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
1201
1202 return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1203}
1204
1205
1206static int
1207fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1208 uint64_t *returned_fragments,
1209 GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1210{
1211 int ret = GNUNET_SYSERR;
1212 int sql_ret;
1213
1214 do
1215 {
1216 sql_ret = sqlite3_step (stmt);
1217 switch (sql_ret)
1218 {
1219 case SQLITE_DONE:
1220 if (ret != GNUNET_OK)
1221 ret = GNUNET_NO;
1222 break;
1223 case SQLITE_ROW:
1224 ret = fragment_row (stmt, cb, cb_cls);
1225 (*returned_fragments)++;
1226 if (ret != GNUNET_YES)
1227 sql_ret = SQLITE_DONE;
1228 break;
1229 default:
1230 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1231 "sqlite3_step");
1232 }
1233 }
1234 while (sql_ret == SQLITE_ROW);
1235
1236 return ret;
1237}
1238
1239/**
1240 * Retrieve a message fragment range by fragment ID.
1241 *
1242 * @see GNUNET_PSYCSTORE_fragment_get()
1243 *
1244 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1245 */
1246static int
1247fragment_get (void *cls,
1248 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1249 uint64_t first_fragment_id,
1250 uint64_t last_fragment_id,
1251 uint64_t *returned_fragments,
1252 GNUNET_PSYCSTORE_FragmentCallback cb,
1253 void *cb_cls)
1254{
1255 struct Plugin *plugin = cls;
1256
1257 GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1258
1259 int ret = GNUNET_SYSERR;
1260 *returned_fragments = 0;
1261
1262 struct GNUNET_MY_QueryParam params_select[] = {
1263 GNUNET_MY_query_param_auto_from_type (channel_key),
1264 GNUNET_MY_query_param_uint64 (&first_fragment_id),
1265 GNUNET_MY_query_param_uint64 (&last_fragment_id),
1266 GNUNET_MY_query_param_end
1267 };
1268
1269 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1270
1271 if (0 != mysql_stmt_reset (stmt))
1272 {
1273 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1274 "mysql_stmt_reset", statement);
1275 return GNUNET_SYSERR;
1276 }
1277
1278 return ret;
1279}
1280
1281
1282/**
1283 * Retrieve a message fragment range by fragment ID.
1284 *
1285 * @see GNUNET_PSYCSTORE_fragment_get_latest()
1286 *
1287 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1288 */
1289static int
1290fragment_get_latest (void *cls,
1291 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1292 uint64_t fragment_limit,
1293 uint64_t *returned_fragments,
1294 GNUNET_PSYCSTORE_FragmentCallback cb,
1295 void *cb_cls)
1296{
1297 struct Plugin *plugin = cls;
1298
1299 GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1300
1301 int ret = GNUNET_SYSERR;
1302 *returned_fragments = 0;
1303
1304 struct GNUNET_MY_QueryParam params_select[] = {
1305 GNUNET_MY_query_param_auto_from_type (channel_key),
1306 GNUNET_MY_query_param_uint64 (&fragment_limit),
1307 GNUNET_MY_query_param_end
1308 };
1309
1310 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1311
1312 if (0 != mysql_stmt_reset (stmt))
1313 {
1314 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1315 "mysql_stmt_reset", statement);
1316 return GNUNET_SYSERR;
1317 }
1318
1319 return ret;
1320}
1321
1322
1323/**
1324 * Retrieve all fragments of a message ID range.
1325 *
1326 * @see GNUNET_PSYCSTORE_message_get()
1327 *
1328 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1329 */
1330static int
1331message_get (void *cls,
1332 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1333 uint64_t first_message_id,
1334 uint64_t last_message_id,
1335 uint64_t fragment_limit,
1336 uint64_t *returned_fragments,
1337 GNUNET_PSYCSTORE_FragmentCallback cb,
1338 void *cb_cls)
1339{
1340 struct Plugin *plugin = cls;
1341// sqlite3_stmt *stmt = plugin->select_messages;
1342 GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1343
1344 int ret = GNUNET_SYSERR;
1345 *returned_fragments = 0;
1346
1347 struct GNUNET_MY_QueryParams params_select[] = {
1348 GNUNET_MY_query_param_auto_from_type (channel_key),
1349 GNUNET_MY_query_param_uint64 (&first_message_id),
1350 GNUNET_MY_query_param_uint64 (&last_message_id),
1351 GNUNET_MY_query_param_uint64 (&fragment_limit),
1352 GNUNET_MY_query_param_end
1353 };
1354
1355 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1356
1357 if (0 != mysql_stmt_reset (stmt))
1358 {
1359 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1360 "mysql_stmt_reset", statement);
1361 return GNUNET_SYSERR;
1362 }
1363
1364 return ret;
1365}
1366
1367
1368/**
1369 * Retrieve all fragments of the latest messages.
1370 *
1371 * @see GNUNET_PSYCSTORE_message_get_latest()
1372 *
1373 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1374 */
1375static int
1376message_get_latest (void *cls,
1377 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1378 uint64_t message_limit,
1379 uint64_t *returned_fragments,
1380 GNUNET_PSYCSTORE_FragmentCallback cb,
1381 void *cb_cls)
1382{
1383 struct Plugin *plugin = cls;
1384
1385 GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1386
1387 int ret = GNUNET_SYSERR;
1388 *returned_fragments = 0;
1389
1390 struct GNUNET_MY_QueryParam params_select[] = {
1391 GNUNET_MY_query_param_auto_from_type (channel_key),
1392 GNUNET_MY_query_param_auto_from_type (channel_key),
1393 GNUNET_MY_query_param_uint64 (&message_limit),
1394 GNUNET_MY_query_param_end
1395 };
1396
1397 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1398
1399 if (0 != mysql_stmt_reset (stmt))
1400 {
1401 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1402 "mysql_stmt_reset", statement);
1403 return GNUNET_SYSERR;
1404 }
1405
1406 return ret;
1407}
1408
1409
1410/**
1411 * Retrieve a fragment of message specified by its message ID and fragment
1412 * offset.
1413 *
1414 * @see GNUNET_PSYCSTORE_message_get_fragment()
1415 *
1416 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1417 */
1418static int
1419message_get_fragment (void *cls,
1420 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1421 uint64_t message_id,
1422 uint64_t fragment_offset,
1423 GNUNET_PSYCSTORE_FragmentCallback cb,
1424 void *cb_cls)
1425{
1426 struct Plugin *plugin = cls;
1427
1428 GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1429
1430 int ret = GNUNET_SYSERR;
1431
1432 struct GNUNET_MY_QueryParam params_select[] = {
1433 GNUNET_MY_query_param_auto_from_type (channel_key),
1434 GNUNET_MY_query_param_uint64 (&message_id),
1435 GNUNET_MY_query_param_uint64 (&fragment_offset),
1436 GNUNET_MY_query_param_end
1437 };
1438
1439 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1440 stmt,
1441 params_select))
1442 {
1443 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1444 "mysql execute prepared", statement);
1445 return GNUNET_SYSERR;
1446 }
1447
1448 ret = fragment_row (stmt, cb, cb_cls);
1449
1450 if (0 != mysql_stmt_reset (stmt))
1451 {
1452 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1453 "mysql_stmt_reset", statement);
1454 return GNUNET_SYSERR;
1455 }
1456
1457 return ret;
1458}
1459
1460/**
1461 * Retrieve the max. values of message counters for a channel.
1462 *
1463 * @see GNUNET_PSYCSTORE_counters_get()
1464 *
1465 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1466 */
1467static int
1468counters_message_get (void *cls,
1469 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1470 uint64_t *max_fragment_id,
1471 uint64_t *max_message_id,
1472 uint64_t *max_group_generation)
1473{
1474 struct Plugin *plugin = cls;
1475 GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1476 MYSQL_STMT *statement = NULL;
1477
1478 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1479 if (NULL == statement)
1480 {
1481 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1482 "mysql get statement", statement);
1483 return GNUNET_SYSERR;
1484 }
1485
1486 int ret = GNUNET_SYSERR;
1487
1488 struct GNUNET_MY_QueryParam params_select[] = {
1489 GNUNET_MY_query_param_auto_from_type (channel_key),
1490 GNUNET_MY_query_param_end
1491 };
1492
1493 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1494 stmt,
1495 params_select))
1496 {
1497 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1498 "mysql execute prepared", statement);
1499 return GNUNET_SYSERR;
1500 }
1501
1502 struct GNUNET_MY_ResultSpec results_select[] = {
1503 GNUNET_MY_result_spec_uint64 (max_fragment_id),
1504 GNUNET_MY_result_spec_uint64 (max_message_id),
1505 GNUNET_MY_result_spec_uint64 (max_group_generation),
1506 GNUNET_MY_result_spec_end
1507 };
1508
1509 ret = GNUNET_MY_extract_result (stmt,
1510 results_select);
1511
1512 if (GNUNET_OK != ret)
1513 {
1514 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1515 "mysql extract_result", statement);
1516 return GNUNET_SYSERR;
1517 }
1518
1519 if (0 != mysql_stmt_reset (stmt))
1520 {
1521 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1522 "mysql_stmt_reset", statement);
1523 return GNUNET_SYSERR;
1524 }
1525
1526 return ret;
1527}
1528
1529/**
1530 * Retrieve the max. values of state counters for a channel.
1531 *
1532 * @see GNUNET_PSYCSTORE_counters_get()
1533 *
1534 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1535 */
1536static int
1537counters_state_get (void *cls,
1538 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1539 uint64_t *max_state_message_id)
1540{
1541 struct Plugin *plugin = cls;
1542
1543 GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1544 MYSQL_STMT *statement = NULL;
1545
1546 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1547 if (NULL == statement)
1548 {
1549 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1550 "mysql get_stmt", statement);
1551 return GNUNET_SYSERR;
1552 }
1553
1554 int ret = GNUNET_SYSERR;
1555
1556 struct GNUNET_MY_QueryParam params_select[] = {
1557 GNUNET_MY_query_param_auto_from_type (channel_key),
1558 GNUNET_MY_query_param_end
1559 };
1560
1561 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1562 stmt,
1563 params_select))
1564 {
1565 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1566 "mysql execute prepared", statement);
1567 return GNUNET_SYSERR;
1568 }
1569
1570 struct GNUNET_MY_ResultSpec results_select[] = {
1571 GNUNET_MY_result_spec_uint64 (max_state_message_id),
1572 GNUNET_MY_result_spec_end
1573 };
1574
1575 ret = GNUNET_MY_extract_result (stmt,
1576 results_select);
1577
1578 if (GNUNET_OK != ret)
1579 {
1580 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1581 "mysql extract_result", statement);
1582 return GNUNET_SYSERR;
1583 }
1584
1585 if (0 != mysql_stmt_reset (stmt))
1586 {
1587 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1588 "mysql_stmt_reset", statement);
1589 return GNUNET_SYSERR;
1590 }
1591
1592 return ret;
1593}
1594
1595
1596/**
1597 * Assign a value to a state variable.
1598 *
1599 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1600 */
1601static int
1602state_assign (struct Plugin *plugin, GNUNET_MYSQL_StatementHandle *stmt,
1603 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1604 const char *name, const void *value, size_t value_size)
1605{
1606 int ret = GNUNET_SYSERR;
1607
1608 MYSQL_STMT *statement = NULL;
1609 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1610
1611 if (NULL == statement)
1612 {
1613 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1614 "mysql get_stmt", statement);
1615 return GNUNET_SYSERR;
1616 }
1617
1618 struct GNUNET_MY_QueryParam params[] = {
1619 GNUNET_MY_query_param_auto_from_type (channel_key),
1620 GNUNET_MY_query_param_string (name),
1621 GNUNET_MY_query_param_auto_from_type (value_size),
1622 GNUNET_MY_query_param_end
1623 };
1624
1625 ret = GNUNET_MY_exec_prepared (plugin->mc,
1626 stmt,
1627 params);
1628
1629 if (GNUNET_OK != ret)
1630 {
1631 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1632 "mysql execute prepared", statement);
1633 return GNUNET_SYSERR;
1634 }
1635
1636 if (0 != mysql_stmt_reset (stmt))
1637 {
1638 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1639 "mysql_stmt_reset", statement);
1640 return GNUNET_SYSERR;
1641 }
1642
1643 return ret;
1644}
1645
1646
1647static int
1648update_message_id (struct Plugin *plugin, GNUNET_MYSQL_StatementHandle *stmt,
1649 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1650 uint64_t message_id)
1651{
1652 MYSQL_STMT *statement = NULL;
1653 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1654
1655 if (NULL == statement)
1656 {
1657 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1658 "mysql get_stmt", statement);
1659 return GNUNET_SYSERR;
1660 }
1661
1662 struct GNUNET_MY_QueryParam params[] = {
1663 GNUNET_MY_query_param_uint64 (&message_id),
1664 GNUNET_MY_query_param_auto_from_type (channel_id),
1665 GNUNET_MY_query_param_end
1666 };
1667
1668 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1669 stmt,
1670 params))
1671 {
1672 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1673 "mysql execute prepared", statement);
1674 return GNUNET_SYSERR;
1675 }
1676
1677 if (0 != mysql_stmt_reset (stmt))
1678 {
1679 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1680 "mysql_stmt_reset", statement);
1681 return GNUNET_SYSERR;
1682 }
1683
1684 return GNUNET_OK;
1685}
1686
1687
1688/**
1689 * Begin modifying current state.
1690 */
1691static int
1692state_modify_begin (void *cls,
1693 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1694 uint64_t message_id, uint64_t state_delta)
1695{
1696 struct Plugin *plugin = cls;
1697
1698 if (state_delta > 0)
1699 {
1700 /**
1701 * We can only apply state modifiers in the current message if modifiers in
1702 * the previous stateful message (message_id - state_delta) were already
1703 * applied.
1704 */
1705
1706 uint64_t max_state_message_id = 0;
1707 int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1708 switch (ret)
1709 {
1710 case GNUNET_OK:
1711 case GNUNET_NO: // no state yet
1712 ret = GNUNET_OK;
1713 break;
1714 default:
1715 return ret;
1716 }
1717
1718 if (max_state_message_id < message_id - state_delta)
1719 return GNUNET_NO; /* some stateful messages not yet applied */
1720 else if (message_id - state_delta < max_state_message_id)
1721 return GNUNET_NO; /* changes already applied */
1722 }
1723
1724 if (TRANSACTION_NONE != plugin->transaction)
1725 {
1726 /** @todo FIXME: wait for other transaction to finish */
1727 return GNUNET_SYSERR;
1728 }
1729 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1730}
1731
1732
1733/**
1734 * Set the current value of state variable.
1735 *
1736 * @see GNUNET_PSYCSTORE_state_modify()
1737 *
1738 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1739 */
1740static int
1741state_modify_op (void *cls,
1742 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1743 enum GNUNET_PSYC_Operator op,
1744 const char *name, const void *value, size_t value_size)
1745{
1746 struct Plugin *plugin = cls;
1747 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1748
1749 switch (op)
1750 {
1751 case GNUNET_PSYC_OP_ASSIGN:
1752 return state_assign (plugin, plugin->insert_state_current, channel_key,
1753 name, value, value_size);
1754
1755 default: /** @todo implement more state operations */
1756 GNUNET_break (0);
1757 return GNUNET_SYSERR;
1758 }
1759}
1760
1761
1762/**
1763 * End modifying current state.
1764 */
1765static int
1766state_modify_end (void *cls,
1767 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1768 uint64_t message_id)
1769{
1770 struct Plugin *plugin = cls;
1771 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1772
1773 return
1774 GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1775 && GNUNET_OK == update_message_id (plugin,
1776 plugin->update_max_state_message_id,
1777 channel_key, message_id)
1778 && GNUNET_OK == transaction_commit (plugin)
1779 ? GNUNET_OK : GNUNET_SYSERR;
1780}
1781
1782
1783/**
1784 * Begin state synchronization.
1785 */
1786static int
1787state_sync_begin (void *cls,
1788 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1789{
1790 struct Plugin *plugin = cls;
1791 return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1792}
1793
1794
1795/**
1796 * Assign current value of a state variable.
1797 *
1798 * @see GNUNET_PSYCSTORE_state_modify()
1799 *
1800 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1801 */
1802static int
1803state_sync_assign (void *cls,
1804 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1805 const char *name, const void *value, size_t value_size)
1806{
1807 struct Plugin *plugin = cls;
1808 return state_assign (cls, plugin->insert_state_sync, channel_key,
1809 name, value, value_size);
1810}
1811
1812
1813/**
1814 * End modifying current state.
1815 */
1816static int
1817state_sync_end (void *cls,
1818 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1819 uint64_t max_state_message_id,
1820 uint64_t state_hash_message_id)
1821{
1822 struct Plugin *plugin = cls;
1823 int ret = GNUNET_SYSERR;
1824
1825 if (TRANSACTION_NONE != plugin->transaction)
1826 {
1827 /** @todo FIXME: wait for other transaction to finish */
1828 return GNUNET_SYSERR;
1829 }
1830
1831 GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1832 && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1833 && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1834 channel_key)
1835 && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1836 channel_key)
1837 && GNUNET_OK == update_message_id (plugin,
1838 plugin->update_state_hash_message_id,
1839 channel_key, state_hash_message_id)
1840 && GNUNET_OK == update_message_id (plugin,
1841 plugin->update_max_state_message_id,
1842 channel_key, max_state_message_id)
1843 && GNUNET_OK == transaction_commit (plugin)
1844 ? ret = GNUNET_OK
1845 : transaction_rollback (plugin);
1846 return ret;
1847}
1848
1849
1850/**
1851 * Delete the whole state.
1852 *
1853 * @see GNUNET_PSYCSTORE_state_reset()
1854 *
1855 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1856 */
1857static int
1858state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1859{
1860 struct Plugin *plugin = cls;
1861 return exec_channel (plugin, plugin->delete_state, channel_key);
1862}
1863
1864
1865/**
1866 * Update signed values of state variables in the state store.
1867 *
1868 * @see GNUNET_PSYCSTORE_state_hash_update()
1869 *
1870 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1871 */
1872static int
1873state_update_signed (void *cls,
1874 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1875{
1876 struct Plugin *plugin = cls;
1877 return exec_channel (plugin, plugin->update_state_signed, channel_key);
1878}
1879
1880
1881/**
1882 * Retrieve a state variable by name.
1883 *
1884 * @see GNUNET_PSYCSTORE_state_get()
1885 *
1886 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1887 */
1888static int
1889state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1890 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1891{
1892 struct Plugin *plugin = cls;
1893 int ret = GNUNET_SYSERR;
1894
1895 GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1896
1897 struct GNUNET_MY_QueryParam params_select[] = {
1898 GNUNET_MY_query_param_auto_from_type (channel_key),
1899 GNUNET_MY_query_param_string (name),
1900 GNUNET_MY_query_param_end
1901 };
1902
1903 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1904 stmt,
1905 params_select))
1906 {
1907
1908 }
1909
1910 ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0),
1911 sqlite3_column_bytes (stmt, 0));
1912
1913 if (0 != mysql_stmt_reset (stmt))
1914 {
1915 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1916 "mysql_stmt_reset", statement);
1917 return GNUNET_SYSERR;
1918 }
1919
1920 return ret;
1921}
1922
1923
1924/**
1925 * Retrieve all state variables for a channel with the given prefix.
1926 *
1927 * @see GNUNET_PSYCSTORE_state_get_prefix()
1928 *
1929 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1930 */
1931static int
1932state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1933 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1934 void *cb_cls)
1935{
1936 struct Plugin *plugin = cls;
1937 int ret = GNUNET_SYSERR;
1938
1939 GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1940 MYSQL_STMT *statement = NULL;
1941 statement = GNUNET_MYSQL_statement_get_stmt (stmt);
1942
1943 if (NULL == statement)
1944 {
1945 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1946 "mysql get_stmt", statement);
1947 return GNUNET_SYSERR;
1948 }
1949
1950 uint32_t name_len = (uint32_t) strlen (name);
1951
1952 struct GNUNET_MY_QueryParam params_select[] = {
1953 GNUNET_MY_query_param_auto_from_type (channel_key),
1954 GNUNET_MY_query_param_string (name),
1955 GNUNET_MY_query_param_uint32 (&name_len),
1956 GNUNET_MY_query_param_string (name),
1957 GNUNET_MY_query_param_end
1958 };
1959
1960 int sql_ret;
1961
1962 sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
1963 stmt,
1964 params_select);
1965
1966 if (GNUNET_OK != sql_ret)
1967 {
1968 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1969 "mysql exec_prepared", statement);
1970 return GNUNET_SYSERR;
1971 }
1972
1973 ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
1974 sqlite3_column_blob (stmt, 1),
1975 sqlite3_column_bytes (stmt, 1));
1976
1977 if (0 != mysql_stmt_reset (stmt))
1978 {
1979 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1980 "mysql_stmt_reset", statement);
1981 return GNUNET_SYSERR;
1982 }
1983
1984 return ret;
1985}
1986
1987
1988/**
1989 * Retrieve all signed state variables for a channel.
1990 *
1991 * @see GNUNET_PSYCSTORE_state_get_signed()
1992 *
1993 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1994 */
1995static int
1996state_get_signed (void *cls,
1997 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1998 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1999{
2000 struct Plugin *plugin = cls;
2001 int ret = GNUNET_SYSERR;
2002
2003 //sqlite3_stmt *stmt = plugin->select_state_signed;
2004 GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
2005
2006 struct GNUNET_MY_QueryParam params_select[] = {
2007 GNUNET_MY_query_param_auto_from_type (channel_key),
2008 GNUNET_MY_query_param_end
2009 };
2010
2011 int sql_ret;
2012
2013 sql_ret = GNUNET_MY_exec_prepared (plugin->mc,
2014 stmt,
2015 params_select);
2016
2017 if (GNUNET_OK != sql_ret)
2018 {
2019 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
2020 "mysql_exec_prepared", statement);
2021 return GNUNET_SYSERR;
2022 }
2023
2024 ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0),
2025 sqlite3_column_blob (stmt, 1),
2026 sqlite3_column_bytes (stmt, 1));
2027
2028 if (0 != mysql_stmt_reset (stmt))
2029 {
2030 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
2031 "mysql_stmt_reset", statement);
2032 return GNUNET_SYSERR;
2033 }
2034
2035 return ret;
2036}
2037
2038
2039/**
2040 * Entry point for the plugin.
2041 *
2042 * @param cls The struct GNUNET_CONFIGURATION_Handle.
2043 * @return NULL on error, otherwise the plugin context
2044 */
2045void *
2046libgnunet_plugin_psycstore_sqlite_init (void *cls)
2047{
2048 static struct Plugin plugin;
2049 const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
2050 struct GNUNET_PSYCSTORE_PluginFunctions *api;
2051
2052 if (NULL != plugin.cfg)
2053 return NULL; /* can only initialize once! */
2054 memset (&plugin, 0, sizeof (struct Plugin));
2055 plugin.cfg = cfg;
2056 if (GNUNET_OK != database_setup (&plugin))
2057 {
2058 database_shutdown (&plugin);
2059 return NULL;
2060 }
2061 api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
2062 api->cls = &plugin;
2063 api->membership_store = &sqlite_membership_store;
2064 api->membership_test = &membership_test;
2065 api->fragment_store = &fragment_store;
2066 api->message_add_flags = &message_add_flags;
2067 api->fragment_get = &fragment_get;
2068 api->fragment_get_latest = &fragment_get_latest;
2069 api->message_get = &message_get;
2070 api->message_get_latest = &message_get_latest;
2071 api->message_get_fragment = &message_get_fragment;
2072 api->counters_message_get = &counters_message_get;
2073 api->counters_state_get = &counters_state_get;
2074 api->state_modify_begin = &state_modify_begin;
2075 api->state_modify_op = &state_modify_op;
2076 api->state_modify_end = &state_modify_end;
2077 api->state_sync_begin = &state_sync_begin;
2078 api->state_sync_assign = &state_sync_assign;
2079 api->state_sync_end = &state_sync_end;
2080 api->state_reset = &state_reset;
2081 api->state_update_signed = &state_update_signed;
2082 api->state_get = &state_get;
2083 api->state_get_prefix = &state_get_prefix;
2084 api->state_get_signed = &state_get_signed;
2085
2086 LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
2087 return api;
2088}
2089
2090
2091/**
2092 * Exit point from the plugin.
2093 *
2094 * @param cls The plugin context (as returned by "init")
2095 * @return Always NULL
2096 */
2097void *
2098libgnunet_plugin_psycstore_sqlite_done (void *cls)
2099{
2100 struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
2101 struct Plugin *plugin = api->cls;
2102
2103 database_shutdown (plugin);
2104 plugin->cfg = NULL;
2105 GNUNET_free (api);
2106 LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n");
2107 return NULL;
2108}
2109
2110/* end of plugin_psycstore_mysql.c */