/*
* This file is part of GNUnet
* Copyright (C) 2013 GNUnet e.V.
*
* GNUnet is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* GNUnet is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
/**
* @file psycutil/psyc_message.c
* @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
* @author Gabor X Toth
*/
#include
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_psyc_util_lib.h"
#include "gnunet_psyc_service.h"
#define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
struct GNUNET_PSYC_TransmitHandle
{
/**
* Client connection to service.
*/
struct GNUNET_MQ_Handle *mq;
/**
* Message currently being received from the client.
*/
struct GNUNET_MessageHeader *msg;
/**
* Envelope for @a msg
*/
struct GNUNET_MQ_Envelope *env;
/**
* Callback to request next modifier from client.
*/
GNUNET_PSYC_TransmitNotifyModifier notify_mod;
/**
* Closure for the notify callbacks.
*/
void *notify_mod_cls;
/**
* Callback to request next data fragment from client.
*/
GNUNET_PSYC_TransmitNotifyData notify_data;
/**
* Closure for the notify callbacks.
*/
void *notify_data_cls;
/**
* Modifier of the environment that is currently being transmitted.
*/
struct GNUNET_PSYC_Modifier *mod;
/**
*
*/
const char *mod_value;
/**
* Number of bytes remaining to be transmitted from the current modifier value.
*/
uint32_t mod_value_remaining;
/**
* State of the current message being received from client.
*/
enum GNUNET_PSYC_MessageState state;
/**
* Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
*/
uint8_t acks_pending;
/**
* Is transmission paused?
*/
uint8_t paused;
/**
* Are we currently transmitting a message?
*/
uint8_t in_transmit;
/**
* Notify callback is currently being called.
*/
uint8_t in_notify;
};
struct GNUNET_PSYC_ReceiveHandle
{
/**
* Message callback.
*/
GNUNET_PSYC_MessageCallback message_cb;
/**
* Message part callback.
*/
GNUNET_PSYC_MessagePartCallback message_part_cb;
/**
* Closure for the callbacks.
*/
void *cb_cls;
/**
* ID of the message being received from the PSYC service.
*/
uint64_t message_id;
/**
* Public key of the slave from which a message is being received.
*/
struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
/**
* State of the currently being received message from the PSYC service.
*/
enum GNUNET_PSYC_MessageState state;
/**
* Flags for the currently being received message from the PSYC service.
*/
enum GNUNET_PSYC_MessageFlags flags;
/**
* Expected value size for the modifier being received from the PSYC service.
*/
uint32_t mod_value_size_expected;
/**
* Actual value size for the modifier being received from the PSYC service.
*/
uint32_t mod_value_size;
};
/**** Messages ****/
/**
* Create a PSYC message.
*
* @param method_name
* PSYC method for the message.
* @param env
* Environment for the message.
* @param data
* Data payload for the message.
* @param data_size
* Size of @a data.
*
* @return Message header with size information,
* followed by the message parts.
*/
struct GNUNET_PSYC_Message *
GNUNET_PSYC_message_create (const char *method_name,
const struct GNUNET_PSYC_Environment *env,
const void *data,
size_t data_size)
{
struct GNUNET_PSYC_Modifier *mod = NULL;
struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
struct GNUNET_PSYC_MessageModifier *pmod = NULL;
struct GNUNET_MessageHeader *pmsg = NULL;
uint16_t env_size = 0;
if (NULL != env)
{
mod = GNUNET_PSYC_env_head (env);
while (NULL != mod)
{
env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
mod = mod->next;
}
}
struct GNUNET_PSYC_Message *msg;
uint16_t method_name_size = strlen (method_name) + 1;
if (method_name_size == 1)
return NULL;
uint16_t msg_size = sizeof (*msg) /* header */
+ sizeof (*pmeth) + method_name_size /* method */
+ env_size /* modifiers */
+ ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
+ sizeof (*pmsg); /* end of message */
msg = GNUNET_malloc (msg_size);
msg->header.size = htons (msg_size);
msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
GNUNET_memcpy (&pmeth[1], method_name, method_name_size);
uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
if (NULL != env)
{
mod = GNUNET_PSYC_env_head (env);
while (NULL != mod)
{
uint16_t mod_name_size = strlen (mod->name) + 1;
pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
p += pmod->header.size;
pmod->header.size = htons (pmod->header.size);
pmod->oper = mod->oper;
pmod->name_size = htons (mod_name_size);
pmod->value_size = htonl (mod->value_size);
GNUNET_memcpy (&pmod[1], mod->name, mod_name_size);
if (0 < mod->value_size)
GNUNET_memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
mod = mod->next;
}
}
if (0 < data_size)
{
pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
pmsg->size = sizeof (*pmsg) + data_size;
p += pmsg->size;
pmsg->size = htons (pmsg->size);
pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
GNUNET_memcpy (&pmsg[1], data, data_size);
}
pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
pmsg->size = htons (sizeof (*pmsg));
pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
GNUNET_assert (p + sizeof (*pmsg) == msg_size);
return msg;
}
void
GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
const struct GNUNET_MessageHeader *msg)
{
uint16_t size = ntohs (msg->size);
uint16_t type = ntohs (msg->type);
GNUNET_log (kind,
"Message of type %d and size %u:\n",
type,
size);
switch (type)
{
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
{
const struct GNUNET_PSYC_MessageHeader *pmsg
= (const struct GNUNET_PSYC_MessageHeader *) msg;
GNUNET_log (kind,
"\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
GNUNET_ntohll (pmsg->message_id),
ntohl (pmsg->flags));
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
{
const struct GNUNET_PSYC_MessageMethod *meth
= (const struct GNUNET_PSYC_MessageMethod *) msg;
GNUNET_log (kind,
"\t%.*s\n",
(int) (size - sizeof (*meth)),
(const char *) &meth[1]);
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
{
const struct GNUNET_PSYC_MessageModifier *mod
= (const struct GNUNET_PSYC_MessageModifier *) msg;
uint16_t name_size = ntohs (mod->name_size);
char oper = ' ' < mod->oper ? mod->oper : ' ';
GNUNET_log (kind,
"\t%c%.*s\t%.*s\n",
oper,
(int) name_size,
(const char *) &mod[1],
(int) (size - sizeof (*mod) - name_size),
((const char *) &mod[1]) + name_size);
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
GNUNET_log (kind,
"\t%.*s\n",
(int) (size - sizeof (*msg)),
(const char *) &msg[1]);
break;
}
}
/**** Transmitting messages ****/
/**
* Create a transmission handle.
*/
struct GNUNET_PSYC_TransmitHandle *
GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
{
struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
tmit->mq = mq;
return tmit;
}
/**
* Destroy a transmission handle.
*/
void
GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
{
GNUNET_free (tmit);
}
/**
* Queue a message part for transmission.
*
* The message part is added to the current message buffer.
* When this buffer is full, it is added to the transmission queue.
*
* @param tmit
* Transmission handle.
* @param msg
* Message part, or NULL.
* @param tmit_now
* Transmit message now, or wait for buffer to fill up?
* #GNUNET_YES or #GNUNET_NO.
*/
static void
transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
const struct GNUNET_MessageHeader *msg,
uint8_t tmit_now)
{
uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Queueing message part of type %u and size %u (tmit_now: %u)).\n",
NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
if (NULL != tmit->msg)
{
if (NULL == msg
|| GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
{
/* End of message or buffer is full, add it to transmission queue
* and start with empty buffer */
tmit->msg->size = htons (tmit->msg->size);
GNUNET_MQ_send (tmit->mq, tmit->env);
tmit->env = NULL;
tmit->msg = NULL;
tmit->acks_pending++;
}
else
{
/* Message fits in current buffer, append */
GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
tmit->msg->size += size;
}
}
if (NULL == tmit->msg && NULL != msg)
{
/* Empty buffer, copy over message. */
tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
/* store current message size in host byte order
* then later switch it to network byte order before sending */
tmit->msg->size = sizeof (*tmit->msg) + size;
GNUNET_memcpy (&tmit->msg[1], msg, size);
}
if (NULL != tmit->msg
&& (GNUNET_YES == tmit_now
|| (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
< tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
{
/* End of message or buffer is full, add it to transmission queue. */
tmit->msg->size = htons (tmit->msg->size);
GNUNET_MQ_send (tmit->mq, tmit->env);
tmit->env = NULL;
tmit->msg = NULL;
tmit->acks_pending++;
}
}
/**
* Request data from client to transmit.
*
* @param tmit Transmission handle.
*/
static void
transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
{
int notify_ret = GNUNET_YES;
uint16_t data_size = 0;
char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
if (NULL != tmit->notify_data)
{
data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
tmit->in_notify = GNUNET_YES;
notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
tmit->in_notify = GNUNET_NO;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"transmit_data (ret: %d, size: %u): %.*s\n",
notify_ret, data_size, data_size, &msg[1]);
switch (notify_ret)
{
case GNUNET_NO:
if (0 == data_size)
{
/* Transmission paused, nothing to send. */
tmit->paused = GNUNET_YES;
return;
}
break;
case GNUNET_YES:
tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
break;
default:
LOG (GNUNET_ERROR_TYPE_ERROR,
"TransmitNotifyData callback returned error when requesting data.\n");
tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
msg->size = htons (sizeof (*msg));
transmit_queue_insert (tmit, msg, GNUNET_YES);
tmit->in_transmit = GNUNET_NO;
return;
}
if (0 < data_size)
{
GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
msg->size = htons (sizeof (*msg) + data_size);
transmit_queue_insert (tmit, msg, !notify_ret);
}
/* End of message. */
if (GNUNET_YES == notify_ret)
{
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
msg->size = htons (sizeof (*msg));
transmit_queue_insert (tmit, msg, GNUNET_YES);
/* FIXME: wait for ACK before setting in_transmit to no */
tmit->in_transmit = GNUNET_NO;
}
}
/**
* Request a modifier from a client to transmit.
*
* @param tmit Transmission handle.
*/
static void
transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
{
uint16_t max_data_size = 0;
uint16_t data_size = 0;
char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
int notify_ret = GNUNET_YES;
switch (tmit->state)
{
case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
{
struct GNUNET_PSYC_MessageModifier *mod
= (struct GNUNET_PSYC_MessageModifier *) msg;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
if (NULL != tmit->notify_mod)
{
max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
data_size = max_data_size;
tmit->in_notify = GNUNET_YES;
notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
&mod->oper, &mod->value_size);
tmit->in_notify = GNUNET_NO;
}
mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"transmit_mod (ret: %d, size: %u + %u): %.*s\n",
notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
if (mod->name_size < data_size)
{
tmit->mod_value_remaining
= mod->value_size - (data_size - mod->name_size);
mod->value_size = htonl (mod->value_size);
mod->name_size = htons (mod->name_size);
}
else if (0 < data_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
notify_ret = GNUNET_SYSERR;
}
break;
}
case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
{
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
msg->size = sizeof (struct GNUNET_MessageHeader);
if (NULL != tmit->notify_mod)
{
max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
data_size = max_data_size;
tmit->in_notify = GNUNET_YES;
notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
&data_size, &msg[1], NULL, NULL);
tmit->in_notify = GNUNET_NO;
}
tmit->mod_value_remaining -= data_size;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"transmit_mod (ret: %d, size: %u): %.*s\n",
notify_ret, data_size, data_size, &msg[1]);
break;
}
default:
GNUNET_assert (0);
}
switch (notify_ret)
{
case GNUNET_NO:
if (0 == data_size)
{ /* Transmission paused, nothing to send. */
tmit->paused = GNUNET_YES;
return;
}
tmit->state
= (0 == tmit->mod_value_remaining)
? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
: GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
break;
case GNUNET_YES: /* End of modifiers. */
GNUNET_assert (0 == tmit->mod_value_remaining);
break;
default:
LOG (GNUNET_ERROR_TYPE_ERROR,
"TransmitNotifyModifier callback returned with error.\n");
tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
msg->size = htons (sizeof (*msg));
transmit_queue_insert (tmit, msg, GNUNET_YES);
tmit->in_transmit = GNUNET_NO;
return;
}
if (0 < data_size)
{
GNUNET_assert (data_size <= max_data_size);
msg->size = htons (msg->size + data_size);
transmit_queue_insert (tmit, msg, GNUNET_NO);
}
if (GNUNET_YES == notify_ret)
{
tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
if (0 == tmit->acks_pending)
transmit_data (tmit);
}
else
{
transmit_mod (tmit);
}
}
int
transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
uint32_t *full_value_size)
{
struct GNUNET_PSYC_TransmitHandle *tmit = cls;
uint16_t name_size = 0;
uint32_t value_size = 0;
const char *value = NULL;
if (NULL != oper)
{ /* New modifier */
if (NULL != tmit->mod)
tmit->mod = tmit->mod->next;
if (NULL == tmit->mod)
{ /* No more modifiers, continue with data */
*data_size = 0;
return GNUNET_YES;
}
GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
*full_value_size = tmit->mod->value_size;
*oper = tmit->mod->oper;
name_size = strlen (tmit->mod->name) + 1;
if (name_size + tmit->mod->value_size <= *data_size)
{
value_size = tmit->mod->value_size;
*data_size = name_size + value_size;
}
else /* full modifier does not fit in data, continuation needed */
{
value_size = *data_size - name_size;
tmit->mod_value = tmit->mod->value + value_size;
}
GNUNET_memcpy (data, tmit->mod->name, name_size);
GNUNET_memcpy ((char *)data + name_size, tmit->mod->value, value_size);
return GNUNET_NO;
}
else
{ /* Modifier continuation */
GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
value = tmit->mod_value;
if (tmit->mod_value_remaining <= *data_size)
{
value_size = tmit->mod_value_remaining;
tmit->mod_value = NULL;
}
else
{
value_size = *data_size;
tmit->mod_value += value_size;
}
if (*data_size < value_size)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Value in environment larger than buffer: %u < %zu\n",
*data_size, value_size);
*data_size = 0;
return GNUNET_NO;
}
*data_size = value_size;
GNUNET_memcpy (data, value, value_size);
return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
}
}
/**
* Transmit a message.
*
* @param tmit
* Transmission handle.
* @param method_name
* Which method should be invoked.
* @param env
* Environment for the message.
* Should stay available until the first call to notify_data.
* Can be NULL if there are no modifiers or @a notify_mod is
* provided instead.
* @param notify_mod
* Function to call to obtain modifiers.
* Can be NULL if there are no modifiers or @a env is provided instead.
* @param notify_data
* Function to call to obtain fragments of the data.
* @param notify_cls
* Closure for @a notify_mod and @a notify_data.
* @param flags
* Flags for the message being transmitted.
*
* @return #GNUNET_OK if the transmission was started.
* #GNUNET_SYSERR if another transmission is already going on.
*/
int
GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
const char *method_name,
const struct GNUNET_PSYC_Environment *env,
GNUNET_PSYC_TransmitNotifyModifier notify_mod,
GNUNET_PSYC_TransmitNotifyData notify_data,
void *notify_cls,
uint32_t flags)
{
if (GNUNET_NO != tmit->in_transmit)
return GNUNET_SYSERR;
tmit->in_transmit = GNUNET_YES;
size_t size = strlen (method_name) + 1;
struct GNUNET_PSYC_MessageMethod *pmeth;
tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
/* store current message size in host byte order
* then later switch it to network byte order before sending */
tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
if (NULL != notify_mod)
{
tmit->notify_mod = notify_mod;
tmit->notify_mod_cls = notify_cls;
}
else
{
tmit->notify_mod = &transmit_notify_env;
tmit->notify_mod_cls = tmit;
if (NULL != env)
{
struct GNUNET_PSYC_Modifier mod = {};
mod.next = GNUNET_PSYC_env_head (env);
tmit->mod = &mod;
struct GNUNET_PSYC_Modifier *m = tmit->mod;
while (NULL != (m = m->next))
{
if (m->oper != GNUNET_PSYC_OP_SET)
flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
}
}
else
{
tmit->mod = NULL;
}
}
pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
pmeth->header.size = htons (sizeof (*pmeth) + size);
pmeth->flags = htonl (flags);
GNUNET_memcpy (&pmeth[1], method_name, size);
tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
tmit->notify_data = notify_data;
tmit->notify_data_cls = notify_cls;
transmit_mod (tmit);
return GNUNET_OK;
}
/**
* Resume transmission.
*
* @param tmit Transmission handle.
*/
void
GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
{
if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
return;
if (0 == tmit->acks_pending)
{
tmit->paused = GNUNET_NO;
transmit_data (tmit);
}
}
/**
* Abort transmission request.
*
* @param tmit Transmission handle.
*/
void
GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
{
if (GNUNET_NO == tmit->in_transmit)
return;
tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
tmit->in_transmit = GNUNET_NO;
tmit->paused = GNUNET_NO;
/* FIXME */
struct GNUNET_MessageHeader msg;
msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
msg.size = htons (sizeof (msg));
transmit_queue_insert (tmit, &msg, GNUNET_YES);
}
/**
* Got acknowledgement of a transmitted message part, continue transmission.
*
* @param tmit Transmission handle.
*/
void
GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
{
if (0 == tmit->acks_pending)
{
LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
GNUNET_break (0);
return;
}
tmit->acks_pending--;
if (GNUNET_YES == tmit->paused)
return;
switch (tmit->state)
{
case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
transmit_mod (tmit);
break;
case GNUNET_PSYC_MESSAGE_STATE_DATA:
transmit_data (tmit);
break;
case GNUNET_PSYC_MESSAGE_STATE_END:
case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
break;
default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Ignoring message ACK in state %u.\n", tmit->state);
}
}
/**** Receiving messages ****/
/**
* Create handle for receiving messages.
*/
struct GNUNET_PSYC_ReceiveHandle *
GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
GNUNET_PSYC_MessagePartCallback message_part_cb,
void *cb_cls)
{
struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
recv->message_cb = message_cb;
recv->message_part_cb = message_part_cb;
recv->cb_cls = cb_cls;
return recv;
}
/**
* Destroy handle for receiving messages.
*/
void
GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
{
GNUNET_free (recv);
}
/**
* Reset stored data related to the last received message.
*/
void
GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
{
recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
recv->flags = 0;
recv->message_id = 0;
recv->mod_value_size = 0;
recv->mod_value_size_expected = 0;
}
static void
recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
{
if (NULL != recv->message_part_cb)
recv->message_part_cb (recv->cb_cls, NULL, NULL);
if (NULL != recv->message_cb)
recv->message_cb (recv->cb_cls, NULL);
GNUNET_PSYC_receive_reset (recv);
}
/**
* Handle incoming PSYC message.
*
* @param recv Receive handle.
* @param msg The message.
*
* @return #GNUNET_OK on success,
* #GNUNET_SYSERR on receive error.
*/
int
GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
const struct GNUNET_PSYC_MessageHeader *msg)
{
uint16_t size = ntohs (msg->header.size);
uint32_t flags = ntohl (msg->flags);
GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
(struct GNUNET_MessageHeader *) msg);
if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
{
recv->message_id = GNUNET_ntohll (msg->message_id);
recv->flags = flags;
recv->slave_pub_key = msg->slave_pub_key;
recv->mod_value_size = 0;
recv->mod_value_size_expected = 0;
}
else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
{
// FIXME
LOG (GNUNET_ERROR_TYPE_WARNING,
"Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
GNUNET_ntohll (msg->message_id), recv->message_id);
GNUNET_break_op (0);
recv_error (recv);
return GNUNET_SYSERR;
}
else if (flags != recv->flags)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Unexpected message flags. Got: %lu, expected: %lu\n",
flags, recv->flags);
GNUNET_break_op (0);
recv_error (recv);
return GNUNET_SYSERR;
}
uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
{
const struct GNUNET_MessageHeader *pmsg
= (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
psize = ntohs (pmsg->size);
ptype = ntohs (pmsg->type);
size_eq = size_min = 0;
if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Dropping message of type %u with invalid size %u.\n",
ptype, psize);
recv_error (recv);
return GNUNET_SYSERR;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received message part of type %u and size %u from PSYC.\n",
ptype, psize);
GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
switch (ptype)
{
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
break;
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
break;
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
size_min = sizeof (struct GNUNET_MessageHeader);
break;
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
size_eq = sizeof (struct GNUNET_MessageHeader);
break;
default:
GNUNET_break_op (0);
recv_error (recv);
return GNUNET_SYSERR;
}
if (! ((0 < size_eq && psize == size_eq)
|| (0 < size_min && size_min <= psize)))
{
GNUNET_break_op (0);
recv_error (recv);
return GNUNET_SYSERR;
}
switch (ptype)
{
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
{
struct GNUNET_PSYC_MessageMethod *meth
= (struct GNUNET_PSYC_MessageMethod *) pmsg;
if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Dropping out of order message method (%u).\n",
recv->state);
/* It is normal to receive an incomplete message right after connecting,
* but should not happen later.
* FIXME: add a check for this condition.
*/
GNUNET_break_op (0);
recv_error (recv);
return GNUNET_SYSERR;
}
if ('\0' != *((char *) meth + psize - 1))
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Dropping message with malformed method. "
"Message ID: %" PRIu64 "\n", recv->message_id);
GNUNET_break_op (0);
recv_error (recv);
return GNUNET_SYSERR;
}
recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
{
if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
|| GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
|| GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Dropping out of order message modifier (%u).\n",
recv->state);
GNUNET_break_op (0);
recv_error (recv);
return GNUNET_SYSERR;
}
struct GNUNET_PSYC_MessageModifier *mod
= (struct GNUNET_PSYC_MessageModifier *) pmsg;
uint16_t name_size = ntohs (mod->name_size);
recv->mod_value_size_expected = ntohl (mod->value_size);
recv->mod_value_size = psize - sizeof (*mod) - name_size;
if (psize < sizeof (*mod) + name_size
|| '\0' != *((char *) &mod[1] + name_size - 1)
|| recv->mod_value_size_expected < recv->mod_value_size)
{
LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
GNUNET_break_op (0);
recv_error (recv);
return GNUNET_SYSERR;
}
recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
{
recv->mod_value_size += psize - sizeof (*pmsg);
if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
|| GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
|| recv->mod_value_size_expected < recv->mod_value_size)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Dropping out of order message modifier continuation "
"!(%u == %u || %u == %u) || %lu < %lu.\n",
GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
recv->mod_value_size_expected, recv->mod_value_size);
GNUNET_break_op (0);
recv_error (recv);
return GNUNET_SYSERR;
}
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
{
if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
|| recv->mod_value_size_expected != recv->mod_value_size)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Dropping out of order message data fragment "
"(%u < %u || %lu != %lu).\n",
recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
recv->mod_value_size_expected, recv->mod_value_size);
GNUNET_break_op (0);
recv_error (recv);
return GNUNET_SYSERR;
}
recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
break;
}
}
if (NULL != recv->message_part_cb)
recv->message_part_cb (recv->cb_cls, msg, pmsg);
switch (ptype)
{
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
GNUNET_PSYC_receive_reset (recv);
break;
}
}
if (NULL != recv->message_cb)
recv->message_cb (recv->cb_cls, msg);
return GNUNET_OK;
}
/**
* Check if @a data contains a series of valid message parts.
*
* @param data_size Size of @a data.
* @param data Data.
* @param[out] first_ptype Type of first message part.
* @param[out] last_ptype Type of last message part.
*
* @return Number of message parts found in @a data.
* or GNUNET_SYSERR if the message contains invalid parts.
*/
int
GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
uint16_t *first_ptype, uint16_t *last_ptype)
{
const struct GNUNET_MessageHeader *pmsg;
uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
if (NULL != first_ptype)
*first_ptype = 0;
if (NULL != last_ptype)
*last_ptype = 0;
for (pos = 0; pos < data_size; pos += psize, parts++)
{
pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
psize = ntohs (pmsg->size);
ptype = ntohs (pmsg->type);
if (0 == parts && NULL != first_ptype)
*first_ptype = ptype;
if (NULL != last_ptype
&& *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
*last_ptype = ptype;
if (psize < sizeof (*pmsg)
|| pos + psize > data_size
|| ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
|| GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Invalid message part of type %u and size %u.\n",
ptype, psize);
return GNUNET_SYSERR;
}
/** @todo FIXME: check message part order */
}
return parts;
}
struct ParseMessageClosure
{
struct GNUNET_PSYC_Environment *env;
const char **method_name;
const void **data;
uint16_t *data_size;
enum GNUNET_PSYC_MessageState msg_state;
};
static void
parse_message_part_cb (void *cls,
const struct GNUNET_PSYC_MessageHeader *msg,
const struct GNUNET_MessageHeader *pmsg)
{
struct ParseMessageClosure *pmc = cls;
if (NULL == pmsg)
{
pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
return;
}
switch (ntohs (pmsg->type))
{
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
{
struct GNUNET_PSYC_MessageMethod *
pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
*pmc->method_name = (const char *) &pmeth[1];
pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
{
struct GNUNET_PSYC_MessageModifier *
pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg;
const char *name = (const char *) &pmod[1];
const void *value = name + ntohs (pmod->name_size);
GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
ntohl (pmod->value_size));
pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
*pmc->data = &pmsg[1];
*pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg);
pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
break;
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
break;
default:
pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
}
}
/**
* Parse PSYC message.
*
* @param msg
* The PSYC message to parse.
* @param[out] method_name
* Pointer to the method name inside @a pmsg.
* @param env
* The environment for the message with a list of modifiers.
* @param[out] data
* Pointer to data inside @a msg.
* @param[out] data_size
* Size of @data is written here.
*
* @return #GNUNET_OK on success,
* #GNUNET_SYSERR on parse error.
*/
int
GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
const char **method_name,
struct GNUNET_PSYC_Environment *env,
const void **data,
uint16_t *data_size)
{
struct ParseMessageClosure cls;
cls.env = env;
cls.method_name = method_name;
cls.data = data;
cls.data_size = data_size;
struct GNUNET_PSYC_ReceiveHandle *
recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
int ret = GNUNET_PSYC_receive_message (recv, msg);
GNUNET_PSYC_receive_destroy (recv);
if (GNUNET_OK != ret)
return GNUNET_SYSERR;
return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
? GNUNET_OK
: GNUNET_NO;
}
/**
* Initialize PSYC message header.
*/
void
GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
const struct GNUNET_MULTICAST_MessageHeader *mmsg,
uint32_t flags)
{
uint16_t size = ntohs (mmsg->header.size);
uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
pmsg->header.size = htons (psize);
pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
pmsg->message_id = mmsg->message_id;
pmsg->fragment_offset = mmsg->fragment_offset;
pmsg->flags = htonl (flags);
GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
}
/**
* Create a new PSYC message header from a multicast message.
*/
struct GNUNET_PSYC_MessageHeader *
GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
uint32_t flags)
{
struct GNUNET_PSYC_MessageHeader *pmsg;
uint16_t size = ntohs (mmsg->header.size);
uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
pmsg = GNUNET_malloc (psize);
GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
return pmsg;
}
/**
* Create a new PSYC message header from a PSYC message.
*/
struct GNUNET_PSYC_MessageHeader *
GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
{
uint16_t msg_size = ntohs (msg->header.size);
struct GNUNET_PSYC_MessageHeader *
pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
GNUNET_memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
return pmsg;
}