diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-06-01 21:48:19 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-06-01 21:48:19 +0200 |
commit | 1defd30dfeb1867c2756b3fe6a437f695951d0c9 (patch) | |
tree | b48c0fe6bb32469cfcb4284bfac3142e22417ae8 /src/psycstore | |
parent | bbbe0b2404d131cc0d9eda26725b65b47a7e073a (diff) | |
download | gnunet-1defd30dfeb1867c2756b3fe6a437f695951d0c9.tar.gz gnunet-1defd30dfeb1867c2756b3fe6a437f695951d0c9.zip |
adding more good helpers to libgnunetpq
Diffstat (limited to 'src/psycstore')
-rw-r--r-- | src/psycstore/plugin_psycstore_postgres.c | 219 |
1 files changed, 104 insertions, 115 deletions
diff --git a/src/psycstore/plugin_psycstore_postgres.c b/src/psycstore/plugin_psycstore_postgres.c index 273ab4e80..f410e2737 100644 --- a/src/psycstore/plugin_psycstore_postgres.c +++ b/src/psycstore/plugin_psycstore_postgres.c | |||
@@ -84,117 +84,96 @@ struct Plugin | |||
84 | * as needed as well). | 84 | * as needed as well). |
85 | * | 85 | * |
86 | * @param plugin the plugin context (state for this module) | 86 | * @param plugin the plugin context (state for this module) |
87 | * @return GNUNET_OK on success | 87 | * @return #GNUNET_OK on success |
88 | */ | 88 | */ |
89 | static int | 89 | static int |
90 | database_setup (struct Plugin *plugin) | 90 | database_setup (struct Plugin *plugin) |
91 | { | 91 | { |
92 | struct GNUNET_PQ_ExecuteStatement es[] = { | ||
93 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS channels (\n" | ||
94 | " id SERIAL,\n" | ||
95 | " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n" | ||
96 | " max_state_message_id BIGINT,\n" | ||
97 | " state_hash_message_id BIGINT,\n" | ||
98 | " PRIMARY KEY(id)\n" | ||
99 | ")" | ||
100 | "WITH OIDS"), | ||
101 | GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n" | ||
102 | " ON channels (pub_key)"), | ||
103 | GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n" | ||
104 | " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" | ||
105 | "RETURNS NULL ON NULL INPUT"), | ||
106 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS slaves (\n" | ||
107 | " id SERIAL,\n" | ||
108 | " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n" | ||
109 | " PRIMARY KEY(id)\n" | ||
110 | ")" | ||
111 | "WITH OIDS"), | ||
112 | GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n" | ||
113 | " ON slaves (pub_key)"), | ||
114 | GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n" | ||
115 | " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" | ||
116 | "RETURNS NULL ON NULL INPUT"), | ||
117 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS membership (\n" | ||
118 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
119 | " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n" | ||
120 | " did_join INT NOT NULL,\n" | ||
121 | " announced_at BIGINT NOT NULL,\n" | ||
122 | " effective_since BIGINT NOT NULL,\n" | ||
123 | " group_generation BIGINT NOT NULL\n" | ||
124 | ")" | ||
125 | "WITH OIDS"), | ||
126 | GNUNET_PQ_make_execute ("CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id " | ||
127 | "ON membership (channel_id, slave_id)"), | ||
128 | /** @todo messages table: add method_name column */ | ||
129 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS messages (\n" | ||
130 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
131 | " hop_counter INT NOT NULL,\n" | ||
132 | " signature BYTEA CHECK (LENGTH(signature)=64),\n" | ||
133 | " purpose BYTEA CHECK (LENGTH(purpose)=8),\n" | ||
134 | " fragment_id BIGINT NOT NULL,\n" | ||
135 | " fragment_offset BIGINT NOT NULL,\n" | ||
136 | " message_id BIGINT NOT NULL,\n" | ||
137 | " group_generation BIGINT NOT NULL,\n" | ||
138 | " multicast_flags INT NOT NULL,\n" | ||
139 | " psycstore_flags INT NOT NULL,\n" | ||
140 | " data BYTEA,\n" | ||
141 | " PRIMARY KEY (channel_id, fragment_id),\n" | ||
142 | " UNIQUE (channel_id, message_id, fragment_offset)\n" | ||
143 | ")" | ||
144 | "WITH OIDS"), | ||
145 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state (\n" | ||
146 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
147 | " name TEXT NOT NULL,\n" | ||
148 | " value_current BYTEA,\n" | ||
149 | " value_signed BYTEA,\n" | ||
150 | " PRIMARY KEY (channel_id, name)\n" | ||
151 | ")" | ||
152 | "WITH OIDS"), | ||
153 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state_sync (\n" | ||
154 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
155 | " name TEXT NOT NULL,\n" | ||
156 | " value BYTEA,\n" | ||
157 | " PRIMARY KEY (channel_id, name)\n" | ||
158 | ")" | ||
159 | "WITH OIDS"), | ||
160 | GNUNET_PQ_EXECUTE_STATEMENT_END | ||
161 | }; | ||
162 | |||
92 | /* Open database and precompile statements */ | 163 | /* Open database and precompile statements */ |
93 | plugin->dbh = GNUNET_POSTGRES_connect (plugin->cfg, | 164 | plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->cfg, |
94 | "psycstore-postgres"); | 165 | "psycstore-postgres"); |
95 | if (NULL == plugin->dbh) | 166 | if (NULL == plugin->dbh) |
96 | return GNUNET_SYSERR; | 167 | return GNUNET_SYSERR; |
97 | 168 | if (GNUNET_OK != | |
98 | /* Create tables */ | 169 | GNUNET_PQ_exec_statements (plugin->dbh, |
99 | if ((GNUNET_OK != | 170 | es)) |
100 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
101 | "CREATE TABLE IF NOT EXISTS channels (\n" | ||
102 | " id SERIAL,\n" | ||
103 | " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n" | ||
104 | " max_state_message_id BIGINT,\n" | ||
105 | " state_hash_message_id BIGINT,\n" | ||
106 | " PRIMARY KEY(id)\n" | ||
107 | ")" "WITH OIDS")) || | ||
108 | |||
109 | (GNUNET_OK != | ||
110 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
111 | "CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n" | ||
112 | " ON channels (pub_key)")) || | ||
113 | |||
114 | (GNUNET_OK != | ||
115 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
116 | "CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n" | ||
117 | " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" | ||
118 | "RETURNS NULL ON NULL INPUT")) || | ||
119 | |||
120 | (GNUNET_OK != | ||
121 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
122 | "CREATE TABLE IF NOT EXISTS slaves (\n" | ||
123 | " id SERIAL,\n" | ||
124 | " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n" | ||
125 | " PRIMARY KEY(id)\n" | ||
126 | ")" "WITH OIDS")) || | ||
127 | |||
128 | (GNUNET_OK != | ||
129 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
130 | "CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n" | ||
131 | " ON slaves (pub_key)")) || | ||
132 | |||
133 | (GNUNET_OK != | ||
134 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
135 | "CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n" | ||
136 | " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" | ||
137 | "RETURNS NULL ON NULL INPUT")) || | ||
138 | |||
139 | (GNUNET_OK != | ||
140 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
141 | "CREATE TABLE IF NOT EXISTS membership (\n" | ||
142 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
143 | " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n" | ||
144 | " did_join INT NOT NULL,\n" | ||
145 | " announced_at BIGINT NOT NULL,\n" | ||
146 | " effective_since BIGINT NOT NULL,\n" | ||
147 | " group_generation BIGINT NOT NULL\n" | ||
148 | ")" "WITH OIDS")) || | ||
149 | |||
150 | (GNUNET_OK != | ||
151 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
152 | "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id " | ||
153 | "ON membership (channel_id, slave_id)")) || | ||
154 | |||
155 | /** @todo messages table: add method_name column */ | ||
156 | (GNUNET_OK != | ||
157 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
158 | "CREATE TABLE IF NOT EXISTS messages (\n" | ||
159 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
160 | " hop_counter INT NOT NULL,\n" | ||
161 | " signature BYTEA CHECK (LENGTH(signature)=64),\n" | ||
162 | " purpose BYTEA CHECK (LENGTH(purpose)=8),\n" | ||
163 | " fragment_id BIGINT NOT NULL,\n" | ||
164 | " fragment_offset BIGINT NOT NULL,\n" | ||
165 | " message_id BIGINT NOT NULL,\n" | ||
166 | " group_generation BIGINT NOT NULL,\n" | ||
167 | " multicast_flags INT NOT NULL,\n" | ||
168 | " psycstore_flags INT NOT NULL,\n" | ||
169 | " data BYTEA,\n" | ||
170 | " PRIMARY KEY (channel_id, fragment_id),\n" | ||
171 | " UNIQUE (channel_id, message_id, fragment_offset)\n" | ||
172 | ")" "WITH OIDS")) || | ||
173 | |||
174 | (GNUNET_OK != | ||
175 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
176 | "CREATE TABLE IF NOT EXISTS state (\n" | ||
177 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
178 | " name TEXT NOT NULL,\n" | ||
179 | " value_current BYTEA,\n" | ||
180 | " value_signed BYTEA,\n" | ||
181 | " PRIMARY KEY (channel_id, name)\n" | ||
182 | ")" "WITH OIDS")) || | ||
183 | (GNUNET_OK != | ||
184 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
185 | "CREATE TABLE IF NOT EXISTS state_sync (\n" | ||
186 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
187 | " name TEXT NOT NULL,\n" | ||
188 | " value BYTEA,\n" | ||
189 | " PRIMARY KEY (channel_id, name)\n" | ||
190 | ")" "WITH OIDS"))) | ||
191 | { | 171 | { |
192 | PQfinish (plugin->dbh); | 172 | PQfinish (plugin->dbh); |
193 | plugin->dbh = NULL; | 173 | plugin->dbh = NULL; |
194 | return GNUNET_SYSERR; | 174 | return GNUNET_SYSERR; |
195 | } | 175 | } |
196 | 176 | ||
197 | |||
198 | /* Prepare statements */ | 177 | /* Prepare statements */ |
199 | if ((GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 178 | if ((GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, |
200 | "transaction_begin", | 179 | "transaction_begin", |
@@ -842,7 +821,6 @@ fragment_row (struct Plugin *plugin, | |||
842 | void *purpose = NULL; | 821 | void *purpose = NULL; |
843 | size_t signature_size; | 822 | size_t signature_size; |
844 | size_t purpose_size; | 823 | size_t purpose_size; |
845 | |||
846 | uint64_t fragment_id; | 824 | uint64_t fragment_id; |
847 | uint64_t fragment_offset; | 825 | uint64_t fragment_offset; |
848 | uint64_t message_id; | 826 | uint64_t message_id; |
@@ -852,9 +830,7 @@ fragment_row (struct Plugin *plugin, | |||
852 | size_t buf_size; | 830 | size_t buf_size; |
853 | int ret = GNUNET_SYSERR; | 831 | int ret = GNUNET_SYSERR; |
854 | struct GNUNET_MULTICAST_MessageHeader *mp; | 832 | struct GNUNET_MULTICAST_MessageHeader *mp; |
855 | |||
856 | uint32_t msg_flags; | 833 | uint32_t msg_flags; |
857 | |||
858 | struct GNUNET_PQ_ResultSpec results[] = { | 834 | struct GNUNET_PQ_ResultSpec results[] = { |
859 | GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter), | 835 | GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter), |
860 | GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size), | 836 | GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size), |
@@ -964,8 +940,6 @@ fragment_get (void *cls, | |||
964 | void *cb_cls) | 940 | void *cb_cls) |
965 | { | 941 | { |
966 | struct Plugin *plugin = cls; | 942 | struct Plugin *plugin = cls; |
967 | *returned_fragments = 0; | ||
968 | |||
969 | struct GNUNET_PQ_QueryParam params_select[] = { | 943 | struct GNUNET_PQ_QueryParam params_select[] = { |
970 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 944 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
971 | GNUNET_PQ_query_param_uint64 (&first_fragment_id), | 945 | GNUNET_PQ_query_param_uint64 (&first_fragment_id), |
@@ -973,7 +947,12 @@ fragment_get (void *cls, | |||
973 | GNUNET_PQ_query_param_end | 947 | GNUNET_PQ_query_param_end |
974 | }; | 948 | }; |
975 | 949 | ||
976 | return fragment_select (plugin, "select_fragments", params_select, returned_fragments, cb, cb_cls); | 950 | *returned_fragments = 0; |
951 | return fragment_select (plugin, | ||
952 | "select_fragments", | ||
953 | params_select, | ||
954 | returned_fragments, | ||
955 | cb, cb_cls); | ||
977 | } | 956 | } |
978 | 957 | ||
979 | 958 | ||
@@ -1002,7 +981,11 @@ fragment_get_latest (void *cls, | |||
1002 | GNUNET_PQ_query_param_end | 981 | GNUNET_PQ_query_param_end |
1003 | }; | 982 | }; |
1004 | 983 | ||
1005 | return fragment_select (plugin, "select_latest_fragments", params_select, returned_fragments, cb, cb_cls); | 984 | return fragment_select (plugin, |
985 | "select_latest_fragments", | ||
986 | params_select, | ||
987 | returned_fragments, | ||
988 | cb, cb_cls); | ||
1006 | } | 989 | } |
1007 | 990 | ||
1008 | 991 | ||
@@ -1024,11 +1007,6 @@ message_get (void *cls, | |||
1024 | void *cb_cls) | 1007 | void *cb_cls) |
1025 | { | 1008 | { |
1026 | struct Plugin *plugin = cls; | 1009 | struct Plugin *plugin = cls; |
1027 | *returned_fragments = 0; | ||
1028 | |||
1029 | if (0 == fragment_limit) | ||
1030 | fragment_limit = INT64_MAX; | ||
1031 | |||
1032 | struct GNUNET_PQ_QueryParam params_select[] = { | 1010 | struct GNUNET_PQ_QueryParam params_select[] = { |
1033 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 1011 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
1034 | GNUNET_PQ_query_param_uint64 (&first_message_id), | 1012 | GNUNET_PQ_query_param_uint64 (&first_message_id), |
@@ -1037,7 +1015,14 @@ message_get (void *cls, | |||
1037 | GNUNET_PQ_query_param_end | 1015 | GNUNET_PQ_query_param_end |
1038 | }; | 1016 | }; |
1039 | 1017 | ||
1040 | return fragment_select (plugin, "select_messages", params_select, returned_fragments, cb, cb_cls); | 1018 | if (0 == fragment_limit) |
1019 | fragment_limit = INT64_MAX; | ||
1020 | *returned_fragments = 0; | ||
1021 | return fragment_select (plugin, | ||
1022 | "select_messages", | ||
1023 | params_select, | ||
1024 | returned_fragments, | ||
1025 | cb, cb_cls); | ||
1041 | } | 1026 | } |
1042 | 1027 | ||
1043 | 1028 | ||
@@ -1057,8 +1042,6 @@ message_get_latest (void *cls, | |||
1057 | void *cb_cls) | 1042 | void *cb_cls) |
1058 | { | 1043 | { |
1059 | struct Plugin *plugin = cls; | 1044 | struct Plugin *plugin = cls; |
1060 | *returned_fragments = 0; | ||
1061 | |||
1062 | struct GNUNET_PQ_QueryParam params_select[] = { | 1045 | struct GNUNET_PQ_QueryParam params_select[] = { |
1063 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 1046 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
1064 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 1047 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
@@ -1066,7 +1049,12 @@ message_get_latest (void *cls, | |||
1066 | GNUNET_PQ_query_param_end | 1049 | GNUNET_PQ_query_param_end |
1067 | }; | 1050 | }; |
1068 | 1051 | ||
1069 | return fragment_select (plugin, "select_latest_messages", params_select, returned_fragments, cb, cb_cls); | 1052 | *returned_fragments = 0; |
1053 | return fragment_select (plugin, | ||
1054 | "select_latest_messages", | ||
1055 | params_select, | ||
1056 | returned_fragments, | ||
1057 | cb, cb_cls); | ||
1070 | } | 1058 | } |
1071 | 1059 | ||
1072 | 1060 | ||
@@ -1255,7 +1243,8 @@ state_assign (struct Plugin *plugin, const char *stmt, | |||
1255 | 1243 | ||
1256 | 1244 | ||
1257 | static int | 1245 | static int |
1258 | update_message_id (struct Plugin *plugin, const char *stmt, | 1246 | update_message_id (struct Plugin *plugin, |
1247 | const char *stmt, | ||
1259 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 1248 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
1260 | uint64_t message_id) | 1249 | uint64_t message_id) |
1261 | { | 1250 | { |