diff options
Diffstat (limited to 'src/datastore/plugin_datastore_postgres.c')
-rw-r--r-- | src/datastore/plugin_datastore_postgres.c | 525 |
1 files changed, 174 insertions, 351 deletions
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c index aea87fdf4..cb077f06a 100644 --- a/src/datastore/plugin_datastore_postgres.c +++ b/src/datastore/plugin_datastore_postgres.c | |||
@@ -44,103 +44,6 @@ | |||
44 | 44 | ||
45 | 45 | ||
46 | /** | 46 | /** |
47 | * Closure for 'postgres_next_request_cont'. | ||
48 | */ | ||
49 | struct NextRequestClosure | ||
50 | { | ||
51 | /** | ||
52 | * Global plugin data. | ||
53 | */ | ||
54 | struct Plugin *plugin; | ||
55 | |||
56 | /** | ||
57 | * Function to call for each matching entry. | ||
58 | */ | ||
59 | PluginIterator iter; | ||
60 | |||
61 | /** | ||
62 | * Closure for 'iter'. | ||
63 | */ | ||
64 | void *iter_cls; | ||
65 | |||
66 | /** | ||
67 | * Parameters for the prepared statement. | ||
68 | */ | ||
69 | const char *paramValues[5]; | ||
70 | |||
71 | /** | ||
72 | * Name of the prepared statement to run. | ||
73 | */ | ||
74 | const char *pname; | ||
75 | |||
76 | /** | ||
77 | * Size of values pointed to by paramValues. | ||
78 | */ | ||
79 | int paramLengths[5]; | ||
80 | |||
81 | /** | ||
82 | * Number of paramters in paramValues/paramLengths. | ||
83 | */ | ||
84 | int nparams; | ||
85 | |||
86 | /** | ||
87 | * Current time (possible parameter), big-endian. | ||
88 | */ | ||
89 | uint64_t bnow; | ||
90 | |||
91 | /** | ||
92 | * Key (possible parameter) | ||
93 | */ | ||
94 | GNUNET_HashCode key; | ||
95 | |||
96 | /** | ||
97 | * Hash of value (possible parameter) | ||
98 | */ | ||
99 | GNUNET_HashCode vhash; | ||
100 | |||
101 | /** | ||
102 | * Number of entries found so far | ||
103 | */ | ||
104 | unsigned long long count; | ||
105 | |||
106 | /** | ||
107 | * Offset this iteration starts at. | ||
108 | */ | ||
109 | uint64_t off; | ||
110 | |||
111 | /** | ||
112 | * Current offset to use in query, big-endian. | ||
113 | */ | ||
114 | uint64_t blimit_off; | ||
115 | |||
116 | /** | ||
117 | * Current total number of entries found so far, big-endian. | ||
118 | */ | ||
119 | uint64_t bcount; | ||
120 | |||
121 | /** | ||
122 | * Overall number of matching entries. | ||
123 | */ | ||
124 | unsigned long long total; | ||
125 | |||
126 | /** | ||
127 | * Type of block (possible paramter), big-endian. | ||
128 | */ | ||
129 | uint32_t btype; | ||
130 | |||
131 | /** | ||
132 | * Flag set to GNUNET_YES to stop iteration. | ||
133 | */ | ||
134 | int end_it; | ||
135 | |||
136 | /** | ||
137 | * Flag to indicate that there should only be one result. | ||
138 | */ | ||
139 | int one_shot; | ||
140 | }; | ||
141 | |||
142 | |||
143 | /** | ||
144 | * Context for all functions in this plugin. | 47 | * Context for all functions in this plugin. |
145 | */ | 48 | */ |
146 | struct Plugin | 49 | struct Plugin |
@@ -155,16 +58,6 @@ struct Plugin | |||
155 | */ | 58 | */ |
156 | PGconn *dbh; | 59 | PGconn *dbh; |
157 | 60 | ||
158 | /** | ||
159 | * Closure of the 'next_task' (must be freed if 'next_task' is cancelled). | ||
160 | */ | ||
161 | struct NextRequestClosure *next_task_nc; | ||
162 | |||
163 | /** | ||
164 | * Pending task with scheduler for running the next request. | ||
165 | */ | ||
166 | GNUNET_SCHEDULER_TaskIdentifier next_task; | ||
167 | |||
168 | }; | 61 | }; |
169 | 62 | ||
170 | 63 | ||
@@ -434,7 +327,7 @@ init_connection (struct Plugin *plugin) | |||
434 | pq_prepare (plugin, | 327 | pq_prepare (plugin, |
435 | "select_non_anonymous", | 328 | "select_non_anonymous", |
436 | "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " | 329 | "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " |
437 | "WHERE anonLevel = 0 ORDER BY oid DESC LIMIT 1 OFFSET $1", | 330 | "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2", |
438 | 1, | 331 | 1, |
439 | __LINE__)) || | 332 | __LINE__)) || |
440 | (GNUNET_OK != | 333 | (GNUNET_OK != |
@@ -482,11 +375,13 @@ static int | |||
482 | delete_by_rowid (struct Plugin *plugin, | 375 | delete_by_rowid (struct Plugin *plugin, |
483 | unsigned int rowid) | 376 | unsigned int rowid) |
484 | { | 377 | { |
485 | const char *paramValues[] = { (const char *) &rowid }; | 378 | uint32_t browid; |
486 | int paramLengths[] = { sizeof (rowid) }; | 379 | const char *paramValues[] = { (const char *) &browid }; |
380 | int paramLengths[] = { sizeof (browid) }; | ||
487 | const int paramFormats[] = { 1 }; | 381 | const int paramFormats[] = { 1 }; |
488 | PGresult *ret; | 382 | PGresult *ret; |
489 | 383 | ||
384 | browid = htonl (rowid); | ||
490 | ret = PQexecPrepared (plugin->dbh, | 385 | ret = PQexecPrepared (plugin->dbh, |
491 | "delrow", | 386 | "delrow", |
492 | 1, paramValues, paramLengths, paramFormats, 1); | 387 | 1, paramValues, paramLengths, paramFormats, 1); |
@@ -510,7 +405,7 @@ delete_by_rowid (struct Plugin *plugin, | |||
510 | * @return number of bytes used on disk | 405 | * @return number of bytes used on disk |
511 | */ | 406 | */ |
512 | static unsigned long long | 407 | static unsigned long long |
513 | postgres_plugin_get_size (void *cls) | 408 | postgres_plugin_estimate_size (void *cls) |
514 | { | 409 | { |
515 | struct Plugin *plugin = cls; | 410 | struct Plugin *plugin = cls; |
516 | unsigned long long total; | 411 | unsigned long long total; |
@@ -619,22 +514,20 @@ postgres_plugin_put (void *cls, | |||
619 | 514 | ||
620 | 515 | ||
621 | /** | 516 | /** |
622 | * Function invoked on behalf of a "PluginIterator" | 517 | * Function invoked to process the result and call |
623 | * asking the database plugin to call the iterator | 518 | * the processor. |
624 | * with the next item. | ||
625 | * | 519 | * |
626 | * @param next_cls the 'struct NextRequestClosure' | 520 | * @param plugin global plugin data |
627 | * @param tc scheduler context | 521 | * @param proc function to call the value (once only). |
522 | * @param proc_cls closure for proc | ||
523 | * @param res result from exec | ||
628 | */ | 524 | */ |
629 | static void | 525 | static void |
630 | postgres_next_request_cont (void *next_cls, | 526 | process_result (struct Plugin *plugin, |
631 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 527 | PluginDatumProcessor proc, void *proc_cls, |
528 | PGresult *res) | ||
632 | { | 529 | { |
633 | struct NextRequestClosure *nrc = next_cls; | ||
634 | struct Plugin *plugin = nrc->plugin; | ||
635 | const int paramFormats[] = { 1, 1, 1, 1, 1 }; | ||
636 | int iret; | 530 | int iret; |
637 | PGresult *res; | ||
638 | enum GNUNET_BLOCK_Type type; | 531 | enum GNUNET_BLOCK_Type type; |
639 | uint32_t anonymity; | 532 | uint32_t anonymity; |
640 | uint32_t priority; | 533 | uint32_t priority; |
@@ -643,38 +536,11 @@ postgres_next_request_cont (void *next_cls, | |||
643 | struct GNUNET_TIME_Absolute expiration_time; | 536 | struct GNUNET_TIME_Absolute expiration_time; |
644 | GNUNET_HashCode key; | 537 | GNUNET_HashCode key; |
645 | 538 | ||
646 | plugin->next_task = GNUNET_SCHEDULER_NO_TASK; | ||
647 | plugin->next_task_nc = NULL; | ||
648 | if ( (GNUNET_YES == nrc->end_it) || | ||
649 | (nrc->count == nrc->total) ) | ||
650 | { | ||
651 | #if DEBUG_POSTGRES | ||
652 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | ||
653 | "datastore-postgres", | ||
654 | "Ending iteration (%s)\n", | ||
655 | (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set"); | ||
656 | #endif | ||
657 | nrc->iter (nrc->iter_cls, | ||
658 | NULL, NULL, 0, NULL, 0, 0, 0, | ||
659 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
660 | GNUNET_free (nrc); | ||
661 | return; | ||
662 | } | ||
663 | if (nrc->off == nrc->total) | ||
664 | nrc->off = 0; | ||
665 | nrc->blimit_off = GNUNET_htonll (nrc->off); | ||
666 | nrc->bcount = GNUNET_htonll ((uint64_t) nrc->count); | ||
667 | res = PQexecPrepared (plugin->dbh, | ||
668 | nrc->pname, | ||
669 | nrc->nparams, | ||
670 | nrc->paramValues, | ||
671 | nrc->paramLengths, | ||
672 | paramFormats, 1); | ||
673 | if (GNUNET_OK != check_result (plugin, | 539 | if (GNUNET_OK != check_result (plugin, |
674 | res, | 540 | res, |
675 | PGRES_TUPLES_OK, | 541 | PGRES_TUPLES_OK, |
676 | "PQexecPrepared", | 542 | "PQexecPrepared", |
677 | nrc->pname, | 543 | "select", |
678 | __LINE__)) | 544 | __LINE__)) |
679 | { | 545 | { |
680 | #if DEBUG_POSTGRES | 546 | #if DEBUG_POSTGRES |
@@ -682,10 +548,9 @@ postgres_next_request_cont (void *next_cls, | |||
682 | "datastore-postgres", | 548 | "datastore-postgres", |
683 | "Ending iteration (postgres error)\n"); | 549 | "Ending iteration (postgres error)\n"); |
684 | #endif | 550 | #endif |
685 | nrc->iter (nrc->iter_cls, | 551 | proc (proc_cls, |
686 | NULL, NULL, 0, NULL, 0, 0, 0, | 552 | NULL, 0, NULL, 0, 0, 0, |
687 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 553 | GNUNET_TIME_UNIT_ZERO_ABS, 0); |
688 | GNUNET_free (nrc); | ||
689 | return; | 554 | return; |
690 | } | 555 | } |
691 | 556 | ||
@@ -697,11 +562,10 @@ postgres_next_request_cont (void *next_cls, | |||
697 | "datastore-postgres", | 562 | "datastore-postgres", |
698 | "Ending iteration (no more results)\n"); | 563 | "Ending iteration (no more results)\n"); |
699 | #endif | 564 | #endif |
700 | nrc->iter (nrc->iter_cls, | 565 | proc (proc_cls, |
701 | NULL, NULL, 0, NULL, 0, 0, 0, | 566 | NULL, 0, NULL, 0, 0, 0, |
702 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 567 | GNUNET_TIME_UNIT_ZERO_ABS, 0); |
703 | PQclear (res); | 568 | PQclear (res); |
704 | GNUNET_free (nrc); | ||
705 | return; | 569 | return; |
706 | } | 570 | } |
707 | if ((1 != PQntuples (res)) || | 571 | if ((1 != PQntuples (res)) || |
@@ -710,11 +574,10 @@ postgres_next_request_cont (void *next_cls, | |||
710 | (sizeof (uint32_t) != PQfsize (res, 6))) | 574 | (sizeof (uint32_t) != PQfsize (res, 6))) |
711 | { | 575 | { |
712 | GNUNET_break (0); | 576 | GNUNET_break (0); |
713 | nrc->iter (nrc->iter_cls, | 577 | proc (proc_cls, |
714 | NULL, NULL, 0, NULL, 0, 0, 0, | 578 | NULL, 0, NULL, 0, 0, 0, |
715 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 579 | GNUNET_TIME_UNIT_ZERO_ABS, 0); |
716 | PQclear (res); | 580 | PQclear (res); |
717 | GNUNET_free (nrc); | ||
718 | return; | 581 | return; |
719 | } | 582 | } |
720 | rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6)); | 583 | rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6)); |
@@ -727,10 +590,9 @@ postgres_next_request_cont (void *next_cls, | |||
727 | GNUNET_break (0); | 590 | GNUNET_break (0); |
728 | PQclear (res); | 591 | PQclear (res); |
729 | delete_by_rowid (plugin, rowid); | 592 | delete_by_rowid (plugin, rowid); |
730 | nrc->iter (nrc->iter_cls, | 593 | proc (proc_cls, |
731 | NULL, NULL, 0, NULL, 0, 0, 0, | 594 | NULL, 0, NULL, 0, 0, 0, |
732 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 595 | GNUNET_TIME_UNIT_ZERO_ABS, 0); |
733 | GNUNET_free (nrc); | ||
734 | return; | 596 | return; |
735 | } | 597 | } |
736 | 598 | ||
@@ -749,33 +611,23 @@ postgres_next_request_cont (void *next_cls, | |||
749 | (unsigned int) size, | 611 | (unsigned int) size, |
750 | (unsigned int) type); | 612 | (unsigned int) type); |
751 | #endif | 613 | #endif |
752 | iret = nrc->iter (nrc->iter_cls, | 614 | iret = proc (proc_cls, |
753 | (nrc->one_shot == GNUNET_YES) ? NULL : nrc, | 615 | &key, |
754 | &key, | 616 | size, |
755 | size, | 617 | PQgetvalue (res, 0, 5), |
756 | PQgetvalue (res, 0, 5), | 618 | (enum GNUNET_BLOCK_Type) type, |
757 | (enum GNUNET_BLOCK_Type) type, | 619 | priority, |
758 | priority, | 620 | anonymity, |
759 | anonymity, | 621 | expiration_time, |
760 | expiration_time, | 622 | rowid); |
761 | rowid); | ||
762 | PQclear (res); | 623 | PQclear (res); |
763 | if (iret != GNUNET_NO) | 624 | if (iret == GNUNET_NO) |
764 | { | ||
765 | nrc->count++; | ||
766 | nrc->off++; | ||
767 | } | ||
768 | if (iret == GNUNET_SYSERR) | ||
769 | { | 625 | { |
770 | #if DEBUG_POSTGRES | 626 | #if DEBUG_POSTGRES |
771 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | 627 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
772 | "datastore-postgres", | 628 | "Processor asked for item %u to be removed.\n", |
773 | "Ending iteration (client error)\n"); | 629 | rowid); |
774 | #endif | 630 | #endif |
775 | return; | ||
776 | } | ||
777 | if (iret == GNUNET_NO) | ||
778 | { | ||
779 | if (GNUNET_OK == delete_by_rowid (plugin, rowid)) | 631 | if (GNUNET_OK == delete_by_rowid (plugin, rowid)) |
780 | { | 632 | { |
781 | #if DEBUG_POSTGRES | 633 | #if DEBUG_POSTGRES |
@@ -794,34 +646,6 @@ postgres_next_request_cont (void *next_cls, | |||
794 | #endif | 646 | #endif |
795 | } | 647 | } |
796 | } | 648 | } |
797 | if (nrc->one_shot == GNUNET_YES) | ||
798 | GNUNET_free (nrc); | ||
799 | } | ||
800 | |||
801 | |||
802 | /** | ||
803 | * Function invoked on behalf of a "PluginIterator" | ||
804 | * asking the database plugin to call the iterator | ||
805 | * with the next item. | ||
806 | * | ||
807 | * @param next_cls whatever argument was given | ||
808 | * to the PluginIterator as "next_cls". | ||
809 | * @param end_it set to GNUNET_YES if we | ||
810 | * should terminate the iteration early | ||
811 | * (iterator should be still called once more | ||
812 | * to signal the end of the iteration). | ||
813 | */ | ||
814 | static void | ||
815 | postgres_plugin_next_request (void *next_cls, | ||
816 | int end_it) | ||
817 | { | ||
818 | struct NextRequestClosure *nrc = next_cls; | ||
819 | |||
820 | if (GNUNET_YES == end_it) | ||
821 | nrc->end_it = GNUNET_YES; | ||
822 | nrc->plugin->next_task_nc = nrc; | ||
823 | nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&postgres_next_request_cont, | ||
824 | nrc); | ||
825 | } | 649 | } |
826 | 650 | ||
827 | 651 | ||
@@ -843,62 +667,62 @@ postgres_plugin_next_request (void *next_cls, | |||
843 | * @param iter_cls closure for iter | 667 | * @param iter_cls closure for iter |
844 | */ | 668 | */ |
845 | static void | 669 | static void |
846 | postgres_plugin_get (void *cls, | 670 | postgres_plugin_get_key (void *cls, |
847 | const GNUNET_HashCode * key, | 671 | uint64_t offset, |
848 | const GNUNET_HashCode * vhash, | 672 | const GNUNET_HashCode *key, |
849 | enum GNUNET_BLOCK_Type type, | 673 | const GNUNET_HashCode *vhash, |
850 | PluginIterator iter, void *iter_cls) | 674 | enum GNUNET_BLOCK_Type type, |
675 | PluginDatumProcessor proc, void *proc_cls) | ||
851 | { | 676 | { |
852 | struct Plugin *plugin = cls; | 677 | struct Plugin *plugin = cls; |
853 | struct NextRequestClosure *nrc; | ||
854 | const int paramFormats[] = { 1, 1, 1, 1, 1 }; | 678 | const int paramFormats[] = { 1, 1, 1, 1, 1 }; |
679 | int paramLengths[4]; | ||
680 | const char *paramValues[4]; | ||
681 | int nparams; | ||
682 | const char *pname; | ||
855 | PGresult *ret; | 683 | PGresult *ret; |
684 | uint64_t total; | ||
685 | uint64_t blimit_off; | ||
686 | uint32_t btype; | ||
856 | 687 | ||
857 | GNUNET_assert (key != NULL); | 688 | GNUNET_assert (key != NULL); |
858 | nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); | 689 | paramValues[0] = (const char*) key; |
859 | nrc->plugin = plugin; | 690 | paramLengths[0] = sizeof (GNUNET_HashCode); |
860 | nrc->iter = iter; | 691 | btype = htonl (type); |
861 | nrc->iter_cls = iter_cls; | ||
862 | nrc->key = *key; | ||
863 | if (vhash != NULL) | ||
864 | nrc->vhash = *vhash; | ||
865 | nrc->paramValues[0] = (const char*) &nrc->key; | ||
866 | nrc->paramLengths[0] = sizeof (GNUNET_HashCode); | ||
867 | nrc->btype = htonl (type); | ||
868 | if (type != 0) | 692 | if (type != 0) |
869 | { | 693 | { |
870 | if (vhash != NULL) | 694 | if (vhash != NULL) |
871 | { | 695 | { |
872 | nrc->paramValues[1] = (const char *) &nrc->vhash; | 696 | paramValues[1] = (const char *) vhash; |
873 | nrc->paramLengths[1] = sizeof (nrc->vhash); | 697 | paramLengths[1] = sizeof (GNUNET_HashCode); |
874 | nrc->paramValues[2] = (const char *) &nrc->btype; | 698 | paramValues[2] = (const char *) &btype; |
875 | nrc->paramLengths[2] = sizeof (nrc->btype); | 699 | paramLengths[2] = sizeof (btype); |
876 | nrc->paramValues[3] = (const char *) &nrc->blimit_off; | 700 | paramValues[3] = (const char *) &blimit_off; |
877 | nrc->paramLengths[3] = sizeof (nrc->blimit_off); | 701 | paramLengths[3] = sizeof (blimit_off); |
878 | nrc->nparams = 4; | 702 | nparams = 4; |
879 | nrc->pname = "getvt"; | 703 | pname = "getvt"; |
880 | ret = PQexecParams (plugin->dbh, | 704 | ret = PQexecParams (plugin->dbh, |
881 | "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", | 705 | "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", |
882 | 3, | 706 | 3, |
883 | NULL, | 707 | NULL, |
884 | nrc->paramValues, | 708 | paramValues, |
885 | nrc->paramLengths, | 709 | paramLengths, |
886 | paramFormats, 1); | 710 | paramFormats, 1); |
887 | } | 711 | } |
888 | else | 712 | else |
889 | { | 713 | { |
890 | nrc->paramValues[1] = (const char *) &nrc->btype; | 714 | paramValues[1] = (const char *) &btype; |
891 | nrc->paramLengths[1] = sizeof (nrc->btype); | 715 | paramLengths[1] = sizeof (btype); |
892 | nrc->paramValues[2] = (const char *) &nrc->blimit_off; | 716 | paramValues[2] = (const char *) &blimit_off; |
893 | nrc->paramLengths[2] = sizeof (nrc->blimit_off); | 717 | paramLengths[2] = sizeof (blimit_off); |
894 | nrc->nparams = 3; | 718 | nparams = 3; |
895 | nrc->pname = "gett"; | 719 | pname = "gett"; |
896 | ret = PQexecParams (plugin->dbh, | 720 | ret = PQexecParams (plugin->dbh, |
897 | "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", | 721 | "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", |
898 | 2, | 722 | 2, |
899 | NULL, | 723 | NULL, |
900 | nrc->paramValues, | 724 | paramValues, |
901 | nrc->paramLengths, | 725 | paramLengths, |
902 | paramFormats, 1); | 726 | paramFormats, 1); |
903 | } | 727 | } |
904 | } | 728 | } |
@@ -906,32 +730,32 @@ postgres_plugin_get (void *cls, | |||
906 | { | 730 | { |
907 | if (vhash != NULL) | 731 | if (vhash != NULL) |
908 | { | 732 | { |
909 | nrc->paramValues[1] = (const char *) &nrc->vhash; | 733 | paramValues[1] = (const char *) vhash; |
910 | nrc->paramLengths[1] = sizeof (nrc->vhash); | 734 | paramLengths[1] = sizeof (GNUNET_HashCode); |
911 | nrc->paramValues[2] = (const char *) &nrc->blimit_off; | 735 | paramValues[2] = (const char *) &blimit_off; |
912 | nrc->paramLengths[2] = sizeof (nrc->blimit_off); | 736 | paramLengths[2] = sizeof (blimit_off); |
913 | nrc->nparams = 3; | 737 | nparams = 3; |
914 | nrc->pname = "getv"; | 738 | pname = "getv"; |
915 | ret = PQexecParams (plugin->dbh, | 739 | ret = PQexecParams (plugin->dbh, |
916 | "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", | 740 | "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", |
917 | 2, | 741 | 2, |
918 | NULL, | 742 | NULL, |
919 | nrc->paramValues, | 743 | paramValues, |
920 | nrc->paramLengths, | 744 | paramLengths, |
921 | paramFormats, 1); | 745 | paramFormats, 1); |
922 | } | 746 | } |
923 | else | 747 | else |
924 | { | 748 | { |
925 | nrc->paramValues[1] = (const char *) &nrc->blimit_off; | 749 | paramValues[1] = (const char *) &blimit_off; |
926 | nrc->paramLengths[1] = sizeof (nrc->blimit_off); | 750 | paramLengths[1] = sizeof (blimit_off); |
927 | nrc->nparams = 2; | 751 | nparams = 2; |
928 | nrc->pname = "get"; | 752 | pname = "get"; |
929 | ret = PQexecParams (plugin->dbh, | 753 | ret = PQexecParams (plugin->dbh, |
930 | "SELECT count(*) FROM gn090 WHERE hash=$1", | 754 | "SELECT count(*) FROM gn090 WHERE hash=$1", |
931 | 1, | 755 | 1, |
932 | NULL, | 756 | NULL, |
933 | nrc->paramValues, | 757 | paramValues, |
934 | nrc->paramLengths, | 758 | paramLengths, |
935 | paramFormats, 1); | 759 | paramFormats, 1); |
936 | } | 760 | } |
937 | } | 761 | } |
@@ -939,13 +763,12 @@ postgres_plugin_get (void *cls, | |||
939 | ret, | 763 | ret, |
940 | PGRES_TUPLES_OK, | 764 | PGRES_TUPLES_OK, |
941 | "PQexecParams", | 765 | "PQexecParams", |
942 | nrc->pname, | 766 | pname, |
943 | __LINE__)) | 767 | __LINE__)) |
944 | { | 768 | { |
945 | iter (iter_cls, | 769 | proc (proc_cls, |
946 | NULL, NULL, 0, NULL, 0, 0, 0, | 770 | NULL, 0, NULL, 0, 0, 0, |
947 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 771 | GNUNET_TIME_UNIT_ZERO_ABS, 0); |
948 | GNUNET_free (nrc); | ||
949 | return; | 772 | return; |
950 | } | 773 | } |
951 | if ((PQntuples (ret) != 1) || | 774 | if ((PQntuples (ret) != 1) || |
@@ -954,26 +777,30 @@ postgres_plugin_get (void *cls, | |||
954 | { | 777 | { |
955 | GNUNET_break (0); | 778 | GNUNET_break (0); |
956 | PQclear (ret); | 779 | PQclear (ret); |
957 | iter (iter_cls, | 780 | proc (proc_cls, |
958 | NULL, NULL, 0, NULL, 0, 0, 0, | 781 | NULL, 0, NULL, 0, 0, 0, |
959 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 782 | GNUNET_TIME_UNIT_ZERO_ABS, 0); |
960 | GNUNET_free (nrc); | ||
961 | return; | 783 | return; |
962 | } | 784 | } |
963 | nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0)); | 785 | total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0)); |
964 | PQclear (ret); | 786 | PQclear (ret); |
965 | if (nrc->total == 0) | 787 | if (total == 0) |
966 | { | 788 | { |
967 | iter (iter_cls, | 789 | proc (proc_cls, |
968 | NULL, NULL, 0, NULL, 0, 0, 0, | 790 | NULL, 0, NULL, 0, 0, 0, |
969 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 791 | GNUNET_TIME_UNIT_ZERO_ABS, 0); |
970 | GNUNET_free (nrc); | ||
971 | return; | 792 | return; |
972 | } | 793 | } |
973 | nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, | 794 | blimit_off = GNUNET_htonll (offset % total); |
974 | nrc->total); | 795 | ret = PQexecPrepared (plugin->dbh, |
975 | postgres_plugin_next_request (nrc, | 796 | pname, |
976 | GNUNET_NO); | 797 | nparams, |
798 | paramValues, | ||
799 | paramLengths, | ||
800 | paramFormats, 1); | ||
801 | process_result (plugin, | ||
802 | proc, proc_cls, | ||
803 | ret); | ||
977 | } | 804 | } |
978 | 805 | ||
979 | 806 | ||
@@ -989,28 +816,33 @@ postgres_plugin_get (void *cls, | |||
989 | * @param iter_cls closure for iter | 816 | * @param iter_cls closure for iter |
990 | */ | 817 | */ |
991 | static void | 818 | static void |
992 | postgres_plugin_iter_zero_anonymity (void *cls, | 819 | postgres_plugin_get_zero_anonymity (void *cls, |
993 | enum GNUNET_BLOCK_Type type, | 820 | uint64_t offset, |
994 | PluginIterator iter, | 821 | enum GNUNET_BLOCK_Type type, |
995 | void *iter_cls) | 822 | PluginDatumProcessor proc, void *proc_cls) |
996 | { | 823 | { |
997 | struct Plugin *plugin = cls; | 824 | struct Plugin *plugin = cls; |
998 | struct NextRequestClosure *nrc; | 825 | uint32_t btype; |
826 | uint64_t boff; | ||
827 | const int paramFormats[] = { 1, 1 }; | ||
828 | int paramLengths[] = { sizeof (btype), sizeof (boff) }; | ||
829 | const char *paramValues[] = { (const char*) &btype, (const char*) &boff }; | ||
830 | PGresult *ret; | ||
999 | 831 | ||
1000 | nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); | 832 | btype = htonl ((uint32_t) type); |
1001 | nrc->total = UINT32_MAX; | 833 | boff = GNUNET_htonll (offset); |
1002 | nrc->btype = htonl ((uint32_t) type); | 834 | ret = PQexecPrepared (plugin->dbh, |
1003 | nrc->plugin = plugin; | 835 | "select_non_anonymous", |
1004 | nrc->iter = iter; | 836 | 2, |
1005 | nrc->iter_cls = iter_cls; | 837 | paramValues, |
1006 | nrc->pname = "select_non_anonymous"; | 838 | paramLengths, |
1007 | nrc->nparams = 1; | 839 | paramFormats, 1); |
1008 | nrc->paramLengths[0] = sizeof (nrc->bcount); | 840 | process_result (plugin, |
1009 | nrc->paramValues[0] = (const char*) &nrc->bcount; | 841 | proc, proc_cls, |
1010 | postgres_plugin_next_request (nrc, | 842 | ret); |
1011 | GNUNET_NO); | ||
1012 | } | 843 | } |
1013 | 844 | ||
845 | |||
1014 | /** | 846 | /** |
1015 | * Context for 'repl_iter' function. | 847 | * Context for 'repl_iter' function. |
1016 | */ | 848 | */ |
@@ -1025,12 +857,12 @@ struct ReplCtx | |||
1025 | /** | 857 | /** |
1026 | * Function to call for the result (or the NULL). | 858 | * Function to call for the result (or the NULL). |
1027 | */ | 859 | */ |
1028 | PluginIterator iter; | 860 | PluginDatumProcessor proc; |
1029 | 861 | ||
1030 | /** | 862 | /** |
1031 | * Closure for iter. | 863 | * Closure for proc. |
1032 | */ | 864 | */ |
1033 | void *iter_cls; | 865 | void *proc_cls; |
1034 | }; | 866 | }; |
1035 | 867 | ||
1036 | 868 | ||
@@ -1056,8 +888,7 @@ struct ReplCtx | |||
1056 | * GNUNET_NO to delete the item and continue (if supported) | 888 | * GNUNET_NO to delete the item and continue (if supported) |
1057 | */ | 889 | */ |
1058 | static int | 890 | static int |
1059 | repl_iter (void *cls, | 891 | repl_proc (void *cls, |
1060 | void *next_cls, | ||
1061 | const GNUNET_HashCode *key, | 892 | const GNUNET_HashCode *key, |
1062 | uint32_t size, | 893 | uint32_t size, |
1063 | const void *data, | 894 | const void *data, |
@@ -1073,8 +904,8 @@ repl_iter (void *cls, | |||
1073 | PGresult *qret; | 904 | PGresult *qret; |
1074 | uint32_t boid; | 905 | uint32_t boid; |
1075 | 906 | ||
1076 | ret = rc->iter (rc->iter_cls, | 907 | ret = rc->proc (rc->proc_cls, |
1077 | next_cls, key, | 908 | key, |
1078 | size, data, | 909 | size, data, |
1079 | type, priority, anonymity, expiration, | 910 | type, priority, anonymity, expiration, |
1080 | uid); | 911 | uid); |
@@ -1107,32 +938,30 @@ repl_iter (void *cls, | |||
1107 | * Get a random item for replication. Returns a single, not expired, random item | 938 | * Get a random item for replication. Returns a single, not expired, random item |
1108 | * from those with the highest replication counters. The item's | 939 | * from those with the highest replication counters. The item's |
1109 | * replication counter is decremented by one IF it was positive before. | 940 | * replication counter is decremented by one IF it was positive before. |
1110 | * Call 'iter' with all values ZERO or NULL if the datastore is empty. | 941 | * Call 'proc' with all values ZERO or NULL if the datastore is empty. |
1111 | * | 942 | * |
1112 | * @param cls closure | 943 | * @param cls closure |
1113 | * @param iter function to call the value (once only). | 944 | * @param proc function to call the value (once only). |
1114 | * @param iter_cls closure for iter | 945 | * @param proc_cls closure for iter |
1115 | */ | 946 | */ |
1116 | static void | 947 | static void |
1117 | postgres_plugin_replication_get (void *cls, | 948 | postgres_plugin_get_replication (void *cls, |
1118 | PluginIterator iter, void *iter_cls) | 949 | PluginDatumProcessor proc, void *proc_cls) |
1119 | { | 950 | { |
1120 | struct Plugin *plugin = cls; | 951 | struct Plugin *plugin = cls; |
1121 | struct NextRequestClosure *nrc; | ||
1122 | struct ReplCtx rc; | 952 | struct ReplCtx rc; |
953 | PGresult *ret; | ||
1123 | 954 | ||
1124 | rc.plugin = plugin; | 955 | rc.plugin = plugin; |
1125 | rc.iter = iter; | 956 | rc.proc = proc; |
1126 | rc.iter_cls = iter_cls; | 957 | rc.proc_cls = proc_cls; |
1127 | nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); | 958 | ret = PQexecPrepared (plugin->dbh, |
1128 | nrc->one_shot = GNUNET_YES; | 959 | "select_replication_order", |
1129 | nrc->total = 1; | 960 | 0, |
1130 | nrc->plugin = plugin; | 961 | NULL, NULL, NULL, 1); |
1131 | nrc->iter = &repl_iter; | 962 | process_result (plugin, |
1132 | nrc->iter_cls = &rc; | 963 | &repl_proc, &rc, |
1133 | nrc->pname = "select_replication_order"; | 964 | ret); |
1134 | nrc->nparams = 0; | ||
1135 | postgres_next_request_cont (nrc, NULL); | ||
1136 | } | 965 | } |
1137 | 966 | ||
1138 | 967 | ||
@@ -1141,29 +970,31 @@ postgres_plugin_replication_get (void *cls, | |||
1141 | * Call 'iter' with all values ZERO or NULL if the datastore is empty. | 970 | * Call 'iter' with all values ZERO or NULL if the datastore is empty. |
1142 | * | 971 | * |
1143 | * @param cls closure | 972 | * @param cls closure |
1144 | * @param iter function to call the value (once only). | 973 | * @param proc function to call the value (once only). |
1145 | * @param iter_cls closure for iter | 974 | * @param proc_cls closure for iter |
1146 | */ | 975 | */ |
1147 | static void | 976 | static void |
1148 | postgres_plugin_expiration_get (void *cls, | 977 | postgres_plugin_get_expiration (void *cls, |
1149 | PluginIterator iter, void *iter_cls) | 978 | PluginDatumProcessor proc, void *proc_cls) |
1150 | { | 979 | { |
1151 | struct Plugin *plugin = cls; | 980 | struct Plugin *plugin = cls; |
1152 | struct NextRequestClosure *nrc; | ||
1153 | uint64_t btime; | 981 | uint64_t btime; |
982 | const int paramFormats[] = { 1 }; | ||
983 | int paramLengths[] = { sizeof (btime) }; | ||
984 | const char *paramValues[] = { (const char*) &btime }; | ||
985 | PGresult *ret; | ||
1154 | 986 | ||
1155 | btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value); | 987 | btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value); |
1156 | nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); | 988 | ret = PQexecPrepared (plugin->dbh, |
1157 | nrc->one_shot = GNUNET_YES; | 989 | "select_expiration_order", |
1158 | nrc->total = 1; | 990 | 1, |
1159 | nrc->plugin = plugin; | 991 | paramValues, |
1160 | nrc->iter = iter; | 992 | paramLengths, |
1161 | nrc->iter_cls = iter_cls; | 993 | paramFormats, |
1162 | nrc->pname = "select_expiration_order"; | 994 | 1); |
1163 | nrc->nparams = 1; | 995 | process_result (plugin, |
1164 | nrc->paramValues[0] = (const char *) &btime; | 996 | proc, proc_cls, |
1165 | nrc->paramLengths[0] = sizeof (btime); | 997 | ret); |
1166 | postgres_next_request_cont (nrc, NULL); | ||
1167 | } | 998 | } |
1168 | 999 | ||
1169 | 1000 | ||
@@ -1260,14 +1091,13 @@ libgnunet_plugin_datastore_postgres_init (void *cls) | |||
1260 | } | 1091 | } |
1261 | api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions)); | 1092 | api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions)); |
1262 | api->cls = plugin; | 1093 | api->cls = plugin; |
1263 | api->get_size = &postgres_plugin_get_size; | 1094 | api->estimate_size = &postgres_plugin_estimate_size; |
1264 | api->put = &postgres_plugin_put; | 1095 | api->put = &postgres_plugin_put; |
1265 | api->next_request = &postgres_plugin_next_request; | ||
1266 | api->get = &postgres_plugin_get; | ||
1267 | api->replication_get = &postgres_plugin_replication_get; | ||
1268 | api->expiration_get = &postgres_plugin_expiration_get; | ||
1269 | api->update = &postgres_plugin_update; | 1096 | api->update = &postgres_plugin_update; |
1270 | api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity; | 1097 | api->get_key = &postgres_plugin_get_key; |
1098 | api->get_replication = &postgres_plugin_get_replication; | ||
1099 | api->get_expiration = &postgres_plugin_get_expiration; | ||
1100 | api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity; | ||
1271 | api->drop = &postgres_plugin_drop; | 1101 | api->drop = &postgres_plugin_drop; |
1272 | GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, | 1102 | GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, |
1273 | "datastore-postgres", | 1103 | "datastore-postgres", |
@@ -1287,13 +1117,6 @@ libgnunet_plugin_datastore_postgres_done (void *cls) | |||
1287 | struct GNUNET_DATASTORE_PluginFunctions *api = cls; | 1117 | struct GNUNET_DATASTORE_PluginFunctions *api = cls; |
1288 | struct Plugin *plugin = api->cls; | 1118 | struct Plugin *plugin = api->cls; |
1289 | 1119 | ||
1290 | if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK) | ||
1291 | { | ||
1292 | GNUNET_SCHEDULER_cancel (plugin->next_task); | ||
1293 | plugin->next_task = GNUNET_SCHEDULER_NO_TASK; | ||
1294 | GNUNET_free (plugin->next_task_nc); | ||
1295 | plugin->next_task_nc = NULL; | ||
1296 | } | ||
1297 | PQfinish (plugin->dbh); | 1120 | PQfinish (plugin->dbh); |
1298 | GNUNET_free (plugin); | 1121 | GNUNET_free (plugin); |
1299 | GNUNET_free (api); | 1122 | GNUNET_free (api); |