aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-02-11 20:39:36 +0100
committerChristian Grothoff <christian@grothoff.org>2019-02-11 20:39:36 +0100
commit1f59e703d82b47f3aeaf432045a2633c2841169b (patch)
tree6af5609b388cf1906a29b5d572bec2dd8fb2ae1c /src/psyc
downloadgnunet-secushare-1f59e703d82b47f3aeaf432045a2633c2841169b.tar.gz
gnunet-secushare-1f59e703d82b47f3aeaf432045a2633c2841169b.zip
initial import from gnunet.git
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/.gitignore2
-rw-r--r--src/psyc/Makefile.am77
-rw-r--r--src/psyc/gnunet-service-psyc.c2860
-rw-r--r--src/psyc/psyc.conf.in12
-rw-r--r--src/psyc/psyc.h178
-rw-r--r--src/psyc/psyc_api.c1584
-rw-r--r--src/psyc/psyc_test_lib.h67
-rw-r--r--src/psyc/test_psyc.c1018
-rw-r--r--src/psyc/test_psyc2.c284
-rw-r--r--src/psyc/test_psyc_api_join.c282
10 files changed, 6364 insertions, 0 deletions
diff --git a/src/psyc/.gitignore b/src/psyc/.gitignore
new file mode 100644
index 0000000..14a1753
--- /dev/null
+++ b/src/psyc/.gitignore
@@ -0,0 +1,2 @@
1gnunet-service-psyc
2test_psyc
diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am
new file mode 100644
index 0000000..511e3e3
--- /dev/null
+++ b/src/psyc/Makefile.am
@@ -0,0 +1,77 @@
1# This Makefile.am is in the public domain
2AM_CPPFLAGS = -I$(top_srcdir)/src/include
3
4pkgcfgdir= $(pkgdatadir)/config.d/
5
6libexecdir= $(pkglibdir)/libexec/
7
8pkgcfg_DATA = \
9 psyc.conf
10
11
12if MINGW
13 WINFLAGS = -Wl,--no-undefined -Wl,--export-all-symbols
14endif
15
16if USE_COVERAGE
17 AM_CFLAGS = --coverage -O0
18 XLIB = -lgcov
19endif
20
21lib_LTLIBRARIES = libgnunetpsyc.la
22
23libgnunetpsyc_la_SOURCES = \
24 psyc_api.c psyc.h
25libgnunetpsyc_la_LIBADD = \
26 $(top_builddir)/src/util/libgnunetutil.la \
27 $(top_builddir)/src/psycutil/libgnunetpsycutil.la \
28 $(GN_LIBINTL) $(XLIB)
29libgnunetpsyc_la_LDFLAGS = \
30 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
31 -version-info 0:0:0
32
33bin_PROGRAMS =
34
35libexec_PROGRAMS = \
36 gnunet-service-psyc
37
38gnunet_service_psyc_SOURCES = \
39 gnunet-service-psyc.c
40gnunet_service_psyc_LDADD = \
41 $(top_builddir)/src/util/libgnunetutil.la \
42 $(top_builddir)/src/statistics/libgnunetstatistics.la \
43 $(top_builddir)/src/multicast/libgnunetmulticast.la \
44 $(top_builddir)/src/psycstore/libgnunetpsycstore.la \
45 $(top_builddir)/src/psycutil/libgnunetpsycutil.la \
46 $(GN_LIBINTL)
47gnunet_service_psyc_CFLAGS = $(AM_CFLAGS)
48
49
50if HAVE_TESTING
51check_PROGRAMS = \
52 test_psyc
53# test_psyc2
54endif
55
56if ENABLE_TEST_RUN
57AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME;
58TESTS = $(check_PROGRAMS)
59endif
60
61test_psyc_SOURCES = \
62 test_psyc.c
63test_psyc_LDADD = \
64 libgnunetpsyc.la \
65 $(top_builddir)/src/psycutil/libgnunetpsycutil.la \
66 $(top_builddir)/src/testing/libgnunettesting.la \
67 $(top_builddir)/src/util/libgnunetutil.la
68#test_psyc2_SOURCES = \
69# test_psyc2.c
70#test_psyc2_LDADD = \
71# libgnunetpsyc.la \
72# $(top_builddir)/src/psycutil/libgnunetpsycutil.la \
73# $(top_builddir)/src/testbed/libgnunettestbed.la \
74# $(top_builddir)/src/util/libgnunetutil.la
75
76EXTRA_DIST = \
77 test_psyc.conf
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
new file mode 100644
index 0000000..6f2f7a9
--- /dev/null
+++ b/src/psyc/gnunet-service-psyc.c
@@ -0,0 +1,2860 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
4 *
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file psyc/gnunet-service-psyc.c
23 * @brief PSYC service
24 * @author Gabor X Toth
25 */
26
27#include <inttypes.h>
28
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_constants.h"
32#include "gnunet_protocols.h"
33#include "gnunet_statistics_service.h"
34#include "gnunet_multicast_service.h"
35#include "gnunet_psycstore_service.h"
36#include "gnunet_psyc_service.h"
37#include "gnunet_psyc_util_lib.h"
38#include "psyc.h"
39
40
41/**
42 * Handle to our current configuration.
43 */
44static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46/**
47 * Service handle.
48 */
49static struct GNUNET_SERVICE_Handle *service;
50
51/**
52 * Handle to the statistics service.
53 */
54static struct GNUNET_STATISTICS_Handle *stats;
55
56/**
57 * Handle to the PSYCstore.
58 */
59static struct GNUNET_PSYCSTORE_Handle *store;
60
61/**
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
64 */
65static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67/**
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
70 */
71static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73/**
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76 */
77static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80/**
81 * Message in the transmission queue.
82 */
83struct TransmitMessage
84{
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
87
88 struct GNUNET_SERVICE_Client *client;
89
90 /**
91 * ID assigned to the message.
92 */
93 uint64_t id;
94
95 /**
96 * Size of message.
97 */
98 uint16_t size;
99
100 /**
101 * Type of first message part.
102 */
103 uint16_t first_ptype;
104
105 /**
106 * Type of last message part.
107 */
108 uint16_t last_ptype;
109
110 /* Followed by message */
111};
112
113
114/**
115 * Cache for received message fragments.
116 * Message fragments are only sent to clients after all modifiers arrived.
117 *
118 * chan_key -> MultiHashMap chan_msgs
119 */
120static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123/**
124 * Entry in the chan_msgs hashmap of @a recv_cache:
125 * fragment_id -> RecvCacheEntry
126 */
127struct RecvCacheEntry
128{
129 struct GNUNET_MULTICAST_MessageHeader *mmsg;
130 uint16_t ref_count;
131};
132
133
134/**
135 * Entry in the @a recv_frags hash map of a @a Channel.
136 * message_id -> FragmentQueue
137 */
138struct FragmentQueue
139{
140 /**
141 * Fragment IDs stored in @a recv_cache.
142 */
143 struct GNUNET_CONTAINER_Heap *fragments;
144
145 /**
146 * Total size of received fragments.
147 */
148 uint64_t size;
149
150 /**
151 * Total size of received header fragments (METHOD & MODIFIERs)
152 */
153 uint64_t header_size;
154
155 /**
156 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157 */
158 uint64_t state_delta;
159
160 /**
161 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162 */
163 uint32_t flags;
164
165 /**
166 * Receive state of message.
167 *
168 * @see MessageFragmentState
169 */
170 uint8_t state;
171
172 /**
173 * Whether the state is already modified in PSYCstore.
174 */
175 uint8_t state_is_modified;
176
177 /**
178 * Is the message queued for delivery to the client?
179 * i.e. added to the recv_msgs queue
180 */
181 uint8_t is_queued;
182};
183
184
185/**
186 * List of connected clients.
187 */
188struct ClientList
189{
190 struct ClientList *prev;
191 struct ClientList *next;
192
193 struct GNUNET_SERVICE_Client *client;
194};
195
196
197struct Operation
198{
199 struct Operation *prev;
200 struct Operation *next;
201
202 struct GNUNET_SERVICE_Client *client;
203 struct Channel *channel;
204 uint64_t op_id;
205 uint32_t flags;
206};
207
208
209/**
210 * Common part of the client context for both a channel master and slave.
211 */
212struct Channel
213{
214 struct ClientList *clients_head;
215 struct ClientList *clients_tail;
216
217 struct Operation *op_head;
218 struct Operation *op_tail;
219
220 struct TransmitMessage *tmit_head;
221 struct TransmitMessage *tmit_tail;
222
223 /**
224 * Current PSYCstore operation.
225 */
226 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228 /**
229 * Received fragments not yet sent to the client.
230 * message_id -> FragmentQueue
231 */
232 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234 /**
235 * Received message IDs not yet sent to the client.
236 */
237 struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239 /**
240 * Public key of the channel.
241 */
242 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244 /**
245 * Hash of @a pub_key.
246 */
247 struct GNUNET_HashCode pub_key_hash;
248
249 /**
250 * Last message ID sent to the client.
251 * 0 if there is no such message.
252 */
253 uint64_t max_message_id;
254
255 /**
256 * ID of the last stateful message, where the state operations has been
257 * processed and saved to PSYCstore and which has been sent to the client.
258 * 0 if there is no such message.
259 */
260 uint64_t max_state_message_id;
261
262 /**
263 * Expected value size for the modifier being received from the PSYC service.
264 */
265 uint32_t tmit_mod_value_size_expected;
266
267 /**
268 * Actual value size for the modifier being received from the PSYC service.
269 */
270 uint32_t tmit_mod_value_size;
271
272 /**
273 * Is this channel ready to receive messages from client?
274 * #GNUNET_YES or #GNUNET_NO
275 */
276 uint8_t is_ready;
277
278 /**
279 * Is the client disconnected?
280 * #GNUNET_YES or #GNUNET_NO
281 */
282 uint8_t is_disconnecting;
283
284 /**
285 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
286 */
287 uint8_t is_master;
288
289 union {
290 struct Master *master;
291 struct Slave *slave;
292 };
293};
294
295
296/**
297 * Client context for a channel master.
298 */
299struct Master
300{
301 /**
302 * Channel struct common for Master and Slave
303 */
304 struct Channel channel;
305
306 /**
307 * Private key of the channel.
308 */
309 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
310
311 /**
312 * Handle for the multicast origin.
313 */
314 struct GNUNET_MULTICAST_Origin *origin;
315
316 /**
317 * Transmit handle for multicast.
318 */
319 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
320
321 /**
322 * Incoming join requests from multicast.
323 * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
324 */
325 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
326
327 /**
328 * Last message ID transmitted to this channel.
329 *
330 * Incremented before sending a message, thus the message_id in messages sent
331 * starts from 1.
332 */
333 uint64_t max_message_id;
334
335 /**
336 * ID of the last message with state operations transmitted to the channel.
337 * 0 if there is no such message.
338 */
339 uint64_t max_state_message_id;
340
341 /**
342 * Maximum group generation transmitted to the channel.
343 */
344 uint64_t max_group_generation;
345
346 /**
347 * @see enum GNUNET_PSYC_Policy
348 */
349 enum GNUNET_PSYC_Policy policy;
350};
351
352
353/**
354 * Client context for a channel slave.
355 */
356struct Slave
357{
358 /**
359 * Channel struct common for Master and Slave
360 */
361 struct Channel channel;
362
363 /**
364 * Private key of the slave.
365 */
366 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
367
368 /**
369 * Public key of the slave.
370 */
371 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
372
373 /**
374 * Hash of @a pub_key.
375 */
376 struct GNUNET_HashCode pub_key_hash;
377
378 /**
379 * Handle for the multicast member.
380 */
381 struct GNUNET_MULTICAST_Member *member;
382
383 /**
384 * Transmit handle for multicast.
385 */
386 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
387
388 /**
389 * Peer identity of the origin.
390 */
391 struct GNUNET_PeerIdentity origin;
392
393 /**
394 * Number of items in @a relays.
395 */
396 uint32_t relay_count;
397
398 /**
399 * Relays that multicast can use to connect.
400 */
401 struct GNUNET_PeerIdentity *relays;
402
403 /**
404 * Join request to be transmitted to the master on join.
405 */
406 struct GNUNET_PSYC_Message *join_msg;
407
408 /**
409 * Join decision received from multicast.
410 */
411 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
412
413 /**
414 * Maximum request ID for this channel.
415 */
416 uint64_t max_request_id;
417
418 /**
419 * Join flags.
420 */
421 enum GNUNET_PSYC_SlaveJoinFlags join_flags;
422};
423
424
425/**
426 * Client context.
427 */
428struct Client {
429 struct GNUNET_SERVICE_Client *client;
430 struct Channel *channel;
431};
432
433
434struct ReplayRequestKey
435{
436 uint64_t fragment_id;
437 uint64_t message_id;
438 uint64_t fragment_offset;
439 uint64_t flags;
440};
441
442
443static void
444transmit_message (struct Channel *chn);
445
446static uint64_t
447message_queue_run (struct Channel *chn);
448
449static uint64_t
450message_queue_drop (struct Channel *chn);
451
452
453static void
454schedule_transmit_message (void *cls)
455{
456 struct Channel *chn = cls;
457
458 transmit_message (chn);
459}
460
461
462/**
463 * Task run during shutdown.
464 *
465 * @param cls unused
466 */
467static void
468shutdown_task (void *cls)
469{
470 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471 "shutting down...\n");
472 GNUNET_PSYCSTORE_disconnect (store);
473 if (NULL != stats)
474 {
475 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
476 stats = NULL;
477 }
478}
479
480
481static struct Operation *
482op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
483 uint64_t op_id, uint32_t flags)
484{
485 struct Operation *op = GNUNET_malloc (sizeof (*op));
486 op->client = client;
487 op->channel = chn;
488 op->op_id = op_id;
489 op->flags = flags;
490 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
491 return op;
492}
493
494
495static void
496op_remove (struct Operation *op)
497{
498 GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
499 GNUNET_free (op);
500}
501
502
503/**
504 * Clean up master data structures after a client disconnected.
505 */
506static void
507cleanup_master (struct Master *mst)
508{
509 struct Channel *chn = &mst->channel;
510
511 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
512 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
513}
514
515
516/**
517 * Clean up slave data structures after a client disconnected.
518 */
519static void
520cleanup_slave (struct Slave *slv)
521{
522 struct Channel *chn = &slv->channel;
523 struct GNUNET_CONTAINER_MultiHashMap *
524 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
525 &chn->pub_key_hash);
526 GNUNET_assert (NULL != chn_slv);
527 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
528
529 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
530 {
531 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
532 chn_slv);
533 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
534 }
535 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
536
537 if (NULL != slv->join_msg)
538 {
539 GNUNET_free (slv->join_msg);
540 slv->join_msg = NULL;
541 }
542 if (NULL != slv->relays)
543 {
544 GNUNET_free (slv->relays);
545 slv->relays = NULL;
546 }
547 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
548}
549
550
551/**
552 * Clean up channel data structures after a client disconnected.
553 */
554static void
555cleanup_channel (struct Channel *chn)
556{
557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558 "%p Cleaning up channel %s. master? %u\n",
559 chn,
560 GNUNET_h2s (&chn->pub_key_hash),
561 chn->is_master);
562 message_queue_drop (chn);
563 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
564 chn->recv_frags = NULL;
565
566 if (NULL != chn->store_op)
567 {
568 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
569 chn->store_op = NULL;
570 }
571
572 (GNUNET_YES == chn->is_master)
573 ? cleanup_master (chn->master)
574 : cleanup_slave (chn->slave);
575 GNUNET_free (chn);
576}
577
578
579/**
580 * Called whenever a client is disconnected.
581 * Frees our resources associated with that client.
582 *
583 * @param cls closure
584 * @param client identification of the client
585 * @param app_ctx must match @a client
586 */
587static void
588client_notify_disconnect (void *cls,
589 struct GNUNET_SERVICE_Client *client,
590 void *app_ctx)
591{
592 struct Client *c = app_ctx;
593 struct Channel *chn = c->channel;
594 GNUNET_free (c);
595
596 if (NULL == chn)
597 {
598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599 "%p User context is NULL in client_notify_disconnect ()\n",
600 chn);
601 GNUNET_break (0);
602 return;
603 }
604
605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606 "%p Client %p (%s) disconnected from channel %s\n",
607 chn,
608 client,
609 (GNUNET_YES == chn->is_master) ? "master" : "slave",
610 GNUNET_h2s (&chn->pub_key_hash));
611
612 struct ClientList *cli = chn->clients_head;
613 while (NULL != cli)
614 {
615 if (cli->client == client)
616 {
617 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
618 GNUNET_free (cli);
619 break;
620 }
621 cli = cli->next;
622 }
623
624 struct Operation *op = chn->op_head;
625 while (NULL != op)
626 {
627 if (op->client == client)
628 {
629 op->client = NULL;
630 break;
631 }
632 op = op->next;
633 }
634
635 if (NULL == chn->clients_head)
636 { /* Last client disconnected. */
637 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
638 "%p Last client (%s) disconnected from channel %s\n",
639 chn,
640 (GNUNET_YES == chn->is_master) ? "master" : "slave",
641 GNUNET_h2s (&chn->pub_key_hash));
642 chn->is_disconnecting = GNUNET_YES;
643 cleanup_channel (chn);
644 }
645}
646
647
648/**
649 * A new client connected.
650 *
651 * @param cls NULL
652 * @param client client to add
653 * @param mq message queue for @a client
654 * @return @a client
655 */
656static void *
657client_notify_connect (void *cls,
658 struct GNUNET_SERVICE_Client *client,
659 struct GNUNET_MQ_Handle *mq)
660{
661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
662
663 struct Client *c = GNUNET_malloc (sizeof (*c));
664 c->client = client;
665
666 return c;
667}
668
669
670/**
671 * Send message to all clients connected to the channel.
672 */
673static void
674client_send_msg (const struct Channel *chn,
675 const struct GNUNET_MessageHeader *msg)
676{
677 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
678 "Sending message to clients of channel %p.\n",
679 chn);
680
681 struct ClientList *cli = chn->clients_head;
682 while (NULL != cli)
683 {
684 struct GNUNET_MQ_Envelope *
685 env = GNUNET_MQ_msg_copy (msg);
686
687 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
688 env);
689 cli = cli->next;
690 }
691}
692
693
694/**
695 * Send a result code back to the client.
696 *
697 * @param client
698 * Client that should receive the result code.
699 * @param result_code
700 * Code to transmit.
701 * @param op_id
702 * Operation ID in network byte order.
703 * @param data
704 * Data payload or NULL.
705 * @param data_size
706 * Size of @a data.
707 */
708static void
709client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
710 int64_t result_code, const void *data, uint16_t data_size)
711{
712 struct GNUNET_OperationResultMessage *res;
713 struct GNUNET_MQ_Envelope *
714 env = GNUNET_MQ_msg_extra (res,
715 data_size,
716 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
717 res->result_code = GNUNET_htonll (result_code);
718 res->op_id = op_id;
719 if (0 < data_size)
720 GNUNET_memcpy (&res[1], data, data_size);
721
722 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
723 "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n",
724 client,
725 GNUNET_ntohll (op_id),
726 result_code,
727 data_size);
728
729 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
730}
731
732
733/**
734 * Closure for join_mem_test_cb()
735 */
736struct JoinMemTestClosure
737{
738 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
739 struct Channel *channel;
740 struct GNUNET_MULTICAST_JoinHandle *join_handle;
741 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
742};
743
744
745/**
746 * Membership test result callback used for join requests.
747 */
748static void
749join_mem_test_cb (void *cls, int64_t result,
750 const char *err_msg, uint16_t err_msg_size)
751{
752 struct JoinMemTestClosure *jcls = cls;
753
754 if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
755 { /* Pass on join request to client if this is a master channel */
756 struct Master *mst = jcls->channel->master;
757 struct GNUNET_HashCode slave_pub_hash;
758 GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
759 &slave_pub_hash);
760 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
761 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
762 client_send_msg (jcls->channel, &jcls->join_msg->header);
763 }
764 else
765 {
766 if (GNUNET_SYSERR == result)
767 {
768 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
769 "Could not perform membership test (%.*s)\n",
770 err_msg_size, err_msg);
771 }
772 // FIXME: add relays
773 GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
774 }
775 GNUNET_free (jcls->join_msg);
776 GNUNET_free (jcls);
777}
778
779
780/**
781 * Incoming join request from multicast.
782 */
783static void
784mcast_recv_join_request (void *cls,
785 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
786 const struct GNUNET_MessageHeader *join_msg,
787 struct GNUNET_MULTICAST_JoinHandle *jh)
788{
789 struct Channel *chn = cls;
790 uint16_t join_msg_size = 0;
791
792 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
793 "%p Got join request.\n",
794 chn);
795 if (NULL != join_msg)
796 {
797 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
798 {
799 join_msg_size = ntohs (join_msg->size);
800 }
801 else
802 {
803 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
804 "%p Got join message with invalid type %u.\n",
805 chn,
806 ntohs (join_msg->type));
807 }
808 }
809
810 struct GNUNET_PSYC_JoinRequestMessage *
811 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
812 req->header.size = htons (sizeof (*req) + join_msg_size);
813 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
814 req->slave_pub_key = *slave_pub_key;
815 if (0 < join_msg_size)
816 GNUNET_memcpy (&req[1], join_msg, join_msg_size);
817
818 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
819 jcls->slave_pub_key = *slave_pub_key;
820 jcls->channel = chn;
821 jcls->join_handle = jh;
822 jcls->join_msg = req;
823
824 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
825 chn->max_message_id, 0,
826 &join_mem_test_cb, jcls);
827}
828
829
830/**
831 * Join decision received from multicast.
832 */
833static void
834mcast_recv_join_decision (void *cls, int is_admitted,
835 const struct GNUNET_PeerIdentity *peer,
836 uint16_t relay_count,
837 const struct GNUNET_PeerIdentity *relays,
838 const struct GNUNET_MessageHeader *join_resp)
839{
840 struct Slave *slv = cls;
841 struct Channel *chn = &slv->channel;
842 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843 "%p Got join decision: %d\n",
844 slv,
845 is_admitted);
846 if (GNUNET_YES == chn->is_ready)
847 {
848 /* Already admitted */
849 return;
850 }
851
852 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
853 struct GNUNET_PSYC_JoinDecisionMessage *
854 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
855 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
856 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
857 dcsn->is_admitted = htonl (is_admitted);
858 if (0 < join_resp_size)
859 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
860
861 client_send_msg (chn, &dcsn->header);
862
863 if (GNUNET_YES == is_admitted
864 && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
865 {
866 chn->is_ready = GNUNET_YES;
867 }
868}
869
870
871static int
872store_recv_fragment_replay (void *cls,
873 struct GNUNET_MULTICAST_MessageHeader *msg,
874 enum GNUNET_PSYCSTORE_MessageFlags flags)
875{
876 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
877
878 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
879 return GNUNET_YES;
880}
881
882
883/**
884 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
885 */
886static void
887store_recv_fragment_replay_result (void *cls,
888 int64_t result,
889 const char *err_msg,
890 uint16_t err_msg_size)
891{
892 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
893
894 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
896 rh,
897 result,
898 err_msg_size,
899 err_msg);
900 switch (result)
901 {
902 case GNUNET_YES:
903 break;
904
905 case GNUNET_NO:
906 GNUNET_MULTICAST_replay_response (rh, NULL,
907 GNUNET_MULTICAST_REC_NOT_FOUND);
908 return;
909
910 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
911 GNUNET_MULTICAST_replay_response (rh, NULL,
912 GNUNET_MULTICAST_REC_ACCESS_DENIED);
913 return;
914
915 case GNUNET_SYSERR:
916 GNUNET_MULTICAST_replay_response (rh, NULL,
917 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
918 return;
919 }
920 /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
921 * an error code, so it must be ensured no further processing
922 * is attempted on 'rh'. Maybe this should be refactored as
923 * it doesn't look very intuitive. --lynX
924 */
925 GNUNET_MULTICAST_replay_response_end (rh);
926}
927
928
929/**
930 * Incoming fragment replay request from multicast.
931 */
932static void
933mcast_recv_replay_fragment (void *cls,
934 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
935 uint64_t fragment_id, uint64_t flags,
936 struct GNUNET_MULTICAST_ReplayHandle *rh)
937
938{
939 struct Channel *chn = cls;
940 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
941 fragment_id, fragment_id,
942 &store_recv_fragment_replay,
943 &store_recv_fragment_replay_result, rh);
944}
945
946
947/**
948 * Incoming message replay request from multicast.
949 */
950static void
951mcast_recv_replay_message (void *cls,
952 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
953 uint64_t message_id,
954 uint64_t fragment_offset,
955 uint64_t flags,
956 struct GNUNET_MULTICAST_ReplayHandle *rh)
957{
958 struct Channel *chn = cls;
959 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
960 message_id, message_id, 1, NULL,
961 &store_recv_fragment_replay,
962 &store_recv_fragment_replay_result, rh);
963}
964
965
966/**
967 * Convert an uint64_t in network byte order to a HashCode
968 * that can be used as key in a MultiHashMap
969 */
970static inline void
971hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
972{
973 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
974 /* TODO: use built-in byte swap functions if available */
975
976 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
977 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
978
979 *key = (struct GNUNET_HashCode) {};
980 *((uint64_t *) key)
981 = (n << 32) | (n >> 32);
982}
983
984
985/**
986 * Convert an uint64_t in host byte order to a HashCode
987 * that can be used as key in a MultiHashMap
988 */
989static inline void
990hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
991{
992#if __BYTE_ORDER == __BIG_ENDIAN
993 hash_key_from_nll (key, n);
994#elif __BYTE_ORDER == __LITTLE_ENDIAN
995 *key = (struct GNUNET_HashCode) {};
996 *((uint64_t *) key) = n;
997#else
998 #error byteorder undefined
999#endif
1000}
1001
1002
1003/**
1004 * Initialize PSYC message header.
1005 */
1006static inline void
1007psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1008 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1009{
1010 uint16_t size = ntohs (mmsg->header.size);
1011 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1012
1013 pmsg->header.size = htons (psize);
1014 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1015 pmsg->message_id = mmsg->message_id;
1016 pmsg->fragment_offset = mmsg->fragment_offset;
1017 pmsg->flags = htonl (flags);
1018
1019 GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1020}
1021
1022
1023/**
1024 * Create a new PSYC message from a multicast message for sending it to clients.
1025 */
1026static inline struct GNUNET_PSYC_MessageHeader *
1027psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1028{
1029 struct GNUNET_PSYC_MessageHeader *pmsg;
1030 uint16_t size = ntohs (mmsg->header.size);
1031 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1032
1033 pmsg = GNUNET_malloc (psize);
1034 psyc_msg_init (pmsg, mmsg, flags);
1035 return pmsg;
1036}
1037
1038
1039/**
1040 * Send multicast message to all clients connected to the channel.
1041 */
1042static void
1043client_send_mcast_msg (struct Channel *chn,
1044 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1045 uint32_t flags)
1046{
1047 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1048 "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1049 chn,
1050 GNUNET_ntohll (mmsg->fragment_id),
1051 GNUNET_ntohll (mmsg->message_id));
1052
1053 struct GNUNET_PSYC_MessageHeader *
1054 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1055 client_send_msg (chn, &pmsg->header);
1056 GNUNET_free (pmsg);
1057}
1058
1059
1060/**
1061 * Send multicast request to all clients connected to the channel.
1062 */
1063static void
1064client_send_mcast_req (struct Master *mst,
1065 const struct GNUNET_MULTICAST_RequestHeader *req)
1066{
1067 struct Channel *chn = &mst->channel;
1068
1069 struct GNUNET_PSYC_MessageHeader *pmsg;
1070 uint16_t size = ntohs (req->header.size);
1071 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1072
1073 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074 "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1075 chn,
1076 GNUNET_ntohll (req->fragment_id),
1077 GNUNET_ntohll (req->request_id));
1078
1079 pmsg = GNUNET_malloc (psize);
1080 pmsg->header.size = htons (psize);
1081 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1082 pmsg->message_id = req->request_id;
1083 pmsg->fragment_offset = req->fragment_offset;
1084 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1085 pmsg->slave_pub_key = req->member_pub_key;
1086 GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1087
1088 client_send_msg (chn, &pmsg->header);
1089
1090 /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1091
1092 GNUNET_free (pmsg);
1093}
1094
1095
1096/**
1097 * Insert a multicast message fragment into the queue belonging to the message.
1098 *
1099 * @param chn Channel.
1100 * @param mmsg Multicast message fragment.
1101 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
1102 * @param first_ptype First PSYC message part type in @a mmsg.
1103 * @param last_ptype Last PSYC message part type in @a mmsg.
1104 */
1105static void
1106fragment_queue_insert (struct Channel *chn,
1107 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1108 uint16_t first_ptype, uint16_t last_ptype)
1109{
1110 const uint16_t size = ntohs (mmsg->header.size);
1111 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1112 struct GNUNET_CONTAINER_MultiHashMap
1113 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1114 &chn->pub_key_hash);
1115
1116 struct GNUNET_HashCode msg_id_hash;
1117 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1118
1119 struct FragmentQueue
1120 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1121
1122 if (NULL == fragq)
1123 {
1124 fragq = GNUNET_malloc (sizeof (*fragq));
1125 fragq->state = MSG_FRAG_STATE_HEADER;
1126 fragq->fragments
1127 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1128
1129 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1130 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1131
1132 if (NULL == chan_msgs)
1133 {
1134 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1135 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1136 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1137 }
1138 }
1139
1140 struct GNUNET_HashCode frag_id_hash;
1141 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1142 struct RecvCacheEntry
1143 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1144 if (NULL == cache_entry)
1145 {
1146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1148 chn,
1149 GNUNET_ntohll (mmsg->message_id),
1150 GNUNET_ntohll (mmsg->fragment_id));
1151 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152 "%p header_size: %" PRIu64 " + %u\n",
1153 chn,
1154 fragq->header_size,
1155 size);
1156 cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1157 cache_entry->ref_count = 1;
1158 cache_entry->mmsg = GNUNET_malloc (size);
1159 GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1160 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1161 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1162 }
1163 else
1164 {
1165 cache_entry->ref_count++;
1166 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1167 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1168 chn,
1169 GNUNET_ntohll (mmsg->message_id),
1170 GNUNET_ntohll (mmsg->fragment_id),
1171 cache_entry->ref_count);
1172 }
1173
1174 if (MSG_FRAG_STATE_HEADER == fragq->state)
1175 {
1176 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1177 {
1178 struct GNUNET_PSYC_MessageMethod *
1179 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1180 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1181 fragq->flags = ntohl (pmeth->flags);
1182 }
1183
1184 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1185 {
1186 fragq->header_size += size;
1187 }
1188 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1189 || frag_offset == fragq->header_size)
1190 { /* header is now complete */
1191 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192 "%p Header of message %" PRIu64 " is complete.\n",
1193 chn,
1194 GNUNET_ntohll (mmsg->message_id));
1195
1196 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197 "%p Adding message %" PRIu64 " to queue.\n",
1198 chn,
1199 GNUNET_ntohll (mmsg->message_id));
1200 fragq->state = MSG_FRAG_STATE_DATA;
1201 }
1202 else
1203 {
1204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205 "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1206 chn,
1207 GNUNET_ntohll (mmsg->message_id),
1208 frag_offset,
1209 fragq->header_size);
1210 }
1211 }
1212
1213 switch (last_ptype)
1214 {
1215 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1216 if (frag_offset == fragq->size)
1217 fragq->state = MSG_FRAG_STATE_END;
1218 else
1219 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1220 "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1221 chn,
1222 GNUNET_ntohll (mmsg->message_id),
1223 frag_offset,
1224 fragq->size);
1225 break;
1226
1227 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1228 /* Drop message without delivering to client if it's a single fragment */
1229 fragq->state =
1230 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1231 ? MSG_FRAG_STATE_DROP
1232 : MSG_FRAG_STATE_CANCEL;
1233 }
1234
1235 switch (fragq->state)
1236 {
1237 case MSG_FRAG_STATE_DATA:
1238 case MSG_FRAG_STATE_END:
1239 case MSG_FRAG_STATE_CANCEL:
1240 if (GNUNET_NO == fragq->is_queued)
1241 {
1242 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1243 GNUNET_ntohll (mmsg->message_id));
1244 fragq->is_queued = GNUNET_YES;
1245 }
1246 }
1247
1248 fragq->size += size;
1249 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1250 GNUNET_ntohll (mmsg->fragment_id));
1251}
1252
1253
1254/**
1255 * Run fragment queue of a message.
1256 *
1257 * Send fragments of a message in order to client, after all modifiers arrived
1258 * from multicast.
1259 *
1260 * @param chn
1261 * Channel.
1262 * @param msg_id
1263 * ID of the message @a fragq belongs to.
1264 * @param fragq
1265 * Fragment queue of the message.
1266 * @param drop
1267 * Drop message without delivering to client?
1268 * #GNUNET_YES or #GNUNET_NO.
1269 */
1270static void
1271fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1272 struct FragmentQueue *fragq, uint8_t drop)
1273{
1274 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1275 "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1276 chn,
1277 msg_id,
1278 fragq->state);
1279
1280 struct GNUNET_CONTAINER_MultiHashMap
1281 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1282 &chn->pub_key_hash);
1283 GNUNET_assert (NULL != chan_msgs);
1284 uint64_t frag_id;
1285
1286 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1287 &frag_id))
1288 {
1289 struct GNUNET_HashCode frag_id_hash;
1290 hash_key_from_hll (&frag_id_hash, frag_id);
1291 struct RecvCacheEntry *cache_entry
1292 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1293 if (cache_entry != NULL)
1294 {
1295 if (GNUNET_NO == drop)
1296 {
1297 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1298 }
1299 if (cache_entry->ref_count <= 1)
1300 {
1301 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1302 cache_entry);
1303 GNUNET_free (cache_entry->mmsg);
1304 GNUNET_free (cache_entry);
1305 }
1306 else
1307 {
1308 cache_entry->ref_count--;
1309 }
1310 }
1311#if CACHE_AGING_IMPLEMENTED
1312 else if (GNUNET_NO == drop)
1313 {
1314 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1315 }
1316#endif
1317
1318 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1319 }
1320
1321 if (MSG_FRAG_STATE_END <= fragq->state)
1322 {
1323 struct GNUNET_HashCode msg_id_hash;
1324 hash_key_from_hll (&msg_id_hash, msg_id);
1325
1326 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1327 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1328 GNUNET_free (fragq);
1329 }
1330 else
1331 {
1332 fragq->is_queued = GNUNET_NO;
1333 }
1334}
1335
1336
1337struct StateModifyClosure
1338{
1339 struct Channel *channel;
1340 uint64_t msg_id;
1341 struct GNUNET_HashCode msg_id_hash;
1342};
1343
1344
1345void
1346store_recv_state_modify_result (void *cls, int64_t result,
1347 const char *err_msg, uint16_t err_msg_size)
1348{
1349 struct StateModifyClosure *mcls = cls;
1350 struct Channel *chn = mcls->channel;
1351 uint64_t msg_id = mcls->msg_id;
1352
1353 struct FragmentQueue *
1354 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1355
1356 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1357 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1358 chn, result, err_msg_size, err_msg);
1359
1360 switch (result)
1361 {
1362 case GNUNET_OK:
1363 case GNUNET_NO:
1364 if (NULL != fragq)
1365 fragq->state_is_modified = GNUNET_YES;
1366 if (chn->max_state_message_id < msg_id)
1367 chn->max_state_message_id = msg_id;
1368 if (chn->max_message_id < msg_id)
1369 chn->max_message_id = msg_id;
1370
1371 if (NULL != fragq)
1372 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1373 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1374 message_queue_run (chn);
1375 break;
1376
1377 default:
1378 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1379 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1380 chn, result, err_msg_size, err_msg);
1381 /** @todo FIXME: handle state_modify error */
1382 }
1383}
1384
1385
1386/**
1387 * Run message queue.
1388 *
1389 * Send messages in queue to client in order after a message has arrived from
1390 * multicast, according to the following:
1391 * - A message is only sent if all of its modifiers arrived.
1392 * - A stateful message is only sent if the previous stateful message
1393 * has already been delivered to the client.
1394 *
1395 * @param chn Channel.
1396 *
1397 * @return Number of messages removed from queue and sent to client.
1398 */
1399static uint64_t
1400message_queue_run (struct Channel *chn)
1401{
1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403 "%p Running message queue.\n", chn);
1404 uint64_t n = 0;
1405 uint64_t msg_id;
1406
1407 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1408 &msg_id))
1409 {
1410 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1411 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1412 struct GNUNET_HashCode msg_id_hash;
1413 hash_key_from_hll (&msg_id_hash, msg_id);
1414
1415 struct FragmentQueue *
1416 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1417
1418 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1419 {
1420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1421 "%p No fragq (%p) or header not complete.\n",
1422 chn, fragq);
1423 break;
1424 }
1425
1426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1427 "%p Fragment queue entry: state: %u, state delta: "
1428 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1429 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1430
1431 if (MSG_FRAG_STATE_DATA <= fragq->state)
1432 {
1433 /* Check if there's a missing message before the current one */
1434 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1435 {
1436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1437
1438 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1439 && (chn->max_message_id != msg_id - 1
1440 && chn->max_message_id != msg_id))
1441 {
1442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1443 "%p Out of order message. "
1444 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1445 chn, chn->max_message_id, msg_id);
1446 break;
1447 // FIXME: keep track of messages processed in this queue run,
1448 // and only stop after reaching the end
1449 }
1450 }
1451 else
1452 {
1453 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1454 if (GNUNET_YES != fragq->state_is_modified)
1455 {
1456 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1457 {
1458 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1459 "%p Out of order stateful message. "
1460 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1461 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1462 break;
1463 // FIXME: keep track of messages processed in this queue run,
1464 // and only stop after reaching the end
1465 }
1466
1467 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1468 mcls->channel = chn;
1469 mcls->msg_id = msg_id;
1470 mcls->msg_id_hash = msg_id_hash;
1471
1472 /* Apply modifiers to state in PSYCstore */
1473 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1474 fragq->state_delta,
1475 store_recv_state_modify_result, mcls);
1476 break; // continue after asynchronous state modify result
1477 }
1478 }
1479 chn->max_message_id = msg_id;
1480 }
1481 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1482 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1483 n++;
1484 }
1485
1486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1487 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1488 return n;
1489}
1490
1491
1492/**
1493 * Drop message queue of a channel.
1494 *
1495 * Remove all messages in queue without sending it to clients.
1496 *
1497 * @param chn Channel.
1498 *
1499 * @return Number of messages removed from queue.
1500 */
1501static uint64_t
1502message_queue_drop (struct Channel *chn)
1503{
1504 uint64_t n = 0;
1505 uint64_t msg_id;
1506 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1507 &msg_id))
1508 {
1509 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1510 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1511 struct GNUNET_HashCode msg_id_hash;
1512 hash_key_from_hll (&msg_id_hash, msg_id);
1513
1514 struct FragmentQueue *
1515 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1516 GNUNET_assert (NULL != fragq);
1517 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1518 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1519 n++;
1520 }
1521 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1522 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1523 return n;
1524}
1525
1526
1527/**
1528 * Received result of GNUNET_PSYCSTORE_fragment_store().
1529 */
1530static void
1531store_recv_fragment_store_result (void *cls, int64_t result,
1532 const char *err_msg, uint16_t err_msg_size)
1533{
1534 struct Channel *chn = cls;
1535 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1537 chn, result, err_msg_size, err_msg);
1538}
1539
1540
1541/**
1542 * Handle incoming message fragment from multicast.
1543 *
1544 * Store it using PSYCstore and send it to the clients of the channel in order.
1545 */
1546static void
1547mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1548{
1549 struct Channel *chn = cls;
1550 uint16_t size = ntohs (mmsg->header.size);
1551
1552 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1553 "%p Received multicast message of size %u. "
1554 "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1555 ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1556 chn, size,
1557 GNUNET_ntohll (mmsg->fragment_id),
1558 GNUNET_ntohll (mmsg->message_id),
1559 GNUNET_ntohll (mmsg->fragment_offset),
1560 GNUNET_ntohll (mmsg->flags));
1561
1562 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1563 &store_recv_fragment_store_result, chn);
1564
1565 uint16_t first_ptype = 0, last_ptype = 0;
1566 int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1567 (const char *) &mmsg[1],
1568 &first_ptype, &last_ptype);
1569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1570 "%p Message check result %d, first part type %u, last part type %u\n",
1571 chn, check, first_ptype, last_ptype);
1572 if (GNUNET_SYSERR == check)
1573 {
1574 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1575 "%p Dropping incoming multicast message with invalid parts.\n",
1576 chn);
1577 GNUNET_break_op (0);
1578 return;
1579 }
1580
1581 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1582 message_queue_run (chn);
1583}
1584
1585
1586/**
1587 * Incoming request fragment from multicast for a master.
1588 *
1589 * @param cls Master.
1590 * @param req The request.
1591 */
1592static void
1593mcast_recv_request (void *cls,
1594 const struct GNUNET_MULTICAST_RequestHeader *req)
1595{
1596 struct Master *mst = cls;
1597 uint16_t size = ntohs (req->header.size);
1598
1599 char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1600 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1601 "%p Received multicast request of size %u from %s.\n",
1602 mst, size, str);
1603 GNUNET_free (str);
1604
1605 uint16_t first_ptype = 0, last_ptype = 0;
1606 if (GNUNET_SYSERR
1607 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1608 (const char *) &req[1],
1609 &first_ptype, &last_ptype))
1610 {
1611 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1612 "%p Dropping incoming multicast request with invalid parts.\n",
1613 mst);
1614 GNUNET_break_op (0);
1615 return;
1616 }
1617
1618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1619 "Message parts: first: type %u, last: type %u\n",
1620 first_ptype, last_ptype);
1621
1622 /* FIXME: in-order delivery */
1623 client_send_mcast_req (mst, req);
1624}
1625
1626
1627/**
1628 * Response from PSYCstore with the current counter values for a channel master.
1629 */
1630static void
1631store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1632 uint64_t max_message_id, uint64_t max_group_generation,
1633 uint64_t max_state_message_id)
1634{
1635 struct Master *mst = cls;
1636 struct Channel *chn = &mst->channel;
1637 chn->store_op = NULL;
1638
1639 struct GNUNET_PSYC_CountersResultMessage res;
1640 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1641 res.header.size = htons (sizeof (res));
1642 res.result_code = htonl (result);
1643 res.max_message_id = GNUNET_htonll (max_message_id);
1644
1645 if (GNUNET_OK == result || GNUNET_NO == result)
1646 {
1647 mst->max_message_id = max_message_id;
1648 chn->max_message_id = max_message_id;
1649 chn->max_state_message_id = max_state_message_id;
1650 mst->max_group_generation = max_group_generation;
1651 mst->origin
1652 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1653 mcast_recv_join_request,
1654 mcast_recv_replay_fragment,
1655 mcast_recv_replay_message,
1656 mcast_recv_request,
1657 mcast_recv_message, chn);
1658 chn->is_ready = GNUNET_YES;
1659 }
1660 else
1661 {
1662 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1663 "%p GNUNET_PSYCSTORE_counters_get() "
1664 "returned %d for channel %s.\n",
1665 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1666 }
1667
1668 client_send_msg (chn, &res.header);
1669}
1670
1671
1672/**
1673 * Response from PSYCstore with the current counter values for a channel slave.
1674 */
1675void
1676store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1677 uint64_t max_message_id, uint64_t max_group_generation,
1678 uint64_t max_state_message_id)
1679{
1680 struct Slave *slv = cls;
1681 struct Channel *chn = &slv->channel;
1682 chn->store_op = NULL;
1683
1684 struct GNUNET_PSYC_CountersResultMessage res;
1685 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1686 res.header.size = htons (sizeof (res));
1687 res.result_code = htonl (result);
1688 res.max_message_id = GNUNET_htonll (max_message_id);
1689
1690 if (GNUNET_YES == result || GNUNET_NO == result)
1691 {
1692 chn->max_message_id = max_message_id;
1693 chn->max_state_message_id = max_state_message_id;
1694 slv->member
1695 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1696 &slv->origin,
1697 slv->relay_count, slv->relays,
1698 &slv->join_msg->header,
1699 mcast_recv_join_request,
1700 mcast_recv_join_decision,
1701 mcast_recv_replay_fragment,
1702 mcast_recv_replay_message,
1703 mcast_recv_message, chn);
1704 if (NULL != slv->join_msg)
1705 {
1706 GNUNET_free (slv->join_msg);
1707 slv->join_msg = NULL;
1708 }
1709 }
1710 else
1711 {
1712 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1713 "%p GNUNET_PSYCSTORE_counters_get() "
1714 "returned %d for channel %s.\n",
1715 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1716 }
1717
1718 client_send_msg (chn, &res.header);
1719}
1720
1721
1722static void
1723channel_init (struct Channel *chn)
1724{
1725 chn->recv_msgs
1726 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1727 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1728}
1729
1730
1731/**
1732 * Handle a connecting client starting a channel master.
1733 */
1734static void
1735handle_client_master_start (void *cls,
1736 const struct MasterStartRequest *req)
1737{
1738 struct Client *c = cls;
1739 struct GNUNET_SERVICE_Client *client = c->client;
1740
1741 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1742 struct GNUNET_HashCode pub_key_hash;
1743
1744 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1745 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1746
1747 struct Master *
1748 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1749 struct Channel *chn;
1750
1751 if (NULL == mst)
1752 {
1753 mst = GNUNET_malloc (sizeof (*mst));
1754 mst->policy = ntohl (req->policy);
1755 mst->priv_key = req->channel_key;
1756 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1757
1758 chn = c->channel = &mst->channel;
1759 chn->master = mst;
1760 chn->is_master = GNUNET_YES;
1761 chn->pub_key = pub_key;
1762 chn->pub_key_hash = pub_key_hash;
1763 channel_init (chn);
1764
1765 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1766 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1767 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1768 store_recv_master_counters, mst);
1769 }
1770 else
1771 {
1772 chn = &mst->channel;
1773
1774 struct GNUNET_PSYC_CountersResultMessage *res;
1775 struct GNUNET_MQ_Envelope *
1776 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1777 res->result_code = htonl (GNUNET_OK);
1778 res->max_message_id = GNUNET_htonll (mst->max_message_id);
1779
1780 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1781 }
1782
1783 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1784 "%p Client connected as master to channel %s.\n",
1785 mst, GNUNET_h2s (&chn->pub_key_hash));
1786
1787 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1788 cli->client = client;
1789 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1790
1791 GNUNET_SERVICE_client_continue (client);
1792}
1793
1794
1795static int
1796check_client_slave_join (void *cls,
1797 const struct SlaveJoinRequest *req)
1798{
1799 return GNUNET_OK;
1800}
1801
1802
1803/**
1804 * Handle a connecting client joining as a channel slave.
1805 */
1806static void
1807handle_client_slave_join (void *cls,
1808 const struct SlaveJoinRequest *req)
1809{
1810 struct Client *c = cls;
1811 struct GNUNET_SERVICE_Client *client = c->client;
1812
1813 uint16_t req_size = ntohs (req->header.size);
1814
1815 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1816 struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1817
1818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1819 "got join request from client %p\n",
1820 client);
1821 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1822 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1823 GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1824
1825 struct GNUNET_CONTAINER_MultiHashMap *
1826 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1827 struct Slave *slv = NULL;
1828 struct Channel *chn;
1829
1830 if (NULL != chn_slv)
1831 {
1832 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1833 }
1834 if (NULL == slv)
1835 {
1836 slv = GNUNET_malloc (sizeof (*slv));
1837 slv->priv_key = req->slave_key;
1838 slv->pub_key = slv_pub_key;
1839 slv->pub_key_hash = slv_pub_hash;
1840 slv->origin = req->origin;
1841 slv->relay_count = ntohl (req->relay_count);
1842 slv->join_flags = ntohl (req->flags);
1843
1844 const struct GNUNET_PeerIdentity *
1845 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1846 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1847 uint16_t join_msg_size = 0;
1848
1849 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1850 <= req_size)
1851 {
1852 struct GNUNET_PSYC_Message *
1853 join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1854 join_msg_size = ntohs (join_msg->header.size);
1855 slv->join_msg = GNUNET_malloc (join_msg_size);
1856 GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1857 }
1858 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1859 {
1860 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1861 "%u + %u + %u != %u\n",
1862 (unsigned int) sizeof (*req),
1863 relay_size,
1864 join_msg_size,
1865 req_size);
1866 GNUNET_break (0);
1867 GNUNET_SERVICE_client_drop (client);
1868 GNUNET_free (slv);
1869 return;
1870 }
1871 if (0 < slv->relay_count)
1872 {
1873 slv->relays = GNUNET_malloc (relay_size);
1874 GNUNET_memcpy (slv->relays, &req[1], relay_size);
1875 }
1876
1877 chn = c->channel = &slv->channel;
1878 chn->slave = slv;
1879 chn->is_master = GNUNET_NO;
1880 chn->pub_key = req->channel_pub_key;
1881 chn->pub_key_hash = pub_key_hash;
1882 channel_init (chn);
1883
1884 if (NULL == chn_slv)
1885 {
1886 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1887 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1888 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1889 }
1890 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1891 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1892 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1893 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1894 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1895 &store_recv_slave_counters, slv);
1896 }
1897 else
1898 {
1899 chn = &slv->channel;
1900
1901 struct GNUNET_PSYC_CountersResultMessage *res;
1902
1903 struct GNUNET_MQ_Envelope *
1904 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1905 res->result_code = htonl (GNUNET_OK);
1906 res->max_message_id = GNUNET_htonll (chn->max_message_id);
1907
1908 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1909
1910 if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1911 {
1912 mcast_recv_join_decision (slv, GNUNET_YES,
1913 NULL, 0, NULL, NULL);
1914 }
1915 else if (NULL == slv->member)
1916 {
1917 slv->member
1918 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1919 &slv->origin,
1920 slv->relay_count, slv->relays,
1921 &slv->join_msg->header,
1922 &mcast_recv_join_request,
1923 &mcast_recv_join_decision,
1924 &mcast_recv_replay_fragment,
1925 &mcast_recv_replay_message,
1926 &mcast_recv_message, chn);
1927 if (NULL != slv->join_msg)
1928 {
1929 GNUNET_free (slv->join_msg);
1930 slv->join_msg = NULL;
1931 }
1932 }
1933 else if (NULL != slv->join_dcsn)
1934 {
1935 struct GNUNET_MQ_Envelope *
1936 env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1937 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1938 }
1939 }
1940
1941 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1942 "Client %p connected as slave to channel %s.\n",
1943 client,
1944 GNUNET_h2s (&chn->pub_key_hash));
1945
1946 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1947 cli->client = client;
1948 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1949
1950 GNUNET_SERVICE_client_continue (client);
1951}
1952
1953
1954struct JoinDecisionClosure
1955{
1956 int32_t is_admitted;
1957 struct GNUNET_MessageHeader *msg;
1958};
1959
1960
1961/**
1962 * Iterator callback for sending join decisions to multicast.
1963 */
1964static int
1965mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1966 void *value)
1967{
1968 struct JoinDecisionClosure *jcls = cls;
1969 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1970 // FIXME: add relays
1971 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1972 return GNUNET_YES;
1973}
1974
1975
1976static int
1977check_client_join_decision (void *cls,
1978 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1979{
1980 return GNUNET_OK;
1981}
1982
1983
1984/**
1985 * Join decision from client.
1986 */
1987static void
1988handle_client_join_decision (void *cls,
1989 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1990{
1991 struct Client *c = cls;
1992 struct GNUNET_SERVICE_Client *client = c->client;
1993 struct Channel *chn = c->channel;
1994 if (NULL == chn)
1995 {
1996 GNUNET_break (0);
1997 GNUNET_SERVICE_client_drop (client);
1998 return;
1999 }
2000 GNUNET_assert (GNUNET_YES == chn->is_master);
2001 struct Master *mst = chn->master;
2002
2003 struct JoinDecisionClosure jcls;
2004 jcls.is_admitted = ntohl (dcsn->is_admitted);
2005 jcls.msg
2006 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2007 ? (struct GNUNET_MessageHeader *) &dcsn[1]
2008 : NULL;
2009
2010 struct GNUNET_HashCode slave_pub_hash;
2011 GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2012 &slave_pub_hash);
2013
2014 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2015 "%p Got join decision (%d) from client for channel %s..\n",
2016 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2018 "%p ..and slave %s.\n",
2019 mst, GNUNET_h2s (&slave_pub_hash));
2020
2021 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2022 &mcast_send_join_decision, &jcls);
2023 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2024 GNUNET_SERVICE_client_continue (client);
2025}
2026
2027
2028static void
2029channel_part_cb (void *cls)
2030{
2031 struct GNUNET_SERVICE_Client *client = cls;
2032 struct GNUNET_MQ_Envelope *env;
2033
2034 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK);
2035 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
2036 env);
2037}
2038
2039
2040static void
2041handle_client_part_request (void *cls,
2042 const struct GNUNET_MessageHeader *msg)
2043{
2044 struct Client *c = cls;
2045
2046 c->channel->is_disconnecting = GNUNET_YES;
2047 if (GNUNET_YES == c->channel->is_master)
2048 {
2049 struct Master *mst = (struct Master *) c->channel;
2050
2051 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2052 "Got part request from master %p\n",
2053 mst);
2054 GNUNET_assert (NULL != mst->origin);
2055 GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client);
2056 }
2057 else
2058 {
2059 struct Slave *slv = (struct Slave *) c->channel;
2060
2061 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062 "Got part request from slave %p\n",
2063 slv);
2064 GNUNET_assert (NULL != slv->member);
2065 GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client);
2066 }
2067 GNUNET_SERVICE_client_continue (c->client);
2068}
2069
2070
2071/**
2072 * Send acknowledgement to a client.
2073 *
2074 * Sent after a message fragment has been passed on to multicast.
2075 *
2076 * @param chn The channel struct for the client.
2077 */
2078static void
2079send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2080{
2081 struct GNUNET_MessageHeader *res;
2082 struct GNUNET_MQ_Envelope *
2083 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2084
2085 /* FIXME? */
2086 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2087}
2088
2089
2090/**
2091 * Callback for the transmit functions of multicast.
2092 */
2093static int
2094transmit_notify (void *cls, size_t *data_size, void *data)
2095{
2096 struct Channel *chn = cls;
2097 struct TransmitMessage *tmit_msg = chn->tmit_head;
2098
2099 if (NULL == tmit_msg || *data_size < tmit_msg->size)
2100 {
2101 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2102 "%p transmit_notify: nothing to send.\n", chn);
2103 if (NULL != tmit_msg && *data_size < tmit_msg->size)
2104 GNUNET_break (0);
2105 *data_size = 0;
2106 return GNUNET_NO;
2107 }
2108
2109 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2110 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
2111
2112 *data_size = tmit_msg->size;
2113 GNUNET_memcpy (data, &tmit_msg[1], *data_size);
2114
2115 int ret
2116 = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2117 ? GNUNET_NO
2118 : GNUNET_YES;
2119
2120 /* FIXME: handle disconnecting clients */
2121 if (NULL != tmit_msg->client)
2122 send_message_ack (chn, tmit_msg->client);
2123
2124 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2125
2126 if (NULL != chn->tmit_head)
2127 {
2128 GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2129 }
2130 else if (GNUNET_YES == chn->is_disconnecting
2131 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2132 {
2133 /* FIXME: handle partial message (when still in_transmit) */
2134 GNUNET_free (tmit_msg);
2135 return GNUNET_SYSERR;
2136 }
2137 GNUNET_free (tmit_msg);
2138 return ret;
2139}
2140
2141
2142/**
2143 * Callback for the transmit functions of multicast.
2144 */
2145static int
2146master_transmit_notify (void *cls, size_t *data_size, void *data)
2147{
2148 int ret = transmit_notify (cls, data_size, data);
2149
2150 if (GNUNET_YES == ret)
2151 {
2152 struct Master *mst = cls;
2153 mst->tmit_handle = NULL;
2154 }
2155 return ret;
2156}
2157
2158
2159/**
2160 * Callback for the transmit functions of multicast.
2161 */
2162static int
2163slave_transmit_notify (void *cls, size_t *data_size, void *data)
2164{
2165 int ret = transmit_notify (cls, data_size, data);
2166
2167 if (GNUNET_YES == ret)
2168 {
2169 struct Slave *slv = cls;
2170 slv->tmit_handle = NULL;
2171 }
2172 return ret;
2173}
2174
2175
2176/**
2177 * Transmit a message from a channel master to the multicast group.
2178 */
2179static void
2180master_transmit_message (struct Master *mst)
2181{
2182 struct Channel *chn = &mst->channel;
2183 struct TransmitMessage *tmit_msg = chn->tmit_head;
2184 if (NULL == tmit_msg)
2185 return;
2186 if (NULL == mst->tmit_handle)
2187 {
2188 mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin,
2189 tmit_msg->id,
2190 mst->max_group_generation,
2191 &master_transmit_notify,
2192 mst);
2193 }
2194 else
2195 {
2196 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2197 }
2198}
2199
2200
2201/**
2202 * Transmit a message from a channel slave to the multicast group.
2203 */
2204static void
2205slave_transmit_message (struct Slave *slv)
2206{
2207 if (NULL == slv->channel.tmit_head)
2208 return;
2209 if (NULL == slv->tmit_handle)
2210 {
2211 slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member,
2212 slv->channel.tmit_head->id,
2213 &slave_transmit_notify,
2214 slv);
2215 }
2216 else
2217 {
2218 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2219 }
2220}
2221
2222
2223static void
2224transmit_message (struct Channel *chn)
2225{
2226 chn->is_master
2227 ? master_transmit_message (chn->master)
2228 : slave_transmit_message (chn->slave);
2229}
2230
2231
2232/**
2233 * Queue a message from a channel master for sending to the multicast group.
2234 */
2235static void
2236master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2237{
2238 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2239 {
2240 tmit_msg->id = ++mst->max_message_id;
2241 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2242 "%p master_queue_message: message_id=%" PRIu64 "\n",
2243 mst, tmit_msg->id);
2244 struct GNUNET_PSYC_MessageMethod *pmeth
2245 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2246
2247 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2248 {
2249 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2250 }
2251 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2252 {
2253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2254 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2255 mst, tmit_msg->id - mst->max_state_message_id);
2256 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2257 - mst->max_state_message_id);
2258 mst->max_state_message_id = tmit_msg->id;
2259 }
2260 else
2261 {
2262 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2263 "%p master_queue_message: state not modified\n", mst);
2264 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2265 }
2266
2267 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2268 {
2269 /// @todo add state_hash to PSYC header
2270 }
2271 }
2272}
2273
2274
2275/**
2276 * Queue a message from a channel slave for sending to the multicast group.
2277 */
2278static void
2279slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2280{
2281 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2282 {
2283 struct GNUNET_PSYC_MessageMethod *pmeth
2284 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2285 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2286 tmit_msg->id = ++slv->max_request_id;
2287 }
2288}
2289
2290
2291/**
2292 * Queue PSYC message parts for sending to multicast.
2293 *
2294 * @param chn
2295 * Channel to send to.
2296 * @param client
2297 * Client the message originates from.
2298 * @param data_size
2299 * Size of @a data.
2300 * @param data
2301 * Concatenated message parts.
2302 * @param first_ptype
2303 * First message part type in @a data.
2304 * @param last_ptype
2305 * Last message part type in @a data.
2306 */
2307static struct TransmitMessage *
2308queue_message (struct Channel *chn,
2309 struct GNUNET_SERVICE_Client *client,
2310 size_t data_size,
2311 const void *data,
2312 uint16_t first_ptype, uint16_t last_ptype)
2313{
2314 struct TransmitMessage *
2315 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2316 GNUNET_memcpy (&tmit_msg[1], data, data_size);
2317 tmit_msg->client = client;
2318 tmit_msg->size = data_size;
2319 tmit_msg->first_ptype = first_ptype;
2320 tmit_msg->last_ptype = last_ptype;
2321
2322 /* FIXME: separate queue per message ID */
2323
2324 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2325
2326 chn->is_master
2327 ? master_queue_message (chn->master, tmit_msg)
2328 : slave_queue_message (chn->slave, tmit_msg);
2329 return tmit_msg;
2330}
2331
2332
2333/**
2334 * Cancel transmission of current message.
2335 *
2336 * @param chn Channel to send to.
2337 * @param client Client the message originates from.
2338 */
2339static void
2340transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2341{
2342 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2343
2344 struct GNUNET_MessageHeader msg;
2345 msg.size = htons (sizeof (msg));
2346 msg.type = htons (type);
2347
2348 queue_message (chn, client, sizeof (msg), &msg, type, type);
2349 transmit_message (chn);
2350
2351 /* FIXME: cleanup */
2352}
2353
2354
2355static int
2356check_client_psyc_message (void *cls,
2357 const struct GNUNET_MessageHeader *msg)
2358{
2359 return GNUNET_OK;
2360}
2361
2362
2363/**
2364 * Incoming message from a master or slave client.
2365 */
2366static void
2367handle_client_psyc_message (void *cls,
2368 const struct GNUNET_MessageHeader *msg)
2369{
2370 struct Client *c = cls;
2371 struct GNUNET_SERVICE_Client *client = c->client;
2372 struct Channel *chn = c->channel;
2373 if (NULL == chn)
2374 {
2375 GNUNET_break (0);
2376 GNUNET_SERVICE_client_drop (client);
2377 return;
2378 }
2379
2380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2381 "%p Received message from client.\n", chn);
2382 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2383
2384 if (GNUNET_YES != chn->is_ready)
2385 {
2386 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2387 "%p Channel is not ready yet, disconnecting client %p.\n",
2388 chn,
2389 client);
2390 GNUNET_break (0);
2391 GNUNET_SERVICE_client_drop (client);
2392 return;
2393 }
2394
2395 uint16_t size = ntohs (msg->size);
2396 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2397 {
2398 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2399 "%p Message payload too large: %u < %u.\n",
2400 chn,
2401 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2402 (unsigned int) (size - sizeof (*msg)));
2403 GNUNET_break (0);
2404 transmit_cancel (chn, client);
2405 GNUNET_SERVICE_client_drop (client);
2406 return;
2407 }
2408
2409 uint16_t first_ptype = 0, last_ptype = 0;
2410 if (GNUNET_SYSERR
2411 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2412 (const char *) &msg[1],
2413 &first_ptype, &last_ptype))
2414 {
2415 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2416 "%p Received invalid message part from client.\n", chn);
2417 GNUNET_break (0);
2418 transmit_cancel (chn, client);
2419 GNUNET_SERVICE_client_drop (client);
2420 return;
2421 }
2422 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2423 "%p Received message with first part type %u and last part type %u.\n",
2424 chn, first_ptype, last_ptype);
2425
2426 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2427 first_ptype, last_ptype);
2428 transmit_message (chn);
2429 /* FIXME: send a few ACKs even before transmit_notify is called */
2430
2431 GNUNET_SERVICE_client_continue (client);
2432};
2433
2434
2435/**
2436 * Received result of GNUNET_PSYCSTORE_membership_store()
2437 */
2438static void
2439store_recv_membership_store_result (void *cls,
2440 int64_t result,
2441 const char *err_msg,
2442 uint16_t err_msg_size)
2443{
2444 struct Operation *op = cls;
2445 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2446 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2447 op->channel,
2448 result,
2449 (int) err_msg_size,
2450 err_msg);
2451
2452 if (NULL != op->client)
2453 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2454 op_remove (op);
2455}
2456
2457
2458/**
2459 * Client requests to add/remove a slave in the membership database.
2460 */
2461static void
2462handle_client_membership_store (void *cls,
2463 const struct ChannelMembershipStoreRequest *req)
2464{
2465 struct Client *c = cls;
2466 struct GNUNET_SERVICE_Client *client = c->client;
2467 struct Channel *chn = c->channel;
2468 if (NULL == chn)
2469 {
2470 GNUNET_break (0);
2471 GNUNET_SERVICE_client_drop (client);
2472 return;
2473 }
2474
2475 struct Operation *op = op_add (chn, client, req->op_id, 0);
2476
2477 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2478 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2480 "%p Received membership store request from client.\n", chn);
2481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2482 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2483 chn, req->did_join, announced_at, effective_since);
2484
2485 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2486 req->did_join, announced_at, effective_since,
2487 0, /* FIXME: group_generation */
2488 &store_recv_membership_store_result, op);
2489 GNUNET_SERVICE_client_continue (client);
2490}
2491
2492
2493/**
2494 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2495 * in response to a history request from a client.
2496 */
2497static int
2498store_recv_fragment_history (void *cls,
2499 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2500 enum GNUNET_PSYCSTORE_MessageFlags flags)
2501{
2502 struct Operation *op = cls;
2503 if (NULL == op->client)
2504 { /* Requesting client already disconnected. */
2505 return GNUNET_NO;
2506 }
2507 struct Channel *chn = op->channel;
2508
2509 struct GNUNET_PSYC_MessageHeader *pmsg;
2510 uint16_t msize = ntohs (mmsg->header.size);
2511 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2512
2513 struct GNUNET_OperationResultMessage *
2514 res = GNUNET_malloc (sizeof (*res) + psize);
2515 res->header.size = htons (sizeof (*res) + psize);
2516 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2517 res->op_id = op->op_id;
2518 res->result_code = GNUNET_htonll (GNUNET_OK);
2519
2520 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2521 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2522 GNUNET_memcpy (&res[1], pmsg, psize);
2523
2524 /** @todo FIXME: send only to requesting client */
2525 client_send_msg (chn, &res->header);
2526
2527 GNUNET_free (res);
2528 return GNUNET_YES;
2529}
2530
2531
2532/**
2533 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2534 * in response to a history request from a client.
2535 */
2536static void
2537store_recv_fragment_history_result (void *cls, int64_t result,
2538 const char *err_msg, uint16_t err_msg_size)
2539{
2540 struct Operation *op = cls;
2541 if (NULL == op->client)
2542 { /* Requesting client already disconnected. */
2543 return;
2544 }
2545
2546 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2547 "%p History replay #%" PRIu64 ": "
2548 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2549 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2550
2551 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2552 {
2553 /** @todo Multicast replay request for messages not found locally. */
2554 }
2555
2556 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2557 op_remove (op);
2558}
2559
2560
2561static int
2562check_client_history_replay (void *cls,
2563 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2564{
2565 return GNUNET_OK;
2566}
2567
2568
2569/**
2570 * Client requests channel history.
2571 */
2572static void
2573handle_client_history_replay (void *cls,
2574 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2575{
2576 struct Client *c = cls;
2577 struct GNUNET_SERVICE_Client *client = c->client;
2578 struct Channel *chn = c->channel;
2579 if (NULL == chn)
2580 {
2581 GNUNET_break (0);
2582 GNUNET_SERVICE_client_drop (client);
2583 return;
2584 }
2585
2586 uint16_t size = ntohs (req->header.size);
2587 const char *method_prefix = (const char *) &req[1];
2588
2589 if (size < sizeof (*req) + 1
2590 || '\0' != method_prefix[size - sizeof (*req) - 1])
2591 {
2592 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2593 "%p History replay #%" PRIu64 ": "
2594 "invalid method prefix. size: %u < %u?\n",
2595 chn,
2596 GNUNET_ntohll (req->op_id),
2597 size,
2598 (unsigned int) sizeof (*req) + 1);
2599 GNUNET_break (0);
2600 GNUNET_SERVICE_client_drop (client);
2601 return;
2602 }
2603
2604 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2605
2606 if (0 == req->message_limit)
2607 {
2608 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2609 GNUNET_ntohll (req->start_message_id),
2610 GNUNET_ntohll (req->end_message_id),
2611 0, method_prefix,
2612 &store_recv_fragment_history,
2613 &store_recv_fragment_history_result, op);
2614 }
2615 else
2616 {
2617 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2618 GNUNET_ntohll (req->message_limit),
2619 method_prefix,
2620 &store_recv_fragment_history,
2621 &store_recv_fragment_history_result,
2622 op);
2623 }
2624 GNUNET_SERVICE_client_continue (client);
2625}
2626
2627
2628/**
2629 * Received state var from PSYCstore, send it to client.
2630 */
2631static int
2632store_recv_state_var (void *cls, const char *name,
2633 const void *value, uint32_t value_size)
2634{
2635 struct Operation *op = cls;
2636 struct GNUNET_OperationResultMessage *res;
2637 struct GNUNET_MQ_Envelope *env;
2638
2639 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2640 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2641 op->channel, GNUNET_ntohll (op->op_id), name);
2642
2643 if (NULL != name) /* First part */
2644 {
2645 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2646 struct GNUNET_PSYC_MessageModifier *mod;
2647 env = GNUNET_MQ_msg_extra (res,
2648 sizeof (*mod) + name_size + value_size,
2649 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2650 res->op_id = op->op_id;
2651
2652 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2653 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2654 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2655 mod->name_size = htons (name_size);
2656 mod->value_size = htonl (value_size);
2657 mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2658 GNUNET_memcpy (&mod[1], name, name_size);
2659 GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2660 }
2661 else /* Continuation */
2662 {
2663 struct GNUNET_MessageHeader *mod;
2664 env = GNUNET_MQ_msg_extra (res,
2665 sizeof (*mod) + value_size,
2666 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2667 res->op_id = op->op_id;
2668
2669 mod = (struct GNUNET_MessageHeader *) &res[1];
2670 mod->size = htons (sizeof (*mod) + value_size);
2671 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2672 GNUNET_memcpy (&mod[1], value, value_size);
2673 }
2674
2675 // FIXME: client might have been disconnected
2676 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2677 return GNUNET_YES;
2678}
2679
2680
2681/**
2682 * Received result of GNUNET_PSYCSTORE_state_get()
2683 * or GNUNET_PSYCSTORE_state_get_prefix()
2684 */
2685static void
2686store_recv_state_result (void *cls, int64_t result,
2687 const char *err_msg, uint16_t err_msg_size)
2688{
2689 struct Operation *op = cls;
2690 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2691 "%p state_get #%" PRIu64 ": "
2692 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2693 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2694
2695 // FIXME: client might have been disconnected
2696 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2697 op_remove (op);
2698}
2699
2700
2701static int
2702check_client_state_get (void *cls,
2703 const struct StateRequest *req)
2704{
2705 struct Client *c = cls;
2706 struct Channel *chn = c->channel;
2707 if (NULL == chn)
2708 {
2709 GNUNET_break (0);
2710 return GNUNET_SYSERR;
2711 }
2712
2713 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2714 const char *name = (const char *) &req[1];
2715 if (0 == name_size || '\0' != name[name_size - 1])
2716 {
2717 GNUNET_break (0);
2718 return GNUNET_SYSERR;
2719 }
2720
2721 return GNUNET_OK;
2722}
2723
2724
2725/**
2726 * Client requests best matching state variable from PSYCstore.
2727 */
2728static void
2729handle_client_state_get (void *cls,
2730 const struct StateRequest *req)
2731{
2732 struct Client *c = cls;
2733 struct GNUNET_SERVICE_Client *client = c->client;
2734 struct Channel *chn = c->channel;
2735
2736 const char *name = (const char *) &req[1];
2737 struct Operation *op = op_add (chn, client, req->op_id, 0);
2738 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2739 &store_recv_state_var,
2740 &store_recv_state_result, op);
2741 GNUNET_SERVICE_client_continue (client);
2742}
2743
2744
2745static int
2746check_client_state_get_prefix (void *cls,
2747 const struct StateRequest *req)
2748{
2749 struct Client *c = cls;
2750 struct Channel *chn = c->channel;
2751 if (NULL == chn)
2752 {
2753 GNUNET_break (0);
2754 return GNUNET_SYSERR;
2755 }
2756
2757 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2758 const char *name = (const char *) &req[1];
2759 if (0 == name_size || '\0' != name[name_size - 1])
2760 {
2761 GNUNET_break (0);
2762 return GNUNET_SYSERR;
2763 }
2764
2765 return GNUNET_OK;
2766}
2767
2768
2769/**
2770 * Client requests state variables with a given prefix from PSYCstore.
2771 */
2772static void
2773handle_client_state_get_prefix (void *cls,
2774 const struct StateRequest *req)
2775{
2776 struct Client *c = cls;
2777 struct GNUNET_SERVICE_Client *client = c->client;
2778 struct Channel *chn = c->channel;
2779
2780 const char *name = (const char *) &req[1];
2781 struct Operation *op = op_add (chn, client, req->op_id, 0);
2782 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2783 &store_recv_state_var,
2784 &store_recv_state_result, op);
2785 GNUNET_SERVICE_client_continue (client);
2786}
2787
2788
2789/**
2790 * Initialize the PSYC service.
2791 *
2792 * @param cls Closure.
2793 * @param server The initialized server.
2794 * @param c Configuration to use.
2795 */
2796static void
2797run (void *cls,
2798 const struct GNUNET_CONFIGURATION_Handle *c,
2799 struct GNUNET_SERVICE_Handle *svc)
2800{
2801 cfg = c;
2802 service = svc;
2803 store = GNUNET_PSYCSTORE_connect (cfg);
2804 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2805 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2806 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2807 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2808 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2809 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2810}
2811
2812
2813/**
2814 * Define "main" method using service macro.
2815 */
2816GNUNET_SERVICE_MAIN
2817("psyc",
2818 GNUNET_SERVICE_OPTION_NONE,
2819 &run,
2820 &client_notify_connect,
2821 &client_notify_disconnect,
2822 NULL,
2823 GNUNET_MQ_hd_fixed_size (client_master_start,
2824 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2825 struct MasterStartRequest,
2826 NULL),
2827 GNUNET_MQ_hd_var_size (client_slave_join,
2828 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2829 struct SlaveJoinRequest,
2830 NULL),
2831 GNUNET_MQ_hd_var_size (client_join_decision,
2832 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2833 struct GNUNET_PSYC_JoinDecisionMessage,
2834 NULL),
2835 GNUNET_MQ_hd_fixed_size (client_part_request,
2836 GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST,
2837 struct GNUNET_MessageHeader,
2838 NULL),
2839 GNUNET_MQ_hd_var_size (client_psyc_message,
2840 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2841 struct GNUNET_MessageHeader,
2842 NULL),
2843 GNUNET_MQ_hd_fixed_size (client_membership_store,
2844 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2845 struct ChannelMembershipStoreRequest,
2846 NULL),
2847 GNUNET_MQ_hd_var_size (client_history_replay,
2848 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2849 struct GNUNET_PSYC_HistoryRequestMessage,
2850 NULL),
2851 GNUNET_MQ_hd_var_size (client_state_get,
2852 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2853 struct StateRequest,
2854 NULL),
2855 GNUNET_MQ_hd_var_size (client_state_get_prefix,
2856 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2857 struct StateRequest,
2858 NULL));
2859
2860/* end of gnunet-service-psyc.c */
diff --git a/src/psyc/psyc.conf.in b/src/psyc/psyc.conf.in
new file mode 100644
index 0000000..764ccfa
--- /dev/null
+++ b/src/psyc/psyc.conf.in
@@ -0,0 +1,12 @@
1[psyc]
2START_ON_DEMAND = @START_ON_DEMAND@
3BINARY = gnunet-service-psyc
4
5UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-psyc.sock
6UNIX_MATCH_UID = YES
7UNIX_MATCH_GID = YES
8
9@UNIXONLY@PORT = 2115
10HOSTNAME = localhost
11ACCEPT_FROM = 127.0.0.1;
12ACCEPT_FROM6 = ::1;
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
new file mode 100644
index 0000000..74bbf3e
--- /dev/null
+++ b/src/psyc/psyc.h
@@ -0,0 +1,178 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
4 *
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file psyc/psyc.h
23 * @brief Common type definitions for the PSYC service and API.
24 * @author Gabor X Toth
25 */
26
27#ifndef PSYC_H
28#define PSYC_H
29
30#include "platform.h"
31#include "gnunet_psyc_service.h"
32
33
34int
35GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data,
36 uint16_t *first_ptype, uint16_t *last_ptype);
37
38void
39GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
40 const struct GNUNET_MessageHeader *msg);
41
42
43enum MessageState
44{
45 MSG_STATE_START = 0,
46 MSG_STATE_HEADER = 1,
47 MSG_STATE_METHOD = 2,
48 MSG_STATE_MODIFIER = 3,
49 MSG_STATE_MOD_CONT = 4,
50 MSG_STATE_DATA = 5,
51 MSG_STATE_END = 6,
52 MSG_STATE_CANCEL = 7,
53 MSG_STATE_ERROR = 8,
54};
55
56
57enum MessageFragmentState
58{
59 MSG_FRAG_STATE_START = 0,
60 MSG_FRAG_STATE_HEADER = 1,
61 MSG_FRAG_STATE_DATA = 2,
62 MSG_FRAG_STATE_END = 3,
63 MSG_FRAG_STATE_CANCEL = 4,
64 MSG_FRAG_STATE_DROP = 5,
65};
66
67
68GNUNET_NETWORK_STRUCT_BEGIN
69
70
71/**** library -> service ****/
72
73
74struct MasterStartRequest
75{
76 /**
77 * Type: GNUNET_MESSAGE_TYPE_PSYC_MASTER_START
78 */
79 struct GNUNET_MessageHeader header;
80
81 uint32_t policy GNUNET_PACKED;
82
83 struct GNUNET_CRYPTO_EddsaPrivateKey channel_key;
84};
85
86
87struct SlaveJoinRequest
88{
89 /**
90 * Type: GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN
91 */
92 struct GNUNET_MessageHeader header;
93
94 uint32_t relay_count GNUNET_PACKED;
95
96 struct GNUNET_CRYPTO_EddsaPublicKey channel_pub_key;
97
98 struct GNUNET_CRYPTO_EcdsaPrivateKey slave_key;
99
100 struct GNUNET_PeerIdentity origin;
101
102 uint32_t flags GNUNET_PACKED;
103
104 /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */
105
106 /* Followed by struct GNUNET_MessageHeader join_msg */
107};
108
109
110struct ChannelMembershipStoreRequest
111{
112 /**
113 * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE
114 */
115 struct GNUNET_MessageHeader header;
116
117 uint32_t reserved GNUNET_PACKED;
118
119 uint64_t op_id GNUNET_PACKED;
120
121 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
122
123 uint64_t announced_at GNUNET_PACKED;
124
125 uint64_t effective_since GNUNET_PACKED;
126
127 uint8_t did_join;
128};
129
130
131struct HistoryRequest
132{
133 /**
134 * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_HISTORY_REQUEST
135 */
136 struct GNUNET_MessageHeader header;
137
138 uint32_t reserved GNUNET_PACKED;
139
140 /**
141 * ID for this operation.
142 */
143 uint64_t op_id GNUNET_PACKED;
144
145 uint64_t start_message_id GNUNET_PACKED;
146
147 uint64_t end_message_id GNUNET_PACKED;
148
149 uint64_t message_limit GNUNET_PACKED;
150};
151
152
153struct StateRequest
154{
155 /**
156 * Types:
157 * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_GET
158 * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_GET_PREFIX
159 */
160 struct GNUNET_MessageHeader header;
161
162 uint32_t reserved GNUNET_PACKED;
163
164 /**
165 * ID for this operation.
166 */
167 uint64_t op_id GNUNET_PACKED;
168
169 /* Followed by NUL-terminated name. */
170};
171
172
173/**** service -> library ****/
174
175
176GNUNET_NETWORK_STRUCT_END
177
178#endif
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
new file mode 100644
index 0000000..37ea112
--- /dev/null
+++ b/src/psyc/psyc_api.c
@@ -0,0 +1,1584 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
4 *
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file psyc/psyc_api.c
23 * @brief PSYC service; high-level access to the PSYC protocol
24 * note that clients of this API are NOT expected to
25 * understand the PSYC message format, only the semantics!
26 * Parsing (and serializing) the PSYC stream format is done
27 * within the implementation of the libgnunetpsyc library,
28 * and this API deliberately exposes as little as possible
29 * of the actual data stream format to the application!
30 * @author Gabor X Toth
31 */
32
33#include <inttypes.h>
34
35#include "platform.h"
36#include "gnunet_util_lib.h"
37#include "gnunet_multicast_service.h"
38#include "gnunet_psyc_service.h"
39#include "gnunet_psyc_util_lib.h"
40#include "psyc.h"
41
42#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
43
44
45/**
46 * Handle to access PSYC channel operations for both the master and slaves.
47 */
48struct GNUNET_PSYC_Channel
49{
50 /**
51 * Configuration to use.
52 */
53 const struct GNUNET_CONFIGURATION_Handle *cfg;
54
55 /**
56 * Client connection to the service.
57 */
58 struct GNUNET_MQ_Handle *mq;
59
60 /**
61 * Message to send on connect.
62 */
63 struct GNUNET_MQ_Envelope *connect_env;
64
65 /**
66 * Time to wait until we try to reconnect on failure.
67 */
68 struct GNUNET_TIME_Relative reconnect_delay;
69
70 /**
71 * Task for reconnecting when the listener fails.
72 */
73 struct GNUNET_SCHEDULER_Task *reconnect_task;
74
75 /**
76 * Async operations.
77 */
78 struct GNUNET_OP_Handle *op;
79
80 /**
81 * Transmission handle;
82 */
83 struct GNUNET_PSYC_TransmitHandle *tmit;
84
85 /**
86 * Receipt handle;
87 */
88 struct GNUNET_PSYC_ReceiveHandle *recv;
89
90 /**
91 * Function called after disconnected from the service.
92 */
93 GNUNET_ContinuationCallback disconnect_cb;
94
95 /**
96 * Closure for @a disconnect_cb.
97 */
98 void *disconnect_cls;
99
100 /**
101 * Are we polling for incoming messages right now?
102 */
103 uint8_t in_receive;
104
105 /**
106 * Is this a master or slave channel?
107 */
108 uint8_t is_master;
109
110 /**
111 * Is this channel in the process of disconnecting from the service?
112 * #GNUNET_YES or #GNUNET_NO
113 */
114 uint8_t is_disconnecting;
115};
116
117
118/**
119 * Handle for the master of a PSYC channel.
120 */
121struct GNUNET_PSYC_Master
122{
123 struct GNUNET_PSYC_Channel chn;
124
125 GNUNET_PSYC_MasterStartCallback start_cb;
126
127 /**
128 * Join request callback.
129 */
130 GNUNET_PSYC_JoinRequestCallback join_req_cb;
131
132 /**
133 * Closure for the callbacks.
134 */
135 void *cb_cls;
136};
137
138
139/**
140 * Handle for a PSYC channel slave.
141 */
142struct GNUNET_PSYC_Slave
143{
144 struct GNUNET_PSYC_Channel chn;
145
146 GNUNET_PSYC_SlaveConnectCallback connect_cb;
147
148 GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb;
149
150 /**
151 * Closure for the callbacks.
152 */
153 void *cb_cls;
154};
155
156
157/**
158 * Handle that identifies a join request.
159 *
160 * Used to match calls to #GNUNET_PSYC_JoinRequestCallback to the
161 * corresponding calls to GNUNET_PSYC_join_decision().
162 */
163struct GNUNET_PSYC_JoinHandle
164{
165 struct GNUNET_PSYC_Master *mst;
166 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
167};
168
169
170/**
171 * Handle for a pending PSYC transmission operation.
172 */
173struct GNUNET_PSYC_SlaveTransmitHandle
174{
175
176};
177
178
179struct GNUNET_PSYC_HistoryRequest
180{
181 /**
182 * Channel.
183 */
184 struct GNUNET_PSYC_Channel *chn;
185
186 /**
187 * Operation ID.
188 */
189 uint64_t op_id;
190
191 /**
192 * Message handler.
193 */
194 struct GNUNET_PSYC_ReceiveHandle *recv;
195
196 /**
197 * Function to call when the operation finished.
198 */
199 GNUNET_ResultCallback result_cb;
200
201 /**
202 * Closure for @a result_cb.
203 */
204 void *cls;
205};
206
207
208struct GNUNET_PSYC_StateRequest
209{
210 /**
211 * Channel.
212 */
213 struct GNUNET_PSYC_Channel *chn;
214
215 /**
216 * Operation ID.
217 */
218 uint64_t op_id;
219
220 /**
221 * State variable result callback.
222 */
223 GNUNET_PSYC_StateVarCallback var_cb;
224
225 /**
226 * Function to call when the operation finished.
227 */
228 GNUNET_ResultCallback result_cb;
229
230 /**
231 * Closure for @a result_cb.
232 */
233 void *cls;
234};
235
236
237static int
238check_channel_result (void *cls,
239 const struct GNUNET_OperationResultMessage *res)
240{
241 return GNUNET_OK;
242}
243
244
245static void
246handle_channel_result (void *cls,
247 const struct GNUNET_OperationResultMessage *res)
248{
249 struct GNUNET_PSYC_Channel *chn = cls;
250
251 uint16_t size = ntohs (res->header.size);
252 if (size < sizeof (*res))
253 { /* Error, message too small. */
254 GNUNET_break (0);
255 return;
256 }
257
258 uint16_t data_size = size - sizeof (*res);
259 const char *data = (0 < data_size) ? (void *) &res[1] : NULL;
260 GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id),
261 GNUNET_ntohll (res->result_code),
262 data, data_size, NULL);
263
264 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
265 "handle_channel_result: Received result message with OP ID %" PRIu64 "\n",
266 GNUNET_ntohll (res->op_id));
267}
268
269
270static void
271op_recv_history_result (void *cls, int64_t result,
272 const void *data, uint16_t data_size)
273{
274 LOG (GNUNET_ERROR_TYPE_DEBUG,
275 "Received history replay result: %" PRId64 ".\n", result);
276
277 struct GNUNET_PSYC_HistoryRequest *hist = cls;
278
279 if (NULL != hist->result_cb)
280 hist->result_cb (hist->cls, result, data, data_size);
281
282 GNUNET_PSYC_receive_destroy (hist->recv);
283 GNUNET_free (hist);
284}
285
286
287static void
288op_recv_state_result (void *cls, int64_t result,
289 const void *data, uint16_t data_size)
290{
291 LOG (GNUNET_ERROR_TYPE_DEBUG,
292 "Received state request result: %" PRId64 ".\n", result);
293
294 struct GNUNET_PSYC_StateRequest *sr = cls;
295
296 if (NULL != sr->result_cb)
297 sr->result_cb (sr->cls, result, data, data_size);
298
299 GNUNET_free (sr);
300}
301
302
303static int
304check_channel_history_result (void *cls,
305 const struct GNUNET_OperationResultMessage *res)
306{
307 struct GNUNET_PSYC_MessageHeader *
308 pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res);
309 uint16_t size = ntohs (res->header.size);
310
311 if ( (NULL == pmsg) ||
312 (size < sizeof (*res) + sizeof (*pmsg)) )
313 { /* Error, message too small. */
314 GNUNET_break_op (0);
315 return GNUNET_SYSERR;
316 }
317 return GNUNET_OK;
318}
319
320
321static void
322handle_channel_history_result (void *cls,
323 const struct GNUNET_OperationResultMessage *res)
324{
325 struct GNUNET_PSYC_Channel *chn = cls;
326 struct GNUNET_PSYC_MessageHeader *
327 pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res);
328 GNUNET_ResultCallback result_cb = NULL;
329 struct GNUNET_PSYC_HistoryRequest *hist = NULL;
330
331 LOG (GNUNET_ERROR_TYPE_DEBUG,
332 "%p Received historic fragment for message #%" PRIu64 ".\n",
333 chn,
334 GNUNET_ntohll (pmsg->message_id));
335
336 if (GNUNET_YES != GNUNET_OP_get (chn->op,
337 GNUNET_ntohll (res->op_id),
338 &result_cb, (void *) &hist, NULL))
339 { /* Operation not found. */
340 LOG (GNUNET_ERROR_TYPE_WARNING,
341 "%p Replay operation not found for historic fragment of message #%"
342 PRIu64 ".\n",
343 chn, GNUNET_ntohll (pmsg->message_id));
344 return;
345 }
346
347 GNUNET_PSYC_receive_message (hist->recv,
348 (const struct GNUNET_PSYC_MessageHeader *) pmsg);
349}
350
351
352static int
353check_channel_state_result (void *cls,
354 const struct GNUNET_OperationResultMessage *res)
355{
356 const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res);
357 uint16_t mod_size;
358 uint16_t size;
359
360 if (NULL == mod)
361 {
362 GNUNET_break_op (0);
363 return GNUNET_SYSERR;
364 }
365 mod_size = ntohs (mod->size);
366 size = ntohs (res->header.size);
367 if (size - sizeof (*res) != mod_size)
368 {
369 GNUNET_break_op (0);
370 return GNUNET_SYSERR;
371 }
372 return GNUNET_OK;
373}
374
375
376static void
377handle_channel_state_result (void *cls,
378 const struct GNUNET_OperationResultMessage *res)
379{
380 struct GNUNET_PSYC_Channel *chn = cls;
381
382 GNUNET_ResultCallback result_cb = NULL;
383 struct GNUNET_PSYC_StateRequest *sr = NULL;
384
385 if (GNUNET_YES != GNUNET_OP_get (chn->op,
386 GNUNET_ntohll (res->op_id),
387 &result_cb, (void *) &sr, NULL))
388 { /* Operation not found. */
389 return;
390 }
391
392 const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res);
393 if (NULL == mod)
394 {
395 GNUNET_break_op (0);
396 return;
397 }
398 uint16_t mod_size = ntohs (mod->size);
399
400 switch (ntohs (mod->type))
401 {
402 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
403 {
404 const struct GNUNET_PSYC_MessageModifier *
405 pmod = (const struct GNUNET_PSYC_MessageModifier *) mod;
406
407 const char *name = (const char *) &pmod[1];
408 uint16_t name_size = ntohs (pmod->name_size);
409 if (0 == name_size
410 || mod_size - sizeof (*pmod) < name_size
411 || '\0' != name[name_size - 1])
412 {
413 GNUNET_break_op (0);
414 return;
415 }
416 sr->var_cb (sr->cls, mod, name, name + name_size,
417 ntohs (pmod->header.size) - sizeof (*pmod),
418 ntohs (pmod->value_size));
419 break;
420 }
421
422 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
423 sr->var_cb (sr->cls, mod, NULL, (const char *) &mod[1],
424 mod_size - sizeof (*mod), 0);
425 break;
426 }
427}
428
429
430static int
431check_channel_message (void *cls,
432 const struct GNUNET_PSYC_MessageHeader *pmsg)
433{
434 return GNUNET_OK;
435}
436
437
438static void
439handle_channel_message (void *cls,
440 const struct GNUNET_PSYC_MessageHeader *pmsg)
441{
442 struct GNUNET_PSYC_Channel *chn = cls;
443
444 GNUNET_PSYC_receive_message (chn->recv, pmsg);
445}
446
447
448static void
449handle_channel_message_ack (void *cls,
450 const struct GNUNET_MessageHeader *msg)
451{
452 struct GNUNET_PSYC_Channel *chn = cls;
453
454 GNUNET_PSYC_transmit_got_ack (chn->tmit);
455}
456
457
458static void
459handle_master_start_ack (void *cls,
460 const struct GNUNET_PSYC_CountersResultMessage *cres)
461{
462 struct GNUNET_PSYC_Master *mst = cls;
463
464 int32_t result = ntohl (cres->result_code);
465 if (GNUNET_OK != result && GNUNET_NO != result)
466 {
467 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master: %ld\n", result);
468 GNUNET_break (0);
469 /* FIXME: disconnect */
470 }
471 if (NULL != mst->start_cb)
472 mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
473}
474
475
476static int
477check_master_join_request (void *cls,
478 const struct GNUNET_PSYC_JoinRequestMessage *req)
479{
480 if ( ((sizeof (*req) + sizeof (struct GNUNET_PSYC_Message)) <= ntohs (req->header.size)) &&
481 (NULL == GNUNET_MQ_extract_nested_mh (req)) )
482 {
483 GNUNET_break_op (0);
484 return GNUNET_SYSERR;
485 }
486 return GNUNET_OK;
487}
488
489
490static void
491handle_master_join_request (void *cls,
492 const struct GNUNET_PSYC_JoinRequestMessage *req)
493{
494 struct GNUNET_PSYC_Master *mst = cls;
495
496 if (NULL == mst->join_req_cb)
497 return;
498
499 const struct GNUNET_PSYC_Message *join_msg = NULL;
500 if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size))
501 {
502 join_msg = (struct GNUNET_PSYC_Message *) GNUNET_MQ_extract_nested_mh (req);
503 LOG (GNUNET_ERROR_TYPE_DEBUG,
504 "Received join_msg of type %u and size %u.\n",
505 ntohs (join_msg->header.type),
506 ntohs (join_msg->header.size));
507 }
508
509 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
510 jh->mst = mst;
511 jh->slave_pub_key = req->slave_pub_key;
512
513 if (NULL != mst->join_req_cb)
514 mst->join_req_cb (mst->cb_cls, req, &req->slave_pub_key, join_msg, jh);
515}
516
517
518static void
519handle_slave_join_ack (void *cls,
520 const struct GNUNET_PSYC_CountersResultMessage *cres)
521{
522 struct GNUNET_PSYC_Slave *slv = cls;
523
524 int32_t result = ntohl (cres->result_code);
525 if (GNUNET_YES != result && GNUNET_NO != result)
526 {
527 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n");
528 GNUNET_break (0);
529 /* FIXME: disconnect */
530 }
531 if (NULL != slv->connect_cb)
532 slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
533}
534
535
536static int
537check_slave_join_decision (void *cls,
538 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
539{
540 return GNUNET_OK;
541}
542
543
544static void
545handle_slave_join_decision (void *cls,
546 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
547{
548 struct GNUNET_PSYC_Slave *slv = cls;
549
550 struct GNUNET_PSYC_Message *pmsg = NULL;
551 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg))
552 pmsg = (struct GNUNET_PSYC_Message *) &dcsn[1];
553
554 if (NULL != slv->join_dcsn_cb)
555 slv->join_dcsn_cb (slv->cb_cls, dcsn, ntohl (dcsn->is_admitted), pmsg);
556}
557
558
559static void
560channel_cleanup (struct GNUNET_PSYC_Channel *chn)
561{
562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
563 "cleaning up channel %p\n",
564 chn);
565 if (NULL != chn->tmit)
566 {
567 GNUNET_PSYC_transmit_destroy (chn->tmit);
568 chn->tmit = NULL;
569 }
570 if (NULL != chn->recv)
571 {
572
573 GNUNET_PSYC_receive_destroy (chn->recv);
574 chn->recv = NULL;
575 }
576 if (NULL != chn->connect_env)
577 {
578 GNUNET_MQ_discard (chn->connect_env);
579 chn->connect_env = NULL;
580 }
581 if (NULL != chn->mq)
582 {
583 GNUNET_MQ_destroy (chn->mq);
584 chn->mq = NULL;
585 }
586 if (NULL != chn->disconnect_cb)
587 {
588 chn->disconnect_cb (chn->disconnect_cls);
589 chn->disconnect_cb = NULL;
590 }
591 GNUNET_free (chn);
592}
593
594
595static void
596handle_channel_part_ack (void *cls,
597 const struct GNUNET_MessageHeader *msg)
598{
599 struct GNUNET_PSYC_Channel *chn = cls;
600
601 channel_cleanup (chn);
602}
603
604
605/*** MASTER ***/
606
607
608static void
609master_connect (struct GNUNET_PSYC_Master *mst);
610
611
612static void
613master_reconnect (void *cls)
614{
615 master_connect (cls);
616}
617
618
619/**
620 * Master client disconnected from service.
621 *
622 * Reconnect after backoff period.
623 */
624static void
625master_disconnected (void *cls, enum GNUNET_MQ_Error error)
626{
627 struct GNUNET_PSYC_Master *mst = cls;
628 struct GNUNET_PSYC_Channel *chn = &mst->chn;
629
630 LOG (GNUNET_ERROR_TYPE_DEBUG,
631 "Master client disconnected (%d), re-connecting\n",
632 (int) error);
633 if (NULL != chn->tmit)
634 {
635 GNUNET_PSYC_transmit_destroy (chn->tmit);
636 chn->tmit = NULL;
637 }
638 if (NULL != chn->mq)
639 {
640 GNUNET_MQ_destroy (chn->mq);
641 chn->mq = NULL;
642 }
643 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay,
644 master_reconnect,
645 mst);
646 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay);
647}
648
649
650static void
651master_connect (struct GNUNET_PSYC_Master *mst)
652{
653 struct GNUNET_PSYC_Channel *chn = &mst->chn;
654
655 struct GNUNET_MQ_MessageHandler handlers[] = {
656 GNUNET_MQ_hd_fixed_size (master_start_ack,
657 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK,
658 struct GNUNET_PSYC_CountersResultMessage,
659 mst),
660 GNUNET_MQ_hd_var_size (master_join_request,
661 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
662 struct GNUNET_PSYC_JoinRequestMessage,
663 mst),
664 GNUNET_MQ_hd_fixed_size (channel_part_ack,
665 GNUNET_MESSAGE_TYPE_PSYC_PART_ACK,
666 struct GNUNET_MessageHeader,
667 chn),
668 GNUNET_MQ_hd_var_size (channel_message,
669 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
670 struct GNUNET_PSYC_MessageHeader,
671 chn),
672 GNUNET_MQ_hd_fixed_size (channel_message_ack,
673 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
674 struct GNUNET_MessageHeader,
675 chn),
676 GNUNET_MQ_hd_var_size (channel_history_result,
677 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
678 struct GNUNET_OperationResultMessage,
679 chn),
680 GNUNET_MQ_hd_var_size (channel_state_result,
681 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
682 struct GNUNET_OperationResultMessage,
683 chn),
684 GNUNET_MQ_hd_var_size (channel_result,
685 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
686 struct GNUNET_OperationResultMessage,
687 chn),
688 GNUNET_MQ_handler_end ()
689 };
690
691 chn->mq = GNUNET_CLIENT_connect (chn->cfg,
692 "psyc",
693 handlers,
694 &master_disconnected,
695 mst);
696 GNUNET_assert (NULL != chn->mq);
697 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq);
698
699 GNUNET_MQ_send_copy (chn->mq, chn->connect_env);
700}
701
702
703/**
704 * Start a PSYC master channel.
705 *
706 * Will start a multicast group identified by the given ECC key. Messages
707 * received from group members will be given to the respective handler methods.
708 * If a new member wants to join a group, the "join" method handler will be
709 * invoked; the join handler must then generate a "join" message to approve the
710 * joining of the new member. The channel can also change group membership
711 * without explicit requests. Note that PSYC doesn't itself "understand" join
712 * or part messages, the respective methods must call other PSYC functions to
713 * inform PSYC about the meaning of the respective events.
714 *
715 * @param cfg Configuration to use (to connect to PSYC service).
716 * @param channel_key ECC key that will be used to sign messages for this
717 * PSYC session. The public key is used to identify the PSYC channel.
718 * Note that end-users will usually not use the private key directly, but
719 * rather look it up in GNS for places managed by other users, or select
720 * a file with the private key(s) when setting up their own channels
721 * FIXME: we'll likely want to use NOT the p521 curve here, but a cheaper
722 * one in the future.
723 * @param policy Channel policy specifying join and history restrictions.
724 * Used to automate join decisions.
725 * @param message_cb Function to invoke on message parts received from slaves.
726 * @param join_request_cb Function to invoke when a slave wants to join.
727 * @param master_start_cb Function to invoke after the channel master started.
728 * @param cls Closure for @a method and @a join_cb.
729 *
730 * @return Handle for the channel master, NULL on error.
731 */
732struct GNUNET_PSYC_Master *
733GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
734 const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key,
735 enum GNUNET_PSYC_Policy policy,
736 GNUNET_PSYC_MasterStartCallback start_cb,
737 GNUNET_PSYC_JoinRequestCallback join_request_cb,
738 GNUNET_PSYC_MessageCallback message_cb,
739 GNUNET_PSYC_MessagePartCallback message_part_cb,
740 void *cls)
741{
742 struct GNUNET_PSYC_Master *mst = GNUNET_new (struct GNUNET_PSYC_Master);
743 struct GNUNET_PSYC_Channel *chn = &mst->chn;
744 struct MasterStartRequest *req;
745
746 chn->connect_env = GNUNET_MQ_msg (req,
747 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
748 req->channel_key = *channel_key;
749 req->policy = policy;
750
751 chn->cfg = cfg;
752 chn->is_master = GNUNET_YES;
753 chn->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
754
755 chn->op = GNUNET_OP_create ();
756 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
757
758 mst->start_cb = start_cb;
759 mst->join_req_cb = join_request_cb;
760 mst->cb_cls = cls;
761
762 master_connect (mst);
763 return mst;
764}
765
766
767/**
768 * Stop a PSYC master channel.
769 *
770 * @param master PSYC channel master to stop.
771 * @param keep_active FIXME
772 */
773void
774GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst,
775 int keep_active,
776 GNUNET_ContinuationCallback stop_cb,
777 void *stop_cls)
778{
779 struct GNUNET_PSYC_Channel *chn = &mst->chn;
780 struct GNUNET_MQ_Envelope *env;
781
782 chn->is_disconnecting = GNUNET_YES;
783 chn->disconnect_cb = stop_cb;
784 chn->disconnect_cls = stop_cls;
785 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST);
786 GNUNET_MQ_send (chn->mq, env);
787}
788
789
790/**
791 * Function to call with the decision made for a join request.
792 *
793 * Must be called once and only once in response to an invocation of the
794 * #GNUNET_PSYC_JoinCallback.
795 *
796 * @param jh Join request handle.
797 * @param is_admitted #GNUNET_YES if the join is approved,
798 * #GNUNET_NO if it is disapproved,
799 * #GNUNET_SYSERR if we cannot answer the request.
800 * @param relay_count Number of relays given.
801 * @param relays Array of suggested peers that might be useful relays to use
802 * when joining the multicast group (essentially a list of peers that
803 * are already part of the multicast group and might thus be willing
804 * to help with routing). If empty, only this local peer (which must
805 * be the multicast origin) is a good candidate for building the
806 * multicast tree. Note that it is unnecessary to specify our own
807 * peer identity in this array.
808 * @param join_resp Application-dependent join response message.
809 *
810 * @return #GNUNET_OK on success,
811 * #GNUNET_SYSERR if the message is too large.
812 */
813int
814GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
815 int is_admitted,
816 uint32_t relay_count,
817 const struct GNUNET_PeerIdentity *relays,
818 const struct GNUNET_PSYC_Message *join_resp)
819{
820 struct GNUNET_PSYC_Channel *chn = &jh->mst->chn;
821 struct GNUNET_PSYC_JoinDecisionMessage *dcsn;
822 uint16_t join_resp_size
823 = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0;
824 uint16_t relay_size = relay_count * sizeof (*relays);
825
826 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
827 < sizeof (*dcsn) + relay_size + join_resp_size)
828 return GNUNET_SYSERR;
829
830 struct GNUNET_MQ_Envelope *
831 env = GNUNET_MQ_msg_extra (dcsn, relay_size + join_resp_size,
832 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
833 dcsn->is_admitted = htonl (is_admitted);
834 dcsn->slave_pub_key = jh->slave_pub_key;
835
836 if (0 < join_resp_size)
837 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
838
839 GNUNET_MQ_send (chn->mq, env);
840 GNUNET_free (jh);
841 return GNUNET_OK;
842}
843
844
845/**
846 * Send a message to call a method to all members in the PSYC channel.
847 *
848 * @param master Handle to the PSYC channel.
849 * @param method_name Which method should be invoked.
850 * @param notify_mod Function to call to obtain modifiers.
851 * @param notify_data Function to call to obtain fragments of the data.
852 * @param notify_cls Closure for @a notify_mod and @a notify_data.
853 * @param flags Flags for the message being transmitted.
854 *
855 * @return Transmission handle, NULL on error (i.e. more than one request queued).
856 */
857struct GNUNET_PSYC_MasterTransmitHandle *
858GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst,
859 const char *method_name,
860 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
861 GNUNET_PSYC_TransmitNotifyData notify_data,
862 void *notify_cls,
863 enum GNUNET_PSYC_MasterTransmitFlags flags)
864{
865 if (GNUNET_OK
866 == GNUNET_PSYC_transmit_message (mst->chn.tmit, method_name, NULL,
867 notify_mod, notify_data, notify_cls,
868 flags))
869 return (struct GNUNET_PSYC_MasterTransmitHandle *) mst->chn.tmit;
870 else
871 return NULL;
872}
873
874
875/**
876 * Resume transmission to the channel.
877 *
878 * @param tmit Handle of the request that is being resumed.
879 */
880void
881GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *tmit)
882{
883 GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit);
884}
885
886
887/**
888 * Abort transmission request to the channel.
889 *
890 * @param tmit Handle of the request that is being aborted.
891 */
892void
893GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *tmit)
894{
895 GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit);
896}
897
898
899/**
900 * Convert a channel @a master to a @e channel handle to access the @e channel
901 * APIs.
902 *
903 * @param master Channel master handle.
904 *
905 * @return Channel handle, valid for as long as @a master is valid.
906 */
907struct GNUNET_PSYC_Channel *
908GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
909{
910 return &master->chn;
911}
912
913
914/*** SLAVE ***/
915
916
917static void
918slave_connect (struct GNUNET_PSYC_Slave *slv);
919
920
921static void
922slave_reconnect (void *cls)
923{
924 slave_connect (cls);
925}
926
927
928/**
929 * Slave client disconnected from service.
930 *
931 * Reconnect after backoff period.
932 */
933static void
934slave_disconnected (void *cls,
935 enum GNUNET_MQ_Error error)
936{
937 struct GNUNET_PSYC_Slave *slv = cls;
938 struct GNUNET_PSYC_Channel *chn = &slv->chn;
939
940 LOG (GNUNET_ERROR_TYPE_DEBUG,
941 "Slave client disconnected (%d), re-connecting\n",
942 (int) error);
943 if (NULL != chn->tmit)
944 {
945 GNUNET_PSYC_transmit_destroy (chn->tmit);
946 chn->tmit = NULL;
947 }
948 if (NULL != chn->mq)
949 {
950 GNUNET_MQ_destroy (chn->mq);
951 chn->mq = NULL;
952 }
953 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay,
954 &slave_reconnect,
955 slv);
956 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay);
957}
958
959
960static void
961slave_connect (struct GNUNET_PSYC_Slave *slv)
962{
963 struct GNUNET_PSYC_Channel *chn = &slv->chn;
964
965 struct GNUNET_MQ_MessageHandler handlers[] = {
966 GNUNET_MQ_hd_fixed_size (slave_join_ack,
967 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK,
968 struct GNUNET_PSYC_CountersResultMessage,
969 slv),
970 GNUNET_MQ_hd_var_size (slave_join_decision,
971 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
972 struct GNUNET_PSYC_JoinDecisionMessage,
973 slv),
974 GNUNET_MQ_hd_fixed_size (channel_part_ack,
975 GNUNET_MESSAGE_TYPE_PSYC_PART_ACK,
976 struct GNUNET_MessageHeader,
977 chn),
978 GNUNET_MQ_hd_var_size (channel_message,
979 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
980 struct GNUNET_PSYC_MessageHeader,
981 chn),
982 GNUNET_MQ_hd_fixed_size (channel_message_ack,
983 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
984 struct GNUNET_MessageHeader,
985 chn),
986 GNUNET_MQ_hd_var_size (channel_history_result,
987 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
988 struct GNUNET_OperationResultMessage,
989 chn),
990 GNUNET_MQ_hd_var_size (channel_state_result,
991 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
992 struct GNUNET_OperationResultMessage,
993 chn),
994 GNUNET_MQ_hd_var_size (channel_result,
995 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
996 struct GNUNET_OperationResultMessage,
997 chn),
998 GNUNET_MQ_handler_end ()
999 };
1000
1001 chn->mq = GNUNET_CLIENT_connect (chn->cfg,
1002 "psyc",
1003 handlers,
1004 &slave_disconnected,
1005 slv);
1006 if (NULL == chn->mq)
1007 {
1008 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay,
1009 &slave_reconnect,
1010 slv);
1011 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay);
1012 return;
1013 }
1014 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq);
1015
1016 GNUNET_MQ_send_copy (chn->mq, chn->connect_env);
1017}
1018
1019
1020/**
1021 * Join a PSYC channel.
1022 *
1023 * The entity joining is always the local peer. The user must immediately use
1024 * the GNUNET_PSYC_slave_transmit() functions to transmit a @e join_msg to the
1025 * channel; if the join request succeeds, the channel state (and @e recent
1026 * method calls) will be replayed to the joining member. There is no explicit
1027 * notification on failure (as the channel may simply take days to approve,
1028 * and disapproval is simply being ignored).
1029 *
1030 * @param cfg
1031 * Configuration to use.
1032 * @param channel_key ECC public key that identifies the channel we wish to join.
1033 * @param slave_key ECC private-public key pair that identifies the slave, and
1034 * used by multicast to sign the join request and subsequent unicast
1035 * requests sent to the master.
1036 * @param origin Peer identity of the origin.
1037 * @param relay_count Number of peers in the @a relays array.
1038 * @param relays Peer identities of members of the multicast group, which serve
1039 * as relays and used to join the group at.
1040 * @param message_cb Function to invoke on message parts received from the
1041 * channel, typically at least contains method handlers for @e join and
1042 * @e part.
1043 * @param slave_connect_cb Function invoked once we have connected to the
1044 * PSYC service.
1045 * @param join_decision_cb Function invoked once we have received a join
1046 * decision.
1047 * @param cls Closure for @a message_cb and @a slave_joined_cb.
1048 * @param method_name Method name for the join request.
1049 * @param env Environment containing transient variables for the request, or NULL.
1050 * @param data Payload for the join message.
1051 * @param data_size Number of bytes in @a data.
1052 *
1053 * @return Handle for the slave, NULL on error.
1054 */
1055struct GNUNET_PSYC_Slave *
1056GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1057 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_pub_key,
1058 const struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key,
1059 enum GNUNET_PSYC_SlaveJoinFlags flags,
1060 const struct GNUNET_PeerIdentity *origin,
1061 uint32_t relay_count,
1062 const struct GNUNET_PeerIdentity *relays,
1063 GNUNET_PSYC_MessageCallback message_cb,
1064 GNUNET_PSYC_MessagePartCallback message_part_cb,
1065 GNUNET_PSYC_SlaveConnectCallback connect_cb,
1066 GNUNET_PSYC_JoinDecisionCallback join_decision_cb,
1067 void *cls,
1068 const struct GNUNET_PSYC_Message *join_msg)
1069{
1070 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
1071 struct GNUNET_PSYC_Channel *chn = &slv->chn;
1072 uint16_t relay_size = relay_count * sizeof (*relays);
1073 uint16_t join_msg_size;
1074 if (NULL == join_msg)
1075 join_msg_size = 0;
1076 else
1077 join_msg_size = ntohs (join_msg->header.size);
1078
1079 struct SlaveJoinRequest *req;
1080 chn->connect_env = GNUNET_MQ_msg_extra (req, relay_size + join_msg_size,
1081 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
1082 req->channel_pub_key = *channel_pub_key;
1083 req->slave_key = *slave_key;
1084 req->origin = *origin;
1085 req->relay_count = htonl (relay_count);
1086 req->flags = htonl (flags);
1087
1088 if (0 < relay_size)
1089 GNUNET_memcpy (&req[1], relays, relay_size);
1090
1091 if (NULL != join_msg)
1092 GNUNET_memcpy ((char *) &req[1] + relay_size, join_msg, join_msg_size);
1093
1094 chn->cfg = cfg;
1095 chn->is_master = GNUNET_NO;
1096 chn->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1097
1098 chn->op = GNUNET_OP_create ();
1099 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
1100
1101 slv->connect_cb = connect_cb;
1102 slv->join_dcsn_cb = join_decision_cb;
1103 slv->cb_cls = cls;
1104
1105 slave_connect (slv);
1106 return slv;
1107}
1108
1109
1110/**
1111 * Part a PSYC channel.
1112 *
1113 * Will terminate the connection to the PSYC service. Polite clients should
1114 * first explicitly send a part request (via GNUNET_PSYC_slave_transmit()).
1115 *
1116 * @param slave Slave handle.
1117 */
1118void
1119GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv,
1120 int keep_active,
1121 GNUNET_ContinuationCallback part_cb,
1122 void *part_cls)
1123{
1124 struct GNUNET_PSYC_Channel *chn = &slv->chn;
1125 struct GNUNET_MQ_Envelope *env;
1126
1127 chn->is_disconnecting = GNUNET_YES;
1128 chn->disconnect_cb = part_cb;
1129 chn->disconnect_cls = part_cls;
1130 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST);
1131 GNUNET_MQ_send (chn->mq, env);
1132}
1133
1134
1135/**
1136 * Request a message to be sent to the channel master.
1137 *
1138 * @param slave Slave handle.
1139 * @param method_name Which (PSYC) method should be invoked (on host).
1140 * @param notify_mod Function to call to obtain modifiers.
1141 * @param notify_data Function to call to obtain fragments of the data.
1142 * @param notify_cls Closure for @a notify.
1143 * @param flags Flags for the message being transmitted.
1144 *
1145 * @return Transmission handle, NULL on error (i.e. more than one request
1146 * queued).
1147 */
1148struct GNUNET_PSYC_SlaveTransmitHandle *
1149GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slv,
1150 const char *method_name,
1151 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1152 GNUNET_PSYC_TransmitNotifyData notify_data,
1153 void *notify_cls,
1154 enum GNUNET_PSYC_SlaveTransmitFlags flags)
1155
1156{
1157 if (GNUNET_OK
1158 == GNUNET_PSYC_transmit_message (slv->chn.tmit, method_name, NULL,
1159 notify_mod, notify_data, notify_cls,
1160 flags))
1161 return (struct GNUNET_PSYC_SlaveTransmitHandle *) slv->chn.tmit;
1162 else
1163 return NULL;
1164}
1165
1166
1167/**
1168 * Resume transmission to the master.
1169 *
1170 * @param tmit Handle of the request that is being resumed.
1171 */
1172void
1173GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *tmit)
1174{
1175 GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit);
1176}
1177
1178
1179/**
1180 * Abort transmission request to master.
1181 *
1182 * @param tmit Handle of the request that is being aborted.
1183 */
1184void
1185GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *tmit)
1186{
1187 GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit);
1188}
1189
1190
1191/**
1192 * Convert @a slave to a @e channel handle to access the @e channel APIs.
1193 *
1194 * @param slv Slave handle.
1195 *
1196 * @return Channel handle, valid for as long as @a slave is valid.
1197 */
1198struct GNUNET_PSYC_Channel *
1199GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv)
1200{
1201 return &slv->chn;
1202}
1203
1204
1205/**
1206 * Add a slave to the channel's membership list.
1207 *
1208 * Note that this will NOT generate any PSYC traffic, it will merely update the
1209 * local database to modify how we react to <em>membership test</em> queries.
1210 * The channel master still needs to explicitly transmit a @e join message to
1211 * notify other channel members and they then also must still call this function
1212 * in their respective methods handling the @e join message. This way, how @e
1213 * join and @e part operations are exactly implemented is still up to the
1214 * application; for example, there might be a @e part_all method to kick out
1215 * everyone.
1216 *
1217 * Note that channel slaves are explicitly trusted to execute such methods
1218 * correctly; not doing so correctly will result in either denying other slaves
1219 * access or offering access to channel data to non-members.
1220 *
1221 * @param chn
1222 * Channel handle.
1223 * @param slave_pub_key
1224 * Identity of channel slave to add.
1225 * @param announced_at
1226 * ID of the message that announced the membership change.
1227 * @param effective_since
1228 * Addition of slave is in effect since this message ID.
1229 * @param result_cb
1230 * Function to call with the result of the operation.
1231 * The @e result_code argument is #GNUNET_OK on success, or
1232 * #GNUNET_SYSERR on error. In case of an error, the @e data argument
1233 * can contain an optional error message.
1234 * @param cls
1235 * Closure for @a result_cb.
1236 */
1237void
1238GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1239 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
1240 uint64_t announced_at,
1241 uint64_t effective_since,
1242 GNUNET_ResultCallback result_cb,
1243 void *cls)
1244{
1245 struct ChannelMembershipStoreRequest *req;
1246 struct GNUNET_MQ_Envelope *
1247 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE);
1248 req->slave_pub_key = *slave_pub_key;
1249 req->announced_at = GNUNET_htonll (announced_at);
1250 req->effective_since = GNUNET_htonll (effective_since);
1251 req->did_join = GNUNET_YES;
1252 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL));
1253
1254 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255 "GNUNET_PSYC_channel_slave_add, OP ID: %" PRIu64 "\n",
1256 GNUNET_ntohll (req->op_id));
1257 GNUNET_MQ_send (chn->mq, env);
1258}
1259
1260
1261/**
1262 * Remove a slave from the channel's membership list.
1263 *
1264 * Note that this will NOT generate any PSYC traffic, it will merely update the
1265 * local database to modify how we react to <em>membership test</em> queries.
1266 * The channel master still needs to explicitly transmit a @e part message to
1267 * notify other channel members and they then also must still call this function
1268 * in their respective methods handling the @e part message. This way, how
1269 * @e join and @e part operations are exactly implemented is still up to the
1270 * application; for example, there might be a @e part_all message to kick out
1271 * everyone.
1272 *
1273 * Note that channel members are explicitly trusted to perform these
1274 * operations correctly; not doing so correctly will result in either
1275 * denying members access or offering access to channel data to
1276 * non-members.
1277 *
1278 * @param chn
1279 * Channel handle.
1280 * @param slave_pub_key
1281 * Identity of channel slave to remove.
1282 * @param announced_at
1283 * ID of the message that announced the membership change.
1284 * @param result_cb
1285 * Function to call with the result of the operation.
1286 * The @e result_code argument is #GNUNET_OK on success, or
1287 * #GNUNET_SYSERR on error. In case of an error, the @e data argument
1288 * can contain an optional error message.
1289 * @param cls
1290 * Closure for @a result_cb.
1291 */
1292void
1293GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1294 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
1295 uint64_t announced_at,
1296 GNUNET_ResultCallback result_cb,
1297 void *cls)
1298{
1299 struct ChannelMembershipStoreRequest *req;
1300 struct GNUNET_MQ_Envelope *
1301 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE);
1302 req->slave_pub_key = *slave_pub_key;
1303 req->announced_at = GNUNET_htonll (announced_at);
1304 req->did_join = GNUNET_NO;
1305 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL));
1306
1307 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1308 "GNUNET_PSYC_channel_slave_remove, OP ID: %" PRIu64 "\n",
1309 GNUNET_ntohll (req->op_id));
1310 GNUNET_MQ_send (chn->mq, env);
1311}
1312
1313
1314static struct GNUNET_PSYC_HistoryRequest *
1315channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1316 uint64_t start_message_id,
1317 uint64_t end_message_id,
1318 uint64_t message_limit,
1319 const char *method_prefix,
1320 uint32_t flags,
1321 GNUNET_PSYC_MessageCallback message_cb,
1322 GNUNET_PSYC_MessagePartCallback message_part_cb,
1323 GNUNET_ResultCallback result_cb,
1324 void *cls)
1325{
1326 struct GNUNET_PSYC_HistoryRequestMessage *req;
1327 struct GNUNET_PSYC_HistoryRequest *hist = GNUNET_malloc (sizeof (*hist));
1328 hist->chn = chn;
1329 hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
1330 hist->result_cb = result_cb;
1331 hist->cls = cls;
1332 hist->op_id = GNUNET_OP_add (chn->op, op_recv_history_result, hist, NULL);
1333
1334 GNUNET_assert (NULL != method_prefix);
1335 uint16_t method_size = strnlen (method_prefix,
1336 GNUNET_MAX_MESSAGE_SIZE
1337 - sizeof (*req)) + 1;
1338 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1339
1340 struct GNUNET_MQ_Envelope *
1341 env = GNUNET_MQ_msg_extra (req, method_size,
1342 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
1343 req->start_message_id = GNUNET_htonll (start_message_id);
1344 req->end_message_id = GNUNET_htonll (end_message_id);
1345 req->message_limit = GNUNET_htonll (message_limit);
1346 req->flags = htonl (flags);
1347 req->op_id = GNUNET_htonll (hist->op_id);
1348
1349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1350 "channel_history_replay, OP ID: %" PRIu64 "\n",
1351 GNUNET_ntohll (req->op_id));
1352 GNUNET_memcpy (&req[1], method_prefix, method_size);
1353
1354 GNUNET_MQ_send (chn->mq, env);
1355 return hist;
1356}
1357
1358
1359/**
1360 * Request to replay a part of the message history of the channel.
1361 *
1362 * Historic messages (but NOT the state at the time) will be replayed and given
1363 * to the normal method handlers with a #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
1364 *
1365 * Messages are retrieved from the local PSYCstore if available,
1366 * otherwise requested from the network.
1367 *
1368 * @param channel
1369 * Which channel should be replayed?
1370 * @param start_message_id
1371 * Earliest interesting point in history.
1372 * @param end_message_id
1373 * Last (inclusive) interesting point in history.
1374 * @param method_prefix
1375 * Retrieve only messages with a matching method prefix.
1376 * @param flags
1377 * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
1378 * @param result_cb
1379 * Function to call when the requested history has been fully replayed.
1380 * @param cls
1381 * Closure for the callbacks.
1382 *
1383 * @return Handle to cancel history replay operation.
1384 */
1385struct GNUNET_PSYC_HistoryRequest *
1386GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1387 uint64_t start_message_id,
1388 uint64_t end_message_id,
1389 const char *method_prefix,
1390 uint32_t flags,
1391 GNUNET_PSYC_MessageCallback message_cb,
1392 GNUNET_PSYC_MessagePartCallback message_part_cb,
1393 GNUNET_ResultCallback result_cb,
1394 void *cls)
1395{
1396 return channel_history_replay (chn, start_message_id, end_message_id, 0,
1397 method_prefix, flags,
1398 message_cb, message_part_cb, result_cb, cls);
1399}
1400
1401
1402/**
1403 * Request to replay the latest messages from the message history of the channel.
1404 *
1405 * Historic messages (but NOT the state at the time) will be replayed (given to
1406 * the normal method handlers) if available and if access is permitted.
1407 *
1408 * @param channel
1409 * Which channel should be replayed?
1410 * @param message_limit
1411 * Maximum number of messages to replay.
1412 * @param method_prefix
1413 * Retrieve only messages with a matching method prefix.
1414 * Use NULL or "" to retrieve all.
1415 * @param flags
1416 * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
1417 * @param result_cb
1418 * Function to call when the requested history has been fully replayed.
1419 * @param cls
1420 * Closure for the callbacks.
1421 *
1422 * @return Handle to cancel history replay operation.
1423 */
1424struct GNUNET_PSYC_HistoryRequest *
1425GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn,
1426 uint64_t message_limit,
1427 const char *method_prefix,
1428 uint32_t flags,
1429 GNUNET_PSYC_MessageCallback message_cb,
1430 GNUNET_PSYC_MessagePartCallback message_part_cb,
1431 GNUNET_ResultCallback result_cb,
1432 void *cls)
1433{
1434 return channel_history_replay (chn, 0, 0, message_limit, method_prefix, flags,
1435 message_cb, message_part_cb, result_cb, cls);
1436}
1437
1438
1439void
1440GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel,
1441 struct GNUNET_PSYC_HistoryRequest *hist)
1442{
1443 GNUNET_PSYC_receive_destroy (hist->recv);
1444 GNUNET_OP_remove (hist->chn->op, hist->op_id);
1445 GNUNET_free (hist);
1446}
1447
1448
1449/**
1450 * Retrieve the best matching channel state variable.
1451 *
1452 * If the requested variable name is not present in the state, the nearest
1453 * less-specific name is matched; for example, requesting "_a_b" will match "_a"
1454 * if "_a_b" does not exist.
1455 *
1456 * @param channel
1457 * Channel handle.
1458 * @param full_name
1459 * Full name of the requested variable.
1460 * The actual variable returned might have a shorter name.
1461 * @param var_cb
1462 * Function called once when a matching state variable is found.
1463 * Not called if there's no matching state variable.
1464 * @param result_cb
1465 * Function called after the operation finished.
1466 * (i.e. all state variables have been returned via @a state_cb)
1467 * @param cls
1468 * Closure for the callbacks.
1469 */
1470static struct GNUNET_PSYC_StateRequest *
1471channel_state_get (struct GNUNET_PSYC_Channel *chn,
1472 uint16_t type, const char *name,
1473 GNUNET_PSYC_StateVarCallback var_cb,
1474 GNUNET_ResultCallback result_cb, void *cls)
1475{
1476 struct StateRequest *req;
1477 struct GNUNET_PSYC_StateRequest *sr = GNUNET_malloc (sizeof (*sr));
1478 sr->chn = chn;
1479 sr->var_cb = var_cb;
1480 sr->result_cb = result_cb;
1481 sr->cls = cls;
1482 sr->op_id = GNUNET_OP_add (chn->op, op_recv_state_result, sr, NULL);
1483
1484 GNUNET_assert (NULL != name);
1485 size_t name_size = strnlen (name, GNUNET_MAX_MESSAGE_SIZE
1486 - sizeof (*req)) + 1;
1487 struct GNUNET_MQ_Envelope *
1488 env = GNUNET_MQ_msg_extra (req, name_size, type);
1489 req->op_id = GNUNET_htonll (sr->op_id);
1490
1491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492 "channel_state_get, OP ID: %" PRIu64 "\n",
1493 GNUNET_ntohll (req->op_id));
1494
1495 GNUNET_memcpy (&req[1], name, name_size);
1496
1497 GNUNET_MQ_send (chn->mq, env);
1498 return sr;
1499}
1500
1501
1502/**
1503 * Retrieve the best matching channel state variable.
1504 *
1505 * If the requested variable name is not present in the state, the nearest
1506 * less-specific name is matched; for example, requesting "_a_b" will match "_a"
1507 * if "_a_b" does not exist.
1508 *
1509 * @param channel
1510 * Channel handle.
1511 * @param full_name
1512 * Full name of the requested variable.
1513 * The actual variable returned might have a shorter name.
1514 * @param var_cb
1515 * Function called once when a matching state variable is found.
1516 * Not called if there's no matching state variable.
1517 * @param result_cb
1518 * Function called after the operation finished.
1519 * (i.e. all state variables have been returned via @a state_cb)
1520 * @param cls
1521 * Closure for the callbacks.
1522 */
1523struct GNUNET_PSYC_StateRequest *
1524GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn,
1525 const char *full_name,
1526 GNUNET_PSYC_StateVarCallback var_cb,
1527 GNUNET_ResultCallback result_cb,
1528 void *cls)
1529{
1530 return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
1531 full_name, var_cb, result_cb, cls);
1532
1533}
1534
1535
1536/**
1537 * Return all channel state variables whose name matches a given prefix.
1538 *
1539 * A name matches if it starts with the given @a name_prefix, thus requesting
1540 * the empty prefix ("") will match all values; requesting "_a_b" will also
1541 * return values stored under "_a_b_c".
1542 *
1543 * The @a state_cb is invoked on all matching state variables asynchronously, as
1544 * the state is stored in and retrieved from the PSYCstore,
1545 *
1546 * @param channel
1547 * Channel handle.
1548 * @param name_prefix
1549 * Prefix of the state variable name to match.
1550 * @param var_cb
1551 * Function called once when a matching state variable is found.
1552 * Not called if there's no matching state variable.
1553 * @param result_cb
1554 * Function called after the operation finished.
1555 * (i.e. all state variables have been returned via @a state_cb)
1556 * @param cls
1557 * Closure for the callbacks.
1558 */
1559struct GNUNET_PSYC_StateRequest *
1560GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn,
1561 const char *name_prefix,
1562 GNUNET_PSYC_StateVarCallback var_cb,
1563 GNUNET_ResultCallback result_cb,
1564 void *cls)
1565{
1566 return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
1567 name_prefix, var_cb, result_cb, cls);
1568}
1569
1570
1571/**
1572 * Cancel a state request operation.
1573 *
1574 * @param sr
1575 * Handle for the operation to cancel.
1576 */
1577void
1578GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr)
1579{
1580 GNUNET_OP_remove (sr->chn->op, sr->op_id);
1581 GNUNET_free (sr);
1582}
1583
1584/* end of psyc_api.c */
diff --git a/src/psyc/psyc_test_lib.h b/src/psyc/psyc_test_lib.h
new file mode 100644
index 0000000..0ad9910
--- /dev/null
+++ b/src/psyc/psyc_test_lib.h
@@ -0,0 +1,67 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
4 *
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file psyc/test_psyc_api_join.c
23 * @brief library for writing psyc tests
24 * @author xrs
25 */
26
27#define MAX_TESTBED_OPS 32
28
29struct pctx
30{
31 int idx;
32
33 struct GNUNET_TESTBED_Peer *testbed_peer;
34
35 const struct GNUNET_PeerIdentity *peer_id;
36
37 const struct GNUNET_PeerIdentity *peer_id_master;
38
39 /**
40 * Used to simulate egos (not peerid)
41 */
42 const struct GNUNET_CRYPTO_EcdsaPrivateKey *id_key;
43
44 const struct GNUNET_CRYPTO_EcdsaPublicKey *id_pub_key;
45
46 /**
47 * Used to store either GNUNET_PSYC_Master or GNUNET_PSYC_Slave handle
48 */
49 void *psyc;
50
51 struct GNUNET_PSYC_Channel *channel;
52
53 const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key;
54
55 struct GNUNET_CRYPTO_EddsaPublicKey *channel_pub_key;
56
57 int test_ok;
58};
59
60static struct GNUNET_SCHEDULER_Task *timeout_task_id;
61
62static int result = GNUNET_SYSERR;
63
64static struct GNUNET_TESTBED_Operation *op[MAX_TESTBED_OPS];
65
66static int op_cnt = 0;
67
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
new file mode 100644
index 0000000..b6e27bb
--- /dev/null
+++ b/src/psyc/test_psyc.c
@@ -0,0 +1,1018 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
4 *
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file psyc/test_psyc.c
23 * @brief Tests for the PSYC API.
24 * @author Gabor X Toth
25 * @author Christian Grothoff
26 */
27
28#include <inttypes.h>
29
30#include "platform.h"
31#include "gnunet_crypto_lib.h"
32#include "gnunet_common.h"
33#include "gnunet_util_lib.h"
34#include "gnunet_testing_lib.h"
35#include "gnunet_psyc_util_lib.h"
36#include "gnunet_psyc_service.h"
37
38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
39
40/**
41 * Return value from 'main'.
42 */
43static int res;
44
45static const struct GNUNET_CONFIGURATION_Handle *cfg;
46
47static struct GNUNET_PeerIdentity this_peer;
48
49/**
50 * Handle for task for timeout termination.
51 */
52static struct GNUNET_SCHEDULER_Task * end_badly_task;
53
54static struct GNUNET_PSYC_Master *mst;
55static struct GNUNET_PSYC_Slave *slv;
56
57static struct GNUNET_PSYC_Channel *mst_chn, *slv_chn;
58
59static struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key;
60static struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key;
61
62static struct GNUNET_CRYPTO_EddsaPublicKey channel_pub_key;
63static struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
64
65struct TransmitClosure
66{
67 struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit;
68 struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit;
69 struct GNUNET_PSYC_Environment *env;
70 struct GNUNET_PSYC_Modifier *mod;
71 char *data[16];
72 const char *mod_value;
73 size_t mod_value_size;
74 uint8_t data_delay[16];
75 uint8_t data_count;
76 uint8_t paused;
77 uint8_t n;
78};
79
80static struct TransmitClosure *tmit;
81
82static uint8_t join_req_count, end_count;
83
84enum
85{
86 TEST_NONE = 0,
87 TEST_MASTER_START = 1,
88 TEST_SLAVE_JOIN_REJECT = 2,
89 TEST_SLAVE_JOIN_ACCEPT = 3,
90 TEST_SLAVE_ADD = 4,
91 TEST_SLAVE_REMOVE = 5,
92 TEST_SLAVE_TRANSMIT = 6,
93 TEST_MASTER_TRANSMIT = 7,
94 TEST_MASTER_HISTORY_REPLAY_LATEST = 8,
95 TEST_SLAVE_HISTORY_REPLAY_LATEST = 9,
96 TEST_MASTER_HISTORY_REPLAY = 10,
97 TEST_SLAVE_HISTORY_REPLAY = 11,
98 TEST_MASTER_STATE_GET = 12,
99 TEST_SLAVE_STATE_GET = 13,
100 TEST_MASTER_STATE_GET_PREFIX = 14,
101 TEST_SLAVE_STATE_GET_PREFIX = 15,
102} test;
103
104
105static void
106master_transmit ();
107
108static void
109master_history_replay_latest ();
110
111
112static void
113master_stopped (void *cls)
114{
115 if (NULL != tmit)
116 {
117 GNUNET_PSYC_env_destroy (tmit->env);
118 GNUNET_free (tmit);
119 tmit = NULL;
120 }
121 GNUNET_SCHEDULER_shutdown ();
122}
123
124
125static void
126slave_parted (void *cls)
127{
128 if (NULL != mst)
129 {
130 GNUNET_PSYC_master_stop (mst, GNUNET_NO, &master_stopped, NULL);
131 mst = NULL;
132 }
133 else
134 master_stopped (NULL);
135}
136
137
138/**
139 * Clean up all resources used.
140 */
141static void
142cleanup ()
143{
144 if (NULL != slv)
145 {
146 GNUNET_PSYC_slave_part (slv, GNUNET_NO, &slave_parted, NULL);
147 slv = NULL;
148 }
149 else
150 slave_parted (NULL);
151}
152
153
154/**
155 * Terminate the test case (failure).
156 *
157 * @param cls NULL
158 */
159static void
160end_badly (void *cls)
161{
162 res = 1;
163 cleanup ();
164 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test FAILED.\n");
165}
166
167
168/**
169 * Terminate the test case (success).
170 *
171 * @param cls NULL
172 */
173static void
174end_normally (void *cls)
175{
176 res = 0;
177 cleanup ();
178 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test PASSED.\n");
179}
180
181
182/**
183 * Finish the test case (successfully).
184 */
185static void
186end ()
187{
188 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Ending tests.\n");
189
190 if (end_badly_task != NULL)
191 {
192 GNUNET_SCHEDULER_cancel (end_badly_task);
193 end_badly_task = NULL;
194 }
195 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MILLISECONDS,
196 &end_normally, NULL);
197}
198
199
200static void
201master_message_cb (void *cls, const struct GNUNET_PSYC_MessageHeader *msg)
202{
203 GNUNET_assert (NULL != msg);
204 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
205 "Test #%d: Master got PSYC message fragment of size %u "
206 "belonging to message ID %" PRIu64 " with flags %x\n",
207 test, ntohs (msg->header.size),
208 GNUNET_ntohll (msg->message_id), ntohl (msg->flags));
209 // FIXME
210}
211
212
213static void
214master_message_part_cb (void *cls, const struct GNUNET_PSYC_MessageHeader *msg,
215 const struct GNUNET_MessageHeader *pmsg)
216{
217 GNUNET_assert (NULL != msg && NULL != pmsg);
218
219 uint64_t message_id = GNUNET_ntohll (msg->message_id);
220 uint32_t flags = ntohl (msg->flags);
221
222 uint16_t type = ntohs (pmsg->type);
223 uint16_t size = ntohs (pmsg->size);
224
225 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
226 "Test #%d: Master got message part of type %u and size %u "
227 "belonging to message ID %" PRIu64 " with flags %x\n",
228 test, type, size, message_id, flags);
229
230 switch (test)
231 {
232 case TEST_SLAVE_TRANSMIT:
233 if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
234 {
235 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
236 "Test #%d: Unexpected request flags: %x" PRIu32 "\n",
237 test, flags);
238 GNUNET_assert (0);
239 return;
240 }
241 // FIXME: check rest of message
242
243 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
244 master_transmit ();
245 break;
246
247 case TEST_MASTER_TRANSMIT:
248 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
249 master_history_replay_latest ();
250 break;
251
252 case TEST_MASTER_HISTORY_REPLAY:
253 case TEST_MASTER_HISTORY_REPLAY_LATEST:
254 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
255 {
256 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
257 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
258 test, flags);
259 GNUNET_assert (0);
260 return;
261 }
262 break;
263
264 default:
265 GNUNET_assert (0);
266 }
267}
268
269
270static void
271slave_message_cb (void *cls, const struct GNUNET_PSYC_MessageHeader *msg)
272{
273 GNUNET_assert (NULL != msg);
274 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
275 "Test #%d: Slave got PSYC message fragment of size %u "
276 "belonging to message ID %" PRIu64 " with flags %x\n",
277 test, ntohs (msg->header.size),
278 GNUNET_ntohll (msg->message_id), ntohl (msg->flags));
279 // FIXME
280}
281
282
283static void
284slave_message_part_cb (void *cls,
285 const struct GNUNET_PSYC_MessageHeader *msg,
286 const struct GNUNET_MessageHeader *pmsg)
287{
288 GNUNET_assert (NULL != msg && NULL != pmsg);
289
290 uint64_t message_id = GNUNET_ntohll (msg->message_id);
291 uint32_t flags = ntohl (msg->flags);
292
293 uint16_t type = ntohs (pmsg->type);
294 uint16_t size = ntohs (pmsg->size);
295
296 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
297 "Test #%d: Slave got message part of type %u and size %u "
298 "belonging to message ID %" PRIu64 " with flags %x\n",
299 test, type, size, message_id, flags);
300
301 switch (test)
302 {
303 case TEST_MASTER_TRANSMIT:
304 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
305 master_history_replay_latest ();
306 break;
307
308 case TEST_SLAVE_HISTORY_REPLAY:
309 case TEST_SLAVE_HISTORY_REPLAY_LATEST:
310 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
311 {
312 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
313 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
314 test, flags);
315 GNUNET_assert (0);
316 return;
317 }
318 break;
319
320 default:
321 GNUNET_assert (0);
322 }
323}
324
325
326static void
327state_get_var (void *cls, const struct GNUNET_MessageHeader *mod,
328 const char *name, const void *value,
329 uint32_t value_size, uint32_t full_value_size)
330{
331 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
332 "Got state var: %s\n%.*s\n",
333 name,
334 (int) value_size,
335 (const char *) value);
336}
337
338
339/*** Slave state_get_prefix() ***/
340
341static void
342slave_state_get_prefix_result (void *cls, int64_t result,
343 const void *err_msg, uint16_t err_msg_size)
344{
345 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
346 "Test #%d: slave_state_get_prefix:\t%" PRId64 " (%.*s)\n",
347 test, result,
348 (int) err_msg_size,
349 (const char *) err_msg);
350 // FIXME: GNUNET_assert (2 == result);
351 end ();
352}
353
354
355static void
356slave_state_get_prefix ()
357{
358 test = TEST_SLAVE_STATE_GET_PREFIX;
359 GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", state_get_var,
360 slave_state_get_prefix_result, NULL);
361}
362
363
364/*** Master state_get_prefix() ***/
365
366
367static void
368master_state_get_prefix_result (void *cls, int64_t result,
369 const void *err_msg, uint16_t err_msg_size)
370{
371 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
372 "Test #%d: master_state_get_prefix:\t%" PRId64 " (%s)\n",
373 test, result, (char *) err_msg);
374 // FIXME: GNUNET_assert (2 == result);
375 slave_state_get_prefix ();
376}
377
378
379static void
380master_state_get_prefix ()
381{
382 test = TEST_MASTER_STATE_GET_PREFIX;
383 GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", state_get_var,
384 master_state_get_prefix_result, NULL);
385}
386
387
388/*** Slave state_get() ***/
389
390
391static void
392slave_state_get_result (void *cls, int64_t result,
393 const void *err_msg, uint16_t err_msg_size)
394{
395 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
396 "Test #%d: slave_state_get:\t%" PRId64 " (%.*s)\n",
397 test, result, err_msg_size, (char *) err_msg);
398 // FIXME: GNUNET_assert (2 == result);
399 master_state_get_prefix ();
400}
401
402
403static void
404slave_state_get ()
405{
406 test = TEST_SLAVE_STATE_GET;
407 GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", state_get_var,
408 slave_state_get_result, NULL);
409}
410
411
412/*** Master state_get() ***/
413
414
415static void
416master_state_get_result (void *cls, int64_t result,
417 const void *err_msg, uint16_t err_msg_size)
418{
419 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
420 "Test #%d: master_state_get:\t%" PRId64 " (%.*s)\n",
421 test, result, err_msg_size, (char *) err_msg);
422 // FIXME: GNUNET_assert (1 == result);
423 slave_state_get ();
424}
425
426
427static void
428master_state_get ()
429{
430 test = TEST_MASTER_STATE_GET;
431 GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", state_get_var,
432 master_state_get_result, NULL);
433}
434
435
436/*** Slave history_replay() ***/
437
438static void
439slave_history_replay_result (void *cls, int64_t result,
440 const void *err_msg, uint16_t err_msg_size)
441{
442 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
443 "Test #%d: slave_history_replay:\t%" PRId64 " (%.*s)\n",
444 test, result,
445 (int) err_msg_size,
446 (const char *) err_msg);
447 GNUNET_assert (9 == result);
448
449 master_state_get ();
450}
451
452
453static void
454slave_history_replay ()
455{
456 test = TEST_SLAVE_HISTORY_REPLAY;
457 GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "",
458 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
459 slave_message_cb,
460 slave_message_part_cb,
461 slave_history_replay_result, NULL);
462}
463
464
465/*** Master history_replay() ***/
466
467
468static void
469master_history_replay_result (void *cls, int64_t result,
470 const void *err_msg, uint16_t err_msg_size)
471{
472 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
473 "Test #%d: master_history_replay:\t%" PRId64 " (%.*s)\n",
474 test, result,
475 (int) err_msg_size,
476 (const char *) err_msg);
477 GNUNET_assert (9 == result);
478
479 slave_history_replay ();
480}
481
482
483static void
484master_history_replay ()
485{
486 test = TEST_MASTER_HISTORY_REPLAY;
487 GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "",
488 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
489 master_message_cb,
490 master_message_part_cb,
491 master_history_replay_result, NULL);
492}
493
494
495/*** Slave history_replay_latest() ***/
496
497
498static void
499slave_history_replay_latest_result (void *cls, int64_t result,
500 const void *err_msg, uint16_t err_msg_size)
501{
502 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
503 "Test #%d: slave_history_replay_latest:\t%" PRId64 " (%.*s)\n",
504 test, result,
505 (int) err_msg_size,
506 (const char *) err_msg);
507 GNUNET_assert (9 == result);
508
509 master_history_replay ();
510}
511
512
513static void
514slave_history_replay_latest ()
515{
516 test = TEST_SLAVE_HISTORY_REPLAY_LATEST;
517 GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, "",
518 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
519 &slave_message_cb,
520 &slave_message_part_cb,
521 &slave_history_replay_latest_result,
522 NULL);
523}
524
525
526/*** Master history_replay_latest() ***/
527
528
529static void
530master_history_replay_latest_result (void *cls, int64_t result,
531 const void *err_msg, uint16_t err_msg_size)
532{
533 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
534 "Test #%d: master_history_replay_latest:\t%" PRId64 " (%.*s)\n",
535 test, result, err_msg_size, (char *) err_msg);
536 GNUNET_assert (9 == result);
537
538 slave_history_replay_latest ();
539}
540
541
542static void
543master_history_replay_latest ()
544{
545 test = TEST_MASTER_HISTORY_REPLAY_LATEST;
546 GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, "",
547 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
548 &master_message_cb,
549 &master_message_part_cb,
550 &master_history_replay_latest_result,
551 NULL);
552}
553
554
555static void
556transmit_resume (void *cls)
557{
558 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
559 "Test #%d: Transmission resumed.\n", test);
560 struct TransmitClosure *tmit = cls;
561 if (NULL != tmit->mst_tmit)
562 GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
563 else
564 GNUNET_PSYC_slave_transmit_resume (tmit->slv_tmit);
565}
566
567
568static int
569tmit_notify_data (void *cls, uint16_t *data_size, void *data)
570{
571 struct TransmitClosure *tmit = cls;
572 if (0 == tmit->data_count)
573 {
574 *data_size = 0;
575 return GNUNET_YES;
576 }
577
578 uint16_t size = strlen (tmit->data[tmit->n]);
579 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
580 "Test #%d: Transmit notify data: %u bytes available, "
581 "processing fragment %u/%u (size %u).\n",
582 test, *data_size, tmit->n + 1, tmit->data_count, size);
583 if (*data_size < size)
584 {
585 *data_size = 0;
586 GNUNET_assert (0);
587 return GNUNET_SYSERR;
588 }
589
590 if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
591 {
592 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
593 "Test #%d: Transmission paused.\n", test);
594 tmit->paused = GNUNET_YES;
595 GNUNET_SCHEDULER_add_delayed (
596 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
597 tmit->data_delay[tmit->n]),
598 &transmit_resume, tmit);
599 *data_size = 0;
600 return GNUNET_NO;
601 }
602 tmit->paused = GNUNET_NO;
603
604 *data_size = size;
605 GNUNET_memcpy (data, tmit->data[tmit->n], size);
606
607 return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES;
608}
609
610
611static int
612tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
613 uint32_t *full_value_size)
614{
615 struct TransmitClosure *tmit = cls;
616 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
617 "Test #%d: Transmit notify modifier: %u bytes available, "
618 "%u modifiers left to process.\n",
619 test, *data_size, (unsigned int) GNUNET_PSYC_env_get_count (tmit->env));
620
621 uint16_t name_size = 0;
622 size_t value_size = 0;
623 const char *value = NULL;
624
625 if (NULL != oper && NULL != tmit->mod)
626 { /* New modifier */
627 tmit->mod = tmit->mod->next;
628 if (NULL == tmit->mod)
629 { /* No more modifiers, continue with data */
630 *data_size = 0;
631 return GNUNET_YES;
632 }
633
634 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
635 *full_value_size = tmit->mod->value_size;
636 *oper = tmit->mod->oper;
637 name_size = strlen (tmit->mod->name);
638
639 if (name_size + 1 + tmit->mod->value_size <= *data_size)
640 {
641 *data_size = name_size + 1 + tmit->mod->value_size;
642 }
643 else
644 {
645 tmit->mod_value_size = tmit->mod->value_size;
646 value_size = *data_size - name_size - 1;
647 tmit->mod_value_size -= value_size;
648 tmit->mod_value = tmit->mod->value + value_size;
649 }
650
651 GNUNET_memcpy (data, tmit->mod->name, name_size);
652 ((char *)data)[name_size] = '\0';
653 GNUNET_memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size);
654 }
655 else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
656 { /* Modifier continuation */
657 value = tmit->mod_value;
658 if (tmit->mod_value_size <= *data_size)
659 {
660 value_size = tmit->mod_value_size;
661 tmit->mod_value = NULL;
662 }
663 else
664 {
665 value_size = *data_size;
666 tmit->mod_value += value_size;
667 }
668 tmit->mod_value_size -= value_size;
669
670 if (*data_size < value_size)
671 {
672 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
673 "value larger than buffer: %u < %zu\n",
674 *data_size, value_size);
675 *data_size = 0;
676 return GNUNET_NO;
677 }
678
679 *data_size = value_size;
680 GNUNET_memcpy (data, value, value_size);
681 }
682
683 return GNUNET_NO;
684}
685
686
687static void
688slave_join ();
689
690
691static void
692slave_transmit ()
693{
694 test = TEST_SLAVE_TRANSMIT;
695 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
696 "Test #%d: Slave sending request to master.\n", test);
697
698 tmit = GNUNET_new (struct TransmitClosure);
699 tmit->env = GNUNET_PSYC_env_create ();
700 GNUNET_PSYC_env_add (tmit->env, GNUNET_PSYC_OP_ASSIGN,
701 "_abc", "abc def", 7);
702 GNUNET_PSYC_env_add (tmit->env, GNUNET_PSYC_OP_ASSIGN,
703 "_abc_def", "abc def ghi", 11);
704 tmit->mod = GNUNET_PSYC_env_head (tmit->env);
705 tmit->n = 0;
706 tmit->data[0] = "slave test";
707 tmit->data_count = 1;
708 tmit->slv_tmit
709 = GNUNET_PSYC_slave_transmit (slv, "_request_test", &tmit_notify_mod,
710 &tmit_notify_data, tmit,
711 GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
712}
713
714
715static void
716slave_remove_cb (void *cls, int64_t result,
717 const void *err_msg, uint16_t err_msg_size)
718{
719 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
720 "Test #%d: slave_remove:\t%" PRId64 " (%.*s)\n",
721 test, result, err_msg_size, (char *) err_msg);
722
723 slave_transmit ();
724}
725
726
727static void
728slave_remove ()
729{
730 test = TEST_SLAVE_REMOVE;
731 struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
732 GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
733 &slave_remove_cb, chn);
734}
735
736
737static void
738slave_add_cb (void *cls, int64_t result,
739 const void *err_msg, uint16_t err_msg_size)
740{
741 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
742 "Test #%d: slave_add:\t%" PRId64 " (%.*s)\n",
743 test, result, err_msg_size, (char *) err_msg);
744 slave_remove ();
745}
746
747
748static void
749slave_add ()
750{
751 test = TEST_SLAVE_ADD;
752 struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
753 GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn);
754}
755
756
757static void
758schedule_second_slave_join (void *cls)
759{
760 slave_join (TEST_SLAVE_JOIN_ACCEPT);
761}
762
763
764static void
765first_slave_parted (void *cls)
766{
767 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "First slave parted.\n");
768 GNUNET_SCHEDULER_add_now (&schedule_second_slave_join, NULL);
769}
770
771
772static void
773schedule_first_slave_part (void *cls)
774{
775 GNUNET_PSYC_slave_part (slv, GNUNET_NO, &first_slave_parted, NULL);
776}
777
778
779static void
780join_decision_cb (void *cls,
781 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn,
782 int is_admitted,
783 const struct GNUNET_PSYC_Message *join_msg)
784{
785 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
786 "Test #%d: Slave got join decision: %d\n", test, is_admitted);
787
788 switch (test)
789 {
790 case TEST_SLAVE_JOIN_REJECT:
791 GNUNET_assert (0 == is_admitted);
792 GNUNET_assert (1 == join_req_count);
793 GNUNET_SCHEDULER_add_now (&schedule_first_slave_part, NULL);
794 break;
795
796 case TEST_SLAVE_JOIN_ACCEPT:
797 GNUNET_assert (1 == is_admitted);
798 GNUNET_assert (2 == join_req_count);
799 slave_add ();
800 break;
801
802 default:
803 GNUNET_break (0);
804 }
805}
806
807
808static void
809join_request_cb (void *cls,
810 const struct GNUNET_PSYC_JoinRequestMessage *req,
811 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
812 const struct GNUNET_PSYC_Message *join_msg,
813 struct GNUNET_PSYC_JoinHandle *jh)
814{
815 struct GNUNET_HashCode slave_key_hash;
816 GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
817 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
818 "Test #%d: Got join request #%u from %s.\n",
819 test, join_req_count, GNUNET_h2s (&slave_key_hash));
820
821 /* Reject first request */
822 int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
823 GNUNET_PSYC_join_decision (jh, is_admitted, 0, NULL, NULL);
824}
825
826
827static void
828slave_connect_cb (void *cls, int result, uint64_t max_message_id)
829{
830 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
831 "Test #%d: Slave connected: %d, max_message_id: %" PRIu64 "\n",
832 test, result, max_message_id);
833 GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT == test);
834 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
835}
836
837
838static void
839slave_join (int t)
840{
841 test = t;
842 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
843 "Test #%d: Joining slave.\n", t);
844
845 struct GNUNET_PeerIdentity origin = this_peer;
846 struct GNUNET_PSYC_Environment *env = GNUNET_PSYC_env_create ();
847 GNUNET_PSYC_env_add (env, GNUNET_PSYC_OP_ASSIGN,
848 "_foo", "bar baz", 7);
849 GNUNET_PSYC_env_add (env, GNUNET_PSYC_OP_ASSIGN,
850 "_foo_bar", "foo bar baz", 11);
851 struct GNUNET_PSYC_Message *
852 join_msg = GNUNET_PSYC_message_create ("_request_join", env, "some data", 9);
853
854 slv = GNUNET_PSYC_slave_join (cfg,
855 &channel_pub_key,
856 slave_key,
857 GNUNET_PSYC_SLAVE_JOIN_NONE,
858 &origin,
859 0,
860 NULL,
861 &slave_message_cb,
862 &slave_message_part_cb,
863 &slave_connect_cb,
864 &join_decision_cb,
865 NULL,
866 join_msg);
867 GNUNET_free (join_msg);
868 slv_chn = GNUNET_PSYC_slave_get_channel (slv);
869 GNUNET_PSYC_env_destroy (env);
870}
871
872
873static void
874master_transmit ()
875{
876 test = TEST_MASTER_TRANSMIT;
877 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
878 "Test #%d: Master sending message to all.\n", test);
879 end_count = 0;
880
881 uint32_t i, j;
882
883 char *name_max = "_test_max";
884 uint8_t name_max_size = sizeof ("_test_max");
885 char *val_max = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD);
886 for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; i++)
887 val_max[i] = (0 == i % 10000) ? '0' + i / 10000 : '.';
888
889 char *name_cont = "_test_cont";
890 uint8_t name_cont_size = sizeof ("_test_cont");
891 char *val_cont = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
892 + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
893 for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size; i++)
894 val_cont[i] = (0 == i % 10000) ? '0' + i / 10000 : ':';
895 for (j = 0; j < GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; j++, i++)
896 val_cont[i] = (0 == j % 10000) ? '0' + j / 10000 : '!';
897
898 tmit = GNUNET_new (struct TransmitClosure);
899 tmit->env = GNUNET_PSYC_env_create ();
900 GNUNET_PSYC_env_add (tmit->env, GNUNET_PSYC_OP_ASSIGN,
901 "_foo", "bar baz", 7);
902 GNUNET_PSYC_env_add (tmit->env, GNUNET_PSYC_OP_ASSIGN,
903 name_max, val_max,
904 GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
905 - name_max_size);
906 GNUNET_PSYC_env_add (tmit->env, GNUNET_PSYC_OP_ASSIGN,
907 "_foo_bar", "foo bar baz", 11);
908 GNUNET_PSYC_env_add (tmit->env, GNUNET_PSYC_OP_ASSIGN,
909 name_cont, val_cont,
910 GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size
911 + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
912 tmit->mod = GNUNET_PSYC_env_head (tmit->env);
913 tmit->data[0] = "foo";
914 tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1);
915 for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++)
916 tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_';
917 tmit->data[2] = "foo bar";
918 tmit->data[3] = "foo bar baz";
919 tmit->data_delay[1] = 3;
920 tmit->data_count = 4;
921 tmit->mst_tmit
922 = GNUNET_PSYC_master_transmit (mst, "_notice_test", &tmit_notify_mod,
923 &tmit_notify_data, tmit,
924 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
925}
926
927
928static void
929master_start_cb (void *cls, int result, uint64_t max_message_id)
930{
931 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
932 "Test #%d: Master started: %d, max_message_id: %" PRIu64 "\n",
933 test, result, max_message_id);
934 GNUNET_assert (TEST_MASTER_START == test);
935 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
936 slave_join (TEST_SLAVE_JOIN_REJECT);
937}
938
939
940static void
941master_start ()
942{
943 test = TEST_MASTER_START;
944 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
945 "Test #%d: Starting master.\n", test);
946 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
947 &master_start_cb, &join_request_cb,
948 &master_message_cb, &master_message_part_cb,
949 NULL);
950 mst_chn = GNUNET_PSYC_master_get_channel (mst);
951}
952
953
954static void
955schedule_master_start (void *cls)
956{
957 master_start ();
958}
959
960
961/**
962 * Main function of the test, run from scheduler.
963 *
964 * @param cls NULL
965 * @param cfg configuration we use (also to connect to PSYC service)
966 * @param peer handle to access more of the peer (not used)
967 */
968static void
969#if DEBUG_TEST_PSYC
970run (void *cls, char *const *args, const char *cfgfile,
971 const struct GNUNET_CONFIGURATION_Handle *c)
972#else
973run (void *cls,
974 const struct GNUNET_CONFIGURATION_Handle *c,
975 struct GNUNET_TESTING_Peer *peer)
976#endif
977{
978 cfg = c;
979 end_badly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, NULL);
980
981 GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer);
982
983 channel_key = GNUNET_CRYPTO_eddsa_key_create ();
984 slave_key = GNUNET_CRYPTO_ecdsa_key_create ();
985
986 GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
987 GNUNET_CRYPTO_ecdsa_key_get_public (slave_key, &slave_pub_key);
988
989#if DEBUG_TEST_PSYC
990 master_start ();
991#else
992 /* Allow some time for the services to initialize. */
993 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
994 &schedule_master_start, NULL);
995#endif
996}
997
998
999int
1000main (int argc, char *argv[])
1001{
1002 res = 1;
1003#if DEBUG_TEST_PSYC
1004 const struct GNUNET_GETOPT_CommandLineOption opts[] = {
1005 GNUNET_GETOPT_OPTION_END
1006 };
1007 if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "test-psyc",
1008 "test-psyc [options]",
1009 opts, &run, NULL))
1010 return 1;
1011#else
1012 if (0 != GNUNET_TESTING_peer_run ("test-psyc", "test_psyc.conf", &run, NULL))
1013 return 1;
1014#endif
1015 return res;
1016}
1017
1018/* end of test_psyc.c */
diff --git a/src/psyc/test_psyc2.c b/src/psyc/test_psyc2.c
new file mode 100644
index 0000000..c6e7237
--- /dev/null
+++ b/src/psyc/test_psyc2.c
@@ -0,0 +1,284 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
4 *
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file psyc/test_psyc2.c
23 * @brief Testbed test for the PSYC API.
24 * @author xrs
25 */
26
27#include "platform.h"
28#include "gnunet_crypto_lib.h"
29#include "gnunet_common.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_testbed_service.h"
32#include "gnunet_psyc_util_lib.h"
33#include "gnunet_psyc_service.h"
34
35#define PEERS_REQUESTED 2
36
37static int result;
38
39static struct GNUNET_SCHEDULER_Task *timeout_tid;
40static struct pctx **pctx;
41
42static struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key;
43static struct GNUNET_CRYPTO_EddsaPublicKey channel_pub_key;
44
45static struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key;
46static struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
47
48/**
49 * Task To perform tests
50 */
51static struct GNUNET_SCHEDULER_Task *test_task;
52
53/**
54 * Peer id couter
55 */
56static unsigned int pids;
57
58struct pctx
59{
60 int idx;
61 struct GNUNET_TESTBED_Peer *peer;
62 const struct GNUNET_PeerIdentity *id;
63
64 struct GNUNET_TESTBED_Operation *op;
65
66 /**
67 * psyc service handle
68 */
69 void *psyc;
70 struct GNUNET_PSYC_Master *mst;
71 struct GNUNET_PSYC_Slave *slv;
72
73 /**
74 * result for test on peer
75 */
76 int test_ok;
77};
78
79static void
80shutdown_task (void *cls)
81{
82 if (NULL != pctx)
83 {
84 if (NULL != pctx[0]->mst)
85 GNUNET_PSYC_master_stop (pctx[0]->mst, GNUNET_NO, NULL, NULL);
86
87 for (int i=0; i < PEERS_REQUESTED; i++)
88 {
89 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Operation done.\n");
90 GNUNET_TESTBED_operation_done (pctx[i]->op);
91 GNUNET_free_non_null (pctx[i]);
92 }
93 GNUNET_free (pctx);
94 }
95
96 if (NULL != timeout_tid)
97 GNUNET_SCHEDULER_cancel (timeout_tid);
98}
99
100static void
101timeout_task (void *cls)
102{
103 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Timeout!\n");
104 result = GNUNET_SYSERR;
105 GNUNET_SCHEDULER_shutdown ();
106}
107
108static void
109start_test (void *cls)
110{
111}
112
113static void
114pinfo_cb (void *cls,
115 struct GNUNET_TESTBED_Operation *operation,
116 const struct GNUNET_TESTBED_PeerInformation *pinfo,
117 const char *emsg)
118{
119 struct pctx *pc = (struct pctx*) cls;
120
121 pc->id = pinfo->result.id;
122
123 pids++;
124 if (pids < (PEERS_REQUESTED - 1))
125 return;
126 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got all IDs, starting test\n");
127 test_task = GNUNET_SCHEDULER_add_now (&start_test, NULL);
128}
129
130static void
131mst_start_cb ()
132{
133}
134
135static void
136join_request_cb ()
137{
138}
139
140static void
141mst_message_cb ()
142{
143}
144
145static void
146mst_message_part_cb ()
147{
148}
149
150static void
151slv_message_cb ()
152{
153}
154
155static void
156slv_message_part_cb ()
157{
158}
159
160static void
161slv_connect_cb ()
162{
163}
164
165static void
166join_decision_cb ()
167{
168}
169
170static void *
171psyc_ca (void *cls,
172 const struct GNUNET_CONFIGURATION_Handle *cfg)
173{
174 struct GNUNET_PSYC_Message *join_msg = NULL;
175 struct pctx *pc = (struct pctx *) cls;
176
177 if (0 == pc->idx)
178 {
179 pc->mst = GNUNET_PSYC_master_start (cfg, channel_key,
180 GNUNET_PSYC_CHANNEL_PRIVATE,
181 &mst_start_cb, &join_request_cb,
182 &mst_message_cb, &mst_message_part_cb,
183 NULL);
184 return pc->mst;
185 }
186
187 pc->slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key,
188 GNUNET_PSYC_SLAVE_JOIN_NONE,
189 &pid, 0, NULL, &slv_message_cb,
190 &slv_message_part_cb,
191 &slv_connect_cb, &join_decision_cb,
192 NULL, join_msg);
193 return pc->slv;
194}
195
196static void
197psyc_da (void *cls,
198 void *op_result)
199{
200 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Disconnected from service.\n");
201}
202
203static void
204service_connect (void *cls,
205 struct GNUNET_TESTBED_Operation *op,
206 void *ca_result,
207 const char *emsg)
208{
209 struct pctx *pc = (struct pctx *) cls;
210
211 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
212 "Connected to service\n");
213
214 GNUNET_assert (NULL != ca_result);
215
216 // FIXME: we need a simple service handle to connect to the service, then
217 // get peer information and AFTER that make PSYC ops. Compare to CADET.
218 pc->psyc = ca_result;
219
220 GNUNET_TESTBED_peer_get_information (pc->peer,
221 GNUNET_TESTBED_PIT_IDENTITY,
222 pinfo_cb, pc);
223}
224
225static void
226testbed_master (void *cls,
227 struct GNUNET_TESTBED_RunHandle *h,
228 unsigned int num_peers,
229 struct GNUNET_TESTBED_Peer **p,
230 unsigned int links_succeeded,
231 unsigned int links_failed)
232{
233 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connected to testbed_master()\n");
234
235 // Create ctx for peers
236 pctx = GNUNET_new_array (PEERS_REQUESTED, struct pctx*);
237 for (int i = 0; i<PEERS_REQUESTED; i++)
238 {
239 pctx[i] = GNUNET_new (struct pctx);
240 pctx[i]->idx = i;
241 pctx[i]->peer = p[i];
242 pctx[i]->id = NULL;
243 pctx[i]->mst = NULL;
244 pctx[i]->op = NULL;
245 pctx[i]->test_ok = GNUNET_NO;
246 }
247
248 channel_key = GNUNET_CRYPTO_eddsa_key_create ();
249 slave_key = GNUNET_CRYPTO_ecdsa_key_create ();
250
251 GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
252 GNUNET_CRYPTO_ecdsa_key_get_public (slave_key, &slave_pub_key);
253
254 pctx[0]->op =
255 GNUNET_TESTBED_service_connect (NULL, p[0], "psyc", service_connect,
256 pctx[0], psyc_ca, psyc_da, pctx[0]);
257
258 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
259
260 timeout_tid =
261 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5),
262 &timeout_task, NULL);
263}
264
265int
266main (int argc, char *argv[])
267{
268 int ret;
269
270 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test\n");
271
272 result = GNUNET_SYSERR;
273
274 ret = GNUNET_TESTBED_test_run ("test-psyc2", "test_psyc.conf",
275 PEERS_REQUESTED, 0LL, NULL, NULL,
276 testbed_master, NULL);
277
278 if ((GNUNET_OK != ret) || (GNUNET_OK != result))
279 return 1;
280
281 return 0;
282}
283
284/* end of test-psyc2.c */
diff --git a/src/psyc/test_psyc_api_join.c b/src/psyc/test_psyc_api_join.c
new file mode 100644
index 0000000..419fa11
--- /dev/null
+++ b/src/psyc/test_psyc_api_join.c
@@ -0,0 +1,282 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
4 *
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file psyc/test_psyc_api_join.c
23 * @brief Testbed test for the PSYC API.
24 * @author xrs
25 */
26
27/**
28 * Lessons Learned:
29 * - define topology in config
30 * - psyc slave join needs part to end (same with master)
31 * - GNUNET_SCHEDULER_add_delayed return value will outdate at call time
32 * - main can not contain GNUNET_log()
33 */
34
35#include "platform.h"
36#include "gnunet_crypto_lib.h"
37#include "gnunet_common.h"
38#include "gnunet_util_lib.h"
39#include "gnunet_testbed_service.h"
40#include "gnunet_psyc_util_lib.h"
41#include "gnunet_psyc_service.h"
42#include "psyc_test_lib.h"
43
44static struct pctx PEERS[2];
45
46static int pids;
47
48
49static void
50shutdown_task (void *cls)
51{
52 if (NULL != timeout_task_id) {
53 GNUNET_SCHEDULER_cancel (timeout_task_id);
54 timeout_task_id = NULL;
55 }
56
57 for (int i=0;i<2;i++) {
58 GNUNET_free (PEERS[i].channel_pub_key);
59
60 if (NULL != PEERS[i].psyc)
61 {
62 if (0 == i)
63 GNUNET_PSYC_master_stop (PEERS[i].psyc, GNUNET_NO, NULL, NULL);
64 else
65 GNUNET_PSYC_slave_part (PEERS[i].psyc, GNUNET_NO, NULL, NULL);
66 }
67 }
68
69 for (int i=0;i<MAX_TESTBED_OPS;i++)
70 if (NULL != op[i])
71 GNUNET_TESTBED_operation_done (op[i]);
72
73 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Shut down!\n");
74}
75
76static void
77timeout_task (void *cls)
78{
79 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Timeout!\n");
80
81 timeout_task_id = NULL;
82
83 result = GNUNET_SYSERR;
84 GNUNET_SCHEDULER_shutdown ();
85}
86
87static void
88join_decision_cb (void *cls,
89 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn,
90 int is_admitted,
91 const struct GNUNET_PSYC_Message *join_msg)
92{
93 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
94 "slave: got join decision: %s\n",
95 (GNUNET_YES == is_admitted) ? "admitted":"rejected");
96
97 result = (GNUNET_YES == is_admitted) ? GNUNET_OK : GNUNET_SYSERR;
98
99 GNUNET_SCHEDULER_shutdown ();
100}
101
102static void
103join_request_cb (void *cls,
104 const struct GNUNET_PSYC_JoinRequestMessage *req,
105 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
106 const struct GNUNET_PSYC_Message *join_msg,
107 struct GNUNET_PSYC_JoinHandle *jh)
108{
109 struct GNUNET_HashCode slave_key_hash;
110
111 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "master: got join request.\n");
112
113 GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
114
115 GNUNET_PSYC_join_decision (jh, GNUNET_YES, 0, NULL, NULL);
116}
117
118static void
119psyc_da ()
120{
121 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnect form PSYC service\n");
122}
123
124static void *
125psyc_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
126{
127 struct pctx *peer = (struct pctx*) cls;
128
129 // Case: master role
130 if (0 == peer->idx) {
131 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connecting to PSYC as master ...\n");
132
133 peer->psyc = (struct GNUNET_PSYC_Master *)
134 GNUNET_PSYC_master_start (cfg,
135 peer->channel_key,
136 GNUNET_PSYC_CHANNEL_PRIVATE,
137 NULL,
138 join_request_cb,
139 NULL,
140 NULL,
141 cls);
142 return peer->psyc;
143 }
144
145 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connecting to PSYC as slave ...\n");
146
147 struct GNUNET_PSYC_Environment *env = GNUNET_PSYC_env_create ();
148 GNUNET_PSYC_env_add (env, GNUNET_PSYC_OP_ASSIGN, "_foo", "bar baz", 7);
149 GNUNET_PSYC_env_add (env, GNUNET_PSYC_OP_ASSIGN, "_foo_bar", "foo bar baz", 11);
150
151 struct GNUNET_PSYC_Message *
152 join_msg = GNUNET_PSYC_message_create ("_request_join", env, "some data", 40);
153
154 peer->psyc = (struct GNUNET_PSYC_Slave *)
155 GNUNET_PSYC_slave_join (cfg,
156 peer->channel_pub_key,
157 peer->id_key,
158 GNUNET_PSYC_SLAVE_JOIN_NONE,
159 peer->peer_id_master,
160 0,
161 NULL,
162 NULL,
163 NULL,
164 NULL,
165 join_decision_cb,
166 cls,
167 join_msg);
168
169 GNUNET_free (join_msg);
170 peer->channel = GNUNET_PSYC_slave_get_channel (peer->psyc);
171 GNUNET_PSYC_env_destroy (env);
172
173 return peer->psyc;
174}
175
176static void
177service_connect (void *cls,
178 struct GNUNET_TESTBED_Operation *op,
179 void *ca_result,
180 const char *emsg)
181{
182 GNUNET_assert (NULL != ca_result);
183
184 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connected to the service\n");
185}
186
187static void
188connect_to_services (void *cls)
189{
190 for (int i = 0; i < 2; i++)
191 {
192 PEERS[i].peer_id_master = PEERS[0].peer_id;
193
194 op[op_cnt++] =
195 GNUNET_TESTBED_service_connect (NULL, PEERS[i].testbed_peer, "psyc",
196 &service_connect, &PEERS[i], &psyc_ca,
197 &psyc_da, &PEERS[i]);
198 }
199}
200
201static void
202pinfo_cb (void *cls,
203 struct GNUNET_TESTBED_Operation *operation,
204 const struct GNUNET_TESTBED_PeerInformation *pinfo,
205 const char *emsg)
206{
207 struct pctx *peer = (struct pctx*) cls;
208
209 peer->peer_id = pinfo->result.id;
210
211 pids++;
212 if (pids < 2)
213 return;
214 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got all IDs, starting test\n");
215
216 GNUNET_SCHEDULER_add_now (&connect_to_services, NULL);
217}
218
219static void
220testbed_master (void *cls,
221 struct GNUNET_TESTBED_RunHandle *h,
222 unsigned int num_peers,
223 struct GNUNET_TESTBED_Peer **p,
224 unsigned int links_succeeded,
225 unsigned int links_failed)
226{
227 struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key = NULL;
228
229 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connected to testbed_master\n");
230
231 // Set up shutdown logic
232 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
233 timeout_task_id =
234 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 15),
235 &timeout_task, NULL);
236 GNUNET_assert (NULL != timeout_task_id);
237
238 // Set up channel key
239 channel_key = GNUNET_CRYPTO_eddsa_key_create ();
240 GNUNET_assert (NULL != channel_key);
241
242 // Set up information contexts for peers
243 for (int i=0 ; i < 2 ; i++)
244 {
245 PEERS[i].idx = i;
246 PEERS[i].testbed_peer = p[i];
247
248 // Create "egos"
249 PEERS[i].id_key = GNUNET_CRYPTO_ecdsa_key_create ();
250
251 // Set up channel keys shared by master and slave
252 PEERS[i].channel_key = channel_key;
253
254 PEERS[i].channel_pub_key =
255 GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_EddsaPublicKey));
256 // Get public key
257 GNUNET_CRYPTO_eddsa_key_get_public (PEERS[i].channel_key,
258 PEERS[i].channel_pub_key);
259 // Get peerinfo
260 op[op_cnt++] =
261 GNUNET_TESTBED_peer_get_information (p[i],
262 GNUNET_TESTBED_PIT_IDENTITY,
263 pinfo_cb, &PEERS[i]);
264 }
265}
266
267int
268main (int argc, char *argv[])
269{
270 int ret;
271
272 ret = GNUNET_TESTBED_test_run ("test_psyc_api_join", "test_psyc.conf",
273 2, 0LL, NULL, NULL,
274 &testbed_master, NULL);
275
276 if ( (GNUNET_OK != ret) || (GNUNET_OK != result) )
277 return 1;
278
279 return 0;
280}
281
282/* end of test_psyc_api_join.c */