aboutsummaryrefslogtreecommitdiff
path: root/src/psycutil/psyc_message.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/psycutil/psyc_message.c')
-rw-r--r--src/psycutil/psyc_message.c1355
1 files changed, 0 insertions, 1355 deletions
diff --git a/src/psycutil/psyc_message.c b/src/psycutil/psyc_message.c
deleted file mode 100644
index a03eff47f..000000000
--- a/src/psycutil/psyc_message.c
+++ /dev/null
@@ -1,1355 +0,0 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
4 *
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file psycutil/psyc_message.c
23 * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24 * @author Gabor X Toth
25 */
26
27#include <inttypes.h>
28
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_psyc_util_lib.h"
32#include "gnunet_psyc_service.h"
33
34#define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
35
36
37struct GNUNET_PSYC_TransmitHandle
38{
39 /**
40 * Client connection to service.
41 */
42 struct GNUNET_MQ_Handle *mq;
43
44 /**
45 * Message currently being received from the client.
46 */
47 struct GNUNET_MessageHeader *msg;
48
49 /**
50 * Envelope for @a msg
51 */
52 struct GNUNET_MQ_Envelope *env;
53
54 /**
55 * Callback to request next modifier from client.
56 */
57 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
58
59 /**
60 * Closure for the notify callbacks.
61 */
62 void *notify_mod_cls;
63
64 /**
65 * Callback to request next data fragment from client.
66 */
67 GNUNET_PSYC_TransmitNotifyData notify_data;
68
69 /**
70 * Closure for the notify callbacks.
71 */
72 void *notify_data_cls;
73
74 /**
75 * Modifier of the environment that is currently being transmitted.
76 */
77 struct GNUNET_PSYC_Modifier *mod;
78
79 /**
80 *
81 */
82 const char *mod_value;
83
84 /**
85 * Number of bytes remaining to be transmitted from the current modifier value.
86 */
87 uint32_t mod_value_remaining;
88
89 /**
90 * State of the current message being received from client.
91 */
92 enum GNUNET_PSYC_MessageState state;
93
94 /**
95 * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
96 */
97 uint8_t acks_pending;
98
99 /**
100 * Is transmission paused?
101 */
102 uint8_t paused;
103
104 /**
105 * Are we currently transmitting a message?
106 */
107 uint8_t in_transmit;
108
109 /**
110 * Notify callback is currently being called.
111 */
112 uint8_t in_notify;
113
114};
115
116
117
118struct GNUNET_PSYC_ReceiveHandle
119{
120 /**
121 * Message callback.
122 */
123 GNUNET_PSYC_MessageCallback message_cb;
124
125 /**
126 * Message part callback.
127 */
128 GNUNET_PSYC_MessagePartCallback message_part_cb;
129
130 /**
131 * Closure for the callbacks.
132 */
133 void *cb_cls;
134
135 /**
136 * ID of the message being received from the PSYC service.
137 */
138 uint64_t message_id;
139
140 /**
141 * Public key of the slave from which a message is being received.
142 */
143 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
144
145 /**
146 * State of the currently being received message from the PSYC service.
147 */
148 enum GNUNET_PSYC_MessageState state;
149
150 /**
151 * Flags for the currently being received message from the PSYC service.
152 */
153 enum GNUNET_PSYC_MessageFlags flags;
154
155 /**
156 * Expected value size for the modifier being received from the PSYC service.
157 */
158 uint32_t mod_value_size_expected;
159
160 /**
161 * Actual value size for the modifier being received from the PSYC service.
162 */
163 uint32_t mod_value_size;
164};
165
166
167/**** Messages ****/
168
169
170/**
171 * Create a PSYC message.
172 *
173 * @param method_name
174 * PSYC method for the message.
175 * @param env
176 * Environment for the message.
177 * @param data
178 * Data payload for the message.
179 * @param data_size
180 * Size of @a data.
181 *
182 * @return Message header with size information,
183 * followed by the message parts.
184 */
185struct GNUNET_PSYC_Message *
186GNUNET_PSYC_message_create (const char *method_name,
187 const struct GNUNET_PSYC_Environment *env,
188 const void *data,
189 size_t data_size)
190{
191 struct GNUNET_PSYC_Modifier *mod = NULL;
192 struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
193 struct GNUNET_PSYC_MessageModifier *pmod = NULL;
194 struct GNUNET_MessageHeader *pmsg = NULL;
195 uint16_t env_size = 0;
196 if (NULL != env)
197 {
198 mod = GNUNET_PSYC_env_head (env);
199 while (NULL != mod)
200 {
201 env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
202 mod = mod->next;
203 }
204 }
205
206 struct GNUNET_PSYC_Message *msg;
207 uint16_t method_name_size = strlen (method_name) + 1;
208 if (method_name_size == 1)
209 return NULL;
210
211 uint16_t msg_size = sizeof (*msg) /* header */
212 + sizeof (*pmeth) + method_name_size /* method */
213 + env_size /* modifiers */
214 + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
215 + sizeof (*pmsg); /* end of message */
216 msg = GNUNET_malloc (msg_size);
217 msg->header.size = htons (msg_size);
218 msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
219
220 pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
221 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
222 pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
223 GNUNET_memcpy (&pmeth[1], method_name, method_name_size);
224
225 uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
226 if (NULL != env)
227 {
228 mod = GNUNET_PSYC_env_head (env);
229 while (NULL != mod)
230 {
231 uint16_t mod_name_size = strlen (mod->name) + 1;
232 pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
233 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
234 pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
235 p += pmod->header.size;
236 pmod->header.size = htons (pmod->header.size);
237
238 pmod->oper = mod->oper;
239 pmod->name_size = htons (mod_name_size);
240 pmod->value_size = htonl (mod->value_size);
241
242 GNUNET_memcpy (&pmod[1], mod->name, mod_name_size);
243 if (0 < mod->value_size)
244 GNUNET_memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
245
246 mod = mod->next;
247 }
248 }
249
250 if (0 < data_size)
251 {
252 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
253 pmsg->size = sizeof (*pmsg) + data_size;
254 p += pmsg->size;
255 pmsg->size = htons (pmsg->size);
256 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
257 GNUNET_memcpy (&pmsg[1], data, data_size);
258 }
259
260 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
261 pmsg->size = htons (sizeof (*pmsg));
262 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
263
264 GNUNET_assert (p + sizeof (*pmsg) == msg_size);
265 return msg;
266}
267
268
269void
270GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
271 const struct GNUNET_MessageHeader *msg)
272{
273 uint16_t size = ntohs (msg->size);
274 uint16_t type = ntohs (msg->type);
275
276 GNUNET_log (kind,
277 "Message of type %d and size %u:\n",
278 type,
279 size);
280 switch (type)
281 {
282 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
283 {
284 const struct GNUNET_PSYC_MessageHeader *pmsg
285 = (const struct GNUNET_PSYC_MessageHeader *) msg;
286 GNUNET_log (kind,
287 "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
288 GNUNET_ntohll (pmsg->message_id),
289 ntohl (pmsg->flags));
290 break;
291 }
292 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
293 {
294 const struct GNUNET_PSYC_MessageMethod *meth
295 = (const struct GNUNET_PSYC_MessageMethod *) msg;
296 GNUNET_log (kind,
297 "\t%.*s\n",
298 (int) (size - sizeof (*meth)),
299 (const char *) &meth[1]);
300 break;
301 }
302 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
303 {
304 const struct GNUNET_PSYC_MessageModifier *mod
305 = (const struct GNUNET_PSYC_MessageModifier *) msg;
306 uint16_t name_size = ntohs (mod->name_size);
307 char oper = ' ' < mod->oper ? mod->oper : ' ';
308 GNUNET_log (kind,
309 "\t%c%.*s\t%.*s\n",
310 oper,
311 (int) name_size,
312 (const char *) &mod[1],
313 (int) (size - sizeof (*mod) - name_size),
314 ((const char *) &mod[1]) + name_size);
315 break;
316 }
317 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
318 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
319 GNUNET_log (kind,
320 "\t%.*s\n",
321 (int) (size - sizeof (*msg)),
322 (const char *) &msg[1]);
323 break;
324 }
325}
326
327
328/**** Transmitting messages ****/
329
330
331/**
332 * Create a transmission handle.
333 */
334struct GNUNET_PSYC_TransmitHandle *
335GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
336{
337 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
338
339 tmit->mq = mq;
340 return tmit;
341}
342
343
344/**
345 * Destroy a transmission handle.
346 */
347void
348GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
349{
350 GNUNET_free (tmit);
351}
352
353
354/**
355 * Queue a message part for transmission.
356 *
357 * The message part is added to the current message buffer.
358 * When this buffer is full, it is added to the transmission queue.
359 *
360 * @param tmit
361 * Transmission handle.
362 * @param msg
363 * Message part, or NULL.
364 * @param tmit_now
365 * Transmit message now, or wait for buffer to fill up?
366 * #GNUNET_YES or #GNUNET_NO.
367 */
368static void
369transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
370 const struct GNUNET_MessageHeader *msg,
371 uint8_t tmit_now)
372{
373 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
374
375 LOG (GNUNET_ERROR_TYPE_DEBUG,
376 "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
377 NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
378
379 if (NULL != tmit->msg)
380 {
381 if (NULL == msg
382 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
383 {
384 /* End of message or buffer is full, add it to transmission queue
385 * and start with empty buffer */
386 tmit->msg->size = htons (tmit->msg->size);
387 GNUNET_MQ_send (tmit->mq, tmit->env);
388 tmit->env = NULL;
389 tmit->msg = NULL;
390 tmit->acks_pending++;
391 }
392 else
393 {
394 /* Message fits in current buffer, append */
395 GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
396 tmit->msg->size += size;
397 }
398 }
399
400 if (NULL == tmit->msg && NULL != msg)
401 {
402 /* Empty buffer, copy over message. */
403 tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
404 GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
405 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
406 /* store current message size in host byte order
407 * then later switch it to network byte order before sending */
408 tmit->msg->size = sizeof (*tmit->msg) + size;
409
410 GNUNET_memcpy (&tmit->msg[1], msg, size);
411 }
412
413 if (NULL != tmit->msg
414 && (GNUNET_YES == tmit_now
415 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
416 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
417 {
418 /* End of message or buffer is full, add it to transmission queue. */
419 tmit->msg->size = htons (tmit->msg->size);
420 GNUNET_MQ_send (tmit->mq, tmit->env);
421 tmit->env = NULL;
422 tmit->msg = NULL;
423 tmit->acks_pending++;
424 }
425}
426
427
428/**
429 * Request data from client to transmit.
430 *
431 * @param tmit Transmission handle.
432 */
433static void
434transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
435{
436 int notify_ret = GNUNET_YES;
437 uint16_t data_size = 0;
438 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
439 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
440 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
441
442 if (NULL != tmit->notify_data)
443 {
444 data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
445 tmit->in_notify = GNUNET_YES;
446 notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
447 tmit->in_notify = GNUNET_NO;
448 }
449 LOG (GNUNET_ERROR_TYPE_DEBUG,
450 "transmit_data (ret: %d, size: %u): %.*s\n",
451 notify_ret, data_size, data_size, &msg[1]);
452 switch (notify_ret)
453 {
454 case GNUNET_NO:
455 if (0 == data_size)
456 {
457 /* Transmission paused, nothing to send. */
458 tmit->paused = GNUNET_YES;
459 return;
460 }
461 break;
462
463 case GNUNET_YES:
464 tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
465 break;
466
467 default:
468 LOG (GNUNET_ERROR_TYPE_ERROR,
469 "TransmitNotifyData callback returned error when requesting data.\n");
470
471 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
472 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
473 msg->size = htons (sizeof (*msg));
474 transmit_queue_insert (tmit, msg, GNUNET_YES);
475 tmit->in_transmit = GNUNET_NO;
476 return;
477 }
478
479 if (0 < data_size)
480 {
481 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
482 msg->size = htons (sizeof (*msg) + data_size);
483 transmit_queue_insert (tmit, msg, !notify_ret);
484 }
485
486 /* End of message. */
487 if (GNUNET_YES == notify_ret)
488 {
489 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
490 msg->size = htons (sizeof (*msg));
491 transmit_queue_insert (tmit, msg, GNUNET_YES);
492 /* FIXME: wait for ACK before setting in_transmit to no */
493 tmit->in_transmit = GNUNET_NO;
494 }
495}
496
497
498/**
499 * Request a modifier from a client to transmit.
500 *
501 * @param tmit Transmission handle.
502 */
503static void
504transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
505{
506 uint16_t max_data_size = 0;
507 uint16_t data_size = 0;
508 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
509 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
510 int notify_ret = GNUNET_YES;
511
512 switch (tmit->state)
513 {
514 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
515 {
516 struct GNUNET_PSYC_MessageModifier *mod
517 = (struct GNUNET_PSYC_MessageModifier *) msg;
518 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
519 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
520
521 if (NULL != tmit->notify_mod)
522 {
523 max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
524 data_size = max_data_size;
525 tmit->in_notify = GNUNET_YES;
526 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
527 &mod->oper, &mod->value_size);
528 tmit->in_notify = GNUNET_NO;
529 }
530
531 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
532 LOG (GNUNET_ERROR_TYPE_DEBUG,
533 "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
534 notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
535 if (mod->name_size < data_size)
536 {
537 tmit->mod_value_remaining
538 = mod->value_size - (data_size - mod->name_size);
539 mod->value_size = htonl (mod->value_size);
540 mod->name_size = htons (mod->name_size);
541 }
542 else if (0 < data_size)
543 {
544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
545 notify_ret = GNUNET_SYSERR;
546 }
547 break;
548 }
549 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
550 {
551 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
552 msg->size = sizeof (struct GNUNET_MessageHeader);
553
554 if (NULL != tmit->notify_mod)
555 {
556 max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
557 data_size = max_data_size;
558 tmit->in_notify = GNUNET_YES;
559 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
560 &data_size, &msg[1], NULL, NULL);
561 tmit->in_notify = GNUNET_NO;
562 }
563 tmit->mod_value_remaining -= data_size;
564 LOG (GNUNET_ERROR_TYPE_DEBUG,
565 "transmit_mod (ret: %d, size: %u): %.*s\n",
566 notify_ret, data_size, data_size, &msg[1]);
567 break;
568 }
569 default:
570 GNUNET_assert (0);
571 }
572
573 switch (notify_ret)
574 {
575 case GNUNET_NO:
576 if (0 == data_size)
577 { /* Transmission paused, nothing to send. */
578 tmit->paused = GNUNET_YES;
579 return;
580 }
581 tmit->state
582 = (0 == tmit->mod_value_remaining)
583 ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
584 : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
585 break;
586
587 case GNUNET_YES: /* End of modifiers. */
588 GNUNET_assert (0 == tmit->mod_value_remaining);
589 break;
590
591 default:
592 LOG (GNUNET_ERROR_TYPE_ERROR,
593 "TransmitNotifyModifier callback returned with error.\n");
594
595 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
596 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
597 msg->size = htons (sizeof (*msg));
598 transmit_queue_insert (tmit, msg, GNUNET_YES);
599 tmit->in_transmit = GNUNET_NO;
600 return;
601 }
602
603 if (0 < data_size)
604 {
605 GNUNET_assert (data_size <= max_data_size);
606 msg->size = htons (msg->size + data_size);
607 transmit_queue_insert (tmit, msg, GNUNET_NO);
608 }
609
610 if (GNUNET_YES == notify_ret)
611 {
612 tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
613 if (0 == tmit->acks_pending)
614 transmit_data (tmit);
615 }
616 else
617 {
618 transmit_mod (tmit);
619 }
620}
621
622
623int
624transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
625 uint32_t *full_value_size)
626
627{
628 struct GNUNET_PSYC_TransmitHandle *tmit = cls;
629 uint16_t name_size = 0;
630 uint32_t value_size = 0;
631 const char *value = NULL;
632
633 if (NULL != oper)
634 { /* New modifier */
635 if (NULL != tmit->mod)
636 tmit->mod = tmit->mod->next;
637 if (NULL == tmit->mod)
638 { /* No more modifiers, continue with data */
639 *data_size = 0;
640 return GNUNET_YES;
641 }
642
643 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
644 *full_value_size = tmit->mod->value_size;
645 *oper = tmit->mod->oper;
646 name_size = strlen (tmit->mod->name) + 1;
647
648 if (name_size + tmit->mod->value_size <= *data_size)
649 {
650 value_size = tmit->mod->value_size;
651 *data_size = name_size + value_size;
652 }
653 else /* full modifier does not fit in data, continuation needed */
654 {
655 value_size = *data_size - name_size;
656 tmit->mod_value = tmit->mod->value + value_size;
657 }
658
659 GNUNET_memcpy (data, tmit->mod->name, name_size);
660 GNUNET_memcpy ((char *)data + name_size, tmit->mod->value, value_size);
661 return GNUNET_NO;
662 }
663 else
664 { /* Modifier continuation */
665 GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
666 value = tmit->mod_value;
667 if (tmit->mod_value_remaining <= *data_size)
668 {
669 value_size = tmit->mod_value_remaining;
670 tmit->mod_value = NULL;
671 }
672 else
673 {
674 value_size = *data_size;
675 tmit->mod_value += value_size;
676 }
677
678 if (*data_size < value_size)
679 {
680 LOG (GNUNET_ERROR_TYPE_DEBUG,
681 "Value in environment larger than buffer: %u < %zu\n",
682 *data_size, value_size);
683 *data_size = 0;
684 return GNUNET_NO;
685 }
686
687 *data_size = value_size;
688 GNUNET_memcpy (data, value, value_size);
689 return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
690 }
691}
692
693
694/**
695 * Transmit a message.
696 *
697 * @param tmit
698 * Transmission handle.
699 * @param method_name
700 * Which method should be invoked.
701 * @param env
702 * Environment for the message.
703 * Should stay available until the first call to notify_data.
704 * Can be NULL if there are no modifiers or @a notify_mod is
705 * provided instead.
706 * @param notify_mod
707 * Function to call to obtain modifiers.
708 * Can be NULL if there are no modifiers or @a env is provided instead.
709 * @param notify_data
710 * Function to call to obtain fragments of the data.
711 * @param notify_cls
712 * Closure for @a notify_mod and @a notify_data.
713 * @param flags
714 * Flags for the message being transmitted.
715 *
716 * @return #GNUNET_OK if the transmission was started.
717 * #GNUNET_SYSERR if another transmission is already going on.
718 */
719int
720GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
721 const char *method_name,
722 const struct GNUNET_PSYC_Environment *env,
723 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
724 GNUNET_PSYC_TransmitNotifyData notify_data,
725 void *notify_cls,
726 uint32_t flags)
727{
728 if (GNUNET_NO != tmit->in_transmit)
729 return GNUNET_SYSERR;
730 tmit->in_transmit = GNUNET_YES;
731
732 size_t size = strlen (method_name) + 1;
733 struct GNUNET_PSYC_MessageMethod *pmeth;
734
735 tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
736 GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
737 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
738 /* store current message size in host byte order
739 * then later switch it to network byte order before sending */
740 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
741
742 if (NULL != notify_mod)
743 {
744 tmit->notify_mod = notify_mod;
745 tmit->notify_mod_cls = notify_cls;
746 }
747 else
748 {
749 tmit->notify_mod = &transmit_notify_env;
750 tmit->notify_mod_cls = tmit;
751 if (NULL != env)
752 {
753 struct GNUNET_PSYC_Modifier mod = {};
754 mod.next = GNUNET_PSYC_env_head (env);
755 tmit->mod = &mod;
756
757 struct GNUNET_PSYC_Modifier *m = tmit->mod;
758 while (NULL != (m = m->next))
759 {
760 if (m->oper != GNUNET_PSYC_OP_SET)
761 flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
762 }
763 }
764 else
765 {
766 tmit->mod = NULL;
767 }
768 }
769
770 pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
771 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
772 pmeth->header.size = htons (sizeof (*pmeth) + size);
773 pmeth->flags = htonl (flags);
774 GNUNET_memcpy (&pmeth[1], method_name, size);
775
776 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
777 tmit->notify_data = notify_data;
778 tmit->notify_data_cls = notify_cls;
779
780 transmit_mod (tmit);
781 return GNUNET_OK;
782}
783
784
785/**
786 * Resume transmission.
787 *
788 * @param tmit Transmission handle.
789 */
790void
791GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
792{
793 if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
794 return;
795
796 if (0 == tmit->acks_pending)
797 {
798 tmit->paused = GNUNET_NO;
799 transmit_data (tmit);
800 }
801}
802
803
804/**
805 * Abort transmission request.
806 *
807 * @param tmit Transmission handle.
808 */
809void
810GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
811{
812 if (GNUNET_NO == tmit->in_transmit)
813 return;
814
815 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
816 tmit->in_transmit = GNUNET_NO;
817 tmit->paused = GNUNET_NO;
818
819 /* FIXME */
820 struct GNUNET_MessageHeader msg;
821 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
822 msg.size = htons (sizeof (msg));
823 transmit_queue_insert (tmit, &msg, GNUNET_YES);
824}
825
826
827/**
828 * Got acknowledgement of a transmitted message part, continue transmission.
829 *
830 * @param tmit Transmission handle.
831 */
832void
833GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
834{
835 if (0 == tmit->acks_pending)
836 {
837 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
838 GNUNET_break (0);
839 return;
840 }
841 tmit->acks_pending--;
842
843 if (GNUNET_YES == tmit->paused)
844 return;
845
846 switch (tmit->state)
847 {
848 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
849 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
850 transmit_mod (tmit);
851 break;
852
853 case GNUNET_PSYC_MESSAGE_STATE_DATA:
854 transmit_data (tmit);
855 break;
856
857 case GNUNET_PSYC_MESSAGE_STATE_END:
858 case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
859 break;
860
861 default:
862 LOG (GNUNET_ERROR_TYPE_DEBUG,
863 "Ignoring message ACK in state %u.\n", tmit->state);
864 }
865}
866
867
868/**** Receiving messages ****/
869
870
871/**
872 * Create handle for receiving messages.
873 */
874struct GNUNET_PSYC_ReceiveHandle *
875GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
876 GNUNET_PSYC_MessagePartCallback message_part_cb,
877 void *cb_cls)
878{
879 struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
880 recv->message_cb = message_cb;
881 recv->message_part_cb = message_part_cb;
882 recv->cb_cls = cb_cls;
883 return recv;
884}
885
886
887/**
888 * Destroy handle for receiving messages.
889 */
890void
891GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
892{
893 GNUNET_free (recv);
894}
895
896
897/**
898 * Reset stored data related to the last received message.
899 */
900void
901GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
902{
903 recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
904 recv->flags = 0;
905 recv->message_id = 0;
906 recv->mod_value_size = 0;
907 recv->mod_value_size_expected = 0;
908}
909
910
911static void
912recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
913{
914 if (NULL != recv->message_part_cb)
915 recv->message_part_cb (recv->cb_cls, NULL, NULL);
916
917 if (NULL != recv->message_cb)
918 recv->message_cb (recv->cb_cls, NULL);
919
920 GNUNET_PSYC_receive_reset (recv);
921}
922
923
924/**
925 * Handle incoming PSYC message.
926 *
927 * @param recv Receive handle.
928 * @param msg The message.
929 *
930 * @return #GNUNET_OK on success,
931 * #GNUNET_SYSERR on receive error.
932 */
933int
934GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
935 const struct GNUNET_PSYC_MessageHeader *msg)
936{
937 uint16_t size = ntohs (msg->header.size);
938 uint32_t flags = ntohl (msg->flags);
939
940 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
941 (struct GNUNET_MessageHeader *) msg);
942
943 if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
944 {
945 recv->message_id = GNUNET_ntohll (msg->message_id);
946 recv->flags = flags;
947 recv->slave_pub_key = msg->slave_pub_key;
948 recv->mod_value_size = 0;
949 recv->mod_value_size_expected = 0;
950 }
951 else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
952 {
953 // FIXME
954 LOG (GNUNET_ERROR_TYPE_WARNING,
955 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
956 GNUNET_ntohll (msg->message_id), recv->message_id);
957 GNUNET_break_op (0);
958 recv_error (recv);
959 return GNUNET_SYSERR;
960 }
961 else if (flags != recv->flags)
962 {
963 LOG (GNUNET_ERROR_TYPE_WARNING,
964 "Unexpected message flags. Got: %lu, expected: %lu\n",
965 flags, recv->flags);
966 GNUNET_break_op (0);
967 recv_error (recv);
968 return GNUNET_SYSERR;
969 }
970
971 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
972
973 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
974 {
975 const struct GNUNET_MessageHeader *pmsg
976 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
977 psize = ntohs (pmsg->size);
978 ptype = ntohs (pmsg->type);
979 size_eq = size_min = 0;
980
981 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
982 {
983 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
984 "Dropping message of type %u with invalid size %u.\n",
985 ptype, psize);
986 recv_error (recv);
987 return GNUNET_SYSERR;
988 }
989
990 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991 "Received message part of type %u and size %u from PSYC.\n",
992 ptype, psize);
993 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
994
995 switch (ptype)
996 {
997 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
998 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
999 break;
1000 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1001 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
1002 break;
1003 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1004 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1005 size_min = sizeof (struct GNUNET_MessageHeader);
1006 break;
1007 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1008 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1009 size_eq = sizeof (struct GNUNET_MessageHeader);
1010 break;
1011 default:
1012 GNUNET_break_op (0);
1013 recv_error (recv);
1014 return GNUNET_SYSERR;
1015 }
1016
1017 if (! ((0 < size_eq && psize == size_eq)
1018 || (0 < size_min && size_min <= psize)))
1019 {
1020 GNUNET_break_op (0);
1021 recv_error (recv);
1022 return GNUNET_SYSERR;
1023 }
1024
1025 switch (ptype)
1026 {
1027 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1028 {
1029 struct GNUNET_PSYC_MessageMethod *meth
1030 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1031
1032 if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1033 {
1034 LOG (GNUNET_ERROR_TYPE_WARNING,
1035 "Dropping out of order message method (%u).\n",
1036 recv->state);
1037 /* It is normal to receive an incomplete message right after connecting,
1038 * but should not happen later.
1039 * FIXME: add a check for this condition.
1040 */
1041 GNUNET_break_op (0);
1042 recv_error (recv);
1043 return GNUNET_SYSERR;
1044 }
1045
1046 if ('\0' != *((char *) meth + psize - 1))
1047 {
1048 LOG (GNUNET_ERROR_TYPE_WARNING,
1049 "Dropping message with malformed method. "
1050 "Message ID: %" PRIu64 "\n", recv->message_id);
1051 GNUNET_break_op (0);
1052 recv_error (recv);
1053 return GNUNET_SYSERR;
1054 }
1055 recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1056 break;
1057 }
1058 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1059 {
1060 if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1061 || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1062 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1063 {
1064 LOG (GNUNET_ERROR_TYPE_WARNING,
1065 "Dropping out of order message modifier (%u).\n",
1066 recv->state);
1067 GNUNET_break_op (0);
1068 recv_error (recv);
1069 return GNUNET_SYSERR;
1070 }
1071
1072 struct GNUNET_PSYC_MessageModifier *mod
1073 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1074
1075 uint16_t name_size = ntohs (mod->name_size);
1076 recv->mod_value_size_expected = ntohl (mod->value_size);
1077 recv->mod_value_size = psize - sizeof (*mod) - name_size;
1078
1079 if (psize < sizeof (*mod) + name_size
1080 || '\0' != *((char *) &mod[1] + name_size - 1)
1081 || recv->mod_value_size_expected < recv->mod_value_size)
1082 {
1083 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1084 GNUNET_break_op (0);
1085 recv_error (recv);
1086 return GNUNET_SYSERR;
1087 }
1088 recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1089 break;
1090 }
1091 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1092 {
1093 recv->mod_value_size += psize - sizeof (*pmsg);
1094
1095 if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1096 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1097 || recv->mod_value_size_expected < recv->mod_value_size)
1098 {
1099 LOG (GNUNET_ERROR_TYPE_WARNING,
1100 "Dropping out of order message modifier continuation "
1101 "!(%u == %u || %u == %u) || %lu < %lu.\n",
1102 GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1103 GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1104 recv->mod_value_size_expected, recv->mod_value_size);
1105 GNUNET_break_op (0);
1106 recv_error (recv);
1107 return GNUNET_SYSERR;
1108 }
1109 break;
1110 }
1111 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1112 {
1113 if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1114 || recv->mod_value_size_expected != recv->mod_value_size)
1115 {
1116 LOG (GNUNET_ERROR_TYPE_WARNING,
1117 "Dropping out of order message data fragment "
1118 "(%u < %u || %lu != %lu).\n",
1119 recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1120 recv->mod_value_size_expected, recv->mod_value_size);
1121
1122 GNUNET_break_op (0);
1123 recv_error (recv);
1124 return GNUNET_SYSERR;
1125 }
1126 recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1127 break;
1128 }
1129 }
1130
1131 if (NULL != recv->message_part_cb)
1132 recv->message_part_cb (recv->cb_cls, msg, pmsg);
1133
1134 switch (ptype)
1135 {
1136 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1137 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1138 GNUNET_PSYC_receive_reset (recv);
1139 break;
1140 }
1141 }
1142
1143 if (NULL != recv->message_cb)
1144 recv->message_cb (recv->cb_cls, msg);
1145 return GNUNET_OK;
1146}
1147
1148
1149/**
1150 * Check if @a data contains a series of valid message parts.
1151 *
1152 * @param data_size Size of @a data.
1153 * @param data Data.
1154 * @param[out] first_ptype Type of first message part.
1155 * @param[out] last_ptype Type of last message part.
1156 *
1157 * @return Number of message parts found in @a data.
1158 * or GNUNET_SYSERR if the message contains invalid parts.
1159 */
1160int
1161GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1162 uint16_t *first_ptype, uint16_t *last_ptype)
1163{
1164 const struct GNUNET_MessageHeader *pmsg;
1165 uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1166 if (NULL != first_ptype)
1167 *first_ptype = 0;
1168 if (NULL != last_ptype)
1169 *last_ptype = 0;
1170
1171 for (pos = 0; pos < data_size; pos += psize, parts++)
1172 {
1173 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1174 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1175 psize = ntohs (pmsg->size);
1176 ptype = ntohs (pmsg->type);
1177 if (0 == parts && NULL != first_ptype)
1178 *first_ptype = ptype;
1179 if (NULL != last_ptype
1180 && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1181 *last_ptype = ptype;
1182 if (psize < sizeof (*pmsg)
1183 || pos + psize > data_size
1184 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1185 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1186 {
1187 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1188 "Invalid message part of type %u and size %u.\n",
1189 ptype, psize);
1190 return GNUNET_SYSERR;
1191 }
1192 /** @todo FIXME: check message part order */
1193 }
1194 return parts;
1195}
1196
1197
1198struct ParseMessageClosure
1199{
1200 struct GNUNET_PSYC_Environment *env;
1201 const char **method_name;
1202 const void **data;
1203 uint16_t *data_size;
1204 enum GNUNET_PSYC_MessageState msg_state;
1205};
1206
1207
1208static void
1209parse_message_part_cb (void *cls,
1210 const struct GNUNET_PSYC_MessageHeader *msg,
1211 const struct GNUNET_MessageHeader *pmsg)
1212{
1213 struct ParseMessageClosure *pmc = cls;
1214 if (NULL == pmsg)
1215 {
1216 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1217 return;
1218 }
1219
1220 switch (ntohs (pmsg->type))
1221 {
1222 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1223 {
1224 struct GNUNET_PSYC_MessageMethod *
1225 pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1226 *pmc->method_name = (const char *) &pmeth[1];
1227 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1228 break;
1229 }
1230
1231 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1232 {
1233 struct GNUNET_PSYC_MessageModifier *
1234 pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1235
1236 const char *name = (const char *) &pmod[1];
1237 const void *value = name + ntohs (pmod->name_size);
1238 GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
1239 ntohl (pmod->value_size));
1240 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1241 break;
1242 }
1243
1244 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1245 *pmc->data = &pmsg[1];
1246 *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg);
1247 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1248 break;
1249
1250 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1251 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1252 break;
1253
1254 default:
1255 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1256 }
1257}
1258
1259
1260/**
1261 * Parse PSYC message.
1262 *
1263 * @param msg
1264 * The PSYC message to parse.
1265 * @param[out] method_name
1266 * Pointer to the method name inside @a pmsg.
1267 * @param env
1268 * The environment for the message with a list of modifiers.
1269 * @param[out] data
1270 * Pointer to data inside @a msg.
1271 * @param[out] data_size
1272 * Size of @data is written here.
1273 *
1274 * @return #GNUNET_OK on success,
1275 * #GNUNET_SYSERR on parse error.
1276 */
1277int
1278GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1279 const char **method_name,
1280 struct GNUNET_PSYC_Environment *env,
1281 const void **data,
1282 uint16_t *data_size)
1283{
1284 struct ParseMessageClosure cls;
1285 cls.env = env;
1286 cls.method_name = method_name;
1287 cls.data = data;
1288 cls.data_size = data_size;
1289
1290 struct GNUNET_PSYC_ReceiveHandle *
1291 recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1292 int ret = GNUNET_PSYC_receive_message (recv, msg);
1293 GNUNET_PSYC_receive_destroy (recv);
1294
1295 if (GNUNET_OK != ret)
1296 return GNUNET_SYSERR;
1297
1298 return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1299 ? GNUNET_OK
1300 : GNUNET_NO;
1301}
1302
1303
1304/**
1305 * Initialize PSYC message header.
1306 */
1307void
1308GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1309 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1310 uint32_t flags)
1311{
1312 uint16_t size = ntohs (mmsg->header.size);
1313 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1314
1315 pmsg->header.size = htons (psize);
1316 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1317 pmsg->message_id = mmsg->message_id;
1318 pmsg->fragment_offset = mmsg->fragment_offset;
1319 pmsg->flags = htonl (flags);
1320
1321 GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1322}
1323
1324
1325/**
1326 * Create a new PSYC message header from a multicast message.
1327 */
1328struct GNUNET_PSYC_MessageHeader *
1329GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1330 uint32_t flags)
1331{
1332 struct GNUNET_PSYC_MessageHeader *pmsg;
1333 uint16_t size = ntohs (mmsg->header.size);
1334 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1335
1336 pmsg = GNUNET_malloc (psize);
1337 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1338 return pmsg;
1339}
1340
1341
1342/**
1343 * Create a new PSYC message header from a PSYC message.
1344 */
1345struct GNUNET_PSYC_MessageHeader *
1346GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1347{
1348 uint16_t msg_size = ntohs (msg->header.size);
1349 struct GNUNET_PSYC_MessageHeader *
1350 pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1351 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1352 pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1353 GNUNET_memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1354 return pmsg;
1355}