aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/gnunet-service-psyc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/psyc/gnunet-service-psyc.c')
-rw-r--r--src/psyc/gnunet-service-psyc.c2860
1 files changed, 2860 insertions, 0 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
new file mode 100644
index 0000000..6f2f7a9
--- /dev/null
+++ b/src/psyc/gnunet-service-psyc.c
@@ -0,0 +1,2860 @@
1/*
2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
4 *
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file psyc/gnunet-service-psyc.c
23 * @brief PSYC service
24 * @author Gabor X Toth
25 */
26
27#include <inttypes.h>
28
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_constants.h"
32#include "gnunet_protocols.h"
33#include "gnunet_statistics_service.h"
34#include "gnunet_multicast_service.h"
35#include "gnunet_psycstore_service.h"
36#include "gnunet_psyc_service.h"
37#include "gnunet_psyc_util_lib.h"
38#include "psyc.h"
39
40
41/**
42 * Handle to our current configuration.
43 */
44static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46/**
47 * Service handle.
48 */
49static struct GNUNET_SERVICE_Handle *service;
50
51/**
52 * Handle to the statistics service.
53 */
54static struct GNUNET_STATISTICS_Handle *stats;
55
56/**
57 * Handle to the PSYCstore.
58 */
59static struct GNUNET_PSYCSTORE_Handle *store;
60
61/**
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
64 */
65static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67/**
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
70 */
71static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73/**
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76 */
77static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80/**
81 * Message in the transmission queue.
82 */
83struct TransmitMessage
84{
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
87
88 struct GNUNET_SERVICE_Client *client;
89
90 /**
91 * ID assigned to the message.
92 */
93 uint64_t id;
94
95 /**
96 * Size of message.
97 */
98 uint16_t size;
99
100 /**
101 * Type of first message part.
102 */
103 uint16_t first_ptype;
104
105 /**
106 * Type of last message part.
107 */
108 uint16_t last_ptype;
109
110 /* Followed by message */
111};
112
113
114/**
115 * Cache for received message fragments.
116 * Message fragments are only sent to clients after all modifiers arrived.
117 *
118 * chan_key -> MultiHashMap chan_msgs
119 */
120static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123/**
124 * Entry in the chan_msgs hashmap of @a recv_cache:
125 * fragment_id -> RecvCacheEntry
126 */
127struct RecvCacheEntry
128{
129 struct GNUNET_MULTICAST_MessageHeader *mmsg;
130 uint16_t ref_count;
131};
132
133
134/**
135 * Entry in the @a recv_frags hash map of a @a Channel.
136 * message_id -> FragmentQueue
137 */
138struct FragmentQueue
139{
140 /**
141 * Fragment IDs stored in @a recv_cache.
142 */
143 struct GNUNET_CONTAINER_Heap *fragments;
144
145 /**
146 * Total size of received fragments.
147 */
148 uint64_t size;
149
150 /**
151 * Total size of received header fragments (METHOD & MODIFIERs)
152 */
153 uint64_t header_size;
154
155 /**
156 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157 */
158 uint64_t state_delta;
159
160 /**
161 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162 */
163 uint32_t flags;
164
165 /**
166 * Receive state of message.
167 *
168 * @see MessageFragmentState
169 */
170 uint8_t state;
171
172 /**
173 * Whether the state is already modified in PSYCstore.
174 */
175 uint8_t state_is_modified;
176
177 /**
178 * Is the message queued for delivery to the client?
179 * i.e. added to the recv_msgs queue
180 */
181 uint8_t is_queued;
182};
183
184
185/**
186 * List of connected clients.
187 */
188struct ClientList
189{
190 struct ClientList *prev;
191 struct ClientList *next;
192
193 struct GNUNET_SERVICE_Client *client;
194};
195
196
197struct Operation
198{
199 struct Operation *prev;
200 struct Operation *next;
201
202 struct GNUNET_SERVICE_Client *client;
203 struct Channel *channel;
204 uint64_t op_id;
205 uint32_t flags;
206};
207
208
209/**
210 * Common part of the client context for both a channel master and slave.
211 */
212struct Channel
213{
214 struct ClientList *clients_head;
215 struct ClientList *clients_tail;
216
217 struct Operation *op_head;
218 struct Operation *op_tail;
219
220 struct TransmitMessage *tmit_head;
221 struct TransmitMessage *tmit_tail;
222
223 /**
224 * Current PSYCstore operation.
225 */
226 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228 /**
229 * Received fragments not yet sent to the client.
230 * message_id -> FragmentQueue
231 */
232 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234 /**
235 * Received message IDs not yet sent to the client.
236 */
237 struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239 /**
240 * Public key of the channel.
241 */
242 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244 /**
245 * Hash of @a pub_key.
246 */
247 struct GNUNET_HashCode pub_key_hash;
248
249 /**
250 * Last message ID sent to the client.
251 * 0 if there is no such message.
252 */
253 uint64_t max_message_id;
254
255 /**
256 * ID of the last stateful message, where the state operations has been
257 * processed and saved to PSYCstore and which has been sent to the client.
258 * 0 if there is no such message.
259 */
260 uint64_t max_state_message_id;
261
262 /**
263 * Expected value size for the modifier being received from the PSYC service.
264 */
265 uint32_t tmit_mod_value_size_expected;
266
267 /**
268 * Actual value size for the modifier being received from the PSYC service.
269 */
270 uint32_t tmit_mod_value_size;
271
272 /**
273 * Is this channel ready to receive messages from client?
274 * #GNUNET_YES or #GNUNET_NO
275 */
276 uint8_t is_ready;
277
278 /**
279 * Is the client disconnected?
280 * #GNUNET_YES or #GNUNET_NO
281 */
282 uint8_t is_disconnecting;
283
284 /**
285 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
286 */
287 uint8_t is_master;
288
289 union {
290 struct Master *master;
291 struct Slave *slave;
292 };
293};
294
295
296/**
297 * Client context for a channel master.
298 */
299struct Master
300{
301 /**
302 * Channel struct common for Master and Slave
303 */
304 struct Channel channel;
305
306 /**
307 * Private key of the channel.
308 */
309 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
310
311 /**
312 * Handle for the multicast origin.
313 */
314 struct GNUNET_MULTICAST_Origin *origin;
315
316 /**
317 * Transmit handle for multicast.
318 */
319 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
320
321 /**
322 * Incoming join requests from multicast.
323 * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
324 */
325 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
326
327 /**
328 * Last message ID transmitted to this channel.
329 *
330 * Incremented before sending a message, thus the message_id in messages sent
331 * starts from 1.
332 */
333 uint64_t max_message_id;
334
335 /**
336 * ID of the last message with state operations transmitted to the channel.
337 * 0 if there is no such message.
338 */
339 uint64_t max_state_message_id;
340
341 /**
342 * Maximum group generation transmitted to the channel.
343 */
344 uint64_t max_group_generation;
345
346 /**
347 * @see enum GNUNET_PSYC_Policy
348 */
349 enum GNUNET_PSYC_Policy policy;
350};
351
352
353/**
354 * Client context for a channel slave.
355 */
356struct Slave
357{
358 /**
359 * Channel struct common for Master and Slave
360 */
361 struct Channel channel;
362
363 /**
364 * Private key of the slave.
365 */
366 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
367
368 /**
369 * Public key of the slave.
370 */
371 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
372
373 /**
374 * Hash of @a pub_key.
375 */
376 struct GNUNET_HashCode pub_key_hash;
377
378 /**
379 * Handle for the multicast member.
380 */
381 struct GNUNET_MULTICAST_Member *member;
382
383 /**
384 * Transmit handle for multicast.
385 */
386 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
387
388 /**
389 * Peer identity of the origin.
390 */
391 struct GNUNET_PeerIdentity origin;
392
393 /**
394 * Number of items in @a relays.
395 */
396 uint32_t relay_count;
397
398 /**
399 * Relays that multicast can use to connect.
400 */
401 struct GNUNET_PeerIdentity *relays;
402
403 /**
404 * Join request to be transmitted to the master on join.
405 */
406 struct GNUNET_PSYC_Message *join_msg;
407
408 /**
409 * Join decision received from multicast.
410 */
411 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
412
413 /**
414 * Maximum request ID for this channel.
415 */
416 uint64_t max_request_id;
417
418 /**
419 * Join flags.
420 */
421 enum GNUNET_PSYC_SlaveJoinFlags join_flags;
422};
423
424
425/**
426 * Client context.
427 */
428struct Client {
429 struct GNUNET_SERVICE_Client *client;
430 struct Channel *channel;
431};
432
433
434struct ReplayRequestKey
435{
436 uint64_t fragment_id;
437 uint64_t message_id;
438 uint64_t fragment_offset;
439 uint64_t flags;
440};
441
442
443static void
444transmit_message (struct Channel *chn);
445
446static uint64_t
447message_queue_run (struct Channel *chn);
448
449static uint64_t
450message_queue_drop (struct Channel *chn);
451
452
453static void
454schedule_transmit_message (void *cls)
455{
456 struct Channel *chn = cls;
457
458 transmit_message (chn);
459}
460
461
462/**
463 * Task run during shutdown.
464 *
465 * @param cls unused
466 */
467static void
468shutdown_task (void *cls)
469{
470 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471 "shutting down...\n");
472 GNUNET_PSYCSTORE_disconnect (store);
473 if (NULL != stats)
474 {
475 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
476 stats = NULL;
477 }
478}
479
480
481static struct Operation *
482op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
483 uint64_t op_id, uint32_t flags)
484{
485 struct Operation *op = GNUNET_malloc (sizeof (*op));
486 op->client = client;
487 op->channel = chn;
488 op->op_id = op_id;
489 op->flags = flags;
490 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
491 return op;
492}
493
494
495static void
496op_remove (struct Operation *op)
497{
498 GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
499 GNUNET_free (op);
500}
501
502
503/**
504 * Clean up master data structures after a client disconnected.
505 */
506static void
507cleanup_master (struct Master *mst)
508{
509 struct Channel *chn = &mst->channel;
510
511 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
512 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
513}
514
515
516/**
517 * Clean up slave data structures after a client disconnected.
518 */
519static void
520cleanup_slave (struct Slave *slv)
521{
522 struct Channel *chn = &slv->channel;
523 struct GNUNET_CONTAINER_MultiHashMap *
524 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
525 &chn->pub_key_hash);
526 GNUNET_assert (NULL != chn_slv);
527 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
528
529 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
530 {
531 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
532 chn_slv);
533 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
534 }
535 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
536
537 if (NULL != slv->join_msg)
538 {
539 GNUNET_free (slv->join_msg);
540 slv->join_msg = NULL;
541 }
542 if (NULL != slv->relays)
543 {
544 GNUNET_free (slv->relays);
545 slv->relays = NULL;
546 }
547 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
548}
549
550
551/**
552 * Clean up channel data structures after a client disconnected.
553 */
554static void
555cleanup_channel (struct Channel *chn)
556{
557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558 "%p Cleaning up channel %s. master? %u\n",
559 chn,
560 GNUNET_h2s (&chn->pub_key_hash),
561 chn->is_master);
562 message_queue_drop (chn);
563 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
564 chn->recv_frags = NULL;
565
566 if (NULL != chn->store_op)
567 {
568 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
569 chn->store_op = NULL;
570 }
571
572 (GNUNET_YES == chn->is_master)
573 ? cleanup_master (chn->master)
574 : cleanup_slave (chn->slave);
575 GNUNET_free (chn);
576}
577
578
579/**
580 * Called whenever a client is disconnected.
581 * Frees our resources associated with that client.
582 *
583 * @param cls closure
584 * @param client identification of the client
585 * @param app_ctx must match @a client
586 */
587static void
588client_notify_disconnect (void *cls,
589 struct GNUNET_SERVICE_Client *client,
590 void *app_ctx)
591{
592 struct Client *c = app_ctx;
593 struct Channel *chn = c->channel;
594 GNUNET_free (c);
595
596 if (NULL == chn)
597 {
598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599 "%p User context is NULL in client_notify_disconnect ()\n",
600 chn);
601 GNUNET_break (0);
602 return;
603 }
604
605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606 "%p Client %p (%s) disconnected from channel %s\n",
607 chn,
608 client,
609 (GNUNET_YES == chn->is_master) ? "master" : "slave",
610 GNUNET_h2s (&chn->pub_key_hash));
611
612 struct ClientList *cli = chn->clients_head;
613 while (NULL != cli)
614 {
615 if (cli->client == client)
616 {
617 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
618 GNUNET_free (cli);
619 break;
620 }
621 cli = cli->next;
622 }
623
624 struct Operation *op = chn->op_head;
625 while (NULL != op)
626 {
627 if (op->client == client)
628 {
629 op->client = NULL;
630 break;
631 }
632 op = op->next;
633 }
634
635 if (NULL == chn->clients_head)
636 { /* Last client disconnected. */
637 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
638 "%p Last client (%s) disconnected from channel %s\n",
639 chn,
640 (GNUNET_YES == chn->is_master) ? "master" : "slave",
641 GNUNET_h2s (&chn->pub_key_hash));
642 chn->is_disconnecting = GNUNET_YES;
643 cleanup_channel (chn);
644 }
645}
646
647
648/**
649 * A new client connected.
650 *
651 * @param cls NULL
652 * @param client client to add
653 * @param mq message queue for @a client
654 * @return @a client
655 */
656static void *
657client_notify_connect (void *cls,
658 struct GNUNET_SERVICE_Client *client,
659 struct GNUNET_MQ_Handle *mq)
660{
661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
662
663 struct Client *c = GNUNET_malloc (sizeof (*c));
664 c->client = client;
665
666 return c;
667}
668
669
670/**
671 * Send message to all clients connected to the channel.
672 */
673static void
674client_send_msg (const struct Channel *chn,
675 const struct GNUNET_MessageHeader *msg)
676{
677 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
678 "Sending message to clients of channel %p.\n",
679 chn);
680
681 struct ClientList *cli = chn->clients_head;
682 while (NULL != cli)
683 {
684 struct GNUNET_MQ_Envelope *
685 env = GNUNET_MQ_msg_copy (msg);
686
687 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
688 env);
689 cli = cli->next;
690 }
691}
692
693
694/**
695 * Send a result code back to the client.
696 *
697 * @param client
698 * Client that should receive the result code.
699 * @param result_code
700 * Code to transmit.
701 * @param op_id
702 * Operation ID in network byte order.
703 * @param data
704 * Data payload or NULL.
705 * @param data_size
706 * Size of @a data.
707 */
708static void
709client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
710 int64_t result_code, const void *data, uint16_t data_size)
711{
712 struct GNUNET_OperationResultMessage *res;
713 struct GNUNET_MQ_Envelope *
714 env = GNUNET_MQ_msg_extra (res,
715 data_size,
716 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
717 res->result_code = GNUNET_htonll (result_code);
718 res->op_id = op_id;
719 if (0 < data_size)
720 GNUNET_memcpy (&res[1], data, data_size);
721
722 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
723 "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n",
724 client,
725 GNUNET_ntohll (op_id),
726 result_code,
727 data_size);
728
729 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
730}
731
732
733/**
734 * Closure for join_mem_test_cb()
735 */
736struct JoinMemTestClosure
737{
738 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
739 struct Channel *channel;
740 struct GNUNET_MULTICAST_JoinHandle *join_handle;
741 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
742};
743
744
745/**
746 * Membership test result callback used for join requests.
747 */
748static void
749join_mem_test_cb (void *cls, int64_t result,
750 const char *err_msg, uint16_t err_msg_size)
751{
752 struct JoinMemTestClosure *jcls = cls;
753
754 if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
755 { /* Pass on join request to client if this is a master channel */
756 struct Master *mst = jcls->channel->master;
757 struct GNUNET_HashCode slave_pub_hash;
758 GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
759 &slave_pub_hash);
760 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
761 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
762 client_send_msg (jcls->channel, &jcls->join_msg->header);
763 }
764 else
765 {
766 if (GNUNET_SYSERR == result)
767 {
768 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
769 "Could not perform membership test (%.*s)\n",
770 err_msg_size, err_msg);
771 }
772 // FIXME: add relays
773 GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
774 }
775 GNUNET_free (jcls->join_msg);
776 GNUNET_free (jcls);
777}
778
779
780/**
781 * Incoming join request from multicast.
782 */
783static void
784mcast_recv_join_request (void *cls,
785 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
786 const struct GNUNET_MessageHeader *join_msg,
787 struct GNUNET_MULTICAST_JoinHandle *jh)
788{
789 struct Channel *chn = cls;
790 uint16_t join_msg_size = 0;
791
792 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
793 "%p Got join request.\n",
794 chn);
795 if (NULL != join_msg)
796 {
797 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
798 {
799 join_msg_size = ntohs (join_msg->size);
800 }
801 else
802 {
803 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
804 "%p Got join message with invalid type %u.\n",
805 chn,
806 ntohs (join_msg->type));
807 }
808 }
809
810 struct GNUNET_PSYC_JoinRequestMessage *
811 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
812 req->header.size = htons (sizeof (*req) + join_msg_size);
813 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
814 req->slave_pub_key = *slave_pub_key;
815 if (0 < join_msg_size)
816 GNUNET_memcpy (&req[1], join_msg, join_msg_size);
817
818 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
819 jcls->slave_pub_key = *slave_pub_key;
820 jcls->channel = chn;
821 jcls->join_handle = jh;
822 jcls->join_msg = req;
823
824 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
825 chn->max_message_id, 0,
826 &join_mem_test_cb, jcls);
827}
828
829
830/**
831 * Join decision received from multicast.
832 */
833static void
834mcast_recv_join_decision (void *cls, int is_admitted,
835 const struct GNUNET_PeerIdentity *peer,
836 uint16_t relay_count,
837 const struct GNUNET_PeerIdentity *relays,
838 const struct GNUNET_MessageHeader *join_resp)
839{
840 struct Slave *slv = cls;
841 struct Channel *chn = &slv->channel;
842 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843 "%p Got join decision: %d\n",
844 slv,
845 is_admitted);
846 if (GNUNET_YES == chn->is_ready)
847 {
848 /* Already admitted */
849 return;
850 }
851
852 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
853 struct GNUNET_PSYC_JoinDecisionMessage *
854 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
855 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
856 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
857 dcsn->is_admitted = htonl (is_admitted);
858 if (0 < join_resp_size)
859 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
860
861 client_send_msg (chn, &dcsn->header);
862
863 if (GNUNET_YES == is_admitted
864 && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
865 {
866 chn->is_ready = GNUNET_YES;
867 }
868}
869
870
871static int
872store_recv_fragment_replay (void *cls,
873 struct GNUNET_MULTICAST_MessageHeader *msg,
874 enum GNUNET_PSYCSTORE_MessageFlags flags)
875{
876 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
877
878 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
879 return GNUNET_YES;
880}
881
882
883/**
884 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
885 */
886static void
887store_recv_fragment_replay_result (void *cls,
888 int64_t result,
889 const char *err_msg,
890 uint16_t err_msg_size)
891{
892 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
893
894 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
896 rh,
897 result,
898 err_msg_size,
899 err_msg);
900 switch (result)
901 {
902 case GNUNET_YES:
903 break;
904
905 case GNUNET_NO:
906 GNUNET_MULTICAST_replay_response (rh, NULL,
907 GNUNET_MULTICAST_REC_NOT_FOUND);
908 return;
909
910 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
911 GNUNET_MULTICAST_replay_response (rh, NULL,
912 GNUNET_MULTICAST_REC_ACCESS_DENIED);
913 return;
914
915 case GNUNET_SYSERR:
916 GNUNET_MULTICAST_replay_response (rh, NULL,
917 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
918 return;
919 }
920 /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
921 * an error code, so it must be ensured no further processing
922 * is attempted on 'rh'. Maybe this should be refactored as
923 * it doesn't look very intuitive. --lynX
924 */
925 GNUNET_MULTICAST_replay_response_end (rh);
926}
927
928
929/**
930 * Incoming fragment replay request from multicast.
931 */
932static void
933mcast_recv_replay_fragment (void *cls,
934 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
935 uint64_t fragment_id, uint64_t flags,
936 struct GNUNET_MULTICAST_ReplayHandle *rh)
937
938{
939 struct Channel *chn = cls;
940 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
941 fragment_id, fragment_id,
942 &store_recv_fragment_replay,
943 &store_recv_fragment_replay_result, rh);
944}
945
946
947/**
948 * Incoming message replay request from multicast.
949 */
950static void
951mcast_recv_replay_message (void *cls,
952 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
953 uint64_t message_id,
954 uint64_t fragment_offset,
955 uint64_t flags,
956 struct GNUNET_MULTICAST_ReplayHandle *rh)
957{
958 struct Channel *chn = cls;
959 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
960 message_id, message_id, 1, NULL,
961 &store_recv_fragment_replay,
962 &store_recv_fragment_replay_result, rh);
963}
964
965
966/**
967 * Convert an uint64_t in network byte order to a HashCode
968 * that can be used as key in a MultiHashMap
969 */
970static inline void
971hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
972{
973 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
974 /* TODO: use built-in byte swap functions if available */
975
976 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
977 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
978
979 *key = (struct GNUNET_HashCode) {};
980 *((uint64_t *) key)
981 = (n << 32) | (n >> 32);
982}
983
984
985/**
986 * Convert an uint64_t in host byte order to a HashCode
987 * that can be used as key in a MultiHashMap
988 */
989static inline void
990hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
991{
992#if __BYTE_ORDER == __BIG_ENDIAN
993 hash_key_from_nll (key, n);
994#elif __BYTE_ORDER == __LITTLE_ENDIAN
995 *key = (struct GNUNET_HashCode) {};
996 *((uint64_t *) key) = n;
997#else
998 #error byteorder undefined
999#endif
1000}
1001
1002
1003/**
1004 * Initialize PSYC message header.
1005 */
1006static inline void
1007psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1008 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1009{
1010 uint16_t size = ntohs (mmsg->header.size);
1011 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1012
1013 pmsg->header.size = htons (psize);
1014 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1015 pmsg->message_id = mmsg->message_id;
1016 pmsg->fragment_offset = mmsg->fragment_offset;
1017 pmsg->flags = htonl (flags);
1018
1019 GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1020}
1021
1022
1023/**
1024 * Create a new PSYC message from a multicast message for sending it to clients.
1025 */
1026static inline struct GNUNET_PSYC_MessageHeader *
1027psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1028{
1029 struct GNUNET_PSYC_MessageHeader *pmsg;
1030 uint16_t size = ntohs (mmsg->header.size);
1031 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1032
1033 pmsg = GNUNET_malloc (psize);
1034 psyc_msg_init (pmsg, mmsg, flags);
1035 return pmsg;
1036}
1037
1038
1039/**
1040 * Send multicast message to all clients connected to the channel.
1041 */
1042static void
1043client_send_mcast_msg (struct Channel *chn,
1044 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1045 uint32_t flags)
1046{
1047 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1048 "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1049 chn,
1050 GNUNET_ntohll (mmsg->fragment_id),
1051 GNUNET_ntohll (mmsg->message_id));
1052
1053 struct GNUNET_PSYC_MessageHeader *
1054 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1055 client_send_msg (chn, &pmsg->header);
1056 GNUNET_free (pmsg);
1057}
1058
1059
1060/**
1061 * Send multicast request to all clients connected to the channel.
1062 */
1063static void
1064client_send_mcast_req (struct Master *mst,
1065 const struct GNUNET_MULTICAST_RequestHeader *req)
1066{
1067 struct Channel *chn = &mst->channel;
1068
1069 struct GNUNET_PSYC_MessageHeader *pmsg;
1070 uint16_t size = ntohs (req->header.size);
1071 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1072
1073 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074 "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1075 chn,
1076 GNUNET_ntohll (req->fragment_id),
1077 GNUNET_ntohll (req->request_id));
1078
1079 pmsg = GNUNET_malloc (psize);
1080 pmsg->header.size = htons (psize);
1081 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1082 pmsg->message_id = req->request_id;
1083 pmsg->fragment_offset = req->fragment_offset;
1084 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1085 pmsg->slave_pub_key = req->member_pub_key;
1086 GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1087
1088 client_send_msg (chn, &pmsg->header);
1089
1090 /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1091
1092 GNUNET_free (pmsg);
1093}
1094
1095
1096/**
1097 * Insert a multicast message fragment into the queue belonging to the message.
1098 *
1099 * @param chn Channel.
1100 * @param mmsg Multicast message fragment.
1101 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
1102 * @param first_ptype First PSYC message part type in @a mmsg.
1103 * @param last_ptype Last PSYC message part type in @a mmsg.
1104 */
1105static void
1106fragment_queue_insert (struct Channel *chn,
1107 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1108 uint16_t first_ptype, uint16_t last_ptype)
1109{
1110 const uint16_t size = ntohs (mmsg->header.size);
1111 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1112 struct GNUNET_CONTAINER_MultiHashMap
1113 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1114 &chn->pub_key_hash);
1115
1116 struct GNUNET_HashCode msg_id_hash;
1117 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1118
1119 struct FragmentQueue
1120 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1121
1122 if (NULL == fragq)
1123 {
1124 fragq = GNUNET_malloc (sizeof (*fragq));
1125 fragq->state = MSG_FRAG_STATE_HEADER;
1126 fragq->fragments
1127 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1128
1129 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1130 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1131
1132 if (NULL == chan_msgs)
1133 {
1134 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1135 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1136 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1137 }
1138 }
1139
1140 struct GNUNET_HashCode frag_id_hash;
1141 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1142 struct RecvCacheEntry
1143 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1144 if (NULL == cache_entry)
1145 {
1146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1148 chn,
1149 GNUNET_ntohll (mmsg->message_id),
1150 GNUNET_ntohll (mmsg->fragment_id));
1151 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152 "%p header_size: %" PRIu64 " + %u\n",
1153 chn,
1154 fragq->header_size,
1155 size);
1156 cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1157 cache_entry->ref_count = 1;
1158 cache_entry->mmsg = GNUNET_malloc (size);
1159 GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1160 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1161 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1162 }
1163 else
1164 {
1165 cache_entry->ref_count++;
1166 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1167 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1168 chn,
1169 GNUNET_ntohll (mmsg->message_id),
1170 GNUNET_ntohll (mmsg->fragment_id),
1171 cache_entry->ref_count);
1172 }
1173
1174 if (MSG_FRAG_STATE_HEADER == fragq->state)
1175 {
1176 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1177 {
1178 struct GNUNET_PSYC_MessageMethod *
1179 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1180 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1181 fragq->flags = ntohl (pmeth->flags);
1182 }
1183
1184 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1185 {
1186 fragq->header_size += size;
1187 }
1188 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1189 || frag_offset == fragq->header_size)
1190 { /* header is now complete */
1191 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192 "%p Header of message %" PRIu64 " is complete.\n",
1193 chn,
1194 GNUNET_ntohll (mmsg->message_id));
1195
1196 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197 "%p Adding message %" PRIu64 " to queue.\n",
1198 chn,
1199 GNUNET_ntohll (mmsg->message_id));
1200 fragq->state = MSG_FRAG_STATE_DATA;
1201 }
1202 else
1203 {
1204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205 "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1206 chn,
1207 GNUNET_ntohll (mmsg->message_id),
1208 frag_offset,
1209 fragq->header_size);
1210 }
1211 }
1212
1213 switch (last_ptype)
1214 {
1215 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1216 if (frag_offset == fragq->size)
1217 fragq->state = MSG_FRAG_STATE_END;
1218 else
1219 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1220 "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1221 chn,
1222 GNUNET_ntohll (mmsg->message_id),
1223 frag_offset,
1224 fragq->size);
1225 break;
1226
1227 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1228 /* Drop message without delivering to client if it's a single fragment */
1229 fragq->state =
1230 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1231 ? MSG_FRAG_STATE_DROP
1232 : MSG_FRAG_STATE_CANCEL;
1233 }
1234
1235 switch (fragq->state)
1236 {
1237 case MSG_FRAG_STATE_DATA:
1238 case MSG_FRAG_STATE_END:
1239 case MSG_FRAG_STATE_CANCEL:
1240 if (GNUNET_NO == fragq->is_queued)
1241 {
1242 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1243 GNUNET_ntohll (mmsg->message_id));
1244 fragq->is_queued = GNUNET_YES;
1245 }
1246 }
1247
1248 fragq->size += size;
1249 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1250 GNUNET_ntohll (mmsg->fragment_id));
1251}
1252
1253
1254/**
1255 * Run fragment queue of a message.
1256 *
1257 * Send fragments of a message in order to client, after all modifiers arrived
1258 * from multicast.
1259 *
1260 * @param chn
1261 * Channel.
1262 * @param msg_id
1263 * ID of the message @a fragq belongs to.
1264 * @param fragq
1265 * Fragment queue of the message.
1266 * @param drop
1267 * Drop message without delivering to client?
1268 * #GNUNET_YES or #GNUNET_NO.
1269 */
1270static void
1271fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1272 struct FragmentQueue *fragq, uint8_t drop)
1273{
1274 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1275 "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1276 chn,
1277 msg_id,
1278 fragq->state);
1279
1280 struct GNUNET_CONTAINER_MultiHashMap
1281 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1282 &chn->pub_key_hash);
1283 GNUNET_assert (NULL != chan_msgs);
1284 uint64_t frag_id;
1285
1286 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1287 &frag_id))
1288 {
1289 struct GNUNET_HashCode frag_id_hash;
1290 hash_key_from_hll (&frag_id_hash, frag_id);
1291 struct RecvCacheEntry *cache_entry
1292 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1293 if (cache_entry != NULL)
1294 {
1295 if (GNUNET_NO == drop)
1296 {
1297 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1298 }
1299 if (cache_entry->ref_count <= 1)
1300 {
1301 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1302 cache_entry);
1303 GNUNET_free (cache_entry->mmsg);
1304 GNUNET_free (cache_entry);
1305 }
1306 else
1307 {
1308 cache_entry->ref_count--;
1309 }
1310 }
1311#if CACHE_AGING_IMPLEMENTED
1312 else if (GNUNET_NO == drop)
1313 {
1314 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1315 }
1316#endif
1317
1318 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1319 }
1320
1321 if (MSG_FRAG_STATE_END <= fragq->state)
1322 {
1323 struct GNUNET_HashCode msg_id_hash;
1324 hash_key_from_hll (&msg_id_hash, msg_id);
1325
1326 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1327 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1328 GNUNET_free (fragq);
1329 }
1330 else
1331 {
1332 fragq->is_queued = GNUNET_NO;
1333 }
1334}
1335
1336
1337struct StateModifyClosure
1338{
1339 struct Channel *channel;
1340 uint64_t msg_id;
1341 struct GNUNET_HashCode msg_id_hash;
1342};
1343
1344
1345void
1346store_recv_state_modify_result (void *cls, int64_t result,
1347 const char *err_msg, uint16_t err_msg_size)
1348{
1349 struct StateModifyClosure *mcls = cls;
1350 struct Channel *chn = mcls->channel;
1351 uint64_t msg_id = mcls->msg_id;
1352
1353 struct FragmentQueue *
1354 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1355
1356 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1357 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1358 chn, result, err_msg_size, err_msg);
1359
1360 switch (result)
1361 {
1362 case GNUNET_OK:
1363 case GNUNET_NO:
1364 if (NULL != fragq)
1365 fragq->state_is_modified = GNUNET_YES;
1366 if (chn->max_state_message_id < msg_id)
1367 chn->max_state_message_id = msg_id;
1368 if (chn->max_message_id < msg_id)
1369 chn->max_message_id = msg_id;
1370
1371 if (NULL != fragq)
1372 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1373 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1374 message_queue_run (chn);
1375 break;
1376
1377 default:
1378 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1379 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1380 chn, result, err_msg_size, err_msg);
1381 /** @todo FIXME: handle state_modify error */
1382 }
1383}
1384
1385
1386/**
1387 * Run message queue.
1388 *
1389 * Send messages in queue to client in order after a message has arrived from
1390 * multicast, according to the following:
1391 * - A message is only sent if all of its modifiers arrived.
1392 * - A stateful message is only sent if the previous stateful message
1393 * has already been delivered to the client.
1394 *
1395 * @param chn Channel.
1396 *
1397 * @return Number of messages removed from queue and sent to client.
1398 */
1399static uint64_t
1400message_queue_run (struct Channel *chn)
1401{
1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403 "%p Running message queue.\n", chn);
1404 uint64_t n = 0;
1405 uint64_t msg_id;
1406
1407 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1408 &msg_id))
1409 {
1410 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1411 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1412 struct GNUNET_HashCode msg_id_hash;
1413 hash_key_from_hll (&msg_id_hash, msg_id);
1414
1415 struct FragmentQueue *
1416 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1417
1418 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1419 {
1420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1421 "%p No fragq (%p) or header not complete.\n",
1422 chn, fragq);
1423 break;
1424 }
1425
1426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1427 "%p Fragment queue entry: state: %u, state delta: "
1428 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1429 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1430
1431 if (MSG_FRAG_STATE_DATA <= fragq->state)
1432 {
1433 /* Check if there's a missing message before the current one */
1434 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1435 {
1436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1437
1438 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1439 && (chn->max_message_id != msg_id - 1
1440 && chn->max_message_id != msg_id))
1441 {
1442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1443 "%p Out of order message. "
1444 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1445 chn, chn->max_message_id, msg_id);
1446 break;
1447 // FIXME: keep track of messages processed in this queue run,
1448 // and only stop after reaching the end
1449 }
1450 }
1451 else
1452 {
1453 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1454 if (GNUNET_YES != fragq->state_is_modified)
1455 {
1456 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1457 {
1458 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1459 "%p Out of order stateful message. "
1460 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1461 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1462 break;
1463 // FIXME: keep track of messages processed in this queue run,
1464 // and only stop after reaching the end
1465 }
1466
1467 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1468 mcls->channel = chn;
1469 mcls->msg_id = msg_id;
1470 mcls->msg_id_hash = msg_id_hash;
1471
1472 /* Apply modifiers to state in PSYCstore */
1473 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1474 fragq->state_delta,
1475 store_recv_state_modify_result, mcls);
1476 break; // continue after asynchronous state modify result
1477 }
1478 }
1479 chn->max_message_id = msg_id;
1480 }
1481 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1482 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1483 n++;
1484 }
1485
1486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1487 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1488 return n;
1489}
1490
1491
1492/**
1493 * Drop message queue of a channel.
1494 *
1495 * Remove all messages in queue without sending it to clients.
1496 *
1497 * @param chn Channel.
1498 *
1499 * @return Number of messages removed from queue.
1500 */
1501static uint64_t
1502message_queue_drop (struct Channel *chn)
1503{
1504 uint64_t n = 0;
1505 uint64_t msg_id;
1506 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1507 &msg_id))
1508 {
1509 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1510 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1511 struct GNUNET_HashCode msg_id_hash;
1512 hash_key_from_hll (&msg_id_hash, msg_id);
1513
1514 struct FragmentQueue *
1515 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1516 GNUNET_assert (NULL != fragq);
1517 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1518 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1519 n++;
1520 }
1521 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1522 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1523 return n;
1524}
1525
1526
1527/**
1528 * Received result of GNUNET_PSYCSTORE_fragment_store().
1529 */
1530static void
1531store_recv_fragment_store_result (void *cls, int64_t result,
1532 const char *err_msg, uint16_t err_msg_size)
1533{
1534 struct Channel *chn = cls;
1535 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1537 chn, result, err_msg_size, err_msg);
1538}
1539
1540
1541/**
1542 * Handle incoming message fragment from multicast.
1543 *
1544 * Store it using PSYCstore and send it to the clients of the channel in order.
1545 */
1546static void
1547mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1548{
1549 struct Channel *chn = cls;
1550 uint16_t size = ntohs (mmsg->header.size);
1551
1552 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1553 "%p Received multicast message of size %u. "
1554 "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1555 ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1556 chn, size,
1557 GNUNET_ntohll (mmsg->fragment_id),
1558 GNUNET_ntohll (mmsg->message_id),
1559 GNUNET_ntohll (mmsg->fragment_offset),
1560 GNUNET_ntohll (mmsg->flags));
1561
1562 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1563 &store_recv_fragment_store_result, chn);
1564
1565 uint16_t first_ptype = 0, last_ptype = 0;
1566 int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1567 (const char *) &mmsg[1],
1568 &first_ptype, &last_ptype);
1569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1570 "%p Message check result %d, first part type %u, last part type %u\n",
1571 chn, check, first_ptype, last_ptype);
1572 if (GNUNET_SYSERR == check)
1573 {
1574 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1575 "%p Dropping incoming multicast message with invalid parts.\n",
1576 chn);
1577 GNUNET_break_op (0);
1578 return;
1579 }
1580
1581 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1582 message_queue_run (chn);
1583}
1584
1585
1586/**
1587 * Incoming request fragment from multicast for a master.
1588 *
1589 * @param cls Master.
1590 * @param req The request.
1591 */
1592static void
1593mcast_recv_request (void *cls,
1594 const struct GNUNET_MULTICAST_RequestHeader *req)
1595{
1596 struct Master *mst = cls;
1597 uint16_t size = ntohs (req->header.size);
1598
1599 char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1600 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1601 "%p Received multicast request of size %u from %s.\n",
1602 mst, size, str);
1603 GNUNET_free (str);
1604
1605 uint16_t first_ptype = 0, last_ptype = 0;
1606 if (GNUNET_SYSERR
1607 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1608 (const char *) &req[1],
1609 &first_ptype, &last_ptype))
1610 {
1611 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1612 "%p Dropping incoming multicast request with invalid parts.\n",
1613 mst);
1614 GNUNET_break_op (0);
1615 return;
1616 }
1617
1618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1619 "Message parts: first: type %u, last: type %u\n",
1620 first_ptype, last_ptype);
1621
1622 /* FIXME: in-order delivery */
1623 client_send_mcast_req (mst, req);
1624}
1625
1626
1627/**
1628 * Response from PSYCstore with the current counter values for a channel master.
1629 */
1630static void
1631store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1632 uint64_t max_message_id, uint64_t max_group_generation,
1633 uint64_t max_state_message_id)
1634{
1635 struct Master *mst = cls;
1636 struct Channel *chn = &mst->channel;
1637 chn->store_op = NULL;
1638
1639 struct GNUNET_PSYC_CountersResultMessage res;
1640 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1641 res.header.size = htons (sizeof (res));
1642 res.result_code = htonl (result);
1643 res.max_message_id = GNUNET_htonll (max_message_id);
1644
1645 if (GNUNET_OK == result || GNUNET_NO == result)
1646 {
1647 mst->max_message_id = max_message_id;
1648 chn->max_message_id = max_message_id;
1649 chn->max_state_message_id = max_state_message_id;
1650 mst->max_group_generation = max_group_generation;
1651 mst->origin
1652 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1653 mcast_recv_join_request,
1654 mcast_recv_replay_fragment,
1655 mcast_recv_replay_message,
1656 mcast_recv_request,
1657 mcast_recv_message, chn);
1658 chn->is_ready = GNUNET_YES;
1659 }
1660 else
1661 {
1662 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1663 "%p GNUNET_PSYCSTORE_counters_get() "
1664 "returned %d for channel %s.\n",
1665 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1666 }
1667
1668 client_send_msg (chn, &res.header);
1669}
1670
1671
1672/**
1673 * Response from PSYCstore with the current counter values for a channel slave.
1674 */
1675void
1676store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1677 uint64_t max_message_id, uint64_t max_group_generation,
1678 uint64_t max_state_message_id)
1679{
1680 struct Slave *slv = cls;
1681 struct Channel *chn = &slv->channel;
1682 chn->store_op = NULL;
1683
1684 struct GNUNET_PSYC_CountersResultMessage res;
1685 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1686 res.header.size = htons (sizeof (res));
1687 res.result_code = htonl (result);
1688 res.max_message_id = GNUNET_htonll (max_message_id);
1689
1690 if (GNUNET_YES == result || GNUNET_NO == result)
1691 {
1692 chn->max_message_id = max_message_id;
1693 chn->max_state_message_id = max_state_message_id;
1694 slv->member
1695 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1696 &slv->origin,
1697 slv->relay_count, slv->relays,
1698 &slv->join_msg->header,
1699 mcast_recv_join_request,
1700 mcast_recv_join_decision,
1701 mcast_recv_replay_fragment,
1702 mcast_recv_replay_message,
1703 mcast_recv_message, chn);
1704 if (NULL != slv->join_msg)
1705 {
1706 GNUNET_free (slv->join_msg);
1707 slv->join_msg = NULL;
1708 }
1709 }
1710 else
1711 {
1712 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1713 "%p GNUNET_PSYCSTORE_counters_get() "
1714 "returned %d for channel %s.\n",
1715 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1716 }
1717
1718 client_send_msg (chn, &res.header);
1719}
1720
1721
1722static void
1723channel_init (struct Channel *chn)
1724{
1725 chn->recv_msgs
1726 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1727 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1728}
1729
1730
1731/**
1732 * Handle a connecting client starting a channel master.
1733 */
1734static void
1735handle_client_master_start (void *cls,
1736 const struct MasterStartRequest *req)
1737{
1738 struct Client *c = cls;
1739 struct GNUNET_SERVICE_Client *client = c->client;
1740
1741 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1742 struct GNUNET_HashCode pub_key_hash;
1743
1744 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1745 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1746
1747 struct Master *
1748 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1749 struct Channel *chn;
1750
1751 if (NULL == mst)
1752 {
1753 mst = GNUNET_malloc (sizeof (*mst));
1754 mst->policy = ntohl (req->policy);
1755 mst->priv_key = req->channel_key;
1756 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1757
1758 chn = c->channel = &mst->channel;
1759 chn->master = mst;
1760 chn->is_master = GNUNET_YES;
1761 chn->pub_key = pub_key;
1762 chn->pub_key_hash = pub_key_hash;
1763 channel_init (chn);
1764
1765 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1766 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1767 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1768 store_recv_master_counters, mst);
1769 }
1770 else
1771 {
1772 chn = &mst->channel;
1773
1774 struct GNUNET_PSYC_CountersResultMessage *res;
1775 struct GNUNET_MQ_Envelope *
1776 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1777 res->result_code = htonl (GNUNET_OK);
1778 res->max_message_id = GNUNET_htonll (mst->max_message_id);
1779
1780 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1781 }
1782
1783 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1784 "%p Client connected as master to channel %s.\n",
1785 mst, GNUNET_h2s (&chn->pub_key_hash));
1786
1787 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1788 cli->client = client;
1789 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1790
1791 GNUNET_SERVICE_client_continue (client);
1792}
1793
1794
1795static int
1796check_client_slave_join (void *cls,
1797 const struct SlaveJoinRequest *req)
1798{
1799 return GNUNET_OK;
1800}
1801
1802
1803/**
1804 * Handle a connecting client joining as a channel slave.
1805 */
1806static void
1807handle_client_slave_join (void *cls,
1808 const struct SlaveJoinRequest *req)
1809{
1810 struct Client *c = cls;
1811 struct GNUNET_SERVICE_Client *client = c->client;
1812
1813 uint16_t req_size = ntohs (req->header.size);
1814
1815 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1816 struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1817
1818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1819 "got join request from client %p\n",
1820 client);
1821 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1822 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1823 GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1824
1825 struct GNUNET_CONTAINER_MultiHashMap *
1826 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1827 struct Slave *slv = NULL;
1828 struct Channel *chn;
1829
1830 if (NULL != chn_slv)
1831 {
1832 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1833 }
1834 if (NULL == slv)
1835 {
1836 slv = GNUNET_malloc (sizeof (*slv));
1837 slv->priv_key = req->slave_key;
1838 slv->pub_key = slv_pub_key;
1839 slv->pub_key_hash = slv_pub_hash;
1840 slv->origin = req->origin;
1841 slv->relay_count = ntohl (req->relay_count);
1842 slv->join_flags = ntohl (req->flags);
1843
1844 const struct GNUNET_PeerIdentity *
1845 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1846 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1847 uint16_t join_msg_size = 0;
1848
1849 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1850 <= req_size)
1851 {
1852 struct GNUNET_PSYC_Message *
1853 join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1854 join_msg_size = ntohs (join_msg->header.size);
1855 slv->join_msg = GNUNET_malloc (join_msg_size);
1856 GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1857 }
1858 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1859 {
1860 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1861 "%u + %u + %u != %u\n",
1862 (unsigned int) sizeof (*req),
1863 relay_size,
1864 join_msg_size,
1865 req_size);
1866 GNUNET_break (0);
1867 GNUNET_SERVICE_client_drop (client);
1868 GNUNET_free (slv);
1869 return;
1870 }
1871 if (0 < slv->relay_count)
1872 {
1873 slv->relays = GNUNET_malloc (relay_size);
1874 GNUNET_memcpy (slv->relays, &req[1], relay_size);
1875 }
1876
1877 chn = c->channel = &slv->channel;
1878 chn->slave = slv;
1879 chn->is_master = GNUNET_NO;
1880 chn->pub_key = req->channel_pub_key;
1881 chn->pub_key_hash = pub_key_hash;
1882 channel_init (chn);
1883
1884 if (NULL == chn_slv)
1885 {
1886 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1887 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1888 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1889 }
1890 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1891 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1892 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1893 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1894 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1895 &store_recv_slave_counters, slv);
1896 }
1897 else
1898 {
1899 chn = &slv->channel;
1900
1901 struct GNUNET_PSYC_CountersResultMessage *res;
1902
1903 struct GNUNET_MQ_Envelope *
1904 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1905 res->result_code = htonl (GNUNET_OK);
1906 res->max_message_id = GNUNET_htonll (chn->max_message_id);
1907
1908 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1909
1910 if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1911 {
1912 mcast_recv_join_decision (slv, GNUNET_YES,
1913 NULL, 0, NULL, NULL);
1914 }
1915 else if (NULL == slv->member)
1916 {
1917 slv->member
1918 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1919 &slv->origin,
1920 slv->relay_count, slv->relays,
1921 &slv->join_msg->header,
1922 &mcast_recv_join_request,
1923 &mcast_recv_join_decision,
1924 &mcast_recv_replay_fragment,
1925 &mcast_recv_replay_message,
1926 &mcast_recv_message, chn);
1927 if (NULL != slv->join_msg)
1928 {
1929 GNUNET_free (slv->join_msg);
1930 slv->join_msg = NULL;
1931 }
1932 }
1933 else if (NULL != slv->join_dcsn)
1934 {
1935 struct GNUNET_MQ_Envelope *
1936 env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1937 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1938 }
1939 }
1940
1941 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1942 "Client %p connected as slave to channel %s.\n",
1943 client,
1944 GNUNET_h2s (&chn->pub_key_hash));
1945
1946 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1947 cli->client = client;
1948 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1949
1950 GNUNET_SERVICE_client_continue (client);
1951}
1952
1953
1954struct JoinDecisionClosure
1955{
1956 int32_t is_admitted;
1957 struct GNUNET_MessageHeader *msg;
1958};
1959
1960
1961/**
1962 * Iterator callback for sending join decisions to multicast.
1963 */
1964static int
1965mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1966 void *value)
1967{
1968 struct JoinDecisionClosure *jcls = cls;
1969 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1970 // FIXME: add relays
1971 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1972 return GNUNET_YES;
1973}
1974
1975
1976static int
1977check_client_join_decision (void *cls,
1978 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1979{
1980 return GNUNET_OK;
1981}
1982
1983
1984/**
1985 * Join decision from client.
1986 */
1987static void
1988handle_client_join_decision (void *cls,
1989 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1990{
1991 struct Client *c = cls;
1992 struct GNUNET_SERVICE_Client *client = c->client;
1993 struct Channel *chn = c->channel;
1994 if (NULL == chn)
1995 {
1996 GNUNET_break (0);
1997 GNUNET_SERVICE_client_drop (client);
1998 return;
1999 }
2000 GNUNET_assert (GNUNET_YES == chn->is_master);
2001 struct Master *mst = chn->master;
2002
2003 struct JoinDecisionClosure jcls;
2004 jcls.is_admitted = ntohl (dcsn->is_admitted);
2005 jcls.msg
2006 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2007 ? (struct GNUNET_MessageHeader *) &dcsn[1]
2008 : NULL;
2009
2010 struct GNUNET_HashCode slave_pub_hash;
2011 GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2012 &slave_pub_hash);
2013
2014 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2015 "%p Got join decision (%d) from client for channel %s..\n",
2016 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2018 "%p ..and slave %s.\n",
2019 mst, GNUNET_h2s (&slave_pub_hash));
2020
2021 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2022 &mcast_send_join_decision, &jcls);
2023 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2024 GNUNET_SERVICE_client_continue (client);
2025}
2026
2027
2028static void
2029channel_part_cb (void *cls)
2030{
2031 struct GNUNET_SERVICE_Client *client = cls;
2032 struct GNUNET_MQ_Envelope *env;
2033
2034 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK);
2035 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
2036 env);
2037}
2038
2039
2040static void
2041handle_client_part_request (void *cls,
2042 const struct GNUNET_MessageHeader *msg)
2043{
2044 struct Client *c = cls;
2045
2046 c->channel->is_disconnecting = GNUNET_YES;
2047 if (GNUNET_YES == c->channel->is_master)
2048 {
2049 struct Master *mst = (struct Master *) c->channel;
2050
2051 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2052 "Got part request from master %p\n",
2053 mst);
2054 GNUNET_assert (NULL != mst->origin);
2055 GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client);
2056 }
2057 else
2058 {
2059 struct Slave *slv = (struct Slave *) c->channel;
2060
2061 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062 "Got part request from slave %p\n",
2063 slv);
2064 GNUNET_assert (NULL != slv->member);
2065 GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client);
2066 }
2067 GNUNET_SERVICE_client_continue (c->client);
2068}
2069
2070
2071/**
2072 * Send acknowledgement to a client.
2073 *
2074 * Sent after a message fragment has been passed on to multicast.
2075 *
2076 * @param chn The channel struct for the client.
2077 */
2078static void
2079send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2080{
2081 struct GNUNET_MessageHeader *res;
2082 struct GNUNET_MQ_Envelope *
2083 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2084
2085 /* FIXME? */
2086 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2087}
2088
2089
2090/**
2091 * Callback for the transmit functions of multicast.
2092 */
2093static int
2094transmit_notify (void *cls, size_t *data_size, void *data)
2095{
2096 struct Channel *chn = cls;
2097 struct TransmitMessage *tmit_msg = chn->tmit_head;
2098
2099 if (NULL == tmit_msg || *data_size < tmit_msg->size)
2100 {
2101 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2102 "%p transmit_notify: nothing to send.\n", chn);
2103 if (NULL != tmit_msg && *data_size < tmit_msg->size)
2104 GNUNET_break (0);
2105 *data_size = 0;
2106 return GNUNET_NO;
2107 }
2108
2109 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2110 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
2111
2112 *data_size = tmit_msg->size;
2113 GNUNET_memcpy (data, &tmit_msg[1], *data_size);
2114
2115 int ret
2116 = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2117 ? GNUNET_NO
2118 : GNUNET_YES;
2119
2120 /* FIXME: handle disconnecting clients */
2121 if (NULL != tmit_msg->client)
2122 send_message_ack (chn, tmit_msg->client);
2123
2124 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2125
2126 if (NULL != chn->tmit_head)
2127 {
2128 GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2129 }
2130 else if (GNUNET_YES == chn->is_disconnecting
2131 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2132 {
2133 /* FIXME: handle partial message (when still in_transmit) */
2134 GNUNET_free (tmit_msg);
2135 return GNUNET_SYSERR;
2136 }
2137 GNUNET_free (tmit_msg);
2138 return ret;
2139}
2140
2141
2142/**
2143 * Callback for the transmit functions of multicast.
2144 */
2145static int
2146master_transmit_notify (void *cls, size_t *data_size, void *data)
2147{
2148 int ret = transmit_notify (cls, data_size, data);
2149
2150 if (GNUNET_YES == ret)
2151 {
2152 struct Master *mst = cls;
2153 mst->tmit_handle = NULL;
2154 }
2155 return ret;
2156}
2157
2158
2159/**
2160 * Callback for the transmit functions of multicast.
2161 */
2162static int
2163slave_transmit_notify (void *cls, size_t *data_size, void *data)
2164{
2165 int ret = transmit_notify (cls, data_size, data);
2166
2167 if (GNUNET_YES == ret)
2168 {
2169 struct Slave *slv = cls;
2170 slv->tmit_handle = NULL;
2171 }
2172 return ret;
2173}
2174
2175
2176/**
2177 * Transmit a message from a channel master to the multicast group.
2178 */
2179static void
2180master_transmit_message (struct Master *mst)
2181{
2182 struct Channel *chn = &mst->channel;
2183 struct TransmitMessage *tmit_msg = chn->tmit_head;
2184 if (NULL == tmit_msg)
2185 return;
2186 if (NULL == mst->tmit_handle)
2187 {
2188 mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin,
2189 tmit_msg->id,
2190 mst->max_group_generation,
2191 &master_transmit_notify,
2192 mst);
2193 }
2194 else
2195 {
2196 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2197 }
2198}
2199
2200
2201/**
2202 * Transmit a message from a channel slave to the multicast group.
2203 */
2204static void
2205slave_transmit_message (struct Slave *slv)
2206{
2207 if (NULL == slv->channel.tmit_head)
2208 return;
2209 if (NULL == slv->tmit_handle)
2210 {
2211 slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member,
2212 slv->channel.tmit_head->id,
2213 &slave_transmit_notify,
2214 slv);
2215 }
2216 else
2217 {
2218 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2219 }
2220}
2221
2222
2223static void
2224transmit_message (struct Channel *chn)
2225{
2226 chn->is_master
2227 ? master_transmit_message (chn->master)
2228 : slave_transmit_message (chn->slave);
2229}
2230
2231
2232/**
2233 * Queue a message from a channel master for sending to the multicast group.
2234 */
2235static void
2236master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2237{
2238 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2239 {
2240 tmit_msg->id = ++mst->max_message_id;
2241 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2242 "%p master_queue_message: message_id=%" PRIu64 "\n",
2243 mst, tmit_msg->id);
2244 struct GNUNET_PSYC_MessageMethod *pmeth
2245 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2246
2247 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2248 {
2249 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2250 }
2251 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2252 {
2253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2254 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2255 mst, tmit_msg->id - mst->max_state_message_id);
2256 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2257 - mst->max_state_message_id);
2258 mst->max_state_message_id = tmit_msg->id;
2259 }
2260 else
2261 {
2262 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2263 "%p master_queue_message: state not modified\n", mst);
2264 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2265 }
2266
2267 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2268 {
2269 /// @todo add state_hash to PSYC header
2270 }
2271 }
2272}
2273
2274
2275/**
2276 * Queue a message from a channel slave for sending to the multicast group.
2277 */
2278static void
2279slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2280{
2281 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2282 {
2283 struct GNUNET_PSYC_MessageMethod *pmeth
2284 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2285 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2286 tmit_msg->id = ++slv->max_request_id;
2287 }
2288}
2289
2290
2291/**
2292 * Queue PSYC message parts for sending to multicast.
2293 *
2294 * @param chn
2295 * Channel to send to.
2296 * @param client
2297 * Client the message originates from.
2298 * @param data_size
2299 * Size of @a data.
2300 * @param data
2301 * Concatenated message parts.
2302 * @param first_ptype
2303 * First message part type in @a data.
2304 * @param last_ptype
2305 * Last message part type in @a data.
2306 */
2307static struct TransmitMessage *
2308queue_message (struct Channel *chn,
2309 struct GNUNET_SERVICE_Client *client,
2310 size_t data_size,
2311 const void *data,
2312 uint16_t first_ptype, uint16_t last_ptype)
2313{
2314 struct TransmitMessage *
2315 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2316 GNUNET_memcpy (&tmit_msg[1], data, data_size);
2317 tmit_msg->client = client;
2318 tmit_msg->size = data_size;
2319 tmit_msg->first_ptype = first_ptype;
2320 tmit_msg->last_ptype = last_ptype;
2321
2322 /* FIXME: separate queue per message ID */
2323
2324 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2325
2326 chn->is_master
2327 ? master_queue_message (chn->master, tmit_msg)
2328 : slave_queue_message (chn->slave, tmit_msg);
2329 return tmit_msg;
2330}
2331
2332
2333/**
2334 * Cancel transmission of current message.
2335 *
2336 * @param chn Channel to send to.
2337 * @param client Client the message originates from.
2338 */
2339static void
2340transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2341{
2342 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2343
2344 struct GNUNET_MessageHeader msg;
2345 msg.size = htons (sizeof (msg));
2346 msg.type = htons (type);
2347
2348 queue_message (chn, client, sizeof (msg), &msg, type, type);
2349 transmit_message (chn);
2350
2351 /* FIXME: cleanup */
2352}
2353
2354
2355static int
2356check_client_psyc_message (void *cls,
2357 const struct GNUNET_MessageHeader *msg)
2358{
2359 return GNUNET_OK;
2360}
2361
2362
2363/**
2364 * Incoming message from a master or slave client.
2365 */
2366static void
2367handle_client_psyc_message (void *cls,
2368 const struct GNUNET_MessageHeader *msg)
2369{
2370 struct Client *c = cls;
2371 struct GNUNET_SERVICE_Client *client = c->client;
2372 struct Channel *chn = c->channel;
2373 if (NULL == chn)
2374 {
2375 GNUNET_break (0);
2376 GNUNET_SERVICE_client_drop (client);
2377 return;
2378 }
2379
2380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2381 "%p Received message from client.\n", chn);
2382 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2383
2384 if (GNUNET_YES != chn->is_ready)
2385 {
2386 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2387 "%p Channel is not ready yet, disconnecting client %p.\n",
2388 chn,
2389 client);
2390 GNUNET_break (0);
2391 GNUNET_SERVICE_client_drop (client);
2392 return;
2393 }
2394
2395 uint16_t size = ntohs (msg->size);
2396 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2397 {
2398 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2399 "%p Message payload too large: %u < %u.\n",
2400 chn,
2401 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2402 (unsigned int) (size - sizeof (*msg)));
2403 GNUNET_break (0);
2404 transmit_cancel (chn, client);
2405 GNUNET_SERVICE_client_drop (client);
2406 return;
2407 }
2408
2409 uint16_t first_ptype = 0, last_ptype = 0;
2410 if (GNUNET_SYSERR
2411 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2412 (const char *) &msg[1],
2413 &first_ptype, &last_ptype))
2414 {
2415 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2416 "%p Received invalid message part from client.\n", chn);
2417 GNUNET_break (0);
2418 transmit_cancel (chn, client);
2419 GNUNET_SERVICE_client_drop (client);
2420 return;
2421 }
2422 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2423 "%p Received message with first part type %u and last part type %u.\n",
2424 chn, first_ptype, last_ptype);
2425
2426 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2427 first_ptype, last_ptype);
2428 transmit_message (chn);
2429 /* FIXME: send a few ACKs even before transmit_notify is called */
2430
2431 GNUNET_SERVICE_client_continue (client);
2432};
2433
2434
2435/**
2436 * Received result of GNUNET_PSYCSTORE_membership_store()
2437 */
2438static void
2439store_recv_membership_store_result (void *cls,
2440 int64_t result,
2441 const char *err_msg,
2442 uint16_t err_msg_size)
2443{
2444 struct Operation *op = cls;
2445 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2446 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2447 op->channel,
2448 result,
2449 (int) err_msg_size,
2450 err_msg);
2451
2452 if (NULL != op->client)
2453 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2454 op_remove (op);
2455}
2456
2457
2458/**
2459 * Client requests to add/remove a slave in the membership database.
2460 */
2461static void
2462handle_client_membership_store (void *cls,
2463 const struct ChannelMembershipStoreRequest *req)
2464{
2465 struct Client *c = cls;
2466 struct GNUNET_SERVICE_Client *client = c->client;
2467 struct Channel *chn = c->channel;
2468 if (NULL == chn)
2469 {
2470 GNUNET_break (0);
2471 GNUNET_SERVICE_client_drop (client);
2472 return;
2473 }
2474
2475 struct Operation *op = op_add (chn, client, req->op_id, 0);
2476
2477 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2478 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2480 "%p Received membership store request from client.\n", chn);
2481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2482 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2483 chn, req->did_join, announced_at, effective_since);
2484
2485 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2486 req->did_join, announced_at, effective_since,
2487 0, /* FIXME: group_generation */
2488 &store_recv_membership_store_result, op);
2489 GNUNET_SERVICE_client_continue (client);
2490}
2491
2492
2493/**
2494 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2495 * in response to a history request from a client.
2496 */
2497static int
2498store_recv_fragment_history (void *cls,
2499 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2500 enum GNUNET_PSYCSTORE_MessageFlags flags)
2501{
2502 struct Operation *op = cls;
2503 if (NULL == op->client)
2504 { /* Requesting client already disconnected. */
2505 return GNUNET_NO;
2506 }
2507 struct Channel *chn = op->channel;
2508
2509 struct GNUNET_PSYC_MessageHeader *pmsg;
2510 uint16_t msize = ntohs (mmsg->header.size);
2511 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2512
2513 struct GNUNET_OperationResultMessage *
2514 res = GNUNET_malloc (sizeof (*res) + psize);
2515 res->header.size = htons (sizeof (*res) + psize);
2516 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2517 res->op_id = op->op_id;
2518 res->result_code = GNUNET_htonll (GNUNET_OK);
2519
2520 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2521 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2522 GNUNET_memcpy (&res[1], pmsg, psize);
2523
2524 /** @todo FIXME: send only to requesting client */
2525 client_send_msg (chn, &res->header);
2526
2527 GNUNET_free (res);
2528 return GNUNET_YES;
2529}
2530
2531
2532/**
2533 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2534 * in response to a history request from a client.
2535 */
2536static void
2537store_recv_fragment_history_result (void *cls, int64_t result,
2538 const char *err_msg, uint16_t err_msg_size)
2539{
2540 struct Operation *op = cls;
2541 if (NULL == op->client)
2542 { /* Requesting client already disconnected. */
2543 return;
2544 }
2545
2546 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2547 "%p History replay #%" PRIu64 ": "
2548 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2549 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2550
2551 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2552 {
2553 /** @todo Multicast replay request for messages not found locally. */
2554 }
2555
2556 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2557 op_remove (op);
2558}
2559
2560
2561static int
2562check_client_history_replay (void *cls,
2563 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2564{
2565 return GNUNET_OK;
2566}
2567
2568
2569/**
2570 * Client requests channel history.
2571 */
2572static void
2573handle_client_history_replay (void *cls,
2574 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2575{
2576 struct Client *c = cls;
2577 struct GNUNET_SERVICE_Client *client = c->client;
2578 struct Channel *chn = c->channel;
2579 if (NULL == chn)
2580 {
2581 GNUNET_break (0);
2582 GNUNET_SERVICE_client_drop (client);
2583 return;
2584 }
2585
2586 uint16_t size = ntohs (req->header.size);
2587 const char *method_prefix = (const char *) &req[1];
2588
2589 if (size < sizeof (*req) + 1
2590 || '\0' != method_prefix[size - sizeof (*req) - 1])
2591 {
2592 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2593 "%p History replay #%" PRIu64 ": "
2594 "invalid method prefix. size: %u < %u?\n",
2595 chn,
2596 GNUNET_ntohll (req->op_id),
2597 size,
2598 (unsigned int) sizeof (*req) + 1);
2599 GNUNET_break (0);
2600 GNUNET_SERVICE_client_drop (client);
2601 return;
2602 }
2603
2604 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2605
2606 if (0 == req->message_limit)
2607 {
2608 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2609 GNUNET_ntohll (req->start_message_id),
2610 GNUNET_ntohll (req->end_message_id),
2611 0, method_prefix,
2612 &store_recv_fragment_history,
2613 &store_recv_fragment_history_result, op);
2614 }
2615 else
2616 {
2617 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2618 GNUNET_ntohll (req->message_limit),
2619 method_prefix,
2620 &store_recv_fragment_history,
2621 &store_recv_fragment_history_result,
2622 op);
2623 }
2624 GNUNET_SERVICE_client_continue (client);
2625}
2626
2627
2628/**
2629 * Received state var from PSYCstore, send it to client.
2630 */
2631static int
2632store_recv_state_var (void *cls, const char *name,
2633 const void *value, uint32_t value_size)
2634{
2635 struct Operation *op = cls;
2636 struct GNUNET_OperationResultMessage *res;
2637 struct GNUNET_MQ_Envelope *env;
2638
2639 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2640 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2641 op->channel, GNUNET_ntohll (op->op_id), name);
2642
2643 if (NULL != name) /* First part */
2644 {
2645 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2646 struct GNUNET_PSYC_MessageModifier *mod;
2647 env = GNUNET_MQ_msg_extra (res,
2648 sizeof (*mod) + name_size + value_size,
2649 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2650 res->op_id = op->op_id;
2651
2652 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2653 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2654 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2655 mod->name_size = htons (name_size);
2656 mod->value_size = htonl (value_size);
2657 mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2658 GNUNET_memcpy (&mod[1], name, name_size);
2659 GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2660 }
2661 else /* Continuation */
2662 {
2663 struct GNUNET_MessageHeader *mod;
2664 env = GNUNET_MQ_msg_extra (res,
2665 sizeof (*mod) + value_size,
2666 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2667 res->op_id = op->op_id;
2668
2669 mod = (struct GNUNET_MessageHeader *) &res[1];
2670 mod->size = htons (sizeof (*mod) + value_size);
2671 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2672 GNUNET_memcpy (&mod[1], value, value_size);
2673 }
2674
2675 // FIXME: client might have been disconnected
2676 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2677 return GNUNET_YES;
2678}
2679
2680
2681/**
2682 * Received result of GNUNET_PSYCSTORE_state_get()
2683 * or GNUNET_PSYCSTORE_state_get_prefix()
2684 */
2685static void
2686store_recv_state_result (void *cls, int64_t result,
2687 const char *err_msg, uint16_t err_msg_size)
2688{
2689 struct Operation *op = cls;
2690 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2691 "%p state_get #%" PRIu64 ": "
2692 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2693 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2694
2695 // FIXME: client might have been disconnected
2696 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2697 op_remove (op);
2698}
2699
2700
2701static int
2702check_client_state_get (void *cls,
2703 const struct StateRequest *req)
2704{
2705 struct Client *c = cls;
2706 struct Channel *chn = c->channel;
2707 if (NULL == chn)
2708 {
2709 GNUNET_break (0);
2710 return GNUNET_SYSERR;
2711 }
2712
2713 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2714 const char *name = (const char *) &req[1];
2715 if (0 == name_size || '\0' != name[name_size - 1])
2716 {
2717 GNUNET_break (0);
2718 return GNUNET_SYSERR;
2719 }
2720
2721 return GNUNET_OK;
2722}
2723
2724
2725/**
2726 * Client requests best matching state variable from PSYCstore.
2727 */
2728static void
2729handle_client_state_get (void *cls,
2730 const struct StateRequest *req)
2731{
2732 struct Client *c = cls;
2733 struct GNUNET_SERVICE_Client *client = c->client;
2734 struct Channel *chn = c->channel;
2735
2736 const char *name = (const char *) &req[1];
2737 struct Operation *op = op_add (chn, client, req->op_id, 0);
2738 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2739 &store_recv_state_var,
2740 &store_recv_state_result, op);
2741 GNUNET_SERVICE_client_continue (client);
2742}
2743
2744
2745static int
2746check_client_state_get_prefix (void *cls,
2747 const struct StateRequest *req)
2748{
2749 struct Client *c = cls;
2750 struct Channel *chn = c->channel;
2751 if (NULL == chn)
2752 {
2753 GNUNET_break (0);
2754 return GNUNET_SYSERR;
2755 }
2756
2757 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2758 const char *name = (const char *) &req[1];
2759 if (0 == name_size || '\0' != name[name_size - 1])
2760 {
2761 GNUNET_break (0);
2762 return GNUNET_SYSERR;
2763 }
2764
2765 return GNUNET_OK;
2766}
2767
2768
2769/**
2770 * Client requests state variables with a given prefix from PSYCstore.
2771 */
2772static void
2773handle_client_state_get_prefix (void *cls,
2774 const struct StateRequest *req)
2775{
2776 struct Client *c = cls;
2777 struct GNUNET_SERVICE_Client *client = c->client;
2778 struct Channel *chn = c->channel;
2779
2780 const char *name = (const char *) &req[1];
2781 struct Operation *op = op_add (chn, client, req->op_id, 0);
2782 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2783 &store_recv_state_var,
2784 &store_recv_state_result, op);
2785 GNUNET_SERVICE_client_continue (client);
2786}
2787
2788
2789/**
2790 * Initialize the PSYC service.
2791 *
2792 * @param cls Closure.
2793 * @param server The initialized server.
2794 * @param c Configuration to use.
2795 */
2796static void
2797run (void *cls,
2798 const struct GNUNET_CONFIGURATION_Handle *c,
2799 struct GNUNET_SERVICE_Handle *svc)
2800{
2801 cfg = c;
2802 service = svc;
2803 store = GNUNET_PSYCSTORE_connect (cfg);
2804 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2805 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2806 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2807 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2808 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2809 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2810}
2811
2812
2813/**
2814 * Define "main" method using service macro.
2815 */
2816GNUNET_SERVICE_MAIN
2817("psyc",
2818 GNUNET_SERVICE_OPTION_NONE,
2819 &run,
2820 &client_notify_connect,
2821 &client_notify_disconnect,
2822 NULL,
2823 GNUNET_MQ_hd_fixed_size (client_master_start,
2824 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2825 struct MasterStartRequest,
2826 NULL),
2827 GNUNET_MQ_hd_var_size (client_slave_join,
2828 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2829 struct SlaveJoinRequest,
2830 NULL),
2831 GNUNET_MQ_hd_var_size (client_join_decision,
2832 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2833 struct GNUNET_PSYC_JoinDecisionMessage,
2834 NULL),
2835 GNUNET_MQ_hd_fixed_size (client_part_request,
2836 GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST,
2837 struct GNUNET_MessageHeader,
2838 NULL),
2839 GNUNET_MQ_hd_var_size (client_psyc_message,
2840 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2841 struct GNUNET_MessageHeader,
2842 NULL),
2843 GNUNET_MQ_hd_fixed_size (client_membership_store,
2844 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2845 struct ChannelMembershipStoreRequest,
2846 NULL),
2847 GNUNET_MQ_hd_var_size (client_history_replay,
2848 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2849 struct GNUNET_PSYC_HistoryRequestMessage,
2850 NULL),
2851 GNUNET_MQ_hd_var_size (client_state_get,
2852 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2853 struct StateRequest,
2854 NULL),
2855 GNUNET_MQ_hd_var_size (client_state_get_prefix,
2856 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2857 struct StateRequest,
2858 NULL));
2859
2860/* end of gnunet-service-psyc.c */