aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2014-12-20 15:57:31 +0000
committerJulius Bünger <buenger@mytum.de>2014-12-20 15:57:31 +0000
commit7f57da285eb93330722ec6b6176c9187d355d03f (patch)
tree36906a0d0929aa5d4388e637ee1568401bee2a9e /src
parent49b1ddee6e5e6f4043d7a084705e0a115ee64404 (diff)
downloadgnunet-7f57da285eb93330722ec6b6176c9187d355d03f.tar.gz
gnunet-7f57da285eb93330722ec6b6176c9187d355d03f.zip
Cleaned up
Diffstat (limited to 'src')
-rw-r--r--src/rps/gnunet-service-rps.c424
1 files changed, 272 insertions, 152 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 7ee3d78cf..09ed5de48 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -42,12 +42,12 @@
42 42
43// TODO align message structs 43// TODO align message structs
44 44
45// TODO multipeerlist indep of gossiped list
46
47// (TODO api -- possibility of getting weak random peer immideately) 45// (TODO api -- possibility of getting weak random peer immideately)
48 46
49// TODO malicious peer 47// TODO malicious peer
50 48
49// TODO Change API to accept initialisation peers
50
51/** 51/**
52 * Our configuration. 52 * Our configuration.
53 */ 53 */
@@ -74,6 +74,28 @@ get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size);
74// It might be interesting to formulate this independent of PeerIDs. 74// It might be interesting to formulate this independent of PeerIDs.
75 75
76/** 76/**
77 * Callback that is called when a new PeerID is inserted into a sampler.
78 *
79 * @param cls the closure given alongside this function.
80 * @param id the PeerID that is inserted
81 * @param hash the hash the sampler produced of the PeerID
82 */
83typedef void (* SAMPLER_insertCB) (void *cls,
84 const struct GNUNET_PeerIdentity *id,
85 struct GNUNET_HashCode hash);
86
87/**
88 * Callback that is called when a new PeerID is removed from a sampler.
89 *
90 * @param cls the closure given alongside this function.
91 * @param id the PeerID that is removed
92 * @param hash the hash the sampler produced of the PeerID
93 */
94typedef void (* SAMPLER_removeCB) (void *cls,
95 const struct GNUNET_PeerIdentity *id,
96 struct GNUNET_HashCode hash);
97
98/**
77 * A sampler sampling PeerIDs. 99 * A sampler sampling PeerIDs.
78 */ 100 */
79struct Sampler 101struct Sampler
@@ -124,6 +146,26 @@ struct Samplers
124 struct GNUNET_PeerIdentity *peer_ids; 146 struct GNUNET_PeerIdentity *peer_ids;
125 147
126 /** 148 /**
149 * Callback to be called when a peer gets inserted into a sampler.
150 */
151 SAMPLER_insertCB insertCB;
152
153 /**
154 * Closure to the insertCB.
155 */
156 void *insertCLS;
157
158 /**
159 * Callback to be called when a peer gets inserted into a sampler.
160 */
161 SAMPLER_removeCB removeCB;
162
163 /**
164 * Closure to the removeCB.
165 */
166 void *removeCLS;
167
168 /**
127 * The head of the DLL. 169 * The head of the DLL.
128 */ 170 */
129 struct Sampler *head; 171 struct Sampler *head;
@@ -135,9 +177,6 @@ struct Samplers
135 177
136}; 178};
137 179
138// TODO change to updateCB and call on updates in general
139typedef void (* SAMPLER_deleteCB) (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash);
140
141/** 180/**
142 * (Re)Initialise given Sampler with random min-wise independent function. 181 * (Re)Initialise given Sampler with random min-wise independent function.
143 * 182 *
@@ -161,9 +200,7 @@ SAMPLER_init(struct GNUNET_PeerIdentity *id)
161 200
162 GNUNET_assert(NULL != id); 201 GNUNET_assert(NULL != id);
163 s->peer_id = id; 202 s->peer_id = id;
164 LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in _init()\n");
165 memcpy(s->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity)); // FIXME this should probably be NULL -- the caller has to handle those. 203 memcpy(s->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity)); // FIXME this should probably be NULL -- the caller has to handle those.
166 LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in _init()\n");
167 //s->peer_id = own_identity; // Maybe set to own PeerID. So we always have 204 //s->peer_id = own_identity; // Maybe set to own PeerID. So we always have
168 // a valid PeerID in the sampler. 205 // a valid PeerID in the sampler.
169 // Maybe take a PeerID as second argument. 206 // Maybe take a PeerID as second argument.
@@ -210,8 +247,9 @@ peer_cmp(const struct GNUNET_PeerIdentity *id1, const struct GNUNET_PeerIdentity
210 */ 247 */
211 static void 248 static void
212SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other, 249SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
213 SAMPLER_deleteCB del_cb, void *cb_cls) 250 SAMPLER_insertCB insertCB, void *insertCLS,
214 // TODO set id in peer_ids 251 SAMPLER_removeCB removeCB, void *removeCLS)
252 // TODO call update herein
215{ 253{
216 struct GNUNET_HashCode other_hash; 254 struct GNUNET_HashCode other_hash;
217 255
@@ -240,9 +278,12 @@ SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
240 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (got NULL previously).\n", 278 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (got NULL previously).\n",
241 GNUNET_i2s(other)); 279 GNUNET_i2s(other));
242 memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity)); 280 memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
243 LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in _next()\n");
244 //s->peer_id = other; 281 //s->peer_id = other;
245 s->peer_id_hash = other_hash; 282 s->peer_id_hash = other_hash;
283 if (NULL != insertCB)
284 {
285 insertCB(insertCLS, s->peer_id, s->peer_id_hash);
286 }
246 } 287 }
247 else if ( 0 > hash_cmp(&other_hash, &s->peer_id_hash) ) 288 else if ( 0 > hash_cmp(&other_hash, &s->peer_id_hash) )
248 { 289 {
@@ -251,17 +292,23 @@ SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
251 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n", 292 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
252 GNUNET_i2s(s->peer_id)); 293 GNUNET_i2s(s->peer_id));
253 294
254 if ( NULL != del_cb ) 295 if ( NULL != removeCB )
255 { 296 {
256 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the delete callback.\n", 297 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
257 GNUNET_i2s(s->peer_id)); 298 GNUNET_i2s(s->peer_id));
258 del_cb(cb_cls, s->peer_id, s->peer_id_hash); 299 removeCB(removeCLS, s->peer_id, s->peer_id_hash);
259 } 300 }
260 301
261 memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity)); 302 memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
262 LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in _next()\n");
263 //s->peer_id = other; 303 //s->peer_id = other;
264 s->peer_id_hash = other_hash; 304 s->peer_id_hash = other_hash;
305
306 if ( NULL != insertCB )
307 {
308 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
309 GNUNET_i2s(s->peer_id));
310 insertCB(insertCLS, s->peer_id, s->peer_id_hash);
311 }
265 } 312 }
266 else 313 else
267 { 314 {
@@ -289,7 +336,7 @@ SAMPLER_samplers_resize (struct Samplers * samplers,
289{ 336{
290 if ( samplers->size == new_size ) 337 if ( samplers->size == new_size )
291 { 338 {
292 LOG(GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n"); 339 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
293 return; 340 return;
294 } 341 }
295 342
@@ -299,12 +346,27 @@ SAMPLER_samplers_resize (struct Samplers * samplers,
299 struct Sampler *tmp; 346 struct Sampler *tmp;
300 347
301 old_size = samplers->size; 348 old_size = samplers->size;
302 LOG(GNUNET_ERROR_TYPE_DEBUG, "Growing/Shrinking samplers %u -> %u\n", old_size, new_size); 349 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing/Shrinking samplers %u -> %u\n", old_size, new_size);
303 GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size);
304 LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in _samplers_resize()\n");
305 LOG(GNUNET_ERROR_TYPE_DEBUG, "samplers->peer_ids now points to %p\n", samplers->peer_ids);
306 350
307 iter = samplers->head; 351 iter = samplers->head;
352
353 if ( new_size < old_size )
354 {
355 for ( i = new_size ; i < old_size ; i++ )
356 {/* Remove unneeded rest */
357 tmp = iter->next;
358 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n", i);
359 if (NULL != samplers->removeCB)
360 samplers->removeCB(samplers->removeCLS, iter->peer_id, iter->peer_id_hash);
361 GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
362 GNUNET_free(iter);
363 iter = tmp;
364 }
365 }
366
367 GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size);
368 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: samplers->peer_ids now points to %p\n", samplers->peer_ids);
369
308 if ( new_size > old_size ) 370 if ( new_size > old_size )
309 { /* Growing */ 371 { /* Growing */
310 GNUNET_assert( NULL != fill_up_id ); 372 GNUNET_assert( NULL != fill_up_id );
@@ -313,8 +375,7 @@ SAMPLER_samplers_resize (struct Samplers * samplers,
313 if ( i < old_size ) 375 if ( i < old_size )
314 { /* Update old samplers */ 376 { /* Update old samplers */
315 iter->peer_id = &samplers->peer_ids[i]; 377 iter->peer_id = &samplers->peer_ids[i];
316 LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in _samplers_resize()\n"); 378 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updated %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
317 LOG(GNUNET_ERROR_TYPE_DEBUG, "Updated %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
318 i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id)); 379 i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
319 iter = iter->next; 380 iter = iter->next;
320 } 381 }
@@ -322,37 +383,32 @@ SAMPLER_samplers_resize (struct Samplers * samplers,
322 { /* Add new samplers */ 383 { /* Add new samplers */
323 memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct GNUNET_PeerIdentity)); 384 memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct GNUNET_PeerIdentity));
324 iter = SAMPLER_init(&samplers->peer_ids[i]); 385 iter = SAMPLER_init(&samplers->peer_ids[i]);
386 if (NULL != samplers->insertCB)
387 {
388 samplers->insertCB(samplers->insertCLS, iter->peer_id, iter->peer_id_hash);
389 }
325 GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter); 390 GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter);
326 LOG(GNUNET_ERROR_TYPE_DEBUG, "Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n", 391 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
327 i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id)); 392 i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
328 } 393 }
329 } 394 }
330 } 395 }
331 else// if ( new_size < old_size ) 396 else// if ( new_size < old_size )
332 { /* Shrinking */ 397 { /* Shrinking */
333 for ( i = 0 ; i < old_size ; i++) 398 for ( i = 0 ; i < new_size ; i++)
334 { /* All samplers */ 399 { /* All samplers */
335 tmp = iter->next; 400 tmp = iter->next;
336 if ( i < new_size ) 401 /* Update remaining samplers */
337 { /* Update remaining samplers */ 402 iter->peer_id = &samplers->peer_ids[i];
338 iter->peer_id = &samplers->peer_ids[i]; 403 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updatied %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
339 LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in _samplers_resize()\n"); 404 i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
340 LOG(GNUNET_ERROR_TYPE_DEBUG, "Updatied %" PRIX64 ". sampler, now pointing to %p, contains %s\n", 405
341 i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
342 }
343 else
344 { /* Remove unneeded rest */
345 LOG(GNUNET_ERROR_TYPE_DEBUG, "Removing %" PRIX64 ". sampler\n", i);
346 // TODO call delCB on elem?
347 GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
348 GNUNET_free(iter);
349 }
350 iter = tmp; 406 iter = tmp;
351 } 407 }
352 } 408 }
353 409
354 GNUNET_assert(samplers->size == new_size); 410 GNUNET_assert(samplers->size == new_size);
355 LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished growing/shrinking.\n"); 411 LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n");
356} 412}
357 413
358 414
@@ -360,7 +416,9 @@ SAMPLER_samplers_resize (struct Samplers * samplers,
360 * Initialise a tuple of samplers. 416 * Initialise a tuple of samplers.
361 */ 417 */
362struct Samplers * 418struct Samplers *
363SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id) 419SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id,
420 SAMPLER_insertCB insertCB, void *insertCLS,
421 SAMPLER_removeCB removeCB, void *removeCLS)
364{ 422{
365 struct Samplers *samplers; 423 struct Samplers *samplers;
366 //struct Sampler *s; 424 //struct Sampler *s;
@@ -368,9 +426,12 @@ SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id)
368 426
369 samplers = GNUNET_new(struct Samplers); 427 samplers = GNUNET_new(struct Samplers);
370 samplers->size = 0; 428 samplers->size = 0;
371 samplers->head = samplers->tail = NULL;
372 samplers->peer_ids = NULL; 429 samplers->peer_ids = NULL;
373 LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in _samplers_init()\n"); 430 samplers->insertCB = insertCB;
431 samplers->insertCLS = insertCLS;
432 samplers->removeCB = removeCB;
433 samplers->removeCLS = removeCLS;
434 samplers->head = samplers->tail = NULL;
374 //samplers->peer_ids = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); 435 //samplers->peer_ids = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
375 436
376 SAMPLER_samplers_resize(samplers, init_size, id); 437 SAMPLER_samplers_resize(samplers, init_size, id);
@@ -380,7 +441,6 @@ SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id)
380 // GNUNET_array_append(samplers->peer_ids, 441 // GNUNET_array_append(samplers->peer_ids,
381 // samplers->size, 442 // samplers->size,
382 // *id); 443 // *id);
383 // LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in _samplers_init()\n");
384 // s = SAMPLER_init(&samplers->peer_ids[i]); 444 // s = SAMPLER_init(&samplers->peer_ids[i]);
385 // GNUNET_CONTAINER_DLL_insert_tail(samplers->head, 445 // GNUNET_CONTAINER_DLL_insert_tail(samplers->head,
386 // samplers->tail, 446 // samplers->tail,
@@ -396,15 +456,16 @@ SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id)
396 * A fuction to update every sampler in the given list 456 * A fuction to update every sampler in the given list
397 */ 457 */
398 static void 458 static void
399SAMPLER_update_list(struct Samplers *samplers, const struct GNUNET_PeerIdentity *id, 459SAMPLER_update_list(struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
400 SAMPLER_deleteCB del_cb, void *cb_cls)
401{ 460{
402 struct Sampler *iter; 461 struct Sampler *iter;
403 462
404 iter = samplers->head; 463 iter = samplers->head;
405 while ( NULL != iter->next ) 464 while ( NULL != iter->next )
406 { 465 {
407 SAMPLER_next(iter, id, del_cb, cb_cls); 466 SAMPLER_next(iter, id,
467 samplers->insertCB, samplers->insertCLS,
468 samplers->removeCB, samplers->removeCLS);
408 iter = iter->next; 469 iter = iter->next;
409 } 470 }
410 471
@@ -468,7 +529,7 @@ SAMPLER_get_n_rand_peers (struct Samplers *samplers, uint64_t n)
468 * Counts how many Samplers currently hold a given PeerID. 529 * Counts how many Samplers currently hold a given PeerID.
469 */ 530 */
470 uint64_t 531 uint64_t
471SAMPLER_count_id ( struct Samplers *samplers, struct GNUNET_PeerIdentity *id ) 532SAMPLER_count_id ( struct Samplers *samplers, const struct GNUNET_PeerIdentity *id )
472{ 533{
473 struct Sampler *iter; 534 struct Sampler *iter;
474 uint64_t count; 535 uint64_t count;
@@ -710,50 +771,110 @@ get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size)
710} 771}
711 772
712/** 773/**
713 * Get the message queue of a specific peer. 774 * Make sure the context of a given peer exists.
714 *
715 * If we already have a message queue open to this client,
716 * simply return it, otherways create one.
717 */ 775 */
718 struct GNUNET_MQ_Handle * 776 void
719get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, struct GNUNET_PeerIdentity *peer_id) 777touch_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
720{ 778{
721 struct peer_context *ctx; 779 struct peer_context *ctx;
722 struct GNUNET_MQ_Handle * mq;
723 struct GNUNET_CADET_Channel *channel;
724
725 if ( GNUNET_OK != GNUNET_CONTAINER_multipeermap_contains( peer_map, peer_id ) ) {
726
727 channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer_id,
728 GNUNET_RPS_CADET_PORT,
729 GNUNET_CADET_OPTION_RELIABLE);
730 mq = GNUNET_CADET_mq_create(channel);
731 780
781 if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer ) )
782 {
783 ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
784 }
785 else
786 {
732 ctx = GNUNET_malloc(sizeof(struct peer_context)); 787 ctx = GNUNET_malloc(sizeof(struct peer_context));
733 ctx->in_flags = 0; 788 ctx->in_flags = 0;
734 ctx->to_channel = channel; 789 ctx->mq = NULL;
735 ctx->mq = mq; 790 ctx->to_channel = NULL;
791 ctx->from_channel = NULL;
792 GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
793 }
794}
736 795
737 GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx, 796/**
738 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); 797 * Get the context of a peer. If not existing, create.
739 } else { 798 */
740 ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer_id); 799 struct peer_context *
741 if ( NULL == ctx->mq ) { 800get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
742 if ( NULL == ctx->to_channel ) { 801{
743 channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer_id, 802 struct peer_context *ctx;
744 GNUNET_RPS_CADET_PORT,
745 GNUNET_CADET_OPTION_RELIABLE);
746 ctx->to_channel = channel;
747 }
748 803
749 mq = GNUNET_CADET_mq_create(ctx->to_channel); 804 touch_peer_ctx(peer_map, peer);
750 ctx->mq = mq; 805 ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
751 } 806 return ctx;
807}
808
809/**
810 * Get the channel of a peer. If not existing, create.
811 */
812 void
813touch_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
814{
815 struct peer_context *ctx;
816
817 ctx = get_peer_ctx (peer_map, peer);
818 if (NULL == ctx->to_channel)
819 {
820 ctx->to_channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer,
821 GNUNET_RPS_CADET_PORT,
822 GNUNET_CADET_OPTION_RELIABLE);
823 //TODO do I have to explicitly put it in the peer_map?
752 } 824 }
825}
753 826
754 return ctx->mq; 827/**
828 * Get the channel of a peer. If not existing, create.
829 */
830 struct GNUNET_CADET_Channel *
831get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
832{
833 struct peer_context *ctx;
834
835 ctx = get_peer_ctx (peer_map, peer);
836 touch_channel(peer_map, peer);
837 return ctx->to_channel;
755} 838}
756 839
840/**
841 * Make sure the mq for a given peer exists.
842 *
843 * If we already have a message queue open to this client,
844 * simply return it, otherways create one.
845 */
846 void
847touch_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id)
848{
849 struct peer_context *ctx;
850
851 ctx = get_peer_ctx(peer_map, peer_id);
852 if (NULL == ctx->mq)
853 {
854 touch_channel(peer_map, peer_id);
855 ctx->mq = GNUNET_CADET_mq_create(ctx->to_channel);
856 //TODO do I have to explicitly put it in the peer_map?
857 //GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx,
858 // GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
859 }
860}
861
862/**
863 * Get the message queue of a specific peer.
864 *
865 * If we already have a message queue open to this client,
866 * simply return it, otherways create one.
867 */
868 struct GNUNET_MQ_Handle *
869get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id)
870{
871 struct peer_context *ctx;
872
873 ctx = get_peer_ctx(peer_map, peer_id);
874 touch_mq(peer_map, peer_id);
875
876 return ctx->mq;
877}
757 878
758/*********************************************************************** 879/***********************************************************************
759 * /Util functions 880 * /Util functions
@@ -819,29 +940,21 @@ handle_cs_request (void *cls,
819 GNUNET_SERVER_client_set_user_context(client, cli_ctx); 940 GNUNET_SERVER_client_set_user_context(client, cli_ctx);
820 } 941 }
821 942
822 //mq = GNUNET_MQ_queue_for_server_client(client);
823
824 // TODO How many peers do we give back? 943 // TODO How many peers do we give back?
825 // Wait until we have enough random peers? 944 // Wait until we have enough random peers?
826 945
827 ev = GNUNET_MQ_msg_extra(out_msg, 946 ev = GNUNET_MQ_msg_extra(out_msg,
828 GNUNET_ntohll(msg->num_peers) * sizeof(struct GNUNET_PeerIdentity), 947 GNUNET_ntohll(msg->num_peers) * sizeof(struct GNUNET_PeerIdentity),
829 GNUNET_MESSAGE_TYPE_RPS_CS_REPLY); 948 GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
830 out_msg->num_peers = GNUNET_ntohll(msg->num_peers); 949 out_msg->num_peers = msg->num_peers; // No conversion between network and host order
831 950
832 num_peers = GNUNET_ntohll(msg->num_peers); 951 num_peers = GNUNET_ntohll(msg->num_peers);
833 //&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers); 952 //&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers);
834 memcpy(&out_msg[1], 953 memcpy(&out_msg[1],
835 SAMPLER_get_n_rand_peers(sampler_list, num_peers), 954 SAMPLER_get_n_rand_peers(sampler_list, num_peers),
836 num_peers * sizeof(struct GNUNET_PeerIdentity)); 955 num_peers * sizeof(struct GNUNET_PeerIdentity));
837 //for ( i = 0 ; i < num_peers ; i++ ) {
838 // memcpy(&out_msg[1] + i * sizeof(struct GNUNET_PeerIdentity),
839 // SAMPLER_get_rand_peer(sampler_list),
840 // sizeof(struct GNUNET_PeerIdentity));
841 //}
842 956
843 GNUNET_MQ_send(cli_ctx->mq, ev); 957 GNUNET_MQ_send(cli_ctx->mq, ev);
844 //GNUNET_MQ_send(mq, ev);
845 //GNUNET_MQ_destroy(mq); 958 //GNUNET_MQ_destroy(mq);
846 959
847 GNUNET_SERVER_receive_done (client, 960 GNUNET_SERVER_receive_done (client,
@@ -973,22 +1086,6 @@ handle_peer_pull_reply (void *cls,
973 1086
974 1087
975/** 1088/**
976 * Callback called when a Sampler is updated.
977 */
978 void
979delete_cb (void *cls, struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
980{
981 size_t s;
982
983 s = SAMPLER_count_id(sampler_list, id);
984 if ( 1 >= s ) {
985 // TODO cleanup peer
986 GNUNET_CONTAINER_multipeermap_remove_all( peer_map, id);
987 }
988}
989
990
991/**
992 * Send out PUSHes and PULLs. 1089 * Send out PUSHes and PULLs.
993 * 1090 *
994 * This is executed regylary. 1091 * This is executed regylary.
@@ -1010,15 +1107,11 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1010 1107
1011 1108
1012 /* If the NSE has changed adapt the lists accordingly */ 1109 /* If the NSE has changed adapt the lists accordingly */
1013 // TODO check nse == 0!
1014 LOG(GNUNET_ERROR_TYPE_DEBUG, "Checking size estimate.\n");
1015 if ( sampler_list->size != est_size ) 1110 if ( sampler_list->size != est_size )
1016 SAMPLER_samplers_resize(sampler_list, est_size, own_identity); 1111 SAMPLER_samplers_resize(sampler_list, est_size, own_identity);
1017 1112
1018 GNUNET_array_grow(gossip_list, gossip_list_size, est_size); 1113 GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
1019 1114
1020 gossip_list_size = sampler_list->size = est_size;
1021
1022 1115
1023 /* Would it make sense to have one shuffeled gossip list and then 1116 /* Would it make sense to have one shuffeled gossip list and then
1024 * to send PUSHes to first alpha peers, PULL requests to next beta peers and 1117 * to send PUSHes to first alpha peers, PULL requests to next beta peers and
@@ -1063,21 +1156,21 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1063 } 1156 }
1064 1157
1065 1158
1066
1067
1068 /* Update gossip list */ 1159 /* Update gossip list */
1069 uint64_t r_index; 1160 uint64_t r_index;
1070 1161
1071 if ( push_list_size <= alpha * gossip_list_size && 1162 if ( push_list_size <= alpha * gossip_list_size &&
1072 push_list_size != 0 && 1163 push_list_size != 0 &&
1073 pull_list_size != 0 ) { 1164 pull_list_size != 0 )
1165 {
1074 LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n"); 1166 LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n");
1075 1167
1076 uint64_t first_border; 1168 uint64_t first_border;
1077 uint64_t second_border; 1169 uint64_t second_border;
1078 1170
1079 first_border = round(alpha * gossip_list_size); 1171 first_border = round(alpha * gossip_list_size);
1080 for ( i = 0 ; i < first_border ; i++ ) { // TODO use SAMPLER_get_n_rand_peers 1172 for ( i = 0 ; i < first_border ; i++ )
1173 { // TODO use SAMPLER_get_n_rand_peers
1081 /* Update gossip list with peers received through PUSHes */ 1174 /* Update gossip list with peers received through PUSHes */
1082 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, 1175 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1083 push_list_size); 1176 push_list_size);
@@ -1086,7 +1179,8 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1086 } 1179 }
1087 1180
1088 second_border = first_border + round(beta * gossip_list_size); 1181 second_border = first_border + round(beta * gossip_list_size);
1089 for ( i = first_border ; i < second_border ; i++ ) { 1182 for ( i = first_border ; i < second_border ; i++ )
1183 {
1090 /* Update gossip list with peers received through PULLs */ 1184 /* Update gossip list with peers received through PULLs */
1091 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, 1185 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1092 pull_list_size); 1186 pull_list_size);
@@ -1094,7 +1188,8 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1094 // TODO change the in_flags accordingly 1188 // TODO change the in_flags accordingly
1095 } 1189 }
1096 1190
1097 for ( i = second_border ; i < gossip_list_size ; i++ ) { 1191 for ( i = second_border ; i < gossip_list_size ; i++ )
1192 {
1098 /* Update gossip list with peers from history */ 1193 /* Update gossip list with peers from history */
1099 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, 1194 r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1100 sampler_list->size); 1195 sampler_list->size);
@@ -1102,7 +1197,9 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1102 // TODO change the in_flags accordingly 1197 // TODO change the in_flags accordingly
1103 } 1198 }
1104 1199
1105 } else { 1200 }
1201 else
1202 {
1106 LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n"); 1203 LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n");
1107 } 1204 }
1108 // TODO independent of that also get some peers from CADET_get_peers()? 1205 // TODO independent of that also get some peers from CADET_get_peers()?
@@ -1112,35 +1209,66 @@ do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1112 1209
1113 for ( i = 0 ; i < push_list_size ; i++ ) 1210 for ( i = 0 ; i < push_list_size ; i++ )
1114 { 1211 {
1115 SAMPLER_update_list(sampler_list, &push_list[i], NULL, NULL); 1212 SAMPLER_update_list(sampler_list, &push_list[i]);
1116 // TODO set in_flag? 1213 // TODO set in_flag?
1117 } 1214 }
1118 1215
1119 for ( i = 0 ; i < pull_list_size ; i++ ) 1216 for ( i = 0 ; i < pull_list_size ; i++ )
1120 { 1217 {
1121 SAMPLER_update_list(sampler_list, &pull_list[i], NULL, NULL); 1218 SAMPLER_update_list(sampler_list, &pull_list[i]);
1122 // TODO set in_flag? 1219 // TODO set in_flag?
1123 } 1220 }
1124 1221
1125 1222
1126 // TODO go over whole peer_map and do cleanups
1127 // delete unneeded peers, set in_flags, check channel/mq
1128 // -- already done with deleteCB?
1129
1130
1131 /* Empty push/pull lists */ 1223 /* Empty push/pull lists */
1132 GNUNET_array_grow(push_list, push_list_size, 0); 1224 GNUNET_array_grow(push_list, push_list_size, 0);
1133 push_list_size = 0; // TODO I guess that's not necessary but doesn't hurt 1225 push_list_size = 0; // I guess that's not necessary but doesn't hurt
1134 GNUNET_array_grow(pull_list, pull_list_size, 0); 1226 GNUNET_array_grow(pull_list, pull_list_size, 0);
1135 pull_list_size = 0; // TODO I guess that's not necessary but doesn't hurt 1227 pull_list_size = 0; // I guess that's not necessary but doesn't hurt
1136 1228
1137 1229
1138 /* Schedule next round */ 1230 /* Schedule next round */
1139 // TODO
1140 do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round, NULL ); 1231 do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round, NULL );
1141 LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); 1232 LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1142} 1233}
1143 1234
1235/**
1236 * Open a connection to given peer and store channel and mq.
1237 */
1238 void
1239insertCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
1240{
1241 touch_mq(peer_map, id);
1242}
1243
1244/**
1245 * Close the connection to given peer and delete channel and mq.
1246 */
1247 void
1248removeCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
1249{
1250 size_t s;
1251 struct peer_context *ctx;
1252
1253 s = SAMPLER_count_id(sampler_list, id);
1254 if ( 1 >= s ) {
1255 if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains(peer_map, id))
1256 {
1257 ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, id);
1258 if (NULL != ctx->to_channel)
1259 {
1260 if (NULL != ctx->mq)
1261 {
1262 GNUNET_MQ_destroy(ctx->mq);
1263 }
1264 GNUNET_CADET_channel_destroy(ctx->to_channel);
1265 }
1266 // TODO cleanup peer
1267 GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id);
1268 }
1269 }
1270}
1271
1144static void 1272static void
1145rps_start (struct GNUNET_SERVER_Handle *server); 1273rps_start (struct GNUNET_SERVER_Handle *server);
1146 1274
@@ -1158,28 +1286,19 @@ init_peer_cb (void *cls,
1158 unsigned int best_path) // "How long is the best path? 1286 unsigned int best_path) // "How long is the best path?
1159 // (0 = unknown, 1 = ourselves, 2 = neighbor)" 1287 // (0 = unknown, 1 = ourselves, 2 = neighbor)"
1160{ 1288{
1161 if ( NULL != peer ) { 1289 if ( NULL != peer )
1290 {
1162 LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n", GNUNET_i2s(peer), peer); 1291 LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n", GNUNET_i2s(peer), peer);
1163 SAMPLER_update_list(sampler_list, peer, NULL, NULL); 1292 SAMPLER_update_list(sampler_list, peer);
1164 // TODO put the following part in a function of its own. 1293 touch_peer_ctx(peer_map, peer);
1165 if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer ) ) {
1166 ;
1167 } else {
1168 struct peer_context *ctx;
1169
1170 ctx = GNUNET_malloc(sizeof(struct peer_context));
1171 ctx->in_flags = 0;
1172 ctx->mq = NULL;
1173 ctx->to_channel = NULL;
1174 ctx->from_channel = NULL;
1175 GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1176 }
1177 1294
1178 uint64_t i; 1295 uint64_t i;
1179 i = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, gossip_list_size); 1296 i = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, gossip_list_size);
1180 gossip_list[i] = *peer; 1297 gossip_list[i] = *peer;
1181 // TODO send push/pull to each of those peers? 1298 // TODO send push/pull to each of those peers?
1182 } else { 1299 }
1300 else
1301 {
1183 rps_start( (struct GNUNET_SERVER_Handle *) cls); 1302 rps_start( (struct GNUNET_SERVER_Handle *) cls);
1184 } 1303 }
1185} 1304}
@@ -1251,7 +1370,7 @@ handle_inbound_channel (void *cls,
1251 1370
1252 GNUNET_assert( NULL != channel ); 1371 GNUNET_assert( NULL != channel );
1253 1372
1254 // TODO we might even not store the from_channel 1373 // TODO we might not even store the from_channel
1255 1374
1256 if ( GNUNET_CONTAINER_multipeermap_contains( peer_map, initiator ) ) { 1375 if ( GNUNET_CONTAINER_multipeermap_contains( peer_map, initiator ) ) {
1257 ((struct peer_context *) GNUNET_CONTAINER_multipeermap_get( peer_map, initiator ))->from_channel = channel; 1376 ((struct peer_context *) GNUNET_CONTAINER_multipeermap_get( peer_map, initiator ))->from_channel = channel;
@@ -1282,6 +1401,8 @@ cleanup_channel(void *cls,
1282 void *channel_ctx) 1401 void *channel_ctx)
1283{ 1402{
1284 LOG(GNUNET_ERROR_TYPE_DEBUG, "Channel was destroyed by remote peer.\n"); 1403 LOG(GNUNET_ERROR_TYPE_DEBUG, "Channel was destroyed by remote peer.\n");
1404 // TODO test whether that was a peer in the samplers/a peer we opened a connection to
1405 // and if so, reinitialise the sampler
1285} 1406}
1286 1407
1287/** 1408/**
@@ -1405,17 +1526,6 @@ run (void *cls,
1405 peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO); 1526 peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO);
1406 1527
1407 1528
1408 /* Initialise sampler and gossip list */
1409
1410 sampler_list = SAMPLER_samplers_init(est_size, own_identity);
1411
1412 push_list = NULL;
1413 //GNUNET_array_grow(push_list, push_list_size, 0);
1414 push_list_size = 0;
1415 pull_list = NULL;
1416 //GNUNET_array_grow(pull_list, pull_list_size, 0);
1417 pull_list_size = 0;
1418
1419 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = { 1529 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1420 {&handle_peer_push , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH , 0}, 1530 {&handle_peer_push , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH , 0},
1421 {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0}, 1531 {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0},
@@ -1433,6 +1543,16 @@ run (void *cls,
1433 LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n"); 1543 LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
1434 1544
1435 1545
1546 /* Initialise sampler and gossip list */
1547
1548 sampler_list = SAMPLER_samplers_init(est_size, own_identity, insertCB, NULL, removeCB, NULL);
1549
1550 push_list = NULL;
1551 push_list_size = 0;
1552 pull_list = NULL;
1553 pull_list_size = 0;
1554
1555
1436 LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); 1556 LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
1437 GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server); 1557 GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server);
1438 // FIXME use magic 0000 PeerID to _start_ the service 1558 // FIXME use magic 0000 PeerID to _start_ the service