aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore/plugin_psycstore_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/psycstore/plugin_psycstore_postgres.c')
-rw-r--r--src/psycstore/plugin_psycstore_postgres.c1198
1 files changed, 489 insertions, 709 deletions
diff --git a/src/psycstore/plugin_psycstore_postgres.c b/src/psycstore/plugin_psycstore_postgres.c
index 273ab4e80..b8010af0a 100644
--- a/src/psycstore/plugin_psycstore_postgres.c
+++ b/src/psycstore/plugin_psycstore_postgres.c
@@ -25,6 +25,7 @@
25 * @author Gabor X Toth 25 * @author Gabor X Toth
26 * @author Christian Grothoff 26 * @author Christian Grothoff
27 * @author Christophe Genevey 27 * @author Christophe Genevey
28 * @author Jeffrey Burdges
28 */ 29 */
29 30
30#include "platform.h" 31#include "platform.h"
@@ -34,7 +35,6 @@
34#include "gnunet_crypto_lib.h" 35#include "gnunet_crypto_lib.h"
35#include "gnunet_psyc_util_lib.h" 36#include "gnunet_psyc_util_lib.h"
36#include "psycstore.h" 37#include "psycstore.h"
37#include "gnunet_postgres_lib.h"
38#include "gnunet_pq_lib.h" 38#include "gnunet_pq_lib.h"
39 39
40/** 40/**
@@ -84,342 +84,276 @@ struct Plugin
84 * as needed as well). 84 * as needed as well).
85 * 85 *
86 * @param plugin the plugin context (state for this module) 86 * @param plugin the plugin context (state for this module)
87 * @return GNUNET_OK on success 87 * @return #GNUNET_OK on success
88 */ 88 */
89static int 89static int
90database_setup (struct Plugin *plugin) 90database_setup (struct Plugin *plugin)
91{ 91{
92 struct GNUNET_PQ_ExecuteStatement es[] = {
93 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS channels (\n"
94 " id SERIAL,\n"
95 " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
96 " max_state_message_id BIGINT,\n"
97 " state_hash_message_id BIGINT,\n"
98 " PRIMARY KEY(id)\n"
99 ")"
100 "WITH OIDS"),
101 GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
102 " ON channels (pub_key)"),
103 GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n"
104 " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
105 "RETURNS NULL ON NULL INPUT"),
106 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS slaves (\n"
107 " id SERIAL,\n"
108 " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
109 " PRIMARY KEY(id)\n"
110 ")"
111 "WITH OIDS"),
112 GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
113 " ON slaves (pub_key)"),
114 GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n"
115 " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
116 "RETURNS NULL ON NULL INPUT"),
117 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS membership (\n"
118 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
119 " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n"
120 " did_join INT NOT NULL,\n"
121 " announced_at BIGINT NOT NULL,\n"
122 " effective_since BIGINT NOT NULL,\n"
123 " group_generation BIGINT NOT NULL\n"
124 ")"
125 "WITH OIDS"),
126 GNUNET_PQ_make_execute ("CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
127 "ON membership (channel_id, slave_id)"),
128 /** @todo messages table: add method_name column */
129 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS messages (\n"
130 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
131 " hop_counter INT NOT NULL,\n"
132 " signature BYTEA CHECK (LENGTH(signature)=64),\n"
133 " purpose BYTEA CHECK (LENGTH(purpose)=8),\n"
134 " fragment_id BIGINT NOT NULL,\n"
135 " fragment_offset BIGINT NOT NULL,\n"
136 " message_id BIGINT NOT NULL,\n"
137 " group_generation BIGINT NOT NULL,\n"
138 " multicast_flags INT NOT NULL,\n"
139 " psycstore_flags INT NOT NULL,\n"
140 " data BYTEA,\n"
141 " PRIMARY KEY (channel_id, fragment_id),\n"
142 " UNIQUE (channel_id, message_id, fragment_offset)\n"
143 ")"
144 "WITH OIDS"),
145 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state (\n"
146 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
147 " name TEXT NOT NULL,\n"
148 " value_current BYTEA,\n"
149 " value_signed BYTEA,\n"
150 " PRIMARY KEY (channel_id, name)\n"
151 ")"
152 "WITH OIDS"),
153 GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state_sync (\n"
154 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
155 " name TEXT NOT NULL,\n"
156 " value BYTEA,\n"
157 " PRIMARY KEY (channel_id, name)\n"
158 ")"
159 "WITH OIDS"),
160 GNUNET_PQ_EXECUTE_STATEMENT_END
161 };
162
92 /* Open database and precompile statements */ 163 /* Open database and precompile statements */
93 plugin->dbh = GNUNET_POSTGRES_connect (plugin->cfg, 164 plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->cfg,
94 "psycstore-postgres"); 165 "psycstore-postgres");
95 if (NULL == plugin->dbh) 166 if (NULL == plugin->dbh)
96 return GNUNET_SYSERR; 167 return GNUNET_SYSERR;
97 168 if (GNUNET_OK !=
98 /* Create tables */ 169 GNUNET_PQ_exec_statements (plugin->dbh,
99 if ((GNUNET_OK != 170 es))
100 GNUNET_POSTGRES_exec(plugin->dbh,
101 "CREATE TABLE IF NOT EXISTS channels (\n"
102 " id SERIAL,\n"
103 " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
104 " max_state_message_id BIGINT,\n"
105 " state_hash_message_id BIGINT,\n"
106 " PRIMARY KEY(id)\n"
107 ")" "WITH OIDS")) ||
108
109 (GNUNET_OK !=
110 GNUNET_POSTGRES_exec(plugin->dbh,
111 "CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
112 " ON channels (pub_key)")) ||
113
114 (GNUNET_OK !=
115 GNUNET_POSTGRES_exec(plugin->dbh,
116 "CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n"
117 " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
118 "RETURNS NULL ON NULL INPUT")) ||
119
120 (GNUNET_OK !=
121 GNUNET_POSTGRES_exec(plugin->dbh,
122 "CREATE TABLE IF NOT EXISTS slaves (\n"
123 " id SERIAL,\n"
124 " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
125 " PRIMARY KEY(id)\n"
126 ")" "WITH OIDS")) ||
127
128 (GNUNET_OK !=
129 GNUNET_POSTGRES_exec(plugin->dbh,
130 "CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
131 " ON slaves (pub_key)")) ||
132
133 (GNUNET_OK !=
134 GNUNET_POSTGRES_exec(plugin->dbh,
135 "CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n"
136 " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n"
137 "RETURNS NULL ON NULL INPUT")) ||
138
139 (GNUNET_OK !=
140 GNUNET_POSTGRES_exec(plugin->dbh,
141 "CREATE TABLE IF NOT EXISTS membership (\n"
142 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
143 " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n"
144 " did_join INT NOT NULL,\n"
145 " announced_at BIGINT NOT NULL,\n"
146 " effective_since BIGINT NOT NULL,\n"
147 " group_generation BIGINT NOT NULL\n"
148 ")" "WITH OIDS")) ||
149
150 (GNUNET_OK !=
151 GNUNET_POSTGRES_exec(plugin->dbh,
152 "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
153 "ON membership (channel_id, slave_id)")) ||
154
155 /** @todo messages table: add method_name column */
156 (GNUNET_OK !=
157 GNUNET_POSTGRES_exec(plugin->dbh,
158 "CREATE TABLE IF NOT EXISTS messages (\n"
159 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
160 " hop_counter INT NOT NULL,\n"
161 " signature BYTEA CHECK (LENGTH(signature)=64),\n"
162 " purpose BYTEA CHECK (LENGTH(purpose)=8),\n"
163 " fragment_id BIGINT NOT NULL,\n"
164 " fragment_offset BIGINT NOT NULL,\n"
165 " message_id BIGINT NOT NULL,\n"
166 " group_generation BIGINT NOT NULL,\n"
167 " multicast_flags INT NOT NULL,\n"
168 " psycstore_flags INT NOT NULL,\n"
169 " data BYTEA,\n"
170 " PRIMARY KEY (channel_id, fragment_id),\n"
171 " UNIQUE (channel_id, message_id, fragment_offset)\n"
172 ")" "WITH OIDS")) ||
173
174 (GNUNET_OK !=
175 GNUNET_POSTGRES_exec(plugin->dbh,
176 "CREATE TABLE IF NOT EXISTS state (\n"
177 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
178 " name TEXT NOT NULL,\n"
179 " value_current BYTEA,\n"
180 " value_signed BYTEA,\n"
181 " PRIMARY KEY (channel_id, name)\n"
182 ")" "WITH OIDS")) ||
183 (GNUNET_OK !=
184 GNUNET_POSTGRES_exec(plugin->dbh,
185 "CREATE TABLE IF NOT EXISTS state_sync (\n"
186 " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
187 " name TEXT NOT NULL,\n"
188 " value BYTEA,\n"
189 " PRIMARY KEY (channel_id, name)\n"
190 ")" "WITH OIDS")))
191 { 171 {
192 PQfinish (plugin->dbh); 172 PQfinish (plugin->dbh);
193 plugin->dbh = NULL; 173 plugin->dbh = NULL;
194 return GNUNET_SYSERR; 174 return GNUNET_SYSERR;
195 } 175 }
196 176
197
198 /* Prepare statements */ 177 /* Prepare statements */
199 if ((GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 178 {
200 "transaction_begin", 179 struct GNUNET_PQ_PreparedStatement ps[] = {
201 "BEGIN", 0)) || 180 GNUNET_PQ_make_prepare ("transaction_begin",
202 181 "BEGIN", 0),
203 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 182 GNUNET_PQ_make_prepare ("transaction_commit",
204 "transaction_commit", 183 "COMMIT", 0),
205 "COMMIT", 0)) || 184 GNUNET_PQ_make_prepare ("transaction_rollback",
206 185 "ROLLBACK", 0),
207 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 186 GNUNET_PQ_make_prepare ("insert_channel_key",
208 "transaction_rollback", 187 "INSERT INTO channels (pub_key) VALUES ($1)"
209 "ROLLBACK", 0)) || 188 " ON CONFLICT DO NOTHING", 1),
210 189 GNUNET_PQ_make_prepare ("insert_slave_key",
211 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 190 "INSERT INTO slaves (pub_key) VALUES ($1)"
212 "insert_channel_key", 191 " ON CONFLICT DO NOTHING", 1),
213 "INSERT INTO channels (pub_key) VALUES ($1)" 192 GNUNET_PQ_make_prepare ("insert_membership",
214 " ON CONFLICT DO NOTHING", 1)) || 193 "INSERT INTO membership\n"
215 194 " (channel_id, slave_id, did_join, announced_at,\n"
216 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 195 " effective_since, group_generation)\n"
217 "insert_slave_key", 196 "VALUES (get_chan_id($1),\n"
218 "INSERT INTO slaves (pub_key) VALUES ($1)" 197 " get_slave_id($2),\n"
219 " ON CONFLICT DO NOTHING", 1)) || 198 " $3, $4, $5, $6)", 6),
220 199 GNUNET_PQ_make_prepare ("select_membership",
221 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 200 "SELECT did_join FROM membership\n"
222 "insert_membership", 201 "WHERE channel_id = get_chan_id($1)\n"
223 "INSERT INTO membership\n" 202 " AND slave_id = get_slave_id($2)\n"
224 " (channel_id, slave_id, did_join, announced_at,\n" 203 " AND effective_since <= $3 AND did_join = 1\n"
225 " effective_since, group_generation)\n" 204 "ORDER BY announced_at DESC LIMIT 1", 3),
226 "VALUES (get_chan_id($1),\n" 205 GNUNET_PQ_make_prepare ("insert_fragment",
227 " get_slave_id($2),\n" 206 "INSERT INTO messages\n"
228 " $3, $4, $5, $6)", 6)) || 207 " (channel_id, hop_counter, signature, purpose,\n"
229 208 " fragment_id, fragment_offset, message_id,\n"
230 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 209 " group_generation, multicast_flags, psycstore_flags, data)\n"
231 "select_membership", 210 "VALUES (get_chan_id($1),\n"
232 "SELECT did_join FROM membership\n" 211 " $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
233 "WHERE channel_id = get_chan_id($1)\n" 212 "ON CONFLICT DO NOTHING", 11),
234 " AND slave_id = get_slave_id($2)\n" 213 GNUNET_PQ_make_prepare ("update_message_flags",
235 " AND effective_since <= $3 AND did_join = 1\n" 214 "UPDATE messages\n"
236 "ORDER BY announced_at DESC LIMIT 1", 3)) || 215 "SET psycstore_flags = psycstore_flags | $1\n"
237 216 "WHERE channel_id = get_chan_id($2) \n"
238 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 217 " AND message_id = $3 AND fragment_offset = 0", 3),
239 "insert_fragment", 218 GNUNET_PQ_make_prepare ("select_fragments",
240 "INSERT INTO messages\n" 219 "SELECT hop_counter, signature, purpose, fragment_id,\n"
241 " (channel_id, hop_counter, signature, purpose,\n" 220 " fragment_offset, message_id, group_generation,\n"
242 " fragment_id, fragment_offset, message_id,\n" 221 " multicast_flags, psycstore_flags, data\n"
243 " group_generation, multicast_flags, psycstore_flags, data)\n" 222 "FROM messages\n"
244 "VALUES (get_chan_id($1),\n" 223 "WHERE channel_id = get_chan_id($1) \n"
245 " $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)" 224 " AND $2 <= fragment_id AND fragment_id <= $3", 3),
246 "ON CONFLICT DO NOTHING", 11)) ||
247
248 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
249 "update_message_flags",
250 "UPDATE messages\n"
251 "SET psycstore_flags = psycstore_flags | $1\n"
252 "WHERE channel_id = get_chan_id($2) \n"
253 " AND message_id = $3 AND fragment_offset = 0", 3)) ||
254
255 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
256 "select_fragments",
257 "SELECT hop_counter, signature, purpose, fragment_id,\n"
258 " fragment_offset, message_id, group_generation,\n"
259 " multicast_flags, psycstore_flags, data\n"
260 "FROM messages\n"
261 "WHERE channel_id = get_chan_id($1) \n"
262 " AND $2 <= fragment_id AND fragment_id <= $3", 3)) ||
263
264 /** @todo select_messages: add method_prefix filter */ 225 /** @todo select_messages: add method_prefix filter */
265 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 226 GNUNET_PQ_make_prepare ("select_messages",
266 "select_messages", 227 "SELECT hop_counter, signature, purpose, fragment_id,\n"
267 "SELECT hop_counter, signature, purpose, fragment_id,\n" 228 " fragment_offset, message_id, group_generation,\n"
268 " fragment_offset, message_id, group_generation,\n" 229 " multicast_flags, psycstore_flags, data\n"
269 " multicast_flags, psycstore_flags, data\n" 230 "FROM messages\n"
270 "FROM messages\n" 231 "WHERE channel_id = get_chan_id($1) \n"
271 "WHERE channel_id = get_chan_id($1) \n" 232 " AND $2 <= message_id AND message_id <= $3\n"
272 " AND $2 <= message_id AND message_id <= $3\n" 233 "LIMIT $4;", 4),
273 "LIMIT $4;", 4)) ||
274
275 /** @todo select_latest_messages: add method_prefix filter */ 234 /** @todo select_latest_messages: add method_prefix filter */
276 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 235 GNUNET_PQ_make_prepare ("select_latest_fragments",
277 "select_latest_fragments", 236 "SELECT rev.hop_counter AS hop_counter,\n"
278 "SELECT rev.hop_counter AS hop_counter,\n" 237 " rev.signature AS signature,\n"
279 " rev.signature AS signature,\n" 238 " rev.purpose AS purpose,\n"
280 " rev.purpose AS purpose,\n" 239 " rev.fragment_id AS fragment_id,\n"
281 " rev.fragment_id AS fragment_id,\n" 240 " rev.fragment_offset AS fragment_offset,\n"
282 " rev.fragment_offset AS fragment_offset,\n" 241 " rev.message_id AS message_id,\n"
283 " rev.message_id AS message_id,\n" 242 " rev.group_generation AS group_generation,\n"
284 " rev.group_generation AS group_generation,\n" 243 " rev.multicast_flags AS multicast_flags,\n"
285 " rev.multicast_flags AS multicast_flags,\n" 244 " rev.psycstore_flags AS psycstore_flags,\n"
286 " rev.psycstore_flags AS psycstore_flags,\n" 245 " rev.data AS data\n"
287 " rev.data AS data\n" 246 " FROM\n"
288 " FROM\n" 247 " (SELECT hop_counter, signature, purpose, fragment_id,\n"
289 " (SELECT hop_counter, signature, purpose, fragment_id,\n" 248 " fragment_offset, message_id, group_generation,\n"
290 " fragment_offset, message_id, group_generation,\n" 249 " multicast_flags, psycstore_flags, data \n"
291 " multicast_flags, psycstore_flags, data \n" 250 " FROM messages\n"
292 " FROM messages\n" 251 " WHERE channel_id = get_chan_id($1) \n"
293 " WHERE channel_id = get_chan_id($1) \n" 252 " ORDER BY fragment_id DESC\n"
294 " ORDER BY fragment_id DESC\n" 253 " LIMIT $2) AS rev\n"
295 " LIMIT $2) AS rev\n" 254 " ORDER BY rev.fragment_id;", 2),
296 " ORDER BY rev.fragment_id;", 2)) || 255 GNUNET_PQ_make_prepare ("select_latest_messages",
297 256 "SELECT hop_counter, signature, purpose, fragment_id,\n"
298 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 257 " fragment_offset, message_id, group_generation,\n"
299 "select_latest_messages", 258 " multicast_flags, psycstore_flags, data\n"
300 "SELECT hop_counter, signature, purpose, fragment_id,\n" 259 "FROM messages\n"
301 " fragment_offset, message_id, group_generation,\n" 260 "WHERE channel_id = get_chan_id($1)\n"
302 " multicast_flags, psycstore_flags, data\n" 261 " AND message_id IN\n"
303 "FROM messages\n" 262 " (SELECT message_id\n"
304 "WHERE channel_id = get_chan_id($1)\n" 263 " FROM messages\n"
305 " AND message_id IN\n" 264 " WHERE channel_id = get_chan_id($2) \n"
306 " (SELECT message_id\n" 265 " GROUP BY message_id\n"
307 " FROM messages\n" 266 " ORDER BY message_id\n"
308 " WHERE channel_id = get_chan_id($2) \n" 267 " DESC LIMIT $3)\n"
309 " GROUP BY message_id\n" 268 "ORDER BY fragment_id", 3),
310 " ORDER BY message_id\n" 269 GNUNET_PQ_make_prepare ("select_message_fragment",
311 " DESC LIMIT $3)\n" 270 "SELECT hop_counter, signature, purpose, fragment_id,\n"
312 "ORDER BY fragment_id", 3)) || 271 " fragment_offset, message_id, group_generation,\n"
313 272 " multicast_flags, psycstore_flags, data\n"
314 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 273 "FROM messages\n"
315 "select_message_fragment", 274 "WHERE channel_id = get_chan_id($1) \n"
316 "SELECT hop_counter, signature, purpose, fragment_id,\n" 275 " AND message_id = $2 AND fragment_offset = $3", 3),
317 " fragment_offset, message_id, group_generation,\n" 276 GNUNET_PQ_make_prepare ("select_counters_message",
318 " multicast_flags, psycstore_flags, data\n" 277 "SELECT fragment_id, message_id, group_generation\n"
319 "FROM messages\n" 278 "FROM messages\n"
320 "WHERE channel_id = get_chan_id($1) \n" 279 "WHERE channel_id = get_chan_id($1)\n"
321 " AND message_id = $2 AND fragment_offset = $3", 3)) || 280 "ORDER BY fragment_id DESC LIMIT 1", 1),
322 281 GNUNET_PQ_make_prepare ("select_counters_state",
323 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 282 "SELECT max_state_message_id\n"
324 "select_counters_message", 283 "FROM channels\n"
325 "SELECT fragment_id, message_id, group_generation\n" 284 "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1),
326 "FROM messages\n" 285 GNUNET_PQ_make_prepare ("update_max_state_message_id",
327 "WHERE channel_id = get_chan_id($1)\n" 286 "UPDATE channels\n"
328 "ORDER BY fragment_id DESC LIMIT 1", 1)) || 287 "SET max_state_message_id = $1\n"
329 288 "WHERE pub_key = $2", 2),
330 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 289
331 "select_counters_state", 290 GNUNET_PQ_make_prepare ("update_state_hash_message_id",
332 "SELECT max_state_message_id\n" 291 "UPDATE channels\n"
333 "FROM channels\n" 292 "SET state_hash_message_id = $1\n"
334 "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1)) || 293 "WHERE pub_key = $2", 2),
335 294 GNUNET_PQ_make_prepare ("insert_state_current",
336 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 295 "INSERT INTO state\n"
337 "update_max_state_message_id", 296 " (channel_id, name, value_current, value_signed)\n"
338 "UPDATE channels\n" 297 "SELECT new.channel_id, new.name,\n"
339 "SET max_state_message_id = $1\n" 298 " new.value_current, old.value_signed\n"
340 "WHERE pub_key = $2", 2)) || 299 "FROM (SELECT get_chan_id($1) AS channel_id,\n"
341 300 " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n"
342 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 301 "LEFT JOIN (SELECT channel_id, name, value_signed\n"
343 "update_state_hash_message_id", 302 " FROM state) AS old\n"
344 "UPDATE channels\n" 303 "ON new.channel_id = old.channel_id AND new.name = old.name\n"
345 "SET state_hash_message_id = $1\n" 304 "ON CONFLICT (channel_id, name)\n"
346 "WHERE pub_key = $2", 2)) || 305 " DO UPDATE SET value_current = EXCLUDED.value_current,\n"
347 306 " value_signed = EXCLUDED.value_signed", 3),
348 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 307 GNUNET_PQ_make_prepare ("delete_state_empty",
349 "insert_state_current", 308 "DELETE FROM state\n"
350 "INSERT INTO state\n" 309 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n"
351 " (channel_id, name, value_current, value_signed)\n" 310 " AND (value_current IS NULL OR length(value_current) = 0)\n"
352 "SELECT new.channel_id, new.name,\n" 311 " AND (value_signed IS NULL OR length(value_signed) = 0)", 1),
353 " new.value_current, old.value_signed\n" 312 GNUNET_PQ_make_prepare ("update_state_signed",
354 "FROM (SELECT get_chan_id($1) AS channel_id,\n" 313 "UPDATE state\n"
355 " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n" 314 "SET value_signed = value_current\n"
356 "LEFT JOIN (SELECT channel_id, name, value_signed\n" 315 "WHERE channel_id = get_chan_id($1) ", 1),
357 " FROM state) AS old\n" 316 GNUNET_PQ_make_prepare ("delete_state",
358 "ON new.channel_id = old.channel_id AND new.name = old.name\n" 317 "DELETE FROM state\n"
359 "ON CONFLICT (channel_id, name)\n" 318 "WHERE channel_id = get_chan_id($1) ", 1),
360 " DO UPDATE SET value_current = EXCLUDED.value_current,\n" 319 GNUNET_PQ_make_prepare ("insert_state_sync",
361 " value_signed = EXCLUDED.value_signed", 3)) || 320 "INSERT INTO state_sync (channel_id, name, value)\n"
362 321 "VALUES (get_chan_id($1), $2, $3)", 3),
363 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 322 GNUNET_PQ_make_prepare ("insert_state_from_sync",
364 "delete_state_empty", 323 "INSERT INTO state\n"
365 "DELETE FROM state\n" 324 " (channel_id, name, value_current, value_signed)\n"
366 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n" 325 "SELECT channel_id, name, value, value\n"
367 " AND (value_current IS NULL OR length(value_current) = 0)\n" 326 "FROM state_sync\n"
368 " AND (value_signed IS NULL OR length(value_signed) = 0)", 1)) || 327 "WHERE channel_id = get_chan_id($1)", 1),
369 328 GNUNET_PQ_make_prepare ("delete_state_sync",
370 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 329 "DELETE FROM state_sync\n"
371 "update_state_signed", 330 "WHERE channel_id = get_chan_id($1)", 1),
372 "UPDATE state\n" 331 GNUNET_PQ_make_prepare ("select_state_one",
373 "SET value_signed = value_current\n" 332 "SELECT value_current\n"
374 "WHERE channel_id = get_chan_id($1) ", 1)) || 333 "FROM state\n"
375 334 "WHERE channel_id = get_chan_id($1)\n"
376 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 335 " AND name = $2", 2),
377 "delete_state", 336 GNUNET_PQ_make_prepare ("select_state_prefix",
378 "DELETE FROM state\n" 337 "SELECT name, value_current\n"
379 "WHERE channel_id = get_chan_id($1) ", 1)) || 338 "FROM state\n"
380 339 "WHERE channel_id = get_chan_id($1)\n"
381 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 340 " AND (name = $2 OR substr(name, 1, $3) = $4)", 4),
382 "insert_state_sync", 341 GNUNET_PQ_make_prepare ("select_state_signed",
383 "INSERT INTO state_sync (channel_id, name, value)\n" 342 "SELECT name, value_signed\n"
384 "VALUES (get_chan_id($1), $2, $3)", 3)) || 343 "FROM state\n"
385 344 "WHERE channel_id = get_chan_id($1)\n"
386 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 345 " AND value_signed IS NOT NULL", 1),
387 "insert_state_from_sync", 346 GNUNET_PQ_PREPARED_STATEMENT_END
388 "INSERT INTO state\n" 347 };
389 " (channel_id, name, value_current, value_signed)\n" 348
390 "SELECT channel_id, name, value, value\n" 349 if (GNUNET_OK !=
391 "FROM state_sync\n" 350 GNUNET_PQ_prepare_statements (plugin->dbh,
392 "WHERE channel_id = get_chan_id($1)", 1)) || 351 ps))
393 352 {
394 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh, 353 PQfinish (plugin->dbh);
395 "delete_state_sync", 354 plugin->dbh = NULL;
396 "DELETE FROM state_sync\n" 355 return GNUNET_SYSERR;
397 "WHERE channel_id = get_chan_id($1)", 1)) || 356 }
398
399 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
400 "select_state_one",
401 "SELECT value_current\n"
402 "FROM state\n"
403 "WHERE channel_id = get_chan_id($1)\n"
404 " AND name = $2", 2)) ||
405
406 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
407 "select_state_prefix",
408 "SELECT name, value_current\n"
409 "FROM state\n"
410 "WHERE channel_id = get_chan_id($1)\n"
411 " AND (name = $2 OR substr(name, 1, $3) = $4)", 4)) ||
412
413 (GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
414 "select_state_signed",
415 "SELECT name, value_signed\n"
416 "FROM state\n"
417 "WHERE channel_id = get_chan_id($1)\n"
418 " AND value_signed IS NOT NULL", 1)))
419 {
420 PQfinish (plugin->dbh);
421 plugin->dbh = NULL;
422 return GNUNET_SYSERR;
423 } 357 }
424 358
425 return GNUNET_OK; 359 return GNUNET_OK;
@@ -452,22 +386,15 @@ static int
452exec_channel (struct Plugin *plugin, const char *stmt, 386exec_channel (struct Plugin *plugin, const char *stmt,
453 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) 387 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
454{ 388{
455 PGresult *ret;
456 struct GNUNET_PQ_QueryParam params[] = { 389 struct GNUNET_PQ_QueryParam params[] = {
457 GNUNET_PQ_query_param_auto_from_type (channel_key), 390 GNUNET_PQ_query_param_auto_from_type (channel_key),
458 GNUNET_PQ_query_param_end 391 GNUNET_PQ_query_param_end
459 }; 392 };
460 393
461 ret = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); 394 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
462 if (GNUNET_OK != 395 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
463 GNUNET_POSTGRES_check_result (plugin->dbh,
464 ret,
465 PGRES_COMMAND_OK,
466 "PQexecPrepared", stmt))
467 return GNUNET_SYSERR; 396 return GNUNET_SYSERR;
468 397
469 PQclear (ret);
470
471 return GNUNET_OK; 398 return GNUNET_OK;
472} 399}
473 400
@@ -478,23 +405,15 @@ exec_channel (struct Plugin *plugin, const char *stmt,
478static int 405static int
479transaction_begin (struct Plugin *plugin, enum Transactions transaction) 406transaction_begin (struct Plugin *plugin, enum Transactions transaction)
480{ 407{
481 PGresult *ret;
482 struct GNUNET_PQ_QueryParam params[] = { 408 struct GNUNET_PQ_QueryParam params[] = {
483 GNUNET_PQ_query_param_end 409 GNUNET_PQ_query_param_end
484 }; 410 };
485 411
486 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_begin", params); 412 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
487 if (GNUNET_OK != 413 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_begin", params))
488 GNUNET_POSTGRES_check_result (plugin->dbh,
489 ret,
490 PGRES_COMMAND_OK,
491 "PQexecPrepared", "transaction_begin"))
492 {
493 return GNUNET_SYSERR; 414 return GNUNET_SYSERR;
494 }
495 415
496 plugin->transaction = transaction; 416 plugin->transaction = transaction;
497 PQclear (ret);
498 return GNUNET_OK; 417 return GNUNET_OK;
499} 418}
500 419
@@ -505,23 +424,14 @@ transaction_begin (struct Plugin *plugin, enum Transactions transaction)
505static int 424static int
506transaction_commit (struct Plugin *plugin) 425transaction_commit (struct Plugin *plugin)
507{ 426{
508 PGresult *ret;
509
510 struct GNUNET_PQ_QueryParam params[] = { 427 struct GNUNET_PQ_QueryParam params[] = {
511 GNUNET_PQ_query_param_end 428 GNUNET_PQ_query_param_end
512 }; 429 };
513 430
514 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_commit", params); 431 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
515 if (GNUNET_OK != 432 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_commit", params))
516 GNUNET_POSTGRES_check_result (plugin->dbh,
517 ret,
518 PGRES_COMMAND_OK,
519 "PQexecPrepared", "transaction_commit"))
520 {
521 return GNUNET_SYSERR; 433 return GNUNET_SYSERR;
522 }
523 434
524 PQclear (ret);
525 plugin->transaction = TRANSACTION_NONE; 435 plugin->transaction = TRANSACTION_NONE;
526 return GNUNET_OK; 436 return GNUNET_OK;
527} 437}
@@ -533,23 +443,14 @@ transaction_commit (struct Plugin *plugin)
533static int 443static int
534transaction_rollback (struct Plugin *plugin) 444transaction_rollback (struct Plugin *plugin)
535{ 445{
536 PGresult *ret;
537
538 struct GNUNET_PQ_QueryParam params[] = { 446 struct GNUNET_PQ_QueryParam params[] = {
539 GNUNET_PQ_query_param_end 447 GNUNET_PQ_query_param_end
540 }; 448 };
541 449
542 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "transaction_rollback", params); 450 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
543 if (GNUNET_OK != 451 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_rollback", params))
544 GNUNET_POSTGRES_check_result (plugin->dbh,
545 ret,
546 PGRES_COMMAND_OK,
547 "PQexecPrepared", "transaction_rollback"))
548 {
549 return GNUNET_SYSERR; 452 return GNUNET_SYSERR;
550 }
551 453
552 PQclear (ret);
553 plugin->transaction = TRANSACTION_NONE; 454 plugin->transaction = TRANSACTION_NONE;
554 return GNUNET_OK; 455 return GNUNET_OK;
555} 456}
@@ -559,24 +460,15 @@ static int
559channel_key_store (struct Plugin *plugin, 460channel_key_store (struct Plugin *plugin,
560 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) 461 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key)
561{ 462{
562 PGresult *ret;
563
564 struct GNUNET_PQ_QueryParam params[] = { 463 struct GNUNET_PQ_QueryParam params[] = {
565 GNUNET_PQ_query_param_auto_from_type (channel_key), 464 GNUNET_PQ_query_param_auto_from_type (channel_key),
566 GNUNET_PQ_query_param_end 465 GNUNET_PQ_query_param_end
567 }; 466 };
568 467
569 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_channel_key", params); 468 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
570 if (GNUNET_OK != 469 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_channel_key", params))
571 GNUNET_POSTGRES_check_result (plugin->dbh,
572 ret,
573 PGRES_COMMAND_OK,
574 "PQexecPrepared", "insert_channel_key"))
575 {
576 return GNUNET_SYSERR; 470 return GNUNET_SYSERR;
577 }
578 471
579 PQclear (ret);
580 return GNUNET_OK; 472 return GNUNET_OK;
581} 473}
582 474
@@ -585,24 +477,15 @@ static int
585slave_key_store (struct Plugin *plugin, 477slave_key_store (struct Plugin *plugin,
586 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key) 478 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key)
587{ 479{
588 PGresult *ret;
589
590 struct GNUNET_PQ_QueryParam params[] = { 480 struct GNUNET_PQ_QueryParam params[] = {
591 GNUNET_PQ_query_param_auto_from_type (slave_key), 481 GNUNET_PQ_query_param_auto_from_type (slave_key),
592 GNUNET_PQ_query_param_end 482 GNUNET_PQ_query_param_end
593 }; 483 };
594 484
595 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_slave_key", params); 485 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
596 if (GNUNET_OK != 486 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_slave_key", params))
597 GNUNET_POSTGRES_check_result (plugin->dbh,
598 ret,
599 PGRES_COMMAND_OK,
600 "PQexecPrepared", "insert_slave_key"))
601 {
602 return GNUNET_SYSERR; 487 return GNUNET_SYSERR;
603 }
604 488
605 PQclear (ret);
606 return GNUNET_OK; 489 return GNUNET_OK;
607} 490}
608 491
@@ -624,7 +507,6 @@ postgres_membership_store (void *cls,
624 uint64_t effective_since, 507 uint64_t effective_since,
625 uint64_t group_generation) 508 uint64_t group_generation)
626{ 509{
627 PGresult *ret;
628 struct Plugin *plugin = cls; 510 struct Plugin *plugin = cls;
629 511
630 uint32_t idid_join = (uint32_t)did_join; 512 uint32_t idid_join = (uint32_t)did_join;
@@ -653,17 +535,10 @@ postgres_membership_store (void *cls,
653 GNUNET_PQ_query_param_end 535 GNUNET_PQ_query_param_end
654 }; 536 };
655 537
656 ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_membership", params); 538 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
657 if (GNUNET_OK != 539 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_membership", params))
658 GNUNET_POSTGRES_check_result (plugin->dbh,
659 ret,
660 PGRES_COMMAND_OK,
661 "PQexecPrepared", "insert_membership"))
662 {
663 return GNUNET_SYSERR; 540 return GNUNET_SYSERR;
664 }
665 541
666 PQclear (ret);
667 return GNUNET_OK; 542 return GNUNET_OK;
668} 543}
669 544
@@ -681,13 +556,10 @@ membership_test (void *cls,
681 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 556 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
682 uint64_t message_id) 557 uint64_t message_id)
683{ 558{
684 PGresult *res;
685 struct Plugin *plugin = cls; 559 struct Plugin *plugin = cls;
686 560
687 uint32_t did_join = 0; 561 uint32_t did_join = 0;
688 562
689 int ret = GNUNET_SYSERR;
690
691 struct GNUNET_PQ_QueryParam params_select[] = { 563 struct GNUNET_PQ_QueryParam params_select[] = {
692 GNUNET_PQ_query_param_auto_from_type (channel_key), 564 GNUNET_PQ_query_param_auto_from_type (channel_key),
693 GNUNET_PQ_query_param_auto_from_type (slave_key), 565 GNUNET_PQ_query_param_auto_from_type (slave_key),
@@ -695,35 +567,17 @@ membership_test (void *cls,
695 GNUNET_PQ_query_param_end 567 GNUNET_PQ_query_param_end
696 }; 568 };
697 569
698 res = GNUNET_PQ_exec_prepared (plugin->dbh, "select_membership", params_select);
699 if (GNUNET_OK !=
700 GNUNET_POSTGRES_check_result (plugin->dbh,
701 res,
702 PGRES_TUPLES_OK,
703 "PQexecPrepared", "select_membership"))
704 {
705 return GNUNET_SYSERR;
706 }
707
708 struct GNUNET_PQ_ResultSpec results_select[] = { 570 struct GNUNET_PQ_ResultSpec results_select[] = {
709 GNUNET_PQ_result_spec_uint32 ("did_join", &did_join), 571 GNUNET_PQ_result_spec_uint32 ("did_join", &did_join),
710 GNUNET_PQ_result_spec_end 572 GNUNET_PQ_result_spec_end
711 }; 573 };
712 574
713 switch (GNUNET_PQ_extract_result (res, results_select, 0)) 575 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
714 { 576 GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, "select_membership",
715 case GNUNET_OK: 577 params_select, results_select))
716 ret = GNUNET_YES; 578 return GNUNET_SYSERR;
717 break;
718
719 default:
720 ret = GNUNET_NO;
721 break;
722 }
723 579
724 PQclear (res); 580 return GNUNET_OK;
725
726 return ret;
727} 581}
728 582
729/** 583/**
@@ -739,7 +593,6 @@ fragment_store (void *cls,
739 const struct GNUNET_MULTICAST_MessageHeader *msg, 593 const struct GNUNET_MULTICAST_MessageHeader *msg,
740 uint32_t psycstore_flags) 594 uint32_t psycstore_flags)
741{ 595{
742 PGresult *res;
743 struct Plugin *plugin = cls; 596 struct Plugin *plugin = cls;
744 597
745 GNUNET_assert (TRANSACTION_NONE == plugin->transaction); 598 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
@@ -782,15 +635,10 @@ fragment_store (void *cls,
782 GNUNET_PQ_query_param_end 635 GNUNET_PQ_query_param_end
783 }; 636 };
784 637
785 res = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_fragment", params_insert); 638 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
786 if (GNUNET_OK != 639 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_fragment", params_insert))
787 GNUNET_POSTGRES_check_result (plugin->dbh,
788 res,
789 PGRES_COMMAND_OK,
790 "PQexecPrepared", "insert_fragment"))
791 return GNUNET_SYSERR; 640 return GNUNET_SYSERR;
792 641
793 PQclear (res);
794 return GNUNET_OK; 642 return GNUNET_OK;
795} 643}
796 644
@@ -807,7 +655,6 @@ message_add_flags (void *cls,
807 uint64_t message_id, 655 uint64_t message_id,
808 uint32_t psycstore_flags) 656 uint32_t psycstore_flags)
809{ 657{
810 PGresult *res;
811 struct Plugin *plugin = cls; 658 struct Plugin *plugin = cls;
812 659
813 struct GNUNET_PQ_QueryParam params_update[] = { 660 struct GNUNET_PQ_QueryParam params_update[] = {
@@ -817,74 +664,80 @@ message_add_flags (void *cls,
817 GNUNET_PQ_query_param_end 664 GNUNET_PQ_query_param_end
818 }; 665 };
819 666
820 res = GNUNET_PQ_exec_prepared (plugin->dbh, "update_message_flags", params_update); 667 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
821 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, 668 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "update_message_flags", params_update))
822 res,
823 PGRES_COMMAND_OK,
824 "PQexecPrepared","update_message_flags"))
825 return GNUNET_SYSERR; 669 return GNUNET_SYSERR;
826 670
827 PQclear (res);
828 return GNUNET_OK; 671 return GNUNET_OK;
829} 672}
830 673
831 674
832static int 675/**
833fragment_row (struct Plugin *plugin, 676 * Closure for #fragment_rows.
834 const char *stmt, 677 */
835 PGresult *res, 678struct FragmentRowsContext {
836 GNUNET_PSYCSTORE_FragmentCallback cb, 679 GNUNET_PSYCSTORE_FragmentCallback cb;
837 void *cb_cls, 680 void *cb_cls;
838 uint64_t *returned_fragments)
839{
840 uint32_t hop_counter;
841 void *signature = NULL;
842 void *purpose = NULL;
843 size_t signature_size;
844 size_t purpose_size;
845
846 uint64_t fragment_id;
847 uint64_t fragment_offset;
848 uint64_t message_id;
849 uint64_t group_generation;
850 uint32_t flags;
851 void *buf;
852 size_t buf_size;
853 int ret = GNUNET_SYSERR;
854 struct GNUNET_MULTICAST_MessageHeader *mp;
855
856 uint32_t msg_flags;
857
858 struct GNUNET_PQ_ResultSpec results[] = {
859 GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
860 GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
861 GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
862 GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
863 GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
864 GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
865 GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
866 GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
867 GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
868 GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
869 GNUNET_PQ_result_spec_end
870 };
871 681
872 if (GNUNET_OK != 682 uint64_t *returned_fragments;
873 GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK,
874 "PQexecPrepared",
875 stmt))
876 {
877 LOG (GNUNET_ERROR_TYPE_DEBUG,
878 "Failing fragment lookup (postgres error)\n");
879 return GNUNET_SYSERR;
880 }
881 683
882 int nrows = PQntuples (res); 684 /* I preserved this but I do not see the point since
883 for (int row = 0; row < nrows; row++) 685 * it cannot stop the loop early and gets overwritten ?? */
686 int ret;
687};
688
689
690/**
691 * Callback that retrieves the results of a SELECT statement
692 * reading form the messages table.
693 *
694 * Only passed to GNUNET_PQ_eval_prepared_multi_select and
695 * has type GNUNET_PQ_PostgresResultHandler.
696 *
697 * @param cls closure
698 * @param result the postgres result
699 * @param num_result the number of results in @a result
700 */
701void fragment_rows (void *cls,
702 PGresult *res,
703 unsigned int num_results)
704{
705 struct FragmentRowsContext *c = cls;
706
707 for (unsigned int i=0;i<num_results;i++)
884 { 708 {
885 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row)) 709 uint32_t hop_counter;
710 void *signature = NULL;
711 void *purpose = NULL;
712 size_t signature_size;
713 size_t purpose_size;
714 uint64_t fragment_id;
715 uint64_t fragment_offset;
716 uint64_t message_id;
717 uint64_t group_generation;
718 uint32_t flags;
719 void *buf;
720 size_t buf_size;
721 uint32_t msg_flags;
722 struct GNUNET_PQ_ResultSpec results[] = {
723 GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
724 GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
725 GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
726 GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
727 GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
728 GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
729 GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
730 GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
731 GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
732 GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
733 GNUNET_PQ_result_spec_end
734 };
735 struct GNUNET_MULTICAST_MessageHeader *mp;
736
737 if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i))
886 { 738 {
887 break; 739 GNUNET_PQ_cleanup_result(results); /* missing previously, a memory leak?? */
740 break; /* nothing more?? */
888 } 741 }
889 742
890 mp = GNUNET_malloc (sizeof (*mp) + buf_size); 743 mp = GNUNET_malloc (sizeof (*mp) + buf_size);
@@ -893,11 +746,9 @@ fragment_row (struct Plugin *plugin,
893 mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); 746 mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
894 mp->hop_counter = htonl (hop_counter); 747 mp->hop_counter = htonl (hop_counter);
895 GNUNET_memcpy (&mp->signature, 748 GNUNET_memcpy (&mp->signature,
896 signature, 749 signature, signature_size);
897 signature_size);
898 GNUNET_memcpy (&mp->purpose, 750 GNUNET_memcpy (&mp->purpose,
899 purpose, 751 purpose, purpose_size);
900 purpose_size);
901 mp->fragment_id = GNUNET_htonll (fragment_id); 752 mp->fragment_id = GNUNET_htonll (fragment_id);
902 mp->fragment_offset = GNUNET_htonll (fragment_offset); 753 mp->fragment_offset = GNUNET_htonll (fragment_offset);
903 mp->message_id = GNUNET_htonll (message_id); 754 mp->message_id = GNUNET_htonll (message_id);
@@ -905,15 +756,12 @@ fragment_row (struct Plugin *plugin,
905 mp->flags = htonl(msg_flags); 756 mp->flags = htonl(msg_flags);
906 757
907 GNUNET_memcpy (&mp[1], 758 GNUNET_memcpy (&mp[1],
908 buf, 759 buf, buf_size);
909 buf_size);
910 GNUNET_PQ_cleanup_result(results); 760 GNUNET_PQ_cleanup_result(results);
911 ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags); 761 c->ret = c->cb (c->cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
912 if (NULL != returned_fragments) 762 if (NULL != c->returned_fragments)
913 (*returned_fragments)++; 763 (*c->returned_fragments)++;
914 } 764 }
915
916 return ret;
917} 765}
918 766
919 767
@@ -925,26 +773,19 @@ fragment_select (struct Plugin *plugin,
925 GNUNET_PSYCSTORE_FragmentCallback cb, 773 GNUNET_PSYCSTORE_FragmentCallback cb,
926 void *cb_cls) 774 void *cb_cls)
927{ 775{
928 PGresult *res; 776 /* Stack based closure */
929 int ret = GNUNET_SYSERR; 777 struct FragmentRowsContext frc = {
930 778 .cb = cb,
931 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); 779 .cb_cls = cb_cls,
932 if (GNUNET_YES == 780 .returned_fragments = returned_fragments,
933 GNUNET_POSTGRES_check_result (plugin->dbh, 781 .ret = GNUNET_SYSERR
934 res, 782 };
935 PGRES_TUPLES_OK,
936 "PQexecPrepared", stmt))
937 {
938 if (PQntuples (res) == 0)
939 ret = GNUNET_NO;
940 else
941 {
942 ret = fragment_row (plugin, stmt, res, cb, cb_cls, returned_fragments);
943 }
944 PQclear (res);
945 }
946 783
947 return ret; 784 if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
785 stmt, params,
786 &fragment_rows, &frc))
787 return GNUNET_SYSERR;
788 return frc.ret; /* GNUNET_OK ?? */
948} 789}
949 790
950/** 791/**
@@ -964,8 +805,6 @@ fragment_get (void *cls,
964 void *cb_cls) 805 void *cb_cls)
965{ 806{
966 struct Plugin *plugin = cls; 807 struct Plugin *plugin = cls;
967 *returned_fragments = 0;
968
969 struct GNUNET_PQ_QueryParam params_select[] = { 808 struct GNUNET_PQ_QueryParam params_select[] = {
970 GNUNET_PQ_query_param_auto_from_type (channel_key), 809 GNUNET_PQ_query_param_auto_from_type (channel_key),
971 GNUNET_PQ_query_param_uint64 (&first_fragment_id), 810 GNUNET_PQ_query_param_uint64 (&first_fragment_id),
@@ -973,7 +812,12 @@ fragment_get (void *cls,
973 GNUNET_PQ_query_param_end 812 GNUNET_PQ_query_param_end
974 }; 813 };
975 814
976 return fragment_select (plugin, "select_fragments", params_select, returned_fragments, cb, cb_cls); 815 *returned_fragments = 0;
816 return fragment_select (plugin,
817 "select_fragments",
818 params_select,
819 returned_fragments,
820 cb, cb_cls);
977} 821}
978 822
979 823
@@ -1002,7 +846,11 @@ fragment_get_latest (void *cls,
1002 GNUNET_PQ_query_param_end 846 GNUNET_PQ_query_param_end
1003 }; 847 };
1004 848
1005 return fragment_select (plugin, "select_latest_fragments", params_select, returned_fragments, cb, cb_cls); 849 return fragment_select (plugin,
850 "select_latest_fragments",
851 params_select,
852 returned_fragments,
853 cb, cb_cls);
1006} 854}
1007 855
1008 856
@@ -1024,11 +872,6 @@ message_get (void *cls,
1024 void *cb_cls) 872 void *cb_cls)
1025{ 873{
1026 struct Plugin *plugin = cls; 874 struct Plugin *plugin = cls;
1027 *returned_fragments = 0;
1028
1029 if (0 == fragment_limit)
1030 fragment_limit = INT64_MAX;
1031
1032 struct GNUNET_PQ_QueryParam params_select[] = { 875 struct GNUNET_PQ_QueryParam params_select[] = {
1033 GNUNET_PQ_query_param_auto_from_type (channel_key), 876 GNUNET_PQ_query_param_auto_from_type (channel_key),
1034 GNUNET_PQ_query_param_uint64 (&first_message_id), 877 GNUNET_PQ_query_param_uint64 (&first_message_id),
@@ -1037,7 +880,14 @@ message_get (void *cls,
1037 GNUNET_PQ_query_param_end 880 GNUNET_PQ_query_param_end
1038 }; 881 };
1039 882
1040 return fragment_select (plugin, "select_messages", params_select, returned_fragments, cb, cb_cls); 883 if (0 == fragment_limit)
884 fragment_limit = INT64_MAX;
885 *returned_fragments = 0;
886 return fragment_select (plugin,
887 "select_messages",
888 params_select,
889 returned_fragments,
890 cb, cb_cls);
1041} 891}
1042 892
1043 893
@@ -1057,8 +907,6 @@ message_get_latest (void *cls,
1057 void *cb_cls) 907 void *cb_cls)
1058{ 908{
1059 struct Plugin *plugin = cls; 909 struct Plugin *plugin = cls;
1060 *returned_fragments = 0;
1061
1062 struct GNUNET_PQ_QueryParam params_select[] = { 910 struct GNUNET_PQ_QueryParam params_select[] = {
1063 GNUNET_PQ_query_param_auto_from_type (channel_key), 911 GNUNET_PQ_query_param_auto_from_type (channel_key),
1064 GNUNET_PQ_query_param_auto_from_type (channel_key), 912 GNUNET_PQ_query_param_auto_from_type (channel_key),
@@ -1066,7 +914,12 @@ message_get_latest (void *cls,
1066 GNUNET_PQ_query_param_end 914 GNUNET_PQ_query_param_end
1067 }; 915 };
1068 916
1069 return fragment_select (plugin, "select_latest_messages", params_select, returned_fragments, cb, cb_cls); 917 *returned_fragments = 0;
918 return fragment_select (plugin,
919 "select_latest_messages",
920 params_select,
921 returned_fragments,
922 cb, cb_cls);
1070} 923}
1071 924
1072 925
@@ -1086,9 +939,7 @@ message_get_fragment (void *cls,
1086 GNUNET_PSYCSTORE_FragmentCallback cb, 939 GNUNET_PSYCSTORE_FragmentCallback cb,
1087 void *cb_cls) 940 void *cb_cls)
1088{ 941{
1089 PGresult *res;
1090 struct Plugin *plugin = cls; 942 struct Plugin *plugin = cls;
1091 int ret = GNUNET_SYSERR;
1092 const char *stmt = "select_message_fragment"; 943 const char *stmt = "select_message_fragment";
1093 944
1094 struct GNUNET_PQ_QueryParam params_select[] = { 945 struct GNUNET_PQ_QueryParam params_select[] = {
@@ -1098,21 +949,19 @@ message_get_fragment (void *cls,
1098 GNUNET_PQ_query_param_end 949 GNUNET_PQ_query_param_end
1099 }; 950 };
1100 951
1101 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); 952 /* Stack based closure */
1102 if (GNUNET_OK == GNUNET_POSTGRES_check_result (plugin->dbh, 953 struct FragmentRowsContext frc = {
1103 res, 954 .cb = cb,
1104 PGRES_TUPLES_OK, 955 .cb_cls = cb_cls,
1105 "PQexecPrepared", stmt)) 956 .returned_fragments = NULL,
1106 { 957 .ret = GNUNET_SYSERR
1107 if (PQntuples (res) == 0) 958 };
1108 ret = GNUNET_NO;
1109 else
1110 ret = fragment_row (plugin, stmt, res, cb, cb_cls, NULL);
1111
1112 PQclear (res);
1113 }
1114 959
1115 return ret; 960 if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
961 stmt, params_select,
962 &fragment_rows, &frc))
963 return GNUNET_SYSERR;
964 return frc.ret; /* GNUNET_OK ?? */
1116} 965}
1117 966
1118/** 967/**
@@ -1129,7 +978,6 @@ counters_message_get (void *cls,
1129 uint64_t *max_message_id, 978 uint64_t *max_message_id,
1130 uint64_t *max_group_generation) 979 uint64_t *max_group_generation)
1131{ 980{
1132 PGresult *res;
1133 struct Plugin *plugin = cls; 981 struct Plugin *plugin = cls;
1134 982
1135 const char *stmt = "select_counters_message"; 983 const char *stmt = "select_counters_message";
@@ -1139,15 +987,6 @@ counters_message_get (void *cls,
1139 GNUNET_PQ_query_param_end 987 GNUNET_PQ_query_param_end
1140 }; 988 };
1141 989
1142 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1143 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1144 res,
1145 PGRES_TUPLES_OK,
1146 "PQexecPrepared", stmt))
1147 {
1148 return GNUNET_SYSERR;
1149 }
1150
1151 struct GNUNET_PQ_ResultSpec results_select[] = { 990 struct GNUNET_PQ_ResultSpec results_select[] = {
1152 GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id), 991 GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id),
1153 GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id), 992 GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id),
@@ -1155,14 +994,10 @@ counters_message_get (void *cls,
1155 GNUNET_PQ_result_spec_end 994 GNUNET_PQ_result_spec_end
1156 }; 995 };
1157 996
1158 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results_select, 0)) 997 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
1159 { 998 GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt,
1160 PQclear (res); 999 params_select, results_select))
1161 return GNUNET_SYSERR; 1000 return GNUNET_SYSERR;
1162 }
1163
1164 GNUNET_PQ_cleanup_result(results_select);
1165 PQclear (res);
1166 1001
1167 return GNUNET_OK; 1002 return GNUNET_OK;
1168} 1003}
@@ -1179,44 +1014,26 @@ counters_state_get (void *cls,
1179 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1014 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1180 uint64_t *max_state_message_id) 1015 uint64_t *max_state_message_id)
1181{ 1016{
1182 PGresult *res;
1183 struct Plugin *plugin = cls; 1017 struct Plugin *plugin = cls;
1184 1018
1185 const char *stmt = "select_counters_state"; 1019 const char *stmt = "select_counters_state";
1186 1020
1187 int ret = GNUNET_SYSERR;
1188
1189 struct GNUNET_PQ_QueryParam params_select[] = { 1021 struct GNUNET_PQ_QueryParam params_select[] = {
1190 GNUNET_PQ_query_param_auto_from_type (channel_key), 1022 GNUNET_PQ_query_param_auto_from_type (channel_key),
1191 GNUNET_PQ_query_param_end 1023 GNUNET_PQ_query_param_end
1192 }; 1024 };
1193 1025
1194 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1195 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1196 res,
1197 PGRES_TUPLES_OK,
1198 "PQexecPrepared", stmt))
1199 {
1200 return GNUNET_SYSERR;
1201 }
1202
1203 struct GNUNET_PQ_ResultSpec results_select[] = { 1026 struct GNUNET_PQ_ResultSpec results_select[] = {
1204 GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id), 1027 GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id),
1205 GNUNET_PQ_result_spec_end 1028 GNUNET_PQ_result_spec_end
1206 }; 1029 };
1207 1030
1208 ret = GNUNET_PQ_extract_result (res, results_select, 0); 1031 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
1209 1032 GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt,
1210 if (GNUNET_OK != ret) 1033 params_select, results_select))
1211 { 1034 return GNUNET_SYSERR;
1212 PQclear (res);
1213 return GNUNET_SYSERR;
1214 }
1215
1216 GNUNET_PQ_cleanup_result(results_select);
1217 PQclear (res);
1218 1035
1219 return ret; 1036 return GNUNET_OK;
1220} 1037}
1221 1038
1222 1039
@@ -1230,8 +1047,6 @@ state_assign (struct Plugin *plugin, const char *stmt,
1230 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1047 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1231 const char *name, const void *value, size_t value_size) 1048 const char *name, const void *value, size_t value_size)
1232{ 1049{
1233 PGresult *res;
1234
1235 struct GNUNET_PQ_QueryParam params[] = { 1050 struct GNUNET_PQ_QueryParam params[] = {
1236 GNUNET_PQ_query_param_auto_from_type (channel_key), 1051 GNUNET_PQ_query_param_auto_from_type (channel_key),
1237 GNUNET_PQ_query_param_string (name), 1052 GNUNET_PQ_query_param_string (name),
@@ -1239,44 +1054,29 @@ state_assign (struct Plugin *plugin, const char *stmt,
1239 GNUNET_PQ_query_param_end 1054 GNUNET_PQ_query_param_end
1240 }; 1055 };
1241 1056
1242 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); 1057 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
1243 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, 1058 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1244 res,
1245 PGRES_COMMAND_OK,
1246 "PQexecPrepared", stmt))
1247 {
1248 return GNUNET_SYSERR; 1059 return GNUNET_SYSERR;
1249 }
1250
1251 PQclear (res);
1252 1060
1253 return GNUNET_OK; 1061 return GNUNET_OK;
1254} 1062}
1255 1063
1256 1064
1257static int 1065static int
1258update_message_id (struct Plugin *plugin, const char *stmt, 1066update_message_id (struct Plugin *plugin,
1067 const char *stmt,
1259 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1068 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1260 uint64_t message_id) 1069 uint64_t message_id)
1261{ 1070{
1262 PGresult *res;
1263
1264 struct GNUNET_PQ_QueryParam params[] = { 1071 struct GNUNET_PQ_QueryParam params[] = {
1265 GNUNET_PQ_query_param_uint64 (&message_id), 1072 GNUNET_PQ_query_param_uint64 (&message_id),
1266 GNUNET_PQ_query_param_auto_from_type (channel_key), 1073 GNUNET_PQ_query_param_auto_from_type (channel_key),
1267 GNUNET_PQ_query_param_end 1074 GNUNET_PQ_query_param_end
1268 }; 1075 };
1269 1076
1270 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); 1077 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
1271 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, 1078 GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params))
1272 res,
1273 PGRES_COMMAND_OK,
1274 "PQexecPrepared", stmt))
1275 {
1276 return GNUNET_SYSERR; 1079 return GNUNET_SYSERR;
1277 }
1278
1279 PQclear (res);
1280 1080
1281 return GNUNET_OK; 1081 return GNUNET_OK;
1282} 1082}
@@ -1487,10 +1287,7 @@ static int
1487state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1287state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1488 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) 1288 const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1489{ 1289{
1490 PGresult *res;
1491
1492 struct Plugin *plugin = cls; 1290 struct Plugin *plugin = cls;
1493 int ret = GNUNET_SYSERR;
1494 1291
1495 const char *stmt = "select_state_one"; 1292 const char *stmt = "select_state_one";
1496 1293
@@ -1503,44 +1300,80 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1503 void *value_current = NULL; 1300 void *value_current = NULL;
1504 size_t value_size = 0; 1301 size_t value_size = 0;
1505 1302
1506 struct GNUNET_PQ_ResultSpec results[] = { 1303 struct GNUNET_PQ_ResultSpec results_select[] = {
1507 GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size), 1304 GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1508 GNUNET_PQ_result_spec_end 1305 GNUNET_PQ_result_spec_end
1509 }; 1306 };
1510 1307
1511 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); 1308 if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
1512 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, 1309 GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt,
1513 res, 1310 params_select, results_select))
1514 PGRES_TUPLES_OK, 1311 return GNUNET_SYSERR;
1515 "PQexecPrepared", stmt))
1516 {
1517 return GNUNET_SYSERR;
1518 }
1519 1312
1520 if (PQntuples (res) == 0) 1313 return cb (cb_cls, name, value_current,
1521 { 1314 value_size);
1522 PQclear (res); 1315}
1523 ret = GNUNET_NO;
1524 }
1525 1316
1526 ret = GNUNET_PQ_extract_result (res, results, 0);
1527 1317
1528 if (GNUNET_OK != ret) 1318
1319/**
1320 * Closure for #get_state_cb.
1321 */
1322struct GetStateContext {
1323 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key;
1324 // const char *name,
1325 GNUNET_PSYCSTORE_StateCallback cb;
1326 void *cb_cls;
1327
1328 const char *value_id;
1329
1330 /* I preserved this but I do not see the point since
1331 * it cannot stop the loop early and gets overwritten ?? */
1332 int ret;
1333};
1334
1335
1336/**
1337 * Callback that retrieves the results of a SELECT statement
1338 * reading form the state table.
1339 *
1340 * Only passed to GNUNET_PQ_eval_prepared_multi_select and
1341 * has type GNUNET_PQ_PostgresResultHandler.
1342 *
1343 * @param cls closure
1344 * @param result the postgres result
1345 * @param num_result the number of results in @a result
1346 */
1347static void
1348get_state_cb (void *cls,
1349 PGresult *res,
1350 unsigned int num_results)
1351{
1352 struct GetStateContext *c = cls;
1353
1354 for (unsigned int i=0;i<num_results;i++)
1529 { 1355 {
1530 PQclear (res); 1356 char *name = "";
1531 return GNUNET_SYSERR; 1357 void *value = NULL;
1532 } 1358 size_t value_size = 0;
1533 1359
1534 ret = cb (cb_cls, name, value_current, 1360 struct GNUNET_PQ_ResultSpec results[] = {
1535 value_size); 1361 GNUNET_PQ_result_spec_string ("name", &name),
1362 GNUNET_PQ_result_spec_variable_size (c->value_id, &value, &value_size),
1363 GNUNET_PQ_result_spec_end
1364 };
1536 1365
1537 GNUNET_PQ_cleanup_result(results); 1366 if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i))
1538 PQclear (res); 1367 {
1368 GNUNET_PQ_cleanup_result(results); /* previously invoked via PQclear?? */
1369 break; /* nothing more?? */
1370 }
1539 1371
1540 return ret; 1372 c->ret = c->cb (c->cb_cls, (const char *) name, value, value_size);
1373 GNUNET_PQ_cleanup_result(results);
1374 }
1541} 1375}
1542 1376
1543
1544/** 1377/**
1545 * Retrieve all state variables for a channel with the given prefix. 1378 * Retrieve all state variables for a channel with the given prefix.
1546 * 1379 *
@@ -1553,9 +1386,7 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
1553 const char *name, GNUNET_PSYCSTORE_StateCallback cb, 1386 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1554 void *cb_cls) 1387 void *cb_cls)
1555{ 1388{
1556 PGresult *res;
1557 struct Plugin *plugin = cls; 1389 struct Plugin *plugin = cls;
1558 int ret = GNUNET_NO;
1559 1390
1560 const char *stmt = "select_state_prefix"; 1391 const char *stmt = "select_state_prefix";
1561 1392
@@ -1569,42 +1400,18 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
1569 GNUNET_PQ_query_param_end 1400 GNUNET_PQ_query_param_end
1570 }; 1401 };
1571 1402
1572 char *name2 = ""; 1403 struct GetStateContext gsc = {
1573 void *value_current = NULL; 1404 .cb = cb,
1574 size_t value_size = 0; 1405 .cb_cls = cb_cls,
1575 1406 .value_id = "value_current",
1576 struct GNUNET_PQ_ResultSpec results[] = { 1407 .ret = GNUNET_NO
1577 GNUNET_PQ_result_spec_string ("name", &name2),
1578 GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size),
1579 GNUNET_PQ_result_spec_end
1580 }; 1408 };
1581 1409
1582 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); 1410 if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
1583 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, 1411 stmt, params_select,
1584 res, 1412 &get_state_cb, &gsc))
1585 PGRES_TUPLES_OK,
1586 "PQexecPrepared", stmt))
1587 {
1588 return GNUNET_SYSERR; 1413 return GNUNET_SYSERR;
1589 } 1414 return gsc.ret; /* GNUNET_OK ?? */
1590
1591 int nrows = PQntuples (res);
1592 for (int row = 0; row < nrows; row++)
1593 {
1594 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
1595 {
1596 break;
1597 }
1598
1599 ret = cb (cb_cls, (const char *) name2,
1600 value_current,
1601 value_size);
1602 GNUNET_PQ_cleanup_result(results);
1603 }
1604
1605 PQclear (res);
1606
1607 return ret;
1608} 1415}
1609 1416
1610 1417
@@ -1620,9 +1427,7 @@ state_get_signed (void *cls,
1620 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1427 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1621 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) 1428 GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls)
1622{ 1429{
1623 PGresult *res;
1624 struct Plugin *plugin = cls; 1430 struct Plugin *plugin = cls;
1625 int ret = GNUNET_NO;
1626 1431
1627 const char *stmt = "select_state_signed"; 1432 const char *stmt = "select_state_signed";
1628 1433
@@ -1631,43 +1436,18 @@ state_get_signed (void *cls,
1631 GNUNET_PQ_query_param_end 1436 GNUNET_PQ_query_param_end
1632 }; 1437 };
1633 1438
1634 char *name = ""; 1439 struct GetStateContext gsc = {
1635 void *value_signed = NULL; 1440 .cb = cb,
1636 size_t value_size = 0; 1441 .cb_cls = cb_cls,
1637 1442 .value_id = "value_signed",
1638 struct GNUNET_PQ_ResultSpec results[] = { 1443 .ret = GNUNET_NO
1639 GNUNET_PQ_result_spec_string ("name", &name),
1640 GNUNET_PQ_result_spec_variable_size ("value_signed", &value_signed, &value_size),
1641 GNUNET_PQ_result_spec_end
1642 }; 1444 };
1643 1445
1644 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); 1446 if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
1645 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, 1447 stmt, params_select,
1646 res, 1448 &get_state_cb, &gsc))
1647 PGRES_TUPLES_OK,
1648 "PQexecPrepared", stmt))
1649 {
1650 return GNUNET_SYSERR; 1449 return GNUNET_SYSERR;
1651 } 1450 return gsc.ret; /* GNUNET_OK ?? */
1652
1653 int nrows = PQntuples (res);
1654 for (int row = 0; row < nrows; row++)
1655 {
1656 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
1657 {
1658 break;
1659 }
1660
1661 ret = cb (cb_cls, (const char *) name,
1662 value_signed,
1663 value_size);
1664
1665 GNUNET_PQ_cleanup_result (results);
1666 }
1667
1668 PQclear (res);
1669
1670 return ret;
1671} 1451}
1672 1452
1673 1453