aboutsummaryrefslogtreecommitdiff
path: root/src/multicast/multicast_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/multicast/multicast_api.c')
-rw-r--r--src/multicast/multicast_api.c1399
1 files changed, 1399 insertions, 0 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
new file mode 100644
index 0000000..e5e8302
--- /dev/null
+++ b/src/multicast/multicast_api.c
@@ -0,0 +1,1399 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012, 2013 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19*/
20
21/**
22 * @file multicast/multicast_api.c
23 * @brief Multicast service; implements multicast groups using CADET connections.
24 * @author Christian Grothoff
25 * @author Gabor X Toth
26 */
27
28#include "platform.h"
29#include "gnunet_util_lib.h"
30#include "gnunet_multicast_service.h"
31#include "multicast.h"
32
33#define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
34
35
36/**
37 * Handle for a request to send a message to all multicast group members
38 * (from the origin).
39 */
40struct GNUNET_MULTICAST_OriginTransmitHandle
41{
42 GNUNET_MULTICAST_OriginTransmitNotify notify;
43 void *notify_cls;
44 struct GNUNET_MULTICAST_Origin *origin;
45
46 uint64_t message_id;
47 uint64_t group_generation;
48 uint64_t fragment_offset;
49};
50
51
52/**
53 * Handle for a message to be delivered from a member to the origin.
54 */
55struct GNUNET_MULTICAST_MemberTransmitHandle
56{
57 GNUNET_MULTICAST_MemberTransmitNotify notify;
58 void *notify_cls;
59 struct GNUNET_MULTICAST_Member *member;
60
61 uint64_t request_id;
62 uint64_t fragment_offset;
63};
64
65
66struct GNUNET_MULTICAST_Group
67{
68 /**
69 * Configuration to use.
70 */
71 const struct GNUNET_CONFIGURATION_Handle *cfg;
72
73 /**
74 * Client connection to the service.
75 */
76 struct GNUNET_MQ_Handle *mq;
77
78 /**
79 * Message to send on connect.
80 */
81 struct GNUNET_MQ_Envelope *connect_env;
82
83 /**
84 * Time to wait until we try to reconnect on failure.
85 */
86 struct GNUNET_TIME_Relative reconnect_delay;
87
88 /**
89 * Task for reconnecting when the listener fails.
90 */
91 struct GNUNET_SCHEDULER_Task *reconnect_task;
92
93 GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
94 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
95 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
96 GNUNET_MULTICAST_MessageCallback message_cb;
97 void *cb_cls;
98
99 /**
100 * Function called after disconnected from the service.
101 */
102 GNUNET_ContinuationCallback disconnect_cb;
103
104 /**
105 * Closure for @a disconnect_cb.
106 */
107 void *disconnect_cls;
108
109 /**
110 * Are we currently transmitting a message?
111 */
112 uint8_t in_transmit;
113
114 /**
115 * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
116 */
117 uint8_t acks_pending;
118
119 /**
120 * Is this the origin or a member?
121 */
122 uint8_t is_origin;
123
124 /**
125 * Is this channel in the process of disconnecting from the service?
126 * #GNUNET_YES or #GNUNET_NO
127 */
128 uint8_t is_disconnecting;
129};
130
131
132/**
133 * Handle for the origin of a multicast group.
134 */
135struct GNUNET_MULTICAST_Origin
136{
137 struct GNUNET_MULTICAST_Group grp;
138 struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
139
140 GNUNET_MULTICAST_RequestCallback request_cb;
141};
142
143
144/**
145 * Handle for a multicast group member.
146 */
147struct GNUNET_MULTICAST_Member
148{
149 struct GNUNET_MULTICAST_Group grp;
150 struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
151
152 GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
153
154 /**
155 * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
156 */
157 struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
158
159 uint64_t next_fragment_id;
160};
161
162
163/**
164 * Handle that identifies a join request.
165 *
166 * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
167 * corresponding calls to #GNUNET_MULTICAST_join_decision().
168 */
169struct GNUNET_MULTICAST_JoinHandle
170{
171 struct GNUNET_MULTICAST_Group *group;
172
173 /**
174 * Public key of the member requesting join.
175 */
176 struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
177
178 /**
179 * Peer identity of the member requesting join.
180 */
181 struct GNUNET_PeerIdentity peer;
182};
183
184
185/**
186 * Opaque handle to a replay request from the multicast service.
187 */
188struct GNUNET_MULTICAST_ReplayHandle
189{
190 struct GNUNET_MULTICAST_Group *grp;
191 struct MulticastReplayRequestMessage req;
192};
193
194
195/**
196 * Handle for a replay request.
197 */
198struct GNUNET_MULTICAST_MemberReplayHandle
199{
200};
201
202
203static void
204origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
205
206static void
207member_to_origin (struct GNUNET_MULTICAST_Member *mem);
208
209
210/**
211 * Check join request message.
212 */
213static int
214check_group_join_request (void *cls,
215 const struct MulticastJoinRequestMessage *jreq)
216{
217 uint16_t size = ntohs (jreq->header.size);
218
219 if (sizeof (*jreq) == size)
220 return GNUNET_OK;
221
222 if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
223 return GNUNET_OK;
224
225 return GNUNET_SYSERR;
226}
227
228
229/**
230 * Receive join request from service.
231 */
232static void
233handle_group_join_request (void *cls,
234 const struct MulticastJoinRequestMessage *jreq)
235{
236 struct GNUNET_MULTICAST_Group *grp = cls;
237 struct GNUNET_MULTICAST_JoinHandle *jh;
238 const struct GNUNET_MessageHeader *jmsg = NULL;
239
240 if (NULL == grp)
241 {
242 GNUNET_break (0);
243 return;
244 }
245 if (NULL == grp->join_req_cb)
246 return;
247
248 if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
249 jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
250
251 jh = GNUNET_malloc (sizeof (*jh));
252 jh->group = grp;
253 jh->member_pub_key = jreq->member_pub_key;
254 jh->peer = jreq->peer;
255 grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
256
257 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
258}
259
260
261/**
262 * Check multicast message.
263 */
264static int
265check_group_message (void *cls,
266 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
267{
268 return GNUNET_OK;
269}
270
271
272/**
273 * Receive multicast message from service.
274 */
275static void
276handle_group_message (void *cls,
277 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
278{
279 struct GNUNET_MULTICAST_Group *grp = cls;
280
281 if (GNUNET_YES == grp->is_disconnecting)
282 return;
283
284 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
285 "Calling message callback with a message of size %u.\n",
286 ntohs (mmsg->header.size));
287
288 if (NULL != grp->message_cb)
289 grp->message_cb (grp->cb_cls, mmsg);
290
291 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
292}
293
294
295/**
296 * Receive message/request fragment acknowledgement from service.
297 */
298static void
299handle_group_fragment_ack (void *cls,
300 const struct GNUNET_MessageHeader *msg)
301{
302 struct GNUNET_MULTICAST_Group *grp = cls;
303
304 LOG (GNUNET_ERROR_TYPE_DEBUG,
305 "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
306 grp, grp->in_transmit, grp->acks_pending);
307
308 if (0 == grp->acks_pending)
309 {
310 LOG (GNUNET_ERROR_TYPE_DEBUG,
311 "%p Ignoring extraneous fragment ACK.\n", grp);
312 return;
313 }
314 grp->acks_pending--;
315
316 if (GNUNET_YES != grp->in_transmit)
317 return;
318
319 if (GNUNET_YES == grp->is_origin)
320 origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
321 else
322 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
323
324 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
325}
326
327
328/**
329 * Check unicast request.
330 */
331static int
332check_origin_request (void *cls,
333 const struct GNUNET_MULTICAST_RequestHeader *req)
334{
335 return GNUNET_OK;
336}
337
338
339/**
340 * Origin receives unicast request from a member.
341 */
342static void
343handle_origin_request (void *cls,
344 const struct GNUNET_MULTICAST_RequestHeader *req)
345{
346 struct GNUNET_MULTICAST_Group *grp;
347 struct GNUNET_MULTICAST_Origin *orig = cls;
348 grp = &orig->grp;
349
350 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
351 "Calling request callback with a request of size %u.\n",
352 ntohs (req->header.size));
353
354 if (NULL != orig->request_cb)
355 orig->request_cb (grp->cb_cls, req);
356
357 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
358}
359
360
361/**
362 * Receive multicast replay request from service.
363 */
364static void
365handle_group_replay_request (void *cls,
366 const struct MulticastReplayRequestMessage *rep)
367
368{
369 struct GNUNET_MULTICAST_Group *grp = cls;
370
371 if (GNUNET_YES == grp->is_disconnecting)
372 return;
373
374 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
375
376 if (0 != rep->fragment_id)
377 {
378 if (NULL != grp->replay_frag_cb)
379 {
380 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
381 rh->grp = grp;
382 rh->req = *rep;
383 grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
384 GNUNET_ntohll (rep->fragment_id),
385 GNUNET_ntohll (rep->flags), rh);
386 }
387 }
388 else if (0 != rep->message_id)
389 {
390 if (NULL != grp->replay_msg_cb)
391 {
392 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
393 rh->grp = grp;
394 rh->req = *rep;
395 grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
396 GNUNET_ntohll (rep->message_id),
397 GNUNET_ntohll (rep->fragment_offset),
398 GNUNET_ntohll (rep->flags), rh);
399 }
400 }
401
402 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
403}
404
405
406/**
407 * Check replay response.
408 */
409static int
410check_member_replay_response (void *cls,
411 const struct MulticastReplayResponseMessage *res)
412{
413 uint16_t size = ntohs (res->header.size);
414
415 if (sizeof (*res) == size)
416 return GNUNET_OK;
417
418 if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
419 return GNUNET_OK;
420
421 return GNUNET_SYSERR;
422}
423
424
425/**
426 * Receive replay response from service.
427 */
428static void
429handle_member_replay_response (void *cls,
430 const struct MulticastReplayResponseMessage *res)
431{
432 struct GNUNET_MULTICAST_Group *grp;
433 struct GNUNET_MULTICAST_Member *mem = cls;
434 grp = &mem->grp;
435
436 if (GNUNET_YES == grp->is_disconnecting)
437 return;
438
439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
440
441 // FIXME: return result
442}
443
444
445/**
446 * Check join decision.
447 */
448static int
449check_member_join_decision (void *cls,
450 const struct MulticastJoinDecisionMessageHeader *hdcsn)
451{
452 return GNUNET_OK; // checked in handle below
453}
454
455
456/**
457 * Member receives join decision.
458 */
459static void
460handle_member_join_decision (void *cls,
461 const struct MulticastJoinDecisionMessageHeader *hdcsn)
462{
463 struct GNUNET_MULTICAST_Group *grp;
464 struct GNUNET_MULTICAST_Member *mem = cls;
465 grp = &mem->grp;
466
467 const struct MulticastJoinDecisionMessage *
468 dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
469
470 uint16_t dcsn_size = ntohs (dcsn->header.size);
471 int is_admitted = ntohl (dcsn->is_admitted);
472
473 LOG (GNUNET_ERROR_TYPE_DEBUG,
474 "%p Member got join decision from multicast: %d\n",
475 mem, is_admitted);
476
477 const struct GNUNET_MessageHeader *join_resp = NULL;
478 uint16_t join_resp_size = 0;
479
480 uint16_t relay_count = ntohl (dcsn->relay_count);
481 const struct GNUNET_PeerIdentity *relays = NULL;
482 uint16_t relay_size = relay_count * sizeof (*relays);
483 if (0 < relay_count)
484 {
485 if (dcsn_size < sizeof (*dcsn) + relay_size)
486 {
487 GNUNET_break_op (0);
488 is_admitted = GNUNET_SYSERR;
489 }
490 else
491 {
492 relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
493 }
494 }
495
496 if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
497 {
498 join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
499 join_resp_size = ntohs (join_resp->size);
500 }
501 if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
502 {
503 LOG (GNUNET_ERROR_TYPE_DEBUG,
504 "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
505 dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
506 GNUNET_break_op (0);
507 is_admitted = GNUNET_SYSERR;
508 }
509
510 if (NULL != mem->join_dcsn_cb)
511 mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
512 relay_count, relays, join_resp);
513
514 // FIXME:
515 //if (GNUNET_YES != is_admitted)
516 // GNUNET_MULTICAST_member_part (mem);
517
518 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
519}
520
521
522static void
523group_cleanup (struct GNUNET_MULTICAST_Group *grp)
524{
525 if (NULL != grp->connect_env)
526 {
527 GNUNET_MQ_discard (grp->connect_env);
528 grp->connect_env = NULL;
529 }
530 if (NULL != grp->mq)
531 {
532 GNUNET_MQ_destroy (grp->mq);
533 grp->mq = NULL;
534 }
535 if (NULL != grp->disconnect_cb)
536 {
537 grp->disconnect_cb (grp->disconnect_cls);
538 grp->disconnect_cb = NULL;
539 }
540 GNUNET_free (grp);
541}
542
543
544static void
545handle_group_part_ack (void *cls,
546 const struct GNUNET_MessageHeader *msg)
547{
548 struct GNUNET_MULTICAST_Group *grp = cls;
549
550 group_cleanup (grp);
551}
552
553
554/**
555 * Function to call with the decision made for a join request.
556 *
557 * Must be called once and only once in response to an invocation of the
558 * #GNUNET_MULTICAST_JoinRequestCallback.
559 *
560 * @param join
561 * Join request handle.
562 * @param is_admitted
563 * #GNUNET_YES if the join is approved,
564 * #GNUNET_NO if it is disapproved,
565 * #GNUNET_SYSERR if we cannot answer the request.
566 * @param relay_count
567 * Number of relays given.
568 * @param relays
569 * Array of suggested peers that might be useful relays to use
570 * when joining the multicast group (essentially a list of peers that
571 * are already part of the multicast group and might thus be willing
572 * to help with routing). If empty, only this local peer (which must
573 * be the multicast origin) is a good candidate for building the
574 * multicast tree. Note that it is unnecessary to specify our own
575 * peer identity in this array.
576 * @param join_resp
577 * Message to send in response to the joining peer;
578 * can also be used to redirect the peer to a different group at the
579 * application layer; this response is to be transmitted to the
580 * peer that issued the request even if admission is denied.
581 */
582struct GNUNET_MULTICAST_ReplayHandle *
583GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
584 int is_admitted,
585 uint16_t relay_count,
586 const struct GNUNET_PeerIdentity *relays,
587 const struct GNUNET_MessageHeader *join_resp)
588{
589 struct GNUNET_MULTICAST_Group *grp = join->group;
590 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
591 uint16_t relay_size = relay_count * sizeof (*relays);
592
593 struct MulticastJoinDecisionMessageHeader *hdcsn;
594 struct MulticastJoinDecisionMessage *dcsn;
595 struct GNUNET_MQ_Envelope *
596 env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
597 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
598 hdcsn->member_pub_key = join->member_pub_key;
599 hdcsn->peer = join->peer;
600
601 dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
602 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
603 dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
604 dcsn->is_admitted = htonl (is_admitted);
605 dcsn->relay_count = htonl (relay_count);
606 if (0 < relay_size)
607 GNUNET_memcpy (&dcsn[1], relays, relay_size);
608 if (0 < join_resp_size)
609 GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
610
611 GNUNET_MQ_send (grp->mq, env);
612 GNUNET_free (join);
613 return NULL;
614}
615
616
617/**
618 * Replay a message fragment for the multicast group.
619 *
620 * @param rh
621 * Replay handle identifying which replay operation was requested.
622 * @param msg
623 * Replayed message fragment, NULL if not found / an error occurred.
624 * @param ec
625 * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode
626 * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
627 */
628void
629GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
630 const struct GNUNET_MessageHeader *msg,
631 enum GNUNET_MULTICAST_ReplayErrorCode ec)
632{
633 uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
634 struct MulticastReplayResponseMessage *res;
635 struct GNUNET_MQ_Envelope *
636 env = GNUNET_MQ_msg_extra (res, msg_size,
637 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
638 res->fragment_id = rh->req.fragment_id;
639 res->message_id = rh->req.message_id;
640 res->fragment_offset = rh->req.fragment_offset;
641 res->flags = rh->req.flags;
642 res->error_code = htonl (ec);
643
644 if (GNUNET_MULTICAST_REC_OK == ec)
645 {
646 GNUNET_assert (NULL != msg);
647 GNUNET_memcpy (&res[1], msg, msg_size);
648 }
649
650 GNUNET_MQ_send (rh->grp->mq, env);
651
652 if (GNUNET_MULTICAST_REC_OK != ec)
653 GNUNET_free (rh);
654}
655
656
657/**
658 * Indicate the end of the replay session.
659 *
660 * Invalidates the replay handle.
661 *
662 * @param rh
663 * Replay session to end.
664 */
665void
666GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
667{
668 struct MulticastReplayResponseMessage *end;
669 struct GNUNET_MQ_Envelope *
670 env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
671
672 end->fragment_id = rh->req.fragment_id;
673 end->message_id = rh->req.message_id;
674 end->fragment_offset = rh->req.fragment_offset;
675 end->flags = rh->req.flags;
676
677 GNUNET_MQ_send (rh->grp->mq, env);
678 GNUNET_free (rh);
679}
680
681
682/**
683 * Replay a message for the multicast group.
684 *
685 * @param rh
686 * Replay handle identifying which replay operation was requested.
687 * @param notify
688 * Function to call to get the message.
689 * @param notify_cls
690 * Closure for @a notify.
691 */
692void
693GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
694 GNUNET_MULTICAST_ReplayTransmitNotify notify,
695 void *notify_cls)
696{
697}
698
699
700static void
701origin_connect (struct GNUNET_MULTICAST_Origin *orig);
702
703
704static void
705origin_reconnect (void *cls)
706{
707 origin_connect (cls);
708}
709
710
711/**
712 * Origin client disconnected from service.
713 *
714 * Reconnect after backoff period.
715 */
716static void
717origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
718{
719 struct GNUNET_MULTICAST_Origin *orig = cls;
720 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
721
722 LOG (GNUNET_ERROR_TYPE_DEBUG,
723 "Origin client disconnected (%d), re-connecting\n",
724 (int) error);
725 if (NULL != grp->mq)
726 {
727 GNUNET_MQ_destroy (grp->mq);
728 grp->mq = NULL;
729 }
730
731 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
732 origin_reconnect,
733 orig);
734 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
735}
736
737
738/**
739 * Connect to service as origin.
740 */
741static void
742origin_connect (struct GNUNET_MULTICAST_Origin *orig)
743{
744 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
745
746 struct GNUNET_MQ_MessageHandler handlers[] = {
747 GNUNET_MQ_hd_var_size (group_message,
748 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
749 struct GNUNET_MULTICAST_MessageHeader,
750 grp),
751 GNUNET_MQ_hd_var_size (origin_request,
752 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
753 struct GNUNET_MULTICAST_RequestHeader,
754 orig),
755 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
756 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
757 struct GNUNET_MessageHeader,
758 grp),
759 GNUNET_MQ_hd_var_size (group_join_request,
760 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
761 struct MulticastJoinRequestMessage,
762 grp),
763 GNUNET_MQ_hd_fixed_size (group_part_ack,
764 GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
765 struct GNUNET_MessageHeader,
766 grp),
767 GNUNET_MQ_hd_fixed_size (group_replay_request,
768 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
769 struct MulticastReplayRequestMessage,
770 grp),
771 GNUNET_MQ_handler_end ()
772 };
773
774 grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
775 handlers, origin_disconnected, orig);
776 GNUNET_assert (NULL != grp->mq);
777 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
778}
779
780
781/**
782 * Start a multicast group.
783 *
784 * Will advertise the origin in the P2P overlay network under the respective
785 * public key so that other peer can find this peer to join it. Peers that
786 * issue GNUNET_MULTICAST_member_join() can then transmit a join request to
787 * either an existing group member or to the origin. If the joining is
788 * approved, the member is cleared for @e replay and will begin to receive
789 * messages transmitted to the group. If joining is disapproved, the failed
790 * candidate will be given a response. Members in the group can send messages
791 * to the origin (one at a time).
792 *
793 * @param cfg
794 * Configuration to use.
795 * @param priv_key
796 * ECC key that will be used to sign messages for this
797 * multicast session; public key is used to identify the multicast group;
798 * @param max_fragment_id
799 * Maximum fragment ID already sent to the group.
800 * 0 for a new group.
801 * @param join_request_cb
802 * Function called to approve / disapprove joining of a peer.
803 * @param replay_frag_cb
804 * Function that can be called to replay a message fragment.
805 * @param replay_msg_cb
806 * Function that can be called to replay a message.
807 * @param request_cb
808 * Function called with message fragments from group members.
809 * @param message_cb
810 * Function called with the message fragments sent to the
811 * network by GNUNET_MULTICAST_origin_to_all(). These message fragments
812 * should be stored for answering replay requests later.
813 * @param cls
814 * Closure for the various callbacks that follow.
815 *
816 * @return Handle for the origin, NULL on error.
817 */
818struct GNUNET_MULTICAST_Origin *
819GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
820 const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
821 uint64_t max_fragment_id,
822 GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
823 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
824 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
825 GNUNET_MULTICAST_RequestCallback request_cb,
826 GNUNET_MULTICAST_MessageCallback message_cb,
827 void *cls)
828{
829 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
830 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
831
832 struct MulticastOriginStartMessage *start;
833 grp->connect_env = GNUNET_MQ_msg (start,
834 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
835 start->max_fragment_id = max_fragment_id;
836 start->group_key = *priv_key;
837
838 grp->cfg = cfg;
839 grp->is_origin = GNUNET_YES;
840 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
841
842 grp->cb_cls = cls;
843 grp->join_req_cb = join_request_cb;
844 grp->replay_frag_cb = replay_frag_cb;
845 grp->replay_msg_cb = replay_msg_cb;
846 grp->message_cb = message_cb;
847
848 orig->request_cb = request_cb;
849
850 origin_connect (orig);
851 return orig;
852}
853
854
855/**
856 * Stop a multicast group.
857 *
858 * @param origin
859 * Multicast group to stop.
860 */
861void
862GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
863 GNUNET_ContinuationCallback stop_cb,
864 void *stop_cls)
865{
866 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
867 struct GNUNET_MQ_Envelope *env;
868
869 grp->is_disconnecting = GNUNET_YES;
870 grp->disconnect_cb = stop_cb;
871 grp->disconnect_cls = stop_cls;
872 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
873 GNUNET_MQ_send (grp->mq, env);
874}
875
876
877static void
878origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
879{
880 LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
881 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
882 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
883 GNUNET_assert (GNUNET_YES == grp->in_transmit);
884
885 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
886 struct GNUNET_MULTICAST_MessageHeader *msg;
887 struct GNUNET_MQ_Envelope *
888 env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
889 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
890
891 int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
892
893 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
894 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
895 {
896 LOG (GNUNET_ERROR_TYPE_ERROR,
897 "%p OriginTransmitNotify() returned error or invalid message size.\n",
898 orig);
899 /* FIXME: handle error */
900 GNUNET_MQ_discard (env);
901 return;
902 }
903
904 if (GNUNET_NO == ret && 0 == buf_size)
905 {
906 LOG (GNUNET_ERROR_TYPE_DEBUG,
907 "%p OriginTransmitNotify() - transmission paused.\n", orig);
908 GNUNET_MQ_discard (env);
909 return; /* Transmission paused. */
910 }
911
912 msg->header.size = htons (sizeof (*msg) + buf_size);
913 msg->message_id = GNUNET_htonll (tmit->message_id);
914 msg->group_generation = tmit->group_generation;
915 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
916 tmit->fragment_offset += sizeof (*msg) + buf_size;
917
918 grp->acks_pending++;
919 GNUNET_MQ_send (grp->mq, env);
920
921 if (GNUNET_YES == ret)
922 grp->in_transmit = GNUNET_NO;
923}
924
925
926/**
927 * Send a message to the multicast group.
928 *
929 * @param orig
930 * Handle to the multicast group.
931 * @param message_id
932 * Application layer ID for the message. Opaque to multicast.
933 * @param group_generation
934 * Group generation of the message.
935 * Documented in struct GNUNET_MULTICAST_MessageHeader.
936 * @param notify
937 * Function to call to get the message.
938 * @param notify_cls
939 * Closure for @a notify.
940 *
941 * @return Message handle on success,
942 * NULL on error (i.e. another request is already pending).
943 */
944struct GNUNET_MULTICAST_OriginTransmitHandle *
945GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
946 uint64_t message_id,
947 uint64_t group_generation,
948 GNUNET_MULTICAST_OriginTransmitNotify notify,
949 void *notify_cls)
950{
951 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
952 if (GNUNET_YES == grp->in_transmit)
953 return NULL;
954 grp->in_transmit = GNUNET_YES;
955
956 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
957 tmit->origin = orig;
958 tmit->message_id = message_id;
959 tmit->fragment_offset = 0;
960 tmit->group_generation = group_generation;
961 tmit->notify = notify;
962 tmit->notify_cls = notify_cls;
963
964 origin_to_all (orig);
965 return tmit;
966}
967
968
969/**
970 * Resume message transmission to multicast group.
971 *
972 * @param th
973 * Transmission to cancel.
974 */
975void
976GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
977{
978 struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
979 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
980 return;
981 origin_to_all (th->origin);
982}
983
984
985/**
986 * Cancel request for message transmission to multicast group.
987 *
988 * @param th
989 * Transmission to cancel.
990 */
991void
992GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
993{
994 th->origin->grp.in_transmit = GNUNET_NO;
995}
996
997
998static void
999member_connect (struct GNUNET_MULTICAST_Member *mem);
1000
1001
1002static void
1003member_reconnect (void *cls)
1004{
1005 member_connect (cls);
1006}
1007
1008
1009/**
1010 * Member client disconnected from service.
1011 *
1012 * Reconnect after backoff period.
1013 */
1014static void
1015member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1016{
1017 struct GNUNET_MULTICAST_Member *mem = cls;
1018 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1019
1020 LOG (GNUNET_ERROR_TYPE_DEBUG,
1021 "Member client disconnected (%d), re-connecting\n",
1022 (int) error);
1023 GNUNET_MQ_destroy (grp->mq);
1024 grp->mq = NULL;
1025
1026 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
1027 member_reconnect,
1028 mem);
1029 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
1030}
1031
1032
1033/**
1034 * Connect to service as member.
1035 */
1036static void
1037member_connect (struct GNUNET_MULTICAST_Member *mem)
1038{
1039 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1040
1041 struct GNUNET_MQ_MessageHandler handlers[] = {
1042 GNUNET_MQ_hd_var_size (group_message,
1043 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1044 struct GNUNET_MULTICAST_MessageHeader,
1045 grp),
1046 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
1047 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
1048 struct GNUNET_MessageHeader,
1049 grp),
1050 GNUNET_MQ_hd_var_size (group_join_request,
1051 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1052 struct MulticastJoinRequestMessage,
1053 grp),
1054 GNUNET_MQ_hd_var_size (member_join_decision,
1055 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1056 struct MulticastJoinDecisionMessageHeader,
1057 mem),
1058 GNUNET_MQ_hd_fixed_size (group_part_ack,
1059 GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
1060 struct GNUNET_MessageHeader,
1061 grp),
1062 GNUNET_MQ_hd_fixed_size (group_replay_request,
1063 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1064 struct MulticastReplayRequestMessage,
1065 grp),
1066 GNUNET_MQ_hd_var_size (member_replay_response,
1067 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1068 struct MulticastReplayResponseMessage,
1069 mem),
1070 GNUNET_MQ_handler_end ()
1071 };
1072
1073 grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
1074 handlers, member_disconnected, mem);
1075 GNUNET_assert (NULL != grp->mq);
1076 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
1077}
1078
1079
1080/**
1081 * Join a multicast group.
1082 *
1083 * The entity joining is always the local peer. Further information about the
1084 * candidate can be provided in the @a join_request message. If the join fails, the
1085 * @a message_cb is invoked with a (failure) response and then with NULL. If
1086 * the join succeeds, outstanding (state) messages and ongoing multicast
1087 * messages will be given to the @a message_cb until the member decides to part
1088 * the group. The @a replay_cb function may be called at any time by the
1089 * multicast service to support relaying messages to other members of the group.
1090 *
1091 * @param cfg
1092 * Configuration to use.
1093 * @param group_key
1094 * ECC public key that identifies the group to join.
1095 * @param member_key
1096 * ECC key that identifies the member
1097 * and used to sign requests sent to the origin.
1098 * @param origin
1099 * Peer ID of the origin to send unicast requsets to. If NULL,
1100 * unicast requests are sent back via multiple hops on the reverse path
1101 * of multicast messages.
1102 * @param relay_count
1103 * Number of peers in the @a relays array.
1104 * @param relays
1105 * Peer identities of members of the group, which serve as relays
1106 * and can be used to join the group at. and send the @a join_request to.
1107 * If empty, the @a join_request is sent directly to the @a origin.
1108 * @param join_msg
1109 * Application-dependent join message to be passed to the peer @a origin.
1110 * @param join_request_cb
1111 * Function called to approve / disapprove joining of a peer.
1112 * @param join_decision_cb
1113 * Function called to inform about the join decision.
1114 * @param replay_frag_cb
1115 * Function that can be called to replay message fragments
1116 * this peer already knows from this group. NULL if this
1117 * client is unable to support replay.
1118 * @param replay_msg_cb
1119 * Function that can be called to replay message fragments
1120 * this peer already knows from this group. NULL if this
1121 * client is unable to support replay.
1122 * @param message_cb
1123 * Function to be called for all message fragments we
1124 * receive from the group, excluding those our @a replay_cb
1125 * already has.
1126 * @param cls
1127 * Closure for callbacks.
1128 *
1129 * @return Handle for the member, NULL on error.
1130 */
1131struct GNUNET_MULTICAST_Member *
1132GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1133 const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
1134 const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
1135 const struct GNUNET_PeerIdentity *origin,
1136 uint16_t relay_count,
1137 const struct GNUNET_PeerIdentity *relays,
1138 const struct GNUNET_MessageHeader *join_msg,
1139 GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
1140 GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
1141 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
1142 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
1143 GNUNET_MULTICAST_MessageCallback message_cb,
1144 void *cls)
1145{
1146 struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
1147 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1148
1149 uint16_t relay_size = relay_count * sizeof (*relays);
1150 uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
1151 struct MulticastMemberJoinMessage *join;
1152 grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
1153 GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1154 join->group_pub_key = *group_pub_key;
1155 join->member_key = *member_key;
1156 join->origin = *origin;
1157 join->relay_count = ntohl (relay_count);
1158 if (0 < relay_size)
1159 GNUNET_memcpy (&join[1], relays, relay_size);
1160 if (0 < join_msg_size)
1161 GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1162
1163 grp->cfg = cfg;
1164 grp->is_origin = GNUNET_NO;
1165 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1166
1167 mem->join_dcsn_cb = join_decision_cb;
1168 grp->join_req_cb = join_request_cb;
1169 grp->replay_frag_cb = replay_frag_cb;
1170 grp->replay_msg_cb = replay_msg_cb;
1171 grp->message_cb = message_cb;
1172 grp->cb_cls = cls;
1173
1174 member_connect (mem);
1175 return mem;
1176}
1177
1178
1179/**
1180 * Part a multicast group.
1181 *
1182 * Disconnects from all group members and invalidates the @a member handle.
1183 *
1184 * An application-dependent part message can be transmitted beforehand using
1185 * #GNUNET_MULTICAST_member_to_origin())
1186 *
1187 * @param member
1188 * Membership handle.
1189 */
1190void
1191GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
1192 GNUNET_ContinuationCallback part_cb,
1193 void *part_cls)
1194{
1195 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1196 struct GNUNET_MQ_Envelope *env;
1197
1198 mem->join_dcsn_cb = NULL;
1199 grp->join_req_cb = NULL;
1200 grp->message_cb = NULL;
1201 grp->replay_msg_cb = NULL;
1202 grp->replay_frag_cb = NULL;
1203 grp->is_disconnecting = GNUNET_YES;
1204 grp->disconnect_cb = part_cb;
1205 grp->disconnect_cls = part_cls;
1206 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
1207 GNUNET_MQ_send (grp->mq, env);
1208}
1209
1210
1211void
1212member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1213 uint64_t fragment_id,
1214 uint64_t message_id,
1215 uint64_t fragment_offset,
1216 uint64_t flags)
1217{
1218 struct MulticastReplayRequestMessage *rep;
1219 struct GNUNET_MQ_Envelope *
1220 env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
1221
1222 rep->fragment_id = GNUNET_htonll (fragment_id);
1223 rep->message_id = GNUNET_htonll (message_id);
1224 rep->fragment_offset = GNUNET_htonll (fragment_offset);
1225 rep->flags = GNUNET_htonll (flags);
1226
1227 GNUNET_MQ_send (mem->grp.mq, env);
1228}
1229
1230
1231/**
1232 * Request a fragment to be replayed by fragment ID.
1233 *
1234 * Useful if messages below the @e max_known_fragment_id given when joining are
1235 * needed and not known to the client.
1236 *
1237 * @param member
1238 * Membership handle.
1239 * @param fragment_id
1240 * ID of a message fragment that this client would like to see replayed.
1241 * @param flags
1242 * Additional flags for the replay request.
1243 * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
1244 *
1245 * @return Replay request handle.
1246 */
1247struct GNUNET_MULTICAST_MemberReplayHandle *
1248GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
1249 uint64_t fragment_id,
1250 uint64_t flags)
1251{
1252 member_replay_request (mem, fragment_id, 0, 0, flags);
1253 // FIXME: return something useful
1254 return NULL;
1255}
1256
1257
1258/**
1259 * Request a message fragment to be replayed.
1260 *
1261 * Useful if messages below the @e max_known_fragment_id given when joining are
1262 * needed and not known to the client.
1263 *
1264 * @param member
1265 * Membership handle.
1266 * @param message_id
1267 * ID of the message this client would like to see replayed.
1268 * @param fragment_offset
1269 * Offset of the fragment within the message to replay.
1270 * @param flags
1271 * Additional flags for the replay request.
1272 * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
1273 *
1274 * @return Replay request handle, NULL on error.
1275 */
1276struct GNUNET_MULTICAST_MemberReplayHandle *
1277GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
1278 uint64_t message_id,
1279 uint64_t fragment_offset,
1280 uint64_t flags)
1281{
1282 member_replay_request (mem, 0, message_id, fragment_offset, flags);
1283 // FIXME: return something useful
1284 return NULL;
1285}
1286
1287
1288static void
1289member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1290{
1291 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1292 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1293 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1294 GNUNET_assert (GNUNET_YES == grp->in_transmit);
1295
1296 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1297 struct GNUNET_MULTICAST_RequestHeader *req;
1298 struct GNUNET_MQ_Envelope *
1299 env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
1300 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1301
1302 int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1303
1304 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
1305 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
1306 {
1307 LOG (GNUNET_ERROR_TYPE_ERROR,
1308 "MemberTransmitNotify() returned error or invalid message size. "
1309 "ret=%d, buf_size=%u\n", ret, buf_size);
1310 /* FIXME: handle error */
1311 GNUNET_MQ_discard (env);
1312 return;
1313 }
1314
1315 if (GNUNET_NO == ret && 0 == buf_size)
1316 {
1317 /* Transmission paused. */
1318 GNUNET_MQ_discard (env);
1319 return;
1320 }
1321
1322 req->header.size = htons (sizeof (*req) + buf_size);
1323 req->request_id = GNUNET_htonll (tmit->request_id);
1324 req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1325 tmit->fragment_offset += sizeof (*req) + buf_size;
1326
1327 GNUNET_MQ_send (grp->mq, env);
1328
1329 if (GNUNET_YES == ret)
1330 grp->in_transmit = GNUNET_NO;
1331}
1332
1333
1334/**
1335 * Send a message to the origin of the multicast group.
1336 *
1337 * @param mem
1338 * Membership handle.
1339 * @param request_id
1340 * Application layer ID for the request. Opaque to multicast.
1341 * @param notify
1342 * Callback to call to get the message.
1343 * @param notify_cls
1344 * Closure for @a notify.
1345 *
1346 * @return Handle to cancel request, NULL on error (i.e. request already pending).
1347 */
1348struct GNUNET_MULTICAST_MemberTransmitHandle *
1349GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1350 uint64_t request_id,
1351 GNUNET_MULTICAST_MemberTransmitNotify notify,
1352 void *notify_cls)
1353{
1354 if (GNUNET_YES == mem->grp.in_transmit)
1355 return NULL;
1356 mem->grp.in_transmit = GNUNET_YES;
1357
1358 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1359 tmit->member = mem;
1360 tmit->request_id = request_id;
1361 tmit->fragment_offset = 0;
1362 tmit->notify = notify;
1363 tmit->notify_cls = notify_cls;
1364
1365 member_to_origin (mem);
1366 return tmit;
1367}
1368
1369
1370/**
1371 * Resume message transmission to origin.
1372 *
1373 * @param th
1374 * Transmission to cancel.
1375 */
1376void
1377GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1378{
1379 struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1380 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1381 return;
1382 member_to_origin (th->member);
1383}
1384
1385
1386/**
1387 * Cancel request for message transmission to origin.
1388 *
1389 * @param th
1390 * Transmission to cancel.
1391 */
1392void
1393GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1394{
1395 th->member->grp.in_transmit = GNUNET_NO;
1396}
1397
1398
1399/* end of multicast_api.c */