aboutsummaryrefslogtreecommitdiff
path: root/src/rps
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps')
-rw-r--r--src/rps/gnunet-service-rps.c64
-rw-r--r--src/rps/gnunet-service-rps_sampler.c91
-rw-r--r--src/rps/gnunet-service-rps_sampler.h26
3 files changed, 23 insertions, 158 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 0d2e68b4a..f66459802 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -609,8 +609,8 @@ cadet_ntfy_tmt_rdy_cb (void *cls, size_t size, void *buf)
609{ 609{
610 struct PeerContext *peer_ctx = (struct PeerContext *) cls; 610 struct PeerContext *peer_ctx = (struct PeerContext *) cls;
611 611
612 if (NULL != buf || 612 if (NULL != buf
613 0 != size) 613 && 0 != size)
614 peer_is_live (peer_ctx); 614 peer_is_live (peer_ctx);
615 615
616 //if (NULL != peer_ctx->is_live_task) 616 //if (NULL != peer_ctx->is_live_task)
@@ -1441,57 +1441,10 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1441} 1441}
1442 1442
1443 1443
1444/**
1445 * Open a connection to given peer and store channel and mq.
1446 */
1447 void
1448insertCB (void *cls, struct RPS_Sampler *sampler,
1449 const struct GNUNET_PeerIdentity *id)
1450{
1451 // We open a channel to be notified when this peer goes down.
1452 (void) get_channel (peer_map, id);
1453}
1454
1455
1456/**
1457 * Close the connection to given peer and delete channel and mq
1458 * if the peer is not anymore in the sampler.
1459 */
1460 void
1461removeCB (void *cls, struct RPS_Sampler *sampler,
1462 const struct GNUNET_PeerIdentity *id)
1463{
1464 size_t s;
1465 struct PeerContext *ctx;
1466
1467 s = RPS_sampler_count_id (sampler, id);
1468 if ( 1 >= s )
1469 {
1470 if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, id)
1471 && 0 != GNUNET_CRYPTO_cmp_peer_identity (id, &own_identity))
1472 {
1473 ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, id);
1474 if (NULL != ctx->send_channel)
1475 {
1476 if (NULL != ctx->mq)
1477 {
1478 GNUNET_MQ_destroy (ctx->mq);
1479 ctx->mq = NULL;
1480 }
1481 // may already be freed at shutdown of cadet
1482 // maybe this fails at our own channel
1483 GNUNET_CADET_channel_destroy (ctx->send_channel);
1484 ctx->send_channel = NULL;
1485 }
1486 // TODO cleanup peer
1487 //(void) GNUNET_CONTAINER_multipeermap_remove_all (peer_map, id);
1488 }
1489 }
1490}
1491
1492static void 1444static void
1493rps_start (struct GNUNET_SERVER_Handle *server); 1445rps_start (struct GNUNET_SERVER_Handle *server);
1494 1446
1447
1495/** 1448/**
1496 * This is called from GNUNET_CADET_get_peers(). 1449 * This is called from GNUNET_CADET_get_peers().
1497 * 1450 *
@@ -1511,7 +1464,8 @@ init_peer_cb (void *cls,
1511 struct PeerContext *peer_ctx; 1464 struct PeerContext *peer_ctx;
1512 1465
1513 server = (struct GNUNET_SERVER_Handle *) cls; 1466 server = (struct GNUNET_SERVER_Handle *) cls;
1514 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, peer)) 1467 if (NULL != peer
1468 && 0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, peer))
1515 { 1469 {
1516 LOG (GNUNET_ERROR_TYPE_DEBUG, 1470 LOG (GNUNET_ERROR_TYPE_DEBUG,
1517 "Got peer %s (at %p) from CADET (gossip_list_size: %u)\n", 1471 "Got peer %s (at %p) from CADET (gossip_list_size: %u)\n",
@@ -1890,12 +1844,8 @@ run (void *cls,
1890 half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5); 1844 half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
1891 max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval); 1845 max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
1892 1846
1893 prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval, 1847 prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval);
1894 //insertCB, NULL, removeCB, NULL); 1848 client_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval);
1895 NULL, NULL, NULL, NULL);
1896 client_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval,
1897 //nsertCB, NULL, removeCB, NULL);
1898 NULL, NULL, NULL, NULL);
1899 1849
1900 /* Initialise push and pull maps */ 1850 /* Initialise push and pull maps */
1901 push_list = NULL; 1851 push_list = NULL;
diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c
index 89dbb4dac..222a600bd 100644
--- a/src/rps/gnunet-service-rps_sampler.c
+++ b/src/rps/gnunet-service-rps_sampler.c
@@ -104,6 +104,7 @@ struct RPS_SamplerElement
104 uint32_t num_change; 104 uint32_t num_change;
105}; 105};
106 106
107
107/** 108/**
108 * Sampler with its own array of SamplerElements 109 * Sampler with its own array of SamplerElements
109 */ 110 */
@@ -126,26 +127,6 @@ struct RPS_Sampler
126 * Used in the context of RPS 127 * Used in the context of RPS
127 */ 128 */
128 struct GNUNET_TIME_Relative max_round_interval; 129 struct GNUNET_TIME_Relative max_round_interval;
129
130 /**
131 * Callback to be called when a peer gets inserted into a sampler.
132 */
133 RPS_sampler_insert_cb insert_cb;
134
135 /**
136 * Closure to the insert_cb.
137 */
138 void *insert_cls;
139
140 /**
141 * Callback to be called when a peer gets inserted into a sampler.
142 */
143 RPS_sampler_remove_cb remove_cb;
144
145 /**
146 * Closure to the remove_cb.
147 */
148 void *remove_cls;
149}; 130};
150 131
151/** 132/**
@@ -344,15 +325,13 @@ RPS_sampler_elem_create (void)
344static void 325static void
345RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, 326RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem,
346 struct RPS_Sampler *sampler, 327 struct RPS_Sampler *sampler,
347 const struct GNUNET_PeerIdentity *other, 328 const struct GNUNET_PeerIdentity *other)
348 RPS_sampler_insert_cb insert_cb, void *insert_cls,
349 RPS_sampler_remove_cb remove_cb, void *remove_cls)
350{ 329{
351 struct GNUNET_HashCode other_hash; 330 struct GNUNET_HashCode other_hash;
352 331
353 s_elem->num_peers++; 332 s_elem->num_peers++;
354 333
355 if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)) ) 334 if (0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)))
356 { 335 {
357 LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n", 336 LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n",
358 GNUNET_i2s (other)); 337 GNUNET_i2s (other));
@@ -366,50 +345,31 @@ RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem,
366 sizeof(struct GNUNET_PeerIdentity), 345 sizeof(struct GNUNET_PeerIdentity),
367 &other_hash); 346 &other_hash);
368 347
369 if ( EMPTY == s_elem->is_empty ) 348 if (EMPTY == s_elem->is_empty)
370 { 349 {
371 LOG (GNUNET_ERROR_TYPE_DEBUG, "Got PeerID %s; Simply accepting (was empty previously).\n", 350 LOG (GNUNET_ERROR_TYPE_DEBUG,
372 GNUNET_i2s(other)); 351 "Got PeerID %s; Simply accepting (was empty previously).\n",
352 GNUNET_i2s(other));
373 s_elem->peer_id = *other; 353 s_elem->peer_id = *other;
374 s_elem->peer_id_hash = other_hash; 354 s_elem->peer_id_hash = other_hash;
375 355
376 if (NULL != insert_cb)
377 insert_cb (insert_cls, sampler, &(s_elem->peer_id));
378
379 s_elem->num_change++; 356 s_elem->num_change++;
380 } 357 }
381 else if ( 0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash) ) 358 else if (0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash))
382 { 359 {
383 LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n", 360 LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n",
384 GNUNET_i2s (other)); 361 GNUNET_i2s (other));
385 LOG (GNUNET_ERROR_TYPE_DEBUG, "Discarding old PeerID %s\n", 362 LOG (GNUNET_ERROR_TYPE_DEBUG, "Discarding old PeerID %s\n",
386 GNUNET_i2s (&s_elem->peer_id)); 363 GNUNET_i2s (&s_elem->peer_id));
387 364
388 if ( NULL != remove_cb )
389 {
390 LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing old PeerID %s with the remove callback.\n",
391 GNUNET_i2s (&s_elem->peer_id));
392 remove_cb (remove_cls, sampler, &s_elem->peer_id);
393 }
394
395 s_elem->peer_id = *other;
396 s_elem->peer_id_hash = other_hash;
397
398 if ( NULL != insert_cb )
399 {
400 LOG (GNUNET_ERROR_TYPE_DEBUG, "Inserting new PeerID %s with the insert callback.\n",
401 GNUNET_i2s (&s_elem->peer_id));
402 insert_cb (insert_cls, sampler, &s_elem->peer_id);
403 }
404
405 s_elem->num_change++; 365 s_elem->num_change++;
406 } 366 }
407 else 367 else
408 { 368 {
409 LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n", 369 LOG (GNUNET_ERROR_TYPE_DEBUG, " Got PeerID %s\n",
410 GNUNET_i2s(other)); 370 GNUNET_i2s (other));
411 LOG (GNUNET_ERROR_TYPE_DEBUG, "Keeping old PeerID %s\n", 371 LOG (GNUNET_ERROR_TYPE_DEBUG, "Keeping old PeerID %s\n",
412 GNUNET_i2s(&s_elem->peer_id)); 372 GNUNET_i2s (&s_elem->peer_id));
413 } 373 }
414 } 374 }
415 s_elem->is_empty = NOT_EMPTY; 375 s_elem->is_empty = NOT_EMPTY;
@@ -440,7 +400,6 @@ sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
440{ 400{
441 unsigned int old_size; 401 unsigned int old_size;
442 uint32_t i; 402 uint32_t i;
443 struct RPS_SamplerElement **rem_list;
444 403
445 // TODO check min and max size 404 // TODO check min and max size
446 405
@@ -449,10 +408,6 @@ sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
449 if (old_size > new_size) 408 if (old_size > new_size)
450 { /* Shrinking */ 409 { /* Shrinking */
451 /* Temporary store those to properly call the removeCB on those later */ 410 /* Temporary store those to properly call the removeCB on those later */
452 rem_list = GNUNET_malloc ((old_size - new_size) * sizeof (struct RPS_SamplerElement *));
453 memcpy (rem_list,
454 &sampler->sampler_elements[new_size],
455 (old_size - new_size) * sizeof (struct RPS_SamplerElement *));
456 411
457 LOG (GNUNET_ERROR_TYPE_DEBUG, "Shrinking sampler %d -> %d\n", old_size, new_size); 412 LOG (GNUNET_ERROR_TYPE_DEBUG, "Shrinking sampler %d -> %d\n", old_size, new_size);
458 GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size); 413 GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
@@ -460,14 +415,6 @@ sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
460 "sampler->sampler_elements now points to %p\n", 415 "sampler->sampler_elements now points to %p\n",
461 sampler->sampler_elements); 416 sampler->sampler_elements);
462 417
463 for (i = 0 ; i < old_size - new_size ; i++)
464 {/* Remove unneeded rest */
465 LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing %" PRIX32 ". sampler\n", i);
466 if (NULL != sampler->remove_cb)
467 sampler->remove_cb (sampler->remove_cls, sampler, &rem_list[i]->peer_id);
468 GNUNET_free (rem_list[i]);
469 }
470 GNUNET_free (rem_list);
471 } 418 }
472 else if (old_size < new_size) 419 else if (old_size < new_size)
473 { /* Growing */ 420 { /* Growing */
@@ -480,8 +427,6 @@ sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
480 for ( i = old_size ; i < new_size ; i++ ) 427 for ( i = old_size ; i < new_size ; i++ )
481 { /* Add new sampler elements */ 428 { /* Add new sampler elements */
482 sampler->sampler_elements[i] = RPS_sampler_elem_create (); 429 sampler->sampler_elements[i] = RPS_sampler_elem_create ();
483 if (NULL != sampler->insert_cb)
484 sampler->insert_cb (sampler->insert_cls, sampler, &sampler->sampler_elements[i]->peer_id);
485 LOG (GNUNET_ERROR_TYPE_DEBUG, 430 LOG (GNUNET_ERROR_TYPE_DEBUG,
486 "Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n", 431 "Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n",
487 i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id)); 432 i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id));
@@ -539,9 +484,7 @@ sampler_empty (struct RPS_Sampler *sampler)
539 */ 484 */
540struct RPS_Sampler * 485struct RPS_Sampler *
541RPS_sampler_init (size_t init_size, 486RPS_sampler_init (size_t init_size,
542 struct GNUNET_TIME_Relative max_round_interval, 487 struct GNUNET_TIME_Relative max_round_interval)
543 RPS_sampler_insert_cb ins_cb, void *ins_cls,
544 RPS_sampler_remove_cb rem_cb, void *rem_cls)
545{ 488{
546 struct RPS_Sampler *sampler; 489 struct RPS_Sampler *sampler;
547 //uint32_t i; 490 //uint32_t i;
@@ -554,10 +497,6 @@ RPS_sampler_init (size_t init_size,
554 sampler->sampler_size = 0; 497 sampler->sampler_size = 0;
555 sampler->sampler_elements = NULL; 498 sampler->sampler_elements = NULL;
556 sampler->max_round_interval = max_round_interval; 499 sampler->max_round_interval = max_round_interval;
557 sampler->insert_cb = ins_cb;
558 sampler->insert_cls = ins_cls;
559 sampler->remove_cb = rem_cb;
560 sampler->remove_cls = rem_cls;
561 //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); 500 //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
562 //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); 501 //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
563 RPS_sampler_resize (sampler, init_size); 502 RPS_sampler_resize (sampler, init_size);
@@ -583,9 +522,8 @@ RPS_sampler_update (struct RPS_Sampler *sampler,
583 522
584 for ( i = 0 ; i < sampler->sampler_size ; i++ ) 523 for ( i = 0 ; i < sampler->sampler_size ; i++ )
585 RPS_sampler_elem_next (sampler->sampler_elements[i], 524 RPS_sampler_elem_next (sampler->sampler_elements[i],
586 sampler, id, 525 sampler,
587 sampler->insert_cb, sampler->insert_cls, 526 id);
588 sampler->remove_cb, sampler->remove_cls);
589} 527}
590 528
591 529
@@ -705,7 +643,8 @@ sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
705 tmp_id = gpc->sampler->sampler_elements[client_get_index]->peer_id; 643 tmp_id = gpc->sampler->sampler_elements[client_get_index]->peer_id;
706 RPS_sampler_elem_reinit (gpc->sampler->sampler_elements[client_get_index]); 644 RPS_sampler_elem_reinit (gpc->sampler->sampler_elements[client_get_index]);
707 RPS_sampler_elem_next (gpc->sampler->sampler_elements[client_get_index], 645 RPS_sampler_elem_next (gpc->sampler->sampler_elements[client_get_index],
708 gpc->sampler, &tmp_id, NULL, NULL, NULL, NULL); 646 gpc->sampler,
647 &tmp_id);
709 648
710 /* Cycle the #client_get_index one step further */ 649 /* Cycle the #client_get_index one step further */
711 if ( client_get_index == gpc->sampler->sampler_size - 1 ) 650 if ( client_get_index == gpc->sampler->sampler_size - 1 )
diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h
index d022e6c9d..708de6980 100644
--- a/src/rps/gnunet-service-rps_sampler.h
+++ b/src/rps/gnunet-service-rps_sampler.h
@@ -36,28 +36,6 @@ struct RPS_Sampler;
36 36
37 37
38/** 38/**
39 * Callback that is called when a new PeerID is inserted into a sampler.
40 *
41 * @param cls the closure given alongside this function.
42 * @param id the PeerID that is inserted
43 */
44typedef void
45(*RPS_sampler_insert_cb) (void *cls,
46 struct RPS_Sampler *sampler,
47 const struct GNUNET_PeerIdentity *id);
48
49/**
50 * Callback that is called when a new PeerID is removed from a sampler.
51 *
52 * @param cls the closure given alongside this function.
53 * @param id the PeerID that is removed
54 */
55typedef void
56(*RPS_sampler_remove_cb) (void *cls,
57 struct RPS_Sampler *sampler,
58 const struct GNUNET_PeerIdentity *id);
59
60/**
61 * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready. 39 * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready.
62 * 40 *
63 * @param cls the closure given alongside this function. 41 * @param cls the closure given alongside this function.
@@ -104,9 +82,7 @@ RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size);
104 */ 82 */
105struct RPS_Sampler * 83struct RPS_Sampler *
106RPS_sampler_init (size_t init_size, 84RPS_sampler_init (size_t init_size,
107 struct GNUNET_TIME_Relative max_round_interval, 85 struct GNUNET_TIME_Relative max_round_interval);
108 RPS_sampler_insert_cb ins_cb, void *ins_cls,
109 RPS_sampler_remove_cb rem_cb, void *rem_cls);
110 86
111 87
112/** 88/**