aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2013-09-16 04:59:05 +0000
committerGabor X Toth <*@tg-x.net>2013-09-16 04:59:05 +0000
commitf78e9753a91497f1deb5e20d10868c27ab4a6013 (patch)
treef49653367e00fd4b611ec6ec281144d4568bd111 /src/psycstore
parentfbae7143d25a258b9dfabefa9ca3956e9228cb0e (diff)
downloadgnunet-f78e9753a91497f1deb5e20d10868c27ab4a6013.tar.gz
gnunet-f78e9753a91497f1deb5e20d10868c27ab4a6013.zip
PSYCstore service and API implementation
Diffstat (limited to 'src/psycstore')
-rw-r--r--src/psycstore/Makefile.am6
-rw-r--r--src/psycstore/gnunet-service-psycstore.c661
-rw-r--r--src/psycstore/plugin_psycstore_sqlite.c823
-rw-r--r--src/psycstore/psycstore.h378
-rw-r--r--src/psycstore/psycstore_api.c1145
-rw-r--r--src/psycstore/test_plugin_psycstore.c212
-rw-r--r--src/psycstore/test_psycstore.c452
-rw-r--r--src/psycstore/test_psycstore.conf5
8 files changed, 3073 insertions, 609 deletions
diff --git a/src/psycstore/Makefile.am b/src/psycstore/Makefile.am
index 6f5c7187f..8448f45e8 100644
--- a/src/psycstore/Makefile.am
+++ b/src/psycstore/Makefile.am
@@ -104,9 +104,3 @@ test_plugin_psycstore_sqlite_SOURCES = \
104test_plugin_psycstore_sqlite_LDADD = \ 104test_plugin_psycstore_sqlite_LDADD = \
105 $(top_builddir)/src/testing/libgnunettesting.la \ 105 $(top_builddir)/src/testing/libgnunettesting.la \
106 $(top_builddir)/src/util/libgnunetutil.la 106 $(top_builddir)/src/util/libgnunetutil.la
107
108test_plugin_psycstore_postgres_SOURCES = \
109 test_plugin_psycstore.c
110test_plugin_psycstore_postgres_LDADD = \
111 $(top_builddir)/src/testing/libgnunettesting.la \
112 $(top_builddir)/src/util/libgnunetutil.la
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c
index 5bc35d227..3d6cfdc40 100644
--- a/src/psycstore/gnunet-service-psycstore.c
+++ b/src/psycstore/gnunet-service-psycstore.c
@@ -1,21 +1,21 @@
1/* 1/*
2 This file is part of GNUnet. 2 * This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors) 3 * (C) 2013 Christian Grothoff (and other contributing authors)
4 4 *
5 GNUnet is free software; you can redistribute it and/or modify 5 * GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 * it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your 7 * by the Free Software Foundation; either version 3, or (at your
8 option) any later version. 8 * option) any later version.
9 9 *
10 GNUnet is distributed in the hope that it will be useful, but 10 * GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details. 13 * General Public License for more details.
14 14 *
15 You should have received a copy of the GNU General Public License 15 * You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the 16 * along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 * Boston, MA 02111-1307, USA.
19 */ 19 */
20 20
21/** 21/**
@@ -89,71 +89,134 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
89} 89}
90 90
91 91
92/** 92/**
93 * Send a result code back to the client. 93 * Send a result code back to the client.
94 * 94 *
95 * @param client client that should receive the result code 95 * @param client Client that should receive the result code.
96 * @param result_code code to transmit 96 * @param result_code Code to transmit.
97 * @param emsg error message to include (or NULL for none) 97 * @param op_id Operation ID.
98 * @param err_msg Error message to include (or NULL for none).
98 */ 99 */
99static void 100static void
100send_result_code (struct GNUNET_SERVER_Client *client, 101send_result_code (struct GNUNET_SERVER_Client *client, uint32_t result_code,
101 uint32_t result_code, 102 uint32_t op_id, const char *err_msg)
102 const char *emsg)
103{ 103{
104 struct ResultCodeMessage *rcm; 104 struct OperationResult *res;
105 size_t elen; 105 size_t err_len;
106 106
107 if (NULL == emsg) 107 if (NULL == err_msg)
108 elen = 0; 108 err_len = 0;
109 else 109 else
110 elen = strlen (emsg) + 1; 110 err_len = strlen (err_msg) + 1;
111 rcm = GNUNET_malloc (sizeof (struct ResultCodeMessage) + elen); 111 res = GNUNET_malloc (sizeof (struct OperationResult) + err_len);
112 rcm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE); 112 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE);
113 rcm->header.size = htons (sizeof (struct ResultCodeMessage) + elen); 113 res->header.size = htons (sizeof (struct OperationResult) + err_len);
114 rcm->result_code = htonl (result_code); 114 res->result_code = htonl (result_code);
115 memcpy (&rcm[1], emsg, elen); 115 res->op_id = op_id;
116 memcpy (&res[1], err_msg, err_len);
116 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 117 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
117 "Sending result %d (%s) to client\n", 118 "Sending result %d (%s) to client\n",
118 (int) result_code, 119 (int) result_code,
119 emsg); 120 err_msg);
120 GNUNET_SERVER_notification_context_unicast (nc, client, &rcm->header, 121 GNUNET_SERVER_notification_context_add (nc, client);
122 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
123 GNUNET_NO);
124 GNUNET_free (res);
125}
126
127
128struct SendClosure
129{
130 struct GNUNET_SERVER_Client *client;
131 uint64_t op_id;
132};
133
134
135static int
136send_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg,
137 enum GNUNET_PSYCSTORE_MessageFlags flags)
138{
139 struct SendClosure *sc = cls;
140 struct FragmentResult *res;
141 size_t msg_size = ntohs (msg->header.size);
142
143 res = GNUNET_malloc (sizeof (struct FragmentResult) + msg_size);
144 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT);
145 res->header.size = htons (sizeof (struct FragmentResult) + msg_size);
146 res->op_id = sc->op_id;
147 res->psycstore_flags = htonl (flags);
148 memcpy (&res[1], msg, msg_size);
149 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
150 "Sending fragment %ld to client\n",
151 GNUNET_ntohll (msg->fragment_id));
152 GNUNET_free (msg);
153 GNUNET_SERVER_notification_context_add (nc, sc->client);
154 GNUNET_SERVER_notification_context_unicast (nc, sc->client, &res->header,
121 GNUNET_NO); 155 GNUNET_NO);
122 GNUNET_free (rcm); 156 GNUNET_free (res);
157 return GNUNET_OK;
158}
159
160
161static int
162send_state_var (void *cls, const char *name,
163 const void *value, size_t value_size)
164{
165 struct SendClosure *sc = cls;
166 struct StateResult *res;
167 size_t name_size = strlen (name) + 1;
168
169 res = GNUNET_malloc (sizeof (struct StateResult) + name_size + value_size);
170 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE);
171 res->header.size = htons (sizeof (struct StateResult) + name_size + value_size);
172 res->op_id = sc->op_id;
173 res->name_size = htons (name_size);
174 memcpy (&res[1], name, name_size);
175 memcpy ((void *) &res[1] + name_size, value, value_size);
176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
177 "Sending state variable %s to client\n", name);
178 GNUNET_SERVER_notification_context_add (nc, sc->client);
179 GNUNET_SERVER_notification_context_unicast (nc, sc->client, &res->header,
180 GNUNET_NO);
181 GNUNET_free (res);
182 return GNUNET_OK;
123} 183}
124 184
125 185
126static void 186static void
127handle_membership_store (void *cls, 187handle_membership_store (void *cls,
128 struct GNUNET_SERVER_Client *client, 188 struct GNUNET_SERVER_Client *client,
129 const struct GNUNET_MessageHeader *message) 189 const struct GNUNET_MessageHeader *msg)
130{ 190{
131 const struct MembershipStoreMessage *msg = 191 const struct MembershipStoreRequest *req =
132 (const struct MembershipStoreMessage *) message; 192 (const struct MembershipStoreRequest *) msg;
133 193
134 int res = db->membership_store (db->cls, msg->channel_key, msg->slave_key, 194 int ret = db->membership_store (db->cls, &req->channel_key, &req->slave_key,
135 msg->did_join, msg->announced_at, 195 ntohl (req->did_join),
136 msg->effective_since, msg->group_generation); 196 GNUNET_ntohll (req->announced_at),
197 GNUNET_ntohll (req->effective_since),
198 GNUNET_ntohll (req->group_generation));
137 199
138 if (res != GNUNET_OK) 200 if (ret != GNUNET_OK)
139 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 201 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
140 _("Failed to store membership information!\n")); 202 _("Failed to store membership information!\n"));
141 203
142 send_result_code (client, res, NULL); 204 send_result_code (client, ret, req->op_id, NULL);
205 GNUNET_SERVER_receive_done (client, GNUNET_OK);
143} 206}
144 207
145 208
146static void 209static void
147handle_membership_test (void *cls, 210handle_membership_test (void *cls,
148 struct GNUNET_SERVER_Client *client, 211 struct GNUNET_SERVER_Client *client,
149 const struct GNUNET_MessageHeader *message) 212 const struct GNUNET_MessageHeader *msg)
150{ 213{
151 const struct MembershipTestMessage *msg = 214 const struct MembershipTestRequest *req =
152 (const struct MembershipTestMessage *) message; 215 (const struct MembershipTestRequest *) msg;
153 216
154 int res = db->membership_test (db->cls, msg->channel_key, msg->slave_key, 217 int ret = db->membership_test (db->cls, &req->channel_key, &req->slave_key,
155 msg->message_id); 218 GNUNET_ntohll (req->message_id));
156 switch (res) 219 switch (ret)
157 { 220 {
158 case GNUNET_YES: 221 case GNUNET_YES:
159 case GNUNET_NO: 222 case GNUNET_NO:
@@ -163,7 +226,440 @@ handle_membership_test (void *cls,
163 _("Failed to test membership!\n")); 226 _("Failed to test membership!\n"));
164 } 227 }
165 228
166 send_result_code (client, res, NULL); 229 send_result_code (client, ret, req->op_id, NULL);
230 GNUNET_SERVER_receive_done (client, GNUNET_OK);
231}
232
233
234static void
235handle_fragment_store (void *cls,
236 struct GNUNET_SERVER_Client *client,
237 const struct GNUNET_MessageHeader *msg)
238{
239 const struct FragmentStoreRequest *req =
240 (const struct FragmentStoreRequest *) msg;
241
242 int ret = db->fragment_store (db->cls, &req->channel_key,
243 (const struct GNUNET_MULTICAST_MessageHeader *)
244 &req[1], ntohl (req->psycstore_flags));
245
246 if (ret != GNUNET_OK)
247 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
248 _("Failed to store fragment!\n"));
249
250 send_result_code (client, ret, req->op_id, NULL);
251 GNUNET_SERVER_receive_done (client, GNUNET_OK);
252}
253
254
255static void
256handle_fragment_get (void *cls,
257 struct GNUNET_SERVER_Client *client,
258 const struct GNUNET_MessageHeader *msg)
259{
260 const struct FragmentGetRequest *req
261 = (const struct FragmentGetRequest *) msg;
262 struct SendClosure sc = { .op_id = req->op_id, .client = client };
263
264 int ret = db->fragment_get (db->cls, &req->channel_key,
265 GNUNET_ntohll (req->fragment_id),
266 &send_fragment, &sc);
267 switch (ret)
268 {
269 case GNUNET_YES:
270 case GNUNET_NO:
271 break;
272 default:
273 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
274 _("Failed to get fragment!\n"));
275 }
276
277 send_result_code (client, ret, req->op_id, NULL);
278 GNUNET_SERVER_receive_done (client, GNUNET_OK);
279}
280
281
282static void
283handle_message_get (void *cls,
284 struct GNUNET_SERVER_Client *client,
285 const struct GNUNET_MessageHeader *msg)
286{
287 const struct MessageGetRequest *req = (const struct MessageGetRequest *) msg;
288 struct SendClosure sc = { .op_id = req->op_id, .client = client };
289 uint64_t ret_frags = 0;
290 int64_t ret = db->message_get (db->cls, &req->channel_key,
291 GNUNET_ntohll (req->message_id),
292 &ret_frags, &send_fragment, &sc);
293 switch (ret)
294 {
295 case GNUNET_YES:
296 case GNUNET_NO:
297 break;
298 default:
299 ret_frags = ret;
300 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
301 _("Failed to get message!\n"));
302 }
303
304 send_result_code (client, ret_frags, req->op_id, NULL);
305 GNUNET_SERVER_receive_done (client, GNUNET_OK);
306}
307
308
309static void
310handle_message_get_fragment (void *cls,
311 struct GNUNET_SERVER_Client *client,
312 const struct GNUNET_MessageHeader *msg)
313{
314 const struct MessageGetFragmentRequest *req =
315 (const struct MessageGetFragmentRequest *) msg;
316
317 struct SendClosure sc = { .op_id = req->op_id, .client = client };
318
319 int ret = db->message_get_fragment (db->cls, &req->channel_key,
320 GNUNET_ntohll (req->message_id),
321 GNUNET_ntohll (req->fragment_offset),
322 &send_fragment, &sc);
323 switch (ret)
324 {
325 case GNUNET_YES:
326 case GNUNET_NO:
327 break;
328 default:
329 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
330 _("Failed to get message fragment!\n"));
331 }
332
333 send_result_code (client, ret, req->op_id, NULL);
334 GNUNET_SERVER_receive_done (client, GNUNET_OK);
335}
336
337
338static void
339handle_counters_get_master (void *cls,
340 struct GNUNET_SERVER_Client *client,
341 const struct GNUNET_MessageHeader *msg)
342{
343 const struct OperationRequest *req = (const struct OperationRequest *) msg;
344 struct MasterCountersResult res = { {0} };
345
346 int ret = db->counters_get_master (db->cls, &req->channel_key,
347 &res.fragment_id, &res.message_id,
348 &res.group_generation);
349 switch (ret)
350 {
351 case GNUNET_YES:
352 case GNUNET_NO:
353 break;
354 default:
355 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
356 _("Failed to get master counters!\n"));
357 }
358
359 res.header.type
360 = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER);
361 res.header.size = htons (sizeof (res));
362 res.result_code = htonl (ret);
363 res.op_id = req->op_id;
364 res.fragment_id = GNUNET_htonll (res.fragment_id);
365 res.message_id = GNUNET_htonll (res.message_id);
366 res.group_generation = GNUNET_htonll (res.group_generation);
367
368 GNUNET_SERVER_notification_context_add (nc, client);
369 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
370 GNUNET_NO);
371
372 GNUNET_SERVER_receive_done (client, GNUNET_OK);
373}
374
375
376static void
377handle_counters_get_slave (void *cls,
378 struct GNUNET_SERVER_Client *client,
379 const struct GNUNET_MessageHeader *msg)
380{
381 const struct OperationRequest *req = (const struct OperationRequest *) msg;
382 struct SlaveCountersResult res = { {0} };
383
384 int ret = db->counters_get_slave (db->cls, &req->channel_key,
385 &res.max_known_msg_id);
386
387 switch (ret)
388 {
389 case GNUNET_YES:
390 case GNUNET_NO:
391 break;
392 default:
393 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
394 _("Failed to get slave counters!\n"));
395 }
396
397 res.header.type
398 = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE);
399 res.header.size = htons (sizeof (res));
400 res.result_code = htonl (ret);
401 res.op_id = req->op_id;
402 res.max_known_msg_id = GNUNET_htonll (res.max_known_msg_id);
403
404 GNUNET_SERVER_notification_context_add (nc, client);
405 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
406 GNUNET_NO);
407
408 GNUNET_SERVER_receive_done (client, GNUNET_OK);
409}
410
411
412/* FIXME: stop processing further state modify messages after an error */
413static void
414handle_state_modify (void *cls,
415 struct GNUNET_SERVER_Client *client,
416 const struct GNUNET_MessageHeader *msg)
417{
418 const struct StateModifyRequest *req
419 = (const struct StateModifyRequest *) msg;
420
421 int ret = GNUNET_SYSERR;
422 const char *name = (const char *) &req[1];
423 uint16_t name_size = ntohs (req->name_size);
424
425 if (name_size <= 2 || '\0' != name[name_size - 1])
426 {
427 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
428 _("Tried to set invalid state variable name!\n"));
429 GNUNET_break_op (0);
430 }
431 else
432 {
433 ret = GNUNET_OK;
434
435 if (req->flags & STATE_OP_FIRST)
436 {
437 ret = db->state_modify_begin (db->cls, &req->channel_key,
438 GNUNET_ntohll (req->message_id),
439 GNUNET_ntohll (req->state_delta));
440 }
441 if (ret != GNUNET_OK)
442 {
443 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
444 _("Failed to begin modifying state!\n"));
445 }
446 else
447 {
448 switch (req->oper)
449 {
450 case GNUNET_ENV_OP_ASSIGN:
451 ret = db->state_modify_set (db->cls, &req->channel_key,
452 (const char *) &req[1],
453 name + ntohs (req->name_size),
454 ntohs (req->header.size) - sizeof (*req)
455 - ntohs (req->name_size));
456 break;
457 default:
458#if TODO
459 ret = GNUNET_ENV_operation ((const char *) &req[1],
460 current_value, current_value_size,
461 req->oper, name + ntohs (req->name_size),
462 ntohs (req->header.size) - sizeof (*req)
463 - ntohs (req->name_size), &value, &value_size);
464#endif
465 ret = GNUNET_SYSERR;
466 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
467 _("Unknown operator: %c\n"), req->oper);
468 }
469 }
470
471 if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
472 {
473 ret = db->state_modify_end (db->cls, &req->channel_key,
474 GNUNET_ntohll (req->message_id));
475 if (ret != GNUNET_OK)
476 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
477 _("Failed to end modifying state!\n"));
478 }
479 }
480 send_result_code (client, ret, req->op_id, NULL);
481 GNUNET_SERVER_receive_done (client, GNUNET_OK);
482}
483
484
485/* FIXME: stop processing further state sync messages after an error */
486static void
487handle_state_sync (void *cls,
488 struct GNUNET_SERVER_Client *client,
489 const struct GNUNET_MessageHeader *msg)
490{
491 const struct StateSyncRequest *req
492 = (const struct StateSyncRequest *) msg;
493
494 int ret = GNUNET_SYSERR;
495 const char *name = (const char *) &req[1];
496 uint16_t name_size = ntohs (req->name_size);
497
498 if (name_size <= 2 || '\0' != name[name_size - 1])
499 {
500 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
501 _("Tried to set invalid state variable name!\n"));
502 GNUNET_break_op (0);
503 }
504 else
505 {
506 ret = GNUNET_OK;
507
508 if (req->flags & STATE_OP_FIRST)
509 {
510 ret = db->state_sync_begin (db->cls, &req->channel_key);
511 }
512 if (ret != GNUNET_OK)
513 {
514 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
515 _("Failed to begin synchronizing state!\n"));
516 }
517 else
518 {
519 ret = db->state_sync_set (db->cls, &req->channel_key, name,
520 name + ntohs (req->name_size),
521 ntohs (req->header.size) - sizeof (*req)
522 - ntohs (req->name_size));
523 }
524
525 if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
526 {
527 ret = db->state_sync_end (db->cls, &req->channel_key,
528 GNUNET_ntohll (req->message_id));
529 if (ret != GNUNET_OK)
530 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
531 _("Failed to end synchronizing state!\n"));
532 }
533 }
534 send_result_code (client, ret, req->op_id, NULL);
535 GNUNET_SERVER_receive_done (client, GNUNET_OK);
536}
537
538
539static void
540handle_state_reset (void *cls,
541 struct GNUNET_SERVER_Client *client,
542 const struct GNUNET_MessageHeader *msg)
543{
544 const struct OperationRequest *req =
545 (const struct OperationRequest *) msg;
546
547 int ret = db->state_reset (db->cls, &req->channel_key);
548
549 if (ret != GNUNET_OK)
550 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
551 _("Failed to reset state!\n"));
552
553 send_result_code (client, ret, req->op_id, NULL);
554 GNUNET_SERVER_receive_done (client, GNUNET_OK);
555}
556
557
558static void
559handle_state_hash_update (void *cls,
560 struct GNUNET_SERVER_Client *client,
561 const struct GNUNET_MessageHeader *msg)
562{
563 const struct OperationRequest *req =
564 (const struct OperationRequest *) msg;
565
566 int ret = db->state_reset (db->cls, &req->channel_key);
567
568 if (ret != GNUNET_OK)
569 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
570 _("Failed to reset state!\n"));
571
572 send_result_code (client, ret, req->op_id, NULL);
573 GNUNET_SERVER_receive_done (client, GNUNET_OK);
574}
575
576
577static void
578handle_state_get (void *cls,
579 struct GNUNET_SERVER_Client *client,
580 const struct GNUNET_MessageHeader *msg)
581{
582 const struct OperationRequest *req =
583 (const struct OperationRequest *) msg;
584
585 struct SendClosure sc = { .op_id = req->op_id, .client = client };
586 int64_t ret = GNUNET_SYSERR;
587 const char *name = (const char *) &req[1];
588 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
589
590 if (name_size <= 2 || '\0' != name[name_size - 1])
591 {
592 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
593 _("Tried to get invalid state variable name!\n"));
594 GNUNET_break (0);
595 }
596 else
597 {
598 ret = db->state_get (db->cls, &req->channel_key, name,
599 &send_state_var, &sc);
600 if (GNUNET_NO == ret && name_size >= 5) /* min: _a_b\0 */
601 {
602 char *p, *n = GNUNET_malloc (name_size);
603 memcpy (n, name, name_size);
604 while (&n[1] < (p = strrchr (n, '_')) && GNUNET_NO == ret)
605 {
606 *p = '\0';
607 ret = db->state_get (db->cls, &req->channel_key, n,
608 &send_state_var, &sc);
609 }
610 }
611 }
612 switch (ret)
613 {
614 case GNUNET_OK:
615 case GNUNET_NO:
616 break;
617 default:
618 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
619 _("Failed to get state variable!\n"));
620 }
621
622 send_result_code (client, ret, req->op_id, NULL);
623 GNUNET_SERVER_receive_done (client, GNUNET_OK);
624}
625
626
627static void
628handle_state_get_prefix (void *cls,
629 struct GNUNET_SERVER_Client *client,
630 const struct GNUNET_MessageHeader *msg)
631{
632 const struct OperationRequest *req =
633 (const struct OperationRequest *) msg;
634
635 struct SendClosure sc = { .op_id = req->op_id, .client = client };
636 int64_t ret = GNUNET_SYSERR;
637 const char *name = (const char *) &req[1];
638 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
639
640 if (name_size <= 1 || '\0' != name[name_size - 1])
641 {
642 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
643 _("Tried to get invalid state variable name!\n"));
644 GNUNET_break (0);
645 }
646 else
647 {
648 ret = db->state_get_prefix (db->cls, &req->channel_key, name,
649 &send_state_var, &sc);
650 }
651 switch (ret)
652 {
653 case GNUNET_OK:
654 case GNUNET_NO:
655 break;
656 default:
657 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
658 _("Failed to get state variable!\n"));
659 }
660
661 send_result_code (client, ret, req->op_id, NULL);
662 GNUNET_SERVER_receive_done (client, GNUNET_OK);
167} 663}
168 664
169 665
@@ -180,13 +676,58 @@ run (void *cls,
180 const struct GNUNET_CONFIGURATION_Handle *c) 676 const struct GNUNET_CONFIGURATION_Handle *c)
181{ 677{
182 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 678 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
183 {&handle_membership_store, NULL, 679 { &handle_membership_store, NULL,
184 GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE, 680 GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE,
185 sizeof (struct MembershipStoreMessage)}, 681 sizeof (struct MembershipStoreRequest) },
186 {&handle_membership_test, NULL, 682
187 GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST, 683 { &handle_membership_test, NULL,
188 sizeof (struct MembershipTestMessage)}, 684 GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST,
189 {NULL, NULL, 0, 0} 685 sizeof (struct MembershipTestRequest) },
686
687 { &handle_fragment_store, NULL,
688 GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE, 0, },
689
690 { &handle_fragment_get, NULL,
691 GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET,
692 sizeof (struct FragmentGetRequest) },
693
694 { &handle_message_get, NULL,
695 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET,
696 sizeof (struct MessageGetRequest) },
697
698 { &handle_message_get_fragment, NULL,
699 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT,
700 sizeof (struct MessageGetFragmentRequest) },
701
702 { &handle_counters_get_master, NULL,
703 GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_MASTER,
704 sizeof (struct OperationRequest) },
705
706 { &handle_counters_get_slave, NULL,
707 GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_SLAVE,
708 sizeof (struct OperationRequest) },
709
710 { &handle_state_modify, NULL,
711 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY, 0 },
712
713 { &handle_state_sync, NULL,
714 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC, 0 },
715
716 { &handle_state_reset, NULL,
717 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET,
718 sizeof (struct OperationRequest) },
719
720 { &handle_state_hash_update, NULL,
721 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE,
722 sizeof (struct StateHashUpdateRequest) },
723
724 { &handle_state_get, NULL,
725 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET, 0 },
726
727 { &handle_state_get_prefix, NULL,
728 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX, 0 },
729
730 { NULL, NULL, 0, 0 }
190 }; 731 };
191 732
192 cfg = c; 733 cfg = c;
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c
index d8006c4fc..e9cc76e70 100644
--- a/src/psycstore/plugin_psycstore_sqlite.c
+++ b/src/psycstore/plugin_psycstore_sqlite.c
@@ -1,22 +1,22 @@
1 /* 1/*
2 * This file is part of GNUnet 2 * This file is part of GNUnet
3 * (C) 2009-2013 Christian Grothoff (and other contributing authors) 3 * (C) 2013 Christian Grothoff (and other contributing authors)
4 * 4 *
5 * GNUnet is free software; you can redistribute it and/or modify 5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published 6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your 7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version. 8 * option) any later version.
9 * 9 *
10 * GNUnet is distributed in the hope that it will be useful, but 10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of 11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12t * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details. 13 * General Public License for more details.
14 * 14 *
15 * You should have received a copy of the GNU General Public License 15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the 16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA. 18 * Boston, MA 02111-1307, USA.
19 */ 19 */
20 20
21/** 21/**
22 * @file psycstore/plugin_psycstore_sqlite.c 22 * @file psycstore/plugin_psycstore_sqlite.c
@@ -61,6 +61,10 @@ t * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
61 61
62#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__) 62#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
63 63
64enum Transactions {
65 TRANSACTION_NONE = 0,
66 TRANSACTION_STATE_MODIFY
67};
64 68
65/** 69/**
66 * Context for all functions in this plugin. 70 * Context for all functions in this plugin.
@@ -81,6 +85,17 @@ struct Plugin
81 sqlite3 *dbh; 85 sqlite3 *dbh;
82 86
83 /** 87 /**
88 * Current transaction.
89 */
90 enum Transactions transaction;
91
92 sqlite3_stmt *transaction_begin;
93
94 sqlite3_stmt *transaction_commit;
95
96 sqlite3_stmt *transaction_rollback;
97
98 /**
84 * Precompiled SQL for channel_key_store() 99 * Precompiled SQL for channel_key_store()
85 */ 100 */
86 sqlite3_stmt *insert_channel_key; 101 sqlite3_stmt *insert_channel_key;
@@ -135,15 +150,35 @@ struct Plugin
135 /** 150 /**
136 * Precompiled SQL for counters_get_slave() 151 * Precompiled SQL for counters_get_slave()
137 */ 152 */
138 sqlite3_stmt *select_counters_slave; 153 sqlite3_stmt *select_max_state_message_id;
154
155 /**
156 * Precompiled SQL for state_modify_end()
157 */
158 sqlite3_stmt *update_state_hash_message_id;
159
160 /**
161 * Precompiled SQL for state_sync_end()
162 */
163 sqlite3_stmt *update_max_state_message_id;
164
139 165
166 /**
167 * Precompiled SQL for message_modify_begin()
168 */
169 sqlite3_stmt *select_message_state_delta;
140 170
141 /** 171 /**
142 * Precompiled SQL for state_set() 172 * Precompiled SQL for state_modify_set()
143 */ 173 */
144 sqlite3_stmt *insert_state_current; 174 sqlite3_stmt *insert_state_current;
145 175
146 /** 176 /**
177 * Precompiled SQL for state_modify_end()
178 */
179 sqlite3_stmt *delete_state_empty;
180
181 /**
147 * Precompiled SQL for state_set_signed() 182 * Precompiled SQL for state_set_signed()
148 */ 183 */
149 sqlite3_stmt *update_state_signed; 184 sqlite3_stmt *update_state_signed;
@@ -179,7 +214,7 @@ struct Plugin
179 sqlite3_stmt *select_state_one; 214 sqlite3_stmt *select_state_one;
180 215
181 /** 216 /**
182 * Precompiled SQL for state_get_all() 217 * Precompiled SQL for state_get_prefix()
183 */ 218 */
184 sqlite3_stmt *select_state_prefix; 219 sqlite3_stmt *select_state_prefix;
185 220
@@ -211,7 +246,7 @@ sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt)
211 246
212 result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt, 247 result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
213 (const char **) &tail); 248 (const char **) &tail);
214 LOG (GNUNET_ERROR_TYPE_DEBUG, 249 LOG (GNUNET_ERROR_TYPE_DEBUG,
215 "Prepared `%s' / %p: %d\n", sql, *stmt, result); 250 "Prepared `%s' / %p: %d\n", sql, *stmt, result);
216 if (result != SQLITE_OK) 251 if (result != SQLITE_OK)
217 LOG (GNUNET_ERROR_TYPE_ERROR, 252 LOG (GNUNET_ERROR_TYPE_ERROR,
@@ -234,7 +269,7 @@ sql_exec (sqlite3 *dbh, const char *sql)
234 int result; 269 int result;
235 270
236 result = sqlite3_exec (dbh, sql, NULL, NULL, NULL); 271 result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
237 LOG (GNUNET_ERROR_TYPE_DEBUG, 272 LOG (GNUNET_ERROR_TYPE_DEBUG,
238 "Executed `%s' / %d\n", sql, result); 273 "Executed `%s' / %d\n", sql, result);
239 if (result != SQLITE_OK) 274 if (result != SQLITE_OK)
240 LOG (GNUNET_ERROR_TYPE_ERROR, 275 LOG (GNUNET_ERROR_TYPE_ERROR,
@@ -295,7 +330,9 @@ database_setup (struct Plugin *plugin)
295 sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF"); 330 sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
296 sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL"); 331 sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
297 sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\""); 332 sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
333#if ! DEBUG_PSYCSTORE
298 sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE"); 334 sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
335#endif
299 sql_exec (plugin->dbh, "PRAGMA count_changes=OFF"); 336 sql_exec (plugin->dbh, "PRAGMA count_changes=OFF");
300 sql_exec (plugin->dbh, "PRAGMA page_size=4096"); 337 sql_exec (plugin->dbh, "PRAGMA page_size=4096");
301 338
@@ -306,7 +343,9 @@ database_setup (struct Plugin *plugin)
306 sql_exec (plugin->dbh, 343 sql_exec (plugin->dbh,
307 "CREATE TABLE IF NOT EXISTS channels (\n" 344 "CREATE TABLE IF NOT EXISTS channels (\n"
308 " id INTEGER PRIMARY KEY,\n" 345 " id INTEGER PRIMARY KEY,\n"
309 " pub_key BLOB UNIQUE\n" 346 " pub_key BLOB UNIQUE,\n"
347 " max_state_message_id INTEGER,\n"
348 " state_hash_message_id INTEGER\n"
310 ");"); 349 ");");
311 350
312 sql_exec (plugin->dbh, 351 sql_exec (plugin->dbh,
@@ -364,6 +403,12 @@ database_setup (struct Plugin *plugin)
364 403
365 /* Prepare statements */ 404 /* Prepare statements */
366 405
406 sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
407
408 sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
409
410 sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
411
367 sql_prepare (plugin->dbh, 412 sql_prepare (plugin->dbh,
368 "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);", 413 "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
369 &plugin->insert_channel_key); 414 &plugin->insert_channel_key);
@@ -420,8 +465,8 @@ database_setup (struct Plugin *plugin)
420 " multicast_flags, psycstore_flags, data\n" 465 " multicast_flags, psycstore_flags, data\n"
421 "FROM messages\n" 466 "FROM messages\n"
422 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 467 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
423 " AND message_id = ?;", 468 " AND message_id = ? AND fragment_offset = ?;",
424 &plugin->select_message); 469 &plugin->select_message_fragment);
425 470
426 sql_prepare (plugin->dbh, 471 sql_prepare (plugin->dbh,
427 "SELECT hop_counter, signature, purpose, fragment_id,\n" 472 "SELECT hop_counter, signature, purpose, fragment_id,\n"
@@ -429,8 +474,8 @@ database_setup (struct Plugin *plugin)
429 " multicast_flags, psycstore_flags, data\n" 474 " multicast_flags, psycstore_flags, data\n"
430 "FROM messages\n" 475 "FROM messages\n"
431 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 476 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
432 " AND message_id = ? AND fragment_offset = ?;", 477 " AND message_id = ?;",
433 &plugin->select_message_fragment); 478 &plugin->select_message);
434 479
435 sql_prepare (plugin->dbh, 480 sql_prepare (plugin->dbh,
436 "SELECT fragment_id, message_id, group_generation\n" 481 "SELECT fragment_id, message_id, group_generation\n"
@@ -440,12 +485,33 @@ database_setup (struct Plugin *plugin)
440 &plugin->select_counters_master); 485 &plugin->select_counters_master);
441 486
442 sql_prepare (plugin->dbh, 487 sql_prepare (plugin->dbh,
443 "SELECT message_id\n" 488 "SELECT max_state_message_id\n"
444 "FROM messages\n" 489 "FROM channels\n"
445 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 490 "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
446 " AND psycstore_flags & ?\n" 491 &plugin->select_max_state_message_id);
447 "ORDER BY message_id DESC LIMIT 1", 492
448 &plugin->select_counters_slave); 493 sql_prepare (plugin->dbh,
494 "UPDATE channels\n"
495 "SET max_state_message_id = ?\n"
496 "WHERE pub_key = ?;",
497 &plugin->update_max_state_message_id);
498
499 sql_prepare (plugin->dbh,
500 "UPDATE channels\n"
501 "SET state_hash_message_id = ?\n"
502 "WHERE pub_key = ?;",
503 &plugin->update_state_hash_message_id);
504
505 sql_prepare (plugin->dbh,
506 "SELECT 1\n"
507 "FROM channels AS c\n"
508 "LEFT JOIN messages AS m\n"
509 "ON c.id = m.channel_id\n"
510 "WHERE c.pub_key = ?\n"
511 " AND ((? < c.state_hash_message_id AND c.state_hash_message_id < ?)\n"
512 " OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
513 "LIMIT 1;",
514 &plugin->select_message_state_delta);
449 515
450 sql_prepare (plugin->dbh, 516 sql_prepare (plugin->dbh,
451 "INSERT OR REPLACE INTO state\n" 517 "INSERT OR REPLACE INTO state\n"
@@ -461,22 +527,29 @@ database_setup (struct Plugin *plugin)
461 &plugin->insert_state_current); 527 &plugin->insert_state_current);
462 528
463 sql_prepare (plugin->dbh, 529 sql_prepare (plugin->dbh,
530 "DELETE FROM state\n"
531 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
532 " AND (value_current IS NULL OR length(value_current) = 0)\n"
533 " AND (value_signed IS NULL OR length(value_signed) = 0);",
534 &plugin->delete_state_empty);
535
536 sql_prepare (plugin->dbh,
464 "UPDATE state\n" 537 "UPDATE state\n"
465 "SET value_signed = value_current\n" 538 "SET value_signed = value_current\n"
466 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", 539 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
467 &plugin->update_state_signed); 540 &plugin->update_state_signed);
468 541
469 sql_prepare (plugin->dbh, 542 sql_prepare (plugin->dbh,
470 "INSERT INTO state_sync (channel_id, name, value)\n"
471 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
472 &plugin->insert_state_sync);
473
474 sql_prepare (plugin->dbh,
475 "DELETE FROM state\n" 543 "DELETE FROM state\n"
476 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", 544 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);",
477 &plugin->delete_state); 545 &plugin->delete_state);
478 546
479 sql_prepare (plugin->dbh, 547 sql_prepare (plugin->dbh,
548 "INSERT INTO state_sync (channel_id, name, value)\n"
549 "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
550 &plugin->insert_state_sync);
551
552 sql_prepare (plugin->dbh,
480 "INSERT INTO state\n" 553 "INSERT INTO state\n"
481 " (channel_id, name, value_current, value_signed)\n" 554 " (channel_id, name, value_current, value_signed)\n"
482 "SELECT channel_id, name, value, value\n" 555 "SELECT channel_id, name, value, value\n"
@@ -500,7 +573,7 @@ database_setup (struct Plugin *plugin)
500 "SELECT name, value_current\n" 573 "SELECT name, value_current\n"
501 "FROM state\n" 574 "FROM state\n"
502 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 575 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
503 " AND name = ? OR name LIKE ?;", 576 " AND (name = ? OR name LIKE ?);",
504 &plugin->select_state_prefix); 577 &plugin->select_state_prefix);
505 578
506 sql_prepare (plugin->dbh, 579 sql_prepare (plugin->dbh,
@@ -524,86 +597,125 @@ database_shutdown (struct Plugin *plugin)
524{ 597{
525 int result; 598 int result;
526 sqlite3_stmt *stmt; 599 sqlite3_stmt *stmt;
600 while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
601 {
602 result = sqlite3_finalize (stmt);
603 if (SQLITE_OK != result)
604 LOG (GNUNET_ERROR_TYPE_WARNING,
605 "Failed to close statement %p: %d\n", stmt, result);
606 }
607 if (SQLITE_OK != sqlite3_close (plugin->dbh))
608 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
527 609
528 if (NULL != plugin->insert_channel_key) 610 GNUNET_free_non_null (plugin->fn);
529 sqlite3_finalize (plugin->insert_channel_key); 611}
530
531 if (NULL != plugin->insert_slave_key)
532 sqlite3_finalize (plugin->insert_slave_key);
533
534 if (NULL != plugin->insert_membership)
535 sqlite3_finalize (plugin->insert_membership);
536
537 if (NULL != plugin->select_membership)
538 sqlite3_finalize (plugin->select_membership);
539
540 if (NULL != plugin->insert_fragment)
541 sqlite3_finalize (plugin->insert_fragment);
542
543 if (NULL != plugin->update_message_flags)
544 sqlite3_finalize (plugin->update_message_flags);
545
546 if (NULL != plugin->select_fragment)
547 sqlite3_finalize (plugin->select_fragment);
548 612
549 if (NULL != plugin->select_message) 613/**
550 sqlite3_finalize (plugin->select_message); 614 * Execute a prepared statement with a @a channel_key argument.
615 *
616 * @param plugin Plugin handle.
617 * @param stmt Statement to execute.
618 * @param channel_key Public key of the channel.
619 *
620 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
621 */
622static int
623exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
624 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key)
625{
626 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
627 sizeof (*channel_key), SQLITE_STATIC))
628 {
629 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
630 "sqlite3_bind");
631 }
632 else if (SQLITE_DONE != sqlite3_step (stmt))
633 {
634 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
635 "sqlite3_step");
636 }
551 637
552 if (NULL != plugin->select_message_fragment) 638 if (SQLITE_OK != sqlite3_reset (stmt))
553 sqlite3_finalize (plugin->select_message_fragment); 639 {
640 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
641 "sqlite3_reset");
642 return GNUNET_SYSERR;
643 }
554 644
555 if (NULL != plugin->select_counters_master) 645 return GNUNET_OK;
556 sqlite3_finalize (plugin->select_counters_master); 646}
557 647
558 if (NULL != plugin->select_counters_slave) 648/**
559 sqlite3_finalize (plugin->select_counters_slave); 649 * Begin a transaction.
650 */
651static int
652transaction_begin (struct Plugin *plugin, enum Transactions transaction)
653{
654 sqlite3_stmt *stmt = plugin->transaction_begin;
560 655
561 if (NULL != plugin->insert_state_current) 656 if (SQLITE_DONE != sqlite3_step (stmt))
562 sqlite3_finalize (plugin->insert_state_current); 657 {
658 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
659 "sqlite3_step");
660 }
661 if (SQLITE_OK != sqlite3_reset (stmt))
662 {
663 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
664 "sqlite3_reset");
665 return GNUNET_SYSERR;
666 }
563 667
564 if (NULL != plugin->update_state_signed) 668 plugin->transaction = transaction;
565 sqlite3_finalize (plugin->update_state_signed); 669 return GNUNET_OK;
670}
566 671
567 if (NULL != plugin->insert_state_sync)
568 sqlite3_finalize (plugin->insert_state_sync);
569 672
570 if (NULL != plugin->delete_state) 673/**
571 sqlite3_finalize (plugin->delete_state); 674 * Commit current transaction.
675 */
676static int
677transaction_commit (struct Plugin *plugin)
678{
679 sqlite3_stmt *stmt = plugin->transaction_commit;
572 680
573 if (NULL != plugin->insert_state_from_sync) 681 if (SQLITE_DONE != sqlite3_step (stmt))
574 sqlite3_finalize (plugin->insert_state_from_sync); 682 {
683 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
684 "sqlite3_step");
685 }
686 if (SQLITE_OK != sqlite3_reset (stmt))
687 {
688 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
689 "sqlite3_reset");
690 return GNUNET_SYSERR;
691 }
575 692
576 if (NULL != plugin->delete_state_sync) 693 plugin->transaction = TRANSACTION_NONE;
577 sqlite3_finalize (plugin->delete_state_sync); 694 return GNUNET_OK;
695}
578 696
579 if (NULL != plugin->select_state_one)
580 sqlite3_finalize (plugin->select_state_one);
581 697
582 if (NULL != plugin->select_state_prefix) 698/**
583 sqlite3_finalize (plugin->select_state_prefix); 699 * Roll back current transaction.
700 */
701static int
702transaction_rollback (struct Plugin *plugin)
703{
704 sqlite3_stmt *stmt = plugin->transaction_rollback;
584 705
585 result = sqlite3_close (plugin->dbh); 706 if (SQLITE_DONE != sqlite3_step (stmt))
586 if (result == SQLITE_BUSY)
587 { 707 {
588 LOG (GNUNET_ERROR_TYPE_WARNING, 708 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
589 _("Tried to close sqlite without finalizing all prepared statements.\n")); 709 "sqlite3_step");
590 stmt = sqlite3_next_stmt (plugin->dbh, NULL);
591 while (stmt != NULL)
592 {
593 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite",
594 "Closing statement %p\n", stmt);
595 result = sqlite3_finalize (stmt);
596 if (result != SQLITE_OK)
597 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "sqlite",
598 "Failed to close statement %p: %d\n", stmt, result);
599 stmt = sqlite3_next_stmt (plugin->dbh, NULL);
600 }
601 result = sqlite3_close (plugin->dbh);
602 } 710 }
603 if (SQLITE_OK != result) 711 if (SQLITE_OK != sqlite3_reset (stmt))
604 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close"); 712 {
605 713 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
606 GNUNET_free_non_null (plugin->fn); 714 "sqlite3_reset");
715 return GNUNET_SYSERR;
716 }
717 plugin->transaction = TRANSACTION_NONE;
718 return GNUNET_OK;
607} 719}
608 720
609 721
@@ -617,18 +729,18 @@ channel_key_store (struct Plugin *plugin,
617 sizeof (*channel_key), SQLITE_STATIC)) 729 sizeof (*channel_key), SQLITE_STATIC))
618 { 730 {
619 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 731 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
620 "insert_channel_key (bind)"); 732 "sqlite3_bind");
621 } 733 }
622 else if (SQLITE_DONE != sqlite3_step (stmt)) 734 else if (SQLITE_DONE != sqlite3_step (stmt))
623 { 735 {
624 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 736 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
625 "insert_channel_key (step)"); 737 "sqlite3_step");
626 } 738 }
627 739
628 if (SQLITE_OK != sqlite3_reset (stmt)) 740 if (SQLITE_OK != sqlite3_reset (stmt))
629 { 741 {
630 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 742 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
631 "insert_channel_key (reset)"); 743 "sqlite3_reset");
632 return GNUNET_SYSERR; 744 return GNUNET_SYSERR;
633 } 745 }
634 746
@@ -646,19 +758,19 @@ slave_key_store (struct Plugin *plugin,
646 sizeof (*slave_key), SQLITE_STATIC)) 758 sizeof (*slave_key), SQLITE_STATIC))
647 { 759 {
648 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 760 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
649 "insert_slave_key (bind)"); 761 "sqlite3_bind");
650 } 762 }
651 else if (SQLITE_DONE != sqlite3_step (stmt)) 763 else if (SQLITE_DONE != sqlite3_step (stmt))
652 { 764 {
653 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 765 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
654 "insert_slave_key (step)"); 766 "sqlite3_step");
655 } 767 }
656 768
657 769
658 if (SQLITE_OK != sqlite3_reset (stmt)) 770 if (SQLITE_OK != sqlite3_reset (stmt))
659 { 771 {
660 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 772 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
661 "insert_slave_key (reset)"); 773 "sqlite3_reset");
662 return GNUNET_SYSERR; 774 return GNUNET_SYSERR;
663 } 775 }
664 776
@@ -683,6 +795,11 @@ membership_store (void *cls,
683 uint64_t effective_since, 795 uint64_t effective_since,
684 uint64_t group_generation) 796 uint64_t group_generation)
685{ 797{
798 struct Plugin *plugin = cls;
799 sqlite3_stmt *stmt = plugin->insert_membership;
800
801 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
802
686 if (announced_at > INT64_MAX || 803 if (announced_at > INT64_MAX ||
687 effective_since > INT64_MAX || 804 effective_since > INT64_MAX ||
688 group_generation > INT64_MAX) 805 group_generation > INT64_MAX)
@@ -691,35 +808,32 @@ membership_store (void *cls,
691 return GNUNET_SYSERR; 808 return GNUNET_SYSERR;
692 } 809 }
693 810
694 struct Plugin *plugin = cls; 811 if (GNUNET_OK != channel_key_store (plugin, channel_key)
695 sqlite3_stmt *stmt = plugin->insert_membership; 812 || GNUNET_OK != slave_key_store (plugin, slave_key))
696
697 if (GNUNET_OK != channel_key_store (plugin, channel_key) ||
698 GNUNET_OK != slave_key_store (plugin, slave_key))
699 return GNUNET_SYSERR; 813 return GNUNET_SYSERR;
700 814
701 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 815 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
702 sizeof (*channel_key), SQLITE_STATIC) || 816 sizeof (*channel_key), SQLITE_STATIC)
703 SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key, 817 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
704 sizeof (*slave_key), SQLITE_STATIC) || 818 sizeof (*slave_key), SQLITE_STATIC)
705 SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join) || 819 || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
706 SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at) || 820 || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
707 SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since) || 821 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
708 SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation)) 822 || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
709 { 823 {
710 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 824 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
711 "insert_membership (bind)"); 825 "sqlite3_bind");
712 } 826 }
713 else if (SQLITE_DONE != sqlite3_step (stmt)) 827 else if (SQLITE_DONE != sqlite3_step (stmt))
714 { 828 {
715 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 829 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
716 "insert_membership (step)"); 830 "sqlite3_step");
717 } 831 }
718 832
719 if (SQLITE_OK != sqlite3_reset (stmt)) 833 if (SQLITE_OK != sqlite3_reset (stmt))
720 { 834 {
721 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 835 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
722 "insert_membership (reset)"); 836 "sqlite3_reset");
723 return GNUNET_SYSERR; 837 return GNUNET_SYSERR;
724 } 838 }
725 839
@@ -745,13 +859,13 @@ membership_test (void *cls,
745 int ret = GNUNET_SYSERR; 859 int ret = GNUNET_SYSERR;
746 860
747 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 861 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
748 sizeof (*channel_key), SQLITE_STATIC) || 862 sizeof (*channel_key), SQLITE_STATIC)
749 SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key, 863 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
750 sizeof (*slave_key), SQLITE_STATIC) || 864 sizeof (*slave_key), SQLITE_STATIC)
751 SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id)) 865 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
752 { 866 {
753 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 867 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
754 "select_membership (bind)"); 868 "sqlite3_bind");
755 } 869 }
756 else 870 else
757 { 871 {
@@ -768,7 +882,7 @@ membership_test (void *cls,
768 if (SQLITE_OK != sqlite3_reset (stmt)) 882 if (SQLITE_OK != sqlite3_reset (stmt))
769 { 883 {
770 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 884 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
771 "select_membership (reset)"); 885 "sqlite3_reset");
772 } 886 }
773 887
774 return ret; 888 return ret;
@@ -787,51 +901,60 @@ fragment_store (void *cls,
787 const struct GNUNET_MULTICAST_MessageHeader *msg, 901 const struct GNUNET_MULTICAST_MessageHeader *msg,
788 uint32_t psycstore_flags) 902 uint32_t psycstore_flags)
789{ 903{
790 if (msg->fragment_id > INT64_MAX || 904 struct Plugin *plugin = cls;
791 msg->fragment_offset > INT64_MAX || 905 sqlite3_stmt *stmt = plugin->insert_fragment;
792 msg->message_id > INT64_MAX || 906
793 msg->group_generation > INT64_MAX) 907 GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
908
909 uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
910 uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
911 uint64_t message_id = GNUNET_ntohll (msg->message_id);
912 uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
913
914 if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
915 message_id > INT64_MAX || group_generation > INT64_MAX)
794 { 916 {
917 LOG (GNUNET_ERROR_TYPE_ERROR,
918 "Tried to store fragment with a field > INT64_MAX: "
919 "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
920 message_id, group_generation);
795 GNUNET_break (0); 921 GNUNET_break (0);
796 return GNUNET_SYSERR; 922 return GNUNET_SYSERR;
797 } 923 }
798 924
799 struct Plugin *plugin = cls;
800 sqlite3_stmt *stmt = plugin->insert_fragment;
801
802 if (GNUNET_OK != channel_key_store (plugin, channel_key)) 925 if (GNUNET_OK != channel_key_store (plugin, channel_key))
803 return GNUNET_SYSERR; 926 return GNUNET_SYSERR;
804 927
805 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 928 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
806 sizeof (*channel_key), SQLITE_STATIC) || 929 sizeof (*channel_key), SQLITE_STATIC)
807 SQLITE_OK != sqlite3_bind_int64 (stmt, 2, msg->hop_counter ) || 930 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
808 SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature, 931 || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
809 sizeof (msg->signature), SQLITE_STATIC) || 932 sizeof (msg->signature), SQLITE_STATIC)
810 SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose, 933 || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
811 sizeof (msg->purpose), SQLITE_STATIC) || 934 sizeof (msg->purpose), SQLITE_STATIC)
812 SQLITE_OK != sqlite3_bind_int64 (stmt, 5, msg->fragment_id) || 935 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
813 SQLITE_OK != sqlite3_bind_int64 (stmt, 6, msg->fragment_offset) || 936 || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
814 SQLITE_OK != sqlite3_bind_int64 (stmt, 7, msg->message_id) || 937 || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
815 SQLITE_OK != sqlite3_bind_int64 (stmt, 8, msg->group_generation) || 938 || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
816 SQLITE_OK != sqlite3_bind_int64 (stmt, 9, msg->flags) || 939 || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
817 SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags) || 940 || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
818 SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1], 941 || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
819 ntohs (msg->header.size) - sizeof (*msg), 942 ntohs (msg->header.size)
820 SQLITE_STATIC)) 943 - sizeof (*msg), SQLITE_STATIC))
821 { 944 {
822 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 945 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
823 "insert_fragment (bind)"); 946 "sqlite3_bind");
824 } 947 }
825 else if (SQLITE_DONE != sqlite3_step (stmt)) 948 else if (SQLITE_DONE != sqlite3_step (stmt))
826 { 949 {
827 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 950 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
828 "insert_fragment (step)"); 951 "sqlite3_step");
829 } 952 }
830 953
831 if (SQLITE_OK != sqlite3_reset (stmt)) 954 if (SQLITE_OK != sqlite3_reset (stmt))
832 { 955 {
833 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 956 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
834 "insert_fragment (reset)"); 957 "sqlite3_reset");
835 return GNUNET_SYSERR; 958 return GNUNET_SYSERR;
836 } 959 }
837 960
@@ -855,13 +978,13 @@ message_add_flags (void *cls,
855 sqlite3_stmt *stmt = plugin->update_message_flags; 978 sqlite3_stmt *stmt = plugin->update_message_flags;
856 int ret = GNUNET_SYSERR; 979 int ret = GNUNET_SYSERR;
857 980
858 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags) || 981 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
859 SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key, 982 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
860 sizeof (*channel_key), SQLITE_STATIC) || 983 sizeof (*channel_key), SQLITE_STATIC)
861 SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id)) 984 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
862 { 985 {
863 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 986 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
864 "update_message_flags (bind)"); 987 "sqlite3_bind");
865 } 988 }
866 else 989 else
867 { 990 {
@@ -872,14 +995,14 @@ message_add_flags (void *cls,
872 break; 995 break;
873 default: 996 default:
874 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 997 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
875 "update_message_flags (step)"); 998 "sqlite3_step");
876 } 999 }
877 } 1000 }
878 1001
879 if (SQLITE_OK != sqlite3_reset (stmt)) 1002 if (SQLITE_OK != sqlite3_reset (stmt))
880 { 1003 {
881 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1004 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
882 "update_message_flags (reset)"); 1005 "sqlite3_reset");
883 return GNUNET_SYSERR; 1006 return GNUNET_SYSERR;
884 } 1007 }
885 1008
@@ -896,18 +1019,18 @@ fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
896 1019
897 msg->header.size = htons (sizeof (*msg) + data_size); 1020 msg->header.size = htons (sizeof (*msg) + data_size);
898 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); 1021 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
899 msg->hop_counter = (uint32_t) sqlite3_column_int64 (stmt, 0); 1022 msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
900 memcpy (&msg->signature, 1023 memcpy (&msg->signature,
901 sqlite3_column_blob (stmt, 1), 1024 sqlite3_column_blob (stmt, 1),
902 sqlite3_column_bytes (stmt, 1)); 1025 sqlite3_column_bytes (stmt, 1));
903 memcpy (&msg->purpose, 1026 memcpy (&msg->purpose,
904 sqlite3_column_blob (stmt, 2), 1027 sqlite3_column_blob (stmt, 2),
905 sqlite3_column_bytes (stmt, 2)); 1028 sqlite3_column_bytes (stmt, 2));
906 msg->fragment_id = sqlite3_column_int64 (stmt, 3); 1029 msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
907 msg->fragment_offset = sqlite3_column_int64 (stmt, 4); 1030 msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
908 msg->message_id = sqlite3_column_int64 (stmt, 5); 1031 msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
909 msg->group_generation = sqlite3_column_int64 (stmt, 6); 1032 msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
910 msg->flags = sqlite3_column_int64 (stmt, 7); 1033 msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
911 memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size); 1034 memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
912 1035
913 return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8)); 1036 return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
@@ -922,8 +1045,7 @@ fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
922 */ 1045 */
923static int 1046static int
924fragment_get (void *cls, 1047fragment_get (void *cls,
925 const struct 1048 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
926 GNUNET_CRYPTO_EccPublicSignKey *channel_key,
927 uint64_t fragment_id, 1049 uint64_t fragment_id,
928 GNUNET_PSYCSTORE_FragmentCallback cb, 1050 GNUNET_PSYCSTORE_FragmentCallback cb,
929 void *cb_cls) 1051 void *cb_cls)
@@ -934,11 +1056,11 @@ fragment_get (void *cls,
934 1056
935 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1057 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
936 sizeof (*channel_key), 1058 sizeof (*channel_key),
937 SQLITE_STATIC) || 1059 SQLITE_STATIC)
938 SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_id)) 1060 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_id))
939 { 1061 {
940 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1062 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
941 "select_fragment (bind)"); 1063 "sqlite3_bind");
942 } 1064 }
943 else 1065 else
944 { 1066 {
@@ -952,14 +1074,14 @@ fragment_get (void *cls,
952 break; 1074 break;
953 default: 1075 default:
954 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1076 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
955 "select_fragment (step)"); 1077 "sqlite3_step");
956 } 1078 }
957 } 1079 }
958 1080
959 if (SQLITE_OK != sqlite3_reset (stmt)) 1081 if (SQLITE_OK != sqlite3_reset (stmt))
960 { 1082 {
961 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1083 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
962 "select_fragment (reset)"); 1084 "sqlite3_reset");
963 } 1085 }
964 1086
965 return ret; 1087 return ret;
@@ -976,20 +1098,22 @@ static int
976message_get (void *cls, 1098message_get (void *cls,
977 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, 1099 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
978 uint64_t message_id, 1100 uint64_t message_id,
1101 uint64_t *returned_fragments,
979 GNUNET_PSYCSTORE_FragmentCallback cb, 1102 GNUNET_PSYCSTORE_FragmentCallback cb,
980 void *cb_cls) 1103 void *cb_cls)
981{ 1104{
982 struct Plugin *plugin = cls; 1105 struct Plugin *plugin = cls;
983 sqlite3_stmt *stmt = plugin->select_message; 1106 sqlite3_stmt *stmt = plugin->select_message;
984 int ret = GNUNET_SYSERR; 1107 int ret = GNUNET_SYSERR;
1108 *returned_fragments = 0;
985 1109
986 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1110 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
987 sizeof (*channel_key), 1111 sizeof (*channel_key),
988 SQLITE_STATIC) || 1112 SQLITE_STATIC)
989 SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)) 1113 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id))
990 { 1114 {
991 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1115 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
992 "select_message (bind)"); 1116 "sqlite3_bind");
993 } 1117 }
994 else 1118 else
995 { 1119 {
@@ -1005,12 +1129,13 @@ message_get (void *cls,
1005 break; 1129 break;
1006 case SQLITE_ROW: 1130 case SQLITE_ROW:
1007 ret = fragment_row (stmt, cb, cb_cls); 1131 ret = fragment_row (stmt, cb, cb_cls);
1132 (*returned_fragments)++;
1008 if (ret != GNUNET_YES) 1133 if (ret != GNUNET_YES)
1009 sql_ret = SQLITE_DONE; 1134 sql_ret = SQLITE_DONE;
1010 break; 1135 break;
1011 default: 1136 default:
1012 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1137 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1013 "select_message (step)"); 1138 "sqlite3_step");
1014 } 1139 }
1015 } 1140 }
1016 while (sql_ret == SQLITE_ROW); 1141 while (sql_ret == SQLITE_ROW);
@@ -1019,7 +1144,7 @@ message_get (void *cls,
1019 if (SQLITE_OK != sqlite3_reset (stmt)) 1144 if (SQLITE_OK != sqlite3_reset (stmt))
1020 { 1145 {
1021 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1146 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1022 "select_message (reset)"); 1147 "sqlite3_reset");
1023 } 1148 }
1024 1149
1025 return ret; 1150 return ret;
@@ -1042,18 +1167,17 @@ message_get_fragment (void *cls,
1042 void *cb_cls) 1167 void *cb_cls)
1043{ 1168{
1044 struct Plugin *plugin = cls; 1169 struct Plugin *plugin = cls;
1045 int ret = GNUNET_SYSERR;
1046
1047 sqlite3_stmt *stmt = plugin->select_message_fragment; 1170 sqlite3_stmt *stmt = plugin->select_message_fragment;
1171 int ret = GNUNET_SYSERR;
1048 1172
1049 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1173 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1050 sizeof (*channel_key), 1174 sizeof (*channel_key),
1051 SQLITE_STATIC) || 1175 SQLITE_STATIC)
1052 SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id) || 1176 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
1053 SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset)) 1177 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
1054 { 1178 {
1055 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1179 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1056 "select_message_fragment (bind)"); 1180 "sqlite3_bind");
1057 } 1181 }
1058 else 1182 else
1059 { 1183 {
@@ -1067,14 +1191,14 @@ message_get_fragment (void *cls,
1067 break; 1191 break;
1068 default: 1192 default:
1069 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1193 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1070 "select_message_fragment (step)"); 1194 "sqlite3_step");
1071 } 1195 }
1072 } 1196 }
1073 1197
1074 if (SQLITE_OK != sqlite3_reset (stmt)) 1198 if (SQLITE_OK != sqlite3_reset (stmt))
1075 { 1199 {
1076 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1200 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1077 "select_message_fragment (reset)"); 1201 "sqlite3_reset");
1078 } 1202 }
1079 1203
1080 return ret; 1204 return ret;
@@ -1103,7 +1227,7 @@ counters_get_master (void *cls,
1103 SQLITE_STATIC)) 1227 SQLITE_STATIC))
1104 { 1228 {
1105 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1229 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1106 "select_counters_master (bind)"); 1230 "sqlite3_bind");
1107 } 1231 }
1108 else 1232 else
1109 { 1233 {
@@ -1120,21 +1244,21 @@ counters_get_master (void *cls,
1120 break; 1244 break;
1121 default: 1245 default:
1122 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1246 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1123 "select_counters_master (step)"); 1247 "sqlite3_step");
1124 } 1248 }
1125 } 1249 }
1126 1250
1127 if (SQLITE_OK != sqlite3_reset (stmt)) 1251 if (SQLITE_OK != sqlite3_reset (stmt))
1128 { 1252 {
1129 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1253 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1130 "select_counters_master (reset)"); 1254 "sqlite3_reset");
1131 } 1255 }
1132 1256
1133 return ret; 1257 return ret;
1134} 1258}
1135 1259
1136/** 1260/**
1137 * Retrieve latest values of counters for a channel slave. 1261 * Retrieve latest values of counters for a channel slave.
1138 * 1262 *
1139 * @see GNUNET_PSYCSTORE_counters_get_slave() 1263 * @see GNUNET_PSYCSTORE_counters_get_slave()
1140 * 1264 *
@@ -1146,17 +1270,15 @@ counters_get_slave (void *cls,
1146 uint64_t *max_state_msg_id) 1270 uint64_t *max_state_msg_id)
1147{ 1271{
1148 struct Plugin *plugin = cls; 1272 struct Plugin *plugin = cls;
1149 sqlite3_stmt *stmt = plugin->select_counters_slave; 1273 sqlite3_stmt *stmt = plugin->select_max_state_message_id;
1150 int ret = GNUNET_SYSERR; 1274 int ret = GNUNET_SYSERR;
1151 1275
1152 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1276 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1153 sizeof (*channel_key), 1277 sizeof (*channel_key),
1154 SQLITE_STATIC) || 1278 SQLITE_STATIC))
1155 SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
1156 GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
1157 { 1279 {
1158 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1280 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1159 "select_counters_slave (bind)"); 1281 "sqlite3_bind");
1160 } 1282 }
1161 else 1283 else
1162 { 1284 {
@@ -1171,62 +1293,58 @@ counters_get_slave (void *cls,
1171 break; 1293 break;
1172 default: 1294 default:
1173 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1295 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1174 "select_counters_slave (step)"); 1296 "sqlite3_step");
1175 } 1297 }
1176 } 1298 }
1177 1299
1178 if (SQLITE_OK != sqlite3_reset (stmt)) 1300 if (SQLITE_OK != sqlite3_reset (stmt))
1179 { 1301 {
1180 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1302 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1181 "select_counters_slave (reset)"); 1303 "sqlite3_reset");
1182 } 1304 }
1183 1305
1184 return ret; 1306 return ret;
1185} 1307}
1186 1308
1309
1187/** 1310/**
1188 * Set a state variable to the given value. 1311 * Set a state variable to the given value.
1189 *
1190 * @see GNUNET_PSYCSTORE_state_modify()
1191 * 1312 *
1192 * @return #GNUNET_OK on success, else #GNUNET_SYSERR 1313 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1193 */ 1314 */
1194static int 1315static int
1195state_set (void *cls, 1316state_set (struct Plugin *plugin, sqlite3_stmt *stmt,
1196 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, 1317 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1197 const char *name, const void *value, size_t value_size) 1318 const char *name, const void *value, size_t value_size)
1198{ 1319{
1199 struct Plugin *plugin = cls;
1200 int ret = GNUNET_SYSERR; 1320 int ret = GNUNET_SYSERR;
1201 1321
1202 sqlite3_stmt *stmt = plugin->insert_state_current;
1203
1204 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1322 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1205 sizeof (*channel_key), SQLITE_STATIC) || 1323 sizeof (*channel_key), SQLITE_STATIC)
1206 SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC) || 1324 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
1207 SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size, 1325 || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
1208 SQLITE_STATIC)) 1326 SQLITE_STATIC))
1209 { 1327 {
1210 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1328 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1211 "insert_state_current (bind)"); 1329 "sqlite3_bind");
1212 } 1330 }
1213 else 1331 else
1214 { 1332 {
1215 switch (sqlite3_step (stmt)) 1333 switch (sqlite3_step (stmt))
1216 { 1334 {
1217 case SQLITE_DONE: 1335 case SQLITE_DONE:
1218 ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO; 1336 ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
1219 break; 1337 break;
1220 default: 1338 default:
1221 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1339 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1222 "insert_state_current (step)"); 1340 "sqlite3_step");
1223 } 1341 }
1224 } 1342 }
1225 1343
1226 if (SQLITE_OK != sqlite3_reset (stmt)) 1344 if (SQLITE_OK != sqlite3_reset (stmt))
1227 { 1345 {
1228 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1346 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1229 "insert_state_current (reset)"); 1347 "sqlite3_reset");
1230 return GNUNET_SYSERR; 1348 return GNUNET_SYSERR;
1231 } 1349 }
1232 1350
@@ -1234,42 +1352,208 @@ state_set (void *cls,
1234} 1352}
1235 1353
1236 1354
1237/**
1238 * Reset the state of a channel.
1239 *
1240 * @see GNUNET_PSYCSTORE_state_reset()
1241 *
1242 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1243 */
1244static int 1355static int
1245state_reset (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key) 1356update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
1357 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1358 uint64_t message_id)
1246{ 1359{
1247 struct Plugin *plugin = cls; 1360 if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
1248 sqlite3_stmt *stmt = plugin->delete_state; 1361 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1249 1362 sizeof (*channel_key), SQLITE_STATIC))
1250 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1251 sizeof (*channel_key), SQLITE_STATIC))
1252 { 1363 {
1253 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1364 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1254 "delete_state (bind)"); 1365 "sqlite3_bind");
1255 } 1366 }
1256 else if (SQLITE_DONE != sqlite3_step (stmt)) 1367 else if (SQLITE_DONE != sqlite3_step (stmt))
1257 { 1368 {
1258 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1369 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1259 "delete_state (step)"); 1370 "sqlite3_step");
1260 } 1371 }
1261
1262 if (SQLITE_OK != sqlite3_reset (stmt)) 1372 if (SQLITE_OK != sqlite3_reset (stmt))
1263 { 1373 {
1264 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1374 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1265 "delete_state (reset)"); 1375 "sqlite3_reset");
1266 return GNUNET_SYSERR; 1376 return GNUNET_SYSERR;
1267 } 1377 }
1268
1269 return GNUNET_OK; 1378 return GNUNET_OK;
1270} 1379}
1271 1380
1272 1381
1382/**
1383 * Begin modifying current state.
1384 */
1385static int
1386state_modify_begin (void *cls,
1387 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1388 uint64_t message_id, uint64_t state_delta)
1389{
1390 struct Plugin *plugin = cls;
1391 sqlite3_stmt *stmt = plugin->select_message_state_delta;
1392
1393 if (state_delta > 0)
1394 {
1395 int ret = GNUNET_SYSERR;
1396 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1397 sizeof (*channel_key), SQLITE_STATIC)
1398 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
1399 message_id - state_delta)
1400 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3,
1401 message_id)
1402 || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
1403 message_id - state_delta)
1404 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
1405 GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
1406 {
1407 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1408 "sqlite3_bind");
1409 }
1410 else
1411 {
1412 switch (sqlite3_step (stmt))
1413 {
1414 case SQLITE_DONE:
1415 ret = GNUNET_NO;
1416 break;
1417 case SQLITE_ROW:
1418 ret = GNUNET_OK;
1419 break;
1420 default:
1421 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1422 "sqlite3_step");
1423 }
1424 }
1425 if (SQLITE_OK != sqlite3_reset (stmt))
1426 {
1427 ret = GNUNET_SYSERR;
1428 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1429 "sqlite3_reset");
1430 }
1431 if (GNUNET_OK != ret)
1432 return ret;
1433 }
1434
1435 if (TRANSACTION_NONE != plugin->transaction)
1436 if (GNUNET_OK != transaction_rollback (plugin))
1437 return GNUNET_SYSERR;
1438
1439 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1440}
1441
1442
1443/**
1444 * Set the current value of state variable.
1445 *
1446 * @see GNUNET_PSYCSTORE_state_modify()
1447 *
1448 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1449 */
1450static int
1451state_modify_set (void *cls,
1452 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1453 const char *name, const void *value, size_t value_size)
1454{
1455 struct Plugin *plugin = cls;
1456 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1457
1458 return state_set (plugin, plugin->insert_state_current, channel_key,
1459 name, value, value_size);
1460
1461}
1462
1463
1464/**
1465 * End modifying current state.
1466 */
1467static int
1468state_modify_end (void *cls,
1469 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1470 uint64_t message_id)
1471{
1472 struct Plugin *plugin = cls;
1473 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1474
1475 return
1476 GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
1477 && GNUNET_OK == update_message_id (plugin,
1478 plugin->update_max_state_message_id,
1479 channel_key, message_id)
1480 && GNUNET_OK == transaction_commit (plugin)
1481 ? GNUNET_OK : GNUNET_SYSERR;
1482}
1483
1484
1485/**
1486 * Begin state synchronization.
1487 */
1488static int
1489state_sync_begin (void *cls,
1490 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key)
1491{
1492 struct Plugin *plugin = cls;
1493 return exec_channel (plugin, plugin->delete_state_sync, channel_key);
1494}
1495
1496
1497/**
1498 * Set the current value of state variable.
1499 *
1500 * @see GNUNET_PSYCSTORE_state_modify()
1501 *
1502 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1503 */
1504static int
1505state_sync_set (void *cls,
1506 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1507 const char *name, const void *value, size_t value_size)
1508{
1509 struct Plugin *plugin = cls;
1510 return state_set (cls, plugin->insert_state_sync, channel_key,
1511 name, value, value_size);
1512}
1513
1514
1515/**
1516 * End modifying current state.
1517 */
1518static int
1519state_sync_end (void *cls,
1520 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1521 uint64_t message_id)
1522{
1523 struct Plugin *plugin = cls;
1524 int ret = GNUNET_SYSERR;
1525
1526 GNUNET_OK == transaction_begin (plugin, TRANSACTION_NONE)
1527 && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1528 && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1529 channel_key)
1530 && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
1531 channel_key)
1532 && GNUNET_OK == update_message_id (plugin,
1533 plugin->update_state_hash_message_id,
1534 channel_key, message_id)
1535 && GNUNET_OK == transaction_commit (plugin)
1536 ? ret = GNUNET_OK
1537 : transaction_rollback (plugin);
1538 return ret;
1539}
1540
1541
1542/**
1543 * Reset the state of a channel.
1544 *
1545 * @see GNUNET_PSYCSTORE_state_reset()
1546 *
1547 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1548 */
1549static int
1550state_reset (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key)
1551{
1552 struct Plugin *plugin = cls;
1553 return exec_channel (plugin, plugin->delete_state, channel_key);
1554}
1555
1556
1273/** 1557/**
1274 * Update signed values of state variables in the state store. 1558 * Update signed values of state variables in the state store.
1275 * 1559 *
@@ -1282,28 +1566,7 @@ state_update_signed (void *cls,
1282 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key) 1566 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key)
1283{ 1567{
1284 struct Plugin *plugin = cls; 1568 struct Plugin *plugin = cls;
1285 sqlite3_stmt *stmt = plugin->update_state_signed; 1569 return exec_channel (plugin, plugin->update_state_signed, channel_key);
1286
1287 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1288 sizeof (*channel_key), SQLITE_STATIC))
1289 {
1290 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1291 "update_state_signed (bind)");
1292 }
1293 else if (SQLITE_DONE != sqlite3_step (stmt))
1294 {
1295 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1296 "update_state_signed (step)");
1297 }
1298
1299 if (SQLITE_OK != sqlite3_reset (stmt))
1300 {
1301 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1302 "update_state_signed (reset)");
1303 return GNUNET_SYSERR;
1304 }
1305
1306 return GNUNET_OK;
1307} 1570}
1308 1571
1309 1572
@@ -1325,11 +1588,11 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1325 1588
1326 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1589 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1327 sizeof (*channel_key), 1590 sizeof (*channel_key),
1328 SQLITE_STATIC) || 1591 SQLITE_STATIC)
1329 SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)) 1592 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
1330 { 1593 {
1331 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1594 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1332 "select_state_one (bind)"); 1595 "sqlite3_bind");
1333 } 1596 }
1334 else 1597 else
1335 { 1598 {
@@ -1344,16 +1607,15 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1344 break; 1607 break;
1345 default: 1608 default:
1346 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1609 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1347 "select_state_one (step)"); 1610 "sqlite3_step");
1348 } 1611 }
1349 } 1612 }
1350 1613
1351 if (SQLITE_OK != sqlite3_reset (stmt)) 1614 if (SQLITE_OK != sqlite3_reset (stmt))
1352 { 1615 {
1353 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1616 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1354 "select_state_one (reset)"); 1617 "sqlite3_reset");
1355 } 1618 }
1356
1357 1619
1358 return ret; 1620 return ret;
1359} 1621}
@@ -1362,14 +1624,14 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1362/** 1624/**
1363 * Retrieve all state variables for a channel with the given prefix. 1625 * Retrieve all state variables for a channel with the given prefix.
1364 * 1626 *
1365 * @see GNUNET_PSYCSTORE_state_get_all() 1627 * @see GNUNET_PSYCSTORE_state_get_prefix()
1366 * 1628 *
1367 * @return #GNUNET_OK on success, else #GNUNET_SYSERR 1629 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1368 */ 1630 */
1369static int 1631static int
1370state_get_all (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, 1632state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1371 const char *name, GNUNET_PSYCSTORE_StateCallback cb, 1633 const char *name, GNUNET_PSYCSTORE_StateCallback cb,
1372 void *cb_cls) 1634 void *cb_cls)
1373{ 1635{
1374 struct Plugin *plugin = cls; 1636 struct Plugin *plugin = cls;
1375 int ret = GNUNET_SYSERR; 1637 int ret = GNUNET_SYSERR;
@@ -1381,13 +1643,13 @@ state_get_all (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_k
1381 memcpy (name_prefix + name_len, "_%", 2); 1643 memcpy (name_prefix + name_len, "_%", 2);
1382 1644
1383 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1645 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1384 sizeof (*channel_key), SQLITE_STATIC) || 1646 sizeof (*channel_key), SQLITE_STATIC)
1385 SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC) || 1647 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1386 SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2, 1648 || SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2,
1387 SQLITE_STATIC)) 1649 SQLITE_STATIC))
1388 { 1650 {
1389 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1651 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1390 "select_state_prefix (bind)"); 1652 "sqlite3_bind");
1391 } 1653 }
1392 else 1654 else
1393 { 1655 {
@@ -1410,7 +1672,7 @@ state_get_all (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_k
1410 break; 1672 break;
1411 default: 1673 default:
1412 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1674 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1413 "select_state_prefix (step)"); 1675 "sqlite3_step");
1414 } 1676 }
1415 } 1677 }
1416 while (sql_ret == SQLITE_ROW); 1678 while (sql_ret == SQLITE_ROW);
@@ -1419,7 +1681,7 @@ state_get_all (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_k
1419 if (SQLITE_OK != sqlite3_reset (stmt)) 1681 if (SQLITE_OK != sqlite3_reset (stmt))
1420 { 1682 {
1421 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1683 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1422 "select_state_prefix (reset)"); 1684 "sqlite3_reset");
1423 } 1685 }
1424 1686
1425 return ret; 1687 return ret;
@@ -1447,7 +1709,7 @@ state_get_signed (void *cls,
1447 sizeof (*channel_key), SQLITE_STATIC)) 1709 sizeof (*channel_key), SQLITE_STATIC))
1448 { 1710 {
1449 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1711 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1450 "select_state_signed (bind)"); 1712 "sqlite3_bind");
1451 } 1713 }
1452 else 1714 else
1453 { 1715 {
@@ -1470,7 +1732,7 @@ state_get_signed (void *cls,
1470 break; 1732 break;
1471 default: 1733 default:
1472 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1734 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1473 "select_state_signed (step)"); 1735 "sqlite3_step");
1474 } 1736 }
1475 } 1737 }
1476 while (sql_ret == SQLITE_ROW); 1738 while (sql_ret == SQLITE_ROW);
@@ -1479,7 +1741,7 @@ state_get_signed (void *cls,
1479 if (SQLITE_OK != sqlite3_reset (stmt)) 1741 if (SQLITE_OK != sqlite3_reset (stmt))
1480 { 1742 {
1481 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1743 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1482 "select_state_signed (reset)"); 1744 "sqlite3_reset");
1483 } 1745 }
1484 1746
1485 return ret; 1747 return ret;
@@ -1502,7 +1764,7 @@ libgnunet_plugin_psycstore_sqlite_init (void *cls)
1502 if (NULL != plugin.cfg) 1764 if (NULL != plugin.cfg)
1503 return NULL; /* can only initialize once! */ 1765 return NULL; /* can only initialize once! */
1504 memset (&plugin, 0, sizeof (struct Plugin)); 1766 memset (&plugin, 0, sizeof (struct Plugin));
1505 plugin.cfg = cfg; 1767 plugin.cfg = cfg;
1506 if (GNUNET_OK != database_setup (&plugin)) 1768 if (GNUNET_OK != database_setup (&plugin))
1507 { 1769 {
1508 database_shutdown (&plugin); 1770 database_shutdown (&plugin);
@@ -1519,11 +1781,16 @@ libgnunet_plugin_psycstore_sqlite_init (void *cls)
1519 api->message_get_fragment = &message_get_fragment; 1781 api->message_get_fragment = &message_get_fragment;
1520 api->counters_get_master = &counters_get_master; 1782 api->counters_get_master = &counters_get_master;
1521 api->counters_get_slave = &counters_get_slave; 1783 api->counters_get_slave = &counters_get_slave;
1522 api->state_set = &state_set; 1784 api->state_modify_begin = &state_modify_begin;
1785 api->state_modify_set = &state_modify_set;
1786 api->state_modify_end = &state_modify_end;
1787 api->state_sync_begin = &state_sync_begin;
1788 api->state_sync_set = &state_sync_set;
1789 api->state_sync_end = &state_sync_end;
1523 api->state_reset = &state_reset; 1790 api->state_reset = &state_reset;
1524 api->state_update_signed = &state_update_signed; 1791 api->state_update_signed = &state_update_signed;
1525 api->state_get = &state_get; 1792 api->state_get = &state_get;
1526 api->state_get_all = &state_get_all; 1793 api->state_get_prefix = &state_get_prefix;
1527 api->state_get_signed = &state_get_signed; 1794 api->state_get_signed = &state_get_signed;
1528 1795
1529 LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n")); 1796 LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
diff --git a/src/psycstore/psycstore.h b/src/psycstore/psycstore.h
index d2c25e791..3a7a9feed 100644
--- a/src/psycstore/psycstore.h
+++ b/src/psycstore/psycstore.h
@@ -1,22 +1,22 @@
1/* 1/*
2 This file is part of GNUnet. 2 * This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors) 3 * (C) 2013 Christian Grothoff (and other contributing authors)
4 4 *
5 GNUnet is free software; you can redistribute it and/or modify 5 * GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public Liceidentity as published 6 * it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your 7 * by the Free Software Foundation; either version 3, or (at your
8 option) any later version. 8 * option) any later version.
9 9 *
10 GNUnet is distributed in the hope that it will be useful, but 10 * GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public Liceidentity for more details. 13 * General Public License for more details.
14 14 *
15 You should have received a copy of the GNU General Public Liceidentity 15 * You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the 16 * along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 * Boston, MA 02111-1307, USA.
19*/ 19 */
20 20
21/** 21/**
22 * @file psycstore/psycstore.h 22 * @file psycstore/psycstore.h
@@ -35,7 +35,7 @@ GNUNET_NETWORK_STRUCT_BEGIN
35/** 35/**
36 * Answer from service to client about last operation. 36 * Answer from service to client about last operation.
37 */ 37 */
38struct ResultCodeMessage 38struct OperationResult
39{ 39{
40 /** 40 /**
41 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE 41 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE
@@ -43,10 +43,11 @@ struct ResultCodeMessage
43 struct GNUNET_MessageHeader header; 43 struct GNUNET_MessageHeader header;
44 44
45 /** 45 /**
46 * Status code for the last operation, in NBO. 46 * Status code for the operation.
47 * (currently not used).
48 */ 47 */
49 uint32_t result_code GNUNET_PACKED; 48 int64_t result_code GNUNET_PACKED;
49
50 uint32_t op_id GNUNET_PACKED;
50 51
51 /* followed by 0-terminated error message (on error) */ 52 /* followed by 0-terminated error message (on error) */
52 53
@@ -54,30 +55,349 @@ struct ResultCodeMessage
54 55
55 56
56/** 57/**
58 * Answer from service to client about master counters.
59 *
60 * @see GNUNET_PSYCSTORE_counters_get_master()
61 */
62struct MasterCountersResult
63{
64 /**
65 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER
66 */
67 struct GNUNET_MessageHeader header;
68
69 uint64_t fragment_id GNUNET_PACKED;
70
71 uint64_t message_id GNUNET_PACKED;
72
73 uint64_t group_generation GNUNET_PACKED;
74
75 /**
76 * Status code for the operation.
77 */
78 int64_t result_code GNUNET_PACKED;
79
80 uint32_t op_id GNUNET_PACKED;
81
82};
83
84
85/**
86 * Answer from service to client about slave counters.
87 *
88 * @see GNUNET_PSYCSTORE_counters_get_slave()
89 */
90struct SlaveCountersResult
91{
92 /**
93 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE
94 */
95 struct GNUNET_MessageHeader header;
96
97 uint64_t max_known_msg_id GNUNET_PACKED;
98
99 /**
100 * Status code for the operation.
101 */
102 int64_t result_code GNUNET_PACKED;
103
104 uint32_t op_id GNUNET_PACKED;
105
106};
107
108
109/**
110 * Answer from service to client containing a message fragment.
111 */
112struct FragmentResult
113{
114 /**
115 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE
116 */
117 struct GNUNET_MessageHeader header;
118
119 uint32_t op_id GNUNET_PACKED;
120
121 uint32_t psycstore_flags GNUNET_PACKED;
122
123 /* followed by GNUNET_MULTICAST_MessageHeader */
124
125};
126
127
128/**
129 * Answer from service to client containing a state variable.
130 */
131struct StateResult
132{
133 /**
134 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE
135 */
136 struct GNUNET_MessageHeader header;
137
138 uint32_t op_id GNUNET_PACKED;
139
140 uint16_t name_size GNUNET_PACKED;
141
142 /* followed by name and value */
143};
144
145
146/**
147 * Generic operation request.
148 */
149struct OperationRequest
150{
151 struct GNUNET_MessageHeader header;
152
153 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
154
155 uint32_t op_id GNUNET_PACKED;
156};
157
158
159/**
57 * @see GNUNET_PSYCSTORE_membership_store() 160 * @see GNUNET_PSYCSTORE_membership_store()
58 */ 161 */
59struct MembershipStoreMessage 162struct MembershipStoreRequest
60{ 163{
61 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key; 164 /**
62 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key; 165 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE
166 */
167 struct GNUNET_MessageHeader header;
168
169 /**
170 * Channel's public key.
171 */
172 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
173
174 /**
175 * Slave's public key.
176 */
177 struct GNUNET_CRYPTO_EccPublicSignKey slave_key;
178
63 int did_join; 179 int did_join;
64 uint64_t announced_at; 180 uint64_t announced_at;
65 uint64_t effective_since; 181 uint64_t effective_since;
66 uint64_t group_generation; 182 uint64_t group_generation;
183
184 uint32_t op_id GNUNET_PACKED;
67}; 185};
68 186
69 187
70/** 188/**
71 * @see GNUNET_PSYCSTORE_membership_test() 189 * @see GNUNET_PSYCSTORE_membership_test()
72 */ 190 */
73struct MembershipTestMessage 191struct MembershipTestRequest
192{
193 /**
194 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST
195 */
196 struct GNUNET_MessageHeader header;
197
198 /**
199 * Channel's public key.
200 */
201 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
202
203 /**
204 * Slave's public key.
205 */
206 struct GNUNET_CRYPTO_EccPublicSignKey slave_key;
207
208 uint64_t message_id GNUNET_PACKED;
209
210 uint64_t group_generation GNUNET_PACKED;
211
212 uint32_t op_id GNUNET_PACKED;
213};
214
215
216/**
217 * @see GNUNET_PSYCSTORE_fragment_store()
218 */
219struct FragmentStoreRequest
220{
221 /**
222 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE
223 */
224 struct GNUNET_MessageHeader header;
225
226 /**
227 * Channel's public key.
228 */
229 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
230
231 uint32_t psycstore_flags GNUNET_PACKED;
232
233 uint32_t op_id GNUNET_PACKED;
234};
235
236
237/**
238 * @see GNUNET_PSYCSTORE_fragment_get()
239 */
240struct FragmentGetRequest
241{
242 /**
243 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET
244 */
245 struct GNUNET_MessageHeader header;
246
247 /**
248 * Channel's public key.
249 */
250 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
251
252 uint64_t fragment_id;
253
254 uint32_t op_id GNUNET_PACKED;
255};
256
257
258/**
259 * @see GNUNET_PSYCSTORE_message_get()
260 */
261struct MessageGetRequest
74{ 262{
75 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key; 263 /**
76 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key; 264 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET
265 */
266 struct GNUNET_MessageHeader header;
267
268 /**
269 * Channel's public key.
270 */
271 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
272
77 uint64_t message_id; 273 uint64_t message_id;
78 uint64_t group_generation; 274
275 uint32_t op_id GNUNET_PACKED;
79}; 276};
80 277
278
279/**
280 * @see GNUNET_PSYCSTORE_message_get_fragment()
281 */
282struct MessageGetFragmentRequest
283{
284 /**
285 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_FRAGMENT_GET
286 */
287 struct GNUNET_MessageHeader header;
288
289 /**
290 * Channel's public key.
291 */
292 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
293
294 uint64_t message_id;
295
296 uint64_t fragment_offset;
297
298 uint32_t op_id GNUNET_PACKED;
299};
300
301
302/**
303 * @see GNUNET_PSYCSTORE_state_hash_update()
304 */
305struct StateHashUpdateRequest
306{
307 /**
308 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE
309 */
310 struct GNUNET_MessageHeader header;
311
312 /**
313 * Channel's public key.
314 */
315 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
316
317 struct GNUNET_HashCode hash;
318
319 uint32_t op_id GNUNET_PACKED;
320};
321
322enum StateOpFlags
323{
324 STATE_OP_FIRST = 1 << 0,
325 STATE_OP_LAST = 1 << 1
326};
327
328/**
329 * @see GNUNET_PSYCSTORE_state_modify()
330 */
331struct StateModifyRequest
332{
333 /**
334 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY
335 */
336 struct GNUNET_MessageHeader header;
337
338 /**
339 * Channel's public key.
340 */
341 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
342
343 uint64_t message_id GNUNET_PACKED;
344
345 uint64_t state_delta GNUNET_PACKED;
346
347 uint32_t op_id GNUNET_PACKED;
348
349 /**
350 * Size of name, including NUL terminator.
351 */
352 uint16_t name_size GNUNET_PACKED;
353
354 /**
355 * OR'd StateOpFlags
356 */
357 uint8_t flags;
358
359 /**
360 * enum GNUNET_ENV_Operator
361 */
362 uint8_t oper;
363
364 /* Followed by NUL-terminated name, then the value. */
365};
366
367
368/**
369 * @see GNUNET_PSYCSTORE_state_sync()
370 */
371struct StateSyncRequest
372{
373 /**
374 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC
375 */
376 struct GNUNET_MessageHeader header;
377
378 /**
379 * Channel's public key.
380 */
381 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
382
383 uint64_t message_id GNUNET_PACKED;
384
385 uint32_t op_id GNUNET_PACKED;
386
387 /**
388 * Size of name, including NUL terminator.
389 */
390 uint16_t name_size GNUNET_PACKED;
391
392 /**
393 * OR'd StateOpFlags
394 */
395 uint8_t flags;
396
397 /* Followed by NUL-terminated name, then the value. */
398};
399
400
81GNUNET_NETWORK_STRUCT_END 401GNUNET_NETWORK_STRUCT_END
82 402
83#endif 403#endif
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c
index 5847fc852..5b9bb7e89 100644
--- a/src/psycstore/psycstore_api.c
+++ b/src/psycstore/psycstore_api.c
@@ -1,22 +1,22 @@
1/* 1/*
2 This file is part of GNUnet. 2 * This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors) 3 * (C) 2013 Christian Grothoff (and other contributing authors)
4 4 *
5 GNUnet is free software; you can redistribute it and/or modify 5 * GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public Liceidentity as published 6 * it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your 7 * by the Free Software Foundation; either version 3, or (at your
8 option) any later version. 8 * option) any later version.
9 9 *
10 GNUnet is distributed in the hope that it will be useful, but 10 * GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public Liceidentity for more details. 13 * General Public License for more details.
14 14 *
15 You should have received a copy of the GNU General Public Liceidentity 15 * You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the 16 * along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 * Boston, MA 02111-1307, USA.
19*/ 19 */
20 20
21/** 21/**
22 * @file psycstore/psycstore_api.c 22 * @file psycstore/psycstore_api.c
@@ -30,10 +30,12 @@
30#include "gnunet_constants.h" 30#include "gnunet_constants.h"
31#include "gnunet_protocols.h" 31#include "gnunet_protocols.h"
32#include "gnunet_psycstore_service.h" 32#include "gnunet_psycstore_service.h"
33#include "gnunet_multicast_service.h"
33#include "psycstore.h" 34#include "psycstore.h"
34 35
35#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__) 36#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
36 37
38typedef void (*DataCallback) ();
37 39
38/** 40/**
39 * Handle for an operation with the PSYCstore service. 41 * Handle for an operation with the PSYCstore service.
@@ -45,7 +47,7 @@ struct GNUNET_PSYCSTORE_OperationHandle
45 * Main PSYCstore handle. 47 * Main PSYCstore handle.
46 */ 48 */
47 struct GNUNET_PSYCSTORE_Handle *h; 49 struct GNUNET_PSYCSTORE_Handle *h;
48 50
49 /** 51 /**
50 * We keep operations in a DLL. 52 * We keep operations in a DLL.
51 */ 53 */
@@ -57,31 +59,30 @@ struct GNUNET_PSYCSTORE_OperationHandle
57 struct GNUNET_PSYCSTORE_OperationHandle *prev; 59 struct GNUNET_PSYCSTORE_OperationHandle *prev;
58 60
59 /** 61 /**
60 * Message to send to the PSYCstore service.
61 * Allocated at the end of this struct.
62 */
63 const struct GNUNET_MessageHeader *msg;
64
65 /**
66 * Continuation to invoke with the result of an operation. 62 * Continuation to invoke with the result of an operation.
67 */ 63 */
68 GNUNET_PSYCSTORE_ResultCallback res_cb; 64 GNUNET_PSYCSTORE_ResultCallback res_cb;
69 65
70 /** 66 /**
71 * Continuation to invoke with the result of an operation returning a fragment. 67 * Continuation to invoke with the result of an operation returning data.
72 */ 68 */
73 GNUNET_PSYCSTORE_FragmentCallback frag_cb; 69 DataCallback data_cb;
74 70
75 /** 71 /**
76 * Continuation to invoke with the result of an operation returning a state variable. 72 * Closure for the callbacks.
77 */ 73 */
78 GNUNET_PSYCSTORE_StateCallback state_cb; 74 void *cls;
79 75
80 /** 76 /**
81 * Closure for the callbacks. 77 * Operation ID.
82 */ 78 */
83 void *cls; 79 uint32_t op_id;
84 80
81 /**
82 * Message to send to the PSYCstore service.
83 * Allocated at the end of this struct.
84 */
85 const struct GNUNET_MessageHeader *msg;
85}; 86};
86 87
87 88
@@ -101,13 +102,23 @@ struct GNUNET_PSYCSTORE_Handle
101 struct GNUNET_CLIENT_Connection *client; 102 struct GNUNET_CLIENT_Connection *client;
102 103
103 /** 104 /**
104 * Head of active operations. 105 * Head of operations to transmit.
105 */ 106 */
107 struct GNUNET_PSYCSTORE_OperationHandle *transmit_head;
108
109 /**
110 * Tail of operations to transmit.
111 */
112 struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail;
113
114 /**
115 * Head of active operations waiting for response.
116 */
106 struct GNUNET_PSYCSTORE_OperationHandle *op_head; 117 struct GNUNET_PSYCSTORE_OperationHandle *op_head;
107 118
108 /** 119 /**
109 * Tail of active operations. 120 * Tail of active operations waiting for response.
110 */ 121 */
111 struct GNUNET_PSYCSTORE_OperationHandle *op_tail; 122 struct GNUNET_PSYCSTORE_OperationHandle *op_tail;
112 123
113 /** 124 /**
@@ -130,10 +141,47 @@ struct GNUNET_PSYCSTORE_Handle
130 */ 141 */
131 int in_receive; 142 int in_receive;
132 143
144 /**
145 * The last operation id used for a PSYCstore operation.
146 */
147 uint32_t last_op_id_used;
148
133}; 149};
134 150
135 151
136/** 152/**
153 * Get a fresh operation ID to distinguish between PSYCstore requests.
154 *
155 * @param h Handle to the PSYCstore service.
156 * @return next operation id to use
157 */
158static uint32_t
159get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
160{
161 return h->last_op_id_used++;
162}
163
164
165/**
166 * Find operation by ID.
167 *
168 * @return OperationHandle if found, or NULL otherwise.
169 */
170static struct GNUNET_PSYCSTORE_OperationHandle *
171find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint32_t op_id)
172{
173 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
174 while (NULL != op)
175 {
176 if (op->op_id == op_id)
177 return op;
178 op = op->next;
179 }
180 return NULL;
181}
182
183
184/**
137 * Try again to connect to the PSYCstore service. 185 * Try again to connect to the PSYCstore service.
138 * 186 *
139 * @param cls handle to the PSYCstore service. 187 * @param cls handle to the PSYCstore service.
@@ -175,6 +223,15 @@ reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h)
175 223
176 224
177/** 225/**
226 * Schedule transmission of the next message from our queue.
227 *
228 * @param h PSYCstore handle
229 */
230static void
231transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
232
233
234/**
178 * Type of a function to call when we receive a message 235 * Type of a function to call when we receive a message
179 * from the service. 236 * from the service.
180 * 237 *
@@ -182,12 +239,16 @@ reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h)
182 * @param msg message received, NULL on timeout or fatal error 239 * @param msg message received, NULL on timeout or fatal error
183 */ 240 */
184static void 241static void
185message_handler (void *cls, 242message_handler (void *cls,
186 const struct GNUNET_MessageHeader *msg) 243 const struct GNUNET_MessageHeader *msg)
187{ 244{
188 struct GNUNET_PSYCSTORE_Handle *h = cls; 245 struct GNUNET_PSYCSTORE_Handle *h = cls;
189 struct GNUNET_PSYCSTORE_OperationHandle *op; 246 struct GNUNET_PSYCSTORE_OperationHandle *op;
190 const struct ResultCodeMessage *rcm; 247 const struct OperationResult *opres;
248 const struct MasterCountersResult *mcres;
249 const struct SlaveCountersResult *scres;
250 const struct FragmentResult *fres;
251 const struct StateResult *sres;
191 const char *str; 252 const char *str;
192 uint16_t size; 253 uint16_t size;
193 254
@@ -203,68 +264,240 @@ message_handler (void *cls,
203 switch (ntohs (msg->type)) 264 switch (ntohs (msg->type))
204 { 265 {
205 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE: 266 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE:
206 if (size < sizeof (struct ResultCodeMessage)) 267 if (size < sizeof (struct OperationResult))
207 { 268 {
269 LOG (GNUNET_ERROR_TYPE_ERROR,
270 "Received message of type %d with length %lu bytes. "
271 "Expected >= %lu\n",
272 ntohs (msg->type), size, sizeof (struct OperationResult));
208 GNUNET_break (0); 273 GNUNET_break (0);
209 reschedule_connect (h); 274 reschedule_connect (h);
210 return; 275 return;
211 } 276 }
212 rcm = (const struct ResultCodeMessage *) msg; 277
213 str = (const char *) &rcm[1]; 278 opres = (const struct OperationResult *) msg;
214 if ( (size > sizeof (struct ResultCodeMessage)) && 279 str = (const char *) &opres[1];
215 ('\0' != str[size - sizeof (struct ResultCodeMessage) - 1]) ) 280 if ( (size > sizeof (struct OperationResult)) &&
281 ('\0' != str[size - sizeof (struct OperationResult) - 1]) )
216 { 282 {
217 GNUNET_break (0); 283 GNUNET_break (0);
218 reschedule_connect (h); 284 reschedule_connect (h);
219 return; 285 return;
220 } 286 }
221 if (size == sizeof (struct ResultCodeMessage)) 287 if (size == sizeof (struct OperationResult))
222 str = NULL; 288 str = NULL;
223 289
224 op = h->op_head; 290 op = find_op_by_id (h, ntohl (opres->op_id));
225 GNUNET_CONTAINER_DLL_remove (h->op_head, 291 if (NULL == op)
226 h->op_tail, 292 {
227 op); 293 LOG (GNUNET_ERROR_TYPE_ERROR,
228 GNUNET_CLIENT_receive (h->client, &message_handler, h, 294 "Received result of an unkown operation ID: %ld\n",
229 GNUNET_TIME_UNIT_FOREVER_REL); 295 ntohl (opres->op_id));
230 if (NULL != op->res_cb) 296 }
231 op->res_cb (op->cls, rcm->result_code , str); 297 else
232 GNUNET_free (op); 298 {
299 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
300 if (NULL != op->res_cb)
301 {
302 const struct StateModifyRequest *smreq;
303 const struct StateSyncRequest *ssreq;
304 switch (ntohs (op->msg->type))
305 {
306 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
307 smreq = (const struct StateModifyRequest *) op->msg;
308 if (!(smreq->flags & STATE_OP_LAST
309 || GNUNET_OK != ntohl (opres->result_code)))
310 op->res_cb = NULL;
311 break;
312 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
313 ssreq = (const struct StateSyncRequest *) op->msg;
314 if (!(ssreq->flags & STATE_OP_LAST
315 || GNUNET_OK != ntohl (opres->result_code)))
316 op->res_cb = NULL;
317 break;
318 }
319 }
320 if (NULL != op->res_cb)
321 op->res_cb (op->cls, ntohl (opres->result_code), str);
322 GNUNET_free (op);
323 }
324 break;
325
326 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER:
327 if (size != sizeof (struct MasterCountersResult))
328 {
329 LOG (GNUNET_ERROR_TYPE_ERROR,
330 "Received message of type %d with length %lu bytes. "
331 "Expected %lu\n",
332 ntohs (msg->type), size, sizeof (struct MasterCountersResult));
333 GNUNET_break (0);
334 reschedule_connect (h);
335 return;
336 }
337
338 mcres = (const struct MasterCountersResult *) msg;
339
340 op = find_op_by_id (h, ntohl (mcres->op_id));
341 if (NULL == op)
342 {
343 LOG (GNUNET_ERROR_TYPE_ERROR,
344 "Received result of an unkown operation ID: %ld\n",
345 ntohl (mcres->op_id));
346 }
347 else
348 {
349 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
350 if (NULL != op->data_cb)
351 ((GNUNET_PSYCSTORE_MasterCountersCallback)
352 op->data_cb) (op->cls,
353 GNUNET_ntohll (mcres->fragment_id),
354 GNUNET_ntohll (mcres->message_id),
355 GNUNET_ntohll (mcres->group_generation));
356 GNUNET_free (op);
357 }
233 break; 358 break;
359
360 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE:
361 if (size != sizeof (struct SlaveCountersResult))
362 {
363 LOG (GNUNET_ERROR_TYPE_ERROR,
364 "Received message of type %d with length %lu bytes. "
365 "Expected %lu\n",
366 ntohs (msg->type), size, sizeof (struct SlaveCountersResult));
367 GNUNET_break (0);
368 reschedule_connect (h);
369 return;
370 }
371
372 scres = (const struct SlaveCountersResult *) msg;
373
374 op = find_op_by_id (h, ntohl (scres->op_id));
375 if (NULL == op)
376 {
377 LOG (GNUNET_ERROR_TYPE_ERROR,
378 "Received result of an unkown operation ID: %ld\n",
379 ntohl (scres->op_id));
380 }
381 else
382 {
383 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
384 if (NULL != op->data_cb)
385 ((GNUNET_PSYCSTORE_SlaveCountersCallback)
386 op->data_cb) (op->cls, GNUNET_ntohll (scres->max_known_msg_id));
387 GNUNET_free (op);
388 }
389 break;
390
391 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT:
392 if (size < sizeof (struct FragmentResult))
393 {
394 LOG (GNUNET_ERROR_TYPE_ERROR,
395 "Received message of type %d with length %lu bytes. "
396 "Expected >= %lu\n",
397 ntohs (msg->type), size, sizeof (struct FragmentResult));
398 GNUNET_break (0);
399 reschedule_connect (h);
400 return;
401 }
402
403 fres = (const struct FragmentResult *) msg;
404 struct GNUNET_MULTICAST_MessageHeader *mmsg =
405 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
406 if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size))
407 {
408 LOG (GNUNET_ERROR_TYPE_ERROR,
409 "Received message of type %d with length %lu bytes. "
410 "Expected = %lu\n",
411 ntohs (msg->type), size,
412 sizeof (struct FragmentResult) + ntohs (mmsg->header.size));
413 GNUNET_break (0);
414 reschedule_connect (h);
415 return;
416 }
417
418 op = find_op_by_id (h, ntohl (fres->op_id));
419 if (NULL == op)
420 {
421 LOG (GNUNET_ERROR_TYPE_ERROR,
422 "Received result of an unkown operation ID: %ld\n",
423 ntohl (fres->op_id));
424 }
425 else
426 {
427 if (NULL != op->data_cb)
428 ((GNUNET_PSYCSTORE_FragmentCallback)
429 op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags));
430 }
431 break;
432
433 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE:
434 if (size < sizeof (struct StateResult))
435 {
436 LOG (GNUNET_ERROR_TYPE_ERROR,
437 "Received message of type %d with length %lu bytes. "
438 "Expected >= %lu\n",
439 ntohs (msg->type), size, sizeof (struct StateResult));
440 GNUNET_break (0);
441 reschedule_connect (h);
442 return;
443 }
444
445 sres = (const struct StateResult *) msg;
446 const char *name = (const char *) &sres[1];
447 uint16_t name_size = ntohs (sres->name_size);
448
449 if (name_size <= 2 || '\0' != name[name_size - 1])
450 {
451 LOG (GNUNET_ERROR_TYPE_ERROR,
452 "Received state result message (type %d) with invalid name.\n",
453 ntohs (msg->type), name_size, name);
454 GNUNET_break (0);
455 reschedule_connect (h);
456 return;
457 }
458
459 op = find_op_by_id (h, ntohl (sres->op_id));
460 if (NULL == op)
461 {
462 LOG (GNUNET_ERROR_TYPE_ERROR,
463 "Received result of an unkown operation ID: %ld\n",
464 ntohl (sres->op_id));
465 }
466 else
467 {
468 if (NULL != op->data_cb)
469 ((GNUNET_PSYCSTORE_StateCallback)
470 op->data_cb) (op->cls, name, (void *) &sres[1] + name_size,
471 ntohs (sres->header.size) - sizeof (*sres) - name_size);
472 }
473 break;
474
234 default: 475 default:
235 GNUNET_break (0); 476 GNUNET_break (0);
236 reschedule_connect (h); 477 reschedule_connect (h);
237 return; 478 return;
238 } 479 }
239}
240 480
241 481 GNUNET_CLIENT_receive (h->client, &message_handler, h,
242/** 482 GNUNET_TIME_UNIT_FOREVER_REL);
243 * Schedule transmission of the next message from our queue. 483}
244 *
245 * @param h PSYCstore handle
246 */
247static void
248transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
249 484
250 485
251/** 486/**
252 * Transmit next message to service. 487 * Transmit next message to service.
253 * 488 *
254 * @param cls the 'struct GNUNET_PSYCSTORE_Handle'. 489 * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
255 * @param size number of bytes available in buf 490 * @param size Number of bytes available in buf.
256 * @param buf where to copy the message 491 * @param buf Where to copy the message.
257 * @return number of bytes copied to buf 492 * @return Number of bytes copied to buf.
258 */ 493 */
259static size_t 494static size_t
260send_next_message (void *cls, 495send_next_message (void *cls, size_t size, void *buf)
261 size_t size,
262 void *buf)
263{ 496{
264 struct GNUNET_PSYCSTORE_Handle *h = cls; 497 struct GNUNET_PSYCSTORE_Handle *h = cls;
265 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; 498 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
266 size_t ret; 499 size_t ret;
267 500
268 h->th = NULL; 501 h->th = NULL;
269 if (NULL == op) 502 if (NULL == op)
270 return 0; 503 return 0;
@@ -273,26 +506,30 @@ send_next_message (void *cls,
273 { 506 {
274 reschedule_connect (h); 507 reschedule_connect (h);
275 return 0; 508 return 0;
276 } 509 }
277 LOG (GNUNET_ERROR_TYPE_DEBUG, 510 LOG (GNUNET_ERROR_TYPE_DEBUG,
278 "Sending message of type %d to PSYCstore service\n", 511 "Sending message of type %d to PSYCstore service\n",
279 ntohs (op->msg->type)); 512 ntohs (op->msg->type));
280 memcpy (buf, op->msg, ret); 513 memcpy (buf, op->msg, ret);
281 if ( (NULL == op->res_cb) && 514
282 (NULL == op->frag_cb) && 515 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
283 (NULL == op->state_cb)) 516
517 if (NULL == op->res_cb && NULL == op->data_cb)
284 { 518 {
285 GNUNET_CONTAINER_DLL_remove (h->op_head,
286 h->op_tail,
287 op);
288 GNUNET_free (op); 519 GNUNET_free (op);
289 transmit_next (h);
290 } 520 }
521 else
522 {
523 GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op);
524 }
525
526 if (NULL != h->transmit_head)
527 transmit_next (h);
528
291 if (GNUNET_NO == h->in_receive) 529 if (GNUNET_NO == h->in_receive)
292 { 530 {
293 h->in_receive = GNUNET_YES; 531 h->in_receive = GNUNET_YES;
294 GNUNET_CLIENT_receive (h->client, 532 GNUNET_CLIENT_receive (h->client, &message_handler, h,
295 &message_handler, h,
296 GNUNET_TIME_UNIT_FOREVER_REL); 533 GNUNET_TIME_UNIT_FOREVER_REL);
297 } 534 }
298 return ret; 535 return ret;
@@ -302,18 +539,18 @@ send_next_message (void *cls,
302/** 539/**
303 * Schedule transmission of the next message from our queue. 540 * Schedule transmission of the next message from our queue.
304 * 541 *
305 * @param h PSYCstore handle 542 * @param h PSYCstore handle.
306 */ 543 */
307static void 544static void
308transmit_next (struct GNUNET_PSYCSTORE_Handle *h) 545transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
309{ 546{
310 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; 547 if (NULL != h->th || NULL == h->client)
548 return;
311 549
312 GNUNET_assert (NULL == h->th); 550 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
313 if (NULL == op) 551 if (NULL == op)
314 return; 552 return;
315 if (NULL == h->client) 553
316 return;
317 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, 554 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
318 ntohs (op->msg->size), 555 ntohs (op->msg->size),
319 GNUNET_TIME_UNIT_FOREVER_REL, 556 GNUNET_TIME_UNIT_FOREVER_REL,
@@ -326,8 +563,8 @@ transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
326/** 563/**
327 * Try again to connect to the PSYCstore service. 564 * Try again to connect to the PSYCstore service.
328 * 565 *
329 * @param cls the handle to the PSYCstore service 566 * @param cls Handle to the PSYCstore service.
330 * @param tc scheduler context 567 * @param tc Scheduler context.
331 */ 568 */
332static void 569static void
333reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 570reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -347,8 +584,8 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
347/** 584/**
348 * Connect to the PSYCstore service. 585 * Connect to the PSYCstore service.
349 * 586 *
350 * @param cfg the configuration to use 587 * @param cfg The configuration to use
351 * @return handle to use 588 * @return Handle to use
352 */ 589 */
353struct GNUNET_PSYCSTORE_Handle * 590struct GNUNET_PSYCSTORE_Handle *
354GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) 591GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
@@ -366,7 +603,7 @@ GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
366/** 603/**
367 * Disconnect from PSYCstore service 604 * Disconnect from PSYCstore service
368 * 605 *
369 * @param h handle to destroy 606 * @param h Handle to destroy
370 */ 607 */
371void 608void
372GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h) 609GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
@@ -405,13 +642,10 @@ GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
405{ 642{
406 struct GNUNET_PSYCSTORE_Handle *h = op->h; 643 struct GNUNET_PSYCSTORE_Handle *h = op->h;
407 644
408 if ( (h->op_head != op) || 645 if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client))
409 (NULL == h->client) )
410 { 646 {
411 /* request not active, can simply remove */ 647 /* request not active, can simply remove */
412 GNUNET_CONTAINER_DLL_remove (h->op_head, 648 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
413 h->op_tail,
414 op);
415 GNUNET_free (op); 649 GNUNET_free (op);
416 return; 650 return;
417 } 651 }
@@ -420,47 +654,720 @@ GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
420 /* request active but not yet with service, can still abort */ 654 /* request active but not yet with service, can still abort */
421 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); 655 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
422 h->th = NULL; 656 h->th = NULL;
423 GNUNET_CONTAINER_DLL_remove (h->op_head, 657 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
424 h->op_tail,
425 op);
426 GNUNET_free (op); 658 GNUNET_free (op);
427 transmit_next (h); 659 transmit_next (h);
428 return; 660 return;
429 } 661 }
430 /* request active with service, simply ensure continuations are not called */ 662 /* request active with service, simply ensure continuations are not called */
431 op->res_cb = NULL; 663 op->res_cb = NULL;
432 op->frag_cb = NULL; 664 op->data_cb = NULL;
433 op->state_cb = NULL; 665}
666
667
668/**
669 * Store join/leave events for a PSYC channel in order to be able to answer
670 * membership test queries later.
671 *
672 * @param h Handle for the PSYCstore.
673 * @param channel_key The channel where the event happened.
674 * @param slave_key Public key of joining/leaving slave.
675 * @param did_join #GNUNET_YES on join, #GNUNET_NO on part.
676 * @param announced_at ID of the message that announced the membership change.
677 * @param effective_since Message ID this membership change is in effect since.
678 * For joins it is <= announced_at, for parts it is always 0.
679 * @param group_generation In case of a part, the last group generation the
680 * slave has access to. It has relevance when a larger message have
681 * fragments with different group generations.
682 * @param rcb Callback to call with the result of the storage operation.
683 * @param rcb_cls Closure for the callback.
684 *
685 * @return Operation handle that can be used to cancel the operation.
686 */
687struct GNUNET_PSYCSTORE_OperationHandle *
688GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
689 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
690 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
691 int did_join,
692 uint64_t announced_at,
693 uint64_t effective_since,
694 uint64_t group_generation,
695 GNUNET_PSYCSTORE_ResultCallback rcb,
696 void *rcb_cls)
697{
698 GNUNET_assert (NULL != h);
699 GNUNET_assert (NULL != channel_key);
700 GNUNET_assert (NULL != slave_key);
701 GNUNET_assert (did_join
702 ? effective_since <= announced_at
703 : effective_since == 0);
704
705 struct MembershipStoreRequest *req;
706 struct GNUNET_PSYCSTORE_OperationHandle *op
707 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
708 op->h = h;
709 op->res_cb = rcb;
710 op->cls = rcb_cls;
711
712 req = (struct MembershipStoreRequest *) &op[1];
713 op->msg = (struct GNUNET_MessageHeader *) req;
714 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
715 req->header.size = htons (sizeof (*req));
716 req->channel_key = *channel_key;
717 req->slave_key = *slave_key;
718 req->did_join = htonl (did_join);
719 req->announced_at = GNUNET_htonll (announced_at);
720 req->effective_since = GNUNET_htonll (effective_since);
721 req->group_generation = GNUNET_htonll (group_generation);
722
723 op->op_id = get_next_op_id (h);
724 req->op_id = htonl (op->op_id);
725
726 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
727 transmit_next (h);
728
729 return op;
434} 730}
435 731
436 732
733/**
734 * Test if a member was admitted to the channel at the given message ID.
735 *
736 * This is useful when relaying and replaying messages to check if a particular
737 * slave has access to the message fragment with a given group generation. It
738 * is also used when handling join requests to determine whether the slave is
739 * currently admitted to the channel.
740 *
741 * @param h Handle for the PSYCstore.
742 * @param channel_key The channel we are interested in.
743 * @param slave_key Public key of slave whose membership to check.
744 * @param message_id Message ID for which to do the membership test.
745 * @param group_generation Group generation of the fragment of the message to
746 * test. It has relevance if the message consists of multiple fragments
747 * with different group generations.
748 * @param rcb Callback to call with the test result.
749 * @param rcb_cls Closure for the callback.
750 *
751 * @return Operation handle that can be used to cancel the operation.
752 */
753struct GNUNET_PSYCSTORE_OperationHandle *
754GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
755 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
756 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
757 uint64_t message_id,
758 uint64_t group_generation,
759 GNUNET_PSYCSTORE_ResultCallback rcb,
760 void *rcb_cls)
761{
762 struct MembershipTestRequest *req;
763 struct GNUNET_PSYCSTORE_OperationHandle *op
764 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
765 op->h = h;
766 op->res_cb = rcb;
767 op->cls = rcb_cls;
768
769 req = (struct MembershipTestRequest *) &op[1];
770 op->msg = (struct GNUNET_MessageHeader *) req;
771 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
772 req->header.size = htons (sizeof (*req));
773 req->channel_key = *channel_key;
774 req->slave_key = *slave_key;
775 req->message_id = GNUNET_htonll (message_id);
776 req->group_generation = GNUNET_htonll (group_generation);
777
778 op->op_id = get_next_op_id (h);
779 req->op_id = htonl (op->op_id);
780
781 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
782 transmit_next (h);
783
784 return op;
785}
786
787
788/**
789 * Store a message fragment sent to a channel.
790 *
791 * @param h Handle for the PSYCstore.
792 * @param channel_key The channel the message belongs to.
793 * @param message Message to store.
794 * @param psycstore_flags Flags indicating whether the PSYC message contains
795 * state modifiers.
796 * @param rcb Callback to call with the result of the operation.
797 * @param rcb_cls Closure for the callback.
798 *
799 * @return Handle that can be used to cancel the operation.
800 */
801struct GNUNET_PSYCSTORE_OperationHandle *
802GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
803 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
804 const struct GNUNET_MULTICAST_MessageHeader *message,
805 uint32_t psycstore_flags,
806 GNUNET_PSYCSTORE_ResultCallback rcb,
807 void *rcb_cls)
808{
809 uint16_t size = ntohs (message->header.size);
810 struct FragmentStoreRequest *req;
811 struct GNUNET_PSYCSTORE_OperationHandle *op
812 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
813 op->h = h;
814 op->res_cb = rcb;
815 op->cls = rcb_cls;
816
817 req = (struct FragmentStoreRequest *) &op[1];
818 op->msg = (struct GNUNET_MessageHeader *) req;
819 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
820 req->header.size = htons (sizeof (*req) + size);
821 req->channel_key = *channel_key;
822 req->psycstore_flags = htonl (psycstore_flags);
823 memcpy (&req[1], message, size);
824
825 op->op_id = get_next_op_id (h);
826 req->op_id = htonl (op->op_id);
827
828 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
829 transmit_next (h);
830
831 return op;
832}
833
834
835/**
836 * Retrieve a message fragment by fragment ID.
837 *
838 * @param h Handle for the PSYCstore.
839 * @param channel_key The channel we are interested in.
840 * @param fragment_id Fragment ID to check. Use 0 to get the latest message fragment.
841 * @param fcb Callback to call with the retrieved fragments.
842 * @param rcb Callback to call with the result of the operation.
843 * @param cls Closure for the callbacks.
844 *
845 * @return Handle that can be used to cancel the operation.
846 */
847struct GNUNET_PSYCSTORE_OperationHandle *
848GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
849 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
850 uint64_t fragment_id,
851 GNUNET_PSYCSTORE_FragmentCallback fcb,
852 GNUNET_PSYCSTORE_ResultCallback rcb,
853 void *cls)
854{
855 struct FragmentGetRequest *req;
856 struct GNUNET_PSYCSTORE_OperationHandle *op
857 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
858 op->h = h;
859 op->data_cb = (DataCallback) fcb;
860 op->res_cb = rcb;
861 op->cls = cls;
862
863 req = (struct FragmentGetRequest *) &op[1];
864 op->msg = (struct GNUNET_MessageHeader *) req;
865 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
866 req->header.size = htons (sizeof (*req));
867 req->channel_key = *channel_key;
868 req->fragment_id = GNUNET_htonll (fragment_id);
869
870 op->op_id = get_next_op_id (h);
871 req->op_id = htonl (op->op_id);
872
873 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
874 transmit_next (h);
875
876 return op;
877}
878
879
880/**
881 * Retrieve all fragments of a message.
882 *
883 * @param h Handle for the PSYCstore.
884 * @param channel_key The channel we are interested in.
885 * @param message_id Message ID to check. Use 0 to get the latest message.
886 * @param fcb Callback to call with the retrieved fragments.
887 * @param rcb Callback to call with the result of the operation.
888 * @param cls Closure for the callbacks.
889 *
890 * @return Handle that can be used to cancel the operation.
891 */
437struct GNUNET_PSYCSTORE_OperationHandle * 892struct GNUNET_PSYCSTORE_OperationHandle *
438GNUNET_PSYCSTORE_membership_store ( 893GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
439 struct GNUNET_PSYCSTORE_Handle *h, 894 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
440 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, 895 uint64_t message_id,
441 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, 896 GNUNET_PSYCSTORE_FragmentCallback fcb,
442 int did_join, 897 GNUNET_PSYCSTORE_ResultCallback rcb,
443 uint64_t announced_at, 898 void *cls)
444 uint64_t effective_since,
445 uint64_t group_generation,
446 GNUNET_PSYCSTORE_ResultCallback rcb,
447 void *rcb_cls)
448{ 899{
449 900 struct MessageGetRequest *req;
901 struct GNUNET_PSYCSTORE_OperationHandle *op
902 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
903 op->h = h;
904 op->data_cb = (DataCallback) fcb;
905 op->res_cb = rcb;
906 op->cls = cls;
907
908 req = (struct MessageGetRequest *) &op[1];
909 op->msg = (struct GNUNET_MessageHeader *) req;
910 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
911 req->header.size = htons (sizeof (*req));
912 req->channel_key = *channel_key;
913 req->message_id = GNUNET_htonll (message_id);
914
915 op->op_id = get_next_op_id (h);
916 req->op_id = htonl (op->op_id);
917
918 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
919 transmit_next (h);
920
921 return op;
922}
923
924
925/**
926 * Retrieve a fragment of message specified by its message ID and fragment
927 * offset.
928 *
929 * @param h Handle for the PSYCstore.
930 * @param channel_key The channel we are interested in.
931 * @param message_id Message ID to check. Use 0 to get the latest message.
932 * @param fragment_offset Offset of the fragment to retrieve.
933 * @param fcb Callback to call with the retrieved fragments.
934 * @param rcb Callback to call with the result of the operation.
935 * @param cls Closure for the callbacks.
936 *
937 * @return Handle that can be used to cancel the operation.
938 */
939struct GNUNET_PSYCSTORE_OperationHandle *
940GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
941 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
942 uint64_t message_id,
943 uint64_t fragment_offset,
944 GNUNET_PSYCSTORE_FragmentCallback fcb,
945 GNUNET_PSYCSTORE_ResultCallback rcb,
946 void *cls)
947{
948 struct MessageGetFragmentRequest *req;
949 struct GNUNET_PSYCSTORE_OperationHandle *op
950 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
951 op->h = h;
952 op->data_cb = (DataCallback) fcb;
953 op->res_cb = rcb;
954 op->cls = cls;
955
956 req = (struct MessageGetFragmentRequest *) &op[1];
957 op->msg = (struct GNUNET_MessageHeader *) req;
958 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
959 req->header.size = htons (sizeof (*req));
960 req->channel_key = *channel_key;
961 req->message_id = GNUNET_htonll (message_id);
962 req->fragment_offset = GNUNET_htonll (fragment_offset);
963
964 op->op_id = get_next_op_id (h);
965 req->op_id = htonl (op->op_id);
966
967 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
968 transmit_next (h);
969
970 return op;
971}
972
973
974/**
975 * Retrieve latest values of counters for a channel master.
976 *
977 * The current value of counters are needed when a channel master is restarted,
978 * so that it can continue incrementing the counters from their last value.
979 *
980 * @param h Handle for the PSYCstore.
981 * @param channel_key Public key that identifies the channel.
982 * @param mccb Callback to call with the result.
983 * @param mccb_cls Closure for the callback.
984 *
985 * @return Handle that can be used to cancel the operation.
986 */
987struct GNUNET_PSYCSTORE_OperationHandle *
988GNUNET_PSYCSTORE_counters_get_master (struct GNUNET_PSYCSTORE_Handle *h,
989 struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
990 GNUNET_PSYCSTORE_MasterCountersCallback mccb,
991 void *mccb_cls)
992{
993 struct OperationRequest *req;
994 struct GNUNET_PSYCSTORE_OperationHandle *op
995 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
996 op->h = h;
997 op->data_cb = mccb;
998 op->cls = mccb_cls;
999
1000 req = (struct OperationRequest *) &op[1];
1001 op->msg = (struct GNUNET_MessageHeader *) req;
1002 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_MASTER);
1003 req->header.size = htons (sizeof (*req));
1004 req->channel_key = *channel_key;
1005
1006 op->op_id = get_next_op_id (h);
1007 req->op_id = htonl (op->op_id);
1008
1009 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1010 transmit_next (h);
1011
1012 return op;
450} 1013}
451 1014
452 1015
1016
1017/**
1018 * Retrieve latest values of counters for a channel slave.
1019 *
1020 * The current value of counters are needed when a channel slave rejoins
1021 * and starts the state synchronization process.
1022 *
1023 * @param h Handle for the PSYCstore.
1024 * @param channel_key Public key that identifies the channel.
1025 * @param sccb Callback to call with the result.
1026 * @param sccb_cls Closure for the callback.
1027 *
1028 * @return Handle that can be used to cancel the operation.
1029 */
453struct GNUNET_PSYCSTORE_OperationHandle * 1030struct GNUNET_PSYCSTORE_OperationHandle *
454GNUNET_PSYCSTORE_membership_test ( 1031GNUNET_PSYCSTORE_counters_get_slave (struct GNUNET_PSYCSTORE_Handle *h,
455 struct GNUNET_PSYCSTORE_Handle *h, 1032 struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
456 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, 1033 GNUNET_PSYCSTORE_SlaveCountersCallback sccb,
457 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, 1034 void *sccb_cls)
458 uint64_t message_id,
459 uint64_t group_generation,
460 GNUNET_PSYCSTORE_ResultCallback rcb,
461 void *rcb_cls)
462{ 1035{
1036 struct OperationRequest *req;
1037 struct GNUNET_PSYCSTORE_OperationHandle *op
1038 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1039 op->h = h;
1040 op->data_cb = sccb;
1041 op->cls = sccb_cls;
1042
1043 req = (struct OperationRequest *) &op[1];
1044 op->msg = (struct GNUNET_MessageHeader *) req;
1045 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_SLAVE);
1046 req->header.size = htons (sizeof (*req));
1047 req->channel_key = *channel_key;
1048
1049 op->op_id = get_next_op_id (h);
1050 req->op_id = htonl (op->op_id);
1051
1052 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1053 transmit_next (h);
1054
1055 return op;
1056}
1057
1058
1059/**
1060 * Apply modifiers of a message to the current channel state.
1061 *
1062 * An error is returned if there are missing messages containing state
1063 * operations before the current one.
1064 *
1065 * @param h Handle for the PSYCstore.
1066 * @param channel_key The channel we are interested in.
1067 * @param message_id ID of the message that contains the @a modifiers.
1068 * @param state_delta Value of the _state_delta PSYC header variable of the message.
1069 * @param modifier_count Number of elements in the @a modifiers array.
1070 * @param modifiers List of modifiers to apply.
1071 * @param rcb Callback to call with the result of the operation.
1072 * @param rcb_cls Closure for the callback.
1073 *
1074 * @return Handle that can be used to cancel the operation.
1075 */
1076struct GNUNET_PSYCSTORE_OperationHandle *
1077GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1078 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1079 uint64_t message_id,
1080 uint64_t state_delta,
1081 size_t modifier_count,
1082 const struct GNUNET_ENV_Modifier *modifiers,
1083 GNUNET_PSYCSTORE_ResultCallback rcb,
1084 void *rcb_cls)
1085{
1086 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1087 size_t i;
1088
1089 for (i = 0; i < modifier_count; i++) {
1090 struct StateModifyRequest *req;
1091 uint16_t name_size = strlen (modifiers[i].name) + 1;
1092
1093 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1094 modifiers[i].value_size);
1095 op->h = h;
1096 op->res_cb = rcb;
1097 op->cls = rcb_cls;
1098
1099 req = (struct StateModifyRequest *) &op[1];
1100 op->msg = (struct GNUNET_MessageHeader *) req;
1101 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1102 req->header.size = htons (sizeof (*req) + name_size
1103 + modifiers[i].value_size);
1104 req->channel_key = *channel_key;
1105 req->message_id = GNUNET_htonll (message_id);
1106 req->state_delta = GNUNET_htonll (state_delta);
1107 req->oper = modifiers[i].oper;
1108 req->name_size = htons (name_size);
1109 req->flags
1110 = 0 == i
1111 ? STATE_OP_FIRST
1112 : modifier_count - 1 == i
1113 ? STATE_OP_LAST
1114 : 0;
1115
1116 memcpy (&req[1], modifiers[i].name, name_size);
1117 memcpy ((void *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1118
1119 op->op_id = get_next_op_id (h);
1120 req->op_id = htonl (op->op_id);
1121
1122 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1123 transmit_next (h);
1124 }
1125 return op;
1126 /* FIXME: only the last operation is returned,
1127 * operation_cancel() should be able to cancel all of them.
1128 */
1129}
1130
1131
1132/**
1133 * Store synchronized state.
1134 *
1135 * @param h Handle for the PSYCstore.
1136 * @param channel_key The channel we are interested in.
1137 * @param message_id ID of the message that contains the state_hash PSYC header variable.
1138 * @param modifier_count Number of elements in the @a modifiers array.
1139 * @param modifiers Full state to store.
1140 * @param rcb Callback to call with the result of the operation.
1141 * @param rcb_cls Closure for the callback.
1142 *
1143 * @return Handle that can be used to cancel the operation.
1144 */
1145struct GNUNET_PSYCSTORE_OperationHandle *
1146GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1147 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1148 uint64_t message_id,
1149 size_t modifier_count,
1150 const struct GNUNET_ENV_Modifier *modifiers,
1151 GNUNET_PSYCSTORE_ResultCallback rcb,
1152 void *rcb_cls)
1153{
1154 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1155 size_t i;
1156
1157 for (i = 0; i < modifier_count; i++) {
1158 struct StateSyncRequest *req;
1159 uint16_t name_size = strlen (modifiers[i].name) + 1;
1160
1161 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1162 modifiers[i].value_size);
1163 op->h = h;
1164 op->res_cb = rcb;
1165 op->cls = rcb_cls;
1166
1167 req = (struct StateSyncRequest *) &op[1];
1168 op->msg = (struct GNUNET_MessageHeader *) req;
1169 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1170 req->header.size = htons (sizeof (*req) + name_size
1171 + modifiers[i].value_size);
1172 req->channel_key = *channel_key;
1173 req->message_id = GNUNET_htonll (message_id);
1174 req->name_size = htons (name_size);
1175 req->flags
1176 = 0 == i
1177 ? STATE_OP_FIRST
1178 : modifier_count - 1 == i
1179 ? STATE_OP_LAST
1180 : 0;
1181
1182 memcpy (&req[1], modifiers[i].name, name_size);
1183 memcpy ((void *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1184
1185 op->op_id = get_next_op_id (h);
1186 req->op_id = htonl (op->op_id);
1187
1188 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1189 transmit_next (h);
1190 }
1191 return op;
1192}
1193
1194
1195/**
1196 * Reset the state of a channel.
1197 *
1198 * Delete all state variables stored for the given channel.
1199 *
1200 * @param h Handle for the PSYCstore.
1201 * @param channel_key The channel we are interested in.
1202 * @param rcb Callback to call with the result of the operation.
1203 * @param rcb_cls Closure for the callback.
1204 *
1205 * @return Handle that can be used to cancel the operation.
1206 */
1207struct GNUNET_PSYCSTORE_OperationHandle *
1208GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1209 const struct GNUNET_CRYPTO_EccPublicSignKey
1210 *channel_key,
1211 GNUNET_PSYCSTORE_ResultCallback rcb,
1212 void *rcb_cls)
1213{
1214 struct OperationRequest *req;
1215 struct GNUNET_PSYCSTORE_OperationHandle *op
1216 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1217 op->h = h;
1218 op->res_cb = rcb;
1219 op->cls = rcb_cls;
1220
1221 req = (struct OperationRequest *) &op[1];
1222 op->msg = (struct GNUNET_MessageHeader *) req;
1223 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1224 req->header.size = htons (sizeof (*req));
1225 req->channel_key = *channel_key;
1226
1227 op->op_id = get_next_op_id (h);
1228 req->op_id = htonl (op->op_id);
1229
1230 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1231 transmit_next (h);
1232
1233 return op;
1234}
1235
1236
1237
1238/**
1239 * Update signed values of state variables in the state store.
1240 *
1241 * @param h Handle for the PSYCstore.
1242 * @param channel_key The channel we are interested in.
1243 * @param message_id Message ID that contained the state @a hash.
1244 * @param hash Hash of the serialized full state.
1245 * @param rcb Callback to call with the result of the operation.
1246 * @param rcb_cls Closure for the callback.
1247 *
1248 */
1249struct GNUNET_PSYCSTORE_OperationHandle *
1250GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1251 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1252 uint64_t message_id,
1253 const struct GNUNET_HashCode *hash,
1254 GNUNET_PSYCSTORE_ResultCallback rcb,
1255 void *rcb_cls)
1256{
1257 struct StateHashUpdateRequest *req;
1258 struct GNUNET_PSYCSTORE_OperationHandle *op
1259 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1260 op->h = h;
1261 op->res_cb = rcb;
1262 op->cls = rcb_cls;
1263
1264 req = (struct StateHashUpdateRequest *) &op[1];
1265 op->msg = (struct GNUNET_MessageHeader *) req;
1266 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1267 req->header.size = htons (sizeof (*req));
1268 req->channel_key = *channel_key;
1269 req->hash = *hash;
1270
1271 op->op_id = get_next_op_id (h);
1272 req->op_id = htonl (op->op_id);
1273
1274 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1275 transmit_next (h);
1276
1277 return op;
1278}
1279
1280
1281/**
1282 * Retrieve the best matching state variable.
1283 *
1284 * @param h Handle for the PSYCstore.
1285 * @param channel_key The channel we are interested in.
1286 * @param name Name of variable to match, the returned variable might be less specific.
1287 * @param scb Callback to return the matching state variable.
1288 * @param rcb Callback to call with the result of the operation.
1289 * @param cls Closure for the callbacks.
1290 *
1291 * @return Handle that can be used to cancel the operation.
1292 */
1293struct GNUNET_PSYCSTORE_OperationHandle *
1294GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1295 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1296 const char *name,
1297 GNUNET_PSYCSTORE_StateCallback scb,
1298 GNUNET_PSYCSTORE_ResultCallback rcb,
1299 void *cls)
1300{
1301 size_t name_size = strlen (name) + 1;
1302 struct OperationRequest *req;
1303 struct GNUNET_PSYCSTORE_OperationHandle *op
1304 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1305 op->h = h;
1306 op->data_cb = (DataCallback) scb;
1307 op->res_cb = rcb;
1308 op->cls = cls;
1309
1310 req = (struct OperationRequest *) &op[1];
1311 op->msg = (struct GNUNET_MessageHeader *) req;
1312 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1313 req->header.size = htons (sizeof (*req) + name_size);
1314 req->channel_key = *channel_key;
1315 memcpy (&req[1], name, name_size);
1316
1317 op->op_id = get_next_op_id (h);
1318 req->op_id = htonl (op->op_id);
1319
1320 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1321 transmit_next (h);
1322
1323 return op;
1324}
1325
1326
1327
1328/**
1329 * Retrieve all state variables for a channel with the given prefix.
1330 *
1331 * @param h Handle for the PSYCstore.
1332 * @param channel_key The channel we are interested in.
1333 * @param name_prefix Prefix of state variable names to match.
1334 * @param scb Callback to return matching state variables.
1335 * @param rcb Callback to call with the result of the operation.
1336 * @param cls Closure for the callbacks.
1337 *
1338 * @return Handle that can be used to cancel the operation.
1339 */
1340struct GNUNET_PSYCSTORE_OperationHandle *
1341GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1342 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1343 const char *name_prefix,
1344 GNUNET_PSYCSTORE_StateCallback scb,
1345 GNUNET_PSYCSTORE_ResultCallback rcb,
1346 void *cls)
1347{
1348 size_t name_size = strlen (name_prefix) + 1;
1349 struct OperationRequest *req;
1350 struct GNUNET_PSYCSTORE_OperationHandle *op
1351 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1352 op->h = h;
1353 op->data_cb = (DataCallback) scb;
1354 op->res_cb = rcb;
1355 op->cls = cls;
1356
1357 req = (struct OperationRequest *) &op[1];
1358 op->msg = (struct GNUNET_MessageHeader *) req;
1359 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1360 req->header.size = htons (sizeof (*req) + name_size);
1361 req->channel_key = *channel_key;
1362 memcpy (&req[1], name_prefix, name_size);
1363
1364 op->op_id = get_next_op_id (h);
1365 req->op_id = htonl (op->op_id);
1366
1367 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1368 transmit_next (h);
463 1369
1370 return op;
464} 1371}
465 1372
466/* end of psycstore_api.c */ 1373/* end of psycstore_api.c */
diff --git a/src/psycstore/test_plugin_psycstore.c b/src/psycstore/test_plugin_psycstore.c
index b584ed51d..a59b18939 100644
--- a/src/psycstore/test_plugin_psycstore.c
+++ b/src/psycstore/test_plugin_psycstore.c
@@ -1,25 +1,26 @@
1/* 1/*
2 This file is part of GNUnet. 2 * This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors) 3 * (C) 2013 Christian Grothoff (and other contributing authors)
4 4 *
5 GNUnet is free software; you can redistribute it and/or modify 5 * GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 * it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your 7 * by the Free Software Foundation; either version 3, or (at your
8 option) any later version. 8 * option) any later version.
9 9 *
10 GNUnet is distributed in the hope that it will be useful, but 10 * GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details. 13 * General Public License for more details.
14 14 *
15 You should have received a copy of the GNU General Public License 15 * You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the 16 * along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 * Boston, MA 02111-1307, USA.
19*/ 19 */
20
20/* 21/*
21 * @file psycstore/test_plugin_psycstore.c 22 * @file psycstore/test_plugin_psycstore.c
22 * @brief Test for the psycstore plugins 23 * @brief Test for the PSYCstore plugins.
23 * @author Gabor X Toth 24 * @author Gabor X Toth
24 * @author Christian Grothoff 25 * @author Christian Grothoff
25 */ 26 */
@@ -39,9 +40,17 @@
39 40
40#define C2ARG(str) str, (sizeof (str) - 1) 41#define C2ARG(str) str, (sizeof (str) - 1)
41 42
42#define LOG(kind,...) GNUNET_log_from (kind, "test-plugin-psycstore", __VA_ARGS__) 43#define LOG(kind,...) \
44 GNUNET_log_from (kind, "test-plugin-psycstore", __VA_ARGS__)
43 45
44#define ASSERT(x) do { if (! (x)) { printf("Error at %s:%d\n", __FILE__, __LINE__); goto FAILURE;} } while (0) 46#define ASSERT(x) \
47 do { \
48 if (! (x)) \
49 { \
50 LOG (GNUNET_ERROR_TYPE_ERROR, "Error at %s:%d\n", __FILE__, __LINE__); \
51 goto FAILURE; \
52 } \
53 } while (0)
45 54
46static int ok; 55static int ok;
47 56
@@ -118,13 +127,13 @@ fragment_cb (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg2,
118 && 0 == memcmp (msg1, msg2, ntohs (msg1->header.size))) 127 && 0 == memcmp (msg1, msg2, ntohs (msg1->header.size)))
119 { 128 {
120 LOG (GNUNET_ERROR_TYPE_DEBUG, "Fragment %llu matches\n", 129 LOG (GNUNET_ERROR_TYPE_DEBUG, "Fragment %llu matches\n",
121 msg1->fragment_id); 130 GNUNET_ntohll (msg1->fragment_id));
122 ret = GNUNET_YES; 131 ret = GNUNET_YES;
123 } 132 }
124 else 133 else
125 { 134 {
126 LOG (GNUNET_ERROR_TYPE_ERROR, "Fragment %llu differs\n", 135 LOG (GNUNET_ERROR_TYPE_ERROR, "Fragment %llu differs\n",
127 msg1->fragment_id); 136 GNUNET_ntohll (msg1->fragment_id));
128 ret = GNUNET_SYSERR; 137 ret = GNUNET_SYSERR;
129 } 138 }
130 139
@@ -135,6 +144,7 @@ fragment_cb (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg2,
135 144
136struct StateClosure { 145struct StateClosure {
137 size_t n; 146 size_t n;
147 char *name[16];
138 void *value[16]; 148 void *value[16];
139 size_t value_size[16]; 149 size_t value_size[16];
140}; 150};
@@ -146,6 +156,8 @@ state_cb (void *cls, const char *name, const void *value, size_t value_size)
146 const void *val = scls->value[scls->n]; 156 const void *val = scls->value[scls->n];
147 size_t val_size = scls->value_size[scls->n++]; 157 size_t val_size = scls->value_size[scls->n++];
148 158
159 /* FIXME: check name */
160
149 return value_size == val_size && 0 == memcmp (value, val, val_size) 161 return value_size == val_size && 0 == memcmp (value, val, val_size)
150 ? GNUNET_YES 162 ? GNUNET_YES
151 : GNUNET_SYSERR; 163 : GNUNET_SYSERR;
@@ -157,19 +169,19 @@ run (void *cls, char *const *args, const char *cfgfile,
157 const struct GNUNET_CONFIGURATION_Handle *cfg) 169 const struct GNUNET_CONFIGURATION_Handle *cfg)
158{ 170{
159 struct GNUNET_PSYCSTORE_PluginFunctions *db; 171 struct GNUNET_PSYCSTORE_PluginFunctions *db;
160 172
161 ok = 1; 173 ok = 1;
162 db = load_plugin (cfg); 174 db = load_plugin (cfg);
163 if (NULL == db) 175 if (NULL == db)
164 { 176 {
165 FPRINTF (stderr, 177 FPRINTF (stderr,
166 "%s", 178 "%s",
167 "Failed to initialize PSYCstore. " 179 "Failed to initialize PSYCstore. "
168 "Database likely not setup, skipping test.\n"); 180 "Database likely not setup, skipping test.\n");
169 return; 181 return;
170 } 182 }
171 183
172 /* Membership */ 184 /* Store & test membership */
173 185
174 channel_key = GNUNET_CRYPTO_ecc_key_create (); 186 channel_key = GNUNET_CRYPTO_ecc_key_create ();
175 slave_key = GNUNET_CRYPTO_ecc_key_create (); 187 slave_key = GNUNET_CRYPTO_ecc_key_create ();
@@ -177,21 +189,21 @@ run (void *cls, char *const *args, const char *cfgfile,
177 GNUNET_CRYPTO_ecc_key_get_public_for_signature (channel_key, &channel_pub_key); 189 GNUNET_CRYPTO_ecc_key_get_public_for_signature (channel_key, &channel_pub_key);
178 GNUNET_CRYPTO_ecc_key_get_public_for_signature (slave_key, &slave_pub_key); 190 GNUNET_CRYPTO_ecc_key_get_public_for_signature (slave_key, &slave_pub_key);
179 191
180 ASSERT (GNUNET_OK == db->membership_store(db->cls, &channel_pub_key, 192 ASSERT (GNUNET_OK == db->membership_store (db->cls, &channel_pub_key,
181 &slave_pub_key, GNUNET_YES, 193 &slave_pub_key, GNUNET_YES,
182 4, 2, 1)); 194 4, 2, 1));
183 195
184 ASSERT (GNUNET_YES == db->membership_test(db->cls, &channel_pub_key, 196 ASSERT (GNUNET_YES == db->membership_test (db->cls, &channel_pub_key,
185 &slave_pub_key, 4)); 197 &slave_pub_key, 4));
186 198
187 ASSERT (GNUNET_YES == db->membership_test(db->cls, &channel_pub_key, 199 ASSERT (GNUNET_YES == db->membership_test (db->cls, &channel_pub_key,
188 &slave_pub_key, 2)); 200 &slave_pub_key, 2));
189 201
190 ASSERT (GNUNET_NO == db->membership_test(db->cls, &channel_pub_key, 202 ASSERT (GNUNET_NO == db->membership_test (db->cls, &channel_pub_key,
191 &slave_pub_key, 1)); 203 &slave_pub_key, 1));
192 204
193 205
194 /* Messages */ 206 /* Store & get messages */
195 207
196 struct GNUNET_MULTICAST_MessageHeader *msg 208 struct GNUNET_MULTICAST_MessageHeader *msg
197 = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key)); 209 = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key));
@@ -200,12 +212,12 @@ run (void *cls, char *const *args, const char *cfgfile,
200 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); 212 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
201 msg->header.size = htons (sizeof (*msg) + sizeof (channel_pub_key)); 213 msg->header.size = htons (sizeof (*msg) + sizeof (channel_pub_key));
202 214
203 msg->hop_counter = 9; 215 msg->hop_counter = htonl (9);
204 msg->fragment_id = INT64_MAX - 1; 216 msg->fragment_id = GNUNET_htonll (INT64_MAX - 1);
205 msg->fragment_offset = 0; 217 msg->fragment_offset = GNUNET_htonll (0);
206 msg->message_id = INT64_MAX - 2; 218 msg->message_id = GNUNET_htonll (INT64_MAX - 10);
207 msg->group_generation = INT64_MAX - 3; 219 msg->group_generation = GNUNET_htonll (INT64_MAX - 3);
208 msg->flags = GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT; 220 msg->flags = htonl (GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT);
209 221
210 memcpy (&msg[1], &channel_pub_key, sizeof (channel_pub_key)); 222 memcpy (&msg[1], &channel_pub_key, sizeof (channel_pub_key));
211 223
@@ -225,27 +237,27 @@ run (void *cls, char *const *args, const char *cfgfile,
225 fcls.flags[0])); 237 fcls.flags[0]));
226 238
227 ASSERT (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key, 239 ASSERT (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key,
228 msg->fragment_id, 240 GNUNET_ntohll (msg->fragment_id),
229 fragment_cb, &fcls)); 241 fragment_cb, &fcls));
230 ASSERT (fcls.n == 1); 242 ASSERT (fcls.n == 1);
231 243
232 fcls.n = 0; 244 fcls.n = 0;
233 245
234 ASSERT (GNUNET_OK == db->message_get_fragment (db->cls, &channel_pub_key, 246 ASSERT (GNUNET_OK == db->message_get_fragment (db->cls, &channel_pub_key,
235 msg->message_id, 247 GNUNET_ntohll (msg->message_id),
236 msg->fragment_offset, 248 GNUNET_ntohll (msg->fragment_offset),
237 fragment_cb, &fcls)); 249 fragment_cb, &fcls));
238 ASSERT (fcls.n == 1); 250 ASSERT (fcls.n == 1);
239 251
240 ASSERT (GNUNET_OK == db->message_add_flags ( 252 ASSERT (GNUNET_OK == db->message_add_flags (
241 db->cls, &channel_pub_key, msg->message_id, 253 db->cls, &channel_pub_key, GNUNET_ntohll (msg->message_id),
242 GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED)); 254 GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED));
243 255
244 fcls.n = 0; 256 fcls.n = 0;
245 fcls.flags[0] |= GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED; 257 fcls.flags[0] |= GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED;
246 258
247 ASSERT (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key, 259 ASSERT (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key,
248 msg->fragment_id, 260 GNUNET_ntohll (msg->fragment_id),
249 fragment_cb, &fcls)); 261 fragment_cb, &fcls));
250 ASSERT (fcls.n == 1); 262 ASSERT (fcls.n == 1);
251 263
@@ -254,8 +266,8 @@ run (void *cls, char *const *args, const char *cfgfile,
254 266
255 memcpy (msg1, msg, sizeof (*msg1) + sizeof (channel_pub_key)); 267 memcpy (msg1, msg, sizeof (*msg1) + sizeof (channel_pub_key));
256 268
257 msg1->fragment_id++; 269 msg1->fragment_id = GNUNET_htonll (INT64_MAX);
258 msg1->fragment_offset += 32768; 270 msg1->fragment_offset = GNUNET_htonll (32768);
259 271
260 fcls.n = 0; 272 fcls.n = 0;
261 fcls.msg[1] = msg1; 273 fcls.msg[1] = msg1;
@@ -264,33 +276,38 @@ run (void *cls, char *const *args, const char *cfgfile,
264 ASSERT (GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg1, 276 ASSERT (GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg1,
265 fcls.flags[1])); 277 fcls.flags[1]));
266 278
279 uint64_t retfrags = 0;
267 ASSERT (GNUNET_OK == db->message_get (db->cls, &channel_pub_key, 280 ASSERT (GNUNET_OK == db->message_get (db->cls, &channel_pub_key,
268 msg->message_id, 281 GNUNET_ntohll (msg->message_id),
269 fragment_cb, &fcls)); 282 &retfrags, fragment_cb, &fcls));
270 ASSERT (fcls.n == 2); 283 ASSERT (fcls.n == 2 && retfrags == 2);
271 284
272 uint64_t max_state_msg_id = 0; 285 /* Master counters */
273 ASSERT (GNUNET_OK == db->counters_get_slave (db->cls, &channel_pub_key,
274 &max_state_msg_id)
275 && max_state_msg_id == msg->message_id);
276 286
277 uint64_t fragment_id = 0, message_id = 0, group_generation = 0; 287 uint64_t fragment_id = 0, message_id = 0, group_generation = 0;
278 ASSERT (GNUNET_OK == db->counters_get_master (db->cls, &channel_pub_key, 288 ASSERT (GNUNET_OK == db->counters_get_master (db->cls, &channel_pub_key,
279 &fragment_id, &message_id, 289 &fragment_id, &message_id,
280 &group_generation) 290 &group_generation)
281 && fragment_id == msg1->fragment_id 291 && fragment_id == GNUNET_ntohll (msg1->fragment_id)
282 && message_id == msg1->message_id 292 && message_id == GNUNET_ntohll (msg1->message_id)
283 && group_generation == msg1->group_generation); 293 && group_generation == GNUNET_ntohll (msg1->group_generation));
294
295
296 /* Modify state */
284 297
298 message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 1;
299 ASSERT (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key,
300 message_id, 1));
285 301
286 /* State */ 302 ASSERT (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key, "_foo",
303 C2ARG("one two three")));
287 304
288 ASSERT (GNUNET_OK == db->state_set (db->cls, &channel_pub_key, "_foo", 305 ASSERT (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
289 C2ARG("one two three"))); 306 "_foo_bar", slave_key,
307 sizeof (*slave_key)));
290 308
291 ASSERT (GNUNET_OK == db->state_set (db->cls, &channel_pub_key, "_foo_bar", 309 ASSERT (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
292 slave_key, 310 message_id));
293 sizeof (*slave_key)));
294 311
295 struct StateClosure scls = { 0 }; 312 struct StateClosure scls = { 0 };
296 scls.n = 0; 313 scls.n = 0;
@@ -305,8 +322,8 @@ run (void *cls, char *const *args, const char *cfgfile,
305 scls.value[1] = slave_key; 322 scls.value[1] = slave_key;
306 scls.value_size[1] = sizeof (*slave_key); 323 scls.value_size[1] = sizeof (*slave_key);
307 324
308 ASSERT (GNUNET_OK == db->state_get_all (db->cls, &channel_pub_key, "_foo", 325 ASSERT (GNUNET_OK == db->state_get_prefix (db->cls, &channel_pub_key, "_foo",
309 state_cb, &scls)); 326 state_cb, &scls));
310 ASSERT (scls.n == 2); 327 ASSERT (scls.n == 2);
311 328
312 scls.n = 0; 329 scls.n = 0;
@@ -321,8 +338,66 @@ run (void *cls, char *const *args, const char *cfgfile,
321 state_cb, &scls)); 338 state_cb, &scls));
322 ASSERT (scls.n == 2); 339 ASSERT (scls.n == 2);
323 340
341 /* Slave counters */
342
343 uint64_t max_state_msg_id = 0;
344 ASSERT (GNUNET_OK == db->counters_get_slave (db->cls, &channel_pub_key,
345 &max_state_msg_id)
346 && max_state_msg_id == message_id);
347
348 /* State sync */
349
350 scls.n = 0;
351 scls.value[0] = channel_key;
352 scls.value_size[0] = sizeof (*channel_key);
353 scls.value[1] = "three two one";
354 scls.value_size[1] = strlen ("three two one");
355
356 ASSERT (GNUNET_OK == db->state_sync_begin (db->cls, &channel_pub_key));
357
358 ASSERT (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key,
359 "_sync_bar",
360 scls.value[0], scls.value_size[0]));
361
362 ASSERT (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key,
363 "_sync_foo",
364 scls.value[1], scls.value_size[1]));
365
366 ASSERT (GNUNET_OK == db->state_sync_end (db->cls, &channel_pub_key, INT64_MAX - 5));
367
368 ASSERT (GNUNET_NO == db->state_get_prefix (db->cls, &channel_pub_key, "_foo",
369 state_cb, &scls));
370 ASSERT (scls.n == 0);
371
372 ASSERT (GNUNET_OK == db->state_get_prefix (db->cls, &channel_pub_key, "_sync",
373 state_cb, &scls));
374 ASSERT (scls.n == 2);
375
376 scls.n = 0;
377 ASSERT (GNUNET_OK == db->state_get_signed (db->cls, &channel_pub_key,
378 state_cb, &scls));
379 ASSERT (scls.n == 2);
380
381 /* Modify state after sync */
382
383 message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 6;
384 ASSERT (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key,
385 message_id, 3));
386
387 ASSERT (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key, "_sync_foo",
388 C2ARG("five six seven")));
389
390 ASSERT (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
391 message_id));
392
393 /* Reset state */
394
395 scls.n = 0;
396 ASSERT (GNUNET_OK == db->state_reset (db->cls, &channel_pub_key));
397 ASSERT (scls.n == 0);
398
324 ok = 0; 399 ok = 0;
325 400
326FAILURE: 401FAILURE:
327 402
328 if (NULL != channel_key) 403 if (NULL != channel_key)
@@ -353,7 +428,6 @@ main (int argc, char *argv[])
353 struct GNUNET_GETOPT_CommandLineOption options[] = { 428 struct GNUNET_GETOPT_CommandLineOption options[] = {
354 GNUNET_GETOPT_OPTION_END 429 GNUNET_GETOPT_OPTION_END
355 }; 430 };
356
357 GNUNET_DISK_directory_remove ("/tmp/gnunet-test-plugin-psycstore-sqlite"); 431 GNUNET_DISK_directory_remove ("/tmp/gnunet-test-plugin-psycstore-sqlite");
358 GNUNET_log_setup ("test-plugin-psycstore", LOG_LEVEL, NULL); 432 GNUNET_log_setup ("test-plugin-psycstore", LOG_LEVEL, NULL);
359 plugin_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]); 433 plugin_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]);
diff --git a/src/psycstore/test_psycstore.c b/src/psycstore/test_psycstore.c
index db4cfb0eb..b238d98c5 100644
--- a/src/psycstore/test_psycstore.c
+++ b/src/psycstore/test_psycstore.c
@@ -1,26 +1,26 @@
1/* 1/*
2 This file is part of GNUnet. 2 * This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors) 3 * (C) 2013 Christian Grothoff (and other contributing authors)
4 4 *
5 GNUnet is free software; you can redistribute it and/or modify 5 * GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 * it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your 7 * by the Free Software Foundation; either version 3, or (at your
8 option) any later version. 8 * option) any later version.
9 9 *
10 GNUnet is distributed in the hope that it will be useful, but 10 * GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details. 13 * General Public License for more details.
14 14 *
15 You should have received a copy of the GNU General Public License 15 * You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the 16 * along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 * Boston, MA 02111-1307, USA.
19*/ 19 */
20 20
21/** 21/**
22 * @file psycstore/test_psycstore.c 22 * @file psycstore/test_psycstore.c
23 * @brief Testcase for the PSYCstore service 23 * @brief Test for the PSYCstore service.
24 * @author Gabor X Toth 24 * @author Gabor X Toth
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 */ 26 */
@@ -31,9 +31,13 @@
31#include "gnunet_psycstore_service.h" 31#include "gnunet_psycstore_service.h"
32#include "gnunet_testing_lib.h" 32#include "gnunet_testing_lib.h"
33 33
34#define ASSERT(x) do { if (! (x)) { printf ("Error at %s:%d\n", __FILE__, __LINE__); cleanup (); return; } } while (0)
35#define ASSERRT(x) do { if (! (x)) { printf ("Error at %s:%d\n", __FILE__, __LINE__); cleanup (); return GNUNET_SYSERR; } } while (0)
34 36
35#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) 37#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
36 38
39#define DEBUG_SERVICE 1
40
37 41
38/** 42/**
39 * Return value from 'main'. 43 * Return value from 'main'.
@@ -52,8 +56,8 @@ static struct GNUNET_PSYCSTORE_OperationHandle *op;
52 56
53/** 57/**
54 * Handle for task for timeout termination. 58 * Handle for task for timeout termination.
55 */ 59 */
56static GNUNET_SCHEDULER_TaskIdentifier endbadly_task; 60static GNUNET_SCHEDULER_TaskIdentifier end_badly_task;
57 61
58static struct GNUNET_CRYPTO_EccPrivateKey *channel_key; 62static struct GNUNET_CRYPTO_EccPrivateKey *channel_key;
59static struct GNUNET_CRYPTO_EccPrivateKey *slave_key; 63static struct GNUNET_CRYPTO_EccPrivateKey *slave_key;
@@ -61,6 +65,23 @@ static struct GNUNET_CRYPTO_EccPrivateKey *slave_key;
61static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key; 65static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key;
62static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key; 66static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key;
63 67
68static struct FragmentClosure
69{
70 uint8_t n;
71 uint8_t n_expected;
72 uint64_t flags[16];
73 struct GNUNET_MULTICAST_MessageHeader *msg[16];
74} fcls;
75
76struct StateClosure {
77 size_t n;
78 char *name[16];
79 void *value[16];
80 size_t value_size[16];
81} scls;
82
83static struct GNUNET_ENV_Modifier modifiers[16];
84
64/** 85/**
65 * Clean up all resources used. 86 * Clean up all resources used.
66 */ 87 */
@@ -92,21 +113,21 @@ cleanup ()
92 113
93 114
94/** 115/**
95 * Termiante the testcase (failure). 116 * Terminate the testcase (failure).
96 * 117 *
97 * @param cls NULL 118 * @param cls NULL
98 * @param tc scheduler context 119 * @param tc scheduler context
99 */ 120 */
100static void 121static void
101endbadly (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 122end_badly (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
102{ 123{
103 cleanup ();
104 res = 1; 124 res = 1;
125 cleanup ();
105} 126}
106 127
107 128
108/** 129/**
109 * Termiante the testcase (success). 130 * Terminate the testcase (success).
110 * 131 *
111 * @param cls NULL 132 * @param cls NULL
112 * @param tc scheduler context 133 * @param tc scheduler context
@@ -114,32 +135,361 @@ endbadly (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
114static void 135static void
115end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 136end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
116{ 137{
117 cleanup ();
118 res = 0; 138 res = 0;
139 cleanup ();
119} 140}
120 141
121 142
122/** 143/**
123 * Finish the testcase (successfully). 144 * Finish the testcase (successfully).
124 */ 145 */
125static void 146static void
126end () 147end ()
127{ 148{
128 if (endbadly_task != GNUNET_SCHEDULER_NO_TASK) 149 if (end_badly_task != GNUNET_SCHEDULER_NO_TASK)
129 { 150 {
130 GNUNET_SCHEDULER_cancel (endbadly_task); 151 GNUNET_SCHEDULER_cancel (end_badly_task);
131 endbadly_task = GNUNET_SCHEDULER_NO_TASK; 152 end_badly_task = GNUNET_SCHEDULER_NO_TASK;
132 } 153 }
133 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MILLISECONDS, 154 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MILLISECONDS,
134 &end_normally, NULL); 155 &end_normally, NULL);
135} 156}
136 157
158
159void
160state_reset_result (void *cls, int64_t result, const char *err_msg)
161{
162 op = NULL;
163 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_reset_result:\t%d\n", result);
164 ASSERT (GNUNET_OK == result);
165
166 op = GNUNET_PSYCSTORE_state_reset (h, &channel_pub_key,
167 &state_reset_result, cls);
168 GNUNET_PSYCSTORE_operation_cancel (op);
169 op = NULL;
170 end ();
171}
172
173
174static int
175state_result (void *cls, const char *name, const void *value, size_t value_size)
176{
177 struct StateClosure *scls = cls;
178 const char *nam = scls->name[scls->n];
179 const void *val = scls->value[scls->n];
180 size_t val_size = scls->value_size[scls->n++];
181
182 if (value_size == val_size
183 && 0 == memcmp (value, val, val_size)
184 && 0 == strcmp (name, nam))
185 {
186 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " variable %s matches\n", name);
187 return GNUNET_YES;
188 }
189 else
190 {
191 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
192 " variable %s differs\nReceived: %.*s\nExpected: %.*s\n",
193 name, value_size, value, val_size, val);
194 ASSERRT (0);
195 return GNUNET_SYSERR;
196 }
197}
198
199
200void
201state_get_prefix_result (void *cls, int64_t result, const char *err_msg)
202{
203 struct StateClosure *scls = cls;
204 op = NULL;
205 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_get_prefix_result:\t%d\n", result);
206 ASSERT (GNUNET_OK == result && 2 == scls->n);
207
208 op = GNUNET_PSYCSTORE_state_reset (h, &channel_pub_key,
209 &state_reset_result, cls);
210}
211
212
213void
214state_get_result (void *cls, int64_t result, const char *err_msg)
215{
216 op = NULL;
217 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_get_result:\t%d\n", result);
218 ASSERT (GNUNET_OK == result);
219
220 scls.n = 0;
221
222 scls.name[0] = "_sync_bar";
223 scls.value[0] = "ten eleven twelve";
224 scls.value_size[0] = sizeof ("ten eleven twelve") - 1;
225
226 scls.name[1] = "_sync_foo";
227 scls.value[1] = "one two three";
228 scls.value_size[1] = sizeof ("one two three") - 1;
229
230 op = GNUNET_PSYCSTORE_state_get_prefix (h, &channel_pub_key, "_sync",
231 &state_result,
232 &state_get_prefix_result, &scls);
233}
234
235
236void
237counters_slave_result (void *cls, uint64_t max_state_msg_id)
238{
239 struct FragmentClosure *fcls = cls;
240 int result = 0;
241 op = NULL;
242
243 if (max_state_msg_id == GNUNET_ntohll (fcls->msg[0]->message_id))
244 result = 1;
245
246 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "counters_get_slave:\t%d\n", result);
247 ASSERT (result == 1);
248
249 scls.n = 0;
250 scls.name[0] = "_bar";
251 scls.value[0] = "four five six";
252 scls.value_size[0] = sizeof ("four five six") - 1;
253
254 op = GNUNET_PSYCSTORE_state_get (h, &channel_pub_key, "_bar_x_yy_zzz",
255 &state_result, &state_get_result, &scls);
256}
257
258
259void
260state_modify_result (void *cls, int64_t result, const char *err_msg)
261{
262 op = NULL;
263 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_modify_result:\t%d\n", result);
264 ASSERT (GNUNET_OK == result);
265
266 op = GNUNET_PSYCSTORE_counters_get_slave (h, &channel_pub_key,
267 &counters_slave_result, cls);
268}
269
270
271void
272state_sync_result (void *cls, int64_t result, const char *err_msg)
273{
274 struct FragmentClosure *fcls = cls;
275 op = NULL;
276 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_sync_result:\t%d\n", result);
277 ASSERT (GNUNET_OK == result);
278
279 modifiers[0] = (struct GNUNET_ENV_Modifier) {
280 .oper = '=',
281 .name = "_sync_foo",
282 .value = "one two three",
283 .value_size = sizeof ("one two three") - 1
284 };
285 modifiers[1] = (struct GNUNET_ENV_Modifier) {
286 .oper = '=',
287 .name = "_bar",
288 .value = "four five six",
289 .value_size = sizeof ("four five six") - 1
290 };
291
292 op = GNUNET_PSYCSTORE_state_modify (h, &channel_pub_key,
293 GNUNET_ntohll (fcls->msg[0]->message_id), 0,
294 2, modifiers, state_modify_result, fcls);
295}
296
297
298void
299counters_master_result (void *cls, uint64_t fragment_id, uint64_t message_id,
300 uint64_t group_generation)
301{
302 struct FragmentClosure *fcls = cls;
303 int result = 0;
304 op = NULL;
305
306 if (fragment_id == GNUNET_ntohll (fcls->msg[2]->fragment_id) &&
307 message_id == GNUNET_ntohll (fcls->msg[2]->message_id) &&
308 group_generation == GNUNET_ntohll (fcls->msg[2]->group_generation))
309 result = 1;
310
311 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "counters_get_master:\t%d\n", result);
312 ASSERT (result == 1);
313
314 modifiers[0] = (struct GNUNET_ENV_Modifier) {
315 .oper = '=',
316 .name = "_sync_foo",
317 .value = "three two one",
318 .value_size = sizeof ("three two one") - 1
319 };
320 modifiers[1] = (struct GNUNET_ENV_Modifier) {
321 .oper = '=',
322 .name = "_sync_bar",
323 .value = "ten eleven twelve",
324 .value_size = sizeof ("ten eleven twelve") - 1
325 };
326
327 op = GNUNET_PSYCSTORE_state_sync (h, &channel_pub_key,
328 GNUNET_ntohll (fcls->msg[0]->message_id) + 1,
329 2, modifiers, state_sync_result, fcls);
330}
331
332
333int
334fragment_result (void *cls,
335 struct GNUNET_MULTICAST_MessageHeader *msg,
336 enum GNUNET_PSYCSTORE_MessageFlags flags)
337{
338 struct FragmentClosure *fcls = cls;
339 struct GNUNET_MULTICAST_MessageHeader *msg0 = fcls->msg[fcls->n];
340 uint64_t flags0 = fcls->flags[fcls->n++];
341
342 if (flags == flags0 && msg->header.size == msg0->header.size
343 && 0 == memcmp (msg, msg0, ntohs (msg->header.size)))
344 {
345 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " fragment %llu matches\n",
346 GNUNET_ntohll (msg->fragment_id));
347 return GNUNET_YES;
348 }
349 else
350 {
351 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " fragment %llu differs\n",
352 GNUNET_ntohll (msg->fragment_id));
353 ASSERRT (0);
354 return GNUNET_SYSERR;
355 }
356}
357
358
359void
360message_get_result (void *cls, int64_t result, const char *err_msg)
361{
362 struct FragmentClosure *fcls = cls;
363 op = NULL;
364 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get:\t%d\n", result);
365 ASSERT (result > 0 && fcls->n && fcls->n_expected);
366
367 op = GNUNET_PSYCSTORE_counters_get_master (h, &channel_pub_key,
368 &counters_master_result, fcls);
369}
370
371
372void
373message_get_fragment_result (void *cls, int64_t result, const char *err_msg)
374{
375 struct FragmentClosure *fcls = cls;
376 op = NULL;
377 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_fragment:\t%d\n", result);
378 ASSERT (result > 0 && fcls->n && fcls->n_expected);
379
380 fcls->n = 0;
381 fcls->n_expected = 3;
382 op = GNUNET_PSYCSTORE_message_get (h, &channel_pub_key,
383 GNUNET_ntohll (fcls->msg[0]->message_id),
384 &fragment_result,
385 &message_get_result, fcls);
386}
387
388
137void 389void
138membership_store_result (void *cls, int result, const char *err_msg) 390fragment_get_result (void *cls, int64_t result, const char *err_msg)
139{ 391{
392 struct FragmentClosure *fcls = cls;
393 op = NULL;
394 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get:\t%d\n", result);
395 ASSERT (result > 0 && fcls->n && fcls->n_expected);
396
397 fcls->n = 1;
398 fcls->n_expected = 2;
399 op = GNUNET_PSYCSTORE_message_get_fragment (h, &channel_pub_key,
400 GNUNET_ntohll (fcls->msg[1]->message_id),
401 GNUNET_ntohll (fcls->msg[1]->fragment_offset),
402 &fragment_result,
403 &message_get_fragment_result,
404 fcls);
140 405
141} 406}
142 407
408
409void
410fragment_store_result (void *cls, int64_t result, const char *err_msg)
411{
412 op = NULL;
413 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_store:\t%d\n", result);
414 ASSERT (GNUNET_OK == result);
415
416 if ((intptr_t) cls == GNUNET_YES)
417 {
418 fcls.n = 0;
419 fcls.n_expected = 1;
420 op = GNUNET_PSYCSTORE_fragment_get (h, &channel_pub_key,
421 GNUNET_ntohll (fcls.msg[0]->fragment_id),
422 &fragment_result,
423 &fragment_get_result, &fcls);
424 }
425}
426
427
428void
429membership_test_result (void *cls, int64_t result, const char *err_msg)
430{
431 op = NULL;
432 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_test:\t%d\n", result);
433 ASSERT (GNUNET_OK == result);
434
435 struct GNUNET_MULTICAST_MessageHeader *msg;
436 fcls.flags[0] = GNUNET_PSYCSTORE_MESSAGE_STATE;
437 fcls.msg[0] = msg = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key));
438 ASSERT (msg != NULL);
439
440 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
441 msg->header.size = htons (sizeof (*msg) + sizeof (channel_pub_key));
442
443 msg->hop_counter = htonl (9);
444 msg->fragment_id = GNUNET_htonll (INT64_MAX - 8);
445 msg->fragment_offset = GNUNET_htonll (0);
446 msg->message_id = GNUNET_htonll (INT64_MAX - 10);
447 msg->group_generation = GNUNET_htonll (INT64_MAX - 3);
448 msg->flags = htonl (GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT);
449
450 memcpy (&msg[1], &channel_pub_key, sizeof (channel_pub_key));
451
452 msg->purpose.size = htonl (ntohs (msg->header.size)
453 - sizeof (msg->header)
454 - sizeof (msg->hop_counter)
455 - sizeof (msg->signature));
456 msg->purpose.purpose = htonl (234);
457 GNUNET_CRYPTO_ecc_sign (slave_key, &msg->purpose, &msg->signature);
458
459 op = GNUNET_PSYCSTORE_fragment_store (h, &channel_pub_key, msg, fcls.flags[0],
460 &fragment_store_result, GNUNET_NO);
461
462 fcls.flags[1] = GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED;
463 fcls.msg[1] = msg = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key));
464 memcpy (msg, fcls.msg[0], sizeof (*msg) + sizeof (channel_pub_key));
465 msg->fragment_id = GNUNET_htonll (INT64_MAX - 4);
466 msg->fragment_offset = GNUNET_htonll (1024);
467
468 op = GNUNET_PSYCSTORE_fragment_store (h, &channel_pub_key, msg, fcls.flags[1],
469 &fragment_store_result, GNUNET_NO);
470
471 fcls.flags[2] = GNUNET_PSYCSTORE_MESSAGE_STATE_HASH;
472 fcls.msg[2] = msg = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key));
473 memcpy (msg, fcls.msg[1], sizeof (*msg) + sizeof (channel_pub_key));
474 msg->fragment_id = GNUNET_htonll (INT64_MAX);
475 msg->fragment_offset = GNUNET_htonll (16384);
476
477 op = GNUNET_PSYCSTORE_fragment_store (h, &channel_pub_key, msg, fcls.flags[2],
478 &fragment_store_result, (void *) GNUNET_YES);
479}
480
481void
482membership_store_result (void *cls, int64_t result, const char *err_msg)
483{
484 op = NULL;
485 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_store:\t%d\n", result);
486 ASSERT (GNUNET_OK == result);
487
488 op = GNUNET_PSYCSTORE_membership_test (h, &channel_pub_key, &slave_pub_key,
489 4, 1,
490 &membership_test_result, NULL);
491}
492
143/** 493/**
144 * Main function of the test, run from scheduler. 494 * Main function of the test, run from scheduler.
145 * 495 *
@@ -148,14 +498,19 @@ membership_store_result (void *cls, int result, const char *err_msg)
148 * @param peer handle to access more of the peer (not used) 498 * @param peer handle to access more of the peer (not used)
149 */ 499 */
150static void 500static void
151run (void *cls, 501#if DEBUG_SERVICE
152 const struct GNUNET_CONFIGURATION_Handle *cfg, 502run (void *cls, char *const *args, const char *cfgfile,
153 struct GNUNET_TESTING_Peer *peer) 503 const struct GNUNET_CONFIGURATION_Handle *cfg)
504#else
505 run (void *cls,
506 const struct GNUNET_CONFIGURATION_Handle *cfg,
507 struct GNUNET_TESTING_Peer *peer)
508#endif
154{ 509{
155 endbadly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, 510 end_badly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT,
156 &endbadly, NULL); 511 &end_badly, NULL);
157 h = GNUNET_PSYCSTORE_connect (cfg); 512 h = GNUNET_PSYCSTORE_connect (cfg);
158 GNUNET_assert (NULL != h); 513 ASSERT (NULL != h);
159 514
160 channel_key = GNUNET_CRYPTO_ecc_key_create (); 515 channel_key = GNUNET_CRYPTO_ecc_key_create ();
161 slave_key = GNUNET_CRYPTO_ecc_key_create (); 516 slave_key = GNUNET_CRYPTO_ecc_key_create ();
@@ -164,10 +519,8 @@ run (void *cls,
164 GNUNET_CRYPTO_ecc_key_get_public_for_signature (slave_key, &slave_pub_key); 519 GNUNET_CRYPTO_ecc_key_get_public_for_signature (slave_key, &slave_pub_key);
165 520
166 op = GNUNET_PSYCSTORE_membership_store (h, &channel_pub_key, &slave_pub_key, 521 op = GNUNET_PSYCSTORE_membership_store (h, &channel_pub_key, &slave_pub_key,
167 GNUNET_YES, 2, 2, 1, 522 GNUNET_YES, 4, 2, 1,
168 &membership_store_result, NULL); 523 &membership_store_result, NULL);
169
170 end ();
171} 524}
172 525
173 526
@@ -175,15 +528,20 @@ int
175main (int argc, char *argv[]) 528main (int argc, char *argv[])
176{ 529{
177 res = 1; 530 res = 1;
178 if (0 != 531#if DEBUG_SERVICE
179 GNUNET_TESTING_service_run ("test-psycstore", 532 const struct GNUNET_GETOPT_CommandLineOption opts[] = {
180 "psycstore", 533 GNUNET_GETOPT_OPTION_END
181 "test_psycstore.conf", 534 };
182 &run, 535 if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "test-psycstore",
183 NULL)) 536 "test-psycstore [options]",
537 opts, &run, NULL))
538 return 1;
539#else
540 if (0 != GNUNET_TESTING_service_run ("test-psycstore", "psycstore",
541 "test_psycstore.conf", &run, NULL))
184 return 1; 542 return 1;
543#endif
185 return res; 544 return res;
186} 545}
187 546
188
189/* end of test_psycstore.c */ 547/* end of test_psycstore.c */
diff --git a/src/psycstore/test_psycstore.conf b/src/psycstore/test_psycstore.conf
index ea3031524..4c52d1ce0 100644
--- a/src/psycstore/test_psycstore.conf
+++ b/src/psycstore/test_psycstore.conf
@@ -1,6 +1,7 @@
1[arm] 1[arm]
2UNIXPATH = /tmp/test-gnunet-service-arm.sock
2DEFAULTSERVICES = psycstore 3DEFAULTSERVICES = psycstore
3UNIXPATH = /tmp/test-psycstore-service-arm.sock 4GLOBAL_POSTFIX = -L DEBUG
4 5
5[psycstore] 6[psycstore]
6AUTOSTART = YES 7AUTOSTART = YES
@@ -10,6 +11,8 @@ UNIXPATH = /tmp/test-gnunet-service-psycstore.sock
10UNIX_MATCH_UID = NO 11UNIX_MATCH_UID = NO
11UNIX_MATCH_GID = YES 12UNIX_MATCH_GID = YES
12DATABASE = sqlite 13DATABASE = sqlite
14OPTIONS = -L DEBUG
15DEBUG = YES
13 16
14[psycstore-sqlite] 17[psycstore-sqlite]
15FILENAME = $SERVICEHOME/psycstore/sqlite_test.db 18FILENAME = $SERVICEHOME/psycstore/sqlite_test.db