From 1a0ffe2288b97b47a5b2bfbda2f9438680429422 Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Mon, 6 Jan 2014 00:09:43 +0000 Subject: psyc: ipc messages, notify callback for modifiers, tests --- contrib/logread.pl | 2 +- src/include/gnunet_protocols.h | 2 +- src/include/gnunet_psyc_service.h | 7 +- src/multicast/multicast_api.c | 9 +- src/psyc/gnunet-service-psyc.c | 323 ++++++++------------------- src/psyc/psyc_api.c | 443 +++++++++++++++++++++----------------- src/psyc/test_psyc.c | 224 ++++++++++++------- src/psyc/test_psyc.conf | 7 + 8 files changed, 502 insertions(+), 515 deletions(-) diff --git a/contrib/logread.pl b/contrib/logread.pl index c6f82a68d..11baf2d86 100755 --- a/contrib/logread.pl +++ b/contrib/logread.pl @@ -98,7 +98,7 @@ while (<>) s/\b(multicast|psyc|psycstore|social)\b/BLUE $1/ex; # Add message type names - s/(message(?:\s+of)?\s+type\s+)(\d+)/ + s/(message(?:\s+part)?(?:\s+of)?\s+type\s+)(\d+)/ $1 . BRIGHT_CYAN (exists $msgtypes{$2} ? $msgtypes{$2} : 'UNKNOWN') . CYAN " ($2)"/e; diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 9ca4155e8..2470b3ab1 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -2130,7 +2130,7 @@ extern "C" #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL 697 -#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 698 +#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK 698 #define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701 diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index eb17c9351..f843fbe1f 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h @@ -426,6 +426,11 @@ typedef int uint16_t *data_size, void *data); +typedef int +(*GNUNET_PSYC_MasterTransmitNotifyModifier) (void *cls, + uint16_t *data_size, + void *data, + uint8_t *oper); /** * Flags for transmitting messages to a channel by the master. @@ -472,7 +477,7 @@ struct GNUNET_PSYC_MasterTransmitHandle; struct GNUNET_PSYC_MasterTransmitHandle * GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, const char *method_name, - GNUNET_PSYC_MasterTransmitNotify notify_mod, + GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod, GNUNET_PSYC_MasterTransmitNotify notify_data, void *notify_cls, enum GNUNET_PSYC_MasterTransmitFlags flags); diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 6b784c2f0..bb6a57b58 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -362,8 +362,9 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle; size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; + char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; struct GNUNET_MULTICAST_MessageHeader *msg - = GNUNET_malloc (buf_size); + = (struct GNUNET_MULTICAST_MessageHeader *) buf; int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) @@ -380,12 +381,12 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); msg->header.size = htons (sizeof (*msg) + buf_size); - msg->message_id = mh->message_id; + msg->message_id = GNUNET_htonll (mh->message_id); msg->group_generation = mh->group_generation; /* FIXME: add fragment ID and signature in the service instead of here */ - msg->fragment_id = orig->next_fragment_id++; - msg->fragment_offset = mh->fragment_offset; + msg->fragment_id = GNUNET_ntohll (orig->next_fragment_id++); + msg->fragment_offset = GNUNET_ntohll (mh->fragment_offset); mh->fragment_offset += sizeof (*msg) + buf_size; msg->purpose.size = htonl (sizeof (*msg) + buf_size - sizeof (msg->header) diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 628c39900..e5de7dcda 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -171,8 +171,8 @@ struct Slave }; -static void -transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay); +static inline void +transmit_message (struct Channel *ch); /** @@ -205,6 +205,7 @@ client_cleanup (struct Channel *ch) struct Master *mst = (struct Master *) ch; if (NULL != mst->origin) GNUNET_MULTICAST_origin_stop (mst->origin); + GNUNET_CONTAINER_multihashmap_remove (clients, &mst->pub_key_hash, mst); } else { @@ -251,7 +252,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) /* Send pending messages to multicast before cleanup. */ if (NULL != ch->tmit_head) { - transmit_message (ch, GNUNET_TIME_UNIT_ZERO); + transmit_message (ch); } else { @@ -321,6 +322,10 @@ message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash, const struct GNUNET_MessageHeader *msg = cls; struct Channel *ch = chan; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending message of type %u and size %u to client 0x%zx.\n", + ntohs (msg->type), ntohs (msg->size), ch->client); + GNUNET_SERVER_notification_context_add (nc, ch->client); GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO); @@ -363,24 +368,6 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg) GNUNET_MULTICAST_MessageHeader *) msg, 0, NULL, NULL); - uint16_t size = ntohs (msg->size); - uint16_t psize = 0; - uint16_t pos = 0; - - for (pos = 0; sizeof (*msg) + pos < size; pos += psize) - { - const struct GNUNET_MessageHeader *pmsg - = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); - uint16_t psize = ntohs (pmsg->size); - if (sizeof (*msg) + pos + psize > size) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Message received from multicast contains invalid PSYC " - "message. Not sending to clients.\n"); - return; - } - } - #if TODO /* FIXME: apply modifiers to state in PSYCstore */ GNUNET_PSYCSTORE_state_modify (store, chan_key, @@ -393,6 +380,26 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg) = (const struct GNUNET_MULTICAST_MessageHeader *) msg; struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t size = ntohs (msg->size); + uint16_t psize = 0; + uint16_t pos = 0; + + for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize) + { + const struct GNUNET_MessageHeader *pmsg + = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos); + psize = ntohs (pmsg->size); + if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Received invalid message part of type %u and size %u " + "from multicast. Not sending to clients.\n", + ntohs (pmsg->type), psize); + GNUNET_break_op (0); + return; + } + } + psize = sizeof (*pmsg) + size - sizeof (*mmsg); pmsg = GNUNET_malloc (psize); pmsg->header.size = htons (psize); @@ -572,19 +579,18 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, /** - * Send transmission acknowledgement to a client. + * Send acknowledgement to a client. * - * Sent after the last GNUNET_PSYC_MessageModifier and after each - * GNUNET_PSYC_MessageData. + * Sent after a message fragment has been passed on to multicast. * * @param ch The channel struct for the client. */ static void -send_transmit_ack (struct Channel *ch) +send_message_ack (struct Channel *ch) { struct GNUNET_MessageHeader res; res.size = htons (sizeof (res)); - res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); + res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); GNUNET_SERVER_notification_context_add (nc, ch->client); GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, @@ -599,9 +605,9 @@ static int transmit_notify (void *cls, size_t *data_size, void *data) { struct Channel *ch = cls; - struct TransmitMessage *msg = ch->tmit_head; + struct TransmitMessage *tmit_msg = ch->tmit_head; - if (NULL == msg || *data_size < msg->size) + if (NULL == tmit_msg || *data_size < tmit_msg->size) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n"); *data_size = 0; @@ -609,21 +615,22 @@ transmit_notify (void *cls, size_t *data_size, void *data) } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "transmit_notify: sending %u bytes.\n", msg->size); + "transmit_notify: sending %u bytes.\n", tmit_msg->size); - *data_size = msg->size; - memcpy (data, msg->buf, *data_size); + *data_size = tmit_msg->size; + memcpy (data, tmit_msg->buf, *data_size); - GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); - GNUNET_free (msg); + GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); + GNUNET_free (tmit_msg); int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; + send_message_ack (ch); if (0 == ch->tmit_task) { if (NULL != ch->tmit_head) { - transmit_message (ch, GNUNET_TIME_UNIT_ZERO); + transmit_message (ch); } else if (ch->disconnected) { @@ -640,11 +647,9 @@ transmit_notify (void *cls, size_t *data_size, void *data) * Transmit a message from a channel master to the multicast group. */ static void -master_transmit_message (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +master_transmit_message (struct Master *mst) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n"); - struct Master *mst = cls; mst->channel.tmit_task = 0; if (NULL == mst->tmit_handle) { @@ -664,10 +669,8 @@ master_transmit_message (void *cls, * Transmit a message from a channel slave to the multicast group. */ static void -slave_transmit_message (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +slave_transmit_message (struct Slave *slv) { - struct Slave *slv = cls; slv->channel.tmit_task = 0; if (NULL == slv->tmit_handle) { @@ -682,214 +685,85 @@ slave_transmit_message (void *cls, } -/** - * Schedule message transmission from a channel to the multicast group. - * - * @param ch The channel. - * @param delay Transmission delay. - */ -static void -transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay) +static inline void +transmit_message (struct Channel *ch) { - if (0 != ch->tmit_task) - GNUNET_SCHEDULER_cancel (ch->tmit_task); - - ch->tmit_task - = ch->is_master - ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch) - : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch); -} - - -/** - * Queue incoming message parts from a client for transmission, and send them to - * the multicast group when the buffer is full or reached the end of message. - * - * @param ch Channel struct for the client. - * @param msg Message from the client. - * - * @return #GNUNET_OK on success, else #GNUNET_SYSERR. - */ -static int -queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) -{ - uint16_t size = ntohs (msg->size); - struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO; - struct TransmitMessage *tmit_msg = ch->tmit_tail; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queueing message of type %u and size %u " - "for transmission to multicast.\n", - ntohs (msg->type), size); - - if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size) - return GNUNET_SYSERR; - - if (NULL == tmit_msg - || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit_msg->size + size) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Appending message to new buffer.\n"); - /* Start filling up new buffer */ - tmit_msg = GNUNET_new (struct TransmitMessage); - tmit_msg->buf = GNUNET_malloc (size); - memcpy (tmit_msg->buf, msg, size); - tmit_msg->size = size; - tmit_msg->state = ch->tmit_state; - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Appending message to existing buffer.\n"); - /* Append to existing buffer */ - tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size); - memcpy (tmit_msg->buf + tmit_msg->size, msg, size); - tmit_msg->size += size; - tmit_msg->state = ch->tmit_state; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size); - - /* Wait a bit for the remaining message parts from the client - if there's still some space left in the buffer. */ - if (tmit_msg->state < MSG_STATE_END - && (tmit_msg->size + sizeof (struct GNUNET_MessageHeader) - < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD)) - { - tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2); - } - else - { - send_transmit_ack (ch); - } - - transmit_message (ch, tmit_delay); - - return GNUNET_OK; + ch->is_master + ? master_transmit_message ((struct Master *) ch) + : slave_transmit_message ((struct Slave *) ch); } static void transmit_error (struct Channel *ch) { - struct GNUNET_MessageHeader *msg = GNUNET_malloc (sizeof (*msg)); + struct GNUNET_MessageHeader *msg; + struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + + sizeof (*msg)); + msg = (struct GNUNET_MessageHeader *) &tmit_msg[1]; msg->size = ntohs (sizeof (*msg)); msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); - queue_message (ch, msg); + tmit_msg->buf = (char *) &tmit_msg[1]; + tmit_msg->size = sizeof (*msg); + tmit_msg->state = ch->tmit_state; + GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); + transmit_message (ch); + + /* FIXME: cleanup */ GNUNET_SERVER_client_disconnect (ch->client); } -/** - * Incoming method from a client. - */ -static void -handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) -{ - /* const struct GNUNET_PSYC_MessageMethod *meth - = (const struct GNUNET_PSYC_MessageMethod *) msg; */ - struct Channel *ch - = GNUNET_SERVER_client_get_user_context (client, struct Channel); - GNUNET_assert (NULL != ch); - - if (MSG_STATE_START != ch->tmit_state) - { - transmit_error (ch); - return; - } - ch->tmit_state = MSG_STATE_METHOD; - - queue_message (ch, msg); - send_transmit_ack (ch); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -}; - - -/** - * Incoming modifier from a client. - */ -static void -handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) -{ - const struct GNUNET_PSYC_MessageModifier *mod - = (const struct GNUNET_PSYC_MessageModifier *) msg; - - struct Channel *ch - = GNUNET_SERVER_client_get_user_context (client, struct Channel); - GNUNET_assert (NULL != ch); - - if (MSG_STATE_METHOD != ch->tmit_state - || MSG_STATE_MODIFIER != ch->tmit_state - || MSG_STATE_MOD_CONT != ch->tmit_state - || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size) - { - transmit_error (ch); - return; - } - ch->tmit_mod_value_size_expected = ntohl (mod->value_size); - ch->tmit_mod_value_size = ntohs (msg->size) - ntohs(mod->name_size) - 1; - - queue_message (ch, msg); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -}; - /** - * Incoming modifier from a client. + * Incoming message from a client. */ static void -handle_transmit_mod_cont (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); GNUNET_assert (NULL != ch); - ch->tmit_mod_value_size += ntohs (msg->size); + uint16_t size = ntohs (msg->size); + uint16_t psize = 0, pos = 0; - if (MSG_STATE_MODIFIER != ch->tmit_state - || MSG_STATE_MOD_CONT != ch->tmit_state - || ch->tmit_mod_value_size_expected < ch->tmit_mod_value_size) + if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n"); + GNUNET_break (0); transmit_error (ch); return; } - ch->tmit_state = MSG_STATE_MOD_CONT; - - queue_message (ch, msg); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -}; - -/** - * Incoming data from a client. - */ -static void -handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) -{ - struct Channel *ch - = GNUNET_SERVER_client_get_user_context (client, struct Channel); - GNUNET_assert (NULL != ch); - - if (MSG_STATE_METHOD != ch->tmit_state - || MSG_STATE_MODIFIER != ch->tmit_state - || MSG_STATE_MOD_CONT != ch->tmit_state - || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size) + for (pos = 0; sizeof (*msg) + pos < size; pos += psize) { - transmit_error (ch); - return; + const struct GNUNET_MessageHeader *pmsg + = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); + psize = ntohs (pmsg->size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message part of type %u and size %u " + "from client.\n", ntohs (pmsg->type), psize); + if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Received invalid message part of type %u and size %u " + "from client.\n", ntohs (pmsg->type), psize); + GNUNET_break (0); + transmit_error (ch); + return; + } } - ch->tmit_state = MSG_STATE_DATA; - queue_message (ch, msg); - send_transmit_ack (ch); - - if (MSG_STATE_END <= ch->tmit_state) - ch->tmit_state = MSG_STATE_START; + size -= sizeof (*msg); + struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size); + tmit_msg->buf = (char *) &tmit_msg[1]; + memcpy (tmit_msg->buf, &msg[1], size); + tmit_msg->size = size; + tmit_msg->state = ch->tmit_state; + GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); + transmit_message (ch); GNUNET_SERVER_receive_done (client, GNUNET_OK); }; @@ -912,22 +786,9 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, { &handle_slave_join, NULL, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, -#if TODO + { &handle_psyc_message, NULL, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, -#endif - { &handle_transmit_method, NULL, - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 }, - - { &handle_transmit_modifier, NULL, - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 }, - - { &handle_transmit_mod_cont, NULL, - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT, 0 }, - - { &handle_transmit_data, NULL, - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 }, - { NULL, NULL, 0, 0 } }; cfg = c; diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index a5a01fa92..e904e00b5 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -45,7 +45,7 @@ struct OperationHandle { struct OperationHandle *prev; struct OperationHandle *next; - const struct GNUNET_MessageHeader *msg; + struct GNUNET_MessageHeader *msg; }; /** @@ -78,6 +78,11 @@ struct GNUNET_PSYC_Channel */ struct OperationHandle *tmit_tail; + /** + * Message being transmitted to the PSYC service. + */ + struct OperationHandle *tmit_msg; + /** * Message to send on reconnect. */ @@ -138,11 +143,6 @@ struct GNUNET_PSYC_Channel */ uint32_t recv_mod_value_size; - /** - * Buffer space available for transmitting the next data fragment. - */ - uint16_t tmit_size; // FIXME - /** * Is transmission paused? */ @@ -151,7 +151,7 @@ struct GNUNET_PSYC_Channel /** * Are we still waiting for a PSYC_TRANSMIT_ACK? */ - uint8_t tmit_ack_pending; // FIXME + uint8_t tmit_ack_pending; /** * Are we polling for incoming messages right now? @@ -176,7 +176,7 @@ struct GNUNET_PSYC_Channel struct GNUNET_PSYC_MasterTransmitHandle { struct GNUNET_PSYC_Master *master; - GNUNET_PSYC_MasterTransmitNotify notify_mod; + GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod; GNUNET_PSYC_MasterTransmitNotify notify_data; void *notify_cls; enum MessageState state; @@ -246,16 +246,14 @@ struct GNUNET_PSYC_StateQuery }; -/** - * Try again to connect to the PSYC service. - * - * @param cls Handle to the PSYC service. - * @param tc Scheduler context - */ static void reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +static void +master_transmit_data (struct GNUNET_PSYC_Master *mst); + + /** * Reschedule a connect attempt to the service. * @@ -323,6 +321,79 @@ recv_error (struct GNUNET_PSYC_Channel *ch) message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); } + +/** + * Queue an incoming message part for transmission to the PSYC service. + * + * The message part is added to the current message buffer. + * When this buffer is full, it is added to the transmission queue. + * + * @param ch Channel struct for the client. + * @param msg Modifier message part, or NULL when there's no more modifiers. + * @param end End of message. + */ +static void +queue_message (struct GNUNET_PSYC_Channel *ch, + const struct GNUNET_MessageHeader *msg, + uint8_t end) +{ + uint16_t size = msg ? ntohs (msg->size) : 0; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queueing message of type %u and size %u (end: %u)).\n", + ntohs (msg->type), size, end); + + struct OperationHandle *op = ch->tmit_msg; + if (NULL != op) + { + if (NULL == msg + || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size) + { + /* End of message or buffer is full, add it to transmission queue + * and start with empty buffer */ + op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + op->msg->size = htons (op->msg->size); + GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); + ch->tmit_msg = op = NULL; + ch->tmit_ack_pending++; + } + else + { + /* Message fits in current buffer, append */ + ch->tmit_msg = op + = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size); + op->msg = (struct GNUNET_MessageHeader *) &op[1]; + memcpy ((char *) op->msg + op->msg->size, msg, size); + op->msg->size += size; + } + } + + if (NULL == op && NULL != msg) + { + /* Empty buffer, copy over message. */ + ch->tmit_msg = op + = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size); + op->msg = (struct GNUNET_MessageHeader *) &op[1]; + op->msg->size = sizeof (*op->msg) + size; + memcpy (&op->msg[1], msg, size); + } + + if (NULL != op + && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD + < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) + { + /* End of message or buffer is full, add it to transmission queue. */ + op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + op->msg->size = htons (op->msg->size); + GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); + ch->tmit_msg = op = NULL; + ch->tmit_ack_pending++; + } + + transmit_next (ch); +} + + /** * Request a modifier from a client to transmit. * @@ -332,32 +403,71 @@ static void master_transmit_mod (struct GNUNET_PSYC_Master *mst) { struct GNUNET_PSYC_Channel *ch = &mst->ch; - uint16_t max_data_size - = ch->tmit_size > sizeof (struct GNUNET_MessageHeader) - ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size - : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size; - uint16_t data_size = max_data_size; + uint16_t max_data_size, data_size; + char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; + struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; + int notify_ret; - struct GNUNET_MessageHeader *msg; - struct OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); - op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; - msg->type - = MSG_STATE_MODIFIER == mst->tmit->state - ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER) - : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); + switch (mst->tmit->state) + { + case MSG_STATE_MODIFIER: + { + struct GNUNET_PSYC_MessageModifier *mod + = (struct GNUNET_PSYC_MessageModifier *) msg; + max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; + msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); + msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); + notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, + &data_size, &mod[1], &mod->oper); + mod->name_size = strnlen ((char *) &mod[1], data_size); + if (mod->name_size < data_size) + { + mod->oper = htons (mod->oper); + mod->value_size = htons (data_size - 1 - mod->name_size); + mod->name_size = htons (mod->name_size); + } + else if (0 < data_size) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n"); + notify_ret = GNUNET_SYSERR; + } + break; + } + case MSG_STATE_MOD_CONT: + { + max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; + msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); + msg->size = sizeof (struct GNUNET_MessageHeader); + notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, + &data_size, &msg[1], NULL); + break; + } + default: + GNUNET_assert (0); + } - int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, - &data_size, &msg[1]); switch (notify_ret) { case GNUNET_NO: - if (0 != data_size) - mst->tmit->state = MSG_STATE_MOD_CONT; + if (0 == data_size) + { /* Transmission paused, nothing to send. */ + ch->tmit_paused = GNUNET_YES; + return; + } + mst->tmit->state = MSG_STATE_MOD_CONT; break; case GNUNET_YES: - mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER; + if (0 == data_size) + { + /* End of modifiers. */ + mst->tmit->state = MSG_STATE_DATA; + if (0 == ch->tmit_ack_pending) + master_transmit_data (mst); + + return; + } + mst->tmit->state = MSG_STATE_MODIFIER; break; default: @@ -368,36 +478,18 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); msg->size = htons (sizeof (*msg)); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); - transmit_next (ch); + queue_message (ch, msg, GNUNET_YES); return; } - if ((GNUNET_NO == notify_ret && 0 == data_size)) - { - /* Transmission paused, nothing to send. */ - ch->tmit_paused = GNUNET_YES; - GNUNET_free (op); - } - if (0 < data_size) { - GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); - msg->size = htons (sizeof (*msg) + data_size); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); - } - - /* End of message. */ - if (GNUNET_YES == notify_ret) - { - op = GNUNET_malloc (sizeof *(op) + sizeof (*msg)); - op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); - msg->size = htons (sizeof (*msg)); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); + GNUNET_assert (data_size <= max_data_size); + msg->size = htons (msg->size + data_size); + queue_message (ch, msg, GNUNET_NO); } - transmit_next (ch); + master_transmit_mod (mst); } @@ -410,11 +502,10 @@ static void master_transmit_data (struct GNUNET_PSYC_Master *mst) { struct GNUNET_PSYC_Channel *ch = &mst->ch; - struct GNUNET_MessageHeader *msg; uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; - struct OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); - op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; + char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; + struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; + msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, @@ -426,7 +517,7 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) { /* Transmission paused, nothing to send. */ ch->tmit_paused = GNUNET_YES; - GNUNET_free (op); + return; } break; @@ -441,9 +532,7 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) mst->tmit->state = MSG_STATE_START; msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); msg->size = htons (sizeof (*msg)); - - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); - transmit_next (ch); + queue_message (ch, msg, GNUNET_YES); return; } @@ -451,20 +540,16 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) { GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); msg->size = htons (sizeof (*msg) + data_size); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); + queue_message (ch, msg, !notify_ret); } /* End of message. */ if (GNUNET_YES == notify_ret) { - op = GNUNET_malloc (sizeof *(op) + sizeof (*msg)); - op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); msg->size = htons (sizeof (*msg)); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); + queue_message (ch, msg, GNUNET_YES); } - - transmit_next (ch); } @@ -476,57 +561,55 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) */ static void handle_psyc_message (struct GNUNET_PSYC_Channel *ch, - const struct GNUNET_PSYC_MessageHeader *pmsg) + const struct GNUNET_PSYC_MessageHeader *msg) { - const struct GNUNET_MessageHeader *msg; - uint16_t msize = ntohs (pmsg->header.size); - uint16_t pos = 0; - uint16_t size = 0; - uint16_t type, size_eq, size_min; + uint16_t size = ntohs (msg->header.size); if (MSG_STATE_START == ch->recv_state) { - ch->recv_message_id = GNUNET_ntohll (pmsg->message_id); - ch->recv_flags = ntohl (pmsg->flags); + ch->recv_message_id = GNUNET_ntohll (msg->message_id); + ch->recv_flags = ntohl (msg->flags); } - else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id) + else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) { LOG (GNUNET_ERROR_TYPE_WARNING, "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", - GNUNET_ntohll (pmsg->message_id), ch->recv_message_id); + GNUNET_ntohll (msg->message_id), ch->recv_message_id); GNUNET_break_op (0); recv_error (ch); } - else if (ntohl (pmsg->flags) != ch->recv_flags) + else if (ntohl (msg->flags) != ch->recv_flags) { LOG (GNUNET_ERROR_TYPE_WARNING, "Unexpected message flags. Got: %lu, expected: %lu\n", - ntohl (pmsg->flags), ch->recv_flags); + ntohl (msg->flags), ch->recv_flags); GNUNET_break_op (0); recv_error (ch); } - for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size) + uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; + + for (pos = 0; sizeof (*msg) + pos < size; pos += psize) { - msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); - size = ntohs (msg->size); - type = ntohs (msg->type); + const struct GNUNET_MessageHeader *pmsg + = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); + psize = ntohs (pmsg->size); + ptype = ntohs (pmsg->type); size_eq = size_min = 0; - if (msize < sizeof (*pmsg) + pos + size) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message part of type %u and size %u from PSYC.\n", + ptype, psize); + + if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Discarding message of type %u with invalid size. " - "(%u < %u + %u + %u)\n", ntohs (msg->type), - msize, sizeof (*msg), pos, size); + "Discarding message of type %u with invalid size %u.\n", + ptype, psize); break; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message part of type %u and size %u from PSYC.\n", - ntohs (msg->type), size); - - switch (type) + switch (ptype) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: size_min = sizeof (struct GNUNET_PSYC_MessageMethod); @@ -534,6 +617,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: size_min = sizeof (struct GNUNET_PSYC_MessageModifier); break; + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: size_min = sizeof (struct GNUNET_MessageHeader); break; @@ -543,22 +627,22 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, break; } - if (! ((0 < size_eq && size == size_eq) - || (0 < size_min && size_min <= size))) + if (! ((0 < size_eq && psize == size_eq) + || (0 < size_min && size_min <= psize))) { GNUNET_break (0); reschedule_connect (ch); return; } - switch (type) + switch (ptype) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: { struct GNUNET_PSYC_MessageMethod *meth - = (struct GNUNET_PSYC_MessageMethod *) msg; + = (struct GNUNET_PSYC_MessageMethod *) pmsg; - if (MSG_STATE_HEADER != ch->recv_state) + if (MSG_STATE_START != ch->recv_state) { LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding out of order message method.\n"); @@ -568,89 +652,66 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, */ GNUNET_break_op (0); recv_error (ch); - break; + return; } - if ('\0' != (char *) meth + msg->size - 1) + if ('\0' != *((char *) meth + psize - 1)) { LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding message with malformed method. " "Message ID: %" PRIu64 "\n", ch->recv_message_id); GNUNET_break_op (0); recv_error (ch); - break; + return; } - GNUNET_PSYC_MessageCallback message_cb - = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC - ? ch->hist_message_cb - : ch->message_cb; - - if (NULL != message_cb) - message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); - ch->recv_state = MSG_STATE_METHOD; break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: { - if (MSG_STATE_MODIFIER != ch->recv_state) + if (!(MSG_STATE_METHOD == ch->recv_state + || MSG_STATE_MODIFIER == ch->recv_state + || MSG_STATE_MOD_CONT == ch->recv_state)) { LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding out of order message modifier.\n"); GNUNET_break_op (0); recv_error (ch); - break; + return; } struct GNUNET_PSYC_MessageModifier *mod - = (struct GNUNET_PSYC_MessageModifier *) msg; + = (struct GNUNET_PSYC_MessageModifier *) pmsg; uint16_t name_size = ntohs (mod->name_size); ch->recv_mod_value_size_expected = ntohs (mod->value_size); - ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1; + ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; - if (size < sizeof (*mod) + name_size + 1 - || '\0' != (char *) &mod[1] + mod->name_size + if (psize < sizeof (*mod) + name_size + 1 + || '\0' != *((char *) &mod[1] + name_size) || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) { LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); GNUNET_break_op (0); - break; + return; } - ch->recv_state = MSG_STATE_MODIFIER; - - GNUNET_PSYC_MessageCallback message_cb - = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC - ? ch->hist_message_cb - : ch->message_cb; - - if (NULL != message_cb) - message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); - break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: { - ch->recv_mod_value_size += size - sizeof (*msg); + ch->recv_mod_value_size += psize - sizeof (*pmsg); - if (MSG_STATE_MODIFIER != ch->recv_state + if (!(MSG_STATE_MODIFIER == ch->recv_state + || MSG_STATE_MOD_CONT == ch->recv_state) || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) { LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding out of order message modifier continuation.\n"); GNUNET_break_op (0); recv_reset (ch); - break; + return; } - - GNUNET_PSYC_MessageCallback message_cb - = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC - ? ch->hist_message_cb - : ch->message_cb; - - if (NULL != message_cb) - message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: @@ -662,12 +723,23 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, "Discarding out of order message data fragment.\n"); GNUNET_break_op (0); recv_reset (ch); - break; + return; } - ch->recv_state = MSG_STATE_DATA; break; } + } + + GNUNET_PSYC_MessageCallback message_cb + = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC + ? ch->hist_message_cb + : ch->message_cb; + + if (NULL != message_cb) + message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg); + + switch (ptype) + { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: recv_reset (ch); @@ -717,18 +789,7 @@ message_handler (void *cls, case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: size_min = sizeof (struct GNUNET_PSYC_MessageHeader); break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: - size_min = sizeof (struct GNUNET_PSYC_MessageMethod); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: - size_min = sizeof (struct GNUNET_PSYC_MessageModifier); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: - size_min = sizeof (struct GNUNET_MessageHeader); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: - case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: size_eq = sizeof (struct GNUNET_MessageHeader); break; } @@ -761,9 +822,15 @@ message_handler (void *cls, #endif break; } - case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: { - ch->tmit_ack_pending = GNUNET_NO; + if (0 == ch->tmit_ack_pending) + { + LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); + GNUNET_break (0); + break; + } + ch->tmit_ack_pending--; if (ch->is_master) { @@ -771,10 +838,6 @@ message_handler (void *cls, switch (mst->tmit->state) { case MSG_STATE_MODIFIER: - if (GNUNET_NO == ch->tmit_paused) - master_transmit_mod (mst); - break; - case MSG_STATE_MOD_CONT: if (GNUNET_NO == ch->tmit_paused) master_transmit_mod (mst); @@ -795,12 +858,13 @@ message_handler (void *cls, else { LOG (GNUNET_ERROR_TYPE_WARNING, - "Ignoring transmit ack, there's no transmission going on.\n"); + "Ignoring message ACK, there's no transmission going on.\n"); + GNUNET_break (0); } break; default: - LOG (GNUNET_ERROR_TYPE_WARNING, - "Ignoring unexpected transmit ack.\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring message ACK in state %u.\n", mst->tmit->state); } } else @@ -811,12 +875,15 @@ message_handler (void *cls, } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: - handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg); + handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg); break; } - GNUNET_CLIENT_receive (ch->client, &message_handler, ch, - GNUNET_TIME_UNIT_FOREVER_REL); + if (NULL != ch->client) + { + GNUNET_CLIENT_receive (ch->client, &message_handler, ch, + GNUNET_TIME_UNIT_FOREVER_REL); + } } @@ -1029,6 +1096,8 @@ void GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) { disconnect (master); + if (NULL != master->tmit) + GNUNET_free (master->tmit); GNUNET_free (master); } @@ -1069,30 +1138,6 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, } -/* FIXME: split up value into <64K chunks and transmit the continuations in - * MOD_CONT msgs */ -static int -send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) -{ - struct GNUNET_PSYC_Channel *ch = cls; - size_t name_size = strlen (mod->name) + 1; - struct GNUNET_PSYC_MessageModifier *pmod; - struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmod) - + name_size + mod->value_size); - pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) pmod; - - pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); - pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size); - pmod->name_size = htons (name_size); - memcpy (&pmod[1], mod->name, name_size); - memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); - - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); - return GNUNET_YES; -} - - /** * Send a message to call a method to all members in the PSYC channel. * @@ -1107,7 +1152,7 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) struct GNUNET_PSYC_MasterTransmitHandle * GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, const char *method_name, - GNUNET_PSYC_MasterTransmitNotify notify_mod, + GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod, GNUNET_PSYC_MasterTransmitNotify notify_data, void *notify_cls, enum GNUNET_PSYC_MasterTransmitFlags flags) @@ -1120,25 +1165,27 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, size_t size = strlen (method_name) + 1; struct GNUNET_PSYC_MessageMethod *pmeth; - struct OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size); - pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) pmeth; + struct OperationHandle *op; + ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + + sizeof (*pmeth) + size); + op->msg = (struct GNUNET_MessageHeader *) &op[1]; + op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size; + + pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1]; pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); pmeth->header.size = htons (sizeof (*pmeth) + size); pmeth->flags = htonl (flags); memcpy (&pmeth[1], method_name, size); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); - transmit_next (ch); - master->tmit = GNUNET_malloc (sizeof (*master->tmit)); master->tmit->master = master; master->tmit->notify_mod = notify_mod; master->tmit->notify_data = notify_data; master->tmit->notify_cls = notify_cls; - master->tmit->state = MSG_STATE_START; // FIXME + master->tmit->state = MSG_STATE_MODIFIER; + + master_transmit_mod (master); return master->tmit; } @@ -1152,7 +1199,7 @@ void GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) { struct GNUNET_PSYC_Channel *ch = &th->master->ch; - if (GNUNET_NO == ch->tmit_ack_pending) + if (0 == ch->tmit_ack_pending) { ch->tmit_paused = GNUNET_NO; master_transmit_data (th->master); diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 704819c50..33684b125 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c @@ -25,6 +25,8 @@ * @author Christian Grothoff */ +#include + #include "platform.h" #include "gnunet_crypto_lib.h" #include "gnunet_common.h" @@ -35,7 +37,7 @@ #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) -#define DEBUG_SERVICE 1 +#define DEBUG_SERVICE 0 /** @@ -62,17 +64,37 @@ static struct GNUNET_CRYPTO_EddsaPublicKey slave_pub_key; struct GNUNET_PSYC_MasterTransmitHandle *mth; +struct TransmitClosure +{ + struct GNUNET_PSYC_MasterTransmitHandle *handle; + struct GNUNET_ENV_Environment *env; + char *data[16]; + const char *mod_value; + size_t mod_value_size; + uint8_t data_count; + uint8_t paused; + uint8_t n; +}; + +struct TransmitClosure *tmit; + /** * Clean up all resources used. */ static void cleanup () { - if (mst != NULL) + if (NULL != mst) { GNUNET_PSYC_master_stop (mst); mst = NULL; } + if (NULL != tmit) + { + GNUNET_ENV_environment_destroy (tmit->env); + GNUNET_free (tmit); + tmit = NULL; + } GNUNET_SCHEDULER_shutdown (); } @@ -121,44 +143,40 @@ end () } -static int -method (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - uint64_t message_id, const char *name, - size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers, - uint64_t data_offset, const void *data, size_t data_size, - enum GNUNET_PSYC_MessageFlags flags) +static void +message (void *cls, uint64_t message_id, uint32_t flags, + const struct GNUNET_MessageHeader *msg) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Method: %s, modifiers: %lu, flags: %u\n%.*s\n", - name, modifier_count, flags, data_size, data); - return GNUNET_OK; -} + if (NULL == msg) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error while receiving message %llu\n", message_id); + return; + } + uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); -static int -join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - const char *method_name, - size_t variable_count, const struct GNUNET_ENV_Modifier *variables, - const void *data, size_t data_size, struct GNUNET_PSYC_JoinHandle *jh) -{ - return GNUNET_OK; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Got message part of type %u and size %u " + "belonging to message ID %llu with flags %u\n", + type, size, message_id, flags); + + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) + end (); } -struct TransmitClosure +static void +join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + const char *method_name, + size_t variable_count, const struct GNUNET_ENV_Modifier *variables, + const void *data, size_t data_size, + struct GNUNET_PSYC_JoinHandle *jh) { - struct GNUNET_PSYC_MasterTransmitHandle *handle; - - char *mod_names[16]; - char *mod_values[16]; - char *data[16]; - - uint8_t mod_count; - uint8_t data_count; - - uint8_t paused; - uint8_t n; -}; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Got join request."); +} static void @@ -172,45 +190,95 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static int -tmit_notify_mod (void *cls, size_t *data_size, void *data) +tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper) { struct TransmitClosure *tmit = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmit notify modifier: %lu bytes available, " - "processing modifier %u/%u.\n", - *data_size, tmit->n + 1, tmit->fragment_count); - /* FIXME: continuation */ - uint16_t name_size = strlen (tmit->mod_names[tmit->n]); - uint16_t value_size = strlen (tmit->mod_values[tmit->n]); - if (name_size + 1 + value_size <= *data_size) - return GNUNET_NO; - - *data_size = name_size + 1 + value_size; - memcpy (data, tmit->fragments[tmit->n], *data_size); - - if (++tmit->n < tmit->mod_count) - { - return GNUNET_NO; + "%u modifiers left to process.\n", + *data_size, GNUNET_ENV_environment_get_count (tmit->env)); + + enum GNUNET_ENV_Operator op = 0; + const char *name = NULL; + const char *value = NULL; + uint16_t name_size = 0; + size_t value_size = 0; + + if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) + { /* Modifier continuation */ + value = tmit->mod_value; + if (tmit->mod_value_size <= *data_size) + { + value_size = tmit->mod_value_size; + tmit->mod_value = NULL; + } + else + { + value_size = *data_size; + tmit->mod_value += value_size; + } + tmit->mod_value_size -= value_size; + + if (*data_size < value_size) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "value larger than buffer: %u < %zu\n", + *data_size, value_size); + *data_size = 0; + return GNUNET_NO; + } + + *data_size = value_size; + memcpy (data, value, value_size); } - else + else if (NULL != oper) { - tmit->n = 0; - return GNUNET_YES; + if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name, + (void *) &value, &value_size)) + { /* No more modifiers, continue with data */ + *data_size = 0; + return GNUNET_YES; + } + + *oper = op; + name_size = strlen (name); + + if (name_size + 1 + value_size <= *data_size) + { + *data_size = name_size + 1 + value_size; + } + else + { + tmit->mod_value_size = value_size; + value_size = *data_size - name_size - 1; + tmit->mod_value_size -= value_size; + tmit->mod_value = value + value_size; + } + + memcpy (data, name, name_size); + ((char *)data)[name_size] = '\0'; + memcpy ((char *)data + name_size + 1, value, value_size); } + + return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO; } static int -tmit_notify_data (void *cls, size_t *data_size, void *data) +tmit_notify_data (void *cls, uint16_t *data_size, void *data) { struct TransmitClosure *tmit = cls; + uint16_t size = strlen (tmit->data[tmit->n]); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmit notify data: %lu bytes available, " - "processing fragment %u/%u.\n", - *data_size, tmit->n + 1, tmit->fragment_count); - uint16_t size = strlen (tmit->data[tmit->n]); - if (size <= *data_size) - return GNUNET_NO; + "processing fragment %u/%u (size %u).\n", + *data_size, tmit->n + 1, tmit->data_count, size); + if (*data_size < size) + { + *data_size = 0; + GNUNET_assert (0); + return GNUNET_SYSERR; + } if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1) { @@ -231,19 +299,18 @@ tmit_notify_data (void *cls, size_t *data_size, void *data) } -void +static void master_started (void *cls, uint64_t max_message_id) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Master started: %lu\n", max_message_id); - - struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); - GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, - "_foo", "bar baz", 7); - GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, - "_foo_bar", "foo bar baz", 11); - - struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); + "Master started: %" PRIu64 "\n", max_message_id); + + tmit = GNUNET_new (struct TransmitClosure); + tmit->env = GNUNET_ENV_environment_create (); + GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, + "_foo", "bar baz", 7); + GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, + "_foo_bar", "foo bar baz", 11); tmit->data[0] = "foo"; tmit->data[1] = "foo bar"; tmit->data[2] = "foo bar baz"; @@ -255,7 +322,7 @@ master_started (void *cls, uint64_t max_message_id) } -void +static void slave_joined (void *cls, uint64_t max_message_id) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id); @@ -288,19 +355,19 @@ run (void *cls, GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key); - mst = GNUNET_PSYC_master_start (cfg, channel_key, - GNUNET_PSYC_CHANNEL_PRIVATE, - &method, &join, &master_started, NULL); - return; + mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, + &message, &join_request, &master_started, NULL); + return; /* FIXME: test slave */ + struct GNUNET_PeerIdentity origin; struct GNUNET_PeerIdentity relays[16]; struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); - GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, - "_foo", "bar baz", 7); - GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, - "_foo_bar", "foo bar baz", 11); + GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, + "_foo", "bar baz", 7); + GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, + "_foo_bar", "foo bar baz", 11); slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, - 16, relays, &method, &join, &slave_joined, + 16, relays, &message, &join_request, &slave_joined, NULL, "_request_join", env, "some data", 9); GNUNET_ENV_environment_destroy (env); } @@ -319,8 +386,7 @@ main (int argc, char *argv[]) opts, &run, NULL)) return 1; #else - if (0 != GNUNET_TESTING_service_run ("test-psyc", "psyc", - "test_psyc.conf", &run, NULL)) + if (0 != GNUNET_TESTING_peer_run ("test-psyc", "test_psyc.conf", &run, NULL)) return 1; #endif return res; diff --git a/src/psyc/test_psyc.conf b/src/psyc/test_psyc.conf index 1e646239a..7a1eb8404 100644 --- a/src/psyc/test_psyc.conf +++ b/src/psyc/test_psyc.conf @@ -8,3 +8,10 @@ BINARY = gnunet-service-psyc UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psyc.sock UNIX_MATCH_UID = NO UNIX_MATCH_GID = YES + +[psycstore] +AUTOSTART = YES +BINARY = gnunet-service-psycstore +UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psycstore.sock +UNIX_MATCH_UID = NO +UNIX_MATCH_GID = YES -- cgit v1.2.3