aboutsummaryrefslogtreecommitdiff
path: root/src/plugin/datacache/plugin_datacache_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugin/datacache/plugin_datacache_postgres.c')
-rw-r--r--src/plugin/datacache/plugin_datacache_postgres.c616
1 files changed, 616 insertions, 0 deletions
diff --git a/src/plugin/datacache/plugin_datacache_postgres.c b/src/plugin/datacache/plugin_datacache_postgres.c
new file mode 100644
index 000000000..8bfd04aea
--- /dev/null
+++ b/src/plugin/datacache/plugin_datacache_postgres.c
@@ -0,0 +1,616 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2006, 2009, 2010, 2012, 2015, 2017, 2018, 2022 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file datacache/plugin_datacache_postgres.c
23 * @brief postgres for an implementation of a database backend for the datacache
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_pq_lib.h"
29#include "gnunet_datacache_plugin.h"
30
31#define LOG(kind, ...) GNUNET_log_from (kind, "datacache-postgres", __VA_ARGS__)
32
33/**
34 * Per-entry overhead estimate
35 */
36#define OVERHEAD (sizeof(struct GNUNET_HashCode) + 24)
37
38/**
39 * Context for all functions in this plugin.
40 */
41struct Plugin
42{
43 /**
44 * Our execution environment.
45 */
46 struct GNUNET_DATACACHE_PluginEnvironment *env;
47
48 /**
49 * Native Postgres database handle.
50 */
51 struct GNUNET_PQ_Context *dbh;
52
53 /**
54 * Number of key-value pairs in the database.
55 */
56 unsigned int num_items;
57};
58
59
60/**
61 * @brief Get a database handle
62 *
63 * @param plugin global context
64 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
65 */
66static enum GNUNET_GenericReturnValue
67init_connection (struct Plugin *plugin)
68{
69 struct GNUNET_PQ_PreparedStatement ps[] = {
70 GNUNET_PQ_make_prepare ("getkt",
71 "SELECT expiration_time,type,ro,value,trunc,path"
72 " FROM datacache.gn180dc"
73 " WHERE key=$1 AND type=$2 AND expiration_time >= $3"),
74 GNUNET_PQ_make_prepare ("getk",
75 "SELECT expiration_time,type,ro,value,trunc,path"
76 " FROM datacache.gn180dc"
77 " WHERE key=$1 AND expiration_time >= $2"),
78 GNUNET_PQ_make_prepare ("getex",
79 "SELECT LENGTH(value) AS len,oid,key"
80 " FROM datacache.gn180dc"
81 " WHERE expiration_time < $1"
82 " ORDER BY expiration_time ASC LIMIT 1"),
83 GNUNET_PQ_make_prepare ("getm",
84 "SELECT LENGTH(value) AS len,oid,key"
85 " FROM datacache.gn180dc"
86 " ORDER BY prox ASC, expiration_time ASC LIMIT 1"),
87 GNUNET_PQ_make_prepare ("get_closest",
88 "(SELECT expiration_time,type,ro,value,trunc,path,key"
89 " FROM datacache.gn180dc"
90 " WHERE key >= $1"
91 " AND expiration_time >= $2"
92 " AND ( (type = $3) OR ( 0 = $3) )"
93 " ORDER BY key ASC"
94 " LIMIT $4)"
95 " UNION "
96 "(SELECT expiration_time,type,ro,value,trunc,path,key"
97 " FROM datacache.gn180dc"
98 " WHERE key <= $1"
99 " AND expiration_time >= $2"
100 " AND ( (type = $3) OR ( 0 = $3) )"
101 " ORDER BY key DESC"
102 " LIMIT $4)"),
103 GNUNET_PQ_make_prepare ("delrow",
104 "DELETE FROM datacache.gn180dc"
105 " WHERE oid=$1"),
106 GNUNET_PQ_make_prepare ("put",
107 "INSERT INTO datacache.gn180dc"
108 " (type, ro, prox, expiration_time, key, value, trunc, path) "
109 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"),
110 GNUNET_PQ_PREPARED_STATEMENT_END
111 };
112
113 plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
114 "datacache-postgres",
115 "datacache-",
116 NULL,
117 ps);
118 if (NULL == plugin->dbh)
119 return GNUNET_SYSERR;
120 return GNUNET_OK;
121}
122
123
124/**
125 * Store an item in the datastore.
126 *
127 * @param cls closure (our `struct Plugin`)
128 * @param prox proximity of @a key to my PID
129 * @param block data to store
130 * @return 0 if duplicate, -1 on error, number of bytes used otherwise
131 */
132static ssize_t
133postgres_plugin_put (void *cls,
134 uint32_t prox,
135 const struct GNUNET_DATACACHE_Block *block)
136{
137 struct Plugin *plugin = cls;
138 uint32_t type32 = (uint32_t) block->type;
139 uint32_t ro32 = (uint32_t) block->type;
140 struct GNUNET_PQ_QueryParam params[] = {
141 GNUNET_PQ_query_param_uint32 (&type32),
142 GNUNET_PQ_query_param_uint32 (&ro32),
143 GNUNET_PQ_query_param_uint32 (&prox),
144 GNUNET_PQ_query_param_absolute_time (&block->expiration_time),
145 GNUNET_PQ_query_param_auto_from_type (&block->key),
146 GNUNET_PQ_query_param_fixed_size (block->data,
147 block->data_size),
148 GNUNET_PQ_query_param_auto_from_type (&block->trunc_peer),
149 (0 == block->put_path_length)
150 ? GNUNET_PQ_query_param_null ()
151 : GNUNET_PQ_query_param_fixed_size (
152 block->put_path,
153 block->put_path_length
154 * sizeof(struct GNUNET_DHT_PathElement)),
155 GNUNET_PQ_query_param_end
156 };
157 enum GNUNET_DB_QueryStatus ret;
158
159 ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
160 "put",
161 params);
162 if (0 > ret)
163 return -1;
164 plugin->num_items++;
165 return block->data_size + OVERHEAD;
166}
167
168
169/**
170 * Closure for #handle_results.
171 */
172struct HandleResultContext
173{
174 /**
175 * Function to call on each result, may be NULL.
176 */
177 GNUNET_DATACACHE_Iterator iter;
178
179 /**
180 * Closure for @e iter.
181 */
182 void *iter_cls;
183
184 /**
185 * Key used.
186 */
187 const struct GNUNET_HashCode *key;
188};
189
190
191/**
192 * Function to be called with the results of a SELECT statement
193 * that has returned @a num_results results. Parse the result
194 * and call the callback given in @a cls
195 *
196 * @param cls closure of type `struct HandleResultContext`
197 * @param result the postgres result
198 * @param num_results the number of results in @a result
199 */
200static void
201handle_results (void *cls,
202 PGresult *result,
203 unsigned int num_results)
204{
205 struct HandleResultContext *hrc = cls;
206
207 for (unsigned int i = 0; i < num_results; i++)
208 {
209 uint32_t type32;
210 uint32_t bro32;
211 void *data;
212 struct GNUNET_DATACACHE_Block block;
213 void *path = NULL;
214 size_t path_size = 0;
215 struct GNUNET_PQ_ResultSpec rs[] = {
216 GNUNET_PQ_result_spec_absolute_time ("expiration_time",
217 &block.expiration_time),
218 GNUNET_PQ_result_spec_uint32 ("type",
219 &type32),
220 GNUNET_PQ_result_spec_uint32 ("ro",
221 &bro32),
222 GNUNET_PQ_result_spec_variable_size ("value",
223 &data,
224 &block.data_size),
225 GNUNET_PQ_result_spec_auto_from_type ("trunc",
226 &block.trunc_peer),
227 GNUNET_PQ_result_spec_allow_null (
228 GNUNET_PQ_result_spec_variable_size ("path",
229 &path,
230 &path_size),
231 NULL),
232 GNUNET_PQ_result_spec_end
233 };
234
235 if (GNUNET_YES !=
236 GNUNET_PQ_extract_result (result,
237 rs,
238 i))
239 {
240 GNUNET_break (0);
241 return;
242 }
243 if (0 != (path_size % sizeof(struct GNUNET_DHT_PathElement)))
244 {
245 GNUNET_break (0);
246 path_size = 0;
247 path = NULL;
248 }
249 block.data = data;
250 block.put_path = path;
251 block.put_path_length
252 = path_size / sizeof (struct GNUNET_DHT_PathElement);
253 block.type = (enum GNUNET_BLOCK_Type) type32;
254 block.ro = (enum GNUNET_DHT_RouteOption) bro32;
255 block.key = *hrc->key;
256 LOG (GNUNET_ERROR_TYPE_DEBUG,
257 "Found result of size %u bytes and type %u in database\n",
258 (unsigned int) block.data_size,
259 (unsigned int) block.type);
260 if ( (NULL != hrc->iter) &&
261 (GNUNET_SYSERR ==
262 hrc->iter (hrc->iter_cls,
263 &block)) )
264 {
265 LOG (GNUNET_ERROR_TYPE_DEBUG,
266 "Ending iteration (client error)\n");
267 GNUNET_PQ_cleanup_result (rs);
268 return;
269 }
270 GNUNET_PQ_cleanup_result (rs);
271 }
272}
273
274
275/**
276 * Iterate over the results for a particular key
277 * in the datastore.
278 *
279 * @param cls closure (our `struct Plugin`)
280 * @param key key to look for
281 * @param type entries of which type are relevant?
282 * @param iter maybe NULL (to just count)
283 * @param iter_cls closure for @a iter
284 * @return the number of results found
285 */
286static unsigned int
287postgres_plugin_get (void *cls,
288 const struct GNUNET_HashCode *key,
289 enum GNUNET_BLOCK_Type type,
290 GNUNET_DATACACHE_Iterator iter,
291 void *iter_cls)
292{
293 struct Plugin *plugin = cls;
294 uint32_t type32 = (uint32_t) type;
295 struct GNUNET_TIME_Absolute now = { 0 };
296 struct GNUNET_PQ_QueryParam paramk[] = {
297 GNUNET_PQ_query_param_auto_from_type (key),
298 GNUNET_PQ_query_param_absolute_time (&now),
299 GNUNET_PQ_query_param_end
300 };
301 struct GNUNET_PQ_QueryParam paramkt[] = {
302 GNUNET_PQ_query_param_auto_from_type (key),
303 GNUNET_PQ_query_param_uint32 (&type32),
304 GNUNET_PQ_query_param_absolute_time (&now),
305 GNUNET_PQ_query_param_end
306 };
307 enum GNUNET_DB_QueryStatus res;
308 struct HandleResultContext hr_ctx;
309
310 now = GNUNET_TIME_absolute_get ();
311 hr_ctx.iter = iter;
312 hr_ctx.iter_cls = iter_cls;
313 hr_ctx.key = key;
314 res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
315 (0 == type) ? "getk" : "getkt",
316 (0 == type) ? paramk : paramkt,
317 &handle_results,
318 &hr_ctx);
319 if (res < 0)
320 return 0;
321 return res;
322}
323
324
325/**
326 * Delete the entry with the lowest expiration value
327 * from the datacache right now.
328 *
329 * @param cls closure (our `struct Plugin`)
330 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
331 */
332static enum GNUNET_GenericReturnValue
333postgres_plugin_del (void *cls)
334{
335 struct Plugin *plugin = cls;
336 struct GNUNET_PQ_QueryParam pempty[] = {
337 GNUNET_PQ_query_param_end
338 };
339 uint32_t size;
340 uint64_t oid;
341 struct GNUNET_HashCode key;
342 struct GNUNET_PQ_ResultSpec rs[] = {
343 GNUNET_PQ_result_spec_uint32 ("len",
344 &size),
345 GNUNET_PQ_result_spec_uint64 ("oid",
346 &oid),
347 GNUNET_PQ_result_spec_auto_from_type ("key",
348 &key),
349 GNUNET_PQ_result_spec_end
350 };
351 enum GNUNET_DB_QueryStatus res;
352 struct GNUNET_PQ_QueryParam dparam[] = {
353 GNUNET_PQ_query_param_uint64 (&oid),
354 GNUNET_PQ_query_param_end
355 };
356 struct GNUNET_TIME_Absolute now;
357 struct GNUNET_PQ_QueryParam xparam[] = {
358 GNUNET_PQ_query_param_absolute_time (&now),
359 GNUNET_PQ_query_param_end
360 };
361
362 now = GNUNET_TIME_absolute_get ();
363 res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
364 "getex",
365 xparam,
366 rs);
367 if (0 >= res)
368 res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
369 "getm",
370 pempty,
371 rs);
372 if (0 > res)
373 return GNUNET_SYSERR;
374 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
375 {
376 /* no result */
377 LOG (GNUNET_ERROR_TYPE_DEBUG,
378 "Ending iteration (no more results)\n");
379 return 0;
380 }
381 res = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
382 "delrow",
383 dparam);
384 if (0 > res)
385 {
386 GNUNET_PQ_cleanup_result (rs);
387 return GNUNET_SYSERR;
388 }
389 plugin->num_items--;
390 plugin->env->delete_notify (plugin->env->cls,
391 &key,
392 size + OVERHEAD);
393 GNUNET_PQ_cleanup_result (rs);
394 return GNUNET_OK;
395}
396
397
398/**
399 * Closure for #extract_result_cb.
400 */
401struct ExtractResultContext
402{
403 /**
404 * Function to call for each result found.
405 */
406 GNUNET_DATACACHE_Iterator iter;
407
408 /**
409 * Closure for @e iter.
410 */
411 void *iter_cls;
412};
413
414
415/**
416 * Function to be called with the results of a SELECT statement
417 * that has returned @a num_results results. Calls the `iter`
418 * from @a cls for each result.
419 *
420 * @param cls closure with the `struct ExtractResultContext`
421 * @param result the postgres result
422 * @param num_results the number of results in @a result
423 */
424static void
425extract_result_cb (void *cls,
426 PGresult *result,
427 unsigned int num_results)
428{
429 struct ExtractResultContext *erc = cls;
430
431 if (NULL == erc->iter)
432 return;
433 for (unsigned int i = 0; i < num_results; i++)
434 {
435 uint32_t type32;
436 uint32_t bro32;
437 struct GNUNET_DATACACHE_Block block;
438 void *data;
439 void *path;
440 size_t path_size;
441 struct GNUNET_PQ_ResultSpec rs[] = {
442 GNUNET_PQ_result_spec_absolute_time ("expiration_time",
443 &block.expiration_time),
444 GNUNET_PQ_result_spec_uint32 ("type",
445 &type32),
446 GNUNET_PQ_result_spec_uint32 ("ro",
447 &bro32),
448 GNUNET_PQ_result_spec_variable_size ("value",
449 &data,
450 &block.data_size),
451 GNUNET_PQ_result_spec_auto_from_type ("trunc",
452 &block.trunc_peer),
453 GNUNET_PQ_result_spec_variable_size ("path",
454 &path,
455 &path_size),
456 GNUNET_PQ_result_spec_auto_from_type ("key",
457 &block.key),
458 GNUNET_PQ_result_spec_end
459 };
460
461 if (GNUNET_YES !=
462 GNUNET_PQ_extract_result (result,
463 rs,
464 i))
465 {
466 GNUNET_break (0);
467 return;
468 }
469 if (0 != (path_size % sizeof(struct GNUNET_DHT_PathElement)))
470 {
471 GNUNET_break (0);
472 path_size = 0;
473 path = NULL;
474 }
475 block.type = (enum GNUNET_BLOCK_Type) type32;
476 block.ro = (enum GNUNET_DHT_RouteOption) bro32;
477 block.data = data;
478 block.put_path = path;
479 block.put_path_length = path_size / sizeof (struct GNUNET_DHT_PathElement);
480 LOG (GNUNET_ERROR_TYPE_DEBUG,
481 "Found result of size %u bytes and type %u in database\n",
482 (unsigned int) block.data_size,
483 (unsigned int) block.type);
484 if ( (NULL != erc->iter) &&
485 (GNUNET_SYSERR ==
486 erc->iter (erc->iter_cls,
487 &block)) )
488 {
489 LOG (GNUNET_ERROR_TYPE_DEBUG,
490 "Ending iteration (client error)\n");
491 GNUNET_PQ_cleanup_result (rs);
492 break;
493 }
494 GNUNET_PQ_cleanup_result (rs);
495 }
496}
497
498
499/**
500 * Iterate over the results that are "close" to a particular key in
501 * the datacache. "close" is defined as numerically larger than @a
502 * key (when interpreted as a circular address space), with small
503 * distance.
504 *
505 * @param cls closure (internal context for the plugin)
506 * @param key area of the keyspace to look into
507 * @param type desired block type for the replies
508 * @param num_results number of results that should be returned to @a iter
509 * @param iter maybe NULL (to just count)
510 * @param iter_cls closure for @a iter
511 * @return the number of results found
512 */
513static unsigned int
514postgres_plugin_get_closest (void *cls,
515 const struct GNUNET_HashCode *key,
516 enum GNUNET_BLOCK_Type type,
517 unsigned int num_results,
518 GNUNET_DATACACHE_Iterator iter,
519 void *iter_cls)
520{
521 struct Plugin *plugin = cls;
522 uint32_t num_results32 = (uint32_t) num_results;
523 uint32_t type32 = (uint32_t) type;
524 struct GNUNET_TIME_Absolute now;
525 struct GNUNET_PQ_QueryParam params[] = {
526 GNUNET_PQ_query_param_auto_from_type (key),
527 GNUNET_PQ_query_param_absolute_time (&now),
528 GNUNET_PQ_query_param_uint32 (&type32),
529 GNUNET_PQ_query_param_uint32 (&num_results32),
530 GNUNET_PQ_query_param_end
531 };
532 enum GNUNET_DB_QueryStatus res;
533 struct ExtractResultContext erc;
534
535 erc.iter = iter;
536 erc.iter_cls = iter_cls;
537 now = GNUNET_TIME_absolute_get ();
538 res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
539 "get_closest",
540 params,
541 &extract_result_cb,
542 &erc);
543 if (0 > res)
544 {
545 LOG (GNUNET_ERROR_TYPE_DEBUG,
546 "Ending iteration (postgres error)\n");
547 return 0;
548 }
549 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
550 {
551 /* no result */
552 LOG (GNUNET_ERROR_TYPE_DEBUG,
553 "Ending iteration (no more results)\n");
554 return 0;
555 }
556 return res;
557}
558
559
560/**
561 * Entry point for the plugin.
562 *
563 * @param cls closure (the `struct GNUNET_DATACACHE_PluginEnvironmnet`)
564 * @return the plugin's closure (our `struct Plugin`)
565 */
566void *
567libgnunet_plugin_datacache_postgres_init (void *cls)
568{
569 struct GNUNET_DATACACHE_PluginEnvironment *env = cls;
570 struct GNUNET_DATACACHE_PluginFunctions *api;
571 struct Plugin *plugin;
572
573 plugin = GNUNET_new (struct Plugin);
574 plugin->env = env;
575
576 if (GNUNET_OK != init_connection (plugin))
577 {
578 GNUNET_free (plugin);
579 return NULL;
580 }
581
582 api = GNUNET_new (struct GNUNET_DATACACHE_PluginFunctions);
583 api->cls = plugin;
584 api->get = &postgres_plugin_get;
585 api->put = &postgres_plugin_put;
586 api->del = &postgres_plugin_del;
587 api->get_closest = &postgres_plugin_get_closest;
588 LOG (GNUNET_ERROR_TYPE_INFO,
589 "Postgres datacache running\n");
590 return api;
591}
592
593
594/**
595 * Exit point from the plugin.
596 *
597 * @param cls closure (our `struct Plugin`)
598 * @return NULL
599 */
600void *
601libgnunet_plugin_datacache_postgres_done (void *cls)
602{
603 struct GNUNET_DATACACHE_PluginFunctions *api = cls;
604 struct Plugin *plugin = api->cls;
605
606 GNUNET_break (GNUNET_OK ==
607 GNUNET_PQ_exec_sql (plugin->dbh,
608 "datacache-drop"));
609 GNUNET_PQ_disconnect (plugin->dbh);
610 GNUNET_free (plugin);
611 GNUNET_free (api);
612 return NULL;
613}
614
615
616/* end of plugin_datacache_postgres.c */