summaryrefslogtreecommitdiff
path: root/src/psycstore/gnunet-service-psycstore.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-07-18 00:03:06 +0000
committerGabor X Toth <*@tg-x.net>2015-07-18 00:03:06 +0000
commitd38544730123a1f365ef287a7e88060d97b266b7 (patch)
treee66f5ee143adde1a5206f50bbbdd00a8a179fb81 /src/psycstore/gnunet-service-psycstore.c
parent2275976cf61565bde4f17e8c2c0bc0d359541ac4 (diff)
downloadgnunet-d38544730123a1f365ef287a7e88060d97b266b7.tar.gz
gnunet-d38544730123a1f365ef287a7e88060d97b266b7.zip
psyc/store: apply state modifiers
Diffstat (limited to 'src/psycstore/gnunet-service-psycstore.c')
-rw-r--r--src/psycstore/gnunet-service-psycstore.c218
1 files changed, 160 insertions, 58 deletions
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c
index 556712df4..6e40e7849 100644
--- a/src/psycstore/gnunet-service-psycstore.c
+++ b/src/psycstore/gnunet-service-psycstore.c
@@ -32,6 +32,7 @@
32#include "gnunet_constants.h" 32#include "gnunet_constants.h"
33#include "gnunet_protocols.h" 33#include "gnunet_protocols.h"
34#include "gnunet_statistics_service.h" 34#include "gnunet_statistics_service.h"
35#include "gnunet_psyc_util_lib.h"
35#include "gnunet_psycstore_service.h" 36#include "gnunet_psycstore_service.h"
36#include "gnunet_psycstore_plugin.h" 37#include "gnunet_psycstore_plugin.h"
37#include "psycstore.h" 38#include "psycstore.h"
@@ -493,7 +494,136 @@ handle_counters_get (void *cls,
493} 494}
494 495
495 496
496/** @todo FIXME: stop processing further state modify messages after an error */ 497struct StateModifyClosure
498{
499 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key;
500 struct GNUNET_PSYC_ReceiveHandle *recv;
501 enum GNUNET_PSYC_MessageState msg_state;
502 char mod_oper;
503 char *mod_name;
504 char *mod_value;
505 uint64_t mod_value_size;
506 uint64_t mod_value_remaining;
507};
508
509
510static void
511recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset,
512 uint32_t flags, const struct GNUNET_MessageHeader *msg)
513{
514 struct StateModifyClosure *scls = cls;
515 uint16_t psize;
516 if (NULL == msg)
517 {
518 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
519 return;
520 }
521
522 switch (ntohs (msg->type))
523 {
524 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
525 {
526 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
527 break;
528 }
529
530 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
531 {
532 struct GNUNET_PSYC_MessageModifier *
533 pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
534 psize = ntohs (pmod->header.size);
535 uint16_t name_size = ntohs (pmod->name_size);
536 uint16_t value_size = ntohs (pmod->value_size);
537
538 const char *name = (const char *) &pmod[1];
539 const void *value = name + name_size;
540
541 if (GNUNET_ENV_OP_SET != pmod->oper)
542 { // Apply non-transient operation.
543 if (psize == sizeof (*pmod) + name_size + value_size)
544 {
545 db->state_modify_op (db->cls, scls->channel_key,
546 pmod->oper, name, value, value_size);
547 }
548 else
549 {
550 scls->mod_oper = pmod->oper;
551 scls->mod_name = GNUNET_malloc (name_size);
552 memcpy (scls->mod_name, name, name_size);
553
554 scls->mod_value_size = value_size;
555 scls->mod_value = GNUNET_malloc (scls->mod_value_size);
556 scls->mod_value_remaining
557 = scls->mod_value_size - (psize - sizeof (*pmod) - name_size);
558 memcpy (scls->mod_value, value, value_size - scls->mod_value_remaining);
559 }
560 }
561 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
562 break;
563 }
564
565 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
566 if (GNUNET_ENV_OP_SET != scls->mod_oper)
567 {
568 if (scls->mod_value_remaining == 0)
569 {
570 GNUNET_break_op (0);
571 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
572 }
573 psize = ntohs (msg->size);
574 memcpy (scls->mod_value + (scls->mod_value_size - scls->mod_value_remaining),
575 &msg[1], psize - sizeof (*msg));
576 scls->mod_value_remaining -= psize - sizeof (*msg);
577 if (0 == scls->mod_value_remaining)
578 {
579 db->state_modify_op (db->cls, scls->channel_key,
580 scls->mod_oper, scls->mod_name,
581 scls->mod_value, scls->mod_value_size);
582 GNUNET_free (scls->mod_name);
583 GNUNET_free (scls->mod_value);
584 scls->mod_oper = 0;
585 scls->mod_name = NULL;
586 scls->mod_value = NULL;
587 scls->mod_value_size = 0;
588 }
589 }
590 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
591 break;
592
593 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
594 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
595 break;
596
597 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
598 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
599 break;
600
601 default:
602 scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
603 }
604}
605
606
607static int
608recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg,
609 enum GNUNET_PSYCSTORE_MessageFlags flags)
610{
611 struct StateModifyClosure *scls = cls;
612
613 if (NULL == scls->recv)
614 {
615 scls->recv = GNUNET_PSYC_receive_create (NULL, &recv_state_message_part,
616 scls);
617 }
618
619 const struct GNUNET_PSYC_MessageHeader *
620 pmsg = (const struct GNUNET_PSYC_MessageHeader *) &msg[1];
621 GNUNET_PSYC_receive_message (scls->recv, pmsg);
622
623 return GNUNET_YES;
624}
625
626
497static void 627static void
498handle_state_modify (void *cls, 628handle_state_modify (void *cls,
499 struct GNUNET_SERVER_Client *client, 629 struct GNUNET_SERVER_Client *client,
@@ -502,65 +632,36 @@ handle_state_modify (void *cls,
502 const struct StateModifyRequest *req 632 const struct StateModifyRequest *req
503 = (const struct StateModifyRequest *) msg; 633 = (const struct StateModifyRequest *) msg;
504 634
505 int ret = GNUNET_SYSERR; 635 uint64_t message_id = GNUNET_ntohll (req->message_id);
506 const char *name = (const char *) &req[1]; 636 uint64_t state_delta = GNUNET_ntohll (req->state_delta);
507 uint16_t name_size = ntohs (req->name_size); 637 uint64_t ret_frags = 0;
508 638
509 if (name_size <= 2 || '\0' != name[name_size - 1]) 639 struct StateModifyClosure scls = { 0 };
640
641 if (GNUNET_OK != db->state_modify_begin (db->cls, &req->channel_key,
642 message_id, state_delta))
510 { 643 {
511 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 644 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
512 _("Tried to set invalid state variable name!\n")); 645 _("Failed to begin modifying state!\n"));
513 GNUNET_break_op (0); 646 GNUNET_break (0);
514 } 647 }
515 else
516 {
517 ret = GNUNET_OK;
518 648
519 if (req->flags & STATE_OP_FIRST) 649 int ret = db->message_get (db->cls, &req->channel_key,
520 { 650 message_id, message_id,
521 ret = db->state_modify_begin (db->cls, &req->channel_key, 651 &ret_frags, &recv_state_fragment, &scls);
522 GNUNET_ntohll (req->message_id),
523 GNUNET_ntohll (req->state_delta));
524 }
525 if (ret != GNUNET_OK)
526 {
527 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
528 _("Failed to begin modifying state!\n"));
529 }
530 else
531 {
532 switch (req->oper)
533 {
534 case GNUNET_ENV_OP_ASSIGN:
535 ret = db->state_modify_set (db->cls, &req->channel_key,
536 (const char *) &req[1],
537 name + ntohs (req->name_size),
538 ntohs (req->header.size) - sizeof (*req)
539 - ntohs (req->name_size));
540 break;
541 default:
542#if TODO
543 ret = GNUNET_ENV_operation ((const char *) &req[1],
544 current_value, current_value_size,
545 req->oper, name + ntohs (req->name_size),
546 ntohs (req->header.size) - sizeof (*req)
547 - ntohs (req->name_size), &value, &value_size);
548#endif
549 ret = GNUNET_SYSERR;
550 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
551 _("Unknown operator: %c\n"), req->oper);
552 }
553 }
554 652
555 if (GNUNET_OK == ret && req->flags & STATE_OP_LAST) 653 if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id))
556 { 654 {
557 ret = db->state_modify_end (db->cls, &req->channel_key, 655 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
558 GNUNET_ntohll (req->message_id)); 656 _("Failed to end modifying state!\n"));
559 if (ret != GNUNET_OK) 657 GNUNET_break (0);
560 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
561 _("Failed to end modifying state!\n"));
562 }
563 } 658 }
659
660 if (NULL != scls.recv)
661 {
662 GNUNET_PSYC_receive_destroy (scls.recv);
663 }
664
564 send_result_code (client, req->op_id, ret, NULL); 665 send_result_code (client, req->op_id, ret, NULL);
565 GNUNET_SERVER_receive_done (client, GNUNET_OK); 666 GNUNET_SERVER_receive_done (client, GNUNET_OK);
566} 667}
@@ -600,16 +701,17 @@ handle_state_sync (void *cls,
600 } 701 }
601 else 702 else
602 { 703 {
603 ret = db->state_sync_set (db->cls, &req->channel_key, name, 704 ret = db->state_sync_assign (db->cls, &req->channel_key, name,
604 name + ntohs (req->name_size), 705 name + ntohs (req->name_size),
605 ntohs (req->header.size) - sizeof (*req) 706 ntohs (req->header.size) - sizeof (*req)
606 - ntohs (req->name_size)); 707 - ntohs (req->name_size));
607 } 708 }
608 709
609 if (GNUNET_OK == ret && req->flags & STATE_OP_LAST) 710 if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
610 { 711 {
611 ret = db->state_sync_end (db->cls, &req->channel_key, 712 ret = db->state_sync_end (db->cls, &req->channel_key,
612 GNUNET_ntohll (req->message_id)); 713 GNUNET_ntohll (req->max_state_message_id),
714 GNUNET_ntohll (req->state_hash_message_id));
613 if (ret != GNUNET_OK) 715 if (ret != GNUNET_OK)
614 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 716 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
615 _("Failed to end synchronizing state!\n")); 717 _("Failed to end synchronizing state!\n"));