diff options
Diffstat (limited to 'src/plugin/datastore/plugin_datastore_postgres.c')
-rw-r--r-- | src/plugin/datastore/plugin_datastore_postgres.c | 930 |
1 files changed, 930 insertions, 0 deletions
diff --git a/src/plugin/datastore/plugin_datastore_postgres.c b/src/plugin/datastore/plugin_datastore_postgres.c new file mode 100644 index 000000000..5fcacc17b --- /dev/null +++ b/src/plugin/datastore/plugin_datastore_postgres.c | |||
@@ -0,0 +1,930 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | Copyright (C) 2009-2017, 2022 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file datastore/plugin_datastore_postgres.c | ||
23 | * @brief postgres-based datastore backend | ||
24 | * @author Christian Grothoff | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_datastore_plugin.h" | ||
28 | #include "gnunet_pq_lib.h" | ||
29 | |||
30 | |||
31 | /** | ||
32 | * After how many ms "busy" should a DB operation fail for good? | ||
33 | * A low value makes sure that we are more responsive to requests | ||
34 | * (especially PUTs). A high value guarantees a higher success | ||
35 | * rate (SELECTs in iterate can take several seconds despite LIMIT=1). | ||
36 | * | ||
37 | * The default value of 1s should ensure that users do not experience | ||
38 | * huge latencies while at the same time allowing operations to succeed | ||
39 | * with reasonable probability. | ||
40 | */ | ||
41 | #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS | ||
42 | |||
43 | |||
44 | /** | ||
45 | * Context for all functions in this plugin. | ||
46 | */ | ||
47 | struct Plugin | ||
48 | { | ||
49 | /** | ||
50 | * Our execution environment. | ||
51 | */ | ||
52 | struct GNUNET_DATASTORE_PluginEnvironment *env; | ||
53 | |||
54 | /** | ||
55 | * Native Postgres database handle. | ||
56 | */ | ||
57 | struct GNUNET_PQ_Context *dbh; | ||
58 | }; | ||
59 | |||
60 | |||
61 | /** | ||
62 | * @brief Get a database handle | ||
63 | * | ||
64 | * @param plugin global context | ||
65 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on error | ||
66 | */ | ||
67 | static enum GNUNET_GenericReturnValue | ||
68 | init_connection (struct Plugin *plugin) | ||
69 | { | ||
70 | #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid" | ||
71 | struct GNUNET_PQ_PreparedStatement ps[] = { | ||
72 | GNUNET_PQ_make_prepare ("get", | ||
73 | "SELECT " RESULT_COLUMNS | ||
74 | " FROM datastore.gn090" | ||
75 | " WHERE oid >= $1::bigint AND" | ||
76 | " (rvalue >= $2 OR 0 = $3::smallint) AND" | ||
77 | " (hash = $4 OR 0 = $5::smallint) AND" | ||
78 | " (type = $6 OR 0 = $7::smallint)" | ||
79 | " ORDER BY oid ASC LIMIT 1"), | ||
80 | GNUNET_PQ_make_prepare ("put", | ||
81 | "INSERT INTO datastore.gn090" | ||
82 | " (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) " | ||
83 | "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"), | ||
84 | GNUNET_PQ_make_prepare ("update", | ||
85 | "UPDATE datastore.gn090" | ||
86 | " SET prio = prio + $1," | ||
87 | " repl = repl + $2," | ||
88 | " expire = GREATEST(expire, $3)" | ||
89 | " WHERE hash = $4 AND vhash = $5"), | ||
90 | GNUNET_PQ_make_prepare ("decrepl", | ||
91 | "UPDATE datastore.gn090" | ||
92 | " SET repl = GREATEST (repl - 1, 0)" | ||
93 | " WHERE oid = $1"), | ||
94 | GNUNET_PQ_make_prepare ("select_non_anonymous", | ||
95 | "SELECT " RESULT_COLUMNS | ||
96 | " FROM datastore.gn090" | ||
97 | " WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint" | ||
98 | " ORDER BY oid ASC LIMIT 1"), | ||
99 | GNUNET_PQ_make_prepare ("select_expiration_order", | ||
100 | "(SELECT " RESULT_COLUMNS | ||
101 | " FROM datastore.gn090" | ||
102 | " WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " | ||
103 | "UNION " | ||
104 | "(SELECT " RESULT_COLUMNS | ||
105 | " FROM datastore.gn090" | ||
106 | " ORDER BY prio ASC LIMIT 1)" | ||
107 | " ORDER BY expire ASC LIMIT 1"), | ||
108 | GNUNET_PQ_make_prepare ("select_replication_order", | ||
109 | "SELECT " RESULT_COLUMNS | ||
110 | " FROM datastore.gn090" | ||
111 | " ORDER BY repl DESC,RANDOM() LIMIT 1"), | ||
112 | GNUNET_PQ_make_prepare ("delrow", | ||
113 | "DELETE FROM datastore.gn090" | ||
114 | " WHERE oid=$1"), | ||
115 | GNUNET_PQ_make_prepare ("remove", | ||
116 | "DELETE FROM datastore.gn090" | ||
117 | " WHERE hash = $1 AND" | ||
118 | " value = $2"), | ||
119 | GNUNET_PQ_make_prepare ("get_keys", | ||
120 | "SELECT hash" | ||
121 | " FROM datastore.gn090"), | ||
122 | GNUNET_PQ_make_prepare ("estimate_size", | ||
123 | "SELECT CASE WHEN NOT EXISTS" | ||
124 | " (SELECT 1 FROM datastore.gn090)" | ||
125 | " THEN 0" | ||
126 | " ELSE (SELECT SUM(LENGTH(value))+256*COUNT(*)" | ||
127 | " FROM datastore.gn090)" | ||
128 | "END AS total"), | ||
129 | GNUNET_PQ_PREPARED_STATEMENT_END | ||
130 | }; | ||
131 | #undef RESULT_COLUMNS | ||
132 | |||
133 | plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg, | ||
134 | "datastore-postgres", | ||
135 | "datastore-", | ||
136 | NULL, | ||
137 | ps); | ||
138 | if (NULL == plugin->dbh) | ||
139 | return GNUNET_SYSERR; | ||
140 | return GNUNET_OK; | ||
141 | } | ||
142 | |||
143 | |||
144 | /** | ||
145 | * Get an estimate of how much space the database is | ||
146 | * currently using. | ||
147 | * | ||
148 | * @param cls our `struct Plugin *` | ||
149 | * @return number of bytes used on disk | ||
150 | */ | ||
151 | static void | ||
152 | postgres_plugin_estimate_size (void *cls, | ||
153 | unsigned long long *estimate) | ||
154 | { | ||
155 | struct Plugin *plugin = cls; | ||
156 | uint64_t total; | ||
157 | struct GNUNET_PQ_QueryParam params[] = { | ||
158 | GNUNET_PQ_query_param_end | ||
159 | }; | ||
160 | struct GNUNET_PQ_ResultSpec rs[] = { | ||
161 | GNUNET_PQ_result_spec_uint64 ("total", | ||
162 | &total), | ||
163 | GNUNET_PQ_result_spec_end | ||
164 | }; | ||
165 | enum GNUNET_DB_QueryStatus ret; | ||
166 | |||
167 | if (NULL == estimate) | ||
168 | return; | ||
169 | ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, | ||
170 | "estimate_size", | ||
171 | params, | ||
172 | rs); | ||
173 | if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret) | ||
174 | { | ||
175 | *estimate = 0LL; | ||
176 | return; | ||
177 | } | ||
178 | *estimate = total; | ||
179 | } | ||
180 | |||
181 | |||
182 | /** | ||
183 | * Store an item in the datastore. | ||
184 | * | ||
185 | * @param cls closure with the `struct Plugin` | ||
186 | * @param key key for the item | ||
187 | * @param absent true if the key was not found in the bloom filter | ||
188 | * @param size number of bytes in data | ||
189 | * @param data content stored | ||
190 | * @param type type of the content | ||
191 | * @param priority priority of the content | ||
192 | * @param anonymity anonymity-level for the content | ||
193 | * @param replication replication-level for the content | ||
194 | * @param expiration expiration time for the content | ||
195 | * @param cont continuation called with success or failure status | ||
196 | * @param cont_cls continuation closure | ||
197 | */ | ||
198 | static void | ||
199 | postgres_plugin_put (void *cls, | ||
200 | const struct GNUNET_HashCode *key, | ||
201 | bool absent, | ||
202 | uint32_t size, | ||
203 | const void *data, | ||
204 | enum GNUNET_BLOCK_Type type, | ||
205 | uint32_t priority, | ||
206 | uint32_t anonymity, | ||
207 | uint32_t replication, | ||
208 | struct GNUNET_TIME_Absolute expiration, | ||
209 | PluginPutCont cont, | ||
210 | void *cont_cls) | ||
211 | { | ||
212 | struct Plugin *plugin = cls; | ||
213 | struct GNUNET_HashCode vhash; | ||
214 | enum GNUNET_DB_QueryStatus ret; | ||
215 | |||
216 | GNUNET_CRYPTO_hash (data, | ||
217 | size, | ||
218 | &vhash); | ||
219 | if (! absent) | ||
220 | { | ||
221 | struct GNUNET_PQ_QueryParam params[] = { | ||
222 | GNUNET_PQ_query_param_uint32 (&priority), | ||
223 | GNUNET_PQ_query_param_uint32 (&replication), | ||
224 | GNUNET_PQ_query_param_absolute_time (&expiration), | ||
225 | GNUNET_PQ_query_param_auto_from_type (key), | ||
226 | GNUNET_PQ_query_param_auto_from_type (&vhash), | ||
227 | GNUNET_PQ_query_param_end | ||
228 | }; | ||
229 | ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh, | ||
230 | "update", | ||
231 | params); | ||
232 | if (0 > ret) | ||
233 | { | ||
234 | cont (cont_cls, | ||
235 | key, | ||
236 | size, | ||
237 | GNUNET_SYSERR, | ||
238 | _ ("Postgresql exec failure")); | ||
239 | return; | ||
240 | } | ||
241 | bool affected = (0 != ret); | ||
242 | if (affected) | ||
243 | { | ||
244 | cont (cont_cls, | ||
245 | key, | ||
246 | size, | ||
247 | GNUNET_NO, | ||
248 | NULL); | ||
249 | return; | ||
250 | } | ||
251 | } | ||
252 | |||
253 | { | ||
254 | uint32_t utype = (uint32_t) type; | ||
255 | uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
256 | UINT64_MAX); | ||
257 | struct GNUNET_PQ_QueryParam params[] = { | ||
258 | GNUNET_PQ_query_param_uint32 (&replication), | ||
259 | GNUNET_PQ_query_param_uint32 (&utype), | ||
260 | GNUNET_PQ_query_param_uint32 (&priority), | ||
261 | GNUNET_PQ_query_param_uint32 (&anonymity), | ||
262 | GNUNET_PQ_query_param_absolute_time (&expiration), | ||
263 | GNUNET_PQ_query_param_uint64 (&rvalue), | ||
264 | GNUNET_PQ_query_param_auto_from_type (key), | ||
265 | GNUNET_PQ_query_param_auto_from_type (&vhash), | ||
266 | GNUNET_PQ_query_param_fixed_size (data, size), | ||
267 | GNUNET_PQ_query_param_end | ||
268 | }; | ||
269 | |||
270 | ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh, | ||
271 | "put", | ||
272 | params); | ||
273 | if (0 > ret) | ||
274 | { | ||
275 | cont (cont_cls, | ||
276 | key, | ||
277 | size, | ||
278 | GNUNET_SYSERR, | ||
279 | "Postgresql exec failure"); | ||
280 | return; | ||
281 | } | ||
282 | } | ||
283 | plugin->env->duc (plugin->env->cls, | ||
284 | size + GNUNET_DATASTORE_ENTRY_OVERHEAD); | ||
285 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
286 | "datastore-postgres", | ||
287 | "Stored %u bytes in database\n", | ||
288 | (unsigned int) size); | ||
289 | cont (cont_cls, | ||
290 | key, | ||
291 | size, | ||
292 | GNUNET_OK, | ||
293 | NULL); | ||
294 | } | ||
295 | |||
296 | |||
297 | /** | ||
298 | * Closure for #process_result. | ||
299 | */ | ||
300 | struct ProcessResultContext | ||
301 | { | ||
302 | /** | ||
303 | * The plugin handle. | ||
304 | */ | ||
305 | struct Plugin *plugin; | ||
306 | |||
307 | /** | ||
308 | * Function to call on each result. | ||
309 | */ | ||
310 | PluginDatumProcessor proc; | ||
311 | |||
312 | /** | ||
313 | * Closure for @e proc. | ||
314 | */ | ||
315 | void *proc_cls; | ||
316 | }; | ||
317 | |||
318 | |||
319 | /** | ||
320 | * Function invoked to process the result and call the processor of @a | ||
321 | * cls. | ||
322 | * | ||
323 | * @param cls our `struct ProcessResultContext` | ||
324 | * @param res result from exec | ||
325 | * @param num_results number of results in @a res | ||
326 | */ | ||
327 | static void | ||
328 | process_result (void *cls, | ||
329 | PGresult *res, | ||
330 | unsigned int num_results) | ||
331 | { | ||
332 | struct ProcessResultContext *prc = cls; | ||
333 | struct Plugin *plugin = prc->plugin; | ||
334 | |||
335 | if (0 == num_results) | ||
336 | { | ||
337 | /* no result */ | ||
338 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
339 | "datastore-postgres", | ||
340 | "Ending iteration (no more results)\n"); | ||
341 | prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
342 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
343 | return; | ||
344 | } | ||
345 | if (1 != num_results) | ||
346 | { | ||
347 | GNUNET_break (0); | ||
348 | prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
349 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
350 | return; | ||
351 | } | ||
352 | /* Technically we don't need the loop here, but nicer in case | ||
353 | we ever relax the condition above. */ | ||
354 | for (unsigned int i = 0; i < num_results; i++) | ||
355 | { | ||
356 | int iret; | ||
357 | uint64_t rowid; | ||
358 | uint32_t utype; | ||
359 | uint32_t anonymity; | ||
360 | uint32_t replication; | ||
361 | uint32_t priority; | ||
362 | size_t size; | ||
363 | void *data; | ||
364 | struct GNUNET_TIME_Absolute expiration_time; | ||
365 | struct GNUNET_HashCode key; | ||
366 | struct GNUNET_PQ_ResultSpec rs[] = { | ||
367 | GNUNET_PQ_result_spec_uint32 ("repl", &replication), | ||
368 | GNUNET_PQ_result_spec_uint32 ("type", &utype), | ||
369 | GNUNET_PQ_result_spec_uint32 ("prio", &priority), | ||
370 | GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity), | ||
371 | GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time), | ||
372 | GNUNET_PQ_result_spec_auto_from_type ("hash", &key), | ||
373 | GNUNET_PQ_result_spec_variable_size ("value", &data, &size), | ||
374 | GNUNET_PQ_result_spec_uint64 ("oid", &rowid), | ||
375 | GNUNET_PQ_result_spec_end | ||
376 | }; | ||
377 | |||
378 | if (GNUNET_OK != | ||
379 | GNUNET_PQ_extract_result (res, | ||
380 | rs, | ||
381 | i)) | ||
382 | { | ||
383 | GNUNET_break (0); | ||
384 | prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
385 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
386 | return; | ||
387 | } | ||
388 | |||
389 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
390 | "datastore-postgres", | ||
391 | "Found result of size %u bytes and type %u in database\n", | ||
392 | (unsigned int) size, | ||
393 | (unsigned int) utype); | ||
394 | iret = prc->proc (prc->proc_cls, | ||
395 | &key, | ||
396 | size, | ||
397 | data, | ||
398 | (enum GNUNET_BLOCK_Type) utype, | ||
399 | priority, | ||
400 | anonymity, | ||
401 | replication, | ||
402 | expiration_time, | ||
403 | rowid); | ||
404 | if (iret == GNUNET_NO) | ||
405 | { | ||
406 | struct GNUNET_PQ_QueryParam param[] = { | ||
407 | GNUNET_PQ_query_param_uint64 (&rowid), | ||
408 | GNUNET_PQ_query_param_end | ||
409 | }; | ||
410 | |||
411 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
412 | "Processor asked for item %u to be removed.\n", | ||
413 | (unsigned int) rowid); | ||
414 | if (0 < | ||
415 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, | ||
416 | "delrow", | ||
417 | param)) | ||
418 | { | ||
419 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
420 | "datastore-postgres", | ||
421 | "Deleting %u bytes from database\n", | ||
422 | (unsigned int) size); | ||
423 | plugin->env->duc (plugin->env->cls, | ||
424 | -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); | ||
425 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
426 | "datastore-postgres", | ||
427 | "Deleted %u bytes from database\n", | ||
428 | (unsigned int) size); | ||
429 | } | ||
430 | } | ||
431 | GNUNET_PQ_cleanup_result (rs); | ||
432 | } /* for (i) */ | ||
433 | } | ||
434 | |||
435 | |||
436 | /** | ||
437 | * Get one of the results for a particular key in the datastore. | ||
438 | * | ||
439 | * @param cls closure with the `struct Plugin` | ||
440 | * @param next_uid return the result with lowest uid >= next_uid | ||
441 | * @param random if true, return a random result instead of using next_uid | ||
442 | * @param key maybe NULL (to match all entries) | ||
443 | * @param type entries of which type are relevant? | ||
444 | * Use 0 for any type. | ||
445 | * @param proc function to call on the matching value; | ||
446 | * will be called with NULL if nothing matches | ||
447 | * @param proc_cls closure for @a proc | ||
448 | */ | ||
449 | static void | ||
450 | postgres_plugin_get_key (void *cls, | ||
451 | uint64_t next_uid, | ||
452 | bool random, | ||
453 | const struct GNUNET_HashCode *key, | ||
454 | enum GNUNET_BLOCK_Type type, | ||
455 | PluginDatumProcessor proc, | ||
456 | void *proc_cls) | ||
457 | { | ||
458 | struct Plugin *plugin = cls; | ||
459 | uint32_t utype = type; | ||
460 | uint16_t use_rvalue = random; | ||
461 | uint16_t use_key = NULL != key; | ||
462 | uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type; | ||
463 | uint64_t rvalue; | ||
464 | struct GNUNET_PQ_QueryParam params[] = { | ||
465 | GNUNET_PQ_query_param_uint64 (&next_uid), | ||
466 | GNUNET_PQ_query_param_uint64 (&rvalue), | ||
467 | GNUNET_PQ_query_param_uint16 (&use_rvalue), | ||
468 | GNUNET_PQ_query_param_auto_from_type (key), | ||
469 | GNUNET_PQ_query_param_uint16 (&use_key), | ||
470 | GNUNET_PQ_query_param_uint32 (&utype), | ||
471 | GNUNET_PQ_query_param_uint16 (&use_type), | ||
472 | GNUNET_PQ_query_param_end | ||
473 | }; | ||
474 | struct ProcessResultContext prc; | ||
475 | enum GNUNET_DB_QueryStatus res; | ||
476 | |||
477 | if (random) | ||
478 | { | ||
479 | rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
480 | UINT64_MAX); | ||
481 | next_uid = 0; | ||
482 | } | ||
483 | else | ||
484 | { | ||
485 | rvalue = 0; | ||
486 | } | ||
487 | prc.plugin = plugin; | ||
488 | prc.proc = proc; | ||
489 | prc.proc_cls = proc_cls; | ||
490 | |||
491 | res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, | ||
492 | "get", | ||
493 | params, | ||
494 | &process_result, | ||
495 | &prc); | ||
496 | if (0 > res) | ||
497 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
498 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
499 | } | ||
500 | |||
501 | |||
502 | /** | ||
503 | * Select a subset of the items in the datastore and call | ||
504 | * the given iterator for each of them. | ||
505 | * | ||
506 | * @param cls our `struct Plugin *` | ||
507 | * @param next_uid return the result with lowest uid >= next_uid | ||
508 | * @param type entries of which type should be considered? | ||
509 | * Must not be zero (ANY). | ||
510 | * @param proc function to call on the matching value; | ||
511 | * will be called with NULL if no value matches | ||
512 | * @param proc_cls closure for @a proc | ||
513 | */ | ||
514 | static void | ||
515 | postgres_plugin_get_zero_anonymity (void *cls, | ||
516 | uint64_t next_uid, | ||
517 | enum GNUNET_BLOCK_Type type, | ||
518 | PluginDatumProcessor proc, | ||
519 | void *proc_cls) | ||
520 | { | ||
521 | struct Plugin *plugin = cls; | ||
522 | uint32_t utype = type; | ||
523 | struct GNUNET_PQ_QueryParam params[] = { | ||
524 | GNUNET_PQ_query_param_uint32 (&utype), | ||
525 | GNUNET_PQ_query_param_uint64 (&next_uid), | ||
526 | GNUNET_PQ_query_param_end | ||
527 | }; | ||
528 | struct ProcessResultContext prc; | ||
529 | enum GNUNET_DB_QueryStatus res; | ||
530 | |||
531 | prc.plugin = plugin; | ||
532 | prc.proc = proc; | ||
533 | prc.proc_cls = proc_cls; | ||
534 | res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, | ||
535 | "select_non_anonymous", | ||
536 | params, | ||
537 | &process_result, | ||
538 | &prc); | ||
539 | if (0 > res) | ||
540 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
541 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
542 | } | ||
543 | |||
544 | |||
545 | /** | ||
546 | * Context for #repl_iter() function. | ||
547 | */ | ||
548 | struct ReplCtx | ||
549 | { | ||
550 | /** | ||
551 | * Plugin handle. | ||
552 | */ | ||
553 | struct Plugin *plugin; | ||
554 | |||
555 | /** | ||
556 | * Function to call for the result (or the NULL). | ||
557 | */ | ||
558 | PluginDatumProcessor proc; | ||
559 | |||
560 | /** | ||
561 | * Closure for @e proc. | ||
562 | */ | ||
563 | void *proc_cls; | ||
564 | }; | ||
565 | |||
566 | |||
567 | /** | ||
568 | * Wrapper for the iterator for 'sqlite_plugin_replication_get'. | ||
569 | * Decrements the replication counter and calls the original | ||
570 | * iterator. | ||
571 | * | ||
572 | * @param cls closure with the `struct ReplCtx *` | ||
573 | * @param key key for the content | ||
574 | * @param size number of bytes in @a data | ||
575 | * @param data content stored | ||
576 | * @param type type of the content | ||
577 | * @param priority priority of the content | ||
578 | * @param anonymity anonymity-level for the content | ||
579 | * @param replication replication-level for the content | ||
580 | * @param expiration expiration time for the content | ||
581 | * @param uid unique identifier for the datum; | ||
582 | * maybe 0 if no unique identifier is available | ||
583 | * @return #GNUNET_SYSERR to abort the iteration, | ||
584 | * #GNUNET_OK to continue | ||
585 | * (continue on call to "next", of course), | ||
586 | * #GNUNET_NO to delete the item and continue (if supported) | ||
587 | */ | ||
588 | static int | ||
589 | repl_proc (void *cls, | ||
590 | const struct GNUNET_HashCode *key, | ||
591 | uint32_t size, | ||
592 | const void *data, | ||
593 | enum GNUNET_BLOCK_Type type, | ||
594 | uint32_t priority, | ||
595 | uint32_t anonymity, | ||
596 | uint32_t replication, | ||
597 | struct GNUNET_TIME_Absolute expiration, | ||
598 | uint64_t uid) | ||
599 | { | ||
600 | struct ReplCtx *rc = cls; | ||
601 | struct Plugin *plugin = rc->plugin; | ||
602 | int ret; | ||
603 | struct GNUNET_PQ_QueryParam params[] = { | ||
604 | GNUNET_PQ_query_param_uint64 (&uid), | ||
605 | GNUNET_PQ_query_param_end | ||
606 | }; | ||
607 | enum GNUNET_DB_QueryStatus qret; | ||
608 | |||
609 | ret = rc->proc (rc->proc_cls, | ||
610 | key, | ||
611 | size, | ||
612 | data, | ||
613 | type, | ||
614 | priority, | ||
615 | anonymity, | ||
616 | replication, | ||
617 | expiration, | ||
618 | uid); | ||
619 | if (NULL == key) | ||
620 | return ret; | ||
621 | qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh, | ||
622 | "decrepl", | ||
623 | params); | ||
624 | if (0 > qret) | ||
625 | return GNUNET_SYSERR; | ||
626 | return ret; | ||
627 | } | ||
628 | |||
629 | |||
630 | /** | ||
631 | * Get a random item for replication. Returns a single, not expired, | ||
632 | * random item from those with the highest replication counters. The | ||
633 | * item's replication counter is decremented by one IF it was positive | ||
634 | * before. Call @a proc with all values ZERO or NULL if the datastore | ||
635 | * is empty. | ||
636 | * | ||
637 | * @param cls closure with the `struct Plugin` | ||
638 | * @param proc function to call the value (once only). | ||
639 | * @param proc_cls closure for @a proc | ||
640 | */ | ||
641 | static void | ||
642 | postgres_plugin_get_replication (void *cls, | ||
643 | PluginDatumProcessor proc, | ||
644 | void *proc_cls) | ||
645 | { | ||
646 | struct Plugin *plugin = cls; | ||
647 | struct GNUNET_PQ_QueryParam params[] = { | ||
648 | GNUNET_PQ_query_param_end | ||
649 | }; | ||
650 | struct ReplCtx rc; | ||
651 | struct ProcessResultContext prc; | ||
652 | enum GNUNET_DB_QueryStatus res; | ||
653 | |||
654 | rc.plugin = plugin; | ||
655 | rc.proc = proc; | ||
656 | rc.proc_cls = proc_cls; | ||
657 | prc.plugin = plugin; | ||
658 | prc.proc = &repl_proc; | ||
659 | prc.proc_cls = &rc; | ||
660 | res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, | ||
661 | "select_replication_order", | ||
662 | params, | ||
663 | &process_result, | ||
664 | &prc); | ||
665 | if (0 > res) | ||
666 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, | ||
667 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
668 | } | ||
669 | |||
670 | |||
671 | /** | ||
672 | * Get a random item for expiration. Call @a proc with all values | ||
673 | * ZERO or NULL if the datastore is empty. | ||
674 | * | ||
675 | * @param cls closure with the `struct Plugin` | ||
676 | * @param proc function to call the value (once only). | ||
677 | * @param proc_cls closure for @a proc | ||
678 | */ | ||
679 | static void | ||
680 | postgres_plugin_get_expiration (void *cls, | ||
681 | PluginDatumProcessor proc, | ||
682 | void *proc_cls) | ||
683 | { | ||
684 | struct Plugin *plugin = cls; | ||
685 | struct GNUNET_TIME_Absolute now = { 0 }; | ||
686 | struct GNUNET_PQ_QueryParam params[] = { | ||
687 | GNUNET_PQ_query_param_absolute_time (&now), | ||
688 | GNUNET_PQ_query_param_end | ||
689 | }; | ||
690 | struct ProcessResultContext prc; | ||
691 | |||
692 | now = GNUNET_TIME_absolute_get (); | ||
693 | prc.plugin = plugin; | ||
694 | prc.proc = proc; | ||
695 | prc.proc_cls = proc_cls; | ||
696 | (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, | ||
697 | "select_expiration_order", | ||
698 | params, | ||
699 | &process_result, | ||
700 | &prc); | ||
701 | } | ||
702 | |||
703 | |||
704 | /** | ||
705 | * Closure for #process_keys. | ||
706 | */ | ||
707 | struct ProcessKeysContext | ||
708 | { | ||
709 | /** | ||
710 | * Function to call for each key. | ||
711 | */ | ||
712 | PluginKeyProcessor proc; | ||
713 | |||
714 | /** | ||
715 | * Closure for @e proc. | ||
716 | */ | ||
717 | void *proc_cls; | ||
718 | }; | ||
719 | |||
720 | |||
721 | /** | ||
722 | * Function to be called with the results of a SELECT statement | ||
723 | * that has returned @a num_results results. | ||
724 | * | ||
725 | * @param cls closure with a `struct ProcessKeysContext` | ||
726 | * @param result the postgres result | ||
727 | * @param num_results the number of results in @a result | ||
728 | */ | ||
729 | static void | ||
730 | process_keys (void *cls, | ||
731 | PGresult *result, | ||
732 | unsigned int num_results) | ||
733 | { | ||
734 | struct ProcessKeysContext *pkc = cls; | ||
735 | |||
736 | for (unsigned i = 0; i < num_results; i++) | ||
737 | { | ||
738 | struct GNUNET_HashCode key; | ||
739 | struct GNUNET_PQ_ResultSpec rs[] = { | ||
740 | GNUNET_PQ_result_spec_auto_from_type ("hash", | ||
741 | &key), | ||
742 | GNUNET_PQ_result_spec_end | ||
743 | }; | ||
744 | |||
745 | if (GNUNET_OK != | ||
746 | GNUNET_PQ_extract_result (result, | ||
747 | rs, | ||
748 | i)) | ||
749 | { | ||
750 | GNUNET_break (0); | ||
751 | continue; | ||
752 | } | ||
753 | pkc->proc (pkc->proc_cls, | ||
754 | &key, | ||
755 | 1); | ||
756 | GNUNET_PQ_cleanup_result (rs); | ||
757 | } | ||
758 | } | ||
759 | |||
760 | |||
761 | /** | ||
762 | * Get all of the keys in the datastore. | ||
763 | * | ||
764 | * @param cls closure with the `struct Plugin *` | ||
765 | * @param proc function to call on each key | ||
766 | * @param proc_cls closure for @a proc | ||
767 | */ | ||
768 | static void | ||
769 | postgres_plugin_get_keys (void *cls, | ||
770 | PluginKeyProcessor proc, | ||
771 | void *proc_cls) | ||
772 | { | ||
773 | struct Plugin *plugin = cls; | ||
774 | struct GNUNET_PQ_QueryParam params[] = { | ||
775 | GNUNET_PQ_query_param_end | ||
776 | }; | ||
777 | struct ProcessKeysContext pkc; | ||
778 | |||
779 | pkc.proc = proc; | ||
780 | pkc.proc_cls = proc_cls; | ||
781 | (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, | ||
782 | "get_keys", | ||
783 | params, | ||
784 | &process_keys, | ||
785 | &pkc); | ||
786 | proc (proc_cls, | ||
787 | NULL, | ||
788 | 0); | ||
789 | } | ||
790 | |||
791 | |||
792 | /** | ||
793 | * Drop database. | ||
794 | * | ||
795 | * @param cls closure with the `struct Plugin *` | ||
796 | */ | ||
797 | static void | ||
798 | postgres_plugin_drop (void *cls) | ||
799 | { | ||
800 | struct Plugin *plugin = cls; | ||
801 | struct GNUNET_PQ_ExecuteStatement es[] = { | ||
802 | GNUNET_PQ_make_execute ("DROP TABLE gn090"), | ||
803 | GNUNET_PQ_EXECUTE_STATEMENT_END | ||
804 | }; | ||
805 | |||
806 | if (GNUNET_OK != | ||
807 | GNUNET_PQ_exec_statements (plugin->dbh, | ||
808 | es)) | ||
809 | GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, | ||
810 | "postgres", | ||
811 | _ ("Failed to drop table from database.\n")); | ||
812 | } | ||
813 | |||
814 | |||
815 | /** | ||
816 | * Remove a particular key in the datastore. | ||
817 | * | ||
818 | * @param cls closure | ||
819 | * @param key key for the content | ||
820 | * @param size number of bytes in data | ||
821 | * @param data content stored | ||
822 | * @param cont continuation called with success or failure status | ||
823 | * @param cont_cls continuation closure for @a cont | ||
824 | */ | ||
825 | static void | ||
826 | postgres_plugin_remove_key (void *cls, | ||
827 | const struct GNUNET_HashCode *key, | ||
828 | uint32_t size, | ||
829 | const void *data, | ||
830 | PluginRemoveCont cont, | ||
831 | void *cont_cls) | ||
832 | { | ||
833 | struct Plugin *plugin = cls; | ||
834 | enum GNUNET_DB_QueryStatus ret; | ||
835 | struct GNUNET_PQ_QueryParam params[] = { | ||
836 | GNUNET_PQ_query_param_auto_from_type (key), | ||
837 | GNUNET_PQ_query_param_fixed_size (data, size), | ||
838 | GNUNET_PQ_query_param_end | ||
839 | }; | ||
840 | |||
841 | ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh, | ||
842 | "remove", | ||
843 | params); | ||
844 | if (0 > ret) | ||
845 | { | ||
846 | cont (cont_cls, | ||
847 | key, | ||
848 | size, | ||
849 | GNUNET_SYSERR, | ||
850 | _ ("Postgresql exec failure")); | ||
851 | return; | ||
852 | } | ||
853 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret) | ||
854 | { | ||
855 | cont (cont_cls, | ||
856 | key, | ||
857 | size, | ||
858 | GNUNET_NO, | ||
859 | NULL); | ||
860 | return; | ||
861 | } | ||
862 | plugin->env->duc (plugin->env->cls, | ||
863 | -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); | ||
864 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
865 | "datastore-postgres", | ||
866 | "Deleted %u bytes from database\n", | ||
867 | (unsigned int) size); | ||
868 | cont (cont_cls, | ||
869 | key, | ||
870 | size, | ||
871 | GNUNET_OK, | ||
872 | NULL); | ||
873 | } | ||
874 | |||
875 | |||
876 | /** | ||
877 | * Entry point for the plugin. | ||
878 | * | ||
879 | * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*` | ||
880 | * @return our `struct Plugin *` | ||
881 | */ | ||
882 | void * | ||
883 | libgnunet_plugin_datastore_postgres_init (void *cls) | ||
884 | { | ||
885 | struct GNUNET_DATASTORE_PluginEnvironment *env = cls; | ||
886 | struct GNUNET_DATASTORE_PluginFunctions *api; | ||
887 | struct Plugin *plugin; | ||
888 | |||
889 | plugin = GNUNET_new (struct Plugin); | ||
890 | plugin->env = env; | ||
891 | if (GNUNET_OK != init_connection (plugin)) | ||
892 | { | ||
893 | GNUNET_free (plugin); | ||
894 | return NULL; | ||
895 | } | ||
896 | api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions); | ||
897 | api->cls = plugin; | ||
898 | api->estimate_size = &postgres_plugin_estimate_size; | ||
899 | api->put = &postgres_plugin_put; | ||
900 | api->get_key = &postgres_plugin_get_key; | ||
901 | api->get_replication = &postgres_plugin_get_replication; | ||
902 | api->get_expiration = &postgres_plugin_get_expiration; | ||
903 | api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity; | ||
904 | api->get_keys = &postgres_plugin_get_keys; | ||
905 | api->drop = &postgres_plugin_drop; | ||
906 | api->remove_key = &postgres_plugin_remove_key; | ||
907 | return api; | ||
908 | } | ||
909 | |||
910 | |||
911 | /** | ||
912 | * Exit point from the plugin. | ||
913 | * | ||
914 | * @param cls our `struct Plugin *` | ||
915 | * @return always NULL | ||
916 | */ | ||
917 | void * | ||
918 | libgnunet_plugin_datastore_postgres_done (void *cls) | ||
919 | { | ||
920 | struct GNUNET_DATASTORE_PluginFunctions *api = cls; | ||
921 | struct Plugin *plugin = api->cls; | ||
922 | |||
923 | GNUNET_PQ_disconnect (plugin->dbh); | ||
924 | GNUNET_free (plugin); | ||
925 | GNUNET_free (api); | ||
926 | return NULL; | ||
927 | } | ||
928 | |||
929 | |||
930 | /* end of plugin_datastore_postgres.c */ | ||