diff options
Diffstat (limited to 'src/datastore/plugin_datastore_postgres.c')
-rw-r--r-- | src/datastore/plugin_datastore_postgres.c | 980 |
1 files changed, 0 insertions, 980 deletions
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c deleted file mode 100644 index 8fb0bf6ee..000000000 --- a/src/datastore/plugin_datastore_postgres.c +++ /dev/null | |||
@@ -1,980 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | Copyright (C) 2009-2017 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file datastore/plugin_datastore_postgres.c | ||
23 | * @brief postgres-based datastore backend | ||
24 | * @author Christian Grothoff | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_datastore_plugin.h" | ||
28 | #include "gnunet_pq_lib.h" | ||
29 | |||
30 | |||
31 | /** | ||
32 | * After how many ms "busy" should a DB operation fail for good? | ||
33 | * A low value makes sure that we are more responsive to requests | ||
34 | * (especially PUTs). A high value guarantees a higher success | ||
35 | * rate (SELECTs in iterate can take several seconds despite LIMIT=1). | ||
36 | * | ||
37 | * The default value of 1s should ensure that users do not experience | ||
38 | * huge latencies while at the same time allowing operations to succeed | ||
39 | * with reasonable probability. | ||
40 | */ | ||
41 | #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS | ||
42 | |||
43 | |||
44 | /** | ||
45 | * Context for all functions in this plugin. | ||
46 | */ | ||
47 | struct Plugin | ||
48 | { | ||
49 | /** | ||
50 | * Our execution environment. | ||
51 | */ | ||
52 | struct GNUNET_DATASTORE_PluginEnvironment *env; | ||
53 | |||
54 | /** | ||
55 | * Native Postgres database handle. | ||
56 | */ | ||
57 | struct GNUNET_PQ_Context *dbh; | ||
58 | }; | ||
59 | |||
60 | |||
61 | /** | ||
62 | * @brief Get a database handle | ||
63 | * | ||
64 | * @param plugin global context | ||
65 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on error | ||
66 | */ | ||
67 | static int | ||
68 | init_connection (struct Plugin *plugin) | ||
69 | { | ||
70 | struct GNUNET_PQ_ExecuteStatement es[] = { | ||
71 | /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because | ||
72 | * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel | ||
73 | * we do math or inequality tests, so we can't handle the entire range of uint32_t. | ||
74 | * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC. | ||
75 | */ | ||
76 | GNUNET_PQ_make_try_execute ( | ||
77 | "CREATE SEQUENCE IF NOT EXISTS gn090_oid_seq"), | ||
78 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS gn090 (" | ||
79 | " repl INTEGER NOT NULL DEFAULT 0," | ||
80 | " type INTEGER NOT NULL DEFAULT 0," | ||
81 | " prio INTEGER NOT NULL DEFAULT 0," | ||
82 | " anonLevel INTEGER NOT NULL DEFAULT 0," | ||
83 | " expire BIGINT NOT NULL DEFAULT 0," | ||
84 | " rvalue BIGINT NOT NULL DEFAULT 0," | ||
85 | " hash BYTEA NOT NULL DEFAULT ''," | ||
86 | " vhash BYTEA NOT NULL DEFAULT ''," | ||
87 | " value BYTEA NOT NULL DEFAULT ''," | ||
88 | " oid OID NOT NULL DEFAULT nextval('gn090_oid_seq'))"), | ||
89 | GNUNET_PQ_make_try_execute ( | ||
90 | "ALTER SEQUENCE gn090_oid_seq OWNED BY gn090.oid"), | ||
91 | GNUNET_PQ_make_try_execute ( | ||
92 | "CREATE INDEX IF NOT EXISTS oid_hash ON gn090 (oid)"), | ||
93 | GNUNET_PQ_make_try_execute ( | ||
94 | "CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"), | ||
95 | GNUNET_PQ_make_try_execute ( | ||
96 | "CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"), | ||
97 | GNUNET_PQ_make_try_execute ( | ||
98 | "CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"), | ||
99 | GNUNET_PQ_make_try_execute ( | ||
100 | "CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"), | ||
101 | GNUNET_PQ_make_try_execute ( | ||
102 | "CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"), | ||
103 | GNUNET_PQ_make_try_execute ( | ||
104 | "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"), | ||
105 | GNUNET_PQ_make_try_execute ( | ||
106 | "CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"), | ||
107 | GNUNET_PQ_make_execute ( | ||
108 | "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"), | ||
109 | GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"), | ||
110 | GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"), | ||
111 | GNUNET_PQ_EXECUTE_STATEMENT_END | ||
112 | }; | ||
113 | |||
114 | #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid" | ||
115 | struct GNUNET_PQ_PreparedStatement ps[] = { | ||
116 | GNUNET_PQ_make_prepare ("get", | ||
117 | "SELECT " RESULT_COLUMNS " FROM gn090" | ||
118 | " WHERE oid >= $1::bigint AND" | ||
119 | " (rvalue >= $2 OR 0 = $3::smallint) AND" | ||
120 | " (hash = $4 OR 0 = $5::smallint) AND" | ||
121 | " (type = $6 OR 0 = $7::smallint)" | ||
122 | " ORDER BY oid ASC LIMIT 1", | ||
123 | 7), | ||
124 | GNUNET_PQ_make_prepare ("put", | ||
125 | "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) " | ||
126 | "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", | ||
127 | 9), | ||
128 | GNUNET_PQ_make_prepare ("update", | ||
129 | "UPDATE gn090" | ||
130 | " SET prio = prio + $1," | ||
131 | " repl = repl + $2," | ||
132 | " expire = GREATEST(expire, $3)" | ||
133 | " WHERE hash = $4 AND vhash = $5", | ||
134 | 5), | ||
135 | GNUNET_PQ_make_prepare ("decrepl", | ||
136 | "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) " | ||
137 | "WHERE oid = $1", | ||
138 | 1), | ||
139 | GNUNET_PQ_make_prepare ("select_non_anonymous", | ||
140 | "SELECT " RESULT_COLUMNS " FROM gn090 " | ||
141 | "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint " | ||
142 | "ORDER BY oid ASC LIMIT 1", | ||
143 | 2), | ||
144 | GNUNET_PQ_make_prepare ("select_expiration_order", | ||
145 | "(SELECT " RESULT_COLUMNS " FROM gn090 " | ||
146 | "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " | ||
147 | "UNION " | ||
148 | "(SELECT " RESULT_COLUMNS " FROM gn090 " | ||
149 | "ORDER BY prio ASC LIMIT 1) " | ||
150 | "ORDER BY expire ASC LIMIT 1", | ||
151 | 1), | ||
152 | GNUNET_PQ_make_prepare ("select_replication_order", | ||
153 | "SELECT " RESULT_COLUMNS " FROM gn090 " | ||
154 | "ORDER BY repl DESC,RANDOM() LIMIT 1", | ||
155 | 0), | ||
156 | GNUNET_PQ_make_prepare ("delrow", | ||
157 | "DELETE FROM gn090 " | ||
158 | "WHERE oid=$1", | ||
159 | 1), | ||
160 | GNUNET_PQ_make_prepare ("remove", | ||
161 | "DELETE FROM gn090" | ||
162 | " WHERE hash = $1 AND" | ||
163 | " value = $2", | ||
164 | 2), | ||
165 | GNUNET_PQ_make_prepare ("get_keys", | ||
166 | "SELECT hash FROM gn090", | ||
167 | 0), | ||
168 | GNUNET_PQ_make_prepare ("estimate_size", | ||
169 | "SELECT CASE WHEN NOT EXISTS" | ||
170 | " (SELECT 1 FROM gn090)" | ||
171 | " THEN 0" | ||
172 | " ELSE (SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090)" | ||
173 | "END AS total", | ||
174 | 0), | ||
175 | GNUNET_PQ_PREPARED_STATEMENT_END | ||
176 | }; | ||
177 | #undef RESULT_COLUMNS | ||
178 | |||
179 | plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg, | ||
180 | "datastore-postgres", | ||
181 | NULL, | ||
182 | es, | ||
183 | ps); | ||
184 | if (NULL == plugin->dbh) | ||
185 | return GNUNET_SYSERR; | ||
186 | return GNUNET_OK; | ||
187 | } | ||
188 | |||
189 | |||
190 | /** | ||
191 | * Get an estimate of how much space the database is | ||
192 | * currently using. | ||
193 | * | ||
194 | * @param cls our `struct Plugin *` | ||
195 | * @return number of bytes used on disk | ||
196 | */ | ||
197 | static void | ||
198 | postgres_plugin_estimate_size (void *cls, | ||
199 | unsigned long long *estimate) | ||
200 | { | ||
201 | struct Plugin *plugin = cls; | ||
202 | uint64_t total; | ||
203 | struct GNUNET_PQ_QueryParam params[] = { | ||
204 | GNUNET_PQ_query_param_end | ||
205 | }; | ||
206 | struct GNUNET_PQ_ResultSpec rs[] = { | ||
207 | GNUNET_PQ_result_spec_uint64 ("total", | ||
208 | &total), | ||
209 | GNUNET_PQ_result_spec_end | ||
210 | }; | ||
211 | enum GNUNET_DB_QueryStatus ret; | ||
212 | |||
213 | if (NULL == estimate) | ||
214 | return; | ||
215 | ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, | ||
216 | "estimate_size", | ||
217 | params, | ||
218 | rs); | ||
219 | if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret) | ||
220 | { | ||
221 | *estimate = 0LL; | ||
222 | return; | ||
223 | } | ||
224 | *estimate = total; | ||
225 | } | ||
226 | |||
227 | |||
228 | /** | ||
229 | * Store an item in the datastore. | ||
230 | * | ||
231 | * @param cls closure with the `struct Plugin` | ||
232 | * @param key key for the item | ||
233 | * @param absent true if the key was not found in the bloom filter | ||
234 | * @param size number of bytes in data | ||
235 | * @param data content stored | ||
236 | * @param type type of the content | ||
237 | * @param priority priority of the content | ||
238 | * @param anonymity anonymity-level for the content | ||
239 | * @param replication replication-level for the content | ||
240 | * @param expiration expiration time for the content | ||
241 | * @param cont continuation called with success or failure status | ||
242 | * @param cont_cls continuation closure | ||
243 | */ | ||
244 | static void | ||
245 | postgres_plugin_put (void *cls, | ||
246 | const struct GNUNET_HashCode *key, | ||
247 | bool absent, | ||
248 | uint32_t size, | ||
249 | const void *data, | ||
250 | enum GNUNET_BLOCK_Type type, | ||
251 | uint32_t priority, | ||
252 | uint32_t anonymity, | ||
253 | uint32_t replication, | ||
254 | struct GNUNET_TIME_Absolute expiration, | ||
255 | PluginPutCont cont, | ||
256 | void *cont_cls) | ||
257 | { | ||
258 | struct Plugin *plugin = cls; | ||
259 | struct GNUNET_HashCode vhash; | ||
260 | enum GNUNET_DB_QueryStatus ret; | ||
261 | |||
262 | GNUNET_CRYPTO_hash (data, | ||
263 | size, | ||
264 | &vhash); | ||
265 | if (! absent) | ||
266 | { | ||
267 | struct GNUNET_PQ_QueryParam params[] = { | ||
268 | GNUNET_PQ_query_param_uint32 (&priority), | ||
269 | GNUNET_PQ_query_param_uint32 (&replication), | ||
270 | GNUNET_PQ_query_param_absolute_time (&expiration), | ||
271 | GNUNET_PQ_query_param_auto_from_type (key), | ||
272 | GNUNET_PQ_query_param_auto_from_type (&vhash), | ||
273 | GNUNET_PQ_query_param_end | ||
274 | }; | ||
275 | ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh, | ||
276 | "update", | ||
277 | params); | ||
278 | if (0 > ret) | ||
279 | { | ||
280 | cont (cont_cls, | ||
281 | key, | ||
282 | size, | ||
283 | GNUNET_SYSERR, | ||
284 | _ ("Postgresql exec failure")); | ||
285 | return; | ||
286 | } | ||
287 | bool affected = (0 != ret); | ||
288 | if (affected) | ||
289 | { | ||
290 | cont (cont_cls, | ||
291 | key, | ||
292 | size, | ||
293 | GNUNET_NO, | ||
294 | NULL); | ||
295 | return; | ||
296 | } | ||
297 | } | ||
298 | |||
299 | { | ||
300 | uint32_t utype = (uint32_t) type; | ||
301 | uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
302 | UINT64_MAX); | ||
303 | struct GNUNET_PQ_QueryParam params[] = { | ||
304 | GNUNET_PQ_query_param_uint32 (&replication), | ||
305 | GNUNET_PQ_query_param_uint32 (&utype), | ||
306 | GNUNET_PQ_query_param_uint32 (&priority), | ||
307 | GNUNET_PQ_query_param_uint32 (&anonymity), | ||
308 | GNUNET_PQ_query_param_absolute_time (&expiration), | ||
309 | GNUNET_PQ_query_param_uint64 (&rvalue), | ||
310 | GNUNET_PQ_query_param_auto_from_type (key), | ||
311 | GNUNET_PQ_query_param_auto_from_type (&vhash), | ||
312 | GNUNET_PQ_query_param_fixed_size (data, size), | ||
313 | GNUNET_PQ_query_param_end | ||
314 | }; | ||
315 | |||
316 | ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh, | ||
317 | "put", | ||
318 | params); | ||
319 | if (0 > ret) | ||
320 | { | ||
321 | cont (cont_cls, | ||
322 | key, | ||
323 | size, | ||
324 | GNUNET_SYSERR, | ||
325 | "Postgresql exec failure"); | ||
326 | return; | ||
327 | } | ||
328 | } | ||
329 | plugin->env->duc (plugin->env->cls, | ||
330 | size + GNUNET_DATASTORE_ENTRY_OVERHEAD); | ||
331 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
332 | "datastore-postgres", | ||
333 | "Stored %u bytes in database\n", | ||
334 | (unsigned int) size); | ||
335 | cont (cont_cls, | ||
336 | key, | ||
337 | size, | ||
338 | GNUNET_OK, | ||
339 | NULL); | ||
340 | } | ||
341 | |||
342 | |||
343 | /** | ||
344 | * Closure for #process_result. | ||
345 | */ | ||
346 | struct ProcessResultContext | ||
347 | { | ||
348 | /** | ||
349 | * The plugin handle. | ||
350 | */ | ||
351 | struct Plugin *plugin; | ||
352 | |||
353 | /** | ||
354 | * Function to call on each result. | ||
355 | */ | ||
356 | PluginDatumProcessor proc; | ||
357 | |||
358 | /** | ||
359 | * Closure for @e proc. | ||
360 | */ | ||
361 | void *proc_cls; | ||
362 | }; | ||
363 | |||
364 | |||
365 | /** | ||
366 | * Function invoked to process the result and call the processor of @a | ||
367 | * cls. | ||
368 | * | ||
369 | * @param cls our `struct ProcessResultContext` | ||
370 | * @param res result from exec | ||
371 | * @param num_results number of results in @a res | ||
372 | */ | ||
373 | static void | ||
374 | process_result (void *cls, | ||
375 | PGresult *res, | ||
376 | unsigned int num_results) | ||
377 | { | ||
378 | struct ProcessResultContext *prc = cls; | ||
379 | struct Plugin *plugin = prc->plugin; | ||
380 | |||
381 | if (0 == num_results) | ||
382 | { | ||
383 | /* no result */ | ||
384 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
385 | "datastore-postgres", | ||
386 | "Ending iteration (no more results)\n"); | ||
387 | prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
388 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
389 | return; | ||
390 | } | ||
391 | if (1 != num_results) | ||
392 | { | ||
393 | GNUNET_break (0); | ||
394 | prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
395 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
396 | return; | ||
397 | } | ||
398 | /* Technically we don't need the loop here, but nicer in case | ||
399 | we ever relax the condition above. */ | ||
400 | for (unsigned int i = 0; i < num_results; i++) | ||
401 | { | ||
402 | int iret; | ||
403 | uint32_t rowid; | ||
404 | uint32_t utype; | ||
405 | uint32_t anonymity; | ||
406 | uint32_t replication; | ||
407 | uint32_t priority; | ||
408 | size_t size; | ||
409 | void *data; | ||
410 | struct GNUNET_TIME_Absolute expiration_time; | ||
411 | struct GNUNET_HashCode key; | ||
412 | struct GNUNET_PQ_ResultSpec rs[] = { | ||
413 | GNUNET_PQ_result_spec_uint32 ("repl", &replication), | ||
414 | GNUNET_PQ_result_spec_uint32 ("type", &utype), | ||
415 | GNUNET_PQ_result_spec_uint32 ("prio", &priority), | ||
416 | GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity), | ||
417 | GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time), | ||
418 | GNUNET_PQ_result_spec_auto_from_type ("hash", &key), | ||
419 | GNUNET_PQ_result_spec_variable_size ("value", &data, &size), | ||
420 | GNUNET_PQ_result_spec_uint32 ("oid", &rowid), | ||
421 | GNUNET_PQ_result_spec_end | ||
422 | }; | ||
423 | |||
424 | if (GNUNET_OK != | ||
425 | GNUNET_PQ_extract_result (res, | ||
426 | rs, | ||
427 | i)) | ||
428 | { | ||
429 | GNUNET_break (0); | ||
430 | prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
431 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
432 | return; | ||
433 | } | ||
434 | |||
435 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
436 | "datastore-postgres", | ||
437 | "Found result of size %u bytes and type %u in database\n", | ||
438 | (unsigned int) size, | ||
439 | (unsigned int) utype); | ||
440 | iret = prc->proc (prc->proc_cls, | ||
441 | &key, | ||
442 | size, | ||
443 | data, | ||
444 | (enum GNUNET_BLOCK_Type) utype, | ||
445 | priority, | ||
446 | anonymity, | ||
447 | replication, | ||
448 | expiration_time, | ||
449 | rowid); | ||
450 | if (iret == GNUNET_NO) | ||
451 | { | ||
452 | struct GNUNET_PQ_QueryParam param[] = { | ||
453 | GNUNET_PQ_query_param_uint32 (&rowid), | ||
454 | GNUNET_PQ_query_param_end | ||
455 | }; | ||
456 | |||
457 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
458 | "Processor asked for item %u to be removed.\n", | ||
459 | (unsigned int) rowid); | ||
460 | if (0 < | ||
461 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, | ||
462 | "delrow", | ||
463 | param)) | ||
464 | { | ||
465 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
466 | "datastore-postgres", | ||
467 | "Deleting %u bytes from database\n", | ||
468 | (unsigned int) size); | ||
469 | plugin->env->duc (plugin->env->cls, | ||
470 | -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); | ||
471 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
472 | "datastore-postgres", | ||
473 | "Deleted %u bytes from database\n", | ||
474 | (unsigned int) size); | ||
475 | } | ||
476 | } | ||
477 | GNUNET_PQ_cleanup_result (rs); | ||
478 | } /* for (i) */ | ||
479 | } | ||
480 | |||
481 | |||
482 | /** | ||
483 | * Get one of the results for a particular key in the datastore. | ||
484 | * | ||
485 | * @param cls closure with the `struct Plugin` | ||
486 | * @param next_uid return the result with lowest uid >= next_uid | ||
487 | * @param random if true, return a random result instead of using next_uid | ||
488 | * @param key maybe NULL (to match all entries) | ||
489 | * @param type entries of which type are relevant? | ||
490 | * Use 0 for any type. | ||
491 | * @param proc function to call on the matching value; | ||
492 | * will be called with NULL if nothing matches | ||
493 | * @param proc_cls closure for @a proc | ||
494 | */ | ||
495 | static void | ||
496 | postgres_plugin_get_key (void *cls, | ||
497 | uint64_t next_uid, | ||
498 | bool random, | ||
499 | const struct GNUNET_HashCode *key, | ||
500 | enum GNUNET_BLOCK_Type type, | ||
501 | PluginDatumProcessor proc, | ||
502 | void *proc_cls) | ||
503 | { | ||
504 | struct Plugin *plugin = cls; | ||
505 | uint32_t utype = type; | ||
506 | uint16_t use_rvalue = random; | ||
507 | uint16_t use_key = NULL != key; | ||
508 | uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type; | ||
509 | uint64_t rvalue; | ||
510 | struct GNUNET_PQ_QueryParam params[] = { | ||
511 | GNUNET_PQ_query_param_uint64 (&next_uid), | ||
512 | GNUNET_PQ_query_param_uint64 (&rvalue), | ||
513 | GNUNET_PQ_query_param_uint16 (&use_rvalue), | ||
514 | GNUNET_PQ_query_param_auto_from_type (key), | ||
515 | GNUNET_PQ_query_param_uint16 (&use_key), | ||
516 | GNUNET_PQ_query_param_uint32 (&utype), | ||
517 | GNUNET_PQ_query_param_uint16 (&use_type), | ||
518 | GNUNET_PQ_query_param_end | ||
519 | }; | ||
520 | struct ProcessResultContext prc; | ||
521 | enum GNUNET_DB_QueryStatus res; | ||
522 | |||
523 | if (random) | ||
524 | { | ||
525 | rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
526 | UINT64_MAX); | ||
527 | next_uid = 0; | ||
528 | } | ||
529 | else | ||
530 | { | ||
531 | rvalue = 0; | ||
532 | } | ||
533 | prc.plugin = plugin; | ||
534 | prc.proc = proc; | ||
535 | prc.proc_cls = proc_cls; | ||
536 | |||
537 | res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, | ||
538 | "get", | ||
539 | params, | ||
540 | &process_result, | ||
541 | &prc); | ||
542 | if (0 > res) | ||
543 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
544 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
545 | } | ||
546 | |||
547 | |||
548 | /** | ||
549 | * Select a subset of the items in the datastore and call | ||
550 | * the given iterator for each of them. | ||
551 | * | ||
552 | * @param cls our `struct Plugin *` | ||
553 | * @param next_uid return the result with lowest uid >= next_uid | ||
554 | * @param type entries of which type should be considered? | ||
555 | * Must not be zero (ANY). | ||
556 | * @param proc function to call on the matching value; | ||
557 | * will be called with NULL if no value matches | ||
558 | * @param proc_cls closure for @a proc | ||
559 | */ | ||
560 | static void | ||
561 | postgres_plugin_get_zero_anonymity (void *cls, | ||
562 | uint64_t next_uid, | ||
563 | enum GNUNET_BLOCK_Type type, | ||
564 | PluginDatumProcessor proc, | ||
565 | void *proc_cls) | ||
566 | { | ||
567 | struct Plugin *plugin = cls; | ||
568 | uint32_t utype = type; | ||
569 | struct GNUNET_PQ_QueryParam params[] = { | ||
570 | GNUNET_PQ_query_param_uint32 (&utype), | ||
571 | GNUNET_PQ_query_param_uint64 (&next_uid), | ||
572 | GNUNET_PQ_query_param_end | ||
573 | }; | ||
574 | struct ProcessResultContext prc; | ||
575 | enum GNUNET_DB_QueryStatus res; | ||
576 | |||
577 | prc.plugin = plugin; | ||
578 | prc.proc = proc; | ||
579 | prc.proc_cls = proc_cls; | ||
580 | res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, | ||
581 | "select_non_anonymous", | ||
582 | params, | ||
583 | &process_result, | ||
584 | &prc); | ||
585 | if (0 > res) | ||
586 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
587 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
588 | } | ||
589 | |||
590 | |||
591 | /** | ||
592 | * Context for #repl_iter() function. | ||
593 | */ | ||
594 | struct ReplCtx | ||
595 | { | ||
596 | /** | ||
597 | * Plugin handle. | ||
598 | */ | ||
599 | struct Plugin *plugin; | ||
600 | |||
601 | /** | ||
602 | * Function to call for the result (or the NULL). | ||
603 | */ | ||
604 | PluginDatumProcessor proc; | ||
605 | |||
606 | /** | ||
607 | * Closure for @e proc. | ||
608 | */ | ||
609 | void *proc_cls; | ||
610 | }; | ||
611 | |||
612 | |||
613 | /** | ||
614 | * Wrapper for the iterator for 'sqlite_plugin_replication_get'. | ||
615 | * Decrements the replication counter and calls the original | ||
616 | * iterator. | ||
617 | * | ||
618 | * @param cls closure with the `struct ReplCtx *` | ||
619 | * @param key key for the content | ||
620 | * @param size number of bytes in @a data | ||
621 | * @param data content stored | ||
622 | * @param type type of the content | ||
623 | * @param priority priority of the content | ||
624 | * @param anonymity anonymity-level for the content | ||
625 | * @param replication replication-level for the content | ||
626 | * @param expiration expiration time for the content | ||
627 | * @param uid unique identifier for the datum; | ||
628 | * maybe 0 if no unique identifier is available | ||
629 | * @return #GNUNET_SYSERR to abort the iteration, | ||
630 | * #GNUNET_OK to continue | ||
631 | * (continue on call to "next", of course), | ||
632 | * #GNUNET_NO to delete the item and continue (if supported) | ||
633 | */ | ||
634 | static int | ||
635 | repl_proc (void *cls, | ||
636 | const struct GNUNET_HashCode *key, | ||
637 | uint32_t size, | ||
638 | const void *data, | ||
639 | enum GNUNET_BLOCK_Type type, | ||
640 | uint32_t priority, | ||
641 | uint32_t anonymity, | ||
642 | uint32_t replication, | ||
643 | struct GNUNET_TIME_Absolute expiration, | ||
644 | uint64_t uid) | ||
645 | { | ||
646 | struct ReplCtx *rc = cls; | ||
647 | struct Plugin *plugin = rc->plugin; | ||
648 | int ret; | ||
649 | uint32_t oid = (uint32_t) uid; | ||
650 | struct GNUNET_PQ_QueryParam params[] = { | ||
651 | GNUNET_PQ_query_param_uint32 (&oid), | ||
652 | GNUNET_PQ_query_param_end | ||
653 | }; | ||
654 | enum GNUNET_DB_QueryStatus qret; | ||
655 | |||
656 | ret = rc->proc (rc->proc_cls, | ||
657 | key, | ||
658 | size, | ||
659 | data, | ||
660 | type, | ||
661 | priority, | ||
662 | anonymity, | ||
663 | replication, | ||
664 | expiration, | ||
665 | uid); | ||
666 | if (NULL == key) | ||
667 | return ret; | ||
668 | qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh, | ||
669 | "decrepl", | ||
670 | params); | ||
671 | if (0 > qret) | ||
672 | return GNUNET_SYSERR; | ||
673 | return ret; | ||
674 | } | ||
675 | |||
676 | |||
677 | /** | ||
678 | * Get a random item for replication. Returns a single, not expired, | ||
679 | * random item from those with the highest replication counters. The | ||
680 | * item's replication counter is decremented by one IF it was positive | ||
681 | * before. Call @a proc with all values ZERO or NULL if the datastore | ||
682 | * is empty. | ||
683 | * | ||
684 | * @param cls closure with the `struct Plugin` | ||
685 | * @param proc function to call the value (once only). | ||
686 | * @param proc_cls closure for @a proc | ||
687 | */ | ||
688 | static void | ||
689 | postgres_plugin_get_replication (void *cls, | ||
690 | PluginDatumProcessor proc, | ||
691 | void *proc_cls) | ||
692 | { | ||
693 | struct Plugin *plugin = cls; | ||
694 | struct GNUNET_PQ_QueryParam params[] = { | ||
695 | GNUNET_PQ_query_param_end | ||
696 | }; | ||
697 | struct ReplCtx rc; | ||
698 | struct ProcessResultContext prc; | ||
699 | enum GNUNET_DB_QueryStatus res; | ||
700 | |||
701 | rc.plugin = plugin; | ||
702 | rc.proc = proc; | ||
703 | rc.proc_cls = proc_cls; | ||
704 | prc.plugin = plugin; | ||
705 | prc.proc = &repl_proc; | ||
706 | prc.proc_cls = &rc; | ||
707 | res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, | ||
708 | "select_replication_order", | ||
709 | params, | ||
710 | &process_result, | ||
711 | &prc); | ||
712 | if (0 > res) | ||
713 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
714 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
715 | } | ||
716 | |||
717 | |||
718 | /** | ||
719 | * Get a random item for expiration. Call @a proc with all values | ||
720 | * ZERO or NULL if the datastore is empty. | ||
721 | * | ||
722 | * @param cls closure with the `struct Plugin` | ||
723 | * @param proc function to call the value (once only). | ||
724 | * @param proc_cls closure for @a proc | ||
725 | */ | ||
726 | static void | ||
727 | postgres_plugin_get_expiration (void *cls, | ||
728 | PluginDatumProcessor proc, | ||
729 | void *proc_cls) | ||
730 | { | ||
731 | struct Plugin *plugin = cls; | ||
732 | struct GNUNET_TIME_Absolute now = { 0 }; | ||
733 | struct GNUNET_PQ_QueryParam params[] = { | ||
734 | GNUNET_PQ_query_param_absolute_time (&now), | ||
735 | GNUNET_PQ_query_param_end | ||
736 | }; | ||
737 | struct ProcessResultContext prc; | ||
738 | |||
739 | now = GNUNET_TIME_absolute_get (); | ||
740 | prc.plugin = plugin; | ||
741 | prc.proc = proc; | ||
742 | prc.proc_cls = proc_cls; | ||
743 | (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, | ||
744 | "select_expiration_order", | ||
745 | params, | ||
746 | &process_result, | ||
747 | &prc); | ||
748 | } | ||
749 | |||
750 | |||
751 | /** | ||
752 | * Closure for #process_keys. | ||
753 | */ | ||
754 | struct ProcessKeysContext | ||
755 | { | ||
756 | /** | ||
757 | * Function to call for each key. | ||
758 | */ | ||
759 | PluginKeyProcessor proc; | ||
760 | |||
761 | /** | ||
762 | * Closure for @e proc. | ||
763 | */ | ||
764 | void *proc_cls; | ||
765 | }; | ||
766 | |||
767 | |||
768 | /** | ||
769 | * Function to be called with the results of a SELECT statement | ||
770 | * that has returned @a num_results results. | ||
771 | * | ||
772 | * @param cls closure with a `struct ProcessKeysContext` | ||
773 | * @param result the postgres result | ||
774 | * @param num_result the number of results in @a result | ||
775 | */ | ||
776 | static void | ||
777 | process_keys (void *cls, | ||
778 | PGresult *result, | ||
779 | unsigned int num_results) | ||
780 | { | ||
781 | struct ProcessKeysContext *pkc = cls; | ||
782 | |||
783 | for (unsigned i = 0; i < num_results; i++) | ||
784 | { | ||
785 | struct GNUNET_HashCode key; | ||
786 | struct GNUNET_PQ_ResultSpec rs[] = { | ||
787 | GNUNET_PQ_result_spec_auto_from_type ("hash", | ||
788 | &key), | ||
789 | GNUNET_PQ_result_spec_end | ||
790 | }; | ||
791 | |||
792 | if (GNUNET_OK != | ||
793 | GNUNET_PQ_extract_result (result, | ||
794 | rs, | ||
795 | i)) | ||
796 | { | ||
797 | GNUNET_break (0); | ||
798 | continue; | ||
799 | } | ||
800 | pkc->proc (pkc->proc_cls, | ||
801 | &key, | ||
802 | 1); | ||
803 | GNUNET_PQ_cleanup_result (rs); | ||
804 | } | ||
805 | } | ||
806 | |||
807 | |||
808 | /** | ||
809 | * Get all of the keys in the datastore. | ||
810 | * | ||
811 | * @param cls closure with the `struct Plugin *` | ||
812 | * @param proc function to call on each key | ||
813 | * @param proc_cls closure for @a proc | ||
814 | */ | ||
815 | static void | ||
816 | postgres_plugin_get_keys (void *cls, | ||
817 | PluginKeyProcessor proc, | ||
818 | void *proc_cls) | ||
819 | { | ||
820 | struct Plugin *plugin = cls; | ||
821 | struct GNUNET_PQ_QueryParam params[] = { | ||
822 | GNUNET_PQ_query_param_end | ||
823 | }; | ||
824 | struct ProcessKeysContext pkc; | ||
825 | |||
826 | pkc.proc = proc; | ||
827 | pkc.proc_cls = proc_cls; | ||
828 | (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, | ||
829 | "get_keys", | ||
830 | params, | ||
831 | &process_keys, | ||
832 | &pkc); | ||
833 | proc (proc_cls, | ||
834 | NULL, | ||
835 | 0); | ||
836 | } | ||
837 | |||
838 | |||
839 | /** | ||
840 | * Drop database. | ||
841 | * | ||
842 | * @param cls closure with the `struct Plugin *` | ||
843 | */ | ||
844 | static void | ||
845 | postgres_plugin_drop (void *cls) | ||
846 | { | ||
847 | struct Plugin *plugin = cls; | ||
848 | struct GNUNET_PQ_ExecuteStatement es[] = { | ||
849 | GNUNET_PQ_make_execute ("DROP TABLE gn090"), | ||
850 | GNUNET_PQ_EXECUTE_STATEMENT_END | ||
851 | }; | ||
852 | |||
853 | if (GNUNET_OK != | ||
854 | GNUNET_PQ_exec_statements (plugin->dbh, | ||
855 | es)) | ||
856 | GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, | ||
857 | "postgres", | ||
858 | _ ("Failed to drop table from database.\n")); | ||
859 | } | ||
860 | |||
861 | |||
862 | /** | ||
863 | * Remove a particular key in the datastore. | ||
864 | * | ||
865 | * @param cls closure | ||
866 | * @param key key for the content | ||
867 | * @param size number of bytes in data | ||
868 | * @param data content stored | ||
869 | * @param cont continuation called with success or failure status | ||
870 | * @param cont_cls continuation closure for @a cont | ||
871 | */ | ||
872 | static void | ||
873 | postgres_plugin_remove_key (void *cls, | ||
874 | const struct GNUNET_HashCode *key, | ||
875 | uint32_t size, | ||
876 | const void *data, | ||
877 | PluginRemoveCont cont, | ||
878 | void *cont_cls) | ||
879 | { | ||
880 | struct Plugin *plugin = cls; | ||
881 | enum GNUNET_DB_QueryStatus ret; | ||
882 | struct GNUNET_PQ_QueryParam params[] = { | ||
883 | GNUNET_PQ_query_param_auto_from_type (key), | ||
884 | GNUNET_PQ_query_param_fixed_size (data, size), | ||
885 | GNUNET_PQ_query_param_end | ||
886 | }; | ||
887 | |||
888 | ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh, | ||
889 | "remove", | ||
890 | params); | ||
891 | if (0 > ret) | ||
892 | { | ||
893 | cont (cont_cls, | ||
894 | key, | ||
895 | size, | ||
896 | GNUNET_SYSERR, | ||
897 | _ ("Postgresql exec failure")); | ||
898 | return; | ||
899 | } | ||
900 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret) | ||
901 | { | ||
902 | cont (cont_cls, | ||
903 | key, | ||
904 | size, | ||
905 | GNUNET_NO, | ||
906 | NULL); | ||
907 | return; | ||
908 | } | ||
909 | plugin->env->duc (plugin->env->cls, | ||
910 | -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); | ||
911 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
912 | "datastore-postgres", | ||
913 | "Deleted %u bytes from database\n", | ||
914 | (unsigned int) size); | ||
915 | cont (cont_cls, | ||
916 | key, | ||
917 | size, | ||
918 | GNUNET_OK, | ||
919 | NULL); | ||
920 | } | ||
921 | |||
922 | |||
923 | /** | ||
924 | * Entry point for the plugin. | ||
925 | * | ||
926 | * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*` | ||
927 | * @return our `struct Plugin *` | ||
928 | */ | ||
929 | void * | ||
930 | libgnunet_plugin_datastore_postgres_init (void *cls) | ||
931 | { | ||
932 | struct GNUNET_DATASTORE_PluginEnvironment *env = cls; | ||
933 | struct GNUNET_DATASTORE_PluginFunctions *api; | ||
934 | struct Plugin *plugin; | ||
935 | |||
936 | plugin = GNUNET_new (struct Plugin); | ||
937 | plugin->env = env; | ||
938 | if (GNUNET_OK != init_connection (plugin)) | ||
939 | { | ||
940 | GNUNET_free (plugin); | ||
941 | return NULL; | ||
942 | } | ||
943 | api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions); | ||
944 | api->cls = plugin; | ||
945 | api->estimate_size = &postgres_plugin_estimate_size; | ||
946 | api->put = &postgres_plugin_put; | ||
947 | api->get_key = &postgres_plugin_get_key; | ||
948 | api->get_replication = &postgres_plugin_get_replication; | ||
949 | api->get_expiration = &postgres_plugin_get_expiration; | ||
950 | api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity; | ||
951 | api->get_keys = &postgres_plugin_get_keys; | ||
952 | api->drop = &postgres_plugin_drop; | ||
953 | api->remove_key = &postgres_plugin_remove_key; | ||
954 | GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, | ||
955 | "datastore-postgres", | ||
956 | _ ("Postgres database running\n")); | ||
957 | return api; | ||
958 | } | ||
959 | |||
960 | |||
961 | /** | ||
962 | * Exit point from the plugin. | ||
963 | * | ||
964 | * @param cls our `struct Plugin *` | ||
965 | * @return always NULL | ||
966 | */ | ||
967 | void * | ||
968 | libgnunet_plugin_datastore_postgres_done (void *cls) | ||
969 | { | ||
970 | struct GNUNET_DATASTORE_PluginFunctions *api = cls; | ||
971 | struct Plugin *plugin = api->cls; | ||
972 | |||
973 | GNUNET_PQ_disconnect (plugin->dbh); | ||
974 | GNUNET_free (plugin); | ||
975 | GNUNET_free (api); | ||
976 | return NULL; | ||
977 | } | ||
978 | |||
979 | |||
980 | /* end of plugin_datastore_postgres.c */ | ||