From 12db287bf58f2076a65bc34382ce07ec9fe76e38 Mon Sep 17 00:00:00 2001 From: Tobias Platen Date: Wed, 20 Jul 2022 20:26:59 +0200 Subject: inital import of multicast from secushare --- configure.ac | 2 + src/Makefile.am | 1 + src/include/gnunet_multicast_service.h | 925 ++++++++++++ src/multicast/.gitignore | 7 + src/multicast/Makefile.am | 75 + src/multicast/gnunet-multicast.c | 79 ++ src/multicast/gnunet-service-multicast.c | 2236 ++++++++++++++++++++++++++++++ src/multicast/multicast.conf.in | 21 + src/multicast/multicast.h | 303 ++++ src/multicast/multicast_api.c | 1399 +++++++++++++++++++ src/multicast/test_multicast.c | 758 ++++++++++ src/multicast/test_multicast_2peers.c | 520 +++++++ src/multicast/test_multicast_multipeer.c | 643 +++++++++ 13 files changed, 6969 insertions(+) create mode 100644 src/include/gnunet_multicast_service.h create mode 100644 src/multicast/.gitignore create mode 100644 src/multicast/Makefile.am create mode 100644 src/multicast/gnunet-multicast.c create mode 100644 src/multicast/gnunet-service-multicast.c create mode 100644 src/multicast/multicast.conf.in create mode 100644 src/multicast/multicast.h create mode 100644 src/multicast/multicast_api.c create mode 100644 src/multicast/test_multicast.c create mode 100644 src/multicast/test_multicast_2peers.c create mode 100644 src/multicast/test_multicast_multipeer.c diff --git a/configure.ac b/configure.ac index 20258d622..9b46874ab 100644 --- a/configure.ac +++ b/configure.ac @@ -1435,6 +1435,8 @@ src/abd/abd.conf src/reclaim/Makefile src/messenger/Makefile src/messenger/messenger.conf +src/multicast/Makefile +src/multicast/multicast.conf pkgconfig/Makefile pkgconfig/gnunetarm.pc pkgconfig/gnunetats.pc diff --git a/src/Makefile.am b/src/Makefile.am index d7c0b51f0..b84d35b38 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -93,5 +93,6 @@ SUBDIRS = \ secretsharing \ reclaim \ messenger \ + multicast \ $(EXP_DIR) \ integration-tests diff --git a/src/include/gnunet_multicast_service.h b/src/include/gnunet_multicast_service.h new file mode 100644 index 000000000..58fca0b2e --- /dev/null +++ b/src/include/gnunet_multicast_service.h @@ -0,0 +1,925 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012, 2013 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later +*/ + +/** + * @author Gabor X Toth + * @author Christian Grothoff + * + * @file + * Multicast service; multicast messaging via CADET + * + * @defgroup multicast Multicast service + * Multicast messaging via CADET. + * @{ + */ + +#ifndef GNUNET_MULTICAST_SERVICE_H +#define GNUNET_MULTICAST_SERVICE_H + +#ifdef __cplusplus +extern "C" +{ +#if 0 /* keep Emacsens' auto-indent happy */ +} +#endif +#endif + +#include "gnunet_util_lib.h" +#include "gnunet_transport_service.h" + +/** + * Version number of GNUnet-multicast API. + */ +#define GNUNET_MULTICAST_VERSION 0x00000000 + +/** + * Opaque handle for a multicast group member. + */ +struct GNUNET_MULTICAST_Member; + +/** + * Handle for the origin of a multicast group. + */ +struct GNUNET_MULTICAST_Origin; + + +enum GNUNET_MULTICAST_MessageFlags +{ + /** + * First fragment of a message. + */ + GNUNET_MULTICAST_MESSAGE_FIRST_FRAGMENT = 1 << 0, + + /** + * Last fragment of a message. + */ + GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT = 1 << 1, + + /** + * OR'ed flags if message is not fragmented. + */ + GNUNET_MULTICAST_MESSAGE_NOT_FRAGMENTED + = GNUNET_MULTICAST_MESSAGE_FIRST_FRAGMENT + | GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT, + + /** + * Historic message, used only locally when replaying messages from local + * storage. + */ + GNUNET_MULTICAST_MESSAGE_HISTORIC = 1 << 30 + +}; + + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Header of a multicast message fragment. + * + * This format is public as the replay mechanism must replay message fragments using the + * same format. This is needed as we want to integrity-check message fragments within + * the multicast layer to avoid multicasting mal-formed messages. + */ +struct GNUNET_MULTICAST_MessageHeader +{ + + /** + * Header for all multicast message fragments from the origin. + */ + struct GNUNET_MessageHeader header; + + /** + * Number of hops this message fragment has taken since the origin. + * + * Helpful to determine shortest paths to the origin among honest peers for + * unicast requests from members. Updated at each hop and thus not signed and + * not secure. + */ + uint32_t hop_counter GNUNET_PACKED; + + /** + * ECC signature of the message fragment. + * + * Signature must match the public key of the multicast group. + */ + struct GNUNET_CRYPTO_EddsaSignature signature; + + /** + * Purpose for the signature and size of the signed data. + */ + struct GNUNET_CRYPTO_EccSignaturePurpose purpose; + + /** + * Number of the message fragment, monotonically increasing starting from 1. + */ + uint64_t fragment_id GNUNET_PACKED; + + /** + * Byte offset of this @e fragment of the @e message. + */ + uint64_t fragment_offset GNUNET_PACKED; + + /** + * Number of the message this fragment belongs to. + * + * Set in GNUNET_MULTICAST_origin_to_all(). + */ + uint64_t message_id GNUNET_PACKED; + + /** + * Counter that monotonically increases whenever a member parts the group. + * + * Set in GNUNET_MULTICAST_origin_to_all(). + * + * It has significance in case of replay requests: when a member has missed + * messages and gets a replay request: in this case if the @a group_generation + * is still the same before and after the missed messages, it means that no + * @e join or @e part operations happened during the missed messages. + */ + uint64_t group_generation GNUNET_PACKED; + + /** + * Flags for this message fragment. + * + * @see enum GNUNET_MULTICAST_MessageFlags + */ + uint32_t flags GNUNET_PACKED; + + /* Followed by message body. */ +}; + + +/** + * Header of a request from a member to the origin. + */ +struct GNUNET_MULTICAST_RequestHeader +{ + /** + * Header for all requests from a member to the origin. + */ + struct GNUNET_MessageHeader header; + + /** + * Public key of the sending member. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; + + /** + * ECC signature of the request fragment. + * + * Signature must match the public key of the multicast group. + */ + struct GNUNET_CRYPTO_EcdsaSignature signature; + + /** + * Purpose for the signature and size of the signed data. + */ + struct GNUNET_CRYPTO_EccSignaturePurpose purpose; + + /** + * Number of the request fragment. + * Monotonically increasing from 1. + */ + uint64_t fragment_id GNUNET_PACKED; + + /** + * Byte offset of this @e fragment of the @e request. + */ + uint64_t fragment_offset GNUNET_PACKED; + + /** + * Number of the request this fragment belongs to. + * + * Set in GNUNET_MULTICAST_origin_to_all(). + */ + uint64_t request_id GNUNET_PACKED; + + /** + * Flags for this request. + */ + enum GNUNET_MULTICAST_MessageFlags flags GNUNET_PACKED; + + /* Followed by request body. */ +}; + +GNUNET_NETWORK_STRUCT_END + + +/** + * Maximum size of a multicast message fragment. + */ +#define GNUNET_MULTICAST_FRAGMENT_MAX_SIZE (63 * 1024) + +#define GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD \ + (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE \ + - sizeof (struct GNUNET_MULTICAST_MessageHeader)) + + +/** + * Handle that identifies a join request. + * + * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the + * corresponding calls to #GNUNET_MULTICAST_join_decision(). + */ +struct GNUNET_MULTICAST_JoinHandle; + + +/** + * Function to call with the decision made for a join request. + * + * Must be called once and only once in response to an invocation of the + * #GNUNET_MULTICAST_JoinRequestCallback. + * + * @param jh + * Join request handle. + * @param is_admitted + * #GNUNET_YES if the join is approved, + * #GNUNET_NO if it is disapproved, + * #GNUNET_SYSERR if we cannot answer the request. + * @param relay_count + * Number of relays given. + * @param relays + * Array of suggested peers that might be useful relays to use + * when joining the multicast group (essentially a list of peers that + * are already part of the multicast group and might thus be willing + * to help with routing). If empty, only this local peer (which must + * be the multicast origin) is a good candidate for building the + * multicast tree. Note that it is unnecessary to specify our own + * peer identity in this array. + * @param join_resp + * Message to send in response to the joining peer; + * can also be used to redirect the peer to a different group at the + * application layer; this response is to be transmitted to the + * peer that issued the request even if admission is denied. + */ +struct GNUNET_MULTICAST_ReplayHandle * +GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, + int is_admitted, + uint16_t relay_count, + const struct GNUNET_PeerIdentity *relays, + const struct GNUNET_MessageHeader *join_resp); + + +/** + * Method called whenever another peer wants to join the multicast group. + * + * Implementations of this function must call GNUNET_MULTICAST_join_decision() + * with the decision. + * + * @param cls + * Closure. + * @param member_pub_key + * Public key of the member requesting join. + * @param join_msg + * Application-dependent join message from the new member. + * @param jh + * Join handle to pass to GNUNET_MULTICAST_join_decison(). + */ +typedef void +(*GNUNET_MULTICAST_JoinRequestCallback) (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key, + const struct GNUNET_MessageHeader *join_msg, + struct GNUNET_MULTICAST_JoinHandle *jh); + + +/** + * Method called to inform about the decision in response to a join request. + * + * If @a is_admitted is not #GNUNET_YES, then the multicast service disconnects + * the client and the multicast member handle returned by + * GNUNET_MULTICAST_member_join() is invalidated. + * + * @param cls + * Closure. + * @param is_admitted + * #GNUNET_YES or #GNUNET_NO or #GNUNET_SYSERR + * @param peer + * The peer we are connected to and the join decision is from. + * @param relay_count + * Number of peers in the @a relays array. + * @param relays + * Peer identities of members of the group, which serve as relays + * and can be used to join the group at. If empty, only the origin can + * be used to connect to the group. + * @param join_msg + * Application-dependent join message from the origin. + */ +typedef void +(*GNUNET_MULTICAST_JoinDecisionCallback) (void *cls, + int is_admitted, + const struct GNUNET_PeerIdentity *peer, + uint16_t relay_count, + const struct GNUNET_PeerIdentity *relays, + const struct GNUNET_MessageHeader *join_msg); + + +/** + * Function called whenever a group member has transmitted a request + * to the origin (other than joining or leaving). + * + * FIXME: need to distinguish between origin cancelling a message (some fragments + * were sent, then the rest 'discarded') and the case where we got disconnected; + * right now, both would mean 'msg' is NULL, but they could be quite different... + * So the semantics from the receiver side of + * GNUNET_MULTICAST_member_to_origin_cancel() are not clear here. Maybe we + * should do something with the flags in this case? + * + * @param cls + * Closure (set from GNUNET_MULTICAST_origin_start). + * @param sender + * Identity of the sender. + * @param req + * Request to the origin. + * @param flags + * Flags for the request. + */ +typedef void +(*GNUNET_MULTICAST_RequestCallback) (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req); + + +/** + * Function called whenever a group member is receiving a message fragment from + * the origin. + * + * If admission to the group is denied, this function is called once with the + * response of the @e origin (as given to GNUNET_MULTICAST_join_decision()) and + * then a second time with NULL to indicate that the connection failed for good. + * + * FIXME: need to distinguish between origin cancelling a message (some fragments + * were sent, then the rest 'discarded') and the case where we got disconnected; + * right now, both would mean 'msg' is NULL, but they could be quite different... + * So the semantics from the receiver side of + * GNUNET_MULTICAST_origin_to_all_cancel() are not clear here. + * + * @param cls + * Closure (set from GNUNET_MULTICAST_member_join()) + * @param msg + * Message from the origin, NULL if the origin shut down + * (or we were kicked out, and we should thus call + * GNUNET_MULTICAST_member_part() next) + */ +typedef void +(*GNUNET_MULTICAST_MessageCallback) (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg); + + +/** + * Opaque handle to a replay request from the multicast service. + */ +struct GNUNET_MULTICAST_ReplayHandle; + + +/** + * Functions with this signature are called whenever the multicast service needs + * a message fragment to be replayed by fragment_id. + * + * Implementations of this function MUST call GNUNET_MULTICAST_replay() ONCE + * (with a message or an error); however, if the origin is destroyed or the + * group is left, the replay handle must no longer be used. + * + * @param cls + * Closure (set from GNUNET_MULTICAST_origin_start() + * or GNUNET_MULTICAST_member_join()). + * @param member_pub_key + * The member requesting replay. + * @param fragment_id + * Which message fragment should be replayed. + * @param flags + * Flags for the replay. + * @param rh + * Handle to pass to message transmit function. + */ +typedef void +(*GNUNET_MULTICAST_ReplayFragmentCallback) (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key, + uint64_t fragment_id, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh); + +/** + * Functions with this signature are called whenever the multicast service needs + * a message fragment to be replayed by message_id and fragment_offset. + * + * Implementations of this function MUST call GNUNET_MULTICAST_replay() ONCE + * (with a message or an error); however, if the origin is destroyed or the + * group is left, the replay handle must no longer be used. + * + * @param cls + * Closure (set from GNUNET_MULTICAST_origin_start() + * or GNUNET_MULTICAST_member_join()). + * @param member_pub_key + * The member requesting replay. + * @param message_id + * Which message should be replayed. + * @param fragment_offset + * Offset of the fragment within of @a message_id to be replayed. + * @param flags + * Flags for the replay. + * @param rh + * Handle to pass to message transmit function. + */ +typedef void +(*GNUNET_MULTICAST_ReplayMessageCallback) (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh); + + +/** + * Possible error codes during replay. + */ +enum GNUNET_MULTICAST_ReplayErrorCode +{ + + /** + * Everything is fine. + */ + GNUNET_MULTICAST_REC_OK = 0, + + /** + * Message fragment not found in the message store. + * + * Either discarded if it is too old, or not arrived yet if this member has + * missed some messages. + */ + GNUNET_MULTICAST_REC_NOT_FOUND = 1, + + /** + * Fragment ID counter was larger than the highest counter this + * replay function has ever encountered; thus it is likely the + * origin never sent it and we're at the HEAD of the multicast + * stream as far as this node is concerned. + * + * FIXME: needed? + */ + GNUNET_MULTICAST_REC_PAST_HEAD = 2, + + /** + * Access is denied to the requested fragment, membership test did not pass. + */ + GNUNET_MULTICAST_REC_ACCESS_DENIED = 3, + + /** + * Internal error (i.e. database error). Try some other peer. + */ + GNUNET_MULTICAST_REC_INTERNAL_ERROR = 4 + +}; + + +/** + * Replay a message fragment for the multicast group. + * + * @param rh + * Replay handle identifying which replay operation was requested. + * @param msg + * Replayed message fragment, NULL if not found / an error occurred. + * @param ec + * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode + * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated. + */ +void +GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, + const struct GNUNET_MessageHeader *msg, + enum GNUNET_MULTICAST_ReplayErrorCode ec); + + +/** + * Indicate the end of the replay session. + * + * Invalidates the replay handle. + * + * @param rh Replay session to end. + */ +void +GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh); + + +/** + * Function called to provide data for a transmission for a replay. + * + * @see GNUNET_MULTICAST_replay2() + */ +typedef int +(*GNUNET_MULTICAST_ReplayTransmitNotify) (void *cls, + size_t *data_size, + void *data); + + +/** + * Replay a message for the multicast group. + * + * @param rh + * Replay handle identifying which replay operation was requested. + * @param notify + * Function to call to get the message. + * @param notify_cls + * Closure for @a notify. + */ +void +GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, + GNUNET_MULTICAST_ReplayTransmitNotify notify, + void *notify_cls); + + +/** + * Start a multicast group. + * + * Peers that issue GNUNET_MULTICAST_member_join() can transmit a join request + * to either an existing group member or to the origin. If the joining is + * approved, the member is cleared for @e replay and will begin to receive + * messages transmitted to the group. If joining is disapproved, the failed + * candidate will be given a response. Members in the group can send messages + * to the origin. + * + * TODO: This function could optionally offer to advertise the origin in the + * P2P overlay network(where?) under the respective public key so that other + * peers can find an alternate PeerId to join it. Higher level protocols may + * however provide other means of solving the problem of the offline host + * (see secushare specs about that) and therefore merely need a way to provide + * a list of possible PeerIds. + * + * @param cfg + * Configuration to use. + * @param priv_key + * ECC key that will be used to sign messages for this + * multicast session; public key is used to identify the multicast group; + * @param max_fragment_id + * Maximum fragment ID already sent to the group. + * 0 for a new group. + * @param join_request_cb + * Function called to approve / disapprove joining of a peer. + * @param replay_frag_cb + * Function that can be called to replay a message fragment. + * @param replay_msg_cb + * Function that can be called to replay a message. + * @param request_cb + * Function called with message fragments from group members. + * @param message_cb + * Function called with the message fragments sent to the + * network by GNUNET_MULTICAST_origin_to_all(). These message fragments + * should be stored for answering replay requests later. + * @param cls + * Closure for the various callbacks that follow. + * + * @return Handle for the origin, NULL on error. + */ +struct GNUNET_MULTICAST_Origin * +GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key, + uint64_t max_fragment_id, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, + GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, + GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, + GNUNET_MULTICAST_RequestCallback request_cb, + GNUNET_MULTICAST_MessageCallback message_cb, + void *cls); + +/** + * Function called to provide data for a transmission from the origin to all + * members. + * + * Note that returning #GNUNET_OK or #GNUNET_SYSERR (but not #GNUNET_NO) + * invalidates the respective transmission handle. + * + * @param cls + * Closure. + * @param[in,out] data_size + * Initially set to the number of bytes available in + * @a data, should be set to the number of bytes written to data. + * @param[out] data + * Where to write the body of the message to give to the + * method. The function must copy at most @a data_size bytes to @a data. + * + * @return #GNUNET_SYSERR on error (fatal, aborts transmission) + * #GNUNET_NO on success, if more data is to be transmitted later. + * Should be used if @a data_size was not big enough to take all the + * data. If 0 is returned in @a data_size the transmission is paused, + * and can be resumed with GNUNET_MULTICAST_origin_to_all_resume(). + * #GNUNET_YES if this completes the transmission (all data supplied) + * @deprecated should move to MQ-style API! + */ +typedef int +(*GNUNET_MULTICAST_OriginTransmitNotify) (void *cls, + size_t *data_size, + void *data); + + +/** + * Handle for a request to send a message to all multicast group members + * (from the origin). + */ +struct GNUNET_MULTICAST_OriginTransmitHandle; + + +/** + * Send a message to the multicast group. + * + * @param origin + * Handle to the multicast group. + * @param message_id + * Application layer ID for the message. Opaque to multicast. + * @param group_generation + * Group generation of the message. Documented in + * struct GNUNET_MULTICAST_MessageHeader. + * @param notify + * Function to call to get the message. + * @param notify_cls + * Closure for @a notify. + * + * @return NULL on error (i.e. request already pending). + * @deprecated should move to MQ-style API! + */ +struct GNUNET_MULTICAST_OriginTransmitHandle * +GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin, + uint64_t message_id, + uint64_t group_generation, + GNUNET_MULTICAST_OriginTransmitNotify notify, + void *notify_cls); + + + +/** + * Resume message transmission to multicast group. + * + * @param th Transmission to cancel. + */ +void +GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th); + + +/** + * Cancel request for message transmission to multicast group. + * + * @param th Transmission to cancel. + */ +void +GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th); + + +/** + * Stop a multicast group. + * + * @param origin Multicast group to stop. + */ +void +GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *origin, + GNUNET_ContinuationCallback stop_cb, + void *stop_cls); + + +/** + * Join a multicast group. + * + * The entity joining is always the local peer. Further information about the + * candidate can be provided in @a join_msg. If the join fails, the + * @a message_cb is invoked with a (failure) response and then with NULL. If + * the join succeeds, outstanding (state) messages and ongoing multicast + * messages will be given to the @a message_cb until the member decides to part + * the group. The @a mem_test_cb and @a replay_cb functions may be called at + * anytime by the multicast service to support relaying messages to other + * members of the group. + * + * @param cfg + * Configuration to use. + * @param group_key + * ECC public key that identifies the group to join. + * @param member_pub_key + * ECC key that identifies the member + * and used to sign requests sent to the origin. + * @param origin + * Peer ID of the origin to send unicast requsets to. If NULL, + * unicast requests are sent back via multiple hops on the reverse path + * of multicast messages. + * @param relay_count + * Number of peers in the @a relays array. + * @param relays + * Peer identities of members of the group, which serve as relays + * and can be used to join the group at. and send the @a join_request to. + * If empty, the @a join_request is sent directly to the @a origin. + * @param join_msg + * Application-dependent join message to be passed to the peer @a origin. + * @param join_request_cb + * Function called to approve / disapprove joining of a peer. + * @param join_decision_cb + * Function called to inform about the join decision. + * @param replay_frag_cb + * Function that can be called to replay message fragments + * this peer already knows from this group. NULL if this + * client is unable to support replay. + * @param replay_msg_cb + * Function that can be called to replay message fragments + * this peer already knows from this group. NULL if this + * client is unable to support replay. + * @param message_cb + * Function to be called for all message fragments we + * receive from the group, excluding those our @a replay_cb + * already has. + * @param cls + * Closure for callbacks. + * + * @return Handle for the member, NULL on error. + */ +struct GNUNET_MULTICAST_Member * +GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_CRYPTO_EddsaPublicKey *group_key, + const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_pub_key, + const struct GNUNET_PeerIdentity *origin, + uint16_t relay_count, + const struct GNUNET_PeerIdentity *relays, + const struct GNUNET_MessageHeader *join_request, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, + GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb, + GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, + GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, + GNUNET_MULTICAST_MessageCallback message_cb, + void *cls); + +/** + * Handle for a replay request. + */ +struct GNUNET_MULTICAST_MemberReplayHandle; + + +/** + * Request a fragment to be replayed by fragment ID. + * + * Useful if messages below the @e max_known_fragment_id given when joining are + * needed and not known to the client. + * + * @param member + * Membership handle. + * @param fragment_id + * ID of a message fragment that this client would like to see replayed. + * @param flags + * Additional flags for the replay request. + * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback + * + * @return Replay request handle, NULL on error. + */ +struct GNUNET_MULTICAST_MemberReplayHandle * +GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member, + uint64_t fragment_id, + uint64_t flags); + + +/** + * Request a message fr to be replayed. + * + * Useful if messages below the @e max_known_fragment_id given when joining are + * needed and not known to the client. + * + * @param member + * Membership handle. + * @param message_id + * ID of the message this client would like to see replayed. + * @param fragment_offset + * Offset of the fragment within the message to replay. + * @param flags + * Additional flags for the replay request. + * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback + * + * @return Replay request handle, NULL on error. + */ +struct GNUNET_MULTICAST_MemberReplayHandle * +GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *member, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags); + + +/** + * Cancel a replay request. + * + * @param rh + * Request to cancel. + */ +void +GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandle *rh); + + +/** + * Part a multicast group. + * + * Disconnects from all group members and invalidates the @a member handle. + * + * An application-dependent part message can be transmitted beforehand using + * #GNUNET_MULTICAST_member_to_origin()) + * + * @param member + * Membership handle. + */ +void +GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *member, + GNUNET_ContinuationCallback part_cb, + void *part_cls); + + +/** + * Function called to provide data for a transmission from a member to the origin. + * + * Note that returning #GNUNET_OK or #GNUNET_SYSERR (but not #GNUNET_NO) + * invalidates the respective transmission handle. + * + * @param cls + * Closure. + * @param[in,out] data_size + * Initially set to the number of bytes available in + * @a data, should be set to the number of bytes written to data. + * @param[out] data + * Where to write the body of the message to give to the + * method. The function must copy at most @a data_size bytes to @a data. + * + * @return #GNUNET_SYSERR on error (fatal, aborts transmission) + * #GNUNET_NO on success, if more data is to be transmitted later. + * Should be used if @a data_size was not big enough to take all the + * data. If 0 is returned in @a data_size the transmission is paused, + * and can be resumed with GNUNET_MULTICAST_member_to_origin_resume(). + * #GNUNET_YES if this completes the transmission (all data supplied) + * @deprecated should move to MQ-style API! + */ +typedef int +(*GNUNET_MULTICAST_MemberTransmitNotify) (void *cls, + size_t *data_size, + void *data); + + +/** + * Handle for a message to be delivered from a member to the origin. + */ +struct GNUNET_MULTICAST_MemberTransmitHandle; + + +/** + * Send a message to the origin of the multicast group. + * + * @param member + * Membership handle. + * @param request_id + * Application layer ID for the request. Opaque to multicast. + * @param notify + * Callback to call to get the message. + * @param notify_cls + * Closure for @a notify. + * + * @return Handle to cancel request, NULL on error (i.e. request already pending). + * @deprecated should move to MQ-style API! + */ +struct GNUNET_MULTICAST_MemberTransmitHandle * +GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member, + uint64_t request_id, + GNUNET_MULTICAST_MemberTransmitNotify notify, + void *notify_cls); + + +/** + * Resume message transmission to origin. + * + * @param th + * Transmission to cancel. + */ +void +GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th); + + +/** + * Cancel request for message transmission to origin. + * + * @param th + * Transmission to cancel. + */ +void +GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th); + + +#if 0 /* keep Emacsens' auto-indent happy */ +{ +#endif +#ifdef __cplusplus +} +#endif + +/* ifndef GNUNET_MULTICAST_SERVICE_H */ +#endif + +/** @} */ /* end of group */ diff --git a/src/multicast/.gitignore b/src/multicast/.gitignore new file mode 100644 index 000000000..a97844e81 --- /dev/null +++ b/src/multicast/.gitignore @@ -0,0 +1,7 @@ +gnunet-service-multicast +gnunet-multicast +test_multicast +test_multicast_multipeer +test_multicast_2peers +test_multicast_multipeer_line +test_multicast_multipeer_star diff --git a/src/multicast/Makefile.am b/src/multicast/Makefile.am new file mode 100644 index 000000000..0e1c77c41 --- /dev/null +++ b/src/multicast/Makefile.am @@ -0,0 +1,75 @@ +# This Makefile.am is in the public domain +AM_CPPFLAGS = -I$(top_srcdir)/src/include \ + $(GNUNET_CPPFLAGS) +pkgcfgdir= $(pkgdatadir)/config.d/ + +libexecdir= $(pkglibdir)/libexec/ + +pkgcfg_DATA = \ + multicast.conf + +if USE_COVERAGE + AM_CFLAGS = -fprofile-arcs -ftest-coverage +endif + +lib_LTLIBRARIES = libgnunetmulticast.la + +libgnunetmulticast_la_SOURCES = \ + multicast_api.c multicast.h +libgnunetmulticast_la_LIBADD = \ + -lgnunetutil \ + $(GN_LIBINTL) $(XLIB) +libgnunetmulticast_la_LDFLAGS = \ + $(GN_LIB_LDFLAGS) $(WINFLAGS) \ + -version-info 0:0:0 + + +bin_PROGRAMS = \ + gnunet-multicast + +libexec_PROGRAMS = \ + gnunet-service-multicast \ + $(EXP_LIBEXEC) + +gnunet_multicast_SOURCES = \ + gnunet-multicast.c +gnunet_multicast_LDADD = \ + -lgnunetutil \ + $(GN_LIBINTL) + +gnunet_service_multicast_SOURCES = \ + gnunet-service-multicast.c +gnunet_service_multicast_LDADD = \ + -lgnunetutil \ + -lgnunetcadet \ + -lgnunetstatistics \ + $(GN_LIBINTL) + +check_PROGRAMS = \ + test_multicast \ + test_multicast_multipeer_star \ + test_multicast_multipeer_line + +if ENABLE_TEST_RUN +AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@}; export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH; unset XDG_DATA_HOME; unset XDG_CONFIG_HOME; +TESTS = $(check_PROGRAMS) +endif + +test_multicast_SOURCES = \ + test_multicast.c +test_multicast_LDADD = \ + libgnunetmulticast.la \ + -lgnunettesting \ + -lgnunetutil +test_multicast_multipeer_star_SOURCES = \ + test_multicast_multipeer.c +test_multicast_multipeer_star_LDADD = \ + libgnunetmulticast.la \ + -lgnunettestbed \ + -lgnunetutil +test_multicast_multipeer_line_SOURCES = \ + test_multicast_multipeer.c +test_multicast_multipeer_line_LDADD = \ + libgnunetmulticast.la \ + -lgnunettestbed \ + -lgnunetutil diff --git a/src/multicast/gnunet-multicast.c b/src/multicast/gnunet-multicast.c new file mode 100644 index 000000000..802600487 --- /dev/null +++ b/src/multicast/gnunet-multicast.c @@ -0,0 +1,79 @@ +/* + This file is part of GNUnet. + Copyright (C) 2013 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later +*/ + +/** + * @file multicast/gnunet-multicast.c + * @brief multicast for writing a tool + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_util_lib.h" +/* #include "gnunet_multicast_service.h" */ + +/** + * Final status code. + */ +static int ret; + +/** + * Main function that will be run by the scheduler. + * + * @param cls closure + * @param args remaining command-line arguments + * @param cfgfile name of the configuration file used (for saving, can be NULL!) + * @param cfg configuration + */ +static void +run (void *cls, char *const *args, const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + /* main code here */ + puts( gettext_noop ("This command doesn't do anything yet.") ); + ret = -1; +} + + +/** + * The main function. + * + * @param argc number of arguments from the command line + * @param argv command line arguments + * @return 0 ok, 1 on error + */ +int +main (int argc, char *const *argv) +{ + static const struct GNUNET_GETOPT_CommandLineOption options[] = { + /* FIMXE: add options here */ + GNUNET_GETOPT_OPTION_END + }; + if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) + return 2; + + ret = (GNUNET_OK == + GNUNET_PROGRAM_run (argc, argv, "gnunet-multicast", + gettext_noop ("This command doesn't do anything yet."), + options, &run, + NULL)) ? ret : 1; + //XXX GNUNET_free ((void*) argv); + return ret; +} + +/* end of gnunet-multicast.c */ diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c new file mode 100644 index 000000000..236a7c8cd --- /dev/null +++ b/src/multicast/gnunet-service-multicast.c @@ -0,0 +1,2236 @@ +/* + This file is part of GNUnet. + Copyright (C) 2009 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later +*/ + +/** + * @file multicast/gnunet-service-multicast.c + * @brief program that does multicast + * @author Christian Grothoff + */ + +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_signatures.h" +#include "gnunet_util_lib.h" +#include "gnunet_signatures.h" +#include "gnunet_applications.h" +#include "gnunet_statistics_service.h" +#include "gnunet_cadet_service.h" +#include "gnunet_multicast_service.h" +#include "multicast.h" + +/** + * Handle to our current configuration. + */ +static const struct GNUNET_CONFIGURATION_Handle *cfg; + +/** + * Service handle. + */ +static struct GNUNET_SERVICE_Handle *service; + +/** + * CADET handle. + */ +static struct GNUNET_CADET_Handle *cadet; + +/** + * Identity of this peer. + */ +static struct GNUNET_PeerIdentity this_peer; + +/** + * Handle to the statistics service. + */ +static struct GNUNET_STATISTICS_Handle *stats; + +/** + * All connected origin clients. + * Group's pub_key_hash -> struct Origin * (uniq) + */ +static struct GNUNET_CONTAINER_MultiHashMap *origins; + +/** + * All connected member clients. + * Group's pub_key_hash -> struct Member * (multi) + */ +static struct GNUNET_CONTAINER_MultiHashMap *members; + +/** + * Connected member clients per group. + * Group's pub_key_hash -> Member's pub_key_hash (uniq) -> struct Member * (uniq) + */ +static struct GNUNET_CONTAINER_MultiHashMap *group_members; + +/** + * Incoming CADET channels with connected children in the tree. + * Group's pub_key_hash -> struct Channel * (multi) + */ +static struct GNUNET_CONTAINER_MultiHashMap *channels_in; + +/** + * Outgoing CADET channels connecting to parents in the tree. + * Group's pub_key_hash -> struct Channel * (multi) + */ +static struct GNUNET_CONTAINER_MultiHashMap *channels_out; + +/** + * Incoming replay requests from CADET. + * Group's pub_key_hash -> + * H(fragment_id, message_id, fragment_offset, flags) -> struct Channel * + */ +static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet; + +/** + * Incoming replay requests from clients. + * Group's pub_key_hash -> + * H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVICE_Client * + */ +static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client; + + +/** + * Join status of a remote peer. + */ +enum JoinStatus +{ + JOIN_REFUSED = -1, + JOIN_NOT_ASKED = 0, + JOIN_WAITING = 1, + JOIN_ADMITTED = 2, +}; + +enum ChannelDirection +{ + DIR_INCOMING = 0, + DIR_OUTGOING = 1, +}; + + +/** + * Context for a CADET channel. + */ +struct Channel +{ + /** + * Group the channel belongs to. + * + * Only set for outgoing channels. + */ + struct Group *group; + + /** + * CADET channel. + */ + struct GNUNET_CADET_Channel *channel; + + // FIXME: not used + /** + * CADET transmission handle. + */ + struct GNUNET_CADET_TransmitHandle *tmit_handle; + + /** + * Public key of the target group. + */ + struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key; + + /** + * Hash of @a group_pub_key. + */ + struct GNUNET_HashCode group_pub_hash; + + /** + * Public key of the joining member. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; + + /** + * Remote peer identity. + */ + struct GNUNET_PeerIdentity peer; + + /** + * Current window size, set by cadet_notify_window_change() + */ + int32_t window_size; + + /** + * Is the connection established? + */ + int8_t is_connected; + + /** + * Is the remote peer admitted to the group? + * @see enum JoinStatus + */ + int8_t join_status; + + /** + * Number of messages waiting to be sent to CADET. + */ + uint8_t msgs_pending; + + /** + * Channel direction. + * @see enum ChannelDirection + */ + uint8_t direction; +}; + + +/** + * List of connected clients. + */ +struct ClientList +{ + struct ClientList *prev; + struct ClientList *next; + struct GNUNET_SERVICE_Client *client; +}; + + +/** + * Client context for an origin or member. + */ +struct Group +{ + struct ClientList *clients_head; + struct ClientList *clients_tail; + + /** + * Public key of the group. + */ + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + + /** + * Hash of @a pub_key. + */ + struct GNUNET_HashCode pub_key_hash; + + /** + * CADET port hash. + */ + struct GNUNET_HashCode cadet_port_hash; + + /** + * Is the client disconnected? #GNUNET_YES or #GNUNET_NO + */ + uint8_t is_disconnected; + + /** + * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)? + */ + uint8_t is_origin; + + union { + struct Origin *origin; + struct Member *member; + }; +}; + + +/** +* Client context for a group's origin. + */ +struct Origin +{ + struct Group group; + + /** + * Private key of the group. + */ + struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; + + /** + * CADET port. + */ + struct GNUNET_CADET_Port *cadet_port; + + /** + * Last message fragment ID sent to the group. + */ + uint64_t max_fragment_id; +}; + + +/** + * Client context for a group member. + */ +struct Member +{ + struct Group group; + + /** + * Private key of the member. + */ + struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key; + + /** + * Public key of the member. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey pub_key; + + /** + * Hash of @a pub_key. + */ + struct GNUNET_HashCode pub_key_hash; + + /** + * Join request sent to the origin / members. + */ + struct MulticastJoinRequestMessage *join_req; + + /** + * Join decision sent in reply to our request. + * + * Only a positive decision is stored here, in case of a negative decision the + * client is disconnected. + */ + struct MulticastJoinDecisionMessageHeader *join_dcsn; + + /** + * CADET channel to the origin. + */ + struct Channel *origin_channel; + + /** + * Peer identity of origin. + */ + struct GNUNET_PeerIdentity origin; + + /** + * Peer identity of relays (other members to connect). + */ + struct GNUNET_PeerIdentity *relays; + + /** + * Last request fragment ID sent to the origin. + */ + uint64_t max_fragment_id; + + /** + * Number of @a relays. + */ + uint32_t relay_count; +}; + + +/** + * Client context. + */ +struct Client { + struct GNUNET_SERVICE_Client *client; + struct Group *group; +}; + + +struct ReplayRequestKey +{ + uint64_t fragment_id; + uint64_t message_id; + uint64_t fragment_offset; + uint64_t flags; +}; + + +static struct Channel * +cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer); + +static void +cadet_channel_destroy (struct Channel *chn); + +static void +client_send_join_decision (struct Member *mem, + const struct MulticastJoinDecisionMessageHeader *hdcsn); + + +/** + * Task run during shutdown. + * + * @param cls unused + */ +static void +shutdown_task (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "shutting down\n"); + if (NULL != cadet) + { + GNUNET_CADET_disconnect (cadet); + cadet = NULL; + } + if (NULL != stats) + { + GNUNET_STATISTICS_destroy (stats, GNUNET_YES); + stats = NULL; + } + /* FIXME: do more clean up here */ +} + + +/** + * Clean up origin data structures after a client disconnected. + */ +static void +cleanup_origin (struct Origin *orig) +{ + struct Group *grp = &orig->group; + GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig); + if (NULL != orig->cadet_port) + { + GNUNET_CADET_close_port (orig->cadet_port); + orig->cadet_port = NULL; + } + GNUNET_free (orig); +} + + +/** + * Clean up member data structures after a client disconnected. + */ +static void +cleanup_member (struct Member *mem) +{ + struct Group *grp = &mem->group; + struct GNUNET_CONTAINER_MultiHashMap * + grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, + &grp->pub_key_hash); + GNUNET_assert (NULL != grp_mem); + GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem); + + if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem)) + { + GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash, + grp_mem); + GNUNET_CONTAINER_multihashmap_destroy (grp_mem); + } + if (NULL != mem->join_dcsn) + { + GNUNET_free (mem->join_dcsn); + mem->join_dcsn = NULL; + } + if (NULL != mem->origin_channel) + { + GNUNET_CADET_channel_destroy (mem->origin_channel->channel); + mem->origin_channel = NULL; + } + GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem); + GNUNET_free (mem); +} + + +/** + * Clean up group data structures after a client disconnected. + */ +static void +cleanup_group (struct Group *grp) +{ + (GNUNET_YES == grp->is_origin) + ? cleanup_origin (grp->origin) + : cleanup_member (grp->member); +} + + +void +replay_key_hash (uint64_t fragment_id, uint64_t message_id, + uint64_t fragment_offset, uint64_t flags, + struct GNUNET_HashCode *key_hash) +{ + struct ReplayRequestKey key = { + .fragment_id = fragment_id, + .message_id = message_id, + .fragment_offset = fragment_offset, + .flags = flags, + }; + GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash); +} + + +/** + * Remove channel from replay request hashmap. + * + * @param chn + * Channel to remove. + * + * @return #GNUNET_YES if there are more entries to process, + * #GNUNET_NO when reached end of hashmap. + */ +static int +replay_req_remove_cadet (struct Channel *chn) +{ + if (NULL == chn || NULL == chn->group) + return GNUNET_SYSERR; + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &chn->group->pub_key_hash); + if (NULL == grp_replay_req) + return GNUNET_NO; + + struct GNUNET_CONTAINER_MultiHashMapIterator * + it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req); + struct GNUNET_HashCode key; + const struct Channel *c; + while (GNUNET_YES + == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key, + (const void **) &c)) + { + if (c == chn) + { + GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn); + GNUNET_CONTAINER_multihashmap_iterator_destroy (it); + return GNUNET_YES; + } + } + GNUNET_CONTAINER_multihashmap_iterator_destroy (it); + return GNUNET_NO; +} + + +/** + * Remove client from replay request hashmap. + * + * @param client + * Client to remove. + * + * @return #GNUNET_YES if there are more entries to process, + * #GNUNET_NO when reached end of hashmap. + */ +static int +replay_req_remove_client (struct Group *grp, struct GNUNET_SERVICE_Client *client) +{ + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL == grp_replay_req) + return GNUNET_NO; + + struct GNUNET_CONTAINER_MultiHashMapIterator * + it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req); + struct GNUNET_HashCode key; + const struct GNUNET_SERVICE_Client *c; + while (GNUNET_YES + == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key, + (const void **) &c)) + { + if (c == client) + { + GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, client); + GNUNET_CONTAINER_multihashmap_iterator_destroy (it); + return GNUNET_YES; + } + } + GNUNET_CONTAINER_multihashmap_iterator_destroy (it); + return GNUNET_NO; +} + + +/** + * Send message to a client. + */ +static void +client_send (struct GNUNET_SERVICE_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Sending message to client.\n", client); + + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_copy (msg); + + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); +} + + +/** + * Send message to all clients connected to the group. + */ +static void +client_send_group_keep_envelope (const struct Group *grp, + struct GNUNET_MQ_Envelope *env) +{ + struct ClientList *cli = grp->clients_head; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Sending message to all clients of the group.\n", + grp); + while (NULL != cli) + { + GNUNET_MQ_send_copy (GNUNET_SERVICE_client_get_mq (cli->client), + env); + cli = cli->next; + } +} + + +/** + * Send message to all clients connected to the group and + * takes care of freeing @env. + */ +static void +client_send_group (const struct Group *grp, + struct GNUNET_MQ_Envelope *env) +{ + client_send_group_keep_envelope (grp, env); + GNUNET_MQ_discard (env); +} + + +/** + * Iterator callback for sending a message to origin clients. + */ +static int +client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *origin) +{ + struct GNUNET_MQ_Envelope *env = cls; + struct Member *orig = origin; + + client_send_group_keep_envelope (&orig->group, env); + return GNUNET_YES; +} + + +/** + * Iterator callback for sending a message to member clients. + */ +static int +client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *member) +{ + struct GNUNET_MQ_Envelope *env = cls; + struct Member *mem = member; + + if (NULL != mem->join_dcsn) + { /* Only send message to admitted members */ + client_send_group_keep_envelope (&mem->group, env); + } + return GNUNET_YES; +} + + +/** + * Send message to all origin and member clients connected to the group. + * + * @param pub_key_hash + * H(key_pub) of the group. + * @param msg + * Message to send. + */ +static int +client_send_all (struct GNUNET_HashCode *pub_key_hash, + struct GNUNET_MQ_Envelope *env) +{ + int n = 0; + n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, + client_send_origin_cb, + (void *) env); + n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash, + client_send_member_cb, + (void *) env); + GNUNET_MQ_discard (env); + return n; +} + + +/** + * Send message to a random origin client or a random member client. + * + * @param grp The group to send @a msg to. + * @param msg Message to send. + */ +static int +client_send_random (struct GNUNET_HashCode *pub_key_hash, + struct GNUNET_MQ_Envelope *env) +{ + int n = 0; + n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb, + (void *) env); + if (n <= 0) + n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb, + (void *) env); + GNUNET_MQ_discard (env); + return n; +} + + +/** + * Send message to all origin clients connected to the group. + * + * @param pub_key_hash + * H(key_pub) of the group. + * @param msg + * Message to send. + */ +static int +client_send_origin (struct GNUNET_HashCode *pub_key_hash, + struct GNUNET_MQ_Envelope *env) +{ + int n = 0; + n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, + client_send_origin_cb, + (void *) env); + return n; +} + + +/** + * Send fragment acknowledgement to all clients of the channel. + * + * @param pub_key_hash + * H(key_pub) of the group. + */ +static void +client_send_ack (struct GNUNET_HashCode *pub_key_hash) +{ + struct GNUNET_MQ_Envelope *env; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Sending message ACK to client.\n"); + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK); + client_send_all (pub_key_hash, env); +} + + +struct CadetTransmitClosure +{ + struct Channel *chn; + const struct GNUNET_MessageHeader *msg; +}; + + +/** + * Send a message to a CADET channel. + * + * @param chn Channel. + * @param msg Message. + */ +static void +cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_copy (msg); + + GNUNET_MQ_send (GNUNET_CADET_get_mq (chn->channel), env); + + if (0 < chn->window_size) + { + client_send_ack (&chn->group_pub_hash); + } + else + { + chn->msgs_pending++; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Queuing message. Pending messages: %u\n", + chn, chn->msgs_pending); + } +} + + +/** + * Create CADET channel and send a join request. + */ +static void +cadet_send_join_request (struct Member *mem) +{ + mem->origin_channel = cadet_channel_create (&mem->group, &mem->origin); + cadet_send_channel (mem->origin_channel, &mem->join_req->header); + + uint32_t i; + for (i = 0; i < mem->relay_count; i++) + { + struct Channel * + chn = cadet_channel_create (&mem->group, &mem->relays[i]); + cadet_send_channel (chn, &mem->join_req->header); + } +} + + +static int +cadet_send_join_decision_cb (void *cls, + const struct GNUNET_HashCode *group_pub_hash, + void *channel) +{ + const struct MulticastJoinDecisionMessageHeader *hdcsn = cls; + struct Channel *chn = channel; + + const struct MulticastJoinDecisionMessage *dcsn = + (struct MulticastJoinDecisionMessage *) &hdcsn[1]; + + if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key)) + && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer))) + { + if (GNUNET_YES == ntohl (dcsn->is_admitted)) + { + chn->join_status = JOIN_ADMITTED; + } + else + { + chn->join_status = JOIN_REFUSED; + } + cadet_send_channel (chn, &hdcsn->header); + return GNUNET_YES; + } + + // return GNUNET_YES to continue the multihashmap_get iteration + return GNUNET_YES; +} + + +/** + * Send join decision to a remote peer. + */ +static void +cadet_send_join_decision (struct Group *grp, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, &grp->pub_key_hash, + &cadet_send_join_decision_cb, + (void *) hdcsn); +} + + +/** + * Iterator callback for sending a message to origin clients. + */ +static int +cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *channel) +{ + const struct GNUNET_MessageHeader *msg = cls; + struct Channel *chn = channel; + if (JOIN_ADMITTED == chn->join_status) + cadet_send_channel (chn, msg); + return GNUNET_YES; +} + + +/** + * Send message to all connected children. + */ +static int +cadet_send_children (struct GNUNET_HashCode *pub_key_hash, + const struct GNUNET_MessageHeader *msg) +{ + int n = 0; + if (channels_in != NULL) + n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash, + cadet_send_cb, (void *) msg); + return n; +} + + +#if 0 // unused as yet +/** + * Send message to all connected parents. + */ +static int +cadet_send_parents (struct GNUNET_HashCode *pub_key_hash, + const struct GNUNET_MessageHeader *msg) +{ + int n = 0; + if (channels_in != NULL) + n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, pub_key_hash, + cadet_send_cb, (void *) msg); + return n; +} +#endif + + +/** + * CADET channel connect handler. + * + * @see GNUNET_CADET_ConnectEventHandler() + */ +static void * +cadet_notify_connect (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *source) +{ + struct Channel *chn = GNUNET_malloc (sizeof (struct Channel)); + chn->group = cls; + chn->channel = channel; + chn->direction = DIR_INCOMING; + chn->join_status = JOIN_NOT_ASKED; + + GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group->pub_key_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + return chn; +} + + +/** + * CADET window size change handler. + * + * @see GNUNET_CADET_WindowSizeEventHandler() + */ +static void +cadet_notify_window_change (void *cls, + const struct GNUNET_CADET_Channel *channel, + int window_size) +{ + struct Channel *chn = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Window size changed to %d. Pending messages: %u\n", + chn, window_size, chn->msgs_pending); + + chn->is_connected = GNUNET_YES; + chn->window_size = (int32_t) window_size; + + for (int i = 0; i < window_size; i++) + { + if (0 < chn->msgs_pending) + { + client_send_ack (&chn->group_pub_hash); + chn->msgs_pending--; + } + else + { + break; + } + } +} + + +/** + * CADET channel disconnect handler. + * + * @see GNUNET_CADET_DisconnectEventHandler() + */ +static void +cadet_notify_disconnect (void *cls, + const struct GNUNET_CADET_Channel *channel) +{ + if (NULL == cls) + return; + + struct Channel *chn = cls; + if (NULL != chn->group) + { + if (GNUNET_NO == chn->group->is_origin) + { + struct Member *mem = (struct Member *) chn->group; + if (chn == mem->origin_channel) + mem->origin_channel = NULL; + } + } + + int ret; + do + { + ret = replay_req_remove_cadet (chn); + } + while (GNUNET_YES == ret); + + GNUNET_free (chn); +} + + +static int +check_cadet_join_request (void *cls, + const struct MulticastJoinRequestMessage *req) +{ + struct Channel *chn = cls; + + if (NULL == chn + || JOIN_NOT_ASKED != chn->join_status) + { + return GNUNET_SYSERR; + } + + uint16_t size = ntohs (req->header.size); + if (size < sizeof (*req)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (ntohl (req->purpose.size) != (size + - sizeof (req->header) + - sizeof (req->reserved) + - sizeof (req->signature))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + GNUNET_CRYPTO_ecdsa_verify_ (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST, + &req->purpose, &req->signature, + &req->member_pub_key)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Incoming join request message from CADET. + */ +static void +handle_cadet_join_request (void *cls, + const struct MulticastJoinRequestMessage *req) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + + struct GNUNET_HashCode group_pub_hash; + GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash); + chn->group_pub_key = req->group_pub_key; + chn->group_pub_hash = group_pub_hash; + chn->member_pub_key = req->member_pub_key; + chn->peer = req->peer; + chn->join_status = JOIN_WAITING; + + client_send_all (&group_pub_hash, + GNUNET_MQ_msg_copy (&req->header)); +} + + +static int +check_cadet_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + uint16_t size = ntohs (hdcsn->header.size); + if (size < sizeof (struct MulticastJoinDecisionMessageHeader) + + sizeof (struct MulticastJoinDecisionMessage)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (NULL == chn->group || GNUNET_NO != chn->group->is_origin) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + switch (chn->join_status) + { + case JOIN_REFUSED: + return GNUNET_SYSERR; + + case JOIN_ADMITTED: + return GNUNET_OK; + + case JOIN_NOT_ASKED: + case JOIN_WAITING: + break; + } + + return GNUNET_OK; +} + + +/** + * Incoming join decision message from CADET. + */ +static void +handle_cadet_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + const struct MulticastJoinDecisionMessage * + dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; + + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + + // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer? + struct Member *mem = (struct Member *) chn->group; + client_send_join_decision (mem, hdcsn); + if (GNUNET_YES == ntohl (dcsn->is_admitted)) + { + chn->join_status = JOIN_ADMITTED; + } + else + { + chn->join_status = JOIN_REFUSED; + cadet_channel_destroy (chn); + } +} + + +static int +check_cadet_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + uint16_t size = ntohs (msg->header.size); + if (size < sizeof (*msg)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (ntohl (msg->purpose.size) != (size + - sizeof (msg->header) + - sizeof (msg->hop_counter) + - sizeof (msg->signature))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + GNUNET_CRYPTO_eddsa_verify_ (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE, + &msg->purpose, &msg->signature, + &chn->group_pub_key)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Incoming multicast message from CADET. + */ +static void +handle_cadet_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + client_send_all (&chn->group_pub_hash, + GNUNET_MQ_msg_copy (&msg->header)); +} + + +static int +check_cadet_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + uint16_t size = ntohs (req->header.size); + if (size < sizeof (*req)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (ntohl (req->purpose.size) != (size + - sizeof (req->header) + - sizeof (req->member_pub_key) + - sizeof (req->signature))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + GNUNET_CRYPTO_ecdsa_verify_ (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST, + &req->purpose, &req->signature, + &req->member_pub_key)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Incoming multicast request message from CADET. + */ +static void +handle_cadet_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + client_send_origin (&chn->group_pub_hash, + GNUNET_MQ_msg_copy (&req->header)); +} + + +// FIXME: do checks in handle_cadet_replay_request +//static int +//check_cadet_replay_request (void *cls, +// const struct MulticastReplayRequestMessage *req) +//{ +// uint16_t size = ntohs (req->header.size); +// if (size < sizeof (*req)) +// { +// GNUNET_break_op (0); +// return GNUNET_SYSERR; +// } +// +// struct Channel *chn = cls; +// if (NULL == chn) +// { +// GNUNET_break_op (0); +// return GNUNET_SYSERR; +// } +// +// return GNUNET_OK; +//} + + +/** + * Incoming multicast replay request from CADET. + */ +static void +handle_cadet_replay_request (void *cls, + const struct MulticastReplayRequestMessage *req) +{ + struct Channel *chn = cls; + + GNUNET_CADET_receive_done (chn->channel); + + struct MulticastReplayRequestMessage rep = *req; + GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key)); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &chn->group->pub_key_hash); + if (NULL == grp_replay_req) + { + grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put (replay_req_cadet, + &chn->group->pub_key_hash, grp_replay_req, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + struct GNUNET_HashCode key_hash; + replay_key_hash (rep.fragment_id, + rep.message_id, + rep.fragment_offset, + rep.flags, + &key_hash); + GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + client_send_random (&chn->group_pub_hash, + GNUNET_MQ_msg_copy (&rep.header)); +} + + +static int +check_cadet_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Incoming multicast replay response from CADET. + */ +static void +handle_cadet_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + + /* @todo FIXME: got replay error response, send request to other members */ +} + + +static void +group_set_cadet_port_hash (struct Group *grp) +{ + struct CadetPort { + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + uint32_t app_type; + } port = { + grp->pub_key, + GNUNET_APPLICATION_TYPE_MULTICAST, + }; + GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash); +} + + + +/** + * Create new outgoing CADET channel. + * + * @param peer + * Peer to connect to. + * @param group_pub_key + * Public key of group the channel belongs to. + * @param group_pub_hash + * Hash of @a group_pub_key. + * + * @return Channel. + */ +static struct Channel * +cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer) +{ + struct Channel *chn = GNUNET_malloc (sizeof (*chn)); + chn->group = grp; + chn->group_pub_key = grp->pub_key; + chn->group_pub_hash = grp->pub_key_hash; + chn->peer = *peer; + chn->direction = DIR_OUTGOING; + chn->is_connected = GNUNET_NO; + chn->join_status = JOIN_WAITING; + + struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (cadet_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + chn), + + GNUNET_MQ_hd_var_size (cadet_join_decision, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, + struct MulticastJoinDecisionMessageHeader, + chn), + + GNUNET_MQ_hd_fixed_size (cadet_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + chn), + + GNUNET_MQ_hd_var_size (cadet_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + chn), + + GNUNET_MQ_handler_end () + }; + + chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer, + &grp->cadet_port_hash, + cadet_notify_window_change, + cadet_notify_disconnect, + cadet_handlers); + GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + return chn; +} + + +/** + * Destroy outgoing CADET channel. + */ +static void +cadet_channel_destroy (struct Channel *chn) +{ + GNUNET_CADET_channel_destroy (chn->channel); + GNUNET_CONTAINER_multihashmap_remove_all (channels_out, &chn->group_pub_hash); + GNUNET_free (chn); +} + +/** + * Handle a connecting client starting an origin. + */ +static void +handle_client_origin_start (void *cls, + const struct MulticastOriginStartMessage *msg) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + struct GNUNET_HashCode pub_key_hash; + + GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key); + GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash); + + struct Origin * + orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash); + struct Group *grp; + + if (NULL == orig) + { + orig = GNUNET_new (struct Origin); + orig->priv_key = msg->group_key; + orig->max_fragment_id = GNUNET_ntohll (msg->max_fragment_id); + + grp = c->group = &orig->group; + grp->origin = orig; + grp->is_origin = GNUNET_YES; + grp->pub_key = pub_key; + grp->pub_key_hash = pub_key_hash; + grp->is_disconnected = GNUNET_NO; + + GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + + group_set_cadet_port_hash (grp); + + struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (cadet_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + grp), + + GNUNET_MQ_hd_var_size (cadet_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, + struct GNUNET_MULTICAST_RequestHeader, + grp), + + GNUNET_MQ_hd_var_size (cadet_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + struct MulticastJoinRequestMessage, + grp), + + GNUNET_MQ_hd_fixed_size (cadet_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), + + GNUNET_MQ_hd_var_size (cadet_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + grp), + + GNUNET_MQ_handler_end () + }; + + + orig->cadet_port = GNUNET_CADET_open_port (cadet, + &grp->cadet_port_hash, + cadet_notify_connect, + grp, + cadet_notify_window_change, + cadet_notify_disconnect, + cadet_handlers); + } + else + { + grp = &orig->group; + } + + struct ClientList *cl = GNUNET_new (struct ClientList); + cl->client = client; + GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Client connected as origin to group %s.\n", + orig, GNUNET_h2s (&grp->pub_key_hash)); + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_member_join (void *cls, + const struct MulticastMemberJoinMessage *msg) +{ + uint16_t msg_size = ntohs (msg->header.size); + struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1]; + uint32_t relay_count = ntohl (msg->relay_count); + + if (0 != relay_count) + { + if (UINT32_MAX / relay_count < sizeof (*relays)){ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "relay_count (%lu) * sizeof (*relays) (%lu) exceeds UINT32_MAX!\n", + (unsigned long)relay_count, + sizeof (*relays)); + return GNUNET_SYSERR; + } + } + uint32_t relay_size = relay_count * sizeof (*relays); + struct GNUNET_MessageHeader *join_msg = NULL; + uint16_t join_msg_size = 0; + if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader) + <= msg_size) + { + join_msg = (struct GNUNET_MessageHeader *) + (((char *) &msg[1]) + relay_size); + join_msg_size = ntohs (join_msg->size); + if (UINT16_MAX - join_msg_size < sizeof (struct MulticastJoinRequestMessage)){ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "join_msg_size (%u) + sizeof (struct MulticastJoinRequestMessage) (%lu) exceeds UINT16_MAX!\n", + (unsigned)join_msg_size, + (unsigned long)sizeof (struct MulticastJoinRequestMessage)); + return GNUNET_SYSERR; + } + } + if (msg_size != (sizeof (*msg) + relay_size + join_msg_size)){ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "msg_size does not match real size of message!\n"); + return GNUNET_SYSERR; + }else{ + return GNUNET_OK; + } +} + + +/** + * Handle a connecting client joining a group. + */ +static void +handle_client_member_join (void *cls, + const struct MulticastMemberJoinMessage *msg) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + + uint16_t msg_size = ntohs (msg->header.size); + + struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key; + struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash; + + GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key); + GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash); + GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash); + struct Member *mem = NULL; + struct Group *grp; + + if (NULL != grp_mem) + { + mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash); + } + + if (NULL == mem) + { + mem = GNUNET_new (struct Member); + mem->origin = msg->origin; + mem->priv_key = msg->member_key; + mem->pub_key = mem_pub_key; + mem->pub_key_hash = mem_pub_key_hash; + mem->max_fragment_id = 0; // FIXME + + grp = c->group = &mem->group; + grp->member = mem; + grp->is_origin = GNUNET_NO; + grp->pub_key = msg->group_pub_key; + grp->pub_key_hash = pub_key_hash; + grp->is_disconnected = GNUNET_NO; + group_set_cadet_port_hash (grp); + + if (NULL == grp_mem) + { + grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + GNUNET_CONTAINER_multihashmap_put (group_members, &grp->pub_key_hash, grp_mem, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + + // FIXME: should the members hash map have option UNIQUE_FAST? + GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + } + else + { + grp = &mem->group; + } + + struct ClientList *cl = GNUNET_new (struct ClientList); + cl->client = client; + GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl); + + char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client connected to group %s as member %s (%s). size = %d\n", + GNUNET_h2s (&grp->pub_key_hash), + GNUNET_h2s2 (&mem->pub_key_hash), + str, + GNUNET_CONTAINER_multihashmap_size (members)); + GNUNET_free (str); + + if (NULL != mem->join_dcsn) + { /* Already got a join decision, send it to client. */ + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_copy (&mem->join_dcsn->header); + + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); + } + else + { /* First client of the group, send join request. */ + struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1]; + uint32_t relay_count = ntohl (msg->relay_count); + uint16_t relay_size = relay_count * sizeof (*relays); + struct GNUNET_MessageHeader *join_msg = NULL; + uint16_t join_msg_size = 0; + if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader) + <= msg_size) + { + join_msg = (struct GNUNET_MessageHeader *) + (((char *) &msg[1]) + relay_size); + join_msg_size = ntohs (join_msg->size); + } + + uint16_t req_msg_size = sizeof (struct MulticastJoinRequestMessage) + join_msg_size; + struct MulticastJoinRequestMessage * + req = GNUNET_malloc (req_msg_size); + req->header.size = htons (req_msg_size); + req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST); + req->group_pub_key = grp->pub_key; + req->peer = this_peer; + GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_pub_key); + if (0 < join_msg_size) + GNUNET_memcpy (&req[1], join_msg, join_msg_size); + + req->member_pub_key = mem->pub_key; + req->purpose.size = htonl (req_msg_size + - sizeof (req->header) + - sizeof (req->reserved) + - sizeof (req->signature)); + req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST); + + if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign_ (&mem->priv_key, &req->purpose, + &req->signature)) + { + /* FIXME: handle error */ + GNUNET_assert (0); + } + + if (NULL != mem->join_req) + GNUNET_free (mem->join_req); + mem->join_req = req; + + if (0 == + client_send_origin (&grp->pub_key_hash, + GNUNET_MQ_msg_copy (&mem->join_req->header))) + { /* No local origins, send to remote origin */ + cadet_send_join_request (mem); + } + } + GNUNET_SERVICE_client_continue (client); +} + + +static void +client_send_join_decision (struct Member *mem, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + client_send_group (&mem->group, GNUNET_MQ_msg_copy (&hdcsn->header)); + + const struct MulticastJoinDecisionMessage * + dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; + if (GNUNET_YES == ntohl (dcsn->is_admitted)) + { /* Member admitted, store join_decision. */ + uint16_t dcsn_size = ntohs (dcsn->header.size); + mem->join_dcsn = GNUNET_malloc (dcsn_size); + GNUNET_memcpy (mem->join_dcsn, dcsn, dcsn_size); + } + else + { /* Refused entry, but replay would be still possible for past members. */ + } +} + + +static int +check_client_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + return GNUNET_OK; +} + + +/** + * Join decision from client. + */ +static void +handle_client_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p got join decision from client for group %s..\n", + grp, GNUNET_h2s (&grp->pub_key_hash)); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, + &grp->pub_key_hash); + struct Member *mem = NULL; + if (NULL != grp_mem) + { + struct GNUNET_HashCode member_key_hash; + GNUNET_CRYPTO_hash (&hdcsn->member_pub_key, sizeof (hdcsn->member_pub_key), + &member_key_hash); + mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p ..and member %s: %p\n", + grp, GNUNET_h2s (&member_key_hash), mem); + } + + if (NULL != mem) + { /* Found local member */ + client_send_join_decision (mem, hdcsn); + } + else + { /* Look for remote member */ + cadet_send_join_decision (grp, hdcsn); + } + GNUNET_SERVICE_client_continue (client); +} + + +static void +handle_client_part_request (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + struct GNUNET_MQ_Envelope *env; + + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p got part request from client for group %s.\n", + grp, GNUNET_h2s (&grp->pub_key_hash)); + grp->is_disconnected = GNUNET_YES; + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK); + client_send_group (grp, env); + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_multicast_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + return GNUNET_OK; +} + + +/** + * Incoming message from a client. + */ +static void +handle_client_multicast_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + // FIXME: what if GNUNET_YES == grp->is_disconnected? Do we allow sending messages? + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_assert (GNUNET_YES == grp->is_origin); + struct Origin *orig = grp->origin; + + // FIXME: use GNUNET_MQ_msg_copy + /* FIXME: yucky, should use separate message structs for P2P and CS! */ + struct GNUNET_MULTICAST_MessageHeader * + out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header); + out->fragment_id = GNUNET_htonll (++orig->max_fragment_id); + out->purpose.size = htonl (ntohs (out->header.size) + - sizeof (out->header) + - sizeof (out->hop_counter) + - sizeof (out->signature)); + out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE); + + if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign_ (&orig->priv_key, &out->purpose, + &out->signature)) + { + GNUNET_assert (0); + } + + client_send_all (&grp->pub_key_hash, GNUNET_MQ_msg_copy (&out->header)); + cadet_send_children (&grp->pub_key_hash, &out->header); + client_send_ack (&grp->pub_key_hash); + GNUNET_free (out); + + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_multicast_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + return GNUNET_OK; +} + + +/** + * Incoming request from a client. + */ +static void +handle_client_multicast_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); + GNUNET_assert (GNUNET_NO == grp->is_origin); + struct Member *mem = grp->member; + + /* FIXME: yucky, should use separate message structs for P2P and CS! */ + struct GNUNET_MULTICAST_RequestHeader * + out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (&req->header); + out->member_pub_key = mem->pub_key; + out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id); + out->purpose.size = htonl (ntohs (out->header.size) + - sizeof (out->header) + - sizeof (out->member_pub_key) + - sizeof (out->signature)); + out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST); + + if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign_ (&mem->priv_key, &out->purpose, + &out->signature)) + { + GNUNET_assert (0); + } + + uint8_t send_ack = GNUNET_YES; + if (0 == + client_send_origin (&grp->pub_key_hash, + GNUNET_MQ_msg_copy (&out->header))) + { /* No local origins, send to remote origin */ + if (NULL != mem->origin_channel) + { + cadet_send_channel (mem->origin_channel, &out->header); + send_ack = GNUNET_NO; + } + else + { + /* FIXME: not yet connected to origin */ + GNUNET_SERVICE_client_drop (client); + GNUNET_free (out); + return; + } + } + if (GNUNET_YES == send_ack) + { + client_send_ack (&grp->pub_key_hash); + } + GNUNET_free (out); + GNUNET_SERVICE_client_continue (client); +} + + +/** + * Incoming replay request from a client. + */ +static void +handle_client_replay_request (void *cls, + const struct MulticastReplayRequestMessage *rep) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); + GNUNET_assert (GNUNET_NO == grp->is_origin); + struct Member *mem = grp->member; + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL == grp_replay_req) + { + grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put (replay_req_client, + &grp->pub_key_hash, grp_replay_req, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + + struct GNUNET_HashCode key_hash; + replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset, + rep->flags, &key_hash); + GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + if (0 == + client_send_origin (&grp->pub_key_hash, + GNUNET_MQ_msg_copy (&rep->header))) + { /* No local origin, replay from remote members / origin. */ + if (NULL != mem->origin_channel) + { + cadet_send_channel (mem->origin_channel, &rep->header); + } + else + { + /* FIXME: not yet connected to origin */ + + GNUNET_assert (0); + GNUNET_SERVICE_client_drop (client); + return; + } + } + GNUNET_SERVICE_client_continue (client); +} + + +static int +cadet_send_replay_response_cb (void *cls, + const struct GNUNET_HashCode *key_hash, + void *value) +{ + struct Channel *chn = value; + struct GNUNET_MessageHeader *msg = cls; + + cadet_send_channel (chn, msg); + return GNUNET_OK; +} + + +static int +client_send_replay_response_cb (void *cls, + const struct GNUNET_HashCode *key_hash, + void *value) +{ + struct GNUNET_SERVICE_Client *client = value; + struct GNUNET_MessageHeader *msg = cls; + + client_send (client, msg); + return GNUNET_OK; +} + + +static int +check_client_replay_response_end (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + return GNUNET_OK; +} + + +/** + * End of replay response from a client. + */ +static void +handle_client_replay_response_end (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); + + struct GNUNET_HashCode key_hash; + replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset, + res->flags, &key_hash); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &grp->pub_key_hash); + if (NULL != grp_replay_req_cadet) + { + GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash); + } + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL != grp_replay_req_client) + { + GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash); + } + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + const struct GNUNET_MessageHeader *msg; + if (GNUNET_MULTICAST_REC_OK == res->error_code) + { + msg = GNUNET_MQ_extract_nested_mh (res); + if (NULL == msg) + { + return GNUNET_SYSERR; + } + } + return GNUNET_OK; +} + + +/** + * Incoming replay response from a client. + * + * Respond with a multicast message on success, or otherwise with an error code. + */ +static void +handle_client_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); + + const struct GNUNET_MessageHeader *msg = &res->header; + if (GNUNET_MULTICAST_REC_OK == res->error_code) + { + msg = GNUNET_MQ_extract_nested_mh (res); + } + + struct GNUNET_HashCode key_hash; + replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset, + res->flags, &key_hash); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &grp->pub_key_hash); + if (NULL != grp_replay_req_cadet) + { + GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash, + cadet_send_replay_response_cb, + (void *) msg); + } + if (GNUNET_MULTICAST_REC_OK == res->error_code) + { + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL != grp_replay_req_client) + { + GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash, + client_send_replay_response_cb, + (void *) msg); + } + } + else + { + handle_client_replay_response_end (c, res); + return; + } + GNUNET_SERVICE_client_continue (client); +} + + +/** + * A new client connected. + * + * @param cls NULL + * @param client client to add + * @param mq message queue for @a client + * @return @a client + */ +static void * +client_notify_connect (void *cls, + struct GNUNET_SERVICE_Client *client, + struct GNUNET_MQ_Handle *mq) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client); + /* FIXME: send connect ACK */ + + struct Client *c = GNUNET_new (struct Client); + c->client = client; + + return c; +} + + +/** + * Called whenever a client is disconnected. + * Frees our resources associated with that client. + * + * @param cls closure + * @param client identification of the client + * @param app_ctx must match @a client + */ +static void +client_notify_disconnect (void *cls, + struct GNUNET_SERVICE_Client *client, + void *app_ctx) +{ + struct Client *c = app_ctx; + struct Group *grp = c->group; + GNUNET_free (c); + + if (NULL == grp) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p User context is NULL in client_disconnect()\n", grp); + GNUNET_break (0); + return; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Client (%s) disconnected from group %s\n", + grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member", + GNUNET_h2s (&grp->pub_key_hash)); + + // FIXME (due to protocol change): here we must not remove all clients, + // only the one we were notified about! + struct ClientList *cl = grp->clients_head; + while (NULL != cl) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "iterating clients for group %p\n", + grp); + if (cl->client == client) + { + GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl); + GNUNET_free (cl); + break; + } + cl = cl->next; + } + + while (GNUNET_YES == replay_req_remove_client (grp, client)); + + if (NULL == grp->clients_head) + { /* Last client disconnected. */ + cleanup_group (grp); + } +} + + +/** + * Service started. + * + * @param cls closure + * @param server the initialized server + * @param cfg configuration to use + */ +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *c, + struct GNUNET_SERVICE_Handle *svc) +{ + cfg = c; + service = svc; + GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer); + + stats = GNUNET_STATISTICS_create ("multicast", cfg); + origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + + cadet = GNUNET_CADET_connect (cfg); + + GNUNET_assert (NULL != cadet); + + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); +} + + +/** + * Define "main" method using service macro. + */ +GNUNET_SERVICE_MAIN +("multicast", + GNUNET_SERVICE_OPTION_NONE, + &run, + &client_notify_connect, + &client_notify_disconnect, + NULL, + GNUNET_MQ_hd_fixed_size (client_origin_start, + GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, + struct MulticastOriginStartMessage, + NULL), + GNUNET_MQ_hd_var_size (client_member_join, + GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, + struct MulticastMemberJoinMessage, + NULL), + GNUNET_MQ_hd_var_size (client_join_decision, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, + struct MulticastJoinDecisionMessageHeader, + NULL), + GNUNET_MQ_hd_fixed_size (client_part_request, + GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (client_multicast_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (client_multicast_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, + struct GNUNET_MULTICAST_RequestHeader, + NULL), + GNUNET_MQ_hd_fixed_size (client_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + NULL), + GNUNET_MQ_hd_var_size (client_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + NULL), + GNUNET_MQ_hd_var_size (client_replay_response_end, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END, + struct MulticastReplayResponseMessage, + NULL)); + +/* end of gnunet-service-multicast.c */ diff --git a/src/multicast/multicast.conf.in b/src/multicast/multicast.conf.in new file mode 100644 index 000000000..df255e26d --- /dev/null +++ b/src/multicast/multicast.conf.in @@ -0,0 +1,21 @@ +[multicast] +START_ON_DEMAND = @START_ON_DEMAND@ +BINARY = gnunet-service-multicast + +UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-multicast.sock +UNIX_MATCH_UID = YES +UNIX_MATCH_GID = YES + +HOSTNAME = localhost +ACCEPT_FROM = 127.0.0.1; +ACCEPT_FROM6 = ::1; + +# DISABLE_SOCKET_FORWARDING = NO +# USERNAME = +# MAXBUF = +# TIMEOUT = +# DISABLEV6 = +# BINDTO = +# REJECT_FROM = +# REJECT_FROM6 = +# PREFIX = diff --git a/src/multicast/multicast.h b/src/multicast/multicast.h new file mode 100644 index 000000000..8a3ca14c8 --- /dev/null +++ b/src/multicast/multicast.h @@ -0,0 +1,303 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012, 2013 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later +*/ + +/** + * @file multicast/multicast.h + * @brief multicast IPC messages + * @author Christian Grothoff + * @author Gabor X Toth + */ +#ifndef MULTICAST_H +#define MULTICAST_H + +#include "platform.h" +#include "gnunet_multicast_service.h" + +GNUNET_NETWORK_STRUCT_BEGIN + + +/** + * Header of a join request sent to the origin or another member. + */ +struct MulticastJoinRequestMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST + */ + struct GNUNET_MessageHeader header; + + /** + * Always zero. + */ + uint32_t reserved; + + /** + * ECC signature of the rest of the fields of the join request. + * + * Signature must match the public key of the joining member. + */ + struct GNUNET_CRYPTO_EcdsaSignature signature; + + /** + * Purpose for the signature and size of the signed data. + */ + struct GNUNET_CRYPTO_EccSignaturePurpose purpose; + + /** + * Public key of the target group. + */ + struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key; + + /** + * Public key of the joining member. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; + + /** + * Peer identity of the joining member. + */ + struct GNUNET_PeerIdentity peer; + + /* Followed by struct GNUNET_MessageHeader join_message */ +}; + + +/** + * Header of a join decision message sent to a peer requesting join. + */ +struct MulticastJoinDecisionMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION + */ + struct GNUNET_MessageHeader header; + + /** + * #GNUNET_YES if the peer was admitted + * #GNUNET_NO if entry was refused, + * #GNUNET_SYSERR if the request could not be answered. + */ + int32_t is_admitted; + + /** + * Number of relays given. + */ + uint32_t relay_count; + + /* Followed by relay_count peer identities */ + + /* Followed by the join response message */ +}; + + +/** + * Header added to a struct MulticastJoinDecisionMessage + * when sent between the client and service. + */ +struct MulticastJoinDecisionMessageHeader +{ + /** + * Type: GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION + */ + struct GNUNET_MessageHeader header; + + /** + * C->S: Peer to send the join decision to. + * S->C: Peer we received the join decision from. + */ + struct GNUNET_PeerIdentity peer; + + /** + * C->S: Public key of the member requesting join. + * S->C: Unused. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; + + /* Followed by struct MulticastJoinDecisionMessage */ +}; + + +/** + * Message sent from the client to the service to notify the service + * about the result of a membership test. + */ +struct MulticastMembershipTestResultMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_MULTICAST_MEMBERSHIP_TEST_RESULT + */ + struct GNUNET_MessageHeader header; + + /** + * Unique ID that identifies the associated membership test. + */ + uint32_t uid; + + /** + * #GNUNET_YES if the peer is a member + * #GNUNET_NO if peer is not a member, + * #GNUNET_SYSERR if the test could not be answered. + */ + int32_t is_admitted; +}; + + +/** + * Message sent from the client to the service OR the service to the + * client asking for a message fragment to be replayed. + */ +struct MulticastReplayRequestMessage +{ + + /** + * The message type should be + * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST. + */ + struct GNUNET_MessageHeader header; + + /** + * S->C: Public key of the member requesting replay. + * C->S: Unused. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; + + /** + * ID of the message that is being requested. + */ + uint64_t fragment_id; + + /** + * ID of the message that is being requested. + */ + uint64_t message_id; + + /** + * Offset of the fragment that is being requested. + */ + uint64_t fragment_offset; + + /** + * Additional flags for the request. + */ + uint64_t flags; + + /** + * Replay request ID. + */ + uint32_t uid; +}; + + +/** + * Message sent from the client to the service to give the service + * a replayed message. + */ +struct MulticastReplayResponseMessage +{ + + /** + * Type: GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE + * or GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the message that is being requested. + */ + uint64_t fragment_id; + + /** + * ID of the message that is being requested. + */ + uint64_t message_id; + + /** + * Offset of the fragment that is being requested. + */ + uint64_t fragment_offset; + + /** + * Additional flags for the request. + */ + uint64_t flags; + + /** + * An `enum GNUNET_MULTICAST_ReplayErrorCode` identifying issues (in NBO). + */ + int32_t error_code; + + /* followed by replayed message */ +}; + + +/** + * Message sent from the client to the service to notify the service + * about the starting of a multicast group with this peers as its origin. + */ +struct MulticastOriginStartMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START + */ + struct GNUNET_MessageHeader header; + + /** + * Always zero. + */ + uint32_t reserved; + + /** + * Private, non-ephemeral key for the multicast group. + */ + struct GNUNET_CRYPTO_EddsaPrivateKey group_key; + + /** + * Last fragment ID sent to the group, used to continue counting fragments if + * we resume operating * a group. + */ + uint64_t max_fragment_id; +}; + + +struct MulticastMemberJoinMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN + */ + struct GNUNET_MessageHeader header; + + uint32_t relay_count GNUNET_PACKED; + + struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key; + + struct GNUNET_CRYPTO_EcdsaPrivateKey member_key; + + struct GNUNET_PeerIdentity origin; + + /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */ + + /* Followed by struct GNUNET_MessageHeader join_msg */ +}; + + +GNUNET_NETWORK_STRUCT_END + +#endif +/* end of multicast.h */ diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c new file mode 100644 index 000000000..e5e830225 --- /dev/null +++ b/src/multicast/multicast_api.c @@ -0,0 +1,1399 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012, 2013 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later +*/ + +/** + * @file multicast/multicast_api.c + * @brief Multicast service; implements multicast groups using CADET connections. + * @author Christian Grothoff + * @author Gabor X Toth + */ + +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_multicast_service.h" +#include "multicast.h" + +#define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__) + + +/** + * Handle for a request to send a message to all multicast group members + * (from the origin). + */ +struct GNUNET_MULTICAST_OriginTransmitHandle +{ + GNUNET_MULTICAST_OriginTransmitNotify notify; + void *notify_cls; + struct GNUNET_MULTICAST_Origin *origin; + + uint64_t message_id; + uint64_t group_generation; + uint64_t fragment_offset; +}; + + +/** + * Handle for a message to be delivered from a member to the origin. + */ +struct GNUNET_MULTICAST_MemberTransmitHandle +{ + GNUNET_MULTICAST_MemberTransmitNotify notify; + void *notify_cls; + struct GNUNET_MULTICAST_Member *member; + + uint64_t request_id; + uint64_t fragment_offset; +}; + + +struct GNUNET_MULTICAST_Group +{ + /** + * Configuration to use. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Client connection to the service. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Message to send on connect. + */ + struct GNUNET_MQ_Envelope *connect_env; + + /** + * Time to wait until we try to reconnect on failure. + */ + struct GNUNET_TIME_Relative reconnect_delay; + + /** + * Task for reconnecting when the listener fails. + */ + struct GNUNET_SCHEDULER_Task *reconnect_task; + + GNUNET_MULTICAST_JoinRequestCallback join_req_cb; + GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; + GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb; + GNUNET_MULTICAST_MessageCallback message_cb; + void *cb_cls; + + /** + * Function called after disconnected from the service. + */ + GNUNET_ContinuationCallback disconnect_cb; + + /** + * Closure for @a disconnect_cb. + */ + void *disconnect_cls; + + /** + * Are we currently transmitting a message? + */ + uint8_t in_transmit; + + /** + * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for. + */ + uint8_t acks_pending; + + /** + * Is this the origin or a member? + */ + uint8_t is_origin; + + /** + * Is this channel in the process of disconnecting from the service? + * #GNUNET_YES or #GNUNET_NO + */ + uint8_t is_disconnecting; +}; + + +/** + * Handle for the origin of a multicast group. + */ +struct GNUNET_MULTICAST_Origin +{ + struct GNUNET_MULTICAST_Group grp; + struct GNUNET_MULTICAST_OriginTransmitHandle tmit; + + GNUNET_MULTICAST_RequestCallback request_cb; +}; + + +/** + * Handle for a multicast group member. + */ +struct GNUNET_MULTICAST_Member +{ + struct GNUNET_MULTICAST_Group grp; + struct GNUNET_MULTICAST_MemberTransmitHandle tmit; + + GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb; + + /** + * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle * + */ + struct GNUNET_CONTAINER_MultiHashMap *replay_reqs; + + uint64_t next_fragment_id; +}; + + +/** + * Handle that identifies a join request. + * + * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the + * corresponding calls to #GNUNET_MULTICAST_join_decision(). + */ +struct GNUNET_MULTICAST_JoinHandle +{ + struct GNUNET_MULTICAST_Group *group; + + /** + * Public key of the member requesting join. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; + + /** + * Peer identity of the member requesting join. + */ + struct GNUNET_PeerIdentity peer; +}; + + +/** + * Opaque handle to a replay request from the multicast service. + */ +struct GNUNET_MULTICAST_ReplayHandle +{ + struct GNUNET_MULTICAST_Group *grp; + struct MulticastReplayRequestMessage req; +}; + + +/** + * Handle for a replay request. + */ +struct GNUNET_MULTICAST_MemberReplayHandle +{ +}; + + +static void +origin_to_all (struct GNUNET_MULTICAST_Origin *orig); + +static void +member_to_origin (struct GNUNET_MULTICAST_Member *mem); + + +/** + * Check join request message. + */ +static int +check_group_join_request (void *cls, + const struct MulticastJoinRequestMessage *jreq) +{ + uint16_t size = ntohs (jreq->header.size); + + if (sizeof (*jreq) == size) + return GNUNET_OK; + + if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size) + return GNUNET_OK; + + return GNUNET_SYSERR; +} + + +/** + * Receive join request from service. + */ +static void +handle_group_join_request (void *cls, + const struct MulticastJoinRequestMessage *jreq) +{ + struct GNUNET_MULTICAST_Group *grp = cls; + struct GNUNET_MULTICAST_JoinHandle *jh; + const struct GNUNET_MessageHeader *jmsg = NULL; + + if (NULL == grp) + { + GNUNET_break (0); + return; + } + if (NULL == grp->join_req_cb) + return; + + if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size)) + jmsg = (const struct GNUNET_MessageHeader *) &jreq[1]; + + jh = GNUNET_malloc (sizeof (*jh)); + jh->group = grp; + jh->member_pub_key = jreq->member_pub_key; + jh->peer = jreq->peer; + grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +/** + * Check multicast message. + */ +static int +check_group_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) +{ + return GNUNET_OK; +} + + +/** + * Receive multicast message from service. + */ +static void +handle_group_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) +{ + struct GNUNET_MULTICAST_Group *grp = cls; + + if (GNUNET_YES == grp->is_disconnecting) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calling message callback with a message of size %u.\n", + ntohs (mmsg->header.size)); + + if (NULL != grp->message_cb) + grp->message_cb (grp->cb_cls, mmsg); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +/** + * Receive message/request fragment acknowledgement from service. + */ +static void +handle_group_fragment_ack (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_MULTICAST_Group *grp = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n", + grp, grp->in_transmit, grp->acks_pending); + + if (0 == grp->acks_pending) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Ignoring extraneous fragment ACK.\n", grp); + return; + } + grp->acks_pending--; + + if (GNUNET_YES != grp->in_transmit) + return; + + if (GNUNET_YES == grp->is_origin) + origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp); + else + member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +/** + * Check unicast request. + */ +static int +check_origin_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + return GNUNET_OK; +} + + +/** + * Origin receives unicast request from a member. + */ +static void +handle_origin_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Origin *orig = cls; + grp = &orig->grp; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calling request callback with a request of size %u.\n", + ntohs (req->header.size)); + + if (NULL != orig->request_cb) + orig->request_cb (grp->cb_cls, req); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +/** + * Receive multicast replay request from service. + */ +static void +handle_group_replay_request (void *cls, + const struct MulticastReplayRequestMessage *rep) + +{ + struct GNUNET_MULTICAST_Group *grp = cls; + + if (GNUNET_YES == grp->is_disconnecting) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n"); + + if (0 != rep->fragment_id) + { + if (NULL != grp->replay_frag_cb) + { + struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh)); + rh->grp = grp; + rh->req = *rep; + grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key, + GNUNET_ntohll (rep->fragment_id), + GNUNET_ntohll (rep->flags), rh); + } + } + else if (0 != rep->message_id) + { + if (NULL != grp->replay_msg_cb) + { + struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh)); + rh->grp = grp; + rh->req = *rep; + grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key, + GNUNET_ntohll (rep->message_id), + GNUNET_ntohll (rep->fragment_offset), + GNUNET_ntohll (rep->flags), rh); + } + } + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +/** + * Check replay response. + */ +static int +check_member_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + uint16_t size = ntohs (res->header.size); + + if (sizeof (*res) == size) + return GNUNET_OK; + + if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size) + return GNUNET_OK; + + return GNUNET_SYSERR; +} + + +/** + * Receive replay response from service. + */ +static void +handle_member_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Member *mem = cls; + grp = &mem->grp; + + if (GNUNET_YES == grp->is_disconnecting) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n"); + + // FIXME: return result +} + + +/** + * Check join decision. + */ +static int +check_member_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + return GNUNET_OK; // checked in handle below +} + + +/** + * Member receives join decision. + */ +static void +handle_member_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Member *mem = cls; + grp = &mem->grp; + + const struct MulticastJoinDecisionMessage * + dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; + + uint16_t dcsn_size = ntohs (dcsn->header.size); + int is_admitted = ntohl (dcsn->is_admitted); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Member got join decision from multicast: %d\n", + mem, is_admitted); + + const struct GNUNET_MessageHeader *join_resp = NULL; + uint16_t join_resp_size = 0; + + uint16_t relay_count = ntohl (dcsn->relay_count); + const struct GNUNET_PeerIdentity *relays = NULL; + uint16_t relay_size = relay_count * sizeof (*relays); + if (0 < relay_count) + { + if (dcsn_size < sizeof (*dcsn) + relay_size) + { + GNUNET_break_op (0); + is_admitted = GNUNET_SYSERR; + } + else + { + relays = (struct GNUNET_PeerIdentity *) &dcsn[1]; + } + } + + if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size) + { + join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size); + join_resp_size = ntohs (join_resp->size); + } + if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received invalid join decision message from multicast: %u < %u + %u + %u\n", + dcsn_size , sizeof (*dcsn), relay_size, join_resp_size); + GNUNET_break_op (0); + is_admitted = GNUNET_SYSERR; + } + + if (NULL != mem->join_dcsn_cb) + mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer, + relay_count, relays, join_resp); + + // FIXME: + //if (GNUNET_YES != is_admitted) + // GNUNET_MULTICAST_member_part (mem); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +static void +group_cleanup (struct GNUNET_MULTICAST_Group *grp) +{ + if (NULL != grp->connect_env) + { + GNUNET_MQ_discard (grp->connect_env); + grp->connect_env = NULL; + } + if (NULL != grp->mq) + { + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; + } + if (NULL != grp->disconnect_cb) + { + grp->disconnect_cb (grp->disconnect_cls); + grp->disconnect_cb = NULL; + } + GNUNET_free (grp); +} + + +static void +handle_group_part_ack (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_MULTICAST_Group *grp = cls; + + group_cleanup (grp); +} + + +/** + * Function to call with the decision made for a join request. + * + * Must be called once and only once in response to an invocation of the + * #GNUNET_MULTICAST_JoinRequestCallback. + * + * @param join + * Join request handle. + * @param is_admitted + * #GNUNET_YES if the join is approved, + * #GNUNET_NO if it is disapproved, + * #GNUNET_SYSERR if we cannot answer the request. + * @param relay_count + * Number of relays given. + * @param relays + * Array of suggested peers that might be useful relays to use + * when joining the multicast group (essentially a list of peers that + * are already part of the multicast group and might thus be willing + * to help with routing). If empty, only this local peer (which must + * be the multicast origin) is a good candidate for building the + * multicast tree. Note that it is unnecessary to specify our own + * peer identity in this array. + * @param join_resp + * Message to send in response to the joining peer; + * can also be used to redirect the peer to a different group at the + * application layer; this response is to be transmitted to the + * peer that issued the request even if admission is denied. + */ +struct GNUNET_MULTICAST_ReplayHandle * +GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join, + int is_admitted, + uint16_t relay_count, + const struct GNUNET_PeerIdentity *relays, + const struct GNUNET_MessageHeader *join_resp) +{ + struct GNUNET_MULTICAST_Group *grp = join->group; + uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; + uint16_t relay_size = relay_count * sizeof (*relays); + + struct MulticastJoinDecisionMessageHeader *hdcsn; + struct MulticastJoinDecisionMessage *dcsn; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); + hdcsn->member_pub_key = join->member_pub_key; + hdcsn->peer = join->peer; + + dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1]; + dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); + dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); + dcsn->is_admitted = htonl (is_admitted); + dcsn->relay_count = htonl (relay_count); + if (0 < relay_size) + GNUNET_memcpy (&dcsn[1], relays, relay_size); + if (0 < join_resp_size) + GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); + + GNUNET_MQ_send (grp->mq, env); + GNUNET_free (join); + return NULL; +} + + +/** + * Replay a message fragment for the multicast group. + * + * @param rh + * Replay handle identifying which replay operation was requested. + * @param msg + * Replayed message fragment, NULL if not found / an error occurred. + * @param ec + * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode + * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated. + */ +void +GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, + const struct GNUNET_MessageHeader *msg, + enum GNUNET_MULTICAST_ReplayErrorCode ec) +{ + uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0; + struct MulticastReplayResponseMessage *res; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (res, msg_size, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE); + res->fragment_id = rh->req.fragment_id; + res->message_id = rh->req.message_id; + res->fragment_offset = rh->req.fragment_offset; + res->flags = rh->req.flags; + res->error_code = htonl (ec); + + if (GNUNET_MULTICAST_REC_OK == ec) + { + GNUNET_assert (NULL != msg); + GNUNET_memcpy (&res[1], msg, msg_size); + } + + GNUNET_MQ_send (rh->grp->mq, env); + + if (GNUNET_MULTICAST_REC_OK != ec) + GNUNET_free (rh); +} + + +/** + * Indicate the end of the replay session. + * + * Invalidates the replay handle. + * + * @param rh + * Replay session to end. + */ +void +GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh) +{ + struct MulticastReplayResponseMessage *end; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END); + + end->fragment_id = rh->req.fragment_id; + end->message_id = rh->req.message_id; + end->fragment_offset = rh->req.fragment_offset; + end->flags = rh->req.flags; + + GNUNET_MQ_send (rh->grp->mq, env); + GNUNET_free (rh); +} + + +/** + * Replay a message for the multicast group. + * + * @param rh + * Replay handle identifying which replay operation was requested. + * @param notify + * Function to call to get the message. + * @param notify_cls + * Closure for @a notify. + */ +void +GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, + GNUNET_MULTICAST_ReplayTransmitNotify notify, + void *notify_cls) +{ +} + + +static void +origin_connect (struct GNUNET_MULTICAST_Origin *orig); + + +static void +origin_reconnect (void *cls) +{ + origin_connect (cls); +} + + +/** + * Origin client disconnected from service. + * + * Reconnect after backoff period. + */ +static void +origin_disconnected (void *cls, enum GNUNET_MQ_Error error) +{ + struct GNUNET_MULTICAST_Origin *orig = cls; + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Origin client disconnected (%d), re-connecting\n", + (int) error); + if (NULL != grp->mq) + { + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; + } + + grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, + origin_reconnect, + orig); + grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); +} + + +/** + * Connect to service as origin. + */ +static void +origin_connect (struct GNUNET_MULTICAST_Origin *orig) +{ + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (group_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (origin_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, + struct GNUNET_MULTICAST_RequestHeader, + orig), + GNUNET_MQ_hd_fixed_size (group_fragment_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, + struct GNUNET_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (group_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + struct MulticastJoinRequestMessage, + grp), + GNUNET_MQ_hd_fixed_size (group_part_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK, + struct GNUNET_MessageHeader, + grp), + GNUNET_MQ_hd_fixed_size (group_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), + GNUNET_MQ_handler_end () + }; + + grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast", + handlers, origin_disconnected, orig); + GNUNET_assert (NULL != grp->mq); + GNUNET_MQ_send_copy (grp->mq, grp->connect_env); +} + + +/** + * Start a multicast group. + * + * Will advertise the origin in the P2P overlay network under the respective + * public key so that other peer can find this peer to join it. Peers that + * issue GNUNET_MULTICAST_member_join() can then transmit a join request to + * either an existing group member or to the origin. If the joining is + * approved, the member is cleared for @e replay and will begin to receive + * messages transmitted to the group. If joining is disapproved, the failed + * candidate will be given a response. Members in the group can send messages + * to the origin (one at a time). + * + * @param cfg + * Configuration to use. + * @param priv_key + * ECC key that will be used to sign messages for this + * multicast session; public key is used to identify the multicast group; + * @param max_fragment_id + * Maximum fragment ID already sent to the group. + * 0 for a new group. + * @param join_request_cb + * Function called to approve / disapprove joining of a peer. + * @param replay_frag_cb + * Function that can be called to replay a message fragment. + * @param replay_msg_cb + * Function that can be called to replay a message. + * @param request_cb + * Function called with message fragments from group members. + * @param message_cb + * Function called with the message fragments sent to the + * network by GNUNET_MULTICAST_origin_to_all(). These message fragments + * should be stored for answering replay requests later. + * @param cls + * Closure for the various callbacks that follow. + * + * @return Handle for the origin, NULL on error. + */ +struct GNUNET_MULTICAST_Origin * +GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key, + uint64_t max_fragment_id, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, + GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, + GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, + GNUNET_MULTICAST_RequestCallback request_cb, + GNUNET_MULTICAST_MessageCallback message_cb, + void *cls) +{ + struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig)); + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + + struct MulticastOriginStartMessage *start; + grp->connect_env = GNUNET_MQ_msg (start, + GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); + start->max_fragment_id = max_fragment_id; + start->group_key = *priv_key; + + grp->cfg = cfg; + grp->is_origin = GNUNET_YES; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; + + grp->cb_cls = cls; + grp->join_req_cb = join_request_cb; + grp->replay_frag_cb = replay_frag_cb; + grp->replay_msg_cb = replay_msg_cb; + grp->message_cb = message_cb; + + orig->request_cb = request_cb; + + origin_connect (orig); + return orig; +} + + +/** + * Stop a multicast group. + * + * @param origin + * Multicast group to stop. + */ +void +GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig, + GNUNET_ContinuationCallback stop_cb, + void *stop_cls) +{ + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + struct GNUNET_MQ_Envelope *env; + + grp->is_disconnecting = GNUNET_YES; + grp->disconnect_cb = stop_cb; + grp->disconnect_cls = stop_cls; + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST); + GNUNET_MQ_send (grp->mq, env); +} + + +static void +origin_to_all (struct GNUNET_MULTICAST_Origin *orig) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig); + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; + GNUNET_assert (GNUNET_YES == grp->in_transmit); + + size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; + struct GNUNET_MULTICAST_MessageHeader *msg; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg), + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); + + int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); + + if (! (GNUNET_YES == ret || GNUNET_NO == ret) + || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "%p OriginTransmitNotify() returned error or invalid message size.\n", + orig); + /* FIXME: handle error */ + GNUNET_MQ_discard (env); + return; + } + + if (GNUNET_NO == ret && 0 == buf_size) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p OriginTransmitNotify() - transmission paused.\n", orig); + GNUNET_MQ_discard (env); + return; /* Transmission paused. */ + } + + msg->header.size = htons (sizeof (*msg) + buf_size); + msg->message_id = GNUNET_htonll (tmit->message_id); + msg->group_generation = tmit->group_generation; + msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); + tmit->fragment_offset += sizeof (*msg) + buf_size; + + grp->acks_pending++; + GNUNET_MQ_send (grp->mq, env); + + if (GNUNET_YES == ret) + grp->in_transmit = GNUNET_NO; +} + + +/** + * Send a message to the multicast group. + * + * @param orig + * Handle to the multicast group. + * @param message_id + * Application layer ID for the message. Opaque to multicast. + * @param group_generation + * Group generation of the message. + * Documented in struct GNUNET_MULTICAST_MessageHeader. + * @param notify + * Function to call to get the message. + * @param notify_cls + * Closure for @a notify. + * + * @return Message handle on success, + * NULL on error (i.e. another request is already pending). + */ +struct GNUNET_MULTICAST_OriginTransmitHandle * +GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig, + uint64_t message_id, + uint64_t group_generation, + GNUNET_MULTICAST_OriginTransmitNotify notify, + void *notify_cls) +{ + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + if (GNUNET_YES == grp->in_transmit) + return NULL; + grp->in_transmit = GNUNET_YES; + + struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; + tmit->origin = orig; + tmit->message_id = message_id; + tmit->fragment_offset = 0; + tmit->group_generation = group_generation; + tmit->notify = notify; + tmit->notify_cls = notify_cls; + + origin_to_all (orig); + return tmit; +} + + +/** + * Resume message transmission to multicast group. + * + * @param th + * Transmission to cancel. + */ +void +GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th) +{ + struct GNUNET_MULTICAST_Group *grp = &th->origin->grp; + if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit) + return; + origin_to_all (th->origin); +} + + +/** + * Cancel request for message transmission to multicast group. + * + * @param th + * Transmission to cancel. + */ +void +GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th) +{ + th->origin->grp.in_transmit = GNUNET_NO; +} + + +static void +member_connect (struct GNUNET_MULTICAST_Member *mem); + + +static void +member_reconnect (void *cls) +{ + member_connect (cls); +} + + +/** + * Member client disconnected from service. + * + * Reconnect after backoff period. + */ +static void +member_disconnected (void *cls, enum GNUNET_MQ_Error error) +{ + struct GNUNET_MULTICAST_Member *mem = cls; + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Member client disconnected (%d), re-connecting\n", + (int) error); + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; + + grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, + member_reconnect, + mem); + grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); +} + + +/** + * Connect to service as member. + */ +static void +member_connect (struct GNUNET_MULTICAST_Member *mem) +{ + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (group_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + grp), + GNUNET_MQ_hd_fixed_size (group_fragment_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, + struct GNUNET_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (group_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + struct MulticastJoinRequestMessage, + grp), + GNUNET_MQ_hd_var_size (member_join_decision, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, + struct MulticastJoinDecisionMessageHeader, + mem), + GNUNET_MQ_hd_fixed_size (group_part_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK, + struct GNUNET_MessageHeader, + grp), + GNUNET_MQ_hd_fixed_size (group_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), + GNUNET_MQ_hd_var_size (member_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + mem), + GNUNET_MQ_handler_end () + }; + + grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast", + handlers, member_disconnected, mem); + GNUNET_assert (NULL != grp->mq); + GNUNET_MQ_send_copy (grp->mq, grp->connect_env); +} + + +/** + * Join a multicast group. + * + * The entity joining is always the local peer. Further information about the + * candidate can be provided in the @a join_request message. If the join fails, the + * @a message_cb is invoked with a (failure) response and then with NULL. If + * the join succeeds, outstanding (state) messages and ongoing multicast + * messages will be given to the @a message_cb until the member decides to part + * the group. The @a replay_cb function may be called at any time by the + * multicast service to support relaying messages to other members of the group. + * + * @param cfg + * Configuration to use. + * @param group_key + * ECC public key that identifies the group to join. + * @param member_key + * ECC key that identifies the member + * and used to sign requests sent to the origin. + * @param origin + * Peer ID of the origin to send unicast requsets to. If NULL, + * unicast requests are sent back via multiple hops on the reverse path + * of multicast messages. + * @param relay_count + * Number of peers in the @a relays array. + * @param relays + * Peer identities of members of the group, which serve as relays + * and can be used to join the group at. and send the @a join_request to. + * If empty, the @a join_request is sent directly to the @a origin. + * @param join_msg + * Application-dependent join message to be passed to the peer @a origin. + * @param join_request_cb + * Function called to approve / disapprove joining of a peer. + * @param join_decision_cb + * Function called to inform about the join decision. + * @param replay_frag_cb + * Function that can be called to replay message fragments + * this peer already knows from this group. NULL if this + * client is unable to support replay. + * @param replay_msg_cb + * Function that can be called to replay message fragments + * this peer already knows from this group. NULL if this + * client is unable to support replay. + * @param message_cb + * Function to be called for all message fragments we + * receive from the group, excluding those our @a replay_cb + * already has. + * @param cls + * Closure for callbacks. + * + * @return Handle for the member, NULL on error. + */ +struct GNUNET_MULTICAST_Member * +GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key, + const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key, + const struct GNUNET_PeerIdentity *origin, + uint16_t relay_count, + const struct GNUNET_PeerIdentity *relays, + const struct GNUNET_MessageHeader *join_msg, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, + GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb, + GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, + GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, + GNUNET_MULTICAST_MessageCallback message_cb, + void *cls) +{ + struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem)); + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + uint16_t relay_size = relay_count * sizeof (*relays); + uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0; + struct MulticastMemberJoinMessage *join; + grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size, + GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN); + join->group_pub_key = *group_pub_key; + join->member_key = *member_key; + join->origin = *origin; + join->relay_count = ntohl (relay_count); + if (0 < relay_size) + GNUNET_memcpy (&join[1], relays, relay_size); + if (0 < join_msg_size) + GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); + + grp->cfg = cfg; + grp->is_origin = GNUNET_NO; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; + + mem->join_dcsn_cb = join_decision_cb; + grp->join_req_cb = join_request_cb; + grp->replay_frag_cb = replay_frag_cb; + grp->replay_msg_cb = replay_msg_cb; + grp->message_cb = message_cb; + grp->cb_cls = cls; + + member_connect (mem); + return mem; +} + + +/** + * Part a multicast group. + * + * Disconnects from all group members and invalidates the @a member handle. + * + * An application-dependent part message can be transmitted beforehand using + * #GNUNET_MULTICAST_member_to_origin()) + * + * @param member + * Membership handle. + */ +void +GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem, + GNUNET_ContinuationCallback part_cb, + void *part_cls) +{ + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + struct GNUNET_MQ_Envelope *env; + + mem->join_dcsn_cb = NULL; + grp->join_req_cb = NULL; + grp->message_cb = NULL; + grp->replay_msg_cb = NULL; + grp->replay_frag_cb = NULL; + grp->is_disconnecting = GNUNET_YES; + grp->disconnect_cb = part_cb; + grp->disconnect_cls = part_cls; + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST); + GNUNET_MQ_send (grp->mq, env); +} + + +void +member_replay_request (struct GNUNET_MULTICAST_Member *mem, + uint64_t fragment_id, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags) +{ + struct MulticastReplayRequestMessage *rep; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST); + + rep->fragment_id = GNUNET_htonll (fragment_id); + rep->message_id = GNUNET_htonll (message_id); + rep->fragment_offset = GNUNET_htonll (fragment_offset); + rep->flags = GNUNET_htonll (flags); + + GNUNET_MQ_send (mem->grp.mq, env); +} + + +/** + * Request a fragment to be replayed by fragment ID. + * + * Useful if messages below the @e max_known_fragment_id given when joining are + * needed and not known to the client. + * + * @param member + * Membership handle. + * @param fragment_id + * ID of a message fragment that this client would like to see replayed. + * @param flags + * Additional flags for the replay request. + * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback + * + * @return Replay request handle. + */ +struct GNUNET_MULTICAST_MemberReplayHandle * +GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem, + uint64_t fragment_id, + uint64_t flags) +{ + member_replay_request (mem, fragment_id, 0, 0, flags); + // FIXME: return something useful + return NULL; +} + + +/** + * Request a message fragment to be replayed. + * + * Useful if messages below the @e max_known_fragment_id given when joining are + * needed and not known to the client. + * + * @param member + * Membership handle. + * @param message_id + * ID of the message this client would like to see replayed. + * @param fragment_offset + * Offset of the fragment within the message to replay. + * @param flags + * Additional flags for the replay request. + * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback + * + * @return Replay request handle, NULL on error. + */ +struct GNUNET_MULTICAST_MemberReplayHandle * +GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags) +{ + member_replay_request (mem, 0, message_id, fragment_offset, flags); + // FIXME: return something useful + return NULL; +} + + +static void +member_to_origin (struct GNUNET_MULTICAST_Member *mem) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n"); + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; + GNUNET_assert (GNUNET_YES == grp->in_transmit); + + size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; + struct GNUNET_MULTICAST_RequestHeader *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req), + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); + + int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); + + if (! (GNUNET_YES == ret || GNUNET_NO == ret) + || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "MemberTransmitNotify() returned error or invalid message size. " + "ret=%d, buf_size=%u\n", ret, buf_size); + /* FIXME: handle error */ + GNUNET_MQ_discard (env); + return; + } + + if (GNUNET_NO == ret && 0 == buf_size) + { + /* Transmission paused. */ + GNUNET_MQ_discard (env); + return; + } + + req->header.size = htons (sizeof (*req) + buf_size); + req->request_id = GNUNET_htonll (tmit->request_id); + req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); + tmit->fragment_offset += sizeof (*req) + buf_size; + + GNUNET_MQ_send (grp->mq, env); + + if (GNUNET_YES == ret) + grp->in_transmit = GNUNET_NO; +} + + +/** + * Send a message to the origin of the multicast group. + * + * @param mem + * Membership handle. + * @param request_id + * Application layer ID for the request. Opaque to multicast. + * @param notify + * Callback to call to get the message. + * @param notify_cls + * Closure for @a notify. + * + * @return Handle to cancel request, NULL on error (i.e. request already pending). + */ +struct GNUNET_MULTICAST_MemberTransmitHandle * +GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem, + uint64_t request_id, + GNUNET_MULTICAST_MemberTransmitNotify notify, + void *notify_cls) +{ + if (GNUNET_YES == mem->grp.in_transmit) + return NULL; + mem->grp.in_transmit = GNUNET_YES; + + struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; + tmit->member = mem; + tmit->request_id = request_id; + tmit->fragment_offset = 0; + tmit->notify = notify; + tmit->notify_cls = notify_cls; + + member_to_origin (mem); + return tmit; +} + + +/** + * Resume message transmission to origin. + * + * @param th + * Transmission to cancel. + */ +void +GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th) +{ + struct GNUNET_MULTICAST_Group *grp = &th->member->grp; + if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit) + return; + member_to_origin (th->member); +} + + +/** + * Cancel request for message transmission to origin. + * + * @param th + * Transmission to cancel. + */ +void +GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th) +{ + th->member->grp.in_transmit = GNUNET_NO; +} + + +/* end of multicast_api.c */ diff --git a/src/multicast/test_multicast.c b/src/multicast/test_multicast.c new file mode 100644 index 000000000..b03c52980 --- /dev/null +++ b/src/multicast/test_multicast.c @@ -0,0 +1,758 @@ +/* + * This file is part of GNUnet + * Copyright (C) 2013 GNUnet e.V. + * + * GNUnet is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file multicast/test_multicast.c + * @brief Tests for the Multicast API. + * @author Gabor X Toth + */ + +#include + +#include +#include +#include +#include +#include +#include "gnunet_multicast_service.h" + +#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) + +/** + * Return value from 'main'. + */ +static int res; + +/** + * Handle for task for timeout termination. + */ +static struct GNUNET_SCHEDULER_Task * end_badly_task; + +static const struct GNUNET_CONFIGURATION_Handle *cfg; + +struct GNUNET_PeerIdentity this_peer; + +struct GNUNET_MULTICAST_Origin *origin; +struct GNUNET_MULTICAST_Member *member; + +struct GNUNET_CRYPTO_EddsaPrivateKey *group_key; +struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key; + +struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key; +struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; + +struct TransmitClosure { + struct GNUNET_MULTICAST_OriginTransmitHandle *orig_tmit; + struct GNUNET_MULTICAST_MemberTransmitHandle *mem_tmit; + char * data[16]; + uint8_t data_delay[16]; + uint8_t data_count; + uint8_t paused; + uint8_t n; +} tmit_cls; + +struct OriginClosure { + uint8_t msgs_expected; + uint8_t n; +} origin_cls; + +struct MemberClosure { + uint8_t msgs_expected; + size_t n; +} member_cls; + +struct GNUNET_MessageHeader *join_req, *join_resp; + +enum +{ + TEST_NONE = 0, + TEST_ORIGIN_START = 1, + TEST_MEMBER_JOIN_REFUSE = 2, + TEST_MEMBER_JOIN_ADMIT = 3, + TEST_ORIGIN_TO_ALL = 4, + TEST_ORIGIN_TO_ALL_RECV = 5, + TEST_MEMBER_TO_ORIGIN = 6, + TEST_MEMBER_REPLAY_ERROR = 7, + TEST_MEMBER_REPLAY_OK = 8, + TEST_MEMBER_PART = 9, + TEST_ORIGIN_STOP = 10, +} test; + +uint64_t replay_fragment_id; +uint64_t replay_flags; + +static void +member_join (int t); + + +/** + * Clean up all resources used. + */ +static void +cleanup () +{ + if (NULL != member) + { + GNUNET_MULTICAST_member_part (member, NULL, NULL); + member = NULL; + } + if (NULL != origin) + { + GNUNET_MULTICAST_origin_stop (origin, NULL, NULL); + origin = NULL; + } +} + + +/** + * Terminate the test case (failure). + * + * @param cls NULL + */ +static void +end_badly (void *cls) +{ + res = 1; + cleanup (); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test FAILED.\n"); +} + + +/** + * Terminate the test case (success). + * + * @param cls NULL + */ +static void +end_normally (void *cls) +{ + res = 0; + cleanup (); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Test PASSED.\n"); +} + + +/** + * Finish the test case (successfully). + */ +static void +end () +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending tests.\n"); + + if (end_badly_task != NULL) + { + GNUNET_SCHEDULER_cancel (end_badly_task); + end_badly_task = NULL; + } + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MILLISECONDS, + &end_normally, NULL); +} + + +static void +tmit_resume (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); + struct TransmitClosure *tmit = cls; + if (NULL != tmit->orig_tmit) + GNUNET_MULTICAST_origin_to_all_resume (tmit->orig_tmit); + else if (NULL != tmit->mem_tmit) + GNUNET_MULTICAST_member_to_origin_resume (tmit->mem_tmit); +} + + +static int +tmit_notify (void *cls, size_t *data_size, void *data) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Test #%u: origin_tmit_notify()\n", test); + struct TransmitClosure *tmit = cls; + + if (0 == tmit->data_count) + { + *data_size = 0; + return GNUNET_YES; + } + + uint16_t size = strlen (tmit->data[tmit->n]); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmit notify data: %u bytes available, processing fragment %u/%u (size %u).\n", + (unsigned int) *data_size, + tmit->n + 1, + tmit->data_count, + size); + if (*data_size < size) + { + *data_size = 0; + GNUNET_assert (0); + return GNUNET_SYSERR; + } + + if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n]) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); + tmit->paused = GNUNET_YES; + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + tmit->data_delay[tmit->n]), + tmit_resume, tmit); + *data_size = 0; + return GNUNET_NO; + } + tmit->paused = GNUNET_NO; + + *data_size = size; + GNUNET_memcpy (data, tmit->data[tmit->n], size); + + return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES; +} + + +static void +member_recv_join_request (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key, + const struct GNUNET_MessageHeader *join_msg, + struct GNUNET_MULTICAST_JoinHandle *jh) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: member_recv_join_request()\n", test); +} + + +static void +origin_stopped (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: origin_stopped()\n", test); + end (); +} + + +static void +schedule_origin_stop (void *cls) +{ + test = TEST_ORIGIN_STOP; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: origin_stop()\n", test); + GNUNET_MULTICAST_origin_stop (origin, origin_stopped, NULL); + origin = NULL; +} + + +static void +member_parted (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: member_parted()\n", test); + member = NULL; + + switch (test) + { + case TEST_MEMBER_JOIN_REFUSE: + // Test 3 starts here + member_join (TEST_MEMBER_JOIN_ADMIT); + break; + + case TEST_MEMBER_PART: + GNUNET_SCHEDULER_add_now (&schedule_origin_stop, NULL); + break; + + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Invalid test #%d in member_parted()\n", test); + GNUNET_assert (0); + } +} + + +static void +schedule_member_part (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: schedule_member_part()\n", test); + GNUNET_MULTICAST_member_part (member, member_parted, NULL); +} + + +static void +member_part () +{ + test = TEST_MEMBER_PART; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: member_part()\n", test); + // Test 10 starts here + GNUNET_SCHEDULER_add_now (&schedule_member_part, NULL); +} + + +static void +member_replay_ok () +{ + // Execution of test 8 here + test = TEST_MEMBER_REPLAY_OK; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: member_replay_ok()\n", test); + replay_fragment_id = 1; + replay_flags = 1 | 1<<11; + GNUNET_MULTICAST_member_replay_fragment (member, replay_fragment_id, + replay_flags); +} + + +static void +member_replay_error () +{ + test = TEST_MEMBER_REPLAY_ERROR; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: member_replay_error()\n", test); + replay_fragment_id = 1234; + replay_flags = 11 | 1<<11; + GNUNET_MULTICAST_member_replay_fragment (member, replay_fragment_id, + replay_flags); +} + + +static void +origin_recv_replay_msg (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: origin_recv_replay_msg()\n", test); + GNUNET_assert (0); +} + + +static void +member_recv_replay_msg (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: member_recv_replay_msg()\n", test); + GNUNET_assert (0); +} + + +static void +origin_recv_replay_frag (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key, + uint64_t fragment_id, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: origin_recv_replay_frag()" + " - fragment_id=%" PRIu64 " flags=%" PRIu64 "\n", + test, fragment_id, flags); + GNUNET_assert (replay_fragment_id == fragment_id && replay_flags == flags); + switch (test) + { + case TEST_MEMBER_REPLAY_ERROR: + // Test 8 starts here + GNUNET_MULTICAST_replay_response (rh, NULL, GNUNET_SYSERR); + member_replay_ok (); + break; + + case TEST_MEMBER_REPLAY_OK: + { + struct GNUNET_MULTICAST_MessageHeader mmsg = { + .header = { + .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE), + .size = htons (sizeof (mmsg)), + }, + .fragment_id = GNUNET_htonll (1), + .message_id = GNUNET_htonll (1), + .fragment_offset = 0, + .group_generation = GNUNET_htonll (1), + .flags = 0, + }; + member_cls.n = 0; + member_cls.msgs_expected = 1; + GNUNET_MULTICAST_replay_response (rh, &mmsg.header, GNUNET_MULTICAST_REC_OK); + GNUNET_MULTICAST_replay_response_end (rh); + break; + } + + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Invalid test #%d in origin_recv_replay_frag()\n", test); + GNUNET_assert (0); + } +} + + +static void +member_recv_replay_frag (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key, + uint64_t fragment_id, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: member_recv_replay_frag()\n", test); + GNUNET_assert (0); +} + + +static void +origin_recv_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + struct OriginClosure *ocls = cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: origin_recv_request()\n", test); + if (++ocls->n != ocls->msgs_expected) + return; + + GNUNET_assert (0 == memcmp (&req->member_pub_key, + &member_pub_key, sizeof (member_pub_key))); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Test #%u: verify message content, take first 3 bytes: %.3s\n", + test, (char *)&req[1]); + GNUNET_assert (0 == memcmp (&req[1], "abc", 3)); + + // Test 7 starts here + member_replay_error (); +} + + +static void +member_to_origin () +{ + test = TEST_MEMBER_TO_ORIGIN; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: member_to_origin()\n", test); + + struct TransmitClosure *tmit = &tmit_cls; + *tmit = (struct TransmitClosure) {}; + tmit->data[0] = "abc def"; + tmit->data[1] = "ghi jkl mno"; + tmit->data_delay[1] = 2; + tmit->data[2] = "pqr stuw xyz"; + tmit->data_count = 3; + + origin_cls.n = 0; + origin_cls.msgs_expected = 1; + + tmit->mem_tmit = GNUNET_MULTICAST_member_to_origin (member, 1, + tmit_notify, tmit); +} + + +static void +member_recv_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + struct MemberClosure *mcls = cls; + + // Test 5 starts here after message has been received from origin + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Test #%u: member_recv_message() %u/%u\n", + test, + (unsigned int) (mcls->n + 1), + mcls->msgs_expected); + if (++mcls->n != mcls->msgs_expected) + return; + + // FIXME: check message content + + switch (test) + { + case TEST_ORIGIN_TO_ALL: + test = TEST_ORIGIN_TO_ALL_RECV; + break; + + case TEST_ORIGIN_TO_ALL_RECV: + // Test 6 starts here + member_to_origin (); + break; + + case TEST_MEMBER_REPLAY_OK: + // Test 9 starts here + GNUNET_assert (replay_fragment_id == GNUNET_ntohll (msg->fragment_id)); + member_part (); + break; + + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Invalid test #%d in origin_recv_message()\n", test); + GNUNET_assert (0); + } +} + + +static void +origin_recv_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + struct OriginClosure *ocls = cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: origin_recv_message() %u/%u\n", + test, ocls->n + 1, ocls->msgs_expected); + if (++ocls->n != ocls->msgs_expected) + return; + + // FIXME: check message content + + switch (test) + { + case TEST_ORIGIN_TO_ALL: + // Prepare to execute test 5 + test = TEST_ORIGIN_TO_ALL_RECV; + break; + + case TEST_ORIGIN_TO_ALL_RECV: + // Test 6 starts here + member_to_origin (); + break; + + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Invalid test #%d in origin_recv_message()\n", test); + GNUNET_assert (0); + } +} + + +static void +origin_to_all () +{ + test = TEST_ORIGIN_TO_ALL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: origin_to_all()\n", test); + + struct TransmitClosure *tmit = &tmit_cls; + *tmit = (struct TransmitClosure) {}; + tmit->data[0] = "ABC DEF"; + tmit->data[1] = GNUNET_malloc (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD + 1); + uint16_t i; + for (i = 0; i < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; i++) + tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_'; + tmit->data[2] = "GHI JKL MNO"; + tmit->data_delay[2] = 2; + tmit->data[3] = "PQR STUW XYZ"; + tmit->data_count = 4; + + origin_cls.n = member_cls.n = 0; + origin_cls.msgs_expected = member_cls.msgs_expected = tmit->data_count; + + tmit->orig_tmit = GNUNET_MULTICAST_origin_to_all (origin, 1, 1, + tmit_notify, tmit); +} + + +static void +member_recv_join_decision (void *cls, + int is_admitted, + const struct GNUNET_PeerIdentity *peer, + uint16_t relay_count, + const struct GNUNET_PeerIdentity *relays, + const struct GNUNET_MessageHeader *join_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: member_recv_join_decision() - is_admitted: %d\n", + test, is_admitted); + + GNUNET_assert (join_msg->size == join_resp->size); + GNUNET_assert (join_msg->type == join_resp->type); + GNUNET_assert (0 == memcmp (join_msg, join_resp, ntohs (join_resp->size))); + + switch (test) + { + case TEST_MEMBER_JOIN_REFUSE: + GNUNET_assert (0 == relay_count); + // Test 3 starts here + GNUNET_SCHEDULER_add_now (&schedule_member_part, NULL); + break; + + case TEST_MEMBER_JOIN_ADMIT: + GNUNET_assert (1 == relay_count); + GNUNET_assert (0 == memcmp (relays, &this_peer, sizeof (this_peer))); + // Test 4 starts here + origin_to_all (); + break; + + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Invalid test #%d in member_recv_join_decision()\n", test); + GNUNET_assert (0); + } +} + +/** + * Test: origin receives join request + */ +static void +origin_recv_join_request (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *mem_key, + const struct GNUNET_MessageHeader *join_msg, + struct GNUNET_MULTICAST_JoinHandle *jh) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: origin_recv_join_request()\n", test); + + GNUNET_assert (0 == memcmp (mem_key, &member_pub_key, sizeof (member_pub_key))); + GNUNET_assert (join_msg->size == join_req->size); + GNUNET_assert (join_msg->type == join_req->type); + GNUNET_assert (0 == memcmp (join_msg, join_req, ntohs (join_req->size))); + + char data[] = "here's the decision"; + uint8_t data_size = strlen (data) + 1; + join_resp = GNUNET_malloc (sizeof (join_resp) + data_size); + join_resp->size = htons (sizeof (join_resp) + data_size); + join_resp->type = htons (456); + GNUNET_memcpy (&join_resp[1], data, data_size); + + switch (test) + { + case TEST_MEMBER_JOIN_REFUSE: + // Test 3 starts here + GNUNET_MULTICAST_join_decision (jh, GNUNET_NO, 0, NULL, join_resp); + break; + + case TEST_MEMBER_JOIN_ADMIT: + // Test 3 is running + GNUNET_MULTICAST_join_decision (jh, GNUNET_YES, 1, &this_peer, join_resp); + break; + + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Invalid test #%d in origin_recv_join_request()\n", test); + GNUNET_assert (0); + break; + } +} + +/** + * Test: member joins multicast group + */ +static void +member_join (int t) +{ + test = t; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: member_join()\n", test); + + member_key = GNUNET_CRYPTO_ecdsa_key_create (); + GNUNET_CRYPTO_ecdsa_key_get_public (member_key, &member_pub_key); + + if (NULL != join_req) + GNUNET_free (join_req); + + char data[] = "let me in!"; + uint8_t data_size = strlen (data) + 1; + join_req = GNUNET_malloc (sizeof (join_req) + data_size); + join_req->size = htons (sizeof (join_req) + data_size); + join_req->type = htons (123); + GNUNET_memcpy (&join_req[1], data, data_size); + + member = GNUNET_MULTICAST_member_join (cfg, &group_pub_key, member_key, + &this_peer, 1, &this_peer, join_req, + member_recv_join_request, + member_recv_join_decision, + member_recv_replay_frag, + member_recv_replay_msg, + member_recv_message, + &member_cls); +} + +/** + * Test: Start a multicast group as origin + */ +static void +origin_start () +{ + test = TEST_ORIGIN_START; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Test #%u: origin_start()\n", test); + + group_key = GNUNET_CRYPTO_eddsa_key_create (); + GNUNET_CRYPTO_eddsa_key_get_public (group_key, &group_pub_key); + + origin = GNUNET_MULTICAST_origin_start (cfg, group_key, 0, + origin_recv_join_request, + origin_recv_replay_frag, + origin_recv_replay_msg, + origin_recv_request, + origin_recv_message, + &origin_cls); + // Test 2 starts here + member_join (TEST_MEMBER_JOIN_REFUSE); +} + + +/** + * Main function of the test, run from scheduler. + * + * @param cls NULL + * @param cfg configuration we use (also to connect to Multicast service) + * @param peer handle to access more of the peer (not used) + */ +static void +#if DEBUG_TEST_MULTICAST +run (void *cls, + char *const *args, + const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *c) +#else +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *c, + struct GNUNET_TESTING_Peer *peer) +#endif +{ + cfg = c; + end_badly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, + &end_badly, NULL); + GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer); + + // Test 1 starts here + origin_start (); +} + + +int +main (int argc, char *argv[]) +{ + res = 1; +#if DEBUG_TEST_MULTICAST + const struct GNUNET_GETOPT_CommandLineOption opts[] = { + GNUNET_GETOPT_OPTION_END + }; + if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "test-multicast", + "test-multicast [options]", + opts, &run, NULL)) + return 1; +#else + if (0 != GNUNET_TESTING_peer_run ("test-multicast", "test_multicast.conf", &run, NULL)) + return 1; +#endif + return res; +} + +/* end of test_multicast.c */ diff --git a/src/multicast/test_multicast_2peers.c b/src/multicast/test_multicast_2peers.c new file mode 100644 index 000000000..009a0a11a --- /dev/null +++ b/src/multicast/test_multicast_2peers.c @@ -0,0 +1,520 @@ +/* + * This file is part of GNUnet + * Copyright (C) 2013 GNUnet e.V. + * + * GNUnet is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file multicast/test_multicast_2peers.c + * @brief Tests for the Multicast API with two peers doing the ping + * pong test. + * @author xrs + */ + +#include + +#include +#include +#include +#include +#include +#include "gnunet_multicast_service.h" + +#define NUM_PEERS 2 + +static struct GNUNET_TESTBED_Operation *op0; +static struct GNUNET_TESTBED_Operation *op1; +static struct GNUNET_TESTBED_Operation *pi_op0; +static struct GNUNET_TESTBED_Operation *pi_op1; + +static struct GNUNET_TESTBED_Peer **peers; +const struct GNUNET_PeerIdentity *peer_id[2]; + +static struct GNUNET_SCHEDULER_Task *timeout_tid; + +static struct GNUNET_MULTICAST_Origin *origin; +static struct GNUNET_MULTICAST_Member *member; + +struct GNUNET_CRYPTO_EddsaPrivateKey *group_key; +struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key; + +struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key; +struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; + +/** + * Global result for testcase. + */ +static int result; + + +/** + * Function run on CTRL-C or shutdown (i.e. success/timeout/etc.). + * Cleans up. + */ +static void +shutdown_task (void *cls) +{ + if (NULL != op0) + { + GNUNET_TESTBED_operation_done (op0); + op0 = NULL; + } + if (NULL != op1) + { + GNUNET_TESTBED_operation_done (op1); + op1 = NULL; + } + if (NULL != pi_op0) + { + GNUNET_TESTBED_operation_done (pi_op0); + pi_op0 = NULL; + } + if (NULL != pi_op1) + { + GNUNET_TESTBED_operation_done (pi_op1); + pi_op1 = NULL; + } + if (NULL != timeout_tid) + { + GNUNET_SCHEDULER_cancel (timeout_tid); + timeout_tid = NULL; + } +} + + +static void +timeout_task (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Timeout!\n"); + result = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); +} + + +static void +member_join_request (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key, + const struct GNUNET_MessageHeader *join_msg, + struct GNUNET_MULTICAST_JoinHandle *jh) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Member sent a join request.\n"); + +} + + +static int +notify (void *cls, + size_t *data_size, + void *data) +{ + + char text[] = "ping"; + *data_size = strlen(text)+1; + GNUNET_memcpy(data, text, *data_size); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Member sents message to origin: %s\n", text); + + return GNUNET_YES; +} + + +static void +member_join_decision (void *cls, + int is_admitted, + const struct GNUNET_PeerIdentity *peer, + uint16_t relay_count, + const struct GNUNET_PeerIdentity *relays, + const struct GNUNET_MessageHeader *join_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Member received a decision from origin: %s\n", + (GNUNET_YES == is_admitted) + ? "accepted" + : "rejected"); + + if (GNUNET_YES == is_admitted) + { + struct GNUNET_MULTICAST_MemberTransmitHandle *req; + + // FIXME: move to MQ-style API! + req = GNUNET_MULTICAST_member_to_origin (member, + 0, + ¬ify, + NULL); + } +} + + +static void +member_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + if (0 != strncmp ("pong", (char *)&msg[1], 4)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "member did not receive pong\n"); + result = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + } + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "member receives: %s\n", (char *)&msg[1]); + + // Testcase ends here. + result = GNUNET_YES; + GNUNET_SCHEDULER_shutdown (); +} + + +static void +origin_join_request (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key, + const struct GNUNET_MessageHeader *join_msg, + struct GNUNET_MULTICAST_JoinHandle *jh) +{ + struct GNUNET_MessageHeader *join_resp; + + uint8_t data_size = ntohs (join_msg->size); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "origin got a join request...\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "origin receives: '%s'\n", (char *)&join_msg[1]); + + const char data[] = "Come in!"; + data_size = strlen (data) + 1; + join_resp = GNUNET_malloc (sizeof (join_resp) + data_size); + join_resp->size = htons (sizeof (join_resp) + data_size); + join_resp->type = htons (123); + GNUNET_memcpy (&join_resp[1], data, data_size); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "origin sends: '%s'\n", data); + + GNUNET_MULTICAST_join_decision (jh, + GNUNET_YES, + 0, + NULL, + join_resp); + GNUNET_free (join_resp); + result = GNUNET_OK; +} + + +int +origin_notify (void *cls, + size_t *data_size, + void *data) +{ + char text[] = "pong"; + + *data_size = strlen(text)+1; + GNUNET_memcpy (data, + text, + *data_size); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin sends (to all): %s\n", text); + + return GNUNET_YES; +} + + +static void +origin_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin receives: %s\n", (char *)&req[1]); + + if (0 != strncmp ("ping", (char *)&req[1], 4)) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "origin didn't reveice a correct request"); + + GNUNET_MULTICAST_origin_to_all (origin, + 0, + 0, + origin_notify, + NULL); +} + + +static void +origin_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin message msg\n"); +} + + +static void +service_connect1 (void *cls, + struct GNUNET_TESTBED_Operation *op, + void *ca_result, + const char *emsg) +{ + member = ca_result; + + if (NULL == member) + { + result = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connected to multicast service of member\n"); + } +} + + +static void +multicast_da1 (void *cls, + void * op_result) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Member parting from multicast group\n"); + + GNUNET_MULTICAST_member_part (member, NULL, NULL); +} + + +static void * +multicast_ca1 (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_MessageHeader *join_msg; + void *ret; + + // Get members keys + member_key = GNUNET_CRYPTO_ecdsa_key_create (); + GNUNET_CRYPTO_ecdsa_key_get_public (member_key, &member_pub_key); + + char data[] = "Hi, can I enter?"; + uint8_t data_size = strlen (data) + 1; + join_msg = GNUNET_malloc (sizeof (join_msg) + data_size); + join_msg->size = htons (sizeof (join_msg) + data_size); + join_msg->type = htons (123); + GNUNET_memcpy (&join_msg[1], data, data_size); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Members tries to join multicast group\n"); + + ret = GNUNET_MULTICAST_member_join (cfg, + &group_pub_key, + member_key, + peer_id[0], + 0, + NULL, + join_msg, /* join message */ + member_join_request, + member_join_decision, + NULL, /* no test for member_replay_frag */ + NULL, /* no test for member_replay_msg */ + member_message, + NULL); + GNUNET_free (join_msg); + return ret; +} + + +static void +peer_information_cb (void *cls, + struct GNUNET_TESTBED_Operation *op, + const struct GNUNET_TESTBED_PeerInformation *pinfo, + const char *emsg) +{ + int i = (int) (long) cls; + + if (NULL == pinfo) + { + result = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + } + + peer_id[i] = pinfo->result.id; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got peer information of %s (%s)\n", (0==i)?"origin":"member" ,GNUNET_i2s(pinfo->result.id)); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Create member peer\n"); + + if (0 == i) + { + /* connect to multicast service of member */ + op1 = GNUNET_TESTBED_service_connect (NULL, /* Closure for operation */ + peers[1], /* The peer whose service to connect to */ + "multicast", /* The name of the service */ + service_connect1, /* callback to call after a handle to service + is opened */ + NULL, /* closure for the above callback */ + multicast_ca1, /* callback to call with peer's configuration; + this should open the needed service connection */ + multicast_da1, /* callback to be called when closing the + opened service connection */ + NULL); /* closure for the above two callbacks */ + } +} + + +/** + * Test logic of peer "0" being origin starts here. + * + * @param cls closure, for the example: NULL + * @param op should be equal to "dht_op" + * @param ca_result result of the connect operation, the + * connection to the DHT service + * @param emsg error message, if testbed somehow failed to + * connect to the DHT. + */ +static void +service_connect0 (void *cls, + struct GNUNET_TESTBED_Operation *op, + void *ca_result, + const char *emsg) +{ + origin = ca_result; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Connected to multicast service of origin\n"); + + // Get GNUnet identity of origin + pi_op0 = GNUNET_TESTBED_peer_get_information (peers[0], + GNUNET_TESTBED_PIT_IDENTITY, + peer_information_cb, + (void *) 0); + // Get GNUnet identity of member + pi_op1 = GNUNET_TESTBED_peer_get_information (peers[1], + GNUNET_TESTBED_PIT_IDENTITY, + peer_information_cb, + (void *) 1); + + /* Connection to service successful. Here we'd usually do something with + * the service. */ + result = GNUNET_OK; + //GNUNET_SCHEDULER_shutdown (); /* Also kills the testbed */ +} + + + +/** + * Function run when service multicast has started and is providing us + * with a configuration file. + */ +static void * +multicast_ca0 (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + group_key = GNUNET_CRYPTO_eddsa_key_create (); + GNUNET_CRYPTO_eddsa_key_get_public (group_key, &group_pub_key); + + return GNUNET_MULTICAST_origin_start (cfg, + group_key, + 0, + origin_join_request, + NULL, /* no test for origin_replay_frag */ + NULL, /* no test for origin_replay_msg */ + origin_request, + origin_message, + NULL); +} + +static void +multicast_da0 (void *cls, + void *op_result) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Origin closes multicast group\n"); + + GNUNET_MULTICAST_origin_stop (origin, NULL, NULL); +} + + +/** + * Main function inovked from TESTBED once all of the + * peers are up and running. This one then connects + * just to the multicast service of peer 0 and 1. + * Peer 0 is going to be origin. + * Peer 1 is going to be one member. + * Origin will start a multicast group and the member will try to join it. + * After that we execute some multicast test. + * + * @param cls closure + * @param h the run handle + * @param peers started peers for the test + * @param num_peers size of the 'peers' array + * @param links_succeeded number of links between peers that were created + * @param links_failed number of links testbed was unable to establish + */ +static void +testbed_master (void *cls, + struct GNUNET_TESTBED_RunHandle *h, + unsigned int num_peers, + struct GNUNET_TESTBED_Peer **p, + unsigned int links_succeeded, + unsigned int links_failed) +{ + /* Testbed is ready with peers running and connected in a pre-defined overlay + topology (FIXME) */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Connected to testbed_master()\n"); + + peers = p; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Create origin peer\n"); + op0 = GNUNET_TESTBED_service_connect (NULL, /* Closure for operation */ + peers[0], /* The peer whose service to connect to */ + "multicast", /* The name of the service */ + service_connect0, /* callback to call after a handle to service + is opened */ + NULL, /* closure for the above callback */ + multicast_ca0, /* callback to call with peer's configuration; + this should open the needed service connection */ + multicast_da0, /* callback to be called when closing the + opened service connection */ + NULL); /* closure for the above two callbacks */ + + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); /* Schedule a new task on shutdown */ + + /* Schedule the shutdown task with a delay of a few Seconds */ + timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 50), + &timeout_task, NULL); +} + + +int +main (int argc, char *argv[]) +{ + int ret; + + result = GNUNET_SYSERR; + ret = GNUNET_TESTBED_test_run + ("test-multicast-2peers", /* test case name */ + "test_multicast.conf", /* template configuration */ + NUM_PEERS, /* number of peers to start */ + 0LL, /* Event mask - set to 0 for no event notifications */ + NULL, /* Controller event callback */ + NULL, /* Closure for controller event callback */ + testbed_master, /* continuation callback to be called when testbed setup is complete */ + NULL); /* Closure for the test_master callback */ + if ( (GNUNET_OK != ret) || (GNUNET_OK != result) ) + return 1; + return 0; +} + + +/* end of test_multicast_2peers.c */ diff --git a/src/multicast/test_multicast_multipeer.c b/src/multicast/test_multicast_multipeer.c new file mode 100644 index 000000000..95f42f4dd --- /dev/null +++ b/src/multicast/test_multicast_multipeer.c @@ -0,0 +1,643 @@ +/* + * This file is part of GNUnet + * Copyright (C) 2013 GNUnet e.V. + * + * GNUnet is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file multicast/test_multicast_multipeers.c + * @brief Tests for the Multicast API with multiple peers. + * @author xrs + */ + +#include + +#include +#include +#include +#include +#include +#include "gnunet_multicast_service.h" + +#define PEERS_REQUESTED 12 + +struct MulticastPeerContext +{ + int peer; /* peer number */ + struct GNUNET_CRYPTO_EcdsaPrivateKey *key; + const struct GNUNET_PeerIdentity *id; + struct GNUNET_TESTBED_Operation *op; /* not yet in use */ + struct GNUNET_TESTBED_Operation *pi_op; /* not yet in use */ + int test_ok; +}; + +enum pingpong +{ + PING = 1, + PONG = 2 +}; + +struct pingpong_msg +{ + int peer; + enum pingpong msg; +}; + +static void service_connect (void *cls, + struct GNUNET_TESTBED_Operation *op, + void *ca_result, + const char *emsg); + +static struct MulticastPeerContext **multicast_peers; +static struct GNUNET_TESTBED_Peer **peers; + +static struct GNUNET_TESTBED_Operation *op[PEERS_REQUESTED]; +static struct GNUNET_TESTBED_Operation *pi_op[PEERS_REQUESTED]; + +static struct GNUNET_MULTICAST_Origin *origin; +static struct GNUNET_MULTICAST_Member *members[PEERS_REQUESTED]; /* first element always empty */ + +static struct GNUNET_SCHEDULER_Task *timeout_tid; + +static struct GNUNET_CRYPTO_EddsaPrivateKey *group_key; +static struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key; +static struct GNUNET_HashCode group_pub_key_hash; + +/** + * Global result for testcase. + */ +static int result; + +/** + * Function run on CTRL-C or shutdown (i.e. success/timeout/etc.). + * Cleans up. + */ +static void +shutdown_task (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "shutdown_task!\n"); + for (int i=0;ikey); + GNUNET_free (multicast_peers[i]); + multicast_peers[i] = NULL; + } + GNUNET_free (multicast_peers); + multicast_peers = NULL; + } + + if (NULL != timeout_tid) + { + GNUNET_SCHEDULER_cancel (timeout_tid); + timeout_tid = NULL; + } +} + + +static void +timeout_task (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Timeout!\n"); + result = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); +} + + +static void +member_join_request (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key, + const struct GNUNET_MessageHeader *join_msg, + struct GNUNET_MULTICAST_JoinHandle *jh) +{ + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Peer #%u (%s) sent a join request.\n", + mc_peer->peer, + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); +} + + +static int +notify (void *cls, + size_t *data_size, + void *data) +{ + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; + + struct pingpong_msg *pp_msg = GNUNET_new (struct pingpong_msg); + pp_msg->peer = mc_peer->peer; + pp_msg->msg = PING; + + *data_size = sizeof (struct pingpong_msg); + GNUNET_memcpy(data, pp_msg, *data_size); + GNUNET_free (pp_msg); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Peer #%u sents ping to origin\n", mc_peer->peer); + + return GNUNET_YES; +} + + +static void +member_join_decision (void *cls, + int is_admitted, + const struct GNUNET_PeerIdentity *peer, + uint16_t relay_count, + const struct GNUNET_PeerIdentity *relays, + const struct GNUNET_MessageHeader *join_msg) +{ + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Peer #%u (%s) received a decision from origin: %s\n", + mc_peer->peer, + GNUNET_i2s (multicast_peers[mc_peer->peer]->id), + (GNUNET_YES == is_admitted)?"accepted":"rejected"); + + if (GNUNET_YES == is_admitted) + { + GNUNET_MULTICAST_member_to_origin (members[mc_peer->peer], + 0, + notify, + cls); + + } +} + + +static void +member_replay_frag () +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "member replay frag...\n"); +} + + +static void +member_replay_msg () +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "member replay msg...\n"); +} + + +static void +origin_disconnected_cb (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Origin disconnected. Shutting down.\n"); + result = GNUNET_YES; + GNUNET_SCHEDULER_shutdown (); +} + + +static void +member_disconnected_cb (void *cls) +{ + for (int i = 1; i < PEERS_REQUESTED; ++i) + if (GNUNET_NO == multicast_peers[i]->test_ok) + return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "All member disconnected. Stopping origin.\n"); + GNUNET_MULTICAST_origin_stop (origin, origin_disconnected_cb, cls); +} + + +static void +member_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; + struct pingpong_msg *pp_msg = (struct pingpong_msg*) &(msg[1]); + + if (PONG == pp_msg->msg && mc_peer->peer == pp_msg->peer) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "peer #%i (%s) receives a pong\n", + mc_peer->peer, + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); + mc_peer->test_ok = GNUNET_OK; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "peer #%u (%s) parting from multicast group\n", + mc_peer->peer, + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); + + GNUNET_MULTICAST_member_part (members[mc_peer->peer], member_disconnected_cb, cls); + } +} + + +static void +origin_join_request (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key, + const struct GNUNET_MessageHeader *join_msg, + struct GNUNET_MULTICAST_JoinHandle *jh) +{ + struct GNUNET_MessageHeader *join_resp; + + uint8_t data_size = ntohs (join_msg->size); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "origin got a join request...\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "origin receives: '%s'\n", (char *)&join_msg[1]); + + char data[] = "Come in!"; + data_size = strlen (data) + 1; + join_resp = GNUNET_malloc (sizeof (join_resp) + data_size); + join_resp->size = htons (sizeof (join_resp) + data_size); + join_resp->type = htons (123); + GNUNET_memcpy (&join_resp[1], data, data_size); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "origin sends: '%s'\n", data); + + GNUNET_MULTICAST_join_decision (jh, + GNUNET_YES, + 0, + NULL, + join_resp); + + result = GNUNET_OK; +} + + +static void +origin_replay_frag (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key, + uint64_t fragment_id, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin replay fraq msg\n"); +} + + +static void +origin_replay_msg (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) +{ + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin replay msg\n"); +} + + +static int +origin_notify (void *cls, + size_t *data_size, + void *data) +{ + struct pingpong_msg *rcv_pp_msg = (struct pingpong_msg*)cls; + struct pingpong_msg *pp_msg = GNUNET_new (struct pingpong_msg); + + pp_msg->peer = rcv_pp_msg->peer; + pp_msg->msg = PONG; + *data_size = sizeof (struct pingpong_msg); + GNUNET_memcpy(data, pp_msg, *data_size); + GNUNET_free (pp_msg); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin sends pong\n"); + + return GNUNET_YES; +} + + +static void +origin_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin receives a msg\n"); + + req++; + struct pingpong_msg *pp_msg = (struct pingpong_msg *) req; + + if (1 != pp_msg->msg) { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "origin didn't reveice a correct request"); + } + + GNUNET_MULTICAST_origin_to_all (origin, + 0, + 0, + origin_notify, + pp_msg); +} + + +static void +origin_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "origin message msg\n"); +} + + +static void +multicast_disconnect (void *cls, + void *op_result) +{ + +} + + +static void * +multicast_connect (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct MulticastPeerContext *multicast_peer = cls; + struct GNUNET_MessageHeader *join_msg; + char data[64]; + + if (0 == multicast_peer->peer) + { + group_key = GNUNET_CRYPTO_eddsa_key_create (); + GNUNET_CRYPTO_eddsa_key_get_public (group_key, &group_pub_key); + + GNUNET_CRYPTO_hash (&group_pub_key, sizeof (group_pub_key), &group_pub_key_hash); + origin = GNUNET_MULTICAST_origin_start (cfg, + group_key, + 0, + origin_join_request, + origin_replay_frag, + origin_replay_msg, + origin_request, + origin_message, + cls); + if (NULL == origin) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Peer #%u could not create a multicast group", + multicast_peer->peer); + return NULL; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Peer #%u connected as origin to group %s\n", + multicast_peer->peer, + GNUNET_h2s (&group_pub_key_hash)); + return origin; + } + else + { + multicast_peer->key = GNUNET_CRYPTO_ecdsa_key_create (); + + sprintf(data, "Hi, I am peer #%u (%s). Can I enter?", + multicast_peer->peer, + GNUNET_i2s (multicast_peers[multicast_peer->peer]->id)); + uint8_t data_size = strlen (data) + 1; + join_msg = GNUNET_malloc (sizeof (join_msg) + data_size); + join_msg->size = htons (sizeof (join_msg) + data_size); + join_msg->type = htons (123); + GNUNET_memcpy (&join_msg[1], data, data_size); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Peer #%u (%s) tries to join multicast group %s\n", + multicast_peer->peer, + GNUNET_i2s (multicast_peers[multicast_peer->peer]->id), + GNUNET_h2s (&group_pub_key_hash)); + + members[multicast_peer->peer] = + GNUNET_MULTICAST_member_join (cfg, + &group_pub_key, + multicast_peer->key, + multicast_peers[0]->id, + 0, + NULL, + join_msg, /* join message */ + member_join_request, + member_join_decision, + member_replay_frag, + member_replay_msg, + member_message, + cls); + return members[multicast_peer->peer]; + } +} + + +static void +peer_information_cb (void *cls, + struct GNUNET_TESTBED_Operation *operation, + const struct GNUNET_TESTBED_PeerInformation *pinfo, + const char *emsg) +{ + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; + + if (NULL == pinfo) { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got no peer information\n"); + result = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + } + + multicast_peers[mc_peer->peer]->id = pinfo->result.id; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got peer information of %s (%s)\n", + (0 == mc_peer->peer)? "origin" : "member", + GNUNET_i2s (pinfo->result.id)); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Create peer #%u (%s)\n", + mc_peer->peer, + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); + + if (0 != mc_peer->peer) + { + /* connect to multicast service of members */ + op[mc_peer->peer] = + GNUNET_TESTBED_service_connect (/* Closure for operation */ + NULL, + /* The peer whose service to connect to */ + peers[mc_peer->peer], + /* The name of the service */ + "multicast", + /* called after a handle to service is opened */ + service_connect, + /* closure for the above callback */ + cls, + /* called when opening the service connection */ + multicast_connect, + /* called when closing the service connection */ + multicast_disconnect, + /* closure for the above two callbacks */ + cls); + } +} + + +static void +service_connect (void *cls, + struct GNUNET_TESTBED_Operation *op, + void *ca_result, + const char *emsg) +{ + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; + + if (NULL == ca_result) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Connection adapter not created for peer #%u (%s)\n", + mc_peer->peer, + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); + + result = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown(); + } + + if (0 == mc_peer->peer) + { + // Get GNUnet identity of members + for (int i = 0; ipeer = i; + multicast_peers[i]->test_ok = GNUNET_NO; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Create origin peer\n"); + op[0] = + GNUNET_TESTBED_service_connect (/* Closure for operation */ + NULL, + /* The peer whose service to connect to */ + peers[0], + /* The name of the service */ + "multicast", + /* called after a handle to service is opened */ + service_connect, + /* closure for the above callback */ + multicast_peers[0], + /* called when opening the service connection */ + multicast_connect, + /* called when closing the service connection */ + multicast_disconnect, + /* closure for the above two callbacks */ + multicast_peers[0]); + /* Schedule a new task on shutdown */ + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); + /* Schedule the shutdown task with a delay of a few Seconds */ + timeout_tid = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 400), + &timeout_task, + NULL); +} + + +int +main (int argc, char *argv[]) +{ + int ret; + char const *config_file; + + if (strstr (argv[0], "_line") != NULL) + { + config_file = "test_multicast_line.conf"; + } + else if (strstr(argv[0], "_star") != NULL) + { + config_file = "test_multicast_star.conf"; + } + else + { + config_file = "test_multicast_star.conf"; + } + + result = GNUNET_SYSERR; + ret = + GNUNET_TESTBED_test_run ("test-multicast-multipeer", + config_file, + /* number of peers to start */ + PEERS_REQUESTED, + /* Event mask - set to 0 for no event notifications */ + 0LL, + /* Controller event callback */ + NULL, + /* Closure for controller event callback */ + NULL, + /* called when testbed setup is complete */ + testbed_master, + /* Closure for the test_master callback */ + NULL); + if ( (GNUNET_OK != ret) || (GNUNET_OK != result) ) + return 1; + return 0; +} + +/* end of test_multicast_multipeer.c */ -- cgit v1.2.3