From 8b2a3260e6aafc2ad31c8b7bff5f7d25b57bfc14 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 31 Jul 2016 08:46:59 +0000 Subject: -convert NSE to new core MQ API --- src/nse/gnunet-service-nse.c | 516 +++++++++++++++++++++++-------------------- 1 file changed, 272 insertions(+), 244 deletions(-) (limited to 'src/nse') diff --git a/src/nse/gnunet-service-nse.c b/src/nse/gnunet-service-nse.c index 7bf186e83..262f85c8d 100644 --- a/src/nse/gnunet-service-nse.c +++ b/src/nse/gnunet-service-nse.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009, 2010, 2011, 2012, 2013 GNUnet e.V. + Copyright (C) 2009, 2010, 2011, 2012, 2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -51,6 +51,7 @@ #include + /** * Should messages be delayed randomly? This option should be set to * #GNUNET_NO only for experiments, not in production. @@ -120,17 +121,17 @@ struct NSEPeerEntry /** * Core handle for sending messages to this peer. */ - struct GNUNET_CORE_TransmitHandle *th; + struct GNUNET_MQ_Handle *mq; /** * What is the identity of the peer? */ - struct GNUNET_PeerIdentity id; + const struct GNUNET_PeerIdentity *id; /** * Task scheduled to send message to this peer. */ - struct GNUNET_SCHEDULER_Task * transmit_task; + struct GNUNET_SCHEDULER_Task *transmit_task; /** * Did we receive or send a message about the previous round @@ -434,11 +435,15 @@ handle_start_message (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received START message from client\n"); - GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_add (nc, + client); setup_estimate_message (&em); - GNUNET_SERVER_notification_context_unicast (nc, client, &em.header, + GNUNET_SERVER_notification_context_unicast (nc, + client, + &em.header, GNUNET_YES); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVER_receive_done (client, + GNUNET_OK); } @@ -507,9 +512,11 @@ pow_hash (const void *buf, gcry_kdf_derive (buf, buf_len, GCRY_KDF_SCRYPT, 1 /* subalgo */, - "gnunet-proof-of-work", strlen ("gnunet-proof-of-work"), + "gnunet-proof-of-work", + strlen ("gnunet-proof-of-work"), 2 /* iterations; keep cost of individual op small */, - sizeof (struct GNUNET_HashCode), result)); + sizeof (struct GNUNET_HashCode), + result)); } @@ -559,13 +566,15 @@ get_transmit_delay (int round_offset) case -1: /* previous round is randomized between 0 and 50 ms */ #if USE_RANDOM_DELAYS - ret.rel_value_us = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, 50); + ret.rel_value_us = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, + 50); #else ret = GNUNET_TIME_UNIT_ZERO; #endif GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting previous round behind schedule in %s\n", - GNUNET_STRINGS_relative_time_to_string (ret, GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string (ret, + GNUNET_YES)); return ret; case 0: /* current round is based on best-known matching_bits */ @@ -581,7 +590,8 @@ get_transmit_delay (int round_offset) GNUNET_STRINGS_relative_time_to_string (ret, GNUNET_YES)); /* now consider round start time and add delay to it */ - tgt = GNUNET_TIME_absolute_add (current_timestamp, ret); + tgt = GNUNET_TIME_absolute_add (current_timestamp, + ret); return GNUNET_TIME_absolute_get_remaining (tgt); } GNUNET_break (0); @@ -595,99 +605,62 @@ get_transmit_delay (int round_offset) * @param cls the `struct NSEPeerEntry *` */ static void -transmit_task_cb (void *cls); - - -/** - * Called when core is ready to send a message we asked for - * out to the destination. - * - * @param cls closure with the `struct NSEPeerEntry *` - * @param size number of bytes available in @a buf - * @param buf where the callee should write the message - * @return number of bytes written to @a buf - */ -static size_t -transmit_ready (void *cls, - size_t size, - void *buf) +transmit_task_cb (void *cls) { struct NSEPeerEntry *peer_entry = cls; unsigned int idx; - - peer_entry->th = NULL; - if (NULL == buf) - { - /* client disconnected */ - return 0; - } - GNUNET_assert (size >= sizeof (struct GNUNET_NSE_FloodMessage)); + struct GNUNET_MQ_Envelope *env; + + peer_entry->transmit_task = NULL; idx = estimate_index; if (GNUNET_NO == peer_entry->previous_round) { idx = (idx + HISTORY_SIZE - 1) % HISTORY_SIZE; peer_entry->previous_round = GNUNET_YES; - peer_entry->transmit_task = - GNUNET_SCHEDULER_add_delayed (get_transmit_delay (0), - &transmit_task_cb, - peer_entry); + peer_entry->transmit_task + = GNUNET_SCHEDULER_add_delayed (get_transmit_delay (0), + &transmit_task_cb, + peer_entry); } if ((0 == ntohl (size_estimate_messages[idx].hop_count)) && (NULL != proof_task)) { GNUNET_STATISTICS_update (stats, "# flood messages not generated (no proof yet)", - 1, GNUNET_NO); - return 0; + 1, + GNUNET_NO); + return; } if (0 == ntohs (size_estimate_messages[idx].header.size)) { GNUNET_STATISTICS_update (stats, "# flood messages not generated (lack of history)", - 1, GNUNET_NO); - return 0; + 1, + GNUNET_NO); + return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "In round %s, sending to `%s' estimate with %u bits\n", GNUNET_STRINGS_absolute_time_to_string (GNUNET_TIME_absolute_ntoh (size_estimate_messages[idx].timestamp)), - GNUNET_i2s (&peer_entry->id), + GNUNET_i2s (peer_entry->id), (unsigned int) ntohl (size_estimate_messages[idx].matching_bits)); - if (ntohl (size_estimate_messages[idx].hop_count) == 0) - GNUNET_STATISTICS_update (stats, "# flood messages started", 1, GNUNET_NO); - GNUNET_STATISTICS_update (stats, "# flood messages transmitted", 1, + if (0 == ntohl (size_estimate_messages[idx].hop_count)) + GNUNET_STATISTICS_update (stats, + "# flood messages started", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (stats, + "# flood messages transmitted", + 1, GNUNET_NO); #if ENABLE_NSE_HISTOGRAM peer_entry->transmitted_messages++; - peer_entry->last_transmitted_size = - ntohl(size_estimate_messages[idx].matching_bits); + peer_entry->last_transmitted_size + = ntohl(size_estimate_messages[idx].matching_bits); #endif - GNUNET_memcpy (buf, &size_estimate_messages[idx], - sizeof (struct GNUNET_NSE_FloodMessage)); - return sizeof (struct GNUNET_NSE_FloodMessage); -} - - -/** - * Task that triggers a NSE P2P transmission. - * - * @param cls the `struct NSEPeerEntry *` - */ -static void -transmit_task_cb (void *cls) -{ - struct NSEPeerEntry *peer_entry = cls; - - peer_entry->transmit_task = NULL; - - GNUNET_assert (NULL == peer_entry->th); - peer_entry->th = - GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO, - NSE_PRIORITY, - GNUNET_TIME_UNIT_FOREVER_REL, - &peer_entry->id, - sizeof (struct - GNUNET_NSE_FloodMessage), - &transmit_ready, peer_entry); + env = GNUNET_MQ_msg_copy (&size_estimate_messages[idx].header); + GNUNET_MQ_send (peer_entry->mq, + env); } @@ -723,7 +696,8 @@ setup_flood_message (unsigned int slot, struct GNUNET_NSE_FloodMessage *fm; uint32_t matching_bits; - matching_bits = get_matching_bits (ts, &my_identity); + matching_bits = get_matching_bits (ts, + &my_identity); fm = &size_estimate_messages[slot]; fm->header.size = htons (sizeof (struct GNUNET_NSE_FloodMessage)); fm->header.type = htons (GNUNET_MESSAGE_TYPE_NSE_P2P_FLOOD); @@ -739,10 +713,13 @@ setup_flood_message (unsigned int slot, fm->proof_of_work = my_proof; if (nse_work_required > 0) GNUNET_assert (GNUNET_OK == - GNUNET_CRYPTO_eddsa_sign (my_private_key, &fm->purpose, - &fm->signature)); + GNUNET_CRYPTO_eddsa_sign (my_private_key, + &fm->purpose, + &fm->signature)); else - memset (&fm->signature, 0, sizeof (fm->signature)); + memset (&fm->signature, + 0, + sizeof (fm->signature)); } @@ -763,11 +740,6 @@ schedule_current_round (void *cls, struct NSEPeerEntry *peer_entry = value; struct GNUNET_TIME_Relative delay; - if (NULL != peer_entry->th) - { - peer_entry->previous_round = GNUNET_NO; - return GNUNET_OK; - } if (NULL != peer_entry->transmit_task) { GNUNET_SCHEDULER_cancel (peer_entry->transmit_task); @@ -775,14 +747,16 @@ schedule_current_round (void *cls, } #if ENABLE_NSE_HISTOGRAM if (peer_entry->received_messages > 1) - GNUNET_STATISTICS_update(stats, "# extra messages", - peer_entry->received_messages - 1, GNUNET_NO); + GNUNET_STATISTICS_update(stats, + "# extra messages", + peer_entry->received_messages - 1, + GNUNET_NO); peer_entry->transmitted_messages = 0; peer_entry->last_transmitted_size = 0; peer_entry->received_messages = 0; #endif delay = - get_transmit_delay ((peer_entry->previous_round == GNUNET_NO) ? -1 : 0); + get_transmit_delay ((GNUNET_NO == peer_entry->previous_round) ? -1 : 0); peer_entry->transmit_task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_task_cb, @@ -809,7 +783,8 @@ update_flood_message (void *cls) /* somehow run early, delay more */ flood_task = GNUNET_SCHEDULER_add_delayed (offset, - &update_flood_message, NULL); + &update_flood_message, + NULL); return; } estimate_index = (estimate_index + 1) % HISTORY_SIZE; @@ -818,10 +793,10 @@ update_flood_message (void *cls) current_timestamp = next_timestamp; next_timestamp = GNUNET_TIME_absolute_add (current_timestamp, gnunet_nse_interval); - if ((current_timestamp.abs_value_us == - GNUNET_TIME_absolute_ntoh (next_message.timestamp).abs_value_us) && - (get_matching_bits (current_timestamp, &my_identity) < - ntohl(next_message.matching_bits))) + if ( (current_timestamp.abs_value_us == + GNUNET_TIME_absolute_ntoh (next_message.timestamp).abs_value_us) && + (get_matching_bits (current_timestamp, &my_identity) < + ntohl(next_message.matching_bits)) ) { /* we received a message for this round way early, use it! */ size_estimate_messages[estimate_index] = next_message; @@ -829,12 +804,13 @@ update_flood_message (void *cls) htonl (1 + ntohl (next_message.hop_count)); } else - setup_flood_message (estimate_index, current_timestamp); + setup_flood_message (estimate_index, + current_timestamp); next_message.matching_bits = htonl (0); /* reset for 'next' round */ hop_count_max = 0; for (i = 0; i < HISTORY_SIZE; i++) - hop_count_max = - GNUNET_MAX (ntohl (size_estimate_messages[i].hop_count), hop_count_max); + hop_count_max = GNUNET_MAX (ntohl (size_estimate_messages[i].hop_count), + hop_count_max); GNUNET_CONTAINER_multipeermap_iterate (peers, &schedule_current_round, NULL); @@ -858,7 +834,8 @@ count_leading_zeroes (const struct GNUNET_HashCode *hash) unsigned int hash_count; hash_count = 0; - while (0 == GNUNET_CRYPTO_hash_get_bit (hash, hash_count)) + while (0 == GNUNET_CRYPTO_hash_get_bit (hash, + hash_count)) hash_count++; return hash_count; } @@ -880,10 +857,15 @@ check_proof_of_work (const struct GNUNET_CRYPTO_EddsaPublicKey *pkey, sizeof (val)] GNUNET_ALIGN; struct GNUNET_HashCode result; - GNUNET_memcpy (buf, &val, sizeof (val)); - GNUNET_memcpy (&buf[sizeof (val)], pkey, - sizeof (struct GNUNET_CRYPTO_EddsaPublicKey)); - pow_hash (buf, sizeof (buf), &result); + GNUNET_memcpy (buf, + &val, + sizeof (val)); + GNUNET_memcpy (&buf[sizeof (val)], + pkey, + sizeof (struct GNUNET_CRYPTO_EddsaPublicKey)); + pow_hash (buf, + sizeof (buf), + &result); return (count_leading_zeroes (&result) >= nse_work_required) ? GNUNET_YES : GNUNET_NO; } @@ -898,15 +880,21 @@ write_proof () char *proof; if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_filename (cfg, "NSE", "PROOFFILE", &proof)) + GNUNET_CONFIGURATION_get_value_filename (cfg, + "NSE", + "PROOFFILE", + &proof)) return; if (sizeof (my_proof) != - GNUNET_DISK_fn_write (proof, &my_proof, sizeof (my_proof), + GNUNET_DISK_fn_write (proof, + &my_proof, + sizeof (my_proof), GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE)) - GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING, "write", proof); + GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING, + "write", + proof); GNUNET_free (proof); - } @@ -1021,18 +1009,15 @@ update_flood_times (void *cls, struct NSEPeerEntry *peer_entry = value; struct GNUNET_TIME_Relative delay; - if (NULL != peer_entry->th) - return GNUNET_OK; /* already active */ if (peer_entry == exclude) return GNUNET_OK; /* trigger of the update */ - if (peer_entry->previous_round == GNUNET_NO) + if (GNUNET_NO == peer_entry->previous_round) { /* still stuck in previous round, no point to update, check that * we are active here though... */ - if ( (NULL == peer_entry->transmit_task) && - (NULL == peer_entry->th) ) + if (NULL == peer_entry->transmit_task) { - GNUNET_break (0); + GNUNET_break (0); } return GNUNET_OK; } @@ -1052,18 +1037,15 @@ update_flood_times (void *cls, /** * Core handler for size estimate flooding messages. * - * @param cls closure unused - * @param message message - * @param peer peer identity this message is from (ignored) + * @param cls peer this message is from + * @param incoming_flood received message */ -static int -handle_p2p_size_estimate (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) +static void +handle_p2p_estimate (void *cls, + const struct GNUNET_NSE_FloodMessage *incoming_flood) { - const struct GNUNET_NSE_FloodMessage *incoming_flood; + struct NSEPeerEntry *peer_entry = cls; struct GNUNET_TIME_Absolute ts; - struct NSEPeerEntry *peer_entry; uint32_t matching_bits; unsigned int idx; @@ -1078,8 +1060,10 @@ handle_p2p_size_estimate (void *cls, GNUNET_BIO_write_int64 (histogram, t); } #endif - incoming_flood = (const struct GNUNET_NSE_FloodMessage *) message; - GNUNET_STATISTICS_update (stats, "# flood messages received", 1, GNUNET_NO); + GNUNET_STATISTICS_update (stats, + "# flood messages received", + 1, + GNUNET_NO); matching_bits = ntohl (incoming_flood->matching_bits); #if DEBUG_NSE { @@ -1089,31 +1073,30 @@ handle_p2p_size_estimate (void *cls, GNUNET_snprintf (origin, sizeof (origin), - "%4s", + "%s", GNUNET_i2s (&incoming_flood->origin)); GNUNET_snprintf (pred, sizeof (pred), - "%4s", - GNUNET_i2s (peer)); + "%s", + GNUNET_i2s (peer_entry->id)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Flood at %s from `%s' via `%s' at `%s' with bits %u\n", GNUNET_STRINGS_absolute_time_to_string (GNUNET_TIME_absolute_ntoh (incoming_flood->timestamp)), - origin, pred, GNUNET_i2s (&my_identity), + origin, + pred, + GNUNET_i2s (&my_identity), (unsigned int) matching_bits); } #endif - peer_entry = GNUNET_CONTAINER_multipeermap_get (peers, peer); - if (NULL == peer_entry) - { - GNUNET_break (0); - return GNUNET_OK; - } #if ENABLE_NSE_HISTOGRAM peer_entry->received_messages++; if (peer_entry->transmitted_messages > 0 && peer_entry->last_transmitted_size >= matching_bits) - GNUNET_STATISTICS_update(stats, "# cross messages", 1, GNUNET_NO); + GNUNET_STATISTICS_update(stats, + "# cross messages", + 1, + GNUNET_NO); #endif ts = GNUNET_TIME_absolute_ntoh (incoming_flood->timestamp); @@ -1125,35 +1108,39 @@ handle_p2p_size_estimate (void *cls, else if (ts.abs_value_us == next_timestamp.abs_value_us) { if (matching_bits <= ntohl (next_message.matching_bits)) - return GNUNET_OK; /* ignore, simply too early/late */ - if (GNUNET_YES != verify_message_crypto (incoming_flood)) + return; /* ignore, simply too early/late */ + if (GNUNET_YES != + verify_message_crypto (incoming_flood)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peer %s is likely ill-configured!\n", - GNUNET_i2s (peer)); + GNUNET_i2s (peer_entry->id)); GNUNET_break_op (0); - return GNUNET_OK; + return; } next_message = *incoming_flood; - return GNUNET_OK; + return; } else { GNUNET_STATISTICS_update (stats, "# flood messages discarded (clock skew too large)", 1, GNUNET_NO); - return GNUNET_OK; + return; } - if (0 == (memcmp (peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))) + if (0 == (memcmp (peer_entry->id, + &my_identity, + sizeof (struct GNUNET_PeerIdentity)))) { /* send to self, update our own estimate IF this also comes from us! */ if (0 == memcmp (&incoming_flood->origin, &my_identity, sizeof (my_identity))) update_network_size_estimate (); - return GNUNET_OK; + return; } - if (matching_bits == ntohl (size_estimate_messages[idx].matching_bits)) + if (matching_bits == + ntohl (size_estimate_messages[idx].matching_bits)) { /* Cancel transmission in the other direction, as this peer clearly has up-to-date information already. Even if we didn't talk to this peer in @@ -1164,7 +1151,7 @@ handle_p2p_size_estimate (void *cls, { /* do not transmit information for the previous round to this peer anymore (but allow current round) */ - return GNUNET_OK; + return; } /* got up-to-date information for current round, cancel transmission to * this peer altogether */ @@ -1173,36 +1160,33 @@ handle_p2p_size_estimate (void *cls, GNUNET_SCHEDULER_cancel (peer_entry->transmit_task); peer_entry->transmit_task = NULL; } - if (NULL != peer_entry->th) - { - GNUNET_CORE_notify_transmit_ready_cancel (peer_entry->th); - peer_entry->th = NULL; - } - return GNUNET_OK; + return; } if (matching_bits < ntohl (size_estimate_messages[idx].matching_bits)) { - if ((idx < estimate_index) && (peer_entry->previous_round == GNUNET_YES)) { + if ( (idx < estimate_index) && + (peer_entry->previous_round == GNUNET_YES)) + { peer_entry->previous_round = GNUNET_NO; } /* push back our result now, that peer is spreading bad information... */ - if (NULL == peer_entry->th) - { - if (peer_entry->transmit_task != NULL) - GNUNET_SCHEDULER_cancel (peer_entry->transmit_task); - peer_entry->transmit_task = - GNUNET_SCHEDULER_add_now (&transmit_task_cb, peer_entry); - } + if (NULL != peer_entry->transmit_task) + GNUNET_SCHEDULER_cancel (peer_entry->transmit_task); + peer_entry->transmit_task + = GNUNET_SCHEDULER_add_now (&transmit_task_cb, + peer_entry); /* Not closer than our most recent message, no need to do work here */ GNUNET_STATISTICS_update (stats, "# flood messages ignored (had closer already)", - 1, GNUNET_NO); - return GNUNET_OK; + 1, + GNUNET_NO); + return; } - if (GNUNET_YES != verify_message_crypto (incoming_flood)) + if (GNUNET_YES != + verify_message_crypto (incoming_flood)) { GNUNET_break_op (0); - return GNUNET_OK; + return; } GNUNET_assert (matching_bits > ntohl (size_estimate_messages[idx].matching_bits)); @@ -1212,23 +1196,19 @@ handle_p2p_size_estimate (void *cls, peer_entry->previous_round = GNUNET_YES; if (idx == estimate_index) { - /* cancel any activity for current round */ - if (peer_entry->transmit_task != NULL) - { - GNUNET_SCHEDULER_cancel (peer_entry->transmit_task); - peer_entry->transmit_task = NULL; - } - if (peer_entry->th != NULL) - { - GNUNET_CORE_notify_transmit_ready_cancel (peer_entry->th); - peer_entry->th = NULL; - } + /* cancel any activity for current round */ + if (NULL != peer_entry->transmit_task) + { + GNUNET_SCHEDULER_cancel (peer_entry->transmit_task); + peer_entry->transmit_task = NULL; + } } size_estimate_messages[idx] = *incoming_flood; size_estimate_messages[idx].hop_count = htonl (ntohl (incoming_flood->hop_count) + 1); hop_count_max = - GNUNET_MAX (ntohl (incoming_flood->hop_count) + 1, hop_count_max); + GNUNET_MAX (ntohl (incoming_flood->hop_count) + 1, + hop_count_max); GNUNET_STATISTICS_set (stats, "# estimated network diameter", hop_count_max, GNUNET_NO); @@ -1237,9 +1217,9 @@ handle_p2p_size_estimate (void *cls, update_network_size_estimate (); /* flood to rest */ - GNUNET_CONTAINER_multipeermap_iterate (peers, &update_flood_times, + GNUNET_CONTAINER_multipeermap_iterate (peers, + &update_flood_times, peer_entry); - return GNUNET_OK; } @@ -1250,20 +1230,32 @@ handle_p2p_size_estimate (void *cls, * @param cls closure * @param peer peer identity this notification is about */ -static void +static void * handle_core_connect (void *cls, - const struct GNUNET_PeerIdentity *peer) + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Handle *mq) { struct NSEPeerEntry *peer_entry; + uint64_t flags; + const void *extra; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%s' connected to us\n", GNUNET_i2s (peer)); + /* set our default transmission options */ + extra = GNUNET_CORE_get_mq_options (GNUNET_NO, + NSE_PRIORITY, + &flags); + GNUNET_MQ_set_options (mq, + flags, + extra); + /* create our peer entry for this peer */ peer_entry = GNUNET_new (struct NSEPeerEntry); - peer_entry->id = *peer; + peer_entry->id = peer; + peer_entry->mq = mq; GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_put (peers, - peer, + peer_entry->id, peer_entry, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); peer_entry->transmit_task = @@ -1274,6 +1266,7 @@ handle_core_connect (void *cls, "# peers connected", 1, GNUNET_NO); + return peer_entry; } @@ -1283,36 +1276,32 @@ handle_core_connect (void *cls, * * @param cls closure * @param peer peer identity this notification is about + * @parma internal_cls the `struct NSEPeerEntry` for the @a peer */ static void handle_core_disconnect (void *cls, - const struct GNUNET_PeerIdentity *peer) + const struct GNUNET_PeerIdentity *peer, + void *internal_cls) { - struct NSEPeerEntry *pos; + struct NSEPeerEntry *pos = internal_cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%s' disconnected from us\n", GNUNET_i2s (peer)); - pos = GNUNET_CONTAINER_multipeermap_get (peers, peer); - if (NULL == pos) - { - GNUNET_break (0); - return; - } GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (peers, peer, + GNUNET_CONTAINER_multipeermap_remove (peers, + peer, pos)); - if (pos->transmit_task != NULL) { + if (pos->transmit_task != NULL) + { GNUNET_SCHEDULER_cancel (pos->transmit_task); pos->transmit_task = NULL; } - if (NULL != pos->th) - { - GNUNET_CORE_notify_transmit_ready_cancel (pos->th); - pos->th = NULL; - } GNUNET_free (pos); - GNUNET_STATISTICS_update (stats, "# peers connected", -1, GNUNET_NO); + GNUNET_STATISTICS_update (stats, + "# peers connected", + -1, + GNUNET_NO); } @@ -1360,7 +1349,7 @@ shutdown_task (void *cls) } if (NULL != core_api) { - GNUNET_CORE_disconnect (core_api); + GNUNET_CORE_disconnecT (core_api); core_api = NULL; } if (NULL != stats) @@ -1414,28 +1403,35 @@ core_init (void *cls, if (NULL == identity) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Connection to core FAILED!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Connection to core FAILED!\n"); GNUNET_SCHEDULER_shutdown (); return; } GNUNET_assert (0 == - memcmp (&my_identity, identity, + memcmp (&my_identity, + identity, sizeof (struct GNUNET_PeerIdentity))); now = GNUNET_TIME_absolute_get (); current_timestamp.abs_value_us = (now.abs_value_us / gnunet_nse_interval.rel_value_us) * gnunet_nse_interval.rel_value_us; next_timestamp = - GNUNET_TIME_absolute_add (current_timestamp, gnunet_nse_interval); + GNUNET_TIME_absolute_add (current_timestamp, + gnunet_nse_interval); estimate_index = HISTORY_SIZE - 1; estimate_count = 0; - if (GNUNET_YES == check_proof_of_work (&my_identity.public_key, my_proof)) + if (GNUNET_YES == + check_proof_of_work (&my_identity.public_key, + my_proof)) { int idx = (estimate_index + HISTORY_SIZE - 1) % HISTORY_SIZE; prev_time.abs_value_us = current_timestamp.abs_value_us - gnunet_nse_interval.rel_value_us; - setup_flood_message (idx, prev_time); - setup_flood_message (estimate_index, current_timestamp); + setup_flood_message (idx, + prev_time); + setup_flood_message (estimate_index, + current_timestamp); estimate_count++; } flood_task = @@ -1445,22 +1441,25 @@ core_init (void *cls, NULL); } + #if ENABLE_NSE_HISTOGRAM /** * Function called with the status of the testbed logger service * * @param cls NULL - * @param status GNUNET_YES if the service is running, - * GNUNET_NO if the service is not running - * GNUNET_SYSERR if the configuration is invalid + * @param status #GNUNET_YES if the service is running, + * #GNUNET_NO if the service is not running + * #GNUNET_SYSERR if the configuration is invalid */ static void -status_cb (void *cls, int status) +status_cb (void *cls, + int status) { logger_test = NULL; if (GNUNET_YES != status) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Testbed logger not running\n"); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Testbed logger not running\n"); return; } if (NULL == (lh = GNUNET_TESTBED_LOGGER_connect (cfg))) @@ -1485,15 +1484,17 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) { + GNUNET_MQ_hd_fixed_size (p2p_estimate, + GNUNET_MESSAGE_TYPE_NSE_P2P_FLOOD, + struct GNUNET_NSE_FloodMessage); static const struct GNUNET_SERVER_MessageHandler handlers[] = { {&handle_start_message, NULL, GNUNET_MESSAGE_TYPE_NSE_START, sizeof (struct GNUNET_MessageHeader)}, {NULL, NULL, 0, 0} }; - static const struct GNUNET_CORE_MessageHandler core_handlers[] = { - {&handle_p2p_size_estimate, GNUNET_MESSAGE_TYPE_NSE_P2P_FLOOD, - sizeof (struct GNUNET_NSE_FloodMessage)}, - {NULL, 0, 0} + struct GNUNET_MQ_MessageHandler core_handlers[] = { + make_p2p_estimate_handler (NULL), + GNUNET_MQ_handler_end (), }; char *proof; struct GNUNET_CRYPTO_EddsaPrivateKey *pk; @@ -1501,29 +1502,38 @@ run (void *cls, cfg = c; srv = server; if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_time (cfg, "NSE", "INTERVAL", + GNUNET_CONFIGURATION_get_value_time (cfg, + "NSE", + "INTERVAL", &gnunet_nse_interval)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, - "NSE", "INTERVAL"); + "NSE", + "INTERVAL"); GNUNET_SCHEDULER_shutdown (); return; } if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_time (cfg, "NSE", "WORKDELAY", + GNUNET_CONFIGURATION_get_value_time (cfg, + "NSE", + "WORKDELAY", &proof_find_delay)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, - "NSE", "WORKDELAY"); + "NSE", + "WORKDELAY"); GNUNET_SCHEDULER_shutdown (); return; } if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_number (cfg, "NSE", "WORKBITS", + GNUNET_CONFIGURATION_get_value_number (cfg, + "NSE", + "WORKBITS", &nse_work_required)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, - "NSE", "WORKBITS"); + "NSE", + "WORKBITS"); GNUNET_SCHEDULER_shutdown (); return; } @@ -1543,21 +1553,28 @@ run (void *cls, char *histogram_fn; if (GNUNET_OK == - GNUNET_CONFIGURATION_get_value_filename (cfg, "NSE", "HISTOGRAM_DIR", + GNUNET_CONFIGURATION_get_value_filename (cfg, + "NSE", + "HISTOGRAM_DIR", &histogram_dir)) { - GNUNET_assert (0 < GNUNET_asprintf (&histogram_fn, "%s/timestamps", + GNUNET_assert (0 < GNUNET_asprintf (&histogram_fn, + "%s/timestamps", histogram_dir)); GNUNET_free (histogram_dir); histogram = GNUNET_BIO_write_open (histogram_fn); - GNUNET_free (histogram_fn); if (NULL == histogram) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Unable to open histogram file\n"); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Unable to open histogram file `%s'\n", + histogram_fn); + GNUNET_free (histogram_fn); } logger_test = - GNUNET_CLIENT_service_test ("testbed-logger", cfg, + GNUNET_CLIENT_service_test ("testbed-logger", + cfg, GNUNET_TIME_UNIT_SECONDS, - &status_cb, NULL); + &status_cb, + NULL); } #endif @@ -1568,11 +1585,16 @@ run (void *cls, GNUNET_assert (NULL != pk); my_private_key = pk; GNUNET_CRYPTO_eddsa_key_get_public (my_private_key, - &my_identity.public_key); + &my_identity.public_key); if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_filename (cfg, "NSE", "PROOFFILE", &proof)) + GNUNET_CONFIGURATION_get_value_filename (cfg, + "NSE", + "PROOFFILE", + &proof)) { - GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "NSE", "PROOFFILE"); + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "NSE", + "PROOFFILE"); GNUNET_free (my_private_key); my_private_key = NULL; GNUNET_SCHEDULER_shutdown (); @@ -1580,33 +1602,35 @@ run (void *cls, } if ((GNUNET_YES != GNUNET_DISK_file_test (proof)) || (sizeof (my_proof) != - GNUNET_DISK_fn_read (proof, &my_proof, sizeof (my_proof)))) + GNUNET_DISK_fn_read (proof, + &my_proof, + sizeof (my_proof)))) my_proof = 0; GNUNET_free (proof); proof_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, - &find_proof, NULL); + &find_proof, + NULL); - peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); - GNUNET_SERVER_add_handlers (srv, handlers); + peers = GNUNET_CONTAINER_multipeermap_create (128, + GNUNET_YES); + GNUNET_SERVER_add_handlers (srv, + handlers); nc = GNUNET_SERVER_notification_context_create (srv, 1); /* Connect to core service and register core handlers */ - core_api = GNUNET_CORE_connect (cfg, /* Main configuration */ - NULL, /* Closure passed to functions */ - &core_init, /* Call core_init once connected */ - &handle_core_connect, /* Handle connects */ - &handle_core_disconnect, /* Handle disconnects */ - NULL, /* Don't want notified about all incoming messages */ - GNUNET_NO, /* For header only inbound notification */ - NULL, /* Don't want notified about all outbound messages */ - GNUNET_NO, /* For header only outbound notification */ - core_handlers); /* Register these handlers */ + core_api = GNUNET_CORE_connecT (cfg, /* Main configuration */ + NULL, /* Closure passed to functions */ + &core_init, /* Call core_init once connected */ + &handle_core_connect, /* Handle connects */ + &handle_core_disconnect, /* Handle disconnects */ + core_handlers); /* Register these handlers */ if (NULL == core_api) { GNUNET_SCHEDULER_shutdown (); return; } - stats = GNUNET_STATISTICS_create ("nse", cfg); + stats = GNUNET_STATISTICS_create ("nse", + cfg); } @@ -1622,8 +1646,12 @@ main (int argc, char *const *argv) { return (GNUNET_OK == - GNUNET_SERVICE_run (argc, argv, "nse", GNUNET_SERVICE_OPTION_NONE, - &run, NULL)) ? 0 : 1; + GNUNET_SERVICE_run (argc, + argv, + "nse", + GNUNET_SERVICE_OPTION_NONE, + &run, + NULL)) ? 0 : 1; } -- cgit v1.2.3