/* * This file is part of GNUnet * Copyright (C) 2013 GNUnet e.V. * * GNUnet is free software: you can redistribute it and/or modify it * under the terms of the GNU Affero General Public License as published * by the Free Software Foundation, either version 3 of the License, * or (at your option) any later version. * * GNUnet is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ /** * @file psycutil/psyc_message.c * @brief PSYC utilities; receiving/transmitting/logging PSYC messages. * @author Gabor X Toth */ #include #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_psyc_util_lib.h" #include "gnunet_psyc_service.h" #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__) struct GNUNET_PSYC_TransmitHandle { /** * Client connection to service. */ struct GNUNET_MQ_Handle *mq; /** * Message currently being received from the client. */ struct GNUNET_MessageHeader *msg; /** * Envelope for @a msg */ struct GNUNET_MQ_Envelope *env; /** * Callback to request next modifier from client. */ GNUNET_PSYC_TransmitNotifyModifier notify_mod; /** * Closure for the notify callbacks. */ void *notify_mod_cls; /** * Callback to request next data fragment from client. */ GNUNET_PSYC_TransmitNotifyData notify_data; /** * Closure for the notify callbacks. */ void *notify_data_cls; /** * Modifier of the environment that is currently being transmitted. */ struct GNUNET_PSYC_Modifier *mod; /** * */ const char *mod_value; /** * Number of bytes remaining to be transmitted from the current modifier value. */ uint32_t mod_value_remaining; /** * State of the current message being received from client. */ enum GNUNET_PSYC_MessageState state; /** * Number of PSYC_TRANSMIT_ACK messages we are still waiting for. */ uint8_t acks_pending; /** * Is transmission paused? */ uint8_t paused; /** * Are we currently transmitting a message? */ uint8_t in_transmit; /** * Notify callback is currently being called. */ uint8_t in_notify; }; struct GNUNET_PSYC_ReceiveHandle { /** * Message callback. */ GNUNET_PSYC_MessageCallback message_cb; /** * Message part callback. */ GNUNET_PSYC_MessagePartCallback message_part_cb; /** * Closure for the callbacks. */ void *cb_cls; /** * ID of the message being received from the PSYC service. */ uint64_t message_id; /** * Public key of the slave from which a message is being received. */ struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; /** * State of the currently being received message from the PSYC service. */ enum GNUNET_PSYC_MessageState state; /** * Flags for the currently being received message from the PSYC service. */ enum GNUNET_PSYC_MessageFlags flags; /** * Expected value size for the modifier being received from the PSYC service. */ uint32_t mod_value_size_expected; /** * Actual value size for the modifier being received from the PSYC service. */ uint32_t mod_value_size; }; /**** Messages ****/ /** * Create a PSYC message. * * @param method_name * PSYC method for the message. * @param env * Environment for the message. * @param data * Data payload for the message. * @param data_size * Size of @a data. * * @return Message header with size information, * followed by the message parts. */ struct GNUNET_PSYC_Message * GNUNET_PSYC_message_create (const char *method_name, const struct GNUNET_PSYC_Environment *env, const void *data, size_t data_size) { struct GNUNET_PSYC_Modifier *mod = NULL; struct GNUNET_PSYC_MessageMethod *pmeth = NULL; struct GNUNET_PSYC_MessageModifier *pmod = NULL; struct GNUNET_MessageHeader *pmsg = NULL; uint16_t env_size = 0; if (NULL != env) { mod = GNUNET_PSYC_env_head (env); while (NULL != mod) { env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size; mod = mod->next; } } struct GNUNET_PSYC_Message *msg; uint16_t method_name_size = strlen (method_name) + 1; if (method_name_size == 1) return NULL; uint16_t msg_size = sizeof (*msg) /* header */ + sizeof (*pmeth) + method_name_size /* method */ + env_size /* modifiers */ + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */ + sizeof (*pmsg); /* end of message */ msg = GNUNET_malloc (msg_size); msg->header.size = htons (msg_size); msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */ pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1]; pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); pmeth->header.size = htons (sizeof (*pmeth) + method_name_size); GNUNET_memcpy (&pmeth[1], method_name, method_name_size); uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size; if (NULL != env) { mod = GNUNET_PSYC_env_head (env); while (NULL != mod) { uint16_t mod_name_size = strlen (mod->name) + 1; pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p); pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size; p += pmod->header.size; pmod->header.size = htons (pmod->header.size); pmod->oper = mod->oper; pmod->name_size = htons (mod_name_size); pmod->value_size = htonl (mod->value_size); GNUNET_memcpy (&pmod[1], mod->name, mod_name_size); if (0 < mod->value_size) GNUNET_memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size); mod = mod->next; } } if (0 < data_size) { pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p); pmsg->size = sizeof (*pmsg) + data_size; p += pmsg->size; pmsg->size = htons (pmsg->size); pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); GNUNET_memcpy (&pmsg[1], data, data_size); } pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p); pmsg->size = htons (sizeof (*pmsg)); pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); GNUNET_assert (p + sizeof (*pmsg) == msg_size); return msg; } void GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, const struct GNUNET_MessageHeader *msg) { uint16_t size = ntohs (msg->size); uint16_t type = ntohs (msg->type); GNUNET_log (kind, "Message of type %d and size %u:\n", type, size); switch (type) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: { const struct GNUNET_PSYC_MessageHeader *pmsg = (const struct GNUNET_PSYC_MessageHeader *) msg; GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n", GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags)); break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: { const struct GNUNET_PSYC_MessageMethod *meth = (const struct GNUNET_PSYC_MessageMethod *) msg; GNUNET_log (kind, "\t%.*s\n", (int) (size - sizeof (*meth)), (const char *) &meth[1]); break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: { const struct GNUNET_PSYC_MessageModifier *mod = (const struct GNUNET_PSYC_MessageModifier *) msg; uint16_t name_size = ntohs (mod->name_size); char oper = ' ' < mod->oper ? mod->oper : ' '; GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, (int) name_size, (const char *) &mod[1], (int) (size - sizeof (*mod) - name_size), ((const char *) &mod[1]) + name_size); break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: GNUNET_log (kind, "\t%.*s\n", (int) (size - sizeof (*msg)), (const char *) &msg[1]); break; } } /**** Transmitting messages ****/ /** * Create a transmission handle. */ struct GNUNET_PSYC_TransmitHandle * GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq) { struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle); tmit->mq = mq; return tmit; } /** * Destroy a transmission handle. */ void GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit) { GNUNET_free (tmit); } /** * Queue a message part for transmission. * * The message part is added to the current message buffer. * When this buffer is full, it is added to the transmission queue. * * @param tmit * Transmission handle. * @param msg * Message part, or NULL. * @param tmit_now * Transmit message now, or wait for buffer to fill up? * #GNUNET_YES or #GNUNET_NO. */ static void transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, const struct GNUNET_MessageHeader *msg, uint8_t tmit_now) { uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0; LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing message part of type %u and size %u (tmit_now: %u)).\n", NULL != msg ? ntohs (msg->type) : 0, size, tmit_now); if (NULL != tmit->msg) { if (NULL == msg || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size) { /* End of message or buffer is full, add it to transmission queue * and start with empty buffer */ tmit->msg->size = htons (tmit->msg->size); GNUNET_MQ_send (tmit->mq, tmit->env); tmit->env = NULL; tmit->msg = NULL; tmit->acks_pending++; } else { /* Message fits in current buffer, append */ GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size); tmit->msg->size += size; } } if (NULL == tmit->msg && NULL != msg) { /* Empty buffer, copy over message. */ tmit->env = GNUNET_MQ_msg_extra (tmit->msg, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* store current message size in host byte order * then later switch it to network byte order before sending */ tmit->msg->size = sizeof (*tmit->msg) + size; GNUNET_memcpy (&tmit->msg[1], msg, size); } if (NULL != tmit->msg && (GNUNET_YES == tmit_now || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) { /* End of message or buffer is full, add it to transmission queue. */ tmit->msg->size = htons (tmit->msg->size); GNUNET_MQ_send (tmit->mq, tmit->env); tmit->env = NULL; tmit->msg = NULL; tmit->acks_pending++; } } /** * Request data from client to transmit. * * @param tmit Transmission handle. */ static void transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) { int notify_ret = GNUNET_YES; uint16_t data_size = 0; char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); if (NULL != tmit->notify_data) { data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; tmit->in_notify = GNUNET_YES; notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); tmit->in_notify = GNUNET_NO; } LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_data (ret: %d, size: %u): %.*s\n", notify_ret, data_size, data_size, &msg[1]); switch (notify_ret) { case GNUNET_NO: if (0 == data_size) { /* Transmission paused, nothing to send. */ tmit->paused = GNUNET_YES; return; } break; case GNUNET_YES: tmit->state = GNUNET_PSYC_MESSAGE_STATE_END; break; default: LOG (GNUNET_ERROR_TYPE_ERROR, "TransmitNotifyData callback returned error when requesting data.\n"); tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); msg->size = htons (sizeof (*msg)); transmit_queue_insert (tmit, msg, GNUNET_YES); tmit->in_transmit = GNUNET_NO; return; } if (0 < data_size) { GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); msg->size = htons (sizeof (*msg) + data_size); transmit_queue_insert (tmit, msg, !notify_ret); } /* End of message. */ if (GNUNET_YES == notify_ret) { msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); msg->size = htons (sizeof (*msg)); transmit_queue_insert (tmit, msg, GNUNET_YES); /* FIXME: wait for ACK before setting in_transmit to no */ tmit->in_transmit = GNUNET_NO; } } /** * Request a modifier from a client to transmit. * * @param tmit Transmission handle. */ static void transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) { uint16_t max_data_size = 0; uint16_t data_size = 0; char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; int notify_ret = GNUNET_YES; switch (tmit->state) { case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: { struct GNUNET_PSYC_MessageModifier *mod = (struct GNUNET_PSYC_MessageModifier *) msg; msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); if (NULL != tmit->notify_mod) { max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; data_size = max_data_size; tmit->in_notify = GNUNET_YES; notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], &mod->oper, &mod->value_size); tmit->in_notify = GNUNET_NO; } mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_mod (ret: %d, size: %u + %u): %.*s\n", notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]); if (mod->name_size < data_size) { tmit->mod_value_remaining = mod->value_size - (data_size - mod->name_size); mod->value_size = htonl (mod->value_size); mod->name_size = htons (mod->name_size); } else if (0 < data_size) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n"); notify_ret = GNUNET_SYSERR; } break; } case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: { msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); msg->size = sizeof (struct GNUNET_MessageHeader); if (NULL != tmit->notify_mod) { max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; data_size = max_data_size; tmit->in_notify = GNUNET_YES; notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &msg[1], NULL, NULL); tmit->in_notify = GNUNET_NO; } tmit->mod_value_remaining -= data_size; LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_mod (ret: %d, size: %u): %.*s\n", notify_ret, data_size, data_size, &msg[1]); break; } default: GNUNET_assert (0); } switch (notify_ret) { case GNUNET_NO: if (0 == data_size) { /* Transmission paused, nothing to send. */ tmit->paused = GNUNET_YES; return; } tmit->state = (0 == tmit->mod_value_remaining) ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT; break; case GNUNET_YES: /* End of modifiers. */ GNUNET_assert (0 == tmit->mod_value_remaining); break; default: LOG (GNUNET_ERROR_TYPE_ERROR, "TransmitNotifyModifier callback returned with error.\n"); tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); msg->size = htons (sizeof (*msg)); transmit_queue_insert (tmit, msg, GNUNET_YES); tmit->in_transmit = GNUNET_NO; return; } if (0 < data_size) { GNUNET_assert (data_size <= max_data_size); msg->size = htons (msg->size + data_size); transmit_queue_insert (tmit, msg, GNUNET_NO); } if (GNUNET_YES == notify_ret) { tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA; if (0 == tmit->acks_pending) transmit_data (tmit); } else { transmit_mod (tmit); } } int transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, uint32_t *full_value_size) { struct GNUNET_PSYC_TransmitHandle *tmit = cls; uint16_t name_size = 0; uint32_t value_size = 0; const char *value = NULL; if (NULL != oper) { /* New modifier */ if (NULL != tmit->mod) tmit->mod = tmit->mod->next; if (NULL == tmit->mod) { /* No more modifiers, continue with data */ *data_size = 0; return GNUNET_YES; } GNUNET_assert (tmit->mod->value_size < UINT32_MAX); *full_value_size = tmit->mod->value_size; *oper = tmit->mod->oper; name_size = strlen (tmit->mod->name) + 1; if (name_size + tmit->mod->value_size <= *data_size) { value_size = tmit->mod->value_size; *data_size = name_size + value_size; } else /* full modifier does not fit in data, continuation needed */ { value_size = *data_size - name_size; tmit->mod_value = tmit->mod->value + value_size; } GNUNET_memcpy (data, tmit->mod->name, name_size); GNUNET_memcpy ((char *)data + name_size, tmit->mod->value, value_size); return GNUNET_NO; } else { /* Modifier continuation */ GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining); value = tmit->mod_value; if (tmit->mod_value_remaining <= *data_size) { value_size = tmit->mod_value_remaining; tmit->mod_value = NULL; } else { value_size = *data_size; tmit->mod_value += value_size; } if (*data_size < value_size) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Value in environment larger than buffer: %u < %zu\n", *data_size, value_size); *data_size = 0; return GNUNET_NO; } *data_size = value_size; GNUNET_memcpy (data, value, value_size); return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO; } } /** * Transmit a message. * * @param tmit * Transmission handle. * @param method_name * Which method should be invoked. * @param env * Environment for the message. * Should stay available until the first call to notify_data. * Can be NULL if there are no modifiers or @a notify_mod is * provided instead. * @param notify_mod * Function to call to obtain modifiers. * Can be NULL if there are no modifiers or @a env is provided instead. * @param notify_data * Function to call to obtain fragments of the data. * @param notify_cls * Closure for @a notify_mod and @a notify_data. * @param flags * Flags for the message being transmitted. * * @return #GNUNET_OK if the transmission was started. * #GNUNET_SYSERR if another transmission is already going on. */ int GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit, const char *method_name, const struct GNUNET_PSYC_Environment *env, GNUNET_PSYC_TransmitNotifyModifier notify_mod, GNUNET_PSYC_TransmitNotifyData notify_data, void *notify_cls, uint32_t flags) { if (GNUNET_NO != tmit->in_transmit) return GNUNET_SYSERR; tmit->in_transmit = GNUNET_YES; size_t size = strlen (method_name) + 1; struct GNUNET_PSYC_MessageMethod *pmeth; tmit->env = GNUNET_MQ_msg_extra (tmit->msg, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* store current message size in host byte order * then later switch it to network byte order before sending */ tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size; if (NULL != notify_mod) { tmit->notify_mod = notify_mod; tmit->notify_mod_cls = notify_cls; } else { tmit->notify_mod = &transmit_notify_env; tmit->notify_mod_cls = tmit; if (NULL != env) { struct GNUNET_PSYC_Modifier mod = {}; mod.next = GNUNET_PSYC_env_head (env); tmit->mod = &mod; struct GNUNET_PSYC_Modifier *m = tmit->mod; while (NULL != (m = m->next)) { if (m->oper != GNUNET_PSYC_OP_SET) flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY; } } else { tmit->mod = NULL; } } pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1]; pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); pmeth->header.size = htons (sizeof (*pmeth) + size); pmeth->flags = htonl (flags); GNUNET_memcpy (&pmeth[1], method_name, size); tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; tmit->notify_data = notify_data; tmit->notify_data_cls = notify_cls; transmit_mod (tmit); return GNUNET_OK; } /** * Resume transmission. * * @param tmit Transmission handle. */ void GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit) { if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify) return; if (0 == tmit->acks_pending) { tmit->paused = GNUNET_NO; transmit_data (tmit); } } /** * Abort transmission request. * * @param tmit Transmission handle. */ void GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit) { if (GNUNET_NO == tmit->in_transmit) return; tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; tmit->in_transmit = GNUNET_NO; tmit->paused = GNUNET_NO; /* FIXME */ struct GNUNET_MessageHeader msg; msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); msg.size = htons (sizeof (msg)); transmit_queue_insert (tmit, &msg, GNUNET_YES); } /** * Got acknowledgement of a transmitted message part, continue transmission. * * @param tmit Transmission handle. */ void GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit) { if (0 == tmit->acks_pending) { LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); GNUNET_break (0); return; } tmit->acks_pending--; if (GNUNET_YES == tmit->paused) return; switch (tmit->state) { case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: transmit_mod (tmit); break; case GNUNET_PSYC_MESSAGE_STATE_DATA: transmit_data (tmit); break; case GNUNET_PSYC_MESSAGE_STATE_END: case GNUNET_PSYC_MESSAGE_STATE_CANCEL: break; default: LOG (GNUNET_ERROR_TYPE_DEBUG, "Ignoring message ACK in state %u.\n", tmit->state); } } /**** Receiving messages ****/ /** * Create handle for receiving messages. */ struct GNUNET_PSYC_ReceiveHandle * GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb, GNUNET_PSYC_MessagePartCallback message_part_cb, void *cb_cls) { struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv)); recv->message_cb = message_cb; recv->message_part_cb = message_part_cb; recv->cb_cls = cb_cls; return recv; } /** * Destroy handle for receiving messages. */ void GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv) { GNUNET_free (recv); } /** * Reset stored data related to the last received message. */ void GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv) { recv->state = GNUNET_PSYC_MESSAGE_STATE_START; recv->flags = 0; recv->message_id = 0; recv->mod_value_size = 0; recv->mod_value_size_expected = 0; } static void recv_error (struct GNUNET_PSYC_ReceiveHandle *recv) { if (NULL != recv->message_part_cb) recv->message_part_cb (recv->cb_cls, NULL, NULL); if (NULL != recv->message_cb) recv->message_cb (recv->cb_cls, NULL); GNUNET_PSYC_receive_reset (recv); } /** * Handle incoming PSYC message. * * @param recv Receive handle. * @param msg The message. * * @return #GNUNET_OK on success, * #GNUNET_SYSERR on receive error. */ int GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, const struct GNUNET_PSYC_MessageHeader *msg) { uint16_t size = ntohs (msg->header.size); uint32_t flags = ntohl (msg->flags); GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, (struct GNUNET_MessageHeader *) msg); if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state) { recv->message_id = GNUNET_ntohll (msg->message_id); recv->flags = flags; recv->slave_pub_key = msg->slave_pub_key; recv->mod_value_size = 0; recv->mod_value_size_expected = 0; } else if (GNUNET_ntohll (msg->message_id) != recv->message_id) { // FIXME LOG (GNUNET_ERROR_TYPE_WARNING, "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", GNUNET_ntohll (msg->message_id), recv->message_id); GNUNET_break_op (0); recv_error (recv); return GNUNET_SYSERR; } else if (flags != recv->flags) { LOG (GNUNET_ERROR_TYPE_WARNING, "Unexpected message flags. Got: %lu, expected: %lu\n", flags, recv->flags); GNUNET_break_op (0); recv_error (recv); return GNUNET_SYSERR; } uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; for (pos = 0; sizeof (*msg) + pos < size; pos += psize) { const struct GNUNET_MessageHeader *pmsg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); psize = ntohs (pmsg->size); ptype = ntohs (pmsg->type); size_eq = size_min = 0; if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Dropping message of type %u with invalid size %u.\n", ptype, psize); recv_error (recv); return GNUNET_SYSERR; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received message part of type %u and size %u from PSYC.\n", ptype, psize); GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); switch (ptype) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: size_min = sizeof (struct GNUNET_PSYC_MessageMethod); break; case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: size_min = sizeof (struct GNUNET_PSYC_MessageModifier); break; case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: size_min = sizeof (struct GNUNET_MessageHeader); break; case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: size_eq = sizeof (struct GNUNET_MessageHeader); break; default: GNUNET_break_op (0); recv_error (recv); return GNUNET_SYSERR; } if (! ((0 < size_eq && psize == size_eq) || (0 < size_min && size_min <= psize))) { GNUNET_break_op (0); recv_error (recv); return GNUNET_SYSERR; } switch (ptype) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: { struct GNUNET_PSYC_MessageMethod *meth = (struct GNUNET_PSYC_MessageMethod *) pmsg; if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state) { LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping out of order message method (%u).\n", recv->state); /* It is normal to receive an incomplete message right after connecting, * but should not happen later. * FIXME: add a check for this condition. */ GNUNET_break_op (0); recv_error (recv); return GNUNET_SYSERR; } if ('\0' != *((char *) meth + psize - 1)) { LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping message with malformed method. " "Message ID: %" PRIu64 "\n", recv->message_id); GNUNET_break_op (0); recv_error (recv); return GNUNET_SYSERR; } recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD; break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: { if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)) { LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping out of order message modifier (%u).\n", recv->state); GNUNET_break_op (0); recv_error (recv); return GNUNET_SYSERR; } struct GNUNET_PSYC_MessageModifier *mod = (struct GNUNET_PSYC_MessageModifier *) pmsg; uint16_t name_size = ntohs (mod->name_size); recv->mod_value_size_expected = ntohl (mod->value_size); recv->mod_value_size = psize - sizeof (*mod) - name_size; if (psize < sizeof (*mod) + name_size || '\0' != *((char *) &mod[1] + name_size - 1) || recv->mod_value_size_expected < recv->mod_value_size) { LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n"); GNUNET_break_op (0); recv_error (recv); return GNUNET_SYSERR; } recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: { recv->mod_value_size += psize - sizeof (*pmsg); if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state) || recv->mod_value_size_expected < recv->mod_value_size) { LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping out of order message modifier continuation " "!(%u == %u || %u == %u) || %lu < %lu.\n", GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state, GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state, recv->mod_value_size_expected, recv->mod_value_size); GNUNET_break_op (0); recv_error (recv); return GNUNET_SYSERR; } break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: { if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD || recv->mod_value_size_expected != recv->mod_value_size) { LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping out of order message data fragment " "(%u < %u || %lu != %lu).\n", recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD, recv->mod_value_size_expected, recv->mod_value_size); GNUNET_break_op (0); recv_error (recv); return GNUNET_SYSERR; } recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA; break; } } if (NULL != recv->message_part_cb) recv->message_part_cb (recv->cb_cls, msg, pmsg); switch (ptype) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: GNUNET_PSYC_receive_reset (recv); break; } } if (NULL != recv->message_cb) recv->message_cb (recv->cb_cls, msg); return GNUNET_OK; } /** * Check if @a data contains a series of valid message parts. * * @param data_size Size of @a data. * @param data Data. * @param[out] first_ptype Type of first message part. * @param[out] last_ptype Type of last message part. * * @return Number of message parts found in @a data. * or GNUNET_SYSERR if the message contains invalid parts. */ int GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data, uint16_t *first_ptype, uint16_t *last_ptype) { const struct GNUNET_MessageHeader *pmsg; uint16_t parts = 0, ptype = 0, psize = 0, pos = 0; if (NULL != first_ptype) *first_ptype = 0; if (NULL != last_ptype) *last_ptype = 0; for (pos = 0; pos < data_size; pos += psize, parts++) { pmsg = (const struct GNUNET_MessageHeader *) (data + pos); GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); psize = ntohs (pmsg->size); ptype = ntohs (pmsg->type); if (0 == parts && NULL != first_ptype) *first_ptype = ptype; if (NULL != last_ptype && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) *last_ptype = ptype; if (psize < sizeof (*pmsg) || pos + psize > data_size || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Invalid message part of type %u and size %u.\n", ptype, psize); return GNUNET_SYSERR; } /** @todo FIXME: check message part order */ } return parts; } struct ParseMessageClosure { struct GNUNET_PSYC_Environment *env; const char **method_name; const void **data; uint16_t *data_size; enum GNUNET_PSYC_MessageState msg_state; }; static void parse_message_part_cb (void *cls, const struct GNUNET_PSYC_MessageHeader *msg, const struct GNUNET_MessageHeader *pmsg) { struct ParseMessageClosure *pmc = cls; if (NULL == pmsg) { pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; return; } switch (ntohs (pmsg->type)) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: { struct GNUNET_PSYC_MessageMethod * pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg; *pmc->method_name = (const char *) &pmeth[1]; pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD; break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: { struct GNUNET_PSYC_MessageModifier * pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg; const char *name = (const char *) &pmod[1]; const void *value = name + ntohs (pmod->name_size); GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value, ntohl (pmod->value_size)); pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: *pmc->data = &pmsg[1]; *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg); pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA; break; case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END; break; default: pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; } } /** * Parse PSYC message. * * @param msg * The PSYC message to parse. * @param[out] method_name * Pointer to the method name inside @a pmsg. * @param env * The environment for the message with a list of modifiers. * @param[out] data * Pointer to data inside @a msg. * @param[out] data_size * Size of @data is written here. * * @return #GNUNET_OK on success, * #GNUNET_SYSERR on parse error. */ int GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg, const char **method_name, struct GNUNET_PSYC_Environment *env, const void **data, uint16_t *data_size) { struct ParseMessageClosure cls; cls.env = env; cls.method_name = method_name; cls.data = data; cls.data_size = data_size; struct GNUNET_PSYC_ReceiveHandle * recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls); int ret = GNUNET_PSYC_receive_message (recv, msg); GNUNET_PSYC_receive_destroy (recv); if (GNUNET_OK != ret) return GNUNET_SYSERR; return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state) ? GNUNET_OK : GNUNET_NO; } /** * Initialize PSYC message header. */ void GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg, const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) { uint16_t size = ntohs (mmsg->header.size); uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); pmsg->header.size = htons (psize); pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); pmsg->message_id = mmsg->message_id; pmsg->fragment_offset = mmsg->fragment_offset; pmsg->flags = htonl (flags); GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); } /** * Create a new PSYC message header from a multicast message. */ struct GNUNET_PSYC_MessageHeader * GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) { struct GNUNET_PSYC_MessageHeader *pmsg; uint16_t size = ntohs (mmsg->header.size); uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); pmsg = GNUNET_malloc (psize); GNUNET_PSYC_message_header_init (pmsg, mmsg, flags); return pmsg; } /** * Create a new PSYC message header from a PSYC message. */ struct GNUNET_PSYC_MessageHeader * GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg) { uint16_t msg_size = ntohs (msg->header.size); struct GNUNET_PSYC_MessageHeader * pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg)); pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg)); GNUNET_memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg)); return pmsg; }