aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-05-29 16:35:53 +0000
committerGabor X Toth <*@tg-x.net>2014-05-29 16:35:53 +0000
commita5877668ba805c5e0efe622e6ce4c58ff5609bf9 (patch)
tree5560a9107d5add87388ac8900184f621050fa514 /src/psyc
parent0a3564262a17b8e0dddb553c31bb783c99ee1ec1 (diff)
downloadgnunet-a5877668ba805c5e0efe622e6ce4c58ff5609bf9.tar.gz
gnunet-a5877668ba805c5e0efe622e6ce4c58ff5609bf9.zip
PSYC util lib: receiving/transmitting/logging PSYC messages
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/Makefile.am29
-rw-r--r--src/psyc/psyc_common.c118
-rw-r--r--src/psyc/psyc_util_lib.c1006
3 files changed, 1028 insertions, 125 deletions
diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am
index 212c383ac..6a4f249a9 100644
--- a/src/psyc/Makefile.am
+++ b/src/psyc/Makefile.am
@@ -17,21 +17,35 @@ if USE_COVERAGE
17 XLIB = -lgcov 17 XLIB = -lgcov
18endif 18endif
19 19
20lib_LTLIBRARIES = libgnunetpsyc.la 20lib_LTLIBRARIES = libgnunetpsycutil.la libgnunetpsyc.la
21
22libgnunetpsycutil_la_SOURCES = \
23 psyc_util_lib.c
24libgnunetpsycutil_la_LIBADD = \
25 $(top_builddir)/src/util/libgnunetutil.la \
26 $(top_builddir)/src/env/libgnunetenv.la \
27 $(GN_LIBINTL) $(XLIB)
28libgnunetpsycutil_la_LDFLAGS = \
29 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
30 -version-info 0:0:0
31libgnunetpsycutil_la_DEPENDENCIES = \
32 $(top_builddir)/src/util/libgnunetutil.la \
33 $(top_builddir)/src/env/libgnunetenv.la
21 34
22libgnunetpsyc_la_SOURCES = \ 35libgnunetpsyc_la_SOURCES = \
23 psyc_api.c psyc.h \ 36 psyc_api.c psyc.h
24 psyc_common.c
25libgnunetpsyc_la_LIBADD = \ 37libgnunetpsyc_la_LIBADD = \
26 $(top_builddir)/src/util/libgnunetutil.la \ 38 $(top_builddir)/src/util/libgnunetutil.la \
27 $(top_builddir)/src/env/libgnunetenv.la \ 39 $(top_builddir)/src/env/libgnunetenv.la \
40 $(top_builddir)/src/psyc/libgnunetpsycutil.la \
28 $(GN_LIBINTL) $(XLIB) 41 $(GN_LIBINTL) $(XLIB)
29libgnunetpsyc_la_LDFLAGS = \ 42libgnunetpsyc_la_LDFLAGS = \
30 $(GN_LIB_LDFLAGS) $(WINFLAGS) \ 43 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
31 -version-info 0:0:0 44 -version-info 0:0:0
32libgnunetpsyc_la_DEPENDENCIES = \ 45libgnunetpsyc_la_DEPENDENCIES = \
33 $(top_builddir)/src/util/libgnunetutil.la \ 46 $(top_builddir)/src/util/libgnunetutil.la \
34 $(top_builddir)/src/env/libgnunetenv.la 47 $(top_builddir)/src/env/libgnunetenv.la \
48 $(top_builddir)/src/psyc/libgnunetpsycutil.la
35 49
36bin_PROGRAMS = 50bin_PROGRAMS =
37 51
@@ -39,19 +53,20 @@ libexec_PROGRAMS = \
39 gnunet-service-psyc 53 gnunet-service-psyc
40 54
41gnunet_service_psyc_SOURCES = \ 55gnunet_service_psyc_SOURCES = \
42 gnunet-service-psyc.c \ 56 gnunet-service-psyc.c
43 psyc_common.c
44gnunet_service_psyc_LDADD = \ 57gnunet_service_psyc_LDADD = \
45 $(top_builddir)/src/util/libgnunetutil.la \ 58 $(top_builddir)/src/util/libgnunetutil.la \
46 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 59 $(top_builddir)/src/statistics/libgnunetstatistics.la \
47 $(top_builddir)/src/multicast/libgnunetmulticast.la \ 60 $(top_builddir)/src/multicast/libgnunetmulticast.la \
48 $(top_builddir)/src/psycstore/libgnunetpsycstore.la \ 61 $(top_builddir)/src/psycstore/libgnunetpsycstore.la \
62 $(top_builddir)/src/psyc/libgnunetpsycutil.la \
49 $(GN_LIBINTL) 63 $(GN_LIBINTL)
50gnunet_service_psyc_DEPENDENCIES = \ 64gnunet_service_psyc_DEPENDENCIES = \
51 $(top_builddir)/src/util/libgnunetutil.la \ 65 $(top_builddir)/src/util/libgnunetutil.la \
52 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 66 $(top_builddir)/src/statistics/libgnunetstatistics.la \
53 $(top_builddir)/src/multicast/libgnunetmulticast.la \ 67 $(top_builddir)/src/multicast/libgnunetmulticast.la \
54 $(top_builddir)/src/psycstore/libgnunetpsycstore.la 68 $(top_builddir)/src/psycstore/libgnunetpsycstore.la \
69 $(top_builddir)/src/psyc/libgnunetpsycutil.la
55gnunet_service_psyc_CFLAGS = $(AM_CFLAGS) 70gnunet_service_psyc_CFLAGS = $(AM_CFLAGS)
56 71
57 72
diff --git a/src/psyc/psyc_common.c b/src/psyc/psyc_common.c
deleted file mode 100644
index af9cc0c6f..000000000
--- a/src/psyc/psyc_common.c
+++ /dev/null
@@ -1,118 +0,0 @@
1/*
2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
4 *
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
19 */
20
21/**
22 * @file psyc/psyc_common.c
23 * @brief Common functions for PSYC
24 * @author Gabor X Toth
25 */
26
27#include <inttypes.h>
28#include "psyc.h"
29
30/**
31 * Check if @a data contains a series of valid message parts.
32 *
33 * @param data_size Size of @a data.
34 * @param data Data.
35 * @param[out] first_ptype Type of first message part.
36 * @param[out] last_ptype Type of last message part.
37 *
38 * @return Number of message parts found in @a data.
39 * or GNUNET_SYSERR if the message contains invalid parts.
40 */
41int
42GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data,
43 uint16_t *first_ptype, uint16_t *last_ptype)
44{
45 const struct GNUNET_MessageHeader *pmsg;
46 uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
47 if (NULL != first_ptype)
48 *first_ptype = 0;
49 if (NULL != last_ptype)
50 *last_ptype = 0;
51
52 for (pos = 0; pos < data_size; pos += psize, parts++)
53 {
54 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
55 psize = ntohs (pmsg->size);
56 ptype = ntohs (pmsg->type);
57 if (0 == parts && NULL != first_ptype)
58 *first_ptype = ptype;
59 if (NULL != last_ptype
60 && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
61 *last_ptype = ptype;
62 if (psize < sizeof (*pmsg)
63 || pos + psize > data_size
64 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
65 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
66 {
67 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
68 "Invalid message part of type %u and size %u.\n",
69 ptype, psize);
70 return GNUNET_SYSERR;
71 }
72 /* FIXME: check message part order */
73 }
74 return parts;
75}
76
77
78void
79GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
80 const struct GNUNET_MessageHeader *msg)
81{
82 uint16_t size = ntohs (msg->size);
83 uint16_t type = ntohs (msg->type);
84 GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
85 switch (type)
86 {
87 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
88 {
89 struct GNUNET_PSYC_MessageHeader *pmsg
90 = (struct GNUNET_PSYC_MessageHeader *) msg;
91 GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
92 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
93 break;
94 }
95 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
96 {
97 struct GNUNET_PSYC_MessageMethod *meth
98 = (struct GNUNET_PSYC_MessageMethod *) msg;
99 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
100 break;
101 }
102 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
103 {
104 struct GNUNET_PSYC_MessageModifier *mod
105 = (struct GNUNET_PSYC_MessageModifier *) msg;
106 uint16_t name_size = ntohs (mod->name_size);
107 char oper = ' ' < mod->oper ? mod->oper : ' ';
108 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
109 size - sizeof (*mod) - name_size - 1,
110 ((char *) &mod[1]) + name_size + 1);
111 break;
112 }
113 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
114 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
115 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
116 break;
117 }
118}
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c
new file mode 100644
index 000000000..6dd968190
--- /dev/null
+++ b/src/psyc/psyc_util_lib.c
@@ -0,0 +1,1006 @@
1/*
2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
4 *
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
19 */
20
21/**
22 * @file psyc/psyc_common.c
23 * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24 * @author Gabor X Toth
25 */
26
27#include <inttypes.h>
28
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_psyc_service.h"
32#include "gnunet_psyc_util_lib.h"
33
34#define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
35
36/**
37 * Message receive states.
38 */
39enum GNUNET_PSYC_MessageState
40{
41 GNUNET_PSYC_MESSAGE_STATE_START = 0,
42 GNUNET_PSYC_MESSAGE_STATE_HEADER = 1,
43 GNUNET_PSYC_MESSAGE_STATE_METHOD = 2,
44 GNUNET_PSYC_MESSAGE_STATE_MODIFIER = 3,
45 GNUNET_PSYC_MESSAGE_STATE_MOD_CONT = 4,
46 GNUNET_PSYC_MESSAGE_STATE_DATA = 5,
47 GNUNET_PSYC_MESSAGE_STATE_END = 6,
48 GNUNET_PSYC_MESSAGE_STATE_CANCEL = 7,
49 GNUNET_PSYC_MESSAGE_STATE_ERROR = 8,
50};
51
52
53struct GNUNET_PSYC_TransmitHandle
54{
55 /**
56 * Client connection to service.
57 */
58 struct GNUNET_CLIENT_MANAGER_Connection *client;
59
60 /**
61 * Message currently being received from the client.
62 */
63 struct GNUNET_MessageHeader *msg;
64
65 /**
66 * Callback to request next modifier from client.
67 */
68 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
69
70 /**
71 * Closure for the notify callbacks.
72 */
73 void *notify_mod_cls;
74
75 /**
76 * Callback to request next data fragment from client.
77 */
78 GNUNET_PSYC_TransmitNotifyData notify_data;
79
80 /**
81 * Closure for the notify callbacks.
82 */
83 void *notify_data_cls;
84
85 /**
86 * Modifier of the environment that is currently being transmitted.
87 */
88 struct GNUNET_ENV_Modifier *mod;
89
90 /**
91 *
92 */
93 const char *mod_value;
94 size_t mod_value_size;
95
96 /**
97 * State of the current message being received from client.
98 */
99 enum GNUNET_PSYC_MessageState state;
100
101 /**
102 * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
103 */
104 uint8_t acks_pending;
105
106 /**
107 * Is transmission paused?
108 */
109 uint8_t paused;
110
111 /**
112 * Are we currently transmitting a message?
113 */
114 uint8_t in_transmit;
115};
116
117
118
119struct GNUNET_PSYC_ReceiveHandle
120{
121 /**
122 * Message part callback.
123 */
124 GNUNET_PSYC_MessageCallback message_cb;
125
126 /**
127 * Message part callback for historic message.
128 */
129 GNUNET_PSYC_MessageCallback hist_message_cb;
130
131 /**
132 * Closure for the callbacks.
133 */
134 void *cb_cls;
135
136 /**
137 * ID of the message being received from the PSYC service.
138 */
139 uint64_t message_id;
140
141 /**
142 * Public key of the slave from which a message is being received.
143 */
144 struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
145
146 /**
147 * State of the currently being received message from the PSYC service.
148 */
149 enum GNUNET_PSYC_MessageState state;
150
151 /**
152 * Flags for the currently being received message from the PSYC service.
153 */
154 enum GNUNET_PSYC_MessageFlags flags;
155
156 /**
157 * Expected value size for the modifier being received from the PSYC service.
158 */
159 uint32_t mod_value_size_expected;
160
161 /**
162 * Actual value size for the modifier being received from the PSYC service.
163 */
164 uint32_t mod_value_size;
165};
166
167
168void
169GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
170 const struct GNUNET_MessageHeader *msg)
171{
172 uint16_t size = ntohs (msg->size);
173 uint16_t type = ntohs (msg->type);
174 GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
175 switch (type)
176 {
177 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
178 {
179 struct GNUNET_PSYC_MessageHeader *pmsg
180 = (struct GNUNET_PSYC_MessageHeader *) msg;
181 GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
182 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
183 break;
184 }
185 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
186 {
187 struct GNUNET_PSYC_MessageMethod *meth
188 = (struct GNUNET_PSYC_MessageMethod *) msg;
189 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
190 break;
191 }
192 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
193 {
194 struct GNUNET_PSYC_MessageModifier *mod
195 = (struct GNUNET_PSYC_MessageModifier *) msg;
196 uint16_t name_size = ntohs (mod->name_size);
197 char oper = ' ' < mod->oper ? mod->oper : ' ';
198 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
199 size - sizeof (*mod) - name_size - 1,
200 ((char *) &mod[1]) + name_size + 1);
201 break;
202 }
203 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
204 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
205 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
206 break;
207 }
208}
209
210
211/**** Transmitting messages ****/
212
213
214/**
215 * Create a transmission handle.
216 */
217struct GNUNET_PSYC_TransmitHandle *
218GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
219{
220 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
221 tmit->client = client;
222 return tmit;
223}
224
225
226/**
227 * Destroy a transmission handle.
228 */
229void
230GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
231{
232 GNUNET_free (tmit);
233}
234
235
236/**
237 * Queue a message part for transmission.
238 *
239 * The message part is added to the current message buffer.
240 * When this buffer is full, it is added to the transmission queue.
241 *
242 * @param tmit Transmission handle.
243 * @param msg Message part, or NULL.
244 * @param end End of message? #GNUNET_YES or #GNUNET_NO.
245 */
246static void
247transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
248 const struct GNUNET_MessageHeader *msg,
249 uint8_t end)
250{
251 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
252
253 LOG (GNUNET_ERROR_TYPE_DEBUG,
254 "Queueing message of type %u and size %u (end: %u)).\n",
255 ntohs (msg->type), size, end);
256
257 if (NULL != tmit->msg)
258 {
259 if (NULL == msg
260 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
261 {
262 /* End of message or buffer is full, add it to transmission queue
263 * and start with empty buffer */
264 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
265 tmit->msg->size = htons (tmit->msg->size);
266 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
267 tmit->msg = NULL;
268 tmit->acks_pending++;
269 }
270 else
271 {
272 /* Message fits in current buffer, append */
273 tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
274 memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
275 tmit->msg->size += size;
276 }
277 }
278
279 if (NULL == tmit->msg && NULL != msg)
280 {
281 /* Empty buffer, copy over message. */
282 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
283 tmit->msg->size = sizeof (*tmit->msg) + size;
284 memcpy (&tmit->msg[1], msg, size);
285 }
286
287 if (NULL != tmit->msg
288 && (GNUNET_YES == end
289 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
290 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
291 {
292 /* End of message or buffer is full, add it to transmission queue. */
293 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
294 tmit->msg->size = htons (tmit->msg->size);
295 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
296 tmit->msg = NULL;
297 tmit->acks_pending++;
298 }
299
300 if (GNUNET_YES == end)
301 tmit->in_transmit = GNUNET_NO;
302}
303
304
305/**
306 * Request data from client to transmit.
307 *
308 * @param tmit Transmission handle.
309 */
310static void
311transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
312{
313 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
314 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
315 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
316 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
317
318 int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
319 switch (notify_ret)
320 {
321 case GNUNET_NO:
322 if (0 == data_size)
323 {
324 /* Transmission paused, nothing to send. */
325 tmit->paused = GNUNET_YES;
326 return;
327 }
328 break;
329
330 case GNUNET_YES:
331 tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
332 break;
333
334 default:
335 LOG (GNUNET_ERROR_TYPE_ERROR,
336 "TransmitNotifyData callback returned error when requesting data.\n");
337
338 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
339 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
340 msg->size = htons (sizeof (*msg));
341 transmit_queue_insert (tmit, msg, GNUNET_YES);
342 return;
343 }
344
345 if (0 < data_size)
346 {
347 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
348 msg->size = htons (sizeof (*msg) + data_size);
349 transmit_queue_insert (tmit, msg, !notify_ret);
350 }
351
352 /* End of message. */
353 if (GNUNET_YES == notify_ret)
354 {
355 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
356 msg->size = htons (sizeof (*msg));
357 transmit_queue_insert (tmit, msg, GNUNET_YES);
358 }
359}
360
361
362/**
363 * Request a modifier from a client to transmit.
364 *
365 * @param tmit Transmission handle.
366 */
367static void
368transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
369{
370 uint16_t max_data_size, data_size;
371 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
372 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
373 int notify_ret;
374
375 switch (tmit->state)
376 {
377 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
378 {
379 struct GNUNET_PSYC_MessageModifier *mod
380 = (struct GNUNET_PSYC_MessageModifier *) msg;
381 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
382 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
383 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
384 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
385 &mod->oper, &mod->value_size);
386 mod->name_size = strnlen ((char *) &mod[1], data_size);
387 if (mod->name_size < data_size)
388 {
389 mod->value_size = htonl (mod->value_size);
390 mod->name_size = htons (mod->name_size);
391 }
392 else if (0 < data_size)
393 {
394 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
395 notify_ret = GNUNET_SYSERR;
396 }
397 break;
398 }
399 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
400 {
401 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
402 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
403 msg->size = sizeof (struct GNUNET_MessageHeader);
404 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
405 &data_size, &msg[1], NULL, NULL);
406 break;
407 }
408 default:
409 GNUNET_assert (0);
410 }
411
412 switch (notify_ret)
413 {
414 case GNUNET_NO:
415 if (0 == data_size)
416 { /* Transmission paused, nothing to send. */
417 tmit->paused = GNUNET_YES;
418 return;
419 }
420 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
421 break;
422
423 case GNUNET_YES:
424 if (0 == data_size)
425 {
426 /* End of modifiers. */
427 tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
428 if (0 == tmit->acks_pending)
429 transmit_data (tmit);
430
431 return;
432 }
433 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
434 break;
435
436 default:
437 LOG (GNUNET_ERROR_TYPE_ERROR,
438 "TransmitNotifyModifier callback returned error "
439 "when requesting a modifier.\n");
440
441 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
442 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
443 msg->size = htons (sizeof (*msg));
444
445 transmit_queue_insert (tmit, msg, GNUNET_YES);
446 return;
447 }
448
449 if (0 < data_size)
450 {
451 GNUNET_assert (data_size <= max_data_size);
452 msg->size = htons (msg->size + data_size);
453 transmit_queue_insert (tmit, msg, GNUNET_NO);
454 }
455
456 transmit_mod (tmit);
457}
458
459
460int
461transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
462 uint32_t *full_value_size)
463
464{
465 struct GNUNET_PSYC_TransmitHandle *tmit = cls;
466 uint16_t name_size = 0;
467 size_t value_size = 0;
468 const char *value = NULL;
469
470 if (NULL != oper && NULL != tmit->mod)
471 { /* New modifier */
472 tmit->mod = tmit->mod->next;
473 if (NULL == tmit->mod)
474 { /* No more modifiers, continue with data */
475 *data_size = 0;
476 return GNUNET_YES;
477 }
478
479 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
480 *full_value_size = tmit->mod->value_size;
481 *oper = tmit->mod->oper;
482 name_size = strlen (tmit->mod->name);
483
484 if (name_size + 1 + tmit->mod->value_size <= *data_size)
485 {
486 *data_size = name_size + 1 + tmit->mod->value_size;
487 }
488 else
489 {
490 tmit->mod_value_size = tmit->mod->value_size;
491 value_size = *data_size - name_size - 1;
492 tmit->mod_value_size -= value_size;
493 tmit->mod_value = tmit->mod->value + value_size;
494 }
495
496 memcpy (data, tmit->mod->name, name_size);
497 ((char *)data)[name_size] = '\0';
498 memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size);
499 }
500 else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
501 { /* Modifier continuation */
502 value = tmit->mod_value;
503 if (tmit->mod_value_size <= *data_size)
504 {
505 value_size = tmit->mod_value_size;
506 tmit->mod_value = NULL;
507 }
508 else
509 {
510 value_size = *data_size;
511 tmit->mod_value += value_size;
512 }
513 tmit->mod_value_size -= value_size;
514
515 if (*data_size < value_size)
516 {
517 LOG (GNUNET_ERROR_TYPE_DEBUG,
518 "Value in environment larger than buffer: %u < %zu\n",
519 *data_size, value_size);
520 *data_size = 0;
521 return GNUNET_NO;
522 }
523
524 *data_size = value_size;
525 memcpy (data, value, value_size);
526 }
527
528 return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO;
529}
530
531
532/**
533 * Transmit a message.
534 *
535 * @param tmit Transmission handle.
536 * @param method_name Which method should be invoked.
537 * @param env Environment for the message.
538 * Should stay available until the first call to notify_data.
539 * Can be NULL if there are no modifiers or @a notify_mod is provided instead.
540 * @param notify_mod Function to call to obtain modifiers.
541 * Can be NULL if there are no modifiers or @a env is provided instead.
542 * @param notify_data Function to call to obtain fragments of the data.
543 * @param notify_cls Closure for @a notify_mod and @a notify_data.
544 * @param flags Flags for the message being transmitted.
545 *
546 * @return #GNUNET_OK if the transmission was started.
547 * #GNUNET_SYSERR if another transmission is already going on.
548 */
549int
550GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
551 const char *method_name,
552 const struct GNUNET_ENV_Environment *env,
553 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
554 GNUNET_PSYC_TransmitNotifyData notify_data,
555 void *notify_cls,
556 uint32_t flags)
557{
558 if (GNUNET_NO != tmit->in_transmit)
559 return GNUNET_SYSERR;
560 tmit->in_transmit = GNUNET_YES;
561
562 size_t size = strlen (method_name) + 1;
563 struct GNUNET_PSYC_MessageMethod *pmeth;
564 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
565 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
566
567 pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
568 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
569 pmeth->header.size = htons (sizeof (*pmeth) + size);
570 pmeth->flags = htonl (flags);
571 memcpy (&pmeth[1], method_name, size);
572
573 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
574 tmit->notify_data = notify_data;
575 tmit->notify_data_cls = notify_cls;
576
577 if (NULL != notify_mod)
578 {
579 tmit->notify_mod = notify_mod;
580 tmit->notify_mod_cls = notify_cls;
581 }
582 else
583 {
584 tmit->notify_mod = &transmit_notify_env;
585 tmit->notify_mod_cls = tmit;
586 tmit->mod
587 = (NULL != env)
588 ? GNUNET_ENV_environment_head (env)
589 : NULL;
590 }
591
592 transmit_mod (tmit);
593 return GNUNET_OK;
594}
595
596
597/**
598 * Resume transmission.
599 *
600 * @param tmit Transmission handle.
601 */
602void
603GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
604{
605 if (0 == tmit->acks_pending)
606 {
607 tmit->paused = GNUNET_NO;
608 transmit_data (tmit);
609 }
610}
611
612
613/**
614 * Abort transmission request.
615 *
616 * @param tmit Transmission handle.
617 */
618void
619GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
620{
621 if (GNUNET_NO == tmit->in_transmit)
622 return;
623
624 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
625 tmit->in_transmit = GNUNET_NO;
626 tmit->paused = GNUNET_NO;
627
628 /* FIXME */
629 struct GNUNET_MessageHeader msg;
630 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
631 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
632 msg.size = htons (sizeof (msg));
633 transmit_queue_insert (tmit, &msg, GNUNET_YES);
634}
635
636
637/**
638 * Got acknowledgement of a transmitted message part, continue transmission.
639 *
640 * @param tmit Transmission handle.
641 */
642void
643GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
644{
645 if (0 == tmit->acks_pending)
646 {
647 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
648 GNUNET_break (0);
649 return;
650 }
651 tmit->acks_pending--;
652
653 switch (tmit->state)
654 {
655 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
656 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
657 if (GNUNET_NO == tmit->paused)
658 transmit_mod (tmit);
659 break;
660
661 case GNUNET_PSYC_MESSAGE_STATE_DATA:
662 if (GNUNET_NO == tmit->paused)
663 transmit_data (tmit);
664 break;
665
666 case GNUNET_PSYC_MESSAGE_STATE_END:
667 case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
668 break;
669
670 default:
671 LOG (GNUNET_ERROR_TYPE_DEBUG,
672 "Ignoring message ACK in state %u.\n", tmit->state);
673 }
674}
675
676
677/**** Receiving messages ****/
678
679
680/**
681 * Create handle for receiving messages.
682 */
683struct GNUNET_PSYC_ReceiveHandle *
684GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
685 GNUNET_PSYC_MessageCallback hist_message_cb,
686 void *cb_cls)
687{
688 struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
689 recv->message_cb = message_cb;
690 recv->hist_message_cb = hist_message_cb;
691 recv->cb_cls = cb_cls;
692 return recv;
693}
694
695
696/**
697 * Destroy handle for receiving messages.
698 */
699void
700GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
701{
702 GNUNET_free (recv);
703}
704
705
706/**
707 * Reset stored data related to the last received message.
708 */
709void
710GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
711{
712 recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
713 recv->flags = 0;
714 recv->message_id = 0;
715 recv->mod_value_size = 0;
716 recv->mod_value_size_expected = 0;
717}
718
719
720static void
721recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
722{
723 GNUNET_PSYC_MessageCallback message_cb
724 = recv->flags & GNUNET_PSYC_MESSAGE_HISTORIC
725 ? recv->hist_message_cb
726 : recv->message_cb;
727
728 if (NULL != message_cb)
729 message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
730
731 GNUNET_PSYC_receive_reset (recv);
732}
733
734
735/**
736 * Handle incoming PSYC message.
737 *
738 * @param recv Receive handle.
739 * @param msg The message.
740 *
741 * @return #GNUNET_OK on success,
742 * #GNUNET_SYSERR on receive error.
743 */
744int
745GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
746 const struct GNUNET_PSYC_MessageHeader *msg)
747{
748 uint16_t size = ntohs (msg->header.size);
749 uint32_t flags = ntohl (msg->flags);
750
751 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
752 (struct GNUNET_MessageHeader *) msg);
753
754 if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
755 {
756 recv->message_id = GNUNET_ntohll (msg->message_id);
757 recv->flags = flags;
758 recv->slave_key = msg->slave_key;
759 recv->mod_value_size = 0;
760 recv->mod_value_size_expected = 0;
761 }
762 else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
763 {
764 // FIXME
765 LOG (GNUNET_ERROR_TYPE_WARNING,
766 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
767 GNUNET_ntohll (msg->message_id), recv->message_id);
768 GNUNET_break_op (0);
769 recv_error (recv);
770 return GNUNET_SYSERR;
771 }
772 else if (flags != recv->flags)
773 {
774 LOG (GNUNET_ERROR_TYPE_WARNING,
775 "Unexpected message flags. Got: %lu, expected: %lu\n",
776 flags, recv->flags);
777 GNUNET_break_op (0);
778 recv_error (recv);
779 return GNUNET_SYSERR;
780 }
781
782 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
783
784 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
785 {
786 const struct GNUNET_MessageHeader *pmsg
787 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
788 psize = ntohs (pmsg->size);
789 ptype = ntohs (pmsg->type);
790 size_eq = size_min = 0;
791
792 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
793 {
794 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
795 "Dropping message of type %u with invalid size %u.\n",
796 ptype, psize);
797 recv_error (recv);
798 return GNUNET_SYSERR;
799 }
800
801 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
802 "Received message part from PSYC.\n");
803 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
804
805 switch (ptype)
806 {
807 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
808 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
809 break;
810 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
811 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
812 break;
813 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
814 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
815 size_min = sizeof (struct GNUNET_MessageHeader);
816 break;
817 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
818 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
819 size_eq = sizeof (struct GNUNET_MessageHeader);
820 break;
821 default:
822 GNUNET_break_op (0);
823 recv_error (recv);
824 return GNUNET_SYSERR;
825 }
826
827 if (! ((0 < size_eq && psize == size_eq)
828 || (0 < size_min && size_min <= psize)))
829 {
830 GNUNET_break_op (0);
831 recv_error (recv);
832 return GNUNET_SYSERR;
833 }
834
835 switch (ptype)
836 {
837 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
838 {
839 struct GNUNET_PSYC_MessageMethod *meth
840 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
841
842 if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
843 {
844 LOG (GNUNET_ERROR_TYPE_WARNING,
845 "Dropping out of order message method (%u).\n",
846 recv->state);
847 /* It is normal to receive an incomplete message right after connecting,
848 * but should not happen later.
849 * FIXME: add a check for this condition.
850 */
851 GNUNET_break_op (0);
852 recv_error (recv);
853 return GNUNET_SYSERR;
854 }
855
856 if ('\0' != *((char *) meth + psize - 1))
857 {
858 LOG (GNUNET_ERROR_TYPE_WARNING,
859 "Dropping message with malformed method. "
860 "Message ID: %" PRIu64 "\n", recv->message_id);
861 GNUNET_break_op (0);
862 recv_error (recv);
863 return GNUNET_SYSERR;
864 }
865 recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
866 break;
867 }
868 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
869 {
870 if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
871 || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
872 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
873 {
874 LOG (GNUNET_ERROR_TYPE_WARNING,
875 "Dropping out of order message modifier (%u).\n",
876 recv->state);
877 GNUNET_break_op (0);
878 recv_error (recv);
879 return GNUNET_SYSERR;
880 }
881
882 struct GNUNET_PSYC_MessageModifier *mod
883 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
884
885 uint16_t name_size = ntohs (mod->name_size);
886 recv->mod_value_size_expected = ntohl (mod->value_size);
887 recv->mod_value_size = psize - sizeof (*mod) - name_size - 1;
888
889 if (psize < sizeof (*mod) + name_size + 1
890 || '\0' != *((char *) &mod[1] + name_size)
891 || recv->mod_value_size_expected < recv->mod_value_size)
892 {
893 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
894 GNUNET_break_op (0);
895 recv_error (recv);
896 return GNUNET_SYSERR;
897 }
898 recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
899 break;
900 }
901 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
902 {
903 recv->mod_value_size += psize - sizeof (*pmsg);
904
905 if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
906 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
907 || recv->mod_value_size_expected < recv->mod_value_size)
908 {
909 LOG (GNUNET_ERROR_TYPE_WARNING,
910 "Dropping out of order message modifier continuation "
911 "!(%u == %u || %u == %u) || %lu < %lu.\n",
912 GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
913 GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
914 recv->mod_value_size_expected, recv->mod_value_size);
915 GNUNET_break_op (0);
916 recv_error (recv);
917 return GNUNET_SYSERR;
918 }
919 break;
920 }
921 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
922 {
923 if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
924 || recv->mod_value_size_expected != recv->mod_value_size)
925 {
926 LOG (GNUNET_ERROR_TYPE_WARNING,
927 "Dropping out of order message data fragment "
928 "(%u < %u || %lu != %lu).\n",
929 recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
930 recv->mod_value_size_expected, recv->mod_value_size);
931
932 GNUNET_break_op (0);
933 recv_error (recv);
934 return GNUNET_SYSERR;
935 }
936 recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
937 break;
938 }
939 }
940
941 GNUNET_PSYC_MessageCallback message_cb
942 = recv->flags & GNUNET_PSYC_MESSAGE_HISTORIC
943 ? recv->hist_message_cb
944 : recv->message_cb;
945
946 if (NULL != message_cb)
947 message_cb (recv->cb_cls, recv->message_id, recv->flags, pmsg);
948
949 switch (ptype)
950 {
951 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
952 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
953 GNUNET_PSYC_receive_reset (recv);
954 break;
955 }
956 }
957 return GNUNET_OK;
958}
959
960
961/**
962 * Check if @a data contains a series of valid message parts.
963 *
964 * @param data_size Size of @a data.
965 * @param data Data.
966 * @param[out] first_ptype Type of first message part.
967 * @param[out] last_ptype Type of last message part.
968 *
969 * @return Number of message parts found in @a data.
970 * or GNUNET_SYSERR if the message contains invalid parts.
971 */
972int
973GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
974 uint16_t *first_ptype, uint16_t *last_ptype)
975{
976 const struct GNUNET_MessageHeader *pmsg;
977 uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
978 if (NULL != first_ptype)
979 *first_ptype = 0;
980 if (NULL != last_ptype)
981 *last_ptype = 0;
982
983 for (pos = 0; pos < data_size; pos += psize, parts++)
984 {
985 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
986 psize = ntohs (pmsg->size);
987 ptype = ntohs (pmsg->type);
988 if (0 == parts && NULL != first_ptype)
989 *first_ptype = ptype;
990 if (NULL != last_ptype
991 && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
992 *last_ptype = ptype;
993 if (psize < sizeof (*pmsg)
994 || pos + psize > data_size
995 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
996 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
997 {
998 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
999 "Invalid message part of type %u and size %u.\n",
1000 ptype, psize);
1001 return GNUNET_SYSERR;
1002 }
1003 /* FIXME: check message part order */
1004 }
1005 return parts;
1006}