diff options
-rw-r--r-- | src/psycstore/Makefile.am | 29 | ||||
-rw-r--r-- | src/psycstore/plugin_psycstore_postgres.c | 1786 | ||||
-rw-r--r-- | src/psycstore/psycstore.conf.in | 3 | ||||
-rw-r--r-- | src/psycstore/test_plugin_psycstore_postgres.conf | 2 |
4 files changed, 1819 insertions, 1 deletions
diff --git a/src/psycstore/Makefile.am b/src/psycstore/Makefile.am index 4da0a89d8..30e2a9673 100644 --- a/src/psycstore/Makefile.am +++ b/src/psycstore/Makefile.am | |||
@@ -27,6 +27,13 @@ MYSQL_TESTS = test_plugin_psycstore_mysql | |||
27 | endif | 27 | endif |
28 | endif | 28 | endif |
29 | 29 | ||
30 | if HAVE_POSTGRESQL | ||
31 | POSTGRES_PLUGIN = libgnunet_plugin_psycstore_postgres.la | ||
32 | if HAVE_TESTING | ||
33 | POSTGRES_TESTS = test_plugin_psycstore_postgres | ||
34 | endif | ||
35 | endif | ||
36 | |||
30 | if HAVE_SQLITE | 37 | if HAVE_SQLITE |
31 | SQLITE_PLUGIN = libgnunet_plugin_psycstore_sqlite.la | 38 | SQLITE_PLUGIN = libgnunet_plugin_psycstore_sqlite.la |
32 | if HAVE_TESTING | 39 | if HAVE_TESTING |
@@ -61,7 +68,9 @@ gnunet_service_psycstore_LDADD = \ | |||
61 | 68 | ||
62 | plugin_LTLIBRARIES = \ | 69 | plugin_LTLIBRARIES = \ |
63 | $(SQLITE_PLUGIN) \ | 70 | $(SQLITE_PLUGIN) \ |
64 | $(MYSQL_PLUGIN) | 71 | $(MYSQL_PLUGIN) \ |
72 | $(POSTGRES_PLUGIN) | ||
73 | |||
65 | 74 | ||
66 | libgnunet_plugin_psycstore_mysql_la_SOURCES = \ | 75 | libgnunet_plugin_psycstore_mysql_la_SOURCES = \ |
67 | plugin_psycstore_mysql.c | 76 | plugin_psycstore_mysql.c |
@@ -75,6 +84,17 @@ libgnunet_plugin_psycstore_mysql_la_LIBADD = \ | |||
75 | libgnunet_plugin_psycstore_mysql_la_LDFLAGS = \ | 84 | libgnunet_plugin_psycstore_mysql_la_LDFLAGS = \ |
76 | $(GN_PLUGIN_LDFLAGS) | 85 | $(GN_PLUGIN_LDFLAGS) |
77 | 86 | ||
87 | libgnunet_plugin_psycstore_postgres_la_SOURCES = \ | ||
88 | plugin_psycstore_postgres.c | ||
89 | libgnunet_plugin_psycstore_postgres_la_LIBADD = \ | ||
90 | libgnunetpsycstore.la \ | ||
91 | $(top_builddir)/src/postgres/libgnunetpostgres.la \ | ||
92 | $(top_builddir)/src/pq/libgnunetpq.la \ | ||
93 | $(top_builddir)/src/statistics/libgnunetstatistics.la \ | ||
94 | $(top_builddir)/src/util/libgnunetutil.la $(XLIBS) -lpq \ | ||
95 | $(LTLIBINTL) | ||
96 | libgnunet_plugin_psycstore_postgres_la_LDFLAGS = \ | ||
97 | $(GN_PLUGIN_LDFLAGS) $(POSTGRESQL_LDFLAGS) | ||
78 | 98 | ||
79 | libgnunet_plugin_psycstore_sqlite_la_SOURCES = \ | 99 | libgnunet_plugin_psycstore_sqlite_la_SOURCES = \ |
80 | plugin_psycstore_sqlite.c | 100 | plugin_psycstore_sqlite.c |
@@ -92,6 +112,7 @@ if HAVE_TESTING | |||
92 | check_PROGRAMS = \ | 112 | check_PROGRAMS = \ |
93 | $(SQLITE_TESTS) \ | 113 | $(SQLITE_TESTS) \ |
94 | $(MYSQL_TESTS) \ | 114 | $(MYSQL_TESTS) \ |
115 | $(POSTGRES_TESTS) \ | ||
95 | test_psycstore | 116 | test_psycstore |
96 | endif | 117 | endif |
97 | endif | 118 | endif |
@@ -124,3 +145,9 @@ test_plugin_psycstore_mysql_LDADD = \ | |||
124 | $(top_builddir)/src/testing/libgnunettesting.la \ | 145 | $(top_builddir)/src/testing/libgnunettesting.la \ |
125 | $(top_builddir)/src/util/libgnunetutil.la | 146 | $(top_builddir)/src/util/libgnunetutil.la |
126 | 147 | ||
148 | test_plugin_psycstore_postgres_SOURCES = \ | ||
149 | test_plugin_psycstore.c | ||
150 | test_plugin_psycstore_postgres_LDADD = \ | ||
151 | $(top_builddir)/src/testing/libgnunettesting.la \ | ||
152 | $(top_builddir)/src/util/libgnunetutil.la | ||
153 | |||
diff --git a/src/psycstore/plugin_psycstore_postgres.c b/src/psycstore/plugin_psycstore_postgres.c new file mode 100644 index 000000000..8db3c1a61 --- /dev/null +++ b/src/psycstore/plugin_psycstore_postgres.c | |||
@@ -0,0 +1,1786 @@ | |||
1 | /* | ||
2 | * This file is part of GNUnet | ||
3 | * Copyright (C) 2016 GNUnet e.V. | ||
4 | * | ||
5 | * GNUnet is free software; you can redistribute it and/or modify | ||
6 | * it under the terms of the GNU General Public License as published | ||
7 | * by the Free Software Foundation; either version 3, or (at your | ||
8 | * option) any later version. | ||
9 | * | ||
10 | * GNUnet is distributed in the hope that it will be useful, but | ||
11 | * WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | * General Public License for more details. | ||
14 | * | ||
15 | * You should have received a copy of the GNU General Public License | ||
16 | * along with GNUnet; see the file COPYING. If not, write to the | ||
17 | * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, | ||
18 | * Boston, MA 02110-1301, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file psycstore/plugin_psycstore_postgres.c | ||
23 | * @brief PostgresQL-based psycstore backend | ||
24 | * @author Daniel Golle | ||
25 | * @author Gabor X Toth | ||
26 | * @author Christian Grothoff | ||
27 | * @author Christophe Genevey | ||
28 | */ | ||
29 | |||
30 | #include "platform.h" | ||
31 | #include "gnunet_psycstore_plugin.h" | ||
32 | #include "gnunet_psycstore_service.h" | ||
33 | #include "gnunet_multicast_service.h" | ||
34 | #include "gnunet_crypto_lib.h" | ||
35 | #include "gnunet_psyc_util_lib.h" | ||
36 | #include "psycstore.h" | ||
37 | #include "gnunet_postgres_lib.h" | ||
38 | #include "gnunet_pq_lib.h" | ||
39 | |||
40 | /** | ||
41 | * After how many ms "busy" should a DB operation fail for good? A | ||
42 | * low value makes sure that we are more responsive to requests | ||
43 | * (especially PUTs). A high value guarantees a higher success rate | ||
44 | * (SELECTs in iterate can take several seconds despite LIMIT=1). | ||
45 | * | ||
46 | * The default value of 1s should ensure that users do not experience | ||
47 | * huge latencies while at the same time allowing operations to | ||
48 | * succeed with reasonable probability. | ||
49 | */ | ||
50 | #define BUSY_TIMEOUT_MS 1000 | ||
51 | |||
52 | #define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING | ||
53 | |||
54 | #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-postgres", __VA_ARGS__) | ||
55 | |||
56 | enum Transactions { | ||
57 | TRANSACTION_NONE = 0, | ||
58 | TRANSACTION_STATE_MODIFY, | ||
59 | TRANSACTION_STATE_SYNC, | ||
60 | }; | ||
61 | |||
62 | /** | ||
63 | * Context for all functions in this plugin. | ||
64 | */ | ||
65 | struct Plugin | ||
66 | { | ||
67 | |||
68 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
69 | |||
70 | /** | ||
71 | * Native Postgres database handle. | ||
72 | */ | ||
73 | PGconn *dbh; | ||
74 | |||
75 | enum Transactions transaction; | ||
76 | |||
77 | void *cls; | ||
78 | }; | ||
79 | |||
80 | |||
81 | /** | ||
82 | * Initialize the database connections and associated | ||
83 | * data structures (create tables and indices | ||
84 | * as needed as well). | ||
85 | * | ||
86 | * @param plugin the plugin context (state for this module) | ||
87 | * @return GNUNET_OK on success | ||
88 | */ | ||
89 | static int | ||
90 | database_setup (struct Plugin *plugin) | ||
91 | { | ||
92 | /* Open database and precompile statements */ | ||
93 | plugin->dbh = GNUNET_POSTGRES_connect (plugin->cfg, | ||
94 | "psycstore-postgres"); | ||
95 | if (NULL == plugin->dbh) | ||
96 | return GNUNET_SYSERR; | ||
97 | |||
98 | /* Create tables */ | ||
99 | if ((GNUNET_OK != | ||
100 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
101 | "CREATE TABLE IF NOT EXISTS channels (\n" | ||
102 | " id SERIAL,\n" | ||
103 | " pub_key BYTEA,\n" | ||
104 | " max_state_message_id INT,\n" | ||
105 | " state_hash_message_id INT,\n" | ||
106 | " PRIMARY KEY(id)\n" | ||
107 | ")" "WITH OIDS")) || | ||
108 | |||
109 | (GNUNET_OK != | ||
110 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
111 | "CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n" | ||
112 | " ON channels (substring(pub_key from 1 for 5)\n" | ||
113 | ")")) || | ||
114 | |||
115 | (GNUNET_OK != | ||
116 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
117 | "CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n" | ||
118 | " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" | ||
119 | "RETURNS NULL ON NULL INPUT")) || | ||
120 | |||
121 | (GNUNET_OK != | ||
122 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
123 | "CREATE TABLE IF NOT EXISTS slaves (\n" | ||
124 | " id SERIAL,\n" | ||
125 | " pub_key BYTEA,\n" | ||
126 | " PRIMARY KEY(id)\n" | ||
127 | ")" "WITH OIDS")) || | ||
128 | |||
129 | (GNUNET_OK != | ||
130 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
131 | "CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n" | ||
132 | " ON slaves (substring(pub_key from 1 for 5)\n" | ||
133 | ")")) || | ||
134 | |||
135 | (GNUNET_OK != | ||
136 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
137 | "CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n" | ||
138 | " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" | ||
139 | "RETURNS NULL ON NULL INPUT")) || | ||
140 | |||
141 | (GNUNET_OK != | ||
142 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
143 | "CREATE TABLE IF NOT EXISTS membership (\n" | ||
144 | " channel_id INT NOT NULL REFERENCES channels(id),\n" | ||
145 | " slave_id INT NOT NULL REFERENCES slaves(id),\n" | ||
146 | " did_join INT NOT NULL,\n" | ||
147 | " announced_at BIGINT NOT NULL,\n" | ||
148 | " effective_since BIGINT NOT NULL,\n" | ||
149 | " group_generation BIGINT NOT NULL\n" | ||
150 | ")" "WITH OIDS")) || | ||
151 | |||
152 | (GNUNET_OK != | ||
153 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
154 | "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id " | ||
155 | "ON membership (channel_id, slave_id)")) || | ||
156 | |||
157 | /** @todo messages table: add method_name column */ | ||
158 | (GNUNET_OK != | ||
159 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
160 | "CREATE TABLE IF NOT EXISTS messages (\n" | ||
161 | " channel_id INT NOT NULL REFERENCES channels(id),\n" | ||
162 | " hop_counter BIGINT NOT NULL,\n" | ||
163 | " signature BYTEA,\n" | ||
164 | " purpose BYTEA,\n" | ||
165 | " fragment_id BIGINT NOT NULL,\n" | ||
166 | " fragment_offset BIGINT NOT NULL,\n" | ||
167 | " message_id BIGINT NOT NULL,\n" | ||
168 | " group_generation BIGINT NOT NULL,\n" | ||
169 | " multicast_flags BIGINT NOT NULL,\n" | ||
170 | " psycstore_flags BIGINT NOT NULL,\n" | ||
171 | " data BYTEA,\n" | ||
172 | " PRIMARY KEY (channel_id, fragment_id),\n" | ||
173 | " UNIQUE (channel_id, message_id, fragment_offset)\n" | ||
174 | ")" "WITH OIDS")) || | ||
175 | |||
176 | (GNUNET_OK != | ||
177 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
178 | "CREATE TABLE IF NOT EXISTS state (\n" | ||
179 | " channel_id INT NOT NULL REFERENCES channels(id),\n" | ||
180 | " name TEXT NOT NULL,\n" | ||
181 | " value_current BYTEA,\n" | ||
182 | " value_signed BYTEA\n" | ||
183 | ")" "WITH OIDS")) || | ||
184 | |||
185 | (GNUNET_OK != | ||
186 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
187 | "CREATE UNIQUE INDEX IF NOT EXISTS state_uniq_idx \n" | ||
188 | " ON state (channel_id, substring(name from 1 for 5)\n" | ||
189 | ")")) || | ||
190 | |||
191 | (GNUNET_OK != | ||
192 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
193 | "CREATE TABLE IF NOT EXISTS state_sync (\n" | ||
194 | " channel_id INT NOT NULL REFERENCES channels(id),\n" | ||
195 | " name TEXT NOT NULL,\n" | ||
196 | " value BYTEA,\n" | ||
197 | " PRIMARY KEY (channel_id)\n" | ||
198 | ")" "WITH OIDS")) || | ||
199 | |||
200 | (GNUNET_OK != | ||
201 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
202 | "CREATE UNIQUE INDEX IF NOT EXISTS state_sync_name_idx \n" | ||
203 | " ON state_sync (substring(name from 1 for 5)\n" | ||
204 | ")"))) | ||
205 | { | ||
206 | PQfinish (plugin->dbh); | ||
207 | plugin->dbh = NULL; | ||
208 | return GNUNET_SYSERR; | ||
209 | } | ||
210 | |||
211 | |||
212 | /* Prepare statements */ | ||
213 | if ((GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
214 | "transaction_begin", | ||
215 | "BEGIN", 0)) || | ||
216 | |||
217 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
218 | "transaction_commit", | ||
219 | "COMMIT", 0)) || | ||
220 | |||
221 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
222 | "transaction_rollback", | ||
223 | "ROLLBACK", 0)) || | ||
224 | |||
225 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
226 | "insert_channel_key", | ||
227 | "INSERT INTO channels (pub_key) VALUES ($1)" | ||
228 | " ON CONFLICT DO NOTHING", 1)) || | ||
229 | |||
230 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
231 | "insert_slave_key", | ||
232 | "INSERT INTO slaves (pub_key) VALUES ($1)" | ||
233 | " ON CONFLICT DO NOTHING", 1)) || | ||
234 | |||
235 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
236 | "insert_membership", | ||
237 | "INSERT INTO membership\n" | ||
238 | " (channel_id, slave_id, did_join, announced_at,\n" | ||
239 | " effective_since, group_generation)\n" | ||
240 | "VALUES (get_chan_id($1),\n" | ||
241 | " get_slave_id($2),\n" | ||
242 | " $3, $4, $5, $6)", 6)) || | ||
243 | |||
244 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
245 | "select_membership", | ||
246 | "SELECT did_join FROM membership\n" | ||
247 | "WHERE channel_id = get_chan_id($1)\n" | ||
248 | " AND slave_id = get_slave_id($2)\n" | ||
249 | " AND effective_since <= $3 AND did_join = 1\n" | ||
250 | "ORDER BY announced_at DESC LIMIT 1", 3)) || | ||
251 | |||
252 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
253 | "insert_fragment", | ||
254 | "INSERT INTO messages\n" | ||
255 | " (channel_id, hop_counter, signature, purpose,\n" | ||
256 | " fragment_id, fragment_offset, message_id,\n" | ||
257 | " group_generation, multicast_flags, psycstore_flags, data)\n" | ||
258 | "VALUES (get_chan_id($1),\n" | ||
259 | " $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)" | ||
260 | "ON CONFLICT DO NOTHING", 11)) || | ||
261 | |||
262 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
263 | "update_message_flags", | ||
264 | "UPDATE messages\n" | ||
265 | "SET psycstore_flags = psycstore_flags | $1\n" | ||
266 | "WHERE channel_id = get_chan_id($2) \n" | ||
267 | " AND message_id = $3 AND fragment_offset = 0", 3)) || | ||
268 | |||
269 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
270 | "select_fragments", | ||
271 | "SELECT hop_counter, signature, purpose, fragment_id,\n" | ||
272 | " fragment_offset, message_id, group_generation,\n" | ||
273 | " multicast_flags, psycstore_flags, data\n" | ||
274 | "FROM messages\n" | ||
275 | "WHERE channel_id = get_chan_id($1) \n" | ||
276 | " AND $2 <= fragment_id AND fragment_id <= $3", 3)) || | ||
277 | |||
278 | /** @todo select_messages: add method_prefix filter */ | ||
279 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
280 | "select_messages", | ||
281 | "SELECT hop_counter, signature, purpose, fragment_id,\n" | ||
282 | " fragment_offset, message_id, group_generation,\n" | ||
283 | " multicast_flags, psycstore_flags, data\n" | ||
284 | "FROM messages\n" | ||
285 | "WHERE channel_id = get_chan_id($1) \n" | ||
286 | " AND $2 <= message_id AND message_id <= $3" | ||
287 | "LIMIT $4;", 4)) || | ||
288 | |||
289 | /** @todo select_latest_messages: add method_prefix filter */ | ||
290 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
291 | "select_latest_fragments", | ||
292 | "SELECT rev.hop_counter AS hop_counter,\n" | ||
293 | " rev.signature AS signature,\n" | ||
294 | " rev.purpose AS purpose,\n" | ||
295 | " rev.fragment_id AS fragment_id,\n" | ||
296 | " rev.fragment_offset AS fragment_offset,\n" | ||
297 | " rev.message_id AS message_id,\n" | ||
298 | " rev.group_generation AS group_generation,\n" | ||
299 | " rev.multicast_flags AS multicast_flags,\n" | ||
300 | " rev.psycstore_flags AS psycstore_flags,\n" | ||
301 | " rev.data AS data\n" | ||
302 | " FROM\n" | ||
303 | " (SELECT hop_counter, signature, purpose, fragment_id,\n" | ||
304 | " fragment_offset, message_id, group_generation,\n" | ||
305 | " multicast_flags, psycstore_flags, data \n" | ||
306 | " FROM messages\n" | ||
307 | " WHERE channel_id = get_chan_id($1) \n" | ||
308 | " ORDER BY fragment_id DESC\n" | ||
309 | " LIMIT $2) AS rev\n" | ||
310 | " ORDER BY rev.fragment_id;", 2)) || | ||
311 | |||
312 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
313 | "select_latest_messages", | ||
314 | "SELECT hop_counter, signature, purpose, fragment_id,\n" | ||
315 | " fragment_offset, message_id, group_generation,\n" | ||
316 | " multicast_flags, psycstore_flags, data\n" | ||
317 | "FROM messages\n" | ||
318 | "WHERE channel_id = get_chan_id($1)\n" | ||
319 | " AND message_id IN\n" | ||
320 | " (SELECT message_id\n" | ||
321 | " FROM messages\n" | ||
322 | " WHERE channel_id = get_chan_id($2) \n" | ||
323 | " GROUP BY message_id\n" | ||
324 | " ORDER BY message_id\n" | ||
325 | " DESC LIMIT $3)\n" | ||
326 | "ORDER BY fragment_id", 3)) || | ||
327 | |||
328 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
329 | "select_message_fragment", | ||
330 | "SELECT hop_counter, signature, purpose, fragment_id,\n" | ||
331 | " fragment_offset, message_id, group_generation,\n" | ||
332 | " multicast_flags, psycstore_flags, data\n" | ||
333 | "FROM messages\n" | ||
334 | "WHERE channel_id = get_chan_id($1) \n" | ||
335 | " AND message_id = $2 AND fragment_offset = $3", 3)) || | ||
336 | |||
337 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
338 | "select_counters_message", | ||
339 | "SELECT fragment_id, message_id, group_generation\n" | ||
340 | "FROM messages\n" | ||
341 | "WHERE channel_id = get_chan_id($1)\n" | ||
342 | "ORDER BY fragment_id DESC LIMIT 1", 1)) || | ||
343 | |||
344 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
345 | "select_counters_state", | ||
346 | "SELECT max_state_message_id\n" | ||
347 | "FROM channels\n" | ||
348 | "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1)) || | ||
349 | |||
350 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
351 | "update_max_state_message_id", | ||
352 | "UPDATE channels\n" | ||
353 | "SET max_state_message_id = $1\n" | ||
354 | "WHERE pub_key = $2", 2)) || | ||
355 | |||
356 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
357 | "update_state_hash_message_id", | ||
358 | "UPDATE channels\n" | ||
359 | "SET state_hash_message_id = $1\n" | ||
360 | "WHERE pub_key = $2", 2)) || | ||
361 | |||
362 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
363 | "insert_state_current", | ||
364 | "INSERT INTO state\n" | ||
365 | " (channel_id, name, value_current, value_signed)\n" | ||
366 | "SELECT new.channel_id, new.name,\n" | ||
367 | " new.value_current, old.value_signed\n" | ||
368 | "FROM (SELECT get_chan_id($1) AS channel_id,\n" | ||
369 | " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n" | ||
370 | "LEFT JOIN (SELECT channel_id, name, value_signed\n" | ||
371 | " FROM state) AS old\n" | ||
372 | "ON new.channel_id = old.channel_id AND new.name = old.name\n" | ||
373 | "ON CONFLICT ( channel_id, substring(name from 1 for 5) )\n" | ||
374 | " DO UPDATE SET value_current = EXCLUDED.value_current,\n" | ||
375 | " value_signed = EXCLUDED.value_signed", 3)) || | ||
376 | |||
377 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
378 | "delete_state_empty", | ||
379 | "DELETE FROM state\n" | ||
380 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n" | ||
381 | " AND (value_current IS NULL OR length(value_current) = 0)\n" | ||
382 | " AND (value_signed IS NULL OR length(value_signed) = 0)", 1)) || | ||
383 | |||
384 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
385 | "update_state_signed", | ||
386 | "UPDATE state\n" | ||
387 | "SET value_signed = value_current\n" | ||
388 | "WHERE channel_id = get_chan_id($1) ", 1)) || | ||
389 | |||
390 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
391 | "delete_state", | ||
392 | "DELETE FROM state\n" | ||
393 | "WHERE channel_id = get_chan_id($1) ", 1)) || | ||
394 | |||
395 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
396 | "insert_state_sync", | ||
397 | "INSERT INTO state_sync (channel_id, name, value)\n" | ||
398 | "VALUES (get_chan_id($1), $2, $3)", 3)) || | ||
399 | |||
400 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
401 | "insert_state_from_sync", | ||
402 | "INSERT INTO state\n" | ||
403 | " (channel_id, name, value_current, value_signed)\n" | ||
404 | "SELECT channel_id, name, value, value\n" | ||
405 | "FROM state_sync\n" | ||
406 | "WHERE channel_id = get_chan_id($1)", 1)) || | ||
407 | |||
408 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
409 | "delete_state_sync", | ||
410 | "DELETE FROM state_sync\n" | ||
411 | "WHERE channel_id = get_chan_id($1)", 1)) || | ||
412 | |||
413 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
414 | "select_state_one", | ||
415 | "SELECT value_current\n" | ||
416 | "FROM state\n" | ||
417 | "WHERE channel_id = get_chan_id($1)\n" | ||
418 | " AND name = $2", 2)) || | ||
419 | |||
420 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
421 | "select_state_prefix", | ||
422 | "SELECT name, value_current\n" | ||
423 | "FROM state\n" | ||
424 | "WHERE channel_id = get_chan_id($1)\n" | ||
425 | " AND (name = $2 OR substr(name, 1, $3) = $4 || '_')", 4)) || | ||
426 | |||
427 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
428 | "select_state_signed", | ||
429 | "SELECT name, value_signed\n" | ||
430 | "FROM state\n" | ||
431 | "WHERE channel_id = get_chan_id($1)\n" | ||
432 | " AND value_signed IS NOT NULL", 1))) | ||
433 | { | ||
434 | PQfinish (plugin->dbh); | ||
435 | plugin->dbh = NULL; | ||
436 | return GNUNET_SYSERR; | ||
437 | } | ||
438 | |||
439 | return GNUNET_OK; | ||
440 | } | ||
441 | |||
442 | |||
443 | /** | ||
444 | * Shutdown database connection and associate data | ||
445 | * structures. | ||
446 | * @param plugin the plugin context (state for this module) | ||
447 | */ | ||
448 | static void | ||
449 | database_shutdown (struct Plugin *plugin) | ||
450 | { | ||
451 | PQfinish (plugin->dbh); | ||
452 | plugin->dbh = NULL; | ||
453 | } | ||
454 | |||
455 | |||
456 | /** | ||
457 | * Execute a prepared statement with a @a channel_key argument. | ||
458 | * | ||
459 | * @param plugin Plugin handle. | ||
460 | * @param stmt Statement to execute. | ||
461 | * @param channel_key Public key of the channel. | ||
462 | * | ||
463 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
464 | */ | ||
465 | static int | ||
466 | exec_channel (struct Plugin *plugin, const char *stmt, | ||
467 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | ||
468 | { | ||
469 | PGresult *ret; | ||
470 | struct GNUNET_PQ_QueryParam params[] = { | ||
471 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
472 | GNUNET_PQ_query_param_end | ||
473 | }; | ||
474 | |||
475 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); | ||
476 | if (GNUNET_OK != | ||
477 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
478 | ret, | ||
479 | PGRES_COMMAND_OK, | ||
480 | "PQexecPrepared", stmt)) | ||
481 | return GNUNET_SYSERR; | ||
482 | |||
483 | PQclear (ret); | ||
484 | |||
485 | return GNUNET_OK; | ||
486 | } | ||
487 | |||
488 | |||
489 | /** | ||
490 | * Begin a transaction. | ||
491 | */ | ||
492 | static int | ||
493 | transaction_begin (struct Plugin *plugin, enum Transactions transaction) | ||
494 | { | ||
495 | PGresult *ret; | ||
496 | struct GNUNET_PQ_QueryParam params[] = { | ||
497 | GNUNET_PQ_query_param_end | ||
498 | }; | ||
499 | |||
500 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_begin", params); | ||
501 | if (GNUNET_OK != | ||
502 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
503 | ret, | ||
504 | PGRES_COMMAND_OK, | ||
505 | "PQexecPrepared", "transaction_begin")) | ||
506 | { | ||
507 | return GNUNET_SYSERR; | ||
508 | } | ||
509 | |||
510 | plugin->transaction = transaction; | ||
511 | PQclear (ret); | ||
512 | return GNUNET_OK; | ||
513 | } | ||
514 | |||
515 | |||
516 | /** | ||
517 | * Commit current transaction. | ||
518 | */ | ||
519 | static int | ||
520 | transaction_commit (struct Plugin *plugin) | ||
521 | { | ||
522 | PGresult *ret; | ||
523 | |||
524 | struct GNUNET_PQ_QueryParam params[] = { | ||
525 | GNUNET_PQ_query_param_end | ||
526 | }; | ||
527 | |||
528 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_commit", params); | ||
529 | if (GNUNET_OK != | ||
530 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
531 | ret, | ||
532 | PGRES_COMMAND_OK, | ||
533 | "PQexecPrepared", "transaction_commit")) | ||
534 | { | ||
535 | return GNUNET_SYSERR; | ||
536 | } | ||
537 | |||
538 | PQclear (ret); | ||
539 | plugin->transaction = TRANSACTION_NONE; | ||
540 | return GNUNET_OK; | ||
541 | } | ||
542 | |||
543 | |||
544 | /** | ||
545 | * Roll back current transaction. | ||
546 | */ | ||
547 | static int | ||
548 | transaction_rollback (struct Plugin *plugin) | ||
549 | { | ||
550 | PGresult *ret; | ||
551 | |||
552 | struct GNUNET_PQ_QueryParam params[] = { | ||
553 | GNUNET_PQ_query_param_end | ||
554 | }; | ||
555 | |||
556 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_rollback", params); | ||
557 | if (GNUNET_OK != | ||
558 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
559 | ret, | ||
560 | PGRES_COMMAND_OK, | ||
561 | "PQexecPrepared", "transaction_rollback")) | ||
562 | { | ||
563 | return GNUNET_SYSERR; | ||
564 | } | ||
565 | |||
566 | PQclear (ret); | ||
567 | plugin->transaction = TRANSACTION_NONE; | ||
568 | return GNUNET_OK; | ||
569 | } | ||
570 | |||
571 | |||
572 | static int | ||
573 | channel_key_store (struct Plugin *plugin, | ||
574 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | ||
575 | { | ||
576 | PGresult *ret; | ||
577 | |||
578 | struct GNUNET_PQ_QueryParam params[] = { | ||
579 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
580 | GNUNET_PQ_query_param_end | ||
581 | }; | ||
582 | |||
583 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_channel_key", params); | ||
584 | if (GNUNET_OK != | ||
585 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
586 | ret, | ||
587 | PGRES_COMMAND_OK, | ||
588 | "PQexecPrepared", "insert_channel_key")) | ||
589 | { | ||
590 | return GNUNET_SYSERR; | ||
591 | } | ||
592 | |||
593 | PQclear (ret); | ||
594 | return GNUNET_OK; | ||
595 | } | ||
596 | |||
597 | |||
598 | static int | ||
599 | slave_key_store (struct Plugin *plugin, | ||
600 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key) | ||
601 | { | ||
602 | PGresult *ret; | ||
603 | |||
604 | struct GNUNET_PQ_QueryParam params[] = { | ||
605 | GNUNET_PQ_query_param_auto_from_type (slave_key), | ||
606 | GNUNET_PQ_query_param_end | ||
607 | }; | ||
608 | |||
609 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_slave_key", params); | ||
610 | if (GNUNET_OK != | ||
611 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
612 | ret, | ||
613 | PGRES_COMMAND_OK, | ||
614 | "PQexecPrepared", "insert_slave_key")) | ||
615 | { | ||
616 | return GNUNET_SYSERR; | ||
617 | } | ||
618 | |||
619 | PQclear (ret); | ||
620 | return GNUNET_OK; | ||
621 | } | ||
622 | |||
623 | |||
624 | /** | ||
625 | * Store join/leave events for a PSYC channel in order to be able to answer | ||
626 | * membership test queries later. | ||
627 | * | ||
628 | * @see GNUNET_PSYCSTORE_membership_store() | ||
629 | * | ||
630 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
631 | */ | ||
632 | static int | ||
633 | postgres_membership_store (void *cls, | ||
634 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
635 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | ||
636 | int did_join, | ||
637 | uint64_t announced_at, | ||
638 | uint64_t effective_since, | ||
639 | uint64_t group_generation) | ||
640 | { | ||
641 | PGresult *ret; | ||
642 | struct Plugin *plugin = cls; | ||
643 | |||
644 | uint32_t idid_join = (uint32_t)did_join; | ||
645 | |||
646 | GNUNET_assert (TRANSACTION_NONE == plugin->transaction); | ||
647 | |||
648 | if (announced_at > INT64_MAX || | ||
649 | effective_since > INT64_MAX || | ||
650 | group_generation > INT64_MAX) | ||
651 | { | ||
652 | GNUNET_break (0); | ||
653 | return GNUNET_SYSERR; | ||
654 | } | ||
655 | |||
656 | if (GNUNET_OK != channel_key_store (plugin, channel_key) | ||
657 | || GNUNET_OK != slave_key_store (plugin, slave_key)) | ||
658 | return GNUNET_SYSERR; | ||
659 | |||
660 | struct GNUNET_PQ_QueryParam params[] = { | ||
661 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
662 | GNUNET_PQ_query_param_auto_from_type (slave_key), | ||
663 | GNUNET_PQ_query_param_uint32 (&idid_join), | ||
664 | GNUNET_PQ_query_param_uint64 (&announced_at), | ||
665 | GNUNET_PQ_query_param_uint64 (&effective_since), | ||
666 | GNUNET_PQ_query_param_uint64 (&group_generation), | ||
667 | GNUNET_PQ_query_param_end | ||
668 | }; | ||
669 | |||
670 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_slave_key", params); | ||
671 | if (GNUNET_OK != | ||
672 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
673 | ret, | ||
674 | PGRES_COMMAND_OK, | ||
675 | "PQexecPrepared", "insert_slave_key")) | ||
676 | { | ||
677 | return GNUNET_SYSERR; | ||
678 | } | ||
679 | |||
680 | PQclear (ret); | ||
681 | return GNUNET_OK; | ||
682 | } | ||
683 | |||
684 | /** | ||
685 | * Test if a member was admitted to the channel at the given message ID. | ||
686 | * | ||
687 | * @see GNUNET_PSYCSTORE_membership_test() | ||
688 | * | ||
689 | * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not, | ||
690 | * #GNUNET_SYSERR if there was en error. | ||
691 | */ | ||
692 | static int | ||
693 | membership_test (void *cls, | ||
694 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
695 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | ||
696 | uint64_t message_id) | ||
697 | { | ||
698 | PGresult *res; | ||
699 | struct Plugin *plugin = cls; | ||
700 | |||
701 | uint32_t did_join = 0; | ||
702 | |||
703 | int ret = GNUNET_SYSERR; | ||
704 | |||
705 | struct GNUNET_PQ_QueryParam params_select[] = { | ||
706 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
707 | GNUNET_PQ_query_param_auto_from_type (slave_key), | ||
708 | GNUNET_PQ_query_param_uint64 (&message_id), | ||
709 | GNUNET_PQ_query_param_end | ||
710 | }; | ||
711 | |||
712 | res = GNUNET_PQ_exec_prepared (plugin->dbh, "select_membership", params_select); | ||
713 | if (GNUNET_OK != | ||
714 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
715 | res, | ||
716 | PGRES_COMMAND_OK, | ||
717 | "PQexecPrepared", "select_membership")) | ||
718 | { | ||
719 | return GNUNET_SYSERR; | ||
720 | } | ||
721 | |||
722 | struct GNUNET_PQ_ResultSpec results_select[] = { | ||
723 | GNUNET_PQ_result_spec_uint32 ("did_join", &did_join), | ||
724 | GNUNET_PQ_result_spec_end | ||
725 | }; | ||
726 | |||
727 | switch(GNUNET_PQ_extract_result (res, results_select, 0)) | ||
728 | { | ||
729 | case GNUNET_OK: | ||
730 | ret = GNUNET_YES; | ||
731 | break; | ||
732 | default: | ||
733 | ret = GNUNET_NO; | ||
734 | break; | ||
735 | } | ||
736 | |||
737 | PQclear (res); | ||
738 | |||
739 | return ret; | ||
740 | } | ||
741 | |||
742 | /** | ||
743 | * Store a message fragment sent to a channel. | ||
744 | * | ||
745 | * @see GNUNET_PSYCSTORE_fragment_store() | ||
746 | * | ||
747 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
748 | */ | ||
749 | static int | ||
750 | fragment_store (void *cls, | ||
751 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
752 | const struct GNUNET_MULTICAST_MessageHeader *msg, | ||
753 | uint32_t psycstore_flags) | ||
754 | { | ||
755 | PGresult *res; | ||
756 | struct Plugin *plugin = cls; | ||
757 | |||
758 | GNUNET_assert (TRANSACTION_NONE == plugin->transaction); | ||
759 | |||
760 | uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id); | ||
761 | |||
762 | uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset); | ||
763 | uint64_t message_id = GNUNET_ntohll (msg->message_id); | ||
764 | uint64_t group_generation = GNUNET_ntohll (msg->group_generation); | ||
765 | |||
766 | uint64_t hop_counter = ntohl(msg->hop_counter); | ||
767 | uint64_t flags = ntohl(msg->flags); | ||
768 | |||
769 | if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX || | ||
770 | message_id > INT64_MAX || group_generation > INT64_MAX) | ||
771 | { | ||
772 | LOG(GNUNET_ERROR_TYPE_ERROR, | ||
773 | "Tried to store fragment with a field > INT64_MAX: " | ||
774 | "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset, | ||
775 | message_id, group_generation); | ||
776 | GNUNET_break (0); | ||
777 | return GNUNET_SYSERR; | ||
778 | } | ||
779 | |||
780 | if (GNUNET_OK != channel_key_store (plugin, channel_key)) | ||
781 | return GNUNET_SYSERR; | ||
782 | |||
783 | struct GNUNET_PQ_QueryParam params_insert[] = { | ||
784 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
785 | GNUNET_PQ_query_param_uint64 (&hop_counter), | ||
786 | GNUNET_PQ_query_param_auto_from_type (&msg->signature), | ||
787 | GNUNET_PQ_query_param_auto_from_type (&msg->purpose), | ||
788 | GNUNET_PQ_query_param_uint64 (&fragment_id), | ||
789 | GNUNET_PQ_query_param_uint64 (&fragment_offset), | ||
790 | GNUNET_PQ_query_param_uint64 (&message_id), | ||
791 | GNUNET_PQ_query_param_uint64 (&group_generation), | ||
792 | GNUNET_PQ_query_param_uint64 (&flags), | ||
793 | GNUNET_PQ_query_param_uint32 (&psycstore_flags), | ||
794 | GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size) | ||
795 | - sizeof (*msg)), | ||
796 | GNUNET_PQ_query_param_end | ||
797 | }; | ||
798 | |||
799 | res = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_fragment", params_insert); | ||
800 | if (GNUNET_OK != | ||
801 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
802 | res, | ||
803 | PGRES_COMMAND_OK, | ||
804 | "PQexecPrepared", "insert_fragment")) | ||
805 | return GNUNET_SYSERR; | ||
806 | |||
807 | PQclear (res); | ||
808 | return GNUNET_OK; | ||
809 | } | ||
810 | |||
811 | /** | ||
812 | * Set additional flags for a given message. | ||
813 | * | ||
814 | * They are OR'd with any existing flags set. | ||
815 | * | ||
816 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
817 | */ | ||
818 | static int | ||
819 | message_add_flags (void *cls, | ||
820 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
821 | uint64_t message_id, | ||
822 | uint64_t psycstore_flags) | ||
823 | { | ||
824 | PGresult *res; | ||
825 | struct Plugin *plugin = cls; | ||
826 | |||
827 | int ret = GNUNET_SYSERR; | ||
828 | |||
829 | struct GNUNET_PQ_QueryParam params_update[] = { | ||
830 | GNUNET_PQ_query_param_uint64 (&psycstore_flags), | ||
831 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
832 | GNUNET_PQ_query_param_uint64 (&message_id), | ||
833 | GNUNET_PQ_query_param_end | ||
834 | }; | ||
835 | |||
836 | res = GNUNET_PQ_exec_prepared (plugin->dbh, "update_message_flags", params_update); | ||
837 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | ||
838 | res, | ||
839 | PGRES_COMMAND_OK, | ||
840 | "PQexecPrepared", "update_message_flags")) | ||
841 | return ret; | ||
842 | |||
843 | PQclear (res); | ||
844 | return ret; | ||
845 | } | ||
846 | |||
847 | |||
848 | static int | ||
849 | fragment_row (struct Plugin *plugin, | ||
850 | const char *stmt, | ||
851 | PGresult *res, | ||
852 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
853 | void *cb_cls) | ||
854 | { | ||
855 | uint32_t hop_counter; | ||
856 | void *signature = NULL; | ||
857 | void *purpose = NULL; | ||
858 | size_t signature_size; | ||
859 | size_t purpose_size; | ||
860 | |||
861 | uint64_t fragment_id; | ||
862 | uint64_t fragment_offset; | ||
863 | uint64_t message_id; | ||
864 | uint64_t group_generation; | ||
865 | uint64_t flags; | ||
866 | void *buf; | ||
867 | size_t buf_size; | ||
868 | int ret = GNUNET_SYSERR; | ||
869 | struct GNUNET_MULTICAST_MessageHeader *mp; | ||
870 | |||
871 | uint64_t msg_flags; | ||
872 | unsigned int cnt; | ||
873 | |||
874 | struct GNUNET_PQ_ResultSpec results[] = { | ||
875 | GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter), | ||
876 | GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size), | ||
877 | GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size), | ||
878 | GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id), | ||
879 | GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset), | ||
880 | GNUNET_PQ_result_spec_uint64 ("message_id", &message_id), | ||
881 | GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation), | ||
882 | GNUNET_PQ_result_spec_uint64 ("msg_flags", &msg_flags), | ||
883 | GNUNET_PQ_result_spec_uint64 ("flags", &flags), | ||
884 | GNUNET_PQ_result_spec_variable_size ("data", &buf, | ||
885 | &buf_size), | ||
886 | GNUNET_PQ_result_spec_end | ||
887 | }; | ||
888 | |||
889 | if (GNUNET_OK != | ||
890 | GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK, | ||
891 | "PQexecPrepared", | ||
892 | stmt)) | ||
893 | { | ||
894 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
895 | "Failing fragment lookup (postgres error)\n"); | ||
896 | return GNUNET_SYSERR; | ||
897 | } | ||
898 | |||
899 | cnt = PQntuples (res); | ||
900 | if (cnt == 0) | ||
901 | { | ||
902 | ret = GNUNET_NO; | ||
903 | } | ||
904 | else | ||
905 | { | ||
906 | if (GNUNET_OK != GNUNET_PQ_extract_result(res, results, 0)) { | ||
907 | PQclear (res); | ||
908 | return GNUNET_SYSERR; | ||
909 | } | ||
910 | |||
911 | mp = GNUNET_malloc (sizeof (*mp) + buf_size); | ||
912 | |||
913 | mp->header.size = htons (sizeof (*mp) + buf_size); | ||
914 | mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); | ||
915 | mp->hop_counter = htonl (hop_counter); | ||
916 | GNUNET_memcpy (&mp->signature, | ||
917 | signature, | ||
918 | signature_size); | ||
919 | GNUNET_memcpy (&mp->purpose, | ||
920 | purpose, | ||
921 | purpose_size); | ||
922 | mp->fragment_id = GNUNET_htonll (fragment_id); | ||
923 | mp->fragment_offset = GNUNET_htonll (fragment_offset); | ||
924 | mp->message_id = GNUNET_htonll (message_id); | ||
925 | mp->group_generation = GNUNET_htonll (group_generation); | ||
926 | mp->flags = htonl(msg_flags); | ||
927 | |||
928 | GNUNET_memcpy (&mp[1], | ||
929 | buf, | ||
930 | buf_size); | ||
931 | GNUNET_PQ_cleanup_result(results); | ||
932 | ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags); | ||
933 | } | ||
934 | |||
935 | PQclear (res); | ||
936 | return ret; | ||
937 | } | ||
938 | |||
939 | |||
940 | static int | ||
941 | fragment_select (struct Plugin *plugin, const char *stmt, | ||
942 | struct GNUNET_PQ_QueryParam *params, | ||
943 | uint64_t *returned_fragments, | ||
944 | GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls) | ||
945 | { | ||
946 | PGresult *res; | ||
947 | int ret = GNUNET_SYSERR; | ||
948 | |||
949 | // FIXME | ||
950 | if (NULL == plugin->dbh || NULL == stmt || NULL == params) | ||
951 | { | ||
952 | fprintf(stderr, "%p %p %p\n", plugin->dbh, stmt, params); | ||
953 | return GNUNET_SYSERR; | ||
954 | } | ||
955 | |||
956 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); | ||
957 | if (GNUNET_YES == | ||
958 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
959 | res, | ||
960 | PGRES_COMMAND_OK, | ||
961 | "PQexecPrepared", stmt)) | ||
962 | { | ||
963 | if (PQntuples (res) == 0) | ||
964 | ret = GNUNET_NO; | ||
965 | else | ||
966 | { | ||
967 | ret = fragment_row (plugin, stmt, res, cb, cb_cls); | ||
968 | (*returned_fragments)++; | ||
969 | } | ||
970 | PQclear (res); | ||
971 | } | ||
972 | |||
973 | return ret; | ||
974 | } | ||
975 | |||
976 | /** | ||
977 | * Retrieve a message fragment range by fragment ID. | ||
978 | * | ||
979 | * @see GNUNET_PSYCSTORE_fragment_get() | ||
980 | * | ||
981 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
982 | */ | ||
983 | static int | ||
984 | fragment_get (void *cls, | ||
985 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
986 | uint64_t first_fragment_id, | ||
987 | uint64_t last_fragment_id, | ||
988 | uint64_t *returned_fragments, | ||
989 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
990 | void *cb_cls) | ||
991 | { | ||
992 | struct Plugin *plugin = cls; | ||
993 | *returned_fragments = 0; | ||
994 | |||
995 | struct GNUNET_PQ_QueryParam params_select[] = { | ||
996 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
997 | GNUNET_PQ_query_param_uint64 (&first_fragment_id), | ||
998 | GNUNET_PQ_query_param_uint64 (&last_fragment_id), | ||
999 | GNUNET_PQ_query_param_end | ||
1000 | }; | ||
1001 | |||
1002 | return fragment_select (plugin, "select_fragments", params_select, returned_fragments, cb, cb_cls); | ||
1003 | } | ||
1004 | |||
1005 | |||
1006 | /** | ||
1007 | * Retrieve a message fragment range by fragment ID. | ||
1008 | * | ||
1009 | * @see GNUNET_PSYCSTORE_fragment_get_latest() | ||
1010 | * | ||
1011 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1012 | */ | ||
1013 | static int | ||
1014 | fragment_get_latest (void *cls, | ||
1015 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1016 | uint64_t fragment_limit, | ||
1017 | uint64_t *returned_fragments, | ||
1018 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
1019 | void *cb_cls) | ||
1020 | { | ||
1021 | struct Plugin *plugin = cls; | ||
1022 | |||
1023 | *returned_fragments = 0; | ||
1024 | |||
1025 | struct GNUNET_PQ_QueryParam params_select[] = { | ||
1026 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1027 | GNUNET_PQ_query_param_uint64 (&fragment_limit), | ||
1028 | GNUNET_PQ_query_param_end | ||
1029 | }; | ||
1030 | |||
1031 | return fragment_select (plugin, "select_latest_fragments", params_select, returned_fragments, cb, cb_cls); | ||
1032 | } | ||
1033 | |||
1034 | |||
1035 | /** | ||
1036 | * Retrieve all fragments of a message ID range. | ||
1037 | * | ||
1038 | * @see GNUNET_PSYCSTORE_message_get() | ||
1039 | * | ||
1040 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1041 | */ | ||
1042 | static int | ||
1043 | message_get (void *cls, | ||
1044 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1045 | uint64_t first_message_id, | ||
1046 | uint64_t last_message_id, | ||
1047 | uint64_t fragment_limit, | ||
1048 | uint64_t *returned_fragments, | ||
1049 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
1050 | void *cb_cls) | ||
1051 | { | ||
1052 | struct Plugin *plugin = cls; | ||
1053 | *returned_fragments = 0; | ||
1054 | |||
1055 | struct GNUNET_PQ_QueryParam params_select[] = { | ||
1056 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1057 | GNUNET_PQ_query_param_uint64 (&first_message_id), | ||
1058 | GNUNET_PQ_query_param_uint64 (&last_message_id), | ||
1059 | GNUNET_PQ_query_param_uint64 (&fragment_limit), | ||
1060 | GNUNET_PQ_query_param_end | ||
1061 | }; | ||
1062 | |||
1063 | return fragment_select (plugin, "select_messages", params_select, returned_fragments, cb, cb_cls); | ||
1064 | } | ||
1065 | |||
1066 | |||
1067 | /** | ||
1068 | * Retrieve all fragments of the latest messages. | ||
1069 | * | ||
1070 | * @see GNUNET_PSYCSTORE_message_get_latest() | ||
1071 | * | ||
1072 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1073 | */ | ||
1074 | static int | ||
1075 | message_get_latest (void *cls, | ||
1076 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1077 | uint64_t message_limit, | ||
1078 | uint64_t *returned_fragments, | ||
1079 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
1080 | void *cb_cls) | ||
1081 | { | ||
1082 | struct Plugin *plugin = cls; | ||
1083 | *returned_fragments = 0; | ||
1084 | |||
1085 | struct GNUNET_PQ_QueryParam params_select[] = { | ||
1086 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1087 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1088 | GNUNET_PQ_query_param_uint64 (&message_limit), | ||
1089 | GNUNET_PQ_query_param_end | ||
1090 | }; | ||
1091 | |||
1092 | return fragment_select (plugin, "select_latest_messages", params_select, returned_fragments, cb, cb_cls); | ||
1093 | } | ||
1094 | |||
1095 | |||
1096 | /** | ||
1097 | * Retrieve a fragment of message specified by its message ID and fragment | ||
1098 | * offset. | ||
1099 | * | ||
1100 | * @see GNUNET_PSYCSTORE_message_get_fragment() | ||
1101 | * | ||
1102 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1103 | */ | ||
1104 | static int | ||
1105 | message_get_fragment (void *cls, | ||
1106 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1107 | uint64_t message_id, | ||
1108 | uint64_t fragment_offset, | ||
1109 | GNUNET_PSYCSTORE_FragmentCallback cb, | ||
1110 | void *cb_cls) | ||
1111 | { | ||
1112 | PGresult *res; | ||
1113 | struct Plugin *plugin = cls; | ||
1114 | int ret = GNUNET_SYSERR; | ||
1115 | const char *stmt = "select_message_fragment"; | ||
1116 | |||
1117 | struct GNUNET_PQ_QueryParam params_select[] = { | ||
1118 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1119 | GNUNET_PQ_query_param_uint64 (&message_id), | ||
1120 | GNUNET_PQ_query_param_uint64 (&fragment_offset), | ||
1121 | GNUNET_PQ_query_param_end | ||
1122 | }; | ||
1123 | |||
1124 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | ||
1125 | if (GNUNET_OK == GNUNET_POSTGRES_check_result (plugin->dbh, | ||
1126 | res, | ||
1127 | PGRES_COMMAND_OK, | ||
1128 | "PQexecPrepared", stmt)) | ||
1129 | { | ||
1130 | if (PQntuples (res) == 0) | ||
1131 | ret = GNUNET_NO; | ||
1132 | else | ||
1133 | ret = fragment_row (plugin, stmt, res, cb, cb_cls); | ||
1134 | |||
1135 | PQclear (res); | ||
1136 | } | ||
1137 | |||
1138 | return ret; | ||
1139 | } | ||
1140 | |||
1141 | /** | ||
1142 | * Retrieve the max. values of message counters for a channel. | ||
1143 | * | ||
1144 | * @see GNUNET_PSYCSTORE_counters_get() | ||
1145 | * | ||
1146 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1147 | */ | ||
1148 | static int | ||
1149 | counters_message_get (void *cls, | ||
1150 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1151 | uint64_t *max_fragment_id, | ||
1152 | uint64_t *max_message_id, | ||
1153 | uint64_t *max_group_generation) | ||
1154 | { | ||
1155 | PGresult *res; | ||
1156 | struct Plugin *plugin = cls; | ||
1157 | |||
1158 | const char *stmt = "select_counters_message"; | ||
1159 | |||
1160 | struct GNUNET_PQ_QueryParam params_select[] = { | ||
1161 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1162 | GNUNET_PQ_query_param_end | ||
1163 | }; | ||
1164 | |||
1165 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | ||
1166 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | ||
1167 | res, | ||
1168 | PGRES_COMMAND_OK, | ||
1169 | "PQexecPrepared", stmt)) | ||
1170 | { | ||
1171 | return GNUNET_SYSERR; | ||
1172 | } | ||
1173 | |||
1174 | struct GNUNET_PQ_ResultSpec results_select[] = { | ||
1175 | GNUNET_PQ_result_spec_uint64 ("max_fragment_id", max_fragment_id), | ||
1176 | GNUNET_PQ_result_spec_uint64 ("max_message_id", max_message_id), | ||
1177 | GNUNET_PQ_result_spec_uint64 ("max_group_generation", max_group_generation), | ||
1178 | GNUNET_PQ_result_spec_end | ||
1179 | }; | ||
1180 | |||
1181 | if (GNUNET_OK != GNUNET_PQ_extract_result (res, | ||
1182 | results_select, 0)) | ||
1183 | { | ||
1184 | PQclear (res); | ||
1185 | return GNUNET_SYSERR; | ||
1186 | } | ||
1187 | |||
1188 | GNUNET_PQ_cleanup_result(results_select); | ||
1189 | PQclear (res); | ||
1190 | |||
1191 | return GNUNET_OK; | ||
1192 | } | ||
1193 | |||
1194 | /** | ||
1195 | * Retrieve the max. values of state counters for a channel. | ||
1196 | * | ||
1197 | * @see GNUNET_PSYCSTORE_counters_get() | ||
1198 | * | ||
1199 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1200 | */ | ||
1201 | static int | ||
1202 | counters_state_get (void *cls, | ||
1203 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1204 | uint64_t *max_state_message_id) | ||
1205 | { | ||
1206 | PGresult *res; | ||
1207 | struct Plugin *plugin = cls; | ||
1208 | |||
1209 | const char *stmt = "select_counters_state"; | ||
1210 | |||
1211 | int ret = GNUNET_SYSERR; | ||
1212 | |||
1213 | struct GNUNET_PQ_QueryParam params_select[] = { | ||
1214 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1215 | GNUNET_PQ_query_param_end | ||
1216 | }; | ||
1217 | |||
1218 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | ||
1219 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | ||
1220 | res, | ||
1221 | PGRES_COMMAND_OK, | ||
1222 | "PQexecPrepared", stmt)) | ||
1223 | { | ||
1224 | return GNUNET_SYSERR; | ||
1225 | } | ||
1226 | |||
1227 | struct GNUNET_PQ_ResultSpec results_select[] = { | ||
1228 | GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id), | ||
1229 | GNUNET_PQ_result_spec_end | ||
1230 | }; | ||
1231 | |||
1232 | ret = GNUNET_PQ_extract_result (res, | ||
1233 | results_select, 0); | ||
1234 | |||
1235 | if (GNUNET_OK != ret) | ||
1236 | { | ||
1237 | PQclear (res); | ||
1238 | return GNUNET_SYSERR; | ||
1239 | } | ||
1240 | |||
1241 | GNUNET_PQ_cleanup_result(results_select); | ||
1242 | PQclear (res); | ||
1243 | |||
1244 | return ret; | ||
1245 | } | ||
1246 | |||
1247 | |||
1248 | /** | ||
1249 | * Assign a value to a state variable. | ||
1250 | * | ||
1251 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1252 | */ | ||
1253 | static int | ||
1254 | state_assign (struct Plugin *plugin, const char *stmt, | ||
1255 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1256 | const char *name, const void *value, size_t value_size) | ||
1257 | { | ||
1258 | PGresult *res; | ||
1259 | |||
1260 | struct GNUNET_PQ_QueryParam params[] = { | ||
1261 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1262 | GNUNET_PQ_query_param_string (name), | ||
1263 | GNUNET_PQ_query_param_auto_from_type (value), | ||
1264 | GNUNET_PQ_query_param_end | ||
1265 | }; | ||
1266 | |||
1267 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); | ||
1268 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | ||
1269 | res, | ||
1270 | PGRES_COMMAND_OK, | ||
1271 | "PQexecPrepared", stmt)) | ||
1272 | { | ||
1273 | return GNUNET_SYSERR; | ||
1274 | } | ||
1275 | |||
1276 | PQclear (res); | ||
1277 | |||
1278 | return GNUNET_OK; | ||
1279 | } | ||
1280 | |||
1281 | |||
1282 | static int | ||
1283 | update_message_id (struct Plugin *plugin, const char *stmt, | ||
1284 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1285 | uint64_t message_id) | ||
1286 | { | ||
1287 | PGresult *res; | ||
1288 | |||
1289 | struct GNUNET_PQ_QueryParam params[] = { | ||
1290 | GNUNET_PQ_query_param_uint64 (&message_id), | ||
1291 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1292 | GNUNET_PQ_query_param_end | ||
1293 | }; | ||
1294 | |||
1295 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); | ||
1296 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | ||
1297 | res, | ||
1298 | PGRES_COMMAND_OK, | ||
1299 | "PQexecPrepared", stmt)) | ||
1300 | { | ||
1301 | return GNUNET_SYSERR; | ||
1302 | } | ||
1303 | |||
1304 | PQclear (res); | ||
1305 | |||
1306 | return GNUNET_OK; | ||
1307 | } | ||
1308 | |||
1309 | |||
1310 | /** | ||
1311 | * Begin modifying current state. | ||
1312 | */ | ||
1313 | static int | ||
1314 | state_modify_begin (void *cls, | ||
1315 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1316 | uint64_t message_id, uint64_t state_delta) | ||
1317 | { | ||
1318 | struct Plugin *plugin = cls; | ||
1319 | |||
1320 | if (state_delta > 0) | ||
1321 | { | ||
1322 | /** | ||
1323 | * We can only apply state modifiers in the current message if modifiers in | ||
1324 | * the previous stateful message (message_id - state_delta) were already | ||
1325 | * applied. | ||
1326 | */ | ||
1327 | |||
1328 | uint64_t max_state_message_id = 0; | ||
1329 | int ret = counters_state_get (plugin, channel_key, &max_state_message_id); | ||
1330 | switch (ret) | ||
1331 | { | ||
1332 | case GNUNET_OK: | ||
1333 | case GNUNET_NO: // no state yet | ||
1334 | ret = GNUNET_OK; | ||
1335 | break; | ||
1336 | default: | ||
1337 | return ret; | ||
1338 | } | ||
1339 | |||
1340 | if (max_state_message_id < message_id - state_delta) | ||
1341 | return GNUNET_NO; /* some stateful messages not yet applied */ | ||
1342 | else if (message_id - state_delta < max_state_message_id) | ||
1343 | return GNUNET_NO; /* changes already applied */ | ||
1344 | } | ||
1345 | |||
1346 | if (TRANSACTION_NONE != plugin->transaction) | ||
1347 | { | ||
1348 | /** @todo FIXME: wait for other transaction to finish */ | ||
1349 | return GNUNET_SYSERR; | ||
1350 | } | ||
1351 | return transaction_begin (plugin, TRANSACTION_STATE_MODIFY); | ||
1352 | } | ||
1353 | |||
1354 | |||
1355 | /** | ||
1356 | * Set the current value of state variable. | ||
1357 | * | ||
1358 | * @see GNUNET_PSYCSTORE_state_modify() | ||
1359 | * | ||
1360 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1361 | */ | ||
1362 | static int | ||
1363 | state_modify_op (void *cls, | ||
1364 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1365 | enum GNUNET_PSYC_Operator op, | ||
1366 | const char *name, const void *value, size_t value_size) | ||
1367 | { | ||
1368 | struct Plugin *plugin = cls; | ||
1369 | GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction); | ||
1370 | |||
1371 | switch (op) | ||
1372 | { | ||
1373 | case GNUNET_PSYC_OP_ASSIGN: | ||
1374 | return state_assign (plugin, "insert_state_current", channel_key, | ||
1375 | name, value, value_size); | ||
1376 | |||
1377 | default: /** @todo implement more state operations */ | ||
1378 | GNUNET_break (0); | ||
1379 | return GNUNET_SYSERR; | ||
1380 | } | ||
1381 | } | ||
1382 | |||
1383 | |||
1384 | /** | ||
1385 | * End modifying current state. | ||
1386 | */ | ||
1387 | static int | ||
1388 | state_modify_end (void *cls, | ||
1389 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1390 | uint64_t message_id) | ||
1391 | { | ||
1392 | struct Plugin *plugin = cls; | ||
1393 | GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction); | ||
1394 | |||
1395 | return | ||
1396 | GNUNET_OK == exec_channel (plugin, "delete_state_empty", channel_key) | ||
1397 | && GNUNET_OK == update_message_id (plugin, | ||
1398 | "update_max_state_message_id", | ||
1399 | channel_key, message_id) | ||
1400 | && GNUNET_OK == transaction_commit (plugin) | ||
1401 | ? GNUNET_OK : GNUNET_SYSERR; | ||
1402 | } | ||
1403 | |||
1404 | |||
1405 | /** | ||
1406 | * Begin state synchronization. | ||
1407 | */ | ||
1408 | static int | ||
1409 | state_sync_begin (void *cls, | ||
1410 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | ||
1411 | { | ||
1412 | struct Plugin *plugin = cls; | ||
1413 | return exec_channel (plugin, "delete_state_sync", channel_key); | ||
1414 | } | ||
1415 | |||
1416 | |||
1417 | /** | ||
1418 | * Assign current value of a state variable. | ||
1419 | * | ||
1420 | * @see GNUNET_PSYCSTORE_state_modify() | ||
1421 | * | ||
1422 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1423 | */ | ||
1424 | static int | ||
1425 | state_sync_assign (void *cls, | ||
1426 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1427 | const char *name, const void *value, size_t value_size) | ||
1428 | { | ||
1429 | struct Plugin *plugin = cls; | ||
1430 | return state_assign (plugin, "insert_state_sync", channel_key, | ||
1431 | name, value, value_size); | ||
1432 | } | ||
1433 | |||
1434 | |||
1435 | /** | ||
1436 | * End modifying current state. | ||
1437 | */ | ||
1438 | static int | ||
1439 | state_sync_end (void *cls, | ||
1440 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1441 | uint64_t max_state_message_id, | ||
1442 | uint64_t state_hash_message_id) | ||
1443 | { | ||
1444 | struct Plugin *plugin = cls; | ||
1445 | int ret = GNUNET_SYSERR; | ||
1446 | |||
1447 | if (TRANSACTION_NONE != plugin->transaction) | ||
1448 | { | ||
1449 | /** @todo FIXME: wait for other transaction to finish */ | ||
1450 | return GNUNET_SYSERR; | ||
1451 | } | ||
1452 | |||
1453 | GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC) | ||
1454 | && GNUNET_OK == exec_channel (plugin, "delete_state", channel_key) | ||
1455 | && GNUNET_OK == exec_channel (plugin, "insert_state_from_sync", | ||
1456 | channel_key) | ||
1457 | && GNUNET_OK == exec_channel (plugin, "delete_state_sync", | ||
1458 | channel_key) | ||
1459 | && GNUNET_OK == update_message_id (plugin, | ||
1460 | "update_state_hash_message_id", | ||
1461 | channel_key, state_hash_message_id) | ||
1462 | && GNUNET_OK == update_message_id (plugin, | ||
1463 | "update_max_state_message_id", | ||
1464 | channel_key, max_state_message_id) | ||
1465 | && GNUNET_OK == transaction_commit (plugin) | ||
1466 | ? ret = GNUNET_OK | ||
1467 | : transaction_rollback (plugin); | ||
1468 | return ret; | ||
1469 | } | ||
1470 | |||
1471 | |||
1472 | /** | ||
1473 | * Delete the whole state. | ||
1474 | * | ||
1475 | * @see GNUNET_PSYCSTORE_state_reset() | ||
1476 | * | ||
1477 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1478 | */ | ||
1479 | static int | ||
1480 | state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | ||
1481 | { | ||
1482 | struct Plugin *plugin = cls; | ||
1483 | return exec_channel (plugin, "delete_state", channel_key); | ||
1484 | } | ||
1485 | |||
1486 | |||
1487 | /** | ||
1488 | * Update signed values of state variables in the state store. | ||
1489 | * | ||
1490 | * @see GNUNET_PSYCSTORE_state_hash_update() | ||
1491 | * | ||
1492 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1493 | */ | ||
1494 | static int | ||
1495 | state_update_signed (void *cls, | ||
1496 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | ||
1497 | { | ||
1498 | struct Plugin *plugin = cls; | ||
1499 | return exec_channel (plugin, "update_state_signed", channel_key); | ||
1500 | } | ||
1501 | |||
1502 | |||
1503 | /** | ||
1504 | * Retrieve a state variable by name. | ||
1505 | * | ||
1506 | * @see GNUNET_PSYCSTORE_state_get() | ||
1507 | * | ||
1508 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1509 | */ | ||
1510 | static int | ||
1511 | state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1512 | const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) | ||
1513 | { | ||
1514 | PGresult *res; | ||
1515 | |||
1516 | struct Plugin *plugin = cls; | ||
1517 | int ret = GNUNET_SYSERR; | ||
1518 | |||
1519 | const char *stmt = "select_state_one"; | ||
1520 | |||
1521 | struct GNUNET_PQ_QueryParam params_select[] = { | ||
1522 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1523 | GNUNET_PQ_query_param_string (name), | ||
1524 | GNUNET_PQ_query_param_end | ||
1525 | }; | ||
1526 | |||
1527 | void *value_current = NULL; | ||
1528 | size_t value_size = 0; | ||
1529 | |||
1530 | struct GNUNET_PQ_ResultSpec results[] = { | ||
1531 | GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size), | ||
1532 | GNUNET_PQ_result_spec_end | ||
1533 | }; | ||
1534 | |||
1535 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | ||
1536 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | ||
1537 | res, | ||
1538 | PGRES_COMMAND_OK, | ||
1539 | "PQexecPrepared", stmt)) | ||
1540 | { | ||
1541 | return GNUNET_SYSERR; | ||
1542 | } | ||
1543 | |||
1544 | if (PQntuples (res) == 0) | ||
1545 | { | ||
1546 | PQclear (res); | ||
1547 | ret = GNUNET_NO; | ||
1548 | } | ||
1549 | |||
1550 | ret = GNUNET_PQ_extract_result (res, | ||
1551 | results, 0); | ||
1552 | |||
1553 | if (GNUNET_OK != ret) | ||
1554 | { | ||
1555 | PQclear (res); | ||
1556 | return GNUNET_SYSERR; | ||
1557 | } | ||
1558 | |||
1559 | ret = cb (cb_cls, name, value_current, | ||
1560 | value_size); | ||
1561 | |||
1562 | GNUNET_PQ_cleanup_result(results); | ||
1563 | PQclear (res); | ||
1564 | |||
1565 | return ret; | ||
1566 | } | ||
1567 | |||
1568 | |||
1569 | /** | ||
1570 | * Retrieve all state variables for a channel with the given prefix. | ||
1571 | * | ||
1572 | * @see GNUNET_PSYCSTORE_state_get_prefix() | ||
1573 | * | ||
1574 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1575 | */ | ||
1576 | static int | ||
1577 | state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1578 | const char *name, GNUNET_PSYCSTORE_StateCallback cb, | ||
1579 | void *cb_cls) | ||
1580 | { | ||
1581 | PGresult *res; | ||
1582 | struct Plugin *plugin = cls; | ||
1583 | int ret = GNUNET_SYSERR; | ||
1584 | |||
1585 | const char *stmt = "select_state_prefix"; | ||
1586 | |||
1587 | uint32_t name_len = (uint32_t) strlen (name); | ||
1588 | |||
1589 | struct GNUNET_PQ_QueryParam params_select[] = { | ||
1590 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1591 | GNUNET_PQ_query_param_string (name), | ||
1592 | GNUNET_PQ_query_param_uint32 (&name_len), | ||
1593 | GNUNET_PQ_query_param_string (name), | ||
1594 | GNUNET_PQ_query_param_end | ||
1595 | }; | ||
1596 | |||
1597 | char *name2 = ""; | ||
1598 | void *value_current = NULL; | ||
1599 | size_t value_size = 0; | ||
1600 | |||
1601 | struct GNUNET_PQ_ResultSpec results[] = { | ||
1602 | GNUNET_PQ_result_spec_string ("name", &name2), | ||
1603 | GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size), | ||
1604 | GNUNET_PQ_result_spec_end | ||
1605 | }; | ||
1606 | |||
1607 | do | ||
1608 | { | ||
1609 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | ||
1610 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | ||
1611 | res, | ||
1612 | PGRES_COMMAND_OK, | ||
1613 | "PQexecPrepared", stmt)) | ||
1614 | { | ||
1615 | break; | ||
1616 | } | ||
1617 | |||
1618 | if (PQntuples (res) == 0) | ||
1619 | { | ||
1620 | PQclear (res); | ||
1621 | ret = GNUNET_NO; | ||
1622 | break; | ||
1623 | } | ||
1624 | |||
1625 | if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, 0)) | ||
1626 | { | ||
1627 | PQclear (res); | ||
1628 | break; | ||
1629 | } | ||
1630 | |||
1631 | ret = cb (cb_cls, (const char *) name2, | ||
1632 | value_current, | ||
1633 | value_size); | ||
1634 | GNUNET_PQ_cleanup_result(results); | ||
1635 | } | ||
1636 | while (ret == GNUNET_YES); | ||
1637 | |||
1638 | PQclear (res); | ||
1639 | |||
1640 | return ret; | ||
1641 | } | ||
1642 | |||
1643 | |||
1644 | /** | ||
1645 | * Retrieve all signed state variables for a channel. | ||
1646 | * | ||
1647 | * @see GNUNET_PSYCSTORE_state_get_signed() | ||
1648 | * | ||
1649 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR | ||
1650 | */ | ||
1651 | static int | ||
1652 | state_get_signed (void *cls, | ||
1653 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | ||
1654 | GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) | ||
1655 | { | ||
1656 | PGresult *res; | ||
1657 | struct Plugin *plugin = cls; | ||
1658 | int ret = GNUNET_SYSERR; | ||
1659 | |||
1660 | const char *stmt = "select_state_signed"; | ||
1661 | |||
1662 | struct GNUNET_PQ_QueryParam params_select[] = { | ||
1663 | GNUNET_PQ_query_param_auto_from_type (channel_key), | ||
1664 | GNUNET_PQ_query_param_end | ||
1665 | }; | ||
1666 | |||
1667 | char *name = ""; | ||
1668 | void *value_signed = NULL; | ||
1669 | size_t value_size = 0; | ||
1670 | |||
1671 | struct GNUNET_PQ_ResultSpec results[] = { | ||
1672 | GNUNET_PQ_result_spec_string ("name", &name), | ||
1673 | GNUNET_PQ_result_spec_variable_size ("value_signed", &value_signed, &value_size), | ||
1674 | GNUNET_PQ_result_spec_end | ||
1675 | }; | ||
1676 | |||
1677 | do | ||
1678 | { | ||
1679 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | ||
1680 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | ||
1681 | res, | ||
1682 | PGRES_COMMAND_OK, | ||
1683 | "PQexecPrepared", stmt)) | ||
1684 | { | ||
1685 | break; | ||
1686 | } | ||
1687 | |||
1688 | if (PQntuples (res) == 0) | ||
1689 | { | ||
1690 | PQclear (res); | ||
1691 | ret = GNUNET_NO; | ||
1692 | break; | ||
1693 | } | ||
1694 | |||
1695 | if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, 0)) | ||
1696 | { | ||
1697 | PQclear (res); | ||
1698 | break; | ||
1699 | } | ||
1700 | |||
1701 | ret = cb (cb_cls, (const char *) name, | ||
1702 | value_signed, | ||
1703 | value_size); | ||
1704 | |||
1705 | GNUNET_PQ_cleanup_result(results); | ||
1706 | } | ||
1707 | while (ret == GNUNET_YES); | ||
1708 | |||
1709 | PQclear (res); | ||
1710 | |||
1711 | return ret; | ||
1712 | } | ||
1713 | |||
1714 | |||
1715 | /** | ||
1716 | * Entry point for the plugin. | ||
1717 | * | ||
1718 | * @param cls The struct GNUNET_CONFIGURATION_Handle. | ||
1719 | * @return NULL on error, otherwise the plugin context | ||
1720 | */ | ||
1721 | void * | ||
1722 | libgnunet_plugin_psycstore_postgres_init (void *cls) | ||
1723 | { | ||
1724 | static struct Plugin plugin; | ||
1725 | const struct GNUNET_CONFIGURATION_Handle *cfg = cls; | ||
1726 | struct GNUNET_PSYCSTORE_PluginFunctions *api; | ||
1727 | |||
1728 | if (NULL != plugin.cfg) | ||
1729 | return NULL; /* can only initialize once! */ | ||
1730 | memset (&plugin, 0, sizeof (struct Plugin)); | ||
1731 | plugin.cfg = cfg; | ||
1732 | if (GNUNET_OK != database_setup (&plugin)) | ||
1733 | { | ||
1734 | database_shutdown (&plugin); | ||
1735 | return NULL; | ||
1736 | } | ||
1737 | api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions); | ||
1738 | api->cls = &plugin; | ||
1739 | api->membership_store = &postgres_membership_store; | ||
1740 | api->membership_test = &membership_test; | ||
1741 | api->fragment_store = &fragment_store; | ||
1742 | api->message_add_flags = &message_add_flags; | ||
1743 | api->fragment_get = &fragment_get; | ||
1744 | api->fragment_get_latest = &fragment_get_latest; | ||
1745 | api->message_get = &message_get; | ||
1746 | api->message_get_latest = &message_get_latest; | ||
1747 | api->message_get_fragment = &message_get_fragment; | ||
1748 | api->counters_message_get = &counters_message_get; | ||
1749 | api->counters_state_get = &counters_state_get; | ||
1750 | api->state_modify_begin = &state_modify_begin; | ||
1751 | api->state_modify_op = &state_modify_op; | ||
1752 | api->state_modify_end = &state_modify_end; | ||
1753 | api->state_sync_begin = &state_sync_begin; | ||
1754 | api->state_sync_assign = &state_sync_assign; | ||
1755 | api->state_sync_end = &state_sync_end; | ||
1756 | api->state_reset = &state_reset; | ||
1757 | api->state_update_signed = &state_update_signed; | ||
1758 | api->state_get = &state_get; | ||
1759 | api->state_get_prefix = &state_get_prefix; | ||
1760 | api->state_get_signed = &state_get_signed; | ||
1761 | |||
1762 | LOG (GNUNET_ERROR_TYPE_INFO, _("Postgres database running\n")); | ||
1763 | return api; | ||
1764 | } | ||
1765 | |||
1766 | |||
1767 | /** | ||
1768 | * Exit point from the plugin. | ||
1769 | * | ||
1770 | * @param cls The plugin context (as returned by "init") | ||
1771 | * @return Always NULL | ||
1772 | */ | ||
1773 | void * | ||
1774 | libgnunet_plugin_psycstore_postgres_done (void *cls) | ||
1775 | { | ||
1776 | struct GNUNET_PSYCSTORE_PluginFunctions *api = cls; | ||
1777 | struct Plugin *plugin = api->cls; | ||
1778 | |||
1779 | database_shutdown (plugin); | ||
1780 | plugin->cfg = NULL; | ||
1781 | GNUNET_free (api); | ||
1782 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Postgres plugin has finished\n"); | ||
1783 | return NULL; | ||
1784 | } | ||
1785 | |||
1786 | /* end of plugin_psycstore_postgres.c */ | ||
diff --git a/src/psycstore/psycstore.conf.in b/src/psycstore/psycstore.conf.in index 236471e36..82e9e67c2 100644 --- a/src/psycstore/psycstore.conf.in +++ b/src/psycstore/psycstore.conf.in | |||
@@ -23,3 +23,6 @@ CONFIG = ~/.my.cnf | |||
23 | # PASSWORD = | 23 | # PASSWORD = |
24 | # HOST = localhost | 24 | # HOST = localhost |
25 | # PORT = 3306 | 25 | # PORT = 3306 |
26 | |||
27 | [psycstore-postgres] | ||
28 | CONFIG = connect_timeout=10; dbname=gnunet | ||
diff --git a/src/psycstore/test_plugin_psycstore_postgres.conf b/src/psycstore/test_plugin_psycstore_postgres.conf new file mode 100644 index 000000000..55d12005c --- /dev/null +++ b/src/psycstore/test_plugin_psycstore_postgres.conf | |||
@@ -0,0 +1,2 @@ | |||
1 | [psycstore-postgres] | ||
2 | CONFIG = connect_timeout=10; dbname=gnunet | ||