diff options
Diffstat (limited to 'src/psycstore/plugin_psycstore_postgres.c')
-rw-r--r-- | src/psycstore/plugin_psycstore_postgres.c | 1198 |
1 files changed, 489 insertions, 709 deletions
diff --git a/src/psycstore/plugin_psycstore_postgres.c b/src/psycstore/plugin_psycstore_postgres.c index 273ab4e80..b8010af0a 100644 --- a/src/psycstore/plugin_psycstore_postgres.c +++ b/src/psycstore/plugin_psycstore_postgres.c | |||
@@ -25,6 +25,7 @@ | |||
25 | * @author Gabor X Toth | 25 | * @author Gabor X Toth |
26 | * @author Christian Grothoff | 26 | * @author Christian Grothoff |
27 | * @author Christophe Genevey | 27 | * @author Christophe Genevey |
28 | * @author Jeffrey Burdges | ||
28 | */ | 29 | */ |
29 | 30 | ||
30 | #include "platform.h" | 31 | #include "platform.h" |
@@ -34,7 +35,6 @@ | |||
34 | #include "gnunet_crypto_lib.h" | 35 | #include "gnunet_crypto_lib.h" |
35 | #include "gnunet_psyc_util_lib.h" | 36 | #include "gnunet_psyc_util_lib.h" |
36 | #include "psycstore.h" | 37 | #include "psycstore.h" |
37 | #include "gnunet_postgres_lib.h" | ||
38 | #include "gnunet_pq_lib.h" | 38 | #include "gnunet_pq_lib.h" |
39 | 39 | ||
40 | /** | 40 | /** |
@@ -84,342 +84,276 @@ struct Plugin | |||
84 | * as needed as well). | 84 | * as needed as well). |
85 | * | 85 | * |
86 | * @param plugin the plugin context (state for this module) | 86 | * @param plugin the plugin context (state for this module) |
87 | * @return GNUNET_OK on success | 87 | * @return #GNUNET_OK on success |
88 | */ | 88 | */ |
89 | static int | 89 | static int |
90 | database_setup (struct Plugin *plugin) | 90 | database_setup (struct Plugin *plugin) |
91 | { | 91 | { |
92 | struct GNUNET_PQ_ExecuteStatement es[] = { | ||
93 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS channels (\n" | ||
94 | " id SERIAL,\n" | ||
95 | " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n" | ||
96 | " max_state_message_id BIGINT,\n" | ||
97 | " state_hash_message_id BIGINT,\n" | ||
98 | " PRIMARY KEY(id)\n" | ||
99 | ")" | ||
100 | "WITH OIDS"), | ||
101 | GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n" | ||
102 | " ON channels (pub_key)"), | ||
103 | GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n" | ||
104 | " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" | ||
105 | "RETURNS NULL ON NULL INPUT"), | ||
106 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS slaves (\n" | ||
107 | " id SERIAL,\n" | ||
108 | " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n" | ||
109 | " PRIMARY KEY(id)\n" | ||
110 | ")" | ||
111 | "WITH OIDS"), | ||
112 | GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n" | ||
113 | " ON slaves (pub_key)"), | ||
114 | GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n" | ||
115 | " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" | ||
116 | "RETURNS NULL ON NULL INPUT"), | ||
117 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS membership (\n" | ||
118 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
119 | " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n" | ||
120 | " did_join INT NOT NULL,\n" | ||
121 | " announced_at BIGINT NOT NULL,\n" | ||
122 | " effective_since BIGINT NOT NULL,\n" | ||
123 | " group_generation BIGINT NOT NULL\n" | ||
124 | ")" | ||
125 | "WITH OIDS"), | ||
126 | GNUNET_PQ_make_execute ("CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id " | ||
127 | "ON membership (channel_id, slave_id)"), | ||
128 | /** @todo messages table: add method_name column */ | ||
129 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS messages (\n" | ||
130 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
131 | " hop_counter INT NOT NULL,\n" | ||
132 | " signature BYTEA CHECK (LENGTH(signature)=64),\n" | ||
133 | " purpose BYTEA CHECK (LENGTH(purpose)=8),\n" | ||
134 | " fragment_id BIGINT NOT NULL,\n" | ||
135 | " fragment_offset BIGINT NOT NULL,\n" | ||
136 | " message_id BIGINT NOT NULL,\n" | ||
137 | " group_generation BIGINT NOT NULL,\n" | ||
138 | " multicast_flags INT NOT NULL,\n" | ||
139 | " psycstore_flags INT NOT NULL,\n" | ||
140 | " data BYTEA,\n" | ||
141 | " PRIMARY KEY (channel_id, fragment_id),\n" | ||
142 | " UNIQUE (channel_id, message_id, fragment_offset)\n" | ||
143 | ")" | ||
144 | "WITH OIDS"), | ||
145 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state (\n" | ||
146 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
147 | " name TEXT NOT NULL,\n" | ||
148 | " value_current BYTEA,\n" | ||
149 | " value_signed BYTEA,\n" | ||
150 | " PRIMARY KEY (channel_id, name)\n" | ||
151 | ")" | ||
152 | "WITH OIDS"), | ||
153 | GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state_sync (\n" | ||
154 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
155 | " name TEXT NOT NULL,\n" | ||
156 | " value BYTEA,\n" | ||
157 | " PRIMARY KEY (channel_id, name)\n" | ||
158 | ")" | ||
159 | "WITH OIDS"), | ||
160 | GNUNET_PQ_EXECUTE_STATEMENT_END | ||
161 | }; | ||
162 | |||
92 | /* Open database and precompile statements */ | 163 | /* Open database and precompile statements */ |
93 | plugin->dbh = GNUNET_POSTGRES_connect (plugin->cfg, | 164 | plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->cfg, |
94 | "psycstore-postgres"); | 165 | "psycstore-postgres"); |
95 | if (NULL == plugin->dbh) | 166 | if (NULL == plugin->dbh) |
96 | return GNUNET_SYSERR; | 167 | return GNUNET_SYSERR; |
97 | 168 | if (GNUNET_OK != | |
98 | /* Create tables */ | 169 | GNUNET_PQ_exec_statements (plugin->dbh, |
99 | if ((GNUNET_OK != | 170 | es)) |
100 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
101 | "CREATE TABLE IF NOT EXISTS channels (\n" | ||
102 | " id SERIAL,\n" | ||
103 | " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n" | ||
104 | " max_state_message_id BIGINT,\n" | ||
105 | " state_hash_message_id BIGINT,\n" | ||
106 | " PRIMARY KEY(id)\n" | ||
107 | ")" "WITH OIDS")) || | ||
108 | |||
109 | (GNUNET_OK != | ||
110 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
111 | "CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n" | ||
112 | " ON channels (pub_key)")) || | ||
113 | |||
114 | (GNUNET_OK != | ||
115 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
116 | "CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n" | ||
117 | " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" | ||
118 | "RETURNS NULL ON NULL INPUT")) || | ||
119 | |||
120 | (GNUNET_OK != | ||
121 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
122 | "CREATE TABLE IF NOT EXISTS slaves (\n" | ||
123 | " id SERIAL,\n" | ||
124 | " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n" | ||
125 | " PRIMARY KEY(id)\n" | ||
126 | ")" "WITH OIDS")) || | ||
127 | |||
128 | (GNUNET_OK != | ||
129 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
130 | "CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n" | ||
131 | " ON slaves (pub_key)")) || | ||
132 | |||
133 | (GNUNET_OK != | ||
134 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
135 | "CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n" | ||
136 | " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" | ||
137 | "RETURNS NULL ON NULL INPUT")) || | ||
138 | |||
139 | (GNUNET_OK != | ||
140 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
141 | "CREATE TABLE IF NOT EXISTS membership (\n" | ||
142 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
143 | " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n" | ||
144 | " did_join INT NOT NULL,\n" | ||
145 | " announced_at BIGINT NOT NULL,\n" | ||
146 | " effective_since BIGINT NOT NULL,\n" | ||
147 | " group_generation BIGINT NOT NULL\n" | ||
148 | ")" "WITH OIDS")) || | ||
149 | |||
150 | (GNUNET_OK != | ||
151 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
152 | "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id " | ||
153 | "ON membership (channel_id, slave_id)")) || | ||
154 | |||
155 | /** @todo messages table: add method_name column */ | ||
156 | (GNUNET_OK != | ||
157 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
158 | "CREATE TABLE IF NOT EXISTS messages (\n" | ||
159 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
160 | " hop_counter INT NOT NULL,\n" | ||
161 | " signature BYTEA CHECK (LENGTH(signature)=64),\n" | ||
162 | " purpose BYTEA CHECK (LENGTH(purpose)=8),\n" | ||
163 | " fragment_id BIGINT NOT NULL,\n" | ||
164 | " fragment_offset BIGINT NOT NULL,\n" | ||
165 | " message_id BIGINT NOT NULL,\n" | ||
166 | " group_generation BIGINT NOT NULL,\n" | ||
167 | " multicast_flags INT NOT NULL,\n" | ||
168 | " psycstore_flags INT NOT NULL,\n" | ||
169 | " data BYTEA,\n" | ||
170 | " PRIMARY KEY (channel_id, fragment_id),\n" | ||
171 | " UNIQUE (channel_id, message_id, fragment_offset)\n" | ||
172 | ")" "WITH OIDS")) || | ||
173 | |||
174 | (GNUNET_OK != | ||
175 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
176 | "CREATE TABLE IF NOT EXISTS state (\n" | ||
177 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
178 | " name TEXT NOT NULL,\n" | ||
179 | " value_current BYTEA,\n" | ||
180 | " value_signed BYTEA,\n" | ||
181 | " PRIMARY KEY (channel_id, name)\n" | ||
182 | ")" "WITH OIDS")) || | ||
183 | (GNUNET_OK != | ||
184 | GNUNET_POSTGRES_exec(plugin->dbh, | ||
185 | "CREATE TABLE IF NOT EXISTS state_sync (\n" | ||
186 | " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" | ||
187 | " name TEXT NOT NULL,\n" | ||
188 | " value BYTEA,\n" | ||
189 | " PRIMARY KEY (channel_id, name)\n" | ||
190 | ")" "WITH OIDS"))) | ||
191 | { | 171 | { |
192 | PQfinish (plugin->dbh); | 172 | PQfinish (plugin->dbh); |
193 | plugin->dbh = NULL; | 173 | plugin->dbh = NULL; |
194 | return GNUNET_SYSERR; | 174 | return GNUNET_SYSERR; |
195 | } | 175 | } |
196 | 176 | ||
197 | |||
198 | /* Prepare statements */ | 177 | /* Prepare statements */ |
199 | if ((GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 178 | { |
200 | "transaction_begin", | 179 | struct GNUNET_PQ_PreparedStatement ps[] = { |
201 | "BEGIN", 0)) || | 180 | GNUNET_PQ_make_prepare ("transaction_begin", |
202 | 181 | "BEGIN", 0), | |
203 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 182 | GNUNET_PQ_make_prepare ("transaction_commit", |
204 | "transaction_commit", | 183 | "COMMIT", 0), |
205 | "COMMIT", 0)) || | 184 | GNUNET_PQ_make_prepare ("transaction_rollback", |
206 | 185 | "ROLLBACK", 0), | |
207 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 186 | GNUNET_PQ_make_prepare ("insert_channel_key", |
208 | "transaction_rollback", | 187 | "INSERT INTO channels (pub_key) VALUES ($1)" |
209 | "ROLLBACK", 0)) || | 188 | " ON CONFLICT DO NOTHING", 1), |
210 | 189 | GNUNET_PQ_make_prepare ("insert_slave_key", | |
211 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 190 | "INSERT INTO slaves (pub_key) VALUES ($1)" |
212 | "insert_channel_key", | 191 | " ON CONFLICT DO NOTHING", 1), |
213 | "INSERT INTO channels (pub_key) VALUES ($1)" | 192 | GNUNET_PQ_make_prepare ("insert_membership", |
214 | " ON CONFLICT DO NOTHING", 1)) || | 193 | "INSERT INTO membership\n" |
215 | 194 | " (channel_id, slave_id, did_join, announced_at,\n" | |
216 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 195 | " effective_since, group_generation)\n" |
217 | "insert_slave_key", | 196 | "VALUES (get_chan_id($1),\n" |
218 | "INSERT INTO slaves (pub_key) VALUES ($1)" | 197 | " get_slave_id($2),\n" |
219 | " ON CONFLICT DO NOTHING", 1)) || | 198 | " $3, $4, $5, $6)", 6), |
220 | 199 | GNUNET_PQ_make_prepare ("select_membership", | |
221 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 200 | "SELECT did_join FROM membership\n" |
222 | "insert_membership", | 201 | "WHERE channel_id = get_chan_id($1)\n" |
223 | "INSERT INTO membership\n" | 202 | " AND slave_id = get_slave_id($2)\n" |
224 | " (channel_id, slave_id, did_join, announced_at,\n" | 203 | " AND effective_since <= $3 AND did_join = 1\n" |
225 | " effective_since, group_generation)\n" | 204 | "ORDER BY announced_at DESC LIMIT 1", 3), |
226 | "VALUES (get_chan_id($1),\n" | 205 | GNUNET_PQ_make_prepare ("insert_fragment", |
227 | " get_slave_id($2),\n" | 206 | "INSERT INTO messages\n" |
228 | " $3, $4, $5, $6)", 6)) || | 207 | " (channel_id, hop_counter, signature, purpose,\n" |
229 | 208 | " fragment_id, fragment_offset, message_id,\n" | |
230 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 209 | " group_generation, multicast_flags, psycstore_flags, data)\n" |
231 | "select_membership", | 210 | "VALUES (get_chan_id($1),\n" |
232 | "SELECT did_join FROM membership\n" | 211 | " $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)" |
233 | "WHERE channel_id = get_chan_id($1)\n" | 212 | "ON CONFLICT DO NOTHING", 11), |
234 | " AND slave_id = get_slave_id($2)\n" | 213 | GNUNET_PQ_make_prepare ("update_message_flags", |
235 | " AND effective_since <= $3 AND did_join = 1\n" | 214 | "UPDATE messages\n" |
236 | "ORDER BY announced_at DESC LIMIT 1", 3)) || | 215 | "SET psycstore_flags = psycstore_flags | $1\n" |
237 | 216 | "WHERE channel_id = get_chan_id($2) \n" | |
238 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 217 | " AND message_id = $3 AND fragment_offset = 0", 3), |
239 | "insert_fragment", | 218 | GNUNET_PQ_make_prepare ("select_fragments", |
240 | "INSERT INTO messages\n" | 219 | "SELECT hop_counter, signature, purpose, fragment_id,\n" |
241 | " (channel_id, hop_counter, signature, purpose,\n" | 220 | " fragment_offset, message_id, group_generation,\n" |
242 | " fragment_id, fragment_offset, message_id,\n" | 221 | " multicast_flags, psycstore_flags, data\n" |
243 | " group_generation, multicast_flags, psycstore_flags, data)\n" | 222 | "FROM messages\n" |
244 | "VALUES (get_chan_id($1),\n" | 223 | "WHERE channel_id = get_chan_id($1) \n" |
245 | " $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)" | 224 | " AND $2 <= fragment_id AND fragment_id <= $3", 3), |
246 | "ON CONFLICT DO NOTHING", 11)) || | ||
247 | |||
248 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
249 | "update_message_flags", | ||
250 | "UPDATE messages\n" | ||
251 | "SET psycstore_flags = psycstore_flags | $1\n" | ||
252 | "WHERE channel_id = get_chan_id($2) \n" | ||
253 | " AND message_id = $3 AND fragment_offset = 0", 3)) || | ||
254 | |||
255 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
256 | "select_fragments", | ||
257 | "SELECT hop_counter, signature, purpose, fragment_id,\n" | ||
258 | " fragment_offset, message_id, group_generation,\n" | ||
259 | " multicast_flags, psycstore_flags, data\n" | ||
260 | "FROM messages\n" | ||
261 | "WHERE channel_id = get_chan_id($1) \n" | ||
262 | " AND $2 <= fragment_id AND fragment_id <= $3", 3)) || | ||
263 | |||
264 | /** @todo select_messages: add method_prefix filter */ | 225 | /** @todo select_messages: add method_prefix filter */ |
265 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 226 | GNUNET_PQ_make_prepare ("select_messages", |
266 | "select_messages", | 227 | "SELECT hop_counter, signature, purpose, fragment_id,\n" |
267 | "SELECT hop_counter, signature, purpose, fragment_id,\n" | 228 | " fragment_offset, message_id, group_generation,\n" |
268 | " fragment_offset, message_id, group_generation,\n" | 229 | " multicast_flags, psycstore_flags, data\n" |
269 | " multicast_flags, psycstore_flags, data\n" | 230 | "FROM messages\n" |
270 | "FROM messages\n" | 231 | "WHERE channel_id = get_chan_id($1) \n" |
271 | "WHERE channel_id = get_chan_id($1) \n" | 232 | " AND $2 <= message_id AND message_id <= $3\n" |
272 | " AND $2 <= message_id AND message_id <= $3\n" | 233 | "LIMIT $4;", 4), |
273 | "LIMIT $4;", 4)) || | ||
274 | |||
275 | /** @todo select_latest_messages: add method_prefix filter */ | 234 | /** @todo select_latest_messages: add method_prefix filter */ |
276 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 235 | GNUNET_PQ_make_prepare ("select_latest_fragments", |
277 | "select_latest_fragments", | 236 | "SELECT rev.hop_counter AS hop_counter,\n" |
278 | "SELECT rev.hop_counter AS hop_counter,\n" | 237 | " rev.signature AS signature,\n" |
279 | " rev.signature AS signature,\n" | 238 | " rev.purpose AS purpose,\n" |
280 | " rev.purpose AS purpose,\n" | 239 | " rev.fragment_id AS fragment_id,\n" |
281 | " rev.fragment_id AS fragment_id,\n" | 240 | " rev.fragment_offset AS fragment_offset,\n" |
282 | " rev.fragment_offset AS fragment_offset,\n" | 241 | " rev.message_id AS message_id,\n" |
283 | " rev.message_id AS message_id,\n" | 242 | " rev.group_generation AS group_generation,\n" |
284 | " rev.group_generation AS group_generation,\n" | 243 | " rev.multicast_flags AS multicast_flags,\n" |
285 | " rev.multicast_flags AS multicast_flags,\n" | 244 | " rev.psycstore_flags AS psycstore_flags,\n" |
286 | " rev.psycstore_flags AS psycstore_flags,\n" | 245 | " rev.data AS data\n" |
287 | " rev.data AS data\n" | 246 | " FROM\n" |
288 | " FROM\n" | 247 | " (SELECT hop_counter, signature, purpose, fragment_id,\n" |
289 | " (SELECT hop_counter, signature, purpose, fragment_id,\n" | 248 | " fragment_offset, message_id, group_generation,\n" |
290 | " fragment_offset, message_id, group_generation,\n" | 249 | " multicast_flags, psycstore_flags, data \n" |
291 | " multicast_flags, psycstore_flags, data \n" | 250 | " FROM messages\n" |
292 | " FROM messages\n" | 251 | " WHERE channel_id = get_chan_id($1) \n" |
293 | " WHERE channel_id = get_chan_id($1) \n" | 252 | " ORDER BY fragment_id DESC\n" |
294 | " ORDER BY fragment_id DESC\n" | 253 | " LIMIT $2) AS rev\n" |
295 | " LIMIT $2) AS rev\n" | 254 | " ORDER BY rev.fragment_id;", 2), |
296 | " ORDER BY rev.fragment_id;", 2)) || | 255 | GNUNET_PQ_make_prepare ("select_latest_messages", |
297 | 256 | "SELECT hop_counter, signature, purpose, fragment_id,\n" | |
298 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 257 | " fragment_offset, message_id, group_generation,\n" |
299 | "select_latest_messages", | 258 | " multicast_flags, psycstore_flags, data\n" |
300 | "SELECT hop_counter, signature, purpose, fragment_id,\n" | 259 | "FROM messages\n" |
301 | " fragment_offset, message_id, group_generation,\n" | 260 | "WHERE channel_id = get_chan_id($1)\n" |
302 | " multicast_flags, psycstore_flags, data\n" | 261 | " AND message_id IN\n" |
303 | "FROM messages\n" | 262 | " (SELECT message_id\n" |
304 | "WHERE channel_id = get_chan_id($1)\n" | 263 | " FROM messages\n" |
305 | " AND message_id IN\n" | 264 | " WHERE channel_id = get_chan_id($2) \n" |
306 | " (SELECT message_id\n" | 265 | " GROUP BY message_id\n" |
307 | " FROM messages\n" | 266 | " ORDER BY message_id\n" |
308 | " WHERE channel_id = get_chan_id($2) \n" | 267 | " DESC LIMIT $3)\n" |
309 | " GROUP BY message_id\n" | 268 | "ORDER BY fragment_id", 3), |
310 | " ORDER BY message_id\n" | 269 | GNUNET_PQ_make_prepare ("select_message_fragment", |
311 | " DESC LIMIT $3)\n" | 270 | "SELECT hop_counter, signature, purpose, fragment_id,\n" |
312 | "ORDER BY fragment_id", 3)) || | 271 | " fragment_offset, message_id, group_generation,\n" |
313 | 272 | " multicast_flags, psycstore_flags, data\n" | |
314 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 273 | "FROM messages\n" |
315 | "select_message_fragment", | 274 | "WHERE channel_id = get_chan_id($1) \n" |
316 | "SELECT hop_counter, signature, purpose, fragment_id,\n" | 275 | " AND message_id = $2 AND fragment_offset = $3", 3), |
317 | " fragment_offset, message_id, group_generation,\n" | 276 | GNUNET_PQ_make_prepare ("select_counters_message", |
318 | " multicast_flags, psycstore_flags, data\n" | 277 | "SELECT fragment_id, message_id, group_generation\n" |
319 | "FROM messages\n" | 278 | "FROM messages\n" |
320 | "WHERE channel_id = get_chan_id($1) \n" | 279 | "WHERE channel_id = get_chan_id($1)\n" |
321 | " AND message_id = $2 AND fragment_offset = $3", 3)) || | 280 | "ORDER BY fragment_id DESC LIMIT 1", 1), |
322 | 281 | GNUNET_PQ_make_prepare ("select_counters_state", | |
323 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 282 | "SELECT max_state_message_id\n" |
324 | "select_counters_message", | 283 | "FROM channels\n" |
325 | "SELECT fragment_id, message_id, group_generation\n" | 284 | "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1), |
326 | "FROM messages\n" | 285 | GNUNET_PQ_make_prepare ("update_max_state_message_id", |
327 | "WHERE channel_id = get_chan_id($1)\n" | 286 | "UPDATE channels\n" |
328 | "ORDER BY fragment_id DESC LIMIT 1", 1)) || | 287 | "SET max_state_message_id = $1\n" |
329 | 288 | "WHERE pub_key = $2", 2), | |
330 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 289 | |
331 | "select_counters_state", | 290 | GNUNET_PQ_make_prepare ("update_state_hash_message_id", |
332 | "SELECT max_state_message_id\n" | 291 | "UPDATE channels\n" |
333 | "FROM channels\n" | 292 | "SET state_hash_message_id = $1\n" |
334 | "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1)) || | 293 | "WHERE pub_key = $2", 2), |
335 | 294 | GNUNET_PQ_make_prepare ("insert_state_current", | |
336 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 295 | "INSERT INTO state\n" |
337 | "update_max_state_message_id", | 296 | " (channel_id, name, value_current, value_signed)\n" |
338 | "UPDATE channels\n" | 297 | "SELECT new.channel_id, new.name,\n" |
339 | "SET max_state_message_id = $1\n" | 298 | " new.value_current, old.value_signed\n" |
340 | "WHERE pub_key = $2", 2)) || | 299 | "FROM (SELECT get_chan_id($1) AS channel_id,\n" |
341 | 300 | " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n" | |
342 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 301 | "LEFT JOIN (SELECT channel_id, name, value_signed\n" |
343 | "update_state_hash_message_id", | 302 | " FROM state) AS old\n" |
344 | "UPDATE channels\n" | 303 | "ON new.channel_id = old.channel_id AND new.name = old.name\n" |
345 | "SET state_hash_message_id = $1\n" | 304 | "ON CONFLICT (channel_id, name)\n" |
346 | "WHERE pub_key = $2", 2)) || | 305 | " DO UPDATE SET value_current = EXCLUDED.value_current,\n" |
347 | 306 | " value_signed = EXCLUDED.value_signed", 3), | |
348 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 307 | GNUNET_PQ_make_prepare ("delete_state_empty", |
349 | "insert_state_current", | 308 | "DELETE FROM state\n" |
350 | "INSERT INTO state\n" | 309 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n" |
351 | " (channel_id, name, value_current, value_signed)\n" | 310 | " AND (value_current IS NULL OR length(value_current) = 0)\n" |
352 | "SELECT new.channel_id, new.name,\n" | 311 | " AND (value_signed IS NULL OR length(value_signed) = 0)", 1), |
353 | " new.value_current, old.value_signed\n" | 312 | GNUNET_PQ_make_prepare ("update_state_signed", |
354 | "FROM (SELECT get_chan_id($1) AS channel_id,\n" | 313 | "UPDATE state\n" |
355 | " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n" | 314 | "SET value_signed = value_current\n" |
356 | "LEFT JOIN (SELECT channel_id, name, value_signed\n" | 315 | "WHERE channel_id = get_chan_id($1) ", 1), |
357 | " FROM state) AS old\n" | 316 | GNUNET_PQ_make_prepare ("delete_state", |
358 | "ON new.channel_id = old.channel_id AND new.name = old.name\n" | 317 | "DELETE FROM state\n" |
359 | "ON CONFLICT (channel_id, name)\n" | 318 | "WHERE channel_id = get_chan_id($1) ", 1), |
360 | " DO UPDATE SET value_current = EXCLUDED.value_current,\n" | 319 | GNUNET_PQ_make_prepare ("insert_state_sync", |
361 | " value_signed = EXCLUDED.value_signed", 3)) || | 320 | "INSERT INTO state_sync (channel_id, name, value)\n" |
362 | 321 | "VALUES (get_chan_id($1), $2, $3)", 3), | |
363 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 322 | GNUNET_PQ_make_prepare ("insert_state_from_sync", |
364 | "delete_state_empty", | 323 | "INSERT INTO state\n" |
365 | "DELETE FROM state\n" | 324 | " (channel_id, name, value_current, value_signed)\n" |
366 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n" | 325 | "SELECT channel_id, name, value, value\n" |
367 | " AND (value_current IS NULL OR length(value_current) = 0)\n" | 326 | "FROM state_sync\n" |
368 | " AND (value_signed IS NULL OR length(value_signed) = 0)", 1)) || | 327 | "WHERE channel_id = get_chan_id($1)", 1), |
369 | 328 | GNUNET_PQ_make_prepare ("delete_state_sync", | |
370 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 329 | "DELETE FROM state_sync\n" |
371 | "update_state_signed", | 330 | "WHERE channel_id = get_chan_id($1)", 1), |
372 | "UPDATE state\n" | 331 | GNUNET_PQ_make_prepare ("select_state_one", |
373 | "SET value_signed = value_current\n" | 332 | "SELECT value_current\n" |
374 | "WHERE channel_id = get_chan_id($1) ", 1)) || | 333 | "FROM state\n" |
375 | 334 | "WHERE channel_id = get_chan_id($1)\n" | |
376 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 335 | " AND name = $2", 2), |
377 | "delete_state", | 336 | GNUNET_PQ_make_prepare ("select_state_prefix", |
378 | "DELETE FROM state\n" | 337 | "SELECT name, value_current\n" |
379 | "WHERE channel_id = get_chan_id($1) ", 1)) || | 338 | "FROM state\n" |
380 | 339 | "WHERE channel_id = get_chan_id($1)\n" | |
381 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 340 | " AND (name = $2 OR substr(name, 1, $3) = $4)", 4), |
382 | "insert_state_sync", | 341 | GNUNET_PQ_make_prepare ("select_state_signed", |
383 | "INSERT INTO state_sync (channel_id, name, value)\n" | 342 | "SELECT name, value_signed\n" |
384 | "VALUES (get_chan_id($1), $2, $3)", 3)) || | 343 | "FROM state\n" |
385 | 344 | "WHERE channel_id = get_chan_id($1)\n" | |
386 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 345 | " AND value_signed IS NOT NULL", 1), |
387 | "insert_state_from_sync", | 346 | GNUNET_PQ_PREPARED_STATEMENT_END |
388 | "INSERT INTO state\n" | 347 | }; |
389 | " (channel_id, name, value_current, value_signed)\n" | 348 | |
390 | "SELECT channel_id, name, value, value\n" | 349 | if (GNUNET_OK != |
391 | "FROM state_sync\n" | 350 | GNUNET_PQ_prepare_statements (plugin->dbh, |
392 | "WHERE channel_id = get_chan_id($1)", 1)) || | 351 | ps)) |
393 | 352 | { | |
394 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | 353 | PQfinish (plugin->dbh); |
395 | "delete_state_sync", | 354 | plugin->dbh = NULL; |
396 | "DELETE FROM state_sync\n" | 355 | return GNUNET_SYSERR; |
397 | "WHERE channel_id = get_chan_id($1)", 1)) || | 356 | } |
398 | |||
399 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
400 | "select_state_one", | ||
401 | "SELECT value_current\n" | ||
402 | "FROM state\n" | ||
403 | "WHERE channel_id = get_chan_id($1)\n" | ||
404 | " AND name = $2", 2)) || | ||
405 | |||
406 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
407 | "select_state_prefix", | ||
408 | "SELECT name, value_current\n" | ||
409 | "FROM state\n" | ||
410 | "WHERE channel_id = get_chan_id($1)\n" | ||
411 | " AND (name = $2 OR substr(name, 1, $3) = $4)", 4)) || | ||
412 | |||
413 | (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, | ||
414 | "select_state_signed", | ||
415 | "SELECT name, value_signed\n" | ||
416 | "FROM state\n" | ||
417 | "WHERE channel_id = get_chan_id($1)\n" | ||
418 | " AND value_signed IS NOT NULL", 1))) | ||
419 | { | ||
420 | PQfinish (plugin->dbh); | ||
421 | plugin->dbh = NULL; | ||
422 | return GNUNET_SYSERR; | ||
423 | } | 357 | } |
424 | 358 | ||
425 | return GNUNET_OK; | 359 | return GNUNET_OK; |
@@ -452,22 +386,15 @@ static int | |||
452 | exec_channel (struct Plugin *plugin, const char *stmt, | 386 | exec_channel (struct Plugin *plugin, const char *stmt, |
453 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | 387 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) |
454 | { | 388 | { |
455 | PGresult *ret; | ||
456 | struct GNUNET_PQ_QueryParam params[] = { | 389 | struct GNUNET_PQ_QueryParam params[] = { |
457 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 390 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
458 | GNUNET_PQ_query_param_end | 391 | GNUNET_PQ_query_param_end |
459 | }; | 392 | }; |
460 | 393 | ||
461 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); | 394 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != |
462 | if (GNUNET_OK != | 395 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params)) |
463 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
464 | ret, | ||
465 | PGRES_COMMAND_OK, | ||
466 | "PQexecPrepared", stmt)) | ||
467 | return GNUNET_SYSERR; | 396 | return GNUNET_SYSERR; |
468 | 397 | ||
469 | PQclear (ret); | ||
470 | |||
471 | return GNUNET_OK; | 398 | return GNUNET_OK; |
472 | } | 399 | } |
473 | 400 | ||
@@ -478,23 +405,15 @@ exec_channel (struct Plugin *plugin, const char *stmt, | |||
478 | static int | 405 | static int |
479 | transaction_begin (struct Plugin *plugin, enum Transactions transaction) | 406 | transaction_begin (struct Plugin *plugin, enum Transactions transaction) |
480 | { | 407 | { |
481 | PGresult *ret; | ||
482 | struct GNUNET_PQ_QueryParam params[] = { | 408 | struct GNUNET_PQ_QueryParam params[] = { |
483 | GNUNET_PQ_query_param_end | 409 | GNUNET_PQ_query_param_end |
484 | }; | 410 | }; |
485 | 411 | ||
486 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_begin", params); | 412 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != |
487 | if (GNUNET_OK != | 413 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_begin", params)) |
488 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
489 | ret, | ||
490 | PGRES_COMMAND_OK, | ||
491 | "PQexecPrepared", "transaction_begin")) | ||
492 | { | ||
493 | return GNUNET_SYSERR; | 414 | return GNUNET_SYSERR; |
494 | } | ||
495 | 415 | ||
496 | plugin->transaction = transaction; | 416 | plugin->transaction = transaction; |
497 | PQclear (ret); | ||
498 | return GNUNET_OK; | 417 | return GNUNET_OK; |
499 | } | 418 | } |
500 | 419 | ||
@@ -505,23 +424,14 @@ transaction_begin (struct Plugin *plugin, enum Transactions transaction) | |||
505 | static int | 424 | static int |
506 | transaction_commit (struct Plugin *plugin) | 425 | transaction_commit (struct Plugin *plugin) |
507 | { | 426 | { |
508 | PGresult *ret; | ||
509 | |||
510 | struct GNUNET_PQ_QueryParam params[] = { | 427 | struct GNUNET_PQ_QueryParam params[] = { |
511 | GNUNET_PQ_query_param_end | 428 | GNUNET_PQ_query_param_end |
512 | }; | 429 | }; |
513 | 430 | ||
514 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_commit", params); | 431 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != |
515 | if (GNUNET_OK != | 432 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_commit", params)) |
516 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
517 | ret, | ||
518 | PGRES_COMMAND_OK, | ||
519 | "PQexecPrepared", "transaction_commit")) | ||
520 | { | ||
521 | return GNUNET_SYSERR; | 433 | return GNUNET_SYSERR; |
522 | } | ||
523 | 434 | ||
524 | PQclear (ret); | ||
525 | plugin->transaction = TRANSACTION_NONE; | 435 | plugin->transaction = TRANSACTION_NONE; |
526 | return GNUNET_OK; | 436 | return GNUNET_OK; |
527 | } | 437 | } |
@@ -533,23 +443,14 @@ transaction_commit (struct Plugin *plugin) | |||
533 | static int | 443 | static int |
534 | transaction_rollback (struct Plugin *plugin) | 444 | transaction_rollback (struct Plugin *plugin) |
535 | { | 445 | { |
536 | PGresult *ret; | ||
537 | |||
538 | struct GNUNET_PQ_QueryParam params[] = { | 446 | struct GNUNET_PQ_QueryParam params[] = { |
539 | GNUNET_PQ_query_param_end | 447 | GNUNET_PQ_query_param_end |
540 | }; | 448 | }; |
541 | 449 | ||
542 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_rollback", params); | 450 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != |
543 | if (GNUNET_OK != | 451 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_rollback", params)) |
544 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
545 | ret, | ||
546 | PGRES_COMMAND_OK, | ||
547 | "PQexecPrepared", "transaction_rollback")) | ||
548 | { | ||
549 | return GNUNET_SYSERR; | 452 | return GNUNET_SYSERR; |
550 | } | ||
551 | 453 | ||
552 | PQclear (ret); | ||
553 | plugin->transaction = TRANSACTION_NONE; | 454 | plugin->transaction = TRANSACTION_NONE; |
554 | return GNUNET_OK; | 455 | return GNUNET_OK; |
555 | } | 456 | } |
@@ -559,24 +460,15 @@ static int | |||
559 | channel_key_store (struct Plugin *plugin, | 460 | channel_key_store (struct Plugin *plugin, |
560 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) | 461 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) |
561 | { | 462 | { |
562 | PGresult *ret; | ||
563 | |||
564 | struct GNUNET_PQ_QueryParam params[] = { | 463 | struct GNUNET_PQ_QueryParam params[] = { |
565 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 464 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
566 | GNUNET_PQ_query_param_end | 465 | GNUNET_PQ_query_param_end |
567 | }; | 466 | }; |
568 | 467 | ||
569 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_channel_key", params); | 468 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != |
570 | if (GNUNET_OK != | 469 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_channel_key", params)) |
571 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
572 | ret, | ||
573 | PGRES_COMMAND_OK, | ||
574 | "PQexecPrepared", "insert_channel_key")) | ||
575 | { | ||
576 | return GNUNET_SYSERR; | 470 | return GNUNET_SYSERR; |
577 | } | ||
578 | 471 | ||
579 | PQclear (ret); | ||
580 | return GNUNET_OK; | 472 | return GNUNET_OK; |
581 | } | 473 | } |
582 | 474 | ||
@@ -585,24 +477,15 @@ static int | |||
585 | slave_key_store (struct Plugin *plugin, | 477 | slave_key_store (struct Plugin *plugin, |
586 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key) | 478 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key) |
587 | { | 479 | { |
588 | PGresult *ret; | ||
589 | |||
590 | struct GNUNET_PQ_QueryParam params[] = { | 480 | struct GNUNET_PQ_QueryParam params[] = { |
591 | GNUNET_PQ_query_param_auto_from_type (slave_key), | 481 | GNUNET_PQ_query_param_auto_from_type (slave_key), |
592 | GNUNET_PQ_query_param_end | 482 | GNUNET_PQ_query_param_end |
593 | }; | 483 | }; |
594 | 484 | ||
595 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_slave_key", params); | 485 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != |
596 | if (GNUNET_OK != | 486 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_slave_key", params)) |
597 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
598 | ret, | ||
599 | PGRES_COMMAND_OK, | ||
600 | "PQexecPrepared", "insert_slave_key")) | ||
601 | { | ||
602 | return GNUNET_SYSERR; | 487 | return GNUNET_SYSERR; |
603 | } | ||
604 | 488 | ||
605 | PQclear (ret); | ||
606 | return GNUNET_OK; | 489 | return GNUNET_OK; |
607 | } | 490 | } |
608 | 491 | ||
@@ -624,7 +507,6 @@ postgres_membership_store (void *cls, | |||
624 | uint64_t effective_since, | 507 | uint64_t effective_since, |
625 | uint64_t group_generation) | 508 | uint64_t group_generation) |
626 | { | 509 | { |
627 | PGresult *ret; | ||
628 | struct Plugin *plugin = cls; | 510 | struct Plugin *plugin = cls; |
629 | 511 | ||
630 | uint32_t idid_join = (uint32_t)did_join; | 512 | uint32_t idid_join = (uint32_t)did_join; |
@@ -653,17 +535,10 @@ postgres_membership_store (void *cls, | |||
653 | GNUNET_PQ_query_param_end | 535 | GNUNET_PQ_query_param_end |
654 | }; | 536 | }; |
655 | 537 | ||
656 | ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_membership", params); | 538 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != |
657 | if (GNUNET_OK != | 539 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_membership", params)) |
658 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
659 | ret, | ||
660 | PGRES_COMMAND_OK, | ||
661 | "PQexecPrepared", "insert_membership")) | ||
662 | { | ||
663 | return GNUNET_SYSERR; | 540 | return GNUNET_SYSERR; |
664 | } | ||
665 | 541 | ||
666 | PQclear (ret); | ||
667 | return GNUNET_OK; | 542 | return GNUNET_OK; |
668 | } | 543 | } |
669 | 544 | ||
@@ -681,13 +556,10 @@ membership_test (void *cls, | |||
681 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | 556 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, |
682 | uint64_t message_id) | 557 | uint64_t message_id) |
683 | { | 558 | { |
684 | PGresult *res; | ||
685 | struct Plugin *plugin = cls; | 559 | struct Plugin *plugin = cls; |
686 | 560 | ||
687 | uint32_t did_join = 0; | 561 | uint32_t did_join = 0; |
688 | 562 | ||
689 | int ret = GNUNET_SYSERR; | ||
690 | |||
691 | struct GNUNET_PQ_QueryParam params_select[] = { | 563 | struct GNUNET_PQ_QueryParam params_select[] = { |
692 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 564 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
693 | GNUNET_PQ_query_param_auto_from_type (slave_key), | 565 | GNUNET_PQ_query_param_auto_from_type (slave_key), |
@@ -695,35 +567,17 @@ membership_test (void *cls, | |||
695 | GNUNET_PQ_query_param_end | 567 | GNUNET_PQ_query_param_end |
696 | }; | 568 | }; |
697 | 569 | ||
698 | res = GNUNET_PQ_exec_prepared (plugin->dbh, "select_membership", params_select); | ||
699 | if (GNUNET_OK != | ||
700 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
701 | res, | ||
702 | PGRES_TUPLES_OK, | ||
703 | "PQexecPrepared", "select_membership")) | ||
704 | { | ||
705 | return GNUNET_SYSERR; | ||
706 | } | ||
707 | |||
708 | struct GNUNET_PQ_ResultSpec results_select[] = { | 570 | struct GNUNET_PQ_ResultSpec results_select[] = { |
709 | GNUNET_PQ_result_spec_uint32 ("did_join", &did_join), | 571 | GNUNET_PQ_result_spec_uint32 ("did_join", &did_join), |
710 | GNUNET_PQ_result_spec_end | 572 | GNUNET_PQ_result_spec_end |
711 | }; | 573 | }; |
712 | 574 | ||
713 | switch (GNUNET_PQ_extract_result (res, results_select, 0)) | 575 | if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != |
714 | { | 576 | GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, "select_membership", |
715 | case GNUNET_OK: | 577 | params_select, results_select)) |
716 | ret = GNUNET_YES; | 578 | return GNUNET_SYSERR; |
717 | break; | ||
718 | |||
719 | default: | ||
720 | ret = GNUNET_NO; | ||
721 | break; | ||
722 | } | ||
723 | 579 | ||
724 | PQclear (res); | 580 | return GNUNET_OK; |
725 | |||
726 | return ret; | ||
727 | } | 581 | } |
728 | 582 | ||
729 | /** | 583 | /** |
@@ -739,7 +593,6 @@ fragment_store (void *cls, | |||
739 | const struct GNUNET_MULTICAST_MessageHeader *msg, | 593 | const struct GNUNET_MULTICAST_MessageHeader *msg, |
740 | uint32_t psycstore_flags) | 594 | uint32_t psycstore_flags) |
741 | { | 595 | { |
742 | PGresult *res; | ||
743 | struct Plugin *plugin = cls; | 596 | struct Plugin *plugin = cls; |
744 | 597 | ||
745 | GNUNET_assert (TRANSACTION_NONE == plugin->transaction); | 598 | GNUNET_assert (TRANSACTION_NONE == plugin->transaction); |
@@ -782,15 +635,10 @@ fragment_store (void *cls, | |||
782 | GNUNET_PQ_query_param_end | 635 | GNUNET_PQ_query_param_end |
783 | }; | 636 | }; |
784 | 637 | ||
785 | res = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_fragment", params_insert); | 638 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != |
786 | if (GNUNET_OK != | 639 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_fragment", params_insert)) |
787 | GNUNET_POSTGRES_check_result (plugin->dbh, | ||
788 | res, | ||
789 | PGRES_COMMAND_OK, | ||
790 | "PQexecPrepared", "insert_fragment")) | ||
791 | return GNUNET_SYSERR; | 640 | return GNUNET_SYSERR; |
792 | 641 | ||
793 | PQclear (res); | ||
794 | return GNUNET_OK; | 642 | return GNUNET_OK; |
795 | } | 643 | } |
796 | 644 | ||
@@ -807,7 +655,6 @@ message_add_flags (void *cls, | |||
807 | uint64_t message_id, | 655 | uint64_t message_id, |
808 | uint32_t psycstore_flags) | 656 | uint32_t psycstore_flags) |
809 | { | 657 | { |
810 | PGresult *res; | ||
811 | struct Plugin *plugin = cls; | 658 | struct Plugin *plugin = cls; |
812 | 659 | ||
813 | struct GNUNET_PQ_QueryParam params_update[] = { | 660 | struct GNUNET_PQ_QueryParam params_update[] = { |
@@ -817,74 +664,80 @@ message_add_flags (void *cls, | |||
817 | GNUNET_PQ_query_param_end | 664 | GNUNET_PQ_query_param_end |
818 | }; | 665 | }; |
819 | 666 | ||
820 | res = GNUNET_PQ_exec_prepared (plugin->dbh, "update_message_flags", params_update); | 667 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != |
821 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | 668 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "update_message_flags", params_update)) |
822 | res, | ||
823 | PGRES_COMMAND_OK, | ||
824 | "PQexecPrepared","update_message_flags")) | ||
825 | return GNUNET_SYSERR; | 669 | return GNUNET_SYSERR; |
826 | 670 | ||
827 | PQclear (res); | ||
828 | return GNUNET_OK; | 671 | return GNUNET_OK; |
829 | } | 672 | } |
830 | 673 | ||
831 | 674 | ||
832 | static int | 675 | /** |
833 | fragment_row (struct Plugin *plugin, | 676 | * Closure for #fragment_rows. |
834 | const char *stmt, | 677 | */ |
835 | PGresult *res, | 678 | struct FragmentRowsContext { |
836 | GNUNET_PSYCSTORE_FragmentCallback cb, | 679 | GNUNET_PSYCSTORE_FragmentCallback cb; |
837 | void *cb_cls, | 680 | void *cb_cls; |
838 | uint64_t *returned_fragments) | ||
839 | { | ||
840 | uint32_t hop_counter; | ||
841 | void *signature = NULL; | ||
842 | void *purpose = NULL; | ||
843 | size_t signature_size; | ||
844 | size_t purpose_size; | ||
845 | |||
846 | uint64_t fragment_id; | ||
847 | uint64_t fragment_offset; | ||
848 | uint64_t message_id; | ||
849 | uint64_t group_generation; | ||
850 | uint32_t flags; | ||
851 | void *buf; | ||
852 | size_t buf_size; | ||
853 | int ret = GNUNET_SYSERR; | ||
854 | struct GNUNET_MULTICAST_MessageHeader *mp; | ||
855 | |||
856 | uint32_t msg_flags; | ||
857 | |||
858 | struct GNUNET_PQ_ResultSpec results[] = { | ||
859 | GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter), | ||
860 | GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size), | ||
861 | GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size), | ||
862 | GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id), | ||
863 | GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset), | ||
864 | GNUNET_PQ_result_spec_uint64 ("message_id", &message_id), | ||
865 | GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation), | ||
866 | GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags), | ||
867 | GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags), | ||
868 | GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size), | ||
869 | GNUNET_PQ_result_spec_end | ||
870 | }; | ||
871 | 681 | ||
872 | if (GNUNET_OK != | 682 | uint64_t *returned_fragments; |
873 | GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK, | ||
874 | "PQexecPrepared", | ||
875 | stmt)) | ||
876 | { | ||
877 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
878 | "Failing fragment lookup (postgres error)\n"); | ||
879 | return GNUNET_SYSERR; | ||
880 | } | ||
881 | 683 | ||
882 | int nrows = PQntuples (res); | 684 | /* I preserved this but I do not see the point since |
883 | for (int row = 0; row < nrows; row++) | 685 | * it cannot stop the loop early and gets overwritten ?? */ |
686 | int ret; | ||
687 | }; | ||
688 | |||
689 | |||
690 | /** | ||
691 | * Callback that retrieves the results of a SELECT statement | ||
692 | * reading form the messages table. | ||
693 | * | ||
694 | * Only passed to GNUNET_PQ_eval_prepared_multi_select and | ||
695 | * has type GNUNET_PQ_PostgresResultHandler. | ||
696 | * | ||
697 | * @param cls closure | ||
698 | * @param result the postgres result | ||
699 | * @param num_result the number of results in @a result | ||
700 | */ | ||
701 | void fragment_rows (void *cls, | ||
702 | PGresult *res, | ||
703 | unsigned int num_results) | ||
704 | { | ||
705 | struct FragmentRowsContext *c = cls; | ||
706 | |||
707 | for (unsigned int i=0;i<num_results;i++) | ||
884 | { | 708 | { |
885 | if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row)) | 709 | uint32_t hop_counter; |
710 | void *signature = NULL; | ||
711 | void *purpose = NULL; | ||
712 | size_t signature_size; | ||
713 | size_t purpose_size; | ||
714 | uint64_t fragment_id; | ||
715 | uint64_t fragment_offset; | ||
716 | uint64_t message_id; | ||
717 | uint64_t group_generation; | ||
718 | uint32_t flags; | ||
719 | void *buf; | ||
720 | size_t buf_size; | ||
721 | uint32_t msg_flags; | ||
722 | struct GNUNET_PQ_ResultSpec results[] = { | ||
723 | GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter), | ||
724 | GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size), | ||
725 | GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size), | ||
726 | GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id), | ||
727 | GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset), | ||
728 | GNUNET_PQ_result_spec_uint64 ("message_id", &message_id), | ||
729 | GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation), | ||
730 | GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags), | ||
731 | GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags), | ||
732 | GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size), | ||
733 | GNUNET_PQ_result_spec_end | ||
734 | }; | ||
735 | struct GNUNET_MULTICAST_MessageHeader *mp; | ||
736 | |||
737 | if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i)) | ||
886 | { | 738 | { |
887 | break; | 739 | GNUNET_PQ_cleanup_result(results); /* missing previously, a memory leak?? */ |
740 | break; /* nothing more?? */ | ||
888 | } | 741 | } |
889 | 742 | ||
890 | mp = GNUNET_malloc (sizeof (*mp) + buf_size); | 743 | mp = GNUNET_malloc (sizeof (*mp) + buf_size); |
@@ -893,11 +746,9 @@ fragment_row (struct Plugin *plugin, | |||
893 | mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); | 746 | mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); |
894 | mp->hop_counter = htonl (hop_counter); | 747 | mp->hop_counter = htonl (hop_counter); |
895 | GNUNET_memcpy (&mp->signature, | 748 | GNUNET_memcpy (&mp->signature, |
896 | signature, | 749 | signature, signature_size); |
897 | signature_size); | ||
898 | GNUNET_memcpy (&mp->purpose, | 750 | GNUNET_memcpy (&mp->purpose, |
899 | purpose, | 751 | purpose, purpose_size); |
900 | purpose_size); | ||
901 | mp->fragment_id = GNUNET_htonll (fragment_id); | 752 | mp->fragment_id = GNUNET_htonll (fragment_id); |
902 | mp->fragment_offset = GNUNET_htonll (fragment_offset); | 753 | mp->fragment_offset = GNUNET_htonll (fragment_offset); |
903 | mp->message_id = GNUNET_htonll (message_id); | 754 | mp->message_id = GNUNET_htonll (message_id); |
@@ -905,15 +756,12 @@ fragment_row (struct Plugin *plugin, | |||
905 | mp->flags = htonl(msg_flags); | 756 | mp->flags = htonl(msg_flags); |
906 | 757 | ||
907 | GNUNET_memcpy (&mp[1], | 758 | GNUNET_memcpy (&mp[1], |
908 | buf, | 759 | buf, buf_size); |
909 | buf_size); | ||
910 | GNUNET_PQ_cleanup_result(results); | 760 | GNUNET_PQ_cleanup_result(results); |
911 | ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags); | 761 | c->ret = c->cb (c->cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags); |
912 | if (NULL != returned_fragments) | 762 | if (NULL != c->returned_fragments) |
913 | (*returned_fragments)++; | 763 | (*c->returned_fragments)++; |
914 | } | 764 | } |
915 | |||
916 | return ret; | ||
917 | } | 765 | } |
918 | 766 | ||
919 | 767 | ||
@@ -925,26 +773,19 @@ fragment_select (struct Plugin *plugin, | |||
925 | GNUNET_PSYCSTORE_FragmentCallback cb, | 773 | GNUNET_PSYCSTORE_FragmentCallback cb, |
926 | void *cb_cls) | 774 | void *cb_cls) |
927 | { | 775 | { |
928 | PGresult *res; | 776 | /* Stack based closure */ |
929 | int ret = GNUNET_SYSERR; | 777 | struct FragmentRowsContext frc = { |
930 | 778 | .cb = cb, | |
931 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); | 779 | .cb_cls = cb_cls, |
932 | if (GNUNET_YES == | 780 | .returned_fragments = returned_fragments, |
933 | GNUNET_POSTGRES_check_result (plugin->dbh, | 781 | .ret = GNUNET_SYSERR |
934 | res, | 782 | }; |
935 | PGRES_TUPLES_OK, | ||
936 | "PQexecPrepared", stmt)) | ||
937 | { | ||
938 | if (PQntuples (res) == 0) | ||
939 | ret = GNUNET_NO; | ||
940 | else | ||
941 | { | ||
942 | ret = fragment_row (plugin, stmt, res, cb, cb_cls, returned_fragments); | ||
943 | } | ||
944 | PQclear (res); | ||
945 | } | ||
946 | 783 | ||
947 | return ret; | 784 | if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, |
785 | stmt, params, | ||
786 | &fragment_rows, &frc)) | ||
787 | return GNUNET_SYSERR; | ||
788 | return frc.ret; /* GNUNET_OK ?? */ | ||
948 | } | 789 | } |
949 | 790 | ||
950 | /** | 791 | /** |
@@ -964,8 +805,6 @@ fragment_get (void *cls, | |||
964 | void *cb_cls) | 805 | void *cb_cls) |
965 | { | 806 | { |
966 | struct Plugin *plugin = cls; | 807 | struct Plugin *plugin = cls; |
967 | *returned_fragments = 0; | ||
968 | |||
969 | struct GNUNET_PQ_QueryParam params_select[] = { | 808 | struct GNUNET_PQ_QueryParam params_select[] = { |
970 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 809 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
971 | GNUNET_PQ_query_param_uint64 (&first_fragment_id), | 810 | GNUNET_PQ_query_param_uint64 (&first_fragment_id), |
@@ -973,7 +812,12 @@ fragment_get (void *cls, | |||
973 | GNUNET_PQ_query_param_end | 812 | GNUNET_PQ_query_param_end |
974 | }; | 813 | }; |
975 | 814 | ||
976 | return fragment_select (plugin, "select_fragments", params_select, returned_fragments, cb, cb_cls); | 815 | *returned_fragments = 0; |
816 | return fragment_select (plugin, | ||
817 | "select_fragments", | ||
818 | params_select, | ||
819 | returned_fragments, | ||
820 | cb, cb_cls); | ||
977 | } | 821 | } |
978 | 822 | ||
979 | 823 | ||
@@ -1002,7 +846,11 @@ fragment_get_latest (void *cls, | |||
1002 | GNUNET_PQ_query_param_end | 846 | GNUNET_PQ_query_param_end |
1003 | }; | 847 | }; |
1004 | 848 | ||
1005 | return fragment_select (plugin, "select_latest_fragments", params_select, returned_fragments, cb, cb_cls); | 849 | return fragment_select (plugin, |
850 | "select_latest_fragments", | ||
851 | params_select, | ||
852 | returned_fragments, | ||
853 | cb, cb_cls); | ||
1006 | } | 854 | } |
1007 | 855 | ||
1008 | 856 | ||
@@ -1024,11 +872,6 @@ message_get (void *cls, | |||
1024 | void *cb_cls) | 872 | void *cb_cls) |
1025 | { | 873 | { |
1026 | struct Plugin *plugin = cls; | 874 | struct Plugin *plugin = cls; |
1027 | *returned_fragments = 0; | ||
1028 | |||
1029 | if (0 == fragment_limit) | ||
1030 | fragment_limit = INT64_MAX; | ||
1031 | |||
1032 | struct GNUNET_PQ_QueryParam params_select[] = { | 875 | struct GNUNET_PQ_QueryParam params_select[] = { |
1033 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 876 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
1034 | GNUNET_PQ_query_param_uint64 (&first_message_id), | 877 | GNUNET_PQ_query_param_uint64 (&first_message_id), |
@@ -1037,7 +880,14 @@ message_get (void *cls, | |||
1037 | GNUNET_PQ_query_param_end | 880 | GNUNET_PQ_query_param_end |
1038 | }; | 881 | }; |
1039 | 882 | ||
1040 | return fragment_select (plugin, "select_messages", params_select, returned_fragments, cb, cb_cls); | 883 | if (0 == fragment_limit) |
884 | fragment_limit = INT64_MAX; | ||
885 | *returned_fragments = 0; | ||
886 | return fragment_select (plugin, | ||
887 | "select_messages", | ||
888 | params_select, | ||
889 | returned_fragments, | ||
890 | cb, cb_cls); | ||
1041 | } | 891 | } |
1042 | 892 | ||
1043 | 893 | ||
@@ -1057,8 +907,6 @@ message_get_latest (void *cls, | |||
1057 | void *cb_cls) | 907 | void *cb_cls) |
1058 | { | 908 | { |
1059 | struct Plugin *plugin = cls; | 909 | struct Plugin *plugin = cls; |
1060 | *returned_fragments = 0; | ||
1061 | |||
1062 | struct GNUNET_PQ_QueryParam params_select[] = { | 910 | struct GNUNET_PQ_QueryParam params_select[] = { |
1063 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 911 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
1064 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 912 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
@@ -1066,7 +914,12 @@ message_get_latest (void *cls, | |||
1066 | GNUNET_PQ_query_param_end | 914 | GNUNET_PQ_query_param_end |
1067 | }; | 915 | }; |
1068 | 916 | ||
1069 | return fragment_select (plugin, "select_latest_messages", params_select, returned_fragments, cb, cb_cls); | 917 | *returned_fragments = 0; |
918 | return fragment_select (plugin, | ||
919 | "select_latest_messages", | ||
920 | params_select, | ||
921 | returned_fragments, | ||
922 | cb, cb_cls); | ||
1070 | } | 923 | } |
1071 | 924 | ||
1072 | 925 | ||
@@ -1086,9 +939,7 @@ message_get_fragment (void *cls, | |||
1086 | GNUNET_PSYCSTORE_FragmentCallback cb, | 939 | GNUNET_PSYCSTORE_FragmentCallback cb, |
1087 | void *cb_cls) | 940 | void *cb_cls) |
1088 | { | 941 | { |
1089 | PGresult *res; | ||
1090 | struct Plugin *plugin = cls; | 942 | struct Plugin *plugin = cls; |
1091 | int ret = GNUNET_SYSERR; | ||
1092 | const char *stmt = "select_message_fragment"; | 943 | const char *stmt = "select_message_fragment"; |
1093 | 944 | ||
1094 | struct GNUNET_PQ_QueryParam params_select[] = { | 945 | struct GNUNET_PQ_QueryParam params_select[] = { |
@@ -1098,21 +949,19 @@ message_get_fragment (void *cls, | |||
1098 | GNUNET_PQ_query_param_end | 949 | GNUNET_PQ_query_param_end |
1099 | }; | 950 | }; |
1100 | 951 | ||
1101 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | 952 | /* Stack based closure */ |
1102 | if (GNUNET_OK == GNUNET_POSTGRES_check_result (plugin->dbh, | 953 | struct FragmentRowsContext frc = { |
1103 | res, | 954 | .cb = cb, |
1104 | PGRES_TUPLES_OK, | 955 | .cb_cls = cb_cls, |
1105 | "PQexecPrepared", stmt)) | 956 | .returned_fragments = NULL, |
1106 | { | 957 | .ret = GNUNET_SYSERR |
1107 | if (PQntuples (res) == 0) | 958 | }; |
1108 | ret = GNUNET_NO; | ||
1109 | else | ||
1110 | ret = fragment_row (plugin, stmt, res, cb, cb_cls, NULL); | ||
1111 | |||
1112 | PQclear (res); | ||
1113 | } | ||
1114 | 959 | ||
1115 | return ret; | 960 | if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, |
961 | stmt, params_select, | ||
962 | &fragment_rows, &frc)) | ||
963 | return GNUNET_SYSERR; | ||
964 | return frc.ret; /* GNUNET_OK ?? */ | ||
1116 | } | 965 | } |
1117 | 966 | ||
1118 | /** | 967 | /** |
@@ -1129,7 +978,6 @@ counters_message_get (void *cls, | |||
1129 | uint64_t *max_message_id, | 978 | uint64_t *max_message_id, |
1130 | uint64_t *max_group_generation) | 979 | uint64_t *max_group_generation) |
1131 | { | 980 | { |
1132 | PGresult *res; | ||
1133 | struct Plugin *plugin = cls; | 981 | struct Plugin *plugin = cls; |
1134 | 982 | ||
1135 | const char *stmt = "select_counters_message"; | 983 | const char *stmt = "select_counters_message"; |
@@ -1139,15 +987,6 @@ counters_message_get (void *cls, | |||
1139 | GNUNET_PQ_query_param_end | 987 | GNUNET_PQ_query_param_end |
1140 | }; | 988 | }; |
1141 | 989 | ||
1142 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | ||
1143 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | ||
1144 | res, | ||
1145 | PGRES_TUPLES_OK, | ||
1146 | "PQexecPrepared", stmt)) | ||
1147 | { | ||
1148 | return GNUNET_SYSERR; | ||
1149 | } | ||
1150 | |||
1151 | struct GNUNET_PQ_ResultSpec results_select[] = { | 990 | struct GNUNET_PQ_ResultSpec results_select[] = { |
1152 | GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id), | 991 | GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id), |
1153 | GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id), | 992 | GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id), |
@@ -1155,14 +994,10 @@ counters_message_get (void *cls, | |||
1155 | GNUNET_PQ_result_spec_end | 994 | GNUNET_PQ_result_spec_end |
1156 | }; | 995 | }; |
1157 | 996 | ||
1158 | if (GNUNET_OK != GNUNET_PQ_extract_result (res, results_select, 0)) | 997 | if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != |
1159 | { | 998 | GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, |
1160 | PQclear (res); | 999 | params_select, results_select)) |
1161 | return GNUNET_SYSERR; | 1000 | return GNUNET_SYSERR; |
1162 | } | ||
1163 | |||
1164 | GNUNET_PQ_cleanup_result(results_select); | ||
1165 | PQclear (res); | ||
1166 | 1001 | ||
1167 | return GNUNET_OK; | 1002 | return GNUNET_OK; |
1168 | } | 1003 | } |
@@ -1179,44 +1014,26 @@ counters_state_get (void *cls, | |||
1179 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 1014 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
1180 | uint64_t *max_state_message_id) | 1015 | uint64_t *max_state_message_id) |
1181 | { | 1016 | { |
1182 | PGresult *res; | ||
1183 | struct Plugin *plugin = cls; | 1017 | struct Plugin *plugin = cls; |
1184 | 1018 | ||
1185 | const char *stmt = "select_counters_state"; | 1019 | const char *stmt = "select_counters_state"; |
1186 | 1020 | ||
1187 | int ret = GNUNET_SYSERR; | ||
1188 | |||
1189 | struct GNUNET_PQ_QueryParam params_select[] = { | 1021 | struct GNUNET_PQ_QueryParam params_select[] = { |
1190 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 1022 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
1191 | GNUNET_PQ_query_param_end | 1023 | GNUNET_PQ_query_param_end |
1192 | }; | 1024 | }; |
1193 | 1025 | ||
1194 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | ||
1195 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | ||
1196 | res, | ||
1197 | PGRES_TUPLES_OK, | ||
1198 | "PQexecPrepared", stmt)) | ||
1199 | { | ||
1200 | return GNUNET_SYSERR; | ||
1201 | } | ||
1202 | |||
1203 | struct GNUNET_PQ_ResultSpec results_select[] = { | 1026 | struct GNUNET_PQ_ResultSpec results_select[] = { |
1204 | GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id), | 1027 | GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id), |
1205 | GNUNET_PQ_result_spec_end | 1028 | GNUNET_PQ_result_spec_end |
1206 | }; | 1029 | }; |
1207 | 1030 | ||
1208 | ret = GNUNET_PQ_extract_result (res, results_select, 0); | 1031 | if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != |
1209 | 1032 | GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, | |
1210 | if (GNUNET_OK != ret) | 1033 | params_select, results_select)) |
1211 | { | 1034 | return GNUNET_SYSERR; |
1212 | PQclear (res); | ||
1213 | return GNUNET_SYSERR; | ||
1214 | } | ||
1215 | |||
1216 | GNUNET_PQ_cleanup_result(results_select); | ||
1217 | PQclear (res); | ||
1218 | 1035 | ||
1219 | return ret; | 1036 | return GNUNET_OK; |
1220 | } | 1037 | } |
1221 | 1038 | ||
1222 | 1039 | ||
@@ -1230,8 +1047,6 @@ state_assign (struct Plugin *plugin, const char *stmt, | |||
1230 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 1047 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
1231 | const char *name, const void *value, size_t value_size) | 1048 | const char *name, const void *value, size_t value_size) |
1232 | { | 1049 | { |
1233 | PGresult *res; | ||
1234 | |||
1235 | struct GNUNET_PQ_QueryParam params[] = { | 1050 | struct GNUNET_PQ_QueryParam params[] = { |
1236 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 1051 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
1237 | GNUNET_PQ_query_param_string (name), | 1052 | GNUNET_PQ_query_param_string (name), |
@@ -1239,44 +1054,29 @@ state_assign (struct Plugin *plugin, const char *stmt, | |||
1239 | GNUNET_PQ_query_param_end | 1054 | GNUNET_PQ_query_param_end |
1240 | }; | 1055 | }; |
1241 | 1056 | ||
1242 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); | 1057 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != |
1243 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | 1058 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params)) |
1244 | res, | ||
1245 | PGRES_COMMAND_OK, | ||
1246 | "PQexecPrepared", stmt)) | ||
1247 | { | ||
1248 | return GNUNET_SYSERR; | 1059 | return GNUNET_SYSERR; |
1249 | } | ||
1250 | |||
1251 | PQclear (res); | ||
1252 | 1060 | ||
1253 | return GNUNET_OK; | 1061 | return GNUNET_OK; |
1254 | } | 1062 | } |
1255 | 1063 | ||
1256 | 1064 | ||
1257 | static int | 1065 | static int |
1258 | update_message_id (struct Plugin *plugin, const char *stmt, | 1066 | update_message_id (struct Plugin *plugin, |
1067 | const char *stmt, | ||
1259 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 1068 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
1260 | uint64_t message_id) | 1069 | uint64_t message_id) |
1261 | { | 1070 | { |
1262 | PGresult *res; | ||
1263 | |||
1264 | struct GNUNET_PQ_QueryParam params[] = { | 1071 | struct GNUNET_PQ_QueryParam params[] = { |
1265 | GNUNET_PQ_query_param_uint64 (&message_id), | 1072 | GNUNET_PQ_query_param_uint64 (&message_id), |
1266 | GNUNET_PQ_query_param_auto_from_type (channel_key), | 1073 | GNUNET_PQ_query_param_auto_from_type (channel_key), |
1267 | GNUNET_PQ_query_param_end | 1074 | GNUNET_PQ_query_param_end |
1268 | }; | 1075 | }; |
1269 | 1076 | ||
1270 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); | 1077 | if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != |
1271 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | 1078 | GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params)) |
1272 | res, | ||
1273 | PGRES_COMMAND_OK, | ||
1274 | "PQexecPrepared", stmt)) | ||
1275 | { | ||
1276 | return GNUNET_SYSERR; | 1079 | return GNUNET_SYSERR; |
1277 | } | ||
1278 | |||
1279 | PQclear (res); | ||
1280 | 1080 | ||
1281 | return GNUNET_OK; | 1081 | return GNUNET_OK; |
1282 | } | 1082 | } |
@@ -1487,10 +1287,7 @@ static int | |||
1487 | state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 1287 | state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
1488 | const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) | 1288 | const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) |
1489 | { | 1289 | { |
1490 | PGresult *res; | ||
1491 | |||
1492 | struct Plugin *plugin = cls; | 1290 | struct Plugin *plugin = cls; |
1493 | int ret = GNUNET_SYSERR; | ||
1494 | 1291 | ||
1495 | const char *stmt = "select_state_one"; | 1292 | const char *stmt = "select_state_one"; |
1496 | 1293 | ||
@@ -1503,44 +1300,80 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | |||
1503 | void *value_current = NULL; | 1300 | void *value_current = NULL; |
1504 | size_t value_size = 0; | 1301 | size_t value_size = 0; |
1505 | 1302 | ||
1506 | struct GNUNET_PQ_ResultSpec results[] = { | 1303 | struct GNUNET_PQ_ResultSpec results_select[] = { |
1507 | GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size), | 1304 | GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size), |
1508 | GNUNET_PQ_result_spec_end | 1305 | GNUNET_PQ_result_spec_end |
1509 | }; | 1306 | }; |
1510 | 1307 | ||
1511 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | 1308 | if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != |
1512 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | 1309 | GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, |
1513 | res, | 1310 | params_select, results_select)) |
1514 | PGRES_TUPLES_OK, | 1311 | return GNUNET_SYSERR; |
1515 | "PQexecPrepared", stmt)) | ||
1516 | { | ||
1517 | return GNUNET_SYSERR; | ||
1518 | } | ||
1519 | 1312 | ||
1520 | if (PQntuples (res) == 0) | 1313 | return cb (cb_cls, name, value_current, |
1521 | { | 1314 | value_size); |
1522 | PQclear (res); | 1315 | } |
1523 | ret = GNUNET_NO; | ||
1524 | } | ||
1525 | 1316 | ||
1526 | ret = GNUNET_PQ_extract_result (res, results, 0); | ||
1527 | 1317 | ||
1528 | if (GNUNET_OK != ret) | 1318 | |
1319 | /** | ||
1320 | * Closure for #get_state_cb. | ||
1321 | */ | ||
1322 | struct GetStateContext { | ||
1323 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key; | ||
1324 | // const char *name, | ||
1325 | GNUNET_PSYCSTORE_StateCallback cb; | ||
1326 | void *cb_cls; | ||
1327 | |||
1328 | const char *value_id; | ||
1329 | |||
1330 | /* I preserved this but I do not see the point since | ||
1331 | * it cannot stop the loop early and gets overwritten ?? */ | ||
1332 | int ret; | ||
1333 | }; | ||
1334 | |||
1335 | |||
1336 | /** | ||
1337 | * Callback that retrieves the results of a SELECT statement | ||
1338 | * reading form the state table. | ||
1339 | * | ||
1340 | * Only passed to GNUNET_PQ_eval_prepared_multi_select and | ||
1341 | * has type GNUNET_PQ_PostgresResultHandler. | ||
1342 | * | ||
1343 | * @param cls closure | ||
1344 | * @param result the postgres result | ||
1345 | * @param num_result the number of results in @a result | ||
1346 | */ | ||
1347 | static void | ||
1348 | get_state_cb (void *cls, | ||
1349 | PGresult *res, | ||
1350 | unsigned int num_results) | ||
1351 | { | ||
1352 | struct GetStateContext *c = cls; | ||
1353 | |||
1354 | for (unsigned int i=0;i<num_results;i++) | ||
1529 | { | 1355 | { |
1530 | PQclear (res); | 1356 | char *name = ""; |
1531 | return GNUNET_SYSERR; | 1357 | void *value = NULL; |
1532 | } | 1358 | size_t value_size = 0; |
1533 | 1359 | ||
1534 | ret = cb (cb_cls, name, value_current, | 1360 | struct GNUNET_PQ_ResultSpec results[] = { |
1535 | value_size); | 1361 | GNUNET_PQ_result_spec_string ("name", &name), |
1362 | GNUNET_PQ_result_spec_variable_size (c->value_id, &value, &value_size), | ||
1363 | GNUNET_PQ_result_spec_end | ||
1364 | }; | ||
1536 | 1365 | ||
1537 | GNUNET_PQ_cleanup_result(results); | 1366 | if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i)) |
1538 | PQclear (res); | 1367 | { |
1368 | GNUNET_PQ_cleanup_result(results); /* previously invoked via PQclear?? */ | ||
1369 | break; /* nothing more?? */ | ||
1370 | } | ||
1539 | 1371 | ||
1540 | return ret; | 1372 | c->ret = c->cb (c->cb_cls, (const char *) name, value, value_size); |
1373 | GNUNET_PQ_cleanup_result(results); | ||
1374 | } | ||
1541 | } | 1375 | } |
1542 | 1376 | ||
1543 | |||
1544 | /** | 1377 | /** |
1545 | * Retrieve all state variables for a channel with the given prefix. | 1378 | * Retrieve all state variables for a channel with the given prefix. |
1546 | * | 1379 | * |
@@ -1553,9 +1386,7 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_ | |||
1553 | const char *name, GNUNET_PSYCSTORE_StateCallback cb, | 1386 | const char *name, GNUNET_PSYCSTORE_StateCallback cb, |
1554 | void *cb_cls) | 1387 | void *cb_cls) |
1555 | { | 1388 | { |
1556 | PGresult *res; | ||
1557 | struct Plugin *plugin = cls; | 1389 | struct Plugin *plugin = cls; |
1558 | int ret = GNUNET_NO; | ||
1559 | 1390 | ||
1560 | const char *stmt = "select_state_prefix"; | 1391 | const char *stmt = "select_state_prefix"; |
1561 | 1392 | ||
@@ -1569,42 +1400,18 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_ | |||
1569 | GNUNET_PQ_query_param_end | 1400 | GNUNET_PQ_query_param_end |
1570 | }; | 1401 | }; |
1571 | 1402 | ||
1572 | char *name2 = ""; | 1403 | struct GetStateContext gsc = { |
1573 | void *value_current = NULL; | 1404 | .cb = cb, |
1574 | size_t value_size = 0; | 1405 | .cb_cls = cb_cls, |
1575 | 1406 | .value_id = "value_current", | |
1576 | struct GNUNET_PQ_ResultSpec results[] = { | 1407 | .ret = GNUNET_NO |
1577 | GNUNET_PQ_result_spec_string ("name", &name2), | ||
1578 | GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size), | ||
1579 | GNUNET_PQ_result_spec_end | ||
1580 | }; | 1408 | }; |
1581 | 1409 | ||
1582 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | 1410 | if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, |
1583 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | 1411 | stmt, params_select, |
1584 | res, | 1412 | &get_state_cb, &gsc)) |
1585 | PGRES_TUPLES_OK, | ||
1586 | "PQexecPrepared", stmt)) | ||
1587 | { | ||
1588 | return GNUNET_SYSERR; | 1413 | return GNUNET_SYSERR; |
1589 | } | 1414 | return gsc.ret; /* GNUNET_OK ?? */ |
1590 | |||
1591 | int nrows = PQntuples (res); | ||
1592 | for (int row = 0; row < nrows; row++) | ||
1593 | { | ||
1594 | if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row)) | ||
1595 | { | ||
1596 | break; | ||
1597 | } | ||
1598 | |||
1599 | ret = cb (cb_cls, (const char *) name2, | ||
1600 | value_current, | ||
1601 | value_size); | ||
1602 | GNUNET_PQ_cleanup_result(results); | ||
1603 | } | ||
1604 | |||
1605 | PQclear (res); | ||
1606 | |||
1607 | return ret; | ||
1608 | } | 1415 | } |
1609 | 1416 | ||
1610 | 1417 | ||
@@ -1620,9 +1427,7 @@ state_get_signed (void *cls, | |||
1620 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 1427 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
1621 | GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) | 1428 | GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) |
1622 | { | 1429 | { |
1623 | PGresult *res; | ||
1624 | struct Plugin *plugin = cls; | 1430 | struct Plugin *plugin = cls; |
1625 | int ret = GNUNET_NO; | ||
1626 | 1431 | ||
1627 | const char *stmt = "select_state_signed"; | 1432 | const char *stmt = "select_state_signed"; |
1628 | 1433 | ||
@@ -1631,43 +1436,18 @@ state_get_signed (void *cls, | |||
1631 | GNUNET_PQ_query_param_end | 1436 | GNUNET_PQ_query_param_end |
1632 | }; | 1437 | }; |
1633 | 1438 | ||
1634 | char *name = ""; | 1439 | struct GetStateContext gsc = { |
1635 | void *value_signed = NULL; | 1440 | .cb = cb, |
1636 | size_t value_size = 0; | 1441 | .cb_cls = cb_cls, |
1637 | 1442 | .value_id = "value_signed", | |
1638 | struct GNUNET_PQ_ResultSpec results[] = { | 1443 | .ret = GNUNET_NO |
1639 | GNUNET_PQ_result_spec_string ("name", &name), | ||
1640 | GNUNET_PQ_result_spec_variable_size ("value_signed", &value_signed, &value_size), | ||
1641 | GNUNET_PQ_result_spec_end | ||
1642 | }; | 1444 | }; |
1643 | 1445 | ||
1644 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | 1446 | if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, |
1645 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | 1447 | stmt, params_select, |
1646 | res, | 1448 | &get_state_cb, &gsc)) |
1647 | PGRES_TUPLES_OK, | ||
1648 | "PQexecPrepared", stmt)) | ||
1649 | { | ||
1650 | return GNUNET_SYSERR; | 1449 | return GNUNET_SYSERR; |
1651 | } | 1450 | return gsc.ret; /* GNUNET_OK ?? */ |
1652 | |||
1653 | int nrows = PQntuples (res); | ||
1654 | for (int row = 0; row < nrows; row++) | ||
1655 | { | ||
1656 | if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row)) | ||
1657 | { | ||
1658 | break; | ||
1659 | } | ||
1660 | |||
1661 | ret = cb (cb_cls, (const char *) name, | ||
1662 | value_signed, | ||
1663 | value_size); | ||
1664 | |||
1665 | GNUNET_PQ_cleanup_result (results); | ||
1666 | } | ||
1667 | |||
1668 | PQclear (res); | ||
1669 | |||
1670 | return ret; | ||
1671 | } | 1451 | } |
1672 | 1452 | ||
1673 | 1453 | ||