diff options
author | Gabor X Toth <*@tg-x.net> | 2015-07-18 00:03:06 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2015-07-18 00:03:06 +0000 |
commit | d38544730123a1f365ef287a7e88060d97b266b7 (patch) | |
tree | e66f5ee143adde1a5206f50bbbdd00a8a179fb81 /src/psycstore/gnunet-service-psycstore.c | |
parent | 2275976cf61565bde4f17e8c2c0bc0d359541ac4 (diff) | |
download | gnunet-d38544730123a1f365ef287a7e88060d97b266b7.tar.gz gnunet-d38544730123a1f365ef287a7e88060d97b266b7.zip |
psyc/store: apply state modifiers
Diffstat (limited to 'src/psycstore/gnunet-service-psycstore.c')
-rw-r--r-- | src/psycstore/gnunet-service-psycstore.c | 218 |
1 files changed, 160 insertions, 58 deletions
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c index 556712df4..6e40e7849 100644 --- a/src/psycstore/gnunet-service-psycstore.c +++ b/src/psycstore/gnunet-service-psycstore.c | |||
@@ -32,6 +32,7 @@ | |||
32 | #include "gnunet_constants.h" | 32 | #include "gnunet_constants.h" |
33 | #include "gnunet_protocols.h" | 33 | #include "gnunet_protocols.h" |
34 | #include "gnunet_statistics_service.h" | 34 | #include "gnunet_statistics_service.h" |
35 | #include "gnunet_psyc_util_lib.h" | ||
35 | #include "gnunet_psycstore_service.h" | 36 | #include "gnunet_psycstore_service.h" |
36 | #include "gnunet_psycstore_plugin.h" | 37 | #include "gnunet_psycstore_plugin.h" |
37 | #include "psycstore.h" | 38 | #include "psycstore.h" |
@@ -493,7 +494,136 @@ handle_counters_get (void *cls, | |||
493 | } | 494 | } |
494 | 495 | ||
495 | 496 | ||
496 | /** @todo FIXME: stop processing further state modify messages after an error */ | 497 | struct StateModifyClosure |
498 | { | ||
499 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key; | ||
500 | struct GNUNET_PSYC_ReceiveHandle *recv; | ||
501 | enum GNUNET_PSYC_MessageState msg_state; | ||
502 | char mod_oper; | ||
503 | char *mod_name; | ||
504 | char *mod_value; | ||
505 | uint64_t mod_value_size; | ||
506 | uint64_t mod_value_remaining; | ||
507 | }; | ||
508 | |||
509 | |||
510 | static void | ||
511 | recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, | ||
512 | uint32_t flags, const struct GNUNET_MessageHeader *msg) | ||
513 | { | ||
514 | struct StateModifyClosure *scls = cls; | ||
515 | uint16_t psize; | ||
516 | if (NULL == msg) | ||
517 | { | ||
518 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; | ||
519 | return; | ||
520 | } | ||
521 | |||
522 | switch (ntohs (msg->type)) | ||
523 | { | ||
524 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
525 | { | ||
526 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD; | ||
527 | break; | ||
528 | } | ||
529 | |||
530 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
531 | { | ||
532 | struct GNUNET_PSYC_MessageModifier * | ||
533 | pmod = (struct GNUNET_PSYC_MessageModifier *) msg; | ||
534 | psize = ntohs (pmod->header.size); | ||
535 | uint16_t name_size = ntohs (pmod->name_size); | ||
536 | uint16_t value_size = ntohs (pmod->value_size); | ||
537 | |||
538 | const char *name = (const char *) &pmod[1]; | ||
539 | const void *value = name + name_size; | ||
540 | |||
541 | if (GNUNET_ENV_OP_SET != pmod->oper) | ||
542 | { // Apply non-transient operation. | ||
543 | if (psize == sizeof (*pmod) + name_size + value_size) | ||
544 | { | ||
545 | db->state_modify_op (db->cls, scls->channel_key, | ||
546 | pmod->oper, name, value, value_size); | ||
547 | } | ||
548 | else | ||
549 | { | ||
550 | scls->mod_oper = pmod->oper; | ||
551 | scls->mod_name = GNUNET_malloc (name_size); | ||
552 | memcpy (scls->mod_name, name, name_size); | ||
553 | |||
554 | scls->mod_value_size = value_size; | ||
555 | scls->mod_value = GNUNET_malloc (scls->mod_value_size); | ||
556 | scls->mod_value_remaining | ||
557 | = scls->mod_value_size - (psize - sizeof (*pmod) - name_size); | ||
558 | memcpy (scls->mod_value, value, value_size - scls->mod_value_remaining); | ||
559 | } | ||
560 | } | ||
561 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; | ||
562 | break; | ||
563 | } | ||
564 | |||
565 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
566 | if (GNUNET_ENV_OP_SET != scls->mod_oper) | ||
567 | { | ||
568 | if (scls->mod_value_remaining == 0) | ||
569 | { | ||
570 | GNUNET_break_op (0); | ||
571 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; | ||
572 | } | ||
573 | psize = ntohs (msg->size); | ||
574 | memcpy (scls->mod_value + (scls->mod_value_size - scls->mod_value_remaining), | ||
575 | &msg[1], psize - sizeof (*msg)); | ||
576 | scls->mod_value_remaining -= psize - sizeof (*msg); | ||
577 | if (0 == scls->mod_value_remaining) | ||
578 | { | ||
579 | db->state_modify_op (db->cls, scls->channel_key, | ||
580 | scls->mod_oper, scls->mod_name, | ||
581 | scls->mod_value, scls->mod_value_size); | ||
582 | GNUNET_free (scls->mod_name); | ||
583 | GNUNET_free (scls->mod_value); | ||
584 | scls->mod_oper = 0; | ||
585 | scls->mod_name = NULL; | ||
586 | scls->mod_value = NULL; | ||
587 | scls->mod_value_size = 0; | ||
588 | } | ||
589 | } | ||
590 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT; | ||
591 | break; | ||
592 | |||
593 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
594 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA; | ||
595 | break; | ||
596 | |||
597 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
598 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_END; | ||
599 | break; | ||
600 | |||
601 | default: | ||
602 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; | ||
603 | } | ||
604 | } | ||
605 | |||
606 | |||
607 | static int | ||
608 | recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, | ||
609 | enum GNUNET_PSYCSTORE_MessageFlags flags) | ||
610 | { | ||
611 | struct StateModifyClosure *scls = cls; | ||
612 | |||
613 | if (NULL == scls->recv) | ||
614 | { | ||
615 | scls->recv = GNUNET_PSYC_receive_create (NULL, &recv_state_message_part, | ||
616 | scls); | ||
617 | } | ||
618 | |||
619 | const struct GNUNET_PSYC_MessageHeader * | ||
620 | pmsg = (const struct GNUNET_PSYC_MessageHeader *) &msg[1]; | ||
621 | GNUNET_PSYC_receive_message (scls->recv, pmsg); | ||
622 | |||
623 | return GNUNET_YES; | ||
624 | } | ||
625 | |||
626 | |||
497 | static void | 627 | static void |
498 | handle_state_modify (void *cls, | 628 | handle_state_modify (void *cls, |
499 | struct GNUNET_SERVER_Client *client, | 629 | struct GNUNET_SERVER_Client *client, |
@@ -502,65 +632,36 @@ handle_state_modify (void *cls, | |||
502 | const struct StateModifyRequest *req | 632 | const struct StateModifyRequest *req |
503 | = (const struct StateModifyRequest *) msg; | 633 | = (const struct StateModifyRequest *) msg; |
504 | 634 | ||
505 | int ret = GNUNET_SYSERR; | 635 | uint64_t message_id = GNUNET_ntohll (req->message_id); |
506 | const char *name = (const char *) &req[1]; | 636 | uint64_t state_delta = GNUNET_ntohll (req->state_delta); |
507 | uint16_t name_size = ntohs (req->name_size); | 637 | uint64_t ret_frags = 0; |
508 | 638 | ||
509 | if (name_size <= 2 || '\0' != name[name_size - 1]) | 639 | struct StateModifyClosure scls = { 0 }; |
640 | |||
641 | if (GNUNET_OK != db->state_modify_begin (db->cls, &req->channel_key, | ||
642 | message_id, state_delta)) | ||
510 | { | 643 | { |
511 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 644 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
512 | _("Tried to set invalid state variable name!\n")); | 645 | _("Failed to begin modifying state!\n")); |
513 | GNUNET_break_op (0); | 646 | GNUNET_break (0); |
514 | } | 647 | } |
515 | else | ||
516 | { | ||
517 | ret = GNUNET_OK; | ||
518 | 648 | ||
519 | if (req->flags & STATE_OP_FIRST) | 649 | int ret = db->message_get (db->cls, &req->channel_key, |
520 | { | 650 | message_id, message_id, |
521 | ret = db->state_modify_begin (db->cls, &req->channel_key, | 651 | &ret_frags, &recv_state_fragment, &scls); |
522 | GNUNET_ntohll (req->message_id), | ||
523 | GNUNET_ntohll (req->state_delta)); | ||
524 | } | ||
525 | if (ret != GNUNET_OK) | ||
526 | { | ||
527 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
528 | _("Failed to begin modifying state!\n")); | ||
529 | } | ||
530 | else | ||
531 | { | ||
532 | switch (req->oper) | ||
533 | { | ||
534 | case GNUNET_ENV_OP_ASSIGN: | ||
535 | ret = db->state_modify_set (db->cls, &req->channel_key, | ||
536 | (const char *) &req[1], | ||
537 | name + ntohs (req->name_size), | ||
538 | ntohs (req->header.size) - sizeof (*req) | ||
539 | - ntohs (req->name_size)); | ||
540 | break; | ||
541 | default: | ||
542 | #if TODO | ||
543 | ret = GNUNET_ENV_operation ((const char *) &req[1], | ||
544 | current_value, current_value_size, | ||
545 | req->oper, name + ntohs (req->name_size), | ||
546 | ntohs (req->header.size) - sizeof (*req) | ||
547 | - ntohs (req->name_size), &value, &value_size); | ||
548 | #endif | ||
549 | ret = GNUNET_SYSERR; | ||
550 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
551 | _("Unknown operator: %c\n"), req->oper); | ||
552 | } | ||
553 | } | ||
554 | 652 | ||
555 | if (GNUNET_OK == ret && req->flags & STATE_OP_LAST) | 653 | if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id)) |
556 | { | 654 | { |
557 | ret = db->state_modify_end (db->cls, &req->channel_key, | 655 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
558 | GNUNET_ntohll (req->message_id)); | 656 | _("Failed to end modifying state!\n")); |
559 | if (ret != GNUNET_OK) | 657 | GNUNET_break (0); |
560 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
561 | _("Failed to end modifying state!\n")); | ||
562 | } | ||
563 | } | 658 | } |
659 | |||
660 | if (NULL != scls.recv) | ||
661 | { | ||
662 | GNUNET_PSYC_receive_destroy (scls.recv); | ||
663 | } | ||
664 | |||
564 | send_result_code (client, req->op_id, ret, NULL); | 665 | send_result_code (client, req->op_id, ret, NULL); |
565 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 666 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
566 | } | 667 | } |
@@ -600,16 +701,17 @@ handle_state_sync (void *cls, | |||
600 | } | 701 | } |
601 | else | 702 | else |
602 | { | 703 | { |
603 | ret = db->state_sync_set (db->cls, &req->channel_key, name, | 704 | ret = db->state_sync_assign (db->cls, &req->channel_key, name, |
604 | name + ntohs (req->name_size), | 705 | name + ntohs (req->name_size), |
605 | ntohs (req->header.size) - sizeof (*req) | 706 | ntohs (req->header.size) - sizeof (*req) |
606 | - ntohs (req->name_size)); | 707 | - ntohs (req->name_size)); |
607 | } | 708 | } |
608 | 709 | ||
609 | if (GNUNET_OK == ret && req->flags & STATE_OP_LAST) | 710 | if (GNUNET_OK == ret && req->flags & STATE_OP_LAST) |
610 | { | 711 | { |
611 | ret = db->state_sync_end (db->cls, &req->channel_key, | 712 | ret = db->state_sync_end (db->cls, &req->channel_key, |
612 | GNUNET_ntohll (req->message_id)); | 713 | GNUNET_ntohll (req->max_state_message_id), |
714 | GNUNET_ntohll (req->state_hash_message_id)); | ||
613 | if (ret != GNUNET_OK) | 715 | if (ret != GNUNET_OK) |
614 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 716 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
615 | _("Failed to end synchronizing state!\n")); | 717 | _("Failed to end synchronizing state!\n")); |