From 670ebb20b9570120df1021e467b575a212743125 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 3 Apr 2019 21:25:59 +0200 Subject: allow applications expressing connection preferences directly to TNG, collect HELLOs from PEERSTORE for expressed prefs --- src/transport/Makefile.am | 10 +- src/transport/gnunet-service-tng.c | 628 +++++++++++++++++------------ src/transport/transport.h | 32 ++ src/transport/transport_api2_application.c | 366 +++++++++++++++++ 4 files changed, 783 insertions(+), 253 deletions(-) create mode 100644 src/transport/transport_api2_application.c (limited to 'src/transport') diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am index cd31f7cd7..2865460fd 100644 --- a/src/transport/Makefile.am +++ b/src/transport/Makefile.am @@ -155,6 +155,7 @@ endif lib_LTLIBRARIES = \ libgnunettransport.la \ libgnunettransportaddress.la \ + libgnunettransportapplication.la \ libgnunettransportcore.la \ libgnunettransportcommunicator.la \ libgnunettransportmonitor.la \ @@ -196,6 +197,14 @@ libgnunettransport_la_LDFLAGS = \ $(GN_LIB_LDFLAGS) $(WINFLAGS) \ -version-info 4:0:2 +libgnunettransportapplication_la_SOURCES = \ + transport_api2_application.c +libgnunettransportapplication_la_LIBADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(LTLIBINTL) +libgnunettransportapplication_la_LDFLAGS = \ + $(GN_LIB_LDFLAGS) $(WINFLAGS) \ + -version-info 0:0:0 libgnunettransportaddress_la_SOURCES = \ @@ -360,7 +369,6 @@ gnunet_service_transport_CFLAGS = \ gnunet_service_tng_SOURCES = \ gnunet-service-tng.c gnunet_service_tng_LDADD = \ - $(top_builddir)/src/ats/libgnunetatstransport.la \ $(top_builddir)/src/peerstore/libgnunetpeerstore.la \ $(top_builddir)/src/hello/libgnunethello.la \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index b64bfb182..6494a5dfd 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -41,7 +41,7 @@ * #3 transport should use validation to also establish * effective flow control (for uni-directional transports!) * #4 UDP broadcasting logic must be extended to use the new API - * #5 only validated addresses go to ATS for scheduling; that + * #5 only validated addresses are selected for scheduling; that * also ensures we know the RTT * #6 to ensure flow control and RTT are OK, we always do the * 'validation', even if address comes from PEERSTORE @@ -59,10 +59,7 @@ * - * * Easy: - * - use ATS bandwidth allocation callback and schedule transmissions! - * - * Plan: - * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update()) + * - figure out how to call XXX_suggestion_cb! * * Later: * - change transport-core API to provide proper flow control in both @@ -98,8 +95,6 @@ * "latest timestamps seen" data * - if transport implements DV, we likely need a 3rd peermap * in addition to ephemerals and (direct) neighbours - * => in this data structure, we should track ATS metrics (distance, RTT, etc.) - * as well as latest timestamps seen, goodput, fragments for transmission, etc. * ==> check if stuff needs to be moved out of "Neighbour" * - transport should encapsualte core-level messages and do its * own ACKing for RTT/goodput/loss measurements _and_ fragment @@ -111,7 +106,6 @@ #include "gnunet_transport_monitor_service.h" #include "gnunet_peerstore_service.h" #include "gnunet_hello_lib.h" -#include "gnunet_ats_transport_service.h" #include "gnunet_signatures.h" #include "transport.h" @@ -148,18 +142,11 @@ #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512 /** - * How many messages can we have pending for a given session (queue to + * How many messages can we have pending for a given queue (queue to * a particular peer via a communicator) process before we start to * throttle that queue? - * - * Used if ATS assigns more bandwidth to a particular transmission - * method than that transmission method can right now handle. (Yes, - * ATS should eventually notice utilization below allocation and - * adjust, but we don't want to queue up tons of messages in the - * meantime). Must be significantly below - * #COMMUNICATOR_TOTAL_QUEUE_LIMIT. */ -#define SESSION_QUEUE_LIMIT 32 +#define QUEUE_LENGTH_LIMIT 32 GNUNET_NETWORK_STRUCT_BEGIN @@ -547,7 +534,6 @@ struct TransportDVBox GNUNET_NETWORK_STRUCT_END - /** * What type of client is the `struct TransportClient` about? */ @@ -571,7 +557,12 @@ enum ClientType /** * It is a communicator, use for communication. */ - CT_COMMUNICATOR = 3 + CT_COMMUNICATOR = 3, + + /** + * "Application" telling us where to connect (i.e. TOPOLOGY, DHT or CADET). + */ + CT_APPLICATION = 4 }; @@ -724,12 +715,19 @@ struct DistanceVector }; +/** + * A queue is a message queue provided by a communicator + * via which we can reach a particular neighbour. + */ +struct Queue; + + /** * Entry identifying transmission in one of our `struct - * GNUNET_ATS_Sessions` which still awaits an ACK. This is used to + * Queue` which still awaits an ACK. This is used to * ensure we do not overwhelm a communicator and limit the number of * messages outstanding per communicator (say in case communicator is - * CPU bound) and per queue (in case ATS bandwidth allocation exceeds + * CPU bound) and per queue (in case bandwidth allocation exceeds * what the communicator can actually provide towards a particular * peer/target). */ @@ -747,9 +745,9 @@ struct QueueEntry struct QueueEntry *prev; /** - * ATS session this entry is queued with. + * Queue this entry is queued with. */ - struct GNUNET_ATS_Session *session; + struct Queue *queue; /** * Message ID used for this message with the queue used for transmission. @@ -759,30 +757,30 @@ struct QueueEntry /** - * An ATS session is a message queue provided by a communicator + * A queue is a message queue provided by a communicator * via which we can reach a particular neighbour. */ -struct GNUNET_ATS_Session +struct Queue { /** * Kept in a MDLL. */ - struct GNUNET_ATS_Session *next_neighbour; + struct Queue *next_neighbour; /** * Kept in a MDLL. */ - struct GNUNET_ATS_Session *prev_neighbour; + struct Queue *prev_neighbour; /** * Kept in a MDLL. */ - struct GNUNET_ATS_Session *prev_client; + struct Queue *prev_client; /** * Kept in a MDLL. */ - struct GNUNET_ATS_Session *next_client; + struct Queue *next_client; /** * Head of DLL of unacked transmission requests. @@ -795,25 +793,20 @@ struct GNUNET_ATS_Session struct QueueEntry *queue_tail; /** - * Which neighbour is this ATS session for? + * Which neighbour is this queue for? */ struct Neighbour *neighbour; /** - * Which communicator offers this ATS session? + * Which communicator offers this queue? */ struct TransportClient *tc; /** - * Address served by the ATS session. + * Address served by the queue. */ const char *address; - /** - * Handle by which we inform ATS about this queue. - */ - struct GNUNET_ATS_SessionRecord *sr; - /** * Task scheduled for the time when this queue can (likely) transmit the * next message. Still needs to check with the @e tracker_out to be sure. @@ -821,7 +814,7 @@ struct GNUNET_ATS_Session struct GNUNET_SCHEDULER_Task *transmit_task; /** - * Our current RTT estimate for this ATS session. + * Our current RTT estimate for this queue. */ struct GNUNET_TIME_Relative rtt; @@ -831,17 +824,17 @@ struct GNUNET_ATS_Session uint64_t mid_gen; /** - * Unique identifier of this ATS session with the communicator. + * Unique identifier of this queue with the communicator. */ uint32_t qid; /** - * Maximum transmission unit supported by this ATS session. + * Maximum transmission unit supported by this queue. */ uint32_t mtu; /** - * Distance to the target of this ATS session. + * Distance to the target of this queue. */ uint32_t distance; @@ -861,22 +854,22 @@ struct GNUNET_ATS_Session unsigned int queue_length; /** - * Network type offered by this ATS session. + * Network type offered by this queue. */ enum GNUNET_NetworkType nt; /** - * Connection status for this ATS session. + * Connection status for this queue. */ enum GNUNET_TRANSPORT_ConnectionStatus cs; /** - * How much outbound bandwidth do we have available for this session? + * How much outbound bandwidth do we have available for this queue? */ struct GNUNET_BANDWIDTH_Tracker tracker_out; /** - * How much inbound bandwidth do we have available for this session? + * How much inbound bandwidth do we have available for this queue? */ struct GNUNET_BANDWIDTH_Tracker tracker_in; }; @@ -1025,14 +1018,14 @@ struct Neighbour struct DistanceVectorHop *dv_tail; /** - * Head of DLL of ATS sessions to this peer. + * Head of DLL of queues to this peer. */ - struct GNUNET_ATS_Session *session_head; + struct Queue *queue_head; /** - * Tail of DLL of ATS sessions to this peer. + * Tail of DLL of queues to this peer. */ - struct GNUNET_ATS_Session *session_tail; + struct Queue *queue_tail; /** * Task run to cleanup pending messages that have exceeded their timeout. @@ -1040,13 +1033,12 @@ struct Neighbour struct GNUNET_SCHEDULER_Task *timeout_task; /** - * Quota at which CORE is allowed to transmit to this peer - * according to ATS. + * Quota at which CORE is allowed to transmit to this peer. * * FIXME: not yet used, tricky to get right given multiple queues! - * (=> Idea: let ATS set a quota per queue and we add them up here?) + * (=> Idea: measure???) * FIXME: how do we set this value initially when we tell CORE? - * Options: start at a minimum value or at literally zero (before ATS?) + * Options: start at a minimum value or at literally zero? * (=> Current thought: clean would be zero!) */ struct GNUNET_BANDWIDTH_Value32NBO quota_out; @@ -1059,6 +1051,40 @@ struct Neighbour }; +/** + * A peer that an application (client) would like us to talk to directly. + */ +struct PeerRequest +{ + + /** + * Which peer is this about? + */ + struct GNUNET_PeerIdentity pid; + + /** + * Client responsible for the request. + */ + struct TransportClient *tc; + + /** + * Handle for watching the peerstore for HELLOs for this peer. + */ + struct GNUNET_PEERSTORE_WatchContext *wc; + + /** + * What kind of performance preference does this @e tc have? + */ + enum GNUNET_MQ_PreferenceKind pk; + + /** + * How much bandwidth would this @e tc like to see? + */ + struct GNUNET_BANDWIDTH_Value32NBO bw; + +}; + + /** * Types of different pending messages. */ @@ -1362,12 +1388,12 @@ struct TransportClient /** * Head of DLL of queues offered by this communicator. */ - struct GNUNET_ATS_Session *session_head; + struct Queue *queue_head; /** * Tail of DLL of queues offered by this communicator. */ - struct GNUNET_ATS_Session *session_tail; + struct Queue *queue_tail; /** * Head of list of the addresses of this peer offered by this communicator. @@ -1393,6 +1419,19 @@ struct TransportClient } communicator; + /** + * Information for @e type #CT_APPLICATION + */ + struct { + + /** + * Map of requests for peers the given client application would like to + * see connections for. Maps from PIDs to `struct PeerRequest`. + */ + struct GNUNET_CONTAINER_MultiPeerMap *requests; + + } application; + } details; }; @@ -1465,11 +1504,6 @@ static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map; */ static struct GNUNET_SCHEDULER_Task *ephemeral_task; -/** - * Our connection to ATS for allocation and bootstrapping. - */ -static struct GNUNET_ATS_TransportHandle *ats; - /** * Free cached ephemeral key. @@ -1781,7 +1815,7 @@ free_neighbour (struct Neighbour *neighbour) { struct DistanceVectorHop *dvh; - GNUNET_assert (NULL == neighbour->session_head); + GNUNET_assert (NULL == neighbour->queue_head); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (neighbours, &neighbour->pid, @@ -1886,7 +1920,7 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) * communicator for transmission (updating the tracker, and re-scheduling * itself if applicable). * - * @param cls the `struct GNUNET_ATS_Session` to process transmissions for + * @param cls the `struct Queue` to process transmissions for */ static void transmit_on_queue (void *cls); @@ -1902,7 +1936,7 @@ transmit_on_queue (void *cls); * @param queue the queue to do scheduling for */ static void -schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue) +schedule_transmit_on_queue (struct Queue *queue) { struct Neighbour *n = queue->neighbour; struct PendingMessage *pm = n->pending_msg_head; @@ -1919,10 +1953,10 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue) GNUNET_NO); return; } - if (queue->queue_length >= SESSION_QUEUE_LIMIT) + if (queue->queue_length >= QUEUE_LENGTH_LIMIT) { GNUNET_STATISTICS_update (GST_stats, - "# Transmission throttled due to session queue limit", + "# Transmission throttled due to queue queue limit", 1, GNUNET_NO); return; @@ -1958,15 +1992,15 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue) /** - * Free @a session. + * Free @a queue. * - * @param session the session to free + * @param queue the queue to free */ static void -free_session (struct GNUNET_ATS_Session *session) +free_queue (struct Queue *queue) { - struct Neighbour *neighbour = session->neighbour; - struct TransportClient *tc = session->tc; + struct Neighbour *neighbour = queue->neighbour; + struct TransportClient *tc = queue->tc; struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN, .rtt = GNUNET_TIME_UNIT_FOREVER_REL @@ -1974,30 +2008,30 @@ free_session (struct GNUNET_ATS_Session *session) struct QueueEntry *qe; int maxxed; - if (NULL != session->transmit_task) + if (NULL != queue->transmit_task) { - GNUNET_SCHEDULER_cancel (session->transmit_task); - session->transmit_task = NULL; + GNUNET_SCHEDULER_cancel (queue->transmit_task); + queue->transmit_task = NULL; } GNUNET_CONTAINER_MDLL_remove (neighbour, - neighbour->session_head, - neighbour->session_tail, - session); + neighbour->queue_head, + neighbour->queue_tail, + queue); GNUNET_CONTAINER_MDLL_remove (client, - tc->details.communicator.session_head, - tc->details.communicator.session_tail, - session); + tc->details.communicator.queue_head, + tc->details.communicator.queue_tail, + queue); maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length); - while (NULL != (qe = session->queue_head)) + while (NULL != (qe = queue->queue_head)) { - GNUNET_CONTAINER_DLL_remove (session->queue_head, - session->queue_tail, + GNUNET_CONTAINER_DLL_remove (queue->queue_head, + queue->queue_tail, qe); - session->queue_length--; + queue->queue_length--; tc->details.communicator.total_queue_length--; GNUNET_free (qe); } - GNUNET_assert (0 == session->queue_length); + GNUNET_assert (0 == queue->queue_length); if ( (maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) ) { @@ -2006,20 +2040,19 @@ free_session (struct GNUNET_ATS_Session *session) "# Transmission throttled due to communicator queue limit", -1, GNUNET_NO); - for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head; + for (struct Queue *s = tc->details.communicator.queue_head; NULL != s; s = s->next_client) schedule_transmit_on_queue (s); } notify_monitors (&neighbour->pid, - session->address, - session->nt, + queue->address, + queue->nt, &me); - GNUNET_ATS_session_del (session->sr); - GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in); - GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out); - GNUNET_free (session); - if (NULL == neighbour->session_head) + GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); + GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out); + GNUNET_free (queue); + if (NULL == neighbour->queue_head) { cores_send_disconnect_info (&neighbour->pid); free_neighbour (neighbour); @@ -2054,6 +2087,33 @@ free_address_list_entry (struct AddressListEntry *ale) } +/** + * Stop the peer request in @a value. + * + * @param cls a `struct TransportClient` that no longer makes the request + * @param pid the peer's identity + * @param value a `struct PeerRequest` + * @return #GNUNET_YES (always) + */ +static int +stop_peer_request (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) +{ + struct TransportClient *tc = cls; + struct PeerRequest *pr = value; + + GNUNET_PEERSTORE_watch_cancel (pr->wc); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests, + pid, + pr)); + GNUNET_free (pr); + + return GNUNET_OK; +} + + /** * Called whenever a client is disconnected. Frees our * resources associated with that client. @@ -2097,16 +2157,22 @@ client_disconnect_cb (void *cls, break; case CT_COMMUNICATOR: { - struct GNUNET_ATS_Session *q; + struct Queue *q; struct AddressListEntry *ale; - while (NULL != (q = tc->details.communicator.session_head)) - free_session (q); + while (NULL != (q = tc->details.communicator.queue_head)) + free_queue (q); while (NULL != (ale = tc->details.communicator.addr_head)) - free_address_list_entry (ale); + free_address_list_entry (ale); GNUNET_free (tc->details.communicator.address_prefix); } break; + case CT_APPLICATION: + GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests, + &stop_peer_request, + tc); + GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests); + break; } GNUNET_free (tc); } @@ -2419,7 +2485,7 @@ handle_client_send (void *cls, } if (! was_empty) return; /* all queues must already be busy */ - for (struct GNUNET_ATS_Session *queue = target->session_head; + for (struct Queue *queue = target->queue_head; NULL != queue; queue = queue->next_neighbour) { @@ -2491,7 +2557,7 @@ handle_communicator_available (void *cls, */ static int check_communicator_backchannel (void *cls, - const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb) + const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb) { const struct GNUNET_MessageHeader *inbox; const char *is; @@ -2565,10 +2631,10 @@ expire_ephemerals (void *cls) */ static void lookup_ephemeral (const struct GNUNET_PeerIdentity *pid, - struct GNUNET_CRYPTO_EcdhePrivateKey *private_key, - struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key, - struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig, - struct GNUNET_TIME_Absolute *ephemeral_validity) + struct GNUNET_CRYPTO_EcdhePrivateKey *private_key, + struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key, + struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig, + struct GNUNET_TIME_Absolute *ephemeral_validity) { struct EphemeralCacheEntry *ece; struct EphemeralConfirmation ec; @@ -2643,7 +2709,7 @@ route_message (const struct GNUNET_PeerIdentity *target, */ static void handle_communicator_backchannel (void *cls, - const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb) + const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb) { struct TransportClient *tc = cls; struct GNUNET_CRYPTO_EcdhePrivateKey private_key; @@ -2729,7 +2795,7 @@ store_pi (void *cls); */ static void peerstore_store_cb (void *cls, - int success) + int success) { struct AddressListEntry *ale = cls; @@ -3178,7 +3244,7 @@ handle_fragment_box (void *cls, if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */ ack_now = GNUNET_YES; /* maximum acks received */ // FIXME: possibly also ACK based on RTT (but for that we'd need to - // determine the session used for the ACK first!) + // determine the queue used for the ACK first!) /* is reassembly complete? */ if (0 != rc->msg_missing) @@ -3289,7 +3355,7 @@ handle_reliability_box (void *cls, */ static void handle_reliability_ack (void *cls, - const struct TransportReliabilityAckMessage *ra) + const struct TransportReliabilityAckMessage *ra) { struct CommunicatorMessageContext *cmc = cls; @@ -3308,7 +3374,7 @@ handle_reliability_ack (void *cls, */ static int check_backchannel_encapsulation (void *cls, - const struct TransportBackchannelEncapsulationMessage *be) + const struct TransportBackchannelEncapsulationMessage *be) { uint16_t size = ntohs (be->header.size); @@ -3329,7 +3395,7 @@ check_backchannel_encapsulation (void *cls, */ static void handle_backchannel_encapsulation (void *cls, - const struct TransportBackchannelEncapsulationMessage *be) + const struct TransportBackchannelEncapsulationMessage *be) { struct CommunicatorMessageContext *cmc = cls; @@ -3361,7 +3427,7 @@ handle_backchannel_encapsulation (void *cls, */ static int check_dv_learn (void *cls, - const struct TransportDVLearn *dvl) + const struct TransportDVLearn *dvl) { uint16_t size = ntohs (dvl->header.size); uint16_t num_hops = ntohs (dvl->num_hops); @@ -3375,15 +3441,15 @@ check_dv_learn (void *cls, for (unsigned int i=0;iinitiator, - &hops[i], - sizeof (struct GNUNET_PeerIdentity))) + &hops[i], + sizeof (struct GNUNET_PeerIdentity))) { GNUNET_break_op (0); return GNUNET_SYSERR; } if (0 == memcmp (&GST_my_identity, - &hops[i], - sizeof (struct GNUNET_PeerIdentity))) + &hops[i], + sizeof (struct GNUNET_PeerIdentity))) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -3401,7 +3467,7 @@ check_dv_learn (void *cls, */ static void handle_dv_learn (void *cls, - const struct TransportDVLearn *dvl) + const struct TransportDVLearn *dvl) { struct CommunicatorMessageContext *cmc = cls; @@ -3420,7 +3486,7 @@ handle_dv_learn (void *cls, */ static int check_dv_box (void *cls, - const struct TransportDVBox *dvb) + const struct TransportDVBox *dvb) { uint16_t size = ntohs (dvb->header.size); uint16_t num_hops = ntohs (dvb->num_hops); @@ -3614,12 +3680,12 @@ check_add_queue_message (void *cls, * Bandwidth tracker informs us that the delay until we should receive * more has changed. * - * @param cls a `struct GNUNET_ATS_Session` for which the delay changed + * @param cls a `struct Queue` for which the delay changed */ static void tracker_update_in_cb (void *cls) { - struct GNUNET_ATS_Session *queue = cls; + struct Queue *queue = cls; struct GNUNET_TIME_Relative in_delay; unsigned int rsize; @@ -3816,12 +3882,12 @@ reliability_box_message (struct PendingMessage *pm) * communicator for transmission (updating the tracker, and re-scheduling * itself if applicable). * - * @param cls the `struct GNUNET_ATS_Session` to process transmissions for + * @param cls the `struct Queue` to process transmissions for */ static void transmit_on_queue (void *cls) { - struct GNUNET_ATS_Session *queue = cls; + struct Queue *queue = cls; struct Neighbour *n = queue->neighbour; struct QueueEntry *qe; struct PendingMessage *pm; @@ -3871,7 +3937,7 @@ transmit_on_queue (void *cls) /* Pass 's' for transission to the communicator */ qe = GNUNET_new (struct QueueEntry); qe->mid = queue->mid_gen++; - qe->session = queue; + qe->queue = queue; // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'! GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, @@ -4007,12 +4073,12 @@ transmit_on_queue (void *cls) * Bandwidth tracker informs us that the delay until we * can transmit again changed. * - * @param cls a `struct GNUNET_ATS_Session` for which the delay changed + * @param cls a `struct Queue` for which the delay changed */ static void tracker_update_out_cb (void *cls) { - struct GNUNET_ATS_Session *queue = cls; + struct Queue *queue = cls; struct Neighbour *n = queue->neighbour; if (NULL == n->pending_msg_head) @@ -4032,7 +4098,7 @@ tracker_update_out_cb (void *cls) * Bandwidth tracker informs us that excessive outbound bandwidth was * allocated which is not being used. * - * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted + * @param cls a `struct Queue` for which the excess was noted */ static void tracker_excess_out_cb (void *cls) @@ -4041,7 +4107,7 @@ tracker_excess_out_cb (void *cls) this is done internally within transport_api2_core already, but we probably want to change the logic and trigger it from here via a message instead! */ - /* TODO: maybe inform ATS at this point? */ + /* TODO: maybe inform someone at this point? */ GNUNET_STATISTICS_update (GST_stats, "# Excess outbound bandwidth reported", 1, @@ -4054,12 +4120,12 @@ tracker_excess_out_cb (void *cls) * Bandwidth tracker informs us that excessive inbound bandwidth was allocated * which is not being used. * - * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted + * @param cls a `struct Queue` for which the excess was noted */ static void tracker_excess_in_cb (void *cls) { - /* TODO: maybe inform ATS at this point? */ + /* TODO: maybe inform somone at this point? */ GNUNET_STATISTICS_update (GST_stats, "# Excess inbound bandwidth reported", 1, @@ -4078,7 +4144,7 @@ handle_add_queue_message (void *cls, const struct GNUNET_TRANSPORT_AddQueueMessage *aqm) { struct TransportClient *tc = cls; - struct GNUNET_ATS_Session *queue; + struct Queue *queue; struct Neighbour *neighbour; const char *addr; uint16_t addr_len; @@ -4108,7 +4174,7 @@ handle_add_queue_message (void *cls, addr_len = ntohs (aqm->header.size) - sizeof (*aqm); addr = (const char *) &aqm[1]; - queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len); + queue = GNUNET_malloc (sizeof (struct Queue) + addr_len); queue->tc = tc; queue->address = (const char *) &queue[1]; queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL; @@ -4134,38 +4200,6 @@ handle_add_queue_message (void *cls, memcpy (&queue[1], addr, addr_len); - /* notify ATS about new queue */ - { - struct GNUNET_ATS_Properties prop = { - .delay = GNUNET_TIME_UNIT_FOREVER_REL, - .mtu = queue->mtu, - .nt = queue->nt, - .cc = tc->details.communicator.cc - }; - - queue->sr = GNUNET_ATS_session_add (ats, - &neighbour->pid, - queue->address, - queue, - &prop); - if (NULL == queue->sr) - { - /* This can only happen if the 'address' was way too long for ATS - (approaching 64k in strlen()!). In this case, the communicator - must be buggy and we drop it. */ - GNUNET_break (0); - GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); - GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out); - GNUNET_free (queue); - if (NULL == neighbour->session_head) - { - cores_send_disconnect_info (&neighbour->pid); - free_neighbour (neighbour); - } - GNUNET_SERVICE_client_drop (tc->client); - return; - } - } /* notify monitors about new queue */ { struct MonitorEvent me = { @@ -4179,12 +4213,12 @@ handle_add_queue_message (void *cls, &me); } GNUNET_CONTAINER_MDLL_insert (neighbour, - neighbour->session_head, - neighbour->session_tail, + neighbour->queue_head, + neighbour->queue_tail, queue); GNUNET_CONTAINER_MDLL_insert (client, - tc->details.communicator.session_head, - tc->details.communicator.session_tail, + tc->details.communicator.queue_head, + tc->details.communicator.queue_tail, queue); GNUNET_SERVICE_client_continue (tc->client); } @@ -4208,18 +4242,18 @@ handle_del_queue_message (void *cls, GNUNET_SERVICE_client_drop (tc->client); return; } - for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; - NULL != session; - session = session->next_client) + for (struct Queue *queue = tc->details.communicator.queue_head; + NULL != queue; + queue = queue->next_client) { - struct Neighbour *neighbour = session->neighbour; + struct Neighbour *neighbour = queue->neighbour; - if ( (dqm->qid != session->qid) || + if ( (dqm->qid != queue->qid) || (0 != memcmp (&dqm->receiver, &neighbour->pid, sizeof (struct GNUNET_PeerIdentity))) ) continue; - free_session (session); + free_queue (queue); GNUNET_SERVICE_client_continue (tc->client); return; } @@ -4239,7 +4273,7 @@ handle_send_message_ack (void *cls, const struct GNUNET_TRANSPORT_SendMessageToAck *sma) { struct TransportClient *tc = cls; - struct QueueEntry *queue; + struct QueueEntry *qe; if (CT_COMMUNICATOR != tc->type) { @@ -4249,37 +4283,37 @@ handle_send_message_ack (void *cls, } /* find our queue entry matching the ACK */ - queue = NULL; - for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; - NULL != session; - session = session->next_client) + qe = NULL; + for (struct Queue *queue = tc->details.communicator.queue_head; + NULL != queue; + queue = queue->next_client) { - if (0 != memcmp (&session->neighbour->pid, + if (0 != memcmp (&queue->neighbour->pid, &sma->receiver, sizeof (struct GNUNET_PeerIdentity))) continue; - for (struct QueueEntry *qe = session->queue_head; - NULL != qe; - qe = qe->next) + for (struct QueueEntry *qep = queue->queue_head; + NULL != qep; + qep = qep->next) { - if (qe->mid != sma->mid) - continue; - queue = qe; + if (qep->mid != sma->mid) + continue; + qe = qep; break; } break; } - if (NULL == queue) + if (NULL == qe) { /* this should never happen */ GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); return; } - GNUNET_CONTAINER_DLL_remove (queue->session->queue_head, - queue->session->queue_tail, - queue); - queue->session->queue_length--; + GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head, + qe->queue->queue_tail, + qe); + qe->queue->queue_length--; tc->details.communicator.total_queue_length--; GNUNET_SERVICE_client_continue (tc->client); @@ -4291,19 +4325,19 @@ handle_send_message_ack (void *cls, "# Transmission throttled due to communicator queue limit", -1, GNUNET_NO); - for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; - NULL != session; - session = session->next_client) - schedule_transmit_on_queue (session); + for (struct Queue *queue = tc->details.communicator.queue_head; + NULL != queue; + queue = queue->next_client) + schedule_transmit_on_queue (queue); } - else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length) + else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) { /* queue dropped below threshold; only resume this one queue */ GNUNET_STATISTICS_update (GST_stats, - "# Transmission throttled due to session queue limit", + "# Transmission throttled due to queue queue limit", -1, GNUNET_NO); - schedule_transmit_on_queue (queue->session); + schedule_transmit_on_queue (qe->queue); } /* TODO: we also should react on the status! */ @@ -4311,7 +4345,7 @@ handle_send_message_ack (void *cls, // FIXME: react to communicator status about transmission request. We got: sma->status; // OK success, SYSERR failure - GNUNET_free (queue); + GNUNET_free (qe); } @@ -4333,7 +4367,7 @@ notify_client_queues (void *cls, struct Neighbour *neighbour = value; GNUNET_assert (CT_MONITOR == tc->type); - for (struct GNUNET_ATS_Session *q = neighbour->session_head; + for (struct Queue *q = neighbour->queue_head; NULL != q; q = q->next_neighbour) { @@ -4383,31 +4417,6 @@ handle_monitor_start (void *cls, } -/** - * Signature of a function called by ATS with the current bandwidth - * allocation to be used as determined by ATS. - * - * @param cls closure, NULL - * @param session session this is about - * @param bandwidth_out assigned outbound bandwidth for the connection, - * 0 to signal disconnect - * @param bandwidth_in assigned inbound bandwidth for the connection, - * 0 to signal disconnect - */ -static void -ats_allocation_cb (void *cls, - struct GNUNET_ATS_Session *session, - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in) -{ - (void) cls; - GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out, - bandwidth_out); - GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in, - bandwidth_in); -} - - /** * Find transport client providing communication service * for the protocol @a prefix. @@ -4429,24 +4438,22 @@ lookup_communicator (const char *prefix) return tc; } GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n", + "Somone suggested use of communicator for `%s', but we do not have such a communicator!\n", prefix); return NULL; } /** - * Signature of a function called by ATS suggesting transport to - * try connecting with a particular address. + * Signature of a function called with a communicator @a address of a peer + * @a pid that an application wants us to connect to. * - * @param cls closure, NULL * @param pid target peer * @param address the address to try */ static void -ats_suggestion_cb (void *cls, - const struct GNUNET_PeerIdentity *pid, - const char *address) +suggest_to_connect (const struct GNUNET_PeerIdentity *pid, + const char *address) { static uint32_t idgen; struct TransportClient *tc; @@ -4455,18 +4462,17 @@ ats_suggestion_cb (void *cls, struct GNUNET_MQ_Envelope *env; size_t alen; - (void) cls; prefix = GNUNET_HELLO_address_to_prefix (address); if (NULL == prefix) { - GNUNET_break (0); /* ATS gave invalid address!? */ + GNUNET_break (0); /* We got an invalid address!? */ return; } tc = lookup_communicator (prefix); if (NULL == tc) { GNUNET_STATISTICS_update (GST_stats, - "# ATS suggestions ignored due to missing communicator", + "# Suggestions ignored due to missing communicator", 1, GNUNET_NO); return; @@ -4511,7 +4517,7 @@ handle_queue_create_ok (void *cls, return; } GNUNET_STATISTICS_update (GST_stats, - "# ATS suggestions succeeded at communicator", + "# Suggestions succeeded at communicator", 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -4531,7 +4537,7 @@ handle_queue_create_ok (void *cls, */ static void handle_queue_create_fail (void *cls, - const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr) + const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr) { struct TransportClient *tc = cls; @@ -4545,13 +4551,138 @@ handle_queue_create_fail (void *cls, "Request #%u for communicator to create queue failed\n", (unsigned int) ntohs (cqr->request_id)); GNUNET_STATISTICS_update (GST_stats, - "# ATS suggestions failed in queue creation at communicator", + "# Suggestions failed in queue creation at communicator", 1, GNUNET_NO); GNUNET_SERVICE_client_continue (tc->client); } +/** + * Function called by PEERSTORE for each matching record. + * + * @param cls closure + * @param record peerstore record information + * @param emsg error message, or NULL if no errors + */ +static void +handle_hello (void *cls, + const struct GNUNET_PEERSTORE_Record *record, + const char *emsg) +{ + struct PeerRequest *pr = cls; + const char *val; + + if (NULL != emsg) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Got failure from PEERSTORE: %s\n", + emsg); + return; + } + val = record->value; + if ( (0 == record->value_size) || + ('\0' != val[record->value_size - 1]) ) + { + GNUNET_break (0); + return; + } + suggest_to_connect (&pr->pid, + (const char *) record->value); +} + + +/** + * We have received a `struct ExpressPreferenceMessage` from an application client. + * + * @param cls handle to the client + * @param msg the start message + */ +static void +handle_suggest (void *cls, + const struct ExpressPreferenceMessage *msg) +{ + struct TransportClient *tc = cls; + struct PeerRequest *pr; + + if (CT_NONE == tc->type) + { + tc->type = CT_APPLICATION; + tc->details.application.requests + = GNUNET_CONTAINER_multipeermap_create (16, + GNUNET_YES); + } + if (CT_APPLICATION != tc->type) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client suggested we talk to %s with preference %d at rate %u\n", + GNUNET_i2s (&msg->peer), + (int) ntohl (msg->pk), + (int) ntohl (msg->bw.value__)); + pr = GNUNET_new (struct PeerRequest); + pr->tc = tc; + pr->pid = msg->peer; + pr->bw = msg->bw; + pr->pk = (enum GNUNET_MQ_PreferenceKind) ntohl (msg->pk); + if (GNUNET_YES != + GNUNET_CONTAINER_multipeermap_put (tc->details.application.requests, + &pr->pid, + pr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) + { + GNUNET_break (0); + GNUNET_free (pr); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + pr->wc = GNUNET_PEERSTORE_watch (peerstore, + "transport", + &pr->pid, + "hello", + &handle_hello, + pr); + GNUNET_SERVICE_client_continue (tc->client); +} + + +/** + * We have received a `struct ExpressPreferenceMessage` from an application client. + * + * @param cls handle to the client + * @param msg the start message + */ +static void +handle_suggest_cancel (void *cls, + const struct ExpressPreferenceMessage *msg) +{ + struct TransportClient *tc = cls; + struct PeerRequest *pr; + + if (CT_APPLICATION != tc->type) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + pr = GNUNET_CONTAINER_multipeermap_get (tc->details.application.requests, + &msg->peer); + if (NULL == pr) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + (void) stop_peer_request (tc, + &pr->pid, + pr); + GNUNET_SERVICE_client_continue (tc->client); +} + + /** * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY * messages. We do nothing here, real verification is done later. @@ -4692,13 +4823,8 @@ do_shutdown (void *cls) ephemeral_task = NULL; } GNUNET_CONTAINER_multipeermap_iterate (neighbours, - &free_neighbour_cb, - NULL); - if (NULL != ats) - { - GNUNET_ATS_transport_done (ats); - ats = NULL; - } + &free_neighbour_cb, + NULL); if (NULL != peerstore) { GNUNET_PEERSTORE_disconnect (peerstore, @@ -4779,17 +4905,6 @@ run (void *cls, GNUNET_SCHEDULER_shutdown (); return; } - ats = GNUNET_ATS_transport_init (GST_cfg, - &ats_allocation_cb, - NULL, - &ats_suggestion_cb, - NULL); - if (NULL == ats) - { - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; - } } @@ -4803,6 +4918,15 @@ GNUNET_SERVICE_MAIN &client_connect_cb, &client_disconnect_cb, NULL, + /* communication with applications */ + GNUNET_MQ_hd_fixed_size (suggest, + GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST, + struct ExpressPreferenceMessage, + NULL), + GNUNET_MQ_hd_fixed_size (suggest_cancel, + GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL, + struct ExpressPreferenceMessage, + NULL), /* communication with core */ GNUNET_MQ_hd_fixed_size (client_start, GNUNET_MESSAGE_TYPE_TRANSPORT_START, diff --git a/src/transport/transport.h b/src/transport/transport.h index c0e02c3d9..b231ea8ae 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -1107,6 +1107,38 @@ struct GNUNET_TRANSPORT_AddressToVerify }; +/** + * Application client to TRANSPORT service: we would like to have + * address suggestions for this peer. + */ +struct ExpressPreferenceMessage +{ + /** + * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST or + * #GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL to stop + * suggestions. + */ + struct GNUNET_MessageHeader header; + + /** + * What type of performance preference does the client have? + * A `enum GNUNET_MQ_PreferenceKind` in NBO. + */ + uint32_t pk GNUNET_PACKED; + + /** + * Peer to get address suggestions for. + */ + struct GNUNET_PeerIdentity peer; + + /** + * How much bandwidth in bytes/second does the application expect? + */ + struct GNUNET_BANDWIDTH_Value32NBO bw; + +}; + + #endif GNUNET_NETWORK_STRUCT_END diff --git a/src/transport/transport_api2_application.c b/src/transport/transport_api2_application.c new file mode 100644 index 000000000..325438e11 --- /dev/null +++ b/src/transport/transport_api2_application.c @@ -0,0 +1,366 @@ +/* + This file is part of GNUnet. + Copyright (C) 2010--2019 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later +*/ +/** + * @file transport/transport_api2_application.c + * @brief enable clients to ask TRANSPORT about establishing connections to peers + * @author Christian Grothoff + * @author Matthias Wachs + */ +#include "platform.h" +#include "gnunet_transport_application_service.h" +#include "gnunet_transport_core_service.h" +#include "transport.h" + + +#define LOG(kind,...) GNUNET_log_from(kind, "transport-application-api", __VA_ARGS__) + + +/** + * Handle for TRANSPORT address suggestion requests. + */ +struct GNUNET_TRANSPORT_ApplicationSuggestHandle +{ + /** + * ID of the peer for which address suggestion was requested. + */ + struct GNUNET_PeerIdentity id; + + /** + * Connecitivity handle this suggestion handle belongs to. + */ + struct GNUNET_TRANSPORT_ApplicationHandle *ch; + + /** + * What preference is being expressed? + */ + enum GNUNET_MQ_PreferenceKind pk; + + /** + * How much bandwidth does the client expect? + */ + struct GNUNET_BANDWIDTH_Value32NBO bw; +}; + + +/** + * Handle to the TRANSPORT subsystem for application management. + */ +struct GNUNET_TRANSPORT_ApplicationHandle +{ + + /** + * Our configuration. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Map with the identities of all the peers for which we would + * like to have address suggestions. The key is the PID, the + * value is currently the `struct GNUNET_TRANSPORT_ApplicationSuggestHandle` + */ + struct GNUNET_CONTAINER_MultiPeerMap *sug_requests; + + /** + * Message queue for sending requests to the TRANSPORT service. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Task to trigger reconnect. + */ + struct GNUNET_SCHEDULER_Task *task; + + /** + * Reconnect backoff delay. + */ + struct GNUNET_TIME_Relative backoff; +}; + + +/** + * Re-establish the connection to the TRANSPORT service. + * + * @param ch handle to use to re-connect. + */ +static void +reconnect (struct GNUNET_TRANSPORT_ApplicationHandle *ch); + + +/** + * Re-establish the connection to the TRANSPORT service. + * + * @param cls handle to use to re-connect. + */ +static void +reconnect_task (void *cls) +{ + struct GNUNET_TRANSPORT_ApplicationHandle *ch = cls; + + ch->task = NULL; + reconnect (ch); +} + + +/** + * Disconnect from TRANSPORT and then reconnect. + * + * @param ch our handle + */ +static void +force_reconnect (struct GNUNET_TRANSPORT_ApplicationHandle *ch) +{ + if (NULL != ch->mq) + { + GNUNET_MQ_destroy (ch->mq); + ch->mq = NULL; + } + ch->backoff = GNUNET_TIME_STD_BACKOFF (ch->backoff); + ch->task = GNUNET_SCHEDULER_add_delayed (ch->backoff, + &reconnect_task, + ch); +} + + +/** + * We encountered an error handling the MQ to the + * TRANSPORT service. Reconnect. + * + * @param cls the `struct GNUNET_TRANSPORT_ApplicationHandle` + * @param error details about the error + */ +static void +error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_TRANSPORT_ApplicationHandle *ch = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "TRANSPORT connection died (code %d), reconnecting\n", + (int) error); + force_reconnect (ch); +} + + +/** + * Transmit request for an address suggestion. + * + * @param cls the `struct GNUNET_TRANSPORT_ApplicationHandle` + * @param peer peer to ask for an address suggestion for + * @param value the `struct GNUNET_TRANSPORT_SuggestHandle` + * @return #GNUNET_OK (continue to iterate), #GNUNET_SYSERR on + * failure (message queue no longer exists) + */ +static int +transmit_suggestion (void *cls, + const struct GNUNET_PeerIdentity *peer, + void *value) +{ + struct GNUNET_TRANSPORT_ApplicationHandle *ch = cls; + struct GNUNET_TRANSPORT_ApplicationSuggestHandle *sh = value; + struct GNUNET_MQ_Envelope *ev; + struct ExpressPreferenceMessage *m; + + if (NULL == ch->mq) + return GNUNET_SYSERR; + ev = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST); + m->pk = htonl ((uint32_t) sh->pk); + m->bw = sh->bw; + m->peer = *peer; + GNUNET_MQ_send (ch->mq, ev); + return GNUNET_OK; +} + + +/** + * Re-establish the connection to the TRANSPORT service. + * + * @param ch handle to use to re-connect. + */ +static void +reconnect (struct GNUNET_TRANSPORT_ApplicationHandle *ch) +{ + static const struct GNUNET_MQ_MessageHandler handlers[] = { + { NULL, 0, 0 } + }; + + GNUNET_assert (NULL == ch->mq); + ch->mq = GNUNET_CLIENT_connect (ch->cfg, + "transport", + handlers, + &error_handler, + ch); + if (NULL == ch->mq) + { + force_reconnect (ch); + return; + } + GNUNET_CONTAINER_multipeermap_iterate (ch->sug_requests, + &transmit_suggestion, + ch); +} + + +/** + * Initialize the TRANSPORT application suggestion client handle. + * + * @param cfg configuration to use + * @return transport application handle, NULL on error + */ +struct GNUNET_TRANSPORT_ApplicationHandle * +GNUNET_TRANSPORT_application_init (const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_TRANSPORT_ApplicationHandle *ch; + + ch = GNUNET_new (struct GNUNET_TRANSPORT_ApplicationHandle); + ch->cfg = cfg; + ch->sug_requests = GNUNET_CONTAINER_multipeermap_create (32, + GNUNET_YES); + reconnect (ch); + return ch; +} + + +/** + * Function called to free all `struct GNUNET_TRANSPORT_ApplicationSuggestHandle`s + * in the map. + * + * @param cls NULL + * @param key the key + * @param value the value to free + * @return #GNUNET_OK (continue to iterate) + */ +static int +free_sug_handle (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) +{ + struct GNUNET_TRANSPORT_ApplicationSuggestHandle *cur = value; + + GNUNET_free (cur); + return GNUNET_OK; +} + + +/** + * Client is done with TRANSPORT application management, release resources. + * + * @param ch handle to release + */ +void +GNUNET_TRANSPORT_application_done (struct GNUNET_TRANSPORT_ApplicationHandle *ch) +{ + if (NULL != ch->mq) + { + GNUNET_MQ_destroy (ch->mq); + ch->mq = NULL; + } + if (NULL != ch->task) + { + GNUNET_SCHEDULER_cancel (ch->task); + ch->task = NULL; + } + GNUNET_CONTAINER_multipeermap_iterate (ch->sug_requests, + &free_sug_handle, + NULL); + GNUNET_CONTAINER_multipeermap_destroy (ch->sug_requests); + GNUNET_free (ch); +} + + +/** + * We would like to receive address suggestions for a peer. TRANSPORT will + * respond with a call to the continuation immediately containing an address or + * no address if none is available. TRANSPORT can suggest more addresses until we call + * #GNUNET_TRANSPORT_application_suggest_cancel(). + * + * @param ch handle + * @param peer identity of the peer we need an address for + * @param pk what kind of application will the application require (can be + * #GNUNET_MQ_PREFERENCE_NONE, we will still try to connect) + * @param bw desired bandwith, can be zero (we will still try to connect) + * @return suggest handle, NULL if a request is already pending + */ +struct GNUNET_TRANSPORT_ApplicationSuggestHandle * +GNUNET_TRANSPORT_application_suggest (struct GNUNET_TRANSPORT_ApplicationHandle *ch, + const struct GNUNET_PeerIdentity *peer, + enum GNUNET_MQ_PreferenceKind pk, + struct GNUNET_BANDWIDTH_Value32NBO bw) +{ + struct GNUNET_TRANSPORT_ApplicationSuggestHandle *s; + + s = GNUNET_new (struct GNUNET_TRANSPORT_ApplicationSuggestHandle); + s->ch = ch; + s->id = *peer; + s->pk = pk; + s->bw = bw; + (void) GNUNET_CONTAINER_multipeermap_put (ch->sug_requests, + &s->id, + s, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Requesting TRANSPORT to suggest address for `%s'\n", + GNUNET_i2s (peer)); + if (NULL == ch->mq) + return s; + GNUNET_assert (GNUNET_OK == + transmit_suggestion (ch, + &s->id, + s)); + return s; +} + + +/** + * We no longer care about being connected to a peer. + * + * @param sh handle to stop + */ +void +GNUNET_TRANSPORT_application_suggest_cancel (struct GNUNET_TRANSPORT_ApplicationSuggestHandle *sh) +{ + struct GNUNET_TRANSPORT_ApplicationHandle *ch = sh->ch; + struct GNUNET_MQ_Envelope *ev; + struct ExpressPreferenceMessage *m; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Telling TRANSPORT we no longer care for an address for `%s'\n", + GNUNET_i2s (&sh->id)); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_remove (ch->sug_requests, + &sh->id, + sh)); + if (NULL == ch->mq) + { + GNUNET_free (sh); + return; + } + ev = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL); + m->pk = htonl ((uint32_t) sh->pk); + m->bw = sh->bw; + m->peer = sh->id; + GNUNET_MQ_send (ch->mq, + ev); + GNUNET_free (sh); +} + + +/* end of transport_api2_application.c */ -- cgit v1.2.3