aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/plugin_datastore_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/plugin_datastore_postgres.c')
-rw-r--r--src/datastore/plugin_datastore_postgres.c611
1 files changed, 286 insertions, 325 deletions
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
index 1ff56da31..2cecfa9a1 100644
--- a/src/datastore/plugin_datastore_postgres.c
+++ b/src/datastore/plugin_datastore_postgres.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors) 3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -30,45 +30,6 @@
30 30
31#define DEBUG_POSTGRES GNUNET_NO 31#define DEBUG_POSTGRES GNUNET_NO
32 32
33#define SELECT_IT_LOW_PRIORITY "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
34 "WHERE (prio = $1 AND oid > $2) " \
35 "ORDER BY prio ASC,oid ASC LIMIT 1) "\
36 "UNION "\
37 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
38 "WHERE (prio > $1 AND oid != $2)"\
39 "ORDER BY prio ASC,oid ASC LIMIT 1)"\
40 "ORDER BY prio ASC,oid ASC LIMIT 1"
41
42#define SELECT_IT_NON_ANONYMOUS "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
43 "WHERE (prio = $1 AND oid < $2)"\
44 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
45 "UNION "\
46 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
47 "WHERE (prio < $1 AND oid != $2)"\
48 " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
49 "ORDER BY prio DESC,oid DESC LIMIT 1"
50
51#define SELECT_IT_EXPIRATION_TIME "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
52 "WHERE (expire = $1 AND oid > $2) "\
53 "ORDER BY expire ASC,oid ASC LIMIT 1) "\
54 "UNION "\
55 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
56 "WHERE (expire > $1 AND oid != $2) " \
57 "ORDER BY expire ASC,oid ASC LIMIT 1)"\
58 "ORDER BY expire ASC,oid ASC LIMIT 1"
59
60
61#define SELECT_IT_MIGRATION_ORDER "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
62 "WHERE (expire = $1 AND oid < $2)"\
63 " AND expire > $3 AND type!=3"\
64 " ORDER BY expire DESC,oid DESC LIMIT 1) "\
65 "UNION "\
66 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
67 "WHERE (expire < $1 AND oid != $2)" \
68 " AND expire > $3 AND type!=3"\
69 " ORDER BY expire DESC,oid DESC LIMIT 1)"\
70 "ORDER BY expire DESC,oid DESC LIMIT 1"
71
72/** 33/**
73 * After how many ms "busy" should a DB operation fail for good? 34 * After how many ms "busy" should a DB operation fail for good?
74 * A low value makes sure that we are more responsive to requests 35 * A low value makes sure that we are more responsive to requests
@@ -140,7 +101,7 @@ struct NextRequestClosure
140 /** 101 /**
141 * Number of entries found so far 102 * Number of entries found so far
142 */ 103 */
143 long long count; 104 unsigned long long count;
144 105
145 /** 106 /**
146 * Offset this iteration starts at. 107 * Offset this iteration starts at.
@@ -153,24 +114,14 @@ struct NextRequestClosure
153 uint64_t blimit_off; 114 uint64_t blimit_off;
154 115
155 /** 116 /**
156 * Overall number of matching entries. 117 * Current total number of entries found so far, big-endian.
157 */
158 unsigned long long total;
159
160 /**
161 * Expiration value of previous result (possible parameter), big-endian.
162 */ 118 */
163 uint64_t blast_expire; 119 uint64_t bcount;
164 120
165 /** 121 /**
166 * Row ID of last result (possible paramter), big-endian. 122 * Overall number of matching entries.
167 */
168 uint32_t blast_rowid;
169
170 /**
171 * Priority of last result (possible parameter), big-endian.
172 */ 123 */
173 uint32_t blast_prio; 124 unsigned long long total;
174 125
175 /** 126 /**
176 * Type of block (possible paramter), big-endian. 127 * Type of block (possible paramter), big-endian.
@@ -181,6 +132,11 @@ struct NextRequestClosure
181 * Flag set to GNUNET_YES to stop iteration. 132 * Flag set to GNUNET_YES to stop iteration.
182 */ 133 */
183 int end_it; 134 int end_it;
135
136 /**
137 * Flag to indicate that there should only be one result.
138 */
139 int one_shot;
184}; 140};
185 141
186 142
@@ -336,6 +292,7 @@ init_connection (struct Plugin *plugin)
336 GNUNET_free_non_null (conninfo); 292 GNUNET_free_non_null (conninfo);
337 ret = PQexec (plugin->dbh, 293 ret = PQexec (plugin->dbh,
338 "CREATE TABLE gn090 (" 294 "CREATE TABLE gn090 ("
295 " repl INTEGER NOT NULL DEFAULT 0,"
339 " type INTEGER NOT NULL DEFAULT 0," 296 " type INTEGER NOT NULL DEFAULT 0,"
340 " prio INTEGER NOT NULL DEFAULT 0," 297 " prio INTEGER NOT NULL DEFAULT 0,"
341 " anonLevel INTEGER NOT NULL DEFAULT 0," 298 " anonLevel INTEGER NOT NULL DEFAULT 0,"
@@ -385,7 +342,6 @@ init_connection (struct Plugin *plugin)
385 } 342 }
386 } 343 }
387 PQclear (ret); 344 PQclear (ret);
388#if 1
389 ret = PQexec (plugin->dbh, 345 ret = PQexec (plugin->dbh,
390 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"); 346 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
391 if (GNUNET_OK != 347 if (GNUNET_OK !=
@@ -421,44 +377,43 @@ init_connection (struct Plugin *plugin)
421 return GNUNET_SYSERR; 377 return GNUNET_SYSERR;
422 } 378 }
423 PQclear (ret); 379 PQclear (ret);
424#endif
425 if ((GNUNET_OK != 380 if ((GNUNET_OK !=
426 pq_prepare (plugin, 381 pq_prepare (plugin,
427 "getvt", 382 "getvt",
428 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 383 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
429 "WHERE hash=$1 AND vhash=$2 AND type=$3 " 384 "WHERE hash=$1 AND vhash=$2 AND type=$3 "
430 "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5", 385 "ORDER BY oid ASC LIMIT 1 OFFSET $4",
431 5, 386 4,
432 __LINE__)) || 387 __LINE__)) ||
433 (GNUNET_OK != 388 (GNUNET_OK !=
434 pq_prepare (plugin, 389 pq_prepare (plugin,
435 "gett", 390 "gett",
436 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 391 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
437 "WHERE hash=$1 AND type=$2" 392 "WHERE hash=$1 AND type=$2 "
438 "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4", 393 "ORDER BY oid ASC LIMIT 1 OFFSET $3",
439 4, 394 3,
440 __LINE__)) || 395 __LINE__)) ||
441 (GNUNET_OK != 396 (GNUNET_OK !=
442 pq_prepare (plugin, 397 pq_prepare (plugin,
443 "getv", 398 "getv",
444 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 399 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
445 "WHERE hash=$1 AND vhash=$2" 400 "WHERE hash=$1 AND vhash=$2 "
446 "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4", 401 "ORDER BY oid ASC LIMIT 1 OFFSET $3",
447 4, 402 3,
448 __LINE__)) || 403 __LINE__)) ||
449 (GNUNET_OK != 404 (GNUNET_OK !=
450 pq_prepare (plugin, 405 pq_prepare (plugin,
451 "get", 406 "get",
452 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 407 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
453 "WHERE hash=$1" 408 "WHERE hash=$1 "
454 "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3", 409 "ORDER BY oid ASC LIMIT 1 OFFSET $2",
455 3, 410 2,
456 __LINE__)) || 411 __LINE__)) ||
457 (GNUNET_OK != 412 (GNUNET_OK !=
458 pq_prepare (plugin, 413 pq_prepare (plugin,
459 "put", 414 "put",
460 "INSERT INTO gn090 (type, prio, anonLevel, expire, hash, vhash, value) " 415 "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, hash, vhash, value) "
461 "VALUES ($1, $2, $3, $4, $5, $6, $7)", 416 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
462 8, 417 8,
463 __LINE__)) || 418 __LINE__)) ||
464 (GNUNET_OK != 419 (GNUNET_OK !=
@@ -470,32 +425,42 @@ init_connection (struct Plugin *plugin)
470 __LINE__)) || 425 __LINE__)) ||
471 (GNUNET_OK != 426 (GNUNET_OK !=
472 pq_prepare (plugin, 427 pq_prepare (plugin,
473 "select_low_priority", 428 "decrepl",
474 SELECT_IT_LOW_PRIORITY, 429 "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
475 2, 430 "WHERE oid = $1",
431 1,
476 __LINE__)) || 432 __LINE__)) ||
477 (GNUNET_OK != 433 (GNUNET_OK !=
478 pq_prepare (plugin, 434 pq_prepare (plugin,
479 "select_non_anonymous", 435 "select_non_anonymous",
480 SELECT_IT_NON_ANONYMOUS, 436 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
481 2, 437 "WHERE anonLevel = 0 ORDER BY oid DESC LIMIT 1 OFFSET $1",
438 1,
482 __LINE__)) || 439 __LINE__)) ||
483 (GNUNET_OK != 440 (GNUNET_OK !=
484 pq_prepare (plugin, 441 pq_prepare (plugin,
485 "select_expiration_time", 442 "select_expiration_order",
486 SELECT_IT_EXPIRATION_TIME, 443 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
487 2, 444 "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
445 "UNION "
446 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
447 "ORDER BY prio ASC LIMIT 1) "
448 "ORDER BY expire ASC LIMIT 1",
449 1,
488 __LINE__)) || 450 __LINE__)) ||
489 (GNUNET_OK != 451 (GNUNET_OK !=
490 pq_prepare (plugin, 452 pq_prepare (plugin,
491 "select_migration_order", 453 "select_replication_order",
492 SELECT_IT_MIGRATION_ORDER, 454 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " \
493 3, 455 "ORDER BY repl DESC,RANDOM() LIMIT 1",
456 0,
494 __LINE__)) || 457 __LINE__)) ||
495 (GNUNET_OK != 458 (GNUNET_OK !=
496 pq_prepare (plugin, 459 pq_prepare (plugin,
497 "delrow", 460 "delrow",
498 "DELETE FROM gn090 " "WHERE oid=$1", 1, __LINE__))) 461 "DELETE FROM gn090 " "WHERE oid=$1",
462 1,
463 __LINE__)))
499 { 464 {
500 PQfinish (plugin->dbh); 465 PQfinish (plugin->dbh);
501 plugin->dbh = NULL; 466 plugin->dbh = NULL;
@@ -610,8 +575,10 @@ postgres_plugin_put (void *cls,
610 uint32_t btype = htonl (type); 575 uint32_t btype = htonl (type);
611 uint32_t bprio = htonl (priority); 576 uint32_t bprio = htonl (priority);
612 uint32_t banon = htonl (anonymity); 577 uint32_t banon = htonl (anonymity);
578 uint32_t brepl = htonl (replication);
613 uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__; 579 uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
614 const char *paramValues[] = { 580 const char *paramValues[] = {
581 (const char *) &brepl,
615 (const char *) &btype, 582 (const char *) &btype,
616 (const char *) &bprio, 583 (const char *) &bprio,
617 (const char *) &banon, 584 (const char *) &banon,
@@ -621,6 +588,7 @@ postgres_plugin_put (void *cls,
621 (const char *) data 588 (const char *) data
622 }; 589 };
623 int paramLengths[] = { 590 int paramLengths[] = {
591 sizeof (brepl),
624 sizeof (btype), 592 sizeof (btype),
625 sizeof (bprio), 593 sizeof (bprio),
626 sizeof (banon), 594 sizeof (banon),
@@ -629,11 +597,11 @@ postgres_plugin_put (void *cls,
629 sizeof (GNUNET_HashCode), 597 sizeof (GNUNET_HashCode),
630 size 598 size
631 }; 599 };
632 const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1 }; 600 const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
633 601
634 GNUNET_CRYPTO_hash (data, size, &vhash); 602 GNUNET_CRYPTO_hash (data, size, &vhash);
635 ret = PQexecPrepared (plugin->dbh, 603 ret = PQexecPrepared (plugin->dbh,
636 "put", 7, paramValues, paramLengths, paramFormats, 1); 604 "put", 8, paramValues, paramLengths, paramFormats, 1);
637 if (GNUNET_OK != check_result (plugin, ret, 605 if (GNUNET_OK != check_result (plugin, ret,
638 PGRES_COMMAND_OK, 606 PGRES_COMMAND_OK,
639 "PQexecPrepared", "put", __LINE__)) 607 "PQexecPrepared", "put", __LINE__))
@@ -649,6 +617,7 @@ postgres_plugin_put (void *cls,
649 return GNUNET_OK; 617 return GNUNET_OK;
650} 618}
651 619
620
652/** 621/**
653 * Function invoked on behalf of a "PluginIterator" 622 * Function invoked on behalf of a "PluginIterator"
654 * asking the database plugin to call the iterator 623 * asking the database plugin to call the iterator
@@ -690,15 +659,11 @@ postgres_next_request_cont (void *next_cls,
690 GNUNET_TIME_UNIT_ZERO_ABS, 0); 659 GNUNET_TIME_UNIT_ZERO_ABS, 0);
691 GNUNET_free (nrc); 660 GNUNET_free (nrc);
692 return; 661 return;
693 } 662 }
694 663 if (nrc->off == nrc->total)
695 if (nrc->count == 0) 664 nrc->off = 0;
696 nrc->blimit_off = GNUNET_htonll (nrc->off); 665 nrc->blimit_off = GNUNET_htonll (nrc->off);
697 else 666 nrc->bcount = GNUNET_htonll ((uint64_t) nrc->count);
698 nrc->blimit_off = GNUNET_htonll (0);
699 if (nrc->count + nrc->off == nrc->total)
700 nrc->blast_rowid = htonl (0); /* back to start */
701
702 res = PQexecPrepared (plugin->dbh, 667 res = PQexecPrepared (plugin->dbh,
703 nrc->pname, 668 nrc->pname,
704 nrc->nparams, 669 nrc->nparams,
@@ -773,14 +738,10 @@ postgres_next_request_cont (void *next_cls,
773 priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1)); 738 priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
774 anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2)); 739 anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
775 expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3)); 740 expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
776 memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode)); 741 memcpy (&key,
742 PQgetvalue (res, 0, 4),
743 sizeof (GNUNET_HashCode));
777 size = PQgetlength (res, 0, 5); 744 size = PQgetlength (res, 0, 5);
778
779 nrc->blast_prio = htonl (priority);
780 nrc->blast_expire = GNUNET_htonll (expiration_time.abs_value);
781 nrc->blast_rowid = htonl (rowid);
782 nrc->count++;
783
784#if DEBUG_POSTGRES 745#if DEBUG_POSTGRES
785 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 746 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
786 "datastore-postgres", 747 "datastore-postgres",
@@ -789,7 +750,7 @@ postgres_next_request_cont (void *next_cls,
789 (unsigned int) type); 750 (unsigned int) type);
790#endif 751#endif
791 iret = nrc->iter (nrc->iter_cls, 752 iret = nrc->iter (nrc->iter_cls,
792 nrc, 753 (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
793 &key, 754 &key,
794 size, 755 size,
795 PQgetvalue (res, 0, 5), 756 PQgetvalue (res, 0, 5),
@@ -799,6 +760,11 @@ postgres_next_request_cont (void *next_cls,
799 expiration_time, 760 expiration_time,
800 rowid); 761 rowid);
801 PQclear (res); 762 PQclear (res);
763 if (iret != GNUNET_NO)
764 {
765 nrc->count++;
766 nrc->off++;
767 }
802 if (iret == GNUNET_SYSERR) 768 if (iret == GNUNET_SYSERR)
803 { 769 {
804#if DEBUG_POSTGRES 770#if DEBUG_POSTGRES
@@ -828,6 +794,8 @@ postgres_next_request_cont (void *next_cls,
828#endif 794#endif
829 } 795 }
830 } 796 }
797 if (nrc->one_shot == GNUNET_YES)
798 GNUNET_free (nrc);
831} 799}
832 800
833 801
@@ -858,183 +826,6 @@ postgres_plugin_next_request (void *next_cls,
858 826
859 827
860/** 828/**
861 * Update the priority for a particular key in the datastore. If
862 * the expiration time in value is different than the time found in
863 * the datastore, the higher value should be kept. For the
864 * anonymity level, the lower value is to be used. The specified
865 * priority should be added to the existing priority, ignoring the
866 * priority in value.
867 *
868 * Note that it is possible for multiple values to match this put.
869 * In that case, all of the respective values are updated.
870 *
871 * @param cls our "struct Plugin*"
872 * @param uid unique identifier of the datum
873 * @param delta by how much should the priority
874 * change? If priority + delta < 0 the
875 * priority should be set to 0 (never go
876 * negative).
877 * @param expire new expiration time should be the
878 * MAX of any existing expiration time and
879 * this value
880 * @param msg set to error message
881 * @return GNUNET_OK on success
882 */
883static int
884postgres_plugin_update (void *cls,
885 uint64_t uid,
886 int delta, struct GNUNET_TIME_Absolute expire,
887 char **msg)
888{
889 struct Plugin *plugin = cls;
890 PGresult *ret;
891 int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
892 uint32_t boid = htonl ( (uint32_t) uid);
893 uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
894 const char *paramValues[] = {
895 (const char *) &bdelta,
896 (const char *) &bexpire,
897 (const char *) &boid,
898 };
899 int paramLengths[] = {
900 sizeof (bdelta),
901 sizeof (bexpire),
902 sizeof (boid),
903 };
904 const int paramFormats[] = { 1, 1, 1 };
905
906 ret = PQexecPrepared (plugin->dbh,
907 "update",
908 3, paramValues, paramLengths, paramFormats, 1);
909 if (GNUNET_OK != check_result (plugin,
910 ret,
911 PGRES_COMMAND_OK,
912 "PQexecPrepared", "update", __LINE__))
913 return GNUNET_SYSERR;
914 PQclear (ret);
915 return GNUNET_OK;
916}
917
918
919/**
920 * Call a method for each key in the database and
921 * call the callback method on it.
922 *
923 * @param plugin global context
924 * @param type entries of which type should be considered?
925 * @param is_asc ascending or descending iteration?
926 * @param iter_select which SELECT method should be used?
927 * @param iter maybe NULL (to just count); iter
928 * should return GNUNET_SYSERR to abort the
929 * iteration, GNUNET_NO to delete the entry and
930 * continue and GNUNET_OK to continue iterating
931 * @param iter_cls closure for 'iter'
932 */
933static void
934postgres_iterate (struct Plugin *plugin,
935 unsigned int type,
936 int is_asc,
937 unsigned int iter_select,
938 PluginIterator iter, void *iter_cls)
939{
940 struct NextRequestClosure *nrc;
941
942 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
943 nrc->count = UINT32_MAX;
944 nrc->plugin = plugin;
945 nrc->iter = iter;
946 nrc->iter_cls = iter_cls;
947 if (is_asc)
948 {
949 nrc->blast_prio = htonl (0);
950 nrc->blast_rowid = htonl (0);
951 nrc->blast_expire = htonl (0);
952 }
953 else
954 {
955 nrc->blast_prio = htonl (0x7FFFFFFFL);
956 nrc->blast_rowid = htonl (0xFFFFFFFF);
957 nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
958 }
959 switch (iter_select)
960 {
961 case 0:
962 nrc->pname = "select_low_priority";
963 nrc->nparams = 2;
964 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
965 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
966 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
967 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
968 break;
969 case 1:
970 nrc->pname = "select_non_anonymous";
971 nrc->nparams = 2;
972 nrc->paramValues[0] = (const char *) &nrc->blast_prio;
973 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
974 nrc->paramLengths[0] = sizeof (nrc->blast_prio);
975 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
976 break;
977 case 2:
978 nrc->pname = "select_expiration_time";
979 nrc->nparams = 2;
980 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
981 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
982 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
983 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
984 break;
985 case 3:
986 nrc->pname = "select_migration_order";
987 nrc->nparams = 3;
988 nrc->paramValues[0] = (const char *) &nrc->blast_expire;
989 nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
990 nrc->paramValues[2] = (const char *) &nrc->bnow;
991 nrc->paramLengths[0] = sizeof (nrc->blast_expire);
992 nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
993 nrc->paramLengths[2] = sizeof (nrc->bnow);
994 break;
995 default:
996 GNUNET_break (0);
997 iter (iter_cls,
998 NULL, NULL, 0, NULL, 0, 0, 0,
999 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1000 GNUNET_free (nrc);
1001 return;
1002 }
1003 nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).abs_value__;
1004 postgres_plugin_next_request (nrc,
1005 GNUNET_NO);
1006}
1007
1008
1009/**
1010 * Select a subset of the items in the datastore and call
1011 * the given iterator for each of them.
1012 *
1013 * @param cls our "struct Plugin*"
1014 * @param type entries of which type should be considered?
1015 * Use 0 for any type.
1016 * @param iter function to call on each matching value;
1017 * will be called once with a NULL value at the end
1018 * @param iter_cls closure for iter
1019 */
1020static void
1021postgres_plugin_iter_low_priority (void *cls,
1022 enum GNUNET_BLOCK_Type type,
1023 PluginIterator iter,
1024 void *iter_cls)
1025{
1026 struct Plugin *plugin = cls;
1027
1028 postgres_iterate (plugin,
1029 type,
1030 GNUNET_YES, 0,
1031 iter, iter_cls);
1032}
1033
1034
1035
1036
1037/**
1038 * Iterate over the results for a particular key 829 * Iterate over the results for a particular key
1039 * in the datastore. 830 * in the datastore.
1040 * 831 *
@@ -1063,12 +854,7 @@ postgres_plugin_get (void *cls,
1063 const int paramFormats[] = { 1, 1, 1, 1, 1 }; 854 const int paramFormats[] = { 1, 1, 1, 1, 1 };
1064 PGresult *ret; 855 PGresult *ret;
1065 856
1066 if (key == NULL) 857 GNUNET_assert (key != NULL);
1067 {
1068 postgres_plugin_iter_low_priority (plugin, type,
1069 iter, iter_cls);
1070 return;
1071 }
1072 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); 858 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1073 nrc->plugin = plugin; 859 nrc->plugin = plugin;
1074 nrc->iter = iter; 860 nrc->iter = iter;
@@ -1087,11 +873,9 @@ postgres_plugin_get (void *cls,
1087 nrc->paramLengths[1] = sizeof (nrc->vhash); 873 nrc->paramLengths[1] = sizeof (nrc->vhash);
1088 nrc->paramValues[2] = (const char *) &nrc->btype; 874 nrc->paramValues[2] = (const char *) &nrc->btype;
1089 nrc->paramLengths[2] = sizeof (nrc->btype); 875 nrc->paramLengths[2] = sizeof (nrc->btype);
1090 nrc->paramValues[3] = (const char *) &nrc->blast_rowid; 876 nrc->paramValues[3] = (const char *) &nrc->blimit_off;
1091 nrc->paramLengths[3] = sizeof (nrc->blast_rowid); 877 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1092 nrc->paramValues[4] = (const char *) &nrc->blimit_off; 878 nrc->nparams = 4;
1093 nrc->paramLengths[4] = sizeof (nrc->blimit_off);
1094 nrc->nparams = 5;
1095 nrc->pname = "getvt"; 879 nrc->pname = "getvt";
1096 ret = PQexecParams (plugin->dbh, 880 ret = PQexecParams (plugin->dbh,
1097 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", 881 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
@@ -1105,11 +889,9 @@ postgres_plugin_get (void *cls,
1105 { 889 {
1106 nrc->paramValues[1] = (const char *) &nrc->btype; 890 nrc->paramValues[1] = (const char *) &nrc->btype;
1107 nrc->paramLengths[1] = sizeof (nrc->btype); 891 nrc->paramLengths[1] = sizeof (nrc->btype);
1108 nrc->paramValues[2] = (const char *) &nrc->blast_rowid; 892 nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1109 nrc->paramLengths[2] = sizeof (nrc->blast_rowid); 893 nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1110 nrc->paramValues[3] = (const char *) &nrc->blimit_off; 894 nrc->nparams = 3;
1111 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1112 nrc->nparams = 4;
1113 nrc->pname = "gett"; 895 nrc->pname = "gett";
1114 ret = PQexecParams (plugin->dbh, 896 ret = PQexecParams (plugin->dbh,
1115 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", 897 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
@@ -1126,11 +908,9 @@ postgres_plugin_get (void *cls,
1126 { 908 {
1127 nrc->paramValues[1] = (const char *) &nrc->vhash; 909 nrc->paramValues[1] = (const char *) &nrc->vhash;
1128 nrc->paramLengths[1] = sizeof (nrc->vhash); 910 nrc->paramLengths[1] = sizeof (nrc->vhash);
1129 nrc->paramValues[2] = (const char *) &nrc->blast_rowid; 911 nrc->paramValues[2] = (const char *) &nrc->blimit_off;
1130 nrc->paramLengths[2] = sizeof (nrc->blast_rowid); 912 nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1131 nrc->paramValues[3] = (const char *) &nrc->blimit_off; 913 nrc->nparams = 3;
1132 nrc->paramLengths[3] = sizeof (nrc->blimit_off);
1133 nrc->nparams = 4;
1134 nrc->pname = "getv"; 914 nrc->pname = "getv";
1135 ret = PQexecParams (plugin->dbh, 915 ret = PQexecParams (plugin->dbh,
1136 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", 916 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
@@ -1142,11 +922,9 @@ postgres_plugin_get (void *cls,
1142 } 922 }
1143 else 923 else
1144 { 924 {
1145 nrc->paramValues[1] = (const char *) &nrc->blast_rowid; 925 nrc->paramValues[1] = (const char *) &nrc->blimit_off;
1146 nrc->paramLengths[1] = sizeof (nrc->blast_rowid); 926 nrc->paramLengths[1] = sizeof (nrc->blimit_off);
1147 nrc->paramValues[2] = (const char *) &nrc->blimit_off; 927 nrc->nparams = 2;
1148 nrc->paramLengths[2] = sizeof (nrc->blimit_off);
1149 nrc->nparams = 3;
1150 nrc->pname = "get"; 928 nrc->pname = "get";
1151 ret = PQexecParams (plugin->dbh, 929 ret = PQexecParams (plugin->dbh,
1152 "SELECT count(*) FROM gn090 WHERE hash=$1", 930 "SELECT count(*) FROM gn090 WHERE hash=$1",
@@ -1200,6 +978,131 @@ postgres_plugin_get (void *cls,
1200 978
1201 979
1202/** 980/**
981 * Select a subset of the items in the datastore and call
982 * the given iterator for each of them.
983 *
984 * @param cls our "struct Plugin*"
985 * @param type entries of which type should be considered?
986 * Use 0 for any type.
987 * @param iter function to call on each matching value;
988 * will be called once with a NULL value at the end
989 * @param iter_cls closure for iter
990 */
991static void
992postgres_plugin_iter_zero_anonymity (void *cls,
993 enum GNUNET_BLOCK_Type type,
994 PluginIterator iter,
995 void *iter_cls)
996{
997 struct Plugin *plugin = cls;
998 struct NextRequestClosure *nrc;
999
1000 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1001 nrc->btype = htonl ((uint32_t) type);
1002 nrc->plugin = plugin;
1003 nrc->iter = iter;
1004 nrc->iter_cls = iter_cls;
1005 nrc->pname = "select_non_anonymous";
1006 nrc->nparams = 1;
1007 nrc->paramLengths[0] = sizeof (nrc->bcount);
1008 nrc->paramValues[0] = (const char*) &nrc->bcount;
1009 postgres_plugin_next_request (nrc,
1010 GNUNET_NO);
1011}
1012
1013/**
1014 * Context for 'repl_iter' function.
1015 */
1016struct ReplCtx
1017{
1018
1019 /**
1020 * Plugin handle.
1021 */
1022 struct Plugin *plugin;
1023
1024 /**
1025 * Function to call for the result (or the NULL).
1026 */
1027 PluginIterator iter;
1028
1029 /**
1030 * Closure for iter.
1031 */
1032 void *iter_cls;
1033};
1034
1035
1036/**
1037 * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
1038 * Decrements the replication counter and calls the original
1039 * iterator.
1040 *
1041 * @param cls closure
1042 * @param next_cls closure to pass to the "next" function.
1043 * @param key key for the content
1044 * @param size number of bytes in data
1045 * @param data content stored
1046 * @param type type of the content
1047 * @param priority priority of the content
1048 * @param anonymity anonymity-level for the content
1049 * @param expiration expiration time for the content
1050 * @param uid unique identifier for the datum;
1051 * maybe 0 if no unique identifier is available
1052 *
1053 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1054 * (continue on call to "next", of course),
1055 * GNUNET_NO to delete the item and continue (if supported)
1056 */
1057static int
1058repl_iter (void *cls,
1059 void *next_cls,
1060 const GNUNET_HashCode *key,
1061 uint32_t size,
1062 const void *data,
1063 enum GNUNET_BLOCK_Type type,
1064 uint32_t priority,
1065 uint32_t anonymity,
1066 struct GNUNET_TIME_Absolute expiration,
1067 uint64_t uid)
1068{
1069 struct ReplCtx *rc = cls;
1070 struct Plugin *plugin = rc->plugin;
1071 int ret;
1072 PGresult *qret;
1073 uint32_t boid;
1074
1075 ret = rc->iter (rc->iter_cls,
1076 next_cls, key,
1077 size, data,
1078 type, priority, anonymity, expiration,
1079 uid);
1080 if (NULL != key)
1081 {
1082 boid = htonl ( (uint32_t) uid);
1083 const char *paramValues[] = {
1084 (const char *) &boid,
1085 };
1086 int paramLengths[] = {
1087 sizeof (boid),
1088 };
1089 const int paramFormats[] = { 1 };
1090 qret = PQexecPrepared (plugin->dbh,
1091 "decrepl",
1092 1, paramValues, paramLengths, paramFormats, 1);
1093 if (GNUNET_OK != check_result (plugin,
1094 qret,
1095 PGRES_COMMAND_OK,
1096 "PQexecPrepared",
1097 "decrepl", __LINE__))
1098 return GNUNET_SYSERR;
1099 PQclear (qret);
1100 }
1101 return ret;
1102}
1103
1104
1105/**
1203 * Get a random item for replication. Returns a single, not expired, random item 1106 * Get a random item for replication. Returns a single, not expired, random item
1204 * from those with the highest replication counters. The item's 1107 * from those with the highest replication counters. The item's
1205 * replication counter is decremented by one IF it was positive before. 1108 * replication counter is decremented by one IF it was positive before.
@@ -1213,9 +1116,21 @@ static void
1213postgres_plugin_replication_get (void *cls, 1116postgres_plugin_replication_get (void *cls,
1214 PluginIterator iter, void *iter_cls) 1117 PluginIterator iter, void *iter_cls)
1215{ 1118{
1216 /* FIXME: not implemented! */ 1119 struct Plugin *plugin = cls;
1217 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 1120 struct NextRequestClosure *nrc;
1218 GNUNET_TIME_UNIT_ZERO_ABS, 0); 1121 struct ReplCtx rc;
1122
1123 rc.plugin = plugin;
1124 rc.iter = iter;
1125 rc.iter_cls = iter_cls;
1126 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1127 nrc->one_shot = GNUNET_YES;
1128 nrc->plugin = plugin;
1129 nrc->iter = &repl_iter;
1130 nrc->iter_cls = &rc;
1131 nrc->pname = "select_replication_order";
1132 nrc->nparams = 0;
1133 postgres_next_request_cont (nrc, NULL);
1219} 1134}
1220 1135
1221 1136
@@ -1231,34 +1146,80 @@ static void
1231postgres_plugin_expiration_get (void *cls, 1146postgres_plugin_expiration_get (void *cls,
1232 PluginIterator iter, void *iter_cls) 1147 PluginIterator iter, void *iter_cls)
1233{ 1148{
1234 /* FIXME: not implemented! */ 1149 struct Plugin *plugin = cls;
1235 iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 1150 struct NextRequestClosure *nrc;
1236 GNUNET_TIME_UNIT_ZERO_ABS, 0); 1151 uint64_t btime;
1152
1153 btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
1154 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1155 nrc->one_shot = GNUNET_YES;
1156 nrc->plugin = plugin;
1157 nrc->iter = iter;
1158 nrc->iter_cls = iter_cls;
1159 nrc->pname = "select_expiration_order";
1160 nrc->nparams = 1;
1161 nrc->paramValues[0] = (const char *) &btime;
1162 nrc->paramLengths[0] = sizeof (btime);
1163 postgres_next_request_cont (nrc, NULL);
1237} 1164}
1238 1165
1239 1166
1240/** 1167/**
1241 * Select a subset of the items in the datastore and call 1168 * Update the priority for a particular key in the datastore. If
1242 * the given iterator for each of them. 1169 * the expiration time in value is different than the time found in
1170 * the datastore, the higher value should be kept. For the
1171 * anonymity level, the lower value is to be used. The specified
1172 * priority should be added to the existing priority, ignoring the
1173 * priority in value.
1174 *
1175 * Note that it is possible for multiple values to match this put.
1176 * In that case, all of the respective values are updated.
1243 * 1177 *
1244 * @param cls our "struct Plugin*" 1178 * @param cls our "struct Plugin*"
1245 * @param type entries of which type should be considered? 1179 * @param uid unique identifier of the datum
1246 * Use 0 for any type. 1180 * @param delta by how much should the priority
1247 * @param iter function to call on each matching value; 1181 * change? If priority + delta < 0 the
1248 * will be called once with a NULL value at the end 1182 * priority should be set to 0 (never go
1249 * @param iter_cls closure for iter 1183 * negative).
1184 * @param expire new expiration time should be the
1185 * MAX of any existing expiration time and
1186 * this value
1187 * @param msg set to error message
1188 * @return GNUNET_OK on success
1250 */ 1189 */
1251static void 1190static int
1252postgres_plugin_iter_zero_anonymity (void *cls, 1191postgres_plugin_update (void *cls,
1253 enum GNUNET_BLOCK_Type type, 1192 uint64_t uid,
1254 PluginIterator iter, 1193 int delta, struct GNUNET_TIME_Absolute expire,
1255 void *iter_cls) 1194 char **msg)
1256{ 1195{
1257 struct Plugin *plugin = cls; 1196 struct Plugin *plugin = cls;
1197 PGresult *ret;
1198 int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
1199 uint32_t boid = htonl ( (uint32_t) uid);
1200 uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
1201 const char *paramValues[] = {
1202 (const char *) &bdelta,
1203 (const char *) &bexpire,
1204 (const char *) &boid,
1205 };
1206 int paramLengths[] = {
1207 sizeof (bdelta),
1208 sizeof (bexpire),
1209 sizeof (boid),
1210 };
1211 const int paramFormats[] = { 1, 1, 1 };
1258 1212
1259 postgres_iterate (plugin, 1213 ret = PQexecPrepared (plugin->dbh,
1260 type, GNUNET_NO, 1, 1214 "update",
1261 iter, iter_cls); 1215 3, paramValues, paramLengths, paramFormats, 1);
1216 if (GNUNET_OK != check_result (plugin,
1217 ret,
1218 PGRES_COMMAND_OK,
1219 "PQexecPrepared", "update", __LINE__))
1220 return GNUNET_SYSERR;
1221 PQclear (ret);
1222 return GNUNET_OK;
1262} 1223}
1263 1224
1264 1225