/* * This file is part of GNUnet * Copyright (C) 2013 GNUnet e.V. * * GNUnet is free software: you can redistribute it and/or modify it * under the terms of the GNU Affero General Public License as published * by the Free Software Foundation, either version 3 of the License, * or (at your option) any later version. * * GNUnet is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ /** * @file psyc/gnunet-service-psyc.c * @brief PSYC service * @author Gabor X Toth */ #include #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_constants.h" #include "gnunet_protocols.h" #include "gnunet_statistics_service.h" #include "gnunet_multicast_service.h" #include "gnunet_psycstore_service.h" #include "gnunet_psyc_service.h" #include "gnunet_psyc_util_lib.h" #include "psyc.h" /** * Handle to our current configuration. */ static const struct GNUNET_CONFIGURATION_Handle *cfg; /** * Service handle. */ static struct GNUNET_SERVICE_Handle *service; /** * Handle to the statistics service. */ static struct GNUNET_STATISTICS_Handle *stats; /** * Handle to the PSYCstore. */ static struct GNUNET_PSYCSTORE_Handle *store; /** * All connected masters. * Channel's pub_key_hash -> struct Master */ static struct GNUNET_CONTAINER_MultiHashMap *masters; /** * All connected slaves. * Channel's pub_key_hash -> struct Slave */ static struct GNUNET_CONTAINER_MultiHashMap *slaves; /** * Connected slaves per channel. * Channel's pub_key_hash -> Slave's pub_key -> struct Slave */ static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves; /** * Message in the transmission queue. */ struct TransmitMessage { struct TransmitMessage *prev; struct TransmitMessage *next; struct GNUNET_SERVICE_Client *client; /** * ID assigned to the message. */ uint64_t id; /** * Size of message. */ uint16_t size; /** * Type of first message part. */ uint16_t first_ptype; /** * Type of last message part. */ uint16_t last_ptype; /* Followed by message */ }; /** * Cache for received message fragments. * Message fragments are only sent to clients after all modifiers arrived. * * chan_key -> MultiHashMap chan_msgs */ static struct GNUNET_CONTAINER_MultiHashMap *recv_cache; /** * Entry in the chan_msgs hashmap of @a recv_cache: * fragment_id -> RecvCacheEntry */ struct RecvCacheEntry { struct GNUNET_MULTICAST_MessageHeader *mmsg; uint16_t ref_count; }; /** * Entry in the @a recv_frags hash map of a @a Channel. * message_id -> FragmentQueue */ struct FragmentQueue { /** * Fragment IDs stored in @a recv_cache. */ struct GNUNET_CONTAINER_Heap *fragments; /** * Total size of received fragments. */ uint64_t size; /** * Total size of received header fragments (METHOD & MODIFIERs) */ uint64_t header_size; /** * The @a state_delta field from struct GNUNET_PSYC_MessageMethod. */ uint64_t state_delta; /** * The @a flags field from struct GNUNET_PSYC_MessageMethod. */ uint32_t flags; /** * Receive state of message. * * @see MessageFragmentState */ uint8_t state; /** * Whether the state is already modified in PSYCstore. */ uint8_t state_is_modified; /** * Is the message queued for delivery to the client? * i.e. added to the recv_msgs queue */ uint8_t is_queued; }; /** * List of connected clients. */ struct ClientList { struct ClientList *prev; struct ClientList *next; struct GNUNET_SERVICE_Client *client; }; struct Operation { struct Operation *prev; struct Operation *next; struct GNUNET_SERVICE_Client *client; struct Channel *channel; uint64_t op_id; uint32_t flags; }; /** * Common part of the client context for both a channel master and slave. */ struct Channel { struct ClientList *clients_head; struct ClientList *clients_tail; struct Operation *op_head; struct Operation *op_tail; struct TransmitMessage *tmit_head; struct TransmitMessage *tmit_tail; /** * Current PSYCstore operation. */ struct GNUNET_PSYCSTORE_OperationHandle *store_op; /** * Received fragments not yet sent to the client. * message_id -> FragmentQueue */ struct GNUNET_CONTAINER_MultiHashMap *recv_frags; /** * Received message IDs not yet sent to the client. */ struct GNUNET_CONTAINER_Heap *recv_msgs; /** * Public key of the channel. */ struct GNUNET_CRYPTO_EddsaPublicKey pub_key; /** * Hash of @a pub_key. */ struct GNUNET_HashCode pub_key_hash; /** * Last message ID sent to the client. * 0 if there is no such message. */ uint64_t max_message_id; /** * ID of the last stateful message, where the state operations has been * processed and saved to PSYCstore and which has been sent to the client. * 0 if there is no such message. */ uint64_t max_state_message_id; /** * Expected value size for the modifier being received from the PSYC service. */ uint32_t tmit_mod_value_size_expected; /** * Actual value size for the modifier being received from the PSYC service. */ uint32_t tmit_mod_value_size; /** * Is this channel ready to receive messages from client? * #GNUNET_YES or #GNUNET_NO */ uint8_t is_ready; /** * Is the client disconnected? * #GNUNET_YES or #GNUNET_NO */ uint8_t is_disconnecting; /** * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)? */ uint8_t is_master; union { struct Master *master; struct Slave *slave; }; }; /** * Client context for a channel master. */ struct Master { /** * Channel struct common for Master and Slave */ struct Channel channel; /** * Private key of the channel. */ struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; /** * Handle for the multicast origin. */ struct GNUNET_MULTICAST_Origin *origin; /** * Transmit handle for multicast. */ struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle; /** * Incoming join requests from multicast. * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle * */ struct GNUNET_CONTAINER_MultiHashMap *join_reqs; /** * Last message ID transmitted to this channel. * * Incremented before sending a message, thus the message_id in messages sent * starts from 1. */ uint64_t max_message_id; /** * ID of the last message with state operations transmitted to the channel. * 0 if there is no such message. */ uint64_t max_state_message_id; /** * Maximum group generation transmitted to the channel. */ uint64_t max_group_generation; /** * @see enum GNUNET_PSYC_Policy */ enum GNUNET_PSYC_Policy policy; }; /** * Client context for a channel slave. */ struct Slave { /** * Channel struct common for Master and Slave */ struct Channel channel; /** * Private key of the slave. */ struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key; /** * Public key of the slave. */ struct GNUNET_CRYPTO_EcdsaPublicKey pub_key; /** * Hash of @a pub_key. */ struct GNUNET_HashCode pub_key_hash; /** * Handle for the multicast member. */ struct GNUNET_MULTICAST_Member *member; /** * Transmit handle for multicast. */ struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle; /** * Peer identity of the origin. */ struct GNUNET_PeerIdentity origin; /** * Number of items in @a relays. */ uint32_t relay_count; /** * Relays that multicast can use to connect. */ struct GNUNET_PeerIdentity *relays; /** * Join request to be transmitted to the master on join. */ struct GNUNET_PSYC_Message *join_msg; /** * Join decision received from multicast. */ struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn; /** * Maximum request ID for this channel. */ uint64_t max_request_id; /** * Join flags. */ enum GNUNET_PSYC_SlaveJoinFlags join_flags; }; /** * Client context. */ struct Client { struct GNUNET_SERVICE_Client *client; struct Channel *channel; }; struct ReplayRequestKey { uint64_t fragment_id; uint64_t message_id; uint64_t fragment_offset; uint64_t flags; }; static void transmit_message (struct Channel *chn); static uint64_t message_queue_run (struct Channel *chn); static uint64_t message_queue_drop (struct Channel *chn); static void schedule_transmit_message (void *cls) { struct Channel *chn = cls; transmit_message (chn); } /** * Task run during shutdown. * * @param cls unused */ static void shutdown_task (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "shutting down...\n"); GNUNET_PSYCSTORE_disconnect (store); if (NULL != stats) { GNUNET_STATISTICS_destroy (stats, GNUNET_YES); stats = NULL; } } static struct Operation * op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client, uint64_t op_id, uint32_t flags) { struct Operation *op = GNUNET_malloc (sizeof (*op)); op->client = client; op->channel = chn; op->op_id = op_id; op->flags = flags; GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op); return op; } static void op_remove (struct Operation *op) { GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op); GNUNET_free (op); } /** * Clean up master data structures after a client disconnected. */ static void cleanup_master (struct Master *mst) { struct Channel *chn = &mst->channel; GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst); } /** * Clean up slave data structures after a client disconnected. */ static void cleanup_slave (struct Slave *slv) { struct Channel *chn = &slv->channel; struct GNUNET_CONTAINER_MultiHashMap * chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &chn->pub_key_hash); GNUNET_assert (NULL != chn_slv); GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv); if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv)) { GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash, chn_slv); GNUNET_CONTAINER_multihashmap_destroy (chn_slv); } GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); if (NULL != slv->join_msg) { GNUNET_free (slv->join_msg); slv->join_msg = NULL; } if (NULL != slv->relays) { GNUNET_free (slv->relays); slv->relays = NULL; } GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); } /** * Clean up channel data structures after a client disconnected. */ static void cleanup_channel (struct Channel *chn) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Cleaning up channel %s. master? %u\n", chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master); message_queue_drop (chn); GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags); chn->recv_frags = NULL; if (NULL != chn->store_op) { GNUNET_PSYCSTORE_operation_cancel (chn->store_op); chn->store_op = NULL; } (GNUNET_YES == chn->is_master) ? cleanup_master (chn->master) : cleanup_slave (chn->slave); GNUNET_free (chn); } /** * Called whenever a client is disconnected. * Frees our resources associated with that client. * * @param cls closure * @param client identification of the client * @param app_ctx must match @a client */ static void client_notify_disconnect (void *cls, struct GNUNET_SERVICE_Client *client, void *app_ctx) { struct Client *c = app_ctx; struct Channel *chn = c->channel; GNUNET_free (c); if (NULL == chn) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p User context is NULL in client_notify_disconnect ()\n", chn); GNUNET_break (0); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client %p (%s) disconnected from channel %s\n", chn, client, (GNUNET_YES == chn->is_master) ? "master" : "slave", GNUNET_h2s (&chn->pub_key_hash)); struct ClientList *cli = chn->clients_head; while (NULL != cli) { if (cli->client == client) { GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli); GNUNET_free (cli); break; } cli = cli->next; } struct Operation *op = chn->op_head; while (NULL != op) { if (op->client == client) { op->client = NULL; break; } op = op->next; } if (NULL == chn->clients_head) { /* Last client disconnected. */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Last client (%s) disconnected from channel %s\n", chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", GNUNET_h2s (&chn->pub_key_hash)); chn->is_disconnecting = GNUNET_YES; cleanup_channel (chn); } } /** * A new client connected. * * @param cls NULL * @param client client to add * @param mq message queue for @a client * @return @a client */ static void * client_notify_connect (void *cls, struct GNUNET_SERVICE_Client *client, struct GNUNET_MQ_Handle *mq) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client); struct Client *c = GNUNET_malloc (sizeof (*c)); c->client = client; return c; } /** * Send message to all clients connected to the channel. */ static void client_send_msg (const struct Channel *chn, const struct GNUNET_MessageHeader *msg) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message to clients of channel %p.\n", chn); struct ClientList *cli = chn->clients_head; while (NULL != cli) { struct GNUNET_MQ_Envelope * env = GNUNET_MQ_msg_copy (msg); GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client), env); cli = cli->next; } } /** * Send a result code back to the client. * * @param client * Client that should receive the result code. * @param result_code * Code to transmit. * @param op_id * Operation ID in network byte order. * @param data * Data payload or NULL. * @param data_size * Size of @a data. */ static void client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id, int64_t result_code, const void *data, uint16_t data_size) { struct GNUNET_OperationResultMessage *res; struct GNUNET_MQ_Envelope * env = GNUNET_MQ_msg_extra (res, data_size, GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); res->result_code = GNUNET_htonll (result_code); res->op_id = op_id; if (0 < data_size) GNUNET_memcpy (&res[1], data, data_size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n", client, GNUNET_ntohll (op_id), result_code, data_size); GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); } /** * Closure for join_mem_test_cb() */ struct JoinMemTestClosure { struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; struct Channel *channel; struct GNUNET_MULTICAST_JoinHandle *join_handle; struct GNUNET_PSYC_JoinRequestMessage *join_msg; }; /** * Membership test result callback used for join requests. */ static void join_mem_test_cb (void *cls, int64_t result, const char *err_msg, uint16_t err_msg_size) { struct JoinMemTestClosure *jcls = cls; if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master) { /* Pass on join request to client if this is a master channel */ struct Master *mst = jcls->channel->master; struct GNUNET_HashCode slave_pub_hash; GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key), &slave_pub_hash); GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); client_send_msg (jcls->channel, &jcls->join_msg->header); } else { if (GNUNET_SYSERR == result) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Could not perform membership test (%.*s)\n", err_msg_size, err_msg); } // FIXME: add relays GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL); } GNUNET_free (jcls->join_msg); GNUNET_free (jcls); } /** * Incoming join request from multicast. */ static void mcast_recv_join_request (void *cls, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, const struct GNUNET_MessageHeader *join_msg, struct GNUNET_MULTICAST_JoinHandle *jh) { struct Channel *chn = cls; uint16_t join_msg_size = 0; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn); if (NULL != join_msg) { if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type)) { join_msg_size = ntohs (join_msg->size); } else { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%p Got join message with invalid type %u.\n", chn, ntohs (join_msg->type)); } } struct GNUNET_PSYC_JoinRequestMessage * req = GNUNET_malloc (sizeof (*req) + join_msg_size); req->header.size = htons (sizeof (*req) + join_msg_size); req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST); req->slave_pub_key = *slave_pub_key; if (0 < join_msg_size) GNUNET_memcpy (&req[1], join_msg, join_msg_size); struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls)); jcls->slave_pub_key = *slave_pub_key; jcls->channel = chn; jcls->join_handle = jh; jcls->join_msg = req; GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key, chn->max_message_id, 0, &join_mem_test_cb, jcls); } /** * Join decision received from multicast. */ static void mcast_recv_join_decision (void *cls, int is_admitted, const struct GNUNET_PeerIdentity *peer, uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_resp) { struct Slave *slv = cls; struct Channel *chn = &slv->channel; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join decision: %d\n", slv, is_admitted); if (GNUNET_YES == chn->is_ready) { /* Already admitted */ return; } uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; struct GNUNET_PSYC_JoinDecisionMessage * dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size); dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size); dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); dcsn->is_admitted = htonl (is_admitted); if (0 < join_resp_size) GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size); client_send_msg (chn, &dcsn->header); if (GNUNET_YES == is_admitted && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)) { chn->is_ready = GNUNET_YES; } } static int store_recv_fragment_replay (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, enum GNUNET_PSYCSTORE_MessageFlags flags) { struct GNUNET_MULTICAST_ReplayHandle *rh = cls; GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK); return GNUNET_YES; } /** * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. */ static void store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg, uint16_t err_msg_size) { struct GNUNET_MULTICAST_ReplayHandle *rh = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n", rh, result, err_msg_size, err_msg); switch (result) { case GNUNET_YES: break; case GNUNET_NO: GNUNET_MULTICAST_replay_response (rh, NULL, GNUNET_MULTICAST_REC_NOT_FOUND); return; case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED: GNUNET_MULTICAST_replay_response (rh, NULL, GNUNET_MULTICAST_REC_ACCESS_DENIED); return; case GNUNET_SYSERR: GNUNET_MULTICAST_replay_response (rh, NULL, GNUNET_MULTICAST_REC_INTERNAL_ERROR); return; } /* GNUNET_MULTICAST_replay_response frees 'rh' when passed * an error code, so it must be ensured no further processing * is attempted on 'rh'. Maybe this should be refactored as * it doesn't look very intuitive. --lynX */ GNUNET_MULTICAST_replay_response_end (rh); } /** * Incoming fragment replay request from multicast. */ static void mcast_recv_replay_fragment (void *cls, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, uint64_t fragment_id, uint64_t flags, struct GNUNET_MULTICAST_ReplayHandle *rh) { struct Channel *chn = cls; GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key, fragment_id, fragment_id, &store_recv_fragment_replay, &store_recv_fragment_replay_result, rh); } /** * Incoming message replay request from multicast. */ static void mcast_recv_replay_message (void *cls, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, uint64_t message_id, uint64_t fragment_offset, uint64_t flags, struct GNUNET_MULTICAST_ReplayHandle *rh) { struct Channel *chn = cls; GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key, message_id, message_id, 1, NULL, &store_recv_fragment_replay, &store_recv_fragment_replay_result, rh); } /** * Convert an uint64_t in network byte order to a HashCode * that can be used as key in a MultiHashMap */ static inline void hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n) { /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */ /* TODO: use built-in byte swap functions if available */ n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL); n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL); *key = (struct GNUNET_HashCode) {}; *((uint64_t *) key) = (n << 32) | (n >> 32); } /** * Convert an uint64_t in host byte order to a HashCode * that can be used as key in a MultiHashMap */ static inline void hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) { #if __BYTE_ORDER == __BIG_ENDIAN hash_key_from_nll (key, n); #elif __BYTE_ORDER == __LITTLE_ENDIAN *key = (struct GNUNET_HashCode) {}; *((uint64_t *) key) = n; #else #error byteorder undefined #endif } /** * Initialize PSYC message header. */ static inline void psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg, const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) { uint16_t size = ntohs (mmsg->header.size); uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); pmsg->header.size = htons (psize); pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); pmsg->message_id = mmsg->message_id; pmsg->fragment_offset = mmsg->fragment_offset; pmsg->flags = htonl (flags); GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); } /** * Create a new PSYC message from a multicast message for sending it to clients. */ static inline struct GNUNET_PSYC_MessageHeader * psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) { struct GNUNET_PSYC_MessageHeader *pmsg; uint16_t size = ntohs (mmsg->header.size); uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); pmsg = GNUNET_malloc (psize); psyc_msg_init (pmsg, mmsg, flags); return pmsg; } /** * Send multicast message to all clients connected to the channel. */ static void client_send_mcast_msg (struct Channel *chn, const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", chn, GNUNET_ntohll (mmsg->fragment_id), GNUNET_ntohll (mmsg->message_id)); struct GNUNET_PSYC_MessageHeader * pmsg = GNUNET_PSYC_message_header_create (mmsg, flags); client_send_msg (chn, &pmsg->header); GNUNET_free (pmsg); } /** * Send multicast request to all clients connected to the channel. */ static void client_send_mcast_req (struct Master *mst, const struct GNUNET_MULTICAST_RequestHeader *req) { struct Channel *chn = &mst->channel; struct GNUNET_PSYC_MessageHeader *pmsg; uint16_t size = ntohs (req->header.size); uint16_t psize = sizeof (*pmsg) + size - sizeof (*req); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", chn, GNUNET_ntohll (req->fragment_id), GNUNET_ntohll (req->request_id)); pmsg = GNUNET_malloc (psize); pmsg->header.size = htons (psize); pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); pmsg->message_id = req->request_id; pmsg->fragment_offset = req->fragment_offset; pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); pmsg->slave_pub_key = req->member_pub_key; GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req)); client_send_msg (chn, &pmsg->header); /* FIXME: save req to PSYCstore so that it can be resent later to clients */ GNUNET_free (pmsg); } /** * Insert a multicast message fragment into the queue belonging to the message. * * @param chn Channel. * @param mmsg Multicast message fragment. * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. * @param first_ptype First PSYC message part type in @a mmsg. * @param last_ptype Last PSYC message part type in @a mmsg. */ static void fragment_queue_insert (struct Channel *chn, const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint16_t first_ptype, uint16_t last_ptype) { const uint16_t size = ntohs (mmsg->header.size); const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset); struct GNUNET_CONTAINER_MultiHashMap *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, &chn->pub_key_hash); struct GNUNET_HashCode msg_id_hash; hash_key_from_nll (&msg_id_hash, mmsg->message_id); struct FragmentQueue *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); if (NULL == fragq) { fragq = GNUNET_malloc (sizeof (*fragq)); fragq->state = MSG_FRAG_STATE_HEADER; fragq->fragments = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); if (NULL == chan_msgs) { chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } } struct GNUNET_HashCode frag_id_hash; hash_key_from_nll (&frag_id_hash, mmsg->fragment_id); struct RecvCacheEntry *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash); if (NULL == cache_entry) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n", chn, GNUNET_ntohll (mmsg->message_id), GNUNET_ntohll (mmsg->fragment_id)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p header_size: %" PRIu64 " + %u\n", chn, fragq->header_size, size); cache_entry = GNUNET_malloc (sizeof (*cache_entry)); cache_entry->ref_count = 1; cache_entry->mmsg = GNUNET_malloc (size); GNUNET_memcpy (cache_entry->mmsg, mmsg, size); GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } else { cache_entry->ref_count++; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n", chn, GNUNET_ntohll (mmsg->message_id), GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count); } if (MSG_FRAG_STATE_HEADER == fragq->state) { if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) { struct GNUNET_PSYC_MessageMethod * pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1]; fragq->state_delta = GNUNET_ntohll (pmeth->state_delta); fragq->flags = ntohl (pmeth->flags); } if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA) { fragq->header_size += size; } else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype || frag_offset == fragq->header_size) { /* header is now complete */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Header of message %" PRIu64 " is complete.\n", chn, GNUNET_ntohll (mmsg->message_id)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Adding message %" PRIu64 " to queue.\n", chn, GNUNET_ntohll (mmsg->message_id)); fragq->state = MSG_FRAG_STATE_DATA; } else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n", chn, GNUNET_ntohll (mmsg->message_id), frag_offset, fragq->header_size); } } switch (last_ptype) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: if (frag_offset == fragq->size) fragq->state = MSG_FRAG_STATE_END; else GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n", chn, GNUNET_ntohll (mmsg->message_id), frag_offset, fragq->size); break; case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: /* Drop message without delivering to client if it's a single fragment */ fragq->state = (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) ? MSG_FRAG_STATE_DROP : MSG_FRAG_STATE_CANCEL; } switch (fragq->state) { case MSG_FRAG_STATE_DATA: case MSG_FRAG_STATE_END: case MSG_FRAG_STATE_CANCEL: if (GNUNET_NO == fragq->is_queued) { GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL, GNUNET_ntohll (mmsg->message_id)); fragq->is_queued = GNUNET_YES; } } fragq->size += size; GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL, GNUNET_ntohll (mmsg->fragment_id)); } /** * Run fragment queue of a message. * * Send fragments of a message in order to client, after all modifiers arrived * from multicast. * * @param chn * Channel. * @param msg_id * ID of the message @a fragq belongs to. * @param fragq * Fragment queue of the message. * @param drop * Drop message without delivering to client? * #GNUNET_YES or #GNUNET_NO. */ static void fragment_queue_run (struct Channel *chn, uint64_t msg_id, struct FragmentQueue *fragq, uint8_t drop) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n", chn, msg_id, fragq->state); struct GNUNET_CONTAINER_MultiHashMap *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, &chn->pub_key_hash); GNUNET_assert (NULL != chan_msgs); uint64_t frag_id; while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, &frag_id)) { struct GNUNET_HashCode frag_id_hash; hash_key_from_hll (&frag_id_hash, frag_id); struct RecvCacheEntry *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash); if (cache_entry != NULL) { if (GNUNET_NO == drop) { client_send_mcast_msg (chn, cache_entry->mmsg, 0); } if (cache_entry->ref_count <= 1) { GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash, cache_entry); GNUNET_free (cache_entry->mmsg); GNUNET_free (cache_entry); } else { cache_entry->ref_count--; } } #if CACHE_AGING_IMPLEMENTED else if (GNUNET_NO == drop) { /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */ } #endif GNUNET_CONTAINER_heap_remove_root (fragq->fragments); } if (MSG_FRAG_STATE_END <= fragq->state) { struct GNUNET_HashCode msg_id_hash; hash_key_from_hll (&msg_id_hash, msg_id); GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq); GNUNET_CONTAINER_heap_destroy (fragq->fragments); GNUNET_free (fragq); } else { fragq->is_queued = GNUNET_NO; } } struct StateModifyClosure { struct Channel *channel; uint64_t msg_id; struct GNUNET_HashCode msg_id_hash; }; void store_recv_state_modify_result (void *cls, int64_t result, const char *err_msg, uint16_t err_msg_size) { struct StateModifyClosure *mcls = cls; struct Channel *chn = mcls->channel; uint64_t msg_id = mcls->msg_id; struct FragmentQueue * fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n", chn, result, err_msg_size, err_msg); switch (result) { case GNUNET_OK: case GNUNET_NO: if (NULL != fragq) fragq->state_is_modified = GNUNET_YES; if (chn->max_state_message_id < msg_id) chn->max_state_message_id = msg_id; if (chn->max_message_id < msg_id) chn->max_message_id = msg_id; if (NULL != fragq) fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); message_queue_run (chn); break; default: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n", chn, result, err_msg_size, err_msg); /** @todo FIXME: handle state_modify error */ } } /** * Run message queue. * * Send messages in queue to client in order after a message has arrived from * multicast, according to the following: * - A message is only sent if all of its modifiers arrived. * - A stateful message is only sent if the previous stateful message * has already been delivered to the client. * * @param chn Channel. * * @return Number of messages removed from queue and sent to client. */ static uint64_t message_queue_run (struct Channel *chn) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Running message queue.\n", chn); uint64_t n = 0; uint64_t msg_id; while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, &msg_id)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id); struct GNUNET_HashCode msg_id_hash; hash_key_from_hll (&msg_id_hash, msg_id); struct FragmentQueue * fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p No fragq (%p) or header not complete.\n", chn, fragq); break; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Fragment queue entry: state: %u, state delta: " "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n", chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id); if (MSG_FRAG_STATE_DATA <= fragq->state) { /* Check if there's a missing message before the current one */ if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn); if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) && (chn->max_message_id != msg_id - 1 && chn->max_message_id != msg_id)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Out of order message. " "(%" PRIu64 " != %" PRIu64 " - 1)\n", chn, chn->max_message_id, msg_id); break; // FIXME: keep track of messages processed in this queue run, // and only stop after reaching the end } } else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn); if (GNUNET_YES != fragq->state_is_modified) { if (msg_id - fragq->state_delta != chn->max_state_message_id) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Out of order stateful message. " "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", chn, msg_id, fragq->state_delta, chn->max_state_message_id); break; // FIXME: keep track of messages processed in this queue run, // and only stop after reaching the end } struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls)); mcls->channel = chn; mcls->msg_id = msg_id; mcls->msg_id_hash = msg_id_hash; /* Apply modifiers to state in PSYCstore */ GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id, fragq->state_delta, store_recv_state_modify_result, mcls); break; // continue after asynchronous state modify result } } chn->max_message_id = msg_id; } fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); n++; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Removed %" PRIu64 " messages from queue.\n", chn, n); return n; } /** * Drop message queue of a channel. * * Remove all messages in queue without sending it to clients. * * @param chn Channel. * * @return Number of messages removed from queue. */ static uint64_t message_queue_drop (struct Channel *chn) { uint64_t n = 0; uint64_t msg_id; while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, &msg_id)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id); struct GNUNET_HashCode msg_id_hash; hash_key_from_hll (&msg_id_hash, msg_id); struct FragmentQueue * fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); GNUNET_assert (NULL != fragq); fragment_queue_run (chn, msg_id, fragq, GNUNET_YES); GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); n++; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Removed %" PRIu64 " messages from queue.\n", chn, n); return n; } /** * Received result of GNUNET_PSYCSTORE_fragment_store(). */ static void store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg, uint16_t err_msg_size) { struct Channel *chn = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n", chn, result, err_msg_size, err_msg); } /** * Handle incoming message fragment from multicast. * * Store it using PSYCstore and send it to the clients of the channel in order. */ static void mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg) { struct Channel *chn = cls; uint16_t size = ntohs (mmsg->header.size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Received multicast message of size %u. " "fragment_id=%" PRIu64 ", message_id=%" PRIu64 ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n", chn, size, GNUNET_ntohll (mmsg->fragment_id), GNUNET_ntohll (mmsg->message_id), GNUNET_ntohll (mmsg->fragment_offset), GNUNET_ntohll (mmsg->flags)); GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0, &store_recv_fragment_store_result, chn); uint16_t first_ptype = 0, last_ptype = 0; int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg), (const char *) &mmsg[1], &first_ptype, &last_ptype); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Message check result %d, first part type %u, last part type %u\n", chn, check, first_ptype, last_ptype); if (GNUNET_SYSERR == check) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Dropping incoming multicast message with invalid parts.\n", chn); GNUNET_break_op (0); return; } fragment_queue_insert (chn, mmsg, first_ptype, last_ptype); message_queue_run (chn); } /** * Incoming request fragment from multicast for a master. * * @param cls Master. * @param req The request. */ static void mcast_recv_request (void *cls, const struct GNUNET_MULTICAST_RequestHeader *req) { struct Master *mst = cls; uint16_t size = ntohs (req->header.size); char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Received multicast request of size %u from %s.\n", mst, size, str); GNUNET_free (str); uint16_t first_ptype = 0, last_ptype = 0; if (GNUNET_SYSERR == GNUNET_PSYC_receive_check_parts (size - sizeof (*req), (const char *) &req[1], &first_ptype, &last_ptype)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Dropping incoming multicast request with invalid parts.\n", mst); GNUNET_break_op (0); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message parts: first: type %u, last: type %u\n", first_ptype, last_ptype); /* FIXME: in-order delivery */ client_send_mcast_req (mst, req); } /** * Response from PSYCstore with the current counter values for a channel master. */ static void store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, uint64_t max_message_id, uint64_t max_group_generation, uint64_t max_state_message_id) { struct Master *mst = cls; struct Channel *chn = &mst->channel; chn->store_op = NULL; struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); res.header.size = htons (sizeof (res)); res.result_code = htonl (result); res.max_message_id = GNUNET_htonll (max_message_id); if (GNUNET_OK == result || GNUNET_NO == result) { mst->max_message_id = max_message_id; chn->max_message_id = max_message_id; chn->max_state_message_id = max_state_message_id; mst->max_group_generation = max_group_generation; mst->origin = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, mcast_recv_join_request, mcast_recv_replay_fragment, mcast_recv_replay_message, mcast_recv_request, mcast_recv_message, chn); chn->is_ready = GNUNET_YES; } else { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p GNUNET_PSYCSTORE_counters_get() " "returned %d for channel %s.\n", chn, result, GNUNET_h2s (&chn->pub_key_hash)); } client_send_msg (chn, &res.header); } /** * Response from PSYCstore with the current counter values for a channel slave. */ void store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, uint64_t max_message_id, uint64_t max_group_generation, uint64_t max_state_message_id) { struct Slave *slv = cls; struct Channel *chn = &slv->channel; chn->store_op = NULL; struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); res.header.size = htons (sizeof (res)); res.result_code = htonl (result); res.max_message_id = GNUNET_htonll (max_message_id); if (GNUNET_YES == result || GNUNET_NO == result) { chn->max_message_id = max_message_id; chn->max_state_message_id = max_state_message_id; slv->member = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, &slv->origin, slv->relay_count, slv->relays, &slv->join_msg->header, mcast_recv_join_request, mcast_recv_join_decision, mcast_recv_replay_fragment, mcast_recv_replay_message, mcast_recv_message, chn); if (NULL != slv->join_msg) { GNUNET_free (slv->join_msg); slv->join_msg = NULL; } } else { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p GNUNET_PSYCSTORE_counters_get() " "returned %d for channel %s.\n", chn, result, GNUNET_h2s (&chn->pub_key_hash)); } client_send_msg (chn, &res.header); } static void channel_init (struct Channel *chn) { chn->recv_msgs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); } /** * Handle a connecting client starting a channel master. */ static void handle_client_master_start (void *cls, const struct MasterStartRequest *req) { struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; struct GNUNET_CRYPTO_EddsaPublicKey pub_key; struct GNUNET_HashCode pub_key_hash; GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key); GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash); struct Master * mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash); struct Channel *chn; if (NULL == mst) { mst = GNUNET_malloc (sizeof (*mst)); mst->policy = ntohl (req->policy); mst->priv_key = req->channel_key; mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); chn = c->channel = &mst->channel; chn->master = mst; chn->is_master = GNUNET_YES; chn->pub_key = pub_key; chn->pub_key_hash = pub_key_hash; channel_init (chn); GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, store_recv_master_counters, mst); } else { chn = &mst->channel; struct GNUNET_PSYC_CountersResultMessage *res; struct GNUNET_MQ_Envelope * env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); res->result_code = htonl (GNUNET_OK); res->max_message_id = GNUNET_htonll (mst->max_message_id); GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client connected as master to channel %s.\n", mst, GNUNET_h2s (&chn->pub_key_hash)); struct ClientList *cli = GNUNET_malloc (sizeof (*cli)); cli->client = client; GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); GNUNET_SERVICE_client_continue (client); } static int check_client_slave_join (void *cls, const struct SlaveJoinRequest *req) { return GNUNET_OK; } /** * Handle a connecting client joining as a channel slave. */ static void handle_client_slave_join (void *cls, const struct SlaveJoinRequest *req) { struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; uint16_t req_size = ntohs (req->header.size); struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key; struct GNUNET_HashCode pub_key_hash, slv_pub_hash; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got join request from client %p\n", client); GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key); GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash); GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash); struct GNUNET_CONTAINER_MultiHashMap * chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash); struct Slave *slv = NULL; struct Channel *chn; if (NULL != chn_slv) { slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash); } if (NULL == slv) { slv = GNUNET_malloc (sizeof (*slv)); slv->priv_key = req->slave_key; slv->pub_key = slv_pub_key; slv->pub_key_hash = slv_pub_hash; slv->origin = req->origin; slv->relay_count = ntohl (req->relay_count); slv->join_flags = ntohl (req->flags); const struct GNUNET_PeerIdentity * relays = (const struct GNUNET_PeerIdentity *) &req[1]; uint16_t relay_size = slv->relay_count * sizeof (*relays); uint16_t join_msg_size = 0; if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader) <= req_size) { struct GNUNET_PSYC_Message * join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size); join_msg_size = ntohs (join_msg->header.size); slv->join_msg = GNUNET_malloc (join_msg_size); GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size); } if (sizeof (*req) + relay_size + join_msg_size != req_size) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%u + %u + %u != %u\n", (unsigned int) sizeof (*req), relay_size, join_msg_size, req_size); GNUNET_break (0); GNUNET_SERVICE_client_drop (client); GNUNET_free (slv); return; } if (0 < slv->relay_count) { slv->relays = GNUNET_malloc (relay_size); GNUNET_memcpy (slv->relays, &req[1], relay_size); } chn = c->channel = &slv->channel; chn->slave = slv; chn->is_master = GNUNET_NO; chn->pub_key = req->channel_pub_key; chn->pub_key_hash = pub_key_hash; channel_init (chn); if (NULL == chn_slv) { chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, &store_recv_slave_counters, slv); } else { chn = &slv->channel; struct GNUNET_PSYC_CountersResultMessage *res; struct GNUNET_MQ_Envelope * env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); res->result_code = htonl (GNUNET_OK); res->max_message_id = GNUNET_htonll (chn->max_message_id); GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags) { mcast_recv_join_decision (slv, GNUNET_YES, NULL, 0, NULL, NULL); } else if (NULL == slv->member) { slv->member = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, &slv->origin, slv->relay_count, slv->relays, &slv->join_msg->header, &mcast_recv_join_request, &mcast_recv_join_decision, &mcast_recv_replay_fragment, &mcast_recv_replay_message, &mcast_recv_message, chn); if (NULL != slv->join_msg) { GNUNET_free (slv->join_msg); slv->join_msg = NULL; } } else if (NULL != slv->join_dcsn) { struct GNUNET_MQ_Envelope * env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header); GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); } } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p connected as slave to channel %s.\n", client, GNUNET_h2s (&chn->pub_key_hash)); struct ClientList *cli = GNUNET_malloc (sizeof (*cli)); cli->client = client; GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); GNUNET_SERVICE_client_continue (client); } struct JoinDecisionClosure { int32_t is_admitted; struct GNUNET_MessageHeader *msg; }; /** * Iterator callback for sending join decisions to multicast. */ static int mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *value) { struct JoinDecisionClosure *jcls = cls; struct GNUNET_MULTICAST_JoinHandle *jh = value; // FIXME: add relays GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg); return GNUNET_YES; } static int check_client_join_decision (void *cls, const struct GNUNET_PSYC_JoinDecisionMessage *dcsn) { return GNUNET_OK; } /** * Join decision from client. */ static void handle_client_join_decision (void *cls, const struct GNUNET_PSYC_JoinDecisionMessage *dcsn) { struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; struct Channel *chn = c->channel; if (NULL == chn) { GNUNET_break (0); GNUNET_SERVICE_client_drop (client); return; } GNUNET_assert (GNUNET_YES == chn->is_master); struct Master *mst = chn->master; struct JoinDecisionClosure jcls; jcls.is_admitted = ntohl (dcsn->is_admitted); jcls.msg = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size)) ? (struct GNUNET_MessageHeader *) &dcsn[1] : NULL; struct GNUNET_HashCode slave_pub_hash; GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key), &slave_pub_hash); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join decision (%d) from client for channel %s..\n", mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p ..and slave %s.\n", mst, GNUNET_h2s (&slave_pub_hash)); GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash, &mcast_send_join_decision, &jcls); GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash); GNUNET_SERVICE_client_continue (client); } static void channel_part_cb (void *cls) { struct GNUNET_SERVICE_Client *client = cls; struct GNUNET_MQ_Envelope *env; env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK); GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); } static void handle_client_part_request (void *cls, const struct GNUNET_MessageHeader *msg) { struct Client *c = cls; c->channel->is_disconnecting = GNUNET_YES; if (GNUNET_YES == c->channel->is_master) { struct Master *mst = (struct Master *) c->channel; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got part request from master %p\n", mst); GNUNET_assert (NULL != mst->origin); GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client); } else { struct Slave *slv = (struct Slave *) c->channel; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got part request from slave %p\n", slv); GNUNET_assert (NULL != slv->member); GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client); } GNUNET_SERVICE_client_continue (c->client); } /** * Send acknowledgement to a client. * * Sent after a message fragment has been passed on to multicast. * * @param chn The channel struct for the client. */ static void send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client) { struct GNUNET_MessageHeader *res; struct GNUNET_MQ_Envelope * env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); /* FIXME? */ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); } /** * Callback for the transmit functions of multicast. */ static int transmit_notify (void *cls, size_t *data_size, void *data) { struct Channel *chn = cls; struct TransmitMessage *tmit_msg = chn->tmit_head; if (NULL == tmit_msg || *data_size < tmit_msg->size) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p transmit_notify: nothing to send.\n", chn); if (NULL != tmit_msg && *data_size < tmit_msg->size) GNUNET_break (0); *data_size = 0; return GNUNET_NO; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size); *data_size = tmit_msg->size; GNUNET_memcpy (data, &tmit_msg[1], *data_size); int ret = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) ? GNUNET_NO : GNUNET_YES; /* FIXME: handle disconnecting clients */ if (NULL != tmit_msg->client) send_message_ack (chn, tmit_msg->client); GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg); if (NULL != chn->tmit_head) { GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn); } else if (GNUNET_YES == chn->is_disconnecting && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) { /* FIXME: handle partial message (when still in_transmit) */ GNUNET_free (tmit_msg); return GNUNET_SYSERR; } GNUNET_free (tmit_msg); return ret; } /** * Callback for the transmit functions of multicast. */ static int master_transmit_notify (void *cls, size_t *data_size, void *data) { int ret = transmit_notify (cls, data_size, data); if (GNUNET_YES == ret) { struct Master *mst = cls; mst->tmit_handle = NULL; } return ret; } /** * Callback for the transmit functions of multicast. */ static int slave_transmit_notify (void *cls, size_t *data_size, void *data) { int ret = transmit_notify (cls, data_size, data); if (GNUNET_YES == ret) { struct Slave *slv = cls; slv->tmit_handle = NULL; } return ret; } /** * Transmit a message from a channel master to the multicast group. */ static void master_transmit_message (struct Master *mst) { struct Channel *chn = &mst->channel; struct TransmitMessage *tmit_msg = chn->tmit_head; if (NULL == tmit_msg) return; if (NULL == mst->tmit_handle) { mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id, mst->max_group_generation, &master_transmit_notify, mst); } else { GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle); } } /** * Transmit a message from a channel slave to the multicast group. */ static void slave_transmit_message (struct Slave *slv) { if (NULL == slv->channel.tmit_head) return; if (NULL == slv->tmit_handle) { slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->channel.tmit_head->id, &slave_transmit_notify, slv); } else { GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle); } } static void transmit_message (struct Channel *chn) { chn->is_master ? master_transmit_message (chn->master) : slave_transmit_message (chn->slave); } /** * Queue a message from a channel master for sending to the multicast group. */ static void master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg) { if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) { tmit_msg->id = ++mst->max_message_id; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message: message_id=%" PRIu64 "\n", mst, tmit_msg->id); struct GNUNET_PSYC_MessageMethod *pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET) { pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET); } else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message: state_delta=%" PRIu64 "\n", mst, tmit_msg->id - mst->max_state_message_id); pmeth->state_delta = GNUNET_htonll (tmit_msg->id - mst->max_state_message_id); mst->max_state_message_id = tmit_msg->id; } else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message: state not modified\n", mst); pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); } if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH) { /// @todo add state_hash to PSYC header } } } /** * Queue a message from a channel slave for sending to the multicast group. */ static void slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg) { if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) { struct GNUNET_PSYC_MessageMethod *pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); tmit_msg->id = ++slv->max_request_id; } } /** * Queue PSYC message parts for sending to multicast. * * @param chn * Channel to send to. * @param client * Client the message originates from. * @param data_size * Size of @a data. * @param data * Concatenated message parts. * @param first_ptype * First message part type in @a data. * @param last_ptype * Last message part type in @a data. */ static struct TransmitMessage * queue_message (struct Channel *chn, struct GNUNET_SERVICE_Client *client, size_t data_size, const void *data, uint16_t first_ptype, uint16_t last_ptype) { struct TransmitMessage * tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size); GNUNET_memcpy (&tmit_msg[1], data, data_size); tmit_msg->client = client; tmit_msg->size = data_size; tmit_msg->first_ptype = first_ptype; tmit_msg->last_ptype = last_ptype; /* FIXME: separate queue per message ID */ GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg); chn->is_master ? master_queue_message (chn->master, tmit_msg) : slave_queue_message (chn->slave, tmit_msg); return tmit_msg; } /** * Cancel transmission of current message. * * @param chn Channel to send to. * @param client Client the message originates from. */ static void transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client) { uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; struct GNUNET_MessageHeader msg; msg.size = htons (sizeof (msg)); msg.type = htons (type); queue_message (chn, client, sizeof (msg), &msg, type, type); transmit_message (chn); /* FIXME: cleanup */ } static int check_client_psyc_message (void *cls, const struct GNUNET_MessageHeader *msg) { return GNUNET_OK; } /** * Incoming message from a master or slave client. */ static void handle_client_psyc_message (void *cls, const struct GNUNET_MessageHeader *msg) { struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; struct Channel *chn = c->channel; if (NULL == chn) { GNUNET_break (0); GNUNET_SERVICE_client_drop (client); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Received message from client.\n", chn); GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); if (GNUNET_YES != chn->is_ready) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Channel is not ready yet, disconnecting client %p.\n", chn, client); GNUNET_break (0); GNUNET_SERVICE_client_drop (client); return; } uint16_t size = ntohs (msg->size); if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large: %u < %u.\n", chn, (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, (unsigned int) (size - sizeof (*msg))); GNUNET_break (0); transmit_cancel (chn, client); GNUNET_SERVICE_client_drop (client); return; } uint16_t first_ptype = 0, last_ptype = 0; if (GNUNET_SYSERR == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg), (const char *) &msg[1], &first_ptype, &last_ptype)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Received invalid message part from client.\n", chn); GNUNET_break (0); transmit_cancel (chn, client); GNUNET_SERVICE_client_drop (client); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Received message with first part type %u and last part type %u.\n", chn, first_ptype, last_ptype); queue_message (chn, client, size - sizeof (*msg), &msg[1], first_ptype, last_ptype); transmit_message (chn); /* FIXME: send a few ACKs even before transmit_notify is called */ GNUNET_SERVICE_client_continue (client); }; /** * Received result of GNUNET_PSYCSTORE_membership_store() */ static void store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg, uint16_t err_msg_size) { struct Operation *op = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n", op->channel, result, (int) err_msg_size, err_msg); if (NULL != op->client) client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); op_remove (op); } /** * Client requests to add/remove a slave in the membership database. */ static void handle_client_membership_store (void *cls, const struct ChannelMembershipStoreRequest *req) { struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; struct Channel *chn = c->channel; if (NULL == chn) { GNUNET_break (0); GNUNET_SERVICE_client_drop (client); return; } struct Operation *op = op_add (chn, client, req->op_id, 0); uint64_t announced_at = GNUNET_ntohll (req->announced_at); uint64_t effective_since = GNUNET_ntohll (req->effective_since); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Received membership store request from client.\n", chn); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n", chn, req->did_join, announced_at, effective_since); GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key, req->did_join, announced_at, effective_since, 0, /* FIXME: group_generation */ &store_recv_membership_store_result, op); GNUNET_SERVICE_client_continue (client); } /** * Received a fragment for GNUNET_PSYCSTORE_fragment_get(), * in response to a history request from a client. */ static int store_recv_fragment_history (void *cls, struct GNUNET_MULTICAST_MessageHeader *mmsg, enum GNUNET_PSYCSTORE_MessageFlags flags) { struct Operation *op = cls; if (NULL == op->client) { /* Requesting client already disconnected. */ return GNUNET_NO; } struct Channel *chn = op->channel; struct GNUNET_PSYC_MessageHeader *pmsg; uint16_t msize = ntohs (mmsg->header.size); uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg); struct GNUNET_OperationResultMessage * res = GNUNET_malloc (sizeof (*res) + psize); res->header.size = htons (sizeof (*res) + psize); res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT); res->op_id = op->op_id; res->result_code = GNUNET_htonll (GNUNET_OK); pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC); GNUNET_memcpy (&res[1], pmsg, psize); /** @todo FIXME: send only to requesting client */ client_send_msg (chn, &res->header); GNUNET_free (res); return GNUNET_YES; } /** * Received the result of GNUNET_PSYCSTORE_fragment_get(), * in response to a history request from a client. */ static void store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg, uint16_t err_msg_size) { struct Operation *op = cls; if (NULL == op->client) { /* Requesting client already disconnected. */ return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p History replay #%" PRIu64 ": " "PSYCSTORE returned %" PRId64 " (%.*s)\n", op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE) { /** @todo Multicast replay request for messages not found locally. */ } client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); op_remove (op); } static int check_client_history_replay (void *cls, const struct GNUNET_PSYC_HistoryRequestMessage *req) { return GNUNET_OK; } /** * Client requests channel history. */ static void handle_client_history_replay (void *cls, const struct GNUNET_PSYC_HistoryRequestMessage *req) { struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; struct Channel *chn = c->channel; if (NULL == chn) { GNUNET_break (0); GNUNET_SERVICE_client_drop (client); return; } uint16_t size = ntohs (req->header.size); const char *method_prefix = (const char *) &req[1]; if (size < sizeof (*req) + 1 || '\0' != method_prefix[size - sizeof (*req) - 1]) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p History replay #%" PRIu64 ": " "invalid method prefix. size: %u < %u?\n", chn, GNUNET_ntohll (req->op_id), size, (unsigned int) sizeof (*req) + 1); GNUNET_break (0); GNUNET_SERVICE_client_drop (client); return; } struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags)); if (0 == req->message_limit) { GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, GNUNET_ntohll (req->start_message_id), GNUNET_ntohll (req->end_message_id), 0, method_prefix, &store_recv_fragment_history, &store_recv_fragment_history_result, op); } else { GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, GNUNET_ntohll (req->message_limit), method_prefix, &store_recv_fragment_history, &store_recv_fragment_history_result, op); } GNUNET_SERVICE_client_continue (client); } /** * Received state var from PSYCstore, send it to client. */ static int store_recv_state_var (void *cls, const char *name, const void *value, uint32_t value_size) { struct Operation *op = cls; struct GNUNET_OperationResultMessage *res; struct GNUNET_MQ_Envelope *env; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n", op->channel, GNUNET_ntohll (op->op_id), name); if (NULL != name) /* First part */ { uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; struct GNUNET_PSYC_MessageModifier *mod; env = GNUNET_MQ_msg_extra (res, sizeof (*mod) + name_size + value_size, GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); res->op_id = op->op_id; mod = (struct GNUNET_PSYC_MessageModifier *) &res[1]; mod->header.size = htons (sizeof (*mod) + name_size + value_size); mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); mod->name_size = htons (name_size); mod->value_size = htonl (value_size); mod->oper = htons (GNUNET_PSYC_OP_ASSIGN); GNUNET_memcpy (&mod[1], name, name_size); GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size); } else /* Continuation */ { struct GNUNET_MessageHeader *mod; env = GNUNET_MQ_msg_extra (res, sizeof (*mod) + value_size, GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); res->op_id = op->op_id; mod = (struct GNUNET_MessageHeader *) &res[1]; mod->size = htons (sizeof (*mod) + value_size); mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); GNUNET_memcpy (&mod[1], value, value_size); } // FIXME: client might have been disconnected GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env); return GNUNET_YES; } /** * Received result of GNUNET_PSYCSTORE_state_get() * or GNUNET_PSYCSTORE_state_get_prefix() */ static void store_recv_state_result (void *cls, int64_t result, const char *err_msg, uint16_t err_msg_size) { struct Operation *op = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state_get #%" PRIu64 ": " "PSYCSTORE returned %" PRId64 " (%.*s)\n", op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); // FIXME: client might have been disconnected client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); op_remove (op); } static int check_client_state_get (void *cls, const struct StateRequest *req) { struct Client *c = cls; struct Channel *chn = c->channel; if (NULL == chn) { GNUNET_break (0); return GNUNET_SYSERR; } uint16_t name_size = ntohs (req->header.size) - sizeof (*req); const char *name = (const char *) &req[1]; if (0 == name_size || '\0' != name[name_size - 1]) { GNUNET_break (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Client requests best matching state variable from PSYCstore. */ static void handle_client_state_get (void *cls, const struct StateRequest *req) { struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; struct Channel *chn = c->channel; const char *name = (const char *) &req[1]; struct Operation *op = op_add (chn, client, req->op_id, 0); GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name, &store_recv_state_var, &store_recv_state_result, op); GNUNET_SERVICE_client_continue (client); } static int check_client_state_get_prefix (void *cls, const struct StateRequest *req) { struct Client *c = cls; struct Channel *chn = c->channel; if (NULL == chn) { GNUNET_break (0); return GNUNET_SYSERR; } uint16_t name_size = ntohs (req->header.size) - sizeof (*req); const char *name = (const char *) &req[1]; if (0 == name_size || '\0' != name[name_size - 1]) { GNUNET_break (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Client requests state variables with a given prefix from PSYCstore. */ static void handle_client_state_get_prefix (void *cls, const struct StateRequest *req) { struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; struct Channel *chn = c->channel; const char *name = (const char *) &req[1]; struct Operation *op = op_add (chn, client, req->op_id, 0); GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, &store_recv_state_var, &store_recv_state_result, op); GNUNET_SERVICE_client_continue (client); } /** * Initialize the PSYC service. * * @param cls Closure. * @param server The initialized server. * @param c Configuration to use. */ static void run (void *cls, const struct GNUNET_CONFIGURATION_Handle *c, struct GNUNET_SERVICE_Handle *svc) { cfg = c; service = svc; store = GNUNET_PSYCSTORE_connect (cfg); stats = GNUNET_STATISTICS_create ("psyc", cfg); masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); } /** * Define "main" method using service macro. */ GNUNET_SERVICE_MAIN ("psyc", GNUNET_SERVICE_OPTION_NONE, &run, &client_notify_connect, &client_notify_disconnect, NULL, GNUNET_MQ_hd_fixed_size (client_master_start, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, struct MasterStartRequest, NULL), GNUNET_MQ_hd_var_size (client_slave_join, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, struct SlaveJoinRequest, NULL), GNUNET_MQ_hd_var_size (client_join_decision, GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, struct GNUNET_PSYC_JoinDecisionMessage, NULL), GNUNET_MQ_hd_fixed_size (client_part_request, GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST, struct GNUNET_MessageHeader, NULL), GNUNET_MQ_hd_var_size (client_psyc_message, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, struct GNUNET_MessageHeader, NULL), GNUNET_MQ_hd_fixed_size (client_membership_store, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, struct ChannelMembershipStoreRequest, NULL), GNUNET_MQ_hd_var_size (client_history_replay, GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, struct GNUNET_PSYC_HistoryRequestMessage, NULL), GNUNET_MQ_hd_var_size (client_state_get, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, struct StateRequest, NULL), GNUNET_MQ_hd_var_size (client_state_get_prefix, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, struct StateRequest, NULL)); /* end of gnunet-service-psyc.c */