aboutsummaryrefslogtreecommitdiff
path: root/src/datacache/plugin_datacache_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datacache/plugin_datacache_postgres.c')
-rw-r--r--src/datacache/plugin_datacache_postgres.c750
1 files changed, 369 insertions, 381 deletions
diff --git a/src/datacache/plugin_datacache_postgres.c b/src/datacache/plugin_datacache_postgres.c
index 13c2c26a2..5c497cdf8 100644
--- a/src/datacache/plugin_datacache_postgres.c
+++ b/src/datacache/plugin_datacache_postgres.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2006, 2009, 2010, 2012, 2015 GNUnet e.V. 3 Copyright (C) 2006, 2009, 2010, 2012, 2015, 2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -25,7 +25,7 @@
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_util_lib.h" 27#include "gnunet_util_lib.h"
28#include "gnunet_postgres_lib.h" 28#include "gnunet_pq_lib.h"
29#include "gnunet_datacache_plugin.h" 29#include "gnunet_datacache_plugin.h"
30 30
31#define LOG(kind,...) GNUNET_log_from (kind, "datacache-postgres", __VA_ARGS__) 31#define LOG(kind,...) GNUNET_log_from (kind, "datacache-postgres", __VA_ARGS__)
@@ -66,115 +66,67 @@ struct Plugin
66static int 66static int
67init_connection (struct Plugin *plugin) 67init_connection (struct Plugin *plugin)
68{ 68{
69 PGresult *ret; 69 struct GNUNET_PQ_ExecuteStatement es[] = {
70 GNUNET_PQ_make_execute ("CREATE TEMPORARY TABLE IF NOT EXISTS gn090dc ("
71 " type INTEGER NOT NULL DEFAULT 0,"
72 " discard_time BIGINT NOT NULL DEFAULT 0,"
73 " key BYTEA NOT NULL DEFAULT '',"
74 " value BYTEA NOT NULL DEFAULT '',"
75 " path BYTEA DEFAULT '')"
76 "WITH OIDS"),
77 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_key ON gn090dc (key)"),
78 GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_dt ON gn090dc (discard_time)"),
79 GNUNET_PQ_make_execute ("ALTER TABLE gn090dc ALTER value SET STORAGE EXTERNAL"),
80 GNUNET_PQ_make_execute ("ALTER TABLE gn090dc ALTER key SET STORAGE PLAIN"),
81 GNUNET_PQ_EXECUTE_STATEMENT_END
82 };
83 struct GNUNET_PQ_PreparedStatement ps[] = {
84 GNUNET_PQ_make_prepare ("getkt",
85 "SELECT discard_time,type,value,path FROM gn090dc "
86 "WHERE key=$1 AND type=$2",
87 2),
88 GNUNET_PQ_make_prepare ("getk",
89 "SELECT discard_time,type,value,path FROM gn090dc "
90 "WHERE key=$1",
91 1),
92 GNUNET_PQ_make_prepare ("getm",
93 "SELECT length(value) AS len,oid,key FROM gn090dc "
94 "ORDER BY discard_time ASC LIMIT 1",
95 0),
96 GNUNET_PQ_make_prepare ("get_random",
97 "SELECT discard_time,type,value,path,key FROM gn090dc "
98 "ORDER BY key ASC LIMIT 1 OFFSET $1",
99 1),
100 GNUNET_PQ_make_prepare ("get_closest",
101 "SELECT discard_time,type,value,path,key FROM gn090dc "
102 "WHERE key>=$1 ORDER BY key ASC LIMIT $2",
103 1),
104 GNUNET_PQ_make_prepare ("delrow",
105 "DELETE FROM gn090dc WHERE oid=$1",
106 1),
107 GNUNET_PQ_make_prepare ("put",
108 "INSERT INTO gn090dc (type, discard_time, key, value, path) "
109 "VALUES ($1, $2, $3, $4, $5)",
110 5),
111 GNUNET_PQ_PREPARED_STATEMENT_END
112 };
70 113
71 plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, 114 plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
72 "datacache-postgres"); 115 "datacache-postgres");
73 if (NULL == plugin->dbh) 116 if (NULL == plugin->dbh)
74 return GNUNET_SYSERR; 117 return GNUNET_SYSERR;
75 ret =
76 PQexec (plugin->dbh,
77 "CREATE TEMPORARY TABLE IF NOT EXISTS gn090dc ("
78 " type INTEGER NOT NULL DEFAULT 0,"
79 " discard_time BIGINT NOT NULL DEFAULT 0,"
80 " key BYTEA NOT NULL DEFAULT '',"
81 " value BYTEA NOT NULL DEFAULT '',"
82 " path BYTEA DEFAULT '')"
83 "WITH OIDS");
84 if ( (ret == NULL) ||
85 ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
86 (0 != strcmp ("42P07", /* duplicate table */
87 PQresultErrorField
88 (ret,
89 PG_DIAG_SQLSTATE)))))
90 {
91 (void) GNUNET_POSTGRES_check_result (plugin->dbh, ret,
92 PGRES_COMMAND_OK,
93 "CREATE TABLE",
94 "gn090dc");
95 PQfinish (plugin->dbh);
96 plugin->dbh = NULL;
97 return GNUNET_SYSERR;
98 }
99 if (PQresultStatus (ret) == PGRES_COMMAND_OK)
100 {
101 if ((GNUNET_OK !=
102 GNUNET_POSTGRES_exec (plugin->dbh,
103 "CREATE INDEX IF NOT EXISTS idx_key ON gn090dc (key)")) ||
104 (GNUNET_OK !=
105 GNUNET_POSTGRES_exec (plugin->dbh,
106 "CREATE INDEX IF NOT EXISTS idx_dt ON gn090dc (discard_time)")))
107 {
108 PQclear (ret);
109 PQfinish (plugin->dbh);
110 plugin->dbh = NULL;
111 return GNUNET_SYSERR;
112 }
113 }
114 PQclear (ret);
115 ret =
116 PQexec (plugin->dbh,
117 "ALTER TABLE gn090dc ALTER value SET STORAGE EXTERNAL");
118 if (GNUNET_OK != 118 if (GNUNET_OK !=
119 GNUNET_POSTGRES_check_result (plugin->dbh, 119 GNUNET_PQ_exec_statements (plugin->dbh,
120 ret, 120 es))
121 PGRES_COMMAND_OK,
122 "ALTER TABLE",
123 "gn090dc"))
124 { 121 {
125 PQfinish (plugin->dbh); 122 PQfinish (plugin->dbh);
126 plugin->dbh = NULL; 123 plugin->dbh = NULL;
127 return GNUNET_SYSERR; 124 return GNUNET_SYSERR;
128 } 125 }
129 PQclear (ret); 126
130 ret = PQexec (plugin->dbh,
131 "ALTER TABLE gn090dc ALTER key SET STORAGE PLAIN");
132 if (GNUNET_OK != 127 if (GNUNET_OK !=
133 GNUNET_POSTGRES_check_result (plugin->dbh, 128 GNUNET_PQ_prepare_statements (plugin->dbh,
134 ret, 129 ps))
135 PGRES_COMMAND_OK,
136 "ALTER TABLE",
137 "gn090dc"))
138 {
139 PQfinish (plugin->dbh);
140 plugin->dbh = NULL;
141 return GNUNET_SYSERR;
142 }
143 PQclear (ret);
144 if ((GNUNET_OK !=
145 GNUNET_POSTGRES_prepare (plugin->dbh,
146 "getkt",
147 "SELECT discard_time,type,value,path FROM gn090dc "
148 "WHERE key=$1 AND type=$2 ", 2)) ||
149 (GNUNET_OK !=
150 GNUNET_POSTGRES_prepare (plugin->dbh,
151 "getk",
152 "SELECT discard_time,type,value,path FROM gn090dc "
153 "WHERE key=$1", 1)) ||
154 (GNUNET_OK !=
155 GNUNET_POSTGRES_prepare (plugin->dbh,
156 "getm",
157 "SELECT length(value),oid,key FROM gn090dc "
158 "ORDER BY discard_time ASC LIMIT 1", 0)) ||
159 (GNUNET_OK !=
160 GNUNET_POSTGRES_prepare (plugin->dbh,
161 "get_random",
162 "SELECT discard_time,type,value,path,key FROM gn090dc "
163 "ORDER BY key ASC LIMIT 1 OFFSET $1", 1)) ||
164 (GNUNET_OK !=
165 GNUNET_POSTGRES_prepare (plugin->dbh,
166 "get_closest",
167 "SELECT discard_time,type,value,path,key FROM gn090dc "
168 "WHERE key>=$1 ORDER BY key ASC LIMIT $2", 1)) ||
169 (GNUNET_OK !=
170 GNUNET_POSTGRES_prepare (plugin->dbh,
171 "delrow",
172 "DELETE FROM gn090dc WHERE oid=$1", 1)) ||
173 (GNUNET_OK !=
174 GNUNET_POSTGRES_prepare (plugin->dbh,
175 "put",
176 "INSERT INTO gn090dc (type, discard_time, key, value, path) "
177 "VALUES ($1, $2, $3, $4, $5)", 5)))
178 { 130 {
179 PQfinish (plugin->dbh); 131 PQfinish (plugin->dbh);
180 plugin->dbh = NULL; 132 plugin->dbh = NULL;
@@ -189,7 +141,7 @@ init_connection (struct Plugin *plugin)
189 * 141 *
190 * @param cls closure (our `struct Plugin`) 142 * @param cls closure (our `struct Plugin`)
191 * @param key key to store @a data under 143 * @param key key to store @a data under
192 * @param size number of bytes in @a data 144 * @param data_size number of bytes in @a data
193 * @param data data to store 145 * @param data data to store
194 * @param type type of the value 146 * @param type type of the value
195 * @param discard_time when to discard the value in any case 147 * @param discard_time when to discard the value in any case
@@ -200,7 +152,7 @@ init_connection (struct Plugin *plugin)
200static ssize_t 152static ssize_t
201postgres_plugin_put (void *cls, 153postgres_plugin_put (void *cls,
202 const struct GNUNET_HashCode *key, 154 const struct GNUNET_HashCode *key,
203 size_t size, 155 size_t data_size,
204 const char *data, 156 const char *data,
205 enum GNUNET_BLOCK_Type type, 157 enum GNUNET_BLOCK_Type type,
206 struct GNUNET_TIME_Absolute discard_time, 158 struct GNUNET_TIME_Absolute discard_time,
@@ -208,36 +160,125 @@ postgres_plugin_put (void *cls,
208 const struct GNUNET_PeerIdentity *path_info) 160 const struct GNUNET_PeerIdentity *path_info)
209{ 161{
210 struct Plugin *plugin = cls; 162 struct Plugin *plugin = cls;
211 PGresult *ret; 163 uint32_t type32 = (uint32_t) type;
212 uint32_t btype = htonl (type); 164 struct GNUNET_PQ_QueryParam params[] = {
213 uint64_t bexpi = GNUNET_TIME_absolute_hton (discard_time).abs_value_us__; 165 GNUNET_PQ_query_param_uint32 (&type32),
214 166 GNUNET_PQ_query_param_absolute_time (&discard_time),
215 const char *paramValues[] = { 167 GNUNET_PQ_query_param_auto_from_type (key),
216 (const char *) &btype, 168 GNUNET_PQ_query_param_fixed_size (data, data_size),
217 (const char *) &bexpi, 169 GNUNET_PQ_query_param_fixed_size (path_info,
218 (const char *) key, 170 path_info_len * sizeof (struct GNUNET_PeerIdentity)),
219 (const char *) data, 171 GNUNET_PQ_query_param_end
220 (const char *) path_info
221 };
222 int paramLengths[] = {
223 sizeof (btype),
224 sizeof (bexpi),
225 sizeof (struct GNUNET_HashCode),
226 size,
227 path_info_len * sizeof (struct GNUNET_PeerIdentity)
228 }; 172 };
229 const int paramFormats[] = { 1, 1, 1, 1, 1 }; 173 enum GNUNET_DB_QueryStatus ret;
230 174
231 ret = 175 ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
232 PQexecPrepared (plugin->dbh, "put", 5, paramValues, paramLengths, 176 "put",
233 paramFormats, 1); 177 params);
234 if (GNUNET_OK != 178 if (0 > ret)
235 GNUNET_POSTGRES_check_result (plugin->dbh, ret,
236 PGRES_COMMAND_OK, "PQexecPrepared", "put"))
237 return -1; 179 return -1;
238 plugin->num_items++; 180 plugin->num_items++;
239 PQclear (ret); 181 return data_size + OVERHEAD;
240 return size + OVERHEAD; 182}
183
184
185/**
186 * Closure for #handle_results.
187 */
188struct HandleResultContext
189{
190
191 /**
192 * Function to call on each result, may be NULL.
193 */
194 GNUNET_DATACACHE_Iterator iter;
195
196 /**
197 * Closure for @e iter.
198 */
199 void *iter_cls;
200
201 /**
202 * Key used.
203 */
204 const struct GNUNET_HashCode *key;
205};
206
207
208/**
209 * Function to be called with the results of a SELECT statement
210 * that has returned @a num_results results. Parse the result
211 * and call the callback given in @a cls
212 *
213 * @param cls closure of type `struct HandleResultContext`
214 * @param result the postgres result
215 * @param num_result the number of results in @a result
216 */
217static void
218handle_results (void *cls,
219 PGresult *result,
220 unsigned int num_results)
221{
222 struct HandleResultContext *hrc = cls;
223
224 for (unsigned int i=0;i<num_results;i++)
225 {
226 struct GNUNET_TIME_Absolute expiration_time;
227 uint32_t type;
228 void *data;
229 size_t data_size;
230 struct GNUNET_PeerIdentity *path;
231 size_t path_len;
232 struct GNUNET_PQ_ResultSpec rs[] = {
233 GNUNET_PQ_result_spec_absolute_time ("discard_time",
234 &expiration_time),
235 GNUNET_PQ_result_spec_uint32 ("type",
236 &type),
237 GNUNET_PQ_result_spec_variable_size ("value",
238 &data,
239 &data_size),
240 GNUNET_PQ_result_spec_variable_size ("path",
241 (void **) &path,
242 &path_len),
243 GNUNET_PQ_result_spec_end
244 };
245
246 if (GNUNET_YES !=
247 GNUNET_PQ_extract_result (result,
248 rs,
249 i))
250 {
251 GNUNET_break (0);
252 return;
253 }
254 if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
255 {
256 GNUNET_break (0);
257 path_len = 0;
258 }
259 path_len %= sizeof (struct GNUNET_PeerIdentity);
260 LOG (GNUNET_ERROR_TYPE_DEBUG,
261 "Found result of size %u bytes and type %u in database\n",
262 (unsigned int) data_size,
263 (unsigned int) type);
264 if ( (NULL != hrc->iter) &&
265 (GNUNET_SYSERR ==
266 hrc->iter (hrc->iter_cls,
267 hrc->key,
268 data_size,
269 data,
270 (enum GNUNET_BLOCK_Type) type,
271 expiration_time,
272 path_len,
273 path)) )
274 {
275 LOG (GNUNET_ERROR_TYPE_DEBUG,
276 "Ending iteration (client error)\n");
277 GNUNET_PQ_cleanup_result (rs);
278 return;
279 }
280 GNUNET_PQ_cleanup_result (rs);
281 }
241} 282}
242 283
243 284
@@ -260,94 +301,30 @@ postgres_plugin_get (void *cls,
260 void *iter_cls) 301 void *iter_cls)
261{ 302{
262 struct Plugin *plugin = cls; 303 struct Plugin *plugin = cls;
263 uint32_t btype = htonl (type); 304 uint32_t type32 = (uint32_t) type;
264 305 struct GNUNET_PQ_QueryParam paramk[] = {
265 const char *paramValues[] = { 306 GNUNET_PQ_query_param_auto_from_type (key),
266 (const char *) key, 307 GNUNET_PQ_query_param_end
267 (const char *) &btype
268 }; 308 };
269 int paramLengths[] = { 309 struct GNUNET_PQ_QueryParam paramkt[] = {
270 sizeof (struct GNUNET_HashCode), 310 GNUNET_PQ_query_param_auto_from_type (key),
271 sizeof (btype) 311 GNUNET_PQ_query_param_uint32 (&type32),
312 GNUNET_PQ_query_param_end
272 }; 313 };
273 const int paramFormats[] = { 1, 1 }; 314 enum GNUNET_DB_QueryStatus res;
274 struct GNUNET_TIME_Absolute expiration_time; 315 struct HandleResultContext hr_ctx;
275 uint32_t size; 316
276 unsigned int cnt; 317 hr_ctx.iter = iter;
277 unsigned int i; 318 hr_ctx.iter_cls = iter_cls;
278 unsigned int path_len; 319 hr_ctx.key = key;
279 const struct GNUNET_PeerIdentity *path; 320 res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
280 PGresult *res; 321 (0 == type) ? "getk" : "getkt",
281 322 (0 == type) ? paramk : paramkt,
282 res = 323 &handle_results,
283 PQexecPrepared (plugin->dbh, (type == 0) ? "getk" : "getkt", 324 &hr_ctx);
284 (type == 0) ? 1 : 2, paramValues, paramLengths, 325 if (res < 0)
285 paramFormats, 1);
286 if (GNUNET_OK !=
287 GNUNET_POSTGRES_check_result (plugin->dbh,
288 res,
289 PGRES_TUPLES_OK,
290 "PQexecPrepared",
291 (type == 0) ? "getk" : "getkt"))
292 {
293 LOG (GNUNET_ERROR_TYPE_DEBUG,
294 "Ending iteration (postgres error)\n");
295 return 0; 326 return 0;
296 } 327 return res;
297
298 if (0 == (cnt = PQntuples (res)))
299 {
300 /* no result */
301 LOG (GNUNET_ERROR_TYPE_DEBUG,
302 "Ending iteration (no more results)\n");
303 PQclear (res);
304 return 0;
305 }
306 if (iter == NULL)
307 {
308 PQclear (res);
309 return cnt;
310 }
311 if ( (4 != PQnfields (res)) ||
312 (sizeof (uint64_t) != PQfsize (res, 0)) ||
313 (sizeof (uint32_t) != PQfsize (res, 1)))
314 {
315 GNUNET_break (0);
316 PQclear (res);
317 return 0;
318 }
319 for (i = 0; i < cnt; i++)
320 {
321 expiration_time.abs_value_us =
322 GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, i, 0));
323 type = ntohl (*(uint32_t *) PQgetvalue (res, i, 1));
324 size = PQgetlength (res, i, 2);
325 path_len = PQgetlength (res, i, 3);
326 if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
327 {
328 GNUNET_break (0);
329 path_len = 0;
330 }
331 path_len %= sizeof (struct GNUNET_PeerIdentity);
332 path = (const struct GNUNET_PeerIdentity *) PQgetvalue (res, i, 3);
333 LOG (GNUNET_ERROR_TYPE_DEBUG,
334 "Found result of size %u bytes and type %u in database\n",
335 (unsigned int) size, (unsigned int) type);
336 if (GNUNET_SYSERR ==
337 iter (iter_cls, key, size, PQgetvalue (res, i, 2),
338 (enum GNUNET_BLOCK_Type) type,
339 expiration_time,
340 path_len,
341 path))
342 {
343 LOG (GNUNET_ERROR_TYPE_DEBUG,
344 "Ending iteration (client error)\n");
345 PQclear (res);
346 return cnt;
347 }
348 }
349 PQclear (res);
350 return cnt;
351} 328}
352 329
353 330
@@ -362,54 +339,53 @@ static int
362postgres_plugin_del (void *cls) 339postgres_plugin_del (void *cls)
363{ 340{
364 struct Plugin *plugin = cls; 341 struct Plugin *plugin = cls;
342 struct GNUNET_PQ_QueryParam pempty[] = {
343 GNUNET_PQ_query_param_end
344 };
365 uint32_t size; 345 uint32_t size;
366 uint32_t oid; 346 uint32_t oid;
367 struct GNUNET_HashCode key; 347 struct GNUNET_HashCode key;
368 PGresult *res; 348 struct GNUNET_PQ_ResultSpec rs[] = {
349 GNUNET_PQ_result_spec_uint32 ("len",
350 &size),
351 GNUNET_PQ_result_spec_uint32 ("oid",
352 &oid),
353 GNUNET_PQ_result_spec_auto_from_type ("key",
354 &key),
355 GNUNET_PQ_result_spec_end
356 };
357 enum GNUNET_DB_QueryStatus res;
358 struct GNUNET_PQ_QueryParam dparam[] = {
359 GNUNET_PQ_query_param_uint32 (&oid),
360 GNUNET_PQ_query_param_end
361 };
369 362
370 res = PQexecPrepared (plugin->dbh, 363 res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
371 "getm", 364 "getm",
372 0, NULL, NULL, NULL, 1); 365 pempty,
373 if (GNUNET_OK != 366 rs);
374 GNUNET_POSTGRES_check_result (plugin->dbh, 367 if (0 > res)
375 res, 368 return GNUNET_SYSERR;
376 PGRES_TUPLES_OK, 369 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
377 "PQexecPrepared",
378 "getm"))
379 {
380 LOG (GNUNET_ERROR_TYPE_DEBUG,
381 "Ending iteration (postgres error)\n");
382 return 0;
383 }
384 if (0 == PQntuples (res))
385 { 370 {
386 /* no result */ 371 /* no result */
387 LOG (GNUNET_ERROR_TYPE_DEBUG, 372 LOG (GNUNET_ERROR_TYPE_DEBUG,
388 "Ending iteration (no more results)\n"); 373 "Ending iteration (no more results)\n");
389 PQclear (res);
390 return GNUNET_SYSERR;
391 }
392 if ((3 != PQnfields (res)) || (sizeof (size) != PQfsize (res, 0)) ||
393 (sizeof (oid) != PQfsize (res, 1)) ||
394 (sizeof (struct GNUNET_HashCode) != PQgetlength (res, 0, 2)))
395 {
396 GNUNET_break (0);
397 PQclear (res);
398 return 0; 374 return 0;
399 } 375 }
400 size = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0)); 376 res = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
401 oid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1)); 377 "delrow",
402 GNUNET_memcpy (&key, PQgetvalue (res, 0, 2), sizeof (struct GNUNET_HashCode)); 378 dparam);
403 PQclear (res); 379 if (0 > res)
404 if (GNUNET_OK != 380 {
405 GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, 381 GNUNET_PQ_cleanup_result (rs);
406 "delrow",
407 oid))
408 return GNUNET_SYSERR; 382 return GNUNET_SYSERR;
383 }
409 plugin->num_items--; 384 plugin->num_items--;
410 plugin->env->delete_notify (plugin->env->cls, 385 plugin->env->delete_notify (plugin->env->cls,
411 &key, 386 &key,
412 size + OVERHEAD); 387 size + OVERHEAD);
388 GNUNET_PQ_cleanup_result (rs);
413 return GNUNET_OK; 389 return GNUNET_OK;
414} 390}
415 391
@@ -428,22 +404,34 @@ postgres_plugin_get_random (void *cls,
428 void *iter_cls) 404 void *iter_cls)
429{ 405{
430 struct Plugin *plugin = cls; 406 struct Plugin *plugin = cls;
431 unsigned int off; 407 uint32_t off;
432 uint32_t off_be;
433 struct GNUNET_TIME_Absolute expiration_time; 408 struct GNUNET_TIME_Absolute expiration_time;
434 uint32_t size; 409 size_t data_size;
435 unsigned int path_len; 410 void *data;
436 const struct GNUNET_PeerIdentity *path; 411 size_t path_len;
437 const struct GNUNET_HashCode *key; 412 struct GNUNET_PeerIdentity *path;
438 unsigned int type; 413 struct GNUNET_HashCode key;
439 PGresult *res; 414 uint32_t type;
440 const char *paramValues[] = { 415 enum GNUNET_DB_QueryStatus res;
441 (const char *) &off_be 416 struct GNUNET_PQ_QueryParam params[] = {
417 GNUNET_PQ_query_param_uint32 (&off),
418 GNUNET_PQ_query_param_end
442 }; 419 };
443 int paramLengths[] = { 420 struct GNUNET_PQ_ResultSpec rs[] = {
444 sizeof (off_be) 421 GNUNET_PQ_result_spec_absolute_time ("discard_time",
422 &expiration_time),
423 GNUNET_PQ_result_spec_uint32 ("type",
424 &type),
425 GNUNET_PQ_result_spec_variable_size ("value",
426 &data,
427 &data_size),
428 GNUNET_PQ_result_spec_variable_size ("path",
429 (void **) &path,
430 &path_len),
431 GNUNET_PQ_result_spec_auto_from_type ("key",
432 &key),
433 GNUNET_PQ_result_spec_end
445 }; 434 };
446 const int paramFormats[] = { 1 };
447 435
448 if (0 == plugin->num_items) 436 if (0 == plugin->num_items)
449 return 0; 437 return 0;
@@ -451,67 +439,136 @@ postgres_plugin_get_random (void *cls,
451 return 1; 439 return 1;
452 off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 440 off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
453 plugin->num_items); 441 plugin->num_items);
454 off_be = htonl (off); 442 res = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
455 res = 443 "get_random",
456 PQexecPrepared (plugin->dbh, "get_random", 444 params,
457 1, paramValues, paramLengths, paramFormats, 445 rs);
458 1); 446 if (0 > res)
459 if (GNUNET_OK !=
460 GNUNET_POSTGRES_check_result (plugin->dbh,
461 res,
462 PGRES_TUPLES_OK,
463 "PQexecPrepared",
464 "get_random"))
465 { 447 {
466 GNUNET_break (0); 448 GNUNET_break (0);
467 return 0; 449 return 0;
468 } 450 }
469 if (0 == PQntuples (res)) 451 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
470 { 452 {
471 GNUNET_break (0); 453 GNUNET_break (0);
472 return 0; 454 return 0;
473 } 455 }
474 if ( (5 != PQnfields (res)) ||
475 (sizeof (uint64_t) != PQfsize (res, 0)) ||
476 (sizeof (uint32_t) != PQfsize (res, 1)) ||
477 (sizeof (struct GNUNET_HashCode) != PQfsize (res, 4)) )
478 {
479 GNUNET_break (0);
480 PQclear (res);
481 return 0;
482 }
483 expiration_time.abs_value_us =
484 GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 0));
485 type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
486 size = PQgetlength (res, 0, 2);
487 path_len = PQgetlength (res, 0, 3);
488 if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity))) 456 if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
489 { 457 {
490 GNUNET_break (0); 458 GNUNET_break (0);
491 path_len = 0; 459 path_len = 0;
492 } 460 }
493 path_len %= sizeof (struct GNUNET_PeerIdentity); 461 path_len %= sizeof (struct GNUNET_PeerIdentity);
494 path = (const struct GNUNET_PeerIdentity *) PQgetvalue (res, 0, 3);
495 key = (const struct GNUNET_HashCode *) PQgetvalue (res, 0, 4);
496 LOG (GNUNET_ERROR_TYPE_DEBUG, 462 LOG (GNUNET_ERROR_TYPE_DEBUG,
497 "Found random value with key %s of size %u bytes and type %u in database\n", 463 "Found random value with key %s of size %u bytes and type %u in database\n",
498 GNUNET_h2s (key), 464 GNUNET_h2s (&key),
499 (unsigned int) size, 465 (unsigned int) data_size,
500 (unsigned int) type); 466 (unsigned int) type);
501 (void) iter (iter_cls, 467 (void) iter (iter_cls,
502 key, 468 &key,
503 size, 469 data_size,
504 PQgetvalue (res, 0, 2), 470 data,
505 (enum GNUNET_BLOCK_Type) type, 471 (enum GNUNET_BLOCK_Type) type,
506 expiration_time, 472 expiration_time,
507 path_len, 473 path_len,
508 path); 474 path);
509 PQclear (res); 475 GNUNET_PQ_cleanup_result (rs);
510 return 1; 476 return 1;
511} 477}
512 478
513 479
514/** 480/**
481 * Closure for #extract_result_cb.
482 */
483struct ExtractResultContext
484{
485 /**
486 * Function to call for each result found.
487 */
488 GNUNET_DATACACHE_Iterator iter;
489
490 /**
491 * Closure for @e iter.
492 */
493 void *iter_cls;
494
495};
496
497
498/**
499 * Function to be called with the results of a SELECT statement
500 * that has returned @a num_results results. Calls the `iter`
501 * from @a cls for each result.
502 *
503 * @param cls closure with the `struct ExtractResultContext`
504 * @param result the postgres result
505 * @param num_result the number of results in @a result
506 */
507static void
508extract_result_cb (void *cls,
509 PGresult *result,
510 unsigned int num_results)
511{
512 struct ExtractResultContext *erc = cls;
513
514 if (NULL == erc->iter)
515 return;
516 for (unsigned int i=0;i<num_results;i++)
517 {
518 struct GNUNET_TIME_Absolute expiration_time;
519 uint32_t type;
520 void *data;
521 size_t data_size;
522 struct GNUNET_PeerIdentity *path;
523 size_t path_len;
524 struct GNUNET_HashCode key;
525 struct GNUNET_PQ_ResultSpec rs[] = {
526 GNUNET_PQ_result_spec_absolute_time ("",
527 &expiration_time),
528 GNUNET_PQ_result_spec_uint32 ("type",
529 &type),
530 GNUNET_PQ_result_spec_variable_size ("value",
531 &data,
532 &data_size),
533 GNUNET_PQ_result_spec_variable_size ("path",
534 (void **) &path,
535 &path_len),
536 GNUNET_PQ_result_spec_auto_from_type ("key",
537 &key),
538 GNUNET_PQ_result_spec_end
539 };
540
541 if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
542 {
543 GNUNET_break (0);
544 path_len = 0;
545 }
546 path_len %= sizeof (struct GNUNET_PeerIdentity);
547 LOG (GNUNET_ERROR_TYPE_DEBUG,
548 "Found result of size %u bytes and type %u in database\n",
549 (unsigned int) data_size,
550 (unsigned int) type);
551 if (GNUNET_SYSERR ==
552 erc->iter (erc->iter_cls,
553 &key,
554 data_size,
555 data,
556 (enum GNUNET_BLOCK_Type) type,
557 expiration_time,
558 path_len,
559 path))
560 {
561 LOG (GNUNET_ERROR_TYPE_DEBUG,
562 "Ending iteration (client error)\n");
563 GNUNET_PQ_cleanup_result (rs);
564 break;
565 }
566 GNUNET_PQ_cleanup_result (rs);
567 }
568}
569
570
571/**
515 * Iterate over the results that are "close" to a particular key in 572 * Iterate over the results that are "close" to a particular key in
516 * the datacache. "close" is defined as numerically larger than @a 573 * the datacache. "close" is defined as numerically larger than @a
517 * key (when interpreted as a circular address space), with small 574 * key (when interpreted as a circular address space), with small
@@ -532,105 +589,36 @@ postgres_plugin_get_closest (void *cls,
532 void *iter_cls) 589 void *iter_cls)
533{ 590{
534 struct Plugin *plugin = cls; 591 struct Plugin *plugin = cls;
535 uint32_t nbo_limit = htonl (num_results); 592 uint32_t num_results32 = (uint32_t) num_results;
536 const char *paramValues[] = { 593 struct GNUNET_PQ_QueryParam params[] = {
537 (const char *) key, 594 GNUNET_PQ_query_param_auto_from_type (key),
538 (const char *) &nbo_limit, 595 GNUNET_PQ_query_param_uint32 (&num_results32),
596 GNUNET_PQ_query_param_end
539 }; 597 };
540 int paramLengths[] = { 598 enum GNUNET_DB_QueryStatus res;
541 sizeof (struct GNUNET_HashCode), 599 struct ExtractResultContext erc;
542 sizeof (nbo_limit) 600
543 601 erc.iter = iter;
544 }; 602 erc.iter_cls = iter_cls;
545 const int paramFormats[] = { 1, 1 }; 603 res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
546 struct GNUNET_TIME_Absolute expiration_time; 604 "get_closest",
547 uint32_t size; 605 params,
548 unsigned int type; 606 &extract_result_cb,
549 unsigned int cnt; 607 &erc);
550 unsigned int i; 608 if (0 > res)
551 unsigned int path_len;
552 const struct GNUNET_PeerIdentity *path;
553 PGresult *res;
554
555 res =
556 PQexecPrepared (plugin->dbh,
557 "get_closest",
558 2,
559 paramValues,
560 paramLengths,
561 paramFormats,
562 1);
563 if (GNUNET_OK !=
564 GNUNET_POSTGRES_check_result (plugin->dbh,
565 res,
566 PGRES_TUPLES_OK,
567 "PQexecPrepared",
568 "get_closest"))
569 { 609 {
570 LOG (GNUNET_ERROR_TYPE_DEBUG, 610 LOG (GNUNET_ERROR_TYPE_DEBUG,
571 "Ending iteration (postgres error)\n"); 611 "Ending iteration (postgres error)\n");
572 return 0; 612 return 0;
573 } 613 }
574 614 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == res)
575 if (0 == (cnt = PQntuples (res)))
576 { 615 {
577 /* no result */ 616 /* no result */
578 LOG (GNUNET_ERROR_TYPE_DEBUG, 617 LOG (GNUNET_ERROR_TYPE_DEBUG,
579 "Ending iteration (no more results)\n"); 618 "Ending iteration (no more results)\n");
580 PQclear (res);
581 return 0; 619 return 0;
582 } 620 }
583 if (NULL == iter) 621 return res;
584 {
585 PQclear (res);
586 return cnt;
587 }
588 if ( (5 != PQnfields (res)) ||
589 (sizeof (uint64_t) != PQfsize (res, 0)) ||
590 (sizeof (uint32_t) != PQfsize (res, 1)) ||
591 (sizeof (struct GNUNET_HashCode) != PQfsize (res, 4)) )
592 {
593 GNUNET_break (0);
594 PQclear (res);
595 return 0;
596 }
597 for (i = 0; i < cnt; i++)
598 {
599 expiration_time.abs_value_us =
600 GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, i, 0));
601 type = ntohl (*(uint32_t *) PQgetvalue (res, i, 1));
602 size = PQgetlength (res, i, 2);
603 path_len = PQgetlength (res, i, 3);
604 if (0 != (path_len % sizeof (struct GNUNET_PeerIdentity)))
605 {
606 GNUNET_break (0);
607 path_len = 0;
608 }
609 path_len %= sizeof (struct GNUNET_PeerIdentity);
610 path = (const struct GNUNET_PeerIdentity *) PQgetvalue (res, i, 3);
611 key = (const struct GNUNET_HashCode *) PQgetvalue (res, i, 4);
612 LOG (GNUNET_ERROR_TYPE_DEBUG,
613 "Found result of size %u bytes and type %u in database\n",
614 (unsigned int) size,
615 (unsigned int) type);
616 if (GNUNET_SYSERR ==
617 iter (iter_cls,
618 key,
619 size,
620 PQgetvalue (res, i, 2),
621 (enum GNUNET_BLOCK_Type) type,
622 expiration_time,
623 path_len,
624 path))
625 {
626 LOG (GNUNET_ERROR_TYPE_DEBUG,
627 "Ending iteration (client error)\n");
628 PQclear (res);
629 return cnt;
630 }
631 }
632 PQclear (res);
633 return cnt;
634} 622}
635 623
636 624