aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/plugin_datastore_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/plugin_datastore_postgres.c')
-rw-r--r--src/datastore/plugin_datastore_postgres.c834
1 files changed, 418 insertions, 416 deletions
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
index b6aeb0be6..fd1a533bb 100644
--- a/src/datastore/plugin_datastore_postgres.c
+++ b/src/datastore/plugin_datastore_postgres.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2009-2016 GNUnet e.V. 3 Copyright (C) 2009-2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -23,10 +23,8 @@
23 * @brief postgres-based datastore backend 23 * @brief postgres-based datastore backend
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 */ 25 */
26
27#include "platform.h" 26#include "platform.h"
28#include "gnunet_datastore_plugin.h" 27#include "gnunet_datastore_plugin.h"
29#include "gnunet_postgres_lib.h"
30#include "gnunet_pq_lib.h" 28#include "gnunet_pq_lib.h"
31 29
32 30
@@ -70,160 +68,106 @@ struct Plugin
70static int 68static int
71init_connection (struct Plugin *plugin) 69init_connection (struct Plugin *plugin)
72{ 70{
73 PGresult *ret; 71 struct GNUNET_PQ_ExecuteStatement es[] = {
72 /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
73 * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
74 * we do math or inequality tests, so we can't handle the entire range of uint32_t.
75 * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
76 * PostgreSQL also recommends against using WITH OIDS.
77 */
78 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS gn090 ("
79 " repl INTEGER NOT NULL DEFAULT 0,"
80 " type INTEGER NOT NULL DEFAULT 0,"
81 " prio INTEGER NOT NULL DEFAULT 0,"
82 " anonLevel INTEGER NOT NULL DEFAULT 0,"
83 " expire BIGINT NOT NULL DEFAULT 0,"
84 " rvalue BIGINT NOT NULL DEFAULT 0,"
85 " hash BYTEA NOT NULL DEFAULT '',"
86 " vhash BYTEA NOT NULL DEFAULT '',"
87 " value BYTEA NOT NULL DEFAULT '')"
88 "WITH OIDS"),
89 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
90 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
91 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
92 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
93 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
94 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
95 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
96 GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
97 GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
98 GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
99 GNUNET_PQ_EXECUTE_STATEMENT_END
100 };
101#define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
102 struct GNUNET_PQ_PreparedStatement ps[] = {
103 GNUNET_PQ_make_prepare ("get",
104 "SELECT " RESULT_COLUMNS " FROM gn090 "
105 "WHERE oid >= $1::bigint AND "
106 "(rvalue >= $2 OR 0 = $3::smallint) AND "
107 "(hash = $4 OR 0 = $5::smallint) AND "
108 "(type = $6 OR 0 = $7::smallint) "
109 "ORDER BY oid ASC LIMIT 1",
110 7),
111 GNUNET_PQ_make_prepare ("put",
112 "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
113 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
114 9),
115 GNUNET_PQ_make_prepare ("update",
116 "UPDATE gn090 "
117 "SET prio = prio + $1, "
118 "repl = repl + $2, "
119 "expire = GREATEST(expire, $3) "
120 "WHERE hash = $4 AND vhash = $5",
121 5),
122 GNUNET_PQ_make_prepare ("decrepl",
123 "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
124 "WHERE oid = $1",
125 1),
126 GNUNET_PQ_make_prepare ("select_non_anonymous",
127 "SELECT " RESULT_COLUMNS " FROM gn090 "
128 "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
129 "ORDER BY oid ASC LIMIT 1",
130 2),
131 GNUNET_PQ_make_prepare ("select_expiration_order",
132 "(SELECT " RESULT_COLUMNS " FROM gn090 "
133 "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
134 "UNION "
135 "(SELECT " RESULT_COLUMNS " FROM gn090 "
136 "ORDER BY prio ASC LIMIT 1) "
137 "ORDER BY expire ASC LIMIT 1",
138 1),
139 GNUNET_PQ_make_prepare ("select_replication_order",
140 "SELECT " RESULT_COLUMNS " FROM gn090 "
141 "ORDER BY repl DESC,RANDOM() LIMIT 1",
142 0),
143 GNUNET_PQ_make_prepare ("delrow",
144 "DELETE FROM gn090 " "WHERE oid=$1",
145 1),
146 GNUNET_PQ_make_prepare ("remove", "DELETE FROM gn090 "
147 "WHERE hash = $1 AND "
148 "value = $2",
149 2),
150 GNUNET_PQ_make_prepare ("get_keys",
151 "SELECT hash FROM gn090",
152 0),
153 GNUNET_PQ_make_prepare ("estimate_size",
154 "SELECT SUM(LENGTH(value))+256*COUNT(*) AS total FROM gn090",
155 0),
156 GNUNET_PQ_PREPARED_STATEMENT_END
157 };
158#undef RESULT_COLUMNS
74 159
75 plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres"); 160 plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
161 "datastore-postgres");
76 if (NULL == plugin->dbh) 162 if (NULL == plugin->dbh)
77 return GNUNET_SYSERR; 163 return GNUNET_SYSERR;
78 164
79 /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because 165 if ( (GNUNET_OK !=
80 * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel 166 GNUNET_PQ_exec_statements (plugin->dbh,
81 * we do math or inequality tests, so we can't handle the entire range of uint32_t. 167 es)) ||
82 * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC. 168 (GNUNET_OK !=
83 * PostgreSQL also recommends against using WITH OIDS. 169 GNUNET_PQ_prepare_statements (plugin->dbh,
84 */ 170 ps)) )
85 ret =
86 PQexec (plugin->dbh,
87 "CREATE TABLE IF NOT EXISTS gn090 ("
88 " repl INTEGER NOT NULL DEFAULT 0,"
89 " type INTEGER NOT NULL DEFAULT 0,"
90 " prio INTEGER NOT NULL DEFAULT 0,"
91 " anonLevel INTEGER NOT NULL DEFAULT 0,"
92 " expire BIGINT NOT NULL DEFAULT 0,"
93 " rvalue BIGINT NOT NULL DEFAULT 0,"
94 " hash BYTEA NOT NULL DEFAULT '',"
95 " vhash BYTEA NOT NULL DEFAULT '',"
96 " value BYTEA NOT NULL DEFAULT '')"
97 "WITH OIDS");
98 if ( (NULL == ret) ||
99 ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
100 (0 != strcmp ("42P07", /* duplicate table */
101 PQresultErrorField
102 (ret,
103 PG_DIAG_SQLSTATE)))))
104 {
105 (void) GNUNET_POSTGRES_check_result (plugin->dbh,
106 ret,
107 PGRES_COMMAND_OK,
108 "CREATE TABLE",
109 "gn090");
110 PQfinish (plugin->dbh);
111 plugin->dbh = NULL;
112 return GNUNET_SYSERR;
113 }
114
115 if (PQresultStatus (ret) == PGRES_COMMAND_OK)
116 {
117 if ((GNUNET_OK !=
118 GNUNET_POSTGRES_exec (plugin->dbh,
119 "CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)")) ||
120 (GNUNET_OK !=
121 GNUNET_POSTGRES_exec (plugin->dbh,
122 "CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)")) ||
123 (GNUNET_OK !=
124 GNUNET_POSTGRES_exec (plugin->dbh,
125 "CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)")) ||
126 (GNUNET_OK !=
127 GNUNET_POSTGRES_exec (plugin->dbh,
128 "CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)")) ||
129 (GNUNET_OK !=
130 GNUNET_POSTGRES_exec (plugin->dbh,
131 "CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
132 (GNUNET_OK !=
133 GNUNET_POSTGRES_exec (plugin->dbh,
134 "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
135 (GNUNET_OK !=
136 GNUNET_POSTGRES_exec (plugin->dbh,
137 "CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)")))
138 {
139 PQclear (ret);
140 PQfinish (plugin->dbh);
141 plugin->dbh = NULL;
142 return GNUNET_SYSERR;
143 }
144 }
145 PQclear (ret);
146
147 ret =
148 PQexec (plugin->dbh,
149 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
150 if (GNUNET_OK !=
151 GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
152 {
153 PQfinish (plugin->dbh);
154 plugin->dbh = NULL;
155 return GNUNET_SYSERR;
156 }
157 PQclear (ret);
158 ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
159 if (GNUNET_OK !=
160 GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
161 {
162 PQfinish (plugin->dbh);
163 plugin->dbh = NULL;
164 return GNUNET_SYSERR;
165 }
166 PQclear (ret);
167 ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
168 if (GNUNET_OK !=
169 GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
170 {
171 PQfinish (plugin->dbh);
172 plugin->dbh = NULL;
173 return GNUNET_SYSERR;
174 }
175 PQclear (ret);
176#define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
177 if ((GNUNET_OK !=
178 GNUNET_POSTGRES_prepare (plugin->dbh, "get",
179 "SELECT " RESULT_COLUMNS " FROM gn090 "
180 "WHERE oid >= $1::bigint AND "
181 "(rvalue >= $2 OR 0 = $3::smallint) AND "
182 "(hash = $4 OR 0 = $5::smallint) AND "
183 "(type = $6 OR 0 = $7::smallint) "
184 "ORDER BY oid ASC LIMIT 1", 7)) ||
185 (GNUNET_OK !=
186 GNUNET_POSTGRES_prepare (plugin->dbh, "put",
187 "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
188 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", 9)) ||
189 (GNUNET_OK !=
190 GNUNET_POSTGRES_prepare (plugin->dbh, "update",
191 "UPDATE gn090 "
192 "SET prio = prio + $1, "
193 "repl = repl + $2, "
194 "expire = GREATEST(expire, $3) "
195 "WHERE hash = $4 AND vhash = $5", 5)) ||
196 (GNUNET_OK !=
197 GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
198 "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
199 "WHERE oid = $1", 1)) ||
200 (GNUNET_OK !=
201 GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
202 "SELECT " RESULT_COLUMNS " FROM gn090 "
203 "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
204 "ORDER BY oid ASC LIMIT 1",
205 2)) ||
206 (GNUNET_OK !=
207 GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
208 "(SELECT " RESULT_COLUMNS " FROM gn090 "
209 "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
210 "UNION "
211 "(SELECT " RESULT_COLUMNS " FROM gn090 "
212 "ORDER BY prio ASC LIMIT 1) "
213 "ORDER BY expire ASC LIMIT 1",
214 1)) ||
215 (GNUNET_OK !=
216 GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
217 "SELECT " RESULT_COLUMNS " FROM gn090 "
218 "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
219 (GNUNET_OK !=
220 GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
221 (GNUNET_OK !=
222 GNUNET_POSTGRES_prepare (plugin->dbh, "remove", "DELETE FROM gn090 "
223 "WHERE hash = $1 AND "
224 "value = $2", 2)) ||
225 (GNUNET_OK !=
226 GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
227 { 171 {
228 PQfinish (plugin->dbh); 172 PQfinish (plugin->dbh);
229 plugin->dbh = NULL; 173 plugin->dbh = NULL;
@@ -241,44 +185,32 @@ init_connection (struct Plugin *plugin)
241 * @return number of bytes used on disk 185 * @return number of bytes used on disk
242 */ 186 */
243static void 187static void
244postgres_plugin_estimate_size (void *cls, unsigned long long *estimate) 188postgres_plugin_estimate_size (void *cls,
189 unsigned long long *estimate)
245{ 190{
246 struct Plugin *plugin = cls; 191 struct Plugin *plugin = cls;
247 unsigned long long total; 192 uint64_t total;
248 PGresult *ret; 193 struct GNUNET_PQ_QueryParam params[] = {
194 GNUNET_PQ_query_param_end
195 };
196 struct GNUNET_PQ_ResultSpec rs[] = {
197 GNUNET_PQ_result_spec_uint64 ("total",
198 &total),
199 GNUNET_PQ_result_spec_end
200 };
201 enum GNUNET_DB_QueryStatus ret;
249 202
250 if (NULL == estimate) 203 if (NULL == estimate)
251 return; 204 return;
252 ret = 205 ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
253 PQexecParams (plugin->dbh, 206 "estimate_size",
254 "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0, 207 params,
255 NULL, NULL, NULL, NULL, 1); 208 rs);
256 if (GNUNET_OK != 209 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret)
257 GNUNET_POSTGRES_check_result (plugin->dbh,
258 ret,
259 PGRES_TUPLES_OK,
260 "PQexecParams",
261 "get_size"))
262 {
263 *estimate = 0;
264 return;
265 }
266 if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
267 {
268 GNUNET_break (0);
269 PQclear (ret);
270 *estimate = 0;
271 return;
272 }
273 if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
274 { 210 {
275 GNUNET_break (0 == PQgetlength (ret, 0, 0)); 211 *estimate = 0LL;
276 PQclear (ret);
277 *estimate = 0;
278 return; 212 return;
279 } 213 }
280 total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
281 PQclear (ret);
282 *estimate = total; 214 *estimate = total;
283} 215}
284 216
@@ -315,13 +247,12 @@ postgres_plugin_put (void *cls,
315{ 247{
316 struct Plugin *plugin = cls; 248 struct Plugin *plugin = cls;
317 struct GNUNET_HashCode vhash; 249 struct GNUNET_HashCode vhash;
318 PGresult *ret; 250 enum GNUNET_DB_QueryStatus ret;
319 251
320 GNUNET_CRYPTO_hash (data, 252 GNUNET_CRYPTO_hash (data,
321 size, 253 size,
322 &vhash); 254 &vhash);
323 255 if (! absent)
324 if (!absent)
325 { 256 {
326 struct GNUNET_PQ_QueryParam params[] = { 257 struct GNUNET_PQ_QueryParam params[] = {
327 GNUNET_PQ_query_param_uint32 (&priority), 258 GNUNET_PQ_query_param_uint32 (&priority),
@@ -331,15 +262,10 @@ postgres_plugin_put (void *cls,
331 GNUNET_PQ_query_param_auto_from_type (&vhash), 262 GNUNET_PQ_query_param_auto_from_type (&vhash),
332 GNUNET_PQ_query_param_end 263 GNUNET_PQ_query_param_end
333 }; 264 };
334 ret = GNUNET_PQ_exec_prepared (plugin->dbh, 265 ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
335 "update", 266 "update",
336 params); 267 params);
337 if (GNUNET_OK != 268 if (0 > ret)
338 GNUNET_POSTGRES_check_result (plugin->dbh,
339 ret,
340 PGRES_COMMAND_OK,
341 "PQexecPrepared",
342 "update"))
343 { 269 {
344 cont (cont_cls, 270 cont (cont_cls,
345 key, 271 key,
@@ -348,9 +274,7 @@ postgres_plugin_put (void *cls,
348 _("Postgress exec failure")); 274 _("Postgress exec failure"));
349 return; 275 return;
350 } 276 }
351 /* What an awful API, this function really does return a string */ 277 bool affected = (0 != ret);
352 bool affected = 0 != strcmp ("0", PQcmdTuples (ret));
353 PQclear (ret);
354 if (affected) 278 if (affected)
355 { 279 {
356 cont (cont_cls, 280 cont (cont_cls,
@@ -362,177 +286,195 @@ postgres_plugin_put (void *cls,
362 } 286 }
363 } 287 }
364 288
365 uint32_t utype = type;
366 uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
367 UINT64_MAX);
368 struct GNUNET_PQ_QueryParam params[] = {
369 GNUNET_PQ_query_param_uint32 (&replication),
370 GNUNET_PQ_query_param_uint32 (&utype),
371 GNUNET_PQ_query_param_uint32 (&priority),
372 GNUNET_PQ_query_param_uint32 (&anonymity),
373 GNUNET_PQ_query_param_absolute_time (&expiration),
374 GNUNET_PQ_query_param_uint64 (&rvalue),
375 GNUNET_PQ_query_param_auto_from_type (key),
376 GNUNET_PQ_query_param_auto_from_type (&vhash),
377 GNUNET_PQ_query_param_fixed_size (data, size),
378 GNUNET_PQ_query_param_end
379 };
380
381 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
382 "put",
383 params);
384 if (GNUNET_OK !=
385 GNUNET_POSTGRES_check_result (plugin->dbh,
386 ret,
387 PGRES_COMMAND_OK,
388 "PQexecPrepared", "put"))
389 { 289 {
390 cont (cont_cls, key, size, 290 uint32_t utype = (uint32_t) type;
391 GNUNET_SYSERR, 291 uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
392 _("Postgress exec failure")); 292 UINT64_MAX);
393 return; 293 struct GNUNET_PQ_QueryParam params[] = {
294 GNUNET_PQ_query_param_uint32 (&replication),
295 GNUNET_PQ_query_param_uint32 (&utype),
296 GNUNET_PQ_query_param_uint32 (&priority),
297 GNUNET_PQ_query_param_uint32 (&anonymity),
298 GNUNET_PQ_query_param_absolute_time (&expiration),
299 GNUNET_PQ_query_param_uint64 (&rvalue),
300 GNUNET_PQ_query_param_auto_from_type (key),
301 GNUNET_PQ_query_param_auto_from_type (&vhash),
302 GNUNET_PQ_query_param_fixed_size (data, size),
303 GNUNET_PQ_query_param_end
304 };
305
306 ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
307 "put",
308 params);
309 if (0 > ret)
310 {
311 cont (cont_cls,
312 key,
313 size,
314 GNUNET_SYSERR,
315 "Postgress exec failure");
316 return;
317 }
394 } 318 }
395 PQclear (ret);
396 plugin->env->duc (plugin->env->cls, 319 plugin->env->duc (plugin->env->cls,
397 size + GNUNET_DATASTORE_ENTRY_OVERHEAD); 320 size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
398 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 321 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
399 "datastore-postgres", 322 "datastore-postgres",
400 "Stored %u bytes in database\n", 323 "Stored %u bytes in database\n",
401 (unsigned int) size); 324 (unsigned int) size);
402 cont (cont_cls, key, size, GNUNET_OK, NULL); 325 cont (cont_cls,
326 key,
327 size,
328 GNUNET_OK,
329 NULL);
403} 330}
404 331
405 332
406/** 333/**
407 * Function invoked to process the result and call the processor. 334 * Closure for #process_result.
335 */
336struct ProcessResultContext
337{
338
339 /**
340 * The plugin handle.
341 */
342 struct Plugin *plugin;
343
344 /**
345 * Function to call on each result.
346 */
347 PluginDatumProcessor proc;
348
349 /**
350 * Closure for @e proc.
351 */
352 void *proc_cls;
353
354};
355
356
357/**
358 * Function invoked to process the result and call the processor of @a
359 * cls.
408 * 360 *
409 * @param plugin global plugin data 361 * @param cls our `struct ProcessResultContext`
410 * @param proc function to call the value (once only).
411 * @param proc_cls closure for proc
412 * @param res result from exec 362 * @param res result from exec
413 * @param filename filename for error messages 363 * @param num_results number of results in @a res
414 * @param line line number for error messages
415 */ 364 */
416static void 365static void
417process_result (struct Plugin *plugin, 366process_result (void *cls,
418 PluginDatumProcessor proc, 367 PGresult *res,
419 void *proc_cls, 368 unsigned int num_results)
420 PGresult * res,
421 const char *filename, int line)
422{ 369{
423 int iret; 370 struct ProcessResultContext *prc = cls;
424 uint32_t rowid; 371 struct Plugin *plugin = prc->plugin;
425 uint32_t utype;
426 uint32_t anonymity;
427 uint32_t replication;
428 uint32_t priority;
429 size_t size;
430 void *data;
431 struct GNUNET_TIME_Absolute expiration_time;
432 struct GNUNET_HashCode key;
433 struct GNUNET_PQ_ResultSpec rs[] = {
434 GNUNET_PQ_result_spec_uint32 ("repl", &replication),
435 GNUNET_PQ_result_spec_uint32 ("type", &utype),
436 GNUNET_PQ_result_spec_uint32 ("prio", &priority),
437 GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
438 GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
439 GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
440 GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
441 GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
442 GNUNET_PQ_result_spec_end
443 };
444 372
445 if (GNUNET_OK != 373 if (0 == num_results)
446 GNUNET_POSTGRES_check_result_ (plugin->dbh,
447 res,
448 PGRES_TUPLES_OK,
449 "PQexecPrepared",
450 "select",
451 filename, line))
452 {
453 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
454 "datastore-postgres",
455 "Ending iteration (postgres error)\n");
456 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
457 return;
458 }
459
460 if (0 == PQntuples (res))
461 { 374 {
462 /* no result */ 375 /* no result */
463 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 376 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
464 "datastore-postgres", 377 "datastore-postgres",
465 "Ending iteration (no more results)\n"); 378 "Ending iteration (no more results)\n");
466 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 379 prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
467 PQclear (res); 380 GNUNET_TIME_UNIT_ZERO_ABS, 0);
468 return; 381 return;
469 } 382 }
470 if (1 != PQntuples (res)) 383 if (1 != num_results)
471 { 384 {
472 GNUNET_break (0); 385 GNUNET_break (0);
473 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 386 prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
474 PQclear (res); 387 GNUNET_TIME_UNIT_ZERO_ABS, 0);
475 return; 388 return;
476 } 389 }
477 if (GNUNET_OK != 390 /* Technically we don't need the loop here, but nicer in case
478 GNUNET_PQ_extract_result (res, 391 we ever relax the condition above. */
479 rs, 392 for (unsigned int i=0;i<num_results;i++)
480 0))
481 { 393 {
482 GNUNET_break (0); 394 int iret;
483 PQclear (res); 395 uint32_t rowid;
484 GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, 396 uint32_t utype;
485 "delrow", 397 uint32_t anonymity;
486 rowid); 398 uint32_t replication;
487 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 399 uint32_t priority;
488 return; 400 size_t size;
489 } 401 void *data;
402 struct GNUNET_TIME_Absolute expiration_time;
403 struct GNUNET_HashCode key;
404 struct GNUNET_PQ_ResultSpec rs[] = {
405 GNUNET_PQ_result_spec_uint32 ("repl", &replication),
406 GNUNET_PQ_result_spec_uint32 ("type", &utype),
407 GNUNET_PQ_result_spec_uint32 ("prio", &priority),
408 GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
409 GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
410 GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
411 GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
412 GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
413 GNUNET_PQ_result_spec_end
414 };
490 415
491 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 416 if (GNUNET_OK !=
492 "datastore-postgres", 417 GNUNET_PQ_extract_result (res,
493 "Found result of size %u bytes and type %u in database\n", 418 rs,
494 (unsigned int) size, 419 i))
495 (unsigned int) utype);
496 iret = proc (proc_cls,
497 &key,
498 size,
499 data,
500 (enum GNUNET_BLOCK_Type) utype,
501 priority,
502 anonymity,
503 replication,
504 expiration_time,
505 rowid);
506 PQclear (res);
507 if (iret == GNUNET_NO)
508 {
509 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510 "Processor asked for item %u to be removed.\n",
511 (unsigned int) rowid);
512 if (GNUNET_OK ==
513 GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
514 "delrow",
515 rowid))
516 { 420 {
517 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 421 GNUNET_break (0);
518 "datastore-postgres", 422 prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
519 "Deleting %u bytes from database\n", 423 GNUNET_TIME_UNIT_ZERO_ABS, 0);
520 (unsigned int) size); 424 return;
521 plugin->env->duc (plugin->env->cls,
522 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
523 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
524 "datastore-postgres",
525 "Deleted %u bytes from database\n",
526 (unsigned int) size);
527 } 425 }
528 } 426
427 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
428 "datastore-postgres",
429 "Found result of size %u bytes and type %u in database\n",
430 (unsigned int) size,
431 (unsigned int) utype);
432 iret = prc->proc (prc->proc_cls,
433 &key,
434 size,
435 data,
436 (enum GNUNET_BLOCK_Type) utype,
437 priority,
438 anonymity,
439 replication,
440 expiration_time,
441 rowid);
442 if (iret == GNUNET_NO)
443 {
444 struct GNUNET_PQ_QueryParam param[] = {
445 GNUNET_PQ_query_param_uint32 (&rowid),
446 GNUNET_PQ_query_param_end
447 };
448
449 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
450 "Processor asked for item %u to be removed.\n",
451 (unsigned int) rowid);
452 if (0 <
453 GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
454 "delrow",
455 param))
456 {
457 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
458 "datastore-postgres",
459 "Deleting %u bytes from database\n",
460 (unsigned int) size);
461 plugin->env->duc (plugin->env->cls,
462 - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
463 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
464 "datastore-postgres",
465 "Deleted %u bytes from database\n",
466 (unsigned int) size);
467 }
468 }
469 GNUNET_PQ_cleanup_result (rs);
470 } /* for (i) */
529} 471}
530 472
531 473
532/** 474/**
533 * Get one of the results for a particular key in the datastore. 475 * Get one of the results for a particular key in the datastore.
534 * 476 *
535 * @param cls closure with the 'struct Plugin' 477 * @param cls closure with the `struct Plugin`
536 * @param next_uid return the result with lowest uid >= next_uid 478 * @param next_uid return the result with lowest uid >= next_uid
537 * @param random if true, return a random result instead of using next_uid 479 * @param random if true, return a random result instead of using next_uid
538 * @param key maybe NULL (to match all entries) 480 * @param key maybe NULL (to match all entries)
@@ -567,7 +509,8 @@ postgres_plugin_get_key (void *cls,
567 GNUNET_PQ_query_param_uint16 (&use_type), 509 GNUNET_PQ_query_param_uint16 (&use_type),
568 GNUNET_PQ_query_param_end 510 GNUNET_PQ_query_param_end
569 }; 511 };
570 PGresult *ret; 512 struct ProcessResultContext prc;
513 enum GNUNET_DB_QueryStatus res;
571 514
572 if (random) 515 if (random)
573 { 516 {
@@ -576,16 +519,21 @@ postgres_plugin_get_key (void *cls,
576 next_uid = 0; 519 next_uid = 0;
577 } 520 }
578 else 521 else
522 {
579 rvalue = 0; 523 rvalue = 0;
580 524 }
581 ret = GNUNET_PQ_exec_prepared (plugin->dbh, 525 prc.plugin = plugin;
582 "get", 526 prc.proc = proc;
583 params); 527 prc.proc_cls = proc_cls;
584 process_result (plugin, 528
585 proc, 529 res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
586 proc_cls, 530 "get",
587 ret, 531 params,
588 __FILE__, __LINE__); 532 &process_result,
533 &prc);
534 if (0 > res)
535 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
536 GNUNET_TIME_UNIT_ZERO_ABS, 0);
589} 537}
590 538
591 539
@@ -615,16 +563,20 @@ postgres_plugin_get_zero_anonymity (void *cls,
615 GNUNET_PQ_query_param_uint64 (&next_uid), 563 GNUNET_PQ_query_param_uint64 (&next_uid),
616 GNUNET_PQ_query_param_end 564 GNUNET_PQ_query_param_end
617 }; 565 };
618 PGresult *ret; 566 struct ProcessResultContext prc;
619 567 enum GNUNET_DB_QueryStatus res;
620 ret = GNUNET_PQ_exec_prepared (plugin->dbh, 568
621 "select_non_anonymous", 569 prc.plugin = plugin;
622 params); 570 prc.proc = proc;
623 571 prc.proc_cls = proc_cls;
624 process_result (plugin, 572 res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
625 proc, proc_cls, 573 "select_non_anonymous",
626 ret, 574 params,
627 __FILE__, __LINE__); 575 &process_result,
576 &prc);
577 if (0 > res)
578 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
579 GNUNET_TIME_UNIT_ZERO_ABS, 0);
628} 580}
629 581
630 582
@@ -692,7 +644,7 @@ repl_proc (void *cls,
692 GNUNET_PQ_query_param_uint32 (&oid), 644 GNUNET_PQ_query_param_uint32 (&oid),
693 GNUNET_PQ_query_param_end 645 GNUNET_PQ_query_param_end
694 }; 646 };
695 PGresult *qret; 647 enum GNUNET_DB_QueryStatus qret;
696 648
697 ret = rc->proc (rc->proc_cls, 649 ret = rc->proc (rc->proc_cls,
698 key, 650 key,
@@ -706,17 +658,11 @@ repl_proc (void *cls,
706 uid); 658 uid);
707 if (NULL == key) 659 if (NULL == key)
708 return ret; 660 return ret;
709 qret = GNUNET_PQ_exec_prepared (plugin->dbh, 661 qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
710 "decrepl", 662 "decrepl",
711 params); 663 params);
712 if (GNUNET_OK != 664 if (0 > qret)
713 GNUNET_POSTGRES_check_result (plugin->dbh,
714 qret,
715 PGRES_COMMAND_OK,
716 "PQexecPrepared",
717 "decrepl"))
718 return GNUNET_SYSERR; 665 return GNUNET_SYSERR;
719 PQclear (qret);
720 return ret; 666 return ret;
721} 667}
722 668
@@ -738,20 +684,27 @@ postgres_plugin_get_replication (void *cls,
738 void *proc_cls) 684 void *proc_cls)
739{ 685{
740 struct Plugin *plugin = cls; 686 struct Plugin *plugin = cls;
687 struct GNUNET_PQ_QueryParam params[] = {
688 GNUNET_PQ_query_param_end
689 };
741 struct ReplCtx rc; 690 struct ReplCtx rc;
742 PGresult *ret; 691 struct ProcessResultContext prc;
692 enum GNUNET_DB_QueryStatus res;
743 693
744 rc.plugin = plugin; 694 rc.plugin = plugin;
745 rc.proc = proc; 695 rc.proc = proc;
746 rc.proc_cls = proc_cls; 696 rc.proc_cls = proc_cls;
747 ret = PQexecPrepared (plugin->dbh, 697 prc.plugin = plugin;
748 "select_replication_order", 0, NULL, NULL, 698 prc.proc = &repl_proc;
749 NULL, 1); 699 prc.proc_cls = &rc;
750 process_result (plugin, 700 res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
751 &repl_proc, 701 "select_replication_order",
752 &rc, 702 params,
753 ret, 703 &process_result,
754 __FILE__, __LINE__); 704 &prc);
705 if (0 > res)
706 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
707 GNUNET_TIME_UNIT_ZERO_ABS, 0);
755} 708}
756 709
757 710
@@ -774,16 +727,75 @@ postgres_plugin_get_expiration (void *cls,
774 GNUNET_PQ_query_param_absolute_time (&now), 727 GNUNET_PQ_query_param_absolute_time (&now),
775 GNUNET_PQ_query_param_end 728 GNUNET_PQ_query_param_end
776 }; 729 };
777 PGresult *ret; 730 struct ProcessResultContext prc;
778 731
779 now = GNUNET_TIME_absolute_get (); 732 now = GNUNET_TIME_absolute_get ();
780 ret = GNUNET_PQ_exec_prepared (plugin->dbh, 733 prc.plugin = plugin;
781 "select_expiration_order", 734 prc.proc = proc;
782 params); 735 prc.proc_cls = proc_cls;
783 process_result (plugin, 736 (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
784 proc, proc_cls, 737 "select_expiration_order",
785 ret, 738 params,
786 __FILE__, __LINE__); 739 &process_result,
740 &prc);
741}
742
743
744/**
745 * Closure for #process_keys.
746 */
747struct ProcessKeysContext
748{
749
750 /**
751 * Function to call for each key.
752 */
753 PluginKeyProcessor proc;
754
755 /**
756 * Closure for @e proc.
757 */
758 void *proc_cls;
759};
760
761
762/**
763 * Function to be called with the results of a SELECT statement
764 * that has returned @a num_results results.
765 *
766 * @param cls closure with a `struct ProcessKeysContext`
767 * @param result the postgres result
768 * @param num_result the number of results in @a result
769 */
770static void
771process_keys (void *cls,
772 PGresult *result,
773 unsigned int num_results)
774{
775 struct ProcessKeysContext *pkc = cls;
776
777 for (unsigned i=0;i<num_results;i++)
778 {
779 struct GNUNET_HashCode key;
780 struct GNUNET_PQ_ResultSpec rs[] = {
781 GNUNET_PQ_result_spec_auto_from_type ("hash",
782 &key),
783 GNUNET_PQ_result_spec_end
784 };
785
786 if (GNUNET_OK !=
787 GNUNET_PQ_extract_result (result,
788 rs,
789 i))
790 {
791 GNUNET_break (0);
792 continue;
793 }
794 pkc->proc (pkc->proc_cls,
795 &key,
796 1);
797 GNUNET_PQ_cleanup_result (rs);
798 }
787} 799}
788 800
789 801
@@ -800,28 +812,21 @@ postgres_plugin_get_keys (void *cls,
800 void *proc_cls) 812 void *proc_cls)
801{ 813{
802 struct Plugin *plugin = cls; 814 struct Plugin *plugin = cls;
803 int ret; 815 struct GNUNET_PQ_QueryParam params[] = {
804 int i; 816 GNUNET_PQ_query_param_end
805 struct GNUNET_HashCode key; 817 };
806 PGresult * res; 818 struct ProcessKeysContext pkc;
807 819
808 res = PQexecPrepared (plugin->dbh, 820 pkc.proc = proc;
809 "get_keys", 821 pkc.proc_cls = proc_cls;
810 0, NULL, NULL, NULL, 1); 822 (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
811 ret = PQntuples (res); 823 "get_keys",
812 for (i=0;i<ret;i++) 824 params,
813 { 825 &process_keys,
814 if (sizeof (struct GNUNET_HashCode) != 826 &pkc);
815 PQgetlength (res, i, 0)) 827 proc (proc_cls,
816 { 828 NULL,
817 GNUNET_memcpy (&key, 829 0);
818 PQgetvalue (res, i, 0),
819 sizeof (struct GNUNET_HashCode));
820 proc (proc_cls, &key, 1);
821 }
822 }
823 PQclear (res);
824 proc (proc_cls, NULL, 0);
825} 830}
826 831
827 832
@@ -834,10 +839,14 @@ static void
834postgres_plugin_drop (void *cls) 839postgres_plugin_drop (void *cls)
835{ 840{
836 struct Plugin *plugin = cls; 841 struct Plugin *plugin = cls;
842 struct GNUNET_PQ_ExecuteStatement es[] = {
843 GNUNET_PQ_make_execute ("DROP TABLE gn090"),
844 GNUNET_PQ_EXECUTE_STATEMENT_END
845 };
837 846
838 if (GNUNET_OK != 847 if (GNUNET_OK !=
839 GNUNET_POSTGRES_exec (plugin->dbh, 848 GNUNET_PQ_exec_statements (plugin->dbh,
840 "DROP TABLE gn090")) 849 es))
841 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 850 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
842 "postgres", 851 "postgres",
843 _("Failed to drop table from database.\n")); 852 _("Failed to drop table from database.\n"));
@@ -863,21 +872,17 @@ postgres_plugin_remove_key (void *cls,
863 void *cont_cls) 872 void *cont_cls)
864{ 873{
865 struct Plugin *plugin = cls; 874 struct Plugin *plugin = cls;
866 PGresult *ret; 875 enum GNUNET_DB_QueryStatus ret;
867 struct GNUNET_PQ_QueryParam params[] = { 876 struct GNUNET_PQ_QueryParam params[] = {
868 GNUNET_PQ_query_param_auto_from_type (key), 877 GNUNET_PQ_query_param_auto_from_type (key),
869 GNUNET_PQ_query_param_fixed_size (data, size), 878 GNUNET_PQ_query_param_fixed_size (data, size),
870 GNUNET_PQ_query_param_end 879 GNUNET_PQ_query_param_end
871 }; 880 };
872 ret = GNUNET_PQ_exec_prepared (plugin->dbh, 881
873 "remove", 882 ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
874 params); 883 "remove",
875 if (GNUNET_OK != 884 params);
876 GNUNET_POSTGRES_check_result (plugin->dbh, 885 if (0 > ret)
877 ret,
878 PGRES_COMMAND_OK,
879 "PQexecPrepared",
880 "remove"))
881 { 886 {
882 cont (cont_cls, 887 cont (cont_cls,
883 key, 888 key,
@@ -886,10 +891,7 @@ postgres_plugin_remove_key (void *cls,
886 _("Postgress exec failure")); 891 _("Postgress exec failure"));
887 return; 892 return;
888 } 893 }
889 /* What an awful API, this function really does return a string */ 894 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret)
890 bool affected = 0 != strcmp ("0", PQcmdTuples (ret));
891 PQclear (ret);
892 if (!affected)
893 { 895 {
894 cont (cont_cls, 896 cont (cont_cls,
895 key, 897 key,