From 5a042e00e06de726a21d9db05ddeb2ac16ca7c0c Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Sat, 26 Sep 2015 17:09:57 +0000 Subject: multicast: replay --- src/multicast/multicast_api.c | 236 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 204 insertions(+), 32 deletions(-) (limited to 'src/multicast/multicast_api.c') diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index ef4cc73e7..9f0c77f36 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -137,6 +137,11 @@ struct GNUNET_MULTICAST_Member GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb; + /** + * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle * + */ + struct GNUNET_CONTAINER_MultiHashMap *replay_reqs; + uint64_t next_fragment_id; }; @@ -176,6 +181,8 @@ struct GNUNET_MULTICAST_MembershipTestHandle */ struct GNUNET_MULTICAST_ReplayHandle { + struct GNUNET_MULTICAST_Group *grp; + struct MulticastReplayRequestMessage req; }; @@ -184,6 +191,9 @@ struct GNUNET_MULTICAST_ReplayHandle */ struct GNUNET_MULTICAST_MemberReplayHandle { + + GNUNET_MULTICAST_ResultCallback result_cb; + void *result_cls; }; @@ -263,11 +273,14 @@ group_recv_message (void *cls, struct GNUNET_MULTICAST_MessageHeader * mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg; + if (GNUNET_YES == grp->is_disconnecting) + return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Calling message callback with a message of size %u.\n", ntohs (mmsg->header.size)); - if (GNUNET_YES != grp->is_disconnecting && NULL != grp->message_cb) + if (NULL != grp->message_cb) grp->message_cb (grp->cb_cls, mmsg); } @@ -296,6 +309,73 @@ origin_recv_request (void *cls, } +/** + * Receive multicast replay request from service. + */ +static void +group_recv_replay_request (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_MULTICAST_Group * + grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + struct MulticastReplayRequestMessage * + rep = (struct MulticastReplayRequestMessage *) msg; + + if (GNUNET_YES == grp->is_disconnecting) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n"); + + if (0 != rep->fragment_id) + { + if (NULL != grp->replay_frag_cb) + { + struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh)); + rh->grp = grp; + rh->req = *rep; + grp->replay_frag_cb (grp->cb_cls, &rep->member_key, + GNUNET_ntohll (rep->fragment_id), + GNUNET_ntohll (rep->flags), rh); + } + } + else if (0 != rep->message_id) + { + if (NULL != grp->replay_msg_cb) + { + struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh)); + rh->grp = grp; + rh->req = *rep; + grp->replay_msg_cb (grp->cb_cls, &rep->member_key, + GNUNET_ntohll (rep->message_id), + GNUNET_ntohll (rep->fragment_offset), + GNUNET_ntohll (rep->flags), rh); + } + } +} + + +/** + * Receive multicast replay request from service. + */ +static void +member_recv_replay_response (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Member * + mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + grp = &mem->grp; + struct MulticastReplayResponseMessage * + res = (struct MulticastReplayResponseMessage *) msg; + + if (GNUNET_YES == grp->is_disconnecting) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n"); +} + /** * Member receives join decision. */ @@ -369,20 +449,24 @@ member_recv_join_decision (void *cls, */ static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] = { - { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, + { group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, - { &group_recv_message, NULL, + { group_recv_message, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, - { &origin_recv_request, NULL, + { origin_recv_request, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, - { &group_recv_join_request, NULL, + { group_recv_join_request, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, + { group_recv_replay_request, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + sizeof (struct MulticastReplayRequestMessage), GNUNET_NO }, + { NULL, NULL, 0, 0, GNUNET_NO } }; @@ -392,20 +476,28 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] = */ static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] = { - { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, + { group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, - { &group_recv_message, NULL, + { group_recv_message, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, - { &group_recv_join_request, NULL, + { group_recv_join_request, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, - { &member_recv_join_decision, NULL, + { member_recv_join_decision, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES }, + { group_recv_replay_request, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + sizeof (struct MulticastReplayRequestMessage), GNUNET_NO }, + + { member_recv_replay_response, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + sizeof (struct MulticastReplayRequestMessage), GNUNET_NO }, + { NULL, NULL, 0, 0, GNUNET_NO } }; @@ -514,15 +606,45 @@ GNUNET_MULTICAST_membership_test_result (struct GNUNET_MULTICAST_MembershipTestH /** * Replay a message fragment for the multicast group. * - * @param rh Replay handle identifying which replay operation was requested. - * @param msg Replayed message fragment, NULL if unknown/error. - * @param ec Error code. + * @param rh + * Replay handle identifying which replay operation was requested. + * @param msg + * Replayed message fragment, NULL if not found / an error occurred. + * @param ec + * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode + * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated. */ void GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, const struct GNUNET_MessageHeader *msg, enum GNUNET_MULTICAST_ReplayErrorCode ec) { + uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0; + struct MulticastReplayResponseMessage * + res = GNUNET_malloc (sizeof (*res) + msg_size); + *res = (struct MulticastReplayResponseMessage) { + .header = { + .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE), + .size = htons (sizeof (*res) + msg_size), + }, + .fragment_id = rh->req.fragment_id, + .message_id = rh->req.message_id, + .fragment_offset = rh->req.fragment_offset, + .flags = rh->req.flags, + .error_code = htonl (ec), + }; + + if (GNUNET_MULTICAST_REC_OK == ec) + { + GNUNET_assert (NULL != msg); + memcpy (&res[1], msg, msg_size); + } + + GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header); + GNUNET_free (res); + + if (GNUNET_MULTICAST_REC_OK != ec) + GNUNET_free (rh); } @@ -536,6 +658,19 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, void GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh) { + struct MulticastReplayResponseMessage end = { + .header = { + .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END), + .size = htons (sizeof (end)), + }, + .fragment_id = rh->req.fragment_id, + .message_id = rh->req.message_id, + .fragment_offset = rh->req.fragment_offset, + .flags = rh->req.flags, + }; + + GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header); + GNUNET_free (rh); } @@ -827,6 +962,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, grp->join_req_cb = join_request_cb; grp->member_test_cb = member_test_cb; grp->replay_frag_cb = replay_frag_cb; + grp->replay_msg_cb = replay_msg_cb; grp->message_cb = message_cb; grp->cb_cls = cls; @@ -864,26 +1000,55 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem, } +void +member_replay_request (struct GNUNET_MULTICAST_Member *mem, + uint64_t fragment_id, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags) +{ + struct MulticastReplayRequestMessage rep = { + .header = { + .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST), + .size = htons (sizeof (rep)), + }, + .fragment_id = GNUNET_htonll (fragment_id), + .message_id = GNUNET_htonll (message_id), + .fragment_offset = GNUNET_htonll (fragment_offset), + .flags = GNUNET_htonll (flags), + }; + GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header); +} + + /** * Request a fragment to be replayed by fragment ID. * * Useful if messages below the @e max_known_fragment_id given when joining are * needed and not known to the client. * - * @param member Membership handle. - * @param fragment_id ID of a message fragment that this client would like to - see replayed. - * @param flags Additional flags for the replay request. It is used and defined - * by the replay callback. FIXME: which replay callback? FIXME: use enum? - * FIXME: why not pass reply cb here? - * @return Replay request handle, NULL on error. + * @param member + * Membership handle. + * @param fragment_id + * ID of a message fragment that this client would like to see replayed. + * @param flags + * Additional flags for the replay request. + * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback + * @param result_cb + * Function to call when the replayed message fragment arrives. + * @param result_cls + * Closure for @a result_cb. + * + * @return Replay request handle. */ struct GNUNET_MULTICAST_MemberReplayHandle * -GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member, +GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem, uint64_t fragment_id, - uint64_t flags) + uint64_t flags, + GNUNET_MULTICAST_ResultCallback result_cb, + void *result_cls) { - return NULL; + member_replay_request (mem, fragment_id, 0, 0, flags); } @@ -893,24 +1058,31 @@ GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member, * Useful if messages below the @e max_known_fragment_id given when joining are * needed and not known to the client. * - * @param member Membership handle. - * @param message_id ID of the message this client would like to see replayed. - * @param fragment_offset Offset of the fragment within the message to replay. - * @param flags Additional flags for the replay request. It is used & defined - * by the replay callback. - * @param result_cb Function to be called for the replayed message. - * @param result_cb_cls Closure for @a result_cb. + * @param member + * Membership handle. + * @param message_id + * ID of the message this client would like to see replayed. + * @param fragment_offset + * Offset of the fragment within the message to replay. + * @param flags + * Additional flags for the replay request. + * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback + * @param result_cb + * Function to call for each replayed message fragment. + * @param result_cls + * Closure for @a result_cb. + * * @return Replay request handle, NULL on error. */ struct GNUNET_MULTICAST_MemberReplayHandle * -GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *member, +GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem, uint64_t message_id, uint64_t fragment_offset, uint64_t flags, GNUNET_MULTICAST_ResultCallback result_cb, - void *result_cb_cls) + void *result_cls) { - return NULL; + member_replay_request (mem, 0, message_id, fragment_offset, flags); } -- cgit v1.2.3