aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-07-18 00:03:06 +0000
committerGabor X Toth <*@tg-x.net>2015-07-18 00:03:06 +0000
commitd38544730123a1f365ef287a7e88060d97b266b7 (patch)
treee66f5ee143adde1a5206f50bbbdd00a8a179fb81 /src
parent2275976cf61565bde4f17e8c2c0bc0d359541ac4 (diff)
downloadgnunet-d38544730123a1f365ef287a7e88060d97b266b7.tar.gz
gnunet-d38544730123a1f365ef287a7e88060d97b266b7.zip
psyc/store: apply state modifiers
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_psycstore_plugin.h20
-rw-r--r--src/include/gnunet_psycstore_service.h13
-rw-r--r--src/psyc/gnunet-service-psyc.c63
-rw-r--r--src/psyc/psyc_util_lib.c7
-rw-r--r--src/psycstore/Makefile.am1
-rw-r--r--src/psycstore/gnunet-service-psycstore.c218
-rw-r--r--src/psycstore/plugin_psycstore_sqlite.c121
-rw-r--r--src/psycstore/psycstore.h35
-rw-r--r--src/psycstore/psycstore_api.c72
-rw-r--r--src/psycstore/test_plugin_psycstore.c41
-rw-r--r--src/psycstore/test_psycstore.c30
11 files changed, 353 insertions, 268 deletions
diff --git a/src/include/gnunet_psycstore_plugin.h b/src/include/gnunet_psycstore_plugin.h
index 12f4e692f..b0bbfd819 100644
--- a/src/include/gnunet_psycstore_plugin.h
+++ b/src/include/gnunet_psycstore_plugin.h
@@ -240,9 +240,10 @@ struct GNUNET_PSYCSTORE_PluginFunctions
240 * @return #GNUNET_OK on success, else #GNUNET_SYSERR 240 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
241 */ 241 */
242 int 242 int
243 (*state_modify_set) (void *cls, 243 (*state_modify_op) (void *cls,
244 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 244 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
245 const char *name, const void *value, size_t value_size); 245 enum GNUNET_ENV_Operator op,
246 const char *name, const void *value, size_t value_size);
246 247
247 248
248 /** 249 /**
@@ -270,20 +271,20 @@ struct GNUNET_PSYCSTORE_PluginFunctions
270 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key); 271 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key);
271 272
272 /** 273 /**
273 * Set the value of a state variable while synchronizing state. 274 * Assign value of a state variable while synchronizing state.
274 * 275 *
275 * The state synchronization process is started with state_sync_begin(), 276 * The state synchronization process is started with state_sync_begin(),
276 * which is followed by one or more calls to this function, 277 * which is followed by one or more calls to this function,
277 * and finished with state_sync_end(). 278 * and finished using state_sync_end().
278 * 279 *
279 * @see GNUNET_PSYCSTORE_state_sync() 280 * @see GNUNET_PSYCSTORE_state_sync()
280 * 281 *
281 * @return #GNUNET_OK on success, else #GNUNET_SYSERR 282 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
282 */ 283 */
283 int 284 int
284 (*state_sync_set) (void *cls, 285 (*state_sync_assign) (void *cls,
285 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 286 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
286 const char *name, const void *value, size_t value_size); 287 const char *name, const void *value, size_t value_size);
287 288
288 289
289 /** 290 /**
@@ -296,7 +297,8 @@ struct GNUNET_PSYCSTORE_PluginFunctions
296 int 297 int
297 (*state_sync_end) (void *cls, 298 (*state_sync_end) (void *cls,
298 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 299 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
299 uint64_t message_id); 300 uint64_t max_state_message_id,
301 uint64_t state_hash_message_id);
300 302
301 303
302 /** 304 /**
diff --git a/src/include/gnunet_psycstore_service.h b/src/include/gnunet_psycstore_service.h
index 12a375c44..8f3866bdb 100644
--- a/src/include/gnunet_psycstore_service.h
+++ b/src/include/gnunet_psycstore_service.h
@@ -494,10 +494,6 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
494 * ID of the message that contains the @a modifiers. 494 * ID of the message that contains the @a modifiers.
495 * @param state_delta 495 * @param state_delta
496 * Value of the @e state_delta PSYC header variable of the message. 496 * Value of the @e state_delta PSYC header variable of the message.
497 * @param modifier_count
498 * Number of elements in the @a modifiers array.
499 * @param modifiers
500 * List of modifiers to apply.
501 * @param rcb 497 * @param rcb
502 * Callback to call with the result of the operation. 498 * Callback to call with the result of the operation.
503 * @param rcb_cls 499 * @param rcb_cls
@@ -510,8 +506,6 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
510 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 506 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
511 uint64_t message_id, 507 uint64_t message_id,
512 uint64_t state_delta, 508 uint64_t state_delta,
513 size_t modifier_count,
514 const struct GNUNET_ENV_Modifier *modifiers,
515 GNUNET_PSYCSTORE_ResultCallback rcb, 509 GNUNET_PSYCSTORE_ResultCallback rcb,
516 void *rcb_cls); 510 void *rcb_cls);
517 511
@@ -523,7 +517,9 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
523 * Handle for the PSYCstore. 517 * Handle for the PSYCstore.
524 * @param channel_key 518 * @param channel_key
525 * The channel we are interested in. 519 * The channel we are interested in.
526 * @param message_id 520 * @param max_state_message_id
521 * ID of the last stateful message before @a state_hash_message_id.
522 * @param state_hash_message_id
527 * ID of the message that contains the state_hash PSYC header variable. 523 * ID of the message that contains the state_hash PSYC header variable.
528 * @param modifier_count 524 * @param modifier_count
529 * Number of elements in the @a modifiers array. 525 * Number of elements in the @a modifiers array.
@@ -539,7 +535,8 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
539struct GNUNET_PSYCSTORE_OperationHandle * 535struct GNUNET_PSYCSTORE_OperationHandle *
540GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, 536GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
541 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 537 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
542 uint64_t message_id, 538 uint64_t max_state_message_id,
539 uint64_t state_hash_message_id,
543 size_t modifier_count, 540 size_t modifier_count,
544 const struct GNUNET_ENV_Modifier *modifiers, 541 const struct GNUNET_ENV_Modifier *modifiers,
545 GNUNET_PSYCSTORE_ResultCallback rcb, 542 GNUNET_PSYCSTORE_ResultCallback rcb,
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 72377680d..29ef07f10 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -416,6 +416,8 @@ struct Slave
416static void 416static void
417transmit_message (struct Channel *chn); 417transmit_message (struct Channel *chn);
418 418
419static uint64_t
420message_queue_run (struct Channel *chn);
419 421
420static uint64_t 422static uint64_t
421message_queue_drop (struct Channel *chn); 423message_queue_drop (struct Channel *chn);
@@ -1274,6 +1276,39 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1274} 1276}
1275 1277
1276 1278
1279struct StateModifyClosure
1280{
1281 struct Channel *chn;
1282 struct FragmentQueue *fragq;
1283 uint64_t message_id;
1284};
1285
1286
1287void
1288store_recv_state_modify_result (void *cls, int64_t result,
1289 const char *err_msg, uint16_t err_msg_size)
1290{
1291 struct StateModifyClosure *mcls = cls;
1292 struct Channel *chn = mcls->chn;
1293 struct FragmentQueue *fragq = mcls->fragq;
1294 uint64_t msg_id = mcls->message_id;
1295
1296 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1297 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1298 chn, result, err_msg_size, err_msg);
1299
1300 if (GNUNET_OK == result)
1301 {
1302 chn->max_state_message_id = msg_id;
1303 chn->max_message_id = msg_id;
1304
1305 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1306 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1307 message_queue_run (chn);
1308 }
1309}
1310
1311
1277/** 1312/**
1278 * Run message queue. 1313 * Run message queue.
1279 * 1314 *
@@ -1294,6 +1329,7 @@ message_queue_run (struct Channel *chn)
1294 "%p Running message queue.\n", chn); 1329 "%p Running message queue.\n", chn);
1295 uint64_t n = 0; 1330 uint64_t n = 0;
1296 uint64_t msg_id; 1331 uint64_t msg_id;
1332
1297 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, 1333 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1298 &msg_id)) 1334 &msg_id))
1299 { 1335 {
@@ -1325,7 +1361,7 @@ message_queue_run (struct Channel *chn)
1325 "%p Out of order message. " 1361 "%p Out of order message. "
1326 "(%" PRIu64 " - 1 != %" PRIu64 ")\n", 1362 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1327 chn, msg_id, chn->max_message_id); 1363 chn, msg_id, chn->max_message_id);
1328 break; 1364 continue;
1329 } 1365 }
1330 } 1366 }
1331 else 1367 else
@@ -1336,14 +1372,19 @@ message_queue_run (struct Channel *chn)
1336 "%p Out of order stateful message. " 1372 "%p Out of order stateful message. "
1337 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", 1373 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1338 chn, msg_id, fragq->state_delta, chn->max_state_message_id); 1374 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1339 break; 1375 continue;
1340 } 1376 }
1341#if TODO 1377
1342 /* FIXME: apply modifiers to state in PSYCstore */ 1378 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1343 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id, 1379 mcls->chn = chn;
1344 store_recv_state_modify_result, cls); 1380 mcls->fragq = fragq;
1345#endif 1381 mcls->message_id = msg_id;
1346 chn->max_state_message_id = msg_id; 1382
1383 /* Apply modifiers to state in PSYCstore */
1384 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1385 fragq->state_delta,
1386 store_recv_state_modify_result, mcls);
1387 break;
1347 } 1388 }
1348 chn->max_message_id = msg_id; 1389 chn->max_message_id = msg_id;
1349 } 1390 }
@@ -1351,6 +1392,7 @@ message_queue_run (struct Channel *chn)
1351 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); 1392 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1352 n++; 1393 n++;
1353 } 1394 }
1395
1354 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1396 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355 "%p Removed %" PRIu64 " messages from queue.\n", chn, n); 1397 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1356 return n; 1398 return n;
@@ -2039,6 +2081,11 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2039 { 2081 {
2040 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); 2082 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2041 } 2083 }
2084
2085 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2086 {
2087 /// @todo add state_hash to PSYC header
2088 }
2042 } 2089 }
2043} 2090}
2044 2091
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c
index 13d66e6d6..4ad7a914b 100644
--- a/src/psyc/psyc_util_lib.c
+++ b/src/psyc/psyc_util_lib.c
@@ -343,7 +343,7 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
343 343
344 LOG (GNUNET_ERROR_TYPE_DEBUG, 344 LOG (GNUNET_ERROR_TYPE_DEBUG,
345 "Queueing message part of type %u and size %u (end: %u)).\n", 345 "Queueing message part of type %u and size %u (end: %u)).\n",
346 ntohs (msg->type), size, end); 346 NULL != msg ? ntohs (msg->type) : 0, size, end);
347 347
348 if (NULL != tmit->msg) 348 if (NULL != tmit->msg)
349 { 349 {
@@ -917,7 +917,8 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
917 } 917 }
918 918
919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920 "Received message part from PSYC.\n"); 920 "Received message part of type %u and size %u from PSYC.\n",
921 ptype, psize);
921 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); 922 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
922 923
923 switch (ptype) 924 switch (ptype)
@@ -1118,7 +1119,7 @@ GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1118 ptype, psize); 1119 ptype, psize);
1119 return GNUNET_SYSERR; 1120 return GNUNET_SYSERR;
1120 } 1121 }
1121 /* FIXME: check message part order */ 1122 /** @todo FIXME: check message part order */
1122 } 1123 }
1123 return parts; 1124 return parts;
1124} 1125}
diff --git a/src/psycstore/Makefile.am b/src/psycstore/Makefile.am
index 8804255d2..7dac87084 100644
--- a/src/psycstore/Makefile.am
+++ b/src/psycstore/Makefile.am
@@ -49,6 +49,7 @@ gnunet_service_psycstore_SOURCES = \
49gnunet_service_psycstore_LDADD = \ 49gnunet_service_psycstore_LDADD = \
50 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 50 $(top_builddir)/src/statistics/libgnunetstatistics.la \
51 $(top_builddir)/src/util/libgnunetutil.la \ 51 $(top_builddir)/src/util/libgnunetutil.la \
52 $(top_builddir)/src/psyc/libgnunetpsycutil.la \
52 $(GN_LIBINTL) 53 $(GN_LIBINTL)
53 54
54plugin_LTLIBRARIES = \ 55plugin_LTLIBRARIES = \
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c
index 556712df4..6e40e7849 100644
--- a/src/psycstore/gnunet-service-psycstore.c
+++ b/src/psycstore/gnunet-service-psycstore.c
@@ -32,6 +32,7 @@
32#include "gnunet_constants.h" 32#include "gnunet_constants.h"
33#include "gnunet_protocols.h" 33#include "gnunet_protocols.h"
34#include "gnunet_statistics_service.h" 34#include "gnunet_statistics_service.h"
35#include "gnunet_psyc_util_lib.h"
35#include "gnunet_psycstore_service.h" 36#include "gnunet_psycstore_service.h"
36#include "gnunet_psycstore_plugin.h" 37#include "gnunet_psycstore_plugin.h"
37#include "psycstore.h" 38#include "psycstore.h"
@@ -493,7 +494,136 @@ handle_counters_get (void *cls,
493} 494}
494 495
495 496
496/** @todo FIXME: stop processing further state modify messages after an error */ 497struct StateModifyClosure
498{
499 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key;
500 struct GNUNET_PSYC_ReceiveHandle *recv;
501 enum GNUNET_PSYC_MessageState msg_state;
502 char mod_oper;
503 char *mod_name;
504 char *mod_value;
505 uint64_t mod_value_size;
506 uint64_t mod_value_remaining;
507};
508
509
510static void
511recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset,
512 uint32_t flags, const struct GNUNET_MessageHeader *msg)
513{
514 struct StateModifyClosure *scls = cls;
515 uint16_t psize;
516 if (NULL == msg)
517 {
518 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
519 return;
520 }
521
522 switch (ntohs (msg->type))
523 {
524 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
525 {
526 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
527 break;
528 }
529
530 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
531 {
532 struct GNUNET_PSYC_MessageModifier *
533 pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
534 psize = ntohs (pmod->header.size);
535 uint16_t name_size = ntohs (pmod->name_size);
536 uint16_t value_size = ntohs (pmod->value_size);
537
538 const char *name = (const char *) &pmod[1];
539 const void *value = name + name_size;
540
541 if (GNUNET_ENV_OP_SET != pmod->oper)
542 { // Apply non-transient operation.
543 if (psize == sizeof (*pmod) + name_size + value_size)
544 {
545 db->state_modify_op (db->cls, scls->channel_key,
546 pmod->oper, name, value, value_size);
547 }
548 else
549 {
550 scls->mod_oper = pmod->oper;
551 scls->mod_name = GNUNET_malloc (name_size);
552 memcpy (scls->mod_name, name, name_size);
553
554 scls->mod_value_size = value_size;
555 scls->mod_value = GNUNET_malloc (scls->mod_value_size);
556 scls->mod_value_remaining
557 = scls->mod_value_size - (psize - sizeof (*pmod) - name_size);
558 memcpy (scls->mod_value, value, value_size - scls->mod_value_remaining);
559 }
560 }
561 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
562 break;
563 }
564
565 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
566 if (GNUNET_ENV_OP_SET != scls->mod_oper)
567 {
568 if (scls->mod_value_remaining == 0)
569 {
570 GNUNET_break_op (0);
571 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
572 }
573 psize = ntohs (msg->size);
574 memcpy (scls->mod_value + (scls->mod_value_size - scls->mod_value_remaining),
575 &msg[1], psize - sizeof (*msg));
576 scls->mod_value_remaining -= psize - sizeof (*msg);
577 if (0 == scls->mod_value_remaining)
578 {
579 db->state_modify_op (db->cls, scls->channel_key,
580 scls->mod_oper, scls->mod_name,
581 scls->mod_value, scls->mod_value_size);
582 GNUNET_free (scls->mod_name);
583 GNUNET_free (scls->mod_value);
584 scls->mod_oper = 0;
585 scls->mod_name = NULL;
586 scls->mod_value = NULL;
587 scls->mod_value_size = 0;
588 }
589 }
590 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
591 break;
592
593 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
594 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
595 break;
596
597 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
598 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
599 break;
600
601 default:
602 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
603 }
604}
605
606
607static int
608recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg,
609 enum GNUNET_PSYCSTORE_MessageFlags flags)
610{
611 struct StateModifyClosure *scls = cls;
612
613 if (NULL == scls->recv)
614 {
615 scls->recv = GNUNET_PSYC_receive_create (NULL, &recv_state_message_part,
616 scls);
617 }
618
619 const struct GNUNET_PSYC_MessageHeader *
620 pmsg = (const struct GNUNET_PSYC_MessageHeader *) &msg[1];
621 GNUNET_PSYC_receive_message (scls->recv, pmsg);
622
623 return GNUNET_YES;
624}
625
626
497static void 627static void
498handle_state_modify (void *cls, 628handle_state_modify (void *cls,
499 struct GNUNET_SERVER_Client *client, 629 struct GNUNET_SERVER_Client *client,
@@ -502,65 +632,36 @@ handle_state_modify (void *cls,
502 const struct StateModifyRequest *req 632 const struct StateModifyRequest *req
503 = (const struct StateModifyRequest *) msg; 633 = (const struct StateModifyRequest *) msg;
504 634
505 int ret = GNUNET_SYSERR; 635 uint64_t message_id = GNUNET_ntohll (req->message_id);
506 const char *name = (const char *) &req[1]; 636 uint64_t state_delta = GNUNET_ntohll (req->state_delta);
507 uint16_t name_size = ntohs (req->name_size); 637 uint64_t ret_frags = 0;
508 638
509 if (name_size <= 2 || '\0' != name[name_size - 1]) 639 struct StateModifyClosure scls = { 0 };
640
641 if (GNUNET_OK != db->state_modify_begin (db->cls, &req->channel_key,
642 message_id, state_delta))
510 { 643 {
511 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 644 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
512 _("Tried to set invalid state variable name!\n")); 645 _("Failed to begin modifying state!\n"));
513 GNUNET_break_op (0); 646 GNUNET_break (0);
514 } 647 }
515 else
516 {
517 ret = GNUNET_OK;
518 648
519 if (req->flags & STATE_OP_FIRST) 649 int ret = db->message_get (db->cls, &req->channel_key,
520 { 650 message_id, message_id,
521 ret = db->state_modify_begin (db->cls, &req->channel_key, 651 &ret_frags, &recv_state_fragment, &scls);
522 GNUNET_ntohll (req->message_id),
523 GNUNET_ntohll (req->state_delta));
524 }
525 if (ret != GNUNET_OK)
526 {
527 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
528 _("Failed to begin modifying state!\n"));
529 }
530 else
531 {
532 switch (req->oper)
533 {
534 case GNUNET_ENV_OP_ASSIGN:
535 ret = db->state_modify_set (db->cls, &req->channel_key,
536 (const char *) &req[1],
537 name + ntohs (req->name_size),
538 ntohs (req->header.size) - sizeof (*req)
539 - ntohs (req->name_size));
540 break;
541 default:
542#if TODO
543 ret = GNUNET_ENV_operation ((const char *) &req[1],
544 current_value, current_value_size,
545 req->oper, name + ntohs (req->name_size),
546 ntohs (req->header.size) - sizeof (*req)
547 - ntohs (req->name_size), &value, &value_size);
548#endif
549 ret = GNUNET_SYSERR;
550 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
551 _("Unknown operator: %c\n"), req->oper);
552 }
553 }
554 652
555 if (GNUNET_OK == ret && req->flags & STATE_OP_LAST) 653 if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id))
556 { 654 {
557 ret = db->state_modify_end (db->cls, &req->channel_key, 655 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
558 GNUNET_ntohll (req->message_id)); 656 _("Failed to end modifying state!\n"));
559 if (ret != GNUNET_OK) 657 GNUNET_break (0);
560 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
561 _("Failed to end modifying state!\n"));
562 }
563 } 658 }
659
660 if (NULL != scls.recv)
661 {
662 GNUNET_PSYC_receive_destroy (scls.recv);
663 }
664
564 send_result_code (client, req->op_id, ret, NULL); 665 send_result_code (client, req->op_id, ret, NULL);
565 GNUNET_SERVER_receive_done (client, GNUNET_OK); 666 GNUNET_SERVER_receive_done (client, GNUNET_OK);
566} 667}
@@ -600,16 +701,17 @@ handle_state_sync (void *cls,
600 } 701 }
601 else 702 else
602 { 703 {
603 ret = db->state_sync_set (db->cls, &req->channel_key, name, 704 ret = db->state_sync_assign (db->cls, &req->channel_key, name,
604 name + ntohs (req->name_size), 705 name + ntohs (req->name_size),
605 ntohs (req->header.size) - sizeof (*req) 706 ntohs (req->header.size) - sizeof (*req)
606 - ntohs (req->name_size)); 707 - ntohs (req->name_size));
607 } 708 }
608 709
609 if (GNUNET_OK == ret && req->flags & STATE_OP_LAST) 710 if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
610 { 711 {
611 ret = db->state_sync_end (db->cls, &req->channel_key, 712 ret = db->state_sync_end (db->cls, &req->channel_key,
612 GNUNET_ntohll (req->message_id)); 713 GNUNET_ntohll (req->max_state_message_id),
714 GNUNET_ntohll (req->state_hash_message_id));
613 if (ret != GNUNET_OK) 715 if (ret != GNUNET_OK)
614 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 716 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
615 _("Failed to end synchronizing state!\n")); 717 _("Failed to end synchronizing state!\n"));
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c
index 60dc74303..1abc479d2 100644
--- a/src/psycstore/plugin_psycstore_sqlite.c
+++ b/src/psycstore/plugin_psycstore_sqlite.c
@@ -35,6 +35,7 @@
35#include "gnunet_psycstore_service.h" 35#include "gnunet_psycstore_service.h"
36#include "gnunet_multicast_service.h" 36#include "gnunet_multicast_service.h"
37#include "gnunet_crypto_lib.h" 37#include "gnunet_crypto_lib.h"
38#include "gnunet_env_lib.h"
38#include "psycstore.h" 39#include "psycstore.h"
39#include <sqlite3.h> 40#include <sqlite3.h>
40 41
@@ -172,14 +173,8 @@ struct Plugin
172 */ 173 */
173 sqlite3_stmt *update_max_state_message_id; 174 sqlite3_stmt *update_max_state_message_id;
174 175
175
176 /** 176 /**
177 * Precompiled SQL for message_modify_begin() 177 * Precompiled SQL for state_modify_op()
178 */
179 sqlite3_stmt *select_message_state_delta;
180
181 /**
182 * Precompiled SQL for state_modify_set()
183 */ 178 */
184 sqlite3_stmt *insert_state_current; 179 sqlite3_stmt *insert_state_current;
185 180
@@ -353,8 +348,8 @@ database_setup (struct Plugin *plugin)
353 "CREATE TABLE IF NOT EXISTS channels (\n" 348 "CREATE TABLE IF NOT EXISTS channels (\n"
354 " id INTEGER PRIMARY KEY,\n" 349 " id INTEGER PRIMARY KEY,\n"
355 " pub_key BLOB UNIQUE,\n" 350 " pub_key BLOB UNIQUE,\n"
356 " max_state_message_id INTEGER,\n" 351 " max_state_message_id INTEGER,\n" // last applied state message ID
357 " state_hash_message_id INTEGER\n" 352 " state_hash_message_id INTEGER\n" // last message ID with a state hash
358 ");"); 353 ");");
359 354
360 sql_exec (plugin->dbh, 355 sql_exec (plugin->dbh,
@@ -543,17 +538,6 @@ database_setup (struct Plugin *plugin)
543 &plugin->update_state_hash_message_id); 538 &plugin->update_state_hash_message_id);
544 539
545 sql_prepare (plugin->dbh, 540 sql_prepare (plugin->dbh,
546 "SELECT 1\n"
547 "FROM channels AS c\n"
548 "LEFT JOIN messages AS m\n"
549 "ON c.id = m.channel_id\n"
550 "WHERE c.pub_key = ?\n"
551 " AND ((? < c.state_hash_message_id AND c.state_hash_message_id < ?)\n"
552 " OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
553 "LIMIT 1;",
554 &plugin->select_message_state_delta);
555
556 sql_prepare (plugin->dbh,
557 "INSERT OR REPLACE INTO state\n" 541 "INSERT OR REPLACE INTO state\n"
558 " (channel_id, name, value_current, value_signed)\n" 542 " (channel_id, name, value_current, value_signed)\n"
559 "SELECT new.channel_id, new.name,\n" 543 "SELECT new.channel_id, new.name,\n"
@@ -1447,14 +1431,14 @@ counters_state_get (void *cls,
1447 1431
1448 1432
1449/** 1433/**
1450 * Set a state variable to the given value. 1434 * Assign a value to a state variable.
1451 * 1435 *
1452 * @return #GNUNET_OK on success, else #GNUNET_SYSERR 1436 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1453 */ 1437 */
1454static int 1438static int
1455state_set (struct Plugin *plugin, sqlite3_stmt *stmt, 1439state_assign (struct Plugin *plugin, sqlite3_stmt *stmt,
1456 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1440 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1457 const char *name, const void *value, size_t value_size) 1441 const char *name, const void *value, size_t value_size)
1458{ 1442{
1459 int ret = GNUNET_SYSERR; 1443 int ret = GNUNET_SYSERR;
1460 1444
@@ -1527,50 +1511,25 @@ state_modify_begin (void *cls,
1527 uint64_t message_id, uint64_t state_delta) 1511 uint64_t message_id, uint64_t state_delta)
1528{ 1512{
1529 struct Plugin *plugin = cls; 1513 struct Plugin *plugin = cls;
1530 sqlite3_stmt *stmt = plugin->select_message_state_delta;
1531 1514
1532 if (state_delta > 0) 1515 if (state_delta > 0)
1533 { 1516 {
1534 int ret = GNUNET_SYSERR; 1517 /**
1535 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1518 * We can only apply state modifiers in the current message if modifiers in
1536 sizeof (*channel_key), SQLITE_STATIC) 1519 * the previous stateful message (message_id - state_delta) were already
1537 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, 1520 * applied.
1538 message_id - state_delta) 1521 */
1539 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, 1522
1540 message_id) 1523 uint64_t max_state_message_id = 0;
1541 || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, 1524 int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1542 message_id - state_delta)
1543 || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
1544 GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
1545 {
1546 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1547 "sqlite3_bind");
1548 }
1549 else
1550 {
1551 switch (sqlite3_step (stmt))
1552 {
1553 case SQLITE_DONE:
1554 ret = GNUNET_NO;
1555 break;
1556 case SQLITE_ROW:
1557 ret = GNUNET_OK;
1558 break;
1559 default:
1560 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1561 "sqlite3_step");
1562 }
1563 }
1564 if (SQLITE_OK != sqlite3_reset (stmt))
1565 {
1566 ret = GNUNET_SYSERR;
1567 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1568 "sqlite3_reset");
1569 }
1570 if (GNUNET_OK != ret) 1525 if (GNUNET_OK != ret)
1571 return ret; 1526 return ret;
1527
1528 if (message_id - state_delta != max_state_message_id)
1529 return GNUNET_NO;
1572 } 1530 }
1573 1531
1532 // Make sure no other transaction is going on.
1574 if (TRANSACTION_NONE != plugin->transaction) 1533 if (TRANSACTION_NONE != plugin->transaction)
1575 if (GNUNET_OK != transaction_rollback (plugin)) 1534 if (GNUNET_OK != transaction_rollback (plugin))
1576 return GNUNET_SYSERR; 1535 return GNUNET_SYSERR;
@@ -1587,16 +1546,24 @@ state_modify_begin (void *cls,
1587 * @return #GNUNET_OK on success, else #GNUNET_SYSERR 1546 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1588 */ 1547 */
1589static int 1548static int
1590state_modify_set (void *cls, 1549state_modify_op (void *cls,
1591 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1550 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1592 const char *name, const void *value, size_t value_size) 1551 enum GNUNET_ENV_Operator op,
1552 const char *name, const void *value, size_t value_size)
1593{ 1553{
1594 struct Plugin *plugin = cls; 1554 struct Plugin *plugin = cls;
1595 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction); 1555 GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
1596 1556
1597 return state_set (plugin, plugin->insert_state_current, channel_key, 1557 switch (op)
1598 name, value, value_size); 1558 {
1559 case GNUNET_ENV_OP_ASSIGN:
1560 return state_assign (plugin, plugin->insert_state_current, channel_key,
1561 name, value, value_size);
1599 1562
1563 /// @todo implement more state operations
1564 default:
1565 return GNUNET_SYSERR;
1566 }
1600} 1567}
1601 1568
1602 1569
@@ -1634,20 +1601,20 @@ state_sync_begin (void *cls,
1634 1601
1635 1602
1636/** 1603/**
1637 * Set the current value of state variable. 1604 * Assign current value of a state variable.
1638 * 1605 *
1639 * @see GNUNET_PSYCSTORE_state_modify() 1606 * @see GNUNET_PSYCSTORE_state_modify()
1640 * 1607 *
1641 * @return #GNUNET_OK on success, else #GNUNET_SYSERR 1608 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1642 */ 1609 */
1643static int 1610static int
1644state_sync_set (void *cls, 1611state_sync_assign (void *cls,
1645 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1612 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1646 const char *name, const void *value, size_t value_size) 1613 const char *name, const void *value, size_t value_size)
1647{ 1614{
1648 struct Plugin *plugin = cls; 1615 struct Plugin *plugin = cls;
1649 return state_set (cls, plugin->insert_state_sync, channel_key, 1616 return state_assign (cls, plugin->insert_state_sync, channel_key,
1650 name, value, value_size); 1617 name, value, value_size);
1651} 1618}
1652 1619
1653 1620
@@ -1657,7 +1624,8 @@ state_sync_set (void *cls,
1657static int 1624static int
1658state_sync_end (void *cls, 1625state_sync_end (void *cls,
1659 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1626 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1660 uint64_t message_id) 1627 uint64_t max_state_message_id,
1628 uint64_t state_hash_message_id)
1661{ 1629{
1662 struct Plugin *plugin = cls; 1630 struct Plugin *plugin = cls;
1663 int ret = GNUNET_SYSERR; 1631 int ret = GNUNET_SYSERR;
@@ -1670,7 +1638,10 @@ state_sync_end (void *cls,
1670 channel_key) 1638 channel_key)
1671 && GNUNET_OK == update_message_id (plugin, 1639 && GNUNET_OK == update_message_id (plugin,
1672 plugin->update_state_hash_message_id, 1640 plugin->update_state_hash_message_id,
1673 channel_key, message_id) 1641 channel_key, state_hash_message_id)
1642 && GNUNET_OK == update_message_id (plugin,
1643 plugin->update_max_state_message_id,
1644 channel_key, max_state_message_id)
1674 && GNUNET_OK == transaction_commit (plugin) 1645 && GNUNET_OK == transaction_commit (plugin)
1675 ? ret = GNUNET_OK 1646 ? ret = GNUNET_OK
1676 : transaction_rollback (plugin); 1647 : transaction_rollback (plugin);
@@ -1679,7 +1650,7 @@ state_sync_end (void *cls,
1679 1650
1680 1651
1681/** 1652/**
1682 * Reset the state of a channel. 1653 * Delete the whole state.
1683 * 1654 *
1684 * @see GNUNET_PSYCSTORE_state_reset() 1655 * @see GNUNET_PSYCSTORE_state_reset()
1685 * 1656 *
@@ -1922,10 +1893,10 @@ libgnunet_plugin_psycstore_sqlite_init (void *cls)
1922 api->counters_message_get = &counters_message_get; 1893 api->counters_message_get = &counters_message_get;
1923 api->counters_state_get = &counters_state_get; 1894 api->counters_state_get = &counters_state_get;
1924 api->state_modify_begin = &state_modify_begin; 1895 api->state_modify_begin = &state_modify_begin;
1925 api->state_modify_set = &state_modify_set; 1896 api->state_modify_op = &state_modify_op;
1926 api->state_modify_end = &state_modify_end; 1897 api->state_modify_end = &state_modify_end;
1927 api->state_sync_begin = &state_sync_begin; 1898 api->state_sync_begin = &state_sync_begin;
1928 api->state_sync_set = &state_sync_set; 1899 api->state_sync_assign = &state_sync_assign;
1929 api->state_sync_end = &state_sync_end; 1900 api->state_sync_end = &state_sync_end;
1930 api->state_reset = &state_reset; 1901 api->state_reset = &state_reset;
1931 api->state_update_signed = &state_update_signed; 1902 api->state_update_signed = &state_update_signed;
diff --git a/src/psycstore/psycstore.h b/src/psycstore/psycstore.h
index 807c3c3da..67104e8ad 100644
--- a/src/psycstore/psycstore.h
+++ b/src/psycstore/psycstore.h
@@ -441,35 +441,24 @@ struct StateModifyRequest
441 struct GNUNET_MessageHeader header; 441 struct GNUNET_MessageHeader header;
442 442
443 /** 443 /**
444 * Size of name, including NUL terminator. 444 * Operation ID.
445 */
446 uint16_t name_size GNUNET_PACKED;
447
448 /**
449 * OR'd StateOpFlags
450 */ 445 */
451 uint8_t flags; 446 uint64_t op_id GNUNET_PACKED;
452 447
453 /** 448 /**
454 * enum GNUNET_ENV_Operator 449 * ID of the message to apply the state changes in.
455 */ 450 */
456 uint8_t oper; 451 uint64_t message_id GNUNET_PACKED;
457 452
458 /** 453 /**
459 * Operation ID. 454 * State delta of the message with ID @a message_id.
460 */ 455 */
461 uint64_t op_id GNUNET_PACKED; 456 uint64_t state_delta GNUNET_PACKED;
462 457
463 /** 458 /**
464 * Channel's public key. 459 * Channel's public key.
465 */ 460 */
466 struct GNUNET_CRYPTO_EddsaPublicKey channel_key; 461 struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
467
468 uint64_t message_id GNUNET_PACKED;
469
470 uint64_t state_delta GNUNET_PACKED;
471
472 /* Followed by NUL-terminated name, then the value. */
473}; 462};
474 463
475 464
@@ -495,14 +484,22 @@ struct StateSyncRequest
495 484
496 uint8_t reserved; 485 uint8_t reserved;
497 486
498 uint64_t message_id GNUNET_PACKED;
499
500 /** 487 /**
501 * Operation ID. 488 * Operation ID.
502 */ 489 */
503 uint64_t op_id GNUNET_PACKED; 490 uint64_t op_id GNUNET_PACKED;
504 491
505 /** 492 /**
493 * ID of the message that contains the state_hash PSYC header variable.
494 */
495 uint64_t state_hash_message_id GNUNET_PACKED;
496
497 /**
498 * ID of the last stateful message before @a state_hash_message_id.
499 */
500 uint64_t max_state_message_id GNUNET_PACKED;
501
502 /**
506 * Channel's public key. 503 * Channel's public key.
507 */ 504 */
508 struct GNUNET_CRYPTO_EddsaPublicKey channel_key; 505 struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c
index f5210ac76..214d8ba5d 100644
--- a/src/psycstore/psycstore_api.c
+++ b/src/psycstore/psycstore_api.c
@@ -302,16 +302,9 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
302 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); 302 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
303 if (NULL != op->res_cb) 303 if (NULL != op->res_cb)
304 { 304 {
305 const struct StateModifyRequest *smreq;
306 const struct StateSyncRequest *ssreq; 305 const struct StateSyncRequest *ssreq;
307 switch (ntohs (op->msg->type)) 306 switch (ntohs (op->msg->type))
308 { 307 {
309 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
310 smreq = (const struct StateModifyRequest *) op->msg;
311 if (!(smreq->flags & STATE_OP_LAST
312 || GNUNET_OK != result_code))
313 op->res_cb = NULL;
314 break;
315 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC: 308 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
316 ssreq = (const struct StateSyncRequest *) op->msg; 309 ssreq = (const struct StateSyncRequest *) op->msg;
317 if (!(ssreq->flags & STATE_OP_LAST 310 if (!(ssreq->flags & STATE_OP_LAST
@@ -1234,10 +1227,6 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
1234 * ID of the message that contains the @a modifiers. 1227 * ID of the message that contains the @a modifiers.
1235 * @param state_delta 1228 * @param state_delta
1236 * Value of the _state_delta PSYC header variable of the message. 1229 * Value of the _state_delta PSYC header variable of the message.
1237 * @param modifier_count
1238 * Number of elements in the @a modifiers array.
1239 * @param modifiers
1240 * List of modifiers to apply.
1241 * @param rcb 1230 * @param rcb
1242 * Callback to call with the result of the operation. 1231 * Callback to call with the result of the operation.
1243 * @param rcb_cls 1232 * @param rcb_cls
@@ -1250,50 +1239,31 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1250 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1239 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1251 uint64_t message_id, 1240 uint64_t message_id,
1252 uint64_t state_delta, 1241 uint64_t state_delta,
1253 size_t modifier_count,
1254 const struct GNUNET_ENV_Modifier *modifiers,
1255 GNUNET_PSYCSTORE_ResultCallback rcb, 1242 GNUNET_PSYCSTORE_ResultCallback rcb,
1256 void *rcb_cls) 1243 void *rcb_cls)
1257{ 1244{
1258 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; 1245 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1259 size_t i; 1246 struct StateModifyRequest *req;
1260 1247
1261 for (i = 0; i < modifier_count; i++) { 1248 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1262 struct StateModifyRequest *req; 1249 op->h = h;
1263 uint16_t name_size = strlen (modifiers[i].name) + 1; 1250 op->res_cb = rcb;
1264 1251 op->cls = rcb_cls;
1265 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1266 modifiers[i].value_size);
1267 op->h = h;
1268 op->res_cb = rcb;
1269 op->cls = rcb_cls;
1270 1252
1271 req = (struct StateModifyRequest *) &op[1]; 1253 req = (struct StateModifyRequest *) &op[1];
1272 op->msg = (struct GNUNET_MessageHeader *) req; 1254 op->msg = (struct GNUNET_MessageHeader *) req;
1273 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY); 1255 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1274 req->header.size = htons (sizeof (*req) + name_size 1256 req->header.size = htons (sizeof (*req));
1275 + modifiers[i].value_size); 1257 req->channel_key = *channel_key;
1276 req->channel_key = *channel_key; 1258 req->message_id = GNUNET_htonll (message_id);
1277 req->message_id = GNUNET_htonll (message_id); 1259 req->state_delta = GNUNET_htonll (state_delta);
1278 req->state_delta = GNUNET_htonll (state_delta);
1279 req->oper = modifiers[i].oper;
1280 req->name_size = htons (name_size);
1281 req->flags
1282 = 0 == i
1283 ? STATE_OP_FIRST
1284 : modifier_count - 1 == i
1285 ? STATE_OP_LAST
1286 : 0;
1287 1260
1288 memcpy (&req[1], modifiers[i].name, name_size); 1261 op->op_id = get_next_op_id (h);
1289 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); 1262 req->op_id = GNUNET_htonll (op->op_id);
1290 1263
1291 op->op_id = get_next_op_id (h); 1264 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1292 req->op_id = GNUNET_htonll (op->op_id); 1265 transmit_next (h);
1293 1266
1294 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1295 transmit_next (h);
1296 }
1297 return op; 1267 return op;
1298 /* FIXME: only the last operation is returned, 1268 /* FIXME: only the last operation is returned,
1299 * operation_cancel() should be able to cancel all of them. 1269 * operation_cancel() should be able to cancel all of them.
@@ -1308,7 +1278,9 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1308 * Handle for the PSYCstore. 1278 * Handle for the PSYCstore.
1309 * @param channel_key 1279 * @param channel_key
1310 * The channel we are interested in. 1280 * The channel we are interested in.
1311 * @param message_id 1281 * @param max_state_message_id
1282 * ID of the last stateful message before @a state_hash_message_id.
1283 * @param state_hash_message_id
1312 * ID of the message that contains the state_hash PSYC header variable. 1284 * ID of the message that contains the state_hash PSYC header variable.
1313 * @param modifier_count 1285 * @param modifier_count
1314 * Number of elements in the @a modifiers array. 1286 * Number of elements in the @a modifiers array.
@@ -1324,7 +1296,8 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1324struct GNUNET_PSYCSTORE_OperationHandle * 1296struct GNUNET_PSYCSTORE_OperationHandle *
1325GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, 1297GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1326 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1298 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1327 uint64_t message_id, 1299 uint64_t max_state_message_id,
1300 uint64_t state_hash_message_id,
1328 size_t modifier_count, 1301 size_t modifier_count,
1329 const struct GNUNET_ENV_Modifier *modifiers, 1302 const struct GNUNET_ENV_Modifier *modifiers,
1330 GNUNET_PSYCSTORE_ResultCallback rcb, 1303 GNUNET_PSYCSTORE_ResultCallback rcb,
@@ -1349,7 +1322,8 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1349 req->header.size = htons (sizeof (*req) + name_size 1322 req->header.size = htons (sizeof (*req) + name_size
1350 + modifiers[i].value_size); 1323 + modifiers[i].value_size);
1351 req->channel_key = *channel_key; 1324 req->channel_key = *channel_key;
1352 req->message_id = GNUNET_htonll (message_id); 1325 req->max_state_message_id = GNUNET_htonll (max_state_message_id);
1326 req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
1353 req->name_size = htons (name_size); 1327 req->name_size = htons (name_size);
1354 req->flags 1328 req->flags
1355 = (0 == i) 1329 = (0 == i)
diff --git a/src/psycstore/test_plugin_psycstore.c b/src/psycstore/test_plugin_psycstore.c
index 9e4def7ea..0a7824929 100644
--- a/src/psycstore/test_plugin_psycstore.c
+++ b/src/psycstore/test_plugin_psycstore.c
@@ -85,7 +85,7 @@ load_plugin (const struct GNUNET_CONFIGURATION_Handle *cfg)
85 struct GNUNET_PSYCSTORE_PluginFunctions *ret; 85 struct GNUNET_PSYCSTORE_PluginFunctions *ret;
86 char *libname; 86 char *libname;
87 87
88 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' psycstore plugin\n"), 88 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Loading `%s' psycstore plugin\n"),
89 plugin_name); 89 plugin_name);
90 GNUNET_asprintf (&libname, "libgnunet_plugin_psycstore_%s", plugin_name); 90 GNUNET_asprintf (&libname, "libgnunet_plugin_psycstore_%s", plugin_name);
91 if (NULL == (ret = GNUNET_PLUGIN_load (libname, (void*) cfg))) 91 if (NULL == (ret = GNUNET_PLUGIN_load (libname, (void*) cfg)))
@@ -306,15 +306,17 @@ run (void *cls, char *const *args, const char *cfgfile,
306 306
307 message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 1; 307 message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 1;
308 GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key, 308 GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key,
309 message_id, 1)); 309 message_id, 0));
310 310
311 GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key, 311 GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
312 "_foo", 312 GNUNET_ENV_OP_ASSIGN,
313 C2ARG("one two three"))); 313 "_foo",
314 C2ARG("one two three")));
314 315
315 GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key, 316 GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
316 "_foo_bar", slave_key, 317 GNUNET_ENV_OP_ASSIGN,
317 sizeof (*slave_key))); 318 "_foo_bar", slave_key,
319 sizeof (*slave_key)));
318 320
319 GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key, 321 GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
320 message_id)); 322 message_id));
@@ -366,15 +368,16 @@ run (void *cls, char *const *args, const char *cfgfile,
366 368
367 GNUNET_assert (GNUNET_OK == db->state_sync_begin (db->cls, &channel_pub_key)); 369 GNUNET_assert (GNUNET_OK == db->state_sync_begin (db->cls, &channel_pub_key));
368 370
369 GNUNET_assert (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key, 371 GNUNET_assert (GNUNET_OK == db->state_sync_assign (db->cls, &channel_pub_key,
370 "_sync_bar", scls.value[0], 372 "_sync_bar", scls.value[0],
371 scls.value_size[0])); 373 scls.value_size[0]));
372 374
373 GNUNET_assert (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key, 375 GNUNET_assert (GNUNET_OK == db->state_sync_assign (db->cls, &channel_pub_key,
374 "_sync_foo", scls.value[1], 376 "_sync_foo", scls.value[1],
375 scls.value_size[1])); 377 scls.value_size[1]));
376 378
377 GNUNET_assert (GNUNET_OK == db->state_sync_end (db->cls, &channel_pub_key, 379 GNUNET_assert (GNUNET_OK == db->state_sync_end (db->cls, &channel_pub_key,
380 max_state_msg_id,
378 INT64_MAX - 5)); 381 INT64_MAX - 5));
379 382
380 GNUNET_assert (GNUNET_NO == db->state_get_prefix (db->cls, &channel_pub_key, 383 GNUNET_assert (GNUNET_NO == db->state_get_prefix (db->cls, &channel_pub_key,
@@ -394,11 +397,13 @@ run (void *cls, char *const *args, const char *cfgfile,
394 397
395 message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 6; 398 message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 6;
396 GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key, 399 GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key,
397 message_id, 3)); 400 message_id,
401 message_id - max_state_msg_id));
398 402
399 GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key, 403 GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
400 "_sync_foo", 404 GNUNET_ENV_OP_ASSIGN,
401 C2ARG("five six seven"))); 405 "_sync_foo",
406 C2ARG("five six seven")));
402 407
403 GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key, 408 GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
404 message_id)); 409 message_id));
diff --git a/src/psycstore/test_psycstore.c b/src/psycstore/test_psycstore.c
index c20868cbc..c869a862f 100644
--- a/src/psycstore/test_psycstore.c
+++ b/src/psycstore/test_psycstore.c
@@ -224,8 +224,8 @@ state_get_result (void *cls, int64_t result,
224 scls.value_size[0] = sizeof ("ten eleven twelve") - 1; 224 scls.value_size[0] = sizeof ("ten eleven twelve") - 1;
225 225
226 scls.name[1] = "_sync_foo"; 226 scls.name[1] = "_sync_foo";
227 scls.value[1] = "one two three"; 227 scls.value[1] = "three two one";
228 scls.value_size[1] = sizeof ("one two three") - 1; 228 scls.value_size[1] = sizeof ("three two one") - 1;
229 229
230 op = GNUNET_PSYCSTORE_state_get_prefix (h, &channel_pub_key, "_sync", 230 op = GNUNET_PSYCSTORE_state_get_prefix (h, &channel_pub_key, "_sync",
231 &state_result, 231 &state_result,
@@ -253,11 +253,11 @@ counters_result (void *cls, int status, uint64_t max_fragment_id,
253 GNUNET_assert (result == 1); 253 GNUNET_assert (result == 1);
254 254
255 scls.n = 0; 255 scls.n = 0;
256 scls.name[0] = "_bar"; 256 scls.name[0] = "_sync_bar";
257 scls.value[0] = "four five six"; 257 scls.value[0] = "ten eleven twelve";
258 scls.value_size[0] = sizeof ("four five six") - 1; 258 scls.value_size[0] = sizeof ("ten eleven twelve") - 1;
259 259
260 op = GNUNET_PSYCSTORE_state_get (h, &channel_pub_key, "_bar_x_yy_zzz", 260 op = GNUNET_PSYCSTORE_state_get (h, &channel_pub_key, "_sync_bar_x_yy_zzz",
261 &state_result, &state_get_result, &scls); 261 &state_result, &state_get_result, &scls);
262} 262}
263 263
@@ -284,22 +284,9 @@ state_sync_result (void *cls, int64_t result,
284 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_sync_result:\t%d\n", result); 284 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_sync_result:\t%d\n", result);
285 GNUNET_assert (GNUNET_OK == result); 285 GNUNET_assert (GNUNET_OK == result);
286 286
287 modifiers[0] = (struct GNUNET_ENV_Modifier) {
288 .oper = '=',
289 .name = "_sync_foo",
290 .value = "one two three",
291 .value_size = sizeof ("one two three") - 1
292 };
293 modifiers[1] = (struct GNUNET_ENV_Modifier) {
294 .oper = '=',
295 .name = "_bar",
296 .value = "four five six",
297 .value_size = sizeof ("four five six") - 1
298 };
299
300 op = GNUNET_PSYCSTORE_state_modify (h, &channel_pub_key, 287 op = GNUNET_PSYCSTORE_state_modify (h, &channel_pub_key,
301 GNUNET_ntohll (fcls->msg[0]->message_id), 0, 288 GNUNET_ntohll (fcls->msg[0]->message_id),
302 2, modifiers, state_modify_result, fcls); 289 0, state_modify_result, fcls);
303} 290}
304 291
305 292
@@ -356,6 +343,7 @@ message_get_latest_result (void *cls, int64_t result,
356 343
357 op = GNUNET_PSYCSTORE_state_sync (h, &channel_pub_key, 344 op = GNUNET_PSYCSTORE_state_sync (h, &channel_pub_key,
358 GNUNET_ntohll (fcls->msg[0]->message_id) + 1, 345 GNUNET_ntohll (fcls->msg[0]->message_id) + 1,
346 GNUNET_ntohll (fcls->msg[0]->message_id) + 2,
359 2, modifiers, state_sync_result, fcls); 347 2, modifiers, state_sync_result, fcls);
360} 348}
361 349