From d7591587eb28a40e44a3065c30ec1b60a9bcba68 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 6 Oct 2011 19:03:36 +0000 Subject: finishing clients draft --- src/core/gnunet-service-core_clients.c | 564 +++++++++----------------------- src/core/gnunet-service-core_clients.h | 43 ++- src/core/gnunet-service-core_sessions.c | 216 ++++++++++++ src/core/gnunet-service-core_typemap.c | 18 + 4 files changed, 423 insertions(+), 418 deletions(-) (limited to 'src/core') diff --git a/src/core/gnunet-service-core_clients.c b/src/core/gnunet-service-core_clients.c index 7d56a49a5..333bd5ad8 100644 --- a/src/core/gnunet-service-core_clients.c +++ b/src/core/gnunet-service-core_clients.c @@ -32,7 +32,6 @@ #include "gnunet_service_core_typemap.h" - /** * Data structure for each client connected to the core service. */ @@ -144,6 +143,31 @@ send_to_client (struct GSC_Client *client, } +/** + * Send a message to one of our clients. + * + * @param client target for the message + * @param msg message to transmit + * @param can_drop could this message be dropped if the + * client's queue is getting too large? + */ +void +GSC_CLIENTS_send_to_client (struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg, + int can_drop) +{ + struct GSC_Client *c; + + c = find_client (client); + if (NULL == c) + { + GNUNET_break (0); + return; + } + send_to_client (c, msg, can_drop); +} + + /** * Test if the client is interested in messages of the given type. * @@ -183,9 +207,12 @@ send_to_all_clients (const struct GNUNET_MessageHeader *msg, for (c = client_head; c != NULL; c = c->next) { + if ( (0 != (c->options & options)) && + (GNUNET_YES == type_match (type, c)) ) + continue; /* both match, wait for only type match */ if ( (0 == (c->options & options)) && (GNUNET_YES != type_match (type, c)) ) - continue; + continue; /* neither match, skip entirely */ #if DEBUG_CORE_CLIENT > 1 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u to client.\n", @@ -500,136 +527,144 @@ GSC_CLIENTS_reject_request (struct GSC_ClientActiveRequest *car) } - -// FIXME from here....................................... - - - - - - /** - * Notify client about an existing connection to one of our neighbours. + * Notify a particular client about a change to existing connection to + * one of our neighbours (check if the client is interested). Called + * from 'GSC_SESSIONS_notify_client_about_sessions'. + * + * @param client client to notify + * @param neighbour identity of the neighbour that changed status + * @param atsi performance information about neighbour + * @param atsi_count number of entries in 'ats' array + * @param tmap_old previous type map for the neighbour, NULL for disconnect + * @param tmap_new updated type map for the neighbour, NULL for disconnect */ -static int -notify_client_about_neighbour (void *cls, const GNUNET_HashCode * key, - void *value) +void +GDS_CLIENTS_notify_client_about_neighbour (struct GSC_Client *client, + const struct GNUNET_PeerIdentity *neighbour, + const struct GNUNET_TRANSPORT_ATS_Information *atsi, + unsigned int atsi_count, + const struct GSC_TypeMap *tmap_old, + const struct GSC_TypeMap *tmap_new) { - struct GSC_Client *c = cls; - struct Neighbour *n = value; + struct ConnectNotifyMessage *cnm; size_t size; char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; - struct GNUNET_TRANSPORT_ATS_Information *ats; - struct ConnectNotifyMessage *cnm; - - size = - sizeof (struct ConnectNotifyMessage) + - (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + struct GNUNET_TRANSPORT_ATS_Information *a; + struct DisconnectNotifyMessage dcm; + int old_match; + int new_match; + + old_match = GSC_TYPEMAP_test_match (tmap_old, client->types, client->tcnt); + new_match = GSC_TYPEMAP_test_match (tmap_new, client->types, client->tcnt); + if (old_match == new_match) + return; /* no change */ + if (old_match == GNUNET_NO) { - GNUNET_break (0); - /* recovery strategy: throw away performance data */ - GNUNET_array_grow (n->ats, n->ats_count, 0); + /* send connect */ size = - sizeof (struct ConnectNotifyMessage) + - (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - } - cnm = (struct ConnectNotifyMessage *) buf; - cnm->header.size = htons (size); - cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); - cnm->ats_count = htonl (n->ats_count); - ats = &cnm->ats; - memcpy (ats, n->ats, - sizeof (struct GNUNET_TRANSPORT_ATS_Information) * n->ats_count); - ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); - ats[n->ats_count].value = htonl (0); - if (n->status == PEER_STATE_KEY_CONFIRMED) - { + sizeof (struct ConnectNotifyMessage) + + (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information); + if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + /* recovery strategy: throw away performance data */ + GNUNET_array_grow (n->ats, n->ats_count, 0); + size = + sizeof (struct ConnectNotifyMessage) + + (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information); + } + cnm = (struct ConnectNotifyMessage *) buf; + cnm->header.size = htons (size); + cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); + cnm->ats_count = htonl (atsi); + a = &cnm->atsi; + memcpy (a, atsi, + sizeof (struct GNUNET_TRANSPORT_ATS_Information) * atsi_count); + a[ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); + a[ats_count].value = htonl (0); #if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending `%s' message to client.\n", "NOTIFY_CONNECT"); #endif cnm->peer = n->peer; - send_to_client (c, &cnm->header, GNUNET_NO); + send_to_client (client, &cnm->header, GNUNET_NO); + } + else + { + /* send disconnect */ + dcm.header.size = htons (sizeof (struct DisconnectNotifyMessage)); + dcm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT); + dcm.reserved = htonl (0); + dcm.peer = *peer; + send_to_client (client, &cnm.header, GNUNET_NO); } - return GNUNET_OK; } - /** - * Helper function for handle_client_iterate_peers. + * Notify all clients about a change to existing session. + * Called from SESSIONS whenever there is a change in sessions + * or types processed by the respective peer. * - * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies - * @param key identity of the connected peer - * @param value the 'struct Neighbour' for the peer - * @return GNUNET_OK (continue to iterate) + * @param neighbour identity of the neighbour that changed status + * @param atsi performance information about neighbour + * @param atsi_count number of entries in 'ats' array + * @param tmap_old previous type map for the neighbour, NULL for disconnect + * @param tmap_new updated type map for the neighbour, NULL for disconnect */ -static int -queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value) +void +GDS_CLIENTS_notify_clients_about_neighbour (const struct GNUNET_PeerIdentity *neighbour, + const struct GNUNET_TRANSPORT_ATS_Information *atsi, + unsigned int atsi_count, + const struct GSC_TypeMap *tmap_old, + const struct GSC_TypeMap *tmap_new) { - struct GNUNET_SERVER_TransmitContext *tc = cls; - struct Neighbour *n = value; - char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; - struct GNUNET_TRANSPORT_ATS_Information *ats; - size_t size; - struct ConnectNotifyMessage *cnm; + struct GSC_Client *c; - cnm = (struct ConnectNotifyMessage *) buf; - if (n->status != PEER_STATE_KEY_CONFIRMED) - return GNUNET_OK; - size = - sizeof (struct ConnectNotifyMessage) + - (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) - { - GNUNET_break (0); - /* recovery strategy: throw away performance data */ - GNUNET_array_grow (n->ats, n->ats_count, 0); - size = - sizeof (struct PeerStatusNotifyMessage) + - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - } - cnm = (struct ConnectNotifyMessage *) buf; - cnm->header.size = htons (size); - cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); - cnm->ats_count = htonl (n->ats_count); - ats = &cnm->ats; - memcpy (ats, n->ats, - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); - ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); - ats[n->ats_count].value = htonl (0); -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n", - "NOTIFY_CONNECT"); -#endif - cnm->peer = n->peer; - GNUNET_SERVER_transmit_context_append_message (tc, &cnm->header); - return GNUNET_OK; + for (c = client_head; c != NULL; c = c->next) + GDS_CLIENTS_notify_client_about_neighbour (c, neighbour, atsi, + atsi_count, + tmap_old, tmap_new); } - - /** - * Send a P2P message to a client. + * Deliver P2P message to interested clients. Caller must have checked + * that the sending peer actually lists the given message type as one + * of its types. * - * @param sender who sent us the message? - * @param client who should we give the message to? - * @param m contains the message to transmit - * @param msize number of bytes in buf to transmit + * @param sender peer who sent us the message + * @param atsi performance information about neighbour + * @param atsi_count number of entries in 'ats' array + * @param msg the message + * @param msize number of bytes to transmit + * @param options options for checking which clients should + * receive the message */ -static void -send_p2p_message_to_client (struct Neighbour *sender, struct GSC_Client *client, - const void *m, size_t msize) +void +GSC_CLIENTS_deliver_message (const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_TRANSPORT_ATS_Information *atsi, + unsigned int atsi_count, + const struct GNUNET_MessageHeader *msg, + uint16_t msize, + int options) { - size_t size = - msize + sizeof (struct NotifyTrafficMessage) + + size_t size = msize + sizeof (struct NotifyTrafficMessage) + (sender->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information); char buf[size]; struct NotifyTrafficMessage *ntm; - struct GNUNET_TRANSPORT_ATS_Information *ats; + struct GNUNET_TRANSPORT_ATS_Information *a; + int dropped; + if (0 == options) + { + GNUNET_snprintf (buf, sizeof (buf), + gettext_noop ("# bytes of messages of type %u received"), + (unsigned int) ntohs (msg->type)); + GNUNET_STATISTICS_update (stats, buf, msize, GNUNET_NO); + } GNUNET_assert (GNUNET_YES == sender->is_connected); GNUNET_break (sender->status == PEER_STATE_KEY_CONFIRMED); if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) @@ -644,324 +679,25 @@ send_p2p_message_to_client (struct Neighbour *sender, struct GSC_Client *client, #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service passes message from `%4s' of type %u to client.\n", - GNUNET_i2s (&sender->peer), - (unsigned int) - ntohs (((const struct GNUNET_MessageHeader *) m)->type)); + GNUNET_i2s (sender), + (unsigned int) ntohs (msg->type)); #endif ntm = (struct NotifyTrafficMessage *) buf; ntm->header.size = htons (size); ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND); - ntm->ats_count = htonl (sender->ats_count); + ntm->ats_count = htonl (atsi_count); ntm->peer = sender->peer; - ats = &ntm->ats; - memcpy (ats, sender->ats, - sizeof (struct GNUNET_TRANSPORT_ATS_Information) * sender->ats_count); - ats[sender->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); - ats[sender->ats_count].value = htonl (0); - memcpy (&ats[sender->ats_count + 1], m, msize); - send_to_client (client, &ntm->header, GNUNET_YES); -} - - - -/** - * Notify a particular client about a change to existing connection to - * one of our neighbours (check if the client is interested). Called - * from 'GSC_SESSIONS_notify_client_about_sessions'. - * - * @param client client to notify - * @param neighbour identity of the neighbour that changed status - * @param tmap_old previous type map for the neighbour, NULL for disconnect - * @param tmap_new updated type map for the neighbour, NULL for disconnect - */ -void -GDS_CLIENTS_notify_client_about_neighbour (struct GSC_Client *client, - const struct GNUNET_PeerIdentity *neighbour, - const struct GSC_TypeMap *tmap_old, - const struct GSC_TypeMap *tmap_new) -{ -} - - -/** - * Notify client about a change to existing connection to one of our neighbours. - * - * @param neighbour identity of the neighbour that changed status - * @param tmap_old previous type map for the neighbour, NULL for disconnect - * @param tmap_new updated type map for the neighbour, NULL for disconnect - */ -void -GDS_CLIENTS_notify_clients_about_neighbour (const struct GNUNET_PeerIdentity *neighbour, - const struct GSC_TypeMap *tmap_old, - const struct GSC_TypeMap *tmap_new) -{ -} - - -/** - * Deliver P2P message to interested clients. - * - * @param sender peer who sent us the message - * @param m the message - */ -void -GSC_CLIENTS_deliver_message (const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_MessageHeader *m) -{ - struct Neighbour *sender = client; - size_t msize = ntohs (m->size); - char buf[256]; - struct GSC_Client *cpos; - uint16_t type; - unsigned int tpos; - int deliver_full; - int dropped; - - GNUNET_break (sender->status == PEER_STATE_KEY_CONFIRMED); - type = ntohs (m->type); -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received encapsulated message of type %u and size %u from `%4s'\n", - (unsigned int) type, ntohs (m->size), GNUNET_i2s (&sender->peer)); -#endif - GNUNET_snprintf (buf, sizeof (buf), - gettext_noop ("# bytes of messages of type %u received"), - (unsigned int) type); - GNUNET_STATISTICS_update (stats, buf, msize, GNUNET_NO); - if ((GNUNET_MESSAGE_TYPE_CORE_BINARY_TYPE_MAP == type) || - (GNUNET_MESSAGE_TYPE_CORE_COMPRESSED_TYPE_MAP == type)) - { - /* FIXME: update message type map for 'Neighbour' */ - return; - } - dropped = GNUNET_YES; - cpos = clients; - while (cpos != NULL) - { - deliver_full = GNUNET_NO; - if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)) - deliver_full = GNUNET_YES; - else - { - for (tpos = 0; tpos < cpos->tcnt; tpos++) - { - if (type != cpos->types[tpos]) - continue; - deliver_full = GNUNET_YES; - break; - } - } - if (GNUNET_YES == deliver_full) - { - send_p2p_message_to_client (sender, cpos, m, msize); - dropped = GNUNET_NO; - } - else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND) - { - send_p2p_message_to_client (sender, cpos, m, - sizeof (struct GNUNET_MessageHeader)); - } - cpos = cpos->next; - } - if (dropped == GNUNET_YES) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Message of type %u from `%4s' not delivered to any client.\n", - (unsigned int) type, GNUNET_i2s (&sender->peer)); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# messages not delivered to any client"), 1, - GNUNET_NO); - } + a = &ntm->ats; + memcpy (a, atsi, + sizeof (struct GNUNET_TRANSPORT_ATS_Information) * sender->atsi_count); + a[atsi_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); + a[atsi_count].value = htonl (0); + memcpy (&ats[atsi_count + 1], msg, msize); + send_to_all_clients (&ntm->header, GNUNET_YES, + options, ntohs (msg->type)); } - - -/** - * Handle CORE_ITERATE_PEERS request. - * - * @param cls unused - * @param client client sending the iteration request - * @param message iteration request message - */ -static void -handle_client_iterate_peers (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - struct GNUNET_MessageHeader done_msg; - struct GNUNET_SERVER_TransmitContext *tc; - int msize; - - /* notify new client about existing neighbours */ - - msize = ntohs (message->size); - tc = GNUNET_SERVER_transmit_context_create (client); - if (msize == sizeof (struct GNUNET_MessageHeader)) - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &queue_connect_message, - tc); - else - GNUNET_break (0); - - done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); - done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); - GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); -} - - -/** - * Handle CORE_PEER_CONNECTED request. Notify client about existing neighbours. - * - * @param cls unused - * @param client client sending the iteration request - * @param message iteration request message - */ -static void -handle_client_have_peer (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - struct GNUNET_MessageHeader done_msg; - struct GNUNET_SERVER_TransmitContext *tc; - struct GNUNET_PeerIdentity *peer; - - tc = GNUNET_SERVER_transmit_context_create (client); - peer = (struct GNUNET_PeerIdentity *) &message[1]; - GNUNET_CONTAINER_multihashmap_get_multiple (neighbours, &peer->hashPubKey, - &queue_connect_message, tc); - done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); - done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); - GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); -} - - -/** - * Handle REQUEST_INFO request. - * - * @param cls unused - * @param client client sending the request - * @param message iteration request message - */ -static void -handle_client_request_info (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct RequestInfoMessage *rcm; - struct GSC_Client *pos; - struct Neighbour *n; - struct ConfigurationInfoMessage cim; - int32_t want_reserv; - int32_t got_reserv; - unsigned long long old_preference; - struct GNUNET_TIME_Relative rdelay; - - rdelay = GNUNET_TIME_relative_get_zero (); -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service receives `%s' request.\n", - "REQUEST_INFO"); -#endif - pos = clients; - while (pos != NULL) - { - if (client == pos->client_handle) - break; - pos = pos->next; - } - if (pos == NULL) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - - rcm = (const struct RequestInfoMessage *) message; - n = find_neighbour (&rcm->peer); - memset (&cim, 0, sizeof (cim)); - if ((n != NULL) && (GNUNET_YES == n->is_connected)) - { - want_reserv = ntohl (rcm->reserve_inbound); - if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__) - { - n->bw_out_internal_limit = rcm->limit_outbound; - if (n->bw_out.value__ != - GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit, - n->bw_out_external_limit).value__) - { - n->bw_out = - GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit, - n->bw_out_external_limit); - GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window, - n->bw_out); - GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out); - handle_peer_status_change (n); - } - } - if (want_reserv < 0) - { - got_reserv = want_reserv; - } - else if (want_reserv > 0) - { - rdelay = - GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window, - want_reserv); - if (rdelay.rel_value == 0) - got_reserv = want_reserv; - else - got_reserv = 0; /* all or nothing */ - } - else - got_reserv = 0; - GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window, got_reserv); - old_preference = n->current_preference; - n->current_preference += GNUNET_ntohll (rcm->preference_change); - if (old_preference > n->current_preference) - { - /* overflow; cap at maximum value */ - n->current_preference = ULLONG_MAX; - } - update_preference_sum (n->current_preference - old_preference); -#if DEBUG_CORE_QUOTA - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received reservation request for %d bytes for peer `%4s', reserved %d bytes, suggesting delay of %llu ms\n", - (int) want_reserv, GNUNET_i2s (&rcm->peer), (int) got_reserv, - (unsigned long long) rdelay.rel_value); -#endif - cim.reserved_amount = htonl (got_reserv); - cim.reserve_delay = GNUNET_TIME_relative_hton (rdelay); - cim.bw_out = n->bw_out; - cim.preference = n->current_preference; - } - else - { - /* Technically, this COULD happen (due to asynchronous behavior), - * but it should be rare, so we should generate an info event - * to help diagnosis of serious errors that might be masked by this */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Client asked for preference change with peer `%s', which is not connected!\n"), - GNUNET_i2s (&rcm->peer)); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - cim.header.size = htons (sizeof (struct ConfigurationInfoMessage)); - cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO); - cim.peer = rcm->peer; - cim.rim_id = rcm->rim_id; -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n", - "CONFIGURATION_INFO"); -#endif - send_to_client (pos, &cim.header, GNUNET_NO); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - - - /** * Initialize clients subsystem. * @@ -973,14 +709,14 @@ GSC_CLIENTS_init (struct GNUNET_SERVER_Handle *server) static const struct GNUNET_SERVER_MessageHandler handlers[] = { {&handle_client_init, NULL, GNUNET_MESSAGE_TYPE_CORE_INIT, 0}, - {&handle_client_iterate_peers, NULL, + {&GSC_SESSIONS_handle_client_iterate_peers, NULL, GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS, sizeof (struct GNUNET_MessageHeader)}, - {&handle_client_have_peer, NULL, + {&GSC_SESSIONS_handle_client_have_peer, NULL, GNUNET_MESSAGE_TYPE_CORE_PEER_CONNECTED, sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_PeerIdentity)}, - {&handle_client_request_info, NULL, + {&GSC_SESSIONS_handle_client_request_info, NULL, GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO, sizeof (struct RequestInfoMessage)}, {&handle_client_send_request, NULL, diff --git a/src/core/gnunet-service-core_clients.h b/src/core/gnunet-service-core_clients.h index 2b624ef9c..a137c6e81 100644 --- a/src/core/gnunet-service-core_clients.h +++ b/src/core/gnunet-service-core_clients.h @@ -30,6 +30,20 @@ #include "gnunet_service_core.h" +/** + * Send a message to one of our clients. + * + * @param client target for the message + * @param msg message to transmit + * @param can_drop could this message be dropped if the + * client's queue is getting too large? + */ +void +GSC_CLIENTS_send_to_client (struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg, + int can_drop); + + /** * Notify a particular client about a change to existing connection to * one of our neighbours (check if the client is interested). Called @@ -37,38 +51,59 @@ * * @param client client to notify * @param neighbour identity of the neighbour that changed status + * @param atsi performance information about neighbour + * @param atsi_count number of entries in 'ats' array * @param tmap_old previous type map for the neighbour, NULL for disconnect * @param tmap_new updated type map for the neighbour, NULL for disconnect */ void GDS_CLIENTS_notify_client_about_neighbour (struct GSC_Client *client, const struct GNUNET_PeerIdentity *neighbour, + const struct GNUNET_TRANSPORT_ATS_Information *atsi, + unsigned int atsi_count, const struct GSC_TypeMap *tmap_old, const struct GSC_TypeMap *tmap_new); /** - * Notify client about a change to existing connection to one of our neighbours. + * Notify all clients about a change to existing session. + * Called from SESSIONS whenever there is a change in sessions + * or types processed by the respective peer. * * @param neighbour identity of the neighbour that changed status + * @param atsi performance information about neighbour + * @param atsi_count number of entries in 'ats' array * @param tmap_old previous type map for the neighbour, NULL for disconnect * @param tmap_new updated type map for the neighbour, NULL for disconnect */ void GDS_CLIENTS_notify_clients_about_neighbour (const struct GNUNET_PeerIdentity *neighbour, + const struct GNUNET_TRANSPORT_ATS_Information *atsi, + unsigned int atsi_count, const struct GSC_TypeMap *tmap_old, const struct GSC_TypeMap *tmap_new); /** - * Deliver P2P message to interested clients. + * Deliver P2P message to interested clients. Caller must have checked + * that the sending peer actually lists the given message type as one + * of its types. * * @param sender peer who sent us the message - * @param m the message + * @param atsi performance information about neighbour + * @param atsi_count number of entries in 'ats' array + * @param msg the message + * @param msize number of bytes to transmit + * @param options options for checking which clients should + * receive the message */ void GSC_CLIENTS_deliver_message (const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_MessageHeader *m); + const struct GNUNET_TRANSPORT_ATS_Information *atsi, + unsigned int atsi_count, + const struct GNUNET_MessageHeader *msg, + uint16_t msize, + int options); /** diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c index fd60aadc7..0db053ff5 100644 --- a/src/core/gnunet-service-core_sessions.c +++ b/src/core/gnunet-service-core_sessions.c @@ -1360,6 +1360,222 @@ GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, + +/** + * Helper function for GSC_SESSIONS_handle_client_iterate_peers. + * + * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies + * @param key identity of the connected peer + * @param value the 'struct Neighbour' for the peer + * @return GNUNET_OK (continue to iterate) + */ +static int +queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value) +{ + struct GNUNET_SERVER_TransmitContext *tc = cls; + struct Neighbour *n = value; + char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; + struct GNUNET_TRANSPORT_ATS_Information *ats; + size_t size; + struct ConnectNotifyMessage *cnm; + + cnm = (struct ConnectNotifyMessage *) buf; + if (n->status != PEER_STATE_KEY_CONFIRMED) + return GNUNET_OK; + size = + sizeof (struct ConnectNotifyMessage) + + (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information); + if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + /* recovery strategy: throw away performance data */ + GNUNET_array_grow (n->ats, n->ats_count, 0); + size = + sizeof (struct PeerStatusNotifyMessage) + + n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information); + } + cnm = (struct ConnectNotifyMessage *) buf; + cnm->header.size = htons (size); + cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); + cnm->ats_count = htonl (n->ats_count); + ats = &cnm->ats; + memcpy (ats, n->ats, + n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); + ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); + ats[n->ats_count].value = htonl (0); +#if DEBUG_CORE_CLIENT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n", + "NOTIFY_CONNECT"); +#endif + cnm->peer = n->peer; + GNUNET_SERVER_transmit_context_append_message (tc, &cnm->header); + return GNUNET_OK; +} + + + +/** + * Handle CORE_ITERATE_PEERS request. + * + * @param cls unused + * @param client client sending the iteration request + * @param message iteration request message + */ +void +GSC_SESSIONS_handle_client_iterate_peers (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + struct GNUNET_MessageHeader done_msg; + struct GNUNET_SERVER_TransmitContext *tc; + + tc = GNUNET_SERVER_transmit_context_create (client); + GNUNET_CONTAINER_multihashmap_iterate (neighbours, &queue_connect_message, + tc); + done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); + done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); + GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); + GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); +} + + +/** + * Handle CORE_PEER_CONNECTED request. Notify client about existing neighbours. + * + * @param cls unused + * @param client client sending the iteration request + * @param message iteration request message + */ +void +GSC_SESSIONS_handle_client_have_peer (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + struct GNUNET_MessageHeader done_msg; + struct GNUNET_SERVER_TransmitContext *tc; + const struct GNUNET_PeerIdentity *peer; + + peer = (const struct GNUNET_PeerIdentity *) &message[1]; // YUCK! + tc = GNUNET_SERVER_transmit_context_create (client); + GNUNET_CONTAINER_multihashmap_get_multiple (neighbours, &peer->hashPubKey, + &queue_connect_message, tc); + done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); + done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); + GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); + GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); +} + + + +/** + * Handle REQUEST_INFO request. + * + * @param cls unused + * @param client client sending the request + * @param message iteration request message + */ +void +GSC_SESSIONS_handle_client_request_info (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct RequestInfoMessage *rcm; + struct GSC_Client *pos; + struct Neighbour *n; + struct ConfigurationInfoMessage cim; + int32_t want_reserv; + int32_t got_reserv; + unsigned long long old_preference; + struct GNUNET_TIME_Relative rdelay; + + rdelay = GNUNET_TIME_relative_get_zero (); +#if DEBUG_CORE_CLIENT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service receives `%s' request.\n", + "REQUEST_INFO"); +#endif + rcm = (const struct RequestInfoMessage *) message; + n = find_neighbour (&rcm->peer); + memset (&cim, 0, sizeof (cim)); + if ((n != NULL) && (GNUNET_YES == n->is_connected)) + { + want_reserv = ntohl (rcm->reserve_inbound); + if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__) + { + n->bw_out_internal_limit = rcm->limit_outbound; + if (n->bw_out.value__ != + GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit, + n->bw_out_external_limit).value__) + { + n->bw_out = + GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit, + n->bw_out_external_limit); + GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window, + n->bw_out); + GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out); + handle_peer_status_change (n); + } + } + if (want_reserv < 0) + { + got_reserv = want_reserv; + } + else if (want_reserv > 0) + { + rdelay = + GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window, + want_reserv); + if (rdelay.rel_value == 0) + got_reserv = want_reserv; + else + got_reserv = 0; /* all or nothing */ + } + else + got_reserv = 0; + GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window, got_reserv); + old_preference = n->current_preference; + n->current_preference += GNUNET_ntohll (rcm->preference_change); + if (old_preference > n->current_preference) + { + /* overflow; cap at maximum value */ + n->current_preference = ULLONG_MAX; + } + update_preference_sum (n->current_preference - old_preference); +#if DEBUG_CORE_QUOTA + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received reservation request for %d bytes for peer `%4s', reserved %d bytes, suggesting delay of %llu ms\n", + (int) want_reserv, GNUNET_i2s (&rcm->peer), (int) got_reserv, + (unsigned long long) rdelay.rel_value); +#endif + cim.reserved_amount = htonl (got_reserv); + cim.reserve_delay = GNUNET_TIME_relative_hton (rdelay); + cim.bw_out = n->bw_out; + cim.preference = n->current_preference; + } + else + { + /* Technically, this COULD happen (due to asynchronous behavior), + * but it should be rare, so we should generate an info event + * to help diagnosis of serious errors that might be masked by this */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _ + ("Client asked for preference change with peer `%s', which is not connected!\n"), + GNUNET_i2s (&rcm->peer)); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + cim.header.size = htons (sizeof (struct ConfigurationInfoMessage)); + cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO); + cim.peer = rcm->peer; + cim.rim_id = rcm->rim_id; +#if DEBUG_CORE_CLIENT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n", + "CONFIGURATION_INFO"); +#endif + GSC_CLIENTS_send_to_client (client, &cim.header, GNUNET_NO); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + + + + int GSC_NEIGHBOURS_init () { diff --git a/src/core/gnunet-service-core_typemap.c b/src/core/gnunet-service-core_typemap.c index 0d270737b..dfd0b8888 100644 --- a/src/core/gnunet-service-core_typemap.c +++ b/src/core/gnunet-service-core_typemap.c @@ -50,6 +50,24 @@ GSC_TYPEMAP_remove (const uint16_t *types, } +/** + * Test if any of the types from the types array is in the + * given type map. + * + * @param map map to test + * @param types array of types + * @param tcnt number of entries in types + * @return GNUNET_YES if a type is in the map, GNUNET_NO if not + */ +int +GSC_TYPEMAP_test_match (struct GSC_TypeMap *tmap, + const uint16_t *types, + unsigned int tcnt) +{ + return GNUNET_YES; /* FIXME */ +} + + /** * Compute a type map message for this peer. * -- cgit v1.2.3