From d38544730123a1f365ef287a7e88060d97b266b7 Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Sat, 18 Jul 2015 00:03:06 +0000 Subject: psyc/store: apply state modifiers --- src/psycstore/gnunet-service-psycstore.c | 218 +++++++++++++++++++++++-------- 1 file changed, 160 insertions(+), 58 deletions(-) (limited to 'src/psycstore/gnunet-service-psycstore.c') diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c index 556712df4..6e40e7849 100644 --- a/src/psycstore/gnunet-service-psycstore.c +++ b/src/psycstore/gnunet-service-psycstore.c @@ -32,6 +32,7 @@ #include "gnunet_constants.h" #include "gnunet_protocols.h" #include "gnunet_statistics_service.h" +#include "gnunet_psyc_util_lib.h" #include "gnunet_psycstore_service.h" #include "gnunet_psycstore_plugin.h" #include "psycstore.h" @@ -493,7 +494,136 @@ handle_counters_get (void *cls, } -/** @todo FIXME: stop processing further state modify messages after an error */ +struct StateModifyClosure +{ + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key; + struct GNUNET_PSYC_ReceiveHandle *recv; + enum GNUNET_PSYC_MessageState msg_state; + char mod_oper; + char *mod_name; + char *mod_value; + uint64_t mod_value_size; + uint64_t mod_value_remaining; +}; + + +static void +recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, + uint32_t flags, const struct GNUNET_MessageHeader *msg) +{ + struct StateModifyClosure *scls = cls; + uint16_t psize; + if (NULL == msg) + { + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; + return; + } + + switch (ntohs (msg->type)) + { + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: + { + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD; + break; + } + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: + { + struct GNUNET_PSYC_MessageModifier * + pmod = (struct GNUNET_PSYC_MessageModifier *) msg; + psize = ntohs (pmod->header.size); + uint16_t name_size = ntohs (pmod->name_size); + uint16_t value_size = ntohs (pmod->value_size); + + const char *name = (const char *) &pmod[1]; + const void *value = name + name_size; + + if (GNUNET_ENV_OP_SET != pmod->oper) + { // Apply non-transient operation. + if (psize == sizeof (*pmod) + name_size + value_size) + { + db->state_modify_op (db->cls, scls->channel_key, + pmod->oper, name, value, value_size); + } + else + { + scls->mod_oper = pmod->oper; + scls->mod_name = GNUNET_malloc (name_size); + memcpy (scls->mod_name, name, name_size); + + scls->mod_value_size = value_size; + scls->mod_value = GNUNET_malloc (scls->mod_value_size); + scls->mod_value_remaining + = scls->mod_value_size - (psize - sizeof (*pmod) - name_size); + memcpy (scls->mod_value, value, value_size - scls->mod_value_remaining); + } + } + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; + break; + } + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: + if (GNUNET_ENV_OP_SET != scls->mod_oper) + { + if (scls->mod_value_remaining == 0) + { + GNUNET_break_op (0); + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; + } + psize = ntohs (msg->size); + memcpy (scls->mod_value + (scls->mod_value_size - scls->mod_value_remaining), + &msg[1], psize - sizeof (*msg)); + scls->mod_value_remaining -= psize - sizeof (*msg); + if (0 == scls->mod_value_remaining) + { + db->state_modify_op (db->cls, scls->channel_key, + scls->mod_oper, scls->mod_name, + scls->mod_value, scls->mod_value_size); + GNUNET_free (scls->mod_name); + GNUNET_free (scls->mod_value); + scls->mod_oper = 0; + scls->mod_name = NULL; + scls->mod_value = NULL; + scls->mod_value_size = 0; + } + } + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT; + break; + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA; + break; + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_END; + break; + + default: + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; + } +} + + +static int +recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, + enum GNUNET_PSYCSTORE_MessageFlags flags) +{ + struct StateModifyClosure *scls = cls; + + if (NULL == scls->recv) + { + scls->recv = GNUNET_PSYC_receive_create (NULL, &recv_state_message_part, + scls); + } + + const struct GNUNET_PSYC_MessageHeader * + pmsg = (const struct GNUNET_PSYC_MessageHeader *) &msg[1]; + GNUNET_PSYC_receive_message (scls->recv, pmsg); + + return GNUNET_YES; +} + + static void handle_state_modify (void *cls, struct GNUNET_SERVER_Client *client, @@ -502,65 +632,36 @@ handle_state_modify (void *cls, const struct StateModifyRequest *req = (const struct StateModifyRequest *) msg; - int ret = GNUNET_SYSERR; - const char *name = (const char *) &req[1]; - uint16_t name_size = ntohs (req->name_size); + uint64_t message_id = GNUNET_ntohll (req->message_id); + uint64_t state_delta = GNUNET_ntohll (req->state_delta); + uint64_t ret_frags = 0; - if (name_size <= 2 || '\0' != name[name_size - 1]) + struct StateModifyClosure scls = { 0 }; + + if (GNUNET_OK != db->state_modify_begin (db->cls, &req->channel_key, + message_id, state_delta)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Tried to set invalid state variable name!\n")); - GNUNET_break_op (0); + _("Failed to begin modifying state!\n")); + GNUNET_break (0); } - else - { - ret = GNUNET_OK; - if (req->flags & STATE_OP_FIRST) - { - ret = db->state_modify_begin (db->cls, &req->channel_key, - GNUNET_ntohll (req->message_id), - GNUNET_ntohll (req->state_delta)); - } - if (ret != GNUNET_OK) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Failed to begin modifying state!\n")); - } - else - { - switch (req->oper) - { - case GNUNET_ENV_OP_ASSIGN: - ret = db->state_modify_set (db->cls, &req->channel_key, - (const char *) &req[1], - name + ntohs (req->name_size), - ntohs (req->header.size) - sizeof (*req) - - ntohs (req->name_size)); - break; - default: -#if TODO - ret = GNUNET_ENV_operation ((const char *) &req[1], - current_value, current_value_size, - req->oper, name + ntohs (req->name_size), - ntohs (req->header.size) - sizeof (*req) - - ntohs (req->name_size), &value, &value_size); -#endif - ret = GNUNET_SYSERR; - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Unknown operator: %c\n"), req->oper); - } - } + int ret = db->message_get (db->cls, &req->channel_key, + message_id, message_id, + &ret_frags, &recv_state_fragment, &scls); - if (GNUNET_OK == ret && req->flags & STATE_OP_LAST) - { - ret = db->state_modify_end (db->cls, &req->channel_key, - GNUNET_ntohll (req->message_id)); - if (ret != GNUNET_OK) - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Failed to end modifying state!\n")); - } + if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to end modifying state!\n")); + GNUNET_break (0); } + + if (NULL != scls.recv) + { + GNUNET_PSYC_receive_destroy (scls.recv); + } + send_result_code (client, req->op_id, ret, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -600,16 +701,17 @@ handle_state_sync (void *cls, } else { - ret = db->state_sync_set (db->cls, &req->channel_key, name, - name + ntohs (req->name_size), - ntohs (req->header.size) - sizeof (*req) - - ntohs (req->name_size)); + ret = db->state_sync_assign (db->cls, &req->channel_key, name, + name + ntohs (req->name_size), + ntohs (req->header.size) - sizeof (*req) + - ntohs (req->name_size)); } if (GNUNET_OK == ret && req->flags & STATE_OP_LAST) { ret = db->state_sync_end (db->cls, &req->channel_key, - GNUNET_ntohll (req->message_id)); + GNUNET_ntohll (req->max_state_message_id), + GNUNET_ntohll (req->state_hash_message_id)); if (ret != GNUNET_OK) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Failed to end synchronizing state!\n")); -- cgit v1.2.3