From f78e9753a91497f1deb5e20d10868c27ab4a6013 Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Mon, 16 Sep 2013 04:59:05 +0000 Subject: PSYCstore service and API implementation --- src/psycstore/psycstore_api.c | 1145 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 1026 insertions(+), 119 deletions(-) (limited to 'src/psycstore/psycstore_api.c') diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c index 5847fc852..5b9bb7e89 100644 --- a/src/psycstore/psycstore_api.c +++ b/src/psycstore/psycstore_api.c @@ -1,22 +1,22 @@ /* - This file is part of GNUnet. - (C) 2013 Christian Grothoff (and other contributing authors) - - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public Liceidentity as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. - - GNUnet is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public Liceidentity for more details. - - You should have received a copy of the GNU General Public Liceidentity - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. -*/ + * This file is part of GNUnet + * (C) 2013 Christian Grothoff (and other contributing authors) + * + * GNUnet is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 3, or (at your + * option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNUnet; see the file COPYING. If not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ /** * @file psycstore/psycstore_api.c @@ -30,10 +30,12 @@ #include "gnunet_constants.h" #include "gnunet_protocols.h" #include "gnunet_psycstore_service.h" +#include "gnunet_multicast_service.h" #include "psycstore.h" #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__) +typedef void (*DataCallback) (); /** * Handle for an operation with the PSYCstore service. @@ -45,7 +47,7 @@ struct GNUNET_PSYCSTORE_OperationHandle * Main PSYCstore handle. */ struct GNUNET_PSYCSTORE_Handle *h; - + /** * We keep operations in a DLL. */ @@ -56,32 +58,31 @@ struct GNUNET_PSYCSTORE_OperationHandle */ struct GNUNET_PSYCSTORE_OperationHandle *prev; - /** - * Message to send to the PSYCstore service. - * Allocated at the end of this struct. - */ - const struct GNUNET_MessageHeader *msg; - /** * Continuation to invoke with the result of an operation. */ GNUNET_PSYCSTORE_ResultCallback res_cb; /** - * Continuation to invoke with the result of an operation returning a fragment. + * Continuation to invoke with the result of an operation returning data. */ - GNUNET_PSYCSTORE_FragmentCallback frag_cb; + DataCallback data_cb; /** - * Continuation to invoke with the result of an operation returning a state variable. + * Closure for the callbacks. */ - GNUNET_PSYCSTORE_StateCallback state_cb; + void *cls; /** - * Closure for the callbacks. + * Operation ID. */ - void *cls; + uint32_t op_id; + /** + * Message to send to the PSYCstore service. + * Allocated at the end of this struct. + */ + const struct GNUNET_MessageHeader *msg; }; @@ -101,13 +102,23 @@ struct GNUNET_PSYCSTORE_Handle struct GNUNET_CLIENT_Connection *client; /** - * Head of active operations. - */ + * Head of operations to transmit. + */ + struct GNUNET_PSYCSTORE_OperationHandle *transmit_head; + + /** + * Tail of operations to transmit. + */ + struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail; + + /** + * Head of active operations waiting for response. + */ struct GNUNET_PSYCSTORE_OperationHandle *op_head; /** - * Tail of active operations. - */ + * Tail of active operations waiting for response. + */ struct GNUNET_PSYCSTORE_OperationHandle *op_tail; /** @@ -130,9 +141,46 @@ struct GNUNET_PSYCSTORE_Handle */ int in_receive; + /** + * The last operation id used for a PSYCstore operation. + */ + uint32_t last_op_id_used; + }; +/** + * Get a fresh operation ID to distinguish between PSYCstore requests. + * + * @param h Handle to the PSYCstore service. + * @return next operation id to use + */ +static uint32_t +get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h) +{ + return h->last_op_id_used++; +} + + +/** + * Find operation by ID. + * + * @return OperationHandle if found, or NULL otherwise. + */ +static struct GNUNET_PSYCSTORE_OperationHandle * +find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint32_t op_id) +{ + struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; + while (NULL != op) + { + if (op->op_id == op_id) + return op; + op = op->next; + } + return NULL; +} + + /** * Try again to connect to the PSYCstore service. * @@ -174,6 +222,15 @@ reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h) } +/** + * Schedule transmission of the next message from our queue. + * + * @param h PSYCstore handle + */ +static void +transmit_next (struct GNUNET_PSYCSTORE_Handle *h); + + /** * Type of a function to call when we receive a message * from the service. @@ -182,12 +239,16 @@ reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h) * @param msg message received, NULL on timeout or fatal error */ static void -message_handler (void *cls, +message_handler (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_PSYCSTORE_Handle *h = cls; struct GNUNET_PSYCSTORE_OperationHandle *op; - const struct ResultCodeMessage *rcm; + const struct OperationResult *opres; + const struct MasterCountersResult *mcres; + const struct SlaveCountersResult *scres; + const struct FragmentResult *fres; + const struct StateResult *sres; const char *str; uint16_t size; @@ -203,68 +264,240 @@ message_handler (void *cls, switch (ntohs (msg->type)) { case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE: - if (size < sizeof (struct ResultCodeMessage)) + if (size < sizeof (struct OperationResult)) { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received message of type %d with length %lu bytes. " + "Expected >= %lu\n", + ntohs (msg->type), size, sizeof (struct OperationResult)); GNUNET_break (0); reschedule_connect (h); return; } - rcm = (const struct ResultCodeMessage *) msg; - str = (const char *) &rcm[1]; - if ( (size > sizeof (struct ResultCodeMessage)) && - ('\0' != str[size - sizeof (struct ResultCodeMessage) - 1]) ) + + opres = (const struct OperationResult *) msg; + str = (const char *) &opres[1]; + if ( (size > sizeof (struct OperationResult)) && + ('\0' != str[size - sizeof (struct OperationResult) - 1]) ) { GNUNET_break (0); reschedule_connect (h); return; } - if (size == sizeof (struct ResultCodeMessage)) + if (size == sizeof (struct OperationResult)) str = NULL; - op = h->op_head; - GNUNET_CONTAINER_DLL_remove (h->op_head, - h->op_tail, - op); - GNUNET_CLIENT_receive (h->client, &message_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); - if (NULL != op->res_cb) - op->res_cb (op->cls, rcm->result_code , str); - GNUNET_free (op); + op = find_op_by_id (h, ntohl (opres->op_id)); + if (NULL == op) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received result of an unkown operation ID: %ld\n", + ntohl (opres->op_id)); + } + else + { + GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); + if (NULL != op->res_cb) + { + const struct StateModifyRequest *smreq; + const struct StateSyncRequest *ssreq; + switch (ntohs (op->msg->type)) + { + case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY: + smreq = (const struct StateModifyRequest *) op->msg; + if (!(smreq->flags & STATE_OP_LAST + || GNUNET_OK != ntohl (opres->result_code))) + op->res_cb = NULL; + break; + case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC: + ssreq = (const struct StateSyncRequest *) op->msg; + if (!(ssreq->flags & STATE_OP_LAST + || GNUNET_OK != ntohl (opres->result_code))) + op->res_cb = NULL; + break; + } + } + if (NULL != op->res_cb) + op->res_cb (op->cls, ntohl (opres->result_code), str); + GNUNET_free (op); + } + break; + + case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER: + if (size != sizeof (struct MasterCountersResult)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received message of type %d with length %lu bytes. " + "Expected %lu\n", + ntohs (msg->type), size, sizeof (struct MasterCountersResult)); + GNUNET_break (0); + reschedule_connect (h); + return; + } + + mcres = (const struct MasterCountersResult *) msg; + + op = find_op_by_id (h, ntohl (mcres->op_id)); + if (NULL == op) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received result of an unkown operation ID: %ld\n", + ntohl (mcres->op_id)); + } + else + { + GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); + if (NULL != op->data_cb) + ((GNUNET_PSYCSTORE_MasterCountersCallback) + op->data_cb) (op->cls, + GNUNET_ntohll (mcres->fragment_id), + GNUNET_ntohll (mcres->message_id), + GNUNET_ntohll (mcres->group_generation)); + GNUNET_free (op); + } break; + + case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE: + if (size != sizeof (struct SlaveCountersResult)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received message of type %d with length %lu bytes. " + "Expected %lu\n", + ntohs (msg->type), size, sizeof (struct SlaveCountersResult)); + GNUNET_break (0); + reschedule_connect (h); + return; + } + + scres = (const struct SlaveCountersResult *) msg; + + op = find_op_by_id (h, ntohl (scres->op_id)); + if (NULL == op) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received result of an unkown operation ID: %ld\n", + ntohl (scres->op_id)); + } + else + { + GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); + if (NULL != op->data_cb) + ((GNUNET_PSYCSTORE_SlaveCountersCallback) + op->data_cb) (op->cls, GNUNET_ntohll (scres->max_known_msg_id)); + GNUNET_free (op); + } + break; + + case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT: + if (size < sizeof (struct FragmentResult)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received message of type %d with length %lu bytes. " + "Expected >= %lu\n", + ntohs (msg->type), size, sizeof (struct FragmentResult)); + GNUNET_break (0); + reschedule_connect (h); + return; + } + + fres = (const struct FragmentResult *) msg; + struct GNUNET_MULTICAST_MessageHeader *mmsg = + (struct GNUNET_MULTICAST_MessageHeader *) &fres[1]; + if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received message of type %d with length %lu bytes. " + "Expected = %lu\n", + ntohs (msg->type), size, + sizeof (struct FragmentResult) + ntohs (mmsg->header.size)); + GNUNET_break (0); + reschedule_connect (h); + return; + } + + op = find_op_by_id (h, ntohl (fres->op_id)); + if (NULL == op) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received result of an unkown operation ID: %ld\n", + ntohl (fres->op_id)); + } + else + { + if (NULL != op->data_cb) + ((GNUNET_PSYCSTORE_FragmentCallback) + op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags)); + } + break; + + case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE: + if (size < sizeof (struct StateResult)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received message of type %d with length %lu bytes. " + "Expected >= %lu\n", + ntohs (msg->type), size, sizeof (struct StateResult)); + GNUNET_break (0); + reschedule_connect (h); + return; + } + + sres = (const struct StateResult *) msg; + const char *name = (const char *) &sres[1]; + uint16_t name_size = ntohs (sres->name_size); + + if (name_size <= 2 || '\0' != name[name_size - 1]) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received state result message (type %d) with invalid name.\n", + ntohs (msg->type), name_size, name); + GNUNET_break (0); + reschedule_connect (h); + return; + } + + op = find_op_by_id (h, ntohl (sres->op_id)); + if (NULL == op) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received result of an unkown operation ID: %ld\n", + ntohl (sres->op_id)); + } + else + { + if (NULL != op->data_cb) + ((GNUNET_PSYCSTORE_StateCallback) + op->data_cb) (op->cls, name, (void *) &sres[1] + name_size, + ntohs (sres->header.size) - sizeof (*sres) - name_size); + } + break; + default: GNUNET_break (0); reschedule_connect (h); return; } -} - -/** - * Schedule transmission of the next message from our queue. - * - * @param h PSYCstore handle - */ -static void -transmit_next (struct GNUNET_PSYCSTORE_Handle *h); + GNUNET_CLIENT_receive (h->client, &message_handler, h, + GNUNET_TIME_UNIT_FOREVER_REL); +} /** * Transmit next message to service. * - * @param cls the 'struct GNUNET_PSYCSTORE_Handle'. - * @param size number of bytes available in buf - * @param buf where to copy the message - * @return number of bytes copied to buf + * @param cls The 'struct GNUNET_PSYCSTORE_Handle'. + * @param size Number of bytes available in buf. + * @param buf Where to copy the message. + * @return Number of bytes copied to buf. */ static size_t -send_next_message (void *cls, - size_t size, - void *buf) +send_next_message (void *cls, size_t size, void *buf) { struct GNUNET_PSYCSTORE_Handle *h = cls; - struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; + struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; size_t ret; - + h->th = NULL; if (NULL == op) return 0; @@ -273,26 +506,30 @@ send_next_message (void *cls, { reschedule_connect (h); return 0; - } + } LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %d to PSYCstore service\n", ntohs (op->msg->type)); memcpy (buf, op->msg, ret); - if ( (NULL == op->res_cb) && - (NULL == op->frag_cb) && - (NULL == op->state_cb)) + + GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); + + if (NULL == op->res_cb && NULL == op->data_cb) { - GNUNET_CONTAINER_DLL_remove (h->op_head, - h->op_tail, - op); GNUNET_free (op); - transmit_next (h); } + else + { + GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op); + } + + if (NULL != h->transmit_head) + transmit_next (h); + if (GNUNET_NO == h->in_receive) { h->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, - &message_handler, h, + GNUNET_CLIENT_receive (h->client, &message_handler, h, GNUNET_TIME_UNIT_FOREVER_REL); } return ret; @@ -302,18 +539,18 @@ send_next_message (void *cls, /** * Schedule transmission of the next message from our queue. * - * @param h PSYCstore handle + * @param h PSYCstore handle. */ static void transmit_next (struct GNUNET_PSYCSTORE_Handle *h) { - struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; + if (NULL != h->th || NULL == h->client) + return; - GNUNET_assert (NULL == h->th); + struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; if (NULL == op) return; - if (NULL == h->client) - return; + h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, ntohs (op->msg->size), GNUNET_TIME_UNIT_FOREVER_REL, @@ -326,8 +563,8 @@ transmit_next (struct GNUNET_PSYCSTORE_Handle *h) /** * Try again to connect to the PSYCstore service. * - * @param cls the handle to the PSYCstore service - * @param tc scheduler context + * @param cls Handle to the PSYCstore service. + * @param tc Scheduler context. */ static void reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) @@ -347,8 +584,8 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /** * Connect to the PSYCstore service. * - * @param cfg the configuration to use - * @return handle to use + * @param cfg The configuration to use + * @return Handle to use */ struct GNUNET_PSYCSTORE_Handle * GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) @@ -366,7 +603,7 @@ GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) /** * Disconnect from PSYCstore service * - * @param h handle to destroy + * @param h Handle to destroy */ void GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h) @@ -405,13 +642,10 @@ GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) { struct GNUNET_PSYCSTORE_Handle *h = op->h; - if ( (h->op_head != op) || - (NULL == h->client) ) + if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client)) { /* request not active, can simply remove */ - GNUNET_CONTAINER_DLL_remove (h->op_head, - h->op_tail, - op); + GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); GNUNET_free (op); return; } @@ -420,47 +654,720 @@ GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) /* request active but not yet with service, can still abort */ GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); h->th = NULL; - GNUNET_CONTAINER_DLL_remove (h->op_head, - h->op_tail, - op); + GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); GNUNET_free (op); transmit_next (h); return; } /* request active with service, simply ensure continuations are not called */ op->res_cb = NULL; - op->frag_cb = NULL; - op->state_cb = NULL; + op->data_cb = NULL; +} + + +/** + * Store join/leave events for a PSYC channel in order to be able to answer + * membership test queries later. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel where the event happened. + * @param slave_key Public key of joining/leaving slave. + * @param did_join #GNUNET_YES on join, #GNUNET_NO on part. + * @param announced_at ID of the message that announced the membership change. + * @param effective_since Message ID this membership change is in effect since. + * For joins it is <= announced_at, for parts it is always 0. + * @param group_generation In case of a part, the last group generation the + * slave has access to. It has relevance when a larger message have + * fragments with different group generations. + * @param rcb Callback to call with the result of the storage operation. + * @param rcb_cls Closure for the callback. + * + * @return Operation handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, + int did_join, + uint64_t announced_at, + uint64_t effective_since, + uint64_t group_generation, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *rcb_cls) +{ + GNUNET_assert (NULL != h); + GNUNET_assert (NULL != channel_key); + GNUNET_assert (NULL != slave_key); + GNUNET_assert (did_join + ? effective_since <= announced_at + : effective_since == 0); + + struct MembershipStoreRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + op->h = h; + op->res_cb = rcb; + op->cls = rcb_cls; + + req = (struct MembershipStoreRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE); + req->header.size = htons (sizeof (*req)); + req->channel_key = *channel_key; + req->slave_key = *slave_key; + req->did_join = htonl (did_join); + req->announced_at = GNUNET_htonll (announced_at); + req->effective_since = GNUNET_htonll (effective_since); + req->group_generation = GNUNET_htonll (group_generation); + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; } +/** + * Test if a member was admitted to the channel at the given message ID. + * + * This is useful when relaying and replaying messages to check if a particular + * slave has access to the message fragment with a given group generation. It + * is also used when handling join requests to determine whether the slave is + * currently admitted to the channel. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel we are interested in. + * @param slave_key Public key of slave whose membership to check. + * @param message_id Message ID for which to do the membership test. + * @param group_generation Group generation of the fragment of the message to + * test. It has relevance if the message consists of multiple fragments + * with different group generations. + * @param rcb Callback to call with the test result. + * @param rcb_cls Closure for the callback. + * + * @return Operation handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, + uint64_t message_id, + uint64_t group_generation, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *rcb_cls) +{ + struct MembershipTestRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + op->h = h; + op->res_cb = rcb; + op->cls = rcb_cls; + + req = (struct MembershipTestRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST); + req->header.size = htons (sizeof (*req)); + req->channel_key = *channel_key; + req->slave_key = *slave_key; + req->message_id = GNUNET_htonll (message_id); + req->group_generation = GNUNET_htonll (group_generation); + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; +} + + +/** + * Store a message fragment sent to a channel. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel the message belongs to. + * @param message Message to store. + * @param psycstore_flags Flags indicating whether the PSYC message contains + * state modifiers. + * @param rcb Callback to call with the result of the operation. + * @param rcb_cls Closure for the callback. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + const struct GNUNET_MULTICAST_MessageHeader *message, + uint32_t psycstore_flags, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *rcb_cls) +{ + uint16_t size = ntohs (message->header.size); + struct FragmentStoreRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size); + op->h = h; + op->res_cb = rcb; + op->cls = rcb_cls; + + req = (struct FragmentStoreRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE); + req->header.size = htons (sizeof (*req) + size); + req->channel_key = *channel_key; + req->psycstore_flags = htonl (psycstore_flags); + memcpy (&req[1], message, size); + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; +} + + +/** + * Retrieve a message fragment by fragment ID. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel we are interested in. + * @param fragment_id Fragment ID to check. Use 0 to get the latest message fragment. + * @param fcb Callback to call with the retrieved fragments. + * @param rcb Callback to call with the result of the operation. + * @param cls Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + uint64_t fragment_id, + GNUNET_PSYCSTORE_FragmentCallback fcb, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *cls) +{ + struct FragmentGetRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + op->h = h; + op->data_cb = (DataCallback) fcb; + op->res_cb = rcb; + op->cls = cls; + + req = (struct FragmentGetRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); + req->header.size = htons (sizeof (*req)); + req->channel_key = *channel_key; + req->fragment_id = GNUNET_htonll (fragment_id); + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; +} + + +/** + * Retrieve all fragments of a message. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel we are interested in. + * @param message_id Message ID to check. Use 0 to get the latest message. + * @param fcb Callback to call with the retrieved fragments. + * @param rcb Callback to call with the result of the operation. + * @param cls Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ struct GNUNET_PSYCSTORE_OperationHandle * -GNUNET_PSYCSTORE_membership_store ( - struct GNUNET_PSYCSTORE_Handle *h, - const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, - const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, - int did_join, - uint64_t announced_at, - uint64_t effective_since, - uint64_t group_generation, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) +GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + uint64_t message_id, + GNUNET_PSYCSTORE_FragmentCallback fcb, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *cls) { - + struct MessageGetRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + op->h = h; + op->data_cb = (DataCallback) fcb; + op->res_cb = rcb; + op->cls = cls; + + req = (struct MessageGetRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); + req->header.size = htons (sizeof (*req)); + req->channel_key = *channel_key; + req->message_id = GNUNET_htonll (message_id); + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; +} + + +/** + * Retrieve a fragment of message specified by its message ID and fragment + * offset. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel we are interested in. + * @param message_id Message ID to check. Use 0 to get the latest message. + * @param fragment_offset Offset of the fragment to retrieve. + * @param fcb Callback to call with the retrieved fragments. + * @param rcb Callback to call with the result of the operation. + * @param cls Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + uint64_t message_id, + uint64_t fragment_offset, + GNUNET_PSYCSTORE_FragmentCallback fcb, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *cls) +{ + struct MessageGetFragmentRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + op->h = h; + op->data_cb = (DataCallback) fcb; + op->res_cb = rcb; + op->cls = cls; + + req = (struct MessageGetFragmentRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT); + req->header.size = htons (sizeof (*req)); + req->channel_key = *channel_key; + req->message_id = GNUNET_htonll (message_id); + req->fragment_offset = GNUNET_htonll (fragment_offset); + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; +} + + +/** + * Retrieve latest values of counters for a channel master. + * + * The current value of counters are needed when a channel master is restarted, + * so that it can continue incrementing the counters from their last value. + * + * @param h Handle for the PSYCstore. + * @param channel_key Public key that identifies the channel. + * @param mccb Callback to call with the result. + * @param mccb_cls Closure for the callback. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_counters_get_master (struct GNUNET_PSYCSTORE_Handle *h, + struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + GNUNET_PSYCSTORE_MasterCountersCallback mccb, + void *mccb_cls) +{ + struct OperationRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + op->h = h; + op->data_cb = mccb; + op->cls = mccb_cls; + + req = (struct OperationRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_MASTER); + req->header.size = htons (sizeof (*req)); + req->channel_key = *channel_key; + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; } + +/** + * Retrieve latest values of counters for a channel slave. + * + * The current value of counters are needed when a channel slave rejoins + * and starts the state synchronization process. + * + * @param h Handle for the PSYCstore. + * @param channel_key Public key that identifies the channel. + * @param sccb Callback to call with the result. + * @param sccb_cls Closure for the callback. + * + * @return Handle that can be used to cancel the operation. + */ struct GNUNET_PSYCSTORE_OperationHandle * -GNUNET_PSYCSTORE_membership_test ( - struct GNUNET_PSYCSTORE_Handle *h, - const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, - const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, - uint64_t message_id, - uint64_t group_generation, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) +GNUNET_PSYCSTORE_counters_get_slave (struct GNUNET_PSYCSTORE_Handle *h, + struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + GNUNET_PSYCSTORE_SlaveCountersCallback sccb, + void *sccb_cls) { + struct OperationRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + op->h = h; + op->data_cb = sccb; + op->cls = sccb_cls; + + req = (struct OperationRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_SLAVE); + req->header.size = htons (sizeof (*req)); + req->channel_key = *channel_key; + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; +} + + +/** + * Apply modifiers of a message to the current channel state. + * + * An error is returned if there are missing messages containing state + * operations before the current one. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel we are interested in. + * @param message_id ID of the message that contains the @a modifiers. + * @param state_delta Value of the _state_delta PSYC header variable of the message. + * @param modifier_count Number of elements in the @a modifiers array. + * @param modifiers List of modifiers to apply. + * @param rcb Callback to call with the result of the operation. + * @param rcb_cls Closure for the callback. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + uint64_t message_id, + uint64_t state_delta, + size_t modifier_count, + const struct GNUNET_ENV_Modifier *modifiers, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *rcb_cls) +{ + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; + size_t i; + + for (i = 0; i < modifier_count; i++) { + struct StateModifyRequest *req; + uint16_t name_size = strlen (modifiers[i].name) + 1; + + op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size + + modifiers[i].value_size); + op->h = h; + op->res_cb = rcb; + op->cls = rcb_cls; + + req = (struct StateModifyRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY); + req->header.size = htons (sizeof (*req) + name_size + + modifiers[i].value_size); + req->channel_key = *channel_key; + req->message_id = GNUNET_htonll (message_id); + req->state_delta = GNUNET_htonll (state_delta); + req->oper = modifiers[i].oper; + req->name_size = htons (name_size); + req->flags + = 0 == i + ? STATE_OP_FIRST + : modifier_count - 1 == i + ? STATE_OP_LAST + : 0; + + memcpy (&req[1], modifiers[i].name, name_size); + memcpy ((void *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + } + return op; + /* FIXME: only the last operation is returned, + * operation_cancel() should be able to cancel all of them. + */ +} + + +/** + * Store synchronized state. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel we are interested in. + * @param message_id ID of the message that contains the state_hash PSYC header variable. + * @param modifier_count Number of elements in the @a modifiers array. + * @param modifiers Full state to store. + * @param rcb Callback to call with the result of the operation. + * @param rcb_cls Closure for the callback. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + uint64_t message_id, + size_t modifier_count, + const struct GNUNET_ENV_Modifier *modifiers, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *rcb_cls) +{ + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; + size_t i; + + for (i = 0; i < modifier_count; i++) { + struct StateSyncRequest *req; + uint16_t name_size = strlen (modifiers[i].name) + 1; + + op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size + + modifiers[i].value_size); + op->h = h; + op->res_cb = rcb; + op->cls = rcb_cls; + + req = (struct StateSyncRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); + req->header.size = htons (sizeof (*req) + name_size + + modifiers[i].value_size); + req->channel_key = *channel_key; + req->message_id = GNUNET_htonll (message_id); + req->name_size = htons (name_size); + req->flags + = 0 == i + ? STATE_OP_FIRST + : modifier_count - 1 == i + ? STATE_OP_LAST + : 0; + + memcpy (&req[1], modifiers[i].name, name_size); + memcpy ((void *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + } + return op; +} + + +/** + * Reset the state of a channel. + * + * Delete all state variables stored for the given channel. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel we are interested in. + * @param rcb Callback to call with the result of the operation. + * @param rcb_cls Closure for the callback. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey + *channel_key, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *rcb_cls) +{ + struct OperationRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + op->h = h; + op->res_cb = rcb; + op->cls = rcb_cls; + + req = (struct OperationRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); + req->header.size = htons (sizeof (*req)); + req->channel_key = *channel_key; + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; +} + + + +/** + * Update signed values of state variables in the state store. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel we are interested in. + * @param message_id Message ID that contained the state @a hash. + * @param hash Hash of the serialized full state. + * @param rcb Callback to call with the result of the operation. + * @param rcb_cls Closure for the callback. + * + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + uint64_t message_id, + const struct GNUNET_HashCode *hash, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *rcb_cls) +{ + struct StateHashUpdateRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + op->h = h; + op->res_cb = rcb; + op->cls = rcb_cls; + + req = (struct StateHashUpdateRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); + req->header.size = htons (sizeof (*req)); + req->channel_key = *channel_key; + req->hash = *hash; + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; +} + + +/** + * Retrieve the best matching state variable. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel we are interested in. + * @param name Name of variable to match, the returned variable might be less specific. + * @param scb Callback to return the matching state variable. + * @param rcb Callback to call with the result of the operation. + * @param cls Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + const char *name, + GNUNET_PSYCSTORE_StateCallback scb, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *cls) +{ + size_t name_size = strlen (name) + 1; + struct OperationRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); + op->h = h; + op->data_cb = (DataCallback) scb; + op->res_cb = rcb; + op->cls = cls; + + req = (struct OperationRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET); + req->header.size = htons (sizeof (*req) + name_size); + req->channel_key = *channel_key; + memcpy (&req[1], name, name_size); + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; +} + + + +/** + * Retrieve all state variables for a channel with the given prefix. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel we are interested in. + * @param name_prefix Prefix of state variable names to match. + * @param scb Callback to return matching state variables. + * @param rcb Callback to call with the result of the operation. + * @param cls Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, + const char *name_prefix, + GNUNET_PSYCSTORE_StateCallback scb, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *cls) +{ + size_t name_size = strlen (name_prefix) + 1; + struct OperationRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); + op->h = h; + op->data_cb = (DataCallback) scb; + op->res_cb = rcb; + op->cls = cls; + + req = (struct OperationRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX); + req->header.size = htons (sizeof (*req) + name_size); + req->channel_key = *channel_key; + memcpy (&req[1], name_prefix, name_size); + + op->op_id = get_next_op_id (h); + req->op_id = htonl (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + return op; } /* end of psycstore_api.c */ -- cgit v1.2.3