aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/plugin_datastore_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/plugin_datastore_postgres.c')
-rw-r--r--src/datastore/plugin_datastore_postgres.c980
1 files changed, 0 insertions, 980 deletions
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
deleted file mode 100644
index 8fb0bf6ee..000000000
--- a/src/datastore/plugin_datastore_postgres.c
+++ /dev/null
@@ -1,980 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2009-2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file datastore/plugin_datastore_postgres.c
23 * @brief postgres-based datastore backend
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_datastore_plugin.h"
28#include "gnunet_pq_lib.h"
29
30
31/**
32 * After how many ms "busy" should a DB operation fail for good?
33 * A low value makes sure that we are more responsive to requests
34 * (especially PUTs). A high value guarantees a higher success
35 * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
36 *
37 * The default value of 1s should ensure that users do not experience
38 * huge latencies while at the same time allowing operations to succeed
39 * with reasonable probability.
40 */
41#define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
42
43
44/**
45 * Context for all functions in this plugin.
46 */
47struct Plugin
48{
49 /**
50 * Our execution environment.
51 */
52 struct GNUNET_DATASTORE_PluginEnvironment *env;
53
54 /**
55 * Native Postgres database handle.
56 */
57 struct GNUNET_PQ_Context *dbh;
58};
59
60
61/**
62 * @brief Get a database handle
63 *
64 * @param plugin global context
65 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
66 */
67static int
68init_connection (struct Plugin *plugin)
69{
70 struct GNUNET_PQ_ExecuteStatement es[] = {
71 /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
72 * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
73 * we do math or inequality tests, so we can't handle the entire range of uint32_t.
74 * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
75 */
76 GNUNET_PQ_make_try_execute (
77 "CREATE SEQUENCE IF NOT EXISTS gn090_oid_seq"),
78 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS gn090 ("
79 " repl INTEGER NOT NULL DEFAULT 0,"
80 " type INTEGER NOT NULL DEFAULT 0,"
81 " prio INTEGER NOT NULL DEFAULT 0,"
82 " anonLevel INTEGER NOT NULL DEFAULT 0,"
83 " expire BIGINT NOT NULL DEFAULT 0,"
84 " rvalue BIGINT NOT NULL DEFAULT 0,"
85 " hash BYTEA NOT NULL DEFAULT '',"
86 " vhash BYTEA NOT NULL DEFAULT '',"
87 " value BYTEA NOT NULL DEFAULT '',"
88 " oid OID NOT NULL DEFAULT nextval('gn090_oid_seq'))"),
89 GNUNET_PQ_make_try_execute (
90 "ALTER SEQUENCE gn090_oid_seq OWNED BY gn090.oid"),
91 GNUNET_PQ_make_try_execute (
92 "CREATE INDEX IF NOT EXISTS oid_hash ON gn090 (oid)"),
93 GNUNET_PQ_make_try_execute (
94 "CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
95 GNUNET_PQ_make_try_execute (
96 "CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
97 GNUNET_PQ_make_try_execute (
98 "CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
99 GNUNET_PQ_make_try_execute (
100 "CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
101 GNUNET_PQ_make_try_execute (
102 "CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
103 GNUNET_PQ_make_try_execute (
104 "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
105 GNUNET_PQ_make_try_execute (
106 "CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
107 GNUNET_PQ_make_execute (
108 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
109 GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
110 GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
111 GNUNET_PQ_EXECUTE_STATEMENT_END
112 };
113
114#define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
115 struct GNUNET_PQ_PreparedStatement ps[] = {
116 GNUNET_PQ_make_prepare ("get",
117 "SELECT " RESULT_COLUMNS " FROM gn090"
118 " WHERE oid >= $1::bigint AND"
119 " (rvalue >= $2 OR 0 = $3::smallint) AND"
120 " (hash = $4 OR 0 = $5::smallint) AND"
121 " (type = $6 OR 0 = $7::smallint)"
122 " ORDER BY oid ASC LIMIT 1",
123 7),
124 GNUNET_PQ_make_prepare ("put",
125 "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
126 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
127 9),
128 GNUNET_PQ_make_prepare ("update",
129 "UPDATE gn090"
130 " SET prio = prio + $1,"
131 " repl = repl + $2,"
132 " expire = GREATEST(expire, $3)"
133 " WHERE hash = $4 AND vhash = $5",
134 5),
135 GNUNET_PQ_make_prepare ("decrepl",
136 "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
137 "WHERE oid = $1",
138 1),
139 GNUNET_PQ_make_prepare ("select_non_anonymous",
140 "SELECT " RESULT_COLUMNS " FROM gn090 "
141 "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
142 "ORDER BY oid ASC LIMIT 1",
143 2),
144 GNUNET_PQ_make_prepare ("select_expiration_order",
145 "(SELECT " RESULT_COLUMNS " FROM gn090 "
146 "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
147 "UNION "
148 "(SELECT " RESULT_COLUMNS " FROM gn090 "
149 "ORDER BY prio ASC LIMIT 1) "
150 "ORDER BY expire ASC LIMIT 1",
151 1),
152 GNUNET_PQ_make_prepare ("select_replication_order",
153 "SELECT " RESULT_COLUMNS " FROM gn090 "
154 "ORDER BY repl DESC,RANDOM() LIMIT 1",
155 0),
156 GNUNET_PQ_make_prepare ("delrow",
157 "DELETE FROM gn090 "
158 "WHERE oid=$1",
159 1),
160 GNUNET_PQ_make_prepare ("remove",
161 "DELETE FROM gn090"
162 " WHERE hash = $1 AND"
163 " value = $2",
164 2),
165 GNUNET_PQ_make_prepare ("get_keys",
166 "SELECT hash FROM gn090",
167 0),
168 GNUNET_PQ_make_prepare ("estimate_size",
169 "SELECT CASE WHEN NOT EXISTS"
170 " (SELECT 1 FROM gn090)"
171 " THEN 0"
172 " ELSE (SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090)"
173 "END AS total",
174 0),
175 GNUNET_PQ_PREPARED_STATEMENT_END
176 };
177#undef RESULT_COLUMNS
178
179 plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
180 "datastore-postgres",
181 NULL,
182 es,
183 ps);
184 if (NULL == plugin->dbh)
185 return GNUNET_SYSERR;
186 return GNUNET_OK;
187}
188
189
190/**
191 * Get an estimate of how much space the database is
192 * currently using.
193 *
194 * @param cls our `struct Plugin *`
195 * @return number of bytes used on disk
196 */
197static void
198postgres_plugin_estimate_size (void *cls,
199 unsigned long long *estimate)
200{
201 struct Plugin *plugin = cls;
202 uint64_t total;
203 struct GNUNET_PQ_QueryParam params[] = {
204 GNUNET_PQ_query_param_end
205 };
206 struct GNUNET_PQ_ResultSpec rs[] = {
207 GNUNET_PQ_result_spec_uint64 ("total",
208 &total),
209 GNUNET_PQ_result_spec_end
210 };
211 enum GNUNET_DB_QueryStatus ret;
212
213 if (NULL == estimate)
214 return;
215 ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
216 "estimate_size",
217 params,
218 rs);
219 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret)
220 {
221 *estimate = 0LL;
222 return;
223 }
224 *estimate = total;
225}
226
227
228/**
229 * Store an item in the datastore.
230 *
231 * @param cls closure with the `struct Plugin`
232 * @param key key for the item
233 * @param absent true if the key was not found in the bloom filter
234 * @param size number of bytes in data
235 * @param data content stored
236 * @param type type of the content
237 * @param priority priority of the content
238 * @param anonymity anonymity-level for the content
239 * @param replication replication-level for the content
240 * @param expiration expiration time for the content
241 * @param cont continuation called with success or failure status
242 * @param cont_cls continuation closure
243 */
244static void
245postgres_plugin_put (void *cls,
246 const struct GNUNET_HashCode *key,
247 bool absent,
248 uint32_t size,
249 const void *data,
250 enum GNUNET_BLOCK_Type type,
251 uint32_t priority,
252 uint32_t anonymity,
253 uint32_t replication,
254 struct GNUNET_TIME_Absolute expiration,
255 PluginPutCont cont,
256 void *cont_cls)
257{
258 struct Plugin *plugin = cls;
259 struct GNUNET_HashCode vhash;
260 enum GNUNET_DB_QueryStatus ret;
261
262 GNUNET_CRYPTO_hash (data,
263 size,
264 &vhash);
265 if (! absent)
266 {
267 struct GNUNET_PQ_QueryParam params[] = {
268 GNUNET_PQ_query_param_uint32 (&priority),
269 GNUNET_PQ_query_param_uint32 (&replication),
270 GNUNET_PQ_query_param_absolute_time (&expiration),
271 GNUNET_PQ_query_param_auto_from_type (key),
272 GNUNET_PQ_query_param_auto_from_type (&vhash),
273 GNUNET_PQ_query_param_end
274 };
275 ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
276 "update",
277 params);
278 if (0 > ret)
279 {
280 cont (cont_cls,
281 key,
282 size,
283 GNUNET_SYSERR,
284 _ ("Postgresql exec failure"));
285 return;
286 }
287 bool affected = (0 != ret);
288 if (affected)
289 {
290 cont (cont_cls,
291 key,
292 size,
293 GNUNET_NO,
294 NULL);
295 return;
296 }
297 }
298
299 {
300 uint32_t utype = (uint32_t) type;
301 uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
302 UINT64_MAX);
303 struct GNUNET_PQ_QueryParam params[] = {
304 GNUNET_PQ_query_param_uint32 (&replication),
305 GNUNET_PQ_query_param_uint32 (&utype),
306 GNUNET_PQ_query_param_uint32 (&priority),
307 GNUNET_PQ_query_param_uint32 (&anonymity),
308 GNUNET_PQ_query_param_absolute_time (&expiration),
309 GNUNET_PQ_query_param_uint64 (&rvalue),
310 GNUNET_PQ_query_param_auto_from_type (key),
311 GNUNET_PQ_query_param_auto_from_type (&vhash),
312 GNUNET_PQ_query_param_fixed_size (data, size),
313 GNUNET_PQ_query_param_end
314 };
315
316 ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
317 "put",
318 params);
319 if (0 > ret)
320 {
321 cont (cont_cls,
322 key,
323 size,
324 GNUNET_SYSERR,
325 "Postgresql exec failure");
326 return;
327 }
328 }
329 plugin->env->duc (plugin->env->cls,
330 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
331 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
332 "datastore-postgres",
333 "Stored %u bytes in database\n",
334 (unsigned int) size);
335 cont (cont_cls,
336 key,
337 size,
338 GNUNET_OK,
339 NULL);
340}
341
342
343/**
344 * Closure for #process_result.
345 */
346struct ProcessResultContext
347{
348 /**
349 * The plugin handle.
350 */
351 struct Plugin *plugin;
352
353 /**
354 * Function to call on each result.
355 */
356 PluginDatumProcessor proc;
357
358 /**
359 * Closure for @e proc.
360 */
361 void *proc_cls;
362};
363
364
365/**
366 * Function invoked to process the result and call the processor of @a
367 * cls.
368 *
369 * @param cls our `struct ProcessResultContext`
370 * @param res result from exec
371 * @param num_results number of results in @a res
372 */
373static void
374process_result (void *cls,
375 PGresult *res,
376 unsigned int num_results)
377{
378 struct ProcessResultContext *prc = cls;
379 struct Plugin *plugin = prc->plugin;
380
381 if (0 == num_results)
382 {
383 /* no result */
384 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
385 "datastore-postgres",
386 "Ending iteration (no more results)\n");
387 prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
388 GNUNET_TIME_UNIT_ZERO_ABS, 0);
389 return;
390 }
391 if (1 != num_results)
392 {
393 GNUNET_break (0);
394 prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
395 GNUNET_TIME_UNIT_ZERO_ABS, 0);
396 return;
397 }
398 /* Technically we don't need the loop here, but nicer in case
399 we ever relax the condition above. */
400 for (unsigned int i = 0; i < num_results; i++)
401 {
402 int iret;
403 uint32_t rowid;
404 uint32_t utype;
405 uint32_t anonymity;
406 uint32_t replication;
407 uint32_t priority;
408 size_t size;
409 void *data;
410 struct GNUNET_TIME_Absolute expiration_time;
411 struct GNUNET_HashCode key;
412 struct GNUNET_PQ_ResultSpec rs[] = {
413 GNUNET_PQ_result_spec_uint32 ("repl", &replication),
414 GNUNET_PQ_result_spec_uint32 ("type", &utype),
415 GNUNET_PQ_result_spec_uint32 ("prio", &priority),
416 GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
417 GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
418 GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
419 GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
420 GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
421 GNUNET_PQ_result_spec_end
422 };
423
424 if (GNUNET_OK !=
425 GNUNET_PQ_extract_result (res,
426 rs,
427 i))
428 {
429 GNUNET_break (0);
430 prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
431 GNUNET_TIME_UNIT_ZERO_ABS, 0);
432 return;
433 }
434
435 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
436 "datastore-postgres",
437 "Found result of size %u bytes and type %u in database\n",
438 (unsigned int) size,
439 (unsigned int) utype);
440 iret = prc->proc (prc->proc_cls,
441 &key,
442 size,
443 data,
444 (enum GNUNET_BLOCK_Type) utype,
445 priority,
446 anonymity,
447 replication,
448 expiration_time,
449 rowid);
450 if (iret == GNUNET_NO)
451 {
452 struct GNUNET_PQ_QueryParam param[] = {
453 GNUNET_PQ_query_param_uint32 (&rowid),
454 GNUNET_PQ_query_param_end
455 };
456
457 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
458 "Processor asked for item %u to be removed.\n",
459 (unsigned int) rowid);
460 if (0 <
461 GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
462 "delrow",
463 param))
464 {
465 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
466 "datastore-postgres",
467 "Deleting %u bytes from database\n",
468 (unsigned int) size);
469 plugin->env->duc (plugin->env->cls,
470 -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
471 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
472 "datastore-postgres",
473 "Deleted %u bytes from database\n",
474 (unsigned int) size);
475 }
476 }
477 GNUNET_PQ_cleanup_result (rs);
478 } /* for (i) */
479}
480
481
482/**
483 * Get one of the results for a particular key in the datastore.
484 *
485 * @param cls closure with the `struct Plugin`
486 * @param next_uid return the result with lowest uid >= next_uid
487 * @param random if true, return a random result instead of using next_uid
488 * @param key maybe NULL (to match all entries)
489 * @param type entries of which type are relevant?
490 * Use 0 for any type.
491 * @param proc function to call on the matching value;
492 * will be called with NULL if nothing matches
493 * @param proc_cls closure for @a proc
494 */
495static void
496postgres_plugin_get_key (void *cls,
497 uint64_t next_uid,
498 bool random,
499 const struct GNUNET_HashCode *key,
500 enum GNUNET_BLOCK_Type type,
501 PluginDatumProcessor proc,
502 void *proc_cls)
503{
504 struct Plugin *plugin = cls;
505 uint32_t utype = type;
506 uint16_t use_rvalue = random;
507 uint16_t use_key = NULL != key;
508 uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
509 uint64_t rvalue;
510 struct GNUNET_PQ_QueryParam params[] = {
511 GNUNET_PQ_query_param_uint64 (&next_uid),
512 GNUNET_PQ_query_param_uint64 (&rvalue),
513 GNUNET_PQ_query_param_uint16 (&use_rvalue),
514 GNUNET_PQ_query_param_auto_from_type (key),
515 GNUNET_PQ_query_param_uint16 (&use_key),
516 GNUNET_PQ_query_param_uint32 (&utype),
517 GNUNET_PQ_query_param_uint16 (&use_type),
518 GNUNET_PQ_query_param_end
519 };
520 struct ProcessResultContext prc;
521 enum GNUNET_DB_QueryStatus res;
522
523 if (random)
524 {
525 rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
526 UINT64_MAX);
527 next_uid = 0;
528 }
529 else
530 {
531 rvalue = 0;
532 }
533 prc.plugin = plugin;
534 prc.proc = proc;
535 prc.proc_cls = proc_cls;
536
537 res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
538 "get",
539 params,
540 &process_result,
541 &prc);
542 if (0 > res)
543 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
544 GNUNET_TIME_UNIT_ZERO_ABS, 0);
545}
546
547
548/**
549 * Select a subset of the items in the datastore and call
550 * the given iterator for each of them.
551 *
552 * @param cls our `struct Plugin *`
553 * @param next_uid return the result with lowest uid >= next_uid
554 * @param type entries of which type should be considered?
555 * Must not be zero (ANY).
556 * @param proc function to call on the matching value;
557 * will be called with NULL if no value matches
558 * @param proc_cls closure for @a proc
559 */
560static void
561postgres_plugin_get_zero_anonymity (void *cls,
562 uint64_t next_uid,
563 enum GNUNET_BLOCK_Type type,
564 PluginDatumProcessor proc,
565 void *proc_cls)
566{
567 struct Plugin *plugin = cls;
568 uint32_t utype = type;
569 struct GNUNET_PQ_QueryParam params[] = {
570 GNUNET_PQ_query_param_uint32 (&utype),
571 GNUNET_PQ_query_param_uint64 (&next_uid),
572 GNUNET_PQ_query_param_end
573 };
574 struct ProcessResultContext prc;
575 enum GNUNET_DB_QueryStatus res;
576
577 prc.plugin = plugin;
578 prc.proc = proc;
579 prc.proc_cls = proc_cls;
580 res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
581 "select_non_anonymous",
582 params,
583 &process_result,
584 &prc);
585 if (0 > res)
586 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
587 GNUNET_TIME_UNIT_ZERO_ABS, 0);
588}
589
590
591/**
592 * Context for #repl_iter() function.
593 */
594struct ReplCtx
595{
596 /**
597 * Plugin handle.
598 */
599 struct Plugin *plugin;
600
601 /**
602 * Function to call for the result (or the NULL).
603 */
604 PluginDatumProcessor proc;
605
606 /**
607 * Closure for @e proc.
608 */
609 void *proc_cls;
610};
611
612
613/**
614 * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
615 * Decrements the replication counter and calls the original
616 * iterator.
617 *
618 * @param cls closure with the `struct ReplCtx *`
619 * @param key key for the content
620 * @param size number of bytes in @a data
621 * @param data content stored
622 * @param type type of the content
623 * @param priority priority of the content
624 * @param anonymity anonymity-level for the content
625 * @param replication replication-level for the content
626 * @param expiration expiration time for the content
627 * @param uid unique identifier for the datum;
628 * maybe 0 if no unique identifier is available
629 * @return #GNUNET_SYSERR to abort the iteration,
630 * #GNUNET_OK to continue
631 * (continue on call to "next", of course),
632 * #GNUNET_NO to delete the item and continue (if supported)
633 */
634static int
635repl_proc (void *cls,
636 const struct GNUNET_HashCode *key,
637 uint32_t size,
638 const void *data,
639 enum GNUNET_BLOCK_Type type,
640 uint32_t priority,
641 uint32_t anonymity,
642 uint32_t replication,
643 struct GNUNET_TIME_Absolute expiration,
644 uint64_t uid)
645{
646 struct ReplCtx *rc = cls;
647 struct Plugin *plugin = rc->plugin;
648 int ret;
649 uint32_t oid = (uint32_t) uid;
650 struct GNUNET_PQ_QueryParam params[] = {
651 GNUNET_PQ_query_param_uint32 (&oid),
652 GNUNET_PQ_query_param_end
653 };
654 enum GNUNET_DB_QueryStatus qret;
655
656 ret = rc->proc (rc->proc_cls,
657 key,
658 size,
659 data,
660 type,
661 priority,
662 anonymity,
663 replication,
664 expiration,
665 uid);
666 if (NULL == key)
667 return ret;
668 qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
669 "decrepl",
670 params);
671 if (0 > qret)
672 return GNUNET_SYSERR;
673 return ret;
674}
675
676
677/**
678 * Get a random item for replication. Returns a single, not expired,
679 * random item from those with the highest replication counters. The
680 * item's replication counter is decremented by one IF it was positive
681 * before. Call @a proc with all values ZERO or NULL if the datastore
682 * is empty.
683 *
684 * @param cls closure with the `struct Plugin`
685 * @param proc function to call the value (once only).
686 * @param proc_cls closure for @a proc
687 */
688static void
689postgres_plugin_get_replication (void *cls,
690 PluginDatumProcessor proc,
691 void *proc_cls)
692{
693 struct Plugin *plugin = cls;
694 struct GNUNET_PQ_QueryParam params[] = {
695 GNUNET_PQ_query_param_end
696 };
697 struct ReplCtx rc;
698 struct ProcessResultContext prc;
699 enum GNUNET_DB_QueryStatus res;
700
701 rc.plugin = plugin;
702 rc.proc = proc;
703 rc.proc_cls = proc_cls;
704 prc.plugin = plugin;
705 prc.proc = &repl_proc;
706 prc.proc_cls = &rc;
707 res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
708 "select_replication_order",
709 params,
710 &process_result,
711 &prc);
712 if (0 > res)
713 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
714 GNUNET_TIME_UNIT_ZERO_ABS, 0);
715}
716
717
718/**
719 * Get a random item for expiration. Call @a proc with all values
720 * ZERO or NULL if the datastore is empty.
721 *
722 * @param cls closure with the `struct Plugin`
723 * @param proc function to call the value (once only).
724 * @param proc_cls closure for @a proc
725 */
726static void
727postgres_plugin_get_expiration (void *cls,
728 PluginDatumProcessor proc,
729 void *proc_cls)
730{
731 struct Plugin *plugin = cls;
732 struct GNUNET_TIME_Absolute now = { 0 };
733 struct GNUNET_PQ_QueryParam params[] = {
734 GNUNET_PQ_query_param_absolute_time (&now),
735 GNUNET_PQ_query_param_end
736 };
737 struct ProcessResultContext prc;
738
739 now = GNUNET_TIME_absolute_get ();
740 prc.plugin = plugin;
741 prc.proc = proc;
742 prc.proc_cls = proc_cls;
743 (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
744 "select_expiration_order",
745 params,
746 &process_result,
747 &prc);
748}
749
750
751/**
752 * Closure for #process_keys.
753 */
754struct ProcessKeysContext
755{
756 /**
757 * Function to call for each key.
758 */
759 PluginKeyProcessor proc;
760
761 /**
762 * Closure for @e proc.
763 */
764 void *proc_cls;
765};
766
767
768/**
769 * Function to be called with the results of a SELECT statement
770 * that has returned @a num_results results.
771 *
772 * @param cls closure with a `struct ProcessKeysContext`
773 * @param result the postgres result
774 * @param num_result the number of results in @a result
775 */
776static void
777process_keys (void *cls,
778 PGresult *result,
779 unsigned int num_results)
780{
781 struct ProcessKeysContext *pkc = cls;
782
783 for (unsigned i = 0; i < num_results; i++)
784 {
785 struct GNUNET_HashCode key;
786 struct GNUNET_PQ_ResultSpec rs[] = {
787 GNUNET_PQ_result_spec_auto_from_type ("hash",
788 &key),
789 GNUNET_PQ_result_spec_end
790 };
791
792 if (GNUNET_OK !=
793 GNUNET_PQ_extract_result (result,
794 rs,
795 i))
796 {
797 GNUNET_break (0);
798 continue;
799 }
800 pkc->proc (pkc->proc_cls,
801 &key,
802 1);
803 GNUNET_PQ_cleanup_result (rs);
804 }
805}
806
807
808/**
809 * Get all of the keys in the datastore.
810 *
811 * @param cls closure with the `struct Plugin *`
812 * @param proc function to call on each key
813 * @param proc_cls closure for @a proc
814 */
815static void
816postgres_plugin_get_keys (void *cls,
817 PluginKeyProcessor proc,
818 void *proc_cls)
819{
820 struct Plugin *plugin = cls;
821 struct GNUNET_PQ_QueryParam params[] = {
822 GNUNET_PQ_query_param_end
823 };
824 struct ProcessKeysContext pkc;
825
826 pkc.proc = proc;
827 pkc.proc_cls = proc_cls;
828 (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
829 "get_keys",
830 params,
831 &process_keys,
832 &pkc);
833 proc (proc_cls,
834 NULL,
835 0);
836}
837
838
839/**
840 * Drop database.
841 *
842 * @param cls closure with the `struct Plugin *`
843 */
844static void
845postgres_plugin_drop (void *cls)
846{
847 struct Plugin *plugin = cls;
848 struct GNUNET_PQ_ExecuteStatement es[] = {
849 GNUNET_PQ_make_execute ("DROP TABLE gn090"),
850 GNUNET_PQ_EXECUTE_STATEMENT_END
851 };
852
853 if (GNUNET_OK !=
854 GNUNET_PQ_exec_statements (plugin->dbh,
855 es))
856 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
857 "postgres",
858 _ ("Failed to drop table from database.\n"));
859}
860
861
862/**
863 * Remove a particular key in the datastore.
864 *
865 * @param cls closure
866 * @param key key for the content
867 * @param size number of bytes in data
868 * @param data content stored
869 * @param cont continuation called with success or failure status
870 * @param cont_cls continuation closure for @a cont
871 */
872static void
873postgres_plugin_remove_key (void *cls,
874 const struct GNUNET_HashCode *key,
875 uint32_t size,
876 const void *data,
877 PluginRemoveCont cont,
878 void *cont_cls)
879{
880 struct Plugin *plugin = cls;
881 enum GNUNET_DB_QueryStatus ret;
882 struct GNUNET_PQ_QueryParam params[] = {
883 GNUNET_PQ_query_param_auto_from_type (key),
884 GNUNET_PQ_query_param_fixed_size (data, size),
885 GNUNET_PQ_query_param_end
886 };
887
888 ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
889 "remove",
890 params);
891 if (0 > ret)
892 {
893 cont (cont_cls,
894 key,
895 size,
896 GNUNET_SYSERR,
897 _ ("Postgresql exec failure"));
898 return;
899 }
900 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret)
901 {
902 cont (cont_cls,
903 key,
904 size,
905 GNUNET_NO,
906 NULL);
907 return;
908 }
909 plugin->env->duc (plugin->env->cls,
910 -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
911 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
912 "datastore-postgres",
913 "Deleted %u bytes from database\n",
914 (unsigned int) size);
915 cont (cont_cls,
916 key,
917 size,
918 GNUNET_OK,
919 NULL);
920}
921
922
923/**
924 * Entry point for the plugin.
925 *
926 * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
927 * @return our `struct Plugin *`
928 */
929void *
930libgnunet_plugin_datastore_postgres_init (void *cls)
931{
932 struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
933 struct GNUNET_DATASTORE_PluginFunctions *api;
934 struct Plugin *plugin;
935
936 plugin = GNUNET_new (struct Plugin);
937 plugin->env = env;
938 if (GNUNET_OK != init_connection (plugin))
939 {
940 GNUNET_free (plugin);
941 return NULL;
942 }
943 api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
944 api->cls = plugin;
945 api->estimate_size = &postgres_plugin_estimate_size;
946 api->put = &postgres_plugin_put;
947 api->get_key = &postgres_plugin_get_key;
948 api->get_replication = &postgres_plugin_get_replication;
949 api->get_expiration = &postgres_plugin_get_expiration;
950 api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
951 api->get_keys = &postgres_plugin_get_keys;
952 api->drop = &postgres_plugin_drop;
953 api->remove_key = &postgres_plugin_remove_key;
954 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
955 "datastore-postgres",
956 _ ("Postgres database running\n"));
957 return api;
958}
959
960
961/**
962 * Exit point from the plugin.
963 *
964 * @param cls our `struct Plugin *`
965 * @return always NULL
966 */
967void *
968libgnunet_plugin_datastore_postgres_done (void *cls)
969{
970 struct GNUNET_DATASTORE_PluginFunctions *api = cls;
971 struct Plugin *plugin = api->cls;
972
973 GNUNET_PQ_disconnect (plugin->dbh);
974 GNUNET_free (plugin);
975 GNUNET_free (api);
976 return NULL;
977}
978
979
980/* end of plugin_datastore_postgres.c */