aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/plugin_datastore_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/plugin_datastore_postgres.c')
-rw-r--r--src/datastore/plugin_datastore_postgres.c251
1 files changed, 65 insertions, 186 deletions
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
index 7b04cc68a..0376ebb6c 100644
--- a/src/datastore/plugin_datastore_postgres.c
+++ b/src/datastore/plugin_datastore_postgres.c
@@ -80,6 +80,7 @@ init_connection (struct Plugin *plugin)
80 * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel 80 * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
81 * we do math or inequality tests, so we can't handle the entire range of uint32_t. 81 * we do math or inequality tests, so we can't handle the entire range of uint32_t.
82 * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC. 82 * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
83 * PostgreSQL also recommends against using WITH OIDS.
83 */ 84 */
84 ret = 85 ret =
85 PQexec (plugin->dbh, 86 PQexec (plugin->dbh,
@@ -114,13 +115,17 @@ init_connection (struct Plugin *plugin)
114 if (PQresultStatus (ret) == PGRES_COMMAND_OK) 115 if (PQresultStatus (ret) == PGRES_COMMAND_OK)
115 { 116 {
116 if ((GNUNET_OK != 117 if ((GNUNET_OK !=
117 GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)")) || 118 GNUNET_POSTGRES_exec (plugin->dbh,
119 "CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)")) ||
118 (GNUNET_OK != 120 (GNUNET_OK !=
119 GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_hash_vhash ON gn090 (hash,vhash)")) || 121 GNUNET_POSTGRES_exec (plugin->dbh,
122 "CREATE INDEX IF NOT EXISTS idx_hash_vhash ON gn090 (hash,vhash)")) ||
120 (GNUNET_OK != 123 (GNUNET_OK !=
121 GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)")) || 124 GNUNET_POSTGRES_exec (plugin->dbh,
125 "CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)")) ||
122 (GNUNET_OK != 126 (GNUNET_OK !=
123 GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)")) || 127 GNUNET_POSTGRES_exec (plugin->dbh,
128 "CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)")) ||
124 (GNUNET_OK != 129 (GNUNET_OK !=
125 GNUNET_POSTGRES_exec (plugin->dbh, 130 GNUNET_POSTGRES_exec (plugin->dbh,
126 "CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)")) || 131 "CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)")) ||
@@ -128,9 +133,11 @@ init_connection (struct Plugin *plugin)
128 GNUNET_POSTGRES_exec (plugin->dbh, 133 GNUNET_POSTGRES_exec (plugin->dbh,
129 "CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) || 134 "CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
130 (GNUNET_OK != 135 (GNUNET_OK !=
131 GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)")) || 136 GNUNET_POSTGRES_exec (plugin->dbh,
137 "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
132 (GNUNET_OK != 138 (GNUNET_OK !=
133 GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"))) 139 GNUNET_POSTGRES_exec (plugin->dbh,
140 "CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)")))
134 { 141 {
135 PQclear (ret); 142 PQclear (ret);
136 PQfinish (plugin->dbh); 143 PQfinish (plugin->dbh);
@@ -170,40 +177,18 @@ init_connection (struct Plugin *plugin)
170 } 177 }
171 PQclear (ret); 178 PQclear (ret);
172 if ((GNUNET_OK != 179 if ((GNUNET_OK !=
173 GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
174 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
175 "WHERE hash=$1 AND vhash=$2 AND type=$3 "
176 "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
177 (GNUNET_OK !=
178 GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
179 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
180 "WHERE hash=$1 AND type=$2 "
181 "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
182 (GNUNET_OK !=
183 GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
184 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
185 "WHERE hash=$1 AND vhash=$2 "
186 "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
187 (GNUNET_OK !=
188 GNUNET_POSTGRES_prepare (plugin->dbh, "get", 180 GNUNET_POSTGRES_prepare (plugin->dbh, "get",
189 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 181 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
190 "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) || 182 "WHERE oid >= $1::bigint AND "
191 (GNUNET_OK != 183 "(rvalue >= $2 OR 0 = $3::smallint) AND "
192 GNUNET_POSTGRES_prepare (plugin->dbh, "count_getvt", 184 "(hash = $4 OR 0 = $5::smallint) AND "
193 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", 3)) || 185 "(vhash = $6 OR 0 = $7::smallint) AND "
194 (GNUNET_OK != 186 "(type = $8 OR 0 = $9::smallint) "
195 GNUNET_POSTGRES_prepare (plugin->dbh, "count_gett", 187 "ORDER BY oid ASC LIMIT 1", 9)) ||
196 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", 2)) ||
197 (GNUNET_OK !=
198 GNUNET_POSTGRES_prepare (plugin->dbh, "count_getv",
199 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", 2)) ||
200 (GNUNET_OK !=
201 GNUNET_POSTGRES_prepare (plugin->dbh, "count_get",
202 "SELECT count(*) FROM gn090 WHERE hash=$1", 1)) ||
203 (GNUNET_OK != 188 (GNUNET_OK !=
204 GNUNET_POSTGRES_prepare (plugin->dbh, "put", 189 GNUNET_POSTGRES_prepare (plugin->dbh, "put",
205 "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) " 190 "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
206 "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) || 191 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", 9)) ||
207 (GNUNET_OK != 192 (GNUNET_OK !=
208 GNUNET_POSTGRES_prepare (plugin->dbh, "update", 193 GNUNET_POSTGRES_prepare (plugin->dbh, "update",
209 "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END " 194 "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
@@ -215,8 +200,9 @@ init_connection (struct Plugin *plugin)
215 (GNUNET_OK != 200 (GNUNET_OK !=
216 GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous", 201 GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
217 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 202 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
218 "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2", 203 "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
219 1)) || 204 "ORDER BY oid ASC LIMIT 1",
205 2)) ||
220 (GNUNET_OK != 206 (GNUNET_OK !=
221 GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order", 207 GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
222 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 208 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
@@ -322,6 +308,8 @@ postgres_plugin_put (void *cls,
322 struct Plugin *plugin = cls; 308 struct Plugin *plugin = cls;
323 uint32_t utype = type; 309 uint32_t utype = type;
324 struct GNUNET_HashCode vhash; 310 struct GNUNET_HashCode vhash;
311 uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
312 UINT64_MAX);
325 PGresult *ret; 313 PGresult *ret;
326 struct GNUNET_PQ_QueryParam params[] = { 314 struct GNUNET_PQ_QueryParam params[] = {
327 GNUNET_PQ_query_param_uint32 (&replication), 315 GNUNET_PQ_query_param_uint32 (&replication),
@@ -329,6 +317,7 @@ postgres_plugin_put (void *cls,
329 GNUNET_PQ_query_param_uint32 (&priority), 317 GNUNET_PQ_query_param_uint32 (&priority),
330 GNUNET_PQ_query_param_uint32 (&anonymity), 318 GNUNET_PQ_query_param_uint32 (&anonymity),
331 GNUNET_PQ_query_param_absolute_time (&expiration), 319 GNUNET_PQ_query_param_absolute_time (&expiration),
320 GNUNET_PQ_query_param_uint64 (&rvalue),
332 GNUNET_PQ_query_param_auto_from_type (key), 321 GNUNET_PQ_query_param_auto_from_type (key),
333 GNUNET_PQ_query_param_auto_from_type (&vhash), 322 GNUNET_PQ_query_param_auto_from_type (&vhash),
334 GNUNET_PQ_query_param_fixed_size (data, size), 323 GNUNET_PQ_query_param_fixed_size (data, size),
@@ -489,12 +478,11 @@ process_result (struct Plugin *plugin,
489 478
490 479
491/** 480/**
492 * Iterate over the results for a particular key 481 * Get one of the results for a particular key in the datastore.
493 * in the datastore.
494 * 482 *
495 * @param cls closure with the 'struct Plugin' 483 * @param cls closure with the 'struct Plugin'
496 * @param offset offset of the result (modulo num-results); 484 * @param next_uid return the result with lowest uid >= next_uid
497 * specific ordering does not matter for the offset 485 * @param random if true, return a random result instead of using next_uid
498 * @param key maybe NULL (to match all entries) 486 * @param key maybe NULL (to match all entries)
499 * @param vhash hash of the value, maybe NULL (to 487 * @param vhash hash of the value, maybe NULL (to
500 * match all values that have the right key). 488 * match all values that have the right key).
@@ -504,160 +492,52 @@ process_result (struct Plugin *plugin,
504 * @param type entries of which type are relevant? 492 * @param type entries of which type are relevant?
505 * Use 0 for any type. 493 * Use 0 for any type.
506 * @param proc function to call on the matching value; 494 * @param proc function to call on the matching value;
507 * will be called once with a NULL if no value matches 495 * will be called with NULL if nothing matches
508 * @param proc_cls closure for iter 496 * @param proc_cls closure for @a proc
509 */ 497 */
510static void 498static void
511postgres_plugin_get_key (void *cls, 499postgres_plugin_get_key (void *cls,
512 uint64_t offset, 500 uint64_t next_uid,
501 bool random,
513 const struct GNUNET_HashCode *key, 502 const struct GNUNET_HashCode *key,
514 const struct GNUNET_HashCode *vhash, 503 const struct GNUNET_HashCode *vhash,
515 enum GNUNET_BLOCK_Type type, 504 enum GNUNET_BLOCK_Type type,
516 PluginDatumProcessor proc, 505 PluginDatumProcessor proc,
517 void *proc_cls) 506 void *proc_cls)
518{ 507{
519 struct Plugin *plugin = cls; 508 struct Plugin *plugin = cls;
520 uint32_t utype = type; 509 uint32_t utype = type;
510 uint16_t use_rvalue = random;
511 uint16_t use_key = NULL != key;
512 uint16_t use_vhash = NULL != vhash;
513 uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
514 uint64_t rvalue;
515 struct GNUNET_PQ_QueryParam params[] = {
516 GNUNET_PQ_query_param_uint64 (&next_uid),
517 GNUNET_PQ_query_param_uint64 (&rvalue),
518 GNUNET_PQ_query_param_uint16 (&use_rvalue),
519 GNUNET_PQ_query_param_auto_from_type (key),
520 GNUNET_PQ_query_param_uint16 (&use_key),
521 GNUNET_PQ_query_param_auto_from_type (vhash),
522 GNUNET_PQ_query_param_uint16 (&use_vhash),
523 GNUNET_PQ_query_param_uint32 (&utype),
524 GNUNET_PQ_query_param_uint16 (&use_type),
525 GNUNET_PQ_query_param_end
526 };
521 PGresult *ret; 527 PGresult *ret;
522 uint64_t total;
523 uint64_t limit_off;
524 528
525 if (0 != type) 529 if (random)
526 { 530 {
527 if (NULL != vhash) 531 rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
528 { 532 UINT64_MAX);
529 struct GNUNET_PQ_QueryParam params[] = { 533 next_uid = 0;
530 GNUNET_PQ_query_param_auto_from_type (key),
531 GNUNET_PQ_query_param_auto_from_type (vhash),
532 GNUNET_PQ_query_param_uint32 (&utype),
533 GNUNET_PQ_query_param_end
534 };
535 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
536 "count_getvt",
537 params);
538 }
539 else
540 {
541 struct GNUNET_PQ_QueryParam params[] = {
542 GNUNET_PQ_query_param_auto_from_type (key),
543 GNUNET_PQ_query_param_uint32 (&utype),
544 GNUNET_PQ_query_param_end
545 };
546 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
547 "count_gett",
548 params);
549 }
550 } 534 }
551 else 535 else
552 { 536 rvalue = 0;
553 if (NULL != vhash)
554 {
555 struct GNUNET_PQ_QueryParam params[] = {
556 GNUNET_PQ_query_param_auto_from_type (key),
557 GNUNET_PQ_query_param_auto_from_type (vhash),
558 GNUNET_PQ_query_param_end
559 };
560 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
561 "count_getv",
562 params);
563 }
564 else
565 {
566 struct GNUNET_PQ_QueryParam params[] = {
567 GNUNET_PQ_query_param_auto_from_type (key),
568 GNUNET_PQ_query_param_end
569 };
570 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
571 "count_get",
572 params);
573 }
574 }
575 537
576 if (GNUNET_OK != 538 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
577 GNUNET_POSTGRES_check_result (plugin->dbh, 539 "get",
578 ret, 540 params);
579 PGRES_TUPLES_OK,
580 "PQexecParams",
581 "count"))
582 {
583 proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
584 GNUNET_TIME_UNIT_ZERO_ABS, 0);
585 return;
586 }
587 if ( (PQntuples (ret) != 1) ||
588 (PQnfields (ret) != 1) ||
589 (PQgetlength (ret, 0, 0) != sizeof (uint64_t)))
590 {
591 GNUNET_break (0);
592 PQclear (ret);
593 proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
594 GNUNET_TIME_UNIT_ZERO_ABS, 0);
595 return;
596 }
597 total = GNUNET_ntohll (*(const uint64_t *) PQgetvalue (ret, 0, 0));
598 PQclear (ret);
599 if (0 == total)
600 {
601 proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
602 GNUNET_TIME_UNIT_ZERO_ABS, 0);
603 return;
604 }
605 limit_off = offset % total;
606
607 if (0 != type)
608 {
609 if (NULL != vhash)
610 {
611 struct GNUNET_PQ_QueryParam params[] = {
612 GNUNET_PQ_query_param_auto_from_type (key),
613 GNUNET_PQ_query_param_auto_from_type (vhash),
614 GNUNET_PQ_query_param_uint32 (&utype),
615 GNUNET_PQ_query_param_uint64 (&limit_off),
616 GNUNET_PQ_query_param_end
617 };
618 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
619 "getvt",
620 params);
621 }
622 else
623 {
624 struct GNUNET_PQ_QueryParam params[] = {
625 GNUNET_PQ_query_param_auto_from_type (key),
626 GNUNET_PQ_query_param_uint32 (&utype),
627 GNUNET_PQ_query_param_uint64 (&limit_off),
628 GNUNET_PQ_query_param_end
629 };
630 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
631 "gett",
632 params);
633 }
634 }
635 else
636 {
637 if (NULL != vhash)
638 {
639 struct GNUNET_PQ_QueryParam params[] = {
640 GNUNET_PQ_query_param_auto_from_type (key),
641 GNUNET_PQ_query_param_auto_from_type (vhash),
642 GNUNET_PQ_query_param_uint64 (&limit_off),
643 GNUNET_PQ_query_param_end
644 };
645 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
646 "getv",
647 params);
648 }
649 else
650 {
651 struct GNUNET_PQ_QueryParam params[] = {
652 GNUNET_PQ_query_param_auto_from_type (key),
653 GNUNET_PQ_query_param_uint64 (&limit_off),
654 GNUNET_PQ_query_param_end
655 };
656 ret = GNUNET_PQ_exec_prepared (plugin->dbh,
657 "get",
658 params);
659 }
660 }
661 process_result (plugin, 541 process_result (plugin,
662 proc, 542 proc,
663 proc_cls, 543 proc_cls,
@@ -671,26 +551,25 @@ postgres_plugin_get_key (void *cls,
671 * the given iterator for each of them. 551 * the given iterator for each of them.
672 * 552 *
673 * @param cls our `struct Plugin *` 553 * @param cls our `struct Plugin *`
674 * @param offset offset of the result (modulo num-results); 554 * @param next_uid return the result with lowest uid >= next_uid
675 * specific ordering does not matter for the offset
676 * @param type entries of which type should be considered? 555 * @param type entries of which type should be considered?
677 * Use 0 for any type. 556 * Must not be zero (ANY).
678 * @param proc function to call on the matching value; 557 * @param proc function to call on the matching value;
679 * will be called with a NULL if no value matches 558 * will be called with NULL if no value matches
680 * @param proc_cls closure for @a proc 559 * @param proc_cls closure for @a proc
681 */ 560 */
682static void 561static void
683postgres_plugin_get_zero_anonymity (void *cls, 562postgres_plugin_get_zero_anonymity (void *cls,
684 uint64_t offset, 563 uint64_t next_uid,
685 enum GNUNET_BLOCK_Type type, 564 enum GNUNET_BLOCK_Type type,
686 PluginDatumProcessor proc, 565 PluginDatumProcessor proc,
687 void *proc_cls) 566 void *proc_cls)
688{ 567{
689 struct Plugin *plugin = cls; 568 struct Plugin *plugin = cls;
690 uint32_t utype = type; 569 uint32_t utype = type;
691 struct GNUNET_PQ_QueryParam params[] = { 570 struct GNUNET_PQ_QueryParam params[] = {
692 GNUNET_PQ_query_param_uint32 (&utype), 571 GNUNET_PQ_query_param_uint32 (&utype),
693 GNUNET_PQ_query_param_uint64 (&offset), 572 GNUNET_PQ_query_param_uint64 (&next_uid),
694 GNUNET_PQ_query_param_end 573 GNUNET_PQ_query_param_end
695 }; 574 };
696 PGresult *ret; 575 PGresult *ret;