diff options
Diffstat (limited to 'src/psyc/gnunet-service-psyc.c')
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 2860 |
1 files changed, 2860 insertions, 0 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c new file mode 100644 index 0000000..6f2f7a9 --- /dev/null +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -0,0 +1,2860 @@ | |||
1 | /* | ||
2 | * This file is part of GNUnet | ||
3 | * Copyright (C) 2013 GNUnet e.V. | ||
4 | * | ||
5 | * GNUnet is free software: you can redistribute it and/or modify it | ||
6 | * under the terms of the GNU Affero General Public License as published | ||
7 | * by the Free Software Foundation, either version 3 of the License, | ||
8 | * or (at your option) any later version. | ||
9 | * | ||
10 | * GNUnet is distributed in the hope that it will be useful, but | ||
11 | * WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | * Affero General Public License for more details. | ||
14 | * | ||
15 | * You should have received a copy of the GNU Affero General Public License | ||
16 | * along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file psyc/gnunet-service-psyc.c | ||
23 | * @brief PSYC service | ||
24 | * @author Gabor X Toth | ||
25 | */ | ||
26 | |||
27 | #include <inttypes.h> | ||
28 | |||
29 | #include "platform.h" | ||
30 | #include "gnunet_util_lib.h" | ||
31 | #include "gnunet_constants.h" | ||
32 | #include "gnunet_protocols.h" | ||
33 | #include "gnunet_statistics_service.h" | ||
34 | #include "gnunet_multicast_service.h" | ||
35 | #include "gnunet_psycstore_service.h" | ||
36 | #include "gnunet_psyc_service.h" | ||
37 | #include "gnunet_psyc_util_lib.h" | ||
38 | #include "psyc.h" | ||
39 | |||
40 | |||
41 | /** | ||
42 | * Handle to our current configuration. | ||
43 | */ | ||
44 | static const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
45 | |||
46 | /** | ||
47 | * Service handle. | ||
48 | */ | ||
49 | static struct GNUNET_SERVICE_Handle *service; | ||
50 | |||
51 | /** | ||
52 | * Handle to the statistics service. | ||
53 | */ | ||
54 | static struct GNUNET_STATISTICS_Handle *stats; | ||
55 | |||
56 | /** | ||
57 | * Handle to the PSYCstore. | ||
58 | */ | ||
59 | static struct GNUNET_PSYCSTORE_Handle *store; | ||
60 | |||
61 | /** | ||
62 | * All connected masters. | ||
63 | * Channel's pub_key_hash -> struct Master | ||
64 | */ | ||
65 | static struct GNUNET_CONTAINER_MultiHashMap *masters; | ||
66 | |||
67 | /** | ||
68 | * All connected slaves. | ||
69 | * Channel's pub_key_hash -> struct Slave | ||
70 | */ | ||
71 | static struct GNUNET_CONTAINER_MultiHashMap *slaves; | ||
72 | |||
73 | /** | ||
74 | * Connected slaves per channel. | ||
75 | * Channel's pub_key_hash -> Slave's pub_key -> struct Slave | ||
76 | */ | ||
77 | static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves; | ||
78 | |||
79 | |||
80 | /** | ||
81 | * Message in the transmission queue. | ||
82 | */ | ||
83 | struct TransmitMessage | ||
84 | { | ||
85 | struct TransmitMessage *prev; | ||
86 | struct TransmitMessage *next; | ||
87 | |||
88 | struct GNUNET_SERVICE_Client *client; | ||
89 | |||
90 | /** | ||
91 | * ID assigned to the message. | ||
92 | */ | ||
93 | uint64_t id; | ||
94 | |||
95 | /** | ||
96 | * Size of message. | ||
97 | */ | ||
98 | uint16_t size; | ||
99 | |||
100 | /** | ||
101 | * Type of first message part. | ||
102 | */ | ||
103 | uint16_t first_ptype; | ||
104 | |||
105 | /** | ||
106 | * Type of last message part. | ||
107 | */ | ||
108 | uint16_t last_ptype; | ||
109 | |||
110 | /* Followed by message */ | ||
111 | }; | ||
112 | |||
113 | |||
114 | /** | ||
115 | * Cache for received message fragments. | ||
116 | * Message fragments are only sent to clients after all modifiers arrived. | ||
117 | * | ||
118 | * chan_key -> MultiHashMap chan_msgs | ||
119 | */ | ||
120 | static struct GNUNET_CONTAINER_MultiHashMap *recv_cache; | ||
121 | |||
122 | |||
123 | /** | ||
124 | * Entry in the chan_msgs hashmap of @a recv_cache: | ||
125 | * fragment_id -> RecvCacheEntry | ||
126 | */ | ||
127 | struct RecvCacheEntry | ||
128 | { | ||
129 | struct GNUNET_MULTICAST_MessageHeader *mmsg; | ||
130 | uint16_t ref_count; | ||
131 | }; | ||
132 | |||
133 | |||
134 | /** | ||
135 | * Entry in the @a recv_frags hash map of a @a Channel. | ||
136 | * message_id -> FragmentQueue | ||
137 | */ | ||
138 | struct FragmentQueue | ||
139 | { | ||
140 | /** | ||
141 | * Fragment IDs stored in @a recv_cache. | ||
142 | */ | ||
143 | struct GNUNET_CONTAINER_Heap *fragments; | ||
144 | |||
145 | /** | ||
146 | * Total size of received fragments. | ||
147 | */ | ||
148 | uint64_t size; | ||
149 | |||
150 | /** | ||
151 | * Total size of received header fragments (METHOD & MODIFIERs) | ||
152 | */ | ||
153 | uint64_t header_size; | ||
154 | |||
155 | /** | ||
156 | * The @a state_delta field from struct GNUNET_PSYC_MessageMethod. | ||
157 | */ | ||
158 | uint64_t state_delta; | ||
159 | |||
160 | /** | ||
161 | * The @a flags field from struct GNUNET_PSYC_MessageMethod. | ||
162 | */ | ||
163 | uint32_t flags; | ||
164 | |||
165 | /** | ||
166 | * Receive state of message. | ||
167 | * | ||
168 | * @see MessageFragmentState | ||
169 | */ | ||
170 | uint8_t state; | ||
171 | |||
172 | /** | ||
173 | * Whether the state is already modified in PSYCstore. | ||
174 | */ | ||
175 | uint8_t state_is_modified; | ||
176 | |||
177 | /** | ||
178 | * Is the message queued for delivery to the client? | ||
179 | * i.e. added to the recv_msgs queue | ||
180 | */ | ||
181 | uint8_t is_queued; | ||
182 | }; | ||
183 | |||
184 | |||
185 | /** | ||
186 | * List of connected clients. | ||
187 | */ | ||
188 | struct ClientList | ||
189 | { | ||
190 | struct ClientList *prev; | ||
191 | struct ClientList *next; | ||
192 | |||
193 | struct GNUNET_SERVICE_Client *client; | ||
194 | }; | ||
195 | |||
196 | |||
197 | struct Operation | ||
198 | { | ||
199 | struct Operation *prev; | ||
200 | struct Operation *next; | ||
201 | |||
202 | struct GNUNET_SERVICE_Client *client; | ||
203 | struct Channel *channel; | ||
204 | uint64_t op_id; | ||
205 | uint32_t flags; | ||
206 | }; | ||
207 | |||
208 | |||
209 | /** | ||
210 | * Common part of the client context for both a channel master and slave. | ||
211 | */ | ||
212 | struct Channel | ||
213 | { | ||
214 | struct ClientList *clients_head; | ||
215 | struct ClientList *clients_tail; | ||
216 | |||
217 | struct Operation *op_head; | ||
218 | struct Operation *op_tail; | ||
219 | |||
220 | struct TransmitMessage *tmit_head; | ||
221 | struct TransmitMessage *tmit_tail; | ||
222 | |||
223 | /** | ||
224 | * Current PSYCstore operation. | ||
225 | */ | ||
226 | struct GNUNET_PSYCSTORE_OperationHandle *store_op; | ||
227 | |||
228 | /** | ||
229 | * Received fragments not yet sent to the client. | ||
230 | * message_id -> FragmentQueue | ||
231 | */ | ||
232 | struct GNUNET_CONTAINER_MultiHashMap *recv_frags; | ||
233 | |||
234 | /** | ||
235 | * Received message IDs not yet sent to the client. | ||
236 | */ | ||
237 | struct GNUNET_CONTAINER_Heap *recv_msgs; | ||
238 | |||
239 | /** | ||
240 | * Public key of the channel. | ||
241 | */ | ||
242 | struct GNUNET_CRYPTO_EddsaPublicKey pub_key; | ||
243 | |||
244 | /** | ||
245 | * Hash of @a pub_key. | ||
246 | */ | ||
247 | struct GNUNET_HashCode pub_key_hash; | ||
248 | |||
249 | /** | ||
250 | * Last message ID sent to the client. | ||
251 | * 0 if there is no such message. | ||
252 | */ | ||
253 | uint64_t max_message_id; | ||
254 | |||
255 | /** | ||
256 | * ID of the last stateful message, where the state operations has been | ||
257 | * processed and saved to PSYCstore and which has been sent to the client. | ||
258 | * 0 if there is no such message. | ||
259 | */ | ||
260 | uint64_t max_state_message_id; | ||
261 | |||
262 | /** | ||
263 | * Expected value size for the modifier being received from the PSYC service. | ||
264 | */ | ||
265 | uint32_t tmit_mod_value_size_expected; | ||
266 | |||
267 | /** | ||
268 | * Actual value size for the modifier being received from the PSYC service. | ||
269 | */ | ||
270 | uint32_t tmit_mod_value_size; | ||
271 | |||
272 | /** | ||
273 | * Is this channel ready to receive messages from client? | ||
274 | * #GNUNET_YES or #GNUNET_NO | ||
275 | */ | ||
276 | uint8_t is_ready; | ||
277 | |||
278 | /** | ||
279 | * Is the client disconnected? | ||
280 | * #GNUNET_YES or #GNUNET_NO | ||
281 | */ | ||
282 | uint8_t is_disconnecting; | ||
283 | |||
284 | /** | ||
285 | * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)? | ||
286 | */ | ||
287 | uint8_t is_master; | ||
288 | |||
289 | union { | ||
290 | struct Master *master; | ||
291 | struct Slave *slave; | ||
292 | }; | ||
293 | }; | ||
294 | |||
295 | |||
296 | /** | ||
297 | * Client context for a channel master. | ||
298 | */ | ||
299 | struct Master | ||
300 | { | ||
301 | /** | ||
302 | * Channel struct common for Master and Slave | ||
303 | */ | ||
304 | struct Channel channel; | ||
305 | |||
306 | /** | ||
307 | * Private key of the channel. | ||
308 | */ | ||
309 | struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; | ||
310 | |||
311 | /** | ||
312 | * Handle for the multicast origin. | ||
313 | */ | ||
314 | struct GNUNET_MULTICAST_Origin *origin; | ||
315 | |||
316 | /** | ||
317 | * Transmit handle for multicast. | ||
318 | */ | ||
319 | struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle; | ||
320 | |||
321 | /** | ||
322 | * Incoming join requests from multicast. | ||
323 | * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle * | ||
324 | */ | ||
325 | struct GNUNET_CONTAINER_MultiHashMap *join_reqs; | ||
326 | |||
327 | /** | ||
328 | * Last message ID transmitted to this channel. | ||
329 | * | ||
330 | * Incremented before sending a message, thus the message_id in messages sent | ||
331 | * starts from 1. | ||
332 | */ | ||
333 | uint64_t max_message_id; | ||
334 | |||
335 | /** | ||
336 | * ID of the last message with state operations transmitted to the channel. | ||
337 | * 0 if there is no such message. | ||
338 | */ | ||
339 | uint64_t max_state_message_id; | ||
340 | |||
341 | /** | ||
342 | * Maximum group generation transmitted to the channel. | ||
343 | */ | ||
344 | uint64_t max_group_generation; | ||
345 | |||
346 | /** | ||
347 | * @see enum GNUNET_PSYC_Policy | ||
348 | */ | ||
349 | enum GNUNET_PSYC_Policy policy; | ||
350 | }; | ||
351 | |||
352 | |||
353 | /** | ||
354 | * Client context for a channel slave. | ||
355 | */ | ||
356 | struct Slave | ||
357 | { | ||
358 | /** | ||
359 | * Channel struct common for Master and Slave | ||
360 | */ | ||
361 | struct Channel channel; | ||
362 | |||
363 | /** | ||
364 | * Private key of the slave. | ||
365 | */ | ||
366 | struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key; | ||
367 | |||
368 | /** | ||
369 | * Public key of the slave. | ||
370 | */ | ||
371 | struct GNUNET_CRYPTO_EcdsaPublicKey pub_key; | ||
372 | |||
373 | /** | ||
374 | * Hash of @a pub_key. | ||
375 | */ | ||
376 | struct GNUNET_HashCode pub_key_hash; | ||
377 | |||
378 | /** | ||
379 | * Handle for the multicast member. | ||
380 | */ | ||
381 | struct GNUNET_MULTICAST_Member *member; | ||
382 | |||
383 | /** | ||
384 | * Transmit handle for multicast. | ||
385 | */ | ||
386 | struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle; | ||
387 | |||
388 | /** | ||
389 | * Peer identity of the origin. | ||
390 | */ | ||
391 | struct GNUNET_PeerIdentity origin; | ||
392 | |||
393 | /** | ||
394 | * Number of items in @a relays. | ||
395 | */ | ||
396 | uint32_t relay_count; | ||
397 | |||
398 | /** | ||
399 | * Relays that multicast can use to connect. | ||
400 | */ | ||
401 | struct GNUNET_PeerIdentity *relays; | ||
402 | |||
403 | /** | ||
404 | * Join request to be transmitted to the master on join. | ||
405 | */ | ||
406 | struct GNUNET_PSYC_Message *join_msg; | ||
407 | |||
408 | /** | ||
409 | * Join decision received from multicast. | ||
410 | */ | ||
411 | struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn; | ||
412 | |||
413 | /** | ||
414 | * Maximum request ID for this channel. | ||
415 | */ | ||
416 | uint64_t max_request_id; | ||
417 | |||
418 | /** | ||
419 | * Join flags. | ||
420 | */ | ||
421 | enum GNUNET_PSYC_SlaveJoinFlags join_flags; | ||
422 | }; | ||
423 | |||
424 | |||
425 | /** | ||
426 | * Client context. | ||
427 | */ | ||
428 | struct Client { | ||
429 | struct GNUNET_SERVICE_Client *client; | ||
430 | struct Channel *channel; | ||
431 | }; | ||
432 | |||
433 | |||
434 | struct ReplayRequestKey | ||
435 | { | ||
436 | uint64_t fragment_id; | ||
437 | uint64_t message_id; | ||
438 | uint64_t fragment_offset; | ||
439 | uint64_t flags; | ||
440 | }; | ||
441 | |||
442 | |||
443 | static void | ||
444 | transmit_message (struct Channel *chn); | ||
445 | |||
446 | static uint64_t | ||
447 | message_queue_run (struct Channel *chn); | ||
448 | |||
449 | static uint64_t | ||
450 | message_queue_drop (struct Channel *chn); | ||
451 | |||
452 | |||
453 | static void | ||
454 | schedule_transmit_message (void *cls) | ||
455 | { | ||
456 | struct Channel *chn = cls; | ||
457 | |||
458 | transmit_message (chn); | ||
459 | } | ||
460 | |||
461 | |||
462 | /** | ||
463 | * Task run during shutdown. | ||
464 | * | ||
465 | * @param cls unused | ||
466 | */ | ||
467 | static void | ||
468 | shutdown_task (void *cls) | ||
469 | { | ||
470 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
471 | "shutting down...\n"); | ||
472 | GNUNET_PSYCSTORE_disconnect (store); | ||
473 | if (NULL != stats) | ||
474 | { | ||
475 | GNUNET_STATISTICS_destroy (stats, GNUNET_YES); | ||
476 | stats = NULL; | ||
477 | } | ||
478 | } | ||
479 | |||
480 | |||
481 | static struct Operation * | ||
482 | op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client, | ||
483 | uint64_t op_id, uint32_t flags) | ||
484 | { | ||
485 | struct Operation *op = GNUNET_malloc (sizeof (*op)); | ||
486 | op->client = client; | ||
487 | op->channel = chn; | ||
488 | op->op_id = op_id; | ||
489 | op->flags = flags; | ||
490 | GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op); | ||
491 | return op; | ||
492 | } | ||
493 | |||
494 | |||
495 | static void | ||
496 | op_remove (struct Operation *op) | ||
497 | { | ||
498 | GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op); | ||
499 | GNUNET_free (op); | ||
500 | } | ||
501 | |||
502 | |||
503 | /** | ||
504 | * Clean up master data structures after a client disconnected. | ||
505 | */ | ||
506 | static void | ||
507 | cleanup_master (struct Master *mst) | ||
508 | { | ||
509 | struct Channel *chn = &mst->channel; | ||
510 | |||
511 | GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); | ||
512 | GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst); | ||
513 | } | ||
514 | |||
515 | |||
516 | /** | ||
517 | * Clean up slave data structures after a client disconnected. | ||
518 | */ | ||
519 | static void | ||
520 | cleanup_slave (struct Slave *slv) | ||
521 | { | ||
522 | struct Channel *chn = &slv->channel; | ||
523 | struct GNUNET_CONTAINER_MultiHashMap * | ||
524 | chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, | ||
525 | &chn->pub_key_hash); | ||
526 | GNUNET_assert (NULL != chn_slv); | ||
527 | GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv); | ||
528 | |||
529 | if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv)) | ||
530 | { | ||
531 | GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash, | ||
532 | chn_slv); | ||
533 | GNUNET_CONTAINER_multihashmap_destroy (chn_slv); | ||
534 | } | ||
535 | GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); | ||
536 | |||
537 | if (NULL != slv->join_msg) | ||
538 | { | ||
539 | GNUNET_free (slv->join_msg); | ||
540 | slv->join_msg = NULL; | ||
541 | } | ||
542 | if (NULL != slv->relays) | ||
543 | { | ||
544 | GNUNET_free (slv->relays); | ||
545 | slv->relays = NULL; | ||
546 | } | ||
547 | GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); | ||
548 | } | ||
549 | |||
550 | |||
551 | /** | ||
552 | * Clean up channel data structures after a client disconnected. | ||
553 | */ | ||
554 | static void | ||
555 | cleanup_channel (struct Channel *chn) | ||
556 | { | ||
557 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
558 | "%p Cleaning up channel %s. master? %u\n", | ||
559 | chn, | ||
560 | GNUNET_h2s (&chn->pub_key_hash), | ||
561 | chn->is_master); | ||
562 | message_queue_drop (chn); | ||
563 | GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags); | ||
564 | chn->recv_frags = NULL; | ||
565 | |||
566 | if (NULL != chn->store_op) | ||
567 | { | ||
568 | GNUNET_PSYCSTORE_operation_cancel (chn->store_op); | ||
569 | chn->store_op = NULL; | ||
570 | } | ||
571 | |||
572 | (GNUNET_YES == chn->is_master) | ||
573 | ? cleanup_master (chn->master) | ||
574 | : cleanup_slave (chn->slave); | ||
575 | GNUNET_free (chn); | ||
576 | } | ||
577 | |||
578 | |||
579 | /** | ||
580 | * Called whenever a client is disconnected. | ||
581 | * Frees our resources associated with that client. | ||
582 | * | ||
583 | * @param cls closure | ||
584 | * @param client identification of the client | ||
585 | * @param app_ctx must match @a client | ||
586 | */ | ||
587 | static void | ||
588 | client_notify_disconnect (void *cls, | ||
589 | struct GNUNET_SERVICE_Client *client, | ||
590 | void *app_ctx) | ||
591 | { | ||
592 | struct Client *c = app_ctx; | ||
593 | struct Channel *chn = c->channel; | ||
594 | GNUNET_free (c); | ||
595 | |||
596 | if (NULL == chn) | ||
597 | { | ||
598 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
599 | "%p User context is NULL in client_notify_disconnect ()\n", | ||
600 | chn); | ||
601 | GNUNET_break (0); | ||
602 | return; | ||
603 | } | ||
604 | |||
605 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
606 | "%p Client %p (%s) disconnected from channel %s\n", | ||
607 | chn, | ||
608 | client, | ||
609 | (GNUNET_YES == chn->is_master) ? "master" : "slave", | ||
610 | GNUNET_h2s (&chn->pub_key_hash)); | ||
611 | |||
612 | struct ClientList *cli = chn->clients_head; | ||
613 | while (NULL != cli) | ||
614 | { | ||
615 | if (cli->client == client) | ||
616 | { | ||
617 | GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli); | ||
618 | GNUNET_free (cli); | ||
619 | break; | ||
620 | } | ||
621 | cli = cli->next; | ||
622 | } | ||
623 | |||
624 | struct Operation *op = chn->op_head; | ||
625 | while (NULL != op) | ||
626 | { | ||
627 | if (op->client == client) | ||
628 | { | ||
629 | op->client = NULL; | ||
630 | break; | ||
631 | } | ||
632 | op = op->next; | ||
633 | } | ||
634 | |||
635 | if (NULL == chn->clients_head) | ||
636 | { /* Last client disconnected. */ | ||
637 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
638 | "%p Last client (%s) disconnected from channel %s\n", | ||
639 | chn, | ||
640 | (GNUNET_YES == chn->is_master) ? "master" : "slave", | ||
641 | GNUNET_h2s (&chn->pub_key_hash)); | ||
642 | chn->is_disconnecting = GNUNET_YES; | ||
643 | cleanup_channel (chn); | ||
644 | } | ||
645 | } | ||
646 | |||
647 | |||
648 | /** | ||
649 | * A new client connected. | ||
650 | * | ||
651 | * @param cls NULL | ||
652 | * @param client client to add | ||
653 | * @param mq message queue for @a client | ||
654 | * @return @a client | ||
655 | */ | ||
656 | static void * | ||
657 | client_notify_connect (void *cls, | ||
658 | struct GNUNET_SERVICE_Client *client, | ||
659 | struct GNUNET_MQ_Handle *mq) | ||
660 | { | ||
661 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client); | ||
662 | |||
663 | struct Client *c = GNUNET_malloc (sizeof (*c)); | ||
664 | c->client = client; | ||
665 | |||
666 | return c; | ||
667 | } | ||
668 | |||
669 | |||
670 | /** | ||
671 | * Send message to all clients connected to the channel. | ||
672 | */ | ||
673 | static void | ||
674 | client_send_msg (const struct Channel *chn, | ||
675 | const struct GNUNET_MessageHeader *msg) | ||
676 | { | ||
677 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
678 | "Sending message to clients of channel %p.\n", | ||
679 | chn); | ||
680 | |||
681 | struct ClientList *cli = chn->clients_head; | ||
682 | while (NULL != cli) | ||
683 | { | ||
684 | struct GNUNET_MQ_Envelope * | ||
685 | env = GNUNET_MQ_msg_copy (msg); | ||
686 | |||
687 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client), | ||
688 | env); | ||
689 | cli = cli->next; | ||
690 | } | ||
691 | } | ||
692 | |||
693 | |||
694 | /** | ||
695 | * Send a result code back to the client. | ||
696 | * | ||
697 | * @param client | ||
698 | * Client that should receive the result code. | ||
699 | * @param result_code | ||
700 | * Code to transmit. | ||
701 | * @param op_id | ||
702 | * Operation ID in network byte order. | ||
703 | * @param data | ||
704 | * Data payload or NULL. | ||
705 | * @param data_size | ||
706 | * Size of @a data. | ||
707 | */ | ||
708 | static void | ||
709 | client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id, | ||
710 | int64_t result_code, const void *data, uint16_t data_size) | ||
711 | { | ||
712 | struct GNUNET_OperationResultMessage *res; | ||
713 | struct GNUNET_MQ_Envelope * | ||
714 | env = GNUNET_MQ_msg_extra (res, | ||
715 | data_size, | ||
716 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); | ||
717 | res->result_code = GNUNET_htonll (result_code); | ||
718 | res->op_id = op_id; | ||
719 | if (0 < data_size) | ||
720 | GNUNET_memcpy (&res[1], data, data_size); | ||
721 | |||
722 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
723 | "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n", | ||
724 | client, | ||
725 | GNUNET_ntohll (op_id), | ||
726 | result_code, | ||
727 | data_size); | ||
728 | |||
729 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); | ||
730 | } | ||
731 | |||
732 | |||
733 | /** | ||
734 | * Closure for join_mem_test_cb() | ||
735 | */ | ||
736 | struct JoinMemTestClosure | ||
737 | { | ||
738 | struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; | ||
739 | struct Channel *channel; | ||
740 | struct GNUNET_MULTICAST_JoinHandle *join_handle; | ||
741 | struct GNUNET_PSYC_JoinRequestMessage *join_msg; | ||
742 | }; | ||
743 | |||
744 | |||
745 | /** | ||
746 | * Membership test result callback used for join requests. | ||
747 | */ | ||
748 | static void | ||
749 | join_mem_test_cb (void *cls, int64_t result, | ||
750 | const char *err_msg, uint16_t err_msg_size) | ||
751 | { | ||
752 | struct JoinMemTestClosure *jcls = cls; | ||
753 | |||
754 | if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master) | ||
755 | { /* Pass on join request to client if this is a master channel */ | ||
756 | struct Master *mst = jcls->channel->master; | ||
757 | struct GNUNET_HashCode slave_pub_hash; | ||
758 | GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key), | ||
759 | &slave_pub_hash); | ||
760 | GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle, | ||
761 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
762 | client_send_msg (jcls->channel, &jcls->join_msg->header); | ||
763 | } | ||
764 | else | ||
765 | { | ||
766 | if (GNUNET_SYSERR == result) | ||
767 | { | ||
768 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
769 | "Could not perform membership test (%.*s)\n", | ||
770 | err_msg_size, err_msg); | ||
771 | } | ||
772 | // FIXME: add relays | ||
773 | GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL); | ||
774 | } | ||
775 | GNUNET_free (jcls->join_msg); | ||
776 | GNUNET_free (jcls); | ||
777 | } | ||
778 | |||
779 | |||
780 | /** | ||
781 | * Incoming join request from multicast. | ||
782 | */ | ||
783 | static void | ||
784 | mcast_recv_join_request (void *cls, | ||
785 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, | ||
786 | const struct GNUNET_MessageHeader *join_msg, | ||
787 | struct GNUNET_MULTICAST_JoinHandle *jh) | ||
788 | { | ||
789 | struct Channel *chn = cls; | ||
790 | uint16_t join_msg_size = 0; | ||
791 | |||
792 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
793 | "%p Got join request.\n", | ||
794 | chn); | ||
795 | if (NULL != join_msg) | ||
796 | { | ||
797 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type)) | ||
798 | { | ||
799 | join_msg_size = ntohs (join_msg->size); | ||
800 | } | ||
801 | else | ||
802 | { | ||
803 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
804 | "%p Got join message with invalid type %u.\n", | ||
805 | chn, | ||
806 | ntohs (join_msg->type)); | ||
807 | } | ||
808 | } | ||
809 | |||
810 | struct GNUNET_PSYC_JoinRequestMessage * | ||
811 | req = GNUNET_malloc (sizeof (*req) + join_msg_size); | ||
812 | req->header.size = htons (sizeof (*req) + join_msg_size); | ||
813 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST); | ||
814 | req->slave_pub_key = *slave_pub_key; | ||
815 | if (0 < join_msg_size) | ||
816 | GNUNET_memcpy (&req[1], join_msg, join_msg_size); | ||
817 | |||
818 | struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls)); | ||
819 | jcls->slave_pub_key = *slave_pub_key; | ||
820 | jcls->channel = chn; | ||
821 | jcls->join_handle = jh; | ||
822 | jcls->join_msg = req; | ||
823 | |||
824 | GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key, | ||
825 | chn->max_message_id, 0, | ||
826 | &join_mem_test_cb, jcls); | ||
827 | } | ||
828 | |||
829 | |||
830 | /** | ||
831 | * Join decision received from multicast. | ||
832 | */ | ||
833 | static void | ||
834 | mcast_recv_join_decision (void *cls, int is_admitted, | ||
835 | const struct GNUNET_PeerIdentity *peer, | ||
836 | uint16_t relay_count, | ||
837 | const struct GNUNET_PeerIdentity *relays, | ||
838 | const struct GNUNET_MessageHeader *join_resp) | ||
839 | { | ||
840 | struct Slave *slv = cls; | ||
841 | struct Channel *chn = &slv->channel; | ||
842 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
843 | "%p Got join decision: %d\n", | ||
844 | slv, | ||
845 | is_admitted); | ||
846 | if (GNUNET_YES == chn->is_ready) | ||
847 | { | ||
848 | /* Already admitted */ | ||
849 | return; | ||
850 | } | ||
851 | |||
852 | uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; | ||
853 | struct GNUNET_PSYC_JoinDecisionMessage * | ||
854 | dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size); | ||
855 | dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size); | ||
856 | dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); | ||
857 | dcsn->is_admitted = htonl (is_admitted); | ||
858 | if (0 < join_resp_size) | ||
859 | GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size); | ||
860 | |||
861 | client_send_msg (chn, &dcsn->header); | ||
862 | |||
863 | if (GNUNET_YES == is_admitted | ||
864 | && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)) | ||
865 | { | ||
866 | chn->is_ready = GNUNET_YES; | ||
867 | } | ||
868 | } | ||
869 | |||
870 | |||
871 | static int | ||
872 | store_recv_fragment_replay (void *cls, | ||
873 | struct GNUNET_MULTICAST_MessageHeader *msg, | ||
874 | enum GNUNET_PSYCSTORE_MessageFlags flags) | ||
875 | { | ||
876 | struct GNUNET_MULTICAST_ReplayHandle *rh = cls; | ||
877 | |||
878 | GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK); | ||
879 | return GNUNET_YES; | ||
880 | } | ||
881 | |||
882 | |||
883 | /** | ||
884 | * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. | ||
885 | */ | ||
886 | static void | ||
887 | store_recv_fragment_replay_result (void *cls, | ||
888 | int64_t result, | ||
889 | const char *err_msg, | ||
890 | uint16_t err_msg_size) | ||
891 | { | ||
892 | struct GNUNET_MULTICAST_ReplayHandle *rh = cls; | ||
893 | |||
894 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
895 | "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n", | ||
896 | rh, | ||
897 | result, | ||
898 | err_msg_size, | ||
899 | err_msg); | ||
900 | switch (result) | ||
901 | { | ||
902 | case GNUNET_YES: | ||
903 | break; | ||
904 | |||
905 | case GNUNET_NO: | ||
906 | GNUNET_MULTICAST_replay_response (rh, NULL, | ||
907 | GNUNET_MULTICAST_REC_NOT_FOUND); | ||
908 | return; | ||
909 | |||
910 | case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED: | ||
911 | GNUNET_MULTICAST_replay_response (rh, NULL, | ||
912 | GNUNET_MULTICAST_REC_ACCESS_DENIED); | ||
913 | return; | ||
914 | |||
915 | case GNUNET_SYSERR: | ||
916 | GNUNET_MULTICAST_replay_response (rh, NULL, | ||
917 | GNUNET_MULTICAST_REC_INTERNAL_ERROR); | ||
918 | return; | ||
919 | } | ||
920 | /* GNUNET_MULTICAST_replay_response frees 'rh' when passed | ||
921 | * an error code, so it must be ensured no further processing | ||
922 | * is attempted on 'rh'. Maybe this should be refactored as | ||
923 | * it doesn't look very intuitive. --lynX | ||
924 | */ | ||
925 | GNUNET_MULTICAST_replay_response_end (rh); | ||
926 | } | ||
927 | |||
928 | |||
929 | /** | ||
930 | * Incoming fragment replay request from multicast. | ||
931 | */ | ||
932 | static void | ||
933 | mcast_recv_replay_fragment (void *cls, | ||
934 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, | ||
935 | uint64_t fragment_id, uint64_t flags, | ||
936 | struct GNUNET_MULTICAST_ReplayHandle *rh) | ||
937 | |||
938 | { | ||
939 | struct Channel *chn = cls; | ||
940 | GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key, | ||
941 | fragment_id, fragment_id, | ||
942 | &store_recv_fragment_replay, | ||
943 | &store_recv_fragment_replay_result, rh); | ||
944 | } | ||
945 | |||
946 | |||
947 | /** | ||
948 | * Incoming message replay request from multicast. | ||
949 | */ | ||
950 | static void | ||
951 | mcast_recv_replay_message (void *cls, | ||
952 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, | ||
953 | uint64_t message_id, | ||
954 | uint64_t fragment_offset, | ||
955 | uint64_t flags, | ||
956 | struct GNUNET_MULTICAST_ReplayHandle *rh) | ||
957 | { | ||
958 | struct Channel *chn = cls; | ||
959 | GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key, | ||
960 | message_id, message_id, 1, NULL, | ||
961 | &store_recv_fragment_replay, | ||
962 | &store_recv_fragment_replay_result, rh); | ||
963 | } | ||
964 | |||
965 | |||
966 | /** | ||
967 | * Convert an uint64_t in network byte order to a HashCode | ||
968 | * that can be used as key in a MultiHashMap | ||
969 | */ | ||
970 | static inline void | ||
971 | hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n) | ||
972 | { | ||
973 | /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */ | ||
974 | /* TODO: use built-in byte swap functions if available */ | ||
975 | |||
976 | n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL); | ||
977 | n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL); | ||
978 | |||
979 | *key = (struct GNUNET_HashCode) {}; | ||
980 | *((uint64_t *) key) | ||
981 | = (n << 32) | (n >> 32); | ||
982 | } | ||
983 | |||
984 | |||
985 | /** | ||
986 | * Convert an uint64_t in host byte order to a HashCode | ||
987 | * that can be used as key in a MultiHashMap | ||
988 | */ | ||
989 | static inline void | ||
990 | hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) | ||
991 | { | ||
992 | #if __BYTE_ORDER == __BIG_ENDIAN | ||
993 | hash_key_from_nll (key, n); | ||
994 | #elif __BYTE_ORDER == __LITTLE_ENDIAN | ||
995 | *key = (struct GNUNET_HashCode) {}; | ||
996 | *((uint64_t *) key) = n; | ||
997 | #else | ||
998 | #error byteorder undefined | ||
999 | #endif | ||
1000 | } | ||
1001 | |||
1002 | |||
1003 | /** | ||
1004 | * Initialize PSYC message header. | ||
1005 | */ | ||
1006 | static inline void | ||
1007 | psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg, | ||
1008 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) | ||
1009 | { | ||
1010 | uint16_t size = ntohs (mmsg->header.size); | ||
1011 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
1012 | |||
1013 | pmsg->header.size = htons (psize); | ||
1014 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
1015 | pmsg->message_id = mmsg->message_id; | ||
1016 | pmsg->fragment_offset = mmsg->fragment_offset; | ||
1017 | pmsg->flags = htonl (flags); | ||
1018 | |||
1019 | GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); | ||
1020 | } | ||
1021 | |||
1022 | |||
1023 | /** | ||
1024 | * Create a new PSYC message from a multicast message for sending it to clients. | ||
1025 | */ | ||
1026 | static inline struct GNUNET_PSYC_MessageHeader * | ||
1027 | psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) | ||
1028 | { | ||
1029 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
1030 | uint16_t size = ntohs (mmsg->header.size); | ||
1031 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
1032 | |||
1033 | pmsg = GNUNET_malloc (psize); | ||
1034 | psyc_msg_init (pmsg, mmsg, flags); | ||
1035 | return pmsg; | ||
1036 | } | ||
1037 | |||
1038 | |||
1039 | /** | ||
1040 | * Send multicast message to all clients connected to the channel. | ||
1041 | */ | ||
1042 | static void | ||
1043 | client_send_mcast_msg (struct Channel *chn, | ||
1044 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, | ||
1045 | uint32_t flags) | ||
1046 | { | ||
1047 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1048 | "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", | ||
1049 | chn, | ||
1050 | GNUNET_ntohll (mmsg->fragment_id), | ||
1051 | GNUNET_ntohll (mmsg->message_id)); | ||
1052 | |||
1053 | struct GNUNET_PSYC_MessageHeader * | ||
1054 | pmsg = GNUNET_PSYC_message_header_create (mmsg, flags); | ||
1055 | client_send_msg (chn, &pmsg->header); | ||
1056 | GNUNET_free (pmsg); | ||
1057 | } | ||
1058 | |||
1059 | |||
1060 | /** | ||
1061 | * Send multicast request to all clients connected to the channel. | ||
1062 | */ | ||
1063 | static void | ||
1064 | client_send_mcast_req (struct Master *mst, | ||
1065 | const struct GNUNET_MULTICAST_RequestHeader *req) | ||
1066 | { | ||
1067 | struct Channel *chn = &mst->channel; | ||
1068 | |||
1069 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
1070 | uint16_t size = ntohs (req->header.size); | ||
1071 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*req); | ||
1072 | |||
1073 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1074 | "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", | ||
1075 | chn, | ||
1076 | GNUNET_ntohll (req->fragment_id), | ||
1077 | GNUNET_ntohll (req->request_id)); | ||
1078 | |||
1079 | pmsg = GNUNET_malloc (psize); | ||
1080 | pmsg->header.size = htons (psize); | ||
1081 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
1082 | pmsg->message_id = req->request_id; | ||
1083 | pmsg->fragment_offset = req->fragment_offset; | ||
1084 | pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); | ||
1085 | pmsg->slave_pub_key = req->member_pub_key; | ||
1086 | GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req)); | ||
1087 | |||
1088 | client_send_msg (chn, &pmsg->header); | ||
1089 | |||
1090 | /* FIXME: save req to PSYCstore so that it can be resent later to clients */ | ||
1091 | |||
1092 | GNUNET_free (pmsg); | ||
1093 | } | ||
1094 | |||
1095 | |||
1096 | /** | ||
1097 | * Insert a multicast message fragment into the queue belonging to the message. | ||
1098 | * | ||
1099 | * @param chn Channel. | ||
1100 | * @param mmsg Multicast message fragment. | ||
1101 | * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. | ||
1102 | * @param first_ptype First PSYC message part type in @a mmsg. | ||
1103 | * @param last_ptype Last PSYC message part type in @a mmsg. | ||
1104 | */ | ||
1105 | static void | ||
1106 | fragment_queue_insert (struct Channel *chn, | ||
1107 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, | ||
1108 | uint16_t first_ptype, uint16_t last_ptype) | ||
1109 | { | ||
1110 | const uint16_t size = ntohs (mmsg->header.size); | ||
1111 | const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset); | ||
1112 | struct GNUNET_CONTAINER_MultiHashMap | ||
1113 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, | ||
1114 | &chn->pub_key_hash); | ||
1115 | |||
1116 | struct GNUNET_HashCode msg_id_hash; | ||
1117 | hash_key_from_nll (&msg_id_hash, mmsg->message_id); | ||
1118 | |||
1119 | struct FragmentQueue | ||
1120 | *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); | ||
1121 | |||
1122 | if (NULL == fragq) | ||
1123 | { | ||
1124 | fragq = GNUNET_malloc (sizeof (*fragq)); | ||
1125 | fragq->state = MSG_FRAG_STATE_HEADER; | ||
1126 | fragq->fragments | ||
1127 | = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
1128 | |||
1129 | GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq, | ||
1130 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
1131 | |||
1132 | if (NULL == chan_msgs) | ||
1133 | { | ||
1134 | chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
1135 | GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs, | ||
1136 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
1137 | } | ||
1138 | } | ||
1139 | |||
1140 | struct GNUNET_HashCode frag_id_hash; | ||
1141 | hash_key_from_nll (&frag_id_hash, mmsg->fragment_id); | ||
1142 | struct RecvCacheEntry | ||
1143 | *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash); | ||
1144 | if (NULL == cache_entry) | ||
1145 | { | ||
1146 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1147 | "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n", | ||
1148 | chn, | ||
1149 | GNUNET_ntohll (mmsg->message_id), | ||
1150 | GNUNET_ntohll (mmsg->fragment_id)); | ||
1151 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1152 | "%p header_size: %" PRIu64 " + %u\n", | ||
1153 | chn, | ||
1154 | fragq->header_size, | ||
1155 | size); | ||
1156 | cache_entry = GNUNET_malloc (sizeof (*cache_entry)); | ||
1157 | cache_entry->ref_count = 1; | ||
1158 | cache_entry->mmsg = GNUNET_malloc (size); | ||
1159 | GNUNET_memcpy (cache_entry->mmsg, mmsg, size); | ||
1160 | GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry, | ||
1161 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
1162 | } | ||
1163 | else | ||
1164 | { | ||
1165 | cache_entry->ref_count++; | ||
1166 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1167 | "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n", | ||
1168 | chn, | ||
1169 | GNUNET_ntohll (mmsg->message_id), | ||
1170 | GNUNET_ntohll (mmsg->fragment_id), | ||
1171 | cache_entry->ref_count); | ||
1172 | } | ||
1173 | |||
1174 | if (MSG_FRAG_STATE_HEADER == fragq->state) | ||
1175 | { | ||
1176 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) | ||
1177 | { | ||
1178 | struct GNUNET_PSYC_MessageMethod * | ||
1179 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1]; | ||
1180 | fragq->state_delta = GNUNET_ntohll (pmeth->state_delta); | ||
1181 | fragq->flags = ntohl (pmeth->flags); | ||
1182 | } | ||
1183 | |||
1184 | if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA) | ||
1185 | { | ||
1186 | fragq->header_size += size; | ||
1187 | } | ||
1188 | else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype | ||
1189 | || frag_offset == fragq->header_size) | ||
1190 | { /* header is now complete */ | ||
1191 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1192 | "%p Header of message %" PRIu64 " is complete.\n", | ||
1193 | chn, | ||
1194 | GNUNET_ntohll (mmsg->message_id)); | ||
1195 | |||
1196 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1197 | "%p Adding message %" PRIu64 " to queue.\n", | ||
1198 | chn, | ||
1199 | GNUNET_ntohll (mmsg->message_id)); | ||
1200 | fragq->state = MSG_FRAG_STATE_DATA; | ||
1201 | } | ||
1202 | else | ||
1203 | { | ||
1204 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1205 | "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n", | ||
1206 | chn, | ||
1207 | GNUNET_ntohll (mmsg->message_id), | ||
1208 | frag_offset, | ||
1209 | fragq->header_size); | ||
1210 | } | ||
1211 | } | ||
1212 | |||
1213 | switch (last_ptype) | ||
1214 | { | ||
1215 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
1216 | if (frag_offset == fragq->size) | ||
1217 | fragq->state = MSG_FRAG_STATE_END; | ||
1218 | else | ||
1219 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1220 | "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n", | ||
1221 | chn, | ||
1222 | GNUNET_ntohll (mmsg->message_id), | ||
1223 | frag_offset, | ||
1224 | fragq->size); | ||
1225 | break; | ||
1226 | |||
1227 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
1228 | /* Drop message without delivering to client if it's a single fragment */ | ||
1229 | fragq->state = | ||
1230 | (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) | ||
1231 | ? MSG_FRAG_STATE_DROP | ||
1232 | : MSG_FRAG_STATE_CANCEL; | ||
1233 | } | ||
1234 | |||
1235 | switch (fragq->state) | ||
1236 | { | ||
1237 | case MSG_FRAG_STATE_DATA: | ||
1238 | case MSG_FRAG_STATE_END: | ||
1239 | case MSG_FRAG_STATE_CANCEL: | ||
1240 | if (GNUNET_NO == fragq->is_queued) | ||
1241 | { | ||
1242 | GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL, | ||
1243 | GNUNET_ntohll (mmsg->message_id)); | ||
1244 | fragq->is_queued = GNUNET_YES; | ||
1245 | } | ||
1246 | } | ||
1247 | |||
1248 | fragq->size += size; | ||
1249 | GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL, | ||
1250 | GNUNET_ntohll (mmsg->fragment_id)); | ||
1251 | } | ||
1252 | |||
1253 | |||
1254 | /** | ||
1255 | * Run fragment queue of a message. | ||
1256 | * | ||
1257 | * Send fragments of a message in order to client, after all modifiers arrived | ||
1258 | * from multicast. | ||
1259 | * | ||
1260 | * @param chn | ||
1261 | * Channel. | ||
1262 | * @param msg_id | ||
1263 | * ID of the message @a fragq belongs to. | ||
1264 | * @param fragq | ||
1265 | * Fragment queue of the message. | ||
1266 | * @param drop | ||
1267 | * Drop message without delivering to client? | ||
1268 | * #GNUNET_YES or #GNUNET_NO. | ||
1269 | */ | ||
1270 | static void | ||
1271 | fragment_queue_run (struct Channel *chn, uint64_t msg_id, | ||
1272 | struct FragmentQueue *fragq, uint8_t drop) | ||
1273 | { | ||
1274 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1275 | "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n", | ||
1276 | chn, | ||
1277 | msg_id, | ||
1278 | fragq->state); | ||
1279 | |||
1280 | struct GNUNET_CONTAINER_MultiHashMap | ||
1281 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, | ||
1282 | &chn->pub_key_hash); | ||
1283 | GNUNET_assert (NULL != chan_msgs); | ||
1284 | uint64_t frag_id; | ||
1285 | |||
1286 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, | ||
1287 | &frag_id)) | ||
1288 | { | ||
1289 | struct GNUNET_HashCode frag_id_hash; | ||
1290 | hash_key_from_hll (&frag_id_hash, frag_id); | ||
1291 | struct RecvCacheEntry *cache_entry | ||
1292 | = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash); | ||
1293 | if (cache_entry != NULL) | ||
1294 | { | ||
1295 | if (GNUNET_NO == drop) | ||
1296 | { | ||
1297 | client_send_mcast_msg (chn, cache_entry->mmsg, 0); | ||
1298 | } | ||
1299 | if (cache_entry->ref_count <= 1) | ||
1300 | { | ||
1301 | GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash, | ||
1302 | cache_entry); | ||
1303 | GNUNET_free (cache_entry->mmsg); | ||
1304 | GNUNET_free (cache_entry); | ||
1305 | } | ||
1306 | else | ||
1307 | { | ||
1308 | cache_entry->ref_count--; | ||
1309 | } | ||
1310 | } | ||
1311 | #if CACHE_AGING_IMPLEMENTED | ||
1312 | else if (GNUNET_NO == drop) | ||
1313 | { | ||
1314 | /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */ | ||
1315 | } | ||
1316 | #endif | ||
1317 | |||
1318 | GNUNET_CONTAINER_heap_remove_root (fragq->fragments); | ||
1319 | } | ||
1320 | |||
1321 | if (MSG_FRAG_STATE_END <= fragq->state) | ||
1322 | { | ||
1323 | struct GNUNET_HashCode msg_id_hash; | ||
1324 | hash_key_from_hll (&msg_id_hash, msg_id); | ||
1325 | |||
1326 | GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq); | ||
1327 | GNUNET_CONTAINER_heap_destroy (fragq->fragments); | ||
1328 | GNUNET_free (fragq); | ||
1329 | } | ||
1330 | else | ||
1331 | { | ||
1332 | fragq->is_queued = GNUNET_NO; | ||
1333 | } | ||
1334 | } | ||
1335 | |||
1336 | |||
1337 | struct StateModifyClosure | ||
1338 | { | ||
1339 | struct Channel *channel; | ||
1340 | uint64_t msg_id; | ||
1341 | struct GNUNET_HashCode msg_id_hash; | ||
1342 | }; | ||
1343 | |||
1344 | |||
1345 | void | ||
1346 | store_recv_state_modify_result (void *cls, int64_t result, | ||
1347 | const char *err_msg, uint16_t err_msg_size) | ||
1348 | { | ||
1349 | struct StateModifyClosure *mcls = cls; | ||
1350 | struct Channel *chn = mcls->channel; | ||
1351 | uint64_t msg_id = mcls->msg_id; | ||
1352 | |||
1353 | struct FragmentQueue * | ||
1354 | fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash); | ||
1355 | |||
1356 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1357 | "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n", | ||
1358 | chn, result, err_msg_size, err_msg); | ||
1359 | |||
1360 | switch (result) | ||
1361 | { | ||
1362 | case GNUNET_OK: | ||
1363 | case GNUNET_NO: | ||
1364 | if (NULL != fragq) | ||
1365 | fragq->state_is_modified = GNUNET_YES; | ||
1366 | if (chn->max_state_message_id < msg_id) | ||
1367 | chn->max_state_message_id = msg_id; | ||
1368 | if (chn->max_message_id < msg_id) | ||
1369 | chn->max_message_id = msg_id; | ||
1370 | |||
1371 | if (NULL != fragq) | ||
1372 | fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); | ||
1373 | GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); | ||
1374 | message_queue_run (chn); | ||
1375 | break; | ||
1376 | |||
1377 | default: | ||
1378 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1379 | "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n", | ||
1380 | chn, result, err_msg_size, err_msg); | ||
1381 | /** @todo FIXME: handle state_modify error */ | ||
1382 | } | ||
1383 | } | ||
1384 | |||
1385 | |||
1386 | /** | ||
1387 | * Run message queue. | ||
1388 | * | ||
1389 | * Send messages in queue to client in order after a message has arrived from | ||
1390 | * multicast, according to the following: | ||
1391 | * - A message is only sent if all of its modifiers arrived. | ||
1392 | * - A stateful message is only sent if the previous stateful message | ||
1393 | * has already been delivered to the client. | ||
1394 | * | ||
1395 | * @param chn Channel. | ||
1396 | * | ||
1397 | * @return Number of messages removed from queue and sent to client. | ||
1398 | */ | ||
1399 | static uint64_t | ||
1400 | message_queue_run (struct Channel *chn) | ||
1401 | { | ||
1402 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1403 | "%p Running message queue.\n", chn); | ||
1404 | uint64_t n = 0; | ||
1405 | uint64_t msg_id; | ||
1406 | |||
1407 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, | ||
1408 | &msg_id)) | ||
1409 | { | ||
1410 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1411 | "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id); | ||
1412 | struct GNUNET_HashCode msg_id_hash; | ||
1413 | hash_key_from_hll (&msg_id_hash, msg_id); | ||
1414 | |||
1415 | struct FragmentQueue * | ||
1416 | fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); | ||
1417 | |||
1418 | if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER) | ||
1419 | { | ||
1420 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1421 | "%p No fragq (%p) or header not complete.\n", | ||
1422 | chn, fragq); | ||
1423 | break; | ||
1424 | } | ||
1425 | |||
1426 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1427 | "%p Fragment queue entry: state: %u, state delta: " | ||
1428 | "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n", | ||
1429 | chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id); | ||
1430 | |||
1431 | if (MSG_FRAG_STATE_DATA <= fragq->state) | ||
1432 | { | ||
1433 | /* Check if there's a missing message before the current one */ | ||
1434 | if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) | ||
1435 | { | ||
1436 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn); | ||
1437 | |||
1438 | if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) | ||
1439 | && (chn->max_message_id != msg_id - 1 | ||
1440 | && chn->max_message_id != msg_id)) | ||
1441 | { | ||
1442 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1443 | "%p Out of order message. " | ||
1444 | "(%" PRIu64 " != %" PRIu64 " - 1)\n", | ||
1445 | chn, chn->max_message_id, msg_id); | ||
1446 | break; | ||
1447 | // FIXME: keep track of messages processed in this queue run, | ||
1448 | // and only stop after reaching the end | ||
1449 | } | ||
1450 | } | ||
1451 | else | ||
1452 | { | ||
1453 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn); | ||
1454 | if (GNUNET_YES != fragq->state_is_modified) | ||
1455 | { | ||
1456 | if (msg_id - fragq->state_delta != chn->max_state_message_id) | ||
1457 | { | ||
1458 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1459 | "%p Out of order stateful message. " | ||
1460 | "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", | ||
1461 | chn, msg_id, fragq->state_delta, chn->max_state_message_id); | ||
1462 | break; | ||
1463 | // FIXME: keep track of messages processed in this queue run, | ||
1464 | // and only stop after reaching the end | ||
1465 | } | ||
1466 | |||
1467 | struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls)); | ||
1468 | mcls->channel = chn; | ||
1469 | mcls->msg_id = msg_id; | ||
1470 | mcls->msg_id_hash = msg_id_hash; | ||
1471 | |||
1472 | /* Apply modifiers to state in PSYCstore */ | ||
1473 | GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id, | ||
1474 | fragq->state_delta, | ||
1475 | store_recv_state_modify_result, mcls); | ||
1476 | break; // continue after asynchronous state modify result | ||
1477 | } | ||
1478 | } | ||
1479 | chn->max_message_id = msg_id; | ||
1480 | } | ||
1481 | fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); | ||
1482 | GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); | ||
1483 | n++; | ||
1484 | } | ||
1485 | |||
1486 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1487 | "%p Removed %" PRIu64 " messages from queue.\n", chn, n); | ||
1488 | return n; | ||
1489 | } | ||
1490 | |||
1491 | |||
1492 | /** | ||
1493 | * Drop message queue of a channel. | ||
1494 | * | ||
1495 | * Remove all messages in queue without sending it to clients. | ||
1496 | * | ||
1497 | * @param chn Channel. | ||
1498 | * | ||
1499 | * @return Number of messages removed from queue. | ||
1500 | */ | ||
1501 | static uint64_t | ||
1502 | message_queue_drop (struct Channel *chn) | ||
1503 | { | ||
1504 | uint64_t n = 0; | ||
1505 | uint64_t msg_id; | ||
1506 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, | ||
1507 | &msg_id)) | ||
1508 | { | ||
1509 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1510 | "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id); | ||
1511 | struct GNUNET_HashCode msg_id_hash; | ||
1512 | hash_key_from_hll (&msg_id_hash, msg_id); | ||
1513 | |||
1514 | struct FragmentQueue * | ||
1515 | fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); | ||
1516 | GNUNET_assert (NULL != fragq); | ||
1517 | fragment_queue_run (chn, msg_id, fragq, GNUNET_YES); | ||
1518 | GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); | ||
1519 | n++; | ||
1520 | } | ||
1521 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1522 | "%p Removed %" PRIu64 " messages from queue.\n", chn, n); | ||
1523 | return n; | ||
1524 | } | ||
1525 | |||
1526 | |||
1527 | /** | ||
1528 | * Received result of GNUNET_PSYCSTORE_fragment_store(). | ||
1529 | */ | ||
1530 | static void | ||
1531 | store_recv_fragment_store_result (void *cls, int64_t result, | ||
1532 | const char *err_msg, uint16_t err_msg_size) | ||
1533 | { | ||
1534 | struct Channel *chn = cls; | ||
1535 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1536 | "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n", | ||
1537 | chn, result, err_msg_size, err_msg); | ||
1538 | } | ||
1539 | |||
1540 | |||
1541 | /** | ||
1542 | * Handle incoming message fragment from multicast. | ||
1543 | * | ||
1544 | * Store it using PSYCstore and send it to the clients of the channel in order. | ||
1545 | */ | ||
1546 | static void | ||
1547 | mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg) | ||
1548 | { | ||
1549 | struct Channel *chn = cls; | ||
1550 | uint16_t size = ntohs (mmsg->header.size); | ||
1551 | |||
1552 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1553 | "%p Received multicast message of size %u. " | ||
1554 | "fragment_id=%" PRIu64 ", message_id=%" PRIu64 | ||
1555 | ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n", | ||
1556 | chn, size, | ||
1557 | GNUNET_ntohll (mmsg->fragment_id), | ||
1558 | GNUNET_ntohll (mmsg->message_id), | ||
1559 | GNUNET_ntohll (mmsg->fragment_offset), | ||
1560 | GNUNET_ntohll (mmsg->flags)); | ||
1561 | |||
1562 | GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0, | ||
1563 | &store_recv_fragment_store_result, chn); | ||
1564 | |||
1565 | uint16_t first_ptype = 0, last_ptype = 0; | ||
1566 | int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg), | ||
1567 | (const char *) &mmsg[1], | ||
1568 | &first_ptype, &last_ptype); | ||
1569 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1570 | "%p Message check result %d, first part type %u, last part type %u\n", | ||
1571 | chn, check, first_ptype, last_ptype); | ||
1572 | if (GNUNET_SYSERR == check) | ||
1573 | { | ||
1574 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1575 | "%p Dropping incoming multicast message with invalid parts.\n", | ||
1576 | chn); | ||
1577 | GNUNET_break_op (0); | ||
1578 | return; | ||
1579 | } | ||
1580 | |||
1581 | fragment_queue_insert (chn, mmsg, first_ptype, last_ptype); | ||
1582 | message_queue_run (chn); | ||
1583 | } | ||
1584 | |||
1585 | |||
1586 | /** | ||
1587 | * Incoming request fragment from multicast for a master. | ||
1588 | * | ||
1589 | * @param cls Master. | ||
1590 | * @param req The request. | ||
1591 | */ | ||
1592 | static void | ||
1593 | mcast_recv_request (void *cls, | ||
1594 | const struct GNUNET_MULTICAST_RequestHeader *req) | ||
1595 | { | ||
1596 | struct Master *mst = cls; | ||
1597 | uint16_t size = ntohs (req->header.size); | ||
1598 | |||
1599 | char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key); | ||
1600 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1601 | "%p Received multicast request of size %u from %s.\n", | ||
1602 | mst, size, str); | ||
1603 | GNUNET_free (str); | ||
1604 | |||
1605 | uint16_t first_ptype = 0, last_ptype = 0; | ||
1606 | if (GNUNET_SYSERR | ||
1607 | == GNUNET_PSYC_receive_check_parts (size - sizeof (*req), | ||
1608 | (const char *) &req[1], | ||
1609 | &first_ptype, &last_ptype)) | ||
1610 | { | ||
1611 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1612 | "%p Dropping incoming multicast request with invalid parts.\n", | ||
1613 | mst); | ||
1614 | GNUNET_break_op (0); | ||
1615 | return; | ||
1616 | } | ||
1617 | |||
1618 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1619 | "Message parts: first: type %u, last: type %u\n", | ||
1620 | first_ptype, last_ptype); | ||
1621 | |||
1622 | /* FIXME: in-order delivery */ | ||
1623 | client_send_mcast_req (mst, req); | ||
1624 | } | ||
1625 | |||
1626 | |||
1627 | /** | ||
1628 | * Response from PSYCstore with the current counter values for a channel master. | ||
1629 | */ | ||
1630 | static void | ||
1631 | store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, | ||
1632 | uint64_t max_message_id, uint64_t max_group_generation, | ||
1633 | uint64_t max_state_message_id) | ||
1634 | { | ||
1635 | struct Master *mst = cls; | ||
1636 | struct Channel *chn = &mst->channel; | ||
1637 | chn->store_op = NULL; | ||
1638 | |||
1639 | struct GNUNET_PSYC_CountersResultMessage res; | ||
1640 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); | ||
1641 | res.header.size = htons (sizeof (res)); | ||
1642 | res.result_code = htonl (result); | ||
1643 | res.max_message_id = GNUNET_htonll (max_message_id); | ||
1644 | |||
1645 | if (GNUNET_OK == result || GNUNET_NO == result) | ||
1646 | { | ||
1647 | mst->max_message_id = max_message_id; | ||
1648 | chn->max_message_id = max_message_id; | ||
1649 | chn->max_state_message_id = max_state_message_id; | ||
1650 | mst->max_group_generation = max_group_generation; | ||
1651 | mst->origin | ||
1652 | = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, | ||
1653 | mcast_recv_join_request, | ||
1654 | mcast_recv_replay_fragment, | ||
1655 | mcast_recv_replay_message, | ||
1656 | mcast_recv_request, | ||
1657 | mcast_recv_message, chn); | ||
1658 | chn->is_ready = GNUNET_YES; | ||
1659 | } | ||
1660 | else | ||
1661 | { | ||
1662 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1663 | "%p GNUNET_PSYCSTORE_counters_get() " | ||
1664 | "returned %d for channel %s.\n", | ||
1665 | chn, result, GNUNET_h2s (&chn->pub_key_hash)); | ||
1666 | } | ||
1667 | |||
1668 | client_send_msg (chn, &res.header); | ||
1669 | } | ||
1670 | |||
1671 | |||
1672 | /** | ||
1673 | * Response from PSYCstore with the current counter values for a channel slave. | ||
1674 | */ | ||
1675 | void | ||
1676 | store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, | ||
1677 | uint64_t max_message_id, uint64_t max_group_generation, | ||
1678 | uint64_t max_state_message_id) | ||
1679 | { | ||
1680 | struct Slave *slv = cls; | ||
1681 | struct Channel *chn = &slv->channel; | ||
1682 | chn->store_op = NULL; | ||
1683 | |||
1684 | struct GNUNET_PSYC_CountersResultMessage res; | ||
1685 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); | ||
1686 | res.header.size = htons (sizeof (res)); | ||
1687 | res.result_code = htonl (result); | ||
1688 | res.max_message_id = GNUNET_htonll (max_message_id); | ||
1689 | |||
1690 | if (GNUNET_YES == result || GNUNET_NO == result) | ||
1691 | { | ||
1692 | chn->max_message_id = max_message_id; | ||
1693 | chn->max_state_message_id = max_state_message_id; | ||
1694 | slv->member | ||
1695 | = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, | ||
1696 | &slv->origin, | ||
1697 | slv->relay_count, slv->relays, | ||
1698 | &slv->join_msg->header, | ||
1699 | mcast_recv_join_request, | ||
1700 | mcast_recv_join_decision, | ||
1701 | mcast_recv_replay_fragment, | ||
1702 | mcast_recv_replay_message, | ||
1703 | mcast_recv_message, chn); | ||
1704 | if (NULL != slv->join_msg) | ||
1705 | { | ||
1706 | GNUNET_free (slv->join_msg); | ||
1707 | slv->join_msg = NULL; | ||
1708 | } | ||
1709 | } | ||
1710 | else | ||
1711 | { | ||
1712 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1713 | "%p GNUNET_PSYCSTORE_counters_get() " | ||
1714 | "returned %d for channel %s.\n", | ||
1715 | chn, result, GNUNET_h2s (&chn->pub_key_hash)); | ||
1716 | } | ||
1717 | |||
1718 | client_send_msg (chn, &res.header); | ||
1719 | } | ||
1720 | |||
1721 | |||
1722 | static void | ||
1723 | channel_init (struct Channel *chn) | ||
1724 | { | ||
1725 | chn->recv_msgs | ||
1726 | = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
1727 | chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
1728 | } | ||
1729 | |||
1730 | |||
1731 | /** | ||
1732 | * Handle a connecting client starting a channel master. | ||
1733 | */ | ||
1734 | static void | ||
1735 | handle_client_master_start (void *cls, | ||
1736 | const struct MasterStartRequest *req) | ||
1737 | { | ||
1738 | struct Client *c = cls; | ||
1739 | struct GNUNET_SERVICE_Client *client = c->client; | ||
1740 | |||
1741 | struct GNUNET_CRYPTO_EddsaPublicKey pub_key; | ||
1742 | struct GNUNET_HashCode pub_key_hash; | ||
1743 | |||
1744 | GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key); | ||
1745 | GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash); | ||
1746 | |||
1747 | struct Master * | ||
1748 | mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash); | ||
1749 | struct Channel *chn; | ||
1750 | |||
1751 | if (NULL == mst) | ||
1752 | { | ||
1753 | mst = GNUNET_malloc (sizeof (*mst)); | ||
1754 | mst->policy = ntohl (req->policy); | ||
1755 | mst->priv_key = req->channel_key; | ||
1756 | mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
1757 | |||
1758 | chn = c->channel = &mst->channel; | ||
1759 | chn->master = mst; | ||
1760 | chn->is_master = GNUNET_YES; | ||
1761 | chn->pub_key = pub_key; | ||
1762 | chn->pub_key_hash = pub_key_hash; | ||
1763 | channel_init (chn); | ||
1764 | |||
1765 | GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn, | ||
1766 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1767 | chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, | ||
1768 | store_recv_master_counters, mst); | ||
1769 | } | ||
1770 | else | ||
1771 | { | ||
1772 | chn = &mst->channel; | ||
1773 | |||
1774 | struct GNUNET_PSYC_CountersResultMessage *res; | ||
1775 | struct GNUNET_MQ_Envelope * | ||
1776 | env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); | ||
1777 | res->result_code = htonl (GNUNET_OK); | ||
1778 | res->max_message_id = GNUNET_htonll (mst->max_message_id); | ||
1779 | |||
1780 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); | ||
1781 | } | ||
1782 | |||
1783 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1784 | "%p Client connected as master to channel %s.\n", | ||
1785 | mst, GNUNET_h2s (&chn->pub_key_hash)); | ||
1786 | |||
1787 | struct ClientList *cli = GNUNET_malloc (sizeof (*cli)); | ||
1788 | cli->client = client; | ||
1789 | GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); | ||
1790 | |||
1791 | GNUNET_SERVICE_client_continue (client); | ||
1792 | } | ||
1793 | |||
1794 | |||
1795 | static int | ||
1796 | check_client_slave_join (void *cls, | ||
1797 | const struct SlaveJoinRequest *req) | ||
1798 | { | ||
1799 | return GNUNET_OK; | ||
1800 | } | ||
1801 | |||
1802 | |||
1803 | /** | ||
1804 | * Handle a connecting client joining as a channel slave. | ||
1805 | */ | ||
1806 | static void | ||
1807 | handle_client_slave_join (void *cls, | ||
1808 | const struct SlaveJoinRequest *req) | ||
1809 | { | ||
1810 | struct Client *c = cls; | ||
1811 | struct GNUNET_SERVICE_Client *client = c->client; | ||
1812 | |||
1813 | uint16_t req_size = ntohs (req->header.size); | ||
1814 | |||
1815 | struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key; | ||
1816 | struct GNUNET_HashCode pub_key_hash, slv_pub_hash; | ||
1817 | |||
1818 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1819 | "got join request from client %p\n", | ||
1820 | client); | ||
1821 | GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key); | ||
1822 | GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash); | ||
1823 | GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash); | ||
1824 | |||
1825 | struct GNUNET_CONTAINER_MultiHashMap * | ||
1826 | chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash); | ||
1827 | struct Slave *slv = NULL; | ||
1828 | struct Channel *chn; | ||
1829 | |||
1830 | if (NULL != chn_slv) | ||
1831 | { | ||
1832 | slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash); | ||
1833 | } | ||
1834 | if (NULL == slv) | ||
1835 | { | ||
1836 | slv = GNUNET_malloc (sizeof (*slv)); | ||
1837 | slv->priv_key = req->slave_key; | ||
1838 | slv->pub_key = slv_pub_key; | ||
1839 | slv->pub_key_hash = slv_pub_hash; | ||
1840 | slv->origin = req->origin; | ||
1841 | slv->relay_count = ntohl (req->relay_count); | ||
1842 | slv->join_flags = ntohl (req->flags); | ||
1843 | |||
1844 | const struct GNUNET_PeerIdentity * | ||
1845 | relays = (const struct GNUNET_PeerIdentity *) &req[1]; | ||
1846 | uint16_t relay_size = slv->relay_count * sizeof (*relays); | ||
1847 | uint16_t join_msg_size = 0; | ||
1848 | |||
1849 | if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader) | ||
1850 | <= req_size) | ||
1851 | { | ||
1852 | struct GNUNET_PSYC_Message * | ||
1853 | join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size); | ||
1854 | join_msg_size = ntohs (join_msg->header.size); | ||
1855 | slv->join_msg = GNUNET_malloc (join_msg_size); | ||
1856 | GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size); | ||
1857 | } | ||
1858 | if (sizeof (*req) + relay_size + join_msg_size != req_size) | ||
1859 | { | ||
1860 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1861 | "%u + %u + %u != %u\n", | ||
1862 | (unsigned int) sizeof (*req), | ||
1863 | relay_size, | ||
1864 | join_msg_size, | ||
1865 | req_size); | ||
1866 | GNUNET_break (0); | ||
1867 | GNUNET_SERVICE_client_drop (client); | ||
1868 | GNUNET_free (slv); | ||
1869 | return; | ||
1870 | } | ||
1871 | if (0 < slv->relay_count) | ||
1872 | { | ||
1873 | slv->relays = GNUNET_malloc (relay_size); | ||
1874 | GNUNET_memcpy (slv->relays, &req[1], relay_size); | ||
1875 | } | ||
1876 | |||
1877 | chn = c->channel = &slv->channel; | ||
1878 | chn->slave = slv; | ||
1879 | chn->is_master = GNUNET_NO; | ||
1880 | chn->pub_key = req->channel_pub_key; | ||
1881 | chn->pub_key_hash = pub_key_hash; | ||
1882 | channel_init (chn); | ||
1883 | |||
1884 | if (NULL == chn_slv) | ||
1885 | { | ||
1886 | chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | ||
1887 | GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv, | ||
1888 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
1889 | } | ||
1890 | GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn, | ||
1891 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
1892 | GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn, | ||
1893 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1894 | chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, | ||
1895 | &store_recv_slave_counters, slv); | ||
1896 | } | ||
1897 | else | ||
1898 | { | ||
1899 | chn = &slv->channel; | ||
1900 | |||
1901 | struct GNUNET_PSYC_CountersResultMessage *res; | ||
1902 | |||
1903 | struct GNUNET_MQ_Envelope * | ||
1904 | env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); | ||
1905 | res->result_code = htonl (GNUNET_OK); | ||
1906 | res->max_message_id = GNUNET_htonll (chn->max_message_id); | ||
1907 | |||
1908 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); | ||
1909 | |||
1910 | if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags) | ||
1911 | { | ||
1912 | mcast_recv_join_decision (slv, GNUNET_YES, | ||
1913 | NULL, 0, NULL, NULL); | ||
1914 | } | ||
1915 | else if (NULL == slv->member) | ||
1916 | { | ||
1917 | slv->member | ||
1918 | = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, | ||
1919 | &slv->origin, | ||
1920 | slv->relay_count, slv->relays, | ||
1921 | &slv->join_msg->header, | ||
1922 | &mcast_recv_join_request, | ||
1923 | &mcast_recv_join_decision, | ||
1924 | &mcast_recv_replay_fragment, | ||
1925 | &mcast_recv_replay_message, | ||
1926 | &mcast_recv_message, chn); | ||
1927 | if (NULL != slv->join_msg) | ||
1928 | { | ||
1929 | GNUNET_free (slv->join_msg); | ||
1930 | slv->join_msg = NULL; | ||
1931 | } | ||
1932 | } | ||
1933 | else if (NULL != slv->join_dcsn) | ||
1934 | { | ||
1935 | struct GNUNET_MQ_Envelope * | ||
1936 | env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header); | ||
1937 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); | ||
1938 | } | ||
1939 | } | ||
1940 | |||
1941 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1942 | "Client %p connected as slave to channel %s.\n", | ||
1943 | client, | ||
1944 | GNUNET_h2s (&chn->pub_key_hash)); | ||
1945 | |||
1946 | struct ClientList *cli = GNUNET_malloc (sizeof (*cli)); | ||
1947 | cli->client = client; | ||
1948 | GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); | ||
1949 | |||
1950 | GNUNET_SERVICE_client_continue (client); | ||
1951 | } | ||
1952 | |||
1953 | |||
1954 | struct JoinDecisionClosure | ||
1955 | { | ||
1956 | int32_t is_admitted; | ||
1957 | struct GNUNET_MessageHeader *msg; | ||
1958 | }; | ||
1959 | |||
1960 | |||
1961 | /** | ||
1962 | * Iterator callback for sending join decisions to multicast. | ||
1963 | */ | ||
1964 | static int | ||
1965 | mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash, | ||
1966 | void *value) | ||
1967 | { | ||
1968 | struct JoinDecisionClosure *jcls = cls; | ||
1969 | struct GNUNET_MULTICAST_JoinHandle *jh = value; | ||
1970 | // FIXME: add relays | ||
1971 | GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg); | ||
1972 | return GNUNET_YES; | ||
1973 | } | ||
1974 | |||
1975 | |||
1976 | static int | ||
1977 | check_client_join_decision (void *cls, | ||
1978 | const struct GNUNET_PSYC_JoinDecisionMessage *dcsn) | ||
1979 | { | ||
1980 | return GNUNET_OK; | ||
1981 | } | ||
1982 | |||
1983 | |||
1984 | /** | ||
1985 | * Join decision from client. | ||
1986 | */ | ||
1987 | static void | ||
1988 | handle_client_join_decision (void *cls, | ||
1989 | const struct GNUNET_PSYC_JoinDecisionMessage *dcsn) | ||
1990 | { | ||
1991 | struct Client *c = cls; | ||
1992 | struct GNUNET_SERVICE_Client *client = c->client; | ||
1993 | struct Channel *chn = c->channel; | ||
1994 | if (NULL == chn) | ||
1995 | { | ||
1996 | GNUNET_break (0); | ||
1997 | GNUNET_SERVICE_client_drop (client); | ||
1998 | return; | ||
1999 | } | ||
2000 | GNUNET_assert (GNUNET_YES == chn->is_master); | ||
2001 | struct Master *mst = chn->master; | ||
2002 | |||
2003 | struct JoinDecisionClosure jcls; | ||
2004 | jcls.is_admitted = ntohl (dcsn->is_admitted); | ||
2005 | jcls.msg | ||
2006 | = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size)) | ||
2007 | ? (struct GNUNET_MessageHeader *) &dcsn[1] | ||
2008 | : NULL; | ||
2009 | |||
2010 | struct GNUNET_HashCode slave_pub_hash; | ||
2011 | GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key), | ||
2012 | &slave_pub_hash); | ||
2013 | |||
2014 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2015 | "%p Got join decision (%d) from client for channel %s..\n", | ||
2016 | mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash)); | ||
2017 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2018 | "%p ..and slave %s.\n", | ||
2019 | mst, GNUNET_h2s (&slave_pub_hash)); | ||
2020 | |||
2021 | GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash, | ||
2022 | &mcast_send_join_decision, &jcls); | ||
2023 | GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash); | ||
2024 | GNUNET_SERVICE_client_continue (client); | ||
2025 | } | ||
2026 | |||
2027 | |||
2028 | static void | ||
2029 | channel_part_cb (void *cls) | ||
2030 | { | ||
2031 | struct GNUNET_SERVICE_Client *client = cls; | ||
2032 | struct GNUNET_MQ_Envelope *env; | ||
2033 | |||
2034 | env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK); | ||
2035 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), | ||
2036 | env); | ||
2037 | } | ||
2038 | |||
2039 | |||
2040 | static void | ||
2041 | handle_client_part_request (void *cls, | ||
2042 | const struct GNUNET_MessageHeader *msg) | ||
2043 | { | ||
2044 | struct Client *c = cls; | ||
2045 | |||
2046 | c->channel->is_disconnecting = GNUNET_YES; | ||
2047 | if (GNUNET_YES == c->channel->is_master) | ||
2048 | { | ||
2049 | struct Master *mst = (struct Master *) c->channel; | ||
2050 | |||
2051 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2052 | "Got part request from master %p\n", | ||
2053 | mst); | ||
2054 | GNUNET_assert (NULL != mst->origin); | ||
2055 | GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client); | ||
2056 | } | ||
2057 | else | ||
2058 | { | ||
2059 | struct Slave *slv = (struct Slave *) c->channel; | ||
2060 | |||
2061 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2062 | "Got part request from slave %p\n", | ||
2063 | slv); | ||
2064 | GNUNET_assert (NULL != slv->member); | ||
2065 | GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client); | ||
2066 | } | ||
2067 | GNUNET_SERVICE_client_continue (c->client); | ||
2068 | } | ||
2069 | |||
2070 | |||
2071 | /** | ||
2072 | * Send acknowledgement to a client. | ||
2073 | * | ||
2074 | * Sent after a message fragment has been passed on to multicast. | ||
2075 | * | ||
2076 | * @param chn The channel struct for the client. | ||
2077 | */ | ||
2078 | static void | ||
2079 | send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client) | ||
2080 | { | ||
2081 | struct GNUNET_MessageHeader *res; | ||
2082 | struct GNUNET_MQ_Envelope * | ||
2083 | env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); | ||
2084 | |||
2085 | /* FIXME? */ | ||
2086 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); | ||
2087 | } | ||
2088 | |||
2089 | |||
2090 | /** | ||
2091 | * Callback for the transmit functions of multicast. | ||
2092 | */ | ||
2093 | static int | ||
2094 | transmit_notify (void *cls, size_t *data_size, void *data) | ||
2095 | { | ||
2096 | struct Channel *chn = cls; | ||
2097 | struct TransmitMessage *tmit_msg = chn->tmit_head; | ||
2098 | |||
2099 | if (NULL == tmit_msg || *data_size < tmit_msg->size) | ||
2100 | { | ||
2101 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2102 | "%p transmit_notify: nothing to send.\n", chn); | ||
2103 | if (NULL != tmit_msg && *data_size < tmit_msg->size) | ||
2104 | GNUNET_break (0); | ||
2105 | *data_size = 0; | ||
2106 | return GNUNET_NO; | ||
2107 | } | ||
2108 | |||
2109 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2110 | "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size); | ||
2111 | |||
2112 | *data_size = tmit_msg->size; | ||
2113 | GNUNET_memcpy (data, &tmit_msg[1], *data_size); | ||
2114 | |||
2115 | int ret | ||
2116 | = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) | ||
2117 | ? GNUNET_NO | ||
2118 | : GNUNET_YES; | ||
2119 | |||
2120 | /* FIXME: handle disconnecting clients */ | ||
2121 | if (NULL != tmit_msg->client) | ||
2122 | send_message_ack (chn, tmit_msg->client); | ||
2123 | |||
2124 | GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg); | ||
2125 | |||
2126 | if (NULL != chn->tmit_head) | ||
2127 | { | ||
2128 | GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn); | ||
2129 | } | ||
2130 | else if (GNUNET_YES == chn->is_disconnecting | ||
2131 | && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) | ||
2132 | { | ||
2133 | /* FIXME: handle partial message (when still in_transmit) */ | ||
2134 | GNUNET_free (tmit_msg); | ||
2135 | return GNUNET_SYSERR; | ||
2136 | } | ||
2137 | GNUNET_free (tmit_msg); | ||
2138 | return ret; | ||
2139 | } | ||
2140 | |||
2141 | |||
2142 | /** | ||
2143 | * Callback for the transmit functions of multicast. | ||
2144 | */ | ||
2145 | static int | ||
2146 | master_transmit_notify (void *cls, size_t *data_size, void *data) | ||
2147 | { | ||
2148 | int ret = transmit_notify (cls, data_size, data); | ||
2149 | |||
2150 | if (GNUNET_YES == ret) | ||
2151 | { | ||
2152 | struct Master *mst = cls; | ||
2153 | mst->tmit_handle = NULL; | ||
2154 | } | ||
2155 | return ret; | ||
2156 | } | ||
2157 | |||
2158 | |||
2159 | /** | ||
2160 | * Callback for the transmit functions of multicast. | ||
2161 | */ | ||
2162 | static int | ||
2163 | slave_transmit_notify (void *cls, size_t *data_size, void *data) | ||
2164 | { | ||
2165 | int ret = transmit_notify (cls, data_size, data); | ||
2166 | |||
2167 | if (GNUNET_YES == ret) | ||
2168 | { | ||
2169 | struct Slave *slv = cls; | ||
2170 | slv->tmit_handle = NULL; | ||
2171 | } | ||
2172 | return ret; | ||
2173 | } | ||
2174 | |||
2175 | |||
2176 | /** | ||
2177 | * Transmit a message from a channel master to the multicast group. | ||
2178 | */ | ||
2179 | static void | ||
2180 | master_transmit_message (struct Master *mst) | ||
2181 | { | ||
2182 | struct Channel *chn = &mst->channel; | ||
2183 | struct TransmitMessage *tmit_msg = chn->tmit_head; | ||
2184 | if (NULL == tmit_msg) | ||
2185 | return; | ||
2186 | if (NULL == mst->tmit_handle) | ||
2187 | { | ||
2188 | mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin, | ||
2189 | tmit_msg->id, | ||
2190 | mst->max_group_generation, | ||
2191 | &master_transmit_notify, | ||
2192 | mst); | ||
2193 | } | ||
2194 | else | ||
2195 | { | ||
2196 | GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle); | ||
2197 | } | ||
2198 | } | ||
2199 | |||
2200 | |||
2201 | /** | ||
2202 | * Transmit a message from a channel slave to the multicast group. | ||
2203 | */ | ||
2204 | static void | ||
2205 | slave_transmit_message (struct Slave *slv) | ||
2206 | { | ||
2207 | if (NULL == slv->channel.tmit_head) | ||
2208 | return; | ||
2209 | if (NULL == slv->tmit_handle) | ||
2210 | { | ||
2211 | slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, | ||
2212 | slv->channel.tmit_head->id, | ||
2213 | &slave_transmit_notify, | ||
2214 | slv); | ||
2215 | } | ||
2216 | else | ||
2217 | { | ||
2218 | GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle); | ||
2219 | } | ||
2220 | } | ||
2221 | |||
2222 | |||
2223 | static void | ||
2224 | transmit_message (struct Channel *chn) | ||
2225 | { | ||
2226 | chn->is_master | ||
2227 | ? master_transmit_message (chn->master) | ||
2228 | : slave_transmit_message (chn->slave); | ||
2229 | } | ||
2230 | |||
2231 | |||
2232 | /** | ||
2233 | * Queue a message from a channel master for sending to the multicast group. | ||
2234 | */ | ||
2235 | static void | ||
2236 | master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg) | ||
2237 | { | ||
2238 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) | ||
2239 | { | ||
2240 | tmit_msg->id = ++mst->max_message_id; | ||
2241 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2242 | "%p master_queue_message: message_id=%" PRIu64 "\n", | ||
2243 | mst, tmit_msg->id); | ||
2244 | struct GNUNET_PSYC_MessageMethod *pmeth | ||
2245 | = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; | ||
2246 | |||
2247 | if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET) | ||
2248 | { | ||
2249 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET); | ||
2250 | } | ||
2251 | else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) | ||
2252 | { | ||
2253 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2254 | "%p master_queue_message: state_delta=%" PRIu64 "\n", | ||
2255 | mst, tmit_msg->id - mst->max_state_message_id); | ||
2256 | pmeth->state_delta = GNUNET_htonll (tmit_msg->id | ||
2257 | - mst->max_state_message_id); | ||
2258 | mst->max_state_message_id = tmit_msg->id; | ||
2259 | } | ||
2260 | else | ||
2261 | { | ||
2262 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2263 | "%p master_queue_message: state not modified\n", mst); | ||
2264 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); | ||
2265 | } | ||
2266 | |||
2267 | if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH) | ||
2268 | { | ||
2269 | /// @todo add state_hash to PSYC header | ||
2270 | } | ||
2271 | } | ||
2272 | } | ||
2273 | |||
2274 | |||
2275 | /** | ||
2276 | * Queue a message from a channel slave for sending to the multicast group. | ||
2277 | */ | ||
2278 | static void | ||
2279 | slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg) | ||
2280 | { | ||
2281 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) | ||
2282 | { | ||
2283 | struct GNUNET_PSYC_MessageMethod *pmeth | ||
2284 | = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; | ||
2285 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); | ||
2286 | tmit_msg->id = ++slv->max_request_id; | ||
2287 | } | ||
2288 | } | ||
2289 | |||
2290 | |||
2291 | /** | ||
2292 | * Queue PSYC message parts for sending to multicast. | ||
2293 | * | ||
2294 | * @param chn | ||
2295 | * Channel to send to. | ||
2296 | * @param client | ||
2297 | * Client the message originates from. | ||
2298 | * @param data_size | ||
2299 | * Size of @a data. | ||
2300 | * @param data | ||
2301 | * Concatenated message parts. | ||
2302 | * @param first_ptype | ||
2303 | * First message part type in @a data. | ||
2304 | * @param last_ptype | ||
2305 | * Last message part type in @a data. | ||
2306 | */ | ||
2307 | static struct TransmitMessage * | ||
2308 | queue_message (struct Channel *chn, | ||
2309 | struct GNUNET_SERVICE_Client *client, | ||
2310 | size_t data_size, | ||
2311 | const void *data, | ||
2312 | uint16_t first_ptype, uint16_t last_ptype) | ||
2313 | { | ||
2314 | struct TransmitMessage * | ||
2315 | tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size); | ||
2316 | GNUNET_memcpy (&tmit_msg[1], data, data_size); | ||
2317 | tmit_msg->client = client; | ||
2318 | tmit_msg->size = data_size; | ||
2319 | tmit_msg->first_ptype = first_ptype; | ||
2320 | tmit_msg->last_ptype = last_ptype; | ||
2321 | |||
2322 | /* FIXME: separate queue per message ID */ | ||
2323 | |||
2324 | GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg); | ||
2325 | |||
2326 | chn->is_master | ||
2327 | ? master_queue_message (chn->master, tmit_msg) | ||
2328 | : slave_queue_message (chn->slave, tmit_msg); | ||
2329 | return tmit_msg; | ||
2330 | } | ||
2331 | |||
2332 | |||
2333 | /** | ||
2334 | * Cancel transmission of current message. | ||
2335 | * | ||
2336 | * @param chn Channel to send to. | ||
2337 | * @param client Client the message originates from. | ||
2338 | */ | ||
2339 | static void | ||
2340 | transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client) | ||
2341 | { | ||
2342 | uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; | ||
2343 | |||
2344 | struct GNUNET_MessageHeader msg; | ||
2345 | msg.size = htons (sizeof (msg)); | ||
2346 | msg.type = htons (type); | ||
2347 | |||
2348 | queue_message (chn, client, sizeof (msg), &msg, type, type); | ||
2349 | transmit_message (chn); | ||
2350 | |||
2351 | /* FIXME: cleanup */ | ||
2352 | } | ||
2353 | |||
2354 | |||
2355 | static int | ||
2356 | check_client_psyc_message (void *cls, | ||
2357 | const struct GNUNET_MessageHeader *msg) | ||
2358 | { | ||
2359 | return GNUNET_OK; | ||
2360 | } | ||
2361 | |||
2362 | |||
2363 | /** | ||
2364 | * Incoming message from a master or slave client. | ||
2365 | */ | ||
2366 | static void | ||
2367 | handle_client_psyc_message (void *cls, | ||
2368 | const struct GNUNET_MessageHeader *msg) | ||
2369 | { | ||
2370 | struct Client *c = cls; | ||
2371 | struct GNUNET_SERVICE_Client *client = c->client; | ||
2372 | struct Channel *chn = c->channel; | ||
2373 | if (NULL == chn) | ||
2374 | { | ||
2375 | GNUNET_break (0); | ||
2376 | GNUNET_SERVICE_client_drop (client); | ||
2377 | return; | ||
2378 | } | ||
2379 | |||
2380 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2381 | "%p Received message from client.\n", chn); | ||
2382 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); | ||
2383 | |||
2384 | if (GNUNET_YES != chn->is_ready) | ||
2385 | { | ||
2386 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
2387 | "%p Channel is not ready yet, disconnecting client %p.\n", | ||
2388 | chn, | ||
2389 | client); | ||
2390 | GNUNET_break (0); | ||
2391 | GNUNET_SERVICE_client_drop (client); | ||
2392 | return; | ||
2393 | } | ||
2394 | |||
2395 | uint16_t size = ntohs (msg->size); | ||
2396 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) | ||
2397 | { | ||
2398 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
2399 | "%p Message payload too large: %u < %u.\n", | ||
2400 | chn, | ||
2401 | (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, | ||
2402 | (unsigned int) (size - sizeof (*msg))); | ||
2403 | GNUNET_break (0); | ||
2404 | transmit_cancel (chn, client); | ||
2405 | GNUNET_SERVICE_client_drop (client); | ||
2406 | return; | ||
2407 | } | ||
2408 | |||
2409 | uint16_t first_ptype = 0, last_ptype = 0; | ||
2410 | if (GNUNET_SYSERR | ||
2411 | == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg), | ||
2412 | (const char *) &msg[1], | ||
2413 | &first_ptype, &last_ptype)) | ||
2414 | { | ||
2415 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
2416 | "%p Received invalid message part from client.\n", chn); | ||
2417 | GNUNET_break (0); | ||
2418 | transmit_cancel (chn, client); | ||
2419 | GNUNET_SERVICE_client_drop (client); | ||
2420 | return; | ||
2421 | } | ||
2422 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2423 | "%p Received message with first part type %u and last part type %u.\n", | ||
2424 | chn, first_ptype, last_ptype); | ||
2425 | |||
2426 | queue_message (chn, client, size - sizeof (*msg), &msg[1], | ||
2427 | first_ptype, last_ptype); | ||
2428 | transmit_message (chn); | ||
2429 | /* FIXME: send a few ACKs even before transmit_notify is called */ | ||
2430 | |||
2431 | GNUNET_SERVICE_client_continue (client); | ||
2432 | }; | ||
2433 | |||
2434 | |||
2435 | /** | ||
2436 | * Received result of GNUNET_PSYCSTORE_membership_store() | ||
2437 | */ | ||
2438 | static void | ||
2439 | store_recv_membership_store_result (void *cls, | ||
2440 | int64_t result, | ||
2441 | const char *err_msg, | ||
2442 | uint16_t err_msg_size) | ||
2443 | { | ||
2444 | struct Operation *op = cls; | ||
2445 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2446 | "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n", | ||
2447 | op->channel, | ||
2448 | result, | ||
2449 | (int) err_msg_size, | ||
2450 | err_msg); | ||
2451 | |||
2452 | if (NULL != op->client) | ||
2453 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); | ||
2454 | op_remove (op); | ||
2455 | } | ||
2456 | |||
2457 | |||
2458 | /** | ||
2459 | * Client requests to add/remove a slave in the membership database. | ||
2460 | */ | ||
2461 | static void | ||
2462 | handle_client_membership_store (void *cls, | ||
2463 | const struct ChannelMembershipStoreRequest *req) | ||
2464 | { | ||
2465 | struct Client *c = cls; | ||
2466 | struct GNUNET_SERVICE_Client *client = c->client; | ||
2467 | struct Channel *chn = c->channel; | ||
2468 | if (NULL == chn) | ||
2469 | { | ||
2470 | GNUNET_break (0); | ||
2471 | GNUNET_SERVICE_client_drop (client); | ||
2472 | return; | ||
2473 | } | ||
2474 | |||
2475 | struct Operation *op = op_add (chn, client, req->op_id, 0); | ||
2476 | |||
2477 | uint64_t announced_at = GNUNET_ntohll (req->announced_at); | ||
2478 | uint64_t effective_since = GNUNET_ntohll (req->effective_since); | ||
2479 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2480 | "%p Received membership store request from client.\n", chn); | ||
2481 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2482 | "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n", | ||
2483 | chn, req->did_join, announced_at, effective_since); | ||
2484 | |||
2485 | GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key, | ||
2486 | req->did_join, announced_at, effective_since, | ||
2487 | 0, /* FIXME: group_generation */ | ||
2488 | &store_recv_membership_store_result, op); | ||
2489 | GNUNET_SERVICE_client_continue (client); | ||
2490 | } | ||
2491 | |||
2492 | |||
2493 | /** | ||
2494 | * Received a fragment for GNUNET_PSYCSTORE_fragment_get(), | ||
2495 | * in response to a history request from a client. | ||
2496 | */ | ||
2497 | static int | ||
2498 | store_recv_fragment_history (void *cls, | ||
2499 | struct GNUNET_MULTICAST_MessageHeader *mmsg, | ||
2500 | enum GNUNET_PSYCSTORE_MessageFlags flags) | ||
2501 | { | ||
2502 | struct Operation *op = cls; | ||
2503 | if (NULL == op->client) | ||
2504 | { /* Requesting client already disconnected. */ | ||
2505 | return GNUNET_NO; | ||
2506 | } | ||
2507 | struct Channel *chn = op->channel; | ||
2508 | |||
2509 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
2510 | uint16_t msize = ntohs (mmsg->header.size); | ||
2511 | uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg); | ||
2512 | |||
2513 | struct GNUNET_OperationResultMessage * | ||
2514 | res = GNUNET_malloc (sizeof (*res) + psize); | ||
2515 | res->header.size = htons (sizeof (*res) + psize); | ||
2516 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT); | ||
2517 | res->op_id = op->op_id; | ||
2518 | res->result_code = GNUNET_htonll (GNUNET_OK); | ||
2519 | |||
2520 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; | ||
2521 | GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC); | ||
2522 | GNUNET_memcpy (&res[1], pmsg, psize); | ||
2523 | |||
2524 | /** @todo FIXME: send only to requesting client */ | ||
2525 | client_send_msg (chn, &res->header); | ||
2526 | |||
2527 | GNUNET_free (res); | ||
2528 | return GNUNET_YES; | ||
2529 | } | ||
2530 | |||
2531 | |||
2532 | /** | ||
2533 | * Received the result of GNUNET_PSYCSTORE_fragment_get(), | ||
2534 | * in response to a history request from a client. | ||
2535 | */ | ||
2536 | static void | ||
2537 | store_recv_fragment_history_result (void *cls, int64_t result, | ||
2538 | const char *err_msg, uint16_t err_msg_size) | ||
2539 | { | ||
2540 | struct Operation *op = cls; | ||
2541 | if (NULL == op->client) | ||
2542 | { /* Requesting client already disconnected. */ | ||
2543 | return; | ||
2544 | } | ||
2545 | |||
2546 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2547 | "%p History replay #%" PRIu64 ": " | ||
2548 | "PSYCSTORE returned %" PRId64 " (%.*s)\n", | ||
2549 | op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); | ||
2550 | |||
2551 | if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE) | ||
2552 | { | ||
2553 | /** @todo Multicast replay request for messages not found locally. */ | ||
2554 | } | ||
2555 | |||
2556 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); | ||
2557 | op_remove (op); | ||
2558 | } | ||
2559 | |||
2560 | |||
2561 | static int | ||
2562 | check_client_history_replay (void *cls, | ||
2563 | const struct GNUNET_PSYC_HistoryRequestMessage *req) | ||
2564 | { | ||
2565 | return GNUNET_OK; | ||
2566 | } | ||
2567 | |||
2568 | |||
2569 | /** | ||
2570 | * Client requests channel history. | ||
2571 | */ | ||
2572 | static void | ||
2573 | handle_client_history_replay (void *cls, | ||
2574 | const struct GNUNET_PSYC_HistoryRequestMessage *req) | ||
2575 | { | ||
2576 | struct Client *c = cls; | ||
2577 | struct GNUNET_SERVICE_Client *client = c->client; | ||
2578 | struct Channel *chn = c->channel; | ||
2579 | if (NULL == chn) | ||
2580 | { | ||
2581 | GNUNET_break (0); | ||
2582 | GNUNET_SERVICE_client_drop (client); | ||
2583 | return; | ||
2584 | } | ||
2585 | |||
2586 | uint16_t size = ntohs (req->header.size); | ||
2587 | const char *method_prefix = (const char *) &req[1]; | ||
2588 | |||
2589 | if (size < sizeof (*req) + 1 | ||
2590 | || '\0' != method_prefix[size - sizeof (*req) - 1]) | ||
2591 | { | ||
2592 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
2593 | "%p History replay #%" PRIu64 ": " | ||
2594 | "invalid method prefix. size: %u < %u?\n", | ||
2595 | chn, | ||
2596 | GNUNET_ntohll (req->op_id), | ||
2597 | size, | ||
2598 | (unsigned int) sizeof (*req) + 1); | ||
2599 | GNUNET_break (0); | ||
2600 | GNUNET_SERVICE_client_drop (client); | ||
2601 | return; | ||
2602 | } | ||
2603 | |||
2604 | struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags)); | ||
2605 | |||
2606 | if (0 == req->message_limit) | ||
2607 | { | ||
2608 | GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, | ||
2609 | GNUNET_ntohll (req->start_message_id), | ||
2610 | GNUNET_ntohll (req->end_message_id), | ||
2611 | 0, method_prefix, | ||
2612 | &store_recv_fragment_history, | ||
2613 | &store_recv_fragment_history_result, op); | ||
2614 | } | ||
2615 | else | ||
2616 | { | ||
2617 | GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, | ||
2618 | GNUNET_ntohll (req->message_limit), | ||
2619 | method_prefix, | ||
2620 | &store_recv_fragment_history, | ||
2621 | &store_recv_fragment_history_result, | ||
2622 | op); | ||
2623 | } | ||
2624 | GNUNET_SERVICE_client_continue (client); | ||
2625 | } | ||
2626 | |||
2627 | |||
2628 | /** | ||
2629 | * Received state var from PSYCstore, send it to client. | ||
2630 | */ | ||
2631 | static int | ||
2632 | store_recv_state_var (void *cls, const char *name, | ||
2633 | const void *value, uint32_t value_size) | ||
2634 | { | ||
2635 | struct Operation *op = cls; | ||
2636 | struct GNUNET_OperationResultMessage *res; | ||
2637 | struct GNUNET_MQ_Envelope *env; | ||
2638 | |||
2639 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2640 | "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n", | ||
2641 | op->channel, GNUNET_ntohll (op->op_id), name); | ||
2642 | |||
2643 | if (NULL != name) /* First part */ | ||
2644 | { | ||
2645 | uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; | ||
2646 | struct GNUNET_PSYC_MessageModifier *mod; | ||
2647 | env = GNUNET_MQ_msg_extra (res, | ||
2648 | sizeof (*mod) + name_size + value_size, | ||
2649 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); | ||
2650 | res->op_id = op->op_id; | ||
2651 | |||
2652 | mod = (struct GNUNET_PSYC_MessageModifier *) &res[1]; | ||
2653 | mod->header.size = htons (sizeof (*mod) + name_size + value_size); | ||
2654 | mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | ||
2655 | mod->name_size = htons (name_size); | ||
2656 | mod->value_size = htonl (value_size); | ||
2657 | mod->oper = htons (GNUNET_PSYC_OP_ASSIGN); | ||
2658 | GNUNET_memcpy (&mod[1], name, name_size); | ||
2659 | GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size); | ||
2660 | } | ||
2661 | else /* Continuation */ | ||
2662 | { | ||
2663 | struct GNUNET_MessageHeader *mod; | ||
2664 | env = GNUNET_MQ_msg_extra (res, | ||
2665 | sizeof (*mod) + value_size, | ||
2666 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); | ||
2667 | res->op_id = op->op_id; | ||
2668 | |||
2669 | mod = (struct GNUNET_MessageHeader *) &res[1]; | ||
2670 | mod->size = htons (sizeof (*mod) + value_size); | ||
2671 | mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | ||
2672 | GNUNET_memcpy (&mod[1], value, value_size); | ||
2673 | } | ||
2674 | |||
2675 | // FIXME: client might have been disconnected | ||
2676 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env); | ||
2677 | return GNUNET_YES; | ||
2678 | } | ||
2679 | |||
2680 | |||
2681 | /** | ||
2682 | * Received result of GNUNET_PSYCSTORE_state_get() | ||
2683 | * or GNUNET_PSYCSTORE_state_get_prefix() | ||
2684 | */ | ||
2685 | static void | ||
2686 | store_recv_state_result (void *cls, int64_t result, | ||
2687 | const char *err_msg, uint16_t err_msg_size) | ||
2688 | { | ||
2689 | struct Operation *op = cls; | ||
2690 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2691 | "%p state_get #%" PRIu64 ": " | ||
2692 | "PSYCSTORE returned %" PRId64 " (%.*s)\n", | ||
2693 | op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); | ||
2694 | |||
2695 | // FIXME: client might have been disconnected | ||
2696 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); | ||
2697 | op_remove (op); | ||
2698 | } | ||
2699 | |||
2700 | |||
2701 | static int | ||
2702 | check_client_state_get (void *cls, | ||
2703 | const struct StateRequest *req) | ||
2704 | { | ||
2705 | struct Client *c = cls; | ||
2706 | struct Channel *chn = c->channel; | ||
2707 | if (NULL == chn) | ||
2708 | { | ||
2709 | GNUNET_break (0); | ||
2710 | return GNUNET_SYSERR; | ||
2711 | } | ||
2712 | |||
2713 | uint16_t name_size = ntohs (req->header.size) - sizeof (*req); | ||
2714 | const char *name = (const char *) &req[1]; | ||
2715 | if (0 == name_size || '\0' != name[name_size - 1]) | ||
2716 | { | ||
2717 | GNUNET_break (0); | ||
2718 | return GNUNET_SYSERR; | ||
2719 | } | ||
2720 | |||
2721 | return GNUNET_OK; | ||
2722 | } | ||
2723 | |||
2724 | |||
2725 | /** | ||
2726 | * Client requests best matching state variable from PSYCstore. | ||
2727 | */ | ||
2728 | static void | ||
2729 | handle_client_state_get (void *cls, | ||
2730 | const struct StateRequest *req) | ||
2731 | { | ||
2732 | struct Client *c = cls; | ||
2733 | struct GNUNET_SERVICE_Client *client = c->client; | ||
2734 | struct Channel *chn = c->channel; | ||
2735 | |||
2736 | const char *name = (const char *) &req[1]; | ||
2737 | struct Operation *op = op_add (chn, client, req->op_id, 0); | ||
2738 | GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name, | ||
2739 | &store_recv_state_var, | ||
2740 | &store_recv_state_result, op); | ||
2741 | GNUNET_SERVICE_client_continue (client); | ||
2742 | } | ||
2743 | |||
2744 | |||
2745 | static int | ||
2746 | check_client_state_get_prefix (void *cls, | ||
2747 | const struct StateRequest *req) | ||
2748 | { | ||
2749 | struct Client *c = cls; | ||
2750 | struct Channel *chn = c->channel; | ||
2751 | if (NULL == chn) | ||
2752 | { | ||
2753 | GNUNET_break (0); | ||
2754 | return GNUNET_SYSERR; | ||
2755 | } | ||
2756 | |||
2757 | uint16_t name_size = ntohs (req->header.size) - sizeof (*req); | ||
2758 | const char *name = (const char *) &req[1]; | ||
2759 | if (0 == name_size || '\0' != name[name_size - 1]) | ||
2760 | { | ||
2761 | GNUNET_break (0); | ||
2762 | return GNUNET_SYSERR; | ||
2763 | } | ||
2764 | |||
2765 | return GNUNET_OK; | ||
2766 | } | ||
2767 | |||
2768 | |||
2769 | /** | ||
2770 | * Client requests state variables with a given prefix from PSYCstore. | ||
2771 | */ | ||
2772 | static void | ||
2773 | handle_client_state_get_prefix (void *cls, | ||
2774 | const struct StateRequest *req) | ||
2775 | { | ||
2776 | struct Client *c = cls; | ||
2777 | struct GNUNET_SERVICE_Client *client = c->client; | ||
2778 | struct Channel *chn = c->channel; | ||
2779 | |||
2780 | const char *name = (const char *) &req[1]; | ||
2781 | struct Operation *op = op_add (chn, client, req->op_id, 0); | ||
2782 | GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, | ||
2783 | &store_recv_state_var, | ||
2784 | &store_recv_state_result, op); | ||
2785 | GNUNET_SERVICE_client_continue (client); | ||
2786 | } | ||
2787 | |||
2788 | |||
2789 | /** | ||
2790 | * Initialize the PSYC service. | ||
2791 | * | ||
2792 | * @param cls Closure. | ||
2793 | * @param server The initialized server. | ||
2794 | * @param c Configuration to use. | ||
2795 | */ | ||
2796 | static void | ||
2797 | run (void *cls, | ||
2798 | const struct GNUNET_CONFIGURATION_Handle *c, | ||
2799 | struct GNUNET_SERVICE_Handle *svc) | ||
2800 | { | ||
2801 | cfg = c; | ||
2802 | service = svc; | ||
2803 | store = GNUNET_PSYCSTORE_connect (cfg); | ||
2804 | stats = GNUNET_STATISTICS_create ("psyc", cfg); | ||
2805 | masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | ||
2806 | slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | ||
2807 | channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
2808 | recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
2809 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); | ||
2810 | } | ||
2811 | |||
2812 | |||
2813 | /** | ||
2814 | * Define "main" method using service macro. | ||
2815 | */ | ||
2816 | GNUNET_SERVICE_MAIN | ||
2817 | ("psyc", | ||
2818 | GNUNET_SERVICE_OPTION_NONE, | ||
2819 | &run, | ||
2820 | &client_notify_connect, | ||
2821 | &client_notify_disconnect, | ||
2822 | NULL, | ||
2823 | GNUNET_MQ_hd_fixed_size (client_master_start, | ||
2824 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, | ||
2825 | struct MasterStartRequest, | ||
2826 | NULL), | ||
2827 | GNUNET_MQ_hd_var_size (client_slave_join, | ||
2828 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, | ||
2829 | struct SlaveJoinRequest, | ||
2830 | NULL), | ||
2831 | GNUNET_MQ_hd_var_size (client_join_decision, | ||
2832 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, | ||
2833 | struct GNUNET_PSYC_JoinDecisionMessage, | ||
2834 | NULL), | ||
2835 | GNUNET_MQ_hd_fixed_size (client_part_request, | ||
2836 | GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST, | ||
2837 | struct GNUNET_MessageHeader, | ||
2838 | NULL), | ||
2839 | GNUNET_MQ_hd_var_size (client_psyc_message, | ||
2840 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | ||
2841 | struct GNUNET_MessageHeader, | ||
2842 | NULL), | ||
2843 | GNUNET_MQ_hd_fixed_size (client_membership_store, | ||
2844 | GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, | ||
2845 | struct ChannelMembershipStoreRequest, | ||
2846 | NULL), | ||
2847 | GNUNET_MQ_hd_var_size (client_history_replay, | ||
2848 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, | ||
2849 | struct GNUNET_PSYC_HistoryRequestMessage, | ||
2850 | NULL), | ||
2851 | GNUNET_MQ_hd_var_size (client_state_get, | ||
2852 | GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, | ||
2853 | struct StateRequest, | ||
2854 | NULL), | ||
2855 | GNUNET_MQ_hd_var_size (client_state_get_prefix, | ||
2856 | GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, | ||
2857 | struct StateRequest, | ||
2858 | NULL)); | ||
2859 | |||
2860 | /* end of gnunet-service-psyc.c */ | ||