aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2018-10-11 14:39:48 +0200
committerJulius Bünger <buenger@mytum.de>2018-10-11 14:40:44 +0200
commitfbbf0db19d08fe54f5b51c82b6cb1c9c7a2d040d (patch)
tree2248e208d0821c4d2c4aac5a4b03309cd0fa6c21 /src
parent0cdd1af62ca1f17fdca31be6e28ad24dcf3293b1 (diff)
downloadgnunet-fbbf0db19d08fe54f5b51c82b6cb1c9c7a2d040d.tar.gz
gnunet-fbbf0db19d08fe54f5b51c82b6cb1c9c7a2d040d.zip
RPS API: Add creation, deletion of Subs
Diffstat (limited to 'src')
-rw-r--r--src/rps/gnunet-service-rps.c974
-rw-r--r--src/rps/rps.h44
-rw-r--r--src/rps/rps_api.c61
3 files changed, 658 insertions, 421 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index d1c169239..b41a77074 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -36,11 +36,10 @@
36 36
37#include <math.h> 37#include <math.h>
38#include <inttypes.h> 38#include <inttypes.h>
39#include <string.h>
39 40
40#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__) 41#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
41 42
42// TODO modify @brief in every file
43
44// TODO check for overflows 43// TODO check for overflows
45 44
46// TODO align message structs 45// TODO align message structs
@@ -149,9 +148,9 @@ struct ChannelCtx;
149struct PeerContext 148struct PeerContext
150{ 149{
151 /** 150 /**
152 * The SubSampler this context belongs to. 151 * The Sub this context belongs to.
153 */ 152 */
154 struct SubSampler *ss; 153 struct Sub *sub;
155 154
156 /** 155 /**
157 * Message queue open to client 156 * Message queue open to client
@@ -280,24 +279,17 @@ struct AttackedPeer
280#endif /* ENABLE_MALICIOUS */ 279#endif /* ENABLE_MALICIOUS */
281 280
282/** 281/**
283 * @brief One SubSampler. 282 * @brief One Sub.
284 * 283 *
285 * Essentially one instance of brahms that only connects to other instances 284 * Essentially one instance of brahms that only connects to other instances
286 * with the same (secret) value. 285 * with the same (secret) value.
287 */ 286 */
288struct SubSampler 287struct Sub
289{ 288{
290 /** 289 /**
291 * @brief Port used for cadet. 290 * @brief Hash of the shared value that defines Subs.
292 *
293 * Don't compute multiple times through making it global
294 */
295 struct GNUNET_HashCode port;
296
297 /**
298 * Handler to CADET.
299 */ 291 */
300 struct GNUNET_CADET_Handle *cadet_handle; 292 struct GNUNET_HashCode hash;
301 293
302 /** 294 /**
303 * @brief Port to communicate to other peers. 295 * @brief Port to communicate to other peers.
@@ -417,6 +409,11 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
417struct GNUNET_STATISTICS_Handle *stats; 409struct GNUNET_STATISTICS_Handle *stats;
418 410
419/** 411/**
412 * Handler to CADET.
413 */
414struct GNUNET_CADET_Handle *cadet_handle;
415
416/**
420 * Our own identity. 417 * Our own identity.
421 */ 418 */
422static struct GNUNET_PeerIdentity own_identity; 419static struct GNUNET_PeerIdentity own_identity;
@@ -516,12 +513,12 @@ static uint32_t push_limit = 10000;
516#endif /* ENABLE_MALICIOUS */ 513#endif /* ENABLE_MALICIOUS */
517 514
518/** 515/**
519 * @brief Main SubSampler. 516 * @brief Main Sub.
520 * 517 *
521 * This is run in any case by all peers and connects to all peers without 518 * This is run in any case by all peers and connects to all peers without
522 * specifying a shared value. 519 * specifying a shared value.
523 */ 520 */
524static struct SubSampler *mss; 521static struct Sub *msub;
525 522
526/** 523/**
527 * @brief Maximum number of valid peers to keep. 524 * @brief Maximum number of valid peers to keep.
@@ -529,12 +526,18 @@ static struct SubSampler *mss;
529 */ 526 */
530static const uint32_t num_valid_peers_max = UINT32_MAX; 527static const uint32_t num_valid_peers_max = UINT32_MAX;
531 528
532
533/*********************************************************************** 529/***********************************************************************
534 * /Globals 530 * /Globals
535***********************************************************************/ 531***********************************************************************/
536 532
537 533
534static void
535do_round (void *cls);
536
537static void
538do_mal_round (void *cls);
539
540
538/** 541/**
539 * @brief Get the #PeerContext associated with a peer 542 * @brief Get the #PeerContext associated with a peer
540 * 543 *
@@ -586,29 +589,29 @@ check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
586/** 589/**
587 * @brief Create a new #PeerContext and insert it into the peer map 590 * @brief Create a new #PeerContext and insert it into the peer map
588 * 591 *
589 * @param ss The SubSampler this context belongs to. 592 * @param sub The Sub this context belongs to.
590 * @param peer the peer to create the #PeerContext for 593 * @param peer the peer to create the #PeerContext for
591 * 594 *
592 * @return the #PeerContext 595 * @return the #PeerContext
593 */ 596 */
594static struct PeerContext * 597static struct PeerContext *
595create_peer_ctx (struct SubSampler *ss, 598create_peer_ctx (struct Sub *sub,
596 const struct GNUNET_PeerIdentity *peer) 599 const struct GNUNET_PeerIdentity *peer)
597{ 600{
598 struct PeerContext *ctx; 601 struct PeerContext *ctx;
599 int ret; 602 int ret;
600 603
601 GNUNET_assert (GNUNET_NO == check_peer_known (ss->peer_map, peer)); 604 GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
602 605
603 ctx = GNUNET_new (struct PeerContext); 606 ctx = GNUNET_new (struct PeerContext);
604 ctx->peer_id = *peer; 607 ctx->peer_id = *peer;
605 ctx->ss = ss; 608 ctx->sub = sub;
606 ret = GNUNET_CONTAINER_multipeermap_put (ss->peer_map, peer, ctx, 609 ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
607 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 610 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
608 GNUNET_assert (GNUNET_OK == ret); 611 GNUNET_assert (GNUNET_OK == ret);
609 GNUNET_STATISTICS_set (stats, 612 GNUNET_STATISTICS_set (stats,
610 "# known peers", 613 "# known peers",
611 GNUNET_CONTAINER_multipeermap_size (ss->peer_map), 614 GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
612 GNUNET_NO); 615 GNUNET_NO);
613 return ctx; 616 return ctx;
614} 617}
@@ -617,20 +620,20 @@ create_peer_ctx (struct SubSampler *ss,
617/** 620/**
618 * @brief Create or get a #PeerContext 621 * @brief Create or get a #PeerContext
619 * 622 *
620 * @param ss The SubSampler to which the created context belongs to 623 * @param sub The Sub to which the created context belongs to
621 * @param peer the peer to get the associated context to 624 * @param peer the peer to get the associated context to
622 * 625 *
623 * @return the context 626 * @return the context
624 */ 627 */
625static struct PeerContext * 628static struct PeerContext *
626create_or_get_peer_ctx (struct SubSampler *ss, 629create_or_get_peer_ctx (struct Sub *sub,
627 const struct GNUNET_PeerIdentity *peer) 630 const struct GNUNET_PeerIdentity *peer)
628{ 631{
629 if (GNUNET_NO == check_peer_known (ss->peer_map, peer)) 632 if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
630 { 633 {
631 return create_peer_ctx (ss, peer); 634 return create_peer_ctx (sub, peer);
632 } 635 }
633 return get_peer_ctx (ss->peer_map, peer); 636 return get_peer_ctx (sub->peer_map, peer);
634} 637}
635 638
636 639
@@ -648,13 +651,13 @@ static int
648check_connected (struct PeerContext *peer_ctx) 651check_connected (struct PeerContext *peer_ctx)
649{ 652{
650 /* If we don't know about this peer we don't know whether it's online */ 653 /* If we don't know about this peer we don't know whether it's online */
651 if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map, 654 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
652 &peer_ctx->peer_id)) 655 &peer_ctx->peer_id))
653 { 656 {
654 return GNUNET_NO; 657 return GNUNET_NO;
655 } 658 }
656 /* Get the context */ 659 /* Get the context */
657 peer_ctx = get_peer_ctx (peer_ctx->ss->peer_map, &peer_ctx->peer_id); 660 peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
658 /* If we have no channel to this peer we don't know whether it's online */ 661 /* If we have no channel to this peer we don't know whether it's online */
659 if ( (NULL == peer_ctx->send_channel_ctx) && 662 if ( (NULL == peer_ctx->send_channel_ctx) &&
660 (NULL == peer_ctx->recv_channel_ctx) ) 663 (NULL == peer_ctx->recv_channel_ctx) )
@@ -943,10 +946,10 @@ get_channel (struct PeerContext *peer_ctx)
943 *ctx_peer = peer_ctx->peer_id; 946 *ctx_peer = peer_ctx->peer_id;
944 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx); 947 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
945 peer_ctx->send_channel_ctx->channel = 948 peer_ctx->send_channel_ctx->channel =
946 GNUNET_CADET_channel_create (peer_ctx->ss->cadet_handle, 949 GNUNET_CADET_channel_create (cadet_handle,
947 peer_ctx->send_channel_ctx, /* context */ 950 peer_ctx->send_channel_ctx, /* context */
948 &peer_ctx->peer_id, 951 &peer_ctx->peer_id,
949 &peer_ctx->ss->port, 952 &peer_ctx->sub->hash,
950 GNUNET_CADET_OPTION_RELIABLE, 953 GNUNET_CADET_OPTION_RELIABLE,
951 NULL, /* WindowSize handler */ 954 NULL, /* WindowSize handler */
952 &cleanup_destroyed_channel, /* Disconnect handler */ 955 &cleanup_destroyed_channel, /* Disconnect handler */
@@ -1048,7 +1051,7 @@ mq_online_check_successful (void *cls)
1048 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES); 1051 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1049 peer_ctx->online_check_pending = NULL; 1052 peer_ctx->online_check_pending = NULL;
1050 set_peer_online (peer_ctx); 1053 set_peer_online (peer_ctx);
1051 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->ss->valid_peers); 1054 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1052 } 1055 }
1053} 1056}
1054 1057
@@ -1187,9 +1190,9 @@ static int
1187destroy_peer (struct PeerContext *peer_ctx) 1190destroy_peer (struct PeerContext *peer_ctx)
1188{ 1191{
1189 GNUNET_assert (NULL != peer_ctx); 1192 GNUNET_assert (NULL != peer_ctx);
1190 GNUNET_assert (NULL != peer_ctx->ss->peer_map); 1193 GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1191 if (GNUNET_NO == 1194 if (GNUNET_NO ==
1192 GNUNET_CONTAINER_multipeermap_contains (peer_ctx->ss->peer_map, 1195 GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1193 &peer_ctx->peer_id)) 1196 &peer_ctx->peer_id))
1194 { 1197 {
1195 return GNUNET_NO; 1198 return GNUNET_NO;
@@ -1259,15 +1262,15 @@ destroy_peer (struct PeerContext *peer_ctx)
1259 } 1262 }
1260 1263
1261 if (GNUNET_YES != 1264 if (GNUNET_YES !=
1262 GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->ss->peer_map, 1265 GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1263 &peer_ctx->peer_id)) 1266 &peer_ctx->peer_id))
1264 { 1267 {
1265 LOG (GNUNET_ERROR_TYPE_WARNING, 1268 LOG (GNUNET_ERROR_TYPE_WARNING,
1266 "removing peer from peer_ctx->ss->peer_map failed\n"); 1269 "removing peer from peer_ctx->sub->peer_map failed\n");
1267 } 1270 }
1268 GNUNET_STATISTICS_set (stats, 1271 GNUNET_STATISTICS_set (stats,
1269 "# known peers", 1272 "# known peers",
1270 GNUNET_CONTAINER_multipeermap_size (peer_ctx->ss->peer_map), 1273 GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1271 GNUNET_NO); 1274 GNUNET_NO);
1272 GNUNET_free (peer_ctx); 1275 GNUNET_free (peer_ctx);
1273 return GNUNET_YES; 1276 return GNUNET_YES;
@@ -1288,10 +1291,10 @@ peermap_clear_iterator (void *cls,
1288 const struct GNUNET_PeerIdentity *key, 1291 const struct GNUNET_PeerIdentity *key,
1289 void *value) 1292 void *value)
1290{ 1293{
1291 struct SubSampler *ss = cls; 1294 struct Sub *sub = cls;
1292 (void) value; 1295 (void) value;
1293 1296
1294 destroy_peer (get_peer_ctx (ss->peer_map, key)); 1297 destroy_peer (get_peer_ctx (sub->peer_map, key));
1295 return GNUNET_YES; 1298 return GNUNET_YES;
1296} 1299}
1297 1300
@@ -1366,36 +1369,36 @@ store_peer_presistently_iterator (void *cls,
1366/** 1369/**
1367 * @brief Store the peers currently in #valid_peers to disk. 1370 * @brief Store the peers currently in #valid_peers to disk.
1368 * 1371 *
1369 * @param ss SubSampler for which to store the valid peers 1372 * @param sub Sub for which to store the valid peers
1370 */ 1373 */
1371static void 1374static void
1372store_valid_peers (const struct SubSampler *ss) 1375store_valid_peers (const struct Sub *sub)
1373{ 1376{
1374 struct GNUNET_DISK_FileHandle *fh; 1377 struct GNUNET_DISK_FileHandle *fh;
1375 uint32_t number_written_peers; 1378 uint32_t number_written_peers;
1376 int ret; 1379 int ret;
1377 1380
1378 if (0 == strncmp ("DISABLE", ss->filename_valid_peers, 7)) 1381 if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1379 { 1382 {
1380 return; 1383 return;
1381 } 1384 }
1382 1385
1383 ret = GNUNET_DISK_directory_create_for_file (ss->filename_valid_peers); 1386 ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1384 if (GNUNET_SYSERR == ret) 1387 if (GNUNET_SYSERR == ret)
1385 { 1388 {
1386 LOG (GNUNET_ERROR_TYPE_WARNING, 1389 LOG (GNUNET_ERROR_TYPE_WARNING,
1387 "Not able to create directory for file `%s'\n", 1390 "Not able to create directory for file `%s'\n",
1388 ss->filename_valid_peers); 1391 sub->filename_valid_peers);
1389 GNUNET_break (0); 1392 GNUNET_break (0);
1390 } 1393 }
1391 else if (GNUNET_NO == ret) 1394 else if (GNUNET_NO == ret)
1392 { 1395 {
1393 LOG (GNUNET_ERROR_TYPE_WARNING, 1396 LOG (GNUNET_ERROR_TYPE_WARNING,
1394 "Directory for file `%s' exists but is not writable for us\n", 1397 "Directory for file `%s' exists but is not writable for us\n",
1395 ss->filename_valid_peers); 1398 sub->filename_valid_peers);
1396 GNUNET_break (0); 1399 GNUNET_break (0);
1397 } 1400 }
1398 fh = GNUNET_DISK_file_open (ss->filename_valid_peers, 1401 fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1399 GNUNET_DISK_OPEN_WRITE | 1402 GNUNET_DISK_OPEN_WRITE |
1400 GNUNET_DISK_OPEN_CREATE, 1403 GNUNET_DISK_OPEN_CREATE,
1401 GNUNET_DISK_PERM_USER_READ | 1404 GNUNET_DISK_PERM_USER_READ |
@@ -1404,19 +1407,19 @@ store_valid_peers (const struct SubSampler *ss)
1404 { 1407 {
1405 LOG (GNUNET_ERROR_TYPE_WARNING, 1408 LOG (GNUNET_ERROR_TYPE_WARNING,
1406 "Not able to write valid peers to file `%s'\n", 1409 "Not able to write valid peers to file `%s'\n",
1407 ss->filename_valid_peers); 1410 sub->filename_valid_peers);
1408 return; 1411 return;
1409 } 1412 }
1410 LOG (GNUNET_ERROR_TYPE_DEBUG, 1413 LOG (GNUNET_ERROR_TYPE_DEBUG,
1411 "Writing %u valid peers to disk\n", 1414 "Writing %u valid peers to disk\n",
1412 GNUNET_CONTAINER_multipeermap_size (ss->valid_peers)); 1415 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1413 number_written_peers = 1416 number_written_peers =
1414 GNUNET_CONTAINER_multipeermap_iterate (ss->valid_peers, 1417 GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1415 store_peer_presistently_iterator, 1418 store_peer_presistently_iterator,
1416 fh); 1419 fh);
1417 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); 1420 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1418 GNUNET_assert (number_written_peers == 1421 GNUNET_assert (number_written_peers ==
1419 GNUNET_CONTAINER_multipeermap_size (ss->valid_peers)); 1422 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1420} 1423}
1421 1424
1422 1425
@@ -1469,10 +1472,10 @@ s2i_full (const char *string_repr)
1469/** 1472/**
1470 * @brief Restore the peers on disk to #valid_peers. 1473 * @brief Restore the peers on disk to #valid_peers.
1471 * 1474 *
1472 * @param ss SubSampler for which to restore the valid peers 1475 * @param sub Sub for which to restore the valid peers
1473 */ 1476 */
1474static void 1477static void
1475restore_valid_peers (const struct SubSampler *ss) 1478restore_valid_peers (const struct Sub *sub)
1476{ 1479{
1477 off_t file_size; 1480 off_t file_size;
1478 uint32_t num_peers; 1481 uint32_t num_peers;
@@ -1483,16 +1486,16 @@ restore_valid_peers (const struct SubSampler *ss)
1483 char *str_repr; 1486 char *str_repr;
1484 const struct GNUNET_PeerIdentity *peer; 1487 const struct GNUNET_PeerIdentity *peer;
1485 1488
1486 if (0 == strncmp ("DISABLE", ss->filename_valid_peers, 7)) 1489 if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1487 { 1490 {
1488 return; 1491 return;
1489 } 1492 }
1490 1493
1491 if (GNUNET_OK != GNUNET_DISK_file_test (ss->filename_valid_peers)) 1494 if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1492 { 1495 {
1493 return; 1496 return;
1494 } 1497 }
1495 fh = GNUNET_DISK_file_open (ss->filename_valid_peers, 1498 fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1496 GNUNET_DISK_OPEN_READ, 1499 GNUNET_DISK_OPEN_READ,
1497 GNUNET_DISK_PERM_NONE); 1500 GNUNET_DISK_PERM_NONE);
1498 GNUNET_assert (NULL != fh); 1501 GNUNET_assert (NULL != fh);
@@ -1504,13 +1507,13 @@ restore_valid_peers (const struct SubSampler *ss)
1504 LOG (GNUNET_ERROR_TYPE_DEBUG, 1507 LOG (GNUNET_ERROR_TYPE_DEBUG,
1505 "Restoring %" PRIu32 " peers from file `%s'\n", 1508 "Restoring %" PRIu32 " peers from file `%s'\n",
1506 num_peers, 1509 num_peers,
1507 ss->filename_valid_peers); 1510 sub->filename_valid_peers);
1508 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53) 1511 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1509 { 1512 {
1510 str_repr = GNUNET_strndup (iter_buf, 53); 1513 str_repr = GNUNET_strndup (iter_buf, 53);
1511 peer = s2i_full (str_repr); 1514 peer = s2i_full (str_repr);
1512 GNUNET_free (str_repr); 1515 GNUNET_free (str_repr);
1513 add_valid_peer (peer, ss->valid_peers); 1516 add_valid_peer (peer, sub->valid_peers);
1514 LOG (GNUNET_ERROR_TYPE_DEBUG, 1517 LOG (GNUNET_ERROR_TYPE_DEBUG,
1515 "Restored valid peer %s from disk\n", 1518 "Restored valid peer %s from disk\n",
1516 GNUNET_i2s_full (peer)); 1519 GNUNET_i2s_full (peer));
@@ -1518,10 +1521,10 @@ restore_valid_peers (const struct SubSampler *ss)
1518 iter_buf = NULL; 1521 iter_buf = NULL;
1519 GNUNET_free (buf); 1522 GNUNET_free (buf);
1520 LOG (GNUNET_ERROR_TYPE_DEBUG, 1523 LOG (GNUNET_ERROR_TYPE_DEBUG,
1521 "num_peers: %" PRIu32 ", _size (ss->valid_peers): %u\n", 1524 "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1522 num_peers, 1525 num_peers,
1523 GNUNET_CONTAINER_multipeermap_size (ss->valid_peers)); 1526 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1524 if (num_peers != GNUNET_CONTAINER_multipeermap_size (ss->valid_peers)) 1527 if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1525 { 1528 {
1526 LOG (GNUNET_ERROR_TYPE_WARNING, 1529 LOG (GNUNET_ERROR_TYPE_WARNING,
1527 "Number of restored peers does not match file size. Have probably duplicates.\n"); 1530 "Number of restored peers does not match file size. Have probably duplicates.\n");
@@ -1529,33 +1532,33 @@ restore_valid_peers (const struct SubSampler *ss)
1529 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); 1532 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1530 LOG (GNUNET_ERROR_TYPE_DEBUG, 1533 LOG (GNUNET_ERROR_TYPE_DEBUG,
1531 "Restored %u valid peers from disk\n", 1534 "Restored %u valid peers from disk\n",
1532 GNUNET_CONTAINER_multipeermap_size (ss->valid_peers)); 1535 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1533} 1536}
1534 1537
1535 1538
1536/** 1539/**
1537 * @brief Delete storage of peers that was created with #initialise_peers () 1540 * @brief Delete storage of peers that was created with #initialise_peers ()
1538 * 1541 *
1539 * @param ss SubSampler for which the storage is deleted 1542 * @param sub Sub for which the storage is deleted
1540 */ 1543 */
1541static void 1544static void
1542peers_terminate (struct SubSampler *ss) 1545peers_terminate (struct Sub *sub)
1543{ 1546{
1544 if (GNUNET_SYSERR == 1547 if (GNUNET_SYSERR ==
1545 GNUNET_CONTAINER_multipeermap_iterate (ss->peer_map, 1548 GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1546 &peermap_clear_iterator, 1549 &peermap_clear_iterator,
1547 ss)) 1550 sub))
1548 { 1551 {
1549 LOG (GNUNET_ERROR_TYPE_WARNING, 1552 LOG (GNUNET_ERROR_TYPE_WARNING,
1550 "Iteration destroying peers was aborted.\n"); 1553 "Iteration destroying peers was aborted.\n");
1551 } 1554 }
1552 GNUNET_CONTAINER_multipeermap_destroy (ss->peer_map); 1555 GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1553 ss->peer_map = NULL; 1556 sub->peer_map = NULL;
1554 store_valid_peers (ss); 1557 store_valid_peers (sub);
1555 GNUNET_free (ss->filename_valid_peers); 1558 GNUNET_free (sub->filename_valid_peers);
1556 ss->filename_valid_peers = NULL; 1559 sub->filename_valid_peers = NULL;
1557 GNUNET_CONTAINER_multipeermap_destroy (ss->valid_peers); 1560 GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1558 ss->valid_peers = NULL; 1561 sub->valid_peers = NULL;
1559} 1562}
1560 1563
1561 1564
@@ -1615,21 +1618,21 @@ get_valid_peers (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1615 * This function is called on new peer_ids from 'external' sources 1618 * This function is called on new peer_ids from 'external' sources
1616 * (client seed, cadet get_peers(), ...) 1619 * (client seed, cadet get_peers(), ...)
1617 * 1620 *
1618 * @param ss SubSampler with the peer map that the @a peer will be added to 1621 * @param sub Sub with the peer map that the @a peer will be added to
1619 * @param peer the new #GNUNET_PeerIdentity 1622 * @param peer the new #GNUNET_PeerIdentity
1620 * 1623 *
1621 * @return #GNUNET_YES if peer was inserted 1624 * @return #GNUNET_YES if peer was inserted
1622 * #GNUNET_NO otherwise 1625 * #GNUNET_NO otherwise
1623 */ 1626 */
1624static int 1627static int
1625insert_peer (struct SubSampler *ss, 1628insert_peer (struct Sub *sub,
1626 const struct GNUNET_PeerIdentity *peer) 1629 const struct GNUNET_PeerIdentity *peer)
1627{ 1630{
1628 if (GNUNET_YES == check_peer_known (ss->peer_map, peer)) 1631 if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1629 { 1632 {
1630 return GNUNET_NO; /* We already know this peer - nothing to do */ 1633 return GNUNET_NO; /* We already know this peer - nothing to do */
1631 } 1634 }
1632 (void) create_peer_ctx (ss, peer); 1635 (void) create_peer_ctx (sub, peer);
1633 return GNUNET_YES; 1636 return GNUNET_YES;
1634} 1637}
1635 1638
@@ -1665,20 +1668,20 @@ check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1665 * 1668 *
1666 * If not known yet, insert into known peers 1669 * If not known yet, insert into known peers
1667 * 1670 *
1668 * @param ss SubSampler which would contain the @a peer 1671 * @param sub Sub which would contain the @a peer
1669 * @param peer the peer whose online is to be checked 1672 * @param peer the peer whose online is to be checked
1670 * @return #GNUNET_YES if the check was issued 1673 * @return #GNUNET_YES if the check was issued
1671 * #GNUNET_NO otherwise 1674 * #GNUNET_NO otherwise
1672 */ 1675 */
1673static int 1676static int
1674issue_peer_online_check (struct SubSampler *ss, 1677issue_peer_online_check (struct Sub *sub,
1675 const struct GNUNET_PeerIdentity *peer) 1678 const struct GNUNET_PeerIdentity *peer)
1676{ 1679{
1677 struct PeerContext *peer_ctx; 1680 struct PeerContext *peer_ctx;
1678 1681
1679 (void) insert_peer (ss, peer); // TODO even needed? 1682 (void) insert_peer (sub, peer); // TODO even needed?
1680 peer_ctx = get_peer_ctx (ss->peer_map, peer); 1683 peer_ctx = get_peer_ctx (sub->peer_map, peer);
1681 if ( (GNUNET_NO == check_peer_flag (ss->peer_map, peer, Peers_ONLINE)) && 1684 if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1682 (NULL == peer_ctx->online_check_pending) ) 1685 (NULL == peer_ctx->online_check_pending) )
1683 { 1686 {
1684 check_peer_online (peer_ctx); 1687 check_peer_online (peer_ctx);
@@ -1704,7 +1707,7 @@ issue_peer_online_check (struct SubSampler *ss,
1704static int 1707static int
1705check_removable (const struct PeerContext *peer_ctx) 1708check_removable (const struct PeerContext *peer_ctx)
1706{ 1709{
1707 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->ss->peer_map, 1710 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1708 &peer_ctx->peer_id)) 1711 &peer_ctx->peer_id))
1709 { 1712 {
1710 return GNUNET_SYSERR; 1713 return GNUNET_SYSERR;
@@ -1749,7 +1752,7 @@ check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1749static void 1752static void
1750indicate_sending_intention (struct PeerContext *peer_ctx) 1753indicate_sending_intention (struct PeerContext *peer_ctx)
1751{ 1754{
1752 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->ss->peer_map, 1755 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1753 &peer_ctx->peer_id)); 1756 &peer_ctx->peer_id));
1754 (void) get_channel (peer_ctx); 1757 (void) get_channel (peer_ctx);
1755} 1758}
@@ -1778,7 +1781,7 @@ check_peer_send_intention (const struct PeerContext *peer_ctx)
1778/** 1781/**
1779 * Handle the channel a peer opens to us. 1782 * Handle the channel a peer opens to us.
1780 * 1783 *
1781 * @param cls The closure - SubSampler 1784 * @param cls The closure - Sub
1782 * @param channel The channel the peer wants to establish 1785 * @param channel The channel the peer wants to establish
1783 * @param initiator The peer's peer ID 1786 * @param initiator The peer's peer ID
1784 * 1787 *
@@ -1793,22 +1796,22 @@ handle_inbound_channel (void *cls,
1793 struct PeerContext *peer_ctx; 1796 struct PeerContext *peer_ctx;
1794 struct GNUNET_PeerIdentity *ctx_peer; 1797 struct GNUNET_PeerIdentity *ctx_peer;
1795 struct ChannelCtx *channel_ctx; 1798 struct ChannelCtx *channel_ctx;
1796 struct SubSampler *ss = cls; 1799 struct Sub *sub = cls;
1797 1800
1798 LOG (GNUNET_ERROR_TYPE_DEBUG, 1801 LOG (GNUNET_ERROR_TYPE_DEBUG,
1799 "New channel was established to us (Peer %s).\n", 1802 "New channel was established to us (Peer %s).\n",
1800 GNUNET_i2s (initiator)); 1803 GNUNET_i2s (initiator));
1801 GNUNET_assert (NULL != channel); /* according to cadet API */ 1804 GNUNET_assert (NULL != channel); /* according to cadet API */
1802 /* Make sure we 'know' about this peer */ 1805 /* Make sure we 'know' about this peer */
1803 peer_ctx = create_or_get_peer_ctx (ss, initiator); 1806 peer_ctx = create_or_get_peer_ctx (sub, initiator);
1804 set_peer_online (peer_ctx); 1807 set_peer_online (peer_ctx);
1805 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->ss->valid_peers); 1808 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1806 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity); 1809 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1807 *ctx_peer = *initiator; 1810 *ctx_peer = *initiator;
1808 channel_ctx = add_channel_ctx (peer_ctx); 1811 channel_ctx = add_channel_ctx (peer_ctx);
1809 channel_ctx->channel = channel; 1812 channel_ctx->channel = channel;
1810 /* We only accept one incoming channel per peer */ 1813 /* We only accept one incoming channel per peer */
1811 if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (ss->peer_map, 1814 if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1812 initiator))) 1815 initiator)))
1813 { 1816 {
1814 LOG (GNUNET_ERROR_TYPE_WARNING, 1817 LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -1835,7 +1838,7 @@ handle_inbound_channel (void *cls,
1835static int 1838static int
1836check_sending_channel_exists (const struct PeerContext *peer_ctx) 1839check_sending_channel_exists (const struct PeerContext *peer_ctx)
1837{ 1840{
1838 if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map, 1841 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1839 &peer_ctx->peer_id)) 1842 &peer_ctx->peer_id))
1840 { /* If no such peer exists, there is no channel */ 1843 { /* If no such peer exists, there is no channel */
1841 return GNUNET_NO; 1844 return GNUNET_NO;
@@ -1859,7 +1862,7 @@ check_sending_channel_exists (const struct PeerContext *peer_ctx)
1859static int 1862static int
1860destroy_sending_channel (struct PeerContext *peer_ctx) 1863destroy_sending_channel (struct PeerContext *peer_ctx)
1861{ 1864{
1862 if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map, 1865 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1863 &peer_ctx->peer_id)) 1866 &peer_ctx->peer_id))
1864 { 1867 {
1865 return GNUNET_NO; 1868 return GNUNET_NO;
@@ -1922,7 +1925,7 @@ schedule_operation (struct PeerContext *peer_ctx,
1922{ 1925{
1923 struct PeerPendingOp pending_op; 1926 struct PeerPendingOp pending_op;
1924 1927
1925 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->ss->peer_map, 1928 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1926 &peer_ctx->peer_id)); 1929 &peer_ctx->peer_id));
1927 1930
1928 //TODO if ONLINE execute immediately 1931 //TODO if ONLINE execute immediately
@@ -2010,9 +2013,9 @@ struct ClientContext
2010 struct GNUNET_SERVICE_Client *client; 2013 struct GNUNET_SERVICE_Client *client;
2011 2014
2012 /** 2015 /**
2013 * The #SubSampler this context belongs to 2016 * The #Sub this context belongs to
2014 */ 2017 */
2015 struct SubSampler *ss; 2018 struct Sub *sub;
2016}; 2019};
2017 2020
2018/** 2021/**
@@ -2109,35 +2112,35 @@ insert_in_view_op (void *cls,
2109 * 2112 *
2110 * Called once we know a peer is online. 2113 * Called once we know a peer is online.
2111 * 2114 *
2112 * @param ss SubSampler in with the view to insert in 2115 * @param sub Sub in with the view to insert in
2113 * @param peer the peer to insert 2116 * @param peer the peer to insert
2114 * 2117 *
2115 * @return GNUNET_OK if peer was actually inserted 2118 * @return GNUNET_OK if peer was actually inserted
2116 * GNUNET_NO if peer was not inserted 2119 * GNUNET_NO if peer was not inserted
2117 */ 2120 */
2118static int 2121static int
2119insert_in_view (struct SubSampler *ss, 2122insert_in_view (struct Sub *sub,
2120 const struct GNUNET_PeerIdentity *peer) 2123 const struct GNUNET_PeerIdentity *peer)
2121{ 2124{
2122 struct PeerContext *peer_ctx; 2125 struct PeerContext *peer_ctx;
2123 int online; 2126 int online;
2124 int ret; 2127 int ret;
2125 2128
2126 online = check_peer_flag (ss->peer_map, peer, Peers_ONLINE); 2129 online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2127 peer_ctx = get_peer_ctx (ss->peer_map, peer); // TODO indirection needed? 2130 peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2128 if ( (GNUNET_NO == online) || 2131 if ( (GNUNET_NO == online) ||
2129 (GNUNET_SYSERR == online) ) /* peer is not even known */ 2132 (GNUNET_SYSERR == online) ) /* peer is not even known */
2130 { 2133 {
2131 (void) issue_peer_online_check (ss, peer); 2134 (void) issue_peer_online_check (sub, peer);
2132 (void) schedule_operation (peer_ctx, insert_in_view_op, NULL); 2135 (void) schedule_operation (peer_ctx, insert_in_view_op, NULL);
2133 return GNUNET_NO; 2136 return GNUNET_NO;
2134 } 2137 }
2135 /* Open channel towards peer to keep connection open */ 2138 /* Open channel towards peer to keep connection open */
2136 indicate_sending_intention (peer_ctx); 2139 indicate_sending_intention (peer_ctx);
2137 ret = View_put (ss->view, peer); 2140 ret = View_put (sub->view, peer);
2138 GNUNET_STATISTICS_set (stats, 2141 GNUNET_STATISTICS_set (stats,
2139 "view size", 2142 "view size",
2140 View_size (peer_ctx->ss->view), 2143 View_size (peer_ctx->sub->view),
2141 GNUNET_NO); 2144 GNUNET_NO);
2142 return ret; 2145 return ret;
2143} 2146}
@@ -2160,8 +2163,8 @@ send_view (const struct ClientContext *cli_ctx,
2160 2163
2161 if (NULL == view_array) 2164 if (NULL == view_array)
2162 { 2165 {
2163 view_size = View_size (cli_ctx->ss->view); 2166 view_size = View_size (cli_ctx->sub->view);
2164 view_array = View_get_as_array (cli_ctx->ss->view); 2167 view_array = View_get_as_array (cli_ctx->sub->view);
2165 } 2168 }
2166 2169
2167 ev = GNUNET_MQ_msg_extra (out_msg, 2170 ev = GNUNET_MQ_msg_extra (out_msg,
@@ -2210,17 +2213,17 @@ send_stream_peers (const struct ClientContext *cli_ctx,
2210/** 2213/**
2211 * @brief sends updates to clients that are interested 2214 * @brief sends updates to clients that are interested
2212 * 2215 *
2213 * @param ss Subsampler for which to notify clients 2216 * @param sub Sub for which to notify clients
2214 */ 2217 */
2215static void 2218static void
2216clients_notify_view_update (const struct SubSampler *ss) 2219clients_notify_view_update (const struct Sub *sub)
2217{ 2220{
2218 struct ClientContext *cli_ctx_iter; 2221 struct ClientContext *cli_ctx_iter;
2219 uint64_t num_peers; 2222 uint64_t num_peers;
2220 const struct GNUNET_PeerIdentity *view_array; 2223 const struct GNUNET_PeerIdentity *view_array;
2221 2224
2222 num_peers = View_size (ss->view); 2225 num_peers = View_size (sub->view);
2223 view_array = View_get_as_array(ss->view); 2226 view_array = View_get_as_array(sub->view);
2224 /* check size of view is small enough */ 2227 /* check size of view is small enough */
2225 if (GNUNET_MAX_MESSAGE_SIZE < num_peers) 2228 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2226 { 2229 {
@@ -2260,7 +2263,8 @@ clients_notify_view_update (const struct SubSampler *ss)
2260 * @param peers the array of peers to send 2263 * @param peers the array of peers to send
2261 */ 2264 */
2262static void 2265static void
2263clients_notify_stream_peer (uint64_t num_peers, 2266clients_notify_stream_peer (const struct Sub *sub,
2267 uint64_t num_peers,
2264 const struct GNUNET_PeerIdentity *peers) 2268 const struct GNUNET_PeerIdentity *peers)
2265 // TODO enum StreamPeerSource) 2269 // TODO enum StreamPeerSource)
2266{ 2270{
@@ -2274,7 +2278,8 @@ clients_notify_stream_peer (uint64_t num_peers,
2274 NULL != cli_ctx_iter; 2278 NULL != cli_ctx_iter;
2275 cli_ctx_iter = cli_ctx_iter->next) 2279 cli_ctx_iter = cli_ctx_iter->next)
2276 { 2280 {
2277 if (GNUNET_YES == cli_ctx_iter->stream_update) 2281 if (GNUNET_YES == cli_ctx_iter->stream_update &&
2282 (sub == cli_ctx_iter->sub || sub == msub))
2278 { 2283 {
2279 send_stream_peers (cli_ctx_iter, num_peers, peers); 2284 send_stream_peers (cli_ctx_iter, num_peers, peers);
2280 } 2285 }
@@ -2287,7 +2292,7 @@ clients_notify_stream_peer (uint64_t num_peers,
2287 * 2292 *
2288 * @param ids Array of Peers to insert into view 2293 * @param ids Array of Peers to insert into view
2289 * @param num_peers Number of peers to insert 2294 * @param num_peers Number of peers to insert
2290 * @param cls Closure - The SubSampler for which this is to be done 2295 * @param cls Closure - The Sub for which this is to be done
2291 */ 2296 */
2292static void 2297static void
2293hist_update (const struct GNUNET_PeerIdentity *ids, 2298hist_update (const struct GNUNET_PeerIdentity *ids,
@@ -2295,21 +2300,21 @@ hist_update (const struct GNUNET_PeerIdentity *ids,
2295 void *cls) 2300 void *cls)
2296{ 2301{
2297 unsigned int i; 2302 unsigned int i;
2298 struct SubSampler *ss = cls; 2303 struct Sub *sub = cls;
2299 2304
2300 for (i = 0; i < num_peers; i++) 2305 for (i = 0; i < num_peers; i++)
2301 { 2306 {
2302 int inserted; 2307 int inserted;
2303 inserted = insert_in_view (ss, &ids[i]); 2308 inserted = insert_in_view (sub, &ids[i]);
2304 if (GNUNET_OK == inserted) 2309 if (GNUNET_OK == inserted)
2305 { 2310 {
2306 clients_notify_stream_peer (1, &ids[i]); 2311 clients_notify_stream_peer (sub, 1, &ids[i]);
2307 } 2312 }
2308 to_file (ss->file_name_view_log, 2313 to_file (sub->file_name_view_log,
2309 "+%s\t(hist)", 2314 "+%s\t(hist)",
2310 GNUNET_i2s_full (ids)); 2315 GNUNET_i2s_full (ids));
2311 } 2316 }
2312 clients_notify_view_update (ss); 2317 clients_notify_view_update (sub);
2313} 2318}
2314 2319
2315 2320
@@ -2433,16 +2438,16 @@ send_pull_reply (struct PeerContext *peer_ctx,
2433 * 2438 *
2434 * Called once we know a peer is online. 2439 * Called once we know a peer is online.
2435 * 2440 *
2436 * @param cls Closure - SubSampler with the pull map to insert into 2441 * @param cls Closure - Sub with the pull map to insert into
2437 * @param peer Peer to insert 2442 * @param peer Peer to insert
2438 */ 2443 */
2439static void 2444static void
2440insert_in_pull_map (void *cls, 2445insert_in_pull_map (void *cls,
2441 const struct GNUNET_PeerIdentity *peer) 2446 const struct GNUNET_PeerIdentity *peer)
2442{ 2447{
2443 struct SubSampler *ss = cls; 2448 struct Sub *sub = cls;
2444 2449
2445 CustomPeerMap_put (ss->pull_map, peer); 2450 CustomPeerMap_put (sub->pull_map, peer);
2446} 2451}
2447 2452
2448 2453
@@ -2452,20 +2457,20 @@ insert_in_pull_map (void *cls,
2452 * Called once we know a peer is online. 2457 * Called once we know a peer is online.
2453 * Implements #PeerOp 2458 * Implements #PeerOp
2454 * 2459 *
2455 * @param cls Closure - SubSampler with view to insert peer into 2460 * @param cls Closure - Sub with view to insert peer into
2456 * @param peer the peer to insert 2461 * @param peer the peer to insert
2457 */ 2462 */
2458static void 2463static void
2459insert_in_view_op (void *cls, 2464insert_in_view_op (void *cls,
2460 const struct GNUNET_PeerIdentity *peer) 2465 const struct GNUNET_PeerIdentity *peer)
2461{ 2466{
2462 struct SubSampler *ss = cls; 2467 struct Sub *sub = cls;
2463 int inserted; 2468 int inserted;
2464 2469
2465 inserted = insert_in_view (ss, peer); 2470 inserted = insert_in_view (sub, peer);
2466 if (GNUNET_OK == inserted) 2471 if (GNUNET_OK == inserted)
2467 { 2472 {
2468 clients_notify_stream_peer (1, peer); 2473 clients_notify_stream_peer (sub, 1, peer);
2469 } 2474 }
2470} 2475}
2471 2476
@@ -2474,41 +2479,41 @@ insert_in_view_op (void *cls,
2474 * Update sampler with given PeerID. 2479 * Update sampler with given PeerID.
2475 * Implements #PeerOp 2480 * Implements #PeerOp
2476 * 2481 *
2477 * @param cls Closure - SubSampler containing the sampler to insert into 2482 * @param cls Closure - Sub containing the sampler to insert into
2478 * @param peer Peer to insert 2483 * @param peer Peer to insert
2479 */ 2484 */
2480static void 2485static void
2481insert_in_sampler (void *cls, 2486insert_in_sampler (void *cls,
2482 const struct GNUNET_PeerIdentity *peer) 2487 const struct GNUNET_PeerIdentity *peer)
2483{ 2488{
2484 struct SubSampler *ss = cls; 2489 struct Sub *sub = cls;
2485 2490
2486 LOG (GNUNET_ERROR_TYPE_DEBUG, 2491 LOG (GNUNET_ERROR_TYPE_DEBUG,
2487 "Updating samplers with peer %s from insert_in_sampler()\n", 2492 "Updating samplers with peer %s from insert_in_sampler()\n",
2488 GNUNET_i2s (peer)); 2493 GNUNET_i2s (peer));
2489 RPS_sampler_update (ss->sampler, peer); 2494 RPS_sampler_update (sub->sampler, peer);
2490 if (0 < RPS_sampler_count_id (ss->sampler, peer)) 2495 if (0 < RPS_sampler_count_id (sub->sampler, peer))
2491 { 2496 {
2492 /* Make sure we 'know' about this peer */ 2497 /* Make sure we 'know' about this peer */
2493 (void) issue_peer_online_check (ss, peer); 2498 (void) issue_peer_online_check (sub, peer);
2494 /* Establish a channel towards that peer to indicate we are going to send 2499 /* Establish a channel towards that peer to indicate we are going to send
2495 * messages to it */ 2500 * messages to it */
2496 //indicate_sending_intention (peer); 2501 //indicate_sending_intention (peer);
2497 } 2502 }
2498 #ifdef TO_FILE 2503 #ifdef TO_FILE
2499 ss->num_observed_peers++; 2504 sub->num_observed_peers++;
2500 GNUNET_CONTAINER_multipeermap_put 2505 GNUNET_CONTAINER_multipeermap_put
2501 (ss->observed_unique_peers, 2506 (sub->observed_unique_peers,
2502 peer, 2507 peer,
2503 NULL, 2508 NULL,
2504 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 2509 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2505 uint32_t num_observed_unique_peers = 2510 uint32_t num_observed_unique_peers =
2506 GNUNET_CONTAINER_multipeermap_size (ss->observed_unique_peers); 2511 GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2507 to_file (ss->file_name_observed_log, 2512 to_file (sub->file_name_observed_log,
2508 "%" PRIu32 " %" PRIu32 " %f\n", 2513 "%" PRIu32 " %" PRIu32 " %f\n",
2509 ss->num_observed_peers, 2514 sub->num_observed_peers,
2510 num_observed_unique_peers, 2515 num_observed_unique_peers,
2511 1.0*num_observed_unique_peers/ss->num_observed_peers) 2516 1.0*num_observed_unique_peers/sub->num_observed_peers)
2512 #endif /* TO_FILE */ 2517 #endif /* TO_FILE */
2513} 2518}
2514 2519
@@ -2520,20 +2525,20 @@ insert_in_sampler (void *cls,
2520 * 2525 *
2521 * "External sources" refer to every source except the gossip. 2526 * "External sources" refer to every source except the gossip.
2522 * 2527 *
2523 * @param ss SubSampler for which @a peer was received 2528 * @param sub Sub for which @a peer was received
2524 * @param peer peer to insert/peer received 2529 * @param peer peer to insert/peer received
2525 */ 2530 */
2526static void 2531static void
2527got_peer (struct SubSampler *ss, 2532got_peer (struct Sub *sub,
2528 const struct GNUNET_PeerIdentity *peer) 2533 const struct GNUNET_PeerIdentity *peer)
2529{ 2534{
2530 /* If we did not know this peer already, insert it into sampler and view */ 2535 /* If we did not know this peer already, insert it into sampler and view */
2531 if (GNUNET_YES == issue_peer_online_check (ss, peer)) 2536 if (GNUNET_YES == issue_peer_online_check (sub, peer))
2532 { 2537 {
2533 schedule_operation (get_peer_ctx (ss->peer_map, peer), 2538 schedule_operation (get_peer_ctx (sub->peer_map, peer),
2534 &insert_in_sampler, ss); 2539 &insert_in_sampler, sub);
2535 schedule_operation (get_peer_ctx (ss->peer_map, peer), 2540 schedule_operation (get_peer_ctx (sub->peer_map, peer),
2536 &insert_in_view_op, ss); 2541 &insert_in_view_op, sub);
2537 } 2542 }
2538 GNUNET_STATISTICS_update (stats, 2543 GNUNET_STATISTICS_update (stats,
2539 "# learnd peers", 2544 "# learnd peers",
@@ -2553,22 +2558,22 @@ static int
2553check_sending_channel_needed (const struct PeerContext *peer_ctx) 2558check_sending_channel_needed (const struct PeerContext *peer_ctx)
2554{ 2559{
2555 /* struct GNUNET_CADET_Channel *channel; */ 2560 /* struct GNUNET_CADET_Channel *channel; */
2556 if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map, 2561 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2557 &peer_ctx->peer_id)) 2562 &peer_ctx->peer_id))
2558 { 2563 {
2559 return GNUNET_NO; 2564 return GNUNET_NO;
2560 } 2565 }
2561 if (GNUNET_YES == check_sending_channel_exists (peer_ctx)) 2566 if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2562 { 2567 {
2563 if ( (0 < RPS_sampler_count_id (peer_ctx->ss->sampler, 2568 if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2564 &peer_ctx->peer_id)) || 2569 &peer_ctx->peer_id)) ||
2565 (GNUNET_YES == View_contains_peer (peer_ctx->ss->view, 2570 (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2566 &peer_ctx->peer_id)) || 2571 &peer_ctx->peer_id)) ||
2567 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->ss->push_map, 2572 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2568 &peer_ctx->peer_id)) || 2573 &peer_ctx->peer_id)) ||
2569 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->ss->pull_map, 2574 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2570 &peer_ctx->peer_id)) || 2575 &peer_ctx->peer_id)) ||
2571 (GNUNET_YES == check_peer_flag (peer_ctx->ss->peer_map, 2576 (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2572 &peer_ctx->peer_id, 2577 &peer_ctx->peer_id,
2573 Peers_PULL_REPLY_PENDING))) 2578 Peers_PULL_REPLY_PENDING)))
2574 { /* If we want to keep the connection to peer open */ 2579 { /* If we want to keep the connection to peer open */
@@ -2584,18 +2589,18 @@ check_sending_channel_needed (const struct PeerContext *peer_ctx)
2584 * @brief remove peer from our knowledge, the view, push and pull maps and 2589 * @brief remove peer from our knowledge, the view, push and pull maps and
2585 * samplers. 2590 * samplers.
2586 * 2591 *
2587 * @param ss SubSampler with the data structures the peer is to be removed from 2592 * @param sub Sub with the data structures the peer is to be removed from
2588 * @param peer the peer to remove 2593 * @param peer the peer to remove
2589 */ 2594 */
2590static void 2595static void
2591remove_peer (struct SubSampler *ss, 2596remove_peer (struct Sub *sub,
2592 const struct GNUNET_PeerIdentity *peer) 2597 const struct GNUNET_PeerIdentity *peer)
2593{ 2598{
2594 (void) View_remove_peer (ss->view, peer); 2599 (void) View_remove_peer (sub->view, peer);
2595 CustomPeerMap_remove_peer (ss->pull_map, peer); 2600 CustomPeerMap_remove_peer (sub->pull_map, peer);
2596 CustomPeerMap_remove_peer (ss->push_map, peer); 2601 CustomPeerMap_remove_peer (sub->push_map, peer);
2597 RPS_sampler_reinitialise_by_value (ss->sampler, peer); 2602 RPS_sampler_reinitialise_by_value (sub->sampler, peer);
2598 destroy_peer (get_peer_ctx (ss->peer_map, peer)); 2603 destroy_peer (get_peer_ctx (sub->peer_map, peer));
2599} 2604}
2600 2605
2601 2606
@@ -2604,14 +2609,14 @@ remove_peer (struct SubSampler *ss,
2604 * 2609 *
2605 * If the sending channel is no longer needed it is destroyed. 2610 * If the sending channel is no longer needed it is destroyed.
2606 * 2611 *
2607 * @param ss SubSamper in which the current peer is to be cleaned 2612 * @param sub Sub in which the current peer is to be cleaned
2608 * @param peer the peer whose data is about to be cleaned 2613 * @param peer the peer whose data is about to be cleaned
2609 */ 2614 */
2610static void 2615static void
2611clean_peer (struct SubSampler *ss, 2616clean_peer (struct Sub *sub,
2612 const struct GNUNET_PeerIdentity *peer) 2617 const struct GNUNET_PeerIdentity *peer)
2613{ 2618{
2614 if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (ss->peer_map, 2619 if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2615 peer))) 2620 peer)))
2616 { 2621 {
2617 LOG (GNUNET_ERROR_TYPE_DEBUG, 2622 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -2619,24 +2624,24 @@ clean_peer (struct SubSampler *ss,
2619 GNUNET_i2s (peer)); 2624 GNUNET_i2s (peer));
2620 #ifdef ENABLE_MALICIOUS 2625 #ifdef ENABLE_MALICIOUS
2621 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) 2626 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2622 (void) destroy_sending_channel (get_peer_ctx (ss->peer_map, peer)); 2627 (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer));
2623 #else /* ENABLE_MALICIOUS */ 2628 #else /* ENABLE_MALICIOUS */
2624 (void) destroy_sending_channel (get_peer_ctx (ss->peer_map, peer)); 2629 (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer));
2625 #endif /* ENABLE_MALICIOUS */ 2630 #endif /* ENABLE_MALICIOUS */
2626 } 2631 }
2627 2632
2628 if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (ss->peer_map, 2633 if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2629 peer))) && 2634 peer))) &&
2630 (GNUNET_NO == View_contains_peer (ss->view, peer)) && 2635 (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2631 (GNUNET_NO == CustomPeerMap_contains_peer (ss->push_map, peer)) && 2636 (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2632 (GNUNET_NO == CustomPeerMap_contains_peer (ss->push_map, peer)) && 2637 (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2633 (0 == RPS_sampler_count_id (ss->sampler, peer)) && 2638 (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2634 (GNUNET_NO != check_removable (get_peer_ctx (ss->peer_map, peer))) ) 2639 (GNUNET_NO != check_removable (get_peer_ctx (sub->peer_map, peer))) )
2635 { /* We can safely remove this peer */ 2640 { /* We can safely remove this peer */
2636 LOG (GNUNET_ERROR_TYPE_DEBUG, 2641 LOG (GNUNET_ERROR_TYPE_DEBUG,
2637 "Going to remove peer %s\n", 2642 "Going to remove peer %s\n",
2638 GNUNET_i2s (peer)); 2643 GNUNET_i2s (peer));
2639 remove_peer (ss, peer); 2644 remove_peer (sub, peer);
2640 return; 2645 return;
2641 } 2646 }
2642} 2647}
@@ -2666,7 +2671,7 @@ cleanup_destroyed_channel (void *cls,
2666 if (NULL != peer_ctx && 2671 if (NULL != peer_ctx &&
2667 peer_ctx->send_channel_ctx == channel_ctx) 2672 peer_ctx->send_channel_ctx == channel_ctx)
2668 { 2673 {
2669 remove_peer (peer_ctx->ss, &peer_ctx->peer_id); 2674 remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2670 } 2675 }
2671} 2676}
2672 2677
@@ -2677,35 +2682,30 @@ cleanup_destroyed_channel (void *cls,
2677 2682
2678 2683
2679/*********************************************************************** 2684/***********************************************************************
2680 * SubSampler 2685 * Sub
2681***********************************************************************/ 2686***********************************************************************/
2682 2687
2683/** 2688/**
2684 * @brief Create a new SUbSampler 2689 * @brief Create a new Sub
2685 * 2690 *
2686 * @param shared_value Value shared among rps instances on other hosts that 2691 * @param hash Hash of value shared among rps instances on other hosts that
2687 * defines a subgroup to sample from. 2692 * defines a subgroup to sample from.
2688 * @param sampler_size Size of the sampler 2693 * @param sampler_size Size of the sampler
2689 * @param round_interval Interval (in average) between two rounds 2694 * @param round_interval Interval (in average) between two rounds
2690 * 2695 *
2691 * @return SubSampler 2696 * @return Sub
2692 */ 2697 */
2693struct SubSampler * 2698struct Sub *
2694new_subsampler (const char *shared_value, 2699new_sub (const struct GNUNET_HashCode *hash,
2695 uint32_t sampler_size, 2700 uint32_t sampler_size,
2696 struct GNUNET_TIME_Relative round_interval) 2701 struct GNUNET_TIME_Relative round_interval)
2697{ 2702{
2698 struct SubSampler *ss; 2703 struct Sub *sub;
2699 char hash_port_string[512] = GNUNET_APPLICATION_PORT_RPS;
2700 2704
2701 ss = GNUNET_new (struct SubSampler); 2705 sub = GNUNET_new (struct Sub);
2702 2706
2703 /* With the hash generated from the secret value this service only connects 2707 /* With the hash generated from the secret value this service only connects
2704 * to rps instances that share the value */ 2708 * to rps instances that share the value */
2705 strcat (hash_port_string, shared_value);
2706 GNUNET_CRYPTO_hash (hash_port_string,
2707 strlen (hash_port_string),
2708 &ss->port);
2709 struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 2709 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2710 GNUNET_MQ_hd_fixed_size (peer_check, 2710 GNUNET_MQ_hd_fixed_size (peer_check,
2711 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE, 2711 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
@@ -2725,17 +2725,16 @@ new_subsampler (const char *shared_value,
2725 NULL), 2725 NULL),
2726 GNUNET_MQ_handler_end () 2726 GNUNET_MQ_handler_end ()
2727 }; 2727 };
2728 ss->cadet_handle = GNUNET_CADET_connect (cfg); 2728 sub->hash = *hash;
2729 GNUNET_assert (NULL != ss->cadet_handle); 2729 sub->cadet_port =
2730 ss->cadet_port = 2730 GNUNET_CADET_open_port (cadet_handle,
2731 GNUNET_CADET_open_port (ss->cadet_handle, 2731 &sub->hash,
2732 &ss->port,
2733 &handle_inbound_channel, /* Connect handler */ 2732 &handle_inbound_channel, /* Connect handler */
2734 ss, /* cls */ 2733 sub, /* cls */
2735 NULL, /* WindowSize handler */ 2734 NULL, /* WindowSize handler */
2736 &cleanup_destroyed_channel, /* Disconnect handler */ 2735 &cleanup_destroyed_channel, /* Disconnect handler */
2737 cadet_handlers); 2736 cadet_handlers);
2738 if (NULL == ss->cadet_port) 2737 if (NULL == sub->cadet_port)
2739 { 2738 {
2740 LOG (GNUNET_ERROR_TYPE_ERROR, 2739 LOG (GNUNET_ERROR_TYPE_ERROR,
2741 "Cadet port `%s' is already in use.\n", 2740 "Cadet port `%s' is already in use.\n",
@@ -2744,53 +2743,98 @@ new_subsampler (const char *shared_value,
2744 } 2743 }
2745 2744
2746 /* Set up general data structure to keep track about peers */ 2745 /* Set up general data structure to keep track about peers */
2747 ss->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); 2746 sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2748 if (GNUNET_OK != 2747 if (GNUNET_OK !=
2749 GNUNET_CONFIGURATION_get_value_filename (cfg, 2748 GNUNET_CONFIGURATION_get_value_filename (cfg,
2750 "rps", 2749 "rps",
2751 "FILENAME_VALID_PEERS", 2750 "FILENAME_VALID_PEERS",
2752 &ss->filename_valid_peers)) 2751 &sub->filename_valid_peers))
2753 { 2752 {
2754 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 2753 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2755 "rps", 2754 "rps",
2756 "FILENAME_VALID_PEERS"); 2755 "FILENAME_VALID_PEERS");
2757 } 2756 }
2758 ss->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); 2757 sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2759 2758
2760 /* Set up the sampler */ 2759 /* Set up the sampler */
2761 ss->sampler_size_est_min = sampler_size; 2760 sub->sampler_size_est_min = sampler_size;
2762 ss->sampler_size_est_need = sampler_size;; 2761 sub->sampler_size_est_need = sampler_size;;
2763 LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", ss->sampler_size_est_min); 2762 LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2764 ss->round_interval = round_interval; 2763 GNUNET_assert (0 != round_interval.rel_value_us);
2765 ss->sampler = RPS_sampler_init (sampler_size, 2764 sub->round_interval = round_interval;
2765 sub->sampler = RPS_sampler_init (sampler_size,
2766 round_interval); 2766 round_interval);
2767 2767
2768 /* Logging of internals */ 2768 /* Logging of internals */
2769 ss->file_name_view_log = store_prefix_file_name (&own_identity, "view"); 2769 sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2770 #ifdef TO_FILE 2770 #ifdef TO_FILE
2771 ss->file_name_observed_log = store_prefix_file_name (&own_identity, 2771 sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2772 "observed"); 2772 "observed");
2773 ss->num_observed_peers = 0; 2773 sub->num_observed_peers = 0;
2774 ss->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, 2774 sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2775 GNUNET_NO); 2775 GNUNET_NO);
2776 #endif /* TO_FILE */ 2776 #endif /* TO_FILE */
2777 2777
2778 /* Set up data structures for gossip */ 2778 /* Set up data structures for gossip */
2779 ss->push_map = CustomPeerMap_create (4); 2779 sub->push_map = CustomPeerMap_create (4);
2780 ss->pull_map = CustomPeerMap_create (4); 2780 sub->pull_map = CustomPeerMap_create (4);
2781 ss->view_size_est_min = sampler_size;; 2781 sub->view_size_est_min = sampler_size;;
2782 ss->view = View_create (ss->view_size_est_min); 2782 sub->view = View_create (sub->view_size_est_min);
2783 GNUNET_STATISTICS_set (stats, 2783 GNUNET_STATISTICS_set (stats,
2784 "view size aim", 2784 "view size aim",
2785 ss->view_size_est_min, 2785 sub->view_size_est_min,
2786 GNUNET_NO); 2786 GNUNET_NO);
2787 2787
2788 /* Start executing rounds */
2789 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2788 2790
2789 return ss; 2791 return sub;
2790} 2792}
2791 2793
2794
2795/**
2796 * @brief Destroy Sub.
2797 *
2798 * @param sub Sub to destroy
2799 */
2800static void
2801destroy_sub (struct Sub *sub)
2802{
2803 GNUNET_assert (NULL != sub);
2804 GNUNET_assert (NULL != sub->do_round_task);
2805 GNUNET_SCHEDULER_cancel (sub->do_round_task);
2806 sub->do_round_task = NULL;
2807
2808 /* Disconnect from cadet */
2809 GNUNET_CADET_close_port (sub->cadet_port);
2810
2811 /* Clean up data structures for peers */
2812 RPS_sampler_destroy (sub->sampler);
2813 sub->sampler = NULL;
2814 View_destroy (sub->view);
2815 sub->view = NULL;
2816 CustomPeerMap_destroy (sub->push_map);
2817 sub->push_map = NULL;
2818 CustomPeerMap_destroy (sub->pull_map);
2819 sub->pull_map = NULL;
2820 peers_terminate (sub);
2821
2822 /* Free leftover data structures */
2823 GNUNET_free (sub->file_name_view_log);
2824 sub->file_name_view_log = NULL;
2825#ifdef TO_FILE
2826 GNUNET_free (sub->file_name_observed_log);
2827 sub->file_name_observed_log = NULL;
2828 GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
2829 sub->observed_unique_peers = NULL;
2830#endif /* TO_FILE */
2831
2832 GNUNET_free (sub);
2833}
2834
2835
2792/*********************************************************************** 2836/***********************************************************************
2793 * /SubSampler 2837 * /Sub
2794***********************************************************************/ 2838***********************************************************************/
2795 2839
2796 2840
@@ -2806,58 +2850,88 @@ destroy_cli_ctx (struct ClientContext *cli_ctx)
2806 GNUNET_CONTAINER_DLL_remove (cli_ctx_head, 2850 GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2807 cli_ctx_tail, 2851 cli_ctx_tail,
2808 cli_ctx); 2852 cli_ctx);
2853 if (NULL != cli_ctx->sub)
2854 {
2855 destroy_sub (cli_ctx->sub);
2856 cli_ctx->sub = NULL;
2857 }
2809 GNUNET_free (cli_ctx); 2858 GNUNET_free (cli_ctx);
2810} 2859}
2811 2860
2812 2861
2813/** 2862/**
2814 * Function called by NSE. 2863 * @brief Update sizes in sampler and view on estimate update from nse service
2815 * 2864 *
2816 * Updates sizes of sampler list and view and adapt those lists 2865 * @param sub Sub
2817 * accordingly.
2818 *
2819 * implements #GNUNET_NSE_Callback
2820 *
2821 * @param cls Closure - SubSampler
2822 * @param timestamp time when the estimate was received from the server (or created by the server)
2823 * @param logestimate the log(Base 2) value of the current network size estimate 2866 * @param logestimate the log(Base 2) value of the current network size estimate
2824 * @param std_dev standard deviation for the estimate 2867 * @param std_dev standard deviation for the estimate
2825 */ 2868 */
2826static void 2869static void
2827nse_callback (void *cls, 2870adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
2828 struct GNUNET_TIME_Absolute timestamp,
2829 double logestimate, double std_dev)
2830{ 2871{
2831 double estimate; 2872 double estimate;
2832 //double scale; // TODO this might go gloabal/config 2873 //double scale; // TODO this might go gloabal/config
2833 struct SubSampler *ss = cls;
2834 (void) timestamp;
2835 2874
2836 LOG (GNUNET_ERROR_TYPE_DEBUG, 2875 LOG (GNUNET_ERROR_TYPE_DEBUG,
2837 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n", 2876 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2838 logestimate, std_dev, RPS_sampler_get_size (ss->sampler)); 2877 logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
2839 //scale = .01; 2878 //scale = .01;
2840 estimate = GNUNET_NSE_log_estimate_to_n (logestimate); 2879 estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2841 // GNUNET_NSE_log_estimate_to_n (logestimate); 2880 // GNUNET_NSE_log_estimate_to_n (logestimate);
2842 estimate = pow (estimate, 1.0 / 3); 2881 estimate = pow (estimate, 1.0 / 3);
2843 // TODO add if std_dev is a number 2882 // TODO add if std_dev is a number
2844 // estimate += (std_dev * scale); 2883 // estimate += (std_dev * scale);
2845 if (ss->view_size_est_min < ceil (estimate)) 2884 if (sub->view_size_est_min < ceil (estimate))
2846 { 2885 {
2847 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate); 2886 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2848 ss->sampler_size_est_need = estimate; 2887 sub->sampler_size_est_need = estimate;
2849 ss->view_size_est_need = estimate; 2888 sub->view_size_est_need = estimate;
2850 } else 2889 } else
2851 { 2890 {
2852 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate); 2891 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2853 //ss->sampler_size_est_need = ss->view_size_est_min; 2892 //sub->sampler_size_est_need = sub->view_size_est_min;
2854 ss->view_size_est_need = ss->view_size_est_min; 2893 sub->view_size_est_need = sub->view_size_est_min;
2855 } 2894 }
2856 GNUNET_STATISTICS_set (stats, "view size aim", ss->view_size_est_need, GNUNET_NO); 2895 GNUNET_STATISTICS_set (stats, "view size aim", sub->view_size_est_need, GNUNET_NO);
2857 2896
2858 /* If the NSE has changed adapt the lists accordingly */ 2897 /* If the NSE has changed adapt the lists accordingly */
2859 resize_wrapper (ss->sampler, ss->sampler_size_est_need); 2898 resize_wrapper (sub->sampler, sub->sampler_size_est_need);
2860 View_change_len (ss->view, ss->view_size_est_need); 2899 View_change_len (sub->view, sub->view_size_est_need);
2900}
2901
2902
2903/**
2904 * Function called by NSE.
2905 *
2906 * Updates sizes of sampler list and view and adapt those lists
2907 * accordingly.
2908 *
2909 * implements #GNUNET_NSE_Callback
2910 *
2911 * @param cls Closure - unused
2912 * @param timestamp time when the estimate was received from the server (or created by the server)
2913 * @param logestimate the log(Base 2) value of the current network size estimate
2914 * @param std_dev standard deviation for the estimate
2915 */
2916static void
2917nse_callback (void *cls,
2918 struct GNUNET_TIME_Absolute timestamp,
2919 double logestimate, double std_dev)
2920{
2921 (void) cls;
2922 (void) timestamp;
2923 struct ClientContext *cli_ctx_iter;
2924
2925 adapt_sizes (msub, logestimate, std_dev);
2926 for (cli_ctx_iter = cli_ctx_head;
2927 NULL != cli_ctx_iter;
2928 cli_ctx_iter = cli_ctx_iter->next)
2929 {
2930 if (NULL != cli_ctx_iter->sub)
2931 {
2932 adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
2933 }
2934 }
2861} 2935}
2862 2936
2863 2937
@@ -2881,6 +2955,10 @@ check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2881 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || 2955 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2882 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) 2956 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2883 { 2957 {
2958 LOG (GNUNET_ERROR_TYPE_ERROR,
2959 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
2960 ntohl (msg->num_peers),
2961 (msize / sizeof (struct GNUNET_PeerIdentity)));
2884 GNUNET_break (0); 2962 GNUNET_break (0);
2885 GNUNET_SERVICE_client_drop (cli_ctx->client); 2963 GNUNET_SERVICE_client_drop (cli_ctx->client);
2886 return GNUNET_SYSERR; 2964 return GNUNET_SYSERR;
@@ -2918,7 +2996,8 @@ handle_client_seed (void *cls,
2918 i, 2996 i,
2919 GNUNET_i2s (&peers[i])); 2997 GNUNET_i2s (&peers[i]));
2920 2998
2921 got_peer (cli_ctx->ss, &peers[i]); 2999 if (NULL != msub) got_peer (msub, &peers[i]);
3000 if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
2922 } 3001 }
2923 GNUNET_SERVICE_client_continue (cli_ctx->client); 3002 GNUNET_SERVICE_client_continue (cli_ctx->client);
2924} 3003}
@@ -3013,15 +3092,63 @@ handle_client_stream_cancel (void *cls,
3013 (void) msg; 3092 (void) msg;
3014 3093
3015 LOG (GNUNET_ERROR_TYPE_DEBUG, 3094 LOG (GNUNET_ERROR_TYPE_DEBUG,
3016 "Client requested peers from biased stream.\n"); 3095 "Client canceled receiving peers from biased stream.\n");
3017 cli_ctx->stream_update = GNUNET_NO; 3096 cli_ctx->stream_update = GNUNET_NO;
3018 3097
3019 GNUNET_assert (NULL != cli_ctx); 3098 GNUNET_assert (NULL != cli_ctx);
3020 GNUNET_SERVICE_client_continue (cli_ctx->client); 3099 GNUNET_SERVICE_client_continue (cli_ctx->client);
3021 if (0 == cli_ctx->view_updates_left) 3100}
3101
3102
3103/**
3104 * @brief Create and start a Sub.
3105 *
3106 * @param cls Closure - unused
3107 * @param msg Message containing the necessary information
3108 */
3109static void
3110handle_client_start_sub (void *cls,
3111 const struct GNUNET_RPS_CS_SubStartMessage *msg)
3112{
3113 struct ClientContext *cli_ctx = cls;
3114
3115 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3116 if (NULL != cli_ctx->sub &&
3117 0 != memcmp (&cli_ctx->sub->hash,
3118 &msg->hash,
3119 sizeof (struct GNUNET_HashCode)))
3022 { 3120 {
3023 destroy_cli_ctx (cli_ctx); 3121 LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3122 destroy_sub (cli_ctx->sub);
3123 cli_ctx->sub = NULL;
3024 } 3124 }
3125 cli_ctx->sub = new_sub (&msg->hash,
3126 msub->sampler_size_est_min, // TODO make api input?
3127 GNUNET_TIME_relative_ntoh (msg->round_interval));
3128 GNUNET_SERVICE_client_continue (cli_ctx->client);
3129}
3130
3131
3132/**
3133 * @brief Destroy the Sub
3134 *
3135 * @param cls Closure - unused
3136 * @param msg Message containing the hash that identifies the Sub
3137 */
3138static void
3139handle_client_stop_sub (void *cls,
3140 const struct GNUNET_RPS_CS_SubStopMessage *msg)
3141{
3142 struct ClientContext *cli_ctx = cls;
3143
3144 GNUNET_assert (NULL != cli_ctx->sub);
3145 if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3146 {
3147 LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3148 }
3149 destroy_sub (cli_ctx->sub);
3150 cli_ctx->sub = NULL;
3151 GNUNET_SERVICE_client_continue (cli_ctx->client);
3025} 3152}
3026 3153
3027 3154
@@ -3109,9 +3236,9 @@ handle_peer_push (void *cls,
3109 #endif /* ENABLE_MALICIOUS */ 3236 #endif /* ENABLE_MALICIOUS */
3110 3237
3111 /* Add the sending peer to the push_map */ 3238 /* Add the sending peer to the push_map */
3112 CustomPeerMap_put (channel_ctx->peer_ctx->ss->push_map, peer); 3239 CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3113 3240
3114 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map, 3241 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3115 &channel_ctx->peer_ctx->peer_id)); 3242 &channel_ctx->peer_ctx->peer_id));
3116 GNUNET_CADET_receive_done (channel_ctx->channel); 3243 GNUNET_CADET_receive_done (channel_ctx->channel);
3117} 3244}
@@ -3154,13 +3281,13 @@ handle_peer_pull_request (void *cls,
3154 } 3281 }
3155 #endif /* ENABLE_MALICIOUS */ 3282 #endif /* ENABLE_MALICIOUS */
3156 3283
3157 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map, 3284 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3158 &channel_ctx->peer_ctx->peer_id)); 3285 &channel_ctx->peer_ctx->peer_id));
3159 GNUNET_CADET_receive_done (channel_ctx->channel); 3286 GNUNET_CADET_receive_done (channel_ctx->channel);
3160 view_array = View_get_as_array (channel_ctx->peer_ctx->ss->view); 3287 view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3161 send_pull_reply (peer_ctx, 3288 send_pull_reply (peer_ctx,
3162 view_array, 3289 view_array,
3163 View_size (channel_ctx->peer_ctx->ss->view)); 3290 View_size (channel_ctx->peer_ctx->sub->view));
3164} 3291}
3165 3292
3166 3293
@@ -3196,7 +3323,7 @@ check_peer_pull_reply (void *cls,
3196 return GNUNET_SYSERR; 3323 return GNUNET_SYSERR;
3197 } 3324 }
3198 3325
3199 if (GNUNET_YES != check_peer_flag (sender_ctx->ss->peer_map, 3326 if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3200 &sender_ctx->peer_id, 3327 &sender_ctx->peer_id,
3201 Peers_PULL_REPLY_PENDING)) 3328 Peers_PULL_REPLY_PENDING))
3202 { 3329 {
@@ -3277,27 +3404,27 @@ handle_peer_pull_reply (void *cls,
3277 } 3404 }
3278 #endif /* ENABLE_MALICIOUS */ 3405 #endif /* ENABLE_MALICIOUS */
3279 /* Make sure we 'know' about this peer */ 3406 /* Make sure we 'know' about this peer */
3280 (void) insert_peer (channel_ctx->peer_ctx->ss, &peers[i]); 3407 (void) insert_peer (channel_ctx->peer_ctx->sub, &peers[i]);
3281 3408
3282 if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->ss->valid_peers, 3409 if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3283 &peers[i])) 3410 &peers[i]))
3284 { 3411 {
3285 CustomPeerMap_put (channel_ctx->peer_ctx->ss->pull_map, &peers[i]); 3412 CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map, &peers[i]);
3286 } 3413 }
3287 else 3414 else
3288 { 3415 {
3289 schedule_operation (channel_ctx->peer_ctx, 3416 schedule_operation (channel_ctx->peer_ctx,
3290 insert_in_pull_map, 3417 insert_in_pull_map,
3291 channel_ctx->peer_ctx->ss); /* cls */ 3418 channel_ctx->peer_ctx->sub); /* cls */
3292 (void) issue_peer_online_check (channel_ctx->peer_ctx->ss, &peers[i]); 3419 (void) issue_peer_online_check (channel_ctx->peer_ctx->sub, &peers[i]);
3293 } 3420 }
3294 } 3421 }
3295 3422
3296 UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->ss->peer_map, sender), 3423 UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map, sender),
3297 Peers_PULL_REPLY_PENDING); 3424 Peers_PULL_REPLY_PENDING);
3298 clean_peer (channel_ctx->peer_ctx->ss, sender); 3425 clean_peer (channel_ctx->peer_ctx->sub, sender);
3299 3426
3300 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map, 3427 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3301 sender)); 3428 sender));
3302 GNUNET_CADET_receive_done (channel_ctx->channel); 3429 GNUNET_CADET_receive_done (channel_ctx->channel);
3303} 3430}
@@ -3362,7 +3489,7 @@ send_pull_request (struct PeerContext *peer_ctx)
3362{ 3489{
3363 struct GNUNET_MQ_Envelope *ev; 3490 struct GNUNET_MQ_Envelope *ev;
3364 3491
3365 GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->ss->peer_map, 3492 GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3366 &peer_ctx->peer_id, 3493 &peer_ctx->peer_id,
3367 Peers_PULL_REPLY_PENDING)); 3494 Peers_PULL_REPLY_PENDING));
3368 SET_PEER_FLAG (peer_ctx, Peers_PULL_REPLY_PENDING); 3495 SET_PEER_FLAG (peer_ctx, Peers_PULL_REPLY_PENDING);
@@ -3397,12 +3524,6 @@ send_push (struct PeerContext *peer_ctx)
3397} 3524}
3398 3525
3399 3526
3400static void
3401do_round (void *cls);
3402
3403static void
3404do_mal_round (void *cls);
3405
3406#ifdef ENABLE_MALICIOUS 3527#ifdef ENABLE_MALICIOUS
3407 3528
3408 3529
@@ -3452,7 +3573,7 @@ handle_client_act_malicious (void *cls,
3452 struct GNUNET_PeerIdentity *peers; 3573 struct GNUNET_PeerIdentity *peers;
3453 uint32_t num_mal_peers_sent; 3574 uint32_t num_mal_peers_sent;
3454 uint32_t num_mal_peers_old; 3575 uint32_t num_mal_peers_old;
3455 struct SubSampler *ss = cli_ctx->ss; 3576 struct Sub *sub = cli_ctx->sub;
3456 3577
3457 /* Do actual logic */ 3578 /* Do actual logic */
3458 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 3579 peers = (struct GNUNET_PeerIdentity *) &msg[1];
@@ -3484,8 +3605,15 @@ handle_client_act_malicious (void *cls,
3484 mal_peer_set); 3605 mal_peer_set);
3485 3606
3486 /* Substitute do_round () with do_mal_round () */ 3607 /* Substitute do_round () with do_mal_round () */
3487 GNUNET_SCHEDULER_cancel (ss->do_round_task); 3608 if (NULL != sub)
3488 ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, ss); 3609 {
3610 GNUNET_SCHEDULER_cancel (sub->do_round_task);
3611 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3612 }
3613 else
3614 {
3615 LOG (GNUNET_ERROR_TYPE_WARNING, "do_round_task is NULL, probably in shutdown\n");
3616 }
3489 } 3617 }
3490 3618
3491 else if ( (2 == mal_type) || 3619 else if ( (2 == mal_type) ||
@@ -3517,9 +3645,10 @@ handle_client_act_malicious (void *cls,
3517 &msg->attacked_peer, 3645 &msg->attacked_peer,
3518 sizeof (struct GNUNET_PeerIdentity)); 3646 sizeof (struct GNUNET_PeerIdentity));
3519 /* Set the flag of the attacked peer to valid to avoid problems */ 3647 /* Set the flag of the attacked peer to valid to avoid problems */
3520 if (GNUNET_NO == check_peer_known (ss->peer_map, &attacked_peer)) 3648 if (NULL != sub &&
3649 GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
3521 { 3650 {
3522 (void) issue_peer_online_check (ss, &attacked_peer); 3651 (void) issue_peer_online_check (sub, &attacked_peer);
3523 } 3652 }
3524 3653
3525 LOG (GNUNET_ERROR_TYPE_DEBUG, 3654 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -3527,16 +3656,20 @@ handle_client_act_malicious (void *cls,
3527 GNUNET_i2s (&attacked_peer)); 3656 GNUNET_i2s (&attacked_peer));
3528 3657
3529 /* Substitute do_round () with do_mal_round () */ 3658 /* Substitute do_round () with do_mal_round () */
3530 GNUNET_SCHEDULER_cancel (ss->do_round_task); 3659 if (NULL != sub && NULL != sub->do_round_task)
3531 ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, ss); 3660 {
3661 /* Probably in shutdown */
3662 GNUNET_SCHEDULER_cancel (sub->do_round_task);
3663 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3664 }
3532 } 3665 }
3533 else if (0 == mal_type) 3666 else if (0 == mal_type)
3534 { /* Stop acting malicious */ 3667 { /* Stop acting malicious */
3535 GNUNET_array_grow (mal_peers, num_mal_peers, 0); 3668 GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3536 3669
3537 /* Substitute do_mal_round () with do_round () */ 3670 /* Substitute do_mal_round () with do_round () */
3538 GNUNET_SCHEDULER_cancel (ss->do_round_task); 3671 GNUNET_SCHEDULER_cancel (sub->do_round_task);
3539 ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, ss); 3672 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
3540 } 3673 }
3541 else 3674 else
3542 { 3675 {
@@ -3552,7 +3685,7 @@ handle_client_act_malicious (void *cls,
3552 * 3685 *
3553 * This is executed regylary. 3686 * This is executed regylary.
3554 * 3687 *
3555 * @param cls Closure - SubSamper 3688 * @param cls Closure - Sub
3556 */ 3689 */
3557static void 3690static void
3558do_mal_round (void *cls) 3691do_mal_round (void *cls)
@@ -3561,12 +3694,12 @@ do_mal_round (void *cls)
3561 uint32_t i; 3694 uint32_t i;
3562 struct GNUNET_TIME_Relative time_next_round; 3695 struct GNUNET_TIME_Relative time_next_round;
3563 struct AttackedPeer *tmp_att_peer; 3696 struct AttackedPeer *tmp_att_peer;
3564 struct SubSampler *ss = cls; 3697 struct Sub *sub = cls;
3565 3698
3566 LOG (GNUNET_ERROR_TYPE_DEBUG, 3699 LOG (GNUNET_ERROR_TYPE_DEBUG,
3567 "Going to execute next round maliciously type %" PRIu32 ".\n", 3700 "Going to execute next round maliciously type %" PRIu32 ".\n",
3568 mal_type); 3701 mal_type);
3569 ss->do_round_task = NULL; 3702 sub->do_round_task = NULL;
3570 GNUNET_assert (mal_type <= 3); 3703 GNUNET_assert (mal_type <= 3);
3571 /* Do malicious actions */ 3704 /* Do malicious actions */
3572 if (1 == mal_type) 3705 if (1 == mal_type)
@@ -3589,7 +3722,7 @@ do_mal_round (void *cls)
3589 else 3722 else
3590 att_peer_index = att_peer_index->next; 3723 att_peer_index = att_peer_index->next;
3591 3724
3592 send_push (get_peer_ctx (ss->peer_map, &att_peer_index->peer_id)); 3725 send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
3593 } 3726 }
3594 3727
3595 /* Send PULLs to some peers to learn about additional peers to attack */ 3728 /* Send PULLs to some peers to learn about additional peers to attack */
@@ -3601,7 +3734,7 @@ do_mal_round (void *cls)
3601 else 3734 else
3602 att_peer_index = tmp_att_peer->next; 3735 att_peer_index = tmp_att_peer->next;
3603 3736
3604 send_pull_request (get_peer_ctx (ss->peer_map, &tmp_att_peer->peer_id)); 3737 send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
3605 } 3738 }
3606 } 3739 }
3607 3740
@@ -3612,11 +3745,11 @@ do_mal_round (void *cls)
3612 * Send as many pushes to the attacked peer as possible 3745 * Send as many pushes to the attacked peer as possible
3613 * That is one push per round as it will ignore more. 3746 * That is one push per round as it will ignore more.
3614 */ 3747 */
3615 (void) issue_peer_online_check (ss, &attacked_peer); 3748 (void) issue_peer_online_check (sub, &attacked_peer);
3616 if (GNUNET_YES == check_peer_flag (ss->peer_map, 3749 if (GNUNET_YES == check_peer_flag (sub->peer_map,
3617 &attacked_peer, 3750 &attacked_peer,
3618 Peers_ONLINE)) 3751 Peers_ONLINE))
3619 send_push (get_peer_ctx (ss->peer_map, &attacked_peer)); 3752 send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
3620 } 3753 }
3621 3754
3622 3755
@@ -3624,20 +3757,20 @@ do_mal_round (void *cls)
3624 { /* Combined attack */ 3757 { /* Combined attack */
3625 3758
3626 /* Send PUSH to attacked peers */ 3759 /* Send PUSH to attacked peers */
3627 if (GNUNET_YES == check_peer_known (ss->peer_map, &attacked_peer)) 3760 if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
3628 { 3761 {
3629 (void) issue_peer_online_check (ss, &attacked_peer); 3762 (void) issue_peer_online_check (sub, &attacked_peer);
3630 if (GNUNET_YES == check_peer_flag (ss->peer_map, 3763 if (GNUNET_YES == check_peer_flag (sub->peer_map,
3631 &attacked_peer, 3764 &attacked_peer,
3632 Peers_ONLINE)) 3765 Peers_ONLINE))
3633 { 3766 {
3634 LOG (GNUNET_ERROR_TYPE_DEBUG, 3767 LOG (GNUNET_ERROR_TYPE_DEBUG,
3635 "Goding to send push to attacked peer (%s)\n", 3768 "Goding to send push to attacked peer (%s)\n",
3636 GNUNET_i2s (&attacked_peer)); 3769 GNUNET_i2s (&attacked_peer));
3637 send_push (get_peer_ctx (ss->peer_map, &attacked_peer)); 3770 send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
3638 } 3771 }
3639 } 3772 }
3640 (void) issue_peer_online_check (ss, &attacked_peer); 3773 (void) issue_peer_online_check (sub, &attacked_peer);
3641 3774
3642 /* The maximum of pushes we're going to send this round */ 3775 /* The maximum of pushes we're going to send this round */
3643 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1, 3776 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
@@ -3655,7 +3788,7 @@ do_mal_round (void *cls)
3655 else 3788 else
3656 att_peer_index = att_peer_index->next; 3789 att_peer_index = att_peer_index->next;
3657 3790
3658 send_push (get_peer_ctx (ss->peer_map, &att_peer_index->peer_id)); 3791 send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
3659 } 3792 }
3660 3793
3661 /* Send PULLs to some peers to learn about additional peers to attack */ 3794 /* Send PULLs to some peers to learn about additional peers to attack */
@@ -3667,18 +3800,16 @@ do_mal_round (void *cls)
3667 else 3800 else
3668 att_peer_index = tmp_att_peer->next; 3801 att_peer_index = tmp_att_peer->next;
3669 3802
3670 send_pull_request (get_peer_ctx (ss->peer_map, &tmp_att_peer->peer_id)); 3803 send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
3671 } 3804 }
3672 } 3805 }
3673 3806
3674 /* Schedule next round */ 3807 /* Schedule next round */
3675 time_next_round = compute_rand_delay (ss->round_interval, 2); 3808 time_next_round = compute_rand_delay (sub->round_interval, 2);
3676 3809
3677 //ss->do_round_task = GNUNET_SCHEDULER_add_delayed (ss->round_interval, &do_mal_round, 3810 GNUNET_assert (NULL == sub->do_round_task);
3678 //NULL); 3811 sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3679 GNUNET_assert (NULL == ss->do_round_task); 3812 &do_mal_round, sub);
3680 ss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3681 &do_mal_round, ss);
3682 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); 3813 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3683} 3814}
3684#endif /* ENABLE_MALICIOUS */ 3815#endif /* ENABLE_MALICIOUS */
@@ -3688,7 +3819,7 @@ do_mal_round (void *cls)
3688 * 3819 *
3689 * This is executed regylary. 3820 * This is executed regylary.
3690 * 3821 *
3691 * @param cls Closure - SubSampler 3822 * @param cls Closure - Sub
3692 */ 3823 */
3693static void 3824static void
3694do_round (void *cls) 3825do_round (void *cls)
@@ -3702,66 +3833,66 @@ do_round (void *cls)
3702 uint32_t second_border; 3833 uint32_t second_border;
3703 struct GNUNET_PeerIdentity peer; 3834 struct GNUNET_PeerIdentity peer;
3704 struct GNUNET_PeerIdentity *update_peer; 3835 struct GNUNET_PeerIdentity *update_peer;
3705 struct SubSampler *ss = cls; 3836 struct Sub *sub = cls;
3706 3837
3707 LOG (GNUNET_ERROR_TYPE_DEBUG, 3838 LOG (GNUNET_ERROR_TYPE_DEBUG,
3708 "Going to execute next round.\n"); 3839 "Going to execute next round.\n");
3709 GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO); 3840 GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3710 ss->do_round_task = NULL; 3841 sub->do_round_task = NULL;
3711 LOG (GNUNET_ERROR_TYPE_DEBUG, 3842 LOG (GNUNET_ERROR_TYPE_DEBUG,
3712 "Printing view:\n"); 3843 "Printing view:\n");
3713 to_file (ss->file_name_view_log, 3844 to_file (sub->file_name_view_log,
3714 "___ new round ___"); 3845 "___ new round ___");
3715 view_array = View_get_as_array (ss->view); 3846 view_array = View_get_as_array (sub->view);
3716 for (i = 0; i < View_size (ss->view); i++) 3847 for (i = 0; i < View_size (sub->view); i++)
3717 { 3848 {
3718 LOG (GNUNET_ERROR_TYPE_DEBUG, 3849 LOG (GNUNET_ERROR_TYPE_DEBUG,
3719 "\t%s\n", GNUNET_i2s (&view_array[i])); 3850 "\t%s\n", GNUNET_i2s (&view_array[i]));
3720 to_file (ss->file_name_view_log, 3851 to_file (sub->file_name_view_log,
3721 "=%s\t(do round)", 3852 "=%s\t(do round)",
3722 GNUNET_i2s_full (&view_array[i])); 3853 GNUNET_i2s_full (&view_array[i]));
3723 } 3854 }
3724 3855
3725 3856
3726 /* Send pushes and pull requests */ 3857 /* Send pushes and pull requests */
3727 if (0 < View_size (ss->view)) 3858 if (0 < View_size (sub->view))
3728 { 3859 {
3729 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 3860 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3730 View_size (ss->view)); 3861 View_size (sub->view));
3731 3862
3732 /* Send PUSHes */ 3863 /* Send PUSHes */
3733 a_peers = ceil (alpha * View_size (ss->view)); 3864 a_peers = ceil (alpha * View_size (sub->view));
3734 3865
3735 LOG (GNUNET_ERROR_TYPE_DEBUG, 3866 LOG (GNUNET_ERROR_TYPE_DEBUG,
3736 "Going to send pushes to %u (ceil (%f * %u)) peers.\n", 3867 "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3737 a_peers, alpha, View_size (ss->view)); 3868 a_peers, alpha, View_size (sub->view));
3738 for (i = 0; i < a_peers; i++) 3869 for (i = 0; i < a_peers; i++)
3739 { 3870 {
3740 peer = view_array[permut[i]]; 3871 peer = view_array[permut[i]];
3741 // FIXME if this fails schedule/loop this for later 3872 // FIXME if this fails schedule/loop this for later
3742 send_push (get_peer_ctx (ss->peer_map, &peer)); 3873 send_push (get_peer_ctx (sub->peer_map, &peer));
3743 } 3874 }
3744 3875
3745 /* Send PULL requests */ 3876 /* Send PULL requests */
3746 b_peers = ceil (beta * View_size (ss->view)); 3877 b_peers = ceil (beta * View_size (sub->view));
3747 first_border = a_peers; 3878 first_border = a_peers;
3748 second_border = a_peers + b_peers; 3879 second_border = a_peers + b_peers;
3749 if (second_border > View_size (ss->view)) 3880 if (second_border > View_size (sub->view))
3750 { 3881 {
3751 first_border = View_size (ss->view) - b_peers; 3882 first_border = View_size (sub->view) - b_peers;
3752 second_border = View_size (ss->view); 3883 second_border = View_size (sub->view);
3753 } 3884 }
3754 LOG (GNUNET_ERROR_TYPE_DEBUG, 3885 LOG (GNUNET_ERROR_TYPE_DEBUG,
3755 "Going to send pulls to %u (ceil (%f * %u)) peers.\n", 3886 "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3756 b_peers, beta, View_size (ss->view)); 3887 b_peers, beta, View_size (sub->view));
3757 for (i = first_border; i < second_border; i++) 3888 for (i = first_border; i < second_border; i++)
3758 { 3889 {
3759 peer = view_array[permut[i]]; 3890 peer = view_array[permut[i]];
3760 if ( GNUNET_NO == check_peer_flag (ss->peer_map, 3891 if ( GNUNET_NO == check_peer_flag (sub->peer_map,
3761 &peer, 3892 &peer,
3762 Peers_PULL_REPLY_PENDING)) 3893 Peers_PULL_REPLY_PENDING))
3763 { // FIXME if this fails schedule/loop this for later 3894 { // FIXME if this fails schedule/loop this for later
3764 send_pull_request (get_peer_ctx (ss->peer_map, &peer)); 3895 send_pull_request (get_peer_ctx (sub->peer_map, &peer));
3765 } 3896 }
3766 } 3897 }
3767 3898
@@ -3773,10 +3904,9 @@ do_round (void *cls)
3773 /* Update view */ 3904 /* Update view */
3774 /* TODO see how many peers are in push-/pull- list! */ 3905 /* TODO see how many peers are in push-/pull- list! */
3775 3906
3776 if ((CustomPeerMap_size (ss->push_map) <= alpha * ss->view_size_est_need) && 3907 if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
3777 (0 < CustomPeerMap_size (ss->push_map)) && 3908 (0 < CustomPeerMap_size (sub->push_map)) &&
3778 (0 < CustomPeerMap_size (ss->pull_map))) 3909 (0 < CustomPeerMap_size (sub->pull_map)))
3779 //if (GNUNET_YES) // disable blocking temporarily
3780 { /* If conditions for update are fulfilled, update */ 3910 { /* If conditions for update are fulfilled, update */
3781 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n"); 3911 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3782 3912
@@ -3788,23 +3918,23 @@ do_round (void *cls)
3788 peers_to_clean_size = 0; 3918 peers_to_clean_size = 0;
3789 GNUNET_array_grow (peers_to_clean, 3919 GNUNET_array_grow (peers_to_clean,
3790 peers_to_clean_size, 3920 peers_to_clean_size,
3791 View_size (ss->view)); 3921 View_size (sub->view));
3792 GNUNET_memcpy (peers_to_clean, 3922 GNUNET_memcpy (peers_to_clean,
3793 view_array, 3923 view_array,
3794 View_size (ss->view) * sizeof (struct GNUNET_PeerIdentity)); 3924 View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
3795 3925
3796 /* Seems like recreating is the easiest way of emptying the peermap */ 3926 /* Seems like recreating is the easiest way of emptying the peermap */
3797 View_clear (ss->view); 3927 View_clear (sub->view);
3798 to_file (ss->file_name_view_log, 3928 to_file (sub->file_name_view_log,
3799 "--- emptied ---"); 3929 "--- emptied ---");
3800 3930
3801 first_border = GNUNET_MIN (ceil (alpha * ss->view_size_est_need), 3931 first_border = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
3802 CustomPeerMap_size (ss->push_map)); 3932 CustomPeerMap_size (sub->push_map));
3803 second_border = first_border + 3933 second_border = first_border +
3804 GNUNET_MIN (floor (beta * ss->view_size_est_need), 3934 GNUNET_MIN (floor (beta * sub->view_size_est_need),
3805 CustomPeerMap_size (ss->pull_map)); 3935 CustomPeerMap_size (sub->pull_map));
3806 final_size = second_border + 3936 final_size = second_border +
3807 ceil ((1 - (alpha + beta)) * ss->view_size_est_need); 3937 ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
3808 LOG (GNUNET_ERROR_TYPE_DEBUG, 3938 LOG (GNUNET_ERROR_TYPE_DEBUG,
3809 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n", 3939 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3810 first_border, 3940 first_border,
@@ -3813,19 +3943,20 @@ do_round (void *cls)
3813 3943
3814 /* Update view with peers received through PUSHes */ 3944 /* Update view with peers received through PUSHes */
3815 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 3945 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3816 CustomPeerMap_size (ss->push_map)); 3946 CustomPeerMap_size (sub->push_map));
3817 for (i = 0; i < first_border; i++) 3947 for (i = 0; i < first_border; i++)
3818 { 3948 {
3819 int inserted; 3949 int inserted;
3820 inserted = insert_in_view (ss, 3950 inserted = insert_in_view (sub,
3821 CustomPeerMap_get_peer_by_index (ss->push_map, 3951 CustomPeerMap_get_peer_by_index (sub->push_map,
3822 permut[i])); 3952 permut[i]));
3823 if (GNUNET_OK == inserted) 3953 if (GNUNET_OK == inserted)
3824 { 3954 {
3825 clients_notify_stream_peer (1, 3955 clients_notify_stream_peer (sub,
3826 CustomPeerMap_get_peer_by_index (ss->push_map, permut[i])); 3956 1,
3957 CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
3827 } 3958 }
3828 to_file (ss->file_name_view_log, 3959 to_file (sub->file_name_view_log,
3829 "+%s\t(push list)", 3960 "+%s\t(push list)",
3830 GNUNET_i2s_full (&view_array[i])); 3961 GNUNET_i2s_full (&view_array[i]));
3831 // TODO change the peer_flags accordingly 3962 // TODO change the peer_flags accordingly
@@ -3835,20 +3966,21 @@ do_round (void *cls)
3835 3966
3836 /* Update view with peers received through PULLs */ 3967 /* Update view with peers received through PULLs */
3837 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 3968 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3838 CustomPeerMap_size (ss->pull_map)); 3969 CustomPeerMap_size (sub->pull_map));
3839 for (i = first_border; i < second_border; i++) 3970 for (i = first_border; i < second_border; i++)
3840 { 3971 {
3841 int inserted; 3972 int inserted;
3842 inserted = insert_in_view (ss, 3973 inserted = insert_in_view (sub,
3843 CustomPeerMap_get_peer_by_index (ss->pull_map, 3974 CustomPeerMap_get_peer_by_index (sub->pull_map,
3844 permut[i - first_border])); 3975 permut[i - first_border]));
3845 if (GNUNET_OK == inserted) 3976 if (GNUNET_OK == inserted)
3846 { 3977 {
3847 clients_notify_stream_peer (1, 3978 clients_notify_stream_peer (sub,
3848 CustomPeerMap_get_peer_by_index (ss->pull_map, 3979 1,
3980 CustomPeerMap_get_peer_by_index (sub->pull_map,
3849 permut[i - first_border])); 3981 permut[i - first_border]));
3850 } 3982 }
3851 to_file (ss->file_name_view_log, 3983 to_file (sub->file_name_view_log,
3852 "+%s\t(pull list)", 3984 "+%s\t(pull list)",
3853 GNUNET_i2s_full (&view_array[i])); 3985 GNUNET_i2s_full (&view_array[i]));
3854 // TODO change the peer_flags accordingly 3986 // TODO change the peer_flags accordingly
@@ -3857,106 +3989,106 @@ do_round (void *cls)
3857 permut = NULL; 3989 permut = NULL;
3858 3990
3859 /* Update view with peers from history */ 3991 /* Update view with peers from history */
3860 RPS_sampler_get_n_rand_peers (ss->sampler, 3992 RPS_sampler_get_n_rand_peers (sub->sampler,
3861 final_size - second_border, 3993 final_size - second_border,
3862 hist_update, 3994 hist_update,
3863 ss); 3995 sub);
3864 // TODO change the peer_flags accordingly 3996 // TODO change the peer_flags accordingly
3865 3997
3866 for (i = 0; i < View_size (ss->view); i++) 3998 for (i = 0; i < View_size (sub->view); i++)
3867 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]); 3999 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3868 4000
3869 /* Clean peers that were removed from the view */ 4001 /* Clean peers that were removed from the view */
3870 for (i = 0; i < peers_to_clean_size; i++) 4002 for (i = 0; i < peers_to_clean_size; i++)
3871 { 4003 {
3872 to_file (ss->file_name_view_log, 4004 to_file (sub->file_name_view_log,
3873 "-%s", 4005 "-%s",
3874 GNUNET_i2s_full (&peers_to_clean[i])); 4006 GNUNET_i2s_full (&peers_to_clean[i]));
3875 clean_peer (ss, &peers_to_clean[i]); 4007 clean_peer (sub, &peers_to_clean[i]);
3876 } 4008 }
3877 4009
3878 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0); 4010 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3879 clients_notify_view_update (ss); 4011 clients_notify_view_update (sub);
3880 } else { 4012 } else {
3881 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n"); 4013 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3882 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO); 4014 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3883 if (CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) && 4015 if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
3884 !(0 >= CustomPeerMap_size (ss->pull_map))) 4016 !(0 >= CustomPeerMap_size (sub->pull_map)))
3885 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO); 4017 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3886 if (CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) && 4018 if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
3887 (0 >= CustomPeerMap_size (ss->pull_map))) 4019 (0 >= CustomPeerMap_size (sub->pull_map)))
3888 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO); 4020 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3889 if (0 >= CustomPeerMap_size (ss->push_map) && 4021 if (0 >= CustomPeerMap_size (sub->push_map) &&
3890 !(0 >= CustomPeerMap_size (ss->pull_map))) 4022 !(0 >= CustomPeerMap_size (sub->pull_map)))
3891 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO); 4023 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3892 if (0 >= CustomPeerMap_size (ss->push_map) && 4024 if (0 >= CustomPeerMap_size (sub->push_map) &&
3893 (0 >= CustomPeerMap_size (ss->pull_map))) 4025 (0 >= CustomPeerMap_size (sub->pull_map)))
3894 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO); 4026 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3895 if (0 >= CustomPeerMap_size (ss->pull_map) && 4027 if (0 >= CustomPeerMap_size (sub->pull_map) &&
3896 CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) && 4028 CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
3897 0 >= CustomPeerMap_size (ss->push_map)) 4029 0 >= CustomPeerMap_size (sub->push_map))
3898 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO); 4030 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3899 } 4031 }
3900 // TODO independent of that also get some peers from CADET_get_peers()? 4032 // TODO independent of that also get some peers from CADET_get_peers()?
3901 GNUNET_STATISTICS_set (stats, 4033 GNUNET_STATISTICS_set (stats,
3902 "# peers in push map at end of round", 4034 "# peers in push map at end of round",
3903 CustomPeerMap_size (ss->push_map), 4035 CustomPeerMap_size (sub->push_map),
3904 GNUNET_NO); 4036 GNUNET_NO);
3905 GNUNET_STATISTICS_set (stats, 4037 GNUNET_STATISTICS_set (stats,
3906 "# peers in pull map at end of round", 4038 "# peers in pull map at end of round",
3907 CustomPeerMap_size (ss->pull_map), 4039 CustomPeerMap_size (sub->pull_map),
3908 GNUNET_NO); 4040 GNUNET_NO);
3909 GNUNET_STATISTICS_set (stats, 4041 GNUNET_STATISTICS_set (stats,
3910 "# peers in view at end of round", 4042 "# peers in view at end of round",
3911 View_size (ss->view), 4043 View_size (sub->view),
3912 GNUNET_NO); 4044 GNUNET_NO);
3913 4045
3914 LOG (GNUNET_ERROR_TYPE_DEBUG, 4046 LOG (GNUNET_ERROR_TYPE_DEBUG,
3915 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (ss->view%u) = %.2f)\n", 4047 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
3916 CustomPeerMap_size (ss->push_map), 4048 CustomPeerMap_size (sub->push_map),
3917 CustomPeerMap_size (ss->pull_map), 4049 CustomPeerMap_size (sub->pull_map),
3918 alpha, 4050 alpha,
3919 View_size (ss->view), 4051 View_size (sub->view),
3920 alpha * View_size (ss->view)); 4052 alpha * View_size (sub->view));
3921 4053
3922 /* Update samplers */ 4054 /* Update samplers */
3923 for (i = 0; i < CustomPeerMap_size (ss->push_map); i++) 4055 for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
3924 { 4056 {
3925 update_peer = CustomPeerMap_get_peer_by_index (ss->push_map, i); 4057 update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
3926 LOG (GNUNET_ERROR_TYPE_DEBUG, 4058 LOG (GNUNET_ERROR_TYPE_DEBUG,
3927 "Updating with peer %s from push list\n", 4059 "Updating with peer %s from push list\n",
3928 GNUNET_i2s (update_peer)); 4060 GNUNET_i2s (update_peer));
3929 insert_in_sampler (ss, update_peer); 4061 insert_in_sampler (sub, update_peer);
3930 clean_peer (ss, update_peer); /* This cleans only if it is not in the view */ 4062 clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
3931 } 4063 }
3932 4064
3933 for (i = 0; i < CustomPeerMap_size (ss->pull_map); i++) 4065 for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
3934 { 4066 {
3935 LOG (GNUNET_ERROR_TYPE_DEBUG, 4067 LOG (GNUNET_ERROR_TYPE_DEBUG,
3936 "Updating with peer %s from pull list\n", 4068 "Updating with peer %s from pull list\n",
3937 GNUNET_i2s (CustomPeerMap_get_peer_by_index (ss->pull_map, i))); 4069 GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
3938 insert_in_sampler (ss, CustomPeerMap_get_peer_by_index (ss->pull_map, i)); 4070 insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
3939 /* This cleans only if it is not in the view */ 4071 /* This cleans only if it is not in the view */
3940 clean_peer (ss, CustomPeerMap_get_peer_by_index (ss->pull_map, i)); 4072 clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
3941 } 4073 }
3942 4074
3943 4075
3944 /* Empty push/pull lists */ 4076 /* Empty push/pull lists */
3945 CustomPeerMap_clear (ss->push_map); 4077 CustomPeerMap_clear (sub->push_map);
3946 CustomPeerMap_clear (ss->pull_map); 4078 CustomPeerMap_clear (sub->pull_map);
3947 4079
3948 GNUNET_STATISTICS_set (stats, 4080 GNUNET_STATISTICS_set (stats,
3949 "view size", 4081 "view size",
3950 View_size(ss->view), 4082 View_size(sub->view),
3951 GNUNET_NO); 4083 GNUNET_NO);
3952 4084
3953 struct GNUNET_TIME_Relative time_next_round; 4085 struct GNUNET_TIME_Relative time_next_round;
3954 4086
3955 time_next_round = compute_rand_delay (ss->round_interval, 2); 4087 time_next_round = compute_rand_delay (sub->round_interval, 2);
3956 4088
3957 /* Schedule next round */ 4089 /* Schedule next round */
3958 ss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, 4090 sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3959 &do_round, ss); 4091 &do_round, sub);
3960 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); 4092 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3961} 4093}
3962 4094
@@ -3969,7 +4101,7 @@ do_round (void *cls)
3969 * 4101 *
3970 * implements #GNUNET_CADET_PeersCB 4102 * implements #GNUNET_CADET_PeersCB
3971 * 4103 *
3972 * @param cls Closure - SubSampler 4104 * @param cls Closure - Sub
3973 * @param peer Peer, or NULL on "EOF". 4105 * @param peer Peer, or NULL on "EOF".
3974 * @param tunnel Do we have a tunnel towards this peer? 4106 * @param tunnel Do we have a tunnel towards this peer?
3975 * @param n_paths Number of known paths towards this peer. 4107 * @param n_paths Number of known paths towards this peer.
@@ -3979,12 +4111,12 @@ do_round (void *cls)
3979void 4111void
3980init_peer_cb (void *cls, 4112init_peer_cb (void *cls,
3981 const struct GNUNET_PeerIdentity *peer, 4113 const struct GNUNET_PeerIdentity *peer,
3982 int tunnel, // "Do we have a tunnel towards this peer?" 4114 int tunnel, /* "Do we have a tunnel towards this peer?" */
3983 unsigned int n_paths, // "Number of known paths towards this peer" 4115 unsigned int n_paths, /* "Number of known paths towards this peer" */
3984 unsigned int best_path) // "How long is the best path? 4116 unsigned int best_path) /* "How long is the best path?
3985 // (0 = unknown, 1 = ourselves, 2 = neighbor)" 4117 * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
3986{ 4118{
3987 struct SubSampler *ss = cls; 4119 struct Sub *sub = cls;
3988 (void) tunnel; 4120 (void) tunnel;
3989 (void) n_paths; 4121 (void) n_paths;
3990 (void) best_path; 4122 (void) best_path;
@@ -3994,7 +4126,7 @@ init_peer_cb (void *cls,
3994 LOG (GNUNET_ERROR_TYPE_DEBUG, 4126 LOG (GNUNET_ERROR_TYPE_DEBUG,
3995 "Got peer_id %s from cadet\n", 4127 "Got peer_id %s from cadet\n",
3996 GNUNET_i2s (peer)); 4128 GNUNET_i2s (peer));
3997 got_peer (ss, peer); 4129 got_peer (sub, peer);
3998 } 4130 }
3999} 4131}
4000 4132
@@ -4004,7 +4136,7 @@ init_peer_cb (void *cls,
4004 * 4136 *
4005 * We initialise the sampler with those. 4137 * We initialise the sampler with those.
4006 * 4138 *
4007 * @param cls Closure - SubSampler 4139 * @param cls Closure - Sub
4008 * @param peer the peer id 4140 * @param peer the peer id
4009 * @return #GNUNET_YES if we should continue to 4141 * @return #GNUNET_YES if we should continue to
4010 * iterate, 4142 * iterate,
@@ -4014,14 +4146,14 @@ static int
4014valid_peers_iterator (void *cls, 4146valid_peers_iterator (void *cls,
4015 const struct GNUNET_PeerIdentity *peer) 4147 const struct GNUNET_PeerIdentity *peer)
4016{ 4148{
4017 struct SubSampler *ss = cls; 4149 struct Sub *sub = cls;
4018 4150
4019 if (NULL != peer) 4151 if (NULL != peer)
4020 { 4152 {
4021 LOG (GNUNET_ERROR_TYPE_DEBUG, 4153 LOG (GNUNET_ERROR_TYPE_DEBUG,
4022 "Got stored, valid peer %s\n", 4154 "Got stored, valid peer %s\n",
4023 GNUNET_i2s (peer)); 4155 GNUNET_i2s (peer));
4024 got_peer (ss, peer); 4156 got_peer (sub, peer);
4025 } 4157 }
4026 return GNUNET_YES; 4158 return GNUNET_YES;
4027} 4159}
@@ -4030,7 +4162,7 @@ valid_peers_iterator (void *cls,
4030/** 4162/**
4031 * Iterator over peers from peerinfo. 4163 * Iterator over peers from peerinfo.
4032 * 4164 *
4033 * @param cls Closure - SubSampler 4165 * @param cls Closure - Sub
4034 * @param peer id of the peer, NULL for last call 4166 * @param peer id of the peer, NULL for last call
4035 * @param hello hello message for the peer (can be NULL) 4167 * @param hello hello message for the peer (can be NULL)
4036 * @param error message 4168 * @param error message
@@ -4041,7 +4173,7 @@ process_peerinfo_peers (void *cls,
4041 const struct GNUNET_HELLO_Message *hello, 4173 const struct GNUNET_HELLO_Message *hello,
4042 const char *err_msg) 4174 const char *err_msg)
4043{ 4175{
4044 struct SubSampler *ss = cls; 4176 struct Sub *sub = cls;
4045 (void) hello; 4177 (void) hello;
4046 (void) err_msg; 4178 (void) err_msg;
4047 4179
@@ -4050,7 +4182,7 @@ process_peerinfo_peers (void *cls,
4050 LOG (GNUNET_ERROR_TYPE_DEBUG, 4182 LOG (GNUNET_ERROR_TYPE_DEBUG,
4051 "Got peer_id %s from peerinfo\n", 4183 "Got peer_id %s from peerinfo\n",
4052 GNUNET_i2s (peer)); 4184 GNUNET_i2s (peer));
4053 got_peer (ss, peer); 4185 got_peer (sub, peer);
4054 } 4186 }
4055} 4187}
4056 4188
@@ -4058,14 +4190,13 @@ process_peerinfo_peers (void *cls,
4058/** 4190/**
4059 * Task run during shutdown. 4191 * Task run during shutdown.
4060 * 4192 *
4061 * @param cls Closure - SubSampler containing all datastructures to clean 4193 * @param cls Closure - unused
4062 */ 4194 */
4063static void 4195static void
4064shutdown_task (void *cls) 4196shutdown_task (void *cls)
4065{ 4197{
4066 struct ClientContext *client_ctx;
4067 (void) cls; 4198 (void) cls;
4068 struct SubSampler *ss = cls; 4199 struct ClientContext *client_ctx;
4069 4200
4070 LOG (GNUNET_ERROR_TYPE_DEBUG, 4201 LOG (GNUNET_ERROR_TYPE_DEBUG,
4071 "RPS service is going down\n"); 4202 "RPS service is going down\n");
@@ -4077,39 +4208,28 @@ shutdown_task (void *cls)
4077 { 4208 {
4078 destroy_cli_ctx (client_ctx); 4209 destroy_cli_ctx (client_ctx);
4079 } 4210 }
4080 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle); 4211 if (NULL != msub)
4081 GNUNET_PEERINFO_disconnect (peerinfo_handle);
4082 peerinfo_handle = NULL;
4083 if (NULL != ss->do_round_task)
4084 { 4212 {
4085 GNUNET_SCHEDULER_cancel (ss->do_round_task); 4213 destroy_sub (msub);
4086 ss->do_round_task = NULL; 4214 msub = NULL;
4087 } 4215 }
4088 4216
4089 peers_terminate (ss); 4217 /* Disconnect from other services */
4090 4218 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4219 GNUNET_PEERINFO_disconnect (peerinfo_handle);
4220 peerinfo_handle = NULL;
4091 GNUNET_NSE_disconnect (nse); 4221 GNUNET_NSE_disconnect (nse);
4092 RPS_sampler_destroy (ss->sampler); 4222
4093 GNUNET_CADET_close_port (ss->cadet_port);
4094 GNUNET_CADET_disconnect (ss->cadet_handle);
4095 ss->cadet_handle = NULL;
4096 View_destroy (ss->view);
4097 CustomPeerMap_destroy (ss->push_map);
4098 CustomPeerMap_destroy (ss->pull_map);
4099 if (NULL != stats) 4223 if (NULL != stats)
4100 { 4224 {
4101 GNUNET_STATISTICS_destroy (stats, 4225 GNUNET_STATISTICS_destroy (stats,
4102 GNUNET_NO); 4226 GNUNET_NO);
4103 stats = NULL; 4227 stats = NULL;
4104 } 4228 }
4229 GNUNET_CADET_disconnect (cadet_handle);
4230 cadet_handle = NULL;
4105#ifdef ENABLE_MALICIOUS 4231#ifdef ENABLE_MALICIOUS
4106 struct AttackedPeer *tmp_att_peer; 4232 struct AttackedPeer *tmp_att_peer;
4107 /* it is ok to free this const during shutdown: */
4108 GNUNET_free ((char *) ss->file_name_view_log);
4109#ifdef TO_FILE
4110 GNUNET_free ((char *) ss->file_name_observed_log);
4111 GNUNET_CONTAINER_multipeermap_destroy (ss->observed_unique_peers);
4112#endif /* TO_FILE */
4113 GNUNET_array_grow (mal_peers, 4233 GNUNET_array_grow (mal_peers,
4114 num_mal_peers, 4234 num_mal_peers,
4115 0); 4235 0);
@@ -4154,7 +4274,6 @@ client_connect_cb (void *cls,
4154 cli_ctx->view_updates_left = -1; 4274 cli_ctx->view_updates_left = -1;
4155 cli_ctx->stream_update = GNUNET_NO; 4275 cli_ctx->stream_update = GNUNET_NO;
4156 cli_ctx->client = client; 4276 cli_ctx->client = client;
4157 cli_ctx->ss = mss;
4158 GNUNET_CONTAINER_DLL_insert (cli_ctx_head, 4277 GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4159 cli_ctx_tail, 4278 cli_ctx_tail,
4160 cli_ctx); 4279 cli_ctx);
@@ -4206,6 +4325,8 @@ run (void *cls,
4206 char *fn_valid_peers; 4325 char *fn_valid_peers;
4207 struct GNUNET_TIME_Relative round_interval; 4326 struct GNUNET_TIME_Relative round_interval;
4208 long long unsigned int sampler_size; 4327 long long unsigned int sampler_size;
4328 char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4329 struct GNUNET_HashCode hash;
4209 4330
4210 (void) cls; 4331 (void) cls;
4211 (void) service; 4332 (void) service;
@@ -4262,40 +4383,43 @@ run (void *cls,
4262 "FILENAME_VALID_PEERS"); 4383 "FILENAME_VALID_PEERS");
4263 } 4384 }
4264 4385
4386 cadet_handle = GNUNET_CADET_connect (cfg);
4387 GNUNET_assert (NULL != cadet_handle);
4388
4389
4265 alpha = 0.45; 4390 alpha = 0.45;
4266 beta = 0.45; 4391 beta = 0.45;
4267 4392
4268 4393
4269 /* Set up main SubSampler */ 4394 /* Set up main Sub */
4270 mss = new_subsampler ("", /* this is the main sampler - no shared value */ 4395 GNUNET_CRYPTO_hash (hash_port_string,
4271 sampler_size, /* Will be overwritten by config */ 4396 strlen (hash_port_string),
4272 round_interval); 4397 &hash);
4398 msub = new_sub (&hash,
4399 sampler_size, /* Will be overwritten by config */
4400 round_interval);
4273 4401
4274 4402
4275 peerinfo_handle = GNUNET_PEERINFO_connect (cfg); 4403 peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4276 4404
4277 /* connect to NSE */ 4405 /* connect to NSE */
4278 nse = GNUNET_NSE_connect (cfg, nse_callback, mss); 4406 nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4279 4407
4280 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); 4408 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4281 //GNUNET_CADET_get_peers (mss.cadet_handle, &init_peer_cb, mss); 4409 //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4282 // TODO send push/pull to each of those peers? 4410 // TODO send push/pull to each of those peers?
4283 // TODO read stored valid peers from last run
4284 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n"); 4411 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4285 restore_valid_peers (mss); 4412 restore_valid_peers (msub);
4286 get_valid_peers (mss->valid_peers, valid_peers_iterator, mss); 4413 get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4287 4414
4288 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg, 4415 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4289 GNUNET_NO, 4416 GNUNET_NO,
4290 process_peerinfo_peers, 4417 process_peerinfo_peers,
4291 mss); 4418 msub);
4292 4419
4293 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n"); 4420 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4294 4421
4295 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, mss); 4422 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4296 LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4297
4298 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, mss);
4299 stats = GNUNET_STATISTICS_create ("rps", cfg); 4423 stats = GNUNET_STATISTICS_create ("rps", cfg);
4300} 4424}
4301 4425
@@ -4336,6 +4460,14 @@ GNUNET_SERVICE_MAIN
4336 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL, 4460 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4337 struct GNUNET_MessageHeader, 4461 struct GNUNET_MessageHeader,
4338 NULL), 4462 NULL),
4463 GNUNET_MQ_hd_fixed_size (client_start_sub,
4464 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4465 struct GNUNET_RPS_CS_SubStartMessage,
4466 NULL),
4467 GNUNET_MQ_hd_fixed_size (client_stop_sub,
4468 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4469 struct GNUNET_RPS_CS_SubStopMessage,
4470 NULL),
4339 GNUNET_MQ_handler_end()); 4471 GNUNET_MQ_handler_end());
4340 4472
4341/* end of gnunet-service-rps.c */ 4473/* end of gnunet-service-rps.c */
diff --git a/src/rps/rps.h b/src/rps/rps.h
index 915524f88..616eabdac 100644
--- a/src/rps/rps.h
+++ b/src/rps/rps.h
@@ -114,6 +114,50 @@ struct GNUNET_RPS_CS_ActMaliciousMessage
114#endif /* ENABLE_MALICIOUS */ 114#endif /* ENABLE_MALICIOUS */
115 115
116 116
117/**
118 * Message from client to service telling it to start a new sub
119 */
120struct GNUNET_RPS_CS_SubStartMessage
121{
122 /**
123 * Header including size and type in NBO
124 */
125 struct GNUNET_MessageHeader header;
126
127 /**
128 * For alignment.
129 */
130 uint32_t reserved GNUNET_PACKED;
131
132 /**
133 * Mean interval between two rounds
134 */
135 struct GNUNET_TIME_RelativeNBO round_interval;
136
137 /**
138 * Length of the shared value represented as string.
139 */
140 struct GNUNET_HashCode hash GNUNET_PACKED;
141};
142
143
144/**
145 * Message from client to service telling it to stop a new sub
146 */
147struct GNUNET_RPS_CS_SubStopMessage
148{
149 /**
150 * Header including size and type in NBO
151 */
152 struct GNUNET_MessageHeader header;
153
154 /**
155 * Length of the shared value represented as string.
156 */
157 struct GNUNET_HashCode hash GNUNET_PACKED;
158};
159
160
117/* Debug messages */ 161/* Debug messages */
118 162
119/** 163/**
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index 47c4f019d..34b28cd6a 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -588,6 +588,24 @@ mq_error_handler (void *cls,
588 588
589 589
590/** 590/**
591 * @brief Create the hash value from the share value that defines the sub
592 * (-group)
593 *
594 * @param share_val Share value - strings longer than 508 (512 - 4) will be
595 * truncated.
596 * @param hash Pointer to the location in which the hash will be stored.
597 */
598static void
599hash_from_share_val (const char *share_val, struct GNUNET_HashCode *hash)
600{
601 char hash_port_string[512] = "rps";
602
603 (void) strncat (hash_port_string, share_val, 508);
604 GNUNET_CRYPTO_hash (hash_port_string, strlen (hash_port_string), hash);
605}
606
607
608/**
591 * Reconnect to the service 609 * Reconnect to the service
592 */ 610 */
593static void 611static void
@@ -639,6 +657,49 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
639 657
640 658
641/** 659/**
660 * @brief Start a sub with the given shared value
661 *
662 * @param h Handle to rps
663 * @param shared_value The shared value that defines the members of the sub (-gorup)
664 */
665void
666GNUNET_RPS_sub_start (struct GNUNET_RPS_Handle *h,
667 const char *shared_value)
668{
669 struct GNUNET_RPS_CS_SubStartMessage *msg;
670 struct GNUNET_MQ_Envelope *ev;
671
672 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START);
673 hash_from_share_val (shared_value, &msg->hash);
674 msg->round_interval = GNUNET_TIME_relative_hton (// TODO read from config!
675 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30));
676 GNUNET_assert (0 != msg->round_interval.rel_value_us__);
677
678 GNUNET_MQ_send (h->mq, ev);
679}
680
681
682/**
683 * @brief Stop a sub with the given shared value
684 *
685 * @param h Handle to rps
686 * @param shared_value The shared value that defines the members of the sub (-gorup)
687 */
688void
689GNUNET_RPS_sub_stop (struct GNUNET_RPS_Handle *h,
690 const char *shared_value)
691{
692 struct GNUNET_RPS_CS_SubStopMessage *msg;
693 struct GNUNET_MQ_Envelope *ev;
694
695 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP);
696 hash_from_share_val (shared_value, &msg->hash);
697
698 GNUNET_MQ_send (h->mq, ev);
699}
700
701
702/**
642 * Request n random peers. 703 * Request n random peers.
643 * 704 *
644 * @param rps_handle handle to the rps service 705 * @param rps_handle handle to the rps service