diff options
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r-- | src/rps/rps_api.c | 1320 |
1 files changed, 0 insertions, 1320 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c deleted file mode 100644 index bb4c3597e..000000000 --- a/src/rps/rps_api.c +++ /dev/null | |||
@@ -1,1320 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file rps/rps_api.c | ||
23 | * @brief API for rps | ||
24 | * @author Julius Bünger | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_util_lib.h" | ||
28 | #include "rps.h" | ||
29 | #include "gnunet_rps_service.h" | ||
30 | #include "rps-sampler_client.h" | ||
31 | |||
32 | #include "gnunet_nse_service.h" | ||
33 | |||
34 | #include <inttypes.h> | ||
35 | |||
36 | #define LOG(kind, ...) GNUNET_log_from (kind, "rps-api", __VA_ARGS__) | ||
37 | |||
38 | /** | ||
39 | * Handle for a request to get peers from biased stream of ids | ||
40 | */ | ||
41 | struct GNUNET_RPS_StreamRequestHandle | ||
42 | { | ||
43 | /** | ||
44 | * The client issuing the request. | ||
45 | */ | ||
46 | struct GNUNET_RPS_Handle *rps_handle; | ||
47 | |||
48 | /** | ||
49 | * The callback to be called when we receive an answer. | ||
50 | */ | ||
51 | GNUNET_RPS_NotifyReadyCB ready_cb; | ||
52 | |||
53 | /** | ||
54 | * The closure for the callback. | ||
55 | */ | ||
56 | void *ready_cb_cls; | ||
57 | |||
58 | /** | ||
59 | * @brief Scheduler task for scheduled callback | ||
60 | */ | ||
61 | struct GNUNET_SCHEDULER_Task *callback_task; | ||
62 | |||
63 | /** | ||
64 | * @brief Next element of the DLL | ||
65 | */ | ||
66 | struct GNUNET_RPS_StreamRequestHandle *next; | ||
67 | |||
68 | /** | ||
69 | * @brief Previous element of the DLL | ||
70 | */ | ||
71 | struct GNUNET_RPS_StreamRequestHandle *prev; | ||
72 | }; | ||
73 | |||
74 | |||
75 | /** | ||
76 | * Handler to handle requests from a client. | ||
77 | */ | ||
78 | struct GNUNET_RPS_Handle | ||
79 | { | ||
80 | /** | ||
81 | * The handle to the client configuration. | ||
82 | */ | ||
83 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
84 | |||
85 | /** | ||
86 | * The message queue to the client. | ||
87 | */ | ||
88 | struct GNUNET_MQ_Handle *mq; | ||
89 | |||
90 | /** | ||
91 | * @brief Callback called on each update of the view | ||
92 | */ | ||
93 | GNUNET_RPS_NotifyReadyCB view_update_cb; | ||
94 | |||
95 | /** | ||
96 | * @brief Closure to each requested update of the view | ||
97 | */ | ||
98 | void *view_update_cls; | ||
99 | |||
100 | /** | ||
101 | * @brief Closure to each requested peer from the biased stream | ||
102 | */ | ||
103 | void *stream_input_cls; | ||
104 | |||
105 | /** | ||
106 | * @brief Head of the DLL of stream requests | ||
107 | */ | ||
108 | struct GNUNET_RPS_StreamRequestHandle *stream_requests_head; | ||
109 | |||
110 | /** | ||
111 | * @brief Tail of the DLL of stream requests | ||
112 | */ | ||
113 | struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail; | ||
114 | |||
115 | /** | ||
116 | * @brief Handle to nse service | ||
117 | */ | ||
118 | struct GNUNET_NSE_Handle *nse; | ||
119 | |||
120 | /** | ||
121 | * @brief Pointer to the head element in DLL of request handles | ||
122 | */ | ||
123 | struct GNUNET_RPS_Request_Handle *rh_head; | ||
124 | |||
125 | /** | ||
126 | * @brief Pointer to the tail element in DLL of request handles | ||
127 | */ | ||
128 | struct GNUNET_RPS_Request_Handle *rh_tail; | ||
129 | |||
130 | /** | ||
131 | * @brief Pointer to the head element in DLL of single request handles | ||
132 | */ | ||
133 | struct GNUNET_RPS_Request_Handle_Single_Info *rhs_head; | ||
134 | |||
135 | /** | ||
136 | * @brief Pointer to the tail element in DLL of single request handles | ||
137 | */ | ||
138 | struct GNUNET_RPS_Request_Handle_Single_Info *rhs_tail; | ||
139 | |||
140 | /** | ||
141 | * @brief The desired probability with which we want to have observed all | ||
142 | * peers. | ||
143 | */ | ||
144 | float desired_probability; | ||
145 | |||
146 | /** | ||
147 | * @brief A factor that catches the 'bias' of a random stream of peer ids. | ||
148 | * | ||
149 | * As introduced by Brahms: Factor between the number of unique ids in a | ||
150 | * truly random stream and number of unique ids in the gossip stream. | ||
151 | */ | ||
152 | float deficiency_factor; | ||
153 | }; | ||
154 | |||
155 | |||
156 | /** | ||
157 | * Handler for a single request from a client. | ||
158 | */ | ||
159 | struct GNUNET_RPS_Request_Handle | ||
160 | { | ||
161 | /** | ||
162 | * The client issuing the request. | ||
163 | */ | ||
164 | struct GNUNET_RPS_Handle *rps_handle; | ||
165 | |||
166 | /** | ||
167 | * The number of requested peers. | ||
168 | */ | ||
169 | uint32_t num_requests; | ||
170 | |||
171 | /** | ||
172 | * @brief The Sampler for the client request | ||
173 | */ | ||
174 | struct RPS_Sampler *sampler; | ||
175 | |||
176 | /** | ||
177 | * @brief Request handle of the request to the sampler - needed to cancel the request | ||
178 | */ | ||
179 | struct RPS_SamplerRequestHandle *sampler_rh; | ||
180 | |||
181 | /** | ||
182 | * @brief Request handle of the request of the biased stream of peers - | ||
183 | * needed to cancel the request | ||
184 | */ | ||
185 | struct GNUNET_RPS_StreamRequestHandle *srh; | ||
186 | |||
187 | /** | ||
188 | * The callback to be called when we receive an answer. | ||
189 | */ | ||
190 | GNUNET_RPS_NotifyReadyCB ready_cb; | ||
191 | |||
192 | /** | ||
193 | * The closure for the callback. | ||
194 | */ | ||
195 | void *ready_cb_cls; | ||
196 | |||
197 | /** | ||
198 | * @brief Pointer to next element in DLL | ||
199 | */ | ||
200 | struct GNUNET_RPS_Request_Handle *next; | ||
201 | |||
202 | /** | ||
203 | * @brief Pointer to previous element in DLL | ||
204 | */ | ||
205 | struct GNUNET_RPS_Request_Handle *prev; | ||
206 | }; | ||
207 | |||
208 | |||
209 | /** | ||
210 | * Handler for a single request from a client. | ||
211 | */ | ||
212 | struct GNUNET_RPS_Request_Handle_Single_Info | ||
213 | { | ||
214 | /** | ||
215 | * The client issuing the request. | ||
216 | */ | ||
217 | struct GNUNET_RPS_Handle *rps_handle; | ||
218 | |||
219 | /** | ||
220 | * @brief The Sampler for the client request | ||
221 | */ | ||
222 | struct RPS_Sampler *sampler; | ||
223 | |||
224 | /** | ||
225 | * @brief Request handle of the request to the sampler - needed to cancel the request | ||
226 | */ | ||
227 | struct RPS_SamplerRequestHandleSingleInfo *sampler_rh; | ||
228 | |||
229 | /** | ||
230 | * @brief Request handle of the request of the biased stream of peers - | ||
231 | * needed to cancel the request | ||
232 | */ | ||
233 | struct GNUNET_RPS_StreamRequestHandle *srh; | ||
234 | |||
235 | /** | ||
236 | * The callback to be called when we receive an answer. | ||
237 | */ | ||
238 | GNUNET_RPS_NotifyReadySingleInfoCB ready_cb; | ||
239 | |||
240 | /** | ||
241 | * The closure for the callback. | ||
242 | */ | ||
243 | void *ready_cb_cls; | ||
244 | |||
245 | /** | ||
246 | * @brief Pointer to next element in DLL | ||
247 | */ | ||
248 | struct GNUNET_RPS_Request_Handle_Single_Info *next; | ||
249 | |||
250 | /** | ||
251 | * @brief Pointer to previous element in DLL | ||
252 | */ | ||
253 | struct GNUNET_RPS_Request_Handle_Single_Info *prev; | ||
254 | }; | ||
255 | |||
256 | |||
257 | /** | ||
258 | * Struct used to pack the callback, its closure (provided by the caller) | ||
259 | * and the connection handler to the service to pass it to a callback function. | ||
260 | */ | ||
261 | struct cb_cls_pack | ||
262 | { | ||
263 | /** | ||
264 | * Callback provided by the client | ||
265 | */ | ||
266 | GNUNET_RPS_NotifyReadyCB cb; | ||
267 | |||
268 | /** | ||
269 | * Closure provided by the client | ||
270 | */ | ||
271 | void *cls; | ||
272 | |||
273 | /** | ||
274 | * Handle to the service connection | ||
275 | */ | ||
276 | struct GNUNET_CLIENT_Connection *service_conn; | ||
277 | }; | ||
278 | |||
279 | |||
280 | /** | ||
281 | * @brief Peers received from the biased stream to be passed to all | ||
282 | * srh_handlers | ||
283 | */ | ||
284 | static struct GNUNET_PeerIdentity *srh_callback_peers; | ||
285 | |||
286 | /** | ||
287 | * @brief Number of peers in the biased stream that are to be passed to all | ||
288 | * srh_handlers | ||
289 | */ | ||
290 | static uint64_t srh_callback_num_peers; | ||
291 | |||
292 | |||
293 | /** | ||
294 | * @brief Create a new handle for a stream request | ||
295 | * | ||
296 | * @param rps_handle The rps handle | ||
297 | * @param num_peers The number of desired peers | ||
298 | * @param ready_cb The callback to be called, once all peers are ready | ||
299 | * @param cls The colsure to provide to the callback | ||
300 | * | ||
301 | * @return The handle to the stream request | ||
302 | */ | ||
303 | static struct GNUNET_RPS_StreamRequestHandle * | ||
304 | new_stream_request (struct GNUNET_RPS_Handle *rps_handle, | ||
305 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
306 | void *cls) | ||
307 | { | ||
308 | struct GNUNET_RPS_StreamRequestHandle *srh; | ||
309 | |||
310 | srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); | ||
311 | srh->rps_handle = rps_handle; | ||
312 | srh->ready_cb = ready_cb; | ||
313 | srh->ready_cb_cls = cls; | ||
314 | GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, | ||
315 | rps_handle->stream_requests_tail, | ||
316 | srh); | ||
317 | |||
318 | return srh; | ||
319 | } | ||
320 | |||
321 | |||
322 | /** | ||
323 | * @brief Remove the given stream request from the list of requests and memory | ||
324 | * | ||
325 | * @param srh The request to be removed | ||
326 | */ | ||
327 | static void | ||
328 | remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh) | ||
329 | { | ||
330 | struct GNUNET_RPS_Handle *rps_handle = srh->rps_handle; | ||
331 | |||
332 | GNUNET_assert (NULL != srh); | ||
333 | if (NULL != srh->callback_task) | ||
334 | { | ||
335 | GNUNET_SCHEDULER_cancel (srh->callback_task); | ||
336 | srh->callback_task = NULL; | ||
337 | } | ||
338 | GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head, | ||
339 | rps_handle->stream_requests_tail, | ||
340 | srh); | ||
341 | GNUNET_free (srh); | ||
342 | } | ||
343 | |||
344 | |||
345 | /** | ||
346 | * @brief Called once the sampler has collected all requested peers. | ||
347 | * | ||
348 | * Calls the callback provided by the client with the corresponding cls. | ||
349 | * | ||
350 | * @param peers The array of @a num_peers that has been returned. | ||
351 | * @param num_peers The number of peers that have been returned | ||
352 | * @param cls The #GNUNET_RPS_Request_Handle | ||
353 | */ | ||
354 | static void | ||
355 | peers_ready_cb (const struct GNUNET_PeerIdentity *peers, | ||
356 | uint32_t num_peers, | ||
357 | void *cls) | ||
358 | { | ||
359 | struct GNUNET_RPS_Request_Handle *rh = cls; | ||
360 | |||
361 | rh->sampler_rh = NULL; | ||
362 | rh->ready_cb (rh->ready_cb_cls, | ||
363 | num_peers, | ||
364 | peers); | ||
365 | GNUNET_RPS_request_cancel (rh); | ||
366 | } | ||
367 | |||
368 | |||
369 | /** | ||
370 | * @brief Called once the sampler has collected the requested peer. | ||
371 | * | ||
372 | * Calls the callback provided by the client with the corresponding cls. | ||
373 | * | ||
374 | * @param peers The array of @a num_peers that has been returned. | ||
375 | * @param num_peers The number of peers that have been returned | ||
376 | * @param cls The #GNUNET_RPS_Request_Handle | ||
377 | * @param probability Probability with which all IDs have been observed | ||
378 | * @param num_observed Number of observed IDs | ||
379 | */ | ||
380 | static void | ||
381 | peer_info_ready_cb (const struct GNUNET_PeerIdentity *peers, | ||
382 | void *cls, | ||
383 | double probability, | ||
384 | uint32_t num_observed) | ||
385 | { | ||
386 | struct GNUNET_RPS_Request_Handle_Single_Info *rh = cls; | ||
387 | |||
388 | rh->sampler_rh = NULL; | ||
389 | rh->ready_cb (rh->ready_cb_cls, | ||
390 | peers, | ||
391 | probability, | ||
392 | num_observed); | ||
393 | GNUNET_RPS_request_single_info_cancel (rh); | ||
394 | } | ||
395 | |||
396 | |||
397 | /** | ||
398 | * @brief Callback to collect the peers from the biased stream and put those | ||
399 | * into the sampler. | ||
400 | * | ||
401 | * @param cls The #GNUNET_RPS_Request_Handle | ||
402 | * @param num_peers The number of peer that have been returned | ||
403 | * @param peers The array of @a num_peers that have been returned | ||
404 | */ | ||
405 | static void | ||
406 | collect_peers_cb (void *cls, | ||
407 | uint64_t num_peers, | ||
408 | const struct GNUNET_PeerIdentity *peers) | ||
409 | { | ||
410 | struct GNUNET_RPS_Request_Handle *rh = cls; | ||
411 | |||
412 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
413 | "Service sent %" PRIu64 " peers from stream\n", | ||
414 | num_peers); | ||
415 | for (uint64_t i = 0; i < num_peers; i++) | ||
416 | { | ||
417 | RPS_sampler_update (rh->sampler, &peers[i]); | ||
418 | } | ||
419 | } | ||
420 | |||
421 | |||
422 | /** | ||
423 | * @brief Callback to collect the peers from the biased stream and put those | ||
424 | * into the sampler. | ||
425 | * | ||
426 | * This version is for the modified #GNUNET_RPS_Request_Handle_Single_Info | ||
427 | * | ||
428 | * @param cls The #GNUNET_RPS_Request_Handle | ||
429 | * @param num_peers The number of peer that have been returned | ||
430 | * @param peers The array of @a num_peers that have been returned | ||
431 | */ | ||
432 | static void | ||
433 | collect_peers_info_cb (void *cls, | ||
434 | uint64_t num_peers, | ||
435 | const struct GNUNET_PeerIdentity *peers) | ||
436 | { | ||
437 | struct GNUNET_RPS_Request_Handle_Single_Info *rhs = cls; | ||
438 | |||
439 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
440 | "Service sent %" PRIu64 " peers from stream\n", | ||
441 | num_peers); | ||
442 | for (uint64_t i = 0; i < num_peers; i++) | ||
443 | { | ||
444 | RPS_sampler_update (rhs->sampler, &peers[i]); | ||
445 | } | ||
446 | } | ||
447 | |||
448 | |||
449 | /* Get internals for debugging/profiling purposes */ | ||
450 | |||
451 | /** | ||
452 | * Request updates of view | ||
453 | * | ||
454 | * @param rps_handle handle to the rps service | ||
455 | * @param num_req_peers number of peers we want to receive | ||
456 | * (0 for infinite updates) | ||
457 | * @param cls a closure that will be given to the callback | ||
458 | * @param ready_cb the callback called when the peers are available | ||
459 | */ | ||
460 | void | ||
461 | GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, | ||
462 | uint32_t num_updates, | ||
463 | GNUNET_RPS_NotifyReadyCB view_update_cb, | ||
464 | void *cls) | ||
465 | { | ||
466 | struct GNUNET_MQ_Envelope *ev; | ||
467 | struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg; | ||
468 | |||
469 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
470 | "Client requests %" PRIu32 " view updates\n", | ||
471 | num_updates); | ||
472 | rps_handle->view_update_cb = view_update_cb; | ||
473 | rps_handle->view_update_cls = cls; | ||
474 | |||
475 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST); | ||
476 | msg->num_updates = htonl (num_updates); | ||
477 | GNUNET_MQ_send (rps_handle->mq, ev); | ||
478 | } | ||
479 | |||
480 | |||
481 | void | ||
482 | GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle) | ||
483 | { | ||
484 | struct GNUNET_MQ_Envelope *ev; | ||
485 | |||
486 | GNUNET_assert (NULL != rps_handle->view_update_cb); | ||
487 | |||
488 | rps_handle->view_update_cb = NULL; | ||
489 | |||
490 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL); | ||
491 | GNUNET_MQ_send (rps_handle->mq, ev); | ||
492 | } | ||
493 | |||
494 | |||
495 | /** | ||
496 | * Request biased stream of peers that are being put into the sampler | ||
497 | * | ||
498 | * @param rps_handle handle to the rps service | ||
499 | * @param cls a closure that will be given to the callback | ||
500 | * @param ready_cb the callback called when the peers are available | ||
501 | */ | ||
502 | struct GNUNET_RPS_StreamRequestHandle * | ||
503 | GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, | ||
504 | GNUNET_RPS_NotifyReadyCB stream_input_cb, | ||
505 | void *cls) | ||
506 | { | ||
507 | struct GNUNET_RPS_StreamRequestHandle *srh; | ||
508 | struct GNUNET_MQ_Envelope *ev; | ||
509 | struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; | ||
510 | |||
511 | srh = new_stream_request (rps_handle, | ||
512 | stream_input_cb, | ||
513 | cls); | ||
514 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n"); | ||
515 | |||
516 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); | ||
517 | GNUNET_MQ_send (rps_handle->mq, ev); | ||
518 | return srh; | ||
519 | } | ||
520 | |||
521 | |||
522 | /** | ||
523 | * This function is called, when the service updates the view. | ||
524 | * It verifies that @a msg is well-formed. | ||
525 | * | ||
526 | * @param cls the closure | ||
527 | * @param msg the message | ||
528 | * @return #GNUNET_OK if @a msg is well-formed | ||
529 | */ | ||
530 | static int | ||
531 | check_view_update (void *cls, | ||
532 | const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg) | ||
533 | { | ||
534 | uint16_t msize = ntohs (msg->header.size); | ||
535 | uint32_t num_peers = ntohl (msg->num_peers); | ||
536 | |||
537 | (void) cls; | ||
538 | |||
539 | msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_ViewReply); | ||
540 | if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) || | ||
541 | (msize % sizeof(struct GNUNET_PeerIdentity) != 0)) | ||
542 | { | ||
543 | GNUNET_break (0); | ||
544 | return GNUNET_SYSERR; | ||
545 | } | ||
546 | return GNUNET_OK; | ||
547 | } | ||
548 | |||
549 | |||
550 | /** | ||
551 | * This function is called, when the service updated its view. | ||
552 | * It calls the callback the caller provided | ||
553 | * and disconnects afterwards. | ||
554 | * | ||
555 | * @param msg the message | ||
556 | */ | ||
557 | static void | ||
558 | handle_view_update (void *cls, | ||
559 | const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg) | ||
560 | { | ||
561 | struct GNUNET_RPS_Handle *h = cls; | ||
562 | struct GNUNET_PeerIdentity *peers; | ||
563 | |||
564 | /* Give the peers back */ | ||
565 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
566 | "New view of %" PRIu32 " peers:\n", | ||
567 | ntohl (msg->num_peers)); | ||
568 | |||
569 | peers = (struct GNUNET_PeerIdentity *) &msg[1]; | ||
570 | GNUNET_assert (NULL != h); | ||
571 | GNUNET_assert (NULL != h->view_update_cb); | ||
572 | h->view_update_cb (h->view_update_cls, ntohl (msg->num_peers), peers); | ||
573 | } | ||
574 | |||
575 | |||
576 | /** | ||
577 | * @brief Send message to service that this client does not want to receive | ||
578 | * further updates from the biased peer stream | ||
579 | * | ||
580 | * @param rps_handle The handle representing the service to the client | ||
581 | */ | ||
582 | static void | ||
583 | cancel_stream (struct GNUNET_RPS_Handle *rps_handle) | ||
584 | { | ||
585 | struct GNUNET_MQ_Envelope *ev; | ||
586 | |||
587 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL); | ||
588 | GNUNET_MQ_send (rps_handle->mq, ev); | ||
589 | } | ||
590 | |||
591 | |||
592 | /** | ||
593 | * @brief Cancel a specific request for updates from the biased peer stream | ||
594 | * | ||
595 | * @param srh The request handle to cancel | ||
596 | */ | ||
597 | void | ||
598 | GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh) | ||
599 | { | ||
600 | struct GNUNET_RPS_Handle *rps_handle; | ||
601 | |||
602 | rps_handle = srh->rps_handle; | ||
603 | remove_stream_request (srh); | ||
604 | if (NULL == rps_handle->stream_requests_head) | ||
605 | cancel_stream (rps_handle); | ||
606 | } | ||
607 | |||
608 | |||
609 | /** | ||
610 | * This function is called, when the service sends another peer from the biased | ||
611 | * stream. | ||
612 | * It calls the callback the caller provided | ||
613 | * and disconnects afterwards. | ||
614 | * | ||
615 | * TODO merge with check_view_update | ||
616 | * | ||
617 | * @param msg the message | ||
618 | */ | ||
619 | static int | ||
620 | check_stream_input (void *cls, | ||
621 | const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) | ||
622 | { | ||
623 | uint16_t msize = ntohs (msg->header.size); | ||
624 | uint32_t num_peers = ntohl (msg->num_peers); | ||
625 | |||
626 | (void) cls; | ||
627 | |||
628 | msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_StreamReply); | ||
629 | if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) || | ||
630 | (msize % sizeof(struct GNUNET_PeerIdentity) != 0)) | ||
631 | { | ||
632 | GNUNET_break (0); | ||
633 | return GNUNET_SYSERR; | ||
634 | } | ||
635 | return GNUNET_OK; | ||
636 | } | ||
637 | |||
638 | |||
639 | /** | ||
640 | * @brief Called by the scheduler to call the callbacks of the srh handlers | ||
641 | * | ||
642 | * @param cls Stream request handle | ||
643 | */ | ||
644 | static void | ||
645 | srh_callback_scheduled (void *cls) | ||
646 | { | ||
647 | struct GNUNET_RPS_StreamRequestHandle *srh = cls; | ||
648 | |||
649 | srh->callback_task = NULL; | ||
650 | srh->ready_cb (srh->ready_cb_cls, | ||
651 | srh_callback_num_peers, | ||
652 | srh_callback_peers); | ||
653 | } | ||
654 | |||
655 | |||
656 | /** | ||
657 | * This function is called, when the service sends another peer from the biased | ||
658 | * stream. | ||
659 | * It calls the callback the caller provided | ||
660 | * and disconnects afterwards. | ||
661 | * | ||
662 | * @param msg the message | ||
663 | */ | ||
664 | static void | ||
665 | handle_stream_input (void *cls, | ||
666 | const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) | ||
667 | { | ||
668 | struct GNUNET_RPS_Handle *h = cls; | ||
669 | // const struct GNUNET_PeerIdentity *peers; | ||
670 | uint64_t num_peers; | ||
671 | struct GNUNET_RPS_StreamRequestHandle *srh_iter; | ||
672 | struct GNUNET_RPS_StreamRequestHandle *srh_next; | ||
673 | |||
674 | // peers = (struct GNUNET_PeerIdentity *) &msg[1]; | ||
675 | num_peers = ntohl (msg->num_peers); | ||
676 | srh_callback_num_peers = num_peers; | ||
677 | GNUNET_free (srh_callback_peers); | ||
678 | srh_callback_peers = GNUNET_new_array (num_peers, | ||
679 | struct GNUNET_PeerIdentity); | ||
680 | GNUNET_memcpy (srh_callback_peers, | ||
681 | &msg[1], | ||
682 | num_peers * sizeof(struct GNUNET_PeerIdentity)); | ||
683 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
684 | "Received %" PRIu64 " peer(s) from stream input.\n", | ||
685 | num_peers); | ||
686 | for (srh_iter = h->stream_requests_head; | ||
687 | NULL != srh_iter; | ||
688 | srh_iter = srh_next) | ||
689 | { | ||
690 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n"); | ||
691 | /* Store next pointer - srh might be removed/freed in callback */ | ||
692 | srh_next = srh_iter->next; | ||
693 | if (NULL != srh_iter->callback_task) | ||
694 | GNUNET_SCHEDULER_cancel (srh_iter->callback_task); | ||
695 | srh_iter->callback_task = | ||
696 | GNUNET_SCHEDULER_add_now (&srh_callback_scheduled, | ||
697 | srh_iter); | ||
698 | } | ||
699 | |||
700 | if (NULL == h->stream_requests_head) | ||
701 | { | ||
702 | cancel_stream (h); | ||
703 | } | ||
704 | } | ||
705 | |||
706 | |||
707 | /** | ||
708 | * Reconnect to the service | ||
709 | */ | ||
710 | static void | ||
711 | reconnect (struct GNUNET_RPS_Handle *h); | ||
712 | |||
713 | |||
714 | /** | ||
715 | * Error handler for mq. | ||
716 | * | ||
717 | * This function is called when mq encounters an error. | ||
718 | * Until now mq doesn't provide useful error messages. | ||
719 | * | ||
720 | * @param cls the closure | ||
721 | * @param error error code without specyfied meaning | ||
722 | */ | ||
723 | static void | ||
724 | mq_error_handler (void *cls, | ||
725 | enum GNUNET_MQ_Error error) | ||
726 | { | ||
727 | struct GNUNET_RPS_Handle *h = cls; | ||
728 | |||
729 | // TODO LOG | ||
730 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
731 | "Problem with message queue. error: %i\n\ | ||
732 | 1: READ,\n\ | ||
733 | 2: WRITE,\n\ | ||
734 | 4: TIMEOUT\n", | ||
735 | // TODO: write GNUNET_MQ_strerror (error) | ||
736 | error); | ||
737 | reconnect (h); | ||
738 | /* Resend all pending request as the service destroyed its knowledge | ||
739 | * about them */ | ||
740 | } | ||
741 | |||
742 | |||
743 | /** | ||
744 | * @brief Create the hash value from the share value that defines the sub | ||
745 | * (-group) | ||
746 | * | ||
747 | * @param share_val Share value | ||
748 | * @param hash[out] Pointer to the location in which the hash will be stored. | ||
749 | */ | ||
750 | static void | ||
751 | hash_from_share_val (const char *share_val, | ||
752 | struct GNUNET_HashCode *hash) | ||
753 | { | ||
754 | GNUNET_CRYPTO_kdf (hash, | ||
755 | sizeof(struct GNUNET_HashCode), | ||
756 | "rps", | ||
757 | strlen ("rps"), | ||
758 | share_val, | ||
759 | strlen (share_val), | ||
760 | NULL, 0); | ||
761 | } | ||
762 | |||
763 | |||
764 | /** | ||
765 | * @brief Callback for network size estimate - called with new estimates about | ||
766 | * the network size, updates all samplers with the new estimate | ||
767 | * | ||
768 | * Implements #GNUNET_NSE_Callback | ||
769 | * | ||
770 | * @param cls the rps handle | ||
771 | * @param timestamp unused | ||
772 | * @param logestimate the estimate | ||
773 | * @param std_dev the standard distribution | ||
774 | */ | ||
775 | static void | ||
776 | nse_cb (void *cls, | ||
777 | struct GNUNET_TIME_Absolute timestamp, | ||
778 | double logestimate, | ||
779 | double std_dev) | ||
780 | { | ||
781 | struct GNUNET_RPS_Handle *h = cls; | ||
782 | |||
783 | (void) timestamp; | ||
784 | (void) std_dev; | ||
785 | |||
786 | for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head; | ||
787 | NULL != rh_iter && NULL != rh_iter->next; | ||
788 | rh_iter = rh_iter->next) | ||
789 | { | ||
790 | RPS_sampler_update_with_nw_size (rh_iter->sampler, | ||
791 | GNUNET_NSE_log_estimate_to_n ( | ||
792 | logestimate)); | ||
793 | } | ||
794 | for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head; | ||
795 | NULL != rhs_iter && NULL != rhs_iter->next; | ||
796 | rhs_iter = rhs_iter->next) | ||
797 | { | ||
798 | RPS_sampler_update_with_nw_size (rhs_iter->sampler, | ||
799 | GNUNET_NSE_log_estimate_to_n ( | ||
800 | logestimate)); | ||
801 | } | ||
802 | } | ||
803 | |||
804 | |||
805 | /** | ||
806 | * Reconnect to the service | ||
807 | */ | ||
808 | static void | ||
809 | reconnect (struct GNUNET_RPS_Handle *h) | ||
810 | { | ||
811 | struct GNUNET_MQ_MessageHandler mq_handlers[] = { | ||
812 | GNUNET_MQ_hd_var_size (view_update, | ||
813 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, | ||
814 | struct GNUNET_RPS_CS_DEBUG_ViewReply, | ||
815 | h), | ||
816 | GNUNET_MQ_hd_var_size (stream_input, | ||
817 | GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY, | ||
818 | struct GNUNET_RPS_CS_DEBUG_StreamReply, | ||
819 | h), | ||
820 | GNUNET_MQ_handler_end () | ||
821 | }; | ||
822 | |||
823 | if (NULL != h->mq) | ||
824 | GNUNET_MQ_destroy (h->mq); | ||
825 | h->mq = GNUNET_CLIENT_connect (h->cfg, | ||
826 | "rps", | ||
827 | mq_handlers, | ||
828 | &mq_error_handler, | ||
829 | h); | ||
830 | if (NULL != h->nse) | ||
831 | GNUNET_NSE_disconnect (h->nse); | ||
832 | h->nse = GNUNET_NSE_connect (h->cfg, &nse_cb, h); | ||
833 | } | ||
834 | |||
835 | |||
836 | /** | ||
837 | * Connect to the rps service | ||
838 | * | ||
839 | * @param cfg configuration to use | ||
840 | * @return a handle to the service, NULL on error | ||
841 | */ | ||
842 | struct GNUNET_RPS_Handle * | ||
843 | GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | ||
844 | { | ||
845 | struct GNUNET_RPS_Handle *h; | ||
846 | |||
847 | h = GNUNET_new (struct GNUNET_RPS_Handle); | ||
848 | h->cfg = cfg; | ||
849 | if (GNUNET_OK != | ||
850 | GNUNET_CONFIGURATION_get_value_float (cfg, | ||
851 | "RPS", | ||
852 | "DESIRED_PROBABILITY", | ||
853 | &h->desired_probability)) | ||
854 | { | ||
855 | GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, | ||
856 | "RPS", "DESIRED_PROBABILITY"); | ||
857 | GNUNET_free (h); | ||
858 | return NULL; | ||
859 | } | ||
860 | if ((0 > h->desired_probability) || | ||
861 | (1 < h->desired_probability) ) | ||
862 | { | ||
863 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
864 | "The desired probability must be in the interval [0;1]\n"); | ||
865 | GNUNET_free (h); | ||
866 | return NULL; | ||
867 | } | ||
868 | if (GNUNET_OK != | ||
869 | GNUNET_CONFIGURATION_get_value_float (cfg, | ||
870 | "RPS", | ||
871 | "DEFICIENCY_FACTOR", | ||
872 | &h->deficiency_factor)) | ||
873 | { | ||
874 | GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, | ||
875 | "RPS", "DEFICIENCY_FACTOR"); | ||
876 | GNUNET_free (h); | ||
877 | return NULL; | ||
878 | } | ||
879 | if ((0 > h->desired_probability) || | ||
880 | (1 < h->desired_probability) ) | ||
881 | { | ||
882 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
883 | "The deficiency factor must be in the interval [0;1]\n"); | ||
884 | GNUNET_free (h); | ||
885 | return NULL; | ||
886 | } | ||
887 | reconnect (h); | ||
888 | if (NULL == h->mq) | ||
889 | { | ||
890 | GNUNET_free (h); | ||
891 | return NULL; | ||
892 | } | ||
893 | return h; | ||
894 | } | ||
895 | |||
896 | |||
897 | /** | ||
898 | * @brief Start a sub with the given shared value | ||
899 | * | ||
900 | * @param h Handle to rps | ||
901 | * @param shared_value The shared value that defines the members of the sub (-gorup) | ||
902 | */ | ||
903 | void | ||
904 | GNUNET_RPS_sub_start (struct GNUNET_RPS_Handle *h, | ||
905 | const char *shared_value) | ||
906 | { | ||
907 | struct GNUNET_RPS_CS_SubStartMessage *msg; | ||
908 | struct GNUNET_MQ_Envelope *ev; | ||
909 | |||
910 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START); | ||
911 | hash_from_share_val (shared_value, &msg->hash); | ||
912 | msg->round_interval = GNUNET_TIME_relative_hton ( // TODO read from config! | ||
913 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)); | ||
914 | GNUNET_assert (0 != msg->round_interval.rel_value_us__); | ||
915 | |||
916 | GNUNET_MQ_send (h->mq, ev); | ||
917 | } | ||
918 | |||
919 | |||
920 | /** | ||
921 | * @brief Stop a sub with the given shared value | ||
922 | * | ||
923 | * @param h Handle to rps | ||
924 | * @param shared_value The shared value that defines the members of the sub (-gorup) | ||
925 | */ | ||
926 | void | ||
927 | GNUNET_RPS_sub_stop (struct GNUNET_RPS_Handle *h, | ||
928 | const char *shared_value) | ||
929 | { | ||
930 | struct GNUNET_RPS_CS_SubStopMessage *msg; | ||
931 | struct GNUNET_MQ_Envelope *ev; | ||
932 | |||
933 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP); | ||
934 | hash_from_share_val (shared_value, &msg->hash); | ||
935 | |||
936 | GNUNET_MQ_send (h->mq, ev); | ||
937 | } | ||
938 | |||
939 | |||
940 | /** | ||
941 | * Request n random peers. | ||
942 | * | ||
943 | * @param rps_handle handle to the rps service | ||
944 | * @param num_req_peers number of peers we want to receive | ||
945 | * @param ready_cb the callback called when the peers are available | ||
946 | * @param cls closure given to the callback | ||
947 | * @return a handle to cancel this request | ||
948 | */ | ||
949 | struct GNUNET_RPS_Request_Handle * | ||
950 | GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, | ||
951 | uint32_t num_req_peers, | ||
952 | GNUNET_RPS_NotifyReadyCB ready_cb, | ||
953 | void *cls) | ||
954 | { | ||
955 | struct GNUNET_RPS_Request_Handle *rh; | ||
956 | |||
957 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
958 | "Client requested %" PRIu32 " peers\n", | ||
959 | num_req_peers); | ||
960 | rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); | ||
961 | rh->rps_handle = rps_handle; | ||
962 | rh->num_requests = num_req_peers; | ||
963 | rh->sampler = RPS_sampler_mod_init (num_req_peers, | ||
964 | GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff | ||
965 | RPS_sampler_set_desired_probability (rh->sampler, | ||
966 | rps_handle->desired_probability); | ||
967 | RPS_sampler_set_deficiency_factor (rh->sampler, | ||
968 | rps_handle->deficiency_factor); | ||
969 | rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler, | ||
970 | num_req_peers, | ||
971 | peers_ready_cb, | ||
972 | rh); | ||
973 | rh->srh = GNUNET_RPS_stream_request (rps_handle, | ||
974 | collect_peers_cb, | ||
975 | rh); /* cls */ | ||
976 | rh->ready_cb = ready_cb; | ||
977 | rh->ready_cb_cls = cls; | ||
978 | GNUNET_CONTAINER_DLL_insert (rps_handle->rh_head, | ||
979 | rps_handle->rh_tail, | ||
980 | rh); | ||
981 | |||
982 | return rh; | ||
983 | } | ||
984 | |||
985 | |||
986 | /** | ||
987 | * Request one random peer, getting additional information. | ||
988 | * | ||
989 | * @param rps_handle handle to the rps service | ||
990 | * @param ready_cb the callback called when the peers are available | ||
991 | * @param cls closure given to the callback | ||
992 | * @return a handle to cancel this request | ||
993 | */ | ||
994 | struct GNUNET_RPS_Request_Handle_Single_Info * | ||
995 | GNUNET_RPS_request_peer_info (struct GNUNET_RPS_Handle *rps_handle, | ||
996 | GNUNET_RPS_NotifyReadySingleInfoCB ready_cb, | ||
997 | void *cls) | ||
998 | { | ||
999 | struct GNUNET_RPS_Request_Handle_Single_Info *rhs; | ||
1000 | uint32_t num_req_peers = 1; | ||
1001 | |||
1002 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
1003 | "Client requested peer with additional info\n"); | ||
1004 | rhs = GNUNET_new (struct GNUNET_RPS_Request_Handle_Single_Info); | ||
1005 | rhs->rps_handle = rps_handle; | ||
1006 | rhs->sampler = RPS_sampler_mod_init (num_req_peers, | ||
1007 | GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff | ||
1008 | RPS_sampler_set_desired_probability (rhs->sampler, | ||
1009 | rps_handle->desired_probability); | ||
1010 | RPS_sampler_set_deficiency_factor (rhs->sampler, | ||
1011 | rps_handle->deficiency_factor); | ||
1012 | rhs->sampler_rh = RPS_sampler_get_rand_peer_info (rhs->sampler, | ||
1013 | peer_info_ready_cb, | ||
1014 | rhs); | ||
1015 | rhs->srh = GNUNET_RPS_stream_request (rps_handle, | ||
1016 | collect_peers_info_cb, | ||
1017 | rhs); /* cls */ | ||
1018 | rhs->ready_cb = ready_cb; | ||
1019 | rhs->ready_cb_cls = cls; | ||
1020 | GNUNET_CONTAINER_DLL_insert (rps_handle->rhs_head, | ||
1021 | rps_handle->rhs_tail, | ||
1022 | rhs); | ||
1023 | |||
1024 | return rhs; | ||
1025 | } | ||
1026 | |||
1027 | |||
1028 | /** | ||
1029 | * Seed rps service with peerIDs. | ||
1030 | * | ||
1031 | * @param h handle to the rps service | ||
1032 | * @param n number of peers to seed | ||
1033 | * @param ids the ids of the peers seeded | ||
1034 | */ | ||
1035 | void | ||
1036 | GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h, | ||
1037 | uint32_t n, | ||
1038 | const struct GNUNET_PeerIdentity *ids) | ||
1039 | { | ||
1040 | size_t size_needed; | ||
1041 | uint32_t num_peers_max; | ||
1042 | const struct GNUNET_PeerIdentity *tmp_peer_pointer; | ||
1043 | struct GNUNET_MQ_Envelope *ev; | ||
1044 | struct GNUNET_RPS_CS_SeedMessage *msg; | ||
1045 | |||
1046 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1047 | "Client wants to seed %" PRIu32 " peers:\n", | ||
1048 | n); | ||
1049 | for (unsigned int i = 0; i < n; i++) | ||
1050 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1051 | "%u. peer: %s\n", | ||
1052 | i, | ||
1053 | GNUNET_i2s (&ids[i])); | ||
1054 | |||
1055 | /* The actual size the message occupies */ | ||
1056 | size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) | ||
1057 | + n * sizeof(struct GNUNET_PeerIdentity); | ||
1058 | /* The number of peers that fits in one message together with | ||
1059 | * the respective header */ | ||
1060 | num_peers_max = (GNUNET_MAX_MESSAGE_SIZE | ||
1061 | - sizeof(struct GNUNET_RPS_CS_SeedMessage)) | ||
1062 | / sizeof(struct GNUNET_PeerIdentity); | ||
1063 | tmp_peer_pointer = ids; | ||
1064 | |||
1065 | while (GNUNET_MAX_MESSAGE_SIZE < size_needed) | ||
1066 | { | ||
1067 | ev = GNUNET_MQ_msg_extra (msg, | ||
1068 | num_peers_max * sizeof(struct | ||
1069 | GNUNET_PeerIdentity), | ||
1070 | GNUNET_MESSAGE_TYPE_RPS_CS_SEED); | ||
1071 | msg->num_peers = htonl (num_peers_max); | ||
1072 | GNUNET_memcpy (&msg[1], | ||
1073 | tmp_peer_pointer, | ||
1074 | num_peers_max * sizeof(struct GNUNET_PeerIdentity)); | ||
1075 | GNUNET_MQ_send (h->mq, | ||
1076 | ev); | ||
1077 | n -= num_peers_max; | ||
1078 | size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) | ||
1079 | + n * sizeof(struct GNUNET_PeerIdentity); | ||
1080 | /* Set pointer to beginning of next block of num_peers_max peers */ | ||
1081 | tmp_peer_pointer = &ids[num_peers_max]; | ||
1082 | } | ||
1083 | |||
1084 | ev = GNUNET_MQ_msg_extra (msg, | ||
1085 | n * sizeof(struct GNUNET_PeerIdentity), | ||
1086 | GNUNET_MESSAGE_TYPE_RPS_CS_SEED); | ||
1087 | msg->num_peers = htonl (n); | ||
1088 | GNUNET_memcpy (&msg[1], | ||
1089 | tmp_peer_pointer, | ||
1090 | n * sizeof(struct GNUNET_PeerIdentity)); | ||
1091 | GNUNET_MQ_send (h->mq, | ||
1092 | ev); | ||
1093 | } | ||
1094 | |||
1095 | |||
1096 | #if ENABLE_MALICIOUS | ||
1097 | /** | ||
1098 | * Turn RPS service to act malicious. | ||
1099 | * | ||
1100 | * @param h handle to the rps service | ||
1101 | * @param type which type of malicious peer to turn to. | ||
1102 | * 0 Don't act malicious at all | ||
1103 | * 1 Try to maximise representation | ||
1104 | * 2 Try to partition the network | ||
1105 | * (isolate one peer from the rest) | ||
1106 | * @param n number of @a ids | ||
1107 | * @param ids the ids of the malicious peers | ||
1108 | * if @type is 2 the last id is the id of the | ||
1109 | * peer to be isolated from the rest | ||
1110 | */ | ||
1111 | void | ||
1112 | GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h, | ||
1113 | uint32_t type, | ||
1114 | uint32_t num_peers, | ||
1115 | const struct GNUNET_PeerIdentity *peer_ids, | ||
1116 | const struct GNUNET_PeerIdentity *target_peer) | ||
1117 | { | ||
1118 | size_t size_needed; | ||
1119 | uint32_t num_peers_max; | ||
1120 | const struct GNUNET_PeerIdentity *tmp_peer_pointer; | ||
1121 | struct GNUNET_MQ_Envelope *ev; | ||
1122 | struct GNUNET_RPS_CS_ActMaliciousMessage *msg; | ||
1123 | |||
1124 | unsigned int i; | ||
1125 | |||
1126 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1127 | "Client turns malicious (type %" PRIu32 ") with %" PRIu32 | ||
1128 | " other peers:\n", | ||
1129 | type, | ||
1130 | num_peers); | ||
1131 | for (i = 0; i < num_peers; i++) | ||
1132 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1133 | "%u. peer: %s\n", | ||
1134 | i, | ||
1135 | GNUNET_i2s (&peer_ids[i])); | ||
1136 | |||
1137 | /* The actual size the message would occupy */ | ||
1138 | size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) | ||
1139 | + num_peers * sizeof(struct GNUNET_PeerIdentity); | ||
1140 | /* The number of peers that fit in one message together with | ||
1141 | * the respective header */ | ||
1142 | num_peers_max = (GNUNET_MAX_MESSAGE_SIZE | ||
1143 | - sizeof(struct GNUNET_RPS_CS_SeedMessage)) | ||
1144 | / sizeof(struct GNUNET_PeerIdentity); | ||
1145 | tmp_peer_pointer = peer_ids; | ||
1146 | |||
1147 | while (GNUNET_MAX_MESSAGE_SIZE < size_needed) | ||
1148 | { | ||
1149 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1150 | "Too many peers to send at once, sending %" PRIu32 | ||
1151 | " (all we can so far)\n", | ||
1152 | num_peers_max); | ||
1153 | ev = GNUNET_MQ_msg_extra (msg, | ||
1154 | num_peers_max * sizeof(struct | ||
1155 | GNUNET_PeerIdentity), | ||
1156 | GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS); | ||
1157 | msg->type = htonl (type); | ||
1158 | msg->num_peers = htonl (num_peers_max); | ||
1159 | if ((2 == type) || | ||
1160 | (3 == type)) | ||
1161 | msg->attacked_peer = peer_ids[num_peers]; | ||
1162 | GNUNET_memcpy (&msg[1], | ||
1163 | tmp_peer_pointer, | ||
1164 | num_peers_max * sizeof(struct GNUNET_PeerIdentity)); | ||
1165 | |||
1166 | GNUNET_MQ_send (h->mq, ev); | ||
1167 | |||
1168 | num_peers -= num_peers_max; | ||
1169 | size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) | ||
1170 | + num_peers * sizeof(struct GNUNET_PeerIdentity); | ||
1171 | /* Set pointer to beginning of next block of num_peers_max peers */ | ||
1172 | tmp_peer_pointer = &peer_ids[num_peers_max]; | ||
1173 | } | ||
1174 | |||
1175 | ev = GNUNET_MQ_msg_extra (msg, | ||
1176 | num_peers * sizeof(struct GNUNET_PeerIdentity), | ||
1177 | GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS); | ||
1178 | msg->type = htonl (type); | ||
1179 | msg->num_peers = htonl (num_peers); | ||
1180 | if ((2 == type) || | ||
1181 | (3 == type)) | ||
1182 | msg->attacked_peer = *target_peer; | ||
1183 | GNUNET_memcpy (&msg[1], | ||
1184 | tmp_peer_pointer, | ||
1185 | num_peers * sizeof(struct GNUNET_PeerIdentity)); | ||
1186 | |||
1187 | GNUNET_MQ_send (h->mq, ev); | ||
1188 | } | ||
1189 | |||
1190 | |||
1191 | #endif /* ENABLE_MALICIOUS */ | ||
1192 | |||
1193 | |||
1194 | /** | ||
1195 | * Cancel an issued request. | ||
1196 | * | ||
1197 | * @param rh request handle of request to cancel | ||
1198 | */ | ||
1199 | void | ||
1200 | GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) | ||
1201 | { | ||
1202 | struct GNUNET_RPS_Handle *h; | ||
1203 | |||
1204 | h = rh->rps_handle; | ||
1205 | GNUNET_assert (NULL != rh); | ||
1206 | GNUNET_assert (NULL != rh->srh); | ||
1207 | GNUNET_assert (h == rh->srh->rps_handle); | ||
1208 | GNUNET_RPS_stream_cancel (rh->srh); | ||
1209 | rh->srh = NULL; | ||
1210 | if (NULL == h->stream_requests_head) | ||
1211 | cancel_stream (h); | ||
1212 | if (NULL != rh->sampler_rh) | ||
1213 | { | ||
1214 | RPS_sampler_request_cancel (rh->sampler_rh); | ||
1215 | } | ||
1216 | RPS_sampler_destroy (rh->sampler); | ||
1217 | rh->sampler = NULL; | ||
1218 | GNUNET_CONTAINER_DLL_remove (h->rh_head, | ||
1219 | h->rh_tail, | ||
1220 | rh); | ||
1221 | GNUNET_free (rh); | ||
1222 | } | ||
1223 | |||
1224 | |||
1225 | /** | ||
1226 | * Cancel an issued single info request. | ||
1227 | * | ||
1228 | * @param rhs request handle of request to cancel | ||
1229 | */ | ||
1230 | void | ||
1231 | GNUNET_RPS_request_single_info_cancel ( | ||
1232 | struct GNUNET_RPS_Request_Handle_Single_Info *rhs) | ||
1233 | { | ||
1234 | struct GNUNET_RPS_Handle *h; | ||
1235 | |||
1236 | h = rhs->rps_handle; | ||
1237 | GNUNET_assert (NULL != rhs); | ||
1238 | GNUNET_assert (NULL != rhs->srh); | ||
1239 | GNUNET_assert (h == rhs->srh->rps_handle); | ||
1240 | GNUNET_RPS_stream_cancel (rhs->srh); | ||
1241 | rhs->srh = NULL; | ||
1242 | if (NULL == h->stream_requests_head) | ||
1243 | cancel_stream (h); | ||
1244 | if (NULL != rhs->sampler_rh) | ||
1245 | { | ||
1246 | RPS_sampler_request_single_info_cancel (rhs->sampler_rh); | ||
1247 | } | ||
1248 | RPS_sampler_destroy (rhs->sampler); | ||
1249 | rhs->sampler = NULL; | ||
1250 | GNUNET_CONTAINER_DLL_remove (h->rhs_head, | ||
1251 | h->rhs_tail, | ||
1252 | rhs); | ||
1253 | GNUNET_free (rhs); | ||
1254 | } | ||
1255 | |||
1256 | |||
1257 | /** | ||
1258 | * Disconnect from the rps service | ||
1259 | * | ||
1260 | * @param h the handle to the rps service | ||
1261 | */ | ||
1262 | void | ||
1263 | GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) | ||
1264 | { | ||
1265 | if (NULL != h->stream_requests_head) | ||
1266 | { | ||
1267 | struct GNUNET_RPS_StreamRequestHandle *srh_next; | ||
1268 | |||
1269 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1270 | "Still waiting for replies\n"); | ||
1271 | for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = | ||
1272 | h->stream_requests_head; | ||
1273 | NULL != srh_iter; | ||
1274 | srh_iter = srh_next) | ||
1275 | { | ||
1276 | srh_next = srh_iter->next; | ||
1277 | GNUNET_RPS_stream_cancel (srh_iter); | ||
1278 | } | ||
1279 | } | ||
1280 | if (NULL != h->rh_head) | ||
1281 | { | ||
1282 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1283 | "Not all requests were cancelled!\n"); | ||
1284 | for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head; | ||
1285 | h->rh_head != NULL; | ||
1286 | rh_iter = h->rh_head) | ||
1287 | { | ||
1288 | GNUNET_RPS_request_cancel (rh_iter); | ||
1289 | } | ||
1290 | } | ||
1291 | if (NULL != h->rhs_head) | ||
1292 | { | ||
1293 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1294 | "Not all requests were cancelled!\n"); | ||
1295 | for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head; | ||
1296 | h->rhs_head != NULL; | ||
1297 | rhs_iter = h->rhs_head) | ||
1298 | { | ||
1299 | GNUNET_RPS_request_single_info_cancel (rhs_iter); | ||
1300 | } | ||
1301 | } | ||
1302 | if (NULL != srh_callback_peers) | ||
1303 | { | ||
1304 | GNUNET_free (srh_callback_peers); | ||
1305 | srh_callback_peers = NULL; | ||
1306 | } | ||
1307 | if (NULL != h->view_update_cb) | ||
1308 | { | ||
1309 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1310 | "Still waiting for view updates\n"); | ||
1311 | GNUNET_RPS_view_request_cancel (h); | ||
1312 | } | ||
1313 | if (NULL != h->nse) | ||
1314 | GNUNET_NSE_disconnect (h->nse); | ||
1315 | GNUNET_MQ_destroy (h->mq); | ||
1316 | GNUNET_free (h); | ||
1317 | } | ||
1318 | |||
1319 | |||
1320 | /* end of rps_api.c */ | ||