From 7f57da285eb93330722ec6b6176c9187d355d03f Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Sat, 20 Dec 2014 15:57:31 +0000 Subject: Cleaned up --- src/rps/gnunet-service-rps.c | 424 +++++++++++++++++++++++++++---------------- 1 file changed, 272 insertions(+), 152 deletions(-) diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 7ee3d78cf..09ed5de48 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -42,12 +42,12 @@ // TODO align message structs -// TODO multipeerlist indep of gossiped list - // (TODO api -- possibility of getting weak random peer immideately) // TODO malicious peer +// TODO Change API to accept initialisation peers + /** * Our configuration. */ @@ -73,6 +73,28 @@ get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size); // It might be interesting to formulate this independent of PeerIDs. +/** + * Callback that is called when a new PeerID is inserted into a sampler. + * + * @param cls the closure given alongside this function. + * @param id the PeerID that is inserted + * @param hash the hash the sampler produced of the PeerID + */ +typedef void (* SAMPLER_insertCB) (void *cls, + const struct GNUNET_PeerIdentity *id, + struct GNUNET_HashCode hash); + +/** + * Callback that is called when a new PeerID is removed from a sampler. + * + * @param cls the closure given alongside this function. + * @param id the PeerID that is removed + * @param hash the hash the sampler produced of the PeerID + */ +typedef void (* SAMPLER_removeCB) (void *cls, + const struct GNUNET_PeerIdentity *id, + struct GNUNET_HashCode hash); + /** * A sampler sampling PeerIDs. */ @@ -123,6 +145,26 @@ struct Samplers */ struct GNUNET_PeerIdentity *peer_ids; + /** + * Callback to be called when a peer gets inserted into a sampler. + */ + SAMPLER_insertCB insertCB; + + /** + * Closure to the insertCB. + */ + void *insertCLS; + + /** + * Callback to be called when a peer gets inserted into a sampler. + */ + SAMPLER_removeCB removeCB; + + /** + * Closure to the removeCB. + */ + void *removeCLS; + /** * The head of the DLL. */ @@ -135,9 +177,6 @@ struct Samplers }; -// TODO change to updateCB and call on updates in general -typedef void (* SAMPLER_deleteCB) (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash); - /** * (Re)Initialise given Sampler with random min-wise independent function. * @@ -161,9 +200,7 @@ SAMPLER_init(struct GNUNET_PeerIdentity *id) GNUNET_assert(NULL != id); s->peer_id = id; - LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in _init()\n"); memcpy(s->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity)); // FIXME this should probably be NULL -- the caller has to handle those. - LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in _init()\n"); //s->peer_id = own_identity; // Maybe set to own PeerID. So we always have // a valid PeerID in the sampler. // Maybe take a PeerID as second argument. @@ -210,8 +247,9 @@ peer_cmp(const struct GNUNET_PeerIdentity *id1, const struct GNUNET_PeerIdentity */ static void SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other, - SAMPLER_deleteCB del_cb, void *cb_cls) - // TODO set id in peer_ids + SAMPLER_insertCB insertCB, void *insertCLS, + SAMPLER_removeCB removeCB, void *removeCLS) + // TODO call update herein { struct GNUNET_HashCode other_hash; @@ -240,9 +278,12 @@ SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other, LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (got NULL previously).\n", GNUNET_i2s(other)); memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity)); - LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in _next()\n"); //s->peer_id = other; s->peer_id_hash = other_hash; + if (NULL != insertCB) + { + insertCB(insertCLS, s->peer_id, s->peer_id_hash); + } } else if ( 0 > hash_cmp(&other_hash, &s->peer_id_hash) ) { @@ -251,17 +292,23 @@ SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other, LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n", GNUNET_i2s(s->peer_id)); - if ( NULL != del_cb ) + if ( NULL != removeCB ) { - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the delete callback.\n", + LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n", GNUNET_i2s(s->peer_id)); - del_cb(cb_cls, s->peer_id, s->peer_id_hash); + removeCB(removeCLS, s->peer_id, s->peer_id_hash); } memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity)); - LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in _next()\n"); //s->peer_id = other; s->peer_id_hash = other_hash; + + if ( NULL != insertCB ) + { + LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n", + GNUNET_i2s(s->peer_id)); + insertCB(insertCLS, s->peer_id, s->peer_id_hash); + } } else { @@ -289,7 +336,7 @@ SAMPLER_samplers_resize (struct Samplers * samplers, { if ( samplers->size == new_size ) { - LOG(GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n"); + LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n"); return; } @@ -299,12 +346,27 @@ SAMPLER_samplers_resize (struct Samplers * samplers, struct Sampler *tmp; old_size = samplers->size; - LOG(GNUNET_ERROR_TYPE_DEBUG, "Growing/Shrinking samplers %u -> %u\n", old_size, new_size); - GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size); - LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in _samplers_resize()\n"); - LOG(GNUNET_ERROR_TYPE_DEBUG, "samplers->peer_ids now points to %p\n", samplers->peer_ids); + LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing/Shrinking samplers %u -> %u\n", old_size, new_size); iter = samplers->head; + + if ( new_size < old_size ) + { + for ( i = new_size ; i < old_size ; i++ ) + {/* Remove unneeded rest */ + tmp = iter->next; + LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n", i); + if (NULL != samplers->removeCB) + samplers->removeCB(samplers->removeCLS, iter->peer_id, iter->peer_id_hash); + GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter); + GNUNET_free(iter); + iter = tmp; + } + } + + GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size); + LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: samplers->peer_ids now points to %p\n", samplers->peer_ids); + if ( new_size > old_size ) { /* Growing */ GNUNET_assert( NULL != fill_up_id ); @@ -313,8 +375,7 @@ SAMPLER_samplers_resize (struct Samplers * samplers, if ( i < old_size ) { /* Update old samplers */ iter->peer_id = &samplers->peer_ids[i]; - LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in _samplers_resize()\n"); - LOG(GNUNET_ERROR_TYPE_DEBUG, "Updated %" PRIX64 ". sampler, now pointing to %p, contains %s\n", + LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updated %" PRIX64 ". sampler, now pointing to %p, contains %s\n", i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id)); iter = iter->next; } @@ -322,37 +383,32 @@ SAMPLER_samplers_resize (struct Samplers * samplers, { /* Add new samplers */ memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct GNUNET_PeerIdentity)); iter = SAMPLER_init(&samplers->peer_ids[i]); + if (NULL != samplers->insertCB) + { + samplers->insertCB(samplers->insertCLS, iter->peer_id, iter->peer_id_hash); + } GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter); - LOG(GNUNET_ERROR_TYPE_DEBUG, "Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n", + LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n", i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id)); } } } else// if ( new_size < old_size ) { /* Shrinking */ - for ( i = 0 ; i < old_size ; i++) + for ( i = 0 ; i < new_size ; i++) { /* All samplers */ tmp = iter->next; - if ( i < new_size ) - { /* Update remaining samplers */ - iter->peer_id = &samplers->peer_ids[i]; - LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in _samplers_resize()\n"); - LOG(GNUNET_ERROR_TYPE_DEBUG, "Updatied %" PRIX64 ". sampler, now pointing to %p, contains %s\n", - i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id)); - } - else - { /* Remove unneeded rest */ - LOG(GNUNET_ERROR_TYPE_DEBUG, "Removing %" PRIX64 ". sampler\n", i); - // TODO call delCB on elem? - GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter); - GNUNET_free(iter); - } + /* Update remaining samplers */ + iter->peer_id = &samplers->peer_ids[i]; + LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updatied %" PRIX64 ". sampler, now pointing to %p, contains %s\n", + i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id)); + iter = tmp; } } GNUNET_assert(samplers->size == new_size); - LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished growing/shrinking.\n"); + LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); } @@ -360,7 +416,9 @@ SAMPLER_samplers_resize (struct Samplers * samplers, * Initialise a tuple of samplers. */ struct Samplers * -SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id) +SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id, + SAMPLER_insertCB insertCB, void *insertCLS, + SAMPLER_removeCB removeCB, void *removeCLS) { struct Samplers *samplers; //struct Sampler *s; @@ -368,9 +426,12 @@ SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id) samplers = GNUNET_new(struct Samplers); samplers->size = 0; - samplers->head = samplers->tail = NULL; samplers->peer_ids = NULL; - LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in _samplers_init()\n"); + samplers->insertCB = insertCB; + samplers->insertCLS = insertCLS; + samplers->removeCB = removeCB; + samplers->removeCLS = removeCLS; + samplers->head = samplers->tail = NULL; //samplers->peer_ids = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); SAMPLER_samplers_resize(samplers, init_size, id); @@ -380,7 +441,6 @@ SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id) // GNUNET_array_append(samplers->peer_ids, // samplers->size, // *id); - // LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in _samplers_init()\n"); // s = SAMPLER_init(&samplers->peer_ids[i]); // GNUNET_CONTAINER_DLL_insert_tail(samplers->head, // samplers->tail, @@ -396,15 +456,16 @@ SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id) * A fuction to update every sampler in the given list */ static void -SAMPLER_update_list(struct Samplers *samplers, const struct GNUNET_PeerIdentity *id, - SAMPLER_deleteCB del_cb, void *cb_cls) +SAMPLER_update_list(struct Samplers *samplers, const struct GNUNET_PeerIdentity *id) { struct Sampler *iter; iter = samplers->head; while ( NULL != iter->next ) { - SAMPLER_next(iter, id, del_cb, cb_cls); + SAMPLER_next(iter, id, + samplers->insertCB, samplers->insertCLS, + samplers->removeCB, samplers->removeCLS); iter = iter->next; } @@ -468,7 +529,7 @@ SAMPLER_get_n_rand_peers (struct Samplers *samplers, uint64_t n) * Counts how many Samplers currently hold a given PeerID. */ uint64_t -SAMPLER_count_id ( struct Samplers *samplers, struct GNUNET_PeerIdentity *id ) +SAMPLER_count_id ( struct Samplers *samplers, const struct GNUNET_PeerIdentity *id ) { struct Sampler *iter; uint64_t count; @@ -710,50 +771,110 @@ get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size) } /** - * Get the message queue of a specific peer. - * - * If we already have a message queue open to this client, - * simply return it, otherways create one. + * Make sure the context of a given peer exists. */ - struct GNUNET_MQ_Handle * -get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, struct GNUNET_PeerIdentity *peer_id) + void +touch_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer) { struct peer_context *ctx; - struct GNUNET_MQ_Handle * mq; - struct GNUNET_CADET_Channel *channel; - - if ( GNUNET_OK != GNUNET_CONTAINER_multipeermap_contains( peer_map, peer_id ) ) { - - channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer_id, - GNUNET_RPS_CADET_PORT, - GNUNET_CADET_OPTION_RELIABLE); - mq = GNUNET_CADET_mq_create(channel); + if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer ) ) + { + ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer); + } + else + { ctx = GNUNET_malloc(sizeof(struct peer_context)); ctx->in_flags = 0; - ctx->to_channel = channel; - ctx->mq = mq; + ctx->mq = NULL; + ctx->to_channel = NULL; + ctx->from_channel = NULL; + GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } +} - GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - } else { - ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer_id); - if ( NULL == ctx->mq ) { - if ( NULL == ctx->to_channel ) { - channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer_id, - GNUNET_RPS_CADET_PORT, - GNUNET_CADET_OPTION_RELIABLE); - ctx->to_channel = channel; - } +/** + * Get the context of a peer. If not existing, create. + */ + struct peer_context * +get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer) +{ + struct peer_context *ctx; - mq = GNUNET_CADET_mq_create(ctx->to_channel); - ctx->mq = mq; - } + touch_peer_ctx(peer_map, peer); + ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer); + return ctx; +} + +/** + * Get the channel of a peer. If not existing, create. + */ + void +touch_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer) +{ + struct peer_context *ctx; + + ctx = get_peer_ctx (peer_map, peer); + if (NULL == ctx->to_channel) + { + ctx->to_channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer, + GNUNET_RPS_CADET_PORT, + GNUNET_CADET_OPTION_RELIABLE); + //TODO do I have to explicitly put it in the peer_map? } +} - return ctx->mq; +/** + * Get the channel of a peer. If not existing, create. + */ + struct GNUNET_CADET_Channel * +get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer) +{ + struct peer_context *ctx; + + ctx = get_peer_ctx (peer_map, peer); + touch_channel(peer_map, peer); + return ctx->to_channel; } +/** + * Make sure the mq for a given peer exists. + * + * If we already have a message queue open to this client, + * simply return it, otherways create one. + */ + void +touch_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id) +{ + struct peer_context *ctx; + + ctx = get_peer_ctx(peer_map, peer_id); + if (NULL == ctx->mq) + { + touch_channel(peer_map, peer_id); + ctx->mq = GNUNET_CADET_mq_create(ctx->to_channel); + //TODO do I have to explicitly put it in the peer_map? + //GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx, + // GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } +} + +/** + * Get the message queue of a specific peer. + * + * If we already have a message queue open to this client, + * simply return it, otherways create one. + */ + struct GNUNET_MQ_Handle * +get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id) +{ + struct peer_context *ctx; + + ctx = get_peer_ctx(peer_map, peer_id); + touch_mq(peer_map, peer_id); + + return ctx->mq; +} /*********************************************************************** * /Util functions @@ -819,29 +940,21 @@ handle_cs_request (void *cls, GNUNET_SERVER_client_set_user_context(client, cli_ctx); } - //mq = GNUNET_MQ_queue_for_server_client(client); - // TODO How many peers do we give back? // Wait until we have enough random peers? ev = GNUNET_MQ_msg_extra(out_msg, GNUNET_ntohll(msg->num_peers) * sizeof(struct GNUNET_PeerIdentity), GNUNET_MESSAGE_TYPE_RPS_CS_REPLY); - out_msg->num_peers = GNUNET_ntohll(msg->num_peers); + out_msg->num_peers = msg->num_peers; // No conversion between network and host order num_peers = GNUNET_ntohll(msg->num_peers); //&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers); memcpy(&out_msg[1], SAMPLER_get_n_rand_peers(sampler_list, num_peers), num_peers * sizeof(struct GNUNET_PeerIdentity)); - //for ( i = 0 ; i < num_peers ; i++ ) { - // memcpy(&out_msg[1] + i * sizeof(struct GNUNET_PeerIdentity), - // SAMPLER_get_rand_peer(sampler_list), - // sizeof(struct GNUNET_PeerIdentity)); - //} GNUNET_MQ_send(cli_ctx->mq, ev); - //GNUNET_MQ_send(mq, ev); //GNUNET_MQ_destroy(mq); GNUNET_SERVER_receive_done (client, @@ -972,22 +1085,6 @@ handle_peer_pull_reply (void *cls, } -/** - * Callback called when a Sampler is updated. - */ - void -delete_cb (void *cls, struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash) -{ - size_t s; - - s = SAMPLER_count_id(sampler_list, id); - if ( 1 >= s ) { - // TODO cleanup peer - GNUNET_CONTAINER_multipeermap_remove_all( peer_map, id); - } -} - - /** * Send out PUSHes and PULLs. * @@ -1010,15 +1107,11 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /* If the NSE has changed adapt the lists accordingly */ - // TODO check nse == 0! - LOG(GNUNET_ERROR_TYPE_DEBUG, "Checking size estimate.\n"); if ( sampler_list->size != est_size ) SAMPLER_samplers_resize(sampler_list, est_size, own_identity); GNUNET_array_grow(gossip_list, gossip_list_size, est_size); - gossip_list_size = sampler_list->size = est_size; - /* Would it make sense to have one shuffeled gossip list and then * to send PUSHes to first alpha peers, PULL requests to next beta peers and @@ -1063,21 +1156,21 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } - - /* Update gossip list */ uint64_t r_index; if ( push_list_size <= alpha * gossip_list_size && push_list_size != 0 && - pull_list_size != 0 ) { + pull_list_size != 0 ) + { LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n"); uint64_t first_border; uint64_t second_border; first_border = round(alpha * gossip_list_size); - for ( i = 0 ; i < first_border ; i++ ) { // TODO use SAMPLER_get_n_rand_peers + for ( i = 0 ; i < first_border ; i++ ) + { // TODO use SAMPLER_get_n_rand_peers /* Update gossip list with peers received through PUSHes */ r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, push_list_size); @@ -1086,7 +1179,8 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } second_border = first_border + round(beta * gossip_list_size); - for ( i = first_border ; i < second_border ; i++ ) { + for ( i = first_border ; i < second_border ; i++ ) + { /* Update gossip list with peers received through PULLs */ r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, pull_list_size); @@ -1094,7 +1188,8 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) // TODO change the in_flags accordingly } - for ( i = second_border ; i < gossip_list_size ; i++ ) { + for ( i = second_border ; i < gossip_list_size ; i++ ) + { /* Update gossip list with peers from history */ r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, sampler_list->size); @@ -1102,7 +1197,9 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) // TODO change the in_flags accordingly } - } else { + } + else + { LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n"); } // TODO independent of that also get some peers from CADET_get_peers()? @@ -1112,35 +1209,66 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) for ( i = 0 ; i < push_list_size ; i++ ) { - SAMPLER_update_list(sampler_list, &push_list[i], NULL, NULL); + SAMPLER_update_list(sampler_list, &push_list[i]); // TODO set in_flag? } for ( i = 0 ; i < pull_list_size ; i++ ) { - SAMPLER_update_list(sampler_list, &pull_list[i], NULL, NULL); + SAMPLER_update_list(sampler_list, &pull_list[i]); // TODO set in_flag? } - // TODO go over whole peer_map and do cleanups - // delete unneeded peers, set in_flags, check channel/mq - // -- already done with deleteCB? - - /* Empty push/pull lists */ GNUNET_array_grow(push_list, push_list_size, 0); - push_list_size = 0; // TODO I guess that's not necessary but doesn't hurt + push_list_size = 0; // I guess that's not necessary but doesn't hurt GNUNET_array_grow(pull_list, pull_list_size, 0); - pull_list_size = 0; // TODO I guess that's not necessary but doesn't hurt + pull_list_size = 0; // I guess that's not necessary but doesn't hurt /* Schedule next round */ - // TODO do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round, NULL ); LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); } +/** + * Open a connection to given peer and store channel and mq. + */ + void +insertCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash) +{ + touch_mq(peer_map, id); +} + +/** + * Close the connection to given peer and delete channel and mq. + */ + void +removeCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash) +{ + size_t s; + struct peer_context *ctx; + + s = SAMPLER_count_id(sampler_list, id); + if ( 1 >= s ) { + if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains(peer_map, id)) + { + ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, id); + if (NULL != ctx->to_channel) + { + if (NULL != ctx->mq) + { + GNUNET_MQ_destroy(ctx->mq); + } + GNUNET_CADET_channel_destroy(ctx->to_channel); + } + // TODO cleanup peer + GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id); + } + } +} + static void rps_start (struct GNUNET_SERVER_Handle *server); @@ -1158,28 +1286,19 @@ init_peer_cb (void *cls, unsigned int best_path) // "How long is the best path? // (0 = unknown, 1 = ourselves, 2 = neighbor)" { - if ( NULL != peer ) { + if ( NULL != peer ) + { LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n", GNUNET_i2s(peer), peer); - SAMPLER_update_list(sampler_list, peer, NULL, NULL); - // TODO put the following part in a function of its own. - if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer ) ) { - ; - } else { - struct peer_context *ctx; - - ctx = GNUNET_malloc(sizeof(struct peer_context)); - ctx->in_flags = 0; - ctx->mq = NULL; - ctx->to_channel = NULL; - ctx->from_channel = NULL; - GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - } + SAMPLER_update_list(sampler_list, peer); + touch_peer_ctx(peer_map, peer); uint64_t i; i = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, gossip_list_size); gossip_list[i] = *peer; // TODO send push/pull to each of those peers? - } else { + } + else + { rps_start( (struct GNUNET_SERVER_Handle *) cls); } } @@ -1251,7 +1370,7 @@ handle_inbound_channel (void *cls, GNUNET_assert( NULL != channel ); - // TODO we might even not store the from_channel + // TODO we might not even store the from_channel if ( GNUNET_CONTAINER_multipeermap_contains( peer_map, initiator ) ) { ((struct peer_context *) GNUNET_CONTAINER_multipeermap_get( peer_map, initiator ))->from_channel = channel; @@ -1282,6 +1401,8 @@ cleanup_channel(void *cls, void *channel_ctx) { LOG(GNUNET_ERROR_TYPE_DEBUG, "Channel was destroyed by remote peer.\n"); + // TODO test whether that was a peer in the samplers/a peer we opened a connection to + // and if so, reinitialise the sampler } /** @@ -1405,17 +1526,6 @@ run (void *cls, peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO); - /* Initialise sampler and gossip list */ - - sampler_list = SAMPLER_samplers_init(est_size, own_identity); - - push_list = NULL; - //GNUNET_array_grow(push_list, push_list_size, 0); - push_list_size = 0; - pull_list = NULL; - //GNUNET_array_grow(pull_list, pull_list_size, 0); - pull_list_size = 0; - static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = { {&handle_peer_push , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH , 0}, {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0}, @@ -1433,6 +1543,16 @@ run (void *cls, LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n"); + /* Initialise sampler and gossip list */ + + sampler_list = SAMPLER_samplers_init(est_size, own_identity, insertCB, NULL, removeCB, NULL); + + push_list = NULL; + push_list_size = 0; + pull_list = NULL; + pull_list_size = 0; + + LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server); // FIXME use magic 0000 PeerID to _start_ the service -- cgit v1.2.3