aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore/plugin_psycstore_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/psycstore/plugin_psycstore_postgres.c')
-rw-r--r--src/psycstore/plugin_psycstore_postgres.c1530
1 files changed, 1530 insertions, 0 deletions
diff --git a/src/psycstore/plugin_psycstore_postgres.c b/src/psycstore/plugin_psycstore_postgres.c
new file mode 100644
index 0000000..33c9960
--- /dev/null
+++ b/src/psycstore/plugin_psycstore_postgres.c
@@ -0,0 +1,1530 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2016 GNUnet e.V.
4 *
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file psycstore/plugin_psycstore_postgres.c
23 * @brief PostgresQL-based psycstore backend
24 * @author Daniel Golle
25 * @author Gabor X Toth
26 * @author Christian Grothoff
27 * @author Christophe Genevey
28 * @author Jeffrey Burdges
29 */
30
31#include "platform.h"
32#include "gnunet_psycstore_plugin.h"
33#include "gnunet_psycstore_service.h"
34#include "gnunet_multicast_service.h"
35#include "gnunet_crypto_lib.h"
36#include "gnunet_psyc_util_lib.h"
37#include "psycstore.h"
38#include "gnunet_pq_lib.h"
39
40/**
41 * After how many ms "busy" should a DB operation fail for good? A
42 * low value makes sure that we are more responsive to requests
43 * (especially PUTs). A high value guarantees a higher success rate
44 * (SELECTs in iterate can take several seconds despite LIMIT=1).
45 *
46 * The default value of 1s should ensure that users do not experience
47 * huge latencies while at the same time allowing operations to
48 * succeed with reasonable probability.
49 */
50#define BUSY_TIMEOUT_MS 1000
51
52#define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING
53
54#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-postgres", __VA_ARGS__)
55
56enum Transactions {
57 TRANSACTION_NONE = 0,
58 TRANSACTION_STATE_MODIFY,
59 TRANSACTION_STATE_SYNC,
60};
61
62/**
63 * Context for all functions in this plugin.
64 */
65struct Plugin
66{
67
68 const struct GNUNET_CONFIGURATION_Handle *cfg;
69
70 /**
71 * Native Postgres database handle.
72 */
73 PGconn *dbh;
74
75 enum Transactions transaction;
76
77 void *cls;
78};
79
80
81/**
82 * Initialize the database connections and associated
83 * data structures (create tables and indices
84 * as needed as well).
85 *
86 * @param plugin the plugin context (state for this module)
87 * @return #GNUNET_OK on success
88 */
89static int
90database_setup (struct Plugin *plugin)
91{
92 struct GNUNET_PQ_ExecuteStatement es[] = {
93 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS channels (\n"
94 " id SERIAL,\n"
95 " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
96 " max_state_message_id BIGINT,\n"
97 " state_hash_message_id BIGINT,\n"
98 " PRIMARY KEY(id)\n"
99 ")"
100 "WITH OIDS"),
101 GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
102 " ON channels (pub_key)"),
103 GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n"
104 " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
105 "RETURNS NULL ON NULL INPUT"),
106 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS slaves (\n"
107 " id SERIAL,\n"
108 " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
109 " PRIMARY KEY(id)\n"
110 ")"
111 "WITH OIDS"),
112 GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
113 " ON slaves (pub_key)"),
114 GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n"
115 " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
116 "RETURNS NULL ON NULL INPUT"),
117 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS membership (\n"
118 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
119 " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n"
120 " did_join INT NOT NULL,\n"
121 " announced_at BIGINT NOT NULL,\n"
122 " effective_since BIGINT NOT NULL,\n"
123 " group_generation BIGINT NOT NULL\n"
124 ")"
125 "WITH OIDS"),
126 GNUNET_PQ_make_execute ("CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
127 "ON membership (channel_id, slave_id)"),
128 /** @todo messages table: add method_name column */
129 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS messages (\n"
130 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
131 " hop_counter INT NOT NULL,\n"
132 " signature BYTEA CHECK (LENGTH(signature)=64),\n"
133 " purpose BYTEA CHECK (LENGTH(purpose)=8),\n"
134 " fragment_id BIGINT NOT NULL,\n"
135 " fragment_offset BIGINT NOT NULL,\n"
136 " message_id BIGINT NOT NULL,\n"
137 " group_generation BIGINT NOT NULL,\n"
138 " multicast_flags INT NOT NULL,\n"
139 " psycstore_flags INT NOT NULL,\n"
140 " data BYTEA,\n"
141 " PRIMARY KEY (channel_id, fragment_id),\n"
142 " UNIQUE (channel_id, message_id, fragment_offset)\n"
143 ")"
144 "WITH OIDS"),
145 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state (\n"
146 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
147 " name TEXT NOT NULL,\n"
148 " value_current BYTEA,\n"
149 " value_signed BYTEA,\n"
150 " PRIMARY KEY (channel_id, name)\n"
151 ")"
152 "WITH OIDS"),
153 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state_sync (\n"
154 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
155 " name TEXT NOT NULL,\n"
156 " value BYTEA,\n"
157 " PRIMARY KEY (channel_id, name)\n"
158 ")"
159 "WITH OIDS"),
160 GNUNET_PQ_EXECUTE_STATEMENT_END
161 };
162
163 /* Open database and precompile statements */
164 plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->cfg,
165 "psycstore-postgres");
166 if (NULL == plugin->dbh)
167 return GNUNET_SYSERR;
168 if (GNUNET_OK !=
169 GNUNET_PQ_exec_statements (plugin->dbh,
170 es))
171 {
172 PQfinish (plugin->dbh);
173 plugin->dbh = NULL;
174 return GNUNET_SYSERR;
175 }
176
177 /* Prepare statements */
178 {
179 struct GNUNET_PQ_PreparedStatement ps[] = {
180 GNUNET_PQ_make_prepare ("transaction_begin",
181 "BEGIN", 0),
182 GNUNET_PQ_make_prepare ("transaction_commit",
183 "COMMIT", 0),
184 GNUNET_PQ_make_prepare ("transaction_rollback",
185 "ROLLBACK", 0),
186 GNUNET_PQ_make_prepare ("insert_channel_key",
187 "INSERT INTO channels (pub_key) VALUES ($1)"
188 " ON CONFLICT DO NOTHING", 1),
189 GNUNET_PQ_make_prepare ("insert_slave_key",
190 "INSERT INTO slaves (pub_key) VALUES ($1)"
191 " ON CONFLICT DO NOTHING", 1),
192 GNUNET_PQ_make_prepare ("insert_membership",
193 "INSERT INTO membership\n"
194 " (channel_id, slave_id, did_join, announced_at,\n"
195 " effective_since, group_generation)\n"
196 "VALUES (get_chan_id($1),\n"
197 " get_slave_id($2),\n"
198 " $3, $4, $5, $6)", 6),
199 GNUNET_PQ_make_prepare ("select_membership",
200 "SELECT did_join FROM membership\n"
201 "WHERE channel_id = get_chan_id($1)\n"
202 " AND slave_id = get_slave_id($2)\n"
203 " AND effective_since <= $3 AND did_join = 1\n"
204 "ORDER BY announced_at DESC LIMIT 1", 3),
205 GNUNET_PQ_make_prepare ("insert_fragment",
206 "INSERT INTO messages\n"
207 " (channel_id, hop_counter, signature, purpose,\n"
208 " fragment_id, fragment_offset, message_id,\n"
209 " group_generation, multicast_flags, psycstore_flags, data)\n"
210 "VALUES (get_chan_id($1),\n"
211 " $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
212 "ON CONFLICT DO NOTHING", 11),
213 GNUNET_PQ_make_prepare ("update_message_flags",
214 "UPDATE messages\n"
215 "SET psycstore_flags = psycstore_flags | $1\n"
216 "WHERE channel_id = get_chan_id($2) \n"
217 " AND message_id = $3 AND fragment_offset = 0", 3),
218 GNUNET_PQ_make_prepare ("select_fragments",
219 "SELECT hop_counter, signature, purpose, fragment_id,\n"
220 " fragment_offset, message_id, group_generation,\n"
221 " multicast_flags, psycstore_flags, data\n"
222 "FROM messages\n"
223 "WHERE channel_id = get_chan_id($1) \n"
224 " AND $2 <= fragment_id AND fragment_id <= $3", 3),
225 /** @todo select_messages: add method_prefix filter */
226 GNUNET_PQ_make_prepare ("select_messages",
227 "SELECT hop_counter, signature, purpose, fragment_id,\n"
228 " fragment_offset, message_id, group_generation,\n"
229 " multicast_flags, psycstore_flags, data\n"
230 "FROM messages\n"
231 "WHERE channel_id = get_chan_id($1) \n"
232 " AND $2 <= message_id AND message_id <= $3\n"
233 "LIMIT $4;", 4),
234 /** @todo select_latest_messages: add method_prefix filter */
235 GNUNET_PQ_make_prepare ("select_latest_fragments",
236 "SELECT rev.hop_counter AS hop_counter,\n"
237 " rev.signature AS signature,\n"
238 " rev.purpose AS purpose,\n"
239 " rev.fragment_id AS fragment_id,\n"
240 " rev.fragment_offset AS fragment_offset,\n"
241 " rev.message_id AS message_id,\n"
242 " rev.group_generation AS group_generation,\n"
243 " rev.multicast_flags AS multicast_flags,\n"
244 " rev.psycstore_flags AS psycstore_flags,\n"
245 " rev.data AS data\n"
246 " FROM\n"
247 " (SELECT hop_counter, signature, purpose, fragment_id,\n"
248 " fragment_offset, message_id, group_generation,\n"
249 " multicast_flags, psycstore_flags, data \n"
250 " FROM messages\n"
251 " WHERE channel_id = get_chan_id($1) \n"
252 " ORDER BY fragment_id DESC\n"
253 " LIMIT $2) AS rev\n"
254 " ORDER BY rev.fragment_id;", 2),
255 GNUNET_PQ_make_prepare ("select_latest_messages",
256 "SELECT hop_counter, signature, purpose, fragment_id,\n"
257 " fragment_offset, message_id, group_generation,\n"
258 " multicast_flags, psycstore_flags, data\n"
259 "FROM messages\n"
260 "WHERE channel_id = get_chan_id($1)\n"
261 " AND message_id IN\n"
262 " (SELECT message_id\n"
263 " FROM messages\n"
264 " WHERE channel_id = get_chan_id($2) \n"
265 " GROUP BY message_id\n"
266 " ORDER BY message_id\n"
267 " DESC LIMIT $3)\n"
268 "ORDER BY fragment_id", 3),
269 GNUNET_PQ_make_prepare ("select_message_fragment",
270 "SELECT hop_counter, signature, purpose, fragment_id,\n"
271 " fragment_offset, message_id, group_generation,\n"
272 " multicast_flags, psycstore_flags, data\n"
273 "FROM messages\n"
274 "WHERE channel_id = get_chan_id($1) \n"
275 " AND message_id = $2 AND fragment_offset = $3", 3),
276 GNUNET_PQ_make_prepare ("select_counters_message",
277 "SELECT fragment_id, message_id, group_generation\n"
278 "FROM messages\n"
279 "WHERE channel_id = get_chan_id($1)\n"
280 "ORDER BY fragment_id DESC LIMIT 1", 1),
281 GNUNET_PQ_make_prepare ("select_counters_state",
282 "SELECT max_state_message_id\n"
283 "FROM channels\n"
284 "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1),
285 GNUNET_PQ_make_prepare ("update_max_state_message_id",
286 "UPDATE channels\n"
287 "SET max_state_message_id = $1\n"
288 "WHERE pub_key = $2", 2),
289
290 GNUNET_PQ_make_prepare ("update_state_hash_message_id",
291 "UPDATE channels\n"
292 "SET state_hash_message_id = $1\n"
293 "WHERE pub_key = $2", 2),
294 GNUNET_PQ_make_prepare ("insert_state_current",
295 "INSERT INTO state\n"
296 " (channel_id, name, value_current, value_signed)\n"
297 "SELECT new.channel_id, new.name,\n"
298 " new.value_current, old.value_signed\n"
299 "FROM (SELECT get_chan_id($1) AS channel_id,\n"
300 " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n"
301 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
302 " FROM state) AS old\n"
303 "ON new.channel_id = old.channel_id AND new.name = old.name\n"
304 "ON CONFLICT (channel_id, name)\n"
305 " DO UPDATE SET value_current = EXCLUDED.value_current,\n"
306 " value_signed = EXCLUDED.value_signed", 3),
307 GNUNET_PQ_make_prepare ("delete_state_empty",
308 "DELETE FROM state\n"
309 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n"
310 " AND (value_current IS NULL OR length(value_current) = 0)\n"
311 " AND (value_signed IS NULL OR length(value_signed) = 0)", 1),
312 GNUNET_PQ_make_prepare ("update_state_signed",
313 "UPDATE state\n"
314 "SET value_signed = value_current\n"
315 "WHERE channel_id = get_chan_id($1) ", 1),
316 GNUNET_PQ_make_prepare ("delete_state",
317 "DELETE FROM state\n"
318 "WHERE channel_id = get_chan_id($1) ", 1),
319 GNUNET_PQ_make_prepare ("insert_state_sync",
320 "INSERT INTO state_sync (channel_id, name, value)\n"
321 "VALUES (get_chan_id($1), $2, $3)", 3),
322 GNUNET_PQ_make_prepare ("insert_state_from_sync",
323 "INSERT INTO state\n"
324 " (channel_id, name, value_current, value_signed)\n"
325 "SELECT channel_id, name, value, value\n"
326 "FROM state_sync\n"
327 "WHERE channel_id = get_chan_id($1)", 1),
328 GNUNET_PQ_make_prepare ("delete_state_sync",
329 "DELETE FROM state_sync\n"
330 "WHERE channel_id = get_chan_id($1)", 1),
331 GNUNET_PQ_make_prepare ("select_state_one",
332 "SELECT value_current\n"
333 "FROM state\n"
334 "WHERE channel_id = get_chan_id($1)\n"
335 " AND name = $2", 2),
336 GNUNET_PQ_make_prepare ("select_state_prefix",
337 "SELECT name, value_current\n"
338 "FROM state\n"
339 "WHERE channel_id = get_chan_id($1)\n"
340 " AND (name = $2 OR substr(name, 1, $3) = $4)", 4),
341 GNUNET_PQ_make_prepare ("select_state_signed",
342 "SELECT name, value_signed\n"
343 "FROM state\n"
344 "WHERE channel_id = get_chan_id($1)\n"
345 " AND value_signed IS NOT NULL", 1),
346 GNUNET_PQ_PREPARED_STATEMENT_END
347 };
348
349 if (GNUNET_OK !=
350 GNUNET_PQ_prepare_statements (plugin->dbh,
351 ps))
352 {
353 PQfinish (plugin->dbh);
354 plugin->dbh = NULL;
355 return GNUNET_SYSERR;
356 }
357 }
358
359 return GNUNET_OK;
360}
361
362
363/**
364 * Shutdown database connection and associate data
365 * structures.
366 * @param plugin the plugin context (state for this module)
367 */
368static void
369database_shutdown (struct Plugin *plugin)
370{
371 PQfinish (plugin->dbh);
372 plugin->dbh = NULL;
373}
374
375
376/**
377 * Execute a prepared statement with a @a channel_key argument.
378 *
379 * @param plugin Plugin handle.
380 * @param stmt Statement to execute.
381 * @param channel_key Public key of the channel.
382 *
383 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
384 */
385static int
386exec_channel (struct Plugin *plugin, const char *stmt,
387 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
388{
389 struct GNUNET_PQ_QueryParam params[] = {
390 GNUNET_PQ_query_param_auto_from_type (channel_key),
391 GNUNET_PQ_query_param_end
392 };
393
394 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
395 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
396 return GNUNET_SYSERR;
397
398 return GNUNET_OK;
399}
400
401
402/**
403 * Begin a transaction.
404 */
405static int
406transaction_begin (struct Plugin *plugin, enum Transactions transaction)
407{
408 struct GNUNET_PQ_QueryParam params[] = {
409 GNUNET_PQ_query_param_end
410 };
411
412 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
413 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_begin", params))
414 return GNUNET_SYSERR;
415
416 plugin->transaction = transaction;
417 return GNUNET_OK;
418}
419
420
421/**
422 * Commit current transaction.
423 */
424static int
425transaction_commit (struct Plugin *plugin)
426{
427 struct GNUNET_PQ_QueryParam params[] = {
428 GNUNET_PQ_query_param_end
429 };
430
431 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
432 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_commit", params))
433 return GNUNET_SYSERR;
434
435 plugin->transaction = TRANSACTION_NONE;
436 return GNUNET_OK;
437}
438
439
440/**
441 * Roll back current transaction.
442 */
443static int
444transaction_rollback (struct Plugin *plugin)
445{
446 struct GNUNET_PQ_QueryParam params[] = {
447 GNUNET_PQ_query_param_end
448 };
449
450 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
451 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_rollback", params))
452 return GNUNET_SYSERR;
453
454 plugin->transaction = TRANSACTION_NONE;
455 return GNUNET_OK;
456}
457
458
459static int
460channel_key_store (struct Plugin *plugin,
461 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
462{
463 struct GNUNET_PQ_QueryParam params[] = {
464 GNUNET_PQ_query_param_auto_from_type (channel_key),
465 GNUNET_PQ_query_param_end
466 };
467
468 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
469 GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
470 "insert_channel_key",
471 params))
472 return GNUNET_SYSERR;
473
474 return GNUNET_OK;
475}
476
477
478static int
479slave_key_store (struct Plugin *plugin,
480 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
481{
482 struct GNUNET_PQ_QueryParam params[] = {
483 GNUNET_PQ_query_param_auto_from_type (slave_key),
484 GNUNET_PQ_query_param_end
485 };
486
487 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
488 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_slave_key", params))
489 return GNUNET_SYSERR;
490
491 return GNUNET_OK;
492}
493
494
495/**
496 * Store join/leave events for a PSYC channel in order to be able to answer
497 * membership test queries later.
498 *
499 * @see GNUNET_PSYCSTORE_membership_store()
500 *
501 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
502 */
503static int
504postgres_membership_store (void *cls,
505 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
506 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
507 int did_join,
508 uint64_t announced_at,
509 uint64_t effective_since,
510 uint64_t group_generation)
511{
512 struct Plugin *plugin = cls;
513 uint32_t idid_join = (uint32_t) did_join;
514
515 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
516
517 if ( (announced_at > INT64_MAX) ||
518 (effective_since > INT64_MAX) ||
519 (group_generation > INT64_MAX) )
520 {
521 GNUNET_break (0);
522 return GNUNET_SYSERR;
523 }
524
525 if ( (GNUNET_OK !=
526 channel_key_store (plugin, channel_key)) ||
527 (GNUNET_OK !=
528 slave_key_store (plugin, slave_key)) )
529 return GNUNET_SYSERR;
530
531 struct GNUNET_PQ_QueryParam params[] = {
532 GNUNET_PQ_query_param_auto_from_type (channel_key),
533 GNUNET_PQ_query_param_auto_from_type (slave_key),
534 GNUNET_PQ_query_param_uint32 (&idid_join),
535 GNUNET_PQ_query_param_uint64 (&announced_at),
536 GNUNET_PQ_query_param_uint64 (&effective_since),
537 GNUNET_PQ_query_param_uint64 (&group_generation),
538 GNUNET_PQ_query_param_end
539 };
540
541 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
542 GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
543 "insert_membership",
544 params))
545 return GNUNET_SYSERR;
546
547 return GNUNET_OK;
548}
549
550/**
551 * Test if a member was admitted to the channel at the given message ID.
552 *
553 * @see GNUNET_PSYCSTORE_membership_test()
554 *
555 * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not,
556 * #GNUNET_SYSERR if there was en error.
557 */
558static int
559membership_test (void *cls,
560 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
561 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
562 uint64_t message_id)
563{
564 struct Plugin *plugin = cls;
565
566 uint32_t did_join = 0;
567
568 struct GNUNET_PQ_QueryParam params_select[] = {
569 GNUNET_PQ_query_param_auto_from_type (channel_key),
570 GNUNET_PQ_query_param_auto_from_type (slave_key),
571 GNUNET_PQ_query_param_uint64 (&message_id),
572 GNUNET_PQ_query_param_end
573 };
574
575 struct GNUNET_PQ_ResultSpec results_select[] = {
576 GNUNET_PQ_result_spec_uint32 ("did_join", &did_join),
577 GNUNET_PQ_result_spec_end
578 };
579
580 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
581 GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, "select_membership",
582 params_select, results_select))
583 return GNUNET_SYSERR;
584
585 return GNUNET_OK;
586}
587
588/**
589 * Store a message fragment sent to a channel.
590 *
591 * @see GNUNET_PSYCSTORE_fragment_store()
592 *
593 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
594 */
595static int
596fragment_store (void *cls,
597 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
598 const struct GNUNET_MULTICAST_MessageHeader *msg,
599 uint32_t psycstore_flags)
600{
601 struct Plugin *plugin = cls;
602
603 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
604
605 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
606
607 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
608 uint64_t message_id = GNUNET_ntohll (msg->message_id);
609 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
610
611 uint32_t hop_counter = ntohl(msg->hop_counter);
612 uint32_t flags = ntohl(msg->flags);
613
614 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
615 message_id > INT64_MAX || group_generation > INT64_MAX)
616 {
617 LOG(GNUNET_ERROR_TYPE_ERROR,
618 "Tried to store fragment with a field > INT64_MAX: "
619 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
620 message_id, group_generation);
621 GNUNET_break (0);
622 return GNUNET_SYSERR;
623 }
624
625 if (GNUNET_OK != channel_key_store (plugin, channel_key))
626 return GNUNET_SYSERR;
627
628 struct GNUNET_PQ_QueryParam params_insert[] = {
629 GNUNET_PQ_query_param_auto_from_type (channel_key),
630 GNUNET_PQ_query_param_uint32 (&hop_counter),
631 GNUNET_PQ_query_param_auto_from_type (&msg->signature),
632 GNUNET_PQ_query_param_auto_from_type (&msg->purpose),
633 GNUNET_PQ_query_param_uint64 (&fragment_id),
634 GNUNET_PQ_query_param_uint64 (&fragment_offset),
635 GNUNET_PQ_query_param_uint64 (&message_id),
636 GNUNET_PQ_query_param_uint64 (&group_generation),
637 GNUNET_PQ_query_param_uint32 (&flags),
638 GNUNET_PQ_query_param_uint32 (&psycstore_flags),
639 GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size) - sizeof (*msg)),
640 GNUNET_PQ_query_param_end
641 };
642
643 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
644 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_fragment", params_insert))
645 return GNUNET_SYSERR;
646
647 return GNUNET_OK;
648}
649
650/**
651 * Set additional flags for a given message.
652 *
653 * They are OR'd with any existing flags set.
654 *
655 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
656 */
657static int
658message_add_flags (void *cls,
659 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
660 uint64_t message_id,
661 uint32_t psycstore_flags)
662{
663 struct Plugin *plugin = cls;
664
665 struct GNUNET_PQ_QueryParam params_update[] = {
666 GNUNET_PQ_query_param_uint32 (&psycstore_flags),
667 GNUNET_PQ_query_param_auto_from_type (channel_key),
668 GNUNET_PQ_query_param_uint64 (&message_id),
669 GNUNET_PQ_query_param_end
670 };
671
672 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
673 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "update_message_flags", params_update))
674 return GNUNET_SYSERR;
675
676 return GNUNET_OK;
677}
678
679
680/**
681 * Closure for #fragment_rows.
682 */
683struct FragmentRowsContext {
684 GNUNET_PSYCSTORE_FragmentCallback cb;
685 void *cb_cls;
686
687 uint64_t *returned_fragments;
688
689 /* I preserved this but I do not see the point since
690 * it cannot stop the loop early and gets overwritten ?? */
691 int ret;
692};
693
694
695/**
696 * Callback that retrieves the results of a SELECT statement
697 * reading form the messages table.
698 *
699 * Only passed to GNUNET_PQ_eval_prepared_multi_select and
700 * has type GNUNET_PQ_PostgresResultHandler.
701 *
702 * @param cls closure
703 * @param result the postgres result
704 * @param num_result the number of results in @a result
705 */
706void fragment_rows (void *cls,
707 PGresult *res,
708 unsigned int num_results)
709{
710 struct FragmentRowsContext *c = cls;
711
712 for (unsigned int i=0;i<num_results;i++)
713 {
714 uint32_t hop_counter;
715 void *signature = NULL;
716 void *purpose = NULL;
717 size_t signature_size;
718 size_t purpose_size;
719 uint64_t fragment_id;
720 uint64_t fragment_offset;
721 uint64_t message_id;
722 uint64_t group_generation;
723 uint32_t flags;
724 void *buf;
725 size_t buf_size;
726 uint32_t msg_flags;
727 struct GNUNET_PQ_ResultSpec results[] = {
728 GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
729 GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
730 GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
731 GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
732 GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
733 GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
734 GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
735 GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
736 GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
737 GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
738 GNUNET_PQ_result_spec_end
739 };
740 struct GNUNET_MULTICAST_MessageHeader *mp;
741
742 if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i))
743 {
744 GNUNET_PQ_cleanup_result(results); /* missing previously, a memory leak?? */
745 break; /* nothing more?? */
746 }
747
748 mp = GNUNET_malloc (sizeof (*mp) + buf_size);
749
750 mp->header.size = htons (sizeof (*mp) + buf_size);
751 mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
752 mp->hop_counter = htonl (hop_counter);
753 GNUNET_memcpy (&mp->signature,
754 signature, signature_size);
755 GNUNET_memcpy (&mp->purpose,
756 purpose, purpose_size);
757 mp->fragment_id = GNUNET_htonll (fragment_id);
758 mp->fragment_offset = GNUNET_htonll (fragment_offset);
759 mp->message_id = GNUNET_htonll (message_id);
760 mp->group_generation = GNUNET_htonll (group_generation);
761 mp->flags = htonl(msg_flags);
762
763 GNUNET_memcpy (&mp[1],
764 buf, buf_size);
765 GNUNET_PQ_cleanup_result(results);
766 c->ret = c->cb (c->cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
767 if (NULL != c->returned_fragments)
768 (*c->returned_fragments)++;
769 }
770}
771
772
773static int
774fragment_select (struct Plugin *plugin,
775 const char *stmt,
776 struct GNUNET_PQ_QueryParam *params,
777 uint64_t *returned_fragments,
778 GNUNET_PSYCSTORE_FragmentCallback cb,
779 void *cb_cls)
780{
781 /* Stack based closure */
782 struct FragmentRowsContext frc = {
783 .cb = cb,
784 .cb_cls = cb_cls,
785 .returned_fragments = returned_fragments,
786 .ret = GNUNET_SYSERR
787 };
788
789 if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
790 stmt, params,
791 &fragment_rows, &frc))
792 return GNUNET_SYSERR;
793 return frc.ret; /* GNUNET_OK ?? */
794}
795
796/**
797 * Retrieve a message fragment range by fragment ID.
798 *
799 * @see GNUNET_PSYCSTORE_fragment_get()
800 *
801 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
802 */
803static int
804fragment_get (void *cls,
805 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
806 uint64_t first_fragment_id,
807 uint64_t last_fragment_id,
808 uint64_t *returned_fragments,
809 GNUNET_PSYCSTORE_FragmentCallback cb,
810 void *cb_cls)
811{
812 struct Plugin *plugin = cls;
813 struct GNUNET_PQ_QueryParam params_select[] = {
814 GNUNET_PQ_query_param_auto_from_type (channel_key),
815 GNUNET_PQ_query_param_uint64 (&first_fragment_id),
816 GNUNET_PQ_query_param_uint64 (&last_fragment_id),
817 GNUNET_PQ_query_param_end
818 };
819
820 *returned_fragments = 0;
821 return fragment_select (plugin,
822 "select_fragments",
823 params_select,
824 returned_fragments,
825 cb, cb_cls);
826}
827
828
829/**
830 * Retrieve a message fragment range by fragment ID.
831 *
832 * @see GNUNET_PSYCSTORE_fragment_get_latest()
833 *
834 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
835 */
836static int
837fragment_get_latest (void *cls,
838 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
839 uint64_t fragment_limit,
840 uint64_t *returned_fragments,
841 GNUNET_PSYCSTORE_FragmentCallback cb,
842 void *cb_cls)
843{
844 struct Plugin *plugin = cls;
845
846 *returned_fragments = 0;
847
848 struct GNUNET_PQ_QueryParam params_select[] = {
849 GNUNET_PQ_query_param_auto_from_type (channel_key),
850 GNUNET_PQ_query_param_uint64 (&fragment_limit),
851 GNUNET_PQ_query_param_end
852 };
853
854 return fragment_select (plugin,
855 "select_latest_fragments",
856 params_select,
857 returned_fragments,
858 cb, cb_cls);
859}
860
861
862/**
863 * Retrieve all fragments of a message ID range.
864 *
865 * @see GNUNET_PSYCSTORE_message_get()
866 *
867 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
868 */
869static int
870message_get (void *cls,
871 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
872 uint64_t first_message_id,
873 uint64_t last_message_id,
874 uint64_t fragment_limit,
875 uint64_t *returned_fragments,
876 GNUNET_PSYCSTORE_FragmentCallback cb,
877 void *cb_cls)
878{
879 struct Plugin *plugin = cls;
880 struct GNUNET_PQ_QueryParam params_select[] = {
881 GNUNET_PQ_query_param_auto_from_type (channel_key),
882 GNUNET_PQ_query_param_uint64 (&first_message_id),
883 GNUNET_PQ_query_param_uint64 (&last_message_id),
884 GNUNET_PQ_query_param_uint64 (&fragment_limit),
885 GNUNET_PQ_query_param_end
886 };
887
888 if (0 == fragment_limit)
889 fragment_limit = INT64_MAX;
890 *returned_fragments = 0;
891 return fragment_select (plugin,
892 "select_messages",
893 params_select,
894 returned_fragments,
895 cb, cb_cls);
896}
897
898
899/**
900 * Retrieve all fragments of the latest messages.
901 *
902 * @see GNUNET_PSYCSTORE_message_get_latest()
903 *
904 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
905 */
906static int
907message_get_latest (void *cls,
908 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
909 uint64_t message_limit,
910 uint64_t *returned_fragments,
911 GNUNET_PSYCSTORE_FragmentCallback cb,
912 void *cb_cls)
913{
914 struct Plugin *plugin = cls;
915 struct GNUNET_PQ_QueryParam params_select[] = {
916 GNUNET_PQ_query_param_auto_from_type (channel_key),
917 GNUNET_PQ_query_param_auto_from_type (channel_key),
918 GNUNET_PQ_query_param_uint64 (&message_limit),
919 GNUNET_PQ_query_param_end
920 };
921
922 *returned_fragments = 0;
923 return fragment_select (plugin,
924 "select_latest_messages",
925 params_select,
926 returned_fragments,
927 cb, cb_cls);
928}
929
930
931/**
932 * Retrieve a fragment of message specified by its message ID and fragment
933 * offset.
934 *
935 * @see GNUNET_PSYCSTORE_message_get_fragment()
936 *
937 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
938 */
939static int
940message_get_fragment (void *cls,
941 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
942 uint64_t message_id,
943 uint64_t fragment_offset,
944 GNUNET_PSYCSTORE_FragmentCallback cb,
945 void *cb_cls)
946{
947 struct Plugin *plugin = cls;
948 const char *stmt = "select_message_fragment";
949
950 struct GNUNET_PQ_QueryParam params_select[] = {
951 GNUNET_PQ_query_param_auto_from_type (channel_key),
952 GNUNET_PQ_query_param_uint64 (&message_id),
953 GNUNET_PQ_query_param_uint64 (&fragment_offset),
954 GNUNET_PQ_query_param_end
955 };
956
957 /* Stack based closure */
958 struct FragmentRowsContext frc = {
959 .cb = cb,
960 .cb_cls = cb_cls,
961 .returned_fragments = NULL,
962 .ret = GNUNET_SYSERR
963 };
964
965 if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
966 stmt, params_select,
967 &fragment_rows, &frc))
968 return GNUNET_SYSERR;
969 return frc.ret; /* GNUNET_OK ?? */
970}
971
972/**
973 * Retrieve the max. values of message counters for a channel.
974 *
975 * @see GNUNET_PSYCSTORE_counters_get()
976 *
977 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
978 */
979static int
980counters_message_get (void *cls,
981 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
982 uint64_t *max_fragment_id,
983 uint64_t *max_message_id,
984 uint64_t *max_group_generation)
985{
986 struct Plugin *plugin = cls;
987
988 const char *stmt = "select_counters_message";
989
990 struct GNUNET_PQ_QueryParam params_select[] = {
991 GNUNET_PQ_query_param_auto_from_type (channel_key),
992 GNUNET_PQ_query_param_end
993 };
994
995 struct GNUNET_PQ_ResultSpec results_select[] = {
996 GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id),
997 GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id),
998 GNUNET_PQ_result_spec_uint64 ("group_generation", max_group_generation),
999 GNUNET_PQ_result_spec_end
1000 };
1001
1002 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
1003 GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt,
1004 params_select, results_select))
1005 return GNUNET_SYSERR;
1006
1007 return GNUNET_OK;
1008}
1009
1010/**
1011 * Retrieve the max. values of state counters for a channel.
1012 *
1013 * @see GNUNET_PSYCSTORE_counters_get()
1014 *
1015 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1016 */
1017static int
1018counters_state_get (void *cls,
1019 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1020 uint64_t *max_state_message_id)
1021{
1022 struct Plugin *plugin = cls;
1023
1024 const char *stmt = "select_counters_state";
1025
1026 struct GNUNET_PQ_QueryParam params_select[] = {
1027 GNUNET_PQ_query_param_auto_from_type (channel_key),
1028 GNUNET_PQ_query_param_end
1029 };
1030
1031 struct GNUNET_PQ_ResultSpec results_select[] = {
1032 GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id),
1033 GNUNET_PQ_result_spec_end
1034 };
1035
1036 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
1037 GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt,
1038 params_select, results_select))
1039 return GNUNET_SYSERR;
1040
1041 return GNUNET_OK;
1042}
1043
1044
1045/**
1046 * Assign a value to a state variable.
1047 *
1048 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1049 */
1050static int
1051state_assign (struct Plugin *plugin, const char *stmt,
1052 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1053 const char *name, const void *value, size_t value_size)
1054{
1055 struct GNUNET_PQ_QueryParam params[] = {
1056 GNUNET_PQ_query_param_auto_from_type (channel_key),
1057 GNUNET_PQ_query_param_string (name),
1058 GNUNET_PQ_query_param_fixed_size (value, value_size),
1059 GNUNET_PQ_query_param_end
1060 };
1061
1062 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
1063 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1064 return GNUNET_SYSERR;
1065
1066 return GNUNET_OK;
1067}
1068
1069
1070static int
1071update_message_id (struct Plugin *plugin,
1072 const char *stmt,
1073 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1074 uint64_t message_id)
1075{
1076 struct GNUNET_PQ_QueryParam params[] = {
1077 GNUNET_PQ_query_param_uint64 (&message_id),
1078 GNUNET_PQ_query_param_auto_from_type (channel_key),
1079 GNUNET_PQ_query_param_end
1080 };
1081
1082 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
1083 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1084 return GNUNET_SYSERR;
1085
1086 return GNUNET_OK;
1087}
1088
1089
1090/**
1091 * Begin modifying current state.
1092 */
1093static int
1094state_modify_begin (void *cls,
1095 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1096 uint64_t message_id, uint64_t state_delta)
1097{
1098 struct Plugin *plugin = cls;
1099
1100 if (state_delta > 0)
1101 {
1102 /**
1103 * We can only apply state modifiers in the current message if modifiers in
1104 * the previous stateful message (message_id - state_delta) were already
1105 * applied.
1106 */
1107
1108 uint64_t max_state_message_id = 0;
1109 int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1110 switch (ret)
1111 {
1112 case GNUNET_OK:
1113 case GNUNET_NO: // no state yet
1114 ret = GNUNET_OK;
1115 break;
1116
1117 default:
1118 return ret;
1119 }
1120
1121 if (max_state_message_id < message_id - state_delta)
1122 return GNUNET_NO; /* some stateful messages not yet applied */
1123 else if (message_id - state_delta < max_state_message_id)
1124 return GNUNET_NO; /* changes already applied */
1125 }
1126
1127 if (TRANSACTION_NONE != plugin->transaction)
1128 {
1129 /** @todo FIXME: wait for other transaction to finish */
1130 return GNUNET_SYSERR;
1131 }
1132 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1133}
1134
1135
1136/**
1137 * Set the current value of state variable.
1138 *
1139 * @see GNUNET_PSYCSTORE_state_modify()
1140 *
1141 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1142 */
1143static int
1144state_modify_op (void *cls,
1145 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1146 enum GNUNET_PSYC_Operator op,
1147 const char *name, const void *value, size_t value_size)
1148{
1149 struct Plugin *plugin = cls;
1150 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1151
1152 switch (op)
1153 {
1154 case GNUNET_PSYC_OP_ASSIGN:
1155 return state_assign (plugin, "insert_state_current",
1156 channel_key, name, value, value_size);
1157
1158 default: /** @todo implement more state operations */
1159 GNUNET_break (0);
1160 return GNUNET_SYSERR;
1161 }
1162}
1163
1164
1165/**
1166 * End modifying current state.
1167 */
1168static int
1169state_modify_end (void *cls,
1170 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1171 uint64_t message_id)
1172{
1173 struct Plugin *plugin = cls;
1174 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1175
1176 return
1177 GNUNET_OK == exec_channel (plugin, "delete_state_empty", channel_key)
1178 && GNUNET_OK == update_message_id (plugin,
1179 "update_max_state_message_id",
1180 channel_key, message_id)
1181 && GNUNET_OK == transaction_commit (plugin)
1182 ? GNUNET_OK : GNUNET_SYSERR;
1183}
1184
1185
1186/**
1187 * Begin state synchronization.
1188 */
1189static int
1190state_sync_begin (void *cls,
1191 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1192{
1193 struct Plugin *plugin = cls;
1194 return exec_channel (plugin, "delete_state_sync", channel_key);
1195}
1196
1197
1198/**
1199 * Assign current value of a state variable.
1200 *
1201 * @see GNUNET_PSYCSTORE_state_modify()
1202 *
1203 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1204 */
1205static int
1206state_sync_assign (void *cls,
1207 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1208 const char *name, const void *value, size_t value_size)
1209{
1210 struct Plugin *plugin = cls;
1211 return state_assign (plugin, "insert_state_sync",
1212 channel_key, name, value, value_size);
1213}
1214
1215
1216/**
1217 * End modifying current state.
1218 */
1219static int
1220state_sync_end (void *cls,
1221 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1222 uint64_t max_state_message_id,
1223 uint64_t state_hash_message_id)
1224{
1225 struct Plugin *plugin = cls;
1226 int ret = GNUNET_SYSERR;
1227
1228 if (TRANSACTION_NONE != plugin->transaction)
1229 {
1230 /** @todo FIXME: wait for other transaction to finish */
1231 return GNUNET_SYSERR;
1232 }
1233
1234 GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1235 && GNUNET_OK == exec_channel (plugin, "delete_state", channel_key)
1236 && GNUNET_OK == exec_channel (plugin, "insert_state_from_sync",
1237 channel_key)
1238 && GNUNET_OK == exec_channel (plugin, "delete_state_sync",
1239 channel_key)
1240 && GNUNET_OK == update_message_id (plugin,
1241 "update_state_hash_message_id",
1242 channel_key, state_hash_message_id)
1243 && GNUNET_OK == update_message_id (plugin,
1244 "update_max_state_message_id",
1245 channel_key, max_state_message_id)
1246 && GNUNET_OK == transaction_commit (plugin)
1247 ? ret = GNUNET_OK
1248 : transaction_rollback (plugin);
1249 return ret;
1250}
1251
1252
1253/**
1254 * Delete the whole state.
1255 *
1256 * @see GNUNET_PSYCSTORE_state_reset()
1257 *
1258 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1259 */
1260static int
1261state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1262{
1263 struct Plugin *plugin = cls;
1264 return exec_channel (plugin, "delete_state", channel_key);
1265}
1266
1267
1268/**
1269 * Update signed values of state variables in the state store.
1270 *
1271 * @see GNUNET_PSYCSTORE_state_hash_update()
1272 *
1273 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1274 */
1275static int
1276state_update_signed (void *cls,
1277 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
1278{
1279 struct Plugin *plugin = cls;
1280 return exec_channel (plugin, "update_state_signed", channel_key);
1281}
1282
1283
1284/**
1285 * Retrieve a state variable by name.
1286 *
1287 * @see GNUNET_PSYCSTORE_state_get()
1288 *
1289 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1290 */
1291static int
1292state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1293 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1294{
1295 struct Plugin *plugin = cls;
1296
1297 const char *stmt = "select_state_one";
1298
1299 struct GNUNET_PQ_QueryParam params_select[] = {
1300 GNUNET_PQ_query_param_auto_from_type (channel_key),
1301 GNUNET_PQ_query_param_string (name),
1302 GNUNET_PQ_query_param_end
1303 };
1304
1305 void *value_current = NULL;
1306 size_t value_size = 0;
1307
1308 struct GNUNET_PQ_ResultSpec results_select[] = {
1309 GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1310 GNUNET_PQ_result_spec_end
1311 };
1312
1313 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
1314 GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt,
1315 params_select, results_select))
1316 return GNUNET_SYSERR;
1317
1318 return cb (cb_cls, name, value_current,
1319 value_size);
1320}
1321
1322
1323
1324/**
1325 * Closure for #get_state_cb.
1326 */
1327struct GetStateContext {
1328 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key;
1329 // const char *name,
1330 GNUNET_PSYCSTORE_StateCallback cb;
1331 void *cb_cls;
1332
1333 const char *value_id;
1334
1335 /* I preserved this but I do not see the point since
1336 * it cannot stop the loop early and gets overwritten ?? */
1337 int ret;
1338};
1339
1340
1341/**
1342 * Callback that retrieves the results of a SELECT statement
1343 * reading form the state table.
1344 *
1345 * Only passed to GNUNET_PQ_eval_prepared_multi_select and
1346 * has type GNUNET_PQ_PostgresResultHandler.
1347 *
1348 * @param cls closure
1349 * @param result the postgres result
1350 * @param num_result the number of results in @a result
1351 */
1352static void
1353get_state_cb (void *cls,
1354 PGresult *res,
1355 unsigned int num_results)
1356{
1357 struct GetStateContext *c = cls;
1358
1359 for (unsigned int i=0;i<num_results;i++)
1360 {
1361 char *name = "";
1362 void *value = NULL;
1363 size_t value_size = 0;
1364
1365 struct GNUNET_PQ_ResultSpec results[] = {
1366 GNUNET_PQ_result_spec_string ("name", &name),
1367 GNUNET_PQ_result_spec_variable_size (c->value_id, &value, &value_size),
1368 GNUNET_PQ_result_spec_end
1369 };
1370
1371 if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i))
1372 {
1373 GNUNET_PQ_cleanup_result(results); /* previously invoked via PQclear?? */
1374 break; /* nothing more?? */
1375 }
1376
1377 c->ret = c->cb (c->cb_cls, (const char *) name, value, value_size);
1378 GNUNET_PQ_cleanup_result(results);
1379 }
1380}
1381
1382/**
1383 * Retrieve all state variables for a channel with the given prefix.
1384 *
1385 * @see GNUNET_PSYCSTORE_state_get_prefix()
1386 *
1387 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1388 */
1389static int
1390state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1391 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1392 void *cb_cls)
1393{
1394 struct Plugin *plugin = cls;
1395
1396 const char *stmt = "select_state_prefix";
1397
1398 uint32_t name_len = (uint32_t) strlen (name);
1399
1400 struct GNUNET_PQ_QueryParam params_select[] = {
1401 GNUNET_PQ_query_param_auto_from_type (channel_key),
1402 GNUNET_PQ_query_param_string (name),
1403 GNUNET_PQ_query_param_uint32 (&name_len),
1404 GNUNET_PQ_query_param_string (name),
1405 GNUNET_PQ_query_param_end
1406 };
1407
1408 struct GetStateContext gsc = {
1409 .cb = cb,
1410 .cb_cls = cb_cls,
1411 .value_id = "value_current",
1412 .ret = GNUNET_NO
1413 };
1414
1415 if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
1416 stmt, params_select,
1417 &get_state_cb, &gsc))
1418 return GNUNET_SYSERR;
1419 return gsc.ret; /* GNUNET_OK ?? */
1420}
1421
1422
1423/**
1424 * Retrieve all signed state variables for a channel.
1425 *
1426 * @see GNUNET_PSYCSTORE_state_get_signed()
1427 *
1428 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1429 */
1430static int
1431state_get_signed (void *cls,
1432 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1433 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1434{
1435 struct Plugin *plugin = cls;
1436
1437 const char *stmt = "select_state_signed";
1438
1439 struct GNUNET_PQ_QueryParam params_select[] = {
1440 GNUNET_PQ_query_param_auto_from_type (channel_key),
1441 GNUNET_PQ_query_param_end
1442 };
1443
1444 struct GetStateContext gsc = {
1445 .cb = cb,
1446 .cb_cls = cb_cls,
1447 .value_id = "value_signed",
1448 .ret = GNUNET_NO
1449 };
1450
1451 if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
1452 stmt, params_select,
1453 &get_state_cb, &gsc))
1454 return GNUNET_SYSERR;
1455 return gsc.ret; /* GNUNET_OK ?? */
1456}
1457
1458
1459/**
1460 * Entry point for the plugin.
1461 *
1462 * @param cls The struct GNUNET_CONFIGURATION_Handle.
1463 * @return NULL on error, otherwise the plugin context
1464 */
1465void *
1466libgnunet_plugin_psycstore_postgres_init (void *cls)
1467{
1468 static struct Plugin plugin;
1469 const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
1470 struct GNUNET_PSYCSTORE_PluginFunctions *api;
1471
1472 if (NULL != plugin.cfg)
1473 return NULL; /* can only initialize once! */
1474 memset (&plugin, 0, sizeof (struct Plugin));
1475 plugin.cfg = cfg;
1476 if (GNUNET_OK != database_setup (&plugin))
1477 {
1478 database_shutdown (&plugin);
1479 return NULL;
1480 }
1481 api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions);
1482 api->cls = &plugin;
1483 api->membership_store = &postgres_membership_store;
1484 api->membership_test = &membership_test;
1485 api->fragment_store = &fragment_store;
1486 api->message_add_flags = &message_add_flags;
1487 api->fragment_get = &fragment_get;
1488 api->fragment_get_latest = &fragment_get_latest;
1489 api->message_get = &message_get;
1490 api->message_get_latest = &message_get_latest;
1491 api->message_get_fragment = &message_get_fragment;
1492 api->counters_message_get = &counters_message_get;
1493 api->counters_state_get = &counters_state_get;
1494 api->state_modify_begin = &state_modify_begin;
1495 api->state_modify_op = &state_modify_op;
1496 api->state_modify_end = &state_modify_end;
1497 api->state_sync_begin = &state_sync_begin;
1498 api->state_sync_assign = &state_sync_assign;
1499 api->state_sync_end = &state_sync_end;
1500 api->state_reset = &state_reset;
1501 api->state_update_signed = &state_update_signed;
1502 api->state_get = &state_get;
1503 api->state_get_prefix = &state_get_prefix;
1504 api->state_get_signed = &state_get_signed;
1505
1506 LOG (GNUNET_ERROR_TYPE_INFO, _("Postgres database running\n"));
1507 return api;
1508}
1509
1510
1511/**
1512 * Exit point from the plugin.
1513 *
1514 * @param cls The plugin context (as returned by "init")
1515 * @return Always NULL
1516 */
1517void *
1518libgnunet_plugin_psycstore_postgres_done (void *cls)
1519{
1520 struct GNUNET_PSYCSTORE_PluginFunctions *api = cls;
1521 struct Plugin *plugin = api->cls;
1522
1523 database_shutdown (plugin);
1524 plugin->cfg = NULL;
1525 GNUNET_free (api);
1526 LOG (GNUNET_ERROR_TYPE_DEBUG, "Postgres plugin has finished\n");
1527 return NULL;
1528}
1529
1530/* end of plugin_psycstore_postgres.c */