aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2016-01-12 23:26:47 +0000
committerGabor X Toth <*@tg-x.net>2016-01-12 23:26:47 +0000
commit50eaf8d7de763d25b7dae7ffdee8d7c6b5fe71ea (patch)
treea8023bdb9c9446a45792d7100303265c78713a50 /src/psycstore
parent3cbdbe18dbd56def00c0014381ff90b4ee664904 (diff)
downloadgnunet-50eaf8d7de763d25b7dae7ffdee8d7c6b5fe71ea.tar.gz
gnunet-50eaf8d7de763d25b7dae7ffdee8d7c6b5fe71ea.zip
psycutil reorg: message, env, slicer
Diffstat (limited to 'src/psycstore')
-rw-r--r--src/psycstore/Makefile.am14
-rw-r--r--src/psycstore/gnunet-service-psycstore.c4
-rw-r--r--src/psycstore/plugin_psycstore_sqlite.c6
-rw-r--r--src/psycstore/psyc_util_lib.c1330
-rw-r--r--src/psycstore/psycstore_api.c2
-rw-r--r--src/psycstore/test_plugin_psycstore.c13
-rw-r--r--src/psycstore/test_psycstore.c6
7 files changed, 18 insertions, 1357 deletions
diff --git a/src/psycstore/Makefile.am b/src/psycstore/Makefile.am
index 639c53262..a527837de 100644
--- a/src/psycstore/Makefile.am
+++ b/src/psycstore/Makefile.am
@@ -27,17 +27,7 @@ SQLITE_TESTS = test_plugin_psycstore_sqlite
27endif 27endif
28endif 28endif
29 29
30lib_LTLIBRARIES = libgnunetpsycutil.la libgnunetpsycstore.la 30lib_LTLIBRARIES = libgnunetpsycstore.la
31
32libgnunetpsycutil_la_SOURCES = \
33 psyc_util_lib.c
34libgnunetpsycutil_la_LIBADD = \
35 $(top_builddir)/src/util/libgnunetutil.la \
36 $(top_builddir)/src/env/libgnunetenv.la \
37 $(GN_LIBINTL) $(XLIB)
38libgnunetpsycutil_la_LDFLAGS = \
39 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
40 -version-info 0:0:0
41 31
42libgnunetpsycstore_la_SOURCES = \ 32libgnunetpsycstore_la_SOURCES = \
43 psycstore_api.c \ 33 psycstore_api.c \
@@ -59,7 +49,7 @@ gnunet_service_psycstore_SOURCES = \
59gnunet_service_psycstore_LDADD = \ 49gnunet_service_psycstore_LDADD = \
60 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 50 $(top_builddir)/src/statistics/libgnunetstatistics.la \
61 $(top_builddir)/src/util/libgnunetutil.la \ 51 $(top_builddir)/src/util/libgnunetutil.la \
62 libgnunetpsycutil.la \ 52 $(top_builddir)/src/psycutil/libgnunetpsycutil.la \
63 $(GN_LIBINTL) 53 $(GN_LIBINTL)
64 54
65plugin_LTLIBRARIES = \ 55plugin_LTLIBRARIES = \
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c
index 53e593455..2bcc87b24 100644
--- a/src/psycstore/gnunet-service-psycstore.c
+++ b/src/psycstore/gnunet-service-psycstore.c
@@ -551,7 +551,7 @@ recv_state_message_part (void *cls,
551 const char *name = (const char *) &pmod[1]; 551 const char *name = (const char *) &pmod[1];
552 const void *value = name + name_size; 552 const void *value = name + name_size;
553 553
554 if (GNUNET_ENV_OP_SET != pmod->oper) 554 if (GNUNET_PSYC_OP_SET != pmod->oper)
555 { // Apply non-transient operation. 555 { // Apply non-transient operation.
556 if (psize == sizeof (*pmod) + name_size + value_size) 556 if (psize == sizeof (*pmod) + name_size + value_size)
557 { 557 {
@@ -576,7 +576,7 @@ recv_state_message_part (void *cls,
576 } 576 }
577 577
578 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: 578 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
579 if (GNUNET_ENV_OP_SET != scls->mod_oper) 579 if (GNUNET_PSYC_OP_SET != scls->mod_oper)
580 { 580 {
581 if (scls->mod_value_remaining == 0) 581 if (scls->mod_value_remaining == 0)
582 { 582 {
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c
index 4ad3b1517..cc460d27f 100644
--- a/src/psycstore/plugin_psycstore_sqlite.c
+++ b/src/psycstore/plugin_psycstore_sqlite.c
@@ -35,7 +35,7 @@
35#include "gnunet_psycstore_service.h" 35#include "gnunet_psycstore_service.h"
36#include "gnunet_multicast_service.h" 36#include "gnunet_multicast_service.h"
37#include "gnunet_crypto_lib.h" 37#include "gnunet_crypto_lib.h"
38#include "gnunet_env_lib.h" 38#include "gnunet_psyc_util_lib.h"
39#include "psycstore.h" 39#include "psycstore.h"
40#include <sqlite3.h> 40#include <sqlite3.h>
41 41
@@ -1564,7 +1564,7 @@ state_modify_begin (void *cls,
1564static int 1564static int
1565state_modify_op (void *cls, 1565state_modify_op (void *cls,
1566 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1566 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1567 enum GNUNET_ENV_Operator op, 1567 enum GNUNET_PSYC_Operator op,
1568 const char *name, const void *value, size_t value_size) 1568 const char *name, const void *value, size_t value_size)
1569{ 1569{
1570 struct Plugin *plugin = cls; 1570 struct Plugin *plugin = cls;
@@ -1572,7 +1572,7 @@ state_modify_op (void *cls,
1572 1572
1573 switch (op) 1573 switch (op)
1574 { 1574 {
1575 case GNUNET_ENV_OP_ASSIGN: 1575 case GNUNET_PSYC_OP_ASSIGN:
1576 return state_assign (plugin, plugin->insert_state_current, channel_key, 1576 return state_assign (plugin, plugin->insert_state_current, channel_key,
1577 name, value, value_size); 1577 name, value, value_size);
1578 1578
diff --git a/src/psycstore/psyc_util_lib.c b/src/psycstore/psyc_util_lib.c
deleted file mode 100644
index e87d6106d..000000000
--- a/src/psycstore/psyc_util_lib.c
+++ /dev/null
@@ -1,1330 +0,0 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4 *
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20
21/**
22 * @file psycstore/psyc_util_lib.c
23 * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24 * @author Gabor X Toth
25 */
26
27#include <inttypes.h>
28
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_env_lib.h"
32#include "gnunet_psyc_service.h"
33#include "gnunet_psyc_util_lib.h"
34
35#define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
36
37
38struct GNUNET_PSYC_TransmitHandle
39{
40 /**
41 * Client connection to service.
42 */
43 struct GNUNET_CLIENT_MANAGER_Connection *client;
44
45 /**
46 * Message currently being received from the client.
47 */
48 struct GNUNET_MessageHeader *msg;
49
50 /**
51 * Callback to request next modifier from client.
52 */
53 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
54
55 /**
56 * Closure for the notify callbacks.
57 */
58 void *notify_mod_cls;
59
60 /**
61 * Callback to request next data fragment from client.
62 */
63 GNUNET_PSYC_TransmitNotifyData notify_data;
64
65 /**
66 * Closure for the notify callbacks.
67 */
68 void *notify_data_cls;
69
70 /**
71 * Modifier of the environment that is currently being transmitted.
72 */
73 struct GNUNET_ENV_Modifier *mod;
74
75 /**
76 *
77 */
78 const char *mod_value;
79
80 /**
81 * Number of bytes remaining to be transmitted from the current modifier value.
82 */
83 uint32_t mod_value_remaining;
84
85 /**
86 * State of the current message being received from client.
87 */
88 enum GNUNET_PSYC_MessageState state;
89
90 /**
91 * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
92 */
93 uint8_t acks_pending;
94
95 /**
96 * Is transmission paused?
97 */
98 uint8_t paused;
99
100 /**
101 * Are we currently transmitting a message?
102 */
103 uint8_t in_transmit;
104
105 /**
106 * Notify callback is currently being called.
107 */
108 uint8_t in_notify;
109
110};
111
112
113
114struct GNUNET_PSYC_ReceiveHandle
115{
116 /**
117 * Message callback.
118 */
119 GNUNET_PSYC_MessageCallback message_cb;
120
121 /**
122 * Message part callback.
123 */
124 GNUNET_PSYC_MessagePartCallback message_part_cb;
125
126 /**
127 * Closure for the callbacks.
128 */
129 void *cb_cls;
130
131 /**
132 * ID of the message being received from the PSYC service.
133 */
134 uint64_t message_id;
135
136 /**
137 * Public key of the slave from which a message is being received.
138 */
139 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
140
141 /**
142 * State of the currently being received message from the PSYC service.
143 */
144 enum GNUNET_PSYC_MessageState state;
145
146 /**
147 * Flags for the currently being received message from the PSYC service.
148 */
149 enum GNUNET_PSYC_MessageFlags flags;
150
151 /**
152 * Expected value size for the modifier being received from the PSYC service.
153 */
154 uint32_t mod_value_size_expected;
155
156 /**
157 * Actual value size for the modifier being received from the PSYC service.
158 */
159 uint32_t mod_value_size;
160};
161
162
163/**** Messages ****/
164
165
166/**
167 * Create a PSYC message.
168 *
169 * @param method_name
170 * PSYC method for the message.
171 * @param env
172 * Environment for the message.
173 * @param data
174 * Data payload for the message.
175 * @param data_size
176 * Size of @a data.
177 *
178 * @return Message header with size information,
179 * followed by the message parts.
180 */
181struct GNUNET_PSYC_Message *
182GNUNET_PSYC_message_create (const char *method_name,
183 const struct GNUNET_ENV_Environment *env,
184 const void *data,
185 size_t data_size)
186{
187 struct GNUNET_ENV_Modifier *mod = NULL;
188 struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
189 struct GNUNET_PSYC_MessageModifier *pmod = NULL;
190 struct GNUNET_MessageHeader *pmsg = NULL;
191 uint16_t env_size = 0;
192 if (NULL != env)
193 {
194 mod = GNUNET_ENV_environment_head (env);
195 while (NULL != mod)
196 {
197 env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
198 mod = mod->next;
199 }
200 }
201
202 struct GNUNET_PSYC_Message *msg;
203 uint16_t method_name_size = strlen (method_name) + 1;
204 if (method_name_size == 1)
205 return NULL;
206
207 uint16_t msg_size = sizeof (*msg) /* header */
208 + sizeof (*pmeth) + method_name_size /* method */
209 + env_size /* modifiers */
210 + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
211 + sizeof (*pmsg); /* end of message */
212 msg = GNUNET_malloc (msg_size);
213 msg->header.size = htons (msg_size);
214 msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
215
216 pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
217 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
218 pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
219 memcpy (&pmeth[1], method_name, method_name_size);
220
221 uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
222 if (NULL != env)
223 {
224 mod = GNUNET_ENV_environment_head (env);
225 while (NULL != mod)
226 {
227 uint16_t mod_name_size = strlen (mod->name) + 1;
228 pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
229 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
230 pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
231 p += pmod->header.size;
232 pmod->header.size = htons (pmod->header.size);
233
234 pmod->oper = mod->oper;
235 pmod->name_size = htons (mod_name_size);
236 pmod->value_size = htonl (mod->value_size);
237
238 memcpy (&pmod[1], mod->name, mod_name_size);
239 if (0 < mod->value_size)
240 memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
241
242 mod = mod->next;
243 }
244 }
245
246 if (0 < data_size)
247 {
248 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
249 pmsg->size = sizeof (*pmsg) + data_size;
250 p += pmsg->size;
251 pmsg->size = htons (pmsg->size);
252 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
253 memcpy (&pmsg[1], data, data_size);
254 }
255
256 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
257 pmsg->size = htons (sizeof (*pmsg));
258 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
259
260 GNUNET_assert (p + sizeof (*pmsg) == msg_size);
261 return msg;
262}
263
264
265void
266GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
267 const struct GNUNET_MessageHeader *msg)
268{
269 uint16_t size = ntohs (msg->size);
270 uint16_t type = ntohs (msg->type);
271 GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
272 switch (type)
273 {
274 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
275 {
276 struct GNUNET_PSYC_MessageHeader *pmsg
277 = (struct GNUNET_PSYC_MessageHeader *) msg;
278 GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
279 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
280 break;
281 }
282 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
283 {
284 struct GNUNET_PSYC_MessageMethod *meth
285 = (struct GNUNET_PSYC_MessageMethod *) msg;
286 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
287 break;
288 }
289 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
290 {
291 struct GNUNET_PSYC_MessageModifier *mod
292 = (struct GNUNET_PSYC_MessageModifier *) msg;
293 uint16_t name_size = ntohs (mod->name_size);
294 char oper = ' ' < mod->oper ? mod->oper : ' ';
295 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
296 size - sizeof (*mod) - name_size,
297 ((char *) &mod[1]) + name_size);
298 break;
299 }
300 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
301 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
302 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
303 break;
304 }
305}
306
307
308/**** Transmitting messages ****/
309
310
311/**
312 * Create a transmission handle.
313 */
314struct GNUNET_PSYC_TransmitHandle *
315GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
316{
317 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
318 tmit->client = client;
319 return tmit;
320}
321
322
323/**
324 * Destroy a transmission handle.
325 */
326void
327GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
328{
329 GNUNET_free (tmit);
330}
331
332
333/**
334 * Queue a message part for transmission.
335 *
336 * The message part is added to the current message buffer.
337 * When this buffer is full, it is added to the transmission queue.
338 *
339 * @param tmit
340 * Transmission handle.
341 * @param msg
342 * Message part, or NULL.
343 * @param tmit_now
344 * Transmit message now, or wait for buffer to fill up?
345 * #GNUNET_YES or #GNUNET_NO.
346 */
347static void
348transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
349 const struct GNUNET_MessageHeader *msg,
350 uint8_t tmit_now)
351{
352 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
353
354 LOG (GNUNET_ERROR_TYPE_DEBUG,
355 "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
356 NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
357
358 if (NULL != tmit->msg)
359 {
360 if (NULL == msg
361 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
362 {
363 /* End of message or buffer is full, add it to transmission queue
364 * and start with empty buffer */
365 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
366 tmit->msg->size = htons (tmit->msg->size);
367 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
368 tmit->msg = NULL;
369 tmit->acks_pending++;
370 }
371 else
372 {
373 /* Message fits in current buffer, append */
374 tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
375 memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
376 tmit->msg->size += size;
377 }
378 }
379
380 if (NULL == tmit->msg && NULL != msg)
381 {
382 /* Empty buffer, copy over message. */
383 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
384 tmit->msg->size = sizeof (*tmit->msg) + size;
385 memcpy (&tmit->msg[1], msg, size);
386 }
387
388 if (NULL != tmit->msg
389 && (GNUNET_YES == tmit_now
390 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
391 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
392 {
393 /* End of message or buffer is full, add it to transmission queue. */
394 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
395 tmit->msg->size = htons (tmit->msg->size);
396 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
397 tmit->msg = NULL;
398 tmit->acks_pending++;
399 }
400}
401
402
403/**
404 * Request data from client to transmit.
405 *
406 * @param tmit Transmission handle.
407 */
408static void
409transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
410{
411 int notify_ret = GNUNET_YES;
412 uint16_t data_size = 0;
413 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
414 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
415 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
416
417 if (NULL != tmit->notify_data)
418 {
419 data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
420 tmit->in_notify = GNUNET_YES;
421 notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
422 tmit->in_notify = GNUNET_NO;
423 }
424 LOG (GNUNET_ERROR_TYPE_DEBUG,
425 "transmit_data (ret: %d, size: %u): %.*s\n",
426 notify_ret, data_size, data_size, &msg[1]);
427 switch (notify_ret)
428 {
429 case GNUNET_NO:
430 if (0 == data_size)
431 {
432 /* Transmission paused, nothing to send. */
433 tmit->paused = GNUNET_YES;
434 return;
435 }
436 break;
437
438 case GNUNET_YES:
439 tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
440 break;
441
442 default:
443 LOG (GNUNET_ERROR_TYPE_ERROR,
444 "TransmitNotifyData callback returned error when requesting data.\n");
445
446 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
447 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
448 msg->size = htons (sizeof (*msg));
449 transmit_queue_insert (tmit, msg, GNUNET_YES);
450 tmit->in_transmit = GNUNET_NO;
451 return;
452 }
453
454 if (0 < data_size)
455 {
456 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
457 msg->size = htons (sizeof (*msg) + data_size);
458 transmit_queue_insert (tmit, msg, !notify_ret);
459 }
460
461 /* End of message. */
462 if (GNUNET_YES == notify_ret)
463 {
464 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
465 msg->size = htons (sizeof (*msg));
466 transmit_queue_insert (tmit, msg, GNUNET_YES);
467 /* FIXME: wait for ACK before setting in_transmit to no */
468 tmit->in_transmit = GNUNET_NO;
469 }
470}
471
472
473/**
474 * Request a modifier from a client to transmit.
475 *
476 * @param tmit Transmission handle.
477 */
478static void
479transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
480{
481 uint16_t max_data_size = 0;
482 uint16_t data_size = 0;
483 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
484 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
485 int notify_ret = GNUNET_YES;
486
487 switch (tmit->state)
488 {
489 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
490 {
491 struct GNUNET_PSYC_MessageModifier *mod
492 = (struct GNUNET_PSYC_MessageModifier *) msg;
493 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
494 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
495
496 if (NULL != tmit->notify_mod)
497 {
498 max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
499 data_size = max_data_size;
500 tmit->in_notify = GNUNET_YES;
501 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
502 &mod->oper, &mod->value_size);
503 tmit->in_notify = GNUNET_NO;
504 }
505
506 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
507 LOG (GNUNET_ERROR_TYPE_DEBUG,
508 "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
509 notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
510 if (mod->name_size < data_size)
511 {
512 tmit->mod_value_remaining
513 = mod->value_size - (data_size - mod->name_size);
514 mod->value_size = htonl (mod->value_size);
515 mod->name_size = htons (mod->name_size);
516 }
517 else if (0 < data_size)
518 {
519 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
520 notify_ret = GNUNET_SYSERR;
521 }
522 break;
523 }
524 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
525 {
526 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
527 msg->size = sizeof (struct GNUNET_MessageHeader);
528
529 if (NULL != tmit->notify_mod)
530 {
531 max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
532 data_size = max_data_size;
533 tmit->in_notify = GNUNET_YES;
534 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
535 &data_size, &msg[1], NULL, NULL);
536 tmit->in_notify = GNUNET_NO;
537 }
538 tmit->mod_value_remaining -= data_size;
539 LOG (GNUNET_ERROR_TYPE_DEBUG,
540 "transmit_mod (ret: %d, size: %u): %.*s\n",
541 notify_ret, data_size, data_size, &msg[1]);
542 break;
543 }
544 default:
545 GNUNET_assert (0);
546 }
547
548 switch (notify_ret)
549 {
550 case GNUNET_NO:
551 if (0 == data_size)
552 { /* Transmission paused, nothing to send. */
553 tmit->paused = GNUNET_YES;
554 return;
555 }
556 tmit->state
557 = (0 == tmit->mod_value_remaining)
558 ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
559 : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
560 break;
561
562 case GNUNET_YES: /* End of modifiers. */
563 GNUNET_assert (0 == tmit->mod_value_remaining);
564 break;
565
566 default:
567 LOG (GNUNET_ERROR_TYPE_ERROR,
568 "TransmitNotifyModifier callback returned with error.\n");
569
570 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
571 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
572 msg->size = htons (sizeof (*msg));
573 transmit_queue_insert (tmit, msg, GNUNET_YES);
574 tmit->in_transmit = GNUNET_NO;
575 return;
576 }
577
578 if (0 < data_size)
579 {
580 GNUNET_assert (data_size <= max_data_size);
581 msg->size = htons (msg->size + data_size);
582 transmit_queue_insert (tmit, msg, GNUNET_NO);
583 }
584
585 if (GNUNET_YES == notify_ret)
586 {
587 tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
588 if (0 == tmit->acks_pending)
589 transmit_data (tmit);
590 }
591 else
592 {
593 transmit_mod (tmit);
594 }
595}
596
597
598int
599transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
600 uint32_t *full_value_size)
601
602{
603 struct GNUNET_PSYC_TransmitHandle *tmit = cls;
604 uint16_t name_size = 0;
605 uint32_t value_size = 0;
606 const char *value = NULL;
607
608 if (NULL != oper)
609 { /* New modifier */
610 if (NULL != tmit->mod)
611 tmit->mod = tmit->mod->next;
612 if (NULL == tmit->mod)
613 { /* No more modifiers, continue with data */
614 *data_size = 0;
615 return GNUNET_YES;
616 }
617
618 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
619 *full_value_size = tmit->mod->value_size;
620 *oper = tmit->mod->oper;
621 name_size = strlen (tmit->mod->name) + 1;
622
623 if (name_size + tmit->mod->value_size <= *data_size)
624 {
625 value_size = tmit->mod->value_size;
626 *data_size = name_size + value_size;
627 }
628 else /* full modifier does not fit in data, continuation needed */
629 {
630 value_size = *data_size - name_size;
631 tmit->mod_value = tmit->mod->value + value_size;
632 }
633
634 memcpy (data, tmit->mod->name, name_size);
635 memcpy ((char *)data + name_size, tmit->mod->value, value_size);
636 return GNUNET_NO;
637 }
638 else
639 { /* Modifier continuation */
640 GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
641 value = tmit->mod_value;
642 if (tmit->mod_value_remaining <= *data_size)
643 {
644 value_size = tmit->mod_value_remaining;
645 tmit->mod_value = NULL;
646 }
647 else
648 {
649 value_size = *data_size;
650 tmit->mod_value += value_size;
651 }
652
653 if (*data_size < value_size)
654 {
655 LOG (GNUNET_ERROR_TYPE_DEBUG,
656 "Value in environment larger than buffer: %u < %zu\n",
657 *data_size, value_size);
658 *data_size = 0;
659 return GNUNET_NO;
660 }
661
662 *data_size = value_size;
663 memcpy (data, value, value_size);
664 return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
665 }
666}
667
668
669/**
670 * Transmit a message.
671 *
672 * @param tmit
673 * Transmission handle.
674 * @param method_name
675 * Which method should be invoked.
676 * @param env
677 * Environment for the message.
678 * Should stay available until the first call to notify_data.
679 * Can be NULL if there are no modifiers or @a notify_mod is
680 * provided instead.
681 * @param notify_mod
682 * Function to call to obtain modifiers.
683 * Can be NULL if there are no modifiers or @a env is provided instead.
684 * @param notify_data
685 * Function to call to obtain fragments of the data.
686 * @param notify_cls
687 * Closure for @a notify_mod and @a notify_data.
688 * @param flags
689 * Flags for the message being transmitted.
690 *
691 * @return #GNUNET_OK if the transmission was started.
692 * #GNUNET_SYSERR if another transmission is already going on.
693 */
694int
695GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
696 const char *method_name,
697 const struct GNUNET_ENV_Environment *env,
698 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
699 GNUNET_PSYC_TransmitNotifyData notify_data,
700 void *notify_cls,
701 uint32_t flags)
702{
703 if (GNUNET_NO != tmit->in_transmit)
704 return GNUNET_SYSERR;
705 tmit->in_transmit = GNUNET_YES;
706
707 size_t size = strlen (method_name) + 1;
708 struct GNUNET_PSYC_MessageMethod *pmeth;
709 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
710 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
711
712 if (NULL != notify_mod)
713 {
714 tmit->notify_mod = notify_mod;
715 tmit->notify_mod_cls = notify_cls;
716 }
717 else
718 {
719 tmit->notify_mod = &transmit_notify_env;
720 tmit->notify_mod_cls = tmit;
721 if (NULL != env)
722 {
723 struct GNUNET_ENV_Modifier mod = {};
724 mod.next = GNUNET_ENV_environment_head (env);
725 tmit->mod = &mod;
726
727 struct GNUNET_ENV_Modifier *m = tmit->mod;
728 while (NULL != (m = m->next))
729 {
730 if (m->oper != GNUNET_ENV_OP_SET)
731 flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
732 }
733 }
734 else
735 {
736 tmit->mod = NULL;
737 }
738 }
739
740 pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
741 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
742 pmeth->header.size = htons (sizeof (*pmeth) + size);
743 pmeth->flags = htonl (flags);
744 memcpy (&pmeth[1], method_name, size);
745
746 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
747 tmit->notify_data = notify_data;
748 tmit->notify_data_cls = notify_cls;
749
750 transmit_mod (tmit);
751 return GNUNET_OK;
752}
753
754
755/**
756 * Resume transmission.
757 *
758 * @param tmit Transmission handle.
759 */
760void
761GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
762{
763 if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
764 return;
765
766 if (0 == tmit->acks_pending)
767 {
768 tmit->paused = GNUNET_NO;
769 transmit_data (tmit);
770 }
771}
772
773
774/**
775 * Abort transmission request.
776 *
777 * @param tmit Transmission handle.
778 */
779void
780GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
781{
782 if (GNUNET_NO == tmit->in_transmit)
783 return;
784
785 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
786 tmit->in_transmit = GNUNET_NO;
787 tmit->paused = GNUNET_NO;
788
789 /* FIXME */
790 struct GNUNET_MessageHeader msg;
791 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
792 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
793 msg.size = htons (sizeof (msg));
794 transmit_queue_insert (tmit, &msg, GNUNET_YES);
795}
796
797
798/**
799 * Got acknowledgement of a transmitted message part, continue transmission.
800 *
801 * @param tmit Transmission handle.
802 */
803void
804GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
805{
806 if (0 == tmit->acks_pending)
807 {
808 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
809 GNUNET_break (0);
810 return;
811 }
812 tmit->acks_pending--;
813
814 switch (tmit->state)
815 {
816 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
817 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
818 transmit_mod (tmit);
819 break;
820
821 case GNUNET_PSYC_MESSAGE_STATE_DATA:
822 transmit_data (tmit);
823 break;
824
825 case GNUNET_PSYC_MESSAGE_STATE_END:
826 case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
827 break;
828
829 default:
830 LOG (GNUNET_ERROR_TYPE_DEBUG,
831 "Ignoring message ACK in state %u.\n", tmit->state);
832 }
833}
834
835
836/**** Receiving messages ****/
837
838
839/**
840 * Create handle for receiving messages.
841 */
842struct GNUNET_PSYC_ReceiveHandle *
843GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
844 GNUNET_PSYC_MessagePartCallback message_part_cb,
845 void *cb_cls)
846{
847 struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
848 recv->message_cb = message_cb;
849 recv->message_part_cb = message_part_cb;
850 recv->cb_cls = cb_cls;
851 return recv;
852}
853
854
855/**
856 * Destroy handle for receiving messages.
857 */
858void
859GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
860{
861 GNUNET_free (recv);
862}
863
864
865/**
866 * Reset stored data related to the last received message.
867 */
868void
869GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
870{
871 recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
872 recv->flags = 0;
873 recv->message_id = 0;
874 recv->mod_value_size = 0;
875 recv->mod_value_size_expected = 0;
876}
877
878
879static void
880recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
881{
882 if (NULL != recv->message_part_cb)
883 recv->message_part_cb (recv->cb_cls, NULL, recv->message_id, recv->flags,
884 0, NULL);
885
886 if (NULL != recv->message_cb)
887 recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
888
889 GNUNET_PSYC_receive_reset (recv);
890}
891
892
893/**
894 * Handle incoming PSYC message.
895 *
896 * @param recv Receive handle.
897 * @param msg The message.
898 *
899 * @return #GNUNET_OK on success,
900 * #GNUNET_SYSERR on receive error.
901 */
902int
903GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
904 const struct GNUNET_PSYC_MessageHeader *msg)
905{
906 uint16_t size = ntohs (msg->header.size);
907 uint32_t flags = ntohl (msg->flags);
908 uint64_t message_id;
909
910 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
911 (struct GNUNET_MessageHeader *) msg);
912
913 if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
914 {
915 recv->message_id = GNUNET_ntohll (msg->message_id);
916 recv->flags = flags;
917 recv->slave_key = msg->slave_key;
918 recv->mod_value_size = 0;
919 recv->mod_value_size_expected = 0;
920 }
921 else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
922 {
923 // FIXME
924 LOG (GNUNET_ERROR_TYPE_WARNING,
925 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
926 GNUNET_ntohll (msg->message_id), recv->message_id);
927 GNUNET_break_op (0);
928 recv_error (recv);
929 return GNUNET_SYSERR;
930 }
931 else if (flags != recv->flags)
932 {
933 LOG (GNUNET_ERROR_TYPE_WARNING,
934 "Unexpected message flags. Got: %lu, expected: %lu\n",
935 flags, recv->flags);
936 GNUNET_break_op (0);
937 recv_error (recv);
938 return GNUNET_SYSERR;
939 }
940 message_id = recv->message_id;
941
942 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
943
944 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
945 {
946 const struct GNUNET_MessageHeader *pmsg
947 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
948 psize = ntohs (pmsg->size);
949 ptype = ntohs (pmsg->type);
950 size_eq = size_min = 0;
951
952 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
953 {
954 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
955 "Dropping message of type %u with invalid size %u.\n",
956 ptype, psize);
957 recv_error (recv);
958 return GNUNET_SYSERR;
959 }
960
961 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
962 "Received message part of type %u and size %u from PSYC.\n",
963 ptype, psize);
964 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
965
966 switch (ptype)
967 {
968 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
969 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
970 break;
971 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
972 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
973 break;
974 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
975 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
976 size_min = sizeof (struct GNUNET_MessageHeader);
977 break;
978 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
979 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
980 size_eq = sizeof (struct GNUNET_MessageHeader);
981 break;
982 default:
983 GNUNET_break_op (0);
984 recv_error (recv);
985 return GNUNET_SYSERR;
986 }
987
988 if (! ((0 < size_eq && psize == size_eq)
989 || (0 < size_min && size_min <= psize)))
990 {
991 GNUNET_break_op (0);
992 recv_error (recv);
993 return GNUNET_SYSERR;
994 }
995
996 switch (ptype)
997 {
998 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
999 {
1000 struct GNUNET_PSYC_MessageMethod *meth
1001 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1002
1003 if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1004 {
1005 LOG (GNUNET_ERROR_TYPE_WARNING,
1006 "Dropping out of order message method (%u).\n",
1007 recv->state);
1008 /* It is normal to receive an incomplete message right after connecting,
1009 * but should not happen later.
1010 * FIXME: add a check for this condition.
1011 */
1012 GNUNET_break_op (0);
1013 recv_error (recv);
1014 return GNUNET_SYSERR;
1015 }
1016
1017 if ('\0' != *((char *) meth + psize - 1))
1018 {
1019 LOG (GNUNET_ERROR_TYPE_WARNING,
1020 "Dropping message with malformed method. "
1021 "Message ID: %" PRIu64 "\n", recv->message_id);
1022 GNUNET_break_op (0);
1023 recv_error (recv);
1024 return GNUNET_SYSERR;
1025 }
1026 recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1027 break;
1028 }
1029 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1030 {
1031 if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1032 || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1033 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1034 {
1035 LOG (GNUNET_ERROR_TYPE_WARNING,
1036 "Dropping out of order message modifier (%u).\n",
1037 recv->state);
1038 GNUNET_break_op (0);
1039 recv_error (recv);
1040 return GNUNET_SYSERR;
1041 }
1042
1043 struct GNUNET_PSYC_MessageModifier *mod
1044 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1045
1046 uint16_t name_size = ntohs (mod->name_size);
1047 recv->mod_value_size_expected = ntohl (mod->value_size);
1048 recv->mod_value_size = psize - sizeof (*mod) - name_size;
1049
1050 if (psize < sizeof (*mod) + name_size
1051 || '\0' != *((char *) &mod[1] + name_size - 1)
1052 || recv->mod_value_size_expected < recv->mod_value_size)
1053 {
1054 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1055 GNUNET_break_op (0);
1056 recv_error (recv);
1057 return GNUNET_SYSERR;
1058 }
1059 recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1060 break;
1061 }
1062 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1063 {
1064 recv->mod_value_size += psize - sizeof (*pmsg);
1065
1066 if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1067 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1068 || recv->mod_value_size_expected < recv->mod_value_size)
1069 {
1070 LOG (GNUNET_ERROR_TYPE_WARNING,
1071 "Dropping out of order message modifier continuation "
1072 "!(%u == %u || %u == %u) || %lu < %lu.\n",
1073 GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1074 GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1075 recv->mod_value_size_expected, recv->mod_value_size);
1076 GNUNET_break_op (0);
1077 recv_error (recv);
1078 return GNUNET_SYSERR;
1079 }
1080 break;
1081 }
1082 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1083 {
1084 if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1085 || recv->mod_value_size_expected != recv->mod_value_size)
1086 {
1087 LOG (GNUNET_ERROR_TYPE_WARNING,
1088 "Dropping out of order message data fragment "
1089 "(%u < %u || %lu != %lu).\n",
1090 recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1091 recv->mod_value_size_expected, recv->mod_value_size);
1092
1093 GNUNET_break_op (0);
1094 recv_error (recv);
1095 return GNUNET_SYSERR;
1096 }
1097 recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1098 break;
1099 }
1100 }
1101
1102 if (NULL != recv->message_part_cb)
1103 recv->message_part_cb (recv->cb_cls, &recv->slave_key,
1104 recv->message_id, recv->flags,
1105 0, // FIXME: data_offset
1106 pmsg);
1107
1108 switch (ptype)
1109 {
1110 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1111 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1112 GNUNET_PSYC_receive_reset (recv);
1113 break;
1114 }
1115 }
1116
1117 if (NULL != recv->message_cb)
1118 recv->message_cb (recv->cb_cls, message_id, flags, msg);
1119 return GNUNET_OK;
1120}
1121
1122
1123/**
1124 * Check if @a data contains a series of valid message parts.
1125 *
1126 * @param data_size Size of @a data.
1127 * @param data Data.
1128 * @param[out] first_ptype Type of first message part.
1129 * @param[out] last_ptype Type of last message part.
1130 *
1131 * @return Number of message parts found in @a data.
1132 * or GNUNET_SYSERR if the message contains invalid parts.
1133 */
1134int
1135GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1136 uint16_t *first_ptype, uint16_t *last_ptype)
1137{
1138 const struct GNUNET_MessageHeader *pmsg;
1139 uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1140 if (NULL != first_ptype)
1141 *first_ptype = 0;
1142 if (NULL != last_ptype)
1143 *last_ptype = 0;
1144
1145 for (pos = 0; pos < data_size; pos += psize, parts++)
1146 {
1147 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1148 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1149 psize = ntohs (pmsg->size);
1150 ptype = ntohs (pmsg->type);
1151 if (0 == parts && NULL != first_ptype)
1152 *first_ptype = ptype;
1153 if (NULL != last_ptype
1154 && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1155 *last_ptype = ptype;
1156 if (psize < sizeof (*pmsg)
1157 || pos + psize > data_size
1158 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1159 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1160 {
1161 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1162 "Invalid message part of type %u and size %u.\n",
1163 ptype, psize);
1164 return GNUNET_SYSERR;
1165 }
1166 /** @todo FIXME: check message part order */
1167 }
1168 return parts;
1169}
1170
1171
1172struct ParseMessageClosure
1173{
1174 struct GNUNET_ENV_Environment *env;
1175 const char **method_name;
1176 const void **data;
1177 uint16_t *data_size;
1178 enum GNUNET_PSYC_MessageState msg_state;
1179};
1180
1181
1182static void
1183parse_message_part_cb (void *cls,
1184 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1185 uint64_t message_id, uint32_t flags, uint64_t data_offset,
1186 const struct GNUNET_MessageHeader *msg)
1187{
1188 struct ParseMessageClosure *pmc = cls;
1189 if (NULL == msg)
1190 {
1191 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1192 return;
1193 }
1194
1195 switch (ntohs (msg->type))
1196 {
1197 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1198 {
1199 struct GNUNET_PSYC_MessageMethod *
1200 pmeth = (struct GNUNET_PSYC_MessageMethod *) msg;
1201 *pmc->method_name = (const char *) &pmeth[1];
1202 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1203 break;
1204 }
1205
1206 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1207 {
1208 struct GNUNET_PSYC_MessageModifier *
1209 pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
1210
1211 const char *name = (const char *) &pmod[1];
1212 const void *value = name + ntohs (pmod->name_size);
1213 GNUNET_ENV_environment_add (pmc->env, pmod->oper, name, value,
1214 ntohl (pmod->value_size));
1215 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1216 break;
1217 }
1218
1219 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1220 *pmc->data = &msg[1];
1221 *pmc->data_size = ntohs (msg->size) - sizeof (*msg);
1222 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1223 break;
1224
1225 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1226 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1227 break;
1228
1229 default:
1230 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1231 }
1232}
1233
1234
1235/**
1236 * Parse PSYC message.
1237 *
1238 * @param msg
1239 * The PSYC message to parse.
1240 * @param[out] method_name
1241 * Pointer to the method name inside @a pmsg.
1242 * @param env
1243 * The environment for the message with a list of modifiers.
1244 * @param[out] data
1245 * Pointer to data inside @a pmsg.
1246 * @param[out] data_size
1247 * Size of @data is written here.
1248 *
1249 * @return #GNUNET_OK on success,
1250 * #GNUNET_SYSERR on parse error.
1251 */
1252int
1253GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1254 const char **method_name,
1255 struct GNUNET_ENV_Environment *env,
1256 const void **data,
1257 uint16_t *data_size)
1258{
1259 struct ParseMessageClosure cls;
1260 cls.env = env;
1261 cls.method_name = method_name;
1262 cls.data = data;
1263 cls.data_size = data_size;
1264
1265 struct GNUNET_PSYC_ReceiveHandle *
1266 recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1267 int ret = GNUNET_PSYC_receive_message (recv, msg);
1268 GNUNET_PSYC_receive_destroy (recv);
1269
1270 if (GNUNET_OK != ret)
1271 return GNUNET_SYSERR;
1272
1273 return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1274 ? GNUNET_OK
1275 : GNUNET_NO;
1276}
1277
1278
1279/**
1280 * Initialize PSYC message header.
1281 */
1282void
1283GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1284 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1285 uint32_t flags)
1286{
1287 uint16_t size = ntohs (mmsg->header.size);
1288 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1289
1290 pmsg->header.size = htons (psize);
1291 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1292 pmsg->message_id = mmsg->message_id;
1293 pmsg->fragment_offset = mmsg->fragment_offset;
1294 pmsg->flags = htonl (flags);
1295
1296 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1297}
1298
1299
1300/**
1301 * Create a new PSYC message header from a multicast message.
1302 */
1303struct GNUNET_PSYC_MessageHeader *
1304GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1305 uint32_t flags)
1306{
1307 struct GNUNET_PSYC_MessageHeader *pmsg;
1308 uint16_t size = ntohs (mmsg->header.size);
1309 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1310
1311 pmsg = GNUNET_malloc (psize);
1312 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1313 return pmsg;
1314}
1315
1316
1317/**
1318 * Create a new PSYC message header from a PSYC message.
1319 */
1320struct GNUNET_PSYC_MessageHeader *
1321GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1322{
1323 uint16_t msg_size = ntohs (msg->header.size);
1324 struct GNUNET_PSYC_MessageHeader *
1325 pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1326 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1327 pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1328 memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1329 return pmsg;
1330}
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c
index 8f067b452..bfef07d3d 100644
--- a/src/psycstore/psycstore_api.c
+++ b/src/psycstore/psycstore_api.c
@@ -1303,7 +1303,7 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1303 uint64_t max_state_message_id, 1303 uint64_t max_state_message_id,
1304 uint64_t state_hash_message_id, 1304 uint64_t state_hash_message_id,
1305 size_t modifier_count, 1305 size_t modifier_count,
1306 const struct GNUNET_ENV_Modifier *modifiers, 1306 const struct GNUNET_PSYC_Modifier *modifiers,
1307 GNUNET_PSYCSTORE_ResultCallback rcb, 1307 GNUNET_PSYCSTORE_ResultCallback rcb,
1308 void *rcb_cls) 1308 void *rcb_cls)
1309{ 1309{
diff --git a/src/psycstore/test_plugin_psycstore.c b/src/psycstore/test_plugin_psycstore.c
index 3a022ec52..c15765a88 100644
--- a/src/psycstore/test_plugin_psycstore.c
+++ b/src/psycstore/test_plugin_psycstore.c
@@ -18,11 +18,12 @@
18 * Boston, MA 02110-1301, USA. 18 * Boston, MA 02110-1301, USA.
19 */ 19 */
20 20
21/* 21/**
22 * @file psycstore/test_plugin_psycstore.c
23 * @brief Test for the PSYCstore plugins.
24 * @author Gabor X Toth 22 * @author Gabor X Toth
25 * @author Christian Grothoff 23 * @author Christian Grothoff
24 *
25 * @file
26 * Test for the PSYCstore plugins.
26 */ 27 */
27#include "platform.h" 28#include "platform.h"
28#include "gnunet_util_lib.h" 29#include "gnunet_util_lib.h"
@@ -309,12 +310,12 @@ run (void *cls, char *const *args, const char *cfgfile,
309 message_id, 0)); 310 message_id, 0));
310 311
311 GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key, 312 GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
312 GNUNET_ENV_OP_ASSIGN, 313 GNUNET_PSYC_OP_ASSIGN,
313 "_foo", 314 "_foo",
314 C2ARG("one two three"))); 315 C2ARG("one two three")));
315 316
316 GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key, 317 GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
317 GNUNET_ENV_OP_ASSIGN, 318 GNUNET_PSYC_OP_ASSIGN,
318 "_foo_bar", slave_key, 319 "_foo_bar", slave_key,
319 sizeof (*slave_key))); 320 sizeof (*slave_key)));
320 321
@@ -401,7 +402,7 @@ run (void *cls, char *const *args, const char *cfgfile,
401 message_id - max_state_msg_id)); 402 message_id - max_state_msg_id));
402 403
403 GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key, 404 GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
404 GNUNET_ENV_OP_ASSIGN, 405 GNUNET_PSYC_OP_ASSIGN,
405 "_sync_foo", 406 "_sync_foo",
406 C2ARG("five six seven"))); 407 C2ARG("five six seven")));
407 408
diff --git a/src/psycstore/test_psycstore.c b/src/psycstore/test_psycstore.c
index 1e7004021..e57545198 100644
--- a/src/psycstore/test_psycstore.c
+++ b/src/psycstore/test_psycstore.c
@@ -77,7 +77,7 @@ struct StateClosure {
77 size_t value_size[16]; 77 size_t value_size[16];
78} scls; 78} scls;
79 79
80static struct GNUNET_ENV_Modifier modifiers[16]; 80static struct GNUNET_PSYC_Modifier modifiers[16];
81 81
82/** 82/**
83 * Clean up all resources used. 83 * Clean up all resources used.
@@ -328,13 +328,13 @@ message_get_latest_result (void *cls, int64_t result,
328 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_latest:\t%d\n", result); 328 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_latest:\t%d\n", result);
329 GNUNET_assert (0 < result && fcls->n == fcls->n_expected); 329 GNUNET_assert (0 < result && fcls->n == fcls->n_expected);
330 330
331 modifiers[0] = (struct GNUNET_ENV_Modifier) { 331 modifiers[0] = (struct GNUNET_PSYC_Modifier) {
332 .oper = '=', 332 .oper = '=',
333 .name = "_sync_foo", 333 .name = "_sync_foo",
334 .value = "three two one", 334 .value = "three two one",
335 .value_size = sizeof ("three two one") - 1 335 .value_size = sizeof ("three two one") - 1
336 }; 336 };
337 modifiers[1] = (struct GNUNET_ENV_Modifier) { 337 modifiers[1] = (struct GNUNET_PSYC_Modifier) {
338 .oper = '=', 338 .oper = '=',
339 .name = "_sync_bar", 339 .name = "_sync_bar",
340 .value = "ten eleven twelve", 340 .value = "ten eleven twelve",