diff options
author | Gabor X Toth <*@tg-x.net> | 2015-08-28 13:33:43 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2015-08-28 13:33:43 +0000 |
commit | 38963d1e81332032e0ac774f4f2c6b804c38802a (patch) | |
tree | ce33b979e47fe332c7c744744d60077a7e1fefee /src/psycstore | |
parent | b4fa14499c64140273850569247abda687803053 (diff) | |
download | gnunet-38963d1e81332032e0ac774f4f2c6b804c38802a.tar.gz gnunet-38963d1e81332032e0ac774f4f2c6b804c38802a.zip |
psyc/social: get state from psycstore
Diffstat (limited to 'src/psycstore')
-rw-r--r-- | src/psycstore/gnunet-service-psycstore.c | 78 | ||||
-rw-r--r-- | src/psycstore/plugin_psycstore_sqlite.c | 38 | ||||
-rw-r--r-- | src/psycstore/psyc_util_lib.c | 40 | ||||
-rw-r--r-- | src/psycstore/test_plugin_psycstore.c | 2 | ||||
-rw-r--r-- | src/psycstore/test_psycstore.c | 2 |
5 files changed, 117 insertions, 43 deletions
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c index 6e40e7849..1f9de54f8 100644 --- a/src/psycstore/gnunet-service-psycstore.c +++ b/src/psycstore/gnunet-service-psycstore.c | |||
@@ -217,7 +217,7 @@ send_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, | |||
217 | 217 | ||
218 | static int | 218 | static int |
219 | send_state_var (void *cls, const char *name, | 219 | send_state_var (void *cls, const char *name, |
220 | const void *value, size_t value_size) | 220 | const void *value, uint32_t value_size) |
221 | { | 221 | { |
222 | struct SendClosure *sc = cls; | 222 | struct SendClosure *sc = cls; |
223 | struct StateResult *res; | 223 | struct StateResult *res; |
@@ -496,14 +496,14 @@ handle_counters_get (void *cls, | |||
496 | 496 | ||
497 | struct StateModifyClosure | 497 | struct StateModifyClosure |
498 | { | 498 | { |
499 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key; | 499 | const struct GNUNET_CRYPTO_EddsaPublicKey channel_key; |
500 | struct GNUNET_PSYC_ReceiveHandle *recv; | 500 | struct GNUNET_PSYC_ReceiveHandle *recv; |
501 | enum GNUNET_PSYC_MessageState msg_state; | 501 | enum GNUNET_PSYC_MessageState msg_state; |
502 | char mod_oper; | 502 | char mod_oper; |
503 | char *mod_name; | 503 | char *mod_name; |
504 | char *mod_value; | 504 | char *mod_value; |
505 | uint64_t mod_value_size; | 505 | uint32_t mod_value_size; |
506 | uint64_t mod_value_remaining; | 506 | uint32_t mod_value_remaining; |
507 | }; | 507 | }; |
508 | 508 | ||
509 | 509 | ||
@@ -513,6 +513,12 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, | |||
513 | { | 513 | { |
514 | struct StateModifyClosure *scls = cls; | 514 | struct StateModifyClosure *scls = cls; |
515 | uint16_t psize; | 515 | uint16_t psize; |
516 | |||
517 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
518 | "recv_state_message_part() message_id: %" PRIu64 | ||
519 | ", data_offset: %" PRIu64 ", flags: %u\n", | ||
520 | message_id, data_offset, flags); | ||
521 | |||
516 | if (NULL == msg) | 522 | if (NULL == msg) |
517 | { | 523 | { |
518 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; | 524 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; |
@@ -533,7 +539,7 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, | |||
533 | pmod = (struct GNUNET_PSYC_MessageModifier *) msg; | 539 | pmod = (struct GNUNET_PSYC_MessageModifier *) msg; |
534 | psize = ntohs (pmod->header.size); | 540 | psize = ntohs (pmod->header.size); |
535 | uint16_t name_size = ntohs (pmod->name_size); | 541 | uint16_t name_size = ntohs (pmod->name_size); |
536 | uint16_t value_size = ntohs (pmod->value_size); | 542 | uint32_t value_size = ntohl (pmod->value_size); |
537 | 543 | ||
538 | const char *name = (const char *) &pmod[1]; | 544 | const char *name = (const char *) &pmod[1]; |
539 | const void *value = name + name_size; | 545 | const void *value = name + name_size; |
@@ -542,7 +548,7 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, | |||
542 | { // Apply non-transient operation. | 548 | { // Apply non-transient operation. |
543 | if (psize == sizeof (*pmod) + name_size + value_size) | 549 | if (psize == sizeof (*pmod) + name_size + value_size) |
544 | { | 550 | { |
545 | db->state_modify_op (db->cls, scls->channel_key, | 551 | db->state_modify_op (db->cls, &scls->channel_key, |
546 | pmod->oper, name, value, value_size); | 552 | pmod->oper, name, value, value_size); |
547 | } | 553 | } |
548 | else | 554 | else |
@@ -576,7 +582,7 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, | |||
576 | scls->mod_value_remaining -= psize - sizeof (*msg); | 582 | scls->mod_value_remaining -= psize - sizeof (*msg); |
577 | if (0 == scls->mod_value_remaining) | 583 | if (0 == scls->mod_value_remaining) |
578 | { | 584 | { |
579 | db->state_modify_op (db->cls, scls->channel_key, | 585 | db->state_modify_op (db->cls, &scls->channel_key, |
580 | scls->mod_oper, scls->mod_name, | 586 | scls->mod_oper, scls->mod_name, |
581 | scls->mod_value, scls->mod_value_size); | 587 | scls->mod_value, scls->mod_value_size); |
582 | GNUNET_free (scls->mod_name); | 588 | GNUNET_free (scls->mod_name); |
@@ -616,9 +622,13 @@ recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, | |||
616 | scls); | 622 | scls); |
617 | } | 623 | } |
618 | 624 | ||
619 | const struct GNUNET_PSYC_MessageHeader * | 625 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
620 | pmsg = (const struct GNUNET_PSYC_MessageHeader *) &msg[1]; | 626 | "recv_state_fragment: %" PRIu64 "\n", GNUNET_ntohll (msg->fragment_id)); |
627 | |||
628 | struct GNUNET_PSYC_MessageHeader * | ||
629 | pmsg = GNUNET_PSYC_message_header_create (msg, flags); | ||
621 | GNUNET_PSYC_receive_message (scls->recv, pmsg); | 630 | GNUNET_PSYC_receive_message (scls->recv, pmsg); |
631 | GNUNET_free (pmsg); | ||
622 | 632 | ||
623 | return GNUNET_YES; | 633 | return GNUNET_YES; |
624 | } | 634 | } |
@@ -635,31 +645,41 @@ handle_state_modify (void *cls, | |||
635 | uint64_t message_id = GNUNET_ntohll (req->message_id); | 645 | uint64_t message_id = GNUNET_ntohll (req->message_id); |
636 | uint64_t state_delta = GNUNET_ntohll (req->state_delta); | 646 | uint64_t state_delta = GNUNET_ntohll (req->state_delta); |
637 | uint64_t ret_frags = 0; | 647 | uint64_t ret_frags = 0; |
648 | struct StateModifyClosure | ||
649 | scls = { .channel_key = req->channel_key }; | ||
638 | 650 | ||
639 | struct StateModifyClosure scls = { 0 }; | 651 | int ret = db->state_modify_begin (db->cls, &req->channel_key, |
640 | 652 | message_id, state_delta); | |
641 | if (GNUNET_OK != db->state_modify_begin (db->cls, &req->channel_key, | ||
642 | message_id, state_delta)) | ||
643 | { | ||
644 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
645 | _("Failed to begin modifying state!\n")); | ||
646 | GNUNET_break (0); | ||
647 | } | ||
648 | |||
649 | int ret = db->message_get (db->cls, &req->channel_key, | ||
650 | message_id, message_id, | ||
651 | &ret_frags, &recv_state_fragment, &scls); | ||
652 | 653 | ||
653 | if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id)) | 654 | if (GNUNET_OK != ret) |
654 | { | 655 | { |
655 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 656 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
656 | _("Failed to end modifying state!\n")); | 657 | _("Failed to begin modifying state: %d\n"), ret); |
657 | GNUNET_break (0); | ||
658 | } | 658 | } |
659 | 659 | else | |
660 | if (NULL != scls.recv) | ||
661 | { | 660 | { |
662 | GNUNET_PSYC_receive_destroy (scls.recv); | 661 | ret = db->message_get (db->cls, &req->channel_key, |
662 | message_id, message_id, | ||
663 | &ret_frags, &recv_state_fragment, &scls); | ||
664 | if (GNUNET_OK != ret) | ||
665 | { | ||
666 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
667 | _("Failed to modify state: %d\n"), ret); | ||
668 | GNUNET_break (0); | ||
669 | } | ||
670 | else | ||
671 | { | ||
672 | if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id)) | ||
673 | { | ||
674 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
675 | _("Failed to end modifying state!\n")); | ||
676 | GNUNET_break (0); | ||
677 | } | ||
678 | } | ||
679 | if (NULL != scls.recv) | ||
680 | { | ||
681 | GNUNET_PSYC_receive_destroy (scls.recv); | ||
682 | } | ||
663 | } | 683 | } |
664 | 684 | ||
665 | send_result_code (client, req->op_id, ret, NULL); | 685 | send_result_code (client, req->op_id, ret, NULL); |
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c index 1abc479d2..1bf14644b 100644 --- a/src/psycstore/plugin_psycstore_sqlite.c +++ b/src/psycstore/plugin_psycstore_sqlite.c | |||
@@ -64,7 +64,8 @@ | |||
64 | 64 | ||
65 | enum Transactions { | 65 | enum Transactions { |
66 | TRANSACTION_NONE = 0, | 66 | TRANSACTION_NONE = 0, |
67 | TRANSACTION_STATE_MODIFY | 67 | TRANSACTION_STATE_MODIFY, |
68 | TRANSACTION_STATE_SYNC, | ||
68 | }; | 69 | }; |
69 | 70 | ||
70 | /** | 71 | /** |
@@ -1522,18 +1523,27 @@ state_modify_begin (void *cls, | |||
1522 | 1523 | ||
1523 | uint64_t max_state_message_id = 0; | 1524 | uint64_t max_state_message_id = 0; |
1524 | int ret = counters_state_get (plugin, channel_key, &max_state_message_id); | 1525 | int ret = counters_state_get (plugin, channel_key, &max_state_message_id); |
1525 | if (GNUNET_OK != ret) | 1526 | switch (ret) |
1527 | { | ||
1528 | case GNUNET_OK: | ||
1529 | case GNUNET_NO: // no state yet | ||
1530 | ret = GNUNET_OK; | ||
1531 | break; | ||
1532 | default: | ||
1526 | return ret; | 1533 | return ret; |
1534 | } | ||
1527 | 1535 | ||
1528 | if (message_id - state_delta != max_state_message_id) | 1536 | if (max_state_message_id < message_id - state_delta) |
1529 | return GNUNET_NO; | 1537 | return GNUNET_NO; /* some stateful messages not yet applied */ |
1538 | else if (message_id - state_delta < max_state_message_id) | ||
1539 | return GNUNET_NO; /* changes already applied */ | ||
1530 | } | 1540 | } |
1531 | 1541 | ||
1532 | // Make sure no other transaction is going on. | ||
1533 | if (TRANSACTION_NONE != plugin->transaction) | 1542 | if (TRANSACTION_NONE != plugin->transaction) |
1534 | if (GNUNET_OK != transaction_rollback (plugin)) | 1543 | { |
1535 | return GNUNET_SYSERR; | 1544 | /** @todo FIXME: wait for other transaction to finish */ |
1536 | 1545 | return GNUNET_SYSERR; | |
1546 | } | ||
1537 | return transaction_begin (plugin, TRANSACTION_STATE_MODIFY); | 1547 | return transaction_begin (plugin, TRANSACTION_STATE_MODIFY); |
1538 | } | 1548 | } |
1539 | 1549 | ||
@@ -1560,8 +1570,8 @@ state_modify_op (void *cls, | |||
1560 | return state_assign (plugin, plugin->insert_state_current, channel_key, | 1570 | return state_assign (plugin, plugin->insert_state_current, channel_key, |
1561 | name, value, value_size); | 1571 | name, value, value_size); |
1562 | 1572 | ||
1563 | /// @todo implement more state operations | 1573 | default: /** @todo implement more state operations */ |
1564 | default: | 1574 | GNUNET_break (0); |
1565 | return GNUNET_SYSERR; | 1575 | return GNUNET_SYSERR; |
1566 | } | 1576 | } |
1567 | } | 1577 | } |
@@ -1630,7 +1640,13 @@ state_sync_end (void *cls, | |||
1630 | struct Plugin *plugin = cls; | 1640 | struct Plugin *plugin = cls; |
1631 | int ret = GNUNET_SYSERR; | 1641 | int ret = GNUNET_SYSERR; |
1632 | 1642 | ||
1633 | GNUNET_OK == transaction_begin (plugin, TRANSACTION_NONE) | 1643 | if (TRANSACTION_NONE != plugin->transaction) |
1644 | { | ||
1645 | /** @todo FIXME: wait for other transaction to finish */ | ||
1646 | return GNUNET_SYSERR; | ||
1647 | } | ||
1648 | |||
1649 | GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC) | ||
1634 | && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key) | 1650 | && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key) |
1635 | && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync, | 1651 | && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync, |
1636 | channel_key) | 1652 | channel_key) |
diff --git a/src/psycstore/psyc_util_lib.c b/src/psycstore/psyc_util_lib.c index 75f97aad0..f6dd4e593 100644 --- a/src/psycstore/psyc_util_lib.c +++ b/src/psycstore/psyc_util_lib.c | |||
@@ -570,7 +570,7 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | |||
570 | { | 570 | { |
571 | struct GNUNET_PSYC_TransmitHandle *tmit = cls; | 571 | struct GNUNET_PSYC_TransmitHandle *tmit = cls; |
572 | uint16_t name_size = 0; | 572 | uint16_t name_size = 0; |
573 | size_t value_size = 0; | 573 | uint32_t value_size = 0; |
574 | const char *value = NULL; | 574 | const char *value = NULL; |
575 | 575 | ||
576 | if (NULL != oper) | 576 | if (NULL != oper) |
@@ -1231,3 +1231,41 @@ GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_Message *msg, | |||
1231 | ? GNUNET_OK | 1231 | ? GNUNET_OK |
1232 | : GNUNET_SYSERR; | 1232 | : GNUNET_SYSERR; |
1233 | } | 1233 | } |
1234 | |||
1235 | |||
1236 | /** | ||
1237 | * Initialize PSYC message header. | ||
1238 | */ | ||
1239 | void | ||
1240 | GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg, | ||
1241 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, | ||
1242 | uint32_t flags) | ||
1243 | { | ||
1244 | uint16_t size = ntohs (mmsg->header.size); | ||
1245 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
1246 | |||
1247 | pmsg->header.size = htons (psize); | ||
1248 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
1249 | pmsg->message_id = mmsg->message_id; | ||
1250 | pmsg->fragment_offset = mmsg->fragment_offset; | ||
1251 | pmsg->flags = htonl (flags); | ||
1252 | |||
1253 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); | ||
1254 | } | ||
1255 | |||
1256 | |||
1257 | /** | ||
1258 | * Create a new PSYC message header from a multicast message for sending it to clients. | ||
1259 | */ | ||
1260 | struct GNUNET_PSYC_MessageHeader * | ||
1261 | GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg, | ||
1262 | uint32_t flags) | ||
1263 | { | ||
1264 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
1265 | uint16_t size = ntohs (mmsg->header.size); | ||
1266 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
1267 | |||
1268 | pmsg = GNUNET_malloc (psize); | ||
1269 | GNUNET_PSYC_message_header_init (pmsg, mmsg, flags); | ||
1270 | return pmsg; | ||
1271 | } | ||
diff --git a/src/psycstore/test_plugin_psycstore.c b/src/psycstore/test_plugin_psycstore.c index 0a7824929..5a5f970b9 100644 --- a/src/psycstore/test_plugin_psycstore.c +++ b/src/psycstore/test_plugin_psycstore.c | |||
@@ -141,7 +141,7 @@ struct StateClosure { | |||
141 | }; | 141 | }; |
142 | 142 | ||
143 | static int | 143 | static int |
144 | state_cb (void *cls, const char *name, const void *value, size_t value_size) | 144 | state_cb (void *cls, const char *name, const void *value, uint32_t value_size) |
145 | { | 145 | { |
146 | struct StateClosure *scls = cls; | 146 | struct StateClosure *scls = cls; |
147 | const void *val = scls->value[scls->n]; | 147 | const void *val = scls->value[scls->n]; |
diff --git a/src/psycstore/test_psycstore.c b/src/psycstore/test_psycstore.c index c869a862f..58e6243b7 100644 --- a/src/psycstore/test_psycstore.c +++ b/src/psycstore/test_psycstore.c | |||
@@ -170,7 +170,7 @@ state_reset_result (void *cls, int64_t result, | |||
170 | 170 | ||
171 | 171 | ||
172 | static int | 172 | static int |
173 | state_result (void *cls, const char *name, const void *value, size_t value_size) | 173 | state_result (void *cls, const char *name, const void *value, uint32_t value_size) |
174 | { | 174 | { |
175 | struct StateClosure *scls = cls; | 175 | struct StateClosure *scls = cls; |
176 | const char *nam = scls->name[scls->n]; | 176 | const char *nam = scls->name[scls->n]; |