diff options
Diffstat (limited to 'src/psycutil/psyc_message.c')
-rw-r--r-- | src/psycutil/psyc_message.c | 1355 |
1 files changed, 0 insertions, 1355 deletions
diff --git a/src/psycutil/psyc_message.c b/src/psycutil/psyc_message.c deleted file mode 100644 index a03eff47f..000000000 --- a/src/psycutil/psyc_message.c +++ /dev/null | |||
@@ -1,1355 +0,0 @@ | |||
1 | /* | ||
2 | * This file is part of GNUnet | ||
3 | * Copyright (C) 2013 GNUnet e.V. | ||
4 | * | ||
5 | * GNUnet is free software: you can redistribute it and/or modify it | ||
6 | * under the terms of the GNU Affero General Public License as published | ||
7 | * by the Free Software Foundation, either version 3 of the License, | ||
8 | * or (at your option) any later version. | ||
9 | * | ||
10 | * GNUnet is distributed in the hope that it will be useful, but | ||
11 | * WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | * Affero General Public License for more details. | ||
14 | * | ||
15 | * You should have received a copy of the GNU Affero General Public License | ||
16 | * along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file psycutil/psyc_message.c | ||
23 | * @brief PSYC utilities; receiving/transmitting/logging PSYC messages. | ||
24 | * @author Gabor X Toth | ||
25 | */ | ||
26 | |||
27 | #include <inttypes.h> | ||
28 | |||
29 | #include "platform.h" | ||
30 | #include "gnunet_util_lib.h" | ||
31 | #include "gnunet_psyc_util_lib.h" | ||
32 | #include "gnunet_psyc_service.h" | ||
33 | |||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__) | ||
35 | |||
36 | |||
37 | struct GNUNET_PSYC_TransmitHandle | ||
38 | { | ||
39 | /** | ||
40 | * Client connection to service. | ||
41 | */ | ||
42 | struct GNUNET_MQ_Handle *mq; | ||
43 | |||
44 | /** | ||
45 | * Message currently being received from the client. | ||
46 | */ | ||
47 | struct GNUNET_MessageHeader *msg; | ||
48 | |||
49 | /** | ||
50 | * Envelope for @a msg | ||
51 | */ | ||
52 | struct GNUNET_MQ_Envelope *env; | ||
53 | |||
54 | /** | ||
55 | * Callback to request next modifier from client. | ||
56 | */ | ||
57 | GNUNET_PSYC_TransmitNotifyModifier notify_mod; | ||
58 | |||
59 | /** | ||
60 | * Closure for the notify callbacks. | ||
61 | */ | ||
62 | void *notify_mod_cls; | ||
63 | |||
64 | /** | ||
65 | * Callback to request next data fragment from client. | ||
66 | */ | ||
67 | GNUNET_PSYC_TransmitNotifyData notify_data; | ||
68 | |||
69 | /** | ||
70 | * Closure for the notify callbacks. | ||
71 | */ | ||
72 | void *notify_data_cls; | ||
73 | |||
74 | /** | ||
75 | * Modifier of the environment that is currently being transmitted. | ||
76 | */ | ||
77 | struct GNUNET_PSYC_Modifier *mod; | ||
78 | |||
79 | /** | ||
80 | * | ||
81 | */ | ||
82 | const char *mod_value; | ||
83 | |||
84 | /** | ||
85 | * Number of bytes remaining to be transmitted from the current modifier value. | ||
86 | */ | ||
87 | uint32_t mod_value_remaining; | ||
88 | |||
89 | /** | ||
90 | * State of the current message being received from client. | ||
91 | */ | ||
92 | enum GNUNET_PSYC_MessageState state; | ||
93 | |||
94 | /** | ||
95 | * Number of PSYC_TRANSMIT_ACK messages we are still waiting for. | ||
96 | */ | ||
97 | uint8_t acks_pending; | ||
98 | |||
99 | /** | ||
100 | * Is transmission paused? | ||
101 | */ | ||
102 | uint8_t paused; | ||
103 | |||
104 | /** | ||
105 | * Are we currently transmitting a message? | ||
106 | */ | ||
107 | uint8_t in_transmit; | ||
108 | |||
109 | /** | ||
110 | * Notify callback is currently being called. | ||
111 | */ | ||
112 | uint8_t in_notify; | ||
113 | |||
114 | }; | ||
115 | |||
116 | |||
117 | |||
118 | struct GNUNET_PSYC_ReceiveHandle | ||
119 | { | ||
120 | /** | ||
121 | * Message callback. | ||
122 | */ | ||
123 | GNUNET_PSYC_MessageCallback message_cb; | ||
124 | |||
125 | /** | ||
126 | * Message part callback. | ||
127 | */ | ||
128 | GNUNET_PSYC_MessagePartCallback message_part_cb; | ||
129 | |||
130 | /** | ||
131 | * Closure for the callbacks. | ||
132 | */ | ||
133 | void *cb_cls; | ||
134 | |||
135 | /** | ||
136 | * ID of the message being received from the PSYC service. | ||
137 | */ | ||
138 | uint64_t message_id; | ||
139 | |||
140 | /** | ||
141 | * Public key of the slave from which a message is being received. | ||
142 | */ | ||
143 | struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; | ||
144 | |||
145 | /** | ||
146 | * State of the currently being received message from the PSYC service. | ||
147 | */ | ||
148 | enum GNUNET_PSYC_MessageState state; | ||
149 | |||
150 | /** | ||
151 | * Flags for the currently being received message from the PSYC service. | ||
152 | */ | ||
153 | enum GNUNET_PSYC_MessageFlags flags; | ||
154 | |||
155 | /** | ||
156 | * Expected value size for the modifier being received from the PSYC service. | ||
157 | */ | ||
158 | uint32_t mod_value_size_expected; | ||
159 | |||
160 | /** | ||
161 | * Actual value size for the modifier being received from the PSYC service. | ||
162 | */ | ||
163 | uint32_t mod_value_size; | ||
164 | }; | ||
165 | |||
166 | |||
167 | /**** Messages ****/ | ||
168 | |||
169 | |||
170 | /** | ||
171 | * Create a PSYC message. | ||
172 | * | ||
173 | * @param method_name | ||
174 | * PSYC method for the message. | ||
175 | * @param env | ||
176 | * Environment for the message. | ||
177 | * @param data | ||
178 | * Data payload for the message. | ||
179 | * @param data_size | ||
180 | * Size of @a data. | ||
181 | * | ||
182 | * @return Message header with size information, | ||
183 | * followed by the message parts. | ||
184 | */ | ||
185 | struct GNUNET_PSYC_Message * | ||
186 | GNUNET_PSYC_message_create (const char *method_name, | ||
187 | const struct GNUNET_PSYC_Environment *env, | ||
188 | const void *data, | ||
189 | size_t data_size) | ||
190 | { | ||
191 | struct GNUNET_PSYC_Modifier *mod = NULL; | ||
192 | struct GNUNET_PSYC_MessageMethod *pmeth = NULL; | ||
193 | struct GNUNET_PSYC_MessageModifier *pmod = NULL; | ||
194 | struct GNUNET_MessageHeader *pmsg = NULL; | ||
195 | uint16_t env_size = 0; | ||
196 | if (NULL != env) | ||
197 | { | ||
198 | mod = GNUNET_PSYC_env_head (env); | ||
199 | while (NULL != mod) | ||
200 | { | ||
201 | env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size; | ||
202 | mod = mod->next; | ||
203 | } | ||
204 | } | ||
205 | |||
206 | struct GNUNET_PSYC_Message *msg; | ||
207 | uint16_t method_name_size = strlen (method_name) + 1; | ||
208 | if (method_name_size == 1) | ||
209 | return NULL; | ||
210 | |||
211 | uint16_t msg_size = sizeof (*msg) /* header */ | ||
212 | + sizeof (*pmeth) + method_name_size /* method */ | ||
213 | + env_size /* modifiers */ | ||
214 | + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */ | ||
215 | + sizeof (*pmsg); /* end of message */ | ||
216 | msg = GNUNET_malloc (msg_size); | ||
217 | msg->header.size = htons (msg_size); | ||
218 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */ | ||
219 | |||
220 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1]; | ||
221 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | ||
222 | pmeth->header.size = htons (sizeof (*pmeth) + method_name_size); | ||
223 | GNUNET_memcpy (&pmeth[1], method_name, method_name_size); | ||
224 | |||
225 | uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size; | ||
226 | if (NULL != env) | ||
227 | { | ||
228 | mod = GNUNET_PSYC_env_head (env); | ||
229 | while (NULL != mod) | ||
230 | { | ||
231 | uint16_t mod_name_size = strlen (mod->name) + 1; | ||
232 | pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p); | ||
233 | pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | ||
234 | pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size; | ||
235 | p += pmod->header.size; | ||
236 | pmod->header.size = htons (pmod->header.size); | ||
237 | |||
238 | pmod->oper = mod->oper; | ||
239 | pmod->name_size = htons (mod_name_size); | ||
240 | pmod->value_size = htonl (mod->value_size); | ||
241 | |||
242 | GNUNET_memcpy (&pmod[1], mod->name, mod_name_size); | ||
243 | if (0 < mod->value_size) | ||
244 | GNUNET_memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size); | ||
245 | |||
246 | mod = mod->next; | ||
247 | } | ||
248 | } | ||
249 | |||
250 | if (0 < data_size) | ||
251 | { | ||
252 | pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p); | ||
253 | pmsg->size = sizeof (*pmsg) + data_size; | ||
254 | p += pmsg->size; | ||
255 | pmsg->size = htons (pmsg->size); | ||
256 | pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | ||
257 | GNUNET_memcpy (&pmsg[1], data, data_size); | ||
258 | } | ||
259 | |||
260 | pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p); | ||
261 | pmsg->size = htons (sizeof (*pmsg)); | ||
262 | pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | ||
263 | |||
264 | GNUNET_assert (p + sizeof (*pmsg) == msg_size); | ||
265 | return msg; | ||
266 | } | ||
267 | |||
268 | |||
269 | void | ||
270 | GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, | ||
271 | const struct GNUNET_MessageHeader *msg) | ||
272 | { | ||
273 | uint16_t size = ntohs (msg->size); | ||
274 | uint16_t type = ntohs (msg->type); | ||
275 | |||
276 | GNUNET_log (kind, | ||
277 | "Message of type %d and size %u:\n", | ||
278 | type, | ||
279 | size); | ||
280 | switch (type) | ||
281 | { | ||
282 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: | ||
283 | { | ||
284 | const struct GNUNET_PSYC_MessageHeader *pmsg | ||
285 | = (const struct GNUNET_PSYC_MessageHeader *) msg; | ||
286 | GNUNET_log (kind, | ||
287 | "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n", | ||
288 | GNUNET_ntohll (pmsg->message_id), | ||
289 | ntohl (pmsg->flags)); | ||
290 | break; | ||
291 | } | ||
292 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
293 | { | ||
294 | const struct GNUNET_PSYC_MessageMethod *meth | ||
295 | = (const struct GNUNET_PSYC_MessageMethod *) msg; | ||
296 | GNUNET_log (kind, | ||
297 | "\t%.*s\n", | ||
298 | (int) (size - sizeof (*meth)), | ||
299 | (const char *) &meth[1]); | ||
300 | break; | ||
301 | } | ||
302 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
303 | { | ||
304 | const struct GNUNET_PSYC_MessageModifier *mod | ||
305 | = (const struct GNUNET_PSYC_MessageModifier *) msg; | ||
306 | uint16_t name_size = ntohs (mod->name_size); | ||
307 | char oper = ' ' < mod->oper ? mod->oper : ' '; | ||
308 | GNUNET_log (kind, | ||
309 | "\t%c%.*s\t%.*s\n", | ||
310 | oper, | ||
311 | (int) name_size, | ||
312 | (const char *) &mod[1], | ||
313 | (int) (size - sizeof (*mod) - name_size), | ||
314 | ((const char *) &mod[1]) + name_size); | ||
315 | break; | ||
316 | } | ||
317 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
318 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
319 | GNUNET_log (kind, | ||
320 | "\t%.*s\n", | ||
321 | (int) (size - sizeof (*msg)), | ||
322 | (const char *) &msg[1]); | ||
323 | break; | ||
324 | } | ||
325 | } | ||
326 | |||
327 | |||
328 | /**** Transmitting messages ****/ | ||
329 | |||
330 | |||
331 | /** | ||
332 | * Create a transmission handle. | ||
333 | */ | ||
334 | struct GNUNET_PSYC_TransmitHandle * | ||
335 | GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq) | ||
336 | { | ||
337 | struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle); | ||
338 | |||
339 | tmit->mq = mq; | ||
340 | return tmit; | ||
341 | } | ||
342 | |||
343 | |||
344 | /** | ||
345 | * Destroy a transmission handle. | ||
346 | */ | ||
347 | void | ||
348 | GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit) | ||
349 | { | ||
350 | GNUNET_free (tmit); | ||
351 | } | ||
352 | |||
353 | |||
354 | /** | ||
355 | * Queue a message part for transmission. | ||
356 | * | ||
357 | * The message part is added to the current message buffer. | ||
358 | * When this buffer is full, it is added to the transmission queue. | ||
359 | * | ||
360 | * @param tmit | ||
361 | * Transmission handle. | ||
362 | * @param msg | ||
363 | * Message part, or NULL. | ||
364 | * @param tmit_now | ||
365 | * Transmit message now, or wait for buffer to fill up? | ||
366 | * #GNUNET_YES or #GNUNET_NO. | ||
367 | */ | ||
368 | static void | ||
369 | transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | ||
370 | const struct GNUNET_MessageHeader *msg, | ||
371 | uint8_t tmit_now) | ||
372 | { | ||
373 | uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0; | ||
374 | |||
375 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
376 | "Queueing message part of type %u and size %u (tmit_now: %u)).\n", | ||
377 | NULL != msg ? ntohs (msg->type) : 0, size, tmit_now); | ||
378 | |||
379 | if (NULL != tmit->msg) | ||
380 | { | ||
381 | if (NULL == msg | ||
382 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size) | ||
383 | { | ||
384 | /* End of message or buffer is full, add it to transmission queue | ||
385 | * and start with empty buffer */ | ||
386 | tmit->msg->size = htons (tmit->msg->size); | ||
387 | GNUNET_MQ_send (tmit->mq, tmit->env); | ||
388 | tmit->env = NULL; | ||
389 | tmit->msg = NULL; | ||
390 | tmit->acks_pending++; | ||
391 | } | ||
392 | else | ||
393 | { | ||
394 | /* Message fits in current buffer, append */ | ||
395 | GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size); | ||
396 | tmit->msg->size += size; | ||
397 | } | ||
398 | } | ||
399 | |||
400 | if (NULL == tmit->msg && NULL != msg) | ||
401 | { | ||
402 | /* Empty buffer, copy over message. */ | ||
403 | tmit->env = GNUNET_MQ_msg_extra (tmit->msg, | ||
404 | GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, | ||
405 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
406 | /* store current message size in host byte order | ||
407 | * then later switch it to network byte order before sending */ | ||
408 | tmit->msg->size = sizeof (*tmit->msg) + size; | ||
409 | |||
410 | GNUNET_memcpy (&tmit->msg[1], msg, size); | ||
411 | } | ||
412 | |||
413 | if (NULL != tmit->msg | ||
414 | && (GNUNET_YES == tmit_now | ||
415 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD | ||
416 | < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) | ||
417 | { | ||
418 | /* End of message or buffer is full, add it to transmission queue. */ | ||
419 | tmit->msg->size = htons (tmit->msg->size); | ||
420 | GNUNET_MQ_send (tmit->mq, tmit->env); | ||
421 | tmit->env = NULL; | ||
422 | tmit->msg = NULL; | ||
423 | tmit->acks_pending++; | ||
424 | } | ||
425 | } | ||
426 | |||
427 | |||
428 | /** | ||
429 | * Request data from client to transmit. | ||
430 | * | ||
431 | * @param tmit Transmission handle. | ||
432 | */ | ||
433 | static void | ||
434 | transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) | ||
435 | { | ||
436 | int notify_ret = GNUNET_YES; | ||
437 | uint16_t data_size = 0; | ||
438 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | ||
439 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | ||
440 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | ||
441 | |||
442 | if (NULL != tmit->notify_data) | ||
443 | { | ||
444 | data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | ||
445 | tmit->in_notify = GNUNET_YES; | ||
446 | notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); | ||
447 | tmit->in_notify = GNUNET_NO; | ||
448 | } | ||
449 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
450 | "transmit_data (ret: %d, size: %u): %.*s\n", | ||
451 | notify_ret, data_size, data_size, &msg[1]); | ||
452 | switch (notify_ret) | ||
453 | { | ||
454 | case GNUNET_NO: | ||
455 | if (0 == data_size) | ||
456 | { | ||
457 | /* Transmission paused, nothing to send. */ | ||
458 | tmit->paused = GNUNET_YES; | ||
459 | return; | ||
460 | } | ||
461 | break; | ||
462 | |||
463 | case GNUNET_YES: | ||
464 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_END; | ||
465 | break; | ||
466 | |||
467 | default: | ||
468 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
469 | "TransmitNotifyData callback returned error when requesting data.\n"); | ||
470 | |||
471 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; | ||
472 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
473 | msg->size = htons (sizeof (*msg)); | ||
474 | transmit_queue_insert (tmit, msg, GNUNET_YES); | ||
475 | tmit->in_transmit = GNUNET_NO; | ||
476 | return; | ||
477 | } | ||
478 | |||
479 | if (0 < data_size) | ||
480 | { | ||
481 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); | ||
482 | msg->size = htons (sizeof (*msg) + data_size); | ||
483 | transmit_queue_insert (tmit, msg, !notify_ret); | ||
484 | } | ||
485 | |||
486 | /* End of message. */ | ||
487 | if (GNUNET_YES == notify_ret) | ||
488 | { | ||
489 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | ||
490 | msg->size = htons (sizeof (*msg)); | ||
491 | transmit_queue_insert (tmit, msg, GNUNET_YES); | ||
492 | /* FIXME: wait for ACK before setting in_transmit to no */ | ||
493 | tmit->in_transmit = GNUNET_NO; | ||
494 | } | ||
495 | } | ||
496 | |||
497 | |||
498 | /** | ||
499 | * Request a modifier from a client to transmit. | ||
500 | * | ||
501 | * @param tmit Transmission handle. | ||
502 | */ | ||
503 | static void | ||
504 | transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | ||
505 | { | ||
506 | uint16_t max_data_size = 0; | ||
507 | uint16_t data_size = 0; | ||
508 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | ||
509 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | ||
510 | int notify_ret = GNUNET_YES; | ||
511 | |||
512 | switch (tmit->state) | ||
513 | { | ||
514 | case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: | ||
515 | { | ||
516 | struct GNUNET_PSYC_MessageModifier *mod | ||
517 | = (struct GNUNET_PSYC_MessageModifier *) msg; | ||
518 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | ||
519 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
520 | |||
521 | if (NULL != tmit->notify_mod) | ||
522 | { | ||
523 | max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; | ||
524 | data_size = max_data_size; | ||
525 | tmit->in_notify = GNUNET_YES; | ||
526 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], | ||
527 | &mod->oper, &mod->value_size); | ||
528 | tmit->in_notify = GNUNET_NO; | ||
529 | } | ||
530 | |||
531 | mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; | ||
532 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
533 | "transmit_mod (ret: %d, size: %u + %u): %.*s\n", | ||
534 | notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]); | ||
535 | if (mod->name_size < data_size) | ||
536 | { | ||
537 | tmit->mod_value_remaining | ||
538 | = mod->value_size - (data_size - mod->name_size); | ||
539 | mod->value_size = htonl (mod->value_size); | ||
540 | mod->name_size = htons (mod->name_size); | ||
541 | } | ||
542 | else if (0 < data_size) | ||
543 | { | ||
544 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n"); | ||
545 | notify_ret = GNUNET_SYSERR; | ||
546 | } | ||
547 | break; | ||
548 | } | ||
549 | case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: | ||
550 | { | ||
551 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | ||
552 | msg->size = sizeof (struct GNUNET_MessageHeader); | ||
553 | |||
554 | if (NULL != tmit->notify_mod) | ||
555 | { | ||
556 | max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | ||
557 | data_size = max_data_size; | ||
558 | tmit->in_notify = GNUNET_YES; | ||
559 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, | ||
560 | &data_size, &msg[1], NULL, NULL); | ||
561 | tmit->in_notify = GNUNET_NO; | ||
562 | } | ||
563 | tmit->mod_value_remaining -= data_size; | ||
564 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
565 | "transmit_mod (ret: %d, size: %u): %.*s\n", | ||
566 | notify_ret, data_size, data_size, &msg[1]); | ||
567 | break; | ||
568 | } | ||
569 | default: | ||
570 | GNUNET_assert (0); | ||
571 | } | ||
572 | |||
573 | switch (notify_ret) | ||
574 | { | ||
575 | case GNUNET_NO: | ||
576 | if (0 == data_size) | ||
577 | { /* Transmission paused, nothing to send. */ | ||
578 | tmit->paused = GNUNET_YES; | ||
579 | return; | ||
580 | } | ||
581 | tmit->state | ||
582 | = (0 == tmit->mod_value_remaining) | ||
583 | ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER | ||
584 | : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT; | ||
585 | break; | ||
586 | |||
587 | case GNUNET_YES: /* End of modifiers. */ | ||
588 | GNUNET_assert (0 == tmit->mod_value_remaining); | ||
589 | break; | ||
590 | |||
591 | default: | ||
592 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
593 | "TransmitNotifyModifier callback returned with error.\n"); | ||
594 | |||
595 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; | ||
596 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
597 | msg->size = htons (sizeof (*msg)); | ||
598 | transmit_queue_insert (tmit, msg, GNUNET_YES); | ||
599 | tmit->in_transmit = GNUNET_NO; | ||
600 | return; | ||
601 | } | ||
602 | |||
603 | if (0 < data_size) | ||
604 | { | ||
605 | GNUNET_assert (data_size <= max_data_size); | ||
606 | msg->size = htons (msg->size + data_size); | ||
607 | transmit_queue_insert (tmit, msg, GNUNET_NO); | ||
608 | } | ||
609 | |||
610 | if (GNUNET_YES == notify_ret) | ||
611 | { | ||
612 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA; | ||
613 | if (0 == tmit->acks_pending) | ||
614 | transmit_data (tmit); | ||
615 | } | ||
616 | else | ||
617 | { | ||
618 | transmit_mod (tmit); | ||
619 | } | ||
620 | } | ||
621 | |||
622 | |||
623 | int | ||
624 | transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | ||
625 | uint32_t *full_value_size) | ||
626 | |||
627 | { | ||
628 | struct GNUNET_PSYC_TransmitHandle *tmit = cls; | ||
629 | uint16_t name_size = 0; | ||
630 | uint32_t value_size = 0; | ||
631 | const char *value = NULL; | ||
632 | |||
633 | if (NULL != oper) | ||
634 | { /* New modifier */ | ||
635 | if (NULL != tmit->mod) | ||
636 | tmit->mod = tmit->mod->next; | ||
637 | if (NULL == tmit->mod) | ||
638 | { /* No more modifiers, continue with data */ | ||
639 | *data_size = 0; | ||
640 | return GNUNET_YES; | ||
641 | } | ||
642 | |||
643 | GNUNET_assert (tmit->mod->value_size < UINT32_MAX); | ||
644 | *full_value_size = tmit->mod->value_size; | ||
645 | *oper = tmit->mod->oper; | ||
646 | name_size = strlen (tmit->mod->name) + 1; | ||
647 | |||
648 | if (name_size + tmit->mod->value_size <= *data_size) | ||
649 | { | ||
650 | value_size = tmit->mod->value_size; | ||
651 | *data_size = name_size + value_size; | ||
652 | } | ||
653 | else /* full modifier does not fit in data, continuation needed */ | ||
654 | { | ||
655 | value_size = *data_size - name_size; | ||
656 | tmit->mod_value = tmit->mod->value + value_size; | ||
657 | } | ||
658 | |||
659 | GNUNET_memcpy (data, tmit->mod->name, name_size); | ||
660 | GNUNET_memcpy ((char *)data + name_size, tmit->mod->value, value_size); | ||
661 | return GNUNET_NO; | ||
662 | } | ||
663 | else | ||
664 | { /* Modifier continuation */ | ||
665 | GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining); | ||
666 | value = tmit->mod_value; | ||
667 | if (tmit->mod_value_remaining <= *data_size) | ||
668 | { | ||
669 | value_size = tmit->mod_value_remaining; | ||
670 | tmit->mod_value = NULL; | ||
671 | } | ||
672 | else | ||
673 | { | ||
674 | value_size = *data_size; | ||
675 | tmit->mod_value += value_size; | ||
676 | } | ||
677 | |||
678 | if (*data_size < value_size) | ||
679 | { | ||
680 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
681 | "Value in environment larger than buffer: %u < %zu\n", | ||
682 | *data_size, value_size); | ||
683 | *data_size = 0; | ||
684 | return GNUNET_NO; | ||
685 | } | ||
686 | |||
687 | *data_size = value_size; | ||
688 | GNUNET_memcpy (data, value, value_size); | ||
689 | return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO; | ||
690 | } | ||
691 | } | ||
692 | |||
693 | |||
694 | /** | ||
695 | * Transmit a message. | ||
696 | * | ||
697 | * @param tmit | ||
698 | * Transmission handle. | ||
699 | * @param method_name | ||
700 | * Which method should be invoked. | ||
701 | * @param env | ||
702 | * Environment for the message. | ||
703 | * Should stay available until the first call to notify_data. | ||
704 | * Can be NULL if there are no modifiers or @a notify_mod is | ||
705 | * provided instead. | ||
706 | * @param notify_mod | ||
707 | * Function to call to obtain modifiers. | ||
708 | * Can be NULL if there are no modifiers or @a env is provided instead. | ||
709 | * @param notify_data | ||
710 | * Function to call to obtain fragments of the data. | ||
711 | * @param notify_cls | ||
712 | * Closure for @a notify_mod and @a notify_data. | ||
713 | * @param flags | ||
714 | * Flags for the message being transmitted. | ||
715 | * | ||
716 | * @return #GNUNET_OK if the transmission was started. | ||
717 | * #GNUNET_SYSERR if another transmission is already going on. | ||
718 | */ | ||
719 | int | ||
720 | GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit, | ||
721 | const char *method_name, | ||
722 | const struct GNUNET_PSYC_Environment *env, | ||
723 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, | ||
724 | GNUNET_PSYC_TransmitNotifyData notify_data, | ||
725 | void *notify_cls, | ||
726 | uint32_t flags) | ||
727 | { | ||
728 | if (GNUNET_NO != tmit->in_transmit) | ||
729 | return GNUNET_SYSERR; | ||
730 | tmit->in_transmit = GNUNET_YES; | ||
731 | |||
732 | size_t size = strlen (method_name) + 1; | ||
733 | struct GNUNET_PSYC_MessageMethod *pmeth; | ||
734 | |||
735 | tmit->env = GNUNET_MQ_msg_extra (tmit->msg, | ||
736 | GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, | ||
737 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
738 | /* store current message size in host byte order | ||
739 | * then later switch it to network byte order before sending */ | ||
740 | tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size; | ||
741 | |||
742 | if (NULL != notify_mod) | ||
743 | { | ||
744 | tmit->notify_mod = notify_mod; | ||
745 | tmit->notify_mod_cls = notify_cls; | ||
746 | } | ||
747 | else | ||
748 | { | ||
749 | tmit->notify_mod = &transmit_notify_env; | ||
750 | tmit->notify_mod_cls = tmit; | ||
751 | if (NULL != env) | ||
752 | { | ||
753 | struct GNUNET_PSYC_Modifier mod = {}; | ||
754 | mod.next = GNUNET_PSYC_env_head (env); | ||
755 | tmit->mod = &mod; | ||
756 | |||
757 | struct GNUNET_PSYC_Modifier *m = tmit->mod; | ||
758 | while (NULL != (m = m->next)) | ||
759 | { | ||
760 | if (m->oper != GNUNET_PSYC_OP_SET) | ||
761 | flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY; | ||
762 | } | ||
763 | } | ||
764 | else | ||
765 | { | ||
766 | tmit->mod = NULL; | ||
767 | } | ||
768 | } | ||
769 | |||
770 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1]; | ||
771 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | ||
772 | pmeth->header.size = htons (sizeof (*pmeth) + size); | ||
773 | pmeth->flags = htonl (flags); | ||
774 | GNUNET_memcpy (&pmeth[1], method_name, size); | ||
775 | |||
776 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; | ||
777 | tmit->notify_data = notify_data; | ||
778 | tmit->notify_data_cls = notify_cls; | ||
779 | |||
780 | transmit_mod (tmit); | ||
781 | return GNUNET_OK; | ||
782 | } | ||
783 | |||
784 | |||
785 | /** | ||
786 | * Resume transmission. | ||
787 | * | ||
788 | * @param tmit Transmission handle. | ||
789 | */ | ||
790 | void | ||
791 | GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit) | ||
792 | { | ||
793 | if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify) | ||
794 | return; | ||
795 | |||
796 | if (0 == tmit->acks_pending) | ||
797 | { | ||
798 | tmit->paused = GNUNET_NO; | ||
799 | transmit_data (tmit); | ||
800 | } | ||
801 | } | ||
802 | |||
803 | |||
804 | /** | ||
805 | * Abort transmission request. | ||
806 | * | ||
807 | * @param tmit Transmission handle. | ||
808 | */ | ||
809 | void | ||
810 | GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit) | ||
811 | { | ||
812 | if (GNUNET_NO == tmit->in_transmit) | ||
813 | return; | ||
814 | |||
815 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; | ||
816 | tmit->in_transmit = GNUNET_NO; | ||
817 | tmit->paused = GNUNET_NO; | ||
818 | |||
819 | /* FIXME */ | ||
820 | struct GNUNET_MessageHeader msg; | ||
821 | msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
822 | msg.size = htons (sizeof (msg)); | ||
823 | transmit_queue_insert (tmit, &msg, GNUNET_YES); | ||
824 | } | ||
825 | |||
826 | |||
827 | /** | ||
828 | * Got acknowledgement of a transmitted message part, continue transmission. | ||
829 | * | ||
830 | * @param tmit Transmission handle. | ||
831 | */ | ||
832 | void | ||
833 | GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit) | ||
834 | { | ||
835 | if (0 == tmit->acks_pending) | ||
836 | { | ||
837 | LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); | ||
838 | GNUNET_break (0); | ||
839 | return; | ||
840 | } | ||
841 | tmit->acks_pending--; | ||
842 | |||
843 | if (GNUNET_YES == tmit->paused) | ||
844 | return; | ||
845 | |||
846 | switch (tmit->state) | ||
847 | { | ||
848 | case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: | ||
849 | case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: | ||
850 | transmit_mod (tmit); | ||
851 | break; | ||
852 | |||
853 | case GNUNET_PSYC_MESSAGE_STATE_DATA: | ||
854 | transmit_data (tmit); | ||
855 | break; | ||
856 | |||
857 | case GNUNET_PSYC_MESSAGE_STATE_END: | ||
858 | case GNUNET_PSYC_MESSAGE_STATE_CANCEL: | ||
859 | break; | ||
860 | |||
861 | default: | ||
862 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
863 | "Ignoring message ACK in state %u.\n", tmit->state); | ||
864 | } | ||
865 | } | ||
866 | |||
867 | |||
868 | /**** Receiving messages ****/ | ||
869 | |||
870 | |||
871 | /** | ||
872 | * Create handle for receiving messages. | ||
873 | */ | ||
874 | struct GNUNET_PSYC_ReceiveHandle * | ||
875 | GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb, | ||
876 | GNUNET_PSYC_MessagePartCallback message_part_cb, | ||
877 | void *cb_cls) | ||
878 | { | ||
879 | struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv)); | ||
880 | recv->message_cb = message_cb; | ||
881 | recv->message_part_cb = message_part_cb; | ||
882 | recv->cb_cls = cb_cls; | ||
883 | return recv; | ||
884 | } | ||
885 | |||
886 | |||
887 | /** | ||
888 | * Destroy handle for receiving messages. | ||
889 | */ | ||
890 | void | ||
891 | GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv) | ||
892 | { | ||
893 | GNUNET_free (recv); | ||
894 | } | ||
895 | |||
896 | |||
897 | /** | ||
898 | * Reset stored data related to the last received message. | ||
899 | */ | ||
900 | void | ||
901 | GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv) | ||
902 | { | ||
903 | recv->state = GNUNET_PSYC_MESSAGE_STATE_START; | ||
904 | recv->flags = 0; | ||
905 | recv->message_id = 0; | ||
906 | recv->mod_value_size = 0; | ||
907 | recv->mod_value_size_expected = 0; | ||
908 | } | ||
909 | |||
910 | |||
911 | static void | ||
912 | recv_error (struct GNUNET_PSYC_ReceiveHandle *recv) | ||
913 | { | ||
914 | if (NULL != recv->message_part_cb) | ||
915 | recv->message_part_cb (recv->cb_cls, NULL, NULL); | ||
916 | |||
917 | if (NULL != recv->message_cb) | ||
918 | recv->message_cb (recv->cb_cls, NULL); | ||
919 | |||
920 | GNUNET_PSYC_receive_reset (recv); | ||
921 | } | ||
922 | |||
923 | |||
924 | /** | ||
925 | * Handle incoming PSYC message. | ||
926 | * | ||
927 | * @param recv Receive handle. | ||
928 | * @param msg The message. | ||
929 | * | ||
930 | * @return #GNUNET_OK on success, | ||
931 | * #GNUNET_SYSERR on receive error. | ||
932 | */ | ||
933 | int | ||
934 | GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, | ||
935 | const struct GNUNET_PSYC_MessageHeader *msg) | ||
936 | { | ||
937 | uint16_t size = ntohs (msg->header.size); | ||
938 | uint32_t flags = ntohl (msg->flags); | ||
939 | |||
940 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, | ||
941 | (struct GNUNET_MessageHeader *) msg); | ||
942 | |||
943 | if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state) | ||
944 | { | ||
945 | recv->message_id = GNUNET_ntohll (msg->message_id); | ||
946 | recv->flags = flags; | ||
947 | recv->slave_pub_key = msg->slave_pub_key; | ||
948 | recv->mod_value_size = 0; | ||
949 | recv->mod_value_size_expected = 0; | ||
950 | } | ||
951 | else if (GNUNET_ntohll (msg->message_id) != recv->message_id) | ||
952 | { | ||
953 | // FIXME | ||
954 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
955 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", | ||
956 | GNUNET_ntohll (msg->message_id), recv->message_id); | ||
957 | GNUNET_break_op (0); | ||
958 | recv_error (recv); | ||
959 | return GNUNET_SYSERR; | ||
960 | } | ||
961 | else if (flags != recv->flags) | ||
962 | { | ||
963 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
964 | "Unexpected message flags. Got: %lu, expected: %lu\n", | ||
965 | flags, recv->flags); | ||
966 | GNUNET_break_op (0); | ||
967 | recv_error (recv); | ||
968 | return GNUNET_SYSERR; | ||
969 | } | ||
970 | |||
971 | uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; | ||
972 | |||
973 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) | ||
974 | { | ||
975 | const struct GNUNET_MessageHeader *pmsg | ||
976 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | ||
977 | psize = ntohs (pmsg->size); | ||
978 | ptype = ntohs (pmsg->type); | ||
979 | size_eq = size_min = 0; | ||
980 | |||
981 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) | ||
982 | { | ||
983 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
984 | "Dropping message of type %u with invalid size %u.\n", | ||
985 | ptype, psize); | ||
986 | recv_error (recv); | ||
987 | return GNUNET_SYSERR; | ||
988 | } | ||
989 | |||
990 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
991 | "Received message part of type %u and size %u from PSYC.\n", | ||
992 | ptype, psize); | ||
993 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); | ||
994 | |||
995 | switch (ptype) | ||
996 | { | ||
997 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
998 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | ||
999 | break; | ||
1000 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
1001 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
1002 | break; | ||
1003 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
1004 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
1005 | size_min = sizeof (struct GNUNET_MessageHeader); | ||
1006 | break; | ||
1007 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
1008 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
1009 | size_eq = sizeof (struct GNUNET_MessageHeader); | ||
1010 | break; | ||
1011 | default: | ||
1012 | GNUNET_break_op (0); | ||
1013 | recv_error (recv); | ||
1014 | return GNUNET_SYSERR; | ||
1015 | } | ||
1016 | |||
1017 | if (! ((0 < size_eq && psize == size_eq) | ||
1018 | || (0 < size_min && size_min <= psize))) | ||
1019 | { | ||
1020 | GNUNET_break_op (0); | ||
1021 | recv_error (recv); | ||
1022 | return GNUNET_SYSERR; | ||
1023 | } | ||
1024 | |||
1025 | switch (ptype) | ||
1026 | { | ||
1027 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
1028 | { | ||
1029 | struct GNUNET_PSYC_MessageMethod *meth | ||
1030 | = (struct GNUNET_PSYC_MessageMethod *) pmsg; | ||
1031 | |||
1032 | if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state) | ||
1033 | { | ||
1034 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1035 | "Dropping out of order message method (%u).\n", | ||
1036 | recv->state); | ||
1037 | /* It is normal to receive an incomplete message right after connecting, | ||
1038 | * but should not happen later. | ||
1039 | * FIXME: add a check for this condition. | ||
1040 | */ | ||
1041 | GNUNET_break_op (0); | ||
1042 | recv_error (recv); | ||
1043 | return GNUNET_SYSERR; | ||
1044 | } | ||
1045 | |||
1046 | if ('\0' != *((char *) meth + psize - 1)) | ||
1047 | { | ||
1048 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1049 | "Dropping message with malformed method. " | ||
1050 | "Message ID: %" PRIu64 "\n", recv->message_id); | ||
1051 | GNUNET_break_op (0); | ||
1052 | recv_error (recv); | ||
1053 | return GNUNET_SYSERR; | ||
1054 | } | ||
1055 | recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD; | ||
1056 | break; | ||
1057 | } | ||
1058 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
1059 | { | ||
1060 | if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state | ||
1061 | || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state | ||
1062 | || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)) | ||
1063 | { | ||
1064 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1065 | "Dropping out of order message modifier (%u).\n", | ||
1066 | recv->state); | ||
1067 | GNUNET_break_op (0); | ||
1068 | recv_error (recv); | ||
1069 | return GNUNET_SYSERR; | ||
1070 | } | ||
1071 | |||
1072 | struct GNUNET_PSYC_MessageModifier *mod | ||
1073 | = (struct GNUNET_PSYC_MessageModifier *) pmsg; | ||
1074 | |||
1075 | uint16_t name_size = ntohs (mod->name_size); | ||
1076 | recv->mod_value_size_expected = ntohl (mod->value_size); | ||
1077 | recv->mod_value_size = psize - sizeof (*mod) - name_size; | ||
1078 | |||
1079 | if (psize < sizeof (*mod) + name_size | ||
1080 | || '\0' != *((char *) &mod[1] + name_size - 1) | ||
1081 | || recv->mod_value_size_expected < recv->mod_value_size) | ||
1082 | { | ||
1083 | LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n"); | ||
1084 | GNUNET_break_op (0); | ||
1085 | recv_error (recv); | ||
1086 | return GNUNET_SYSERR; | ||
1087 | } | ||
1088 | recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; | ||
1089 | break; | ||
1090 | } | ||
1091 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
1092 | { | ||
1093 | recv->mod_value_size += psize - sizeof (*pmsg); | ||
1094 | |||
1095 | if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state | ||
1096 | || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state) | ||
1097 | || recv->mod_value_size_expected < recv->mod_value_size) | ||
1098 | { | ||
1099 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1100 | "Dropping out of order message modifier continuation " | ||
1101 | "!(%u == %u || %u == %u) || %lu < %lu.\n", | ||
1102 | GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state, | ||
1103 | GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state, | ||
1104 | recv->mod_value_size_expected, recv->mod_value_size); | ||
1105 | GNUNET_break_op (0); | ||
1106 | recv_error (recv); | ||
1107 | return GNUNET_SYSERR; | ||
1108 | } | ||
1109 | break; | ||
1110 | } | ||
1111 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
1112 | { | ||
1113 | if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD | ||
1114 | || recv->mod_value_size_expected != recv->mod_value_size) | ||
1115 | { | ||
1116 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1117 | "Dropping out of order message data fragment " | ||
1118 | "(%u < %u || %lu != %lu).\n", | ||
1119 | recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD, | ||
1120 | recv->mod_value_size_expected, recv->mod_value_size); | ||
1121 | |||
1122 | GNUNET_break_op (0); | ||
1123 | recv_error (recv); | ||
1124 | return GNUNET_SYSERR; | ||
1125 | } | ||
1126 | recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA; | ||
1127 | break; | ||
1128 | } | ||
1129 | } | ||
1130 | |||
1131 | if (NULL != recv->message_part_cb) | ||
1132 | recv->message_part_cb (recv->cb_cls, msg, pmsg); | ||
1133 | |||
1134 | switch (ptype) | ||
1135 | { | ||
1136 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
1137 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
1138 | GNUNET_PSYC_receive_reset (recv); | ||
1139 | break; | ||
1140 | } | ||
1141 | } | ||
1142 | |||
1143 | if (NULL != recv->message_cb) | ||
1144 | recv->message_cb (recv->cb_cls, msg); | ||
1145 | return GNUNET_OK; | ||
1146 | } | ||
1147 | |||
1148 | |||
1149 | /** | ||
1150 | * Check if @a data contains a series of valid message parts. | ||
1151 | * | ||
1152 | * @param data_size Size of @a data. | ||
1153 | * @param data Data. | ||
1154 | * @param[out] first_ptype Type of first message part. | ||
1155 | * @param[out] last_ptype Type of last message part. | ||
1156 | * | ||
1157 | * @return Number of message parts found in @a data. | ||
1158 | * or GNUNET_SYSERR if the message contains invalid parts. | ||
1159 | */ | ||
1160 | int | ||
1161 | GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data, | ||
1162 | uint16_t *first_ptype, uint16_t *last_ptype) | ||
1163 | { | ||
1164 | const struct GNUNET_MessageHeader *pmsg; | ||
1165 | uint16_t parts = 0, ptype = 0, psize = 0, pos = 0; | ||
1166 | if (NULL != first_ptype) | ||
1167 | *first_ptype = 0; | ||
1168 | if (NULL != last_ptype) | ||
1169 | *last_ptype = 0; | ||
1170 | |||
1171 | for (pos = 0; pos < data_size; pos += psize, parts++) | ||
1172 | { | ||
1173 | pmsg = (const struct GNUNET_MessageHeader *) (data + pos); | ||
1174 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); | ||
1175 | psize = ntohs (pmsg->size); | ||
1176 | ptype = ntohs (pmsg->type); | ||
1177 | if (0 == parts && NULL != first_ptype) | ||
1178 | *first_ptype = ptype; | ||
1179 | if (NULL != last_ptype | ||
1180 | && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) | ||
1181 | *last_ptype = ptype; | ||
1182 | if (psize < sizeof (*pmsg) | ||
1183 | || pos + psize > data_size | ||
1184 | || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD | ||
1185 | || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype) | ||
1186 | { | ||
1187 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1188 | "Invalid message part of type %u and size %u.\n", | ||
1189 | ptype, psize); | ||
1190 | return GNUNET_SYSERR; | ||
1191 | } | ||
1192 | /** @todo FIXME: check message part order */ | ||
1193 | } | ||
1194 | return parts; | ||
1195 | } | ||
1196 | |||
1197 | |||
1198 | struct ParseMessageClosure | ||
1199 | { | ||
1200 | struct GNUNET_PSYC_Environment *env; | ||
1201 | const char **method_name; | ||
1202 | const void **data; | ||
1203 | uint16_t *data_size; | ||
1204 | enum GNUNET_PSYC_MessageState msg_state; | ||
1205 | }; | ||
1206 | |||
1207 | |||
1208 | static void | ||
1209 | parse_message_part_cb (void *cls, | ||
1210 | const struct GNUNET_PSYC_MessageHeader *msg, | ||
1211 | const struct GNUNET_MessageHeader *pmsg) | ||
1212 | { | ||
1213 | struct ParseMessageClosure *pmc = cls; | ||
1214 | if (NULL == pmsg) | ||
1215 | { | ||
1216 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; | ||
1217 | return; | ||
1218 | } | ||
1219 | |||
1220 | switch (ntohs (pmsg->type)) | ||
1221 | { | ||
1222 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
1223 | { | ||
1224 | struct GNUNET_PSYC_MessageMethod * | ||
1225 | pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg; | ||
1226 | *pmc->method_name = (const char *) &pmeth[1]; | ||
1227 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD; | ||
1228 | break; | ||
1229 | } | ||
1230 | |||
1231 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
1232 | { | ||
1233 | struct GNUNET_PSYC_MessageModifier * | ||
1234 | pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg; | ||
1235 | |||
1236 | const char *name = (const char *) &pmod[1]; | ||
1237 | const void *value = name + ntohs (pmod->name_size); | ||
1238 | GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value, | ||
1239 | ntohl (pmod->value_size)); | ||
1240 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; | ||
1241 | break; | ||
1242 | } | ||
1243 | |||
1244 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
1245 | *pmc->data = &pmsg[1]; | ||
1246 | *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg); | ||
1247 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA; | ||
1248 | break; | ||
1249 | |||
1250 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
1251 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END; | ||
1252 | break; | ||
1253 | |||
1254 | default: | ||
1255 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; | ||
1256 | } | ||
1257 | } | ||
1258 | |||
1259 | |||
1260 | /** | ||
1261 | * Parse PSYC message. | ||
1262 | * | ||
1263 | * @param msg | ||
1264 | * The PSYC message to parse. | ||
1265 | * @param[out] method_name | ||
1266 | * Pointer to the method name inside @a pmsg. | ||
1267 | * @param env | ||
1268 | * The environment for the message with a list of modifiers. | ||
1269 | * @param[out] data | ||
1270 | * Pointer to data inside @a msg. | ||
1271 | * @param[out] data_size | ||
1272 | * Size of @data is written here. | ||
1273 | * | ||
1274 | * @return #GNUNET_OK on success, | ||
1275 | * #GNUNET_SYSERR on parse error. | ||
1276 | */ | ||
1277 | int | ||
1278 | GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg, | ||
1279 | const char **method_name, | ||
1280 | struct GNUNET_PSYC_Environment *env, | ||
1281 | const void **data, | ||
1282 | uint16_t *data_size) | ||
1283 | { | ||
1284 | struct ParseMessageClosure cls; | ||
1285 | cls.env = env; | ||
1286 | cls.method_name = method_name; | ||
1287 | cls.data = data; | ||
1288 | cls.data_size = data_size; | ||
1289 | |||
1290 | struct GNUNET_PSYC_ReceiveHandle * | ||
1291 | recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls); | ||
1292 | int ret = GNUNET_PSYC_receive_message (recv, msg); | ||
1293 | GNUNET_PSYC_receive_destroy (recv); | ||
1294 | |||
1295 | if (GNUNET_OK != ret) | ||
1296 | return GNUNET_SYSERR; | ||
1297 | |||
1298 | return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state) | ||
1299 | ? GNUNET_OK | ||
1300 | : GNUNET_NO; | ||
1301 | } | ||
1302 | |||
1303 | |||
1304 | /** | ||
1305 | * Initialize PSYC message header. | ||
1306 | */ | ||
1307 | void | ||
1308 | GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg, | ||
1309 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, | ||
1310 | uint32_t flags) | ||
1311 | { | ||
1312 | uint16_t size = ntohs (mmsg->header.size); | ||
1313 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
1314 | |||
1315 | pmsg->header.size = htons (psize); | ||
1316 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
1317 | pmsg->message_id = mmsg->message_id; | ||
1318 | pmsg->fragment_offset = mmsg->fragment_offset; | ||
1319 | pmsg->flags = htonl (flags); | ||
1320 | |||
1321 | GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); | ||
1322 | } | ||
1323 | |||
1324 | |||
1325 | /** | ||
1326 | * Create a new PSYC message header from a multicast message. | ||
1327 | */ | ||
1328 | struct GNUNET_PSYC_MessageHeader * | ||
1329 | GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg, | ||
1330 | uint32_t flags) | ||
1331 | { | ||
1332 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
1333 | uint16_t size = ntohs (mmsg->header.size); | ||
1334 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
1335 | |||
1336 | pmsg = GNUNET_malloc (psize); | ||
1337 | GNUNET_PSYC_message_header_init (pmsg, mmsg, flags); | ||
1338 | return pmsg; | ||
1339 | } | ||
1340 | |||
1341 | |||
1342 | /** | ||
1343 | * Create a new PSYC message header from a PSYC message. | ||
1344 | */ | ||
1345 | struct GNUNET_PSYC_MessageHeader * | ||
1346 | GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg) | ||
1347 | { | ||
1348 | uint16_t msg_size = ntohs (msg->header.size); | ||
1349 | struct GNUNET_PSYC_MessageHeader * | ||
1350 | pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg)); | ||
1351 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
1352 | pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg)); | ||
1353 | GNUNET_memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg)); | ||
1354 | return pmsg; | ||
1355 | } | ||