aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore/plugin_psycstore_mysql.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-02-11 20:39:36 +0100
committerChristian Grothoff <christian@grothoff.org>2019-02-11 20:39:36 +0100
commit1f59e703d82b47f3aeaf432045a2633c2841169b (patch)
tree6af5609b388cf1906a29b5d572bec2dd8fb2ae1c /src/psycstore/plugin_psycstore_mysql.c
downloadgnunet-secushare-1f59e703d82b47f3aeaf432045a2633c2841169b.tar.gz
gnunet-secushare-1f59e703d82b47f3aeaf432045a2633c2841169b.zip
initial import from gnunet.git
Diffstat (limited to 'src/psycstore/plugin_psycstore_mysql.c')
-rw-r--r--src/psycstore/plugin_psycstore_mysql.c1960
1 files changed, 1960 insertions, 0 deletions
diff --git a/src/psycstore/plugin_psycstore_mysql.c b/src/psycstore/plugin_psycstore_mysql.c
new file mode 100644
index 0000000..c36b6f7
--- /dev/null
+++ b/src/psycstore/plugin_psycstore_mysql.c
@@ -0,0 +1,1960 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
4 *
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file psycstore/plugin_psycstore_mysql.c
23 * @brief mysql-based psycstore backend
24 * @author Gabor X Toth
25 * @author Christian Grothoff
26 * @author Christophe Genevey
27 */
28
29#include "platform.h"
30#include "gnunet_psycstore_plugin.h"
31#include "gnunet_psycstore_service.h"
32#include "gnunet_multicast_service.h"
33#include "gnunet_crypto_lib.h"
34#include "gnunet_psyc_util_lib.h"
35#include "psycstore.h"
36#include "gnunet_my_lib.h"
37#include "gnunet_mysql_lib.h"
38#include <mysql/mysql.h>
39
40/**
41 * After how many ms "busy" should a DB operation fail for good? A
42 * low value makes sure that we are more responsive to requests
43 * (especially PUTs). A high value guarantees a higher success rate
44 * (SELECTs in iterate can take several seconds despite LIMIT=1).
45 *
46 * The default value of 1s should ensure that users do not experience
47 * huge latencies while at the same time allowing operations to
48 * succeed with reasonable probability.
49 */
50#define BUSY_TIMEOUT_MS 1000
51
52#define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
53
54/**
55 * Log an error message at log-level 'level' that indicates
56 * a failure of the command 'cmd' on file 'filename'
57 * with the message given by strerror(errno).
58 */
59#define LOG_MYSQL(db, level, cmd, stmt) \
60 do { \
61 GNUNET_log_from (level, "psycstore-mysql", \
62 _("`%s' failed at %s:%d with error: %s\n"), \
63 cmd, __FILE__, __LINE__, \
64 mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \
65 } while (0)
66
67#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__)
68
69enum Transactions {
70 TRANSACTION_NONE = 0,
71 TRANSACTION_STATE_MODIFY,
72 TRANSACTION_STATE_SYNC,
73};
74
75/**
76 * Context for all functions in this plugin.
77 */
78struct Plugin
79{
80
81 const struct GNUNET_CONFIGURATION_Handle *cfg;
82
83 /**
84 * MySQL context.
85 */
86 struct GNUNET_MYSQL_Context *mc;
87
88 /**
89 * Current transaction.
90 */
91 enum Transactions transaction;
92
93 /**
94 * Precompiled SQL for channel_key_store()
95 */
96 struct GNUNET_MYSQL_StatementHandle *insert_channel_key;
97
98 /**
99 * Precompiled SQL for slave_key_store()
100 */
101 struct GNUNET_MYSQL_StatementHandle *insert_slave_key;
102
103 /**
104 * Precompiled SQL for membership_store()
105 */
106 struct GNUNET_MYSQL_StatementHandle *insert_membership;
107
108 /**
109 * Precompiled SQL for membership_test()
110 */
111 struct GNUNET_MYSQL_StatementHandle *select_membership;
112
113 /**
114 * Precompiled SQL for fragment_store()
115 */
116 struct GNUNET_MYSQL_StatementHandle *insert_fragment;
117
118 /**
119 * Precompiled SQL for message_add_flags()
120 */
121 struct GNUNET_MYSQL_StatementHandle *update_message_flags;
122
123 /**
124 * Precompiled SQL for fragment_get()
125 */
126 struct GNUNET_MYSQL_StatementHandle *select_fragments;
127
128 /**
129 * Precompiled SQL for fragment_get()
130 */
131 struct GNUNET_MYSQL_StatementHandle *select_latest_fragments;
132
133 /**
134 * Precompiled SQL for message_get()
135 */
136 struct GNUNET_MYSQL_StatementHandle *select_messages;
137
138 /**
139 * Precompiled SQL for message_get()
140 */
141 struct GNUNET_MYSQL_StatementHandle *select_latest_messages;
142
143 /**
144 * Precompiled SQL for message_get_fragment()
145 */
146 struct GNUNET_MYSQL_StatementHandle *select_message_fragment;
147
148 /**
149 * Precompiled SQL for counters_get_message()
150 */
151 struct GNUNET_MYSQL_StatementHandle *select_counters_message;
152
153 /**
154 * Precompiled SQL for counters_get_state()
155 */
156 struct GNUNET_MYSQL_StatementHandle *select_counters_state;
157
158 /**
159 * Precompiled SQL for state_modify_end()
160 */
161 struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id;
162
163 /**
164 * Precompiled SQL for state_sync_end()
165 */
166 struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id;
167
168 /**
169 * Precompiled SQL for state_modify_op()
170 */
171 struct GNUNET_MYSQL_StatementHandle *insert_state_current;
172
173 /**
174 * Precompiled SQL for state_modify_end()
175 */
176 struct GNUNET_MYSQL_StatementHandle *delete_state_empty;
177
178 /**
179 * Precompiled SQL for state_set_signed()
180 */
181 struct GNUNET_MYSQL_StatementHandle *update_state_signed;
182
183 /**
184 * Precompiled SQL for state_sync()
185 */
186 struct GNUNET_MYSQL_StatementHandle *insert_state_sync;
187
188 /**
189 * Precompiled SQL for state_sync()
190 */
191 struct GNUNET_MYSQL_StatementHandle *delete_state;
192
193 /**
194 * Precompiled SQL for state_sync()
195 */
196 struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync;
197
198 /**
199 * Precompiled SQL for state_sync()
200 */
201 struct GNUNET_MYSQL_StatementHandle *delete_state_sync;
202
203 /**
204 * Precompiled SQL for state_get_signed()
205 */
206 struct GNUNET_MYSQL_StatementHandle *select_state_signed;
207
208 /**
209 * Precompiled SQL for state_get()
210 */
211 struct GNUNET_MYSQL_StatementHandle *select_state_one;
212
213 /**
214 * Precompiled SQL for state_get_prefix()
215 */
216 struct GNUNET_MYSQL_StatementHandle *select_state_prefix;
217
218};
219
220#if DEBUG_PSYCSTORE
221
222static void
223mysql_trace (void *cls, const char *sql)
224{
225 LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql);
226}
227
228#endif
229
230
231/**
232 * @brief Prepare a SQL statement
233 *
234 * @param dbh handle to the database
235 * @param sql SQL statement, UTF-8 encoded
236 * @param stmt set to the prepared statement
237 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
238 */
239static int
240mysql_prepare (struct GNUNET_MYSQL_Context *mc,
241 const char *sql,
242 struct GNUNET_MYSQL_StatementHandle **stmt)
243{
244 *stmt = GNUNET_MYSQL_statement_prepare (mc,
245 sql);
246
247 if (NULL == *stmt)
248 {
249 LOG (GNUNET_ERROR_TYPE_ERROR,
250 _("Error preparing SQL query: %s\n %s\n"),
251 mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)),
252 sql);
253 return GNUNET_SYSERR;
254 }
255 LOG (GNUNET_ERROR_TYPE_DEBUG,
256 "Prepared `%s' / %p\n",
257 sql,
258 stmt);
259 return GNUNET_OK;
260}
261
262
263/**
264 * Initialize the database connections and associated
265 * data structures (create tables and indices
266 * as needed as well).
267 *
268 * @param plugin the plugin context (state for this module)
269 * @return #GNUNET_OK on success
270 */
271static int
272database_setup (struct Plugin *plugin)
273{
274 /* Open database and precompile statements */
275 plugin->mc = GNUNET_MYSQL_context_create (plugin->cfg,
276 "psycstore-mysql");
277
278 if (NULL == plugin->mc)
279 {
280 LOG (GNUNET_ERROR_TYPE_ERROR,
281 _("Unable to initialize Mysql.\n"));
282 return GNUNET_SYSERR;
283 }
284
285#define STMT_RUN(sql) \
286 if (GNUNET_OK != \
287 GNUNET_MYSQL_statement_run (plugin->mc, \
288 sql)) \
289 { \
290 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \
291 _("Failed to run SQL statement `%s'\n"), \
292 sql); \
293 return GNUNET_SYSERR; \
294 }
295
296 /* Create tables */
297 STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n"
298 " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
299 " pub_key BLOB(32),\n"
300 " max_state_message_id BIGINT UNSIGNED,\n"
301 " state_hash_message_id BIGINT UNSIGNED,\n"
302 " PRIMARY KEY(id),\n"
303 " UNIQUE KEY(pub_key(32))\n"
304 ");");
305
306 STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n"
307 " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
308 " pub_key BLOB(32),\n"
309 " PRIMARY KEY(id),\n"
310 " UNIQUE KEY(pub_key(32))\n"
311 ");");
312
313 STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n"
314 " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
315 " slave_id BIGINT UNSIGNED NOT NULL REFERENCES slaves(id),\n"
316 " did_join TINYINT NOT NULL,\n"
317 " announced_at BIGINT UNSIGNED NOT NULL,\n"
318 " effective_since BIGINT UNSIGNED NOT NULL,\n"
319 " group_generation BIGINT UNSIGNED NOT NULL\n"
320 ");");
321
322/*** FIX because IF NOT EXISTS doesn't work ***/
323 GNUNET_MYSQL_statement_run (plugin->mc,
324 "CREATE INDEX idx_membership_channel_id_slave_id "
325 "ON membership (channel_id, slave_id);");
326
327 /** @todo messages table: add method_name column */
328 STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n"
329 " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
330 " hop_counter BIGINT UNSIGNED NOT NULL,\n"
331 " signature BLOB,\n"
332 " purpose BLOB,\n"
333 " fragment_id BIGINT UNSIGNED NOT NULL,\n"
334 " fragment_offset BIGINT UNSIGNED NOT NULL,\n"
335 " message_id BIGINT UNSIGNED NOT NULL,\n"
336 " group_generation BIGINT UNSIGNED NOT NULL,\n"
337 " multicast_flags BIGINT UNSIGNED NOT NULL,\n"
338 " psycstore_flags BIGINT UNSIGNED NOT NULL,\n"
339 " data BLOB,\n"
340 " PRIMARY KEY (channel_id, fragment_id),\n"
341 " UNIQUE KEY(channel_id, message_id, fragment_offset)\n"
342 ");");
343
344 STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n"
345 " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
346 " name TEXT NOT NULL,\n"
347 " value_current BLOB,\n"
348 " value_signed BLOB\n"
349 //" PRIMARY KEY (channel_id, name(255))\n"
350 ");");
351
352 STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n"
353 " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
354 " name TEXT NOT NULL,\n"
355 " value BLOB\n"
356 //" PRIMARY KEY (channel_id, name(255))\n"
357 ");");
358#undef STMT_RUN
359
360 /* Prepare statements */
361#define PREP(stmt,handle) \
362 if (GNUNET_OK != mysql_prepare (plugin->mc, stmt, handle)) \
363 { \
364 GNUNET_break (0); \
365 return GNUNET_SYSERR; \
366 }
367 PREP ("INSERT IGNORE INTO channels (pub_key) VALUES (?);",
368 &plugin->insert_channel_key);
369 PREP ("INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
370 &plugin->insert_slave_key);
371 PREP ("INSERT INTO membership\n"
372 " (channel_id, slave_id, did_join, announced_at,\n"
373 " effective_since, group_generation)\n"
374 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
375 " (SELECT id FROM slaves WHERE pub_key = ?),\n"
376 " ?, ?, ?, ?);",
377 &plugin->insert_membership);
378 PREP ("SELECT did_join FROM membership\n"
379 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
380 " AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n"
381 " AND effective_since <= ? AND did_join = 1\n"
382 "ORDER BY announced_at DESC LIMIT 1;",
383 &plugin->select_membership);
384
385 PREP ("INSERT IGNORE INTO messages\n"
386 " (channel_id, hop_counter, signature, purpose,\n"
387 " fragment_id, fragment_offset, message_id,\n"
388 " group_generation, multicast_flags, psycstore_flags, data)\n"
389 "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n"
390 " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
391 &plugin->insert_fragment);
392
393 PREP ("UPDATE messages\n"
394 "SET psycstore_flags = psycstore_flags | ?\n"
395 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
396 " AND message_id = ? AND fragment_offset = 0;",
397 &plugin->update_message_flags);
398
399 PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
400 " fragment_offset, message_id, group_generation,\n"
401 " multicast_flags, psycstore_flags, data\n"
402 "FROM messages\n"
403 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
404 " AND ? <= fragment_id AND fragment_id <= ? LIMIT 1;",
405 &plugin->select_fragments);
406
407 /** @todo select_messages: add method_prefix filter */
408 PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
409 " fragment_offset, message_id, group_generation,\n"
410 " multicast_flags, psycstore_flags, data\n"
411 "FROM messages\n"
412 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
413 " AND ? <= message_id AND message_id <= ?\n"
414 "LIMIT ?;",
415 &plugin->select_messages);
416
417 PREP ("SELECT * FROM\n"
418 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
419 " fragment_offset, message_id, group_generation,\n"
420 " multicast_flags, psycstore_flags, data\n"
421 " FROM messages\n"
422 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
423 " ORDER BY fragment_id DESC\n"
424 " LIMIT ?)\n"
425 "ORDER BY fragment_id;",
426 &plugin->select_latest_fragments);
427
428 /** @todo select_latest_messages: add method_prefix filter */
429 PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
430 " fragment_offset, message_id, group_generation,\n"
431 " multicast_flags, psycstore_flags, data\n"
432 "FROM messages\n"
433 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
434 " AND message_id IN\n"
435 " (SELECT message_id\n"
436 " FROM messages\n"
437 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
438 " GROUP BY message_id\n"
439 " ORDER BY message_id\n"
440 " DESC LIMIT ?)\n"
441 "ORDER BY fragment_id;",
442 &plugin->select_latest_messages);
443
444 PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n"
445 " fragment_offset, message_id, group_generation,\n"
446 " multicast_flags, psycstore_flags, data\n"
447 "FROM messages\n"
448 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
449 " AND message_id = ? AND fragment_offset = ?;",
450 &plugin->select_message_fragment);
451
452 PREP ("SELECT fragment_id, message_id, group_generation\n"
453 "FROM messages\n"
454 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
455 "ORDER BY fragment_id DESC LIMIT 1;",
456 &plugin->select_counters_message);
457
458 PREP ("SELECT max_state_message_id\n"
459 "FROM channels\n"
460 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
461 &plugin->select_counters_state);
462
463 PREP ("UPDATE channels\n"
464 "SET max_state_message_id = ?\n"
465 "WHERE pub_key = ?;",
466 &plugin->update_max_state_message_id);
467
468 PREP ("UPDATE channels\n"
469 "SET state_hash_message_id = ?\n"
470 "WHERE pub_key = ?;",
471 &plugin->update_state_hash_message_id);
472
473 PREP ("REPLACE INTO state\n"
474 " (channel_id, name, value_current, value_signed)\n"
475 "SELECT new.channel_id, new.name, new.value_current, old.value_signed\n"
476 "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?) AS channel_id,\n"
477 " (SELECT ?) AS name,\n"
478 " (SELECT ?) AS value_current\n"
479 " ) AS new\n"
480 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
481 " FROM state) AS old\n"
482 "ON new.channel_id = old.channel_id AND new.name = old.name;",
483 &plugin->insert_state_current);
484
485 PREP ("DELETE FROM state\n"
486 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
487 " AND (value_current IS NULL OR length(value_current) = 0)\n"
488 " AND (value_signed IS NULL OR length(value_signed) = 0);",
489 &plugin->delete_state_empty);
490
491 PREP ("UPDATE state\n"
492 "SET value_signed = value_current\n"
493 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
494 &plugin->update_state_signed);
495
496 PREP ("DELETE FROM state\n"
497 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
498 &plugin->delete_state);
499
500 PREP ("INSERT INTO state_sync (channel_id, name, value)\n"
501 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
502 &plugin->insert_state_sync);
503
504 PREP ("INSERT INTO state\n"
505 " (channel_id, name, value_current, value_signed)\n"
506 "SELECT channel_id, name, value, value\n"
507 "FROM state_sync\n"
508 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
509 &plugin->insert_state_from_sync);
510
511 PREP ("DELETE FROM state_sync\n"
512 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
513 &plugin->delete_state_sync);
514
515 PREP ("SELECT value_current\n"
516 "FROM state\n"
517 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
518 " AND name = ?;",
519 &plugin->select_state_one);
520
521 PREP ("SELECT name, value_current\n"
522 "FROM state\n"
523 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
524 " AND (name = ? OR substr(name, 1, ?) = ?);",
525 &plugin->select_state_prefix);
526
527 PREP ("SELECT name, value_signed\n"
528 "FROM state\n"
529 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)"
530 " AND value_signed IS NOT NULL;",
531 &plugin->select_state_signed);
532#undef PREP
533
534 return GNUNET_OK;
535}
536
537
538/**
539 * Shutdown database connection and associate data
540 * structures.
541 * @param plugin the plugin context (state for this module)
542 */
543static void
544database_shutdown (struct Plugin *plugin)
545{
546 GNUNET_MYSQL_context_destroy (plugin->mc);
547}
548
549
550/**
551 * Execute a prepared statement with a @a channel_key argument.
552 *
553 * @param plugin Plugin handle.
554 * @param stmt Statement to execute.
555 * @param channel_key Public key of the channel.
556 *
557 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
558 */
559static int
560exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
561 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
562{
563 struct GNUNET_MY_QueryParam params[] = {
564 GNUNET_MY_query_param_auto_from_type (channel_key),
565 GNUNET_MY_query_param_end
566 };
567
568 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
569 {
570 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
571 "mysql exec_channel", stmt);
572 }
573
574 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
575 {
576 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
577 "mysql_stmt_reset", stmt);
578 return GNUNET_SYSERR;
579 }
580
581 return GNUNET_OK;
582}
583
584
585/**
586 * Begin a transaction.
587 */
588static int
589transaction_begin (struct Plugin *plugin, enum Transactions transaction)
590{
591 if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "BEGIN"))
592 {
593 LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_begin failed");
594 return GNUNET_SYSERR;
595 }
596
597 plugin->transaction = transaction;
598 return GNUNET_OK;
599}
600
601
602/**
603 * Commit current transaction.
604 */
605static int
606transaction_commit (struct Plugin *plugin)
607{
608 if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "COMMIT"))
609 {
610 LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_commit failed");
611 return GNUNET_SYSERR;
612 }
613
614 plugin->transaction = TRANSACTION_NONE;
615 return GNUNET_OK;
616}
617
618
619/**
620 * Roll back current transaction.
621 */
622static int
623transaction_rollback (struct Plugin *plugin)
624{
625 if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "ROLLBACK"))
626 {
627 LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_rollback failed");
628 return GNUNET_SYSERR;
629 }
630
631 plugin->transaction = TRANSACTION_NONE;
632 return GNUNET_OK;
633}
634
635
636static int
637channel_key_store (struct Plugin *plugin,
638 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
639{
640 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key;
641
642 struct GNUNET_MY_QueryParam params[] = {
643 GNUNET_MY_query_param_auto_from_type (channel_key),
644 GNUNET_MY_query_param_end
645 };
646
647 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
648 {
649 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
650 "mysql exec_prepared", stmt);
651 return GNUNET_SYSERR;
652 }
653
654 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
655 {
656 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
657 "mysql_stmt_reset", stmt);
658 return GNUNET_SYSERR;
659 }
660
661 return GNUNET_OK;
662}
663
664
665static int
666slave_key_store (struct Plugin *plugin,
667 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
668{
669 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key;
670
671 struct GNUNET_MY_QueryParam params[] = {
672 GNUNET_MY_query_param_auto_from_type (slave_key),
673 GNUNET_MY_query_param_end
674 };
675
676 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
677 {
678 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
679 "mysql exec_prepared", stmt);
680 return GNUNET_SYSERR;
681 }
682
683 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
684 {
685 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
686 "mysql_stmt_reset", stmt);
687 return GNUNET_SYSERR;
688 }
689
690 return GNUNET_OK;
691}
692
693
694/**
695 * Store join/leave events for a PSYC channel in order to be able to answer
696 * membership test queries later.
697 *
698 * @see GNUNET_PSYCSTORE_membership_store()
699 *
700 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
701 */
702static int
703mysql_membership_store (void *cls,
704 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
705 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
706 int did_join,
707 uint64_t announced_at,
708 uint64_t effective_since,
709 uint64_t group_generation)
710{
711 struct Plugin *plugin = cls;
712
713 uint32_t idid_join = (uint32_t)did_join;
714
715 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership;
716
717 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
718
719 if (announced_at > INT64_MAX ||
720 effective_since > INT64_MAX ||
721 group_generation > INT64_MAX)
722 {
723 GNUNET_break (0);
724 return GNUNET_SYSERR;
725 }
726
727 if (GNUNET_OK != channel_key_store (plugin, channel_key)
728 || GNUNET_OK != slave_key_store (plugin, slave_key))
729 return GNUNET_SYSERR;
730
731 struct GNUNET_MY_QueryParam params[] = {
732 GNUNET_MY_query_param_auto_from_type (channel_key),
733 GNUNET_MY_query_param_auto_from_type (slave_key),
734 GNUNET_MY_query_param_uint32 (&idid_join),
735 GNUNET_MY_query_param_uint64 (&announced_at),
736 GNUNET_MY_query_param_uint64 (&effective_since),
737 GNUNET_MY_query_param_uint64 (&group_generation),
738 GNUNET_MY_query_param_end
739 };
740
741 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
742 {
743 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
744 "mysql exec_prepared", stmt);
745 return GNUNET_SYSERR;
746 }
747
748 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
749 {
750 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
751 "mysql_stmt_reset", stmt);
752 return GNUNET_SYSERR;
753 }
754 return GNUNET_OK;
755}
756
757/**
758 * Test if a member was admitted to the channel at the given message ID.
759 *
760 * @see GNUNET_PSYCSTORE_membership_test()
761 *
762 * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
763 * #GNUNET_SYSERR if there was en error.
764 */
765static int
766membership_test (void *cls,
767 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
768 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
769 uint64_t message_id)
770{
771 struct Plugin *plugin = cls;
772
773 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership;
774
775 uint32_t did_join = 0;
776
777 int ret = GNUNET_SYSERR;
778
779 struct GNUNET_MY_QueryParam params_select[] = {
780 GNUNET_MY_query_param_auto_from_type (channel_key),
781 GNUNET_MY_query_param_auto_from_type (slave_key),
782 GNUNET_MY_query_param_uint64 (&message_id),
783 GNUNET_MY_query_param_end
784 };
785
786 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
787 {
788 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
789 "mysql execute prepared", stmt);
790 return GNUNET_SYSERR;
791 }
792
793 struct GNUNET_MY_ResultSpec results_select[] = {
794 GNUNET_MY_result_spec_uint32 (&did_join),
795 GNUNET_MY_result_spec_end
796 };
797
798 switch (GNUNET_MY_extract_result (stmt, results_select))
799 {
800 case GNUNET_NO:
801 ret = GNUNET_NO;
802 break;
803 case GNUNET_OK:
804 ret = GNUNET_YES;
805 break;
806 default:
807 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
808 "mysql extract_result", stmt);
809 return GNUNET_SYSERR;
810 }
811
812 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
813 {
814 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
815 "mysql_stmt_reset", stmt);
816 return GNUNET_SYSERR;
817 }
818
819 return ret;
820}
821
822/**
823 * Store a message fragment sent to a channel.
824 *
825 * @see GNUNET_PSYCSTORE_fragment_store()
826 *
827 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
828 */
829static int
830fragment_store (void *cls,
831 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
832 const struct GNUNET_MULTICAST_MessageHeader *msg,
833 uint32_t psycstore_flags)
834{
835 struct Plugin *plugin = cls;
836
837 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment;
838
839 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
840
841 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
842
843 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
844 uint64_t message_id = GNUNET_ntohll (msg->message_id);
845 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
846
847 uint64_t hop_counter = ntohl(msg->hop_counter);
848 uint64_t flags = ntohl(msg->flags);
849
850 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
851 message_id > INT64_MAX || group_generation > INT64_MAX)
852 {
853 LOG(GNUNET_ERROR_TYPE_ERROR,
854 "Tried to store fragment with a field > INT64_MAX: "
855 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
856 message_id, group_generation);
857 GNUNET_break (0);
858 return GNUNET_SYSERR;
859 }
860
861 if (GNUNET_OK != channel_key_store (plugin, channel_key))
862 return GNUNET_SYSERR;
863
864 struct GNUNET_MY_QueryParam params_insert[] = {
865 GNUNET_MY_query_param_auto_from_type (channel_key),
866 GNUNET_MY_query_param_uint64 (&hop_counter),
867 GNUNET_MY_query_param_auto_from_type (&msg->signature),
868 GNUNET_MY_query_param_auto_from_type (&msg->purpose),
869 GNUNET_MY_query_param_uint64 (&fragment_id),
870 GNUNET_MY_query_param_uint64 (&fragment_offset),
871 GNUNET_MY_query_param_uint64 (&message_id),
872 GNUNET_MY_query_param_uint64 (&group_generation),
873 GNUNET_MY_query_param_uint64 (&flags),
874 GNUNET_MY_query_param_uint32 (&psycstore_flags),
875 GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
876 - sizeof (*msg)),
877 GNUNET_MY_query_param_end
878 };
879
880 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert))
881 {
882 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
883 "mysql execute prepared", stmt);
884 return GNUNET_SYSERR;
885 }
886
887 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
888 {
889 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
890 "mysql_stmt_reset", stmt);
891 return GNUNET_SYSERR;
892 }
893
894 return GNUNET_OK;
895}
896
897/**
898 * Set additional flags for a given message.
899 *
900 * They are OR'd with any existing flags set.
901 *
902 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
903 */
904static int
905message_add_flags (void *cls,
906 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
907 uint64_t message_id,
908 uint32_t psycstore_flags)
909{
910 struct Plugin *plugin = cls;
911 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
912
913 int sql_ret;
914 int ret = GNUNET_SYSERR;
915
916 struct GNUNET_MY_QueryParam params_update[] = {
917 GNUNET_MY_query_param_uint32 (&psycstore_flags),
918 GNUNET_MY_query_param_auto_from_type (channel_key),
919 GNUNET_MY_query_param_uint64 (&message_id),
920 GNUNET_MY_query_param_end
921 };
922
923 sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update);
924 switch (sql_ret)
925 {
926 case GNUNET_OK:
927 ret = GNUNET_OK;
928 break;
929
930 default:
931 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
932 "mysql execute prepared", stmt);
933 }
934
935 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
936 {
937 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
938 "mysql_stmt_reset", stmt);
939 return GNUNET_SYSERR;
940 }
941
942 return ret;
943}
944
945
946static int
947fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
948 GNUNET_PSYCSTORE_FragmentCallback cb,
949 void *cb_cls,
950 uint64_t *returned_fragments)
951{
952
953 uint32_t hop_counter;
954 void *signature = NULL;
955 void *purpose = NULL;
956 size_t signature_size;
957 size_t purpose_size;
958 uint64_t fragment_id;
959 uint64_t fragment_offset;
960 uint64_t message_id;
961 uint64_t group_generation;
962 uint64_t flags;
963 void *buf;
964 size_t buf_size;
965 int ret = GNUNET_SYSERR;
966 int sql_ret;
967 struct GNUNET_MULTICAST_MessageHeader *mp;
968 uint64_t msg_flags;
969 struct GNUNET_MY_ResultSpec results[] = {
970 GNUNET_MY_result_spec_uint32 (&hop_counter),
971 GNUNET_MY_result_spec_variable_size (&signature, &signature_size),
972 GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size),
973 GNUNET_MY_result_spec_uint64 (&fragment_id),
974 GNUNET_MY_result_spec_uint64 (&fragment_offset),
975 GNUNET_MY_result_spec_uint64 (&message_id),
976 GNUNET_MY_result_spec_uint64 (&group_generation),
977 GNUNET_MY_result_spec_uint64 (&msg_flags),
978 GNUNET_MY_result_spec_uint64 (&flags),
979 GNUNET_MY_result_spec_variable_size (&buf,
980 &buf_size),
981 GNUNET_MY_result_spec_end
982 };
983
984 do
985 {
986 sql_ret = GNUNET_MY_extract_result (stmt, results);
987 switch (sql_ret)
988 {
989 case GNUNET_NO:
990 if (ret != GNUNET_YES)
991 ret = GNUNET_NO;
992 break;
993
994 case GNUNET_YES:
995 mp = GNUNET_malloc (sizeof (*mp) + buf_size);
996
997 mp->header.size = htons (sizeof (*mp) + buf_size);
998 mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
999 mp->hop_counter = htonl (hop_counter);
1000 GNUNET_memcpy (&mp->signature,
1001 signature,
1002 signature_size);
1003 GNUNET_memcpy (&mp->purpose,
1004 purpose,
1005 purpose_size);
1006 mp->fragment_id = GNUNET_htonll (fragment_id);
1007 mp->fragment_offset = GNUNET_htonll (fragment_offset);
1008 mp->message_id = GNUNET_htonll (message_id);
1009 mp->group_generation = GNUNET_htonll (group_generation);
1010 mp->flags = htonl(msg_flags);
1011
1012 GNUNET_memcpy (&mp[1],
1013 buf,
1014 buf_size);
1015 ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
1016 if (NULL != returned_fragments)
1017 (*returned_fragments)++;
1018 GNUNET_MY_cleanup_result (results);
1019 break;
1020
1021 default:
1022 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1023 "mysql extract_result", stmt);
1024 }
1025 }
1026 while (GNUNET_YES == sql_ret);
1027
1028 // for debugging
1029 if (GNUNET_NO == ret)
1030 GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1031 "Empty result set\n");
1032
1033 return ret;
1034}
1035
1036
1037static int
1038fragment_select (struct Plugin *plugin,
1039 struct GNUNET_MYSQL_StatementHandle *stmt,
1040 struct GNUNET_MY_QueryParam *params,
1041 uint64_t *returned_fragments,
1042 GNUNET_PSYCSTORE_FragmentCallback cb,
1043 void *cb_cls)
1044{
1045 int ret = GNUNET_SYSERR;
1046 int sql_ret;
1047
1048 sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1049 switch (sql_ret)
1050 {
1051 case GNUNET_NO:
1052 if (ret != GNUNET_YES)
1053 ret = GNUNET_NO;
1054 break;
1055
1056 case GNUNET_YES:
1057 ret = fragment_row (stmt, cb, cb_cls, returned_fragments);
1058 break;
1059
1060 default:
1061 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1062 "mysql exec_prepared", stmt);
1063 }
1064 return ret;
1065}
1066
1067
1068/**
1069 * Retrieve a message fragment range by fragment ID.
1070 *
1071 * @see GNUNET_PSYCSTORE_fragment_get()
1072 *
1073 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1074 */
1075static int
1076fragment_get (void *cls,
1077 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1078 uint64_t first_fragment_id,
1079 uint64_t last_fragment_id,
1080 uint64_t *returned_fragments,
1081 GNUNET_PSYCSTORE_FragmentCallback cb,
1082 void *cb_cls)
1083{
1084 struct Plugin *plugin = cls;
1085 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments;
1086 int ret = GNUNET_SYSERR;
1087 struct GNUNET_MY_QueryParam params_select[] = {
1088 GNUNET_MY_query_param_auto_from_type (channel_key),
1089 GNUNET_MY_query_param_uint64 (&first_fragment_id),
1090 GNUNET_MY_query_param_uint64 (&last_fragment_id),
1091 GNUNET_MY_query_param_end
1092 };
1093
1094 *returned_fragments = 0;
1095 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1096
1097 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1098 {
1099 LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1100 "mysql_stmt_reset", stmt);
1101 return GNUNET_SYSERR;
1102 }
1103
1104 return ret;
1105}
1106
1107
1108/**
1109 * Retrieve a message fragment range by fragment ID.
1110 *
1111 * @see GNUNET_PSYCSTORE_fragment_get_latest()
1112 *
1113 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1114 */
1115static int
1116fragment_get_latest (void *cls,
1117 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1118 uint64_t fragment_limit,
1119 uint64_t *returned_fragments,
1120 GNUNET_PSYCSTORE_FragmentCallback cb,
1121 void *cb_cls)
1122{
1123 struct Plugin *plugin = cls;
1124
1125 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments;
1126
1127 int ret = GNUNET_SYSERR;
1128 *returned_fragments = 0;
1129
1130 struct GNUNET_MY_QueryParam params_select[] = {
1131 GNUNET_MY_query_param_auto_from_type (channel_key),
1132 GNUNET_MY_query_param_uint64 (&fragment_limit),
1133 GNUNET_MY_query_param_end
1134 };
1135
1136 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1137
1138 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1139 {
1140 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1141 "mysql_stmt_reset", stmt);
1142 return GNUNET_SYSERR;
1143 }
1144
1145 return ret;
1146}
1147
1148
1149/**
1150 * Retrieve all fragments of a message ID range.
1151 *
1152 * @see GNUNET_PSYCSTORE_message_get()
1153 *
1154 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1155 */
1156static int
1157message_get (void *cls,
1158 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1159 uint64_t first_message_id,
1160 uint64_t last_message_id,
1161 uint64_t fragment_limit,
1162 uint64_t *returned_fragments,
1163 GNUNET_PSYCSTORE_FragmentCallback cb,
1164 void *cb_cls)
1165{
1166 struct Plugin *plugin = cls;
1167 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
1168 int ret;
1169
1170 if (0 == fragment_limit)
1171 fragment_limit = UINT64_MAX;
1172
1173 struct GNUNET_MY_QueryParam params_select[] = {
1174 GNUNET_MY_query_param_auto_from_type (channel_key),
1175 GNUNET_MY_query_param_uint64 (&first_message_id),
1176 GNUNET_MY_query_param_uint64 (&last_message_id),
1177 GNUNET_MY_query_param_uint64 (&fragment_limit),
1178 GNUNET_MY_query_param_end
1179 };
1180
1181 *returned_fragments = 0;
1182 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1183
1184 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1185 {
1186 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1187 "mysql_stmt_reset", stmt);
1188 return GNUNET_SYSERR;
1189 }
1190
1191 return ret;
1192}
1193
1194
1195/**
1196 * Retrieve all fragments of the latest messages.
1197 *
1198 * @see GNUNET_PSYCSTORE_message_get_latest()
1199 *
1200 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1201 */
1202static int
1203message_get_latest (void *cls,
1204 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1205 uint64_t message_limit,
1206 uint64_t *returned_fragments,
1207 GNUNET_PSYCSTORE_FragmentCallback cb,
1208 void *cb_cls)
1209{
1210 struct Plugin *plugin = cls;
1211
1212 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages;
1213
1214 int ret = GNUNET_SYSERR;
1215 *returned_fragments = 0;
1216
1217 struct GNUNET_MY_QueryParam params_select[] = {
1218 GNUNET_MY_query_param_auto_from_type (channel_key),
1219 GNUNET_MY_query_param_auto_from_type (channel_key),
1220 GNUNET_MY_query_param_uint64 (&message_limit),
1221 GNUNET_MY_query_param_end
1222 };
1223
1224 ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls);
1225
1226 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1227 {
1228 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1229 "mysql_stmt_reset", stmt);
1230 return GNUNET_SYSERR;
1231 }
1232
1233 return ret;
1234}
1235
1236
1237/**
1238 * Retrieve a fragment of message specified by its message ID and fragment
1239 * offset.
1240 *
1241 * @see GNUNET_PSYCSTORE_message_get_fragment()
1242 *
1243 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1244 */
1245static int
1246message_get_fragment (void *cls,
1247 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1248 uint64_t message_id,
1249 uint64_t fragment_offset,
1250 GNUNET_PSYCSTORE_FragmentCallback cb,
1251 void *cb_cls)
1252{
1253 struct Plugin *plugin = cls;
1254 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment;
1255 int sql_ret;
1256 int ret = GNUNET_SYSERR;
1257
1258 struct GNUNET_MY_QueryParam params_select[] = {
1259 GNUNET_MY_query_param_auto_from_type (channel_key),
1260 GNUNET_MY_query_param_uint64 (&message_id),
1261 GNUNET_MY_query_param_uint64 (&fragment_offset),
1262 GNUNET_MY_query_param_end
1263 };
1264
1265 sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select);
1266 switch (sql_ret)
1267 {
1268 case GNUNET_NO:
1269 ret = GNUNET_NO;
1270 break;
1271
1272 case GNUNET_OK:
1273 ret = fragment_row (stmt, cb, cb_cls, NULL);
1274 break;
1275
1276 default:
1277 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1278 "mysql execute prepared", stmt);
1279 }
1280
1281 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1282 {
1283 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1284 "mysql_stmt_reset", stmt);
1285 return GNUNET_SYSERR;
1286 }
1287
1288 return ret;
1289}
1290
1291/**
1292 * Retrieve the max. values of message counters for a channel.
1293 *
1294 * @see GNUNET_PSYCSTORE_counters_get()
1295 *
1296 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1297 */
1298static int
1299counters_message_get (void *cls,
1300 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1301 uint64_t *max_fragment_id,
1302 uint64_t *max_message_id,
1303 uint64_t *max_group_generation)
1304{
1305 struct Plugin *plugin = cls;
1306
1307 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message;
1308
1309 int ret = GNUNET_SYSERR;
1310
1311 struct GNUNET_MY_QueryParam params_select[] = {
1312 GNUNET_MY_query_param_auto_from_type (channel_key),
1313 GNUNET_MY_query_param_end
1314 };
1315
1316 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1317 {
1318 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1319 "mysql execute prepared", stmt);
1320 return GNUNET_SYSERR;
1321 }
1322
1323 struct GNUNET_MY_ResultSpec results_select[] = {
1324 GNUNET_MY_result_spec_uint64 (max_fragment_id),
1325 GNUNET_MY_result_spec_uint64 (max_message_id),
1326 GNUNET_MY_result_spec_uint64 (max_group_generation),
1327 GNUNET_MY_result_spec_end
1328 };
1329
1330 ret = GNUNET_MY_extract_result (stmt, results_select);
1331
1332 if (GNUNET_OK != ret)
1333 {
1334 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1335 "mysql extract_result", stmt);
1336 return GNUNET_SYSERR;
1337 }
1338
1339 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1340 {
1341 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1342 "mysql_stmt_reset", stmt);
1343 return GNUNET_SYSERR;
1344 }
1345
1346 return ret;
1347}
1348
1349/**
1350 * Retrieve the max. values of state counters for a channel.
1351 *
1352 * @see GNUNET_PSYCSTORE_counters_get()
1353 *
1354 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1355 */
1356static int
1357counters_state_get (void *cls,
1358 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1359 uint64_t *max_state_message_id)
1360{
1361 struct Plugin *plugin = cls;
1362
1363 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state;
1364
1365 int ret = GNUNET_SYSERR;
1366
1367 struct GNUNET_MY_QueryParam params_select[] = {
1368 GNUNET_MY_query_param_auto_from_type (channel_key),
1369 GNUNET_MY_query_param_end
1370 };
1371
1372 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1373 {
1374 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1375 "mysql execute prepared", stmt);
1376 return GNUNET_SYSERR;
1377 }
1378
1379 struct GNUNET_MY_ResultSpec results_select[] = {
1380 GNUNET_MY_result_spec_uint64 (max_state_message_id),
1381 GNUNET_MY_result_spec_end
1382 };
1383
1384 ret = GNUNET_MY_extract_result (stmt, results_select);
1385
1386 if (GNUNET_OK != ret)
1387 {
1388 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1389 "mysql extract_result", stmt);
1390 return GNUNET_SYSERR;
1391 }
1392
1393 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1394 {
1395 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1396 "mysql_stmt_reset", stmt);
1397 return GNUNET_SYSERR;
1398 }
1399
1400 return ret;
1401}
1402
1403
1404/**
1405 * Assign a value to a state variable.
1406 *
1407 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1408 */
1409static int
1410state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1411 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1412 const char *name, const void *value, size_t value_size)
1413{
1414 int ret = GNUNET_SYSERR;
1415
1416 struct GNUNET_MY_QueryParam params[] = {
1417 GNUNET_MY_query_param_auto_from_type (channel_key),
1418 GNUNET_MY_query_param_string (name),
1419 GNUNET_MY_query_param_fixed_size(value, value_size),
1420 GNUNET_MY_query_param_end
1421 };
1422
1423 ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
1424 if (GNUNET_OK != ret)
1425 {
1426 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1427 "mysql exec_prepared", stmt);
1428 return GNUNET_SYSERR;
1429 }
1430
1431 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1432 {
1433 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1434 "mysql_stmt_reset", stmt);
1435 return GNUNET_SYSERR;
1436 }
1437
1438 return ret;
1439}
1440
1441
1442static int
1443update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
1444 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1445 uint64_t message_id)
1446{
1447 struct GNUNET_MY_QueryParam params[] = {
1448 GNUNET_MY_query_param_uint64 (&message_id),
1449 GNUNET_MY_query_param_auto_from_type (channel_key),
1450 GNUNET_MY_query_param_end
1451 };
1452
1453 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
1454 stmt,
1455 params))
1456 {
1457 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1458 "mysql execute prepared", stmt);
1459 return GNUNET_SYSERR;
1460 }
1461
1462 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1463 {
1464 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1465 "mysql_stmt_reset", stmt);
1466 return GNUNET_SYSERR;
1467 }
1468
1469 return GNUNET_OK;
1470}
1471
1472
1473/**
1474 * Begin modifying current state.
1475 */
1476static int
1477state_modify_begin (void *cls,
1478 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1479 uint64_t message_id, uint64_t state_delta)
1480{
1481 struct Plugin *plugin = cls;
1482
1483 if (state_delta > 0)
1484 {
1485 /**
1486 * We can only apply state modifiers in the current message if modifiers in
1487 * the previous stateful message (message_id - state_delta) were already
1488 * applied.
1489 */
1490
1491 uint64_t max_state_message_id = 0;
1492 int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1493 switch (ret)
1494 {
1495 case GNUNET_OK:
1496 case GNUNET_NO: // no state yet
1497 ret = GNUNET_OK;
1498 break;
1499 default:
1500 return ret;
1501 }
1502
1503 if (max_state_message_id < message_id - state_delta)
1504 return GNUNET_NO; /* some stateful messages not yet applied */
1505 else if (message_id - state_delta < max_state_message_id)
1506 return GNUNET_NO; /* changes already applied */
1507 }
1508
1509 if (TRANSACTION_NONE != plugin->transaction)
1510 {
1511 /** @todo FIXME: wait for other transaction to finish */
1512 return GNUNET_SYSERR;
1513 }
1514 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1515}
1516
1517
1518/**
1519 * Set the current value of state variable.
1520 *
1521 * @see GNUNET_PSYCSTORE_state_modify()
1522 *
1523 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1524 */
1525static int
1526state_modify_op (void *cls,
1527 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1528 enum GNUNET_PSYC_Operator op,
1529 const char *name, const void *value, size_t value_size)
1530{
1531 struct Plugin *plugin = cls;
1532 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1533
1534 switch (op)
1535 {
1536 case GNUNET_PSYC_OP_ASSIGN:
1537 return state_assign (plugin, plugin->insert_state_current,
1538 channel_key, name, value, value_size);
1539
1540 default: /** @todo implement more state operations */
1541 GNUNET_break (0);
1542 return GNUNET_SYSERR;
1543 }
1544}
1545
1546
1547/**
1548 * End modifying current state.
1549 */
1550static int
1551state_modify_end (void *cls,
1552 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1553 uint64_t message_id)
1554{
1555 struct Plugin *plugin = cls;
1556 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1557
1558 return
1559 GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1560 && GNUNET_OK == update_message_id (plugin,
1561 plugin->update_max_state_message_id,
1562 channel_key, message_id)
1563 && GNUNET_OK == transaction_commit (plugin)
1564 ? GNUNET_OK : GNUNET_SYSERR;
1565}
1566
1567
1568/**
1569 * Begin state synchronization.
1570 */
1571static int
1572state_sync_begin (void *cls,
1573 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1574{
1575 struct Plugin *plugin = cls;
1576 return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1577}
1578
1579
1580/**
1581 * Assign current value of a state variable.
1582 *
1583 * @see GNUNET_PSYCSTORE_state_modify()
1584 *
1585 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1586 */
1587static int
1588state_sync_assign (void *cls,
1589 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1590 const char *name, const void *value, size_t value_size)
1591{
1592 struct Plugin *plugin = cls;
1593 return state_assign (cls, plugin->insert_state_sync,
1594 channel_key, name, value, value_size);
1595}
1596
1597
1598/**
1599 * End modifying current state.
1600 */
1601static int
1602state_sync_end (void *cls,
1603 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1604 uint64_t max_state_message_id,
1605 uint64_t state_hash_message_id)
1606{
1607 struct Plugin *plugin = cls;
1608 int ret = GNUNET_SYSERR;
1609
1610 if (TRANSACTION_NONE != plugin->transaction)
1611 {
1612 /** @todo FIXME: wait for other transaction to finish */
1613 return GNUNET_SYSERR;
1614 }
1615
1616 GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1617 && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1618 && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1619 channel_key)
1620 && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1621 channel_key)
1622 && GNUNET_OK == update_message_id (plugin,
1623 plugin->update_state_hash_message_id,
1624 channel_key, state_hash_message_id)
1625 && GNUNET_OK == update_message_id (plugin,
1626 plugin->update_max_state_message_id,
1627 channel_key, max_state_message_id)
1628 && GNUNET_OK == transaction_commit (plugin)
1629 ? ret = GNUNET_OK
1630 : transaction_rollback (plugin);
1631 return ret;
1632}
1633
1634
1635/**
1636 * Delete the whole state.
1637 *
1638 * @see GNUNET_PSYCSTORE_state_reset()
1639 *
1640 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1641 */
1642static int
1643state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1644{
1645 struct Plugin *plugin = cls;
1646 return exec_channel (plugin, plugin->delete_state, channel_key);
1647}
1648
1649
1650/**
1651 * Update signed values of state variables in the state store.
1652 *
1653 * @see GNUNET_PSYCSTORE_state_hash_update()
1654 *
1655 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1656 */
1657static int
1658state_update_signed (void *cls,
1659 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1660{
1661 struct Plugin *plugin = cls;
1662 return exec_channel (plugin, plugin->update_state_signed, channel_key);
1663}
1664
1665
1666/**
1667 * Retrieve a state variable by name.
1668 *
1669 * @see GNUNET_PSYCSTORE_state_get()
1670 *
1671 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1672 */
1673static int
1674state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1675 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1676{
1677 struct Plugin *plugin = cls;
1678 int ret = GNUNET_SYSERR;
1679 int sql_ret ;
1680
1681 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one;
1682
1683 struct GNUNET_MY_QueryParam params_select[] = {
1684 GNUNET_MY_query_param_auto_from_type (channel_key),
1685 GNUNET_MY_query_param_string (name),
1686 GNUNET_MY_query_param_end
1687 };
1688
1689 void *value_current = NULL;
1690 size_t value_size = 0;
1691
1692 struct GNUNET_MY_ResultSpec results[] = {
1693 GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1694 GNUNET_MY_result_spec_end
1695 };
1696
1697 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1698 {
1699 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1700 "mysql exec_prepared", stmt);
1701 }
1702 else
1703 {
1704 sql_ret = GNUNET_MY_extract_result (stmt, results);
1705 switch (sql_ret)
1706 {
1707 case GNUNET_NO:
1708 ret = GNUNET_NO;
1709 break;
1710
1711 case GNUNET_YES:
1712 ret = cb (cb_cls, name, value_current, value_size);
1713 break;
1714
1715 default:
1716 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1717 "mysql extract_result", stmt);
1718 }
1719 }
1720
1721 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1722 {
1723 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1724 "mysql_stmt_reset", stmt);
1725 return GNUNET_SYSERR;
1726 }
1727
1728 return ret;
1729}
1730
1731
1732/**
1733 * Retrieve all state variables for a channel with the given prefix.
1734 *
1735 * @see GNUNET_PSYCSTORE_state_get_prefix()
1736 *
1737 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1738 */
1739static int
1740state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1741 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1742 void *cb_cls)
1743{
1744 struct Plugin *plugin = cls;
1745 int ret = GNUNET_SYSERR;
1746
1747 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix;
1748
1749 uint32_t name_len = (uint32_t) strlen (name);
1750
1751 struct GNUNET_MY_QueryParam params_select[] = {
1752 GNUNET_MY_query_param_auto_from_type (channel_key),
1753 GNUNET_MY_query_param_string (name),
1754 GNUNET_MY_query_param_uint32 (&name_len),
1755 GNUNET_MY_query_param_string (name),
1756 GNUNET_MY_query_param_end
1757 };
1758
1759 char *name2 = "";
1760 void *value_current = NULL;
1761 size_t value_size = 0;
1762
1763 struct GNUNET_MY_ResultSpec results[] = {
1764 GNUNET_MY_result_spec_string (&name2),
1765 GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
1766 GNUNET_MY_result_spec_end
1767 };;
1768
1769 int sql_ret;
1770
1771 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1772 {
1773 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1774 "mysql exec_prepared", stmt);
1775 return GNUNET_SYSERR;
1776 }
1777
1778 do
1779 {
1780 sql_ret = GNUNET_MY_extract_result (stmt, results);
1781 switch (sql_ret)
1782 {
1783 case GNUNET_NO:
1784 if (ret != GNUNET_YES)
1785 ret = GNUNET_NO;
1786 break;
1787
1788 case GNUNET_YES:
1789 ret = cb (cb_cls, (const char *) name2, value_current, value_size);
1790
1791 if (ret != GNUNET_YES)
1792 sql_ret = GNUNET_NO;
1793 break;
1794
1795 default:
1796 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1797 "mysql extract_result", stmt);
1798 }
1799 }
1800 while (sql_ret == GNUNET_YES);
1801
1802 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1803 {
1804 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1805 "mysql_stmt_reset", stmt);
1806 return GNUNET_SYSERR;
1807 }
1808
1809 return ret;
1810}
1811
1812
1813/**
1814 * Retrieve all signed state variables for a channel.
1815 *
1816 * @see GNUNET_PSYCSTORE_state_get_signed()
1817 *
1818 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1819 */
1820static int
1821state_get_signed (void *cls,
1822 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1823 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1824{
1825 struct Plugin *plugin = cls;
1826 int ret = GNUNET_SYSERR;
1827
1828 struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed;
1829
1830 struct GNUNET_MY_QueryParam params_select[] = {
1831 GNUNET_MY_query_param_auto_from_type (channel_key),
1832 GNUNET_MY_query_param_end
1833 };
1834
1835 int sql_ret;
1836
1837 char *name = "";
1838 void *value_signed = NULL;
1839 size_t value_size = 0;
1840
1841 struct GNUNET_MY_ResultSpec results[] = {
1842 GNUNET_MY_result_spec_string (&name),
1843 GNUNET_MY_result_spec_variable_size (&value_signed, &value_size),
1844 GNUNET_MY_result_spec_end
1845 };
1846
1847 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
1848 {
1849 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1850 "mysql exec_prepared", stmt);
1851 return GNUNET_SYSERR;
1852 }
1853
1854 do
1855 {
1856 sql_ret = GNUNET_MY_extract_result (stmt, results);
1857 switch (sql_ret)
1858 {
1859 case GNUNET_NO:
1860 if (ret != GNUNET_YES)
1861 ret = GNUNET_NO;
1862 break;
1863
1864 case GNUNET_YES:
1865 ret = cb (cb_cls, (const char *) name, value_signed, value_size);
1866
1867 if (ret != GNUNET_YES)
1868 sql_ret = GNUNET_NO;
1869 break;
1870
1871 default:
1872 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1873 "mysql extract_result", stmt);
1874 }
1875 }
1876 while (sql_ret == GNUNET_YES);
1877
1878 if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
1879 {
1880 LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1881 "mysql_stmt_reset", stmt);
1882 return GNUNET_SYSERR;
1883 }
1884
1885 return ret;
1886}
1887
1888
1889/**
1890 * Entry point for the plugin.
1891 *
1892 * @param cls The struct GNUNET_CONFIGURATION_Handle.
1893 * @return NULL on error, otherwise the plugin context
1894 */
1895void *
1896libgnunet_plugin_psycstore_mysql_init (void *cls)
1897{
1898 static struct Plugin plugin;
1899 const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1900 struct GNUNET_PSYCSTORE_PluginFunctions *api;
1901
1902 if (NULL != plugin.cfg)
1903 return NULL; /* can only initialize once! */
1904 memset (&plugin, 0, sizeof (struct Plugin));
1905 plugin.cfg = cfg;
1906 if (GNUNET_OK != database_setup (&plugin))
1907 {
1908 database_shutdown (&plugin);
1909 return NULL;
1910 }
1911 api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1912 api->cls = &plugin;
1913 api->membership_store = &mysql_membership_store;
1914 api->membership_test = &membership_test;
1915 api->fragment_store = &fragment_store;
1916 api->message_add_flags = &message_add_flags;
1917 api->fragment_get = &fragment_get;
1918 api->fragment_get_latest = &fragment_get_latest;
1919 api->message_get = &message_get;
1920 api->message_get_latest = &message_get_latest;
1921 api->message_get_fragment = &message_get_fragment;
1922 api->counters_message_get = &counters_message_get;
1923 api->counters_state_get = &counters_state_get;
1924 api->state_modify_begin = &state_modify_begin;
1925 api->state_modify_op = &state_modify_op;
1926 api->state_modify_end = &state_modify_end;
1927 api->state_sync_begin = &state_sync_begin;
1928 api->state_sync_assign = &state_sync_assign;
1929 api->state_sync_end = &state_sync_end;
1930 api->state_reset = &state_reset;
1931 api->state_update_signed = &state_update_signed;
1932 api->state_get = &state_get;
1933 api->state_get_prefix = &state_get_prefix;
1934 api->state_get_signed = &state_get_signed;
1935
1936 LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n"));
1937 return api;
1938}
1939
1940
1941/**
1942 * Exit point from the plugin.
1943 *
1944 * @param cls The plugin context (as returned by "init")
1945 * @return Always NULL
1946 */
1947void *
1948libgnunet_plugin_psycstore_mysql_done (void *cls)
1949{
1950 struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1951 struct Plugin *plugin = api->cls;
1952
1953 database_shutdown (plugin);
1954 plugin->cfg = NULL;
1955 GNUNET_free (api);
1956 LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n");
1957 return NULL;
1958}
1959
1960/* end of plugin_psycstore_mysql.c */