aboutsummaryrefslogtreecommitdiff
path: root/src/multicast/multicast_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-09-26 17:09:57 +0000
committerGabor X Toth <*@tg-x.net>2015-09-26 17:09:57 +0000
commit5a042e00e06de726a21d9db05ddeb2ac16ca7c0c (patch)
tree9b2f504796b72a50d061eb812be4aa87efbf6584 /src/multicast/multicast_api.c
parent0c5cea1c26efb8b53f643a6ce6b162d9c9fe2b8f (diff)
downloadgnunet-5a042e00e06de726a21d9db05ddeb2ac16ca7c0c.tar.gz
gnunet-5a042e00e06de726a21d9db05ddeb2ac16ca7c0c.zip
multicast: replay
Diffstat (limited to 'src/multicast/multicast_api.c')
-rw-r--r--src/multicast/multicast_api.c236
1 files changed, 204 insertions, 32 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index ef4cc73e7..9f0c77f36 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -137,6 +137,11 @@ struct GNUNET_MULTICAST_Member
137 137
138 GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb; 138 GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
139 139
140 /**
141 * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
142 */
143 struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
144
140 uint64_t next_fragment_id; 145 uint64_t next_fragment_id;
141}; 146};
142 147
@@ -176,6 +181,8 @@ struct GNUNET_MULTICAST_MembershipTestHandle
176 */ 181 */
177struct GNUNET_MULTICAST_ReplayHandle 182struct GNUNET_MULTICAST_ReplayHandle
178{ 183{
184 struct GNUNET_MULTICAST_Group *grp;
185 struct MulticastReplayRequestMessage req;
179}; 186};
180 187
181 188
@@ -184,6 +191,9 @@ struct GNUNET_MULTICAST_ReplayHandle
184 */ 191 */
185struct GNUNET_MULTICAST_MemberReplayHandle 192struct GNUNET_MULTICAST_MemberReplayHandle
186{ 193{
194
195 GNUNET_MULTICAST_ResultCallback result_cb;
196 void *result_cls;
187}; 197};
188 198
189 199
@@ -263,11 +273,14 @@ group_recv_message (void *cls,
263 struct GNUNET_MULTICAST_MessageHeader * 273 struct GNUNET_MULTICAST_MessageHeader *
264 mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg; 274 mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
265 275
276 if (GNUNET_YES == grp->is_disconnecting)
277 return;
278
266 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 279 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
267 "Calling message callback with a message of size %u.\n", 280 "Calling message callback with a message of size %u.\n",
268 ntohs (mmsg->header.size)); 281 ntohs (mmsg->header.size));
269 282
270 if (GNUNET_YES != grp->is_disconnecting && NULL != grp->message_cb) 283 if (NULL != grp->message_cb)
271 grp->message_cb (grp->cb_cls, mmsg); 284 grp->message_cb (grp->cb_cls, mmsg);
272} 285}
273 286
@@ -297,6 +310,73 @@ origin_recv_request (void *cls,
297 310
298 311
299/** 312/**
313 * Receive multicast replay request from service.
314 */
315static void
316group_recv_replay_request (void *cls,
317 struct GNUNET_CLIENT_MANAGER_Connection *client,
318 const struct GNUNET_MessageHeader *msg)
319{
320 struct GNUNET_MULTICAST_Group *
321 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
322 struct MulticastReplayRequestMessage *
323 rep = (struct MulticastReplayRequestMessage *) msg;
324
325 if (GNUNET_YES == grp->is_disconnecting)
326 return;
327
328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
329
330 if (0 != rep->fragment_id)
331 {
332 if (NULL != grp->replay_frag_cb)
333 {
334 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
335 rh->grp = grp;
336 rh->req = *rep;
337 grp->replay_frag_cb (grp->cb_cls, &rep->member_key,
338 GNUNET_ntohll (rep->fragment_id),
339 GNUNET_ntohll (rep->flags), rh);
340 }
341 }
342 else if (0 != rep->message_id)
343 {
344 if (NULL != grp->replay_msg_cb)
345 {
346 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
347 rh->grp = grp;
348 rh->req = *rep;
349 grp->replay_msg_cb (grp->cb_cls, &rep->member_key,
350 GNUNET_ntohll (rep->message_id),
351 GNUNET_ntohll (rep->fragment_offset),
352 GNUNET_ntohll (rep->flags), rh);
353 }
354 }
355}
356
357
358/**
359 * Receive multicast replay request from service.
360 */
361static void
362member_recv_replay_response (void *cls,
363 struct GNUNET_CLIENT_MANAGER_Connection *client,
364 const struct GNUNET_MessageHeader *msg)
365{
366 struct GNUNET_MULTICAST_Group *grp;
367 struct GNUNET_MULTICAST_Member *
368 mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
369 grp = &mem->grp;
370 struct MulticastReplayResponseMessage *
371 res = (struct MulticastReplayResponseMessage *) msg;
372
373 if (GNUNET_YES == grp->is_disconnecting)
374 return;
375
376 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
377}
378
379/**
300 * Member receives join decision. 380 * Member receives join decision.
301 */ 381 */
302static void 382static void
@@ -369,20 +449,24 @@ member_recv_join_decision (void *cls,
369 */ 449 */
370static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] = 450static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
371{ 451{
372 { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 452 { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
373 453
374 { &group_recv_message, NULL, 454 { group_recv_message, NULL,
375 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 455 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
376 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, 456 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
377 457
378 { &origin_recv_request, NULL, 458 { origin_recv_request, NULL,
379 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 459 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
380 sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, 460 sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
381 461
382 { &group_recv_join_request, NULL, 462 { group_recv_join_request, NULL,
383 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 463 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
384 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, 464 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
385 465
466 { group_recv_replay_request, NULL,
467 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
468 sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
469
386 { NULL, NULL, 0, 0, GNUNET_NO } 470 { NULL, NULL, 0, 0, GNUNET_NO }
387}; 471};
388 472
@@ -392,20 +476,28 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
392 */ 476 */
393static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] = 477static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
394{ 478{
395 { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 479 { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
396 480
397 { &group_recv_message, NULL, 481 { group_recv_message, NULL,
398 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 482 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
399 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, 483 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
400 484
401 { &group_recv_join_request, NULL, 485 { group_recv_join_request, NULL,
402 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 486 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
403 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, 487 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
404 488
405 { &member_recv_join_decision, NULL, 489 { member_recv_join_decision, NULL,
406 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 490 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
407 sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES }, 491 sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
408 492
493 { group_recv_replay_request, NULL,
494 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
495 sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
496
497 { member_recv_replay_response, NULL,
498 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
499 sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
500
409 { NULL, NULL, 0, 0, GNUNET_NO } 501 { NULL, NULL, 0, 0, GNUNET_NO }
410}; 502};
411 503
@@ -514,15 +606,45 @@ GNUNET_MULTICAST_membership_test_result (struct GNUNET_MULTICAST_MembershipTestH
514/** 606/**
515 * Replay a message fragment for the multicast group. 607 * Replay a message fragment for the multicast group.
516 * 608 *
517 * @param rh Replay handle identifying which replay operation was requested. 609 * @param rh
518 * @param msg Replayed message fragment, NULL if unknown/error. 610 * Replay handle identifying which replay operation was requested.
519 * @param ec Error code. 611 * @param msg
612 * Replayed message fragment, NULL if not found / an error occurred.
613 * @param ec
614 * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode
615 * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
520 */ 616 */
521void 617void
522GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, 618GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
523 const struct GNUNET_MessageHeader *msg, 619 const struct GNUNET_MessageHeader *msg,
524 enum GNUNET_MULTICAST_ReplayErrorCode ec) 620 enum GNUNET_MULTICAST_ReplayErrorCode ec)
525{ 621{
622 uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
623 struct MulticastReplayResponseMessage *
624 res = GNUNET_malloc (sizeof (*res) + msg_size);
625 *res = (struct MulticastReplayResponseMessage) {
626 .header = {
627 .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE),
628 .size = htons (sizeof (*res) + msg_size),
629 },
630 .fragment_id = rh->req.fragment_id,
631 .message_id = rh->req.message_id,
632 .fragment_offset = rh->req.fragment_offset,
633 .flags = rh->req.flags,
634 .error_code = htonl (ec),
635 };
636
637 if (GNUNET_MULTICAST_REC_OK == ec)
638 {
639 GNUNET_assert (NULL != msg);
640 memcpy (&res[1], msg, msg_size);
641 }
642
643 GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header);
644 GNUNET_free (res);
645
646 if (GNUNET_MULTICAST_REC_OK != ec)
647 GNUNET_free (rh);
526} 648}
527 649
528 650
@@ -536,6 +658,19 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
536void 658void
537GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh) 659GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
538{ 660{
661 struct MulticastReplayResponseMessage end = {
662 .header = {
663 .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END),
664 .size = htons (sizeof (end)),
665 },
666 .fragment_id = rh->req.fragment_id,
667 .message_id = rh->req.message_id,
668 .fragment_offset = rh->req.fragment_offset,
669 .flags = rh->req.flags,
670 };
671
672 GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header);
673 GNUNET_free (rh);
539} 674}
540 675
541 676
@@ -827,6 +962,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
827 grp->join_req_cb = join_request_cb; 962 grp->join_req_cb = join_request_cb;
828 grp->member_test_cb = member_test_cb; 963 grp->member_test_cb = member_test_cb;
829 grp->replay_frag_cb = replay_frag_cb; 964 grp->replay_frag_cb = replay_frag_cb;
965 grp->replay_msg_cb = replay_msg_cb;
830 grp->message_cb = message_cb; 966 grp->message_cb = message_cb;
831 grp->cb_cls = cls; 967 grp->cb_cls = cls;
832 968
@@ -864,26 +1000,55 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
864} 1000}
865 1001
866 1002
1003void
1004member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1005 uint64_t fragment_id,
1006 uint64_t message_id,
1007 uint64_t fragment_offset,
1008 uint64_t flags)
1009{
1010 struct MulticastReplayRequestMessage rep = {
1011 .header = {
1012 .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST),
1013 .size = htons (sizeof (rep)),
1014 },
1015 .fragment_id = GNUNET_htonll (fragment_id),
1016 .message_id = GNUNET_htonll (message_id),
1017 .fragment_offset = GNUNET_htonll (fragment_offset),
1018 .flags = GNUNET_htonll (flags),
1019 };
1020 GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header);
1021}
1022
1023
867/** 1024/**
868 * Request a fragment to be replayed by fragment ID. 1025 * Request a fragment to be replayed by fragment ID.
869 * 1026 *
870 * Useful if messages below the @e max_known_fragment_id given when joining are 1027 * Useful if messages below the @e max_known_fragment_id given when joining are
871 * needed and not known to the client. 1028 * needed and not known to the client.
872 * 1029 *
873 * @param member Membership handle. 1030 * @param member
874 * @param fragment_id ID of a message fragment that this client would like to 1031 * Membership handle.
875 see replayed. 1032 * @param fragment_id
876 * @param flags Additional flags for the replay request. It is used and defined 1033 * ID of a message fragment that this client would like to see replayed.
877 * by the replay callback. FIXME: which replay callback? FIXME: use enum? 1034 * @param flags
878 * FIXME: why not pass reply cb here? 1035 * Additional flags for the replay request.
879 * @return Replay request handle, NULL on error. 1036 * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
1037 * @param result_cb
1038 * Function to call when the replayed message fragment arrives.
1039 * @param result_cls
1040 * Closure for @a result_cb.
1041 *
1042 * @return Replay request handle.
880 */ 1043 */
881struct GNUNET_MULTICAST_MemberReplayHandle * 1044struct GNUNET_MULTICAST_MemberReplayHandle *
882GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member, 1045GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
883 uint64_t fragment_id, 1046 uint64_t fragment_id,
884 uint64_t flags) 1047 uint64_t flags,
1048 GNUNET_MULTICAST_ResultCallback result_cb,
1049 void *result_cls)
885{ 1050{
886 return NULL; 1051 member_replay_request (mem, fragment_id, 0, 0, flags);
887} 1052}
888 1053
889 1054
@@ -893,24 +1058,31 @@ GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member,
893 * Useful if messages below the @e max_known_fragment_id given when joining are 1058 * Useful if messages below the @e max_known_fragment_id given when joining are
894 * needed and not known to the client. 1059 * needed and not known to the client.
895 * 1060 *
896 * @param member Membership handle. 1061 * @param member
897 * @param message_id ID of the message this client would like to see replayed. 1062 * Membership handle.
898 * @param fragment_offset Offset of the fragment within the message to replay. 1063 * @param message_id
899 * @param flags Additional flags for the replay request. It is used & defined 1064 * ID of the message this client would like to see replayed.
900 * by the replay callback. 1065 * @param fragment_offset
901 * @param result_cb Function to be called for the replayed message. 1066 * Offset of the fragment within the message to replay.
902 * @param result_cb_cls Closure for @a result_cb. 1067 * @param flags
1068 * Additional flags for the replay request.
1069 * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
1070 * @param result_cb
1071 * Function to call for each replayed message fragment.
1072 * @param result_cls
1073 * Closure for @a result_cb.
1074 *
903 * @return Replay request handle, NULL on error. 1075 * @return Replay request handle, NULL on error.
904 */ 1076 */
905struct GNUNET_MULTICAST_MemberReplayHandle * 1077struct GNUNET_MULTICAST_MemberReplayHandle *
906GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *member, 1078GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
907 uint64_t message_id, 1079 uint64_t message_id,
908 uint64_t fragment_offset, 1080 uint64_t fragment_offset,
909 uint64_t flags, 1081 uint64_t flags,
910 GNUNET_MULTICAST_ResultCallback result_cb, 1082 GNUNET_MULTICAST_ResultCallback result_cb,
911 void *result_cb_cls) 1083 void *result_cls)
912{ 1084{
913 return NULL; 1085 member_replay_request (mem, 0, message_id, fragment_offset, flags);
914} 1086}
915 1087
916 1088