aboutsummaryrefslogtreecommitdiff
path: root/src/rps
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps')
-rw-r--r--src/rps/.gitignore9
-rw-r--r--src/rps/Makefile.am40
-rw-r--r--src/rps/gnunet-rps-profiler.c12
-rw-r--r--src/rps/gnunet-rps.c31
-rw-r--r--src/rps/gnunet-service-rps.c1814
-rw-r--r--src/rps/gnunet-service-rps_view.c2
-rw-r--r--src/rps/rps-sampler_common.c19
-rw-r--r--src/rps/rps-test_util.c76
-rw-r--r--src/rps/rps-test_util.h25
-rw-r--r--src/rps/rps.h44
-rw-r--r--src/rps/rps_api.c228
-rw-r--r--src/rps/test_rps.c129
-rw-r--r--src/rps/test_rps.conf4
13 files changed, 1555 insertions, 878 deletions
diff --git a/src/rps/.gitignore b/src/rps/.gitignore
index cb14f5b09..9e78e2ca0 100644
--- a/src/rps/.gitignore
+++ b/src/rps/.gitignore
@@ -1,15 +1,16 @@
1gnunet-service-rps 1gnunet-service-rps
2gnunet-rps 2gnunet-rps
3gnunet-rps-profiler 3gnunet-rps-profiler
4test_rps_malicious_1 4test_rps_single_req
5test_rps_malicious_2
6test_rps_malicious_3
7test_rps_req_cancel 5test_rps_req_cancel
6test_rps_sub
8test_rps_seed_big 7test_rps_seed_big
9test_rps_seed_request 8test_rps_seed_request
10test_rps_single_req
11test_service_rps_custommap 9test_service_rps_custommap
12test_service_rps_sampler_elem 10test_service_rps_sampler_elem
13test_service_rps_view 11test_service_rps_view
14test_rps_churn 12test_rps_churn
15test_service_rps_peers 13test_service_rps_peers
14test_rps_malicious_1
15test_rps_malicious_2
16test_rps_malicious_3
diff --git a/src/rps/Makefile.am b/src/rps/Makefile.am
index e973bb7ca..a356d3dbc 100644
--- a/src/rps/Makefile.am
+++ b/src/rps/Makefile.am
@@ -20,8 +20,6 @@ pkgcfg_DATA = \
20bin_PROGRAMS = gnunet-rps 20bin_PROGRAMS = gnunet-rps
21 21
22gnunet_rps_SOURCES = \ 22gnunet_rps_SOURCES = \
23 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \
24 rps-sampler_common.h rps-sampler_common.c \
25 gnunet-rps.c 23 gnunet-rps.c
26 24
27gnunet_rps_LDADD = \ 25gnunet_rps_LDADD = \
@@ -32,6 +30,9 @@ gnunet_rps_LDADD = \
32lib_LTLIBRARIES = libgnunetrps.la 30lib_LTLIBRARIES = libgnunetrps.la
33 31
34libgnunetrps_la_SOURCES = \ 32libgnunetrps_la_SOURCES = \
33 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \
34 rps-test_util.h rps-test_util.c \
35 rps-sampler_common.h rps-sampler_common.c \
35 rps-sampler_client.h rps-sampler_client.c \ 36 rps-sampler_client.h rps-sampler_client.c \
36 rps_api.c rps.h 37 rps_api.c rps.h
37libgnunetrps_la_LIBADD = \ 38libgnunetrps_la_LIBADD = \
@@ -40,6 +41,8 @@ libgnunetrps_la_LIBADD = \
40libgnunetrps_la_LDFLAGS = \ 41libgnunetrps_la_LDFLAGS = \
41 $(GN_LIB_LDFLAGS) $(WINFLAGS) \ 42 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
42 -version-info 0:0:0 43 -version-info 0:0:0
44# Fix 'created both with libtool and without' error:
45libgnunetrps_la_CFLAGS = $(AM_CFLAGS)
43 46
44 47
45libexec_PROGRAMS = \ 48libexec_PROGRAMS = \
@@ -69,6 +72,7 @@ gnunet_service_rps_LDADD = \
69 $(top_builddir)/src/peerinfo/libgnunetpeerinfo.la \ 72 $(top_builddir)/src/peerinfo/libgnunetpeerinfo.la \
70 $(top_builddir)/src/nse/libgnunetnse.la \ 73 $(top_builddir)/src/nse/libgnunetnse.la \
71 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 74 $(top_builddir)/src/statistics/libgnunetstatistics.la \
75 $(top_builddir)/src/core/libgnunetcore.la \
72 $(LIBGCRYPT_LIBS) \ 76 $(LIBGCRYPT_LIBS) \
73 -lm -lgcrypt \ 77 -lm -lgcrypt \
74 $(GN_LIBINTL) 78 $(GN_LIBINTL)
@@ -79,14 +83,15 @@ check_PROGRAMS = \
79 test_service_rps_view \ 83 test_service_rps_view \
80 test_service_rps_custommap \ 84 test_service_rps_custommap \
81 test_service_rps_sampler_elem \ 85 test_service_rps_sampler_elem \
82 test_rps_malicious_1 \
83 test_rps_malicious_2 \
84 test_rps_malicious_3 \
85 test_rps_seed_request \
86 test_rps_single_req \ 86 test_rps_single_req \
87 test_rps_req_cancel \ 87 test_rps_req_cancel \
88 test_rps_sub \
89 test_rps_seed_request \
88 test_rps_seed_big \ 90 test_rps_seed_big \
89 test_rps_churn 91 test_rps_churn \
92 test_rps_malicious_1 \
93 test_rps_malicious_2 \
94 test_rps_malicious_3
90endif 95endif
91 96
92rps_test_src = \ 97rps_test_src = \
@@ -125,15 +130,6 @@ test_service_rps_sampler_elem_SOURCES = \
125 test_service_rps_sampler_elem.c 130 test_service_rps_sampler_elem.c
126test_service_rps_sampler_elem_LDADD = $(top_builddir)/src/util/libgnunetutil.la 131test_service_rps_sampler_elem_LDADD = $(top_builddir)/src/util/libgnunetutil.la
127 132
128test_rps_malicious_1_SOURCES = $(rps_test_src)
129test_rps_malicious_1_LDADD = $(ld_rps_test_lib)
130
131test_rps_malicious_2_SOURCES = $(rps_test_src)
132test_rps_malicious_2_LDADD = $(ld_rps_test_lib)
133
134test_rps_malicious_3_SOURCES = $(rps_test_src)
135test_rps_malicious_3_LDADD = $(ld_rps_test_lib)
136
137test_rps_single_req_SOURCES = $(rps_test_src) 133test_rps_single_req_SOURCES = $(rps_test_src)
138test_rps_single_req_LDADD = $(ld_rps_test_lib) 134test_rps_single_req_LDADD = $(ld_rps_test_lib)
139 135
@@ -143,12 +139,24 @@ test_rps_seed_request_LDADD = $(ld_rps_test_lib)
143test_rps_req_cancel_SOURCES = $(rps_test_src) 139test_rps_req_cancel_SOURCES = $(rps_test_src)
144test_rps_req_cancel_LDADD = $(ld_rps_test_lib) 140test_rps_req_cancel_LDADD = $(ld_rps_test_lib)
145 141
142test_rps_sub_SOURCES = $(rps_test_src)
143test_rps_sub_LDADD = $(ld_rps_test_lib)
144
146test_rps_seed_big_SOURCES = $(rps_test_src) 145test_rps_seed_big_SOURCES = $(rps_test_src)
147test_rps_seed_big_LDADD = $(ld_rps_test_lib) 146test_rps_seed_big_LDADD = $(ld_rps_test_lib)
148 147
149test_rps_churn_SOURCES = $(rps_test_src) 148test_rps_churn_SOURCES = $(rps_test_src)
150test_rps_churn_LDADD = $(ld_rps_test_lib) 149test_rps_churn_LDADD = $(ld_rps_test_lib)
151 150
151test_rps_malicious_1_SOURCES = $(rps_test_src)
152test_rps_malicious_1_LDADD = $(ld_rps_test_lib)
153
154test_rps_malicious_2_SOURCES = $(rps_test_src)
155test_rps_malicious_2_LDADD = $(ld_rps_test_lib)
156
157test_rps_malicious_3_SOURCES = $(rps_test_src)
158test_rps_malicious_3_LDADD = $(ld_rps_test_lib)
159
152gnunet_rps_profiler_SOURCES = \ 160gnunet_rps_profiler_SOURCES = \
153 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ 161 gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \
154 rps-sampler_common.h rps-sampler_common.c \ 162 rps-sampler_common.h rps-sampler_common.c \
diff --git a/src/rps/gnunet-rps-profiler.c b/src/rps/gnunet-rps-profiler.c
index f2a8083e7..b17fb6a50 100644
--- a/src/rps/gnunet-rps-profiler.c
+++ b/src/rps/gnunet-rps-profiler.c
@@ -909,6 +909,7 @@ cancel_request (struct PendingReply *pending_rep)
909 rps_peer->num_pending_reps--; 909 rps_peer->num_pending_reps--;
910 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 910 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
911 "Cancelling rps get reply\n"); 911 "Cancelling rps get reply\n");
912 GNUNET_assert (NULL != pending_rep->req_handle);
912 GNUNET_RPS_request_cancel (pending_rep->req_handle); 913 GNUNET_RPS_request_cancel (pending_rep->req_handle);
913 GNUNET_free (pending_rep); 914 GNUNET_free (pending_rep);
914} 915}
@@ -1288,7 +1289,11 @@ rps_disconnect_adapter (void *cls,
1288 cancel_request (pending_rep); 1289 cancel_request (pending_rep);
1289 } 1290 }
1290 GNUNET_assert (h == peer->rps_handle); 1291 GNUNET_assert (h == peer->rps_handle);
1291 GNUNET_RPS_disconnect (h); 1292 if (NULL != h)
1293 {
1294 GNUNET_RPS_disconnect (h);
1295 h = NULL;
1296 }
1292 peer->rps_handle = NULL; 1297 peer->rps_handle = NULL;
1293 } 1298 }
1294} 1299}
@@ -1788,6 +1793,7 @@ profiler_reply_handle (void *cls,
1788 unsigned int i; 1793 unsigned int i;
1789 struct PendingReply *pending_rep = (struct PendingReply *) cls; 1794 struct PendingReply *pending_rep = (struct PendingReply *) cls;
1790 1795
1796 pending_rep->req_handle = NULL;
1791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "profiler_reply_handle()\n"); 1797 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "profiler_reply_handle()\n");
1792 rps_peer = pending_rep->rps_peer; 1798 rps_peer = pending_rep->rps_peer;
1793 (void) GNUNET_asprintf (&file_name, 1799 (void) GNUNET_asprintf (&file_name,
@@ -2426,10 +2432,10 @@ post_test_shutdown_ready_cb (void *cls,
2426 GNUNET_TESTBED_operation_done (rps_peer->stat_op); 2432 GNUNET_TESTBED_operation_done (rps_peer->stat_op);
2427 } 2433 }
2428 2434
2429 //write_final_stats (); 2435 write_final_stats ();
2430 if (GNUNET_YES == check_statistics_collect_completed()) 2436 if (GNUNET_YES == check_statistics_collect_completed())
2431 { 2437 {
2432 write_final_stats (); 2438 //write_final_stats ();
2433 GNUNET_free (stat_cls); 2439 GNUNET_free (stat_cls);
2434 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2435 "Shutting down\n"); 2441 "Shutting down\n");
diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c
index d0f905f51..49189481f 100644
--- a/src/rps/gnunet-rps.c
+++ b/src/rps/gnunet-rps.c
@@ -58,11 +58,6 @@ static int stream_input;
58 */ 58 */
59static uint64_t num_view_updates; 59static uint64_t num_view_updates;
60 60
61/**
62 * @brief Number of peers we want to receive from stream
63 */
64static uint64_t num_stream_peers;
65
66 61
67/** 62/**
68 * Task run when user presses CTRL-C to abort. 63 * Task run when user presses CTRL-C to abort.
@@ -162,24 +157,13 @@ stream_input_handle (void *cls,
162 157
163 if (0 == num_peers) 158 if (0 == num_peers)
164 { 159 {
165 FPRINTF (stdout, "Empty view\n"); 160 FPRINTF (stdout, "No peer was returned\n");
166 } 161 }
167 req_handle = NULL; 162 req_handle = NULL;
168 for (i = 0; i < num_peers; i++) 163 for (i = 0; i < num_peers; i++)
169 { 164 {
170 FPRINTF (stdout, "%s\n", 165 FPRINTF (stdout, "%s\n",
171 GNUNET_i2s_full (&recv_peers[i])); 166 GNUNET_i2s_full (&recv_peers[i]));
172
173 if (1 == num_stream_peers)
174 {
175 ret = 0;
176 GNUNET_SCHEDULER_shutdown ();
177 break;
178 }
179 else if (1 < num_stream_peers)
180 {
181 num_stream_peers--;
182 }
183 } 167 }
184} 168}
185 169
@@ -243,18 +227,7 @@ run (void *cls,
243 } else if (stream_input) 227 } else if (stream_input)
244 { 228 {
245 /* Get updates of view */ 229 /* Get updates of view */
246 if (NULL == args[0] || 230 GNUNET_RPS_stream_request (rps_handle, stream_input_handle, NULL);
247 0 == sscanf (args[0], "%lu", &num_stream_peers))
248 {
249 num_stream_peers = 0;
250 }
251 GNUNET_RPS_stream_request (rps_handle, num_stream_peers, stream_input_handle, NULL);
252 if (0 != num_stream_peers)
253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
254 "Requesting %" PRIu64 " peers from biased stream\n", num_stream_peers);
255 else
256 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
257 "Requesting continuous peers from biased stream\n");
258 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); 231 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
259 } 232 }
260 else 233 else
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 07b88ddcd..20b314db3 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -25,6 +25,7 @@
25#include "gnunet_applications.h" 25#include "gnunet_applications.h"
26#include "gnunet_util_lib.h" 26#include "gnunet_util_lib.h"
27#include "gnunet_cadet_service.h" 27#include "gnunet_cadet_service.h"
28#include "gnunet_core_service.h"
28#include "gnunet_peerinfo_service.h" 29#include "gnunet_peerinfo_service.h"
29#include "gnunet_nse_service.h" 30#include "gnunet_nse_service.h"
30#include "gnunet_statistics_service.h" 31#include "gnunet_statistics_service.h"
@@ -36,11 +37,10 @@
36 37
37#include <math.h> 38#include <math.h>
38#include <inttypes.h> 39#include <inttypes.h>
40#include <string.h>
39 41
40#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__) 42#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
41 43
42// TODO modify @brief in every file
43
44// TODO check for overflows 44// TODO check for overflows
45 45
46// TODO align message structs 46// TODO align message structs
@@ -149,6 +149,11 @@ struct ChannelCtx;
149struct PeerContext 149struct PeerContext
150{ 150{
151 /** 151 /**
152 * The Sub this context belongs to.
153 */
154 struct Sub *sub;
155
156 /**
152 * Message queue open to client 157 * Message queue open to client
153 */ 158 */
154 struct GNUNET_MQ_Handle *mq; 159 struct GNUNET_MQ_Handle *mq;
@@ -211,6 +216,7 @@ struct PeerContext
211 * it, how did we get its ID, how many pushes (in a timeinterval), 216 * it, how did we get its ID, how many pushes (in a timeinterval),
212 * ...) 217 * ...)
213 */ 218 */
219 uint32_t round_pull_req;
214}; 220};
215 221
216/** 222/**
@@ -275,24 +281,17 @@ struct AttackedPeer
275#endif /* ENABLE_MALICIOUS */ 281#endif /* ENABLE_MALICIOUS */
276 282
277/** 283/**
278 * @brief One SubSampler. 284 * @brief One Sub.
279 * 285 *
280 * Essentially one instance of brahms that only connects to other instances 286 * Essentially one instance of brahms that only connects to other instances
281 * with the same (secret) value. 287 * with the same (secret) value.
282 */ 288 */
283struct SubSampler 289struct Sub
284{ 290{
285 /** 291 /**
286 * @brief Port used for cadet. 292 * @brief Hash of the shared value that defines Subs.
287 *
288 * Don't compute multiple times through making it global
289 */ 293 */
290 struct GNUNET_HashCode port; 294 struct GNUNET_HashCode hash;
291
292 /**
293 * Handler to CADET.
294 */
295 struct GNUNET_CADET_Handle *cadet_handle;
296 295
297 /** 296 /**
298 * @brief Port to communicate to other peers. 297 * @brief Port to communicate to other peers.
@@ -356,6 +355,16 @@ struct SubSampler
356 uint32_t num_observed_peers; 355 uint32_t num_observed_peers;
357 356
358 /** 357 /**
358 * @brief File name to log number of pushes per round to
359 */
360 char *file_name_push_recv;
361
362 /**
363 * @brief File name to log number of pushes per round to
364 */
365 char *file_name_pull_delays;
366
367 /**
359 * @brief Multipeermap (ab-) used to count unique peer_ids 368 * @brief Multipeermap (ab-) used to count unique peer_ids
360 */ 369 */
361 struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers; 370 struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
@@ -394,6 +403,28 @@ struct SubSampler
394 * Identifier for the main task that runs periodically. 403 * Identifier for the main task that runs periodically.
395 */ 404 */
396 struct GNUNET_SCHEDULER_Task *do_round_task; 405 struct GNUNET_SCHEDULER_Task *do_round_task;
406
407 /* === stats === */
408
409 /**
410 * @brief Counts the executed rounds.
411 */
412 uint32_t num_rounds;
413
414 /**
415 * @brief This array accumulates the number of received pushes per round.
416 *
417 * Number at index i represents the number of rounds with i observed pushes.
418 */
419 uint32_t push_recv[256];
420
421 /**
422 * @brief Number of pull replies with this delay measured in rounds.
423 *
424 * Number at index i represents the number of pull replies with a delay of i
425 * rounds.
426 */
427 uint32_t pull_delays[256];
397}; 428};
398 429
399 430
@@ -412,6 +443,21 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
412struct GNUNET_STATISTICS_Handle *stats; 443struct GNUNET_STATISTICS_Handle *stats;
413 444
414/** 445/**
446 * Handler to CADET.
447 */
448struct GNUNET_CADET_Handle *cadet_handle;
449
450/**
451 * Handle to CORE
452 */
453struct GNUNET_CORE_Handle *core_handle;
454
455/**
456 * @brief PeerMap to keep track of connected peers.
457 */
458struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
459
460/**
415 * Our own identity. 461 * Our own identity.
416 */ 462 */
417static struct GNUNET_PeerIdentity own_identity; 463static struct GNUNET_PeerIdentity own_identity;
@@ -511,12 +557,12 @@ static uint32_t push_limit = 10000;
511#endif /* ENABLE_MALICIOUS */ 557#endif /* ENABLE_MALICIOUS */
512 558
513/** 559/**
514 * @brief Main SubSampler. 560 * @brief Main Sub.
515 * 561 *
516 * This is run in any case by all peers and connects to all peers without 562 * This is run in any case by all peers and connects to all peers without
517 * specifying a shared value. 563 * specifying a shared value.
518 */ 564 */
519static struct SubSampler *mss; 565static struct Sub *msub;
520 566
521/** 567/**
522 * @brief Maximum number of valid peers to keep. 568 * @brief Maximum number of valid peers to keep.
@@ -524,28 +570,36 @@ static struct SubSampler *mss;
524 */ 570 */
525static const uint32_t num_valid_peers_max = UINT32_MAX; 571static const uint32_t num_valid_peers_max = UINT32_MAX;
526 572
527
528/*********************************************************************** 573/***********************************************************************
529 * /Globals 574 * /Globals
530***********************************************************************/ 575***********************************************************************/
531 576
532 577
578static void
579do_round (void *cls);
580
581static void
582do_mal_round (void *cls);
583
584
533/** 585/**
534 * @brief Get the #PeerContext associated with a peer 586 * @brief Get the #PeerContext associated with a peer
535 * 587 *
588 * @param peer_map The peer map containing the context
536 * @param peer the peer id 589 * @param peer the peer id
537 * 590 *
538 * @return the #PeerContext 591 * @return the #PeerContext
539 */ 592 */
540static struct PeerContext * 593static struct PeerContext *
541get_peer_ctx (const struct GNUNET_PeerIdentity *peer) 594get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
595 const struct GNUNET_PeerIdentity *peer)
542{ 596{
543 struct PeerContext *ctx; 597 struct PeerContext *ctx;
544 int ret; 598 int ret;
545 599
546 ret = GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer); 600 ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
547 GNUNET_assert (GNUNET_YES == ret); 601 GNUNET_assert (GNUNET_YES == ret);
548 ctx = GNUNET_CONTAINER_multipeermap_get (mss->peer_map, peer); 602 ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
549 GNUNET_assert (NULL != ctx); 603 GNUNET_assert (NULL != ctx);
550 return ctx; 604 return ctx;
551} 605}
@@ -555,18 +609,21 @@ get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
555 * 609 *
556 * FIXME probably deprecated. Make this the new _online. 610 * FIXME probably deprecated. Make this the new _online.
557 * 611 *
612 * @param peer_map The peer map to check for the existence of @a peer
558 * @param peer peer in question 613 * @param peer peer in question
559 * 614 *
560 * @return #GNUNET_YES if peer is known 615 * @return #GNUNET_YES if peer is known
561 * #GNUNET_NO if peer is not knwon 616 * #GNUNET_NO if peer is not knwon
562 */ 617 */
563static int 618static int
564check_peer_known (const struct GNUNET_PeerIdentity *peer) 619check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
620 const struct GNUNET_PeerIdentity *peer)
565{ 621{
566 if (NULL != mss->peer_map) 622 if (NULL != peer_map)
567 { 623 {
568 return GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer); 624 return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
569 } else 625 }
626 else
570 { 627 {
571 return GNUNET_NO; 628 return GNUNET_NO;
572 } 629 }
@@ -576,27 +633,33 @@ check_peer_known (const struct GNUNET_PeerIdentity *peer)
576/** 633/**
577 * @brief Create a new #PeerContext and insert it into the peer map 634 * @brief Create a new #PeerContext and insert it into the peer map
578 * 635 *
636 * @param sub The Sub this context belongs to.
579 * @param peer the peer to create the #PeerContext for 637 * @param peer the peer to create the #PeerContext for
580 * 638 *
581 * @return the #PeerContext 639 * @return the #PeerContext
582 */ 640 */
583static struct PeerContext * 641static struct PeerContext *
584create_peer_ctx (const struct GNUNET_PeerIdentity *peer) 642create_peer_ctx (struct Sub *sub,
643 const struct GNUNET_PeerIdentity *peer)
585{ 644{
586 struct PeerContext *ctx; 645 struct PeerContext *ctx;
587 int ret; 646 int ret;
588 647
589 GNUNET_assert (GNUNET_NO == check_peer_known (peer)); 648 GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
590 649
591 ctx = GNUNET_new (struct PeerContext); 650 ctx = GNUNET_new (struct PeerContext);
592 ctx->peer_id = *peer; 651 ctx->peer_id = *peer;
593 ret = GNUNET_CONTAINER_multipeermap_put (mss->peer_map, peer, ctx, 652 ctx->sub = sub;
653 ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
594 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 654 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
595 GNUNET_assert (GNUNET_OK == ret); 655 GNUNET_assert (GNUNET_OK == ret);
596 GNUNET_STATISTICS_set (stats, 656 if (sub == msub)
597 "# known peers", 657 {
598 GNUNET_CONTAINER_multipeermap_size (mss->peer_map), 658 GNUNET_STATISTICS_set (stats,
599 GNUNET_NO); 659 "# known peers",
660 GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
661 GNUNET_NO);
662 }
600 return ctx; 663 return ctx;
601} 664}
602 665
@@ -604,18 +667,20 @@ create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
604/** 667/**
605 * @brief Create or get a #PeerContext 668 * @brief Create or get a #PeerContext
606 * 669 *
670 * @param sub The Sub to which the created context belongs to
607 * @param peer the peer to get the associated context to 671 * @param peer the peer to get the associated context to
608 * 672 *
609 * @return the context 673 * @return the context
610 */ 674 */
611static struct PeerContext * 675static struct PeerContext *
612create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer) 676create_or_get_peer_ctx (struct Sub *sub,
677 const struct GNUNET_PeerIdentity *peer)
613{ 678{
614 if (GNUNET_NO == check_peer_known (peer)) 679 if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
615 { 680 {
616 return create_peer_ctx (peer); 681 return create_peer_ctx (sub, peer);
617 } 682 }
618 return get_peer_ctx (peer); 683 return get_peer_ctx (sub->peer_map, peer);
619} 684}
620 685
621 686
@@ -624,23 +689,22 @@ create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
624 * 689 *
625 * Also sets the #Peers_ONLINE flag accordingly 690 * Also sets the #Peers_ONLINE flag accordingly
626 * 691 *
627 * @param peer the peer in question 692 * @param peer_ctx Context of the peer of which connectivity is to be checked
628 * 693 *
629 * @return #GNUNET_YES if we are connected 694 * @return #GNUNET_YES if we are connected
630 * #GNUNET_NO otherwise 695 * #GNUNET_NO otherwise
631 */ 696 */
632static int 697static int
633check_connected (const struct GNUNET_PeerIdentity *peer) 698check_connected (struct PeerContext *peer_ctx)
634{ 699{
635 struct PeerContext *peer_ctx;
636
637 /* If we don't know about this peer we don't know whether it's online */ 700 /* If we don't know about this peer we don't know whether it's online */
638 if (GNUNET_NO == check_peer_known (peer)) 701 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
702 &peer_ctx->peer_id))
639 { 703 {
640 return GNUNET_NO; 704 return GNUNET_NO;
641 } 705 }
642 /* Get the context */ 706 /* Get the context */
643 peer_ctx = get_peer_ctx (peer); 707 peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
644 /* If we have no channel to this peer we don't know whether it's online */ 708 /* If we have no channel to this peer we don't know whether it's online */
645 if ( (NULL == peer_ctx->send_channel_ctx) && 709 if ( (NULL == peer_ctx->send_channel_ctx) &&
646 (NULL == peer_ctx->recv_channel_ctx) ) 710 (NULL == peer_ctx->recv_channel_ctx) )
@@ -709,21 +773,21 @@ get_rand_peer_iterator (void *cls,
709/** 773/**
710 * @brief Get a random peer from @a peer_map 774 * @brief Get a random peer from @a peer_map
711 * 775 *
712 * @param peer_map the peer_map to get the peer from 776 * @param valid_peers Peer map containing valid peers from which to select a
777 * random one
713 * 778 *
714 * @return a random peer 779 * @return a random peer
715 */ 780 */
716static const struct GNUNET_PeerIdentity * 781static const struct GNUNET_PeerIdentity *
717get_random_peer_from_peermap (const struct 782get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
718 GNUNET_CONTAINER_MultiPeerMap *peer_map)
719{ 783{
720 struct GetRandPeerIteratorCls *iterator_cls; 784 struct GetRandPeerIteratorCls *iterator_cls;
721 const struct GNUNET_PeerIdentity *ret; 785 const struct GNUNET_PeerIdentity *ret;
722 786
723 iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls); 787 iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
724 iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 788 iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
725 GNUNET_CONTAINER_multipeermap_size (peer_map)); 789 GNUNET_CONTAINER_multipeermap_size (valid_peers));
726 (void) GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, 790 (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
727 get_rand_peer_iterator, 791 get_rand_peer_iterator,
728 iterator_cls); 792 iterator_cls);
729 ret = iterator_cls->peer; 793 ret = iterator_cls->peer;
@@ -737,31 +801,37 @@ get_random_peer_from_peermap (const struct
737 * 801 *
738 * If valid peers are already #num_valid_peers_max, delete a peer previously. 802 * If valid peers are already #num_valid_peers_max, delete a peer previously.
739 * 803 *
740 * @param peer the peer that is added to the valid peers. 804 * @param peer The peer that is added to the valid peers.
805 * @param valid_peers Peer map of valid peers to which to add the @a peer
741 * 806 *
742 * @return #GNUNET_YES if no other peer had to be removed 807 * @return #GNUNET_YES if no other peer had to be removed
743 * #GNUNET_NO otherwise 808 * #GNUNET_NO otherwise
744 */ 809 */
745static int 810static int
746add_valid_peer (const struct GNUNET_PeerIdentity *peer) 811add_valid_peer (const struct GNUNET_PeerIdentity *peer,
812 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
747{ 813{
748 const struct GNUNET_PeerIdentity *rand_peer; 814 const struct GNUNET_PeerIdentity *rand_peer;
749 int ret; 815 int ret;
750 816
751 ret = GNUNET_YES; 817 ret = GNUNET_YES;
752 while (GNUNET_CONTAINER_multipeermap_size ( 818 /* Remove random peers until there is space for a new one */
753 mss->valid_peers) >= num_valid_peers_max) 819 while (num_valid_peers_max <=
820 GNUNET_CONTAINER_multipeermap_size (valid_peers))
754 { 821 {
755 rand_peer = get_random_peer_from_peermap (mss->valid_peers); 822 rand_peer = get_random_peer_from_peermap (valid_peers);
756 GNUNET_CONTAINER_multipeermap_remove_all (mss->valid_peers, rand_peer); 823 GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
757 ret = GNUNET_NO; 824 ret = GNUNET_NO;
758 } 825 }
759 (void) GNUNET_CONTAINER_multipeermap_put (mss->valid_peers, peer, NULL, 826 (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
760 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 827 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
761 GNUNET_STATISTICS_set (stats, 828 if (valid_peers == msub->valid_peers)
762 "# valid peers", 829 {
763 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers), 830 GNUNET_STATISTICS_set (stats,
764 GNUNET_NO); 831 "# valid peers",
832 GNUNET_CONTAINER_multipeermap_size (valid_peers),
833 GNUNET_NO);
834 }
765 return ret; 835 return ret;
766} 836}
767 837
@@ -799,7 +869,6 @@ set_peer_online (struct PeerContext *peer_ctx)
799 peer_ctx->online_check_pending = NULL; 869 peer_ctx->online_check_pending = NULL;
800 } 870 }
801 871
802 (void) add_valid_peer (peer);
803 SET_PEER_FLAG (peer_ctx, Peers_ONLINE); 872 SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
804 873
805 /* Call pending operations */ 874 /* Call pending operations */
@@ -888,14 +957,12 @@ remove_channel_ctx (struct ChannelCtx *channel_ctx)
888/** 957/**
889 * @brief Get the channel of a peer. If not existing, create. 958 * @brief Get the channel of a peer. If not existing, create.
890 * 959 *
891 * @param peer the peer id 960 * @param peer_ctx Context of the peer of which to get the channel
892 * @return the #GNUNET_CADET_Channel used to send data to @a peer 961 * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
893 */ 962 */
894struct GNUNET_CADET_Channel * 963struct GNUNET_CADET_Channel *
895get_channel (const struct GNUNET_PeerIdentity *peer) 964get_channel (struct PeerContext *peer_ctx)
896{ 965{
897 struct PeerContext *peer_ctx;
898 struct GNUNET_PeerIdentity *ctx_peer;
899 /* There exists a copy-paste-clone in run() */ 966 /* There exists a copy-paste-clone in run() */
900 struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 967 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
901 GNUNET_MQ_hd_fixed_size (peer_check, 968 GNUNET_MQ_hd_fixed_size (peer_check,
@@ -918,20 +985,17 @@ get_channel (const struct GNUNET_PeerIdentity *peer)
918 }; 985 };
919 986
920 987
921 peer_ctx = get_peer_ctx (peer);
922 if (NULL == peer_ctx->send_channel_ctx) 988 if (NULL == peer_ctx->send_channel_ctx)
923 { 989 {
924 LOG (GNUNET_ERROR_TYPE_DEBUG, 990 LOG (GNUNET_ERROR_TYPE_DEBUG,
925 "Trying to establish channel to peer %s\n", 991 "Trying to establish channel to peer %s\n",
926 GNUNET_i2s (peer)); 992 GNUNET_i2s (&peer_ctx->peer_id));
927 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
928 *ctx_peer = *peer;
929 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx); 993 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
930 peer_ctx->send_channel_ctx->channel = 994 peer_ctx->send_channel_ctx->channel =
931 GNUNET_CADET_channel_create (mss->cadet_handle, 995 GNUNET_CADET_channel_create (cadet_handle,
932 peer_ctx->send_channel_ctx, /* context */ 996 peer_ctx->send_channel_ctx, /* context */
933 peer, 997 &peer_ctx->peer_id,
934 &mss->port, 998 &peer_ctx->sub->hash,
935 GNUNET_CADET_OPTION_RELIABLE, 999 GNUNET_CADET_OPTION_RELIABLE,
936 NULL, /* WindowSize handler */ 1000 NULL, /* WindowSize handler */
937 &cleanup_destroyed_channel, /* Disconnect handler */ 1001 &cleanup_destroyed_channel, /* Disconnect handler */
@@ -949,19 +1013,15 @@ get_channel (const struct GNUNET_PeerIdentity *peer)
949 * If we already have a message queue open to this client, 1013 * If we already have a message queue open to this client,
950 * simply return it, otherways create one. 1014 * simply return it, otherways create one.
951 * 1015 *
952 * @param peer the peer to get the mq to 1016 * @param peer_ctx Context of the peer of whicht to get the mq
953 * @return the #GNUNET_MQ_Handle 1017 * @return the #GNUNET_MQ_Handle
954 */ 1018 */
955static struct GNUNET_MQ_Handle * 1019static struct GNUNET_MQ_Handle *
956get_mq (const struct GNUNET_PeerIdentity *peer) 1020get_mq (struct PeerContext *peer_ctx)
957{ 1021{
958 struct PeerContext *peer_ctx;
959
960 peer_ctx = get_peer_ctx (peer);
961
962 if (NULL == peer_ctx->mq) 1022 if (NULL == peer_ctx->mq)
963 { 1023 {
964 peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer)); 1024 peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
965 } 1025 }
966 return peer_ctx->mq; 1026 return peer_ctx->mq;
967} 1027}
@@ -969,20 +1029,18 @@ get_mq (const struct GNUNET_PeerIdentity *peer)
969/** 1029/**
970 * @brief Add an envelope to a message passed to mq to list of pending messages 1030 * @brief Add an envelope to a message passed to mq to list of pending messages
971 * 1031 *
972 * @param peer peer the message was sent to 1032 * @param peer_ctx Context of the peer for which to insert the envelope
973 * @param ev envelope to the message 1033 * @param ev envelope to the message
974 * @param type type of the message to be sent 1034 * @param type type of the message to be sent
975 * @return pointer to pending message 1035 * @return pointer to pending message
976 */ 1036 */
977static struct PendingMessage * 1037static struct PendingMessage *
978insert_pending_message (const struct GNUNET_PeerIdentity *peer, 1038insert_pending_message (struct PeerContext *peer_ctx,
979 struct GNUNET_MQ_Envelope *ev, 1039 struct GNUNET_MQ_Envelope *ev,
980 const char *type) 1040 const char *type)
981{ 1041{
982 struct PendingMessage *pending_msg; 1042 struct PendingMessage *pending_msg;
983 struct PeerContext *peer_ctx;
984 1043
985 peer_ctx = get_peer_ctx (peer);
986 pending_msg = GNUNET_new (struct PendingMessage); 1044 pending_msg = GNUNET_new (struct PendingMessage);
987 pending_msg->ev = ev; 1045 pending_msg->ev = ev;
988 pending_msg->peer_ctx = peer_ctx; 1046 pending_msg->peer_ctx = peer_ctx;
@@ -1039,6 +1097,7 @@ mq_online_check_successful (void *cls)
1039 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES); 1097 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1040 peer_ctx->online_check_pending = NULL; 1098 peer_ctx->online_check_pending = NULL;
1041 set_peer_online (peer_ctx); 1099 set_peer_online (peer_ctx);
1100 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1042 } 1101 }
1043} 1102}
1044 1103
@@ -1059,16 +1118,19 @@ check_peer_online (struct PeerContext *peer_ctx)
1059 1118
1060 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE); 1119 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1061 peer_ctx->online_check_pending = 1120 peer_ctx->online_check_pending =
1062 insert_pending_message (&peer_ctx->peer_id, ev, "Check online"); 1121 insert_pending_message (peer_ctx, ev, "Check online");
1063 mq = get_mq (&peer_ctx->peer_id); 1122 mq = get_mq (peer_ctx);
1064 GNUNET_MQ_notify_sent (ev, 1123 GNUNET_MQ_notify_sent (ev,
1065 mq_online_check_successful, 1124 mq_online_check_successful,
1066 peer_ctx); 1125 peer_ctx);
1067 GNUNET_MQ_send (mq, ev); 1126 GNUNET_MQ_send (mq, ev);
1068 GNUNET_STATISTICS_update (stats, 1127 if (peer_ctx->sub == msub)
1069 "# pending online checks", 1128 {
1070 1, 1129 GNUNET_STATISTICS_update (stats,
1071 GNUNET_NO); 1130 "# pending online checks",
1131 1,
1132 GNUNET_NO);
1133 }
1072} 1134}
1073 1135
1074 1136
@@ -1078,20 +1140,18 @@ check_peer_online (struct PeerContext *peer_ctx)
1078 * The array with pending operations will probably never grow really big, so 1140 * The array with pending operations will probably never grow really big, so
1079 * iterating over it should be ok. 1141 * iterating over it should be ok.
1080 * 1142 *
1081 * @param peer the peer to check 1143 * @param peer_ctx Context of the peer to check for the operation
1082 * @param peer_op the operation (#PeerOp) on the peer 1144 * @param peer_op the operation (#PeerOp) on the peer
1083 * 1145 *
1084 * @return #GNUNET_YES if this operation is scheduled on that peer 1146 * @return #GNUNET_YES if this operation is scheduled on that peer
1085 * #GNUNET_NO otherwise 1147 * #GNUNET_NO otherwise
1086 */ 1148 */
1087static int 1149static int
1088check_operation_scheduled (const struct GNUNET_PeerIdentity *peer, 1150check_operation_scheduled (const struct PeerContext *peer_ctx,
1089 const PeerOp peer_op) 1151 const PeerOp peer_op)
1090{ 1152{
1091 const struct PeerContext *peer_ctx;
1092 unsigned int i; 1153 unsigned int i;
1093 1154
1094 peer_ctx = get_peer_ctx (peer);
1095 for (i = 0; i < peer_ctx->num_pending_ops; i++) 1155 for (i = 0; i < peer_ctx->num_pending_ops; i++)
1096 if (peer_op == peer_ctx->pending_ops[i].op) 1156 if (peer_op == peer_ctx->pending_ops[i].op)
1097 return GNUNET_YES; 1157 return GNUNET_YES;
@@ -1165,7 +1225,13 @@ schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1165/** 1225/**
1166 * @brief Remove peer 1226 * @brief Remove peer
1167 * 1227 *
1168 * @param peer the peer to clean 1228 * - Empties the list with pending operations
1229 * - Empties the list with pending messages
1230 * - Cancels potentially existing online check
1231 * - Schedules closing of send and recv channels
1232 * - Removes peer from peer map
1233 *
1234 * @param peer_ctx Context of the peer to be destroyed
1169 * @return #GNUNET_YES if peer was removed 1235 * @return #GNUNET_YES if peer was removed
1170 * #GNUNET_NO otherwise 1236 * #GNUNET_NO otherwise
1171 */ 1237 */
@@ -1173,9 +1239,9 @@ static int
1173destroy_peer (struct PeerContext *peer_ctx) 1239destroy_peer (struct PeerContext *peer_ctx)
1174{ 1240{
1175 GNUNET_assert (NULL != peer_ctx); 1241 GNUNET_assert (NULL != peer_ctx);
1176 GNUNET_assert (NULL != mss->peer_map); 1242 GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1177 if (GNUNET_NO == 1243 if (GNUNET_NO ==
1178 GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, 1244 GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1179 &peer_ctx->peer_id)) 1245 &peer_ctx->peer_id))
1180 { 1246 {
1181 return GNUNET_NO; 1247 return GNUNET_NO;
@@ -1205,10 +1271,13 @@ destroy_peer (struct PeerContext *peer_ctx)
1205 sizeof (struct PendingMessage))) ) 1271 sizeof (struct PendingMessage))) )
1206 { 1272 {
1207 peer_ctx->online_check_pending = NULL; 1273 peer_ctx->online_check_pending = NULL;
1208 GNUNET_STATISTICS_update (stats, 1274 if (peer_ctx->sub == msub)
1209 "# pending online checks", 1275 {
1210 -1, 1276 GNUNET_STATISTICS_update (stats,
1211 GNUNET_NO); 1277 "# pending online checks",
1278 -1,
1279 GNUNET_NO);
1280 }
1212 } 1281 }
1213 remove_pending_message (peer_ctx->pending_messages_head, 1282 remove_pending_message (peer_ctx->pending_messages_head,
1214 GNUNET_YES); 1283 GNUNET_YES);
@@ -1245,16 +1314,19 @@ destroy_peer (struct PeerContext *peer_ctx)
1245 } 1314 }
1246 1315
1247 if (GNUNET_YES != 1316 if (GNUNET_YES !=
1248 GNUNET_CONTAINER_multipeermap_remove_all (mss->peer_map, 1317 GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1249 &peer_ctx->peer_id)) 1318 &peer_ctx->peer_id))
1250 { 1319 {
1251 LOG (GNUNET_ERROR_TYPE_WARNING, 1320 LOG (GNUNET_ERROR_TYPE_WARNING,
1252 "removing peer from mss->peer_map failed\n"); 1321 "removing peer from peer_ctx->sub->peer_map failed\n");
1322 }
1323 if (peer_ctx->sub == msub)
1324 {
1325 GNUNET_STATISTICS_set (stats,
1326 "# known peers",
1327 GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1328 GNUNET_NO);
1253 } 1329 }
1254 GNUNET_STATISTICS_set (stats,
1255 "# known peers",
1256 GNUNET_CONTAINER_multipeermap_size (mss->peer_map),
1257 GNUNET_NO);
1258 GNUNET_free (peer_ctx); 1330 GNUNET_free (peer_ctx);
1259 return GNUNET_YES; 1331 return GNUNET_YES;
1260} 1332}
@@ -1274,9 +1346,10 @@ peermap_clear_iterator (void *cls,
1274 const struct GNUNET_PeerIdentity *key, 1346 const struct GNUNET_PeerIdentity *key,
1275 void *value) 1347 void *value)
1276{ 1348{
1277 (void) cls; 1349 struct Sub *sub = cls;
1278 (void) value; 1350 (void) value;
1279 destroy_peer (get_peer_ctx (key)); 1351
1352 destroy_peer (get_peer_ctx (sub->peer_map, key));
1280 return GNUNET_YES; 1353 return GNUNET_YES;
1281} 1354}
1282 1355
@@ -1295,12 +1368,22 @@ mq_notify_sent_cb (void *cls)
1295 LOG (GNUNET_ERROR_TYPE_DEBUG, 1368 LOG (GNUNET_ERROR_TYPE_DEBUG,
1296 "%s was sent.\n", 1369 "%s was sent.\n",
1297 pending_msg->type); 1370 pending_msg->type);
1298 if (0 == strncmp ("PULL REPLY", pending_msg->type, 10)) 1371 if (pending_msg->peer_ctx->sub == msub)
1299 GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO); 1372 {
1300 if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12)) 1373 if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1301 GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO); 1374 GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1302 if (0 == strncmp ("PUSH", pending_msg->type, 4)) 1375 if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1303 GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO); 1376 GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1377 if (0 == strncmp ("PUSH", pending_msg->type, 4))
1378 GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1379 if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12) &&
1380 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1381 &pending_msg->peer_ctx->peer_id))
1382 GNUNET_STATISTICS_update(stats,
1383 "# pull requests sent (multi-hop peer)",
1384 1,
1385 GNUNET_NO);
1386 }
1304 /* Do not cancle message */ 1387 /* Do not cancle message */
1305 remove_pending_message (pending_msg, GNUNET_NO); 1388 remove_pending_message (pending_msg, GNUNET_NO);
1306} 1389}
@@ -1350,35 +1433,37 @@ store_peer_presistently_iterator (void *cls,
1350 1433
1351/** 1434/**
1352 * @brief Store the peers currently in #valid_peers to disk. 1435 * @brief Store the peers currently in #valid_peers to disk.
1436 *
1437 * @param sub Sub for which to store the valid peers
1353 */ 1438 */
1354static void 1439static void
1355store_valid_peers () 1440store_valid_peers (const struct Sub *sub)
1356{ 1441{
1357 struct GNUNET_DISK_FileHandle *fh; 1442 struct GNUNET_DISK_FileHandle *fh;
1358 uint32_t number_written_peers; 1443 uint32_t number_written_peers;
1359 int ret; 1444 int ret;
1360 1445
1361 if (0 == strncmp ("DISABLE", mss->filename_valid_peers, 7)) 1446 if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1362 { 1447 {
1363 return; 1448 return;
1364 } 1449 }
1365 1450
1366 ret = GNUNET_DISK_directory_create_for_file (mss->filename_valid_peers); 1451 ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1367 if (GNUNET_SYSERR == ret) 1452 if (GNUNET_SYSERR == ret)
1368 { 1453 {
1369 LOG (GNUNET_ERROR_TYPE_WARNING, 1454 LOG (GNUNET_ERROR_TYPE_WARNING,
1370 "Not able to create directory for file `%s'\n", 1455 "Not able to create directory for file `%s'\n",
1371 mss->filename_valid_peers); 1456 sub->filename_valid_peers);
1372 GNUNET_break (0); 1457 GNUNET_break (0);
1373 } 1458 }
1374 else if (GNUNET_NO == ret) 1459 else if (GNUNET_NO == ret)
1375 { 1460 {
1376 LOG (GNUNET_ERROR_TYPE_WARNING, 1461 LOG (GNUNET_ERROR_TYPE_WARNING,
1377 "Directory for file `%s' exists but is not writable for us\n", 1462 "Directory for file `%s' exists but is not writable for us\n",
1378 mss->filename_valid_peers); 1463 sub->filename_valid_peers);
1379 GNUNET_break (0); 1464 GNUNET_break (0);
1380 } 1465 }
1381 fh = GNUNET_DISK_file_open (mss->filename_valid_peers, 1466 fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1382 GNUNET_DISK_OPEN_WRITE | 1467 GNUNET_DISK_OPEN_WRITE |
1383 GNUNET_DISK_OPEN_CREATE, 1468 GNUNET_DISK_OPEN_CREATE,
1384 GNUNET_DISK_PERM_USER_READ | 1469 GNUNET_DISK_PERM_USER_READ |
@@ -1387,19 +1472,19 @@ store_valid_peers ()
1387 { 1472 {
1388 LOG (GNUNET_ERROR_TYPE_WARNING, 1473 LOG (GNUNET_ERROR_TYPE_WARNING,
1389 "Not able to write valid peers to file `%s'\n", 1474 "Not able to write valid peers to file `%s'\n",
1390 mss->filename_valid_peers); 1475 sub->filename_valid_peers);
1391 return; 1476 return;
1392 } 1477 }
1393 LOG (GNUNET_ERROR_TYPE_DEBUG, 1478 LOG (GNUNET_ERROR_TYPE_DEBUG,
1394 "Writing %u valid peers to disk\n", 1479 "Writing %u valid peers to disk\n",
1395 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1480 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1396 number_written_peers = 1481 number_written_peers =
1397 GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, 1482 GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1398 store_peer_presistently_iterator, 1483 store_peer_presistently_iterator,
1399 fh); 1484 fh);
1400 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); 1485 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1401 GNUNET_assert (number_written_peers == 1486 GNUNET_assert (number_written_peers ==
1402 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1487 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1403} 1488}
1404 1489
1405 1490
@@ -1451,9 +1536,11 @@ s2i_full (const char *string_repr)
1451 1536
1452/** 1537/**
1453 * @brief Restore the peers on disk to #valid_peers. 1538 * @brief Restore the peers on disk to #valid_peers.
1539 *
1540 * @param sub Sub for which to restore the valid peers
1454 */ 1541 */
1455static void 1542static void
1456restore_valid_peers () 1543restore_valid_peers (const struct Sub *sub)
1457{ 1544{
1458 off_t file_size; 1545 off_t file_size;
1459 uint32_t num_peers; 1546 uint32_t num_peers;
@@ -1464,16 +1551,16 @@ restore_valid_peers ()
1464 char *str_repr; 1551 char *str_repr;
1465 const struct GNUNET_PeerIdentity *peer; 1552 const struct GNUNET_PeerIdentity *peer;
1466 1553
1467 if (0 == strncmp ("DISABLE", mss->filename_valid_peers, 7)) 1554 if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1468 { 1555 {
1469 return; 1556 return;
1470 } 1557 }
1471 1558
1472 if (GNUNET_OK != GNUNET_DISK_file_test (mss->filename_valid_peers)) 1559 if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1473 { 1560 {
1474 return; 1561 return;
1475 } 1562 }
1476 fh = GNUNET_DISK_file_open (mss->filename_valid_peers, 1563 fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1477 GNUNET_DISK_OPEN_READ, 1564 GNUNET_DISK_OPEN_READ,
1478 GNUNET_DISK_PERM_NONE); 1565 GNUNET_DISK_PERM_NONE);
1479 GNUNET_assert (NULL != fh); 1566 GNUNET_assert (NULL != fh);
@@ -1485,13 +1572,13 @@ restore_valid_peers ()
1485 LOG (GNUNET_ERROR_TYPE_DEBUG, 1572 LOG (GNUNET_ERROR_TYPE_DEBUG,
1486 "Restoring %" PRIu32 " peers from file `%s'\n", 1573 "Restoring %" PRIu32 " peers from file `%s'\n",
1487 num_peers, 1574 num_peers,
1488 mss->filename_valid_peers); 1575 sub->filename_valid_peers);
1489 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53) 1576 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1490 { 1577 {
1491 str_repr = GNUNET_strndup (iter_buf, 53); 1578 str_repr = GNUNET_strndup (iter_buf, 53);
1492 peer = s2i_full (str_repr); 1579 peer = s2i_full (str_repr);
1493 GNUNET_free (str_repr); 1580 GNUNET_free (str_repr);
1494 add_valid_peer (peer); 1581 add_valid_peer (peer, sub->valid_peers);
1495 LOG (GNUNET_ERROR_TYPE_DEBUG, 1582 LOG (GNUNET_ERROR_TYPE_DEBUG,
1496 "Restored valid peer %s from disk\n", 1583 "Restored valid peer %s from disk\n",
1497 GNUNET_i2s_full (peer)); 1584 GNUNET_i2s_full (peer));
@@ -1499,10 +1586,10 @@ restore_valid_peers ()
1499 iter_buf = NULL; 1586 iter_buf = NULL;
1500 GNUNET_free (buf); 1587 GNUNET_free (buf);
1501 LOG (GNUNET_ERROR_TYPE_DEBUG, 1588 LOG (GNUNET_ERROR_TYPE_DEBUG,
1502 "num_peers: %" PRIu32 ", _size (mss->valid_peers): %u\n", 1589 "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1503 num_peers, 1590 num_peers,
1504 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1591 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1505 if (num_peers != GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)) 1592 if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1506 { 1593 {
1507 LOG (GNUNET_ERROR_TYPE_WARNING, 1594 LOG (GNUNET_ERROR_TYPE_WARNING,
1508 "Number of restored peers does not match file size. Have probably duplicates.\n"); 1595 "Number of restored peers does not match file size. Have probably duplicates.\n");
@@ -1510,38 +1597,40 @@ restore_valid_peers ()
1510 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh)); 1597 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1511 LOG (GNUNET_ERROR_TYPE_DEBUG, 1598 LOG (GNUNET_ERROR_TYPE_DEBUG,
1512 "Restored %u valid peers from disk\n", 1599 "Restored %u valid peers from disk\n",
1513 GNUNET_CONTAINER_multipeermap_size (mss->valid_peers)); 1600 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1514} 1601}
1515 1602
1516 1603
1517/** 1604/**
1518 * @brief Delete storage of peers that was created with #initialise_peers () 1605 * @brief Delete storage of peers that was created with #initialise_peers ()
1606 *
1607 * @param sub Sub for which the storage is deleted
1519 */ 1608 */
1520static void 1609static void
1521peers_terminate () 1610peers_terminate (struct Sub *sub)
1522{ 1611{
1523 if (GNUNET_SYSERR == 1612 if (GNUNET_SYSERR ==
1524 GNUNET_CONTAINER_multipeermap_iterate (mss->peer_map, 1613 GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1525 &peermap_clear_iterator, 1614 &peermap_clear_iterator,
1526 NULL)) 1615 sub))
1527 { 1616 {
1528 LOG (GNUNET_ERROR_TYPE_WARNING, 1617 LOG (GNUNET_ERROR_TYPE_WARNING,
1529 "Iteration destroying peers was aborted.\n"); 1618 "Iteration destroying peers was aborted.\n");
1530 } 1619 }
1531 GNUNET_CONTAINER_multipeermap_destroy (mss->peer_map); 1620 GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1532 mss->peer_map = NULL; 1621 sub->peer_map = NULL;
1533 store_valid_peers (); 1622 store_valid_peers (sub);
1534 GNUNET_free (mss->filename_valid_peers); 1623 GNUNET_free (sub->filename_valid_peers);
1535 mss->filename_valid_peers = NULL; 1624 sub->filename_valid_peers = NULL;
1536 GNUNET_CONTAINER_multipeermap_destroy (mss->valid_peers); 1625 GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1537 mss->valid_peers = NULL; 1626 sub->valid_peers = NULL;
1538} 1627}
1539 1628
1540 1629
1541/** 1630/**
1542 * Iterator over #valid_peers hash map entries. 1631 * Iterator over #valid_peers hash map entries.
1543 * 1632 *
1544 * @param cls closure - unused 1633 * @param cls Closure that contains iterator function and closure
1545 * @param peer current peer id 1634 * @param peer current peer id
1546 * @param value value in the hash map - unused 1635 * @param value value in the hash map - unused
1547 * @return #GNUNET_YES if we should continue to 1636 * @return #GNUNET_YES if we should continue to
@@ -1556,21 +1645,22 @@ valid_peer_iterator (void *cls,
1556 struct PeersIteratorCls *it_cls = cls; 1645 struct PeersIteratorCls *it_cls = cls;
1557 (void) value; 1646 (void) value;
1558 1647
1559 return it_cls->iterator (it_cls->cls, 1648 return it_cls->iterator (it_cls->cls, peer);
1560 peer);
1561} 1649}
1562 1650
1563 1651
1564/** 1652/**
1565 * @brief Get all currently known, valid peer ids. 1653 * @brief Get all currently known, valid peer ids.
1566 * 1654 *
1567 * @param it function to call on each peer id 1655 * @param valid_peers Peer map containing the valid peers in question
1568 * @param it_cls extra argument to @a it 1656 * @param iterator function to call on each peer id
1657 * @param it_cls extra argument to @a iterator
1569 * @return the number of key value pairs processed, 1658 * @return the number of key value pairs processed,
1570 * #GNUNET_SYSERR if it aborted iteration 1659 * #GNUNET_SYSERR if it aborted iteration
1571 */ 1660 */
1572static int 1661static int
1573get_valid_peers (PeersIterator iterator, 1662get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1663 PeersIterator iterator,
1574 void *it_cls) 1664 void *it_cls)
1575{ 1665{
1576 struct PeersIteratorCls *cls; 1666 struct PeersIteratorCls *cls;
@@ -1579,7 +1669,7 @@ get_valid_peers (PeersIterator iterator,
1579 cls = GNUNET_new (struct PeersIteratorCls); 1669 cls = GNUNET_new (struct PeersIteratorCls);
1580 cls->iterator = iterator; 1670 cls->iterator = iterator;
1581 cls->cls = it_cls; 1671 cls->cls = it_cls;
1582 ret = GNUNET_CONTAINER_multipeermap_iterate (mss->valid_peers, 1672 ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1583 valid_peer_iterator, 1673 valid_peer_iterator,
1584 cls); 1674 cls);
1585 GNUNET_free (cls); 1675 GNUNET_free (cls);
@@ -1593,19 +1683,21 @@ get_valid_peers (PeersIterator iterator,
1593 * This function is called on new peer_ids from 'external' sources 1683 * This function is called on new peer_ids from 'external' sources
1594 * (client seed, cadet get_peers(), ...) 1684 * (client seed, cadet get_peers(), ...)
1595 * 1685 *
1686 * @param sub Sub with the peer map that the @a peer will be added to
1596 * @param peer the new #GNUNET_PeerIdentity 1687 * @param peer the new #GNUNET_PeerIdentity
1597 * 1688 *
1598 * @return #GNUNET_YES if peer was inserted 1689 * @return #GNUNET_YES if peer was inserted
1599 * #GNUNET_NO otherwise 1690 * #GNUNET_NO otherwise
1600 */ 1691 */
1601static int 1692static int
1602insert_peer (const struct GNUNET_PeerIdentity *peer) 1693insert_peer (struct Sub *sub,
1694 const struct GNUNET_PeerIdentity *peer)
1603{ 1695{
1604 if (GNUNET_YES == check_peer_known (peer)) 1696 if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1605 { 1697 {
1606 return GNUNET_NO; /* We already know this peer - nothing to do */ 1698 return GNUNET_NO; /* We already know this peer - nothing to do */
1607 } 1699 }
1608 (void) create_peer_ctx (peer); 1700 (void) create_peer_ctx (sub, peer);
1609 return GNUNET_YES; 1701 return GNUNET_YES;
1610} 1702}
1611 1703
@@ -1613,6 +1705,7 @@ insert_peer (const struct GNUNET_PeerIdentity *peer)
1613/** 1705/**
1614 * @brief Check whether flags on a peer are set. 1706 * @brief Check whether flags on a peer are set.
1615 * 1707 *
1708 * @param peer_map Peer map that is expected to contain the @a peer
1616 * @param peer the peer to check the flag of 1709 * @param peer the peer to check the flag of
1617 * @param flags the flags to check 1710 * @param flags the flags to check
1618 * 1711 *
@@ -1621,16 +1714,17 @@ insert_peer (const struct GNUNET_PeerIdentity *peer)
1621 * #GNUNET_NO otherwise 1714 * #GNUNET_NO otherwise
1622 */ 1715 */
1623static int 1716static int
1624check_peer_flag (const struct GNUNET_PeerIdentity *peer, 1717check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1718 const struct GNUNET_PeerIdentity *peer,
1625 enum Peers_PeerFlags flags) 1719 enum Peers_PeerFlags flags)
1626{ 1720{
1627 struct PeerContext *peer_ctx; 1721 struct PeerContext *peer_ctx;
1628 1722
1629 if (GNUNET_NO == check_peer_known (peer)) 1723 if (GNUNET_NO == check_peer_known (peer_map, peer))
1630 { 1724 {
1631 return GNUNET_SYSERR; 1725 return GNUNET_SYSERR;
1632 } 1726 }
1633 peer_ctx = get_peer_ctx (peer); 1727 peer_ctx = get_peer_ctx (peer_map, peer);
1634 return check_peer_flag_set (peer_ctx, flags); 1728 return check_peer_flag_set (peer_ctx, flags);
1635} 1729}
1636 1730
@@ -1639,18 +1733,20 @@ check_peer_flag (const struct GNUNET_PeerIdentity *peer,
1639 * 1733 *
1640 * If not known yet, insert into known peers 1734 * If not known yet, insert into known peers
1641 * 1735 *
1736 * @param sub Sub which would contain the @a peer
1642 * @param peer the peer whose online is to be checked 1737 * @param peer the peer whose online is to be checked
1643 * @return #GNUNET_YES if the check was issued 1738 * @return #GNUNET_YES if the check was issued
1644 * #GNUNET_NO otherwise 1739 * #GNUNET_NO otherwise
1645 */ 1740 */
1646static int 1741static int
1647issue_peer_online_check (const struct GNUNET_PeerIdentity *peer) 1742issue_peer_online_check (struct Sub *sub,
1743 const struct GNUNET_PeerIdentity *peer)
1648{ 1744{
1649 struct PeerContext *peer_ctx; 1745 struct PeerContext *peer_ctx;
1650 1746
1651 (void) insert_peer (peer); 1747 (void) insert_peer (sub, peer); // TODO even needed?
1652 peer_ctx = get_peer_ctx (peer); 1748 peer_ctx = get_peer_ctx (sub->peer_map, peer);
1653 if ( (GNUNET_NO == check_peer_flag (peer, Peers_ONLINE)) && 1749 if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1654 (NULL == peer_ctx->online_check_pending) ) 1750 (NULL == peer_ctx->online_check_pending) )
1655 { 1751 {
1656 check_peer_online (peer_ctx); 1752 check_peer_online (peer_ctx);
@@ -1668,22 +1764,20 @@ issue_peer_online_check (const struct GNUNET_PeerIdentity *peer)
1668 * - there are pending messages 1764 * - there are pending messages
1669 * - there is no pending pull reply 1765 * - there is no pending pull reply
1670 * 1766 *
1671 * @param peer the peer in question 1767 * @param peer_ctx Context of the peer in question
1672 * @return #GNUNET_YES if peer is removable 1768 * @return #GNUNET_YES if peer is removable
1673 * #GNUNET_NO if peer is NOT removable 1769 * #GNUNET_NO if peer is NOT removable
1674 * #GNUNET_SYSERR if peer is not known 1770 * #GNUNET_SYSERR if peer is not known
1675 */ 1771 */
1676static int 1772static int
1677check_removable (const struct GNUNET_PeerIdentity *peer) 1773check_removable (const struct PeerContext *peer_ctx)
1678{ 1774{
1679 struct PeerContext *peer_ctx; 1775 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1680 1776 &peer_ctx->peer_id))
1681 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mss->peer_map, peer))
1682 { 1777 {
1683 return GNUNET_SYSERR; 1778 return GNUNET_SYSERR;
1684 } 1779 }
1685 1780
1686 peer_ctx = get_peer_ctx (peer);
1687 if ( (NULL != peer_ctx->recv_channel_ctx) || 1781 if ( (NULL != peer_ctx->recv_channel_ctx) ||
1688 (NULL != peer_ctx->pending_messages_head) || 1782 (NULL != peer_ctx->pending_messages_head) ||
1689 (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) ) 1783 (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
@@ -1699,15 +1793,17 @@ check_removable (const struct GNUNET_PeerIdentity *peer)
1699 * 1793 *
1700 * A valid peer is a peer that we know exists eg. we were connected to once. 1794 * A valid peer is a peer that we know exists eg. we were connected to once.
1701 * 1795 *
1796 * @param valid_peers Peer map that would contain the @a peer
1702 * @param peer peer in question 1797 * @param peer peer in question
1703 * 1798 *
1704 * @return #GNUNET_YES if peer is valid 1799 * @return #GNUNET_YES if peer is valid
1705 * #GNUNET_NO if peer is not valid 1800 * #GNUNET_NO if peer is not valid
1706 */ 1801 */
1707static int 1802static int
1708check_peer_valid (const struct GNUNET_PeerIdentity *peer) 1803check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1804 const struct GNUNET_PeerIdentity *peer)
1709{ 1805{
1710 return GNUNET_CONTAINER_multipeermap_contains (mss->valid_peers, peer); 1806 return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1711} 1807}
1712 1808
1713 1809
@@ -1716,13 +1812,14 @@ check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1716 * 1812 *
1717 * This establishes a sending channel 1813 * This establishes a sending channel
1718 * 1814 *
1719 * @param peer the peer to establish channel to 1815 * @param peer_ctx Context of the target peer
1720 */ 1816 */
1721static void 1817static void
1722indicate_sending_intention (const struct GNUNET_PeerIdentity *peer) 1818indicate_sending_intention (struct PeerContext *peer_ctx)
1723{ 1819{
1724 GNUNET_assert (GNUNET_YES == check_peer_known (peer)); 1820 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1725 (void) get_channel (peer); 1821 &peer_ctx->peer_id));
1822 (void) get_channel (peer_ctx);
1726} 1823}
1727 1824
1728 1825
@@ -1730,17 +1827,14 @@ indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1730 * @brief Check whether other peer has the intention to send/opened channel 1827 * @brief Check whether other peer has the intention to send/opened channel
1731 * towars us 1828 * towars us
1732 * 1829 *
1733 * @param peer the peer in question 1830 * @param peer_ctx Context of the peer in question
1734 * 1831 *
1735 * @return #GNUNET_YES if peer has the intention to send 1832 * @return #GNUNET_YES if peer has the intention to send
1736 * #GNUNET_NO otherwise 1833 * #GNUNET_NO otherwise
1737 */ 1834 */
1738static int 1835static int
1739check_peer_send_intention (const struct GNUNET_PeerIdentity *peer) 1836check_peer_send_intention (const struct PeerContext *peer_ctx)
1740{ 1837{
1741 const struct PeerContext *peer_ctx;
1742
1743 peer_ctx = get_peer_ctx (peer);
1744 if (NULL != peer_ctx->recv_channel_ctx) 1838 if (NULL != peer_ctx->recv_channel_ctx)
1745 { 1839 {
1746 return GNUNET_YES; 1840 return GNUNET_YES;
@@ -1752,7 +1846,7 @@ check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1752/** 1846/**
1753 * Handle the channel a peer opens to us. 1847 * Handle the channel a peer opens to us.
1754 * 1848 *
1755 * @param cls The closure 1849 * @param cls The closure - Sub
1756 * @param channel The channel the peer wants to establish 1850 * @param channel The channel the peer wants to establish
1757 * @param initiator The peer's peer ID 1851 * @param initiator The peer's peer ID
1758 * 1852 *
@@ -1765,23 +1859,22 @@ handle_inbound_channel (void *cls,
1765 const struct GNUNET_PeerIdentity *initiator) 1859 const struct GNUNET_PeerIdentity *initiator)
1766{ 1860{
1767 struct PeerContext *peer_ctx; 1861 struct PeerContext *peer_ctx;
1768 struct GNUNET_PeerIdentity *ctx_peer;
1769 struct ChannelCtx *channel_ctx; 1862 struct ChannelCtx *channel_ctx;
1770 (void) cls; 1863 struct Sub *sub = cls;
1771 1864
1772 LOG (GNUNET_ERROR_TYPE_DEBUG, 1865 LOG (GNUNET_ERROR_TYPE_DEBUG,
1773 "New channel was established to us (Peer %s).\n", 1866 "New channel was established to us (Peer %s).\n",
1774 GNUNET_i2s (initiator)); 1867 GNUNET_i2s (initiator));
1775 GNUNET_assert (NULL != channel); /* according to cadet API */ 1868 GNUNET_assert (NULL != channel); /* according to cadet API */
1776 /* Make sure we 'know' about this peer */ 1869 /* Make sure we 'know' about this peer */
1777 peer_ctx = create_or_get_peer_ctx (initiator); 1870 peer_ctx = create_or_get_peer_ctx (sub, initiator);
1778 set_peer_online (peer_ctx); 1871 set_peer_online (peer_ctx);
1779 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity); 1872 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1780 *ctx_peer = *initiator;
1781 channel_ctx = add_channel_ctx (peer_ctx); 1873 channel_ctx = add_channel_ctx (peer_ctx);
1782 channel_ctx->channel = channel; 1874 channel_ctx->channel = channel;
1783 /* We only accept one incoming channel per peer */ 1875 /* We only accept one incoming channel per peer */
1784 if (GNUNET_YES == check_peer_send_intention (initiator)) 1876 if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1877 initiator)))
1785 { 1878 {
1786 LOG (GNUNET_ERROR_TYPE_WARNING, 1879 LOG (GNUNET_ERROR_TYPE_WARNING,
1787 "Already got one receive channel. Destroying old one.\n"); 1880 "Already got one receive channel. Destroying old one.\n");
@@ -1799,21 +1892,19 @@ handle_inbound_channel (void *cls,
1799/** 1892/**
1800 * @brief Check whether a sending channel towards the given peer exists 1893 * @brief Check whether a sending channel towards the given peer exists
1801 * 1894 *
1802 * @param peer the peer to check for 1895 * @param peer_ctx Context of the peer in question
1803 * 1896 *
1804 * @return #GNUNET_YES if a sending channel towards that peer exists 1897 * @return #GNUNET_YES if a sending channel towards that peer exists
1805 * #GNUNET_NO otherwise 1898 * #GNUNET_NO otherwise
1806 */ 1899 */
1807static int 1900static int
1808check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer) 1901check_sending_channel_exists (const struct PeerContext *peer_ctx)
1809{ 1902{
1810 struct PeerContext *peer_ctx; 1903 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1811 1904 &peer_ctx->peer_id))
1812 if (GNUNET_NO == check_peer_known (peer))
1813 { /* If no such peer exists, there is no channel */ 1905 { /* If no such peer exists, there is no channel */
1814 return GNUNET_NO; 1906 return GNUNET_NO;
1815 } 1907 }
1816 peer_ctx = get_peer_ctx (peer);
1817 if (NULL == peer_ctx->send_channel_ctx) 1908 if (NULL == peer_ctx->send_channel_ctx)
1818 { 1909 {
1819 return GNUNET_NO; 1910 return GNUNET_NO;
@@ -1826,24 +1917,22 @@ check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1826 * @brief Destroy the send channel of a peer e.g. stop indicating a sending 1917 * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1827 * intention to another peer 1918 * intention to another peer
1828 * 1919 *
1829 * @peer the peer identity of the peer whose sending channel to destroy 1920 * @param peer_ctx Context to the peer
1830 * @return #GNUNET_YES if channel was destroyed 1921 * @return #GNUNET_YES if channel was destroyed
1831 * #GNUNET_NO otherwise 1922 * #GNUNET_NO otherwise
1832 */ 1923 */
1833static int 1924static int
1834destroy_sending_channel (const struct GNUNET_PeerIdentity *peer) 1925destroy_sending_channel (struct PeerContext *peer_ctx)
1835{ 1926{
1836 struct PeerContext *peer_ctx; 1927 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1837 1928 &peer_ctx->peer_id))
1838 if (GNUNET_NO == check_peer_known (peer))
1839 { 1929 {
1840 return GNUNET_NO; 1930 return GNUNET_NO;
1841 } 1931 }
1842 peer_ctx = get_peer_ctx (peer);
1843 if (NULL != peer_ctx->send_channel_ctx) 1932 if (NULL != peer_ctx->send_channel_ctx)
1844 { 1933 {
1845 destroy_channel (peer_ctx->send_channel_ctx); 1934 destroy_channel (peer_ctx->send_channel_ctx);
1846 (void) check_connected (peer); 1935 (void) check_connected (peer_ctx);
1847 return GNUNET_YES; 1936 return GNUNET_YES;
1848 } 1937 }
1849 return GNUNET_NO; 1938 return GNUNET_NO;
@@ -1855,12 +1944,12 @@ destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1855 * Keeps track about pending messages so they can be properly removed when the 1944 * Keeps track about pending messages so they can be properly removed when the
1856 * peer is destroyed. 1945 * peer is destroyed.
1857 * 1946 *
1858 * @param peer receeiver of the message 1947 * @param peer_ctx Context of the peer to which the message is to be sent
1859 * @param ev envelope of the message 1948 * @param ev envelope of the message
1860 * @param type type of the message 1949 * @param type type of the message
1861 */ 1950 */
1862static void 1951static void
1863send_message (const struct GNUNET_PeerIdentity *peer, 1952send_message (struct PeerContext *peer_ctx,
1864 struct GNUNET_MQ_Envelope *ev, 1953 struct GNUNET_MQ_Envelope *ev,
1865 const char *type) 1954 const char *type)
1866{ 1955{
@@ -1869,10 +1958,10 @@ send_message (const struct GNUNET_PeerIdentity *peer,
1869 1958
1870 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1959 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1871 "Sending message to %s of type %s\n", 1960 "Sending message to %s of type %s\n",
1872 GNUNET_i2s (peer), 1961 GNUNET_i2s (&peer_ctx->peer_id),
1873 type); 1962 type);
1874 pending_msg = insert_pending_message (peer, ev, type); 1963 pending_msg = insert_pending_message (peer_ctx, ev, type);
1875 mq = get_mq (peer); 1964 mq = get_mq (peer_ctx);
1876 GNUNET_MQ_notify_sent (ev, 1965 GNUNET_MQ_notify_sent (ev,
1877 mq_notify_sent_cb, 1966 mq_notify_sent_cb,
1878 pending_msg); 1967 pending_msg);
@@ -1884,28 +1973,29 @@ send_message (const struct GNUNET_PeerIdentity *peer,
1884 * 1973 *
1885 * Avoids scheduling an operation twice. 1974 * Avoids scheduling an operation twice.
1886 * 1975 *
1887 * @param peer the peer we want to schedule the operation for once it gets 1976 * @param peer_ctx Context of the peer for which to schedule the operation
1888 * online 1977 * @param peer_op the operation to schedule
1978 * @param cls Closure to @a peer_op
1889 * 1979 *
1890 * @return #GNUNET_YES if the operation was scheduled 1980 * @return #GNUNET_YES if the operation was scheduled
1891 * #GNUNET_NO otherwise 1981 * #GNUNET_NO otherwise
1892 */ 1982 */
1893static int 1983static int
1894schedule_operation (const struct GNUNET_PeerIdentity *peer, 1984schedule_operation (struct PeerContext *peer_ctx,
1895 const PeerOp peer_op) 1985 const PeerOp peer_op,
1986 void *cls)
1896{ 1987{
1897 struct PeerPendingOp pending_op; 1988 struct PeerPendingOp pending_op;
1898 struct PeerContext *peer_ctx;
1899 1989
1900 GNUNET_assert (GNUNET_YES == check_peer_known (peer)); 1990 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1991 &peer_ctx->peer_id));
1901 1992
1902 //TODO if ONLINE execute immediately 1993 //TODO if ONLINE execute immediately
1903 1994
1904 if (GNUNET_NO == check_operation_scheduled (peer, peer_op)) 1995 if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
1905 { 1996 {
1906 peer_ctx = get_peer_ctx (peer);
1907 pending_op.op = peer_op; 1997 pending_op.op = peer_op;
1908 pending_op.op_cls = NULL; 1998 pending_op.op_cls = cls;
1909 GNUNET_array_append (peer_ctx->pending_ops, 1999 GNUNET_array_append (peer_ctx->pending_ops,
1910 peer_ctx->num_pending_ops, 2000 peer_ctx->num_pending_ops,
1911 pending_op); 2001 pending_op);
@@ -1983,6 +2073,11 @@ struct ClientContext
1983 * The client handle to send the reply to 2073 * The client handle to send the reply to
1984 */ 2074 */
1985 struct GNUNET_SERVICE_Client *client; 2075 struct GNUNET_SERVICE_Client *client;
2076
2077 /**
2078 * The #Sub this context belongs to
2079 */
2080 struct Sub *sub;
1986}; 2081};
1987 2082
1988/** 2083/**
@@ -2072,34 +2167,46 @@ rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2072 */ 2167 */
2073static void 2168static void
2074insert_in_view_op (void *cls, 2169insert_in_view_op (void *cls,
2075 const struct GNUNET_PeerIdentity *peer); 2170 const struct GNUNET_PeerIdentity *peer);
2076 2171
2077/** 2172/**
2078 * Insert PeerID in #view 2173 * Insert PeerID in #view
2079 * 2174 *
2080 * Called once we know a peer is online. 2175 * Called once we know a peer is online.
2081 * 2176 *
2177 * @param sub Sub in with the view to insert in
2178 * @param peer the peer to insert
2179 *
2082 * @return GNUNET_OK if peer was actually inserted 2180 * @return GNUNET_OK if peer was actually inserted
2083 * GNUNET_NO if peer was not inserted 2181 * GNUNET_NO if peer was not inserted
2084 */ 2182 */
2085static int 2183static int
2086insert_in_view (const struct GNUNET_PeerIdentity *peer) 2184insert_in_view (struct Sub *sub,
2185 const struct GNUNET_PeerIdentity *peer)
2087{ 2186{
2187 struct PeerContext *peer_ctx;
2088 int online; 2188 int online;
2089 int ret; 2189 int ret;
2090 2190
2091 online = check_peer_flag (peer, Peers_ONLINE); 2191 online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2192 peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2092 if ( (GNUNET_NO == online) || 2193 if ( (GNUNET_NO == online) ||
2093 (GNUNET_SYSERR == online) ) /* peer is not even known */ 2194 (GNUNET_SYSERR == online) ) /* peer is not even known */
2094 { 2195 {
2095 (void) issue_peer_online_check (peer); 2196 (void) issue_peer_online_check (sub, peer);
2096 (void) schedule_operation (peer, insert_in_view_op); 2197 (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2097 return GNUNET_NO; 2198 return GNUNET_NO;
2098 } 2199 }
2099 /* Open channel towards peer to keep connection open */ 2200 /* Open channel towards peer to keep connection open */
2100 indicate_sending_intention (peer); 2201 indicate_sending_intention (peer_ctx);
2101 ret = View_put (mss->view, peer); 2202 ret = View_put (sub->view, peer);
2102 GNUNET_STATISTICS_set (stats, "view size", View_size(mss->view), GNUNET_NO); 2203 if (peer_ctx->sub == msub)
2204 {
2205 GNUNET_STATISTICS_set (stats,
2206 "view size",
2207 View_size (peer_ctx->sub->view),
2208 GNUNET_NO);
2209 }
2103 return ret; 2210 return ret;
2104} 2211}
2105 2212
@@ -2111,18 +2218,21 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer)
2111 * @param view_array the peerids of the view as array (can be empty) 2218 * @param view_array the peerids of the view as array (can be empty)
2112 * @param view_size the size of the view array (can be 0) 2219 * @param view_size the size of the view array (can be 0)
2113 */ 2220 */
2114void 2221static void
2115send_view (const struct ClientContext *cli_ctx, 2222send_view (const struct ClientContext *cli_ctx,
2116 const struct GNUNET_PeerIdentity *view_array, 2223 const struct GNUNET_PeerIdentity *view_array,
2117 uint64_t view_size) 2224 uint64_t view_size)
2118{ 2225{
2119 struct GNUNET_MQ_Envelope *ev; 2226 struct GNUNET_MQ_Envelope *ev;
2120 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg; 2227 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2228 struct Sub *sub;
2121 2229
2122 if (NULL == view_array) 2230 if (NULL == view_array)
2123 { 2231 {
2124 view_size = View_size (mss->view); 2232 if (NULL == cli_ctx->sub) sub = msub;
2125 view_array = View_get_as_array(mss->view); 2233 else sub = cli_ctx->sub;
2234 view_size = View_size (sub->view);
2235 view_array = View_get_as_array (sub->view);
2126 } 2236 }
2127 2237
2128 ev = GNUNET_MQ_msg_extra (out_msg, 2238 ev = GNUNET_MQ_msg_extra (out_msg,
@@ -2146,7 +2256,7 @@ send_view (const struct ClientContext *cli_ctx,
2146 * @param view_array the peerids of the view as array (can be empty) 2256 * @param view_array the peerids of the view as array (can be empty)
2147 * @param view_size the size of the view array (can be 0) 2257 * @param view_size the size of the view array (can be 0)
2148 */ 2258 */
2149void 2259static void
2150send_stream_peers (const struct ClientContext *cli_ctx, 2260send_stream_peers (const struct ClientContext *cli_ctx,
2151 uint64_t num_peers, 2261 uint64_t num_peers,
2152 const struct GNUNET_PeerIdentity *peers) 2262 const struct GNUNET_PeerIdentity *peers)
@@ -2170,16 +2280,18 @@ send_stream_peers (const struct ClientContext *cli_ctx,
2170 2280
2171/** 2281/**
2172 * @brief sends updates to clients that are interested 2282 * @brief sends updates to clients that are interested
2283 *
2284 * @param sub Sub for which to notify clients
2173 */ 2285 */
2174static void 2286static void
2175clients_notify_view_update (void) 2287clients_notify_view_update (const struct Sub *sub)
2176{ 2288{
2177 struct ClientContext *cli_ctx_iter; 2289 struct ClientContext *cli_ctx_iter;
2178 uint64_t num_peers; 2290 uint64_t num_peers;
2179 const struct GNUNET_PeerIdentity *view_array; 2291 const struct GNUNET_PeerIdentity *view_array;
2180 2292
2181 num_peers = View_size (mss->view); 2293 num_peers = View_size (sub->view);
2182 view_array = View_get_as_array(mss->view); 2294 view_array = View_get_as_array(sub->view);
2183 /* check size of view is small enough */ 2295 /* check size of view is small enough */
2184 if (GNUNET_MAX_MESSAGE_SIZE < num_peers) 2296 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2185 { 2297 {
@@ -2214,9 +2326,13 @@ clients_notify_view_update (void)
2214 2326
2215/** 2327/**
2216 * @brief sends updates to clients that are interested 2328 * @brief sends updates to clients that are interested
2329 *
2330 * @param num_peers Number of peers to send
2331 * @param peers the array of peers to send
2217 */ 2332 */
2218static void 2333static void
2219clients_notify_stream_peer (uint64_t num_peers, 2334clients_notify_stream_peer (const struct Sub *sub,
2335 uint64_t num_peers,
2220 const struct GNUNET_PeerIdentity *peers) 2336 const struct GNUNET_PeerIdentity *peers)
2221 // TODO enum StreamPeerSource) 2337 // TODO enum StreamPeerSource)
2222{ 2338{
@@ -2230,15 +2346,21 @@ clients_notify_stream_peer (uint64_t num_peers,
2230 NULL != cli_ctx_iter; 2346 NULL != cli_ctx_iter;
2231 cli_ctx_iter = cli_ctx_iter->next) 2347 cli_ctx_iter = cli_ctx_iter->next)
2232 { 2348 {
2233 if (GNUNET_YES == cli_ctx_iter->stream_update) 2349 if (GNUNET_YES == cli_ctx_iter->stream_update &&
2350 (sub == cli_ctx_iter->sub || sub == msub))
2234 { 2351 {
2235 send_stream_peers (cli_ctx_iter, num_peers, peers); 2352 send_stream_peers (cli_ctx_iter, num_peers, peers);
2236 } 2353 }
2237 } 2354 }
2238} 2355}
2239 2356
2357
2240/** 2358/**
2241 * Put random peer from sampler into the view as history update. 2359 * Put random peer from sampler into the view as history update.
2360 *
2361 * @param ids Array of Peers to insert into view
2362 * @param num_peers Number of peers to insert
2363 * @param cls Closure - The Sub for which this is to be done
2242 */ 2364 */
2243static void 2365static void
2244hist_update (const struct GNUNET_PeerIdentity *ids, 2366hist_update (const struct GNUNET_PeerIdentity *ids,
@@ -2246,21 +2368,21 @@ hist_update (const struct GNUNET_PeerIdentity *ids,
2246 void *cls) 2368 void *cls)
2247{ 2369{
2248 unsigned int i; 2370 unsigned int i;
2249 (void) cls; 2371 struct Sub *sub = cls;
2250 2372
2251 for (i = 0; i < num_peers; i++) 2373 for (i = 0; i < num_peers; i++)
2252 { 2374 {
2253 int inserted; 2375 int inserted;
2254 inserted = insert_in_view (&ids[i]); 2376 inserted = insert_in_view (sub, &ids[i]);
2255 if (GNUNET_OK == inserted) 2377 if (GNUNET_OK == inserted)
2256 { 2378 {
2257 clients_notify_stream_peer (1, &ids[i]); 2379 clients_notify_stream_peer (sub, 1, &ids[i]);
2258 } 2380 }
2259 to_file (mss->file_name_view_log, 2381 to_file (sub->file_name_view_log,
2260 "+%s\t(hist)", 2382 "+%s\t(hist)",
2261 GNUNET_i2s_full (ids)); 2383 GNUNET_i2s_full (ids));
2262 } 2384 }
2263 clients_notify_view_update(); 2385 clients_notify_view_update (sub);
2264} 2386}
2265 2387
2266 2388
@@ -2269,6 +2391,9 @@ hist_update (const struct GNUNET_PeerIdentity *ids,
2269 * 2391 *
2270 * If we do not have enough sampler elements, double current sampler size 2392 * If we do not have enough sampler elements, double current sampler size
2271 * If we have more than enough sampler elements, halv current sampler size 2393 * If we have more than enough sampler elements, halv current sampler size
2394 *
2395 * @param sampler The sampler to resize
2396 * @param new_size New size to which to resize
2272 */ 2397 */
2273static void 2398static void
2274resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size) 2399resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
@@ -2316,10 +2441,13 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2316 &peer_array[i], 2441 &peer_array[i],
2317 NULL, 2442 NULL,
2318 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); 2443 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2319 GNUNET_STATISTICS_set (stats, 2444 if (msub->peer_map == peer_map)
2320 "# known peers", 2445 {
2321 GNUNET_CONTAINER_multipeermap_size (peer_map), 2446 GNUNET_STATISTICS_set (stats,
2322 GNUNET_NO); 2447 "# known peers",
2448 GNUNET_CONTAINER_multipeermap_size (peer_map),
2449 GNUNET_NO);
2450 }
2323 } 2451 }
2324} 2452}
2325 2453
@@ -2327,12 +2455,12 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2327/** 2455/**
2328 * Send a PULL REPLY to @a peer_id 2456 * Send a PULL REPLY to @a peer_id
2329 * 2457 *
2330 * @param peer_id the peer to send the reply to. 2458 * @param peer_ctx Context of the peer to send the reply to
2331 * @param peer_ids the peers to send to @a peer_id 2459 * @param peer_ids the peers to send to @a peer_id
2332 * @param num_peer_ids the number of peers to send to @a peer_id 2460 * @param num_peer_ids the number of peers to send to @a peer_id
2333 */ 2461 */
2334static void 2462static void
2335send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, 2463send_pull_reply (struct PeerContext *peer_ctx,
2336 const struct GNUNET_PeerIdentity *peer_ids, 2464 const struct GNUNET_PeerIdentity *peer_ids,
2337 unsigned int num_peer_ids) 2465 unsigned int num_peer_ids)
2338{ 2466{
@@ -2358,7 +2486,7 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2358 2486
2359 LOG (GNUNET_ERROR_TYPE_DEBUG, 2487 LOG (GNUNET_ERROR_TYPE_DEBUG,
2360 "Going to send PULL REPLY with %u peers to %s\n", 2488 "Going to send PULL REPLY with %u peers to %s\n",
2361 send_size, GNUNET_i2s (peer_id)); 2489 send_size, GNUNET_i2s (&peer_ctx->peer_id));
2362 2490
2363 ev = GNUNET_MQ_msg_extra (out_msg, 2491 ev = GNUNET_MQ_msg_extra (out_msg,
2364 send_size * sizeof (struct GNUNET_PeerIdentity), 2492 send_size * sizeof (struct GNUNET_PeerIdentity),
@@ -2367,8 +2495,11 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2367 GNUNET_memcpy (&out_msg[1], peer_ids, 2495 GNUNET_memcpy (&out_msg[1], peer_ids,
2368 send_size * sizeof (struct GNUNET_PeerIdentity)); 2496 send_size * sizeof (struct GNUNET_PeerIdentity));
2369 2497
2370 send_message (peer_id, ev, "PULL REPLY"); 2498 send_message (peer_ctx, ev, "PULL REPLY");
2371 GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO); 2499 if (peer_ctx->sub == msub)
2500 {
2501 GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2502 }
2372 // TODO check with send intention: as send_channel is used/opened we indicate 2503 // TODO check with send intention: as send_channel is used/opened we indicate
2373 // a sending intention without intending it. 2504 // a sending intention without intending it.
2374 // -> clean peer afterwards? 2505 // -> clean peer afterwards?
@@ -2380,13 +2511,17 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2380 * Insert PeerID in #pull_map 2511 * Insert PeerID in #pull_map
2381 * 2512 *
2382 * Called once we know a peer is online. 2513 * Called once we know a peer is online.
2514 *
2515 * @param cls Closure - Sub with the pull map to insert into
2516 * @param peer Peer to insert
2383 */ 2517 */
2384static void 2518static void
2385insert_in_pull_map (void *cls, 2519insert_in_pull_map (void *cls,
2386 const struct GNUNET_PeerIdentity *peer) 2520 const struct GNUNET_PeerIdentity *peer)
2387{ 2521{
2388 (void) cls; 2522 struct Sub *sub = cls;
2389 CustomPeerMap_put (mss->pull_map, peer); 2523
2524 CustomPeerMap_put (sub->pull_map, peer);
2390} 2525}
2391 2526
2392 2527
@@ -2395,18 +2530,21 @@ insert_in_pull_map (void *cls,
2395 * 2530 *
2396 * Called once we know a peer is online. 2531 * Called once we know a peer is online.
2397 * Implements #PeerOp 2532 * Implements #PeerOp
2533 *
2534 * @param cls Closure - Sub with view to insert peer into
2535 * @param peer the peer to insert
2398 */ 2536 */
2399static void 2537static void
2400insert_in_view_op (void *cls, 2538insert_in_view_op (void *cls,
2401 const struct GNUNET_PeerIdentity *peer) 2539 const struct GNUNET_PeerIdentity *peer)
2402{ 2540{
2403 (void) cls; 2541 struct Sub *sub = cls;
2404 int inserted; 2542 int inserted;
2405 2543
2406 inserted = insert_in_view (peer); 2544 inserted = insert_in_view (sub, peer);
2407 if (GNUNET_OK == inserted) 2545 if (GNUNET_OK == inserted)
2408 { 2546 {
2409 clients_notify_stream_peer (1, peer); 2547 clients_notify_stream_peer (sub, 1, peer);
2410 } 2548 }
2411} 2549}
2412 2550
@@ -2414,41 +2552,46 @@ insert_in_view_op (void *cls,
2414/** 2552/**
2415 * Update sampler with given PeerID. 2553 * Update sampler with given PeerID.
2416 * Implements #PeerOp 2554 * Implements #PeerOp
2555 *
2556 * @param cls Closure - Sub containing the sampler to insert into
2557 * @param peer Peer to insert
2417 */ 2558 */
2418static void 2559static void
2419insert_in_sampler (void *cls, 2560insert_in_sampler (void *cls,
2420 const struct GNUNET_PeerIdentity *peer) 2561 const struct GNUNET_PeerIdentity *peer)
2421{ 2562{
2422 (void) cls; 2563 struct Sub *sub = cls;
2564
2423 LOG (GNUNET_ERROR_TYPE_DEBUG, 2565 LOG (GNUNET_ERROR_TYPE_DEBUG,
2424 "Updating samplers with peer %s from insert_in_sampler()\n", 2566 "Updating samplers with peer %s from insert_in_sampler()\n",
2425 GNUNET_i2s (peer)); 2567 GNUNET_i2s (peer));
2426 RPS_sampler_update (mss->sampler, peer); 2568 RPS_sampler_update (sub->sampler, peer);
2427 if (0 < RPS_sampler_count_id (mss->sampler, peer)) 2569 if (0 < RPS_sampler_count_id (sub->sampler, peer))
2428 { 2570 {
2429 /* Make sure we 'know' about this peer */ 2571 /* Make sure we 'know' about this peer */
2430 (void) issue_peer_online_check (peer); 2572 (void) issue_peer_online_check (sub, peer);
2431 /* Establish a channel towards that peer to indicate we are going to send 2573 /* Establish a channel towards that peer to indicate we are going to send
2432 * messages to it */ 2574 * messages to it */
2433 //indicate_sending_intention (peer); 2575 //indicate_sending_intention (peer);
2434 } 2576 }
2435 #ifdef TO_FILE 2577 #ifdef TO_FILE
2436 mss->num_observed_peers++; 2578 sub->num_observed_peers++;
2437 GNUNET_CONTAINER_multipeermap_put 2579 GNUNET_CONTAINER_multipeermap_put
2438 (mss->observed_unique_peers, 2580 (sub->observed_unique_peers,
2439 peer, 2581 peer,
2440 NULL, 2582 NULL,
2441 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 2583 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2442 uint32_t num_observed_unique_peers = 2584 uint32_t num_observed_unique_peers =
2443 GNUNET_CONTAINER_multipeermap_size (mss->observed_unique_peers); 2585 GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2444 to_file (mss->file_name_observed_log, 2586 to_file (sub->file_name_observed_log,
2445 "%" PRIu32 " %" PRIu32 " %f\n", 2587 "%" PRIu32 " %" PRIu32 " %f\n",
2446 mss->num_observed_peers, 2588 sub->num_observed_peers,
2447 num_observed_unique_peers, 2589 num_observed_unique_peers,
2448 1.0*num_observed_unique_peers/mss->num_observed_peers) 2590 1.0*num_observed_unique_peers/sub->num_observed_peers)
2449 #endif /* TO_FILE */ 2591 #endif /* TO_FILE */
2450} 2592}
2451 2593
2594
2452/** 2595/**
2453 * @brief This is called on peers from external sources (cadet, peerinfo, ...) 2596 * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2454 * If the peer is not known, online check is issued and it is 2597 * If the peer is not known, online check is issued and it is
@@ -2456,45 +2599,60 @@ insert_in_sampler (void *cls,
2456 * 2599 *
2457 * "External sources" refer to every source except the gossip. 2600 * "External sources" refer to every source except the gossip.
2458 * 2601 *
2459 * @param peer peer to insert 2602 * @param sub Sub for which @a peer was received
2603 * @param peer peer to insert/peer received
2460 */ 2604 */
2461static void 2605static void
2462got_peer (const struct GNUNET_PeerIdentity *peer) 2606got_peer (struct Sub *sub,
2607 const struct GNUNET_PeerIdentity *peer)
2463{ 2608{
2464 /* If we did not know this peer already, insert it into sampler and view */ 2609 /* If we did not know this peer already, insert it into sampler and view */
2465 if (GNUNET_YES == issue_peer_online_check (peer)) 2610 if (GNUNET_YES == issue_peer_online_check (sub, peer))
2466 { 2611 {
2467 schedule_operation (peer, insert_in_sampler); 2612 schedule_operation (get_peer_ctx (sub->peer_map, peer),
2468 schedule_operation (peer, insert_in_view_op); 2613 &insert_in_sampler, sub);
2614 schedule_operation (get_peer_ctx (sub->peer_map, peer),
2615 &insert_in_view_op, sub);
2616 }
2617 if (sub == msub)
2618 {
2619 GNUNET_STATISTICS_update (stats,
2620 "# learnd peers",
2621 1,
2622 GNUNET_NO);
2469 } 2623 }
2470 GNUNET_STATISTICS_update (stats,
2471 "# learnd peers",
2472 1,
2473 GNUNET_NO);
2474} 2624}
2475 2625
2626
2476/** 2627/**
2477 * @brief Checks if there is a sending channel and if it is needed 2628 * @brief Checks if there is a sending channel and if it is needed
2478 * 2629 *
2479 * @param peer the peer whose sending channel is checked 2630 * @param peer_ctx Context of the peer to check
2480 * @return GNUNET_YES if sending channel exists and is still needed 2631 * @return GNUNET_YES if sending channel exists and is still needed
2481 * GNUNET_NO otherwise 2632 * GNUNET_NO otherwise
2482 */ 2633 */
2483static int 2634static int
2484check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer) 2635check_sending_channel_needed (const struct PeerContext *peer_ctx)
2485{ 2636{
2486 /* struct GNUNET_CADET_Channel *channel; */ 2637 /* struct GNUNET_CADET_Channel *channel; */
2487 if (GNUNET_NO == check_peer_known (peer)) 2638 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2639 &peer_ctx->peer_id))
2488 { 2640 {
2489 return GNUNET_NO; 2641 return GNUNET_NO;
2490 } 2642 }
2491 if (GNUNET_YES == check_sending_channel_exists (peer)) 2643 if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2492 { 2644 {
2493 if ( (0 < RPS_sampler_count_id (mss->sampler, peer)) || 2645 if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2494 (GNUNET_YES == View_contains_peer (mss->view, peer)) || 2646 &peer_ctx->peer_id)) ||
2495 (GNUNET_YES == CustomPeerMap_contains_peer (mss->push_map, peer)) || 2647 (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2496 (GNUNET_YES == CustomPeerMap_contains_peer (mss->pull_map, peer)) || 2648 &peer_ctx->peer_id)) ||
2497 (GNUNET_YES == check_peer_flag (peer, Peers_PULL_REPLY_PENDING))) 2649 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2650 &peer_ctx->peer_id)) ||
2651 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2652 &peer_ctx->peer_id)) ||
2653 (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2654 &peer_ctx->peer_id,
2655 Peers_PULL_REPLY_PENDING)))
2498 { /* If we want to keep the connection to peer open */ 2656 { /* If we want to keep the connection to peer open */
2499 return GNUNET_YES; 2657 return GNUNET_YES;
2500 } 2658 }
@@ -2503,20 +2661,23 @@ check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2503 return GNUNET_NO; 2661 return GNUNET_NO;
2504} 2662}
2505 2663
2664
2506/** 2665/**
2507 * @brief remove peer from our knowledge, the view, push and pull maps and 2666 * @brief remove peer from our knowledge, the view, push and pull maps and
2508 * samplers. 2667 * samplers.
2509 * 2668 *
2669 * @param sub Sub with the data structures the peer is to be removed from
2510 * @param peer the peer to remove 2670 * @param peer the peer to remove
2511 */ 2671 */
2512static void 2672static void
2513remove_peer (const struct GNUNET_PeerIdentity *peer) 2673remove_peer (struct Sub *sub,
2674 const struct GNUNET_PeerIdentity *peer)
2514{ 2675{
2515 (void) View_remove_peer (mss->view, peer); 2676 (void) View_remove_peer (sub->view, peer);
2516 CustomPeerMap_remove_peer (mss->pull_map, peer); 2677 CustomPeerMap_remove_peer (sub->pull_map, peer);
2517 CustomPeerMap_remove_peer (mss->push_map, peer); 2678 CustomPeerMap_remove_peer (sub->push_map, peer);
2518 RPS_sampler_reinitialise_by_value (mss->sampler, peer); 2679 RPS_sampler_reinitialise_by_value (sub->sampler, peer);
2519 destroy_peer (get_peer_ctx (peer)); 2680 destroy_peer (get_peer_ctx (sub->peer_map, peer));
2520} 2681}
2521 2682
2522 2683
@@ -2525,35 +2686,47 @@ remove_peer (const struct GNUNET_PeerIdentity *peer)
2525 * 2686 *
2526 * If the sending channel is no longer needed it is destroyed. 2687 * If the sending channel is no longer needed it is destroyed.
2527 * 2688 *
2689 * @param sub Sub in which the current peer is to be cleaned
2528 * @param peer the peer whose data is about to be cleaned 2690 * @param peer the peer whose data is about to be cleaned
2529 */ 2691 */
2530static void 2692static void
2531clean_peer (const struct GNUNET_PeerIdentity *peer) 2693clean_peer (struct Sub *sub,
2694 const struct GNUNET_PeerIdentity *peer)
2532{ 2695{
2533 if (GNUNET_NO == check_sending_channel_needed (peer)) 2696 if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2697 peer)))
2534 { 2698 {
2535 LOG (GNUNET_ERROR_TYPE_DEBUG, 2699 LOG (GNUNET_ERROR_TYPE_DEBUG,
2536 "Going to remove send channel to peer %s\n", 2700 "Going to remove send channel to peer %s\n",
2537 GNUNET_i2s (peer)); 2701 GNUNET_i2s (peer));
2538 #ifdef ENABLE_MALICIOUS 2702 #ifdef ENABLE_MALICIOUS
2539 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) 2703 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2540 (void) destroy_sending_channel (peer); 2704 (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer));
2541 #else /* ENABLE_MALICIOUS */ 2705 #else /* ENABLE_MALICIOUS */
2542 (void) destroy_sending_channel (peer); 2706 (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer));
2543 #endif /* ENABLE_MALICIOUS */ 2707 #endif /* ENABLE_MALICIOUS */
2544 } 2708 }
2545 2709
2546 if ( (GNUNET_NO == check_peer_send_intention (peer)) && 2710 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map, peer))
2547 (GNUNET_NO == View_contains_peer (mss->view, peer)) && 2711 {
2548 (GNUNET_NO == CustomPeerMap_contains_peer (mss->push_map, peer)) && 2712 /* Peer was already removed by callback on destroyed channel */
2549 (GNUNET_NO == CustomPeerMap_contains_peer (mss->push_map, peer)) && 2713 LOG (GNUNET_ERROR_TYPE_WARNING,
2550 (0 == RPS_sampler_count_id (mss->sampler, peer)) && 2714 "Peer was removed from our knowledge during cleanup\n");
2551 (GNUNET_NO != check_removable (peer)) ) 2715 return;
2716 }
2717
2718 if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2719 peer))) &&
2720 (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2721 (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2722 (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2723 (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2724 (GNUNET_NO != check_removable (get_peer_ctx (sub->peer_map, peer))) )
2552 { /* We can safely remove this peer */ 2725 { /* We can safely remove this peer */
2553 LOG (GNUNET_ERROR_TYPE_DEBUG, 2726 LOG (GNUNET_ERROR_TYPE_DEBUG,
2554 "Going to remove peer %s\n", 2727 "Going to remove peer %s\n",
2555 GNUNET_i2s (peer)); 2728 GNUNET_i2s (peer));
2556 remove_peer (peer); 2729 remove_peer (sub, peer);
2557 return; 2730 return;
2558 } 2731 }
2559} 2732}
@@ -2567,9 +2740,8 @@ clean_peer (const struct GNUNET_PeerIdentity *peer)
2567 * Also check if the knowledge about this peer is still needed. 2740 * Also check if the knowledge about this peer is still needed.
2568 * If not, remove this peer from our knowledge. 2741 * If not, remove this peer from our knowledge.
2569 * 2742 *
2570 * @param cls The closure 2743 * @param cls The closure - Context to the channel
2571 * @param channel The channel being closed 2744 * @param channel The channel being closed
2572 * @param channel_ctx The context associated with this channel
2573 */ 2745 */
2574static void 2746static void
2575cleanup_destroyed_channel (void *cls, 2747cleanup_destroyed_channel (void *cls,
@@ -2577,15 +2749,15 @@ cleanup_destroyed_channel (void *cls,
2577{ 2749{
2578 struct ChannelCtx *channel_ctx = cls; 2750 struct ChannelCtx *channel_ctx = cls;
2579 struct PeerContext *peer_ctx = channel_ctx->peer_ctx; 2751 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2580 (void) cls;
2581 (void) channel; 2752 (void) channel;
2582 2753
2583 channel_ctx->channel = NULL; 2754 channel_ctx->channel = NULL;
2584 remove_channel_ctx (channel_ctx); 2755 remove_channel_ctx (channel_ctx);
2585 if (NULL != peer_ctx && 2756 if (NULL != peer_ctx &&
2586 peer_ctx->send_channel_ctx == channel_ctx) 2757 peer_ctx->send_channel_ctx == channel_ctx &&
2758 GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx))
2587 { 2759 {
2588 remove_peer (&peer_ctx->peer_id); 2760 remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2589 } 2761 }
2590} 2762}
2591 2763
@@ -2596,25 +2768,30 @@ cleanup_destroyed_channel (void *cls,
2596 2768
2597 2769
2598/*********************************************************************** 2770/***********************************************************************
2599 * SubSampler 2771 * Sub
2600***********************************************************************/ 2772***********************************************************************/
2601 2773
2602struct SubSampler * 2774/**
2603new_subsampler (const char *shared_value, 2775 * @brief Create a new Sub
2604 uint32_t sampler_size, 2776 *
2605 struct GNUNET_TIME_Relative round_interval) 2777 * @param hash Hash of value shared among rps instances on other hosts that
2778 * defines a subgroup to sample from.
2779 * @param sampler_size Size of the sampler
2780 * @param round_interval Interval (in average) between two rounds
2781 *
2782 * @return Sub
2783 */
2784struct Sub *
2785new_sub (const struct GNUNET_HashCode *hash,
2786 uint32_t sampler_size,
2787 struct GNUNET_TIME_Relative round_interval)
2606{ 2788{
2607 struct SubSampler *ss; 2789 struct Sub *sub;
2608 char hash_port_string[512] = GNUNET_APPLICATION_PORT_RPS;
2609 2790
2610 ss = GNUNET_new (struct SubSampler); 2791 sub = GNUNET_new (struct Sub);
2611 2792
2612 /* With the hash generated from the secret value this service only connects 2793 /* With the hash generated from the secret value this service only connects
2613 * to rps instances that share the value */ 2794 * to rps instances that share the value */
2614 strcat (hash_port_string, shared_value);
2615 GNUNET_CRYPTO_hash (hash_port_string,
2616 strlen (hash_port_string),
2617 &ss->port);
2618 struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 2795 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2619 GNUNET_MQ_hd_fixed_size (peer_check, 2796 GNUNET_MQ_hd_fixed_size (peer_check,
2620 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE, 2797 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
@@ -2634,17 +2811,16 @@ new_subsampler (const char *shared_value,
2634 NULL), 2811 NULL),
2635 GNUNET_MQ_handler_end () 2812 GNUNET_MQ_handler_end ()
2636 }; 2813 };
2637 ss->cadet_handle = GNUNET_CADET_connect (cfg); 2814 sub->hash = *hash;
2638 GNUNET_assert (NULL != ss->cadet_handle); 2815 sub->cadet_port =
2639 ss->cadet_port = 2816 GNUNET_CADET_open_port (cadet_handle,
2640 GNUNET_CADET_open_port (ss->cadet_handle, 2817 &sub->hash,
2641 &ss->port,
2642 &handle_inbound_channel, /* Connect handler */ 2818 &handle_inbound_channel, /* Connect handler */
2643 NULL, /* cls */ 2819 sub, /* cls */
2644 NULL, /* WindowSize handler */ 2820 NULL, /* WindowSize handler */
2645 &cleanup_destroyed_channel, /* Disconnect handler */ 2821 &cleanup_destroyed_channel, /* Disconnect handler */
2646 cadet_handlers); 2822 cadet_handlers);
2647 if (NULL == ss->cadet_port) 2823 if (NULL == sub->cadet_port)
2648 { 2824 {
2649 LOG (GNUNET_ERROR_TYPE_ERROR, 2825 LOG (GNUNET_ERROR_TYPE_ERROR,
2650 "Cadet port `%s' is already in use.\n", 2826 "Cadet port `%s' is already in use.\n",
@@ -2653,56 +2829,246 @@ new_subsampler (const char *shared_value,
2653 } 2829 }
2654 2830
2655 /* Set up general data structure to keep track about peers */ 2831 /* Set up general data structure to keep track about peers */
2656 ss->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); 2832 sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2657 if (GNUNET_OK != 2833 if (GNUNET_OK !=
2658 GNUNET_CONFIGURATION_get_value_filename (cfg, 2834 GNUNET_CONFIGURATION_get_value_filename (cfg,
2659 "rps", 2835 "rps",
2660 "FILENAME_VALID_PEERS", 2836 "FILENAME_VALID_PEERS",
2661 &ss->filename_valid_peers)) 2837 &sub->filename_valid_peers))
2662 { 2838 {
2663 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 2839 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2664 "rps", 2840 "rps",
2665 "FILENAME_VALID_PEERS"); 2841 "FILENAME_VALID_PEERS");
2666 } 2842 }
2667 ss->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); 2843 if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2844 {
2845 char *tmp_filename_valid_peers;
2846 char str_hash[105];
2847 uint32_t len_filename_valid_peers;
2848
2849 (void) GNUNET_snprintf (str_hash, 105, GNUNET_h2s_full (hash));
2850 tmp_filename_valid_peers = GNUNET_strdup (sub->filename_valid_peers);
2851 GNUNET_free (sub->filename_valid_peers);
2852 len_filename_valid_peers = strlen (tmp_filename_valid_peers) + 105; /* Len of full hash + 1 */
2853 sub->filename_valid_peers = GNUNET_malloc (len_filename_valid_peers);
2854 strncat (sub->filename_valid_peers,
2855 tmp_filename_valid_peers,
2856 len_filename_valid_peers);
2857 strncat (sub->filename_valid_peers,
2858 str_hash,
2859 len_filename_valid_peers);
2860 GNUNET_free (tmp_filename_valid_peers);
2861 }
2862 sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2668 2863
2669 /* Set up the sampler */ 2864 /* Set up the sampler */
2670 ss->sampler_size_est_min = sampler_size; 2865 sub->sampler_size_est_min = sampler_size;
2671 ss->sampler_size_est_need = sampler_size;; 2866 sub->sampler_size_est_need = sampler_size;;
2672 LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", ss->sampler_size_est_min); 2867 LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2673 ss->round_interval = round_interval; 2868 GNUNET_assert (0 != round_interval.rel_value_us);
2674 ss->sampler = RPS_sampler_init (sampler_size, 2869 sub->round_interval = round_interval;
2870 sub->sampler = RPS_sampler_init (sampler_size,
2675 round_interval); 2871 round_interval);
2676 2872
2677 /* Logging of internals */ 2873 /* Logging of internals */
2678 ss->file_name_view_log = store_prefix_file_name (&own_identity, "view"); 2874 sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2679 #ifdef TO_FILE 2875 #ifdef TO_FILE
2680 ss->file_name_observed_log = store_prefix_file_name (&own_identity, 2876 sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2681 "observed"); 2877 "observed");
2682 ss->num_observed_peers = 0; 2878 sub->file_name_push_recv = store_prefix_file_name (&own_identity,
2683 ss->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, 2879 "push_recv");
2880 sub->file_name_pull_delays = store_prefix_file_name (&own_identity,
2881 "pull_delays");
2882 sub->num_observed_peers = 0;
2883 sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2684 GNUNET_NO); 2884 GNUNET_NO);
2685 #endif /* TO_FILE */ 2885 #endif /* TO_FILE */
2686 2886
2687 /* Set up data structures for gossip */ 2887 /* Set up data structures for gossip */
2688 ss->push_map = CustomPeerMap_create (4); 2888 sub->push_map = CustomPeerMap_create (4);
2689 ss->pull_map = CustomPeerMap_create (4); 2889 sub->pull_map = CustomPeerMap_create (4);
2690 ss->view_size_est_min = sampler_size;; 2890 sub->view_size_est_min = sampler_size;;
2691 ss->view = View_create (ss->view_size_est_min); 2891 sub->view = View_create (sub->view_size_est_min);
2692 GNUNET_STATISTICS_set (stats, 2892 if (sub == msub)
2693 "view size aim", 2893 {
2694 ss->view_size_est_min, 2894 GNUNET_STATISTICS_set (stats,
2695 GNUNET_NO); 2895 "view size aim",
2896 sub->view_size_est_min,
2897 GNUNET_NO);
2898 }
2696 2899
2900 /* Start executing rounds */
2901 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2697 2902
2698 return ss; 2903 return sub;
2699} 2904}
2700 2905
2906
2907/**
2908 * @brief Destroy Sub.
2909 *
2910 * @param sub Sub to destroy
2911 */
2912static void
2913destroy_sub (struct Sub *sub)
2914{
2915#ifdef TO_FILE
2916 char push_recv_str[1536] = ""; /* 256 * 6 (1 whitespace, 1 comma, up to 4 chars) */
2917 char pull_delays_str[1536] = ""; /* 256 * 6 (1 whitespace, 1 comma, up to 4 chars) */
2918#endif /* TO_FILE */
2919 GNUNET_assert (NULL != sub);
2920 GNUNET_assert (NULL != sub->do_round_task);
2921 GNUNET_SCHEDULER_cancel (sub->do_round_task);
2922 sub->do_round_task = NULL;
2923
2924 /* Disconnect from cadet */
2925 GNUNET_CADET_close_port (sub->cadet_port);
2926
2927 /* Clean up data structures for peers */
2928 RPS_sampler_destroy (sub->sampler);
2929 sub->sampler = NULL;
2930 View_destroy (sub->view);
2931 sub->view = NULL;
2932 CustomPeerMap_destroy (sub->push_map);
2933 sub->push_map = NULL;
2934 CustomPeerMap_destroy (sub->pull_map);
2935 sub->pull_map = NULL;
2936 peers_terminate (sub);
2937
2938 /* Free leftover data structures */
2939 GNUNET_free (sub->file_name_view_log);
2940 sub->file_name_view_log = NULL;
2941#ifdef TO_FILE
2942 GNUNET_free (sub->file_name_observed_log);
2943 sub->file_name_observed_log = NULL;
2944
2945 /* Write push frequencies to disk */
2946 for (uint32_t i = 0; i < 256; i++)
2947 {
2948 char push_recv_str_tmp[8];
2949 (void) snprintf (push_recv_str_tmp, 8, "%" PRIu32 "\n", sub->push_recv[i]);
2950 LOG (GNUNET_ERROR_TYPE_DEBUG,
2951 "Adding str `%s' to `%s'\n",
2952 push_recv_str_tmp,
2953 push_recv_str);
2954 (void) strncat (push_recv_str,
2955 push_recv_str_tmp,
2956 1535 - strnlen (push_recv_str, 1536));
2957 }
2958 (void) strncat (push_recv_str,
2959 "\n",
2960 1535 - strnlen (push_recv_str, 1536));
2961 LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing push stats to disk\n");
2962 to_file_w_len (sub->file_name_push_recv, 1535, push_recv_str);
2963 GNUNET_free (sub->file_name_push_recv);
2964 sub->file_name_push_recv = NULL;
2965
2966 /* Write pull delays to disk */
2967 for (uint32_t i = 0; i < 256; i++)
2968 {
2969 char pull_delays_str_tmp[8];
2970 (void) snprintf (pull_delays_str_tmp, 8, "%" PRIu32 "\n", sub->pull_delays[i]);
2971 LOG (GNUNET_ERROR_TYPE_DEBUG,
2972 "Adding str `%s' to `%s'\n",
2973 pull_delays_str_tmp,
2974 pull_delays_str);
2975 (void) strncat (pull_delays_str,
2976 pull_delays_str_tmp,
2977 1535 - strnlen (pull_delays_str, 1536));
2978 }
2979 (void) strncat (pull_delays_str,
2980 "\n",
2981 1535 - strnlen (pull_delays_str, 1536));
2982 LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing pull delays to disk\n");
2983 to_file_w_len (sub->file_name_pull_delays, 1535, pull_delays_str);
2984 GNUNET_free (sub->file_name_pull_delays);
2985 sub->file_name_pull_delays = NULL;
2986
2987 GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
2988 sub->observed_unique_peers = NULL;
2989#endif /* TO_FILE */
2990
2991 GNUNET_free (sub);
2992}
2993
2994
2995/***********************************************************************
2996 * /Sub
2997***********************************************************************/
2998
2999
2701/*********************************************************************** 3000/***********************************************************************
2702 * /SubSampler 3001 * Core handlers
2703***********************************************************************/ 3002***********************************************************************/
2704 3003
3004/**
3005 * @brief Callback on initialisation of Core.
3006 *
3007 * @param cls - unused
3008 * @param my_identity - unused
3009 */
3010void
3011core_init (void *cls,
3012 const struct GNUNET_PeerIdentity *my_identity)
3013{
3014 (void) cls;
3015 (void) my_identity;
3016
3017 map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3018}
3019
2705 3020
3021/**
3022 * @brief Callback for core.
3023 * Method called whenever a given peer connects.
3024 *
3025 * @param cls closure - unused
3026 * @param peer peer identity this notification is about
3027 * @return closure given to #core_disconnects as peer_cls
3028 */
3029void *
3030core_connects (void *cls,
3031 const struct GNUNET_PeerIdentity *peer,
3032 struct GNUNET_MQ_Handle *mq)
3033{
3034 (void) cls;
3035 (void) mq;
3036
3037 GNUNET_CONTAINER_multipeermap_put (map_single_hop, peer, NULL,
3038 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
3039 return NULL;
3040}
3041
3042
3043/**
3044 * @brief Callback for core.
3045 * Method called whenever a peer disconnects.
3046 *
3047 * @param cls closure - unused
3048 * @param peer peer identity this notification is about
3049 * @param peer_cls closure given in #core_connects - unused
3050 */
3051void
3052core_disconnects (void *cls,
3053 const struct GNUNET_PeerIdentity *peer,
3054 void *peer_cls)
3055{
3056 (void) cls;
3057 (void) peer_cls;
3058
3059 GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3060}
3061
3062/***********************************************************************
3063 * /Core handlers
3064***********************************************************************/
3065
3066
3067/**
3068 * @brief Destroy the context for a (connected) client
3069 *
3070 * @param cli_ctx Context to destroy
3071 */
2706static void 3072static void
2707destroy_cli_ctx (struct ClientContext *cli_ctx) 3073destroy_cli_ctx (struct ClientContext *cli_ctx)
2708{ 3074{
@@ -2710,51 +3076,94 @@ destroy_cli_ctx (struct ClientContext *cli_ctx)
2710 GNUNET_CONTAINER_DLL_remove (cli_ctx_head, 3076 GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2711 cli_ctx_tail, 3077 cli_ctx_tail,
2712 cli_ctx); 3078 cli_ctx);
3079 if (NULL != cli_ctx->sub)
3080 {
3081 destroy_sub (cli_ctx->sub);
3082 cli_ctx->sub = NULL;
3083 }
2713 GNUNET_free (cli_ctx); 3084 GNUNET_free (cli_ctx);
2714} 3085}
2715 3086
2716 3087
2717/** 3088/**
2718 * Function called by NSE. 3089 * @brief Update sizes in sampler and view on estimate update from nse service
2719 * 3090 *
2720 * Updates sizes of sampler list and view and adapt those lists 3091 * @param sub Sub
2721 * accordingly. 3092 * @param logestimate the log(Base 2) value of the current network size estimate
3093 * @param std_dev standard deviation for the estimate
2722 */ 3094 */
2723static void 3095static void
2724nse_callback (void *cls, 3096adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
2725 struct GNUNET_TIME_Absolute timestamp,
2726 double logestimate, double std_dev)
2727{ 3097{
2728 double estimate; 3098 double estimate;
2729 //double scale; // TODO this might go gloabal/config 3099 //double scale; // TODO this might go gloabal/config
2730 (void) cls;
2731 (void) timestamp;
2732 3100
2733 LOG (GNUNET_ERROR_TYPE_DEBUG, 3101 LOG (GNUNET_ERROR_TYPE_DEBUG,
2734 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n", 3102 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2735 logestimate, std_dev, RPS_sampler_get_size (mss->sampler)); 3103 logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
2736 //scale = .01; 3104 //scale = .01;
2737 estimate = GNUNET_NSE_log_estimate_to_n (logestimate); 3105 estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2738 // GNUNET_NSE_log_estimate_to_n (logestimate); 3106 // GNUNET_NSE_log_estimate_to_n (logestimate);
2739 estimate = pow (estimate, 1.0 / 3); 3107 estimate = pow (estimate, 1.0 / 3);
2740 // TODO add if std_dev is a number 3108 // TODO add if std_dev is a number
2741 // estimate += (std_dev * scale); 3109 // estimate += (std_dev * scale);
2742 if (mss->view_size_est_min < ceil (estimate)) 3110 if (sub->view_size_est_min < ceil (estimate))
2743 { 3111 {
2744 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate); 3112 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2745 mss->sampler_size_est_need = estimate; 3113 sub->sampler_size_est_need = estimate;
2746 mss->view_size_est_need = estimate; 3114 sub->view_size_est_need = estimate;
2747 } else 3115 } else
2748 { 3116 {
2749 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate); 3117 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2750 //mss->sampler_size_est_need = mss->view_size_est_min; 3118 //sub->sampler_size_est_need = sub->view_size_est_min;
2751 mss->view_size_est_need = mss->view_size_est_min; 3119 sub->view_size_est_need = sub->view_size_est_min;
3120 }
3121 if (sub == msub)
3122 {
3123 GNUNET_STATISTICS_set (stats,
3124 "view size aim",
3125 sub->view_size_est_need,
3126 GNUNET_NO);
2752 } 3127 }
2753 GNUNET_STATISTICS_set (stats, "view size aim", mss->view_size_est_need, GNUNET_NO);
2754 3128
2755 /* If the NSE has changed adapt the lists accordingly */ 3129 /* If the NSE has changed adapt the lists accordingly */
2756 resize_wrapper (mss->sampler, mss->sampler_size_est_need); 3130 resize_wrapper (sub->sampler, sub->sampler_size_est_need);
2757 View_change_len (mss->view, mss->view_size_est_need); 3131 View_change_len (sub->view, sub->view_size_est_need);
3132}
3133
3134
3135/**
3136 * Function called by NSE.
3137 *
3138 * Updates sizes of sampler list and view and adapt those lists
3139 * accordingly.
3140 *
3141 * implements #GNUNET_NSE_Callback
3142 *
3143 * @param cls Closure - unused
3144 * @param timestamp time when the estimate was received from the server (or created by the server)
3145 * @param logestimate the log(Base 2) value of the current network size estimate
3146 * @param std_dev standard deviation for the estimate
3147 */
3148static void
3149nse_callback (void *cls,
3150 struct GNUNET_TIME_Absolute timestamp,
3151 double logestimate, double std_dev)
3152{
3153 (void) cls;
3154 (void) timestamp;
3155 struct ClientContext *cli_ctx_iter;
3156
3157 adapt_sizes (msub, logestimate, std_dev);
3158 for (cli_ctx_iter = cli_ctx_head;
3159 NULL != cli_ctx_iter;
3160 cli_ctx_iter = cli_ctx_iter->next)
3161 {
3162 if (NULL != cli_ctx_iter->sub)
3163 {
3164 adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3165 }
3166 }
2758} 3167}
2759 3168
2760 3169
@@ -2765,6 +3174,7 @@ nse_callback (void *cls,
2765 * @param cls the closure (#ClientContext) 3174 * @param cls the closure (#ClientContext)
2766 * @param msg the message 3175 * @param msg the message
2767 * @return #GNUNET_OK if @a msg is well-formed 3176 * @return #GNUNET_OK if @a msg is well-formed
3177 * #GNUNET_SYSERR otherwise
2768 */ 3178 */
2769static int 3179static int
2770check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg) 3180check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
@@ -2777,6 +3187,10 @@ check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2777 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || 3187 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2778 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) 3188 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2779 { 3189 {
3190 LOG (GNUNET_ERROR_TYPE_ERROR,
3191 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3192 ntohl (msg->num_peers),
3193 (msize / sizeof (struct GNUNET_PeerIdentity)));
2780 GNUNET_break (0); 3194 GNUNET_break (0);
2781 GNUNET_SERVICE_client_drop (cli_ctx->client); 3195 GNUNET_SERVICE_client_drop (cli_ctx->client);
2782 return GNUNET_SYSERR; 3196 return GNUNET_SYSERR;
@@ -2814,7 +3228,8 @@ handle_client_seed (void *cls,
2814 i, 3228 i,
2815 GNUNET_i2s (&peers[i])); 3229 GNUNET_i2s (&peers[i]));
2816 3230
2817 got_peer (&peers[i]); 3231 if (NULL != msub) got_peer (msub, &peers[i]); /* Condition needed? */
3232 if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
2818 } 3233 }
2819 GNUNET_SERVICE_client_continue (cli_ctx->client); 3234 GNUNET_SERVICE_client_continue (cli_ctx->client);
2820} 3235}
@@ -2824,7 +3239,8 @@ handle_client_seed (void *cls,
2824 * Handle RPS request from the client. 3239 * Handle RPS request from the client.
2825 * 3240 *
2826 * @param cls Client context 3241 * @param cls Client context
2827 * @param message unused 3242 * @param message Message containing the numer of updates the client wants to
3243 * receive
2828 */ 3244 */
2829static void 3245static void
2830handle_client_view_request (void *cls, 3246handle_client_view_request (void *cls,
@@ -2908,15 +3324,63 @@ handle_client_stream_cancel (void *cls,
2908 (void) msg; 3324 (void) msg;
2909 3325
2910 LOG (GNUNET_ERROR_TYPE_DEBUG, 3326 LOG (GNUNET_ERROR_TYPE_DEBUG,
2911 "Client requested peers from biased stream.\n"); 3327 "Client canceled receiving peers from biased stream.\n");
2912 cli_ctx->stream_update = GNUNET_NO; 3328 cli_ctx->stream_update = GNUNET_NO;
2913 3329
2914 GNUNET_assert (NULL != cli_ctx); 3330 GNUNET_assert (NULL != cli_ctx);
2915 GNUNET_SERVICE_client_continue (cli_ctx->client); 3331 GNUNET_SERVICE_client_continue (cli_ctx->client);
2916 if (0 == cli_ctx->view_updates_left) 3332}
3333
3334
3335/**
3336 * @brief Create and start a Sub.
3337 *
3338 * @param cls Closure - unused
3339 * @param msg Message containing the necessary information
3340 */
3341static void
3342handle_client_start_sub (void *cls,
3343 const struct GNUNET_RPS_CS_SubStartMessage *msg)
3344{
3345 struct ClientContext *cli_ctx = cls;
3346
3347 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3348 if (NULL != cli_ctx->sub &&
3349 0 != memcmp (&cli_ctx->sub->hash,
3350 &msg->hash,
3351 sizeof (struct GNUNET_HashCode)))
2917 { 3352 {
2918 destroy_cli_ctx (cli_ctx); 3353 LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3354 destroy_sub (cli_ctx->sub);
3355 cli_ctx->sub = NULL;
3356 }
3357 cli_ctx->sub = new_sub (&msg->hash,
3358 msub->sampler_size_est_min, // TODO make api input?
3359 GNUNET_TIME_relative_ntoh (msg->round_interval));
3360 GNUNET_SERVICE_client_continue (cli_ctx->client);
3361}
3362
3363
3364/**
3365 * @brief Destroy the Sub
3366 *
3367 * @param cls Closure - unused
3368 * @param msg Message containing the hash that identifies the Sub
3369 */
3370static void
3371handle_client_stop_sub (void *cls,
3372 const struct GNUNET_RPS_CS_SubStopMessage *msg)
3373{
3374 struct ClientContext *cli_ctx = cls;
3375
3376 GNUNET_assert (NULL != cli_ctx->sub);
3377 if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3378 {
3379 LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
2919 } 3380 }
3381 destroy_sub (cli_ctx->sub);
3382 cli_ctx->sub = NULL;
3383 GNUNET_SERVICE_client_continue (cli_ctx->client);
2920} 3384}
2921 3385
2922 3386
@@ -2926,8 +3390,8 @@ handle_client_stream_cancel (void *cls,
2926 * This does nothing. But without calling #GNUNET_CADET_receive_done() 3390 * This does nothing. But without calling #GNUNET_CADET_receive_done()
2927 * the channel is blocked for all other communication. 3391 * the channel is blocked for all other communication.
2928 * 3392 *
2929 * @param cls Closure 3393 * @param cls Closure - Context of channel
2930 * @param msg The message header 3394 * @param msg Message - unused
2931 */ 3395 */
2932static void 3396static void
2933handle_peer_check (void *cls, 3397handle_peer_check (void *cls,
@@ -2939,22 +3403,26 @@ handle_peer_check (void *cls,
2939 3403
2940 LOG (GNUNET_ERROR_TYPE_DEBUG, 3404 LOG (GNUNET_ERROR_TYPE_DEBUG,
2941 "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer)); 3405 "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
2942 GNUNET_STATISTICS_update (stats, 3406 if (channel_ctx->peer_ctx->sub == msub)
2943 "# pending online checks", 3407 {
2944 -1, 3408 GNUNET_STATISTICS_update (stats,
2945 GNUNET_NO); 3409 "# pending online checks",
3410 -1,
3411 GNUNET_NO);
3412 }
2946 3413
2947 GNUNET_CADET_receive_done (channel_ctx->channel); 3414 GNUNET_CADET_receive_done (channel_ctx->channel);
2948} 3415}
2949 3416
3417
2950/** 3418/**
2951 * Handle a PUSH message from another peer. 3419 * Handle a PUSH message from another peer.
2952 * 3420 *
2953 * Check the proof of work and store the PeerID 3421 * Check the proof of work and store the PeerID
2954 * in the temporary list for pushed PeerIDs. 3422 * in the temporary list for pushed PeerIDs.
2955 * 3423 *
2956 * @param cls Closure 3424 * @param cls Closure - Context of channel
2957 * @param msg The message header 3425 * @param msg Message - unused
2958 */ 3426 */
2959static void 3427static void
2960handle_peer_push (void *cls, 3428handle_peer_push (void *cls,
@@ -2969,7 +3437,10 @@ handle_peer_push (void *cls,
2969 LOG (GNUNET_ERROR_TYPE_DEBUG, 3437 LOG (GNUNET_ERROR_TYPE_DEBUG,
2970 "Received PUSH (%s)\n", 3438 "Received PUSH (%s)\n",
2971 GNUNET_i2s (peer)); 3439 GNUNET_i2s (peer));
2972 GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO); 3440 if (channel_ctx->peer_ctx->sub == msub)
3441 {
3442 GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3443 }
2973 3444
2974 #ifdef ENABLE_MALICIOUS 3445 #ifdef ENABLE_MALICIOUS
2975 struct AttackedPeer *tmp_att_peer; 3446 struct AttackedPeer *tmp_att_peer;
@@ -2981,9 +3452,8 @@ handle_peer_push (void *cls,
2981 tmp_att_peer->peer_id = *peer; 3452 tmp_att_peer->peer_id = *peer;
2982 if (NULL == att_peer_set) 3453 if (NULL == att_peer_set)
2983 att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO); 3454 att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
2984 if (GNUNET_NO == 3455 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
2985 GNUNET_CONTAINER_multipeermap_contains (att_peer_set, 3456 peer))
2986 peer))
2987 { 3457 {
2988 GNUNET_CONTAINER_DLL_insert (att_peers_head, 3458 GNUNET_CONTAINER_DLL_insert (att_peers_head,
2989 att_peers_tail, 3459 att_peers_tail,
@@ -3004,9 +3474,10 @@ handle_peer_push (void *cls,
3004 #endif /* ENABLE_MALICIOUS */ 3474 #endif /* ENABLE_MALICIOUS */
3005 3475
3006 /* Add the sending peer to the push_map */ 3476 /* Add the sending peer to the push_map */
3007 CustomPeerMap_put (mss->push_map, peer); 3477 CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3008 3478
3009 GNUNET_break_op (check_peer_known (peer)); 3479 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3480 &channel_ctx->peer_ctx->peer_id));
3010 GNUNET_CADET_receive_done (channel_ctx->channel); 3481 GNUNET_CADET_receive_done (channel_ctx->channel);
3011} 3482}
3012 3483
@@ -3016,41 +3487,59 @@ handle_peer_push (void *cls,
3016 * 3487 *
3017 * Reply with the view of PeerIDs. 3488 * Reply with the view of PeerIDs.
3018 * 3489 *
3019 * @param cls Closure 3490 * @param cls Closure - Context of channel
3020 * @param msg The message header 3491 * @param msg Message - unused
3021 */ 3492 */
3022static void 3493static void
3023handle_peer_pull_request (void *cls, 3494handle_peer_pull_request (void *cls,
3024 const struct GNUNET_MessageHeader *msg) 3495 const struct GNUNET_MessageHeader *msg)
3025{ 3496{
3026 const struct ChannelCtx *channel_ctx = cls; 3497 const struct ChannelCtx *channel_ctx = cls;
3027 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id; 3498 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3499 const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3028 const struct GNUNET_PeerIdentity *view_array; 3500 const struct GNUNET_PeerIdentity *view_array;
3029 (void) msg; 3501 (void) msg;
3030 3502
3031 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer)); 3503 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3032 GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO); 3504 if (peer_ctx->sub == msub)
3505 {
3506 GNUNET_STATISTICS_update(stats,
3507 "# pull request message received",
3508 1,
3509 GNUNET_NO);
3510 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3511 &peer_ctx->peer_id))
3512 {
3513 GNUNET_STATISTICS_update (stats,
3514 "# pull request message received (multi-hop peer)",
3515 1,
3516 GNUNET_NO);
3517 }
3518 }
3033 3519
3034 #ifdef ENABLE_MALICIOUS 3520 #ifdef ENABLE_MALICIOUS
3035 if (1 == mal_type 3521 if (1 == mal_type
3036 || 3 == mal_type) 3522 || 3 == mal_type)
3037 { /* Try to maximise representation */ 3523 { /* Try to maximise representation */
3038 send_pull_reply (peer, mal_peers, num_mal_peers); 3524 send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3039 } 3525 }
3040 3526
3041 else if (2 == mal_type) 3527 else if (2 == mal_type)
3042 { /* Try to partition network */ 3528 { /* Try to partition network */
3043 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) 3529 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3044 { 3530 {
3045 send_pull_reply (peer, mal_peers, num_mal_peers); 3531 send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3046 } 3532 }
3047 } 3533 }
3048 #endif /* ENABLE_MALICIOUS */ 3534 #endif /* ENABLE_MALICIOUS */
3049 3535
3050 GNUNET_break_op (check_peer_known (peer)); 3536 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3537 &channel_ctx->peer_ctx->peer_id));
3051 GNUNET_CADET_receive_done (channel_ctx->channel); 3538 GNUNET_CADET_receive_done (channel_ctx->channel);
3052 view_array = View_get_as_array (mss->view); 3539 view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3053 send_pull_reply (peer, view_array, View_size (mss->view)); 3540 send_pull_reply (peer_ctx,
3541 view_array,
3542 View_size (channel_ctx->peer_ctx->sub->view));
3054} 3543}
3055 3544
3056 3545
@@ -3058,8 +3547,8 @@ handle_peer_pull_request (void *cls,
3058 * Check whether we sent a corresponding request and 3547 * Check whether we sent a corresponding request and
3059 * whether this reply is the first one. 3548 * whether this reply is the first one.
3060 * 3549 *
3061 * @param cls Closure 3550 * @param cls Closure - Context of channel
3062 * @param msg The message header 3551 * @param msg Message containing the replied peers
3063 */ 3552 */
3064static int 3553static int
3065check_peer_pull_reply (void *cls, 3554check_peer_pull_reply (void *cls,
@@ -3086,22 +3575,27 @@ check_peer_pull_reply (void *cls,
3086 return GNUNET_SYSERR; 3575 return GNUNET_SYSERR;
3087 } 3576 }
3088 3577
3089 if (GNUNET_YES != check_peer_flag (&sender_ctx->peer_id, 3578 if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3579 &sender_ctx->peer_id,
3090 Peers_PULL_REPLY_PENDING)) 3580 Peers_PULL_REPLY_PENDING))
3091 { 3581 {
3092 LOG (GNUNET_ERROR_TYPE_WARNING, 3582 LOG (GNUNET_ERROR_TYPE_WARNING,
3093 "Received a pull reply from a peer (%s) we didn't request one from!\n", 3583 "Received a pull reply from a peer (%s) we didn't request one from!\n",
3094 GNUNET_i2s (&sender_ctx->peer_id)); 3584 GNUNET_i2s (&sender_ctx->peer_id));
3095 GNUNET_STATISTICS_update (stats, 3585 if (sender_ctx->sub == msub)
3096 "# unrequested pull replies", 3586 {
3097 1, 3587 GNUNET_STATISTICS_update (stats,
3098 GNUNET_NO); 3588 "# unrequested pull replies",
3589 1,
3590 GNUNET_NO);
3591 }
3099 GNUNET_break_op (0); 3592 GNUNET_break_op (0);
3100 return GNUNET_SYSERR; 3593 return GNUNET_SYSERR;
3101 } 3594 }
3102 return GNUNET_OK; 3595 return GNUNET_OK;
3103} 3596}
3104 3597
3598
3105/** 3599/**
3106 * Handle PULL REPLY message from another peer. 3600 * Handle PULL REPLY message from another peer.
3107 * 3601 *
@@ -3115,13 +3609,29 @@ handle_peer_pull_reply (void *cls,
3115 const struct ChannelCtx *channel_ctx = cls; 3609 const struct ChannelCtx *channel_ctx = cls;
3116 const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id; 3610 const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3117 const struct GNUNET_PeerIdentity *peers; 3611 const struct GNUNET_PeerIdentity *peers;
3612 struct Sub *sub = channel_ctx->peer_ctx->sub;
3118 uint32_t i; 3613 uint32_t i;
3119#ifdef ENABLE_MALICIOUS 3614#ifdef ENABLE_MALICIOUS
3120 struct AttackedPeer *tmp_att_peer; 3615 struct AttackedPeer *tmp_att_peer;
3121#endif /* ENABLE_MALICIOUS */ 3616#endif /* ENABLE_MALICIOUS */
3122 3617
3618 sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3123 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender)); 3619 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3124 GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO); 3620 if (channel_ctx->peer_ctx->sub == msub)
3621 {
3622 GNUNET_STATISTICS_update (stats,
3623 "# pull reply messages received",
3624 1,
3625 GNUNET_NO);
3626 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3627 &channel_ctx->peer_ctx->peer_id))
3628 {
3629 GNUNET_STATISTICS_update (stats,
3630 "# pull reply messages received (multi-hop peer)",
3631 1,
3632 GNUNET_NO);
3633 }
3634 }
3125 3635
3126 #ifdef ENABLE_MALICIOUS 3636 #ifdef ENABLE_MALICIOUS
3127 // We shouldn't even receive pull replies as we're not sending 3637 // We shouldn't even receive pull replies as we're not sending
@@ -3165,23 +3675,28 @@ handle_peer_pull_reply (void *cls,
3165 } 3675 }
3166 #endif /* ENABLE_MALICIOUS */ 3676 #endif /* ENABLE_MALICIOUS */
3167 /* Make sure we 'know' about this peer */ 3677 /* Make sure we 'know' about this peer */
3168 (void) insert_peer (&peers[i]); 3678 (void) insert_peer (channel_ctx->peer_ctx->sub, &peers[i]);
3169 3679
3170 if (GNUNET_YES == check_peer_valid (&peers[i])) 3680 if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3681 &peers[i]))
3171 { 3682 {
3172 CustomPeerMap_put (mss->pull_map, &peers[i]); 3683 CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map, &peers[i]);
3173 } 3684 }
3174 else 3685 else
3175 { 3686 {
3176 schedule_operation (&peers[i], insert_in_pull_map); 3687 schedule_operation (channel_ctx->peer_ctx,
3177 (void) issue_peer_online_check (&peers[i]); 3688 insert_in_pull_map,
3689 channel_ctx->peer_ctx->sub); /* cls */
3690 (void) issue_peer_online_check (channel_ctx->peer_ctx->sub, &peers[i]);
3178 } 3691 }
3179 } 3692 }
3180 3693
3181 UNSET_PEER_FLAG (get_peer_ctx (sender), Peers_PULL_REPLY_PENDING); 3694 UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map, sender),
3182 clean_peer (sender); 3695 Peers_PULL_REPLY_PENDING);
3696 clean_peer (channel_ctx->peer_ctx->sub, sender);
3183 3697
3184 GNUNET_break_op (check_peer_known (sender)); 3698 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3699 sender));
3185 GNUNET_CADET_receive_done (channel_ctx->channel); 3700 GNUNET_CADET_receive_done (channel_ctx->channel);
3186} 3701}
3187 3702
@@ -3193,12 +3708,12 @@ handle_peer_pull_reply (void *cls,
3193 * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min)) 3708 * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3194 * It would return a random value between 2 and 6 min. 3709 * It would return a random value between 2 and 6 min.
3195 * 3710 *
3196 * @param mean the mean 3711 * @param mean the mean time until the next round
3197 * @param spread the inverse amount of deviation from the mean 3712 * @param spread the inverse amount of deviation from the mean
3198 */ 3713 */
3199static struct GNUNET_TIME_Relative 3714static struct GNUNET_TIME_Relative
3200compute_rand_delay (struct GNUNET_TIME_Relative mean, 3715compute_rand_delay (struct GNUNET_TIME_Relative mean,
3201 unsigned int spread) 3716 unsigned int spread)
3202{ 3717{
3203 struct GNUNET_TIME_Relative half_interval; 3718 struct GNUNET_TIME_Relative half_interval;
3204 struct GNUNET_TIME_Relative ret; 3719 struct GNUNET_TIME_Relative ret;
@@ -3238,53 +3753,69 @@ compute_rand_delay (struct GNUNET_TIME_Relative mean,
3238/** 3753/**
3239 * Send single pull request 3754 * Send single pull request
3240 * 3755 *
3241 * @param peer_id the peer to send the pull request to. 3756 * @param peer_ctx Context to the peer to send request to
3242 */ 3757 */
3243static void 3758static void
3244send_pull_request (const struct GNUNET_PeerIdentity *peer) 3759send_pull_request (struct PeerContext *peer_ctx)
3245{ 3760{
3246 struct GNUNET_MQ_Envelope *ev; 3761 struct GNUNET_MQ_Envelope *ev;
3247 3762
3248 GNUNET_assert (GNUNET_NO == check_peer_flag (peer, 3763 GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3249 Peers_PULL_REPLY_PENDING)); 3764 &peer_ctx->peer_id,
3250 SET_PEER_FLAG (get_peer_ctx (peer), Peers_PULL_REPLY_PENDING); 3765 Peers_PULL_REPLY_PENDING));
3766 SET_PEER_FLAG (peer_ctx, Peers_PULL_REPLY_PENDING);
3767 peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3251 3768
3252 LOG (GNUNET_ERROR_TYPE_DEBUG, 3769 LOG (GNUNET_ERROR_TYPE_DEBUG,
3253 "Going to send PULL REQUEST to peer %s.\n", 3770 "Going to send PULL REQUEST to peer %s.\n",
3254 GNUNET_i2s (peer)); 3771 GNUNET_i2s (&peer_ctx->peer_id));
3255 3772
3256 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); 3773 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3257 send_message (peer, ev, "PULL REQUEST"); 3774 send_message (peer_ctx, ev, "PULL REQUEST");
3258 GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO); 3775 if (peer_ctx->sub)
3776 {
3777 GNUNET_STATISTICS_update (stats,
3778 "# pull request send issued",
3779 1,
3780 GNUNET_NO);
3781 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3782 &peer_ctx->peer_id))
3783 {
3784 GNUNET_STATISTICS_update (stats,
3785 "# pull request send issued (multi-hop peer)",
3786 1,
3787 GNUNET_NO);
3788 }
3789 }
3259} 3790}
3260 3791
3261 3792
3262/** 3793/**
3263 * Send single push 3794 * Send single push
3264 * 3795 *
3265 * @param peer_id the peer to send the push to. 3796 * @param peer_ctx Context of peer to send push to
3266 */ 3797 */
3267static void 3798static void
3268send_push (const struct GNUNET_PeerIdentity *peer_id) 3799send_push (struct PeerContext *peer_ctx)
3269{ 3800{
3270 struct GNUNET_MQ_Envelope *ev; 3801 struct GNUNET_MQ_Envelope *ev;
3271 3802
3272 LOG (GNUNET_ERROR_TYPE_DEBUG, 3803 LOG (GNUNET_ERROR_TYPE_DEBUG,
3273 "Going to send PUSH to peer %s.\n", 3804 "Going to send PUSH to peer %s.\n",
3274 GNUNET_i2s (peer_id)); 3805 GNUNET_i2s (&peer_ctx->peer_id));
3275 3806
3276 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); 3807 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3277 send_message (peer_id, ev, "PUSH"); 3808 send_message (peer_ctx, ev, "PUSH");
3278 GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO); 3809 if (peer_ctx->sub)
3810 {
3811 GNUNET_STATISTICS_update (stats,
3812 "# push send issued",
3813 1,
3814 GNUNET_NO);
3815 }
3279} 3816}
3280 3817
3281 3818
3282static void
3283do_round (void *cls);
3284
3285static void
3286do_mal_round (void *cls);
3287
3288#ifdef ENABLE_MALICIOUS 3819#ifdef ENABLE_MALICIOUS
3289 3820
3290 3821
@@ -3334,7 +3865,9 @@ handle_client_act_malicious (void *cls,
3334 struct GNUNET_PeerIdentity *peers; 3865 struct GNUNET_PeerIdentity *peers;
3335 uint32_t num_mal_peers_sent; 3866 uint32_t num_mal_peers_sent;
3336 uint32_t num_mal_peers_old; 3867 uint32_t num_mal_peers_old;
3868 struct Sub *sub = cli_ctx->sub;
3337 3869
3870 if (NULL == sub) sub = msub;
3338 /* Do actual logic */ 3871 /* Do actual logic */
3339 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 3872 peers = (struct GNUNET_PeerIdentity *) &msg[1];
3340 mal_type = ntohl (msg->type); 3873 mal_type = ntohl (msg->type);
@@ -3365,8 +3898,9 @@ handle_client_act_malicious (void *cls,
3365 mal_peer_set); 3898 mal_peer_set);
3366 3899
3367 /* Substitute do_round () with do_mal_round () */ 3900 /* Substitute do_round () with do_mal_round () */
3368 GNUNET_SCHEDULER_cancel (mss->do_round_task); 3901 GNUNET_assert (NULL != sub->do_round_task);
3369 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL); 3902 GNUNET_SCHEDULER_cancel (sub->do_round_task);
3903 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3370 } 3904 }
3371 3905
3372 else if ( (2 == mal_type) || 3906 else if ( (2 == mal_type) ||
@@ -3398,9 +3932,9 @@ handle_client_act_malicious (void *cls,
3398 &msg->attacked_peer, 3932 &msg->attacked_peer,
3399 sizeof (struct GNUNET_PeerIdentity)); 3933 sizeof (struct GNUNET_PeerIdentity));
3400 /* Set the flag of the attacked peer to valid to avoid problems */ 3934 /* Set the flag of the attacked peer to valid to avoid problems */
3401 if (GNUNET_NO == check_peer_known (&attacked_peer)) 3935 if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
3402 { 3936 {
3403 (void) issue_peer_online_check (&attacked_peer); 3937 (void) issue_peer_online_check (sub, &attacked_peer);
3404 } 3938 }
3405 3939
3406 LOG (GNUNET_ERROR_TYPE_DEBUG, 3940 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -3408,16 +3942,20 @@ handle_client_act_malicious (void *cls,
3408 GNUNET_i2s (&attacked_peer)); 3942 GNUNET_i2s (&attacked_peer));
3409 3943
3410 /* Substitute do_round () with do_mal_round () */ 3944 /* Substitute do_round () with do_mal_round () */
3411 GNUNET_SCHEDULER_cancel (mss->do_round_task); 3945 if (NULL != sub->do_round_task)
3412 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL); 3946 {
3947 /* Probably in shutdown */
3948 GNUNET_SCHEDULER_cancel (sub->do_round_task);
3949 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3950 }
3413 } 3951 }
3414 else if (0 == mal_type) 3952 else if (0 == mal_type)
3415 { /* Stop acting malicious */ 3953 { /* Stop acting malicious */
3416 GNUNET_array_grow (mal_peers, num_mal_peers, 0); 3954 GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3417 3955
3418 /* Substitute do_mal_round () with do_round () */ 3956 /* Substitute do_mal_round () with do_round () */
3419 GNUNET_SCHEDULER_cancel (mss->do_round_task); 3957 GNUNET_SCHEDULER_cancel (sub->do_round_task);
3420 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); 3958 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
3421 } 3959 }
3422 else 3960 else
3423 { 3961 {
@@ -3432,6 +3970,8 @@ handle_client_act_malicious (void *cls,
3432 * Send out PUSHes and PULLs maliciously. 3970 * Send out PUSHes and PULLs maliciously.
3433 * 3971 *
3434 * This is executed regylary. 3972 * This is executed regylary.
3973 *
3974 * @param cls Closure - Sub
3435 */ 3975 */
3436static void 3976static void
3437do_mal_round (void *cls) 3977do_mal_round (void *cls)
@@ -3440,12 +3980,12 @@ do_mal_round (void *cls)
3440 uint32_t i; 3980 uint32_t i;
3441 struct GNUNET_TIME_Relative time_next_round; 3981 struct GNUNET_TIME_Relative time_next_round;
3442 struct AttackedPeer *tmp_att_peer; 3982 struct AttackedPeer *tmp_att_peer;
3443 (void) cls; 3983 struct Sub *sub = cls;
3444 3984
3445 LOG (GNUNET_ERROR_TYPE_DEBUG, 3985 LOG (GNUNET_ERROR_TYPE_DEBUG,
3446 "Going to execute next round maliciously type %" PRIu32 ".\n", 3986 "Going to execute next round maliciously type %" PRIu32 ".\n",
3447 mal_type); 3987 mal_type);
3448 mss->do_round_task = NULL; 3988 sub->do_round_task = NULL;
3449 GNUNET_assert (mal_type <= 3); 3989 GNUNET_assert (mal_type <= 3);
3450 /* Do malicious actions */ 3990 /* Do malicious actions */
3451 if (1 == mal_type) 3991 if (1 == mal_type)
@@ -3468,7 +4008,7 @@ do_mal_round (void *cls)
3468 else 4008 else
3469 att_peer_index = att_peer_index->next; 4009 att_peer_index = att_peer_index->next;
3470 4010
3471 send_push (&att_peer_index->peer_id); 4011 send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
3472 } 4012 }
3473 4013
3474 /* Send PULLs to some peers to learn about additional peers to attack */ 4014 /* Send PULLs to some peers to learn about additional peers to attack */
@@ -3480,7 +4020,7 @@ do_mal_round (void *cls)
3480 else 4020 else
3481 att_peer_index = tmp_att_peer->next; 4021 att_peer_index = tmp_att_peer->next;
3482 4022
3483 send_pull_request (&tmp_att_peer->peer_id); 4023 send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
3484 } 4024 }
3485 } 4025 }
3486 4026
@@ -3491,9 +4031,11 @@ do_mal_round (void *cls)
3491 * Send as many pushes to the attacked peer as possible 4031 * Send as many pushes to the attacked peer as possible
3492 * That is one push per round as it will ignore more. 4032 * That is one push per round as it will ignore more.
3493 */ 4033 */
3494 (void) issue_peer_online_check (&attacked_peer); 4034 (void) issue_peer_online_check (sub, &attacked_peer);
3495 if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE)) 4035 if (GNUNET_YES == check_peer_flag (sub->peer_map,
3496 send_push (&attacked_peer); 4036 &attacked_peer,
4037 Peers_ONLINE))
4038 send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
3497 } 4039 }
3498 4040
3499 4041
@@ -3501,18 +4043,20 @@ do_mal_round (void *cls)
3501 { /* Combined attack */ 4043 { /* Combined attack */
3502 4044
3503 /* Send PUSH to attacked peers */ 4045 /* Send PUSH to attacked peers */
3504 if (GNUNET_YES == check_peer_known (&attacked_peer)) 4046 if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
3505 { 4047 {
3506 (void) issue_peer_online_check (&attacked_peer); 4048 (void) issue_peer_online_check (sub, &attacked_peer);
3507 if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE)) 4049 if (GNUNET_YES == check_peer_flag (sub->peer_map,
4050 &attacked_peer,
4051 Peers_ONLINE))
3508 { 4052 {
3509 LOG (GNUNET_ERROR_TYPE_DEBUG, 4053 LOG (GNUNET_ERROR_TYPE_DEBUG,
3510 "Goding to send push to attacked peer (%s)\n", 4054 "Goding to send push to attacked peer (%s)\n",
3511 GNUNET_i2s (&attacked_peer)); 4055 GNUNET_i2s (&attacked_peer));
3512 send_push (&attacked_peer); 4056 send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
3513 } 4057 }
3514 } 4058 }
3515 (void) issue_peer_online_check (&attacked_peer); 4059 (void) issue_peer_online_check (sub, &attacked_peer);
3516 4060
3517 /* The maximum of pushes we're going to send this round */ 4061 /* The maximum of pushes we're going to send this round */
3518 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1, 4062 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
@@ -3530,7 +4074,7 @@ do_mal_round (void *cls)
3530 else 4074 else
3531 att_peer_index = att_peer_index->next; 4075 att_peer_index = att_peer_index->next;
3532 4076
3533 send_push (&att_peer_index->peer_id); 4077 send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
3534 } 4078 }
3535 4079
3536 /* Send PULLs to some peers to learn about additional peers to attack */ 4080 /* Send PULLs to some peers to learn about additional peers to attack */
@@ -3542,26 +4086,27 @@ do_mal_round (void *cls)
3542 else 4086 else
3543 att_peer_index = tmp_att_peer->next; 4087 att_peer_index = tmp_att_peer->next;
3544 4088
3545 send_pull_request (&tmp_att_peer->peer_id); 4089 send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
3546 } 4090 }
3547 } 4091 }
3548 4092
3549 /* Schedule next round */ 4093 /* Schedule next round */
3550 time_next_round = compute_rand_delay (mss->round_interval, 2); 4094 time_next_round = compute_rand_delay (sub->round_interval, 2);
3551 4095
3552 //mss->do_round_task = GNUNET_SCHEDULER_add_delayed (mss->round_interval, &do_mal_round, 4096 GNUNET_assert (NULL == sub->do_round_task);
3553 //NULL); 4097 sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3554 GNUNET_assert (NULL == mss->do_round_task); 4098 &do_mal_round, sub);
3555 mss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3556 &do_mal_round, NULL);
3557 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); 4099 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3558} 4100}
3559#endif /* ENABLE_MALICIOUS */ 4101#endif /* ENABLE_MALICIOUS */
3560 4102
4103
3561/** 4104/**
3562 * Send out PUSHes and PULLs, possibly update #view, samplers. 4105 * Send out PUSHes and PULLs, possibly update #view, samplers.
3563 * 4106 *
3564 * This is executed regylary. 4107 * This is executed regylary.
4108 *
4109 * @param cls Closure - Sub
3565 */ 4110 */
3566static void 4111static void
3567do_round (void *cls) 4112do_round (void *cls)
@@ -3575,64 +4120,70 @@ do_round (void *cls)
3575 uint32_t second_border; 4120 uint32_t second_border;
3576 struct GNUNET_PeerIdentity peer; 4121 struct GNUNET_PeerIdentity peer;
3577 struct GNUNET_PeerIdentity *update_peer; 4122 struct GNUNET_PeerIdentity *update_peer;
3578 (void) cls; 4123 struct Sub *sub = cls;
3579 4124
4125 sub->num_rounds++;
3580 LOG (GNUNET_ERROR_TYPE_DEBUG, 4126 LOG (GNUNET_ERROR_TYPE_DEBUG,
3581 "Going to execute next round.\n"); 4127 "Going to execute next round.\n");
3582 GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO); 4128 if (sub == msub)
3583 mss->do_round_task = NULL; 4129 {
4130 GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4131 }
4132 sub->do_round_task = NULL;
3584 LOG (GNUNET_ERROR_TYPE_DEBUG, 4133 LOG (GNUNET_ERROR_TYPE_DEBUG,
3585 "Printing view:\n"); 4134 "Printing view:\n");
3586 to_file (mss->file_name_view_log, 4135 to_file (sub->file_name_view_log,
3587 "___ new round ___"); 4136 "___ new round ___");
3588 view_array = View_get_as_array (mss->view); 4137 view_array = View_get_as_array (sub->view);
3589 for (i = 0; i < View_size (mss->view); i++) 4138 for (i = 0; i < View_size (sub->view); i++)
3590 { 4139 {
3591 LOG (GNUNET_ERROR_TYPE_DEBUG, 4140 LOG (GNUNET_ERROR_TYPE_DEBUG,
3592 "\t%s\n", GNUNET_i2s (&view_array[i])); 4141 "\t%s\n", GNUNET_i2s (&view_array[i]));
3593 to_file (mss->file_name_view_log, 4142 to_file (sub->file_name_view_log,
3594 "=%s\t(do round)", 4143 "=%s\t(do round)",
3595 GNUNET_i2s_full (&view_array[i])); 4144 GNUNET_i2s_full (&view_array[i]));
3596 } 4145 }
3597 4146
3598 4147
3599 /* Send pushes and pull requests */ 4148 /* Send pushes and pull requests */
3600 if (0 < View_size (mss->view)) 4149 if (0 < View_size (sub->view))
3601 { 4150 {
3602 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 4151 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3603 View_size (mss->view)); 4152 View_size (sub->view));
3604 4153
3605 /* Send PUSHes */ 4154 /* Send PUSHes */
3606 a_peers = ceil (alpha * View_size (mss->view)); 4155 a_peers = ceil (alpha * View_size (sub->view));
3607 4156
3608 LOG (GNUNET_ERROR_TYPE_DEBUG, 4157 LOG (GNUNET_ERROR_TYPE_DEBUG,
3609 "Going to send pushes to %u (ceil (%f * %u)) peers.\n", 4158 "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3610 a_peers, alpha, View_size (mss->view)); 4159 a_peers, alpha, View_size (sub->view));
3611 for (i = 0; i < a_peers; i++) 4160 for (i = 0; i < a_peers; i++)
3612 { 4161 {
3613 peer = view_array[permut[i]]; 4162 peer = view_array[permut[i]];
3614 // FIXME if this fails schedule/loop this for later 4163 // FIXME if this fails schedule/loop this for later
3615 send_push (&peer); 4164 send_push (get_peer_ctx (sub->peer_map, &peer));
3616 } 4165 }
3617 4166
3618 /* Send PULL requests */ 4167 /* Send PULL requests */
3619 b_peers = ceil (beta * View_size (mss->view)); 4168 b_peers = ceil (beta * View_size (sub->view));
3620 first_border = a_peers; 4169 first_border = a_peers;
3621 second_border = a_peers + b_peers; 4170 second_border = a_peers + b_peers;
3622 if (second_border > View_size (mss->view)) 4171 if (second_border > View_size (sub->view))
3623 { 4172 {
3624 first_border = View_size (mss->view) - b_peers; 4173 first_border = View_size (sub->view) - b_peers;
3625 second_border = View_size (mss->view); 4174 second_border = View_size (sub->view);
3626 } 4175 }
3627 LOG (GNUNET_ERROR_TYPE_DEBUG, 4176 LOG (GNUNET_ERROR_TYPE_DEBUG,
3628 "Going to send pulls to %u (ceil (%f * %u)) peers.\n", 4177 "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3629 b_peers, beta, View_size (mss->view)); 4178 b_peers, beta, View_size (sub->view));
3630 for (i = first_border; i < second_border; i++) 4179 for (i = first_border; i < second_border; i++)
3631 { 4180 {
3632 peer = view_array[permut[i]]; 4181 peer = view_array[permut[i]];
3633 if ( GNUNET_NO == check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) 4182 if ( GNUNET_NO == check_peer_flag (sub->peer_map,
4183 &peer,
4184 Peers_PULL_REPLY_PENDING))
3634 { // FIXME if this fails schedule/loop this for later 4185 { // FIXME if this fails schedule/loop this for later
3635 send_pull_request (&peer); 4186 send_pull_request (get_peer_ctx (sub->peer_map, &peer));
3636 } 4187 }
3637 } 4188 }
3638 4189
@@ -3644,10 +4195,9 @@ do_round (void *cls)
3644 /* Update view */ 4195 /* Update view */
3645 /* TODO see how many peers are in push-/pull- list! */ 4196 /* TODO see how many peers are in push-/pull- list! */
3646 4197
3647 if ((CustomPeerMap_size (mss->push_map) <= alpha * mss->view_size_est_need) && 4198 if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
3648 (0 < CustomPeerMap_size (mss->push_map)) && 4199 (0 < CustomPeerMap_size (sub->push_map)) &&
3649 (0 < CustomPeerMap_size (mss->pull_map))) 4200 (0 < CustomPeerMap_size (sub->pull_map)))
3650 //if (GNUNET_YES) // disable blocking temporarily
3651 { /* If conditions for update are fulfilled, update */ 4201 { /* If conditions for update are fulfilled, update */
3652 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n"); 4202 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3653 4203
@@ -3659,23 +4209,23 @@ do_round (void *cls)
3659 peers_to_clean_size = 0; 4209 peers_to_clean_size = 0;
3660 GNUNET_array_grow (peers_to_clean, 4210 GNUNET_array_grow (peers_to_clean,
3661 peers_to_clean_size, 4211 peers_to_clean_size,
3662 View_size (mss->view)); 4212 View_size (sub->view));
3663 GNUNET_memcpy (peers_to_clean, 4213 GNUNET_memcpy (peers_to_clean,
3664 view_array, 4214 view_array,
3665 View_size (mss->view) * sizeof (struct GNUNET_PeerIdentity)); 4215 View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
3666 4216
3667 /* Seems like recreating is the easiest way of emptying the peermap */ 4217 /* Seems like recreating is the easiest way of emptying the peermap */
3668 View_clear (mss->view); 4218 View_clear (sub->view);
3669 to_file (mss->file_name_view_log, 4219 to_file (sub->file_name_view_log,
3670 "--- emptied ---"); 4220 "--- emptied ---");
3671 4221
3672 first_border = GNUNET_MIN (ceil (alpha * mss->view_size_est_need), 4222 first_border = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
3673 CustomPeerMap_size (mss->push_map)); 4223 CustomPeerMap_size (sub->push_map));
3674 second_border = first_border + 4224 second_border = first_border +
3675 GNUNET_MIN (floor (beta * mss->view_size_est_need), 4225 GNUNET_MIN (floor (beta * sub->view_size_est_need),
3676 CustomPeerMap_size (mss->pull_map)); 4226 CustomPeerMap_size (sub->pull_map));
3677 final_size = second_border + 4227 final_size = second_border +
3678 ceil ((1 - (alpha + beta)) * mss->view_size_est_need); 4228 ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
3679 LOG (GNUNET_ERROR_TYPE_DEBUG, 4229 LOG (GNUNET_ERROR_TYPE_DEBUG,
3680 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n", 4230 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3681 first_border, 4231 first_border,
@@ -3684,18 +4234,20 @@ do_round (void *cls)
3684 4234
3685 /* Update view with peers received through PUSHes */ 4235 /* Update view with peers received through PUSHes */
3686 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 4236 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3687 CustomPeerMap_size (mss->push_map)); 4237 CustomPeerMap_size (sub->push_map));
3688 for (i = 0; i < first_border; i++) 4238 for (i = 0; i < first_border; i++)
3689 { 4239 {
3690 int inserted; 4240 int inserted;
3691 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (mss->push_map, 4241 inserted = insert_in_view (sub,
4242 CustomPeerMap_get_peer_by_index (sub->push_map,
3692 permut[i])); 4243 permut[i]));
3693 if (GNUNET_OK == inserted) 4244 if (GNUNET_OK == inserted)
3694 { 4245 {
3695 clients_notify_stream_peer (1, 4246 clients_notify_stream_peer (sub,
3696 CustomPeerMap_get_peer_by_index (mss->push_map, permut[i])); 4247 1,
4248 CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
3697 } 4249 }
3698 to_file (mss->file_name_view_log, 4250 to_file (sub->file_name_view_log,
3699 "+%s\t(push list)", 4251 "+%s\t(push list)",
3700 GNUNET_i2s_full (&view_array[i])); 4252 GNUNET_i2s_full (&view_array[i]));
3701 // TODO change the peer_flags accordingly 4253 // TODO change the peer_flags accordingly
@@ -3705,19 +4257,21 @@ do_round (void *cls)
3705 4257
3706 /* Update view with peers received through PULLs */ 4258 /* Update view with peers received through PULLs */
3707 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 4259 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3708 CustomPeerMap_size (mss->pull_map)); 4260 CustomPeerMap_size (sub->pull_map));
3709 for (i = first_border; i < second_border; i++) 4261 for (i = first_border; i < second_border; i++)
3710 { 4262 {
3711 int inserted; 4263 int inserted;
3712 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (mss->pull_map, 4264 inserted = insert_in_view (sub,
3713 permut[i - first_border])); 4265 CustomPeerMap_get_peer_by_index (sub->pull_map,
4266 permut[i - first_border]));
3714 if (GNUNET_OK == inserted) 4267 if (GNUNET_OK == inserted)
3715 { 4268 {
3716 clients_notify_stream_peer (1, 4269 clients_notify_stream_peer (sub,
3717 CustomPeerMap_get_peer_by_index (mss->pull_map, 4270 1,
4271 CustomPeerMap_get_peer_by_index (sub->pull_map,
3718 permut[i - first_border])); 4272 permut[i - first_border]));
3719 } 4273 }
3720 to_file (mss->file_name_view_log, 4274 to_file (sub->file_name_view_log,
3721 "+%s\t(pull list)", 4275 "+%s\t(pull list)",
3722 GNUNET_i2s_full (&view_array[i])); 4276 GNUNET_i2s_full (&view_array[i]));
3723 // TODO change the peer_flags accordingly 4277 // TODO change the peer_flags accordingly
@@ -3726,106 +4280,116 @@ do_round (void *cls)
3726 permut = NULL; 4280 permut = NULL;
3727 4281
3728 /* Update view with peers from history */ 4282 /* Update view with peers from history */
3729 RPS_sampler_get_n_rand_peers (mss->sampler, 4283 RPS_sampler_get_n_rand_peers (sub->sampler,
3730 final_size - second_border, 4284 final_size - second_border,
3731 hist_update, 4285 hist_update,
3732 NULL); 4286 sub);
3733 // TODO change the peer_flags accordingly 4287 // TODO change the peer_flags accordingly
3734 4288
3735 for (i = 0; i < View_size (mss->view); i++) 4289 for (i = 0; i < View_size (sub->view); i++)
3736 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]); 4290 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3737 4291
3738 /* Clean peers that were removed from the view */ 4292 /* Clean peers that were removed from the view */
3739 for (i = 0; i < peers_to_clean_size; i++) 4293 for (i = 0; i < peers_to_clean_size; i++)
3740 { 4294 {
3741 to_file (mss->file_name_view_log, 4295 to_file (sub->file_name_view_log,
3742 "-%s", 4296 "-%s",
3743 GNUNET_i2s_full (&peers_to_clean[i])); 4297 GNUNET_i2s_full (&peers_to_clean[i]));
3744 clean_peer (&peers_to_clean[i]); 4298 clean_peer (sub, &peers_to_clean[i]);
3745 } 4299 }
3746 4300
3747 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0); 4301 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3748 clients_notify_view_update(); 4302 clients_notify_view_update (sub);
3749 } else { 4303 } else {
3750 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n"); 4304 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3751 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO); 4305 if (sub == msub)
3752 if (CustomPeerMap_size (mss->push_map) > alpha * View_size (mss->view) && 4306 {
3753 !(0 >= CustomPeerMap_size (mss->pull_map))) 4307 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3754 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO); 4308 if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
3755 if (CustomPeerMap_size (mss->push_map) > alpha * View_size (mss->view) && 4309 !(0 >= CustomPeerMap_size (sub->pull_map)))
3756 (0 >= CustomPeerMap_size (mss->pull_map))) 4310 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3757 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO); 4311 if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
3758 if (0 >= CustomPeerMap_size (mss->push_map) && 4312 (0 >= CustomPeerMap_size (sub->pull_map)))
3759 !(0 >= CustomPeerMap_size (mss->pull_map))) 4313 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3760 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO); 4314 if (0 >= CustomPeerMap_size (sub->push_map) &&
3761 if (0 >= CustomPeerMap_size (mss->push_map) && 4315 !(0 >= CustomPeerMap_size (sub->pull_map)))
3762 (0 >= CustomPeerMap_size (mss->pull_map))) 4316 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3763 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO); 4317 if (0 >= CustomPeerMap_size (sub->push_map) &&
3764 if (0 >= CustomPeerMap_size (mss->pull_map) && 4318 (0 >= CustomPeerMap_size (sub->pull_map)))
3765 CustomPeerMap_size (mss->push_map) > alpha * View_size (mss->view) && 4319 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3766 0 >= CustomPeerMap_size (mss->push_map)) 4320 if (0 >= CustomPeerMap_size (sub->pull_map) &&
3767 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO); 4321 CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
4322 0 >= CustomPeerMap_size (sub->push_map))
4323 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4324 }
3768 } 4325 }
3769 // TODO independent of that also get some peers from CADET_get_peers()? 4326 // TODO independent of that also get some peers from CADET_get_peers()?
3770 GNUNET_STATISTICS_set (stats, 4327 sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
3771 "# peers in push map at end of round", 4328 if (sub == msub)
3772 CustomPeerMap_size (mss->push_map), 4329 {
3773 GNUNET_NO); 4330 GNUNET_STATISTICS_set (stats,
3774 GNUNET_STATISTICS_set (stats, 4331 "# peers in push map at end of round",
3775 "# peers in pull map at end of round", 4332 CustomPeerMap_size (sub->push_map),
3776 CustomPeerMap_size (mss->pull_map), 4333 GNUNET_NO);
3777 GNUNET_NO); 4334 GNUNET_STATISTICS_set (stats,
3778 GNUNET_STATISTICS_set (stats, 4335 "# peers in pull map at end of round",
3779 "# peers in view at end of round", 4336 CustomPeerMap_size (sub->pull_map),
3780 View_size (mss->view), 4337 GNUNET_NO);
3781 GNUNET_NO); 4338 GNUNET_STATISTICS_set (stats,
4339 "# peers in view at end of round",
4340 View_size (sub->view),
4341 GNUNET_NO);
4342 }
3782 4343
3783 LOG (GNUNET_ERROR_TYPE_DEBUG, 4344 LOG (GNUNET_ERROR_TYPE_DEBUG,
3784 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (mss->view%u) = %.2f)\n", 4345 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
3785 CustomPeerMap_size (mss->push_map), 4346 CustomPeerMap_size (sub->push_map),
3786 CustomPeerMap_size (mss->pull_map), 4347 CustomPeerMap_size (sub->pull_map),
3787 alpha, 4348 alpha,
3788 View_size (mss->view), 4349 View_size (sub->view),
3789 alpha * View_size (mss->view)); 4350 alpha * View_size (sub->view));
3790 4351
3791 /* Update samplers */ 4352 /* Update samplers */
3792 for (i = 0; i < CustomPeerMap_size (mss->push_map); i++) 4353 for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
3793 { 4354 {
3794 update_peer = CustomPeerMap_get_peer_by_index (mss->push_map, i); 4355 update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
3795 LOG (GNUNET_ERROR_TYPE_DEBUG, 4356 LOG (GNUNET_ERROR_TYPE_DEBUG,
3796 "Updating with peer %s from push list\n", 4357 "Updating with peer %s from push list\n",
3797 GNUNET_i2s (update_peer)); 4358 GNUNET_i2s (update_peer));
3798 insert_in_sampler (NULL, update_peer); 4359 insert_in_sampler (sub, update_peer);
3799 clean_peer (update_peer); /* This cleans only if it is not in the view */ 4360 clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
3800 } 4361 }
3801 4362
3802 for (i = 0; i < CustomPeerMap_size (mss->pull_map); i++) 4363 for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
3803 { 4364 {
3804 LOG (GNUNET_ERROR_TYPE_DEBUG, 4365 LOG (GNUNET_ERROR_TYPE_DEBUG,
3805 "Updating with peer %s from pull list\n", 4366 "Updating with peer %s from pull list\n",
3806 GNUNET_i2s (CustomPeerMap_get_peer_by_index (mss->pull_map, i))); 4367 GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
3807 insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (mss->pull_map, i)); 4368 insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
3808 /* This cleans only if it is not in the view */ 4369 /* This cleans only if it is not in the view */
3809 clean_peer (CustomPeerMap_get_peer_by_index (mss->pull_map, i)); 4370 clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
3810 } 4371 }
3811 4372
3812 4373
3813 /* Empty push/pull lists */ 4374 /* Empty push/pull lists */
3814 CustomPeerMap_clear (mss->push_map); 4375 CustomPeerMap_clear (sub->push_map);
3815 CustomPeerMap_clear (mss->pull_map); 4376 CustomPeerMap_clear (sub->pull_map);
3816 4377
3817 GNUNET_STATISTICS_set (stats, 4378 if (sub == msub)
3818 "view size", 4379 {
3819 View_size(mss->view), 4380 GNUNET_STATISTICS_set (stats,
3820 GNUNET_NO); 4381 "view size",
4382 View_size(sub->view),
4383 GNUNET_NO);
4384 }
3821 4385
3822 struct GNUNET_TIME_Relative time_next_round; 4386 struct GNUNET_TIME_Relative time_next_round;
3823 4387
3824 time_next_round = compute_rand_delay (mss->round_interval, 2); 4388 time_next_round = compute_rand_delay (sub->round_interval, 2);
3825 4389
3826 /* Schedule next round */ 4390 /* Schedule next round */
3827 mss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, 4391 sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3828 &do_round, NULL); 4392 &do_round, sub);
3829 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n"); 4393 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3830} 4394}
3831 4395
@@ -3835,16 +4399,25 @@ do_round (void *cls)
3835 * 4399 *
3836 * It is called on every peer(ID) that cadet somehow has contact with. 4400 * It is called on every peer(ID) that cadet somehow has contact with.
3837 * We use those to initialise the sampler. 4401 * We use those to initialise the sampler.
4402 *
4403 * implements #GNUNET_CADET_PeersCB
4404 *
4405 * @param cls Closure - Sub
4406 * @param peer Peer, or NULL on "EOF".
4407 * @param tunnel Do we have a tunnel towards this peer?
4408 * @param n_paths Number of known paths towards this peer.
4409 * @param best_path How long is the best path?
4410 * (0 = unknown, 1 = ourselves, 2 = neighbor)
3838 */ 4411 */
3839void 4412void
3840init_peer_cb (void *cls, 4413init_peer_cb (void *cls,
3841 const struct GNUNET_PeerIdentity *peer, 4414 const struct GNUNET_PeerIdentity *peer,
3842 int tunnel, // "Do we have a tunnel towards this peer?" 4415 int tunnel, /* "Do we have a tunnel towards this peer?" */
3843 unsigned int n_paths, // "Number of known paths towards this peer" 4416 unsigned int n_paths, /* "Number of known paths towards this peer" */
3844 unsigned int best_path) // "How long is the best path? 4417 unsigned int best_path) /* "How long is the best path?
3845 // (0 = unknown, 1 = ourselves, 2 = neighbor)" 4418 * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
3846{ 4419{
3847 (void) cls; 4420 struct Sub *sub = cls;
3848 (void) tunnel; 4421 (void) tunnel;
3849 (void) n_paths; 4422 (void) n_paths;
3850 (void) best_path; 4423 (void) best_path;
@@ -3854,16 +4427,17 @@ init_peer_cb (void *cls,
3854 LOG (GNUNET_ERROR_TYPE_DEBUG, 4427 LOG (GNUNET_ERROR_TYPE_DEBUG,
3855 "Got peer_id %s from cadet\n", 4428 "Got peer_id %s from cadet\n",
3856 GNUNET_i2s (peer)); 4429 GNUNET_i2s (peer));
3857 got_peer (peer); 4430 got_peer (sub, peer);
3858 } 4431 }
3859} 4432}
3860 4433
4434
3861/** 4435/**
3862 * @brief Iterator function over stored, valid peers. 4436 * @brief Iterator function over stored, valid peers.
3863 * 4437 *
3864 * We initialise the sampler with those. 4438 * We initialise the sampler with those.
3865 * 4439 *
3866 * @param cls the closure 4440 * @param cls Closure - Sub
3867 * @param peer the peer id 4441 * @param peer the peer id
3868 * @return #GNUNET_YES if we should continue to 4442 * @return #GNUNET_YES if we should continue to
3869 * iterate, 4443 * iterate,
@@ -3873,14 +4447,14 @@ static int
3873valid_peers_iterator (void *cls, 4447valid_peers_iterator (void *cls,
3874 const struct GNUNET_PeerIdentity *peer) 4448 const struct GNUNET_PeerIdentity *peer)
3875{ 4449{
3876 (void) cls; 4450 struct Sub *sub = cls;
3877 4451
3878 if (NULL != peer) 4452 if (NULL != peer)
3879 { 4453 {
3880 LOG (GNUNET_ERROR_TYPE_DEBUG, 4454 LOG (GNUNET_ERROR_TYPE_DEBUG,
3881 "Got stored, valid peer %s\n", 4455 "Got stored, valid peer %s\n",
3882 GNUNET_i2s (peer)); 4456 GNUNET_i2s (peer));
3883 got_peer (peer); 4457 got_peer (sub, peer);
3884 } 4458 }
3885 return GNUNET_YES; 4459 return GNUNET_YES;
3886} 4460}
@@ -3889,7 +4463,7 @@ valid_peers_iterator (void *cls,
3889/** 4463/**
3890 * Iterator over peers from peerinfo. 4464 * Iterator over peers from peerinfo.
3891 * 4465 *
3892 * @param cls closure 4466 * @param cls Closure - Sub
3893 * @param peer id of the peer, NULL for last call 4467 * @param peer id of the peer, NULL for last call
3894 * @param hello hello message for the peer (can be NULL) 4468 * @param hello hello message for the peer (can be NULL)
3895 * @param error message 4469 * @param error message
@@ -3900,7 +4474,7 @@ process_peerinfo_peers (void *cls,
3900 const struct GNUNET_HELLO_Message *hello, 4474 const struct GNUNET_HELLO_Message *hello,
3901 const char *err_msg) 4475 const char *err_msg)
3902{ 4476{
3903 (void) cls; 4477 struct Sub *sub = cls;
3904 (void) hello; 4478 (void) hello;
3905 (void) err_msg; 4479 (void) err_msg;
3906 4480
@@ -3909,7 +4483,7 @@ process_peerinfo_peers (void *cls,
3909 LOG (GNUNET_ERROR_TYPE_DEBUG, 4483 LOG (GNUNET_ERROR_TYPE_DEBUG,
3910 "Got peer_id %s from peerinfo\n", 4484 "Got peer_id %s from peerinfo\n",
3911 GNUNET_i2s (peer)); 4485 GNUNET_i2s (peer));
3912 got_peer (peer); 4486 got_peer (sub, peer);
3913 } 4487 }
3914} 4488}
3915 4489
@@ -3917,13 +4491,13 @@ process_peerinfo_peers (void *cls,
3917/** 4491/**
3918 * Task run during shutdown. 4492 * Task run during shutdown.
3919 * 4493 *
3920 * @param cls unused 4494 * @param cls Closure - unused
3921 */ 4495 */
3922static void 4496static void
3923shutdown_task (void *cls) 4497shutdown_task (void *cls)
3924{ 4498{
3925 struct ClientContext *client_ctx;
3926 (void) cls; 4499 (void) cls;
4500 struct ClientContext *client_ctx;
3927 4501
3928 LOG (GNUNET_ERROR_TYPE_DEBUG, 4502 LOG (GNUNET_ERROR_TYPE_DEBUG,
3929 "RPS service is going down\n"); 4503 "RPS service is going down\n");
@@ -3935,42 +4509,40 @@ shutdown_task (void *cls)
3935 { 4509 {
3936 destroy_cli_ctx (client_ctx); 4510 destroy_cli_ctx (client_ctx);
3937 } 4511 }
4512 if (NULL != msub)
4513 {
4514 destroy_sub (msub);
4515 msub = NULL;
4516 }
4517
4518 /* Disconnect from other services */
3938 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle); 4519 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
3939 GNUNET_PEERINFO_disconnect (peerinfo_handle); 4520 GNUNET_PEERINFO_disconnect (peerinfo_handle);
3940 peerinfo_handle = NULL; 4521 peerinfo_handle = NULL;
3941 if (NULL != mss->do_round_task) 4522 GNUNET_NSE_disconnect (nse);
4523 if (NULL != map_single_hop)
3942 { 4524 {
3943 GNUNET_SCHEDULER_cancel (mss->do_round_task); 4525 /* core_init was called - core was initialised */
3944 mss->do_round_task = NULL; 4526 /* disconnect first, so no callback tries to access missing peermap */
4527 GNUNET_CORE_disconnect (core_handle);
4528 core_handle = NULL;
4529 GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4530 map_single_hop = NULL;
3945 } 4531 }
3946 4532
3947 peers_terminate ();
3948
3949 GNUNET_NSE_disconnect (nse);
3950 RPS_sampler_destroy (mss->sampler);
3951 GNUNET_CADET_close_port (mss->cadet_port);
3952 GNUNET_CADET_disconnect (mss->cadet_handle);
3953 mss->cadet_handle = NULL;
3954 View_destroy (mss->view);
3955 CustomPeerMap_destroy (mss->push_map);
3956 CustomPeerMap_destroy (mss->pull_map);
3957 if (NULL != stats) 4533 if (NULL != stats)
3958 { 4534 {
3959 GNUNET_STATISTICS_destroy (stats, 4535 GNUNET_STATISTICS_destroy (stats,
3960 GNUNET_NO); 4536 GNUNET_NO);
3961 stats = NULL; 4537 stats = NULL;
3962 } 4538 }
4539 GNUNET_CADET_disconnect (cadet_handle);
4540 cadet_handle = NULL;
3963#ifdef ENABLE_MALICIOUS 4541#ifdef ENABLE_MALICIOUS
3964 struct AttackedPeer *tmp_att_peer; 4542 struct AttackedPeer *tmp_att_peer;
3965 /* it is ok to free this const during shutdown: */
3966 GNUNET_free ((char *) mss->file_name_view_log);
3967#ifdef TO_FILE
3968 GNUNET_free ((char *) mss->file_name_observed_log);
3969 GNUNET_CONTAINER_multipeermap_destroy (mss->observed_unique_peers);
3970#endif /* TO_FILE */
3971 GNUNET_array_grow (mal_peers, 4543 GNUNET_array_grow (mal_peers,
3972 num_mal_peers, 4544 num_mal_peers,
3973 0); 4545 0);
3974 if (NULL != mal_peer_set) 4546 if (NULL != mal_peer_set)
3975 GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set); 4547 GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
3976 if (NULL != att_peer_set) 4548 if (NULL != att_peer_set)
@@ -3979,8 +4551,8 @@ shutdown_task (void *cls)
3979 { 4551 {
3980 tmp_att_peer = att_peers_head; 4552 tmp_att_peer = att_peers_head;
3981 GNUNET_CONTAINER_DLL_remove (att_peers_head, 4553 GNUNET_CONTAINER_DLL_remove (att_peers_head,
3982 att_peers_tail, 4554 att_peers_tail,
3983 tmp_att_peer); 4555 tmp_att_peer);
3984 GNUNET_free (tmp_att_peer); 4556 GNUNET_free (tmp_att_peer);
3985 } 4557 }
3986#endif /* ENABLE_MALICIOUS */ 4558#endif /* ENABLE_MALICIOUS */
@@ -3990,7 +4562,7 @@ shutdown_task (void *cls)
3990/** 4562/**
3991 * Handle client connecting to the service. 4563 * Handle client connecting to the service.
3992 * 4564 *
3993 * @param cls NULL 4565 * @param cls unused
3994 * @param client the new client 4566 * @param client the new client
3995 * @param mq the message queue of @a client 4567 * @param mq the message queue of @a client
3996 * @return @a client 4568 * @return @a client
@@ -4060,20 +4632,21 @@ run (void *cls,
4060 const struct GNUNET_CONFIGURATION_Handle *c, 4632 const struct GNUNET_CONFIGURATION_Handle *c,
4061 struct GNUNET_SERVICE_Handle *service) 4633 struct GNUNET_SERVICE_Handle *service)
4062{ 4634{
4063 char *fn_valid_peers;
4064 struct GNUNET_TIME_Relative round_interval; 4635 struct GNUNET_TIME_Relative round_interval;
4065 long long unsigned int sampler_size; 4636 long long unsigned int sampler_size;
4637 char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4638 struct GNUNET_HashCode hash;
4066 4639
4067 (void) cls; 4640 (void) cls;
4068 (void) service; 4641 (void) service;
4069 4642
4070 GNUNET_log_setup ("rps", 4643 GNUNET_log_setup ("rps",
4071 GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), 4644 GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4072 NULL); 4645 NULL);
4073 cfg = c; 4646 cfg = c;
4074 /* Get own ID */ 4647 /* Get own ID */
4075 GNUNET_CRYPTO_get_peer_identity (cfg, 4648 GNUNET_CRYPTO_get_peer_identity (cfg,
4076 &own_identity); // TODO check return value 4649 &own_identity); // TODO check return value
4077 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 4650 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4078 "STARTING SERVICE (rps) for peer [%s]\n", 4651 "STARTING SERVICE (rps) for peer [%s]\n",
4079 GNUNET_i2s (&own_identity)); 4652 GNUNET_i2s (&own_identity));
@@ -4085,9 +4658,9 @@ run (void *cls,
4085 /* Get time interval from the configuration */ 4658 /* Get time interval from the configuration */
4086 if (GNUNET_OK != 4659 if (GNUNET_OK !=
4087 GNUNET_CONFIGURATION_get_value_time (cfg, 4660 GNUNET_CONFIGURATION_get_value_time (cfg,
4088 "RPS", 4661 "RPS",
4089 "ROUNDINTERVAL", 4662 "ROUNDINTERVAL",
4090 &round_interval)) 4663 &round_interval))
4091 { 4664 {
4092 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 4665 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4093 "RPS", "ROUNDINTERVAL"); 4666 "RPS", "ROUNDINTERVAL");
@@ -4108,54 +4681,51 @@ run (void *cls,
4108 return; 4681 return;
4109 } 4682 }
4110 4683
4111 if (GNUNET_OK != 4684 cadet_handle = GNUNET_CADET_connect (cfg);
4112 GNUNET_CONFIGURATION_get_value_filename (cfg, 4685 GNUNET_assert (NULL != cadet_handle);
4113 "rps", 4686 core_handle = GNUNET_CORE_connect (cfg,
4114 "FILENAME_VALID_PEERS", 4687 NULL, /* cls */
4115 &fn_valid_peers)) 4688 core_init, /* init */
4116 { 4689 core_connects, /* connects */
4117 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 4690 core_disconnects, /* disconnects */
4118 "rps", "FILENAME_VALID_PEERS"); 4691 NULL); /* handlers */
4119 } 4692 GNUNET_assert (NULL != core_handle);
4120
4121
4122 /* connect to NSE */
4123 nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4124 4693
4125 4694
4126 alpha = 0.45; 4695 alpha = 0.45;
4127 beta = 0.45; 4696 beta = 0.45;
4128 4697
4129 4698
4130 /* Set up main SubSampler */ 4699 /* Set up main Sub */
4131 mss = new_subsampler ("", /* this is the main sampler - no shared value */ 4700 GNUNET_CRYPTO_hash (hash_port_string,
4132 sampler_size, /* Will be overwritten by config */ 4701 strlen (hash_port_string),
4133 round_interval); 4702 &hash);
4703 msub = new_sub (&hash,
4704 sampler_size, /* Will be overwritten by config */
4705 round_interval);
4134 4706
4135 4707
4136 peerinfo_handle = GNUNET_PEERINFO_connect (cfg); 4708 peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4137 4709
4710 /* connect to NSE */
4711 nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4712
4138 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); 4713 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4139 //GNUNET_CADET_get_peers (mss.cadet_handle, &init_peer_cb, NULL); 4714 //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4140 // TODO send push/pull to each of those peers? 4715 // TODO send push/pull to each of those peers?
4141 // TODO read stored valid peers from last run
4142 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n"); 4716 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4143 restore_valid_peers (); 4717 restore_valid_peers (msub);
4144 get_valid_peers (valid_peers_iterator, NULL); 4718 get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4145 4719
4146 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg, 4720 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4147 GNUNET_NO, 4721 GNUNET_NO,
4148 process_peerinfo_peers, 4722 process_peerinfo_peers,
4149 NULL); 4723 msub);
4150 4724
4151 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n"); 4725 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4152 4726
4153 mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4154 LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4155
4156 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); 4727 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4157 stats = GNUNET_STATISTICS_create ("rps", cfg); 4728 stats = GNUNET_STATISTICS_create ("rps", cfg);
4158
4159} 4729}
4160 4730
4161 4731
@@ -4195,6 +4765,14 @@ GNUNET_SERVICE_MAIN
4195 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL, 4765 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4196 struct GNUNET_MessageHeader, 4766 struct GNUNET_MessageHeader,
4197 NULL), 4767 NULL),
4768 GNUNET_MQ_hd_fixed_size (client_start_sub,
4769 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4770 struct GNUNET_RPS_CS_SubStartMessage,
4771 NULL),
4772 GNUNET_MQ_hd_fixed_size (client_stop_sub,
4773 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4774 struct GNUNET_RPS_CS_SubStopMessage,
4775 NULL),
4198 GNUNET_MQ_handler_end()); 4776 GNUNET_MQ_handler_end());
4199 4777
4200/* end of gnunet-service-rps.c */ 4778/* end of gnunet-service-rps.c */
diff --git a/src/rps/gnunet-service-rps_view.c b/src/rps/gnunet-service-rps_view.c
index 3af858e60..17fec559d 100644
--- a/src/rps/gnunet-service-rps_view.c
+++ b/src/rps/gnunet-service-rps_view.c
@@ -282,7 +282,9 @@ View_destroy (struct View *view)
282{ 282{
283 View_clear (view); 283 View_clear (view);
284 GNUNET_free (view->array); 284 GNUNET_free (view->array);
285 view->array = NULL;
285 GNUNET_CONTAINER_multipeermap_destroy (view->mpm); 286 GNUNET_CONTAINER_multipeermap_destroy (view->mpm);
287 GNUNET_free (view);
286} 288}
287 289
288/* end of gnunet-service-rps_view.c */ 290/* end of gnunet-service-rps_view.c */
diff --git a/src/rps/rps-sampler_common.c b/src/rps/rps-sampler_common.c
index d004c06a5..5dbb5315a 100644
--- a/src/rps/rps-sampler_common.c
+++ b/src/rps/rps-sampler_common.c
@@ -387,6 +387,10 @@ check_n_peers_ready (void *cls,
387{ 387{
388 struct RPS_SamplerRequestHandle *req_handle = cls; 388 struct RPS_SamplerRequestHandle *req_handle = cls;
389 (void) id; 389 (void) id;
390 RPS_sampler_n_rand_peers_ready_cb tmp_cb;
391 struct GNUNET_PeerIdentity *peers;
392 uint32_t num_peers;
393 void *cb_cls;
390 394
391 req_handle->cur_num_peers++; 395 req_handle->cur_num_peers++;
392 LOG (GNUNET_ERROR_TYPE_DEBUG, 396 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -400,9 +404,20 @@ check_n_peers_ready (void *cls,
400 LOG (GNUNET_ERROR_TYPE_DEBUG, 404 LOG (GNUNET_ERROR_TYPE_DEBUG,
401 "returning %" PRIX32 " peers to the client\n", 405 "returning %" PRIX32 " peers to the client\n",
402 req_handle->num_peers); 406 req_handle->num_peers);
403 req_handle->callback (req_handle->ids, req_handle->num_peers, req_handle->cls);
404 407
408 /* Copy pointers and peers temporarily as they
409 * might be deleted from within the callback */
410 tmp_cb = req_handle->callback;
411 num_peers = req_handle->num_peers;
412 peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
413 GNUNET_memcpy (peers,
414 req_handle->ids,
415 num_peers * sizeof (struct GNUNET_PeerIdentity));
416 cb_cls = req_handle->cls;
405 RPS_sampler_request_cancel (req_handle); 417 RPS_sampler_request_cancel (req_handle);
418 req_handle = NULL;
419 tmp_cb (peers, num_peers, cb_cls);
420 GNUNET_free (peers);
406 } 421 }
407} 422}
408 423
@@ -493,10 +508,12 @@ RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
493 req_handle->sampler->notify_ctx_tail, 508 req_handle->sampler->notify_ctx_tail,
494 i->notify_ctx); 509 i->notify_ctx);
495 GNUNET_free (i->notify_ctx); 510 GNUNET_free (i->notify_ctx);
511 i->notify_ctx = NULL;
496 } 512 }
497 GNUNET_free (i); 513 GNUNET_free (i);
498 } 514 }
499 GNUNET_free (req_handle->ids); 515 GNUNET_free (req_handle->ids);
516 req_handle->ids = NULL;
500 GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head, 517 GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head,
501 req_handle->sampler->req_handle_tail, 518 req_handle->sampler->req_handle_tail,
502 req_handle); 519 req_handle);
diff --git a/src/rps/rps-test_util.c b/src/rps/rps-test_util.c
index 271c96648..a6ea033cd 100644
--- a/src/rps/rps-test_util.c
+++ b/src/rps/rps-test_util.c
@@ -26,6 +26,7 @@
26 26
27#include "platform.h" 27#include "platform.h"
28#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
29#include "rps-test_util.h"
29 30
30#include <inttypes.h> 31#include <inttypes.h>
31 32
@@ -44,7 +45,6 @@
44 45
45#ifdef TO_FILE 46#ifdef TO_FILE
46 47
47#define min(x,y) ((x) > (y) ? (y) : (x))
48 48
49/** 49/**
50 * @brief buffer for storing the unaligned bits for the next write 50 * @brief buffer for storing the unaligned bits for the next write
@@ -56,76 +56,6 @@ static char buf_unaligned;
56 */ 56 */
57static unsigned num_bits_buf_unaligned; 57static unsigned num_bits_buf_unaligned;
58 58
59void
60to_file_ (const char *file_name, char *line)
61{
62 struct GNUNET_DISK_FileHandle *f;
63 char output_buffer[512];
64 size_t output_buffer_size = 512;
65 char *output_buffer_p;
66 //size_t size;
67 int size;
68 int size2;
69
70
71 if (NULL == (f = GNUNET_DISK_file_open (file_name,
72 GNUNET_DISK_OPEN_APPEND |
73 GNUNET_DISK_OPEN_WRITE |
74 GNUNET_DISK_OPEN_CREATE,
75 GNUNET_DISK_PERM_USER_READ |
76 GNUNET_DISK_PERM_USER_WRITE |
77 GNUNET_DISK_PERM_GROUP_READ |
78 GNUNET_DISK_PERM_OTHER_READ)))
79 {
80 LOG (GNUNET_ERROR_TYPE_WARNING,
81 "Not able to open file %s\n",
82 file_name);
83 return;
84 }
85 output_buffer_size = strlen (line) + 18;
86 if (512 < output_buffer_size)
87 {
88 output_buffer_p = GNUNET_malloc ((output_buffer_size) * sizeof (char));
89 } else {
90 output_buffer_p = &output_buffer[0];
91 }
92 size = GNUNET_snprintf (output_buffer_p,
93 output_buffer_size,
94 "%llu %s\n",
95 (GNUNET_TIME_absolute_get ().abs_value_us) / 1000000, // microsec -> sec
96 line);
97 if (0 > size)
98 {
99 LOG (GNUNET_ERROR_TYPE_WARNING,
100 "Failed to write string to buffer (size: %i)\n",
101 size);
102 return;
103 }
104
105 size2 = GNUNET_DISK_file_write (f, output_buffer_p, size);
106 if (size != size2)
107 {
108 LOG (GNUNET_ERROR_TYPE_WARNING,
109 "Unable to write to file! (Size: %u, size2: %u)\n",
110 size,
111 size2);
112
113 if (GNUNET_YES != GNUNET_DISK_file_close (f))
114 LOG (GNUNET_ERROR_TYPE_WARNING,
115 "Unable to close file\n");
116
117 return;
118 }
119
120 if (512 < output_buffer_size)
121 {
122 GNUNET_free (output_buffer_p);
123 }
124
125 if (GNUNET_YES != GNUNET_DISK_file_close (f))
126 LOG (GNUNET_ERROR_TYPE_WARNING,
127 "Unable to close file\n");
128}
129 59
130void 60void
131to_file_raw (const char *file_name, const char *buf, size_t size_buf) 61to_file_raw (const char *file_name, const char *buf, size_t size_buf)
@@ -270,7 +200,7 @@ to_file_raw_unaligned (const char *file_name,
270 LOG (GNUNET_ERROR_TYPE_DEBUG, 200 LOG (GNUNET_ERROR_TYPE_DEBUG,
271 "number of bits needed to align unaligned bit: %u\n", 201 "number of bits needed to align unaligned bit: %u\n",
272 num_bits_to_align); 202 num_bits_to_align);
273 num_bits_to_move = min (num_bits_to_align, num_bits_needed_iter); 203 num_bits_to_move = GNUNET_MIN (num_bits_to_align, num_bits_needed_iter);
274 LOG (GNUNET_ERROR_TYPE_DEBUG, 204 LOG (GNUNET_ERROR_TYPE_DEBUG,
275 "number of bits of new byte to move: %u\n", 205 "number of bits of new byte to move: %u\n",
276 num_bits_to_move); 206 num_bits_to_move);
@@ -456,7 +386,7 @@ static int ensure_folder_exist (void)
456 return GNUNET_YES; 386 return GNUNET_YES;
457} 387}
458 388
459const char * 389char *
460store_prefix_file_name (const struct GNUNET_PeerIdentity *peer, 390store_prefix_file_name (const struct GNUNET_PeerIdentity *peer,
461 const char *prefix) 391 const char *prefix)
462{ 392{
diff --git a/src/rps/rps-test_util.h b/src/rps/rps-test_util.h
index def52a0ac..a806f11cd 100644
--- a/src/rps/rps-test_util.h
+++ b/src/rps/rps-test_util.h
@@ -26,9 +26,8 @@
26#ifndef RPS_TEST_UTIL_H 26#ifndef RPS_TEST_UTIL_H
27#define RPS_TEST_UTIL_H 27#define RPS_TEST_UTIL_H
28 28
29#define TO_FILE 1
29 30
30void
31to_file_ (const char *file_name, char *line);
32 31
33char * 32char *
34auth_key_to_string (struct GNUNET_CRYPTO_AuthKey auth_key); 33auth_key_to_string (struct GNUNET_CRYPTO_AuthKey auth_key);
@@ -43,32 +42,42 @@ create_file (const char *name);
43 * This function is used to facilitate writing important information to disk 42 * This function is used to facilitate writing important information to disk
44 */ 43 */
45#ifdef TO_FILE 44#ifdef TO_FILE
46# define to_file(file_name, ...) do {char tmp_buf[512];\ 45# define to_file(file_name, ...) do {char tmp_buf[512] = "";\
47 int size;\ 46 int size;\
48 size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\ 47 size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\
49 if (0 > size)\ 48 if (0 > size)\
50 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\ 49 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\
51 "Failed to create tmp_buf\n");\ 50 "Failed to create tmp_buf\n");\
52 else\ 51 else\
53 to_file_(file_name,tmp_buf);\ 52 GNUNET_DISK_fn_write(file_name, tmp_buf, strnlen(tmp_buf, 512),\
53 GNUNET_DISK_PERM_USER_READ |\
54 GNUNET_DISK_PERM_USER_WRITE |\
55 GNUNET_DISK_PERM_GROUP_READ |\
56 GNUNET_DISK_PERM_OTHER_READ);\
54 } while (0); 57 } while (0);
55# define to_file_w_len(file_name, len, ...) do {char tmp_buf[len];\ 58
59#define to_file_w_len(file_name, len, ...) do {char tmp_buf[len];\
56 int size;\ 60 int size;\
61 memset (tmp_buf, 0, len);\
57 size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\ 62 size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\
58 if (0 > size)\ 63 if (0 > size)\
59 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\ 64 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,\
60 "Failed to create tmp_buf\n");\ 65 "Failed to create tmp_buf\n");\
61 else\ 66 else\
62 to_file_(file_name,tmp_buf);\ 67 GNUNET_DISK_fn_write(file_name, tmp_buf, strnlen(tmp_buf, len), \
68 GNUNET_DISK_PERM_USER_READ |\
69 GNUNET_DISK_PERM_USER_WRITE |\
70 GNUNET_DISK_PERM_GROUP_READ |\
71 GNUNET_DISK_PERM_OTHER_READ);\
63 } while (0); 72 } while (0);
64#else /* TO_FILE */ 73#else /* TO_FILE */
65# define to_file(file_name, ...) 74# define to_file(file_name, ...)
66# define to_file_w_len(file_name, len, ...) 75# define to_file_w_len(file_name, len, ...)
67#endif /* TO_FILE */ 76#endif /* TO_FILE */
68 77
69const char * 78char *
70store_prefix_file_name (const struct GNUNET_PeerIdentity *peer, 79store_prefix_file_name (const struct GNUNET_PeerIdentity *peer,
71 const char *prefix); 80 const char *prefix);
72 81
73void 82void
74to_file_raw (const char *file_name, const char *buf, size_t size_buf); 83to_file_raw (const char *file_name, const char *buf, size_t size_buf);
diff --git a/src/rps/rps.h b/src/rps/rps.h
index 915524f88..616eabdac 100644
--- a/src/rps/rps.h
+++ b/src/rps/rps.h
@@ -114,6 +114,50 @@ struct GNUNET_RPS_CS_ActMaliciousMessage
114#endif /* ENABLE_MALICIOUS */ 114#endif /* ENABLE_MALICIOUS */
115 115
116 116
117/**
118 * Message from client to service telling it to start a new sub
119 */
120struct GNUNET_RPS_CS_SubStartMessage
121{
122 /**
123 * Header including size and type in NBO
124 */
125 struct GNUNET_MessageHeader header;
126
127 /**
128 * For alignment.
129 */
130 uint32_t reserved GNUNET_PACKED;
131
132 /**
133 * Mean interval between two rounds
134 */
135 struct GNUNET_TIME_RelativeNBO round_interval;
136
137 /**
138 * Length of the shared value represented as string.
139 */
140 struct GNUNET_HashCode hash GNUNET_PACKED;
141};
142
143
144/**
145 * Message from client to service telling it to stop a new sub
146 */
147struct GNUNET_RPS_CS_SubStopMessage
148{
149 /**
150 * Header including size and type in NBO
151 */
152 struct GNUNET_MessageHeader header;
153
154 /**
155 * Length of the shared value represented as string.
156 */
157 struct GNUNET_HashCode hash GNUNET_PACKED;
158};
159
160
117/* Debug messages */ 161/* Debug messages */
118 162
119/** 163/**
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index 7d0674aff..cfab06f17 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -42,11 +42,6 @@ struct GNUNET_RPS_StreamRequestHandle
42 struct GNUNET_RPS_Handle *rps_handle; 42 struct GNUNET_RPS_Handle *rps_handle;
43 43
44 /** 44 /**
45 * The number of requested peers.
46 */
47 uint32_t num_peers_left;
48
49 /**
50 * The callback to be called when we receive an answer. 45 * The callback to be called when we receive an answer.
51 */ 46 */
52 GNUNET_RPS_NotifyReadyCB ready_cb; 47 GNUNET_RPS_NotifyReadyCB ready_cb;
@@ -188,7 +183,6 @@ struct cb_cls_pack
188 */ 183 */
189static struct GNUNET_RPS_StreamRequestHandle * 184static struct GNUNET_RPS_StreamRequestHandle *
190new_stream_request (struct GNUNET_RPS_Handle *rps_handle, 185new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
191 uint64_t num_peers,
192 GNUNET_RPS_NotifyReadyCB ready_cb, 186 GNUNET_RPS_NotifyReadyCB ready_cb,
193 void *cls) 187 void *cls)
194{ 188{
@@ -197,7 +191,6 @@ new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
197 srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); 191 srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
198 192
199 srh->rps_handle = rps_handle; 193 srh->rps_handle = rps_handle;
200 srh->num_peers_left = num_peers;
201 srh->ready_cb = ready_cb; 194 srh->ready_cb = ready_cb;
202 srh->ready_cb_cls = cls; 195 srh->ready_cb_cls = cls;
203 GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, 196 GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head,
@@ -244,11 +237,14 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
244{ 237{
245 struct GNUNET_RPS_Request_Handle *rh = cls; 238 struct GNUNET_RPS_Request_Handle *rh = cls;
246 239
240 rh->sampler_rh = NULL;
247 rh->ready_cb (rh->ready_cb_cls, 241 rh->ready_cb (rh->ready_cb_cls,
248 num_peers, 242 num_peers,
249 peers); 243 peers);
250 // TODO cleanup, sampler, rh, cancel stuff 244 GNUNET_RPS_stream_cancel (rh->srh);
251 // TODO screw this function. We can give the cb,cls directly to the sampler. 245 rh->srh = NULL;
246 RPS_sampler_destroy (rh->sampler);
247 rh->sampler = NULL;
252} 248}
253 249
254 250
@@ -267,6 +263,9 @@ collect_peers_cb (void *cls,
267{ 263{
268 struct GNUNET_RPS_Request_Handle *rh = cls; 264 struct GNUNET_RPS_Request_Handle *rh = cls;
269 265
266 LOG (GNUNET_ERROR_TYPE_DEBUG,
267 "Service sent %" PRIu64 " peers from stream\n",
268 num_peers);
270 for (uint64_t i = 0; i < num_peers; i++) 269 for (uint64_t i = 0; i < num_peers; i++)
271 { 270 {
272 RPS_sampler_update (rh->sampler, &peers[i]); 271 RPS_sampler_update (rh->sampler, &peers[i]);
@@ -274,44 +273,6 @@ collect_peers_cb (void *cls,
274} 273}
275 274
276 275
277/**
278 * @brief Create new request handle
279 *
280 * @param rps_handle Handle to the service
281 * @param num_requests Number of requests
282 * @param ready_cb Callback
283 * @param cls Closure
284 *
285 * @return The newly created request handle
286 */
287static struct GNUNET_RPS_Request_Handle *
288new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
289 uint64_t num_requests,
290 GNUNET_RPS_NotifyReadyCB ready_cb,
291 void *cls)
292{
293 struct GNUNET_RPS_Request_Handle *rh;
294
295 rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
296 rh->rps_handle = rps_handle;
297 rh->num_requests = num_requests;
298 rh->sampler = RPS_sampler_mod_init (num_requests,
299 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
300 rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
301 num_requests,
302 peers_ready_cb,
303 rh);
304 rh->srh = GNUNET_RPS_stream_request (rps_handle,
305 0, /* infinite updates */
306 collect_peers_cb,
307 rh); /* cls */
308 rh->ready_cb = ready_cb;
309 rh->ready_cb_cls = cls;
310
311 return rh;
312}
313
314
315/* Get internals for debugging/profiling purposes */ 276/* Get internals for debugging/profiling purposes */
316 277
317/** 278/**
@@ -362,14 +323,11 @@ GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle)
362 * Request biased stream of peers that are being put into the sampler 323 * Request biased stream of peers that are being put into the sampler
363 * 324 *
364 * @param rps_handle handle to the rps service 325 * @param rps_handle handle to the rps service
365 * @param num_req_peers number of peers we want to receive
366 * (0 for infinite updates)
367 * @param cls a closure that will be given to the callback 326 * @param cls a closure that will be given to the callback
368 * @param ready_cb the callback called when the peers are available 327 * @param ready_cb the callback called when the peers are available
369 */ 328 */
370struct GNUNET_RPS_StreamRequestHandle * 329struct GNUNET_RPS_StreamRequestHandle *
371GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, 330GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
372 uint32_t num_peers,
373 GNUNET_RPS_NotifyReadyCB stream_input_cb, 331 GNUNET_RPS_NotifyReadyCB stream_input_cb,
374 void *cls) 332 void *cls)
375{ 333{
@@ -378,12 +336,9 @@ GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
378 struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; 336 struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
379 337
380 srh = new_stream_request (rps_handle, 338 srh = new_stream_request (rps_handle,
381 num_peers, /* num requests */
382 stream_input_cb, 339 stream_input_cb,
383 cls); 340 cls);
384 LOG (GNUNET_ERROR_TYPE_DEBUG, 341 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n");
385 "Client requests %" PRIu32 " biased stream updates\n",
386 num_peers);
387 342
388 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); 343 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
389 GNUNET_MQ_send (rps_handle->mq, ev); 344 GNUNET_MQ_send (rps_handle->mq, ev);
@@ -470,6 +425,7 @@ GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
470{ 425{
471 struct GNUNET_RPS_Handle *rps_handle; 426 struct GNUNET_RPS_Handle *rps_handle;
472 427
428 GNUNET_assert (NULL != srh);
473 rps_handle = srh->rps_handle; 429 rps_handle = srh->rps_handle;
474 GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head, 430 GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
475 rps_handle->stream_requests_tail, 431 rps_handle->stream_requests_tail,
@@ -521,65 +477,25 @@ handle_stream_input (void *cls,
521{ 477{
522 struct GNUNET_RPS_Handle *h = cls; 478 struct GNUNET_RPS_Handle *h = cls;
523 const struct GNUNET_PeerIdentity *peers; 479 const struct GNUNET_PeerIdentity *peers;
524 /* The following two pointers are used to prevent that new handles are
525 * inserted into the DLL, that is currently iterated over, from within a call
526 * to that handler_cb, are executed and in turn again add themselves to the
527 * iterated DLL infinitely */
528 struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp;
529 struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp;
530 uint64_t num_peers; 480 uint64_t num_peers;
531 uint64_t num_peers_return; 481 struct GNUNET_RPS_StreamRequestHandle *srh_iter;
482 struct GNUNET_RPS_StreamRequestHandle *srh_next;
532 483
533 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 484 peers = (struct GNUNET_PeerIdentity *) &msg[1];
534 num_peers = ntohl (msg->num_peers); 485 num_peers = ntohl (msg->num_peers);
535 LOG (GNUNET_ERROR_TYPE_DEBUG, 486 LOG (GNUNET_ERROR_TYPE_DEBUG,
536 "Received %" PRIu64 " peer(s) from stream input.\n", 487 "Received %" PRIu64 " peer(s) from stream input.\n",
537 num_peers); 488 num_peers);
538 srh_head_tmp = h->stream_requests_head; 489 srh_iter = h->stream_requests_head;
539 srh_tail_tmp = h->stream_requests_tail; 490 while (NULL != srh_iter)
540 h->stream_requests_head = NULL;
541 h->stream_requests_tail = NULL;
542 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
543 NULL != srh_iter;
544 srh_iter = srh_iter->next)
545 { 491 {
546 LOG (GNUNET_ERROR_TYPE_DEBUG, 492 LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
547 "Calling srh - left: %" PRIu64 "\n", 493 /* Store next pointer - srh might be removed/freed in callback */
548 srh_iter->num_peers_left); 494 srh_next = srh_iter->next;
549 if (0 == srh_iter->num_peers_left) /* infinite updates */
550 {
551 num_peers_return = num_peers;
552 }
553 else if (num_peers > srh_iter->num_peers_left)
554 {
555 num_peers_return = num_peers - srh_iter->num_peers_left;
556 }
557 else /* num_peers <= srh_iter->num_peers_left */
558 {
559 num_peers_return = srh_iter->num_peers_left - num_peers;
560 }
561 srh_iter->ready_cb (srh_iter->ready_cb_cls, 495 srh_iter->ready_cb (srh_iter->ready_cb_cls,
562 num_peers_return, 496 num_peers,
563 peers); 497 peers);
564 if (0 == srh_iter->num_peers_left) ; 498 srh_iter = srh_next;
565 else if (num_peers_return >= srh_iter->num_peers_left)
566 {
567 remove_stream_request (srh_iter,
568 srh_head_tmp,
569 srh_tail_tmp);
570 }
571 else
572 {
573 srh_iter->num_peers_left -= num_peers_return;
574 }
575 }
576 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
577 NULL != srh_iter;
578 srh_iter = srh_iter->next)
579 {
580 GNUNET_CONTAINER_DLL_insert (h->stream_requests_head,
581 h->stream_requests_tail,
582 srh_iter);
583 } 499 }
584 500
585 if (NULL == h->stream_requests_head) 501 if (NULL == h->stream_requests_head)
@@ -623,6 +539,24 @@ mq_error_handler (void *cls,
623 539
624 540
625/** 541/**
542 * @brief Create the hash value from the share value that defines the sub
543 * (-group)
544 *
545 * @param share_val Share value - strings longer than 508 (512 - 4) will be
546 * truncated.
547 * @param hash Pointer to the location in which the hash will be stored.
548 */
549static void
550hash_from_share_val (const char *share_val, struct GNUNET_HashCode *hash)
551{
552 char hash_port_string[512] = "rps";
553
554 (void) strncat (hash_port_string, share_val, 508);
555 GNUNET_CRYPTO_hash (hash_port_string, strlen (hash_port_string), hash);
556}
557
558
559/**
626 * Reconnect to the service 560 * Reconnect to the service
627 */ 561 */
628static void 562static void
@@ -674,6 +608,49 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
674 608
675 609
676/** 610/**
611 * @brief Start a sub with the given shared value
612 *
613 * @param h Handle to rps
614 * @param shared_value The shared value that defines the members of the sub (-gorup)
615 */
616void
617GNUNET_RPS_sub_start (struct GNUNET_RPS_Handle *h,
618 const char *shared_value)
619{
620 struct GNUNET_RPS_CS_SubStartMessage *msg;
621 struct GNUNET_MQ_Envelope *ev;
622
623 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START);
624 hash_from_share_val (shared_value, &msg->hash);
625 msg->round_interval = GNUNET_TIME_relative_hton (// TODO read from config!
626 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30));
627 GNUNET_assert (0 != msg->round_interval.rel_value_us__);
628
629 GNUNET_MQ_send (h->mq, ev);
630}
631
632
633/**
634 * @brief Stop a sub with the given shared value
635 *
636 * @param h Handle to rps
637 * @param shared_value The shared value that defines the members of the sub (-gorup)
638 */
639void
640GNUNET_RPS_sub_stop (struct GNUNET_RPS_Handle *h,
641 const char *shared_value)
642{
643 struct GNUNET_RPS_CS_SubStopMessage *msg;
644 struct GNUNET_MQ_Envelope *ev;
645
646 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP);
647 hash_from_share_val (shared_value, &msg->hash);
648
649 GNUNET_MQ_send (h->mq, ev);
650}
651
652
653/**
677 * Request n random peers. 654 * Request n random peers.
678 * 655 *
679 * @param rps_handle handle to the rps service 656 * @param rps_handle handle to the rps service
@@ -690,11 +667,23 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
690{ 667{
691 struct GNUNET_RPS_Request_Handle *rh; 668 struct GNUNET_RPS_Request_Handle *rh;
692 669
693 rh = new_request_handle (rps_handle, 670 LOG (GNUNET_ERROR_TYPE_INFO,
694 num_req_peers, 671 "Client requested %" PRIu32 " peers\n",
695 ready_cb, 672 num_req_peers);
696 cls); 673 rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
697 674 rh->rps_handle = rps_handle;
675 rh->num_requests = num_req_peers;
676 rh->sampler = RPS_sampler_mod_init (num_req_peers,
677 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
678 rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
679 num_req_peers,
680 peers_ready_cb,
681 rh);
682 rh->srh = GNUNET_RPS_stream_request (rps_handle,
683 collect_peers_cb,
684 rh); /* cls */
685 rh->ready_cb = ready_cb;
686 rh->ready_cb_cls = cls;
698 687
699 return rh; 688 return rh;
700} 689}
@@ -865,12 +854,11 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
865 struct GNUNET_RPS_Handle *h; 854 struct GNUNET_RPS_Handle *h;
866 855
867 h = rh->rps_handle; 856 h = rh->rps_handle;
868 if (NULL != rh->srh) 857 GNUNET_assert (NULL != rh);
869 { 858 GNUNET_assert (NULL != rh->srh);
870 remove_stream_request (rh->srh, 859 remove_stream_request (rh->srh,
871 h->stream_requests_head, 860 h->stream_requests_head,
872 h->stream_requests_tail); 861 h->stream_requests_tail);
873 }
874 if (NULL == h->stream_requests_head) cancel_stream(h); 862 if (NULL == h->stream_requests_head) cancel_stream(h);
875 if (NULL != rh->sampler_rh) 863 if (NULL != rh->sampler_rh)
876 { 864 {
@@ -889,17 +877,27 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
889void 877void
890GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) 878GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
891{ 879{
892 GNUNET_MQ_destroy (h->mq);
893 if (NULL != h->stream_requests_head) 880 if (NULL != h->stream_requests_head)
894 { 881 {
882 struct GNUNET_RPS_StreamRequestHandle *srh_iter;
883
895 LOG (GNUNET_ERROR_TYPE_WARNING, 884 LOG (GNUNET_ERROR_TYPE_WARNING,
896 "Still waiting for replies\n"); 885 "Still waiting for replies\n");
886 srh_iter = h->stream_requests_head;
887 while (NULL != srh_iter)
888 {
889 struct GNUNET_RPS_StreamRequestHandle *srh_tmp = srh_iter;
890 srh_iter = srh_iter->next;
891 GNUNET_RPS_stream_cancel (srh_tmp);
892 }
897 } 893 }
898 if (NULL != h->view_update_cb) 894 if (NULL != h->view_update_cb)
899 { 895 {
900 LOG (GNUNET_ERROR_TYPE_WARNING, 896 LOG (GNUNET_ERROR_TYPE_WARNING,
901 "Still waiting for view updates\n"); 897 "Still waiting for view updates\n");
898 GNUNET_RPS_view_request_cancel (h);
902 } 899 }
900 GNUNET_MQ_destroy (h->mq);
903 GNUNET_free (h); 901 GNUNET_free (h);
904} 902}
905 903
diff --git a/src/rps/test_rps.c b/src/rps/test_rps.c
index 92d8c12ea..63a6007ae 100644
--- a/src/rps/test_rps.c
+++ b/src/rps/test_rps.c
@@ -190,6 +190,11 @@ struct RPSPeer
190 struct GNUNET_RPS_Handle *rps_handle; 190 struct GNUNET_RPS_Handle *rps_handle;
191 191
192 /** 192 /**
193 * Handle to stream requests
194 */
195 struct GNUNET_RPS_StreamRequestHandle *rps_srh;
196
197 /**
193 * ID of the peer. 198 * ID of the peer.
194 */ 199 */
195 struct GNUNET_PeerIdentity *peer_id; 200 struct GNUNET_PeerIdentity *peer_id;
@@ -673,6 +678,13 @@ ids_to_file (char *file_name,
673} */ 678} */
674 679
675/** 680/**
681 * Task run on timeout to collect statistics and potentially shut down.
682 */
683static void
684post_test_op (void *cls);
685
686
687/**
676 * Test the success of a single test 688 * Test the success of a single test
677 */ 689 */
678static int 690static int
@@ -732,6 +744,8 @@ static int check_statistics_collect_completed_single_peer (
732 } 744 }
733 return GNUNET_YES; 745 return GNUNET_YES;
734} 746}
747
748
735/** 749/**
736 * @brief Checks if all peers already received their statistics value from the 750 * @brief Checks if all peers already received their statistics value from the
737 * statistics service. 751 * statistics service.
@@ -758,6 +772,7 @@ static int check_statistics_collect_completed ()
758 return GNUNET_YES; 772 return GNUNET_YES;
759} 773}
760 774
775
761/** 776/**
762 * Task run on timeout to shut everything down. 777 * Task run on timeout to shut everything down.
763 */ 778 */
@@ -765,6 +780,7 @@ static void
765shutdown_op (void *cls) 780shutdown_op (void *cls)
766{ 781{
767 unsigned int i; 782 unsigned int i;
783 (void) cls;
768 784
769 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 785 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
770 "Shutdown task scheduled, going down.\n"); 786 "Shutdown task scheduled, going down.\n");
@@ -772,6 +788,7 @@ shutdown_op (void *cls)
772 if (NULL != post_test_task) 788 if (NULL != post_test_task)
773 { 789 {
774 GNUNET_SCHEDULER_cancel (post_test_task); 790 GNUNET_SCHEDULER_cancel (post_test_task);
791 post_test_op (NULL);
775 } 792 }
776 if (NULL != churn_task) 793 if (NULL != churn_task)
777 { 794 {
@@ -799,6 +816,7 @@ static void
799post_test_op (void *cls) 816post_test_op (void *cls)
800{ 817{
801 unsigned int i; 818 unsigned int i;
819 (void) cls;
802 820
803 post_test_task = NULL; 821 post_test_task = NULL;
804 post_test = GNUNET_YES; 822 post_test = GNUNET_YES;
@@ -811,16 +829,16 @@ post_test_op (void *cls)
811 } 829 }
812 for (i = 0; i < num_peers; i++) 830 for (i = 0; i < num_peers; i++)
813 { 831 {
814 if (NULL != rps_peers[i].op)
815 {
816 GNUNET_TESTBED_operation_done (rps_peers[i].op);
817 rps_peers[i].op = NULL;
818 }
819 if (NULL != cur_test_run.post_test) 832 if (NULL != cur_test_run.post_test)
820 { 833 {
821 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing post_test for peer %u\n", i); 834 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing post_test for peer %u\n", i);
822 cur_test_run.post_test (&rps_peers[i]); 835 cur_test_run.post_test (&rps_peers[i]);
823 } 836 }
837 if (NULL != rps_peers[i].op)
838 {
839 GNUNET_TESTBED_operation_done (rps_peers[i].op);
840 rps_peers[i].op = NULL;
841 }
824 } 842 }
825 /* If we do not collect statistics, shut down directly */ 843 /* If we do not collect statistics, shut down directly */
826 if (NO_COLLECT_STATISTICS == cur_test_run.have_collect_statistics || 844 if (NO_COLLECT_STATISTICS == cur_test_run.have_collect_statistics ||
@@ -905,6 +923,7 @@ info_cb (void *cb_cls,
905 const char *emsg) 923 const char *emsg)
906{ 924{
907 struct OpListEntry *entry = (struct OpListEntry *) cb_cls; 925 struct OpListEntry *entry = (struct OpListEntry *) cb_cls;
926 (void) op;
908 927
909 if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test) 928 if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
910 { 929 {
@@ -1070,6 +1089,9 @@ stat_complete_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
1070{ 1089{
1071 //struct GNUNET_STATISTICS_Handle *sh = ca_result; 1090 //struct GNUNET_STATISTICS_Handle *sh = ca_result;
1072 //struct RPSPeer *peer = (struct RPSPeer *) cls; 1091 //struct RPSPeer *peer = (struct RPSPeer *) cls;
1092 (void) cls;
1093 (void) op;
1094 (void) ca_result;
1073 1095
1074 if (NULL != emsg) 1096 if (NULL != emsg)
1075 { 1097 {
@@ -1098,6 +1120,12 @@ rps_disconnect_adapter (void *cls,
1098{ 1120{
1099 struct RPSPeer *peer = cls; 1121 struct RPSPeer *peer = cls;
1100 struct GNUNET_RPS_Handle *h = op_result; 1122 struct GNUNET_RPS_Handle *h = op_result;
1123
1124 if (NULL != peer->rps_srh)
1125 {
1126 GNUNET_RPS_stream_cancel (peer->rps_srh);
1127 peer->rps_srh = NULL;
1128 }
1101 GNUNET_assert (NULL != peer); 1129 GNUNET_assert (NULL != peer);
1102 GNUNET_RPS_disconnect (h); 1130 GNUNET_RPS_disconnect (h);
1103 peer->rps_handle = NULL; 1131 peer->rps_handle = NULL;
@@ -1441,6 +1469,7 @@ seed_big_cb (struct RPSPeer *rps_peer)
1441static void 1469static void
1442single_peer_seed_cb (struct RPSPeer *rps_peer) 1470single_peer_seed_cb (struct RPSPeer *rps_peer)
1443{ 1471{
1472 (void) rps_peer;
1444 // TODO 1473 // TODO
1445} 1474}
1446 1475
@@ -1525,6 +1554,63 @@ churn_test_cb (struct RPSPeer *rps_peer)
1525} 1554}
1526 1555
1527/*********************************** 1556/***********************************
1557 * SUB
1558***********************************/
1559
1560static void
1561got_stream_peer_cb (void *cls,
1562 uint64_t num_peers,
1563 const struct GNUNET_PeerIdentity *peers)
1564{
1565 const struct RPSPeer *rps_peer = cls;
1566
1567 for (uint64_t i = 0; i < num_peers; i++)
1568 {
1569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1570 "Peer %" PRIu32 " received [%s] from stream.\n",
1571 rps_peer->index,
1572 GNUNET_i2s (&peers[i]));
1573 if (0 != rps_peer->index &&
1574 0 == memcmp (&peers[i],
1575 &rps_peers[0].peer_id,
1576 sizeof (struct GNUNET_PeerIdentity)))
1577 {
1578 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Received a peer id outside sub\n");
1579 ok = 1;
1580 }
1581 else if (0 == rps_peer->index &&
1582 0 != memcmp (&peers[i],
1583 &rps_peers[0].peer_id,
1584 sizeof (struct GNUNET_PeerIdentity)))
1585 {
1586 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Received a peer id outside sub (lonely)\n");
1587 ok = 1;
1588 }
1589 }
1590}
1591
1592
1593static void
1594sub_post (struct RPSPeer *rps_peer)
1595{
1596 if (0 != rps_peer->index) GNUNET_RPS_sub_stop (rps_peer->rps_handle, "test");
1597 else GNUNET_RPS_sub_stop (rps_peer->rps_handle, "lonely");
1598}
1599
1600
1601static void
1602sub_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
1603{
1604 (void) rps_peer;
1605
1606 if (0 != rps_peer->index) GNUNET_RPS_sub_start (h, "test");
1607 else GNUNET_RPS_sub_start (h, "lonely"); /* have a group of one */
1608 rps_peer->rps_srh = GNUNET_RPS_stream_request (h,
1609 &got_stream_peer_cb,
1610 rps_peer);
1611}
1612
1613/***********************************
1528 * PROFILER 1614 * PROFILER
1529***********************************/ 1615***********************************/
1530 1616
@@ -1540,6 +1626,7 @@ churn_cb (void *cls,
1540 struct GNUNET_TESTBED_Operation *op, 1626 struct GNUNET_TESTBED_Operation *op,
1541 const char *emsg) 1627 const char *emsg)
1542{ 1628{
1629 (void) op;
1543 // FIXME 1630 // FIXME
1544 struct OpListEntry *entry = cls; 1631 struct OpListEntry *entry = cls;
1545 1632
@@ -1670,6 +1757,7 @@ manage_service_wrapper (unsigned int i, unsigned int j,
1670static void 1757static void
1671churn (void *cls) 1758churn (void *cls)
1672{ 1759{
1760 (void) cls;
1673 unsigned int i; 1761 unsigned int i;
1674 unsigned int j; 1762 unsigned int j;
1675 double portion_online; 1763 double portion_online;
@@ -1832,6 +1920,8 @@ profiler_cb (struct RPSPeer *rps_peer)
1832int 1920int
1833file_name_cb (void *cls, const char *filename) 1921file_name_cb (void *cls, const char *filename)
1834{ 1922{
1923 (void) cls;
1924
1835 if (NULL != strstr (filename, "sampler_el")) 1925 if (NULL != strstr (filename, "sampler_el"))
1836 { 1926 {
1837 struct RPS_SamplerElement *s_elem; 1927 struct RPS_SamplerElement *s_elem;
@@ -2501,6 +2591,8 @@ stat_iterator (void *cls,
2501 uint64_t value, 2591 uint64_t value,
2502 int is_persistent) 2592 int is_persistent)
2503{ 2593{
2594 (void) subsystem;
2595 (void) is_persistent;
2504 const struct STATcls *stat_cls = (const struct STATcls *) cls; 2596 const struct STATcls *stat_cls = (const struct STATcls *) cls;
2505 struct RPSPeer *rps_peer = (struct RPSPeer *) stat_cls->rps_peer; 2597 struct RPSPeer *rps_peer = (struct RPSPeer *) stat_cls->rps_peer;
2506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %s - %" PRIu64 "\n", 2598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %s - %" PRIu64 "\n",
@@ -2635,6 +2727,9 @@ run (void *cls,
2635 unsigned int links_succeeded, 2727 unsigned int links_succeeded,
2636 unsigned int links_failed) 2728 unsigned int links_failed)
2637{ 2729{
2730 (void) cls;
2731 (void) h;
2732 (void) links_failed;
2638 unsigned int i; 2733 unsigned int i;
2639 struct OpListEntry *entry; 2734 struct OpListEntry *entry;
2640 2735
@@ -2711,8 +2806,6 @@ run (void *cls,
2711 timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2806 timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
2712 (timeout_s * 1.2) + 0.1 * num_peers); 2807 (timeout_s * 1.2) + 0.1 * num_peers);
2713 shutdown_task = GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL); 2808 shutdown_task = GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL);
2714 shutdown_task = GNUNET_SCHEDULER_add_shutdown (shutdown_op, NULL);
2715
2716} 2809}
2717 2810
2718 2811
@@ -2727,6 +2820,7 @@ int
2727main (int argc, char *argv[]) 2820main (int argc, char *argv[])
2728{ 2821{
2729 int ret_value; 2822 int ret_value;
2823 (void) argc;
2730 2824
2731 /* Defaults for tests */ 2825 /* Defaults for tests */
2732 num_peers = 5; 2826 num_peers = 5;
@@ -2748,6 +2842,7 @@ main (int argc, char *argv[])
2748 cur_test_run.pre_test = mal_pre; 2842 cur_test_run.pre_test = mal_pre;
2749 cur_test_run.main_test = mal_cb; 2843 cur_test_run.main_test = mal_cb;
2750 cur_test_run.init_peer = mal_init_peer; 2844 cur_test_run.init_peer = mal_init_peer;
2845 timeout_s = 40;
2751 2846
2752 if (strstr (argv[0], "_1") != NULL) 2847 if (strstr (argv[0], "_1") != NULL)
2753 { 2848 {
@@ -2843,7 +2938,22 @@ main (int argc, char *argv[])
2843 cur_test_run.eval_cb = default_eval_cb; 2938 cur_test_run.eval_cb = default_eval_cb;
2844 cur_test_run.have_churn = HAVE_NO_CHURN; 2939 cur_test_run.have_churn = HAVE_NO_CHURN;
2845 cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT; 2940 cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
2846 timeout_s = 10; 2941 timeout_s = 40;
2942 }
2943
2944 else if (strstr (argv[0], "_sub") != NULL)
2945 {
2946 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test subs\n");
2947 cur_test_run.name = "test-rps-sub";
2948 num_peers = 5;
2949 //cur_test_run.init_peer = &default_init_peer;
2950 cur_test_run.pre_test = &sub_pre;
2951 cur_test_run.main_test = &single_req_cb;
2952 //cur_test_run.reply_handle = default_reply_handle;
2953 cur_test_run.post_test = &sub_post;
2954 //cur_test_run.eval_cb = default_eval_cb;
2955 cur_test_run.have_churn = HAVE_NO_CHURN;
2956 cur_test_run.have_quick_quit = HAVE_QUICK_QUIT;
2847 } 2957 }
2848 2958
2849 else if (strstr (argv[0], "profiler") != NULL) 2959 else if (strstr (argv[0], "profiler") != NULL)
@@ -2916,7 +3026,7 @@ main (int argc, char *argv[])
2916 } 3026 }
2917 3027
2918 ret_value = cur_test_run.eval_cb(); 3028 ret_value = cur_test_run.eval_cb();
2919 3029
2920 if (NO_COLLECT_VIEW == cur_test_run.have_collect_view) 3030 if (NO_COLLECT_VIEW == cur_test_run.have_collect_view)
2921 { 3031 {
2922 GNUNET_array_grow (rps_peers->cur_view, 3032 GNUNET_array_grow (rps_peers->cur_view,
@@ -2929,4 +3039,5 @@ main (int argc, char *argv[])
2929 return ret_value; 3039 return ret_value;
2930} 3040}
2931 3041
3042
2932/* end of test_rps.c */ 3043/* end of test_rps.c */
diff --git a/src/rps/test_rps.conf b/src/rps/test_rps.conf
index 84e0e5049..c7ac1f3b8 100644
--- a/src/rps/test_rps.conf
+++ b/src/rps/test_rps.conf
@@ -1,7 +1,7 @@
1[rps] 1[rps]
2#PREFIX = valgrind --leak-check=full --show-leak-kinds=all --log-file=/tmp/rps/valgrind!gnunet-service-rps!%p 2#PREFIX = valgrind --leak-check=full --show-leak-kinds=all --log-file=/tmp/rps/valgrind!gnunet-service-rps!%p
3#PREFIX = valgrind --log-file=/tmp/rps/valgrind!gnunet-service-rps!%p 3#PREFIX = valgrind --log-file=/tmp/rps/valgrind!gnunet-service-rps!%p
4#PREFIX = valgrind 4#PREFIX = valgrind
5UNIXPATH = $GNUNET_TMP/gnunet-service-rps.sock 5UNIXPATH = $GNUNET_TMP/gnunet-service-rps.sock
6HOME = $SERVICEHOME 6HOME = $SERVICEHOME
7# PORT = 2106 7# PORT = 2106
@@ -12,7 +12,7 @@ NOARMBIND = YES
12#OPTIONS=-l /tmp/rps_profiler_logs/rps-[]-%Y-%m-%d.log 12#OPTIONS=-l /tmp/rps_profiler_logs/rps-[]-%Y-%m-%d.log
13 13
14# This is the timeinterval between the rounds 14# This is the timeinterval between the rounds
15ROUNDINTERVAL = 2 s 15ROUNDINTERVAL = 1 s
16FILENAME_VALID_PEERS = $GNUNET_DATA_HOME/rps/valid_peers.txt 16FILENAME_VALID_PEERS = $GNUNET_DATA_HOME/rps/valid_peers.txt
17 17
18# This is the 'estimate' in the beginning. 18# This is the 'estimate' in the beginning.