/*
This file is part of GNUnet.
Copyright (C) 2009-2017, 2021 GNUnet e.V.
GNUnet is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
* * Note that we should not ALWAYS select the closest peer to the * target, peers further away from the target should be chosen with * exponentially declining probability. * * FIXME: double-check that this is fine * * * @param key the key we are selecting a peer to route to * @param bloom a bloomfilter containing entries this request has seen already * @param hops how many hops has this message traversed thus far * @return Peer to route to, or NULL on error */ static struct PeerInfo * select_peer (const struct GNUNET_HashCode *key, const struct GNUNET_CONTAINER_BloomFilter *bloom, uint32_t hops) { unsigned int bc; unsigned int count; unsigned int selected; struct PeerInfo *pos; struct PeerInfo *chosen; if (hops >= GDS_NSE_get ()) { /* greedy selection (closest peer that is not in bloomfilter) */ unsigned int best_bucket = 0; uint64_t best_in_bucket = UINT64_MAX; chosen = NULL; for (bc = 0; bc <= closest_bucket; bc++) { count = 0; for (pos = k_buckets[bc].head; (pos != NULL) && (count < bucket_size); pos = pos->next) { struct GNUNET_HashCode xor; unsigned int bucket; uint64_t dist; GNUNET_CRYPTO_hash_xor (key, &pos->phash, &xor); bucket = GNUNET_CRYPTO_hash_count_leading_zeros (&xor); dist = GNUNET_CRYPTO_hash_bucket_distance (&xor, bucket); if (bucket < best_bucket) continue; if (dist > best_in_bucket) continue; best_bucket = bucket; best_in_bucket = dist; if ( (NULL == bloom) || (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->phash)) ) { chosen = pos; } else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Excluded peer `%s' due to BF match in greedy routing for %s\n", GNUNET_i2s (pos->id), GNUNET_h2s (key)); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ( "# Peers excluded from routing due to Bloomfilter"), 1, GNUNET_NO); chosen = NULL; } count++; } } if (NULL == chosen) GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peer selection failed"), 1, GNUNET_NO); else GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Selected peer `%s' in greedy routing for %s\n", 
GNUNET_i2s (chosen->id), GNUNET_h2s (key)); return chosen; } /* select "random" peer */ /* count number of peers that are available and not filtered */ count = 0; for (bc = 0; bc <= closest_bucket; bc++) { pos = k_buckets[bc].head; while ((NULL != pos) && (count < bucket_size)) { if ((NULL != bloom) && (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->phash))) { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ( "# Peers excluded from routing due to Bloomfilter"), 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Excluded peer `%s' due to BF match in random routing for %s\n", GNUNET_i2s (pos->id), GNUNET_h2s (key)); pos = pos->next; continue; /* Ignore bloomfiltered peers */ } count++; pos = pos->next; } } if (0 == count) /* No peers to select from! */ { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peer selection failed"), 1, GNUNET_NO); return NULL; } /* Now actually choose a peer */ selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count); count = 0; for (bc = 0; bc <= closest_bucket; bc++) { for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next) { if ((bloom != NULL) && (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->phash))) { continue; /* Ignore bloomfiltered peers */ } if (0 == selected--) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Selected peer `%s' in random routing for %s\n", GNUNET_i2s (pos->id), GNUNET_h2s (key)); return pos; } } } GNUNET_break (0); return NULL; } /** * Compute the set of peers that the given request should be * forwarded to. * * @param key routing key * @param bloom bloom filter excluding peers as targets, all selected * peers will be added to the bloom filter * @param hop_count number of hops the request has traversed so far * @param target_replication desired number of replicas * @param targets where to store an array of target peers (to be * free'd by the caller) * @return number of peers returned in 'targets'. 
*/ static unsigned int get_target_peers (const struct GNUNET_HashCode *key, struct GNUNET_CONTAINER_BloomFilter *bloom, uint32_t hop_count, uint32_t target_replication, struct PeerInfo ***targets) { unsigned int ret; unsigned int off; struct PeerInfo **rtargets; struct PeerInfo *nxt; GNUNET_assert (NULL != bloom); ret = get_forward_count (hop_count, target_replication); if (0 == ret) { *targets = NULL; return 0; } rtargets = GNUNET_new_array (ret, struct PeerInfo *); for (off = 0; off < ret; off++) { nxt = select_peer (key, bloom, hop_count); if (NULL == nxt) break; rtargets[off] = nxt; GNUNET_break (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &nxt->phash)); GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->phash); } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Selected %u/%u peers at hop %u for %s (target was %u)\n", off, GNUNET_CONTAINER_multipeermap_size (all_connected_peers), (unsigned int) hop_count, GNUNET_h2s (key), ret); if (0 == off) { GNUNET_free (rtargets); *targets = NULL; return 0; } *targets = rtargets; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Forwarding query `%s' to %u peers (goal was %u peers)\n", GNUNET_h2s (key), off, ret); return off; } enum GNUNET_GenericReturnValue GDS_NEIGHBOURS_handle_put (const struct GDS_DATACACHE_BlockData *bd, enum GNUNET_DHT_RouteOption options, uint32_t desired_replication_level, uint32_t hop_count, struct GNUNET_CONTAINER_BloomFilter *bf) { unsigned int target_count; unsigned int i; struct PeerInfo **targets; struct PeerInfo *target; size_t msize; struct GNUNET_MQ_Envelope *env; struct PeerPutMessage *ppm; struct GNUNET_PeerIdentity *pp; unsigned int skip_count; unsigned int put_path_length = bd->put_path_length; GNUNET_assert (NULL != bf); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Adding myself (%s) to PUT bloomfilter for %s\n", GNUNET_i2s (&my_identity), GNUNET_h2s (&bd->key)); GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash); GNUNET_STATISTICS_update (GDS_stats, "# PUT requests routed", 1, GNUNET_NO); 
target_count = get_target_peers (&bd->key, bf, hop_count, desired_replication_level, &targets); if (0 == target_count) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing PUT for %s terminates after %u hops at %s\n", GNUNET_h2s (&bd->key), (unsigned int) hop_count, GNUNET_i2s (&my_identity)); return GNUNET_NO; } msize = bd->put_path_length * sizeof(struct GNUNET_PeerIdentity) + bd->data_size; if (msize + sizeof(struct PeerPutMessage) >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { put_path_length = 0; msize = bd->data_size; } if (msize + sizeof(struct PeerPutMessage) >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { GNUNET_break (0); GNUNET_free (targets); return GNUNET_NO; } GNUNET_STATISTICS_update (GDS_stats, "# PUT messages queued for transmission", target_count, GNUNET_NO); skip_count = 0; for (i = 0; i < target_count; i++) { target = targets[i]; if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER) { /* skip */ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ( "# P2P messages dropped due to full queue"), 1, GNUNET_NO); skip_count++; continue; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (&bd->key), (unsigned int) hop_count, GNUNET_i2s (target->id)); env = GNUNET_MQ_msg_extra (ppm, msize, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT); ppm->options = htonl (options); ppm->type = htonl (bd->type); ppm->hop_count = htonl (hop_count + 1); ppm->desired_replication_level = htonl (desired_replication_level); ppm->put_path_length = htonl (put_path_length); ppm->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time); GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &target->phash)); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data (bf, ppm->bloomfilter, DHT_BLOOM_SIZE)); ppm->key = bd->key; pp = (struct GNUNET_PeerIdentity *) &ppm[1]; GNUNET_memcpy (pp, bd->put_path, sizeof(struct GNUNET_PeerIdentity) * put_path_length); GNUNET_memcpy (&pp[put_path_length], bd->data, 
bd->data_size); GNUNET_MQ_send (target->mq, env); } GNUNET_free (targets); return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO; } enum GNUNET_GenericReturnValue GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, enum GNUNET_DHT_RouteOption options, uint32_t desired_replication_level, uint32_t hop_count, const struct GNUNET_HashCode *key, const void *xquery, size_t xquery_size, struct GNUNET_BLOCK_Group *bg, struct GNUNET_CONTAINER_BloomFilter *peer_bf) { unsigned int target_count; struct PeerInfo **targets; struct PeerInfo *target; struct GNUNET_MQ_Envelope *env; size_t msize; struct PeerGetMessage *pgm; char *xq; size_t reply_bf_size; void *reply_bf; unsigned int skip_count; uint32_t bf_nonce; GNUNET_assert (NULL != peer_bf); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"), 1, GNUNET_NO); target_count = get_target_peers (key, peer_bf, hop_count, desired_replication_level, &targets); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Adding myself (%s) to GET bloomfilter for %s\n", GNUNET_i2s (&my_identity), GNUNET_h2s (key)); GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity_hash); if (0 == target_count) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing GET for %s terminates after %u hops at %s\n", GNUNET_h2s (key), (unsigned int) hop_count, GNUNET_i2s (&my_identity)); return GNUNET_NO; } if (GNUNET_OK != GNUNET_BLOCK_group_serialize (bg, &bf_nonce, &reply_bf, &reply_bf_size)) { reply_bf = NULL; reply_bf_size = 0; bf_nonce = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); } msize = xquery_size + reply_bf_size; if (msize + sizeof(struct PeerGetMessage) >= GNUNET_MAX_MESSAGE_SIZE) { GNUNET_break (0); GNUNET_free (reply_bf); GNUNET_free (targets); return GNUNET_NO; } GNUNET_STATISTICS_update (GDS_stats, gettext_noop ( "# GET messages queued for transmission"), target_count, GNUNET_NO); /* forward request */ skip_count = 0; for (unsigned int i = 0; i < target_count; i++) { target = targets[i]; if (GNUNET_MQ_get_length 
(target->mq) >= MAXIMUM_PENDING_PER_PEER) { /* skip */ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ( "# P2P messages dropped due to full queue"), 1, GNUNET_NO); skip_count++; continue; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key), (unsigned int) hop_count, GNUNET_i2s (target->id)); env = GNUNET_MQ_msg_extra (pgm, msize, GNUNET_MESSAGE_TYPE_DHT_P2P_GET); pgm->options = htonl (options); pgm->type = htonl (type); pgm->hop_count = htonl (hop_count + 1); pgm->desired_replication_level = htonl (desired_replication_level); pgm->xquery_size = htonl (xquery_size); pgm->bf_mutator = bf_nonce; GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (peer_bf, &target->phash)); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf, pgm->bloomfilter, DHT_BLOOM_SIZE)); pgm->key = *key; xq = (char *) &pgm[1]; GNUNET_memcpy (xq, xquery, xquery_size); GNUNET_memcpy (&xq[xquery_size], reply_bf, reply_bf_size); GNUNET_MQ_send (target->mq, env); } GNUNET_free (targets); GNUNET_free (reply_bf); return (skip_count < target_count) ? 
GNUNET_OK : GNUNET_NO; } void GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target, const struct GDS_DATACACHE_BlockData *bd, const struct GNUNET_HashCode *query_hash, unsigned int get_path_length, const struct GNUNET_PeerIdentity *get_path) { struct PeerInfo *pi; struct GNUNET_MQ_Envelope *env; struct PeerResultMessage *prm; struct GNUNET_PeerIdentity *paths; size_t msize; msize = bd->data_size + (get_path_length + bd->put_path_length) * sizeof(struct GNUNET_PeerIdentity); if ( (msize + sizeof(struct PeerResultMessage) >= GNUNET_MAX_MESSAGE_SIZE) || (get_path_length > GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) || (bd->put_path_length > GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) || (bd->data_size > GNUNET_MAX_MESSAGE_SIZE)) { GNUNET_break (0); return; } pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers, target); if (NULL == pi) { /* peer disconnected in the meantime, drop reply */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No matching peer for reply for key %s\n", GNUNET_h2s (query_hash)); return; } if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER) { /* skip */ GNUNET_STATISTICS_update (GDS_stats, "# P2P messages dropped due to full queue", 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer queue full, ignoring reply for key %s\n", GNUNET_h2s (&bd->key)); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Forwarding reply for key %s to peer %s\n", GNUNET_h2s (query_hash), GNUNET_i2s (target)); GNUNET_STATISTICS_update (GDS_stats, "# RESULT messages queued for transmission", 1, GNUNET_NO); env = GNUNET_MQ_msg_extra (prm, msize, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT); prm->type = htonl (bd->type); prm->put_path_length = htonl (bd->put_path_length); prm->get_path_length = htonl (get_path_length); prm->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time); prm->key = *query_hash; paths = (struct GNUNET_PeerIdentity *) &prm[1]; GNUNET_memcpy (paths, bd->put_path, bd->put_path_length * 
sizeof(struct GNUNET_PeerIdentity)); GNUNET_memcpy (&paths[bd->put_path_length], get_path, get_path_length * sizeof(struct GNUNET_PeerIdentity)); GNUNET_memcpy (&paths[bd->put_path_length + get_path_length], bd->data, bd->data_size); GNUNET_MQ_send (pi->mq, env); } /** * To be called on core init/fail. * * @param cls service closure * @param identity the public identity of this peer */ static void core_init (void *cls, const struct GNUNET_PeerIdentity *identity) { (void) cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "CORE called, I am %s\n", GNUNET_i2s (identity)); my_identity = *identity; GNUNET_CRYPTO_hash (identity, sizeof(struct GNUNET_PeerIdentity), &my_identity_hash); GNUNET_SERVICE_resume (GDS_service); } /** * Check validity of a p2p put request. * * @param cls closure with the `struct PeerInfo` of the sender * @param message message * @return #GNUNET_OK if the message is valid */ static enum GNUNET_GenericReturnValue check_dht_p2p_put (void *cls, const struct PeerPutMessage *put) { uint32_t putlen; uint16_t msize; (void) cls; msize = ntohs (put->header.size); putlen = ntohl (put->put_path_length); if ((msize < sizeof(struct PeerPutMessage) + putlen * sizeof(struct GNUNET_PeerIdentity)) || (putlen > GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity))) { GNUNET_break_op (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Core handler for p2p put requests. 
* * @param cls closure with the `struct PeerInfo` of the sender * @param message message */ static void handle_dht_p2p_put (void *cls, const struct PeerPutMessage *put) { struct PeerInfo *peer = cls; uint16_t msize = ntohs (put->header.size); enum GNUNET_DHT_RouteOption options = (enum GNUNET_DHT_RouteOption) ntohl (put->options); struct GDS_DATACACHE_BlockData bd = { .key = put->key, .expiration_time = GNUNET_TIME_absolute_ntoh (put->expiration_time), .type = ntohl (put->type) }; const struct GNUNET_PeerIdentity *put_path = (const struct GNUNET_PeerIdentity *) &put[1]; uint32_t putlen = ntohl (put->put_path_length); bd.data_size = msize - (sizeof(*put) + putlen * sizeof(struct GNUNET_PeerIdentity)); bd.data = &put_path[putlen]; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n", GNUNET_h2s (&put->key), GNUNET_i2s (peer->id)); if (GNUNET_TIME_absolute_is_past (bd.expiration_time)) { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Expired PUTs discarded"), 1, GNUNET_NO); return; } GNUNET_STATISTICS_update (GDS_stats, "# P2P PUT requests received", 1, GNUNET_NO); GNUNET_STATISTICS_update (GDS_stats, "# P2P PUT bytes received", msize, GNUNET_NO); if (GNUNET_YES == log_route_details_stderr) { char *tmp; char *pp; struct GNUNET_HashCode mxor; struct GNUNET_HashCode pxor; GNUNET_CRYPTO_hash_xor (&my_identity_hash, &put->key, &mxor); GNUNET_CRYPTO_hash_xor (&peer->phash, &put->key, &pxor); pp = GNUNET_STRINGS_pp2s (put_path, putlen); tmp = GNUNET_strdup (GNUNET_i2s (&my_identity)); LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N PUT %s: %s->%s (%u, %u=>%u, PP: %s)\n", GNUNET_h2s (&put->key), GNUNET_i2s (peer->id), tmp, ntohl (put->hop_count), GNUNET_CRYPTO_hash_count_leading_zeros (&pxor), GNUNET_CRYPTO_hash_count_leading_zeros (&mxor), pp); GNUNET_free (pp); GNUNET_free (tmp); } { struct GNUNET_HashCode test_key; enum GNUNET_GenericReturnValue ret; ret = GNUNET_BLOCK_get_key (GDS_block_context, bd.type, bd.data, bd.data_size, &test_key); switch (ret) { case 
GNUNET_YES: if (0 != GNUNET_memcmp (&test_key, &bd.key)) { GNUNET_break_op (0); return; } break; case GNUNET_NO: GNUNET_break_op (0); return; case GNUNET_SYSERR: /* cannot verify, good luck */ break; } } if (GNUNET_NO == GNUNET_BLOCK_check_block (GDS_block_context, bd.type, &bd.key, bd.data, bd.data_size)) { GNUNET_break_op (0); return; } { struct GNUNET_CONTAINER_BloomFilter *bf; struct GNUNET_PeerIdentity pp[putlen + 1]; bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE, GNUNET_CONSTANTS_BLOOMFILTER_K); GNUNET_break_op (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &peer->phash)); /* extend 'put path' by sender */ bd.put_path = (const struct GNUNET_PeerIdentity *) pp; bd.put_path_length = putlen + 1; if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE)) { #if SANITY_CHECKS for (unsigned int i = 0; i <= putlen; i++) { for (unsigned int j = 0; j < i; j++) { GNUNET_break (0 != GNUNET_memcmp (&pp[i], &pp[j])); } GNUNET_break (0 != GNUNET_memcmp (&pp[i], peer->id)); } #endif GNUNET_memcpy (pp, put_path, putlen * sizeof(struct GNUNET_PeerIdentity)); pp[putlen] = *peer->id; putlen++; } else { bd.put_path_length = 0; } /* give to local clients */ GDS_CLIENTS_handle_reply (&bd, &bd.key, 0, NULL /* get path */); /* store locally */ if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || (GDS_am_closest_peer (&put->key, bf))) GDS_DATACACHE_handle_put (&bd); { enum GNUNET_GenericReturnValue forwarded; /* route to other peers */ forwarded = GDS_NEIGHBOURS_handle_put (&bd, options, ntohl (put->desired_replication_level), ntohl (put->hop_count), bf); /* notify monitoring clients */ GDS_CLIENTS_process_put (options | ((GNUNET_OK == forwarded) ? GNUNET_DHT_RO_LAST_HOP : 0), &bd, ntohl (put->hop_count), ntohl (put->desired_replication_level)); } GNUNET_CONTAINER_bloomfilter_free (bf); } } /** * We have received a FIND PEER request. Send matching * HELLOs back. 
* * @param sender sender of the FIND PEER request * @param key peers close to this key are desired * @param bg group for filtering peers */ static void handle_find_peer (const struct GNUNET_PeerIdentity *sender, const struct GNUNET_HashCode *query_hash, struct GNUNET_BLOCK_Group *bg) { int bucket_idx; struct PeerBucket *bucket; struct PeerInfo *peer; unsigned int choice; struct GDS_DATACACHE_BlockData bd = { .type = GNUNET_BLOCK_TYPE_DHT_HELLO }; /* first, check about our own HELLO */ if (NULL != GDS_my_hello) { bd.expiration_time = GNUNET_TIME_relative_to_absolute ( hello_expiration), bd.key = my_identity_hash, bd.data = GDS_my_hello; bd.data_size = GNUNET_HELLO_size ( (const struct GNUNET_HELLO_Message *) GDS_my_hello); GNUNET_break (bd.data_size >= sizeof(struct GNUNET_MessageHeader)); if (GNUNET_BLOCK_REPLY_OK_MORE == GNUNET_BLOCK_check_reply (GDS_block_context, GNUNET_BLOCK_TYPE_DHT_HELLO, bg, &my_identity_hash, NULL, 0, bd.data, bd.data_size)) { GDS_NEIGHBOURS_handle_reply (sender, &bd, query_hash, 0, NULL /* get path */); } else { GNUNET_STATISTICS_update (GDS_stats, "# FIND PEER requests ignored due to Bloomfilter", 1, GNUNET_NO); } } else { GNUNET_STATISTICS_update (GDS_stats, "# FIND PEER requests ignored due to lack of HELLO", 1, GNUNET_NO); } /* then, also consider sending a random HELLO from the closest bucket */ /* FIXME: How can this be true? Shouldnt we just do find_bucket() ? 
*/ if (0 == GNUNET_memcmp (&my_identity_hash, query_hash)) bucket_idx = closest_bucket; else bucket_idx = GNUNET_MIN ((int) closest_bucket, find_bucket (query_hash)); if (bucket_idx < 0) return; bucket = &k_buckets[bucket_idx]; if (bucket->peers_size == 0) return; choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size); peer = bucket->head; while (choice > 0) { GNUNET_assert (NULL != peer); peer = peer->next; choice--; } choice = bucket->peers_size; { const struct GNUNET_HELLO_Message *hello; size_t hello_size; do { peer = peer->next; if (0 == choice--) return; /* no non-masked peer available */ if (NULL == peer) peer = bucket->head; hello = GDS_HELLO_get (peer->id); } while ( (NULL == hello) || (GNUNET_BLOCK_REPLY_OK_MORE != GNUNET_BLOCK_check_reply ( GDS_block_context, GNUNET_BLOCK_TYPE_DHT_HELLO, bg, &peer->phash, NULL, 0, /* xquery */ hello, (hello_size = GNUNET_HELLO_size (hello))))); bd.expiration_time = GNUNET_TIME_relative_to_absolute ( GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION); bd.key = peer->phash; bd.data = hello; bd.data_size = hello_size; GDS_NEIGHBOURS_handle_reply (sender, &bd, query_hash, 0, NULL /* get path */); } } /** * Handle an exact result from local datacache for a GET operation. * * @param cls the `struct PeerInfo` for which this is a reply * @param bd details about the block we found locally */ static void handle_local_result (void *cls, const struct GDS_DATACACHE_BlockData *bd) { struct PeerInfo *peer = cls; { char *pp; pp = GNUNET_STRINGS_pp2s (bd->put_path, bd->put_path_length); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found local result for %s (PP: %s)\n", GNUNET_h2s (&bd->key), pp); GNUNET_free (pp); } GDS_NEIGHBOURS_handle_reply (peer->id, bd, &bd->key, 0, NULL /* get path */); } /** * Check validity of p2p get request. 
* * @param cls closure with the `struct PeerInfo` of the sender * @param get the message * @return #GNUNET_OK if the message is well-formed */ static enum GNUNET_GenericReturnValue check_dht_p2p_get (void *cls, const struct PeerGetMessage *get) { uint32_t xquery_size; uint16_t msize; (void) cls; msize = ntohs (get->header.size); xquery_size = ntohl (get->xquery_size); if (msize < sizeof(struct PeerGetMessage) + xquery_size) { GNUNET_break_op (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Core handler for p2p get requests. * * @param cls closure with the `struct PeerInfo` of the sender * @param get the message */ static void handle_dht_p2p_get (void *cls, const struct PeerGetMessage *get) { struct PeerInfo *peer = cls; uint32_t xquery_size; size_t reply_bf_size; uint16_t msize; enum GNUNET_BLOCK_Type type; enum GNUNET_DHT_RouteOption options; enum GNUNET_BLOCK_ReplyEvaluationResult eval = GNUNET_BLOCK_REPLY_OK_MORE; struct GNUNET_BLOCK_Group *bg; struct GNUNET_CONTAINER_BloomFilter *peer_bf; const void *xquery; enum GNUNET_GenericReturnValue forwarded; /* parse and validate message */ msize = ntohs (get->header.size); xquery_size = ntohl (get->xquery_size); reply_bf_size = msize - (sizeof(struct PeerGetMessage) + xquery_size); type = ntohl (get->type); options = ntohl (get->options); xquery = (const void *) &get[1]; GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P GET requests received"), 1, GNUNET_NO); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P GET bytes received"), msize, GNUNET_NO); if (GNUNET_YES == log_route_details_stderr) { char *tmp; struct GNUNET_HashCode mxor; struct GNUNET_HashCode pxor; GNUNET_CRYPTO_hash_xor (&my_identity_hash, &get->key, &mxor); GNUNET_CRYPTO_hash_xor (&peer->phash, &get->key, &pxor); tmp = GNUNET_strdup (GNUNET_i2s (&my_identity)); LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n", GNUNET_h2s (&get->key), GNUNET_i2s (peer->id), tmp, ntohl (get->hop_count), 
GNUNET_CRYPTO_hash_count_leading_zeros (&pxor), GNUNET_CRYPTO_hash_count_leading_zeros (&mxor), ntohl (get->xquery_size), (const char *) xquery); GNUNET_free (tmp); } if (GNUNET_NO == GNUNET_BLOCK_check_query (GDS_block_context, type, &get->key, xquery, xquery_size)) { /* request invalid */ GNUNET_break_op (0); return; } peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE, GNUNET_CONSTANTS_BLOOMFILTER_K); GNUNET_break_op (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (peer_bf, &peer->phash)); bg = GNUNET_BLOCK_group_create (GDS_block_context, type, get->bf_mutator, xquery + xquery_size, reply_bf_size, "filter-size", reply_bf_size, NULL); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "GET for %s at %s after %u hops\n", GNUNET_h2s (&get->key), GNUNET_i2s (&my_identity), (unsigned int) ntohl (get->hop_count)); /* local lookup (this may update the reply_bf) */ if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || (GDS_am_closest_peer (&get->key, peer_bf)) ) { if ((0 != (options & GNUNET_DHT_RO_FIND_PEER))) { GNUNET_STATISTICS_update (GDS_stats, "# P2P FIND PEER requests processed", 1, GNUNET_NO); handle_find_peer (peer->id, &get->key, bg); } else { eval = GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size, bg, &handle_local_result, peer); } } else { GNUNET_STATISTICS_update (GDS_stats, "# P2P GET requests ONLY routed", 1, GNUNET_NO); } /* remember request for routing replies */ GDS_ROUTING_add (peer->id, type, bg, /* bg now owned by routing, but valid at least until end of this function! */ options, &get->key, xquery, xquery_size); /* P2P forwarding */ forwarded = GNUNET_NO; if (eval != GNUNET_BLOCK_REPLY_OK_LAST) forwarded = GDS_NEIGHBOURS_handle_get (type, options, ntohl ( get->desired_replication_level), ntohl (get->hop_count), &get->key, xquery, xquery_size, bg, peer_bf); GDS_CLIENTS_process_get (options | ((GNUNET_OK == forwarded) ? 
GNUNET_DHT_RO_LAST_HOP : 0), type, ntohl (get->hop_count), ntohl (get->desired_replication_level), 0, NULL, &get->key); /* clean up; note that 'bg' is owned by routing now! */ GNUNET_CONTAINER_bloomfilter_free (peer_bf); } /** * Process a reply, after the @a get_path has been updated. * * @param bd block details * @param query_hash hash of the original query, might not match key in @a bd * @param get_path_length number of entries in @a get_path * @param get_path path the reply has taken */ static void process_reply_with_path (const struct GDS_DATACACHE_BlockData *bd, const struct GNUNET_HashCode *query_hash, unsigned int get_path_length, const struct GNUNET_PeerIdentity *get_path) { /* forward to local clients */ GDS_CLIENTS_handle_reply (bd, query_hash, get_path_length, get_path); GDS_CLIENTS_process_get_resp (bd, get_path, get_path_length); if (GNUNET_YES == cache_results) { struct GNUNET_PeerIdentity xput_path[GNUNET_NZL (get_path_length + bd->put_path_length)]; struct GDS_DATACACHE_BlockData bdx = *bd; GNUNET_memcpy (xput_path, bd->put_path, bd->put_path_length * sizeof(struct GNUNET_PeerIdentity)); GNUNET_memcpy (&xput_path[bd->put_path_length], get_path, get_path_length * sizeof(struct GNUNET_PeerIdentity)); bdx.put_path = xput_path; bdx.put_path_length += get_path_length; GDS_DATACACHE_handle_put (&bdx); } /* forward to other peers */ GDS_ROUTING_process (bd, query_hash, get_path_length, get_path); } /** * Check validity of p2p result message. 
* * @param cls closure * @param message message * @return #GNUNET_YES if the message is well-formed */ static enum GNUNET_GenericReturnValue check_dht_p2p_result (void *cls, const struct PeerResultMessage *prm) { uint32_t get_path_length = ntohl (prm->get_path_length); uint32_t put_path_length = ntohl (prm->put_path_length); uint16_t msize = ntohs (prm->header.size); (void) cls; if ( (msize < sizeof(struct PeerResultMessage) + (get_path_length + put_path_length) * sizeof(struct GNUNET_PeerIdentity)) || (get_path_length > GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) || (put_path_length > GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ) { GNUNET_break_op (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Core handler for p2p result messages. * * @param cls closure * @param message message */ static void handle_dht_p2p_result (void *cls, const struct PeerResultMessage *prm) { struct PeerInfo *peer = cls; uint16_t msize = ntohs (prm->header.size); uint32_t get_path_length = ntohl (prm->get_path_length); struct GDS_DATACACHE_BlockData bd = { .expiration_time = GNUNET_TIME_absolute_ntoh (prm->expiration_time), .put_path = (const struct GNUNET_PeerIdentity *) &prm[1], .put_path_length = ntohl (prm->put_path_length), .type = ntohl (prm->type) }; const struct GNUNET_PeerIdentity *get_path = &bd.put_path[bd.put_path_length]; /* parse and validate message */ if (GNUNET_TIME_absolute_is_past (bd.expiration_time)) { GNUNET_STATISTICS_update (GDS_stats, "# Expired results discarded", 1, GNUNET_NO); return; } get_path = &bd.put_path[bd.put_path_length]; bd.data = (const void *) &get_path[get_path_length]; bd.data_size = msize - (sizeof(struct PeerResultMessage) + (get_path_length + bd.put_path_length) * sizeof(struct GNUNET_PeerIdentity)); GNUNET_STATISTICS_update (GDS_stats, "# P2P RESULTS received", 1, GNUNET_NO); GNUNET_STATISTICS_update (GDS_stats, "# P2P RESULT bytes received", msize, GNUNET_NO); { enum GNUNET_GenericReturnValue ret; const 
struct GNUNET_HashCode *pquery; ret = GNUNET_BLOCK_get_key (GDS_block_context, bd.type, bd.data, bd.data_size, &bd.key); if (GNUNET_NO == ret) { GNUNET_break_op (0); return; } pquery = (GNUNET_OK == ret) ? &bd.key : &prm->key; if (GNUNET_OK != GNUNET_BLOCK_check_block (GDS_block_context, bd.type, pquery, bd.data, bd.data_size)) { GNUNET_break_op (0); return; } } if (GNUNET_YES == log_route_details_stderr) { char *tmp; char *pp; char *gp; gp = GNUNET_STRINGS_pp2s (get_path, get_path_length); pp = GNUNET_STRINGS_pp2s (bd.put_path, bd.put_path_length); tmp = GNUNET_strdup (GNUNET_i2s (&my_identity)); LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N RESULT %s: %s->%s (GP: %s, PP: %s)\n", GNUNET_h2s (&prm->key), GNUNET_i2s (peer->id), tmp, gp, pp); GNUNET_free (gp); GNUNET_free (pp); GNUNET_free (tmp); } /* if we got a HELLO, consider it for our own routing table */ if (GNUNET_BLOCK_TYPE_DHT_HELLO == bd.type) { const struct GNUNET_MessageHeader *h = bd.data; struct GNUNET_PeerIdentity pid; /* Should be a HELLO, validate and consider using it! */ if (bd.data_size < sizeof(struct GNUNET_HELLO_Message)) { GNUNET_break (0); return; } if (bd.data_size != ntohs (h->size)) { GNUNET_break (0); return; } if (GNUNET_OK != GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h, &pid)) { GNUNET_break_op (0); return; } if ( (GNUNET_YES != disable_try_connect) && (0 != GNUNET_memcmp (&my_identity, &pid)) ) try_connect (&pid, h); } /* First, check if 'peer' is already on the path, and if so, truncate it instead of expanding. 
*/ for (unsigned int i = 0; i <= get_path_length; i++) if (0 == GNUNET_memcmp (&get_path[i], peer->id)) { process_reply_with_path (&bd, &prm->key, i, get_path); return; } /* Need to append 'peer' to 'get_path' (normal case) */ { struct GNUNET_PeerIdentity xget_path[get_path_length + 1]; GNUNET_memcpy (xget_path, get_path, get_path_length * sizeof(struct GNUNET_PeerIdentity)); xget_path[get_path_length] = *peer->id; process_reply_with_path (&bd, &prm->key, get_path_length + 1, xget_path); } } enum GNUNET_GenericReturnValue GDS_NEIGHBOURS_init () { struct GNUNET_MQ_MessageHandler core_handlers[] = { GNUNET_MQ_hd_var_size (dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, struct PeerGetMessage, NULL), GNUNET_MQ_hd_var_size (dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, struct PeerPutMessage, NULL), GNUNET_MQ_hd_var_size (dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, struct PeerResultMessage, NULL), GNUNET_MQ_handler_end () }; unsigned long long temp_config_num; disable_try_connect = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "DISABLE_TRY_CONNECT"); if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size", &temp_config_num)) bucket_size = (unsigned int) temp_config_num; cache_results = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "CACHE_RESULTS"); log_route_details_stderr = (NULL != getenv ("GNUNET_DHT_ROUTE_DEBUG")) ? 
GNUNET_YES : GNUNET_NO; ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg); core_api = GNUNET_CORE_connect (GDS_cfg, NULL, &core_init, &handle_core_connect, &handle_core_disconnect, core_handlers); if (NULL == core_api) return GNUNET_SYSERR; all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES); all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_NO); return GNUNET_OK; } void GDS_NEIGHBOURS_done () { if (NULL == core_api) return; GNUNET_CORE_disconnect (core_api); core_api = NULL; GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)); GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers); all_connected_peers = NULL; GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers, &free_connect_info, NULL); GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers); all_desired_peers = NULL; GNUNET_ATS_connectivity_done (ats_ch); ats_ch = NULL; GNUNET_assert (NULL == find_peer_task); } struct GNUNET_PeerIdentity * GDS_NEIGHBOURS_get_id () { return &my_identity; } /* end of gnunet-service-dht_neighbours.c */