aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-08-28 13:33:43 +0000
committerGabor X Toth <*@tg-x.net>2015-08-28 13:33:43 +0000
commit38963d1e81332032e0ac774f4f2c6b804c38802a (patch)
treece33b979e47fe332c7c744744d60077a7e1fefee /src/psycstore
parentb4fa14499c64140273850569247abda687803053 (diff)
downloadgnunet-38963d1e81332032e0ac774f4f2c6b804c38802a.tar.gz
gnunet-38963d1e81332032e0ac774f4f2c6b804c38802a.zip
psyc/social: get state from psycstore
Diffstat (limited to 'src/psycstore')
-rw-r--r--src/psycstore/gnunet-service-psycstore.c78
-rw-r--r--src/psycstore/plugin_psycstore_sqlite.c38
-rw-r--r--src/psycstore/psyc_util_lib.c40
-rw-r--r--src/psycstore/test_plugin_psycstore.c2
-rw-r--r--src/psycstore/test_psycstore.c2
5 files changed, 117 insertions, 43 deletions
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c
index 6e40e7849..1f9de54f8 100644
--- a/src/psycstore/gnunet-service-psycstore.c
+++ b/src/psycstore/gnunet-service-psycstore.c
@@ -217,7 +217,7 @@ send_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg,
217 217
218static int 218static int
219send_state_var (void *cls, const char *name, 219send_state_var (void *cls, const char *name,
220 const void *value, size_t value_size) 220 const void *value, uint32_t value_size)
221{ 221{
222 struct SendClosure *sc = cls; 222 struct SendClosure *sc = cls;
223 struct StateResult *res; 223 struct StateResult *res;
@@ -496,14 +496,14 @@ handle_counters_get (void *cls,
496 496
497struct StateModifyClosure 497struct StateModifyClosure
498{ 498{
499 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key; 499 const struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
500 struct GNUNET_PSYC_ReceiveHandle *recv; 500 struct GNUNET_PSYC_ReceiveHandle *recv;
501 enum GNUNET_PSYC_MessageState msg_state; 501 enum GNUNET_PSYC_MessageState msg_state;
502 char mod_oper; 502 char mod_oper;
503 char *mod_name; 503 char *mod_name;
504 char *mod_value; 504 char *mod_value;
505 uint64_t mod_value_size; 505 uint32_t mod_value_size;
506 uint64_t mod_value_remaining; 506 uint32_t mod_value_remaining;
507}; 507};
508 508
509 509
@@ -513,6 +513,12 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset,
513{ 513{
514 struct StateModifyClosure *scls = cls; 514 struct StateModifyClosure *scls = cls;
515 uint16_t psize; 515 uint16_t psize;
516
517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518 "recv_state_message_part() message_id: %" PRIu64
519 ", data_offset: %" PRIu64 ", flags: %u\n",
520 message_id, data_offset, flags);
521
516 if (NULL == msg) 522 if (NULL == msg)
517 { 523 {
518 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; 524 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
@@ -533,7 +539,7 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset,
533 pmod = (struct GNUNET_PSYC_MessageModifier *) msg; 539 pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
534 psize = ntohs (pmod->header.size); 540 psize = ntohs (pmod->header.size);
535 uint16_t name_size = ntohs (pmod->name_size); 541 uint16_t name_size = ntohs (pmod->name_size);
536 uint16_t value_size = ntohs (pmod->value_size); 542 uint32_t value_size = ntohl (pmod->value_size);
537 543
538 const char *name = (const char *) &pmod[1]; 544 const char *name = (const char *) &pmod[1];
539 const void *value = name + name_size; 545 const void *value = name + name_size;
@@ -542,7 +548,7 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset,
542 { // Apply non-transient operation. 548 { // Apply non-transient operation.
543 if (psize == sizeof (*pmod) + name_size + value_size) 549 if (psize == sizeof (*pmod) + name_size + value_size)
544 { 550 {
545 db->state_modify_op (db->cls, scls->channel_key, 551 db->state_modify_op (db->cls, &scls->channel_key,
546 pmod->oper, name, value, value_size); 552 pmod->oper, name, value, value_size);
547 } 553 }
548 else 554 else
@@ -576,7 +582,7 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset,
576 scls->mod_value_remaining -= psize - sizeof (*msg); 582 scls->mod_value_remaining -= psize - sizeof (*msg);
577 if (0 == scls->mod_value_remaining) 583 if (0 == scls->mod_value_remaining)
578 { 584 {
579 db->state_modify_op (db->cls, scls->channel_key, 585 db->state_modify_op (db->cls, &scls->channel_key,
580 scls->mod_oper, scls->mod_name, 586 scls->mod_oper, scls->mod_name,
581 scls->mod_value, scls->mod_value_size); 587 scls->mod_value, scls->mod_value_size);
582 GNUNET_free (scls->mod_name); 588 GNUNET_free (scls->mod_name);
@@ -616,9 +622,13 @@ recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg,
616 scls); 622 scls);
617 } 623 }
618 624
619 const struct GNUNET_PSYC_MessageHeader * 625 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
620 pmsg = (const struct GNUNET_PSYC_MessageHeader *) &msg[1]; 626 "recv_state_fragment: %" PRIu64 "\n", GNUNET_ntohll (msg->fragment_id));
627
628 struct GNUNET_PSYC_MessageHeader *
629 pmsg = GNUNET_PSYC_message_header_create (msg, flags);
621 GNUNET_PSYC_receive_message (scls->recv, pmsg); 630 GNUNET_PSYC_receive_message (scls->recv, pmsg);
631 GNUNET_free (pmsg);
622 632
623 return GNUNET_YES; 633 return GNUNET_YES;
624} 634}
@@ -635,31 +645,41 @@ handle_state_modify (void *cls,
635 uint64_t message_id = GNUNET_ntohll (req->message_id); 645 uint64_t message_id = GNUNET_ntohll (req->message_id);
636 uint64_t state_delta = GNUNET_ntohll (req->state_delta); 646 uint64_t state_delta = GNUNET_ntohll (req->state_delta);
637 uint64_t ret_frags = 0; 647 uint64_t ret_frags = 0;
648 struct StateModifyClosure
649 scls = { .channel_key = req->channel_key };
638 650
639 struct StateModifyClosure scls = { 0 }; 651 int ret = db->state_modify_begin (db->cls, &req->channel_key,
640 652 message_id, state_delta);
641 if (GNUNET_OK != db->state_modify_begin (db->cls, &req->channel_key,
642 message_id, state_delta))
643 {
644 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
645 _("Failed to begin modifying state!\n"));
646 GNUNET_break (0);
647 }
648
649 int ret = db->message_get (db->cls, &req->channel_key,
650 message_id, message_id,
651 &ret_frags, &recv_state_fragment, &scls);
652 653
653 if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id)) 654 if (GNUNET_OK != ret)
654 { 655 {
655 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 656 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
656 _("Failed to end modifying state!\n")); 657 _("Failed to begin modifying state: %d\n"), ret);
657 GNUNET_break (0);
658 } 658 }
659 659 else
660 if (NULL != scls.recv)
661 { 660 {
662 GNUNET_PSYC_receive_destroy (scls.recv); 661 ret = db->message_get (db->cls, &req->channel_key,
662 message_id, message_id,
663 &ret_frags, &recv_state_fragment, &scls);
664 if (GNUNET_OK != ret)
665 {
666 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
667 _("Failed to modify state: %d\n"), ret);
668 GNUNET_break (0);
669 }
670 else
671 {
672 if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id))
673 {
674 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
675 _("Failed to end modifying state!\n"));
676 GNUNET_break (0);
677 }
678 }
679 if (NULL != scls.recv)
680 {
681 GNUNET_PSYC_receive_destroy (scls.recv);
682 }
663 } 683 }
664 684
665 send_result_code (client, req->op_id, ret, NULL); 685 send_result_code (client, req->op_id, ret, NULL);
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c
index 1abc479d2..1bf14644b 100644
--- a/src/psycstore/plugin_psycstore_sqlite.c
+++ b/src/psycstore/plugin_psycstore_sqlite.c
@@ -64,7 +64,8 @@
64 64
65enum Transactions { 65enum Transactions {
66 TRANSACTION_NONE = 0, 66 TRANSACTION_NONE = 0,
67 TRANSACTION_STATE_MODIFY 67 TRANSACTION_STATE_MODIFY,
68 TRANSACTION_STATE_SYNC,
68}; 69};
69 70
70/** 71/**
@@ -1522,18 +1523,27 @@ state_modify_begin (void *cls,
1522 1523
1523 uint64_t max_state_message_id = 0; 1524 uint64_t max_state_message_id = 0;
1524 int ret = counters_state_get (plugin, channel_key, &max_state_message_id); 1525 int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
1525 if (GNUNET_OK != ret) 1526 switch (ret)
1527 {
1528 case GNUNET_OK:
1529 case GNUNET_NO: // no state yet
1530 ret = GNUNET_OK;
1531 break;
1532 default:
1526 return ret; 1533 return ret;
1534 }
1527 1535
1528 if (message_id - state_delta != max_state_message_id) 1536 if (max_state_message_id < message_id - state_delta)
1529 return GNUNET_NO; 1537 return GNUNET_NO; /* some stateful messages not yet applied */
1538 else if (message_id - state_delta < max_state_message_id)
1539 return GNUNET_NO; /* changes already applied */
1530 } 1540 }
1531 1541
1532 // Make sure no other transaction is going on.
1533 if (TRANSACTION_NONE != plugin->transaction) 1542 if (TRANSACTION_NONE != plugin->transaction)
1534 if (GNUNET_OK != transaction_rollback (plugin)) 1543 {
1535 return GNUNET_SYSERR; 1544 /** @todo FIXME: wait for other transaction to finish */
1536 1545 return GNUNET_SYSERR;
1546 }
1537 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY); 1547 return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
1538} 1548}
1539 1549
@@ -1560,8 +1570,8 @@ state_modify_op (void *cls,
1560 return state_assign (plugin, plugin->insert_state_current, channel_key, 1570 return state_assign (plugin, plugin->insert_state_current, channel_key,
1561 name, value, value_size); 1571 name, value, value_size);
1562 1572
1563 /// @todo implement more state operations 1573 default: /** @todo implement more state operations */
1564 default: 1574 GNUNET_break (0);
1565 return GNUNET_SYSERR; 1575 return GNUNET_SYSERR;
1566 } 1576 }
1567} 1577}
@@ -1630,7 +1640,13 @@ state_sync_end (void *cls,
1630 struct Plugin *plugin = cls; 1640 struct Plugin *plugin = cls;
1631 int ret = GNUNET_SYSERR; 1641 int ret = GNUNET_SYSERR;
1632 1642
1633 GNUNET_OK == transaction_begin (plugin, TRANSACTION_NONE) 1643 if (TRANSACTION_NONE != plugin->transaction)
1644 {
1645 /** @todo FIXME: wait for other transaction to finish */
1646 return GNUNET_SYSERR;
1647 }
1648
1649 GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC)
1634 && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key) 1650 && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
1635 && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync, 1651 && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
1636 channel_key) 1652 channel_key)
diff --git a/src/psycstore/psyc_util_lib.c b/src/psycstore/psyc_util_lib.c
index 75f97aad0..f6dd4e593 100644
--- a/src/psycstore/psyc_util_lib.c
+++ b/src/psycstore/psyc_util_lib.c
@@ -570,7 +570,7 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
570{ 570{
571 struct GNUNET_PSYC_TransmitHandle *tmit = cls; 571 struct GNUNET_PSYC_TransmitHandle *tmit = cls;
572 uint16_t name_size = 0; 572 uint16_t name_size = 0;
573 size_t value_size = 0; 573 uint32_t value_size = 0;
574 const char *value = NULL; 574 const char *value = NULL;
575 575
576 if (NULL != oper) 576 if (NULL != oper)
@@ -1231,3 +1231,41 @@ GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_Message *msg,
1231 ? GNUNET_OK 1231 ? GNUNET_OK
1232 : GNUNET_SYSERR; 1232 : GNUNET_SYSERR;
1233} 1233}
1234
1235
1236/**
1237 * Initialize PSYC message header.
1238 */
1239void
1240GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1241 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1242 uint32_t flags)
1243{
1244 uint16_t size = ntohs (mmsg->header.size);
1245 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1246
1247 pmsg->header.size = htons (psize);
1248 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1249 pmsg->message_id = mmsg->message_id;
1250 pmsg->fragment_offset = mmsg->fragment_offset;
1251 pmsg->flags = htonl (flags);
1252
1253 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1254}
1255
1256
1257/**
1258 * Create a new PSYC message header from a multicast message for sending it to clients.
1259 */
1260struct GNUNET_PSYC_MessageHeader *
1261GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1262 uint32_t flags)
1263{
1264 struct GNUNET_PSYC_MessageHeader *pmsg;
1265 uint16_t size = ntohs (mmsg->header.size);
1266 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1267
1268 pmsg = GNUNET_malloc (psize);
1269 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1270 return pmsg;
1271}
diff --git a/src/psycstore/test_plugin_psycstore.c b/src/psycstore/test_plugin_psycstore.c
index 0a7824929..5a5f970b9 100644
--- a/src/psycstore/test_plugin_psycstore.c
+++ b/src/psycstore/test_plugin_psycstore.c
@@ -141,7 +141,7 @@ struct StateClosure {
141}; 141};
142 142
143static int 143static int
144state_cb (void *cls, const char *name, const void *value, size_t value_size) 144state_cb (void *cls, const char *name, const void *value, uint32_t value_size)
145{ 145{
146 struct StateClosure *scls = cls; 146 struct StateClosure *scls = cls;
147 const void *val = scls->value[scls->n]; 147 const void *val = scls->value[scls->n];
diff --git a/src/psycstore/test_psycstore.c b/src/psycstore/test_psycstore.c
index c869a862f..58e6243b7 100644
--- a/src/psycstore/test_psycstore.c
+++ b/src/psycstore/test_psycstore.c
@@ -170,7 +170,7 @@ state_reset_result (void *cls, int64_t result,
170 170
171 171
172static int 172static int
173state_result (void *cls, const char *name, const void *value, size_t value_size) 173state_result (void *cls, const char *name, const void *value, uint32_t value_size)
174{ 174{
175 struct StateClosure *scls = cls; 175 struct StateClosure *scls = cls;
176 const char *nam = scls->name[scls->n]; 176 const char *nam = scls->name[scls->n];