aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/plugin_datastore_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/plugin_datastore_postgres.c')
-rw-r--r--src/datastore/plugin_datastore_postgres.c525
1 files changed, 174 insertions, 351 deletions
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
index aea87fdf4..cb077f06a 100644
--- a/src/datastore/plugin_datastore_postgres.c
+++ b/src/datastore/plugin_datastore_postgres.c
@@ -44,103 +44,6 @@
44 44
45 45
46/** 46/**
47 * Closure for 'postgres_next_request_cont'.
48 */
49struct NextRequestClosure
50{
51 /**
52 * Global plugin data.
53 */
54 struct Plugin *plugin;
55
56 /**
57 * Function to call for each matching entry.
58 */
59 PluginIterator iter;
60
61 /**
62 * Closure for 'iter'.
63 */
64 void *iter_cls;
65
66 /**
67 * Parameters for the prepared statement.
68 */
69 const char *paramValues[5];
70
71 /**
72 * Name of the prepared statement to run.
73 */
74 const char *pname;
75
76 /**
77 * Size of values pointed to by paramValues.
78 */
79 int paramLengths[5];
80
81 /**
82 * Number of paramters in paramValues/paramLengths.
83 */
84 int nparams;
85
86 /**
87 * Current time (possible parameter), big-endian.
88 */
89 uint64_t bnow;
90
91 /**
92 * Key (possible parameter)
93 */
94 GNUNET_HashCode key;
95
96 /**
97 * Hash of value (possible parameter)
98 */
99 GNUNET_HashCode vhash;
100
101 /**
102 * Number of entries found so far
103 */
104 unsigned long long count;
105
106 /**
107 * Offset this iteration starts at.
108 */
109 uint64_t off;
110
111 /**
112 * Current offset to use in query, big-endian.
113 */
114 uint64_t blimit_off;
115
116 /**
117 * Current total number of entries found so far, big-endian.
118 */
119 uint64_t bcount;
120
121 /**
122 * Overall number of matching entries.
123 */
124 unsigned long long total;
125
126 /**
127 * Type of block (possible paramter), big-endian.
128 */
129 uint32_t btype;
130
131 /**
132 * Flag set to GNUNET_YES to stop iteration.
133 */
134 int end_it;
135
136 /**
137 * Flag to indicate that there should only be one result.
138 */
139 int one_shot;
140};
141
142
143/**
144 * Context for all functions in this plugin. 47 * Context for all functions in this plugin.
145 */ 48 */
146struct Plugin 49struct Plugin
@@ -155,16 +58,6 @@ struct Plugin
155 */ 58 */
156 PGconn *dbh; 59 PGconn *dbh;
157 60
158 /**
159 * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
160 */
161 struct NextRequestClosure *next_task_nc;
162
163 /**
164 * Pending task with scheduler for running the next request.
165 */
166 GNUNET_SCHEDULER_TaskIdentifier next_task;
167
168}; 61};
169 62
170 63
@@ -434,7 +327,7 @@ init_connection (struct Plugin *plugin)
434 pq_prepare (plugin, 327 pq_prepare (plugin,
435 "select_non_anonymous", 328 "select_non_anonymous",
436 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 329 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
437 "WHERE anonLevel = 0 ORDER BY oid DESC LIMIT 1 OFFSET $1", 330 "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
438 1, 331 1,
439 __LINE__)) || 332 __LINE__)) ||
440 (GNUNET_OK != 333 (GNUNET_OK !=
@@ -482,11 +375,13 @@ static int
482delete_by_rowid (struct Plugin *plugin, 375delete_by_rowid (struct Plugin *plugin,
483 unsigned int rowid) 376 unsigned int rowid)
484{ 377{
485 const char *paramValues[] = { (const char *) &rowid }; 378 uint32_t browid;
486 int paramLengths[] = { sizeof (rowid) }; 379 const char *paramValues[] = { (const char *) &browid };
380 int paramLengths[] = { sizeof (browid) };
487 const int paramFormats[] = { 1 }; 381 const int paramFormats[] = { 1 };
488 PGresult *ret; 382 PGresult *ret;
489 383
384 browid = htonl (rowid);
490 ret = PQexecPrepared (plugin->dbh, 385 ret = PQexecPrepared (plugin->dbh,
491 "delrow", 386 "delrow",
492 1, paramValues, paramLengths, paramFormats, 1); 387 1, paramValues, paramLengths, paramFormats, 1);
@@ -510,7 +405,7 @@ delete_by_rowid (struct Plugin *plugin,
510 * @return number of bytes used on disk 405 * @return number of bytes used on disk
511 */ 406 */
512static unsigned long long 407static unsigned long long
513postgres_plugin_get_size (void *cls) 408postgres_plugin_estimate_size (void *cls)
514{ 409{
515 struct Plugin *plugin = cls; 410 struct Plugin *plugin = cls;
516 unsigned long long total; 411 unsigned long long total;
@@ -619,22 +514,20 @@ postgres_plugin_put (void *cls,
619 514
620 515
621/** 516/**
622 * Function invoked on behalf of a "PluginIterator" 517 * Function invoked to process the result and call
623 * asking the database plugin to call the iterator 518 * the processor.
624 * with the next item.
625 * 519 *
626 * @param next_cls the 'struct NextRequestClosure' 520 * @param plugin global plugin data
627 * @param tc scheduler context 521 * @param proc function to call the value (once only).
522 * @param proc_cls closure for proc
523 * @param res result from exec
628 */ 524 */
629static void 525static void
630postgres_next_request_cont (void *next_cls, 526process_result (struct Plugin *plugin,
631 const struct GNUNET_SCHEDULER_TaskContext *tc) 527 PluginDatumProcessor proc, void *proc_cls,
528 PGresult *res)
632{ 529{
633 struct NextRequestClosure *nrc = next_cls;
634 struct Plugin *plugin = nrc->plugin;
635 const int paramFormats[] = { 1, 1, 1, 1, 1 };
636 int iret; 530 int iret;
637 PGresult *res;
638 enum GNUNET_BLOCK_Type type; 531 enum GNUNET_BLOCK_Type type;
639 uint32_t anonymity; 532 uint32_t anonymity;
640 uint32_t priority; 533 uint32_t priority;
@@ -643,38 +536,11 @@ postgres_next_request_cont (void *next_cls,
643 struct GNUNET_TIME_Absolute expiration_time; 536 struct GNUNET_TIME_Absolute expiration_time;
644 GNUNET_HashCode key; 537 GNUNET_HashCode key;
645 538
646 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
647 plugin->next_task_nc = NULL;
648 if ( (GNUNET_YES == nrc->end_it) ||
649 (nrc->count == nrc->total) )
650 {
651#if DEBUG_POSTGRES
652 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
653 "datastore-postgres",
654 "Ending iteration (%s)\n",
655 (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
656#endif
657 nrc->iter (nrc->iter_cls,
658 NULL, NULL, 0, NULL, 0, 0, 0,
659 GNUNET_TIME_UNIT_ZERO_ABS, 0);
660 GNUNET_free (nrc);
661 return;
662 }
663 if (nrc->off == nrc->total)
664 nrc->off = 0;
665 nrc->blimit_off = GNUNET_htonll (nrc->off);
666 nrc->bcount = GNUNET_htonll ((uint64_t) nrc->count);
667 res = PQexecPrepared (plugin->dbh,
668 nrc->pname,
669 nrc->nparams,
670 nrc->paramValues,
671 nrc->paramLengths,
672 paramFormats, 1);
673 if (GNUNET_OK != check_result (plugin, 539 if (GNUNET_OK != check_result (plugin,
674 res, 540 res,
675 PGRES_TUPLES_OK, 541 PGRES_TUPLES_OK,
676 "PQexecPrepared", 542 "PQexecPrepared",
677 nrc->pname, 543 "select",
678 __LINE__)) 544 __LINE__))
679 { 545 {
680#if DEBUG_POSTGRES 546#if DEBUG_POSTGRES
@@ -682,10 +548,9 @@ postgres_next_request_cont (void *next_cls,
682 "datastore-postgres", 548 "datastore-postgres",
683 "Ending iteration (postgres error)\n"); 549 "Ending iteration (postgres error)\n");
684#endif 550#endif
685 nrc->iter (nrc->iter_cls, 551 proc (proc_cls,
686 NULL, NULL, 0, NULL, 0, 0, 0, 552 NULL, 0, NULL, 0, 0, 0,
687 GNUNET_TIME_UNIT_ZERO_ABS, 0); 553 GNUNET_TIME_UNIT_ZERO_ABS, 0);
688 GNUNET_free (nrc);
689 return; 554 return;
690 } 555 }
691 556
@@ -697,11 +562,10 @@ postgres_next_request_cont (void *next_cls,
697 "datastore-postgres", 562 "datastore-postgres",
698 "Ending iteration (no more results)\n"); 563 "Ending iteration (no more results)\n");
699#endif 564#endif
700 nrc->iter (nrc->iter_cls, 565 proc (proc_cls,
701 NULL, NULL, 0, NULL, 0, 0, 0, 566 NULL, 0, NULL, 0, 0, 0,
702 GNUNET_TIME_UNIT_ZERO_ABS, 0); 567 GNUNET_TIME_UNIT_ZERO_ABS, 0);
703 PQclear (res); 568 PQclear (res);
704 GNUNET_free (nrc);
705 return; 569 return;
706 } 570 }
707 if ((1 != PQntuples (res)) || 571 if ((1 != PQntuples (res)) ||
@@ -710,11 +574,10 @@ postgres_next_request_cont (void *next_cls,
710 (sizeof (uint32_t) != PQfsize (res, 6))) 574 (sizeof (uint32_t) != PQfsize (res, 6)))
711 { 575 {
712 GNUNET_break (0); 576 GNUNET_break (0);
713 nrc->iter (nrc->iter_cls, 577 proc (proc_cls,
714 NULL, NULL, 0, NULL, 0, 0, 0, 578 NULL, 0, NULL, 0, 0, 0,
715 GNUNET_TIME_UNIT_ZERO_ABS, 0); 579 GNUNET_TIME_UNIT_ZERO_ABS, 0);
716 PQclear (res); 580 PQclear (res);
717 GNUNET_free (nrc);
718 return; 581 return;
719 } 582 }
720 rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6)); 583 rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
@@ -727,10 +590,9 @@ postgres_next_request_cont (void *next_cls,
727 GNUNET_break (0); 590 GNUNET_break (0);
728 PQclear (res); 591 PQclear (res);
729 delete_by_rowid (plugin, rowid); 592 delete_by_rowid (plugin, rowid);
730 nrc->iter (nrc->iter_cls, 593 proc (proc_cls,
731 NULL, NULL, 0, NULL, 0, 0, 0, 594 NULL, 0, NULL, 0, 0, 0,
732 GNUNET_TIME_UNIT_ZERO_ABS, 0); 595 GNUNET_TIME_UNIT_ZERO_ABS, 0);
733 GNUNET_free (nrc);
734 return; 596 return;
735 } 597 }
736 598
@@ -749,33 +611,23 @@ postgres_next_request_cont (void *next_cls,
749 (unsigned int) size, 611 (unsigned int) size,
750 (unsigned int) type); 612 (unsigned int) type);
751#endif 613#endif
752 iret = nrc->iter (nrc->iter_cls, 614 iret = proc (proc_cls,
753 (nrc->one_shot == GNUNET_YES) ? NULL : nrc, 615 &key,
754 &key, 616 size,
755 size, 617 PQgetvalue (res, 0, 5),
756 PQgetvalue (res, 0, 5), 618 (enum GNUNET_BLOCK_Type) type,
757 (enum GNUNET_BLOCK_Type) type, 619 priority,
758 priority, 620 anonymity,
759 anonymity, 621 expiration_time,
760 expiration_time, 622 rowid);
761 rowid);
762 PQclear (res); 623 PQclear (res);
763 if (iret != GNUNET_NO) 624 if (iret == GNUNET_NO)
764 {
765 nrc->count++;
766 nrc->off++;
767 }
768 if (iret == GNUNET_SYSERR)
769 { 625 {
770#if DEBUG_POSTGRES 626#if DEBUG_POSTGRES
771 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 627 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
772 "datastore-postgres", 628 "Processor asked for item %u to be removed.\n",
773 "Ending iteration (client error)\n"); 629 rowid);
774#endif 630#endif
775 return;
776 }
777 if (iret == GNUNET_NO)
778 {
779 if (GNUNET_OK == delete_by_rowid (plugin, rowid)) 631 if (GNUNET_OK == delete_by_rowid (plugin, rowid))
780 { 632 {
781#if DEBUG_POSTGRES 633#if DEBUG_POSTGRES
@@ -794,34 +646,6 @@ postgres_next_request_cont (void *next_cls,
794#endif 646#endif
795 } 647 }
796 } 648 }
797 if (nrc->one_shot == GNUNET_YES)
798 GNUNET_free (nrc);
799}
800
801
802/**
803 * Function invoked on behalf of a "PluginIterator"
804 * asking the database plugin to call the iterator
805 * with the next item.
806 *
807 * @param next_cls whatever argument was given
808 * to the PluginIterator as "next_cls".
809 * @param end_it set to GNUNET_YES if we
810 * should terminate the iteration early
811 * (iterator should be still called once more
812 * to signal the end of the iteration).
813 */
814static void
815postgres_plugin_next_request (void *next_cls,
816 int end_it)
817{
818 struct NextRequestClosure *nrc = next_cls;
819
820 if (GNUNET_YES == end_it)
821 nrc->end_it = GNUNET_YES;
822 nrc->plugin->next_task_nc = nrc;
823 nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&postgres_next_request_cont,
824 nrc);
825} 649}
826 650
827 651
@@ -843,62 +667,62 @@ postgres_plugin_next_request (void *next_cls,
843 * @param iter_cls closure for iter 667 * @param iter_cls closure for iter
844 */ 668 */
845static void 669static void
846postgres_plugin_get (void *cls, 670postgres_plugin_get_key (void *cls,
847 const GNUNET_HashCode * key, 671 uint64_t offset,
848 const GNUNET_HashCode * vhash, 672 const GNUNET_HashCode *key,
849 enum GNUNET_BLOCK_Type type, 673 const GNUNET_HashCode *vhash,
850 PluginIterator iter, void *iter_cls) 674 enum GNUNET_BLOCK_Type type,
675 PluginDatumProcessor proc, void *proc_cls)
851{ 676{
852 struct Plugin *plugin = cls; 677 struct Plugin *plugin = cls;
853 struct NextRequestClosure *nrc;
854 const int paramFormats[] = { 1, 1, 1, 1, 1 }; 678 const int paramFormats[] = { 1, 1, 1, 1, 1 };
679 int paramLengths[4];
680 const char *paramValues[4];
681 int nparams;
682 const char *pname;
855 PGresult *ret; 683 PGresult *ret;
684 uint64_t total;
685 uint64_t blimit_off;
686 uint32_t btype;
856 687
857 GNUNET_assert (key != NULL); 688 GNUNET_assert (key != NULL);
858 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); 689 paramValues[0] = (const char*) key;
859 nrc->plugin = plugin; 690 paramLengths[0] = sizeof (GNUNET_HashCode);
860 nrc->iter = iter; 691 btype = htonl (type);
861 nrc->iter_cls = iter_cls;
862 nrc->key = *key;
863 if (vhash != NULL)
864 nrc->vhash = *vhash;
865 nrc->paramValues[0] = (const char*) &nrc->key;
866 nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
867 nrc->btype = htonl (type);
868 if (type != 0) 692 if (type != 0)
869 { 693 {
870 if (vhash != NULL) 694 if (vhash != NULL)
871 { 695 {
872 nrc->paramValues[1] = (const char *) &nrc->vhash; 696 paramValues[1] = (const char *) vhash;
873 nrc->paramLengths[1] = sizeof (nrc->vhash); 697 paramLengths[1] = sizeof (GNUNET_HashCode);
874 nrc->paramValues[2] = (const char *) &nrc->btype; 698 paramValues[2] = (const char *) &btype;
875 nrc->paramLengths[2] = sizeof (nrc->btype); 699 paramLengths[2] = sizeof (btype);
876 nrc->paramValues[3] = (const char *) &nrc->blimit_off; 700 paramValues[3] = (const char *) &blimit_off;
877 nrc->paramLengths[3] = sizeof (nrc->blimit_off); 701 paramLengths[3] = sizeof (blimit_off);
878 nrc->nparams = 4; 702 nparams = 4;
879 nrc->pname = "getvt"; 703 pname = "getvt";
880 ret = PQexecParams (plugin->dbh, 704 ret = PQexecParams (plugin->dbh,
881 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", 705 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
882 3, 706 3,
883 NULL, 707 NULL,
884 nrc->paramValues, 708 paramValues,
885 nrc->paramLengths, 709 paramLengths,
886 paramFormats, 1); 710 paramFormats, 1);
887 } 711 }
888 else 712 else
889 { 713 {
890 nrc->paramValues[1] = (const char *) &nrc->btype; 714 paramValues[1] = (const char *) &btype;
891 nrc->paramLengths[1] = sizeof (nrc->btype); 715 paramLengths[1] = sizeof (btype);
892 nrc->paramValues[2] = (const char *) &nrc->blimit_off; 716 paramValues[2] = (const char *) &blimit_off;
893 nrc->paramLengths[2] = sizeof (nrc->blimit_off); 717 paramLengths[2] = sizeof (blimit_off);
894 nrc->nparams = 3; 718 nparams = 3;
895 nrc->pname = "gett"; 719 pname = "gett";
896 ret = PQexecParams (plugin->dbh, 720 ret = PQexecParams (plugin->dbh,
897 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", 721 "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
898 2, 722 2,
899 NULL, 723 NULL,
900 nrc->paramValues, 724 paramValues,
901 nrc->paramLengths, 725 paramLengths,
902 paramFormats, 1); 726 paramFormats, 1);
903 } 727 }
904 } 728 }
@@ -906,32 +730,32 @@ postgres_plugin_get (void *cls,
906 { 730 {
907 if (vhash != NULL) 731 if (vhash != NULL)
908 { 732 {
909 nrc->paramValues[1] = (const char *) &nrc->vhash; 733 paramValues[1] = (const char *) vhash;
910 nrc->paramLengths[1] = sizeof (nrc->vhash); 734 paramLengths[1] = sizeof (GNUNET_HashCode);
911 nrc->paramValues[2] = (const char *) &nrc->blimit_off; 735 paramValues[2] = (const char *) &blimit_off;
912 nrc->paramLengths[2] = sizeof (nrc->blimit_off); 736 paramLengths[2] = sizeof (blimit_off);
913 nrc->nparams = 3; 737 nparams = 3;
914 nrc->pname = "getv"; 738 pname = "getv";
915 ret = PQexecParams (plugin->dbh, 739 ret = PQexecParams (plugin->dbh,
916 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", 740 "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
917 2, 741 2,
918 NULL, 742 NULL,
919 nrc->paramValues, 743 paramValues,
920 nrc->paramLengths, 744 paramLengths,
921 paramFormats, 1); 745 paramFormats, 1);
922 } 746 }
923 else 747 else
924 { 748 {
925 nrc->paramValues[1] = (const char *) &nrc->blimit_off; 749 paramValues[1] = (const char *) &blimit_off;
926 nrc->paramLengths[1] = sizeof (nrc->blimit_off); 750 paramLengths[1] = sizeof (blimit_off);
927 nrc->nparams = 2; 751 nparams = 2;
928 nrc->pname = "get"; 752 pname = "get";
929 ret = PQexecParams (plugin->dbh, 753 ret = PQexecParams (plugin->dbh,
930 "SELECT count(*) FROM gn090 WHERE hash=$1", 754 "SELECT count(*) FROM gn090 WHERE hash=$1",
931 1, 755 1,
932 NULL, 756 NULL,
933 nrc->paramValues, 757 paramValues,
934 nrc->paramLengths, 758 paramLengths,
935 paramFormats, 1); 759 paramFormats, 1);
936 } 760 }
937 } 761 }
@@ -939,13 +763,12 @@ postgres_plugin_get (void *cls,
939 ret, 763 ret,
940 PGRES_TUPLES_OK, 764 PGRES_TUPLES_OK,
941 "PQexecParams", 765 "PQexecParams",
942 nrc->pname, 766 pname,
943 __LINE__)) 767 __LINE__))
944 { 768 {
945 iter (iter_cls, 769 proc (proc_cls,
946 NULL, NULL, 0, NULL, 0, 0, 0, 770 NULL, 0, NULL, 0, 0, 0,
947 GNUNET_TIME_UNIT_ZERO_ABS, 0); 771 GNUNET_TIME_UNIT_ZERO_ABS, 0);
948 GNUNET_free (nrc);
949 return; 772 return;
950 } 773 }
951 if ((PQntuples (ret) != 1) || 774 if ((PQntuples (ret) != 1) ||
@@ -954,26 +777,30 @@ postgres_plugin_get (void *cls,
954 { 777 {
955 GNUNET_break (0); 778 GNUNET_break (0);
956 PQclear (ret); 779 PQclear (ret);
957 iter (iter_cls, 780 proc (proc_cls,
958 NULL, NULL, 0, NULL, 0, 0, 0, 781 NULL, 0, NULL, 0, 0, 0,
959 GNUNET_TIME_UNIT_ZERO_ABS, 0); 782 GNUNET_TIME_UNIT_ZERO_ABS, 0);
960 GNUNET_free (nrc);
961 return; 783 return;
962 } 784 }
963 nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0)); 785 total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
964 PQclear (ret); 786 PQclear (ret);
965 if (nrc->total == 0) 787 if (total == 0)
966 { 788 {
967 iter (iter_cls, 789 proc (proc_cls,
968 NULL, NULL, 0, NULL, 0, 0, 0, 790 NULL, 0, NULL, 0, 0, 0,
969 GNUNET_TIME_UNIT_ZERO_ABS, 0); 791 GNUNET_TIME_UNIT_ZERO_ABS, 0);
970 GNUNET_free (nrc);
971 return; 792 return;
972 } 793 }
973 nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, 794 blimit_off = GNUNET_htonll (offset % total);
974 nrc->total); 795 ret = PQexecPrepared (plugin->dbh,
975 postgres_plugin_next_request (nrc, 796 pname,
976 GNUNET_NO); 797 nparams,
798 paramValues,
799 paramLengths,
800 paramFormats, 1);
801 process_result (plugin,
802 proc, proc_cls,
803 ret);
977} 804}
978 805
979 806
@@ -989,28 +816,33 @@ postgres_plugin_get (void *cls,
989 * @param iter_cls closure for iter 816 * @param iter_cls closure for iter
990 */ 817 */
991static void 818static void
992postgres_plugin_iter_zero_anonymity (void *cls, 819postgres_plugin_get_zero_anonymity (void *cls,
993 enum GNUNET_BLOCK_Type type, 820 uint64_t offset,
994 PluginIterator iter, 821 enum GNUNET_BLOCK_Type type,
995 void *iter_cls) 822 PluginDatumProcessor proc, void *proc_cls)
996{ 823{
997 struct Plugin *plugin = cls; 824 struct Plugin *plugin = cls;
998 struct NextRequestClosure *nrc; 825 uint32_t btype;
826 uint64_t boff;
827 const int paramFormats[] = { 1, 1 };
828 int paramLengths[] = { sizeof (btype), sizeof (boff) };
829 const char *paramValues[] = { (const char*) &btype, (const char*) &boff };
830 PGresult *ret;
999 831
1000 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); 832 btype = htonl ((uint32_t) type);
1001 nrc->total = UINT32_MAX; 833 boff = GNUNET_htonll (offset);
1002 nrc->btype = htonl ((uint32_t) type); 834 ret = PQexecPrepared (plugin->dbh,
1003 nrc->plugin = plugin; 835 "select_non_anonymous",
1004 nrc->iter = iter; 836 2,
1005 nrc->iter_cls = iter_cls; 837 paramValues,
1006 nrc->pname = "select_non_anonymous"; 838 paramLengths,
1007 nrc->nparams = 1; 839 paramFormats, 1);
1008 nrc->paramLengths[0] = sizeof (nrc->bcount); 840 process_result (plugin,
1009 nrc->paramValues[0] = (const char*) &nrc->bcount; 841 proc, proc_cls,
1010 postgres_plugin_next_request (nrc, 842 ret);
1011 GNUNET_NO);
1012} 843}
1013 844
845
1014/** 846/**
1015 * Context for 'repl_iter' function. 847 * Context for 'repl_iter' function.
1016 */ 848 */
@@ -1025,12 +857,12 @@ struct ReplCtx
1025 /** 857 /**
1026 * Function to call for the result (or the NULL). 858 * Function to call for the result (or the NULL).
1027 */ 859 */
1028 PluginIterator iter; 860 PluginDatumProcessor proc;
1029 861
1030 /** 862 /**
1031 * Closure for iter. 863 * Closure for proc.
1032 */ 864 */
1033 void *iter_cls; 865 void *proc_cls;
1034}; 866};
1035 867
1036 868
@@ -1056,8 +888,7 @@ struct ReplCtx
1056 * GNUNET_NO to delete the item and continue (if supported) 888 * GNUNET_NO to delete the item and continue (if supported)
1057 */ 889 */
1058static int 890static int
1059repl_iter (void *cls, 891repl_proc (void *cls,
1060 void *next_cls,
1061 const GNUNET_HashCode *key, 892 const GNUNET_HashCode *key,
1062 uint32_t size, 893 uint32_t size,
1063 const void *data, 894 const void *data,
@@ -1073,8 +904,8 @@ repl_iter (void *cls,
1073 PGresult *qret; 904 PGresult *qret;
1074 uint32_t boid; 905 uint32_t boid;
1075 906
1076 ret = rc->iter (rc->iter_cls, 907 ret = rc->proc (rc->proc_cls,
1077 next_cls, key, 908 key,
1078 size, data, 909 size, data,
1079 type, priority, anonymity, expiration, 910 type, priority, anonymity, expiration,
1080 uid); 911 uid);
@@ -1107,32 +938,30 @@ repl_iter (void *cls,
1107 * Get a random item for replication. Returns a single, not expired, random item 938 * Get a random item for replication. Returns a single, not expired, random item
1108 * from those with the highest replication counters. The item's 939 * from those with the highest replication counters. The item's
1109 * replication counter is decremented by one IF it was positive before. 940 * replication counter is decremented by one IF it was positive before.
1110 * Call 'iter' with all values ZERO or NULL if the datastore is empty. 941 * Call 'proc' with all values ZERO or NULL if the datastore is empty.
1111 * 942 *
1112 * @param cls closure 943 * @param cls closure
1113 * @param iter function to call the value (once only). 944 * @param proc function to call the value (once only).
1114 * @param iter_cls closure for iter 945 * @param proc_cls closure for iter
1115 */ 946 */
1116static void 947static void
1117postgres_plugin_replication_get (void *cls, 948postgres_plugin_get_replication (void *cls,
1118 PluginIterator iter, void *iter_cls) 949 PluginDatumProcessor proc, void *proc_cls)
1119{ 950{
1120 struct Plugin *plugin = cls; 951 struct Plugin *plugin = cls;
1121 struct NextRequestClosure *nrc;
1122 struct ReplCtx rc; 952 struct ReplCtx rc;
953 PGresult *ret;
1123 954
1124 rc.plugin = plugin; 955 rc.plugin = plugin;
1125 rc.iter = iter; 956 rc.proc = proc;
1126 rc.iter_cls = iter_cls; 957 rc.proc_cls = proc_cls;
1127 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); 958 ret = PQexecPrepared (plugin->dbh,
1128 nrc->one_shot = GNUNET_YES; 959 "select_replication_order",
1129 nrc->total = 1; 960 0,
1130 nrc->plugin = plugin; 961 NULL, NULL, NULL, 1);
1131 nrc->iter = &repl_iter; 962 process_result (plugin,
1132 nrc->iter_cls = &rc; 963 &repl_proc, &rc,
1133 nrc->pname = "select_replication_order"; 964 ret);
1134 nrc->nparams = 0;
1135 postgres_next_request_cont (nrc, NULL);
1136} 965}
1137 966
1138 967
@@ -1141,29 +970,31 @@ postgres_plugin_replication_get (void *cls,
1141 * Call 'iter' with all values ZERO or NULL if the datastore is empty. 970 * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1142 * 971 *
1143 * @param cls closure 972 * @param cls closure
1144 * @param iter function to call the value (once only). 973 * @param proc function to call the value (once only).
1145 * @param iter_cls closure for iter 974 * @param proc_cls closure for iter
1146 */ 975 */
1147static void 976static void
1148postgres_plugin_expiration_get (void *cls, 977postgres_plugin_get_expiration (void *cls,
1149 PluginIterator iter, void *iter_cls) 978 PluginDatumProcessor proc, void *proc_cls)
1150{ 979{
1151 struct Plugin *plugin = cls; 980 struct Plugin *plugin = cls;
1152 struct NextRequestClosure *nrc;
1153 uint64_t btime; 981 uint64_t btime;
982 const int paramFormats[] = { 1 };
983 int paramLengths[] = { sizeof (btime) };
984 const char *paramValues[] = { (const char*) &btime };
985 PGresult *ret;
1154 986
1155 btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value); 987 btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
1156 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); 988 ret = PQexecPrepared (plugin->dbh,
1157 nrc->one_shot = GNUNET_YES; 989 "select_expiration_order",
1158 nrc->total = 1; 990 1,
1159 nrc->plugin = plugin; 991 paramValues,
1160 nrc->iter = iter; 992 paramLengths,
1161 nrc->iter_cls = iter_cls; 993 paramFormats,
1162 nrc->pname = "select_expiration_order"; 994 1);
1163 nrc->nparams = 1; 995 process_result (plugin,
1164 nrc->paramValues[0] = (const char *) &btime; 996 proc, proc_cls,
1165 nrc->paramLengths[0] = sizeof (btime); 997 ret);
1166 postgres_next_request_cont (nrc, NULL);
1167} 998}
1168 999
1169 1000
@@ -1260,14 +1091,13 @@ libgnunet_plugin_datastore_postgres_init (void *cls)
1260 } 1091 }
1261 api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions)); 1092 api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1262 api->cls = plugin; 1093 api->cls = plugin;
1263 api->get_size = &postgres_plugin_get_size; 1094 api->estimate_size = &postgres_plugin_estimate_size;
1264 api->put = &postgres_plugin_put; 1095 api->put = &postgres_plugin_put;
1265 api->next_request = &postgres_plugin_next_request;
1266 api->get = &postgres_plugin_get;
1267 api->replication_get = &postgres_plugin_replication_get;
1268 api->expiration_get = &postgres_plugin_expiration_get;
1269 api->update = &postgres_plugin_update; 1096 api->update = &postgres_plugin_update;
1270 api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity; 1097 api->get_key = &postgres_plugin_get_key;
1098 api->get_replication = &postgres_plugin_get_replication;
1099 api->get_expiration = &postgres_plugin_get_expiration;
1100 api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
1271 api->drop = &postgres_plugin_drop; 1101 api->drop = &postgres_plugin_drop;
1272 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, 1102 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1273 "datastore-postgres", 1103 "datastore-postgres",
@@ -1287,13 +1117,6 @@ libgnunet_plugin_datastore_postgres_done (void *cls)
1287 struct GNUNET_DATASTORE_PluginFunctions *api = cls; 1117 struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1288 struct Plugin *plugin = api->cls; 1118 struct Plugin *plugin = api->cls;
1289 1119
1290 if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1291 {
1292 GNUNET_SCHEDULER_cancel (plugin->next_task);
1293 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1294 GNUNET_free (plugin->next_task_nc);
1295 plugin->next_task_nc = NULL;
1296 }
1297 PQfinish (plugin->dbh); 1120 PQfinish (plugin->dbh);
1298 GNUNET_free (plugin); 1121 GNUNET_free (plugin);
1299 GNUNET_free (api); 1122 GNUNET_free (api);