aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2015-07-19 15:33:15 +0000
committerDavid Barksdale <amatus.amongus@gmail.com>2015-07-19 15:33:15 +0000
commitbba1229ab25ed0ae236c3062e371bb97d9a05375 (patch)
tree2634f877006166fd6bd5ac93b6e924a7a533d18b /src/psyc
parentfaf6cdf7e592414f041d486bf620967817485435 (diff)
downloadgnunet-bba1229ab25ed0ae236c3062e371bb97d9a05375.tar.gz
gnunet-bba1229ab25ed0ae236c3062e371bb97d9a05375.zip
Move libgnunetpsycutil.la to psycstore
This solves a circular dependency between psyc and psycstore.
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/Makefile.am18
-rw-r--r--src/psyc/psyc_util_lib.c1233
2 files changed, 4 insertions, 1247 deletions
diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am
index 497f1f0c5..4bb28ce0a 100644
--- a/src/psyc/Makefile.am
+++ b/src/psyc/Makefile.am
@@ -18,24 +18,14 @@ if USE_COVERAGE
18 XLIB = -lgcov 18 XLIB = -lgcov
19endif 19endif
20 20
21lib_LTLIBRARIES = libgnunetpsycutil.la libgnunetpsyc.la 21lib_LTLIBRARIES = libgnunetpsyc.la
22
23libgnunetpsycutil_la_SOURCES = \
24 psyc_util_lib.c
25libgnunetpsycutil_la_LIBADD = \
26 $(top_builddir)/src/util/libgnunetutil.la \
27 $(top_builddir)/src/env/libgnunetenv.la \
28 $(GN_LIBINTL) $(XLIB)
29libgnunetpsycutil_la_LDFLAGS = \
30 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
31 -version-info 0:0:0
32 22
33libgnunetpsyc_la_SOURCES = \ 23libgnunetpsyc_la_SOURCES = \
34 psyc_api.c psyc.h 24 psyc_api.c psyc.h
35libgnunetpsyc_la_LIBADD = \ 25libgnunetpsyc_la_LIBADD = \
36 $(top_builddir)/src/util/libgnunetutil.la \ 26 $(top_builddir)/src/util/libgnunetutil.la \
37 $(top_builddir)/src/env/libgnunetenv.la \ 27 $(top_builddir)/src/env/libgnunetenv.la \
38 libgnunetpsycutil.la \ 28 $(top_builddir)/src/psycstore/libgnunetpsycutil.la \
39 $(GN_LIBINTL) $(XLIB) 29 $(GN_LIBINTL) $(XLIB)
40libgnunetpsyc_la_LDFLAGS = \ 30libgnunetpsyc_la_LDFLAGS = \
41 $(GN_LIB_LDFLAGS) $(WINFLAGS) \ 31 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
@@ -53,7 +43,7 @@ gnunet_service_psyc_LDADD = \
53 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 43 $(top_builddir)/src/statistics/libgnunetstatistics.la \
54 $(top_builddir)/src/multicast/libgnunetmulticast.la \ 44 $(top_builddir)/src/multicast/libgnunetmulticast.la \
55 $(top_builddir)/src/psycstore/libgnunetpsycstore.la \ 45 $(top_builddir)/src/psycstore/libgnunetpsycstore.la \
56 libgnunetpsycutil.la \ 46 $(top_builddir)/src/psycstore/libgnunetpsycutil.la \
57 $(GN_LIBINTL) 47 $(GN_LIBINTL)
58gnunet_service_psyc_CFLAGS = $(AM_CFLAGS) 48gnunet_service_psyc_CFLAGS = $(AM_CFLAGS)
59 49
@@ -72,7 +62,7 @@ test_psyc_SOURCES = \
72 test_psyc.c 62 test_psyc.c
73test_psyc_LDADD = \ 63test_psyc_LDADD = \
74 libgnunetpsyc.la \ 64 libgnunetpsyc.la \
75 libgnunetpsycutil.la \ 65 $(top_builddir)/src/psycstore/libgnunetpsycutil.la \
76 $(top_builddir)/src/testing/libgnunettesting.la \ 66 $(top_builddir)/src/testing/libgnunettesting.la \
77 $(top_builddir)/src/core/libgnunetcore.la \ 67 $(top_builddir)/src/core/libgnunetcore.la \
78 $(top_builddir)/src/env/libgnunetenv.la \ 68 $(top_builddir)/src/env/libgnunetenv.la \
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c
deleted file mode 100644
index 4ad7a914b..000000000
--- a/src/psyc/psyc_util_lib.c
+++ /dev/null
@@ -1,1233 +0,0 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4 *
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20
21/**
22 * @file psyc/psyc_util_lib.c
23 * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24 * @author Gabor X Toth
25 */
26
27#include <inttypes.h>
28
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_env_lib.h"
32#include "gnunet_psyc_service.h"
33#include "gnunet_psyc_util_lib.h"
34
35#define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
36
37
38struct GNUNET_PSYC_TransmitHandle
39{
40 /**
41 * Client connection to service.
42 */
43 struct GNUNET_CLIENT_MANAGER_Connection *client;
44
45 /**
46 * Message currently being received from the client.
47 */
48 struct GNUNET_MessageHeader *msg;
49
50 /**
51 * Callback to request next modifier from client.
52 */
53 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
54
55 /**
56 * Closure for the notify callbacks.
57 */
58 void *notify_mod_cls;
59
60 /**
61 * Callback to request next data fragment from client.
62 */
63 GNUNET_PSYC_TransmitNotifyData notify_data;
64
65 /**
66 * Closure for the notify callbacks.
67 */
68 void *notify_data_cls;
69
70 /**
71 * Modifier of the environment that is currently being transmitted.
72 */
73 struct GNUNET_ENV_Modifier *mod;
74
75 /**
76 *
77 */
78 const char *mod_value;
79
80 /**
81 * Number of bytes remaining to be transmitted from the current modifier value.
82 */
83 uint32_t mod_value_remaining;
84
85 /**
86 * State of the current message being received from client.
87 */
88 enum GNUNET_PSYC_MessageState state;
89
90 /**
91 * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
92 */
93 uint8_t acks_pending;
94
95 /**
96 * Is transmission paused?
97 */
98 uint8_t paused;
99
100 /**
101 * Are we currently transmitting a message?
102 */
103 uint8_t in_transmit;
104};
105
106
107
108struct GNUNET_PSYC_ReceiveHandle
109{
110 /**
111 * Message callback.
112 */
113 GNUNET_PSYC_MessageCallback message_cb;
114
115 /**
116 * Message part callback.
117 */
118 GNUNET_PSYC_MessagePartCallback message_part_cb;
119
120 /**
121 * Closure for the callbacks.
122 */
123 void *cb_cls;
124
125 /**
126 * ID of the message being received from the PSYC service.
127 */
128 uint64_t message_id;
129
130 /**
131 * Public key of the slave from which a message is being received.
132 */
133 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
134
135 /**
136 * State of the currently being received message from the PSYC service.
137 */
138 enum GNUNET_PSYC_MessageState state;
139
140 /**
141 * Flags for the currently being received message from the PSYC service.
142 */
143 enum GNUNET_PSYC_MessageFlags flags;
144
145 /**
146 * Expected value size for the modifier being received from the PSYC service.
147 */
148 uint32_t mod_value_size_expected;
149
150 /**
151 * Actual value size for the modifier being received from the PSYC service.
152 */
153 uint32_t mod_value_size;
154};
155
156
157/**** Messages ****/
158
159
160/**
161 * Create a PSYC message.
162 *
163 * @param method_name
164 * PSYC method for the message.
165 * @param env
166 * Environment for the message.
167 * @param data
168 * Data payload for the message.
169 * @param data_size
170 * Size of @a data.
171 *
172 * @return Message header with size information,
173 * followed by the message parts.
174 */
175struct GNUNET_PSYC_Message *
176GNUNET_PSYC_message_create (const char *method_name,
177 const struct GNUNET_ENV_Environment *env,
178 const void *data,
179 size_t data_size)
180{
181 struct GNUNET_ENV_Modifier *mod = NULL;
182 struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
183 struct GNUNET_PSYC_MessageModifier *pmod = NULL;
184 struct GNUNET_MessageHeader *pmsg = NULL;
185 uint16_t env_size = 0;
186 if (NULL != env)
187 {
188 mod = GNUNET_ENV_environment_head (env);
189 while (NULL != mod)
190 {
191 env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
192 mod = mod->next;
193 }
194 }
195
196 struct GNUNET_PSYC_Message *msg;
197 uint16_t method_name_size = strlen (method_name) + 1;
198 if (method_name_size == 1)
199 return NULL;
200
201 uint16_t msg_size = sizeof (*msg) /* header */
202 + sizeof (*pmeth) + method_name_size /* method */
203 + env_size /* modifiers */
204 + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0)/* data */
205 + sizeof (*pmsg); /* end of message */
206 msg = GNUNET_malloc (msg_size);
207 msg->header.size = htons (msg_size);
208 msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
209
210 pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
211 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
212 pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
213 memcpy (&pmeth[1], method_name, method_name_size);
214
215 uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
216 if (NULL != env)
217 {
218 mod = GNUNET_ENV_environment_head (env);
219 while (NULL != mod)
220 {
221 uint16_t mod_name_size = strlen (mod->name) + 1;
222 pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
223 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
224 pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
225 p += pmod->header.size;
226 pmod->header.size = htons (pmod->header.size);
227
228 memcpy (&pmod[1], mod->name, mod_name_size);
229 if (0 < mod->value_size)
230 memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
231
232 mod = mod->next;
233 }
234 }
235
236 if (0 < data_size)
237 {
238 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
239 pmsg->size = sizeof (*pmsg) + data_size;
240 p += pmsg->size;
241 pmsg->size = htons (pmsg->size);
242 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
243 memcpy (&pmsg[1], data, data_size);
244 }
245
246 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
247 pmsg->size = htons (sizeof (*pmsg));
248 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
249
250 GNUNET_assert (p + sizeof (*pmsg) == msg_size);
251 return msg;
252}
253
254
255void
256GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
257 const struct GNUNET_MessageHeader *msg)
258{
259 uint16_t size = ntohs (msg->size);
260 uint16_t type = ntohs (msg->type);
261 GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
262 switch (type)
263 {
264 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
265 {
266 struct GNUNET_PSYC_MessageHeader *pmsg
267 = (struct GNUNET_PSYC_MessageHeader *) msg;
268 GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
269 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
270 break;
271 }
272 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
273 {
274 struct GNUNET_PSYC_MessageMethod *meth
275 = (struct GNUNET_PSYC_MessageMethod *) msg;
276 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
277 break;
278 }
279 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
280 {
281 struct GNUNET_PSYC_MessageModifier *mod
282 = (struct GNUNET_PSYC_MessageModifier *) msg;
283 uint16_t name_size = ntohs (mod->name_size);
284 char oper = ' ' < mod->oper ? mod->oper : ' ';
285 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
286 size - sizeof (*mod) - name_size,
287 ((char *) &mod[1]) + name_size);
288 break;
289 }
290 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
291 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
292 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
293 break;
294 }
295}
296
297
298/**** Transmitting messages ****/
299
300
301/**
302 * Create a transmission handle.
303 */
304struct GNUNET_PSYC_TransmitHandle *
305GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
306{
307 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
308 tmit->client = client;
309 return tmit;
310}
311
312
313/**
314 * Destroy a transmission handle.
315 */
316void
317GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
318{
319 GNUNET_free (tmit);
320}
321
322
323/**
324 * Queue a message part for transmission.
325 *
326 * The message part is added to the current message buffer.
327 * When this buffer is full, it is added to the transmission queue.
328 *
329 * @param tmit
330 * Transmission handle.
331 * @param msg
332 * Message part, or NULL.
333 * @param end
334 * End of message?
335 * #GNUNET_YES or #GNUNET_NO.
336 */
337static void
338transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
339 const struct GNUNET_MessageHeader *msg,
340 uint8_t end)
341{
342 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
343
344 LOG (GNUNET_ERROR_TYPE_DEBUG,
345 "Queueing message part of type %u and size %u (end: %u)).\n",
346 NULL != msg ? ntohs (msg->type) : 0, size, end);
347
348 if (NULL != tmit->msg)
349 {
350 if (NULL == msg
351 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
352 {
353 /* End of message or buffer is full, add it to transmission queue
354 * and start with empty buffer */
355 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
356 tmit->msg->size = htons (tmit->msg->size);
357 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
358 tmit->msg = NULL;
359 tmit->acks_pending++;
360 }
361 else
362 {
363 /* Message fits in current buffer, append */
364 tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
365 memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
366 tmit->msg->size += size;
367 }
368 }
369
370 if (NULL == tmit->msg && NULL != msg)
371 {
372 /* Empty buffer, copy over message. */
373 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
374 tmit->msg->size = sizeof (*tmit->msg) + size;
375 memcpy (&tmit->msg[1], msg, size);
376 }
377
378 if (NULL != tmit->msg
379 && (GNUNET_YES == end
380 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
381 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
382 {
383 /* End of message or buffer is full, add it to transmission queue. */
384 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
385 tmit->msg->size = htons (tmit->msg->size);
386 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
387 tmit->msg = NULL;
388 tmit->acks_pending++;
389 }
390
391 if (GNUNET_YES == end)
392 tmit->in_transmit = GNUNET_NO;
393}
394
395
396/**
397 * Request data from client to transmit.
398 *
399 * @param tmit Transmission handle.
400 */
401static void
402transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
403{
404 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
405 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
406 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
407 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
408
409 int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
410 LOG (GNUNET_ERROR_TYPE_DEBUG,
411 "transmit_data (ret: %d, size: %u): %.*s\n",
412 notify_ret, data_size, data_size, &msg[1]);
413 switch (notify_ret)
414 {
415 case GNUNET_NO:
416 if (0 == data_size)
417 {
418 /* Transmission paused, nothing to send. */
419 tmit->paused = GNUNET_YES;
420 return;
421 }
422 break;
423
424 case GNUNET_YES:
425 tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
426 break;
427
428 default:
429 LOG (GNUNET_ERROR_TYPE_ERROR,
430 "TransmitNotifyData callback returned error when requesting data.\n");
431
432 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
433 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
434 msg->size = htons (sizeof (*msg));
435 transmit_queue_insert (tmit, msg, GNUNET_YES);
436 return;
437 }
438
439 if (0 < data_size)
440 {
441 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
442 msg->size = htons (sizeof (*msg) + data_size);
443 transmit_queue_insert (tmit, msg, !notify_ret);
444 }
445
446 /* End of message. */
447 if (GNUNET_YES == notify_ret)
448 {
449 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
450 msg->size = htons (sizeof (*msg));
451 transmit_queue_insert (tmit, msg, GNUNET_YES);
452 }
453}
454
455
456/**
457 * Request a modifier from a client to transmit.
458 *
459 * @param tmit Transmission handle.
460 */
461static void
462transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
463{
464 uint16_t max_data_size, data_size;
465 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
466 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
467 int notify_ret;
468
469 switch (tmit->state)
470 {
471 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
472 {
473 struct GNUNET_PSYC_MessageModifier *mod
474 = (struct GNUNET_PSYC_MessageModifier *) msg;
475 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
476 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
477 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
478 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
479 &mod->oper, &mod->value_size);
480
481 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
482 LOG (GNUNET_ERROR_TYPE_DEBUG,
483 "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
484 notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
485 if (mod->name_size < data_size)
486 {
487 tmit->mod_value_remaining
488 = mod->value_size - (data_size - mod->name_size);
489 mod->value_size = htonl (mod->value_size);
490 mod->name_size = htons (mod->name_size);
491 }
492 else if (0 < data_size)
493 {
494 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
495 notify_ret = GNUNET_SYSERR;
496 }
497 break;
498 }
499 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
500 {
501 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
502 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
503 msg->size = sizeof (struct GNUNET_MessageHeader);
504 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
505 &data_size, &msg[1], NULL, NULL);
506 tmit->mod_value_remaining -= data_size;
507 LOG (GNUNET_ERROR_TYPE_DEBUG,
508 "transmit_mod (ret: %d, size: %u): %.*s\n",
509 notify_ret, data_size, data_size, &msg[1]);
510 break;
511 }
512 default:
513 GNUNET_assert (0);
514 }
515
516 switch (notify_ret)
517 {
518 case GNUNET_NO:
519 if (0 == data_size)
520 { /* Transmission paused, nothing to send. */
521 tmit->paused = GNUNET_YES;
522 return;
523 }
524 tmit->state
525 = (0 == tmit->mod_value_remaining)
526 ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
527 : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
528 break;
529
530 case GNUNET_YES: /* End of modifiers. */
531 GNUNET_assert (0 == tmit->mod_value_remaining);
532 break;
533
534 default:
535 LOG (GNUNET_ERROR_TYPE_ERROR,
536 "TransmitNotifyModifier callback returned with error.\n");
537
538 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
539 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
540 msg->size = htons (sizeof (*msg));
541
542 transmit_queue_insert (tmit, msg, GNUNET_YES);
543 return;
544 }
545
546 if (0 < data_size)
547 {
548 GNUNET_assert (data_size <= max_data_size);
549 msg->size = htons (msg->size + data_size);
550 transmit_queue_insert (tmit, msg, GNUNET_NO);
551 }
552
553 if (GNUNET_YES == notify_ret)
554 {
555 tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
556 if (0 == tmit->acks_pending)
557 transmit_data (tmit);
558 }
559 else
560 {
561 transmit_mod (tmit);
562 }
563}
564
565
566int
567transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
568 uint32_t *full_value_size)
569
570{
571 struct GNUNET_PSYC_TransmitHandle *tmit = cls;
572 uint16_t name_size = 0;
573 size_t value_size = 0;
574 const char *value = NULL;
575
576 if (NULL != oper)
577 { /* New modifier */
578 if (NULL != tmit->mod)
579 tmit->mod = tmit->mod->next;
580 if (NULL == tmit->mod)
581 { /* No more modifiers, continue with data */
582 *data_size = 0;
583 return GNUNET_YES;
584 }
585
586 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
587 *full_value_size = tmit->mod->value_size;
588 *oper = tmit->mod->oper;
589 name_size = strlen (tmit->mod->name) + 1;
590
591 if (name_size + tmit->mod->value_size <= *data_size)
592 {
593 *data_size = name_size + tmit->mod->value_size;
594 }
595 else
596 {
597 value_size = *data_size - name_size;
598 tmit->mod_value = tmit->mod->value + value_size;
599 }
600
601 memcpy (data, tmit->mod->name, name_size);
602 memcpy ((char *)data + name_size, tmit->mod->value, value_size);
603 return GNUNET_NO;
604 }
605 else
606 { /* Modifier continuation */
607 GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
608 value = tmit->mod_value;
609 if (tmit->mod_value_remaining <= *data_size)
610 {
611 value_size = tmit->mod_value_remaining;
612 tmit->mod_value = NULL;
613 }
614 else
615 {
616 value_size = *data_size;
617 tmit->mod_value += value_size;
618 }
619
620 if (*data_size < value_size)
621 {
622 LOG (GNUNET_ERROR_TYPE_DEBUG,
623 "Value in environment larger than buffer: %u < %zu\n",
624 *data_size, value_size);
625 *data_size = 0;
626 return GNUNET_NO;
627 }
628
629 *data_size = value_size;
630 memcpy (data, value, value_size);
631 return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
632 }
633}
634
635
636/**
637 * Transmit a message.
638 *
639 * @param tmit
640 * Transmission handle.
641 * @param method_name
642 * Which method should be invoked.
643 * @param env
644 * Environment for the message.
645 * Should stay available until the first call to notify_data.
646 * Can be NULL if there are no modifiers or @a notify_mod is
647 * provided instead.
648 * @param notify_mod
649 * Function to call to obtain modifiers.
650 * Can be NULL if there are no modifiers or @a env is provided instead.
651 * @param notify_data
652 * Function to call to obtain fragments of the data.
653 * @param notify_cls
654 * Closure for @a notify_mod and @a notify_data.
655 * @param flags
656 * Flags for the message being transmitted.
657 *
658 * @return #GNUNET_OK if the transmission was started.
659 * #GNUNET_SYSERR if another transmission is already going on.
660 */
661int
662GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
663 const char *method_name,
664 const struct GNUNET_ENV_Environment *env,
665 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
666 GNUNET_PSYC_TransmitNotifyData notify_data,
667 void *notify_cls,
668 uint32_t flags)
669{
670 if (GNUNET_NO != tmit->in_transmit)
671 return GNUNET_SYSERR;
672 tmit->in_transmit = GNUNET_YES;
673
674 size_t size = strlen (method_name) + 1;
675 struct GNUNET_PSYC_MessageMethod *pmeth;
676 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
677 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
678
679 pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
680 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
681 pmeth->header.size = htons (sizeof (*pmeth) + size);
682 pmeth->flags = htonl (flags);
683 memcpy (&pmeth[1], method_name, size);
684
685 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
686 tmit->notify_data = notify_data;
687 tmit->notify_data_cls = notify_cls;
688
689 if (NULL != notify_mod)
690 {
691 tmit->notify_mod = notify_mod;
692 tmit->notify_mod_cls = notify_cls;
693 }
694 else
695 {
696 tmit->notify_mod = &transmit_notify_env;
697 tmit->notify_mod_cls = tmit;
698 if (NULL != env)
699 {
700 struct GNUNET_ENV_Modifier mod = {};
701 mod.next = GNUNET_ENV_environment_head (env);
702 tmit->mod = &mod;
703 }
704 else
705 {
706 tmit->mod = NULL;
707 }
708 }
709
710 transmit_mod (tmit);
711 return GNUNET_OK;
712}
713
714
715/**
716 * Resume transmission.
717 *
718 * @param tmit Transmission handle.
719 */
720void
721GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
722{
723 if (0 == tmit->acks_pending)
724 {
725 tmit->paused = GNUNET_NO;
726 transmit_data (tmit);
727 }
728}
729
730
731/**
732 * Abort transmission request.
733 *
734 * @param tmit Transmission handle.
735 */
736void
737GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
738{
739 if (GNUNET_NO == tmit->in_transmit)
740 return;
741
742 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
743 tmit->in_transmit = GNUNET_NO;
744 tmit->paused = GNUNET_NO;
745
746 /* FIXME */
747 struct GNUNET_MessageHeader msg;
748 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
749 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
750 msg.size = htons (sizeof (msg));
751 transmit_queue_insert (tmit, &msg, GNUNET_YES);
752}
753
754
755/**
756 * Got acknowledgement of a transmitted message part, continue transmission.
757 *
758 * @param tmit Transmission handle.
759 */
760void
761GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
762{
763 if (0 == tmit->acks_pending)
764 {
765 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
766 GNUNET_break (0);
767 return;
768 }
769 tmit->acks_pending--;
770
771 switch (tmit->state)
772 {
773 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
774 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
775 if (GNUNET_NO == tmit->paused)
776 transmit_mod (tmit);
777 break;
778
779 case GNUNET_PSYC_MESSAGE_STATE_DATA:
780 if (GNUNET_NO == tmit->paused)
781 transmit_data (tmit);
782 break;
783
784 case GNUNET_PSYC_MESSAGE_STATE_END:
785 case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
786 break;
787
788 default:
789 LOG (GNUNET_ERROR_TYPE_DEBUG,
790 "Ignoring message ACK in state %u.\n", tmit->state);
791 }
792}
793
794
795/**** Receiving messages ****/
796
797
798/**
799 * Create handle for receiving messages.
800 */
801struct GNUNET_PSYC_ReceiveHandle *
802GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
803 GNUNET_PSYC_MessagePartCallback message_part_cb,
804 void *cb_cls)
805{
806 struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
807 recv->message_cb = message_cb;
808 recv->message_part_cb = message_part_cb;
809 recv->cb_cls = cb_cls;
810 return recv;
811}
812
813
814/**
815 * Destroy handle for receiving messages.
816 */
817void
818GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
819{
820 GNUNET_free (recv);
821}
822
823
824/**
825 * Reset stored data related to the last received message.
826 */
827void
828GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
829{
830 recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
831 recv->flags = 0;
832 recv->message_id = 0;
833 recv->mod_value_size = 0;
834 recv->mod_value_size_expected = 0;
835}
836
837
838static void
839recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
840{
841 if (NULL != recv->message_part_cb)
842 recv->message_part_cb (recv->cb_cls, recv->message_id, 0, recv->flags, NULL);
843
844 if (NULL != recv->message_cb)
845 recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
846
847 GNUNET_PSYC_receive_reset (recv);
848}
849
850
851/**
852 * Handle incoming PSYC message.
853 *
854 * @param recv Receive handle.
855 * @param msg The message.
856 *
857 * @return #GNUNET_OK on success,
858 * #GNUNET_SYSERR on receive error.
859 */
860int
861GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
862 const struct GNUNET_PSYC_MessageHeader *msg)
863{
864 uint16_t size = ntohs (msg->header.size);
865 uint32_t flags = ntohl (msg->flags);
866 uint64_t message_id;
867
868 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
869 (struct GNUNET_MessageHeader *) msg);
870
871 if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
872 {
873 recv->message_id = GNUNET_ntohll (msg->message_id);
874 recv->flags = flags;
875 recv->slave_key = msg->slave_key;
876 recv->mod_value_size = 0;
877 recv->mod_value_size_expected = 0;
878 }
879 else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
880 {
881 // FIXME
882 LOG (GNUNET_ERROR_TYPE_WARNING,
883 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
884 GNUNET_ntohll (msg->message_id), recv->message_id);
885 GNUNET_break_op (0);
886 recv_error (recv);
887 return GNUNET_SYSERR;
888 }
889 else if (flags != recv->flags)
890 {
891 LOG (GNUNET_ERROR_TYPE_WARNING,
892 "Unexpected message flags. Got: %lu, expected: %lu\n",
893 flags, recv->flags);
894 GNUNET_break_op (0);
895 recv_error (recv);
896 return GNUNET_SYSERR;
897 }
898 message_id = recv->message_id;
899
900 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
901
902 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
903 {
904 const struct GNUNET_MessageHeader *pmsg
905 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
906 psize = ntohs (pmsg->size);
907 ptype = ntohs (pmsg->type);
908 size_eq = size_min = 0;
909
910 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
911 {
912 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
913 "Dropping message of type %u with invalid size %u.\n",
914 ptype, psize);
915 recv_error (recv);
916 return GNUNET_SYSERR;
917 }
918
919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920 "Received message part of type %u and size %u from PSYC.\n",
921 ptype, psize);
922 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
923
924 switch (ptype)
925 {
926 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
927 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
928 break;
929 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
930 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
931 break;
932 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
933 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
934 size_min = sizeof (struct GNUNET_MessageHeader);
935 break;
936 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
937 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
938 size_eq = sizeof (struct GNUNET_MessageHeader);
939 break;
940 default:
941 GNUNET_break_op (0);
942 recv_error (recv);
943 return GNUNET_SYSERR;
944 }
945
946 if (! ((0 < size_eq && psize == size_eq)
947 || (0 < size_min && size_min <= psize)))
948 {
949 GNUNET_break_op (0);
950 recv_error (recv);
951 return GNUNET_SYSERR;
952 }
953
954 switch (ptype)
955 {
956 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
957 {
958 struct GNUNET_PSYC_MessageMethod *meth
959 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
960
961 if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
962 {
963 LOG (GNUNET_ERROR_TYPE_WARNING,
964 "Dropping out of order message method (%u).\n",
965 recv->state);
966 /* It is normal to receive an incomplete message right after connecting,
967 * but should not happen later.
968 * FIXME: add a check for this condition.
969 */
970 GNUNET_break_op (0);
971 recv_error (recv);
972 return GNUNET_SYSERR;
973 }
974
975 if ('\0' != *((char *) meth + psize - 1))
976 {
977 LOG (GNUNET_ERROR_TYPE_WARNING,
978 "Dropping message with malformed method. "
979 "Message ID: %" PRIu64 "\n", recv->message_id);
980 GNUNET_break_op (0);
981 recv_error (recv);
982 return GNUNET_SYSERR;
983 }
984 recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
985 break;
986 }
987 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
988 {
989 if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
990 || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
991 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
992 {
993 LOG (GNUNET_ERROR_TYPE_WARNING,
994 "Dropping out of order message modifier (%u).\n",
995 recv->state);
996 GNUNET_break_op (0);
997 recv_error (recv);
998 return GNUNET_SYSERR;
999 }
1000
1001 struct GNUNET_PSYC_MessageModifier *mod
1002 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1003
1004 uint16_t name_size = ntohs (mod->name_size);
1005 recv->mod_value_size_expected = ntohl (mod->value_size);
1006 recv->mod_value_size = psize - sizeof (*mod) - name_size;
1007
1008 if (psize < sizeof (*mod) + name_size
1009 || '\0' != *((char *) &mod[1] + name_size - 1)
1010 || recv->mod_value_size_expected < recv->mod_value_size)
1011 {
1012 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1013 GNUNET_break_op (0);
1014 recv_error (recv);
1015 return GNUNET_SYSERR;
1016 }
1017 recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1018 break;
1019 }
1020 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1021 {
1022 recv->mod_value_size += psize - sizeof (*pmsg);
1023
1024 if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1025 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1026 || recv->mod_value_size_expected < recv->mod_value_size)
1027 {
1028 LOG (GNUNET_ERROR_TYPE_WARNING,
1029 "Dropping out of order message modifier continuation "
1030 "!(%u == %u || %u == %u) || %lu < %lu.\n",
1031 GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1032 GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1033 recv->mod_value_size_expected, recv->mod_value_size);
1034 GNUNET_break_op (0);
1035 recv_error (recv);
1036 return GNUNET_SYSERR;
1037 }
1038 break;
1039 }
1040 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1041 {
1042 if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1043 || recv->mod_value_size_expected != recv->mod_value_size)
1044 {
1045 LOG (GNUNET_ERROR_TYPE_WARNING,
1046 "Dropping out of order message data fragment "
1047 "(%u < %u || %lu != %lu).\n",
1048 recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1049 recv->mod_value_size_expected, recv->mod_value_size);
1050
1051 GNUNET_break_op (0);
1052 recv_error (recv);
1053 return GNUNET_SYSERR;
1054 }
1055 recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1056 break;
1057 }
1058 }
1059
1060 if (NULL != recv->message_part_cb)
1061 recv->message_part_cb (recv->cb_cls, recv->message_id, 0, // FIXME: data_offset
1062 recv->flags, pmsg);
1063
1064 switch (ptype)
1065 {
1066 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1067 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1068 GNUNET_PSYC_receive_reset (recv);
1069 break;
1070 }
1071 }
1072
1073 if (NULL != recv->message_cb)
1074 recv->message_cb (recv->cb_cls, message_id, flags, msg);
1075 return GNUNET_OK;
1076}
1077
1078
1079/**
1080 * Check if @a data contains a series of valid message parts.
1081 *
1082 * @param data_size Size of @a data.
1083 * @param data Data.
1084 * @param[out] first_ptype Type of first message part.
1085 * @param[out] last_ptype Type of last message part.
1086 *
1087 * @return Number of message parts found in @a data.
1088 * or GNUNET_SYSERR if the message contains invalid parts.
1089 */
1090int
1091GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1092 uint16_t *first_ptype, uint16_t *last_ptype)
1093{
1094 const struct GNUNET_MessageHeader *pmsg;
1095 uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1096 if (NULL != first_ptype)
1097 *first_ptype = 0;
1098 if (NULL != last_ptype)
1099 *last_ptype = 0;
1100
1101 for (pos = 0; pos < data_size; pos += psize, parts++)
1102 {
1103 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1104 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1105 psize = ntohs (pmsg->size);
1106 ptype = ntohs (pmsg->type);
1107 if (0 == parts && NULL != first_ptype)
1108 *first_ptype = ptype;
1109 if (NULL != last_ptype
1110 && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1111 *last_ptype = ptype;
1112 if (psize < sizeof (*pmsg)
1113 || pos + psize > data_size
1114 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1115 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1116 {
1117 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1118 "Invalid message part of type %u and size %u.\n",
1119 ptype, psize);
1120 return GNUNET_SYSERR;
1121 }
1122 /** @todo FIXME: check message part order */
1123 }
1124 return parts;
1125}
1126
1127
1128struct ParseMessageClosure
1129{
1130 struct GNUNET_ENV_Environment *env;
1131 const char **method_name;
1132 const void **data;
1133 uint16_t *data_size;
1134 enum GNUNET_PSYC_MessageState msg_state;
1135};
1136
1137
1138static void
1139parse_message_part_cb (void *cls, uint64_t message_id, uint64_t data_offset,
1140 uint32_t flags, const struct GNUNET_MessageHeader *msg)
1141{
1142 struct ParseMessageClosure *pmc = cls;
1143 if (NULL == msg)
1144 {
1145 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1146 return;
1147 }
1148
1149 switch (ntohs (msg->type))
1150 {
1151 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1152 {
1153 struct GNUNET_PSYC_MessageMethod *
1154 pmeth = (struct GNUNET_PSYC_MessageMethod *) msg;
1155 *pmc->method_name = (const char *) &pmeth[1];
1156 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1157 break;
1158 }
1159
1160 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1161 {
1162 struct GNUNET_PSYC_MessageModifier *
1163 pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
1164
1165 const char *name = (const char *) &pmod[1];
1166 const void *value = name + pmod->name_size;
1167 GNUNET_ENV_environment_add (pmc->env, pmod->oper, name, value,
1168 pmod->value_size);
1169 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1170 break;
1171 }
1172
1173 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1174 *pmc->data = &msg[1];
1175 *pmc->data_size = ntohs (msg->size) - sizeof (*msg);
1176 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1177 break;
1178
1179 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1180 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1181 break;
1182
1183 default:
1184 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1185 }
1186}
1187
1188
1189/**
1190 * Parse PSYC message.
1191 *
1192 * @param msg
1193 * The PSYC message to parse.
1194 * @param[out] method_name
1195 * Pointer to the method name inside @a pmsg.
1196 * @param env
1197 * The environment for the message with a list of modifiers.
1198 * @param[out] data
1199 * Pointer to data inside @a pmsg.
1200 * @param[out] data_size
1201 * Size of @data is written here.
1202 *
1203 * @return #GNUNET_OK on success,
1204 * #GNUNET_SYSERR on parse error.
1205 */
1206int
1207GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_Message *msg,
1208 const char **method_name,
1209 struct GNUNET_ENV_Environment *env,
1210 const void **data,
1211 uint16_t *data_size)
1212{
1213 struct ParseMessageClosure cls;
1214 cls.env = env;
1215 cls.method_name = method_name;
1216 cls.data = data;
1217 cls.data_size = data_size;
1218
1219 uint16_t msg_size = ntohs (msg->header.size);
1220 struct GNUNET_PSYC_MessageHeader *
1221 pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1222 memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1223
1224 struct GNUNET_PSYC_ReceiveHandle *
1225 recv = GNUNET_PSYC_receive_create (NULL, &parse_message_part_cb, &cls);
1226 GNUNET_PSYC_receive_message (recv, pmsg);
1227 GNUNET_PSYC_receive_destroy (recv);
1228 GNUNET_free (pmsg);
1229
1230 return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1231 ? GNUNET_OK
1232 : GNUNET_SYSERR;
1233}