From d045ec0ba4e862904fc310f3a247c5fb041bf6a8 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 13 Apr 2012 08:51:53 +0000 Subject: -fixing outbound notifications in core API --- src/core/core_api.c | 63 +++--------------------- src/core/gnunet-service-core_clients.c | 88 +++++++++++++++++++++------------- src/core/gnunet-service-core_clients.h | 3 +- 3 files changed, 63 insertions(+), 91 deletions(-) (limited to 'src/core') diff --git a/src/core/core_api.c b/src/core/core_api.c index ecfe8b478..d2eaa260d 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c @@ -367,9 +367,7 @@ reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct GNUNET_CORE_Handle *h = cls; h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service after delay\n"); -#endif reconnect (h); } @@ -546,11 +544,9 @@ request_next_transmission (struct PeerRecord *pr) smr->smr_id = htons (th->smr_id = pr->smr_id_gen++); GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head, h->control_pending_tail, cm); -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Adding SEND REQUEST for peer `%s' to message queue\n", GNUNET_i2s (&pr->peer)); -#endif trigger_next_request (h, GNUNET_NO); } @@ -580,10 +576,8 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * us from the 'ready' list */ GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Signalling timeout of request for transmission to CORE service\n"); -#endif request_next_transmission (pr); GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL)); GNUNET_free (th); @@ -609,10 +603,8 @@ transmit_message (void *cls, size_t size, void *buf) h->cth = NULL; if (buf == NULL) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission failed, initiating reconnect\n"); -#endif reconnect_later (h); return 0; } @@ -626,11 +618,9 @@ transmit_message (void *cls, size_t size, void *buf) trigger_next_request (h, GNUNET_NO); return 0; } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting control message with %u bytes of type %u to core.\n", (unsigned int) msize, (unsigned int) ntohs (hdr->type)); -#endif memcpy (buf, hdr, msize); GNUNET_CONTAINER_DLL_remove (h->control_pending_head, h->control_pending_tail, cm); @@ -660,11 +650,9 @@ transmit_message (void *cls, size_t size, void *buf) GNUNET_SCHEDULER_cancel (pr->timeout_task); pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting SEND request to `%s' with %u bytes.\n", GNUNET_i2s (&pr->peer), (unsigned int) th->msize); -#endif sm = (struct SendMessage *) buf; sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); sm->priority = htonl (th->priority); @@ -676,28 +664,22 @@ transmit_message (void *cls, size_t size, void *buf) th->get_message (th->get_message_cls, size - sizeof (struct SendMessage), &sm[1]); -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting SEND request to `%s' yielded %u bytes.\n", GNUNET_i2s (&pr->peer), ret); -#endif GNUNET_free (th); if (0 == ret) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Size of clients message to peer %s is 0!\n", GNUNET_i2s (&pr->peer)); -#endif /* client decided to send nothing! */ request_next_transmission (pr); return 0; } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Produced SEND message to core with %u bytes payload\n", (unsigned int) ret); -#endif GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader)); if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { @@ -729,17 +711,13 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down) if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO)) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Core connection down, not processing queue\n"); -#endif return; } if (NULL != h->cth) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Request pending, not processing queue\n"); -#endif return; } if (h->control_pending_head != NULL) @@ -751,10 +729,8 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down) h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage); else { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Request queue empty, not processing queue\n"); -#endif return; /* no pending message */ } h->cth = @@ -800,11 +776,9 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) return; } msize = ntohs (msg->size); -#if DEBUG_CORE > 2 LOG (GNUNET_ERROR_TYPE_DEBUG, "Processing message of type %u and size %u from core service\n", ntohs (msg->type), msize); -#endif switch (ntohs (msg->type)) { case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY: @@ -828,18 +802,14 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) { /* mark so we don't call init on reconnect */ h->init = NULL; -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core service of peer `%s'.\n", GNUNET_i2s (&h->me)); -#endif init (h->cls, h, &h->me); } else { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Successfully reconnected to core service.\n"); -#endif } /* fake 'connect to self' */ pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &h->me.hashPubKey); @@ -871,11 +841,9 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) reconnect_later (h); return; } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Received notification about connection from `%s'.\n", GNUNET_i2s (&cnm->peer)); -#endif if (0 == memcmp (&h->me, &cnm->peer, sizeof (struct GNUNET_PeerIdentity))) { /* connect to self!? */ @@ -915,11 +883,9 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) return; } GNUNET_break (0 == ntohl (dnm->reserved)); -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Received notification about disconnect from `%s'.\n", GNUNET_i2s (&dnm->peer)); -#endif pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &dnm->peer.hashPubKey); if (pr == NULL) { @@ -954,11 +920,9 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) return; } em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1]; -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %u and size %u from peer `%4s'\n", ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer)); -#endif pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey); if (pr == NULL) { @@ -1033,11 +997,9 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) reconnect_later (h); return; } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Received notification about transmission to `%s'.\n", GNUNET_i2s (&ntm->peer)); -#endif if ((GNUNET_NO == h->outbound_hdr_only) && (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage) + @@ -1069,11 +1031,9 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) reconnect_later (h); return; } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Received notification about transmission readiness to `%s'.\n", GNUNET_i2s (&smr->peer)); -#endif if (pr->pending_head == NULL) { /* request must have been cancelled between the original request @@ -1123,10 +1083,8 @@ init_done_task (void *cls, int success) return; /* shutdown */ if (success == GNUNET_NO) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to exchange INIT with core, retrying\n"); -#endif if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK) reconnect_later (h); return; @@ -1152,9 +1110,6 @@ reconnect (struct GNUNET_CORE_Handle *h) uint16_t *ts; unsigned int hpos; -#if DEBUG_CORE - LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting to CORE service\n"); -#endif GNUNET_assert (h->client == NULL); GNUNET_assert (h->currently_down == GNUNET_YES); h->client = GNUNET_CLIENT_connect ("core", h->cfg); @@ -1185,6 +1140,10 @@ reconnect (struct GNUNET_CORE_Handle *h) else opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND; } + LOG (GNUNET_ERROR_TYPE_INFO, + "(Re)connecting to CORE service, monitoring messages of type %u\n", + opt); + init->options = htonl (opt); ts = (uint16_t *) & init[1]; for (hpos = 0; hpos < h->hcnt; hpos++) @@ -1203,8 +1162,8 @@ reconnect (struct GNUNET_CORE_Handle *h) * @param cfg configuration to use * @param queue_size size of the per-peer message queue * @param cls closure for the various callbacks that follow (including handlers in the handlers array) - * @param init callback to call on timeout or once we have successfully - * connected to the core service; note that timeout is only meaningful if init is not NULL + * @param init callback to call once we have successfully + * connected to the core service * @param connects function to call on peer connect, can be NULL * @param disconnects function to call on peer disconnect / timeout, can be NULL * @param inbound_notify function to call for all inbound messages, can be NULL @@ -1255,9 +1214,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_assert (h->hcnt < (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct InitMessage)) / sizeof (uint16_t)); -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n"); -#endif reconnect (h); return h; } @@ -1275,9 +1232,7 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) { struct ControlMessage *cm; -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n"); -#endif if (handle->cth != NULL) { GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth); @@ -1401,18 +1356,14 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork, GNUNET_break (handle->queue_size != 0); GNUNET_break (pr->queue_size == 1); GNUNET_free (th); -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping transmission request: cannot drop queue head and limit is one\n"); -#endif return NULL; } if (priority <= minp->priority) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping transmission request: priority too low\n"); -#endif GNUNET_free (th); return NULL; /* priority too low */ } @@ -1440,9 +1391,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork, th); pr->queue_size++; /* was the request queue previously empty? */ -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n"); -#endif if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) && (pr->next == NULL) && (pr->prev == NULL) && (handle->ready_peer_head != pr)) diff --git a/src/core/gnunet-service-core_clients.c b/src/core/gnunet-service-core_clients.c index 4098b45b1..b57936baa 100644 --- a/src/core/gnunet-service-core_clients.c +++ b/src/core/gnunet-service-core_clients.c @@ -95,6 +95,11 @@ struct GSC_Client }; +/** + * Big "or" of all client options. + */ +static uint32_t all_client_options; + /** * Head of linked list of our clients. */ @@ -146,12 +151,10 @@ static void send_to_client (struct GSC_Client *client, const struct GNUNET_MessageHeader *msg, int can_drop) { -#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Preparing to send %u bytes of message of type %u to client.\n", (unsigned int) ntohs (msg->size), (unsigned int) ntohs (msg->type)); -#endif GNUNET_SERVER_notification_context_unicast (notifier, client->client_handle, msg, can_drop); } @@ -207,7 +210,7 @@ type_match (uint16_t type, struct GSC_Client *c) * Send a message to all of our current clients that have the right * options set. * - * @param sender origin of the message (used to check that this peer is + * @param partner origin (or destination) of the message (used to check that this peer is * known to be connected to the respective client) * @param msg message to multicast * @param can_drop can this message be discarded if the queue is too long @@ -215,27 +218,33 @@ type_match (uint16_t type, struct GSC_Client *c) * @param type type of the embedded message, 0 for none */ static void -send_to_all_clients (const struct GNUNET_PeerIdentity *sender, +send_to_all_clients (const struct GNUNET_PeerIdentity *partner, const struct GNUNET_MessageHeader *msg, int can_drop, - int options, uint16_t type) + uint32_t options, uint16_t type) { struct GSC_Client *c; for (c = client_head; c != NULL; c = c->next) { - if ((0 == (options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)) && - (GNUNET_YES == type_match (type, c))) - continue; /* not the full message, but we'd like the full one! */ - if ((0 == (c->options & options)) && (GNUNET_YES != type_match (type, c))) - continue; /* neither options nor type match permit the message */ -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending message to client interested in messages of type %u.\n", + if (! ( (0 != (c->options & options)) || + ( (0 != (options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)) && + (GNUNET_YES == type_match (type, c)) ) ) ) + continue; /* neither options nor type match permit the message */ + if ( (0 != (options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)) && + ( (0 != (c->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)) || + (GNUNET_YES == type_match (type, c)) ) ) + continue; + if ( (0 != (options & GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND)) && + (0 != (c->options & GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND)) ) + continue; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Sending %u message with %u bytes to client interested in messages of type %u.\n", + options, + ntohs (msg->size), (unsigned int) type); -#endif GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (c->connectmap, - &sender->hashPubKey)); + &partner->hashPubKey)); send_to_client (c, msg, can_drop); } } @@ -283,6 +292,7 @@ handle_client_init (void *cls, struct GNUNET_SERVER_Client *client, c->client_handle = client; c->tcnt = msize / sizeof (uint16_t); c->options = ntohl (im->options); + all_client_options |= c->options; c->types = (const uint16_t *) &c[1]; c->connectmap = GNUNET_CONTAINER_multihashmap_create (16); GNUNET_assert (GNUNET_YES == @@ -295,11 +305,9 @@ handle_client_init (void *cls, struct GNUNET_SERVER_Client *client, wtypes[i] = ntohs (types[i]); GSC_TYPEMAP_add (wtypes, c->tcnt); GNUNET_CONTAINER_DLL_insert (client_head, client_tail, c); -#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connecting to core service is interested in %u message types\n", (unsigned int) c->tcnt); -#endif /* send init reply message */ irm.header.size = htons (sizeof (struct InitReplyMessage)); irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY); @@ -338,11 +346,9 @@ handle_client_send_request (void *cls, struct GNUNET_SERVER_Client *client, } if (c->requests == NULL) c->requests = GNUNET_CONTAINER_multihashmap_create (16); -#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client asked for transmission to `%s'\n", GNUNET_i2s (&req->peer)); -#endif is_loopback = (0 == memcmp (&req->peer, &GSC_my_identity, @@ -472,11 +478,9 @@ handle_client_send (void *cls, struct GNUNET_SERVER_Client *client, &sm->peer.hashPubKey, tc.car)); tc.cork = ntohl (sm->cork); -#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client asked for transmission of %u bytes to `%s' %s\n", msize, GNUNET_i2s (&sm->peer), tc.cork ? "now" : ""); -#endif GNUNET_SERVER_mst_receive (client_mst, &tc, (const char *) &sm[1], msize, GNUNET_YES, GNUNET_NO); if (0 != @@ -509,19 +513,21 @@ client_tokenizer_callback (void *cls, void *client, memcmp (&car->target, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity))) { -#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Delivering message of type %u to myself\n", ntohs (message->type)); -#endif GSC_CLIENTS_deliver_message (&GSC_my_identity, NULL, 0, message, - ntohs (message->size), - GNUNET_CORE_OPTION_SEND_FULL_INBOUND | - GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND); + ntohs (message->size), + GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND); + GSC_CLIENTS_deliver_message (&GSC_my_identity, NULL, 0, message, + sizeof (struct GNUNET_MessageHeader), + GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND); GSC_CLIENTS_deliver_message (&GSC_my_identity, NULL, 0, message, - sizeof (struct GNUNET_MessageHeader), - GNUNET_CORE_OPTION_SEND_HDR_INBOUND | - GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND); + ntohs (message->size), + GNUNET_CORE_OPTION_SEND_FULL_INBOUND); + GSC_CLIENTS_deliver_message (&GSC_my_identity, NULL, 0, message, + sizeof (struct GNUNET_MessageHeader), + GNUNET_CORE_OPTION_SEND_HDR_INBOUND); } else { @@ -530,6 +536,12 @@ client_tokenizer_callback (void *cls, void *client, "Delivering message of type %u to %s\n", ntohs (message->type), GNUNET_i2s (&car->target)); #endif + GSC_CLIENTS_deliver_message (&car->target, NULL, 0, message, + ntohs (message->size), + GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND); + GSC_CLIENTS_deliver_message (&car->target, NULL, 0, message, + sizeof (struct GNUNET_MessageHeader), + GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND); GSC_SESSIONS_transmit (car, message, tc->cork); } } @@ -592,6 +604,11 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) c->connectmap = NULL; GSC_TYPEMAP_remove (c->types, c->tcnt); GNUNET_free (c); + + /* recalculate 'all_client_options' */ + all_client_options = 0; + for (c = client_head; NULL != c ; c = c->next) + all_client_options |= c->options; } @@ -786,7 +803,8 @@ GSC_CLIENTS_deliver_message (const struct GNUNET_PeerIdentity *sender, const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count, const struct GNUNET_MessageHeader *msg, - uint16_t msize, int options) + uint16_t msize, + uint32_t options) { size_t size = msize + sizeof (struct NotifyTrafficMessage) + @@ -809,15 +827,19 @@ GSC_CLIENTS_deliver_message (const struct GNUNET_PeerIdentity *sender, atsi_count = 0; size = msize + sizeof (struct NotifyTrafficMessage); } -#if DEBUG_CORE + if (! ( (0 != (all_client_options & options)) || + (0 != (options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)) )) + return; /* no client cares about this message notification */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service passes message from `%4s' of type %u to client.\n", GNUNET_i2s (sender), (unsigned int) ntohs (msg->type)); -#endif GSC_SESSIONS_add_to_typemap (sender, ntohs (msg->type)); ntm = (struct NotifyTrafficMessage *) buf; ntm->header.size = htons (size); - ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND); + if (0 != (options & (GNUNET_CORE_OPTION_SEND_FULL_INBOUND | GNUNET_CORE_OPTION_SEND_HDR_INBOUND))) + ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND); + else + ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND); ntm->ats_count = htonl (atsi_count); ntm->peer = *sender; a = &ntm->ats; diff --git a/src/core/gnunet-service-core_clients.h b/src/core/gnunet-service-core_clients.h index bdad20da8..8ece1ce1d 100644 --- a/src/core/gnunet-service-core_clients.h +++ b/src/core/gnunet-service-core_clients.h @@ -105,7 +105,8 @@ GSC_CLIENTS_deliver_message (const struct GNUNET_PeerIdentity *sender, const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count, const struct GNUNET_MessageHeader *msg, - uint16_t msize, int options); + uint16_t msize, + uint32_t options); /** -- cgit v1.2.3