aboutsummaryrefslogtreecommitdiff
path: root/src/rps/rps_api.c
diff options
context:
space:
mode:
authorng0 <ng0@n0.is>2019-09-08 12:33:09 +0000
committerng0 <ng0@n0.is>2019-09-08 12:33:09 +0000
commitd41ed82a4ea0cc8e1674b6d5d2c49fd6462610bb (patch)
tree9efd18ea7d425652085ed0bd5e8e45604bc5f6b9 /src/rps/rps_api.c
parenta0fce305c565c0937d917a92712f15e9c5736260 (diff)
downloadgnunet-d41ed82a4ea0cc8e1674b6d5d2c49fd6462610bb.tar.gz
gnunet-d41ed82a4ea0cc8e1674b6d5d2c49fd6462610bb.zip
uncrustify as demanded.
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r--src/rps/rps_api.c913
1 files changed, 457 insertions, 456 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index 9e405fdef..2b54297c3 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -16,7 +16,7 @@
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
18 SPDX-License-Identifier: AGPL3.0-or-later 18 SPDX-License-Identifier: AGPL3.0-or-later
19*/ 19 */
20 20
21/** 21/**
22 * @file rps/rps_api.c 22 * @file rps/rps_api.c
@@ -33,13 +33,12 @@
33 33
34#include <inttypes.h> 34#include <inttypes.h>
35 35
36#define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__) 36#define LOG(kind, ...) GNUNET_log_from(kind, "rps-api", __VA_ARGS__)
37 37
38/** 38/**
39 * Handle for a request to get peers from biased stream of ids 39 * Handle for a request to get peers from biased stream of ids
40 */ 40 */
41struct GNUNET_RPS_StreamRequestHandle 41struct GNUNET_RPS_StreamRequestHandle {
42{
43 /** 42 /**
44 * The client issuing the request. 43 * The client issuing the request.
45 */ 44 */
@@ -75,8 +74,7 @@ struct GNUNET_RPS_StreamRequestHandle
75/** 74/**
76 * Handler to handle requests from a client. 75 * Handler to handle requests from a client.
77 */ 76 */
78struct GNUNET_RPS_Handle 77struct GNUNET_RPS_Handle {
79{
80 /** 78 /**
81 * The handle to the client configuration. 79 * The handle to the client configuration.
82 */ 80 */
@@ -156,8 +154,7 @@ struct GNUNET_RPS_Handle
156/** 154/**
157 * Handler for a single request from a client. 155 * Handler for a single request from a client.
158 */ 156 */
159struct GNUNET_RPS_Request_Handle 157struct GNUNET_RPS_Request_Handle {
160{
161 /** 158 /**
162 * The client issuing the request. 159 * The client issuing the request.
163 */ 160 */
@@ -209,8 +206,7 @@ struct GNUNET_RPS_Request_Handle
209/** 206/**
210 * Handler for a single request from a client. 207 * Handler for a single request from a client.
211 */ 208 */
212struct GNUNET_RPS_Request_Handle_Single_Info 209struct GNUNET_RPS_Request_Handle_Single_Info {
213{
214 /** 210 /**
215 * The client issuing the request. 211 * The client issuing the request.
216 */ 212 */
@@ -258,8 +254,7 @@ struct GNUNET_RPS_Request_Handle_Single_Info
258 * Struct used to pack the callback, its closure (provided by the caller) 254 * Struct used to pack the callback, its closure (provided by the caller)
259 * and the connection handler to the service to pass it to a callback function. 255 * and the connection handler to the service to pass it to a callback function.
260 */ 256 */
261struct cb_cls_pack 257struct cb_cls_pack {
262{
263 /** 258 /**
264 * Callback provided by the client 259 * Callback provided by the client
265 */ 260 */
@@ -273,7 +268,7 @@ struct cb_cls_pack
273 /** 268 /**
274 * Handle to the service connection 269 * Handle to the service connection
275 */ 270 */
276 struct GNUNET_CLIENT_Connection *service_conn; 271 struct GNUNET_CLIENT_Connection *service_conn;
277}; 272};
278 273
279 274
@@ -301,19 +296,19 @@ static uint64_t srh_callback_num_peers;
301 * @return The handle to the stream request 296 * @return The handle to the stream request
302 */ 297 */
303static struct GNUNET_RPS_StreamRequestHandle * 298static struct GNUNET_RPS_StreamRequestHandle *
304new_stream_request (struct GNUNET_RPS_Handle *rps_handle, 299new_stream_request(struct GNUNET_RPS_Handle *rps_handle,
305 GNUNET_RPS_NotifyReadyCB ready_cb, 300 GNUNET_RPS_NotifyReadyCB ready_cb,
306 void *cls) 301 void *cls)
307{ 302{
308 struct GNUNET_RPS_StreamRequestHandle *srh; 303 struct GNUNET_RPS_StreamRequestHandle *srh;
309 304
310 srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); 305 srh = GNUNET_new(struct GNUNET_RPS_StreamRequestHandle);
311 srh->rps_handle = rps_handle; 306 srh->rps_handle = rps_handle;
312 srh->ready_cb = ready_cb; 307 srh->ready_cb = ready_cb;
313 srh->ready_cb_cls = cls; 308 srh->ready_cb_cls = cls;
314 GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, 309 GNUNET_CONTAINER_DLL_insert(rps_handle->stream_requests_head,
315 rps_handle->stream_requests_tail, 310 rps_handle->stream_requests_tail,
316 srh); 311 srh);
317 312
318 return srh; 313 return srh;
319} 314}
@@ -325,20 +320,20 @@ new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
325 * @param srh The request to be removed 320 * @param srh The request to be removed
326 */ 321 */
327static void 322static void
328remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh) 323remove_stream_request(struct GNUNET_RPS_StreamRequestHandle *srh)
329{ 324{
330 struct GNUNET_RPS_Handle *rps_handle = srh->rps_handle; 325 struct GNUNET_RPS_Handle *rps_handle = srh->rps_handle;
331 326
332 GNUNET_assert (NULL != srh); 327 GNUNET_assert(NULL != srh);
333 if (NULL != srh->callback_task) 328 if (NULL != srh->callback_task)
334 { 329 {
335 GNUNET_SCHEDULER_cancel (srh->callback_task); 330 GNUNET_SCHEDULER_cancel(srh->callback_task);
336 srh->callback_task = NULL; 331 srh->callback_task = NULL;
337 } 332 }
338 GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head, 333 GNUNET_CONTAINER_DLL_remove(rps_handle->stream_requests_head,
339 rps_handle->stream_requests_tail, 334 rps_handle->stream_requests_tail,
340 srh); 335 srh);
341 GNUNET_free (srh); 336 GNUNET_free(srh);
342} 337}
343 338
344 339
@@ -352,17 +347,17 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh)
352 * @param cls The #GNUNET_RPS_Request_Handle 347 * @param cls The #GNUNET_RPS_Request_Handle
353 */ 348 */
354static void 349static void
355peers_ready_cb (const struct GNUNET_PeerIdentity *peers, 350peers_ready_cb(const struct GNUNET_PeerIdentity *peers,
356 uint32_t num_peers, 351 uint32_t num_peers,
357 void *cls) 352 void *cls)
358{ 353{
359 struct GNUNET_RPS_Request_Handle *rh = cls; 354 struct GNUNET_RPS_Request_Handle *rh = cls;
360 355
361 rh->sampler_rh = NULL; 356 rh->sampler_rh = NULL;
362 rh->ready_cb (rh->ready_cb_cls, 357 rh->ready_cb(rh->ready_cb_cls,
363 num_peers, 358 num_peers,
364 peers); 359 peers);
365 GNUNET_RPS_request_cancel (rh); 360 GNUNET_RPS_request_cancel(rh);
366} 361}
367 362
368 363
@@ -378,19 +373,19 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
378 * @param num_observed Number of observed IDs 373 * @param num_observed Number of observed IDs
379 */ 374 */
380static void 375static void
381peer_info_ready_cb (const struct GNUNET_PeerIdentity *peers, 376peer_info_ready_cb(const struct GNUNET_PeerIdentity *peers,
382 void *cls, 377 void *cls,
383 double probability, 378 double probability,
384 uint32_t num_observed) 379 uint32_t num_observed)
385{ 380{
386 struct GNUNET_RPS_Request_Handle_Single_Info *rh = cls; 381 struct GNUNET_RPS_Request_Handle_Single_Info *rh = cls;
387 382
388 rh->sampler_rh = NULL; 383 rh->sampler_rh = NULL;
389 rh->ready_cb (rh->ready_cb_cls, 384 rh->ready_cb(rh->ready_cb_cls,
390 peers, 385 peers,
391 probability, 386 probability,
392 num_observed); 387 num_observed);
393 GNUNET_RPS_request_single_info_cancel (rh); 388 GNUNET_RPS_request_single_info_cancel(rh);
394} 389}
395 390
396 391
@@ -403,19 +398,19 @@ peer_info_ready_cb (const struct GNUNET_PeerIdentity *peers,
403 * @param peers The array of @a num_peers that have been returned 398 * @param peers The array of @a num_peers that have been returned
404 */ 399 */
405static void 400static void
406collect_peers_cb (void *cls, 401collect_peers_cb(void *cls,
407 uint64_t num_peers, 402 uint64_t num_peers,
408 const struct GNUNET_PeerIdentity *peers) 403 const struct GNUNET_PeerIdentity *peers)
409{ 404{
410 struct GNUNET_RPS_Request_Handle *rh = cls; 405 struct GNUNET_RPS_Request_Handle *rh = cls;
411 406
412 LOG (GNUNET_ERROR_TYPE_DEBUG, 407 LOG(GNUNET_ERROR_TYPE_DEBUG,
413 "Service sent %" PRIu64 " peers from stream\n", 408 "Service sent %" PRIu64 " peers from stream\n",
414 num_peers); 409 num_peers);
415 for (uint64_t i = 0; i < num_peers; i++) 410 for (uint64_t i = 0; i < num_peers; i++)
416 { 411 {
417 RPS_sampler_update (rh->sampler, &peers[i]); 412 RPS_sampler_update(rh->sampler, &peers[i]);
418 } 413 }
419} 414}
420 415
421 416
@@ -430,19 +425,19 @@ collect_peers_cb (void *cls,
430 * @param peers The array of @a num_peers that have been returned 425 * @param peers The array of @a num_peers that have been returned
431 */ 426 */
432static void 427static void
433collect_peers_info_cb (void *cls, 428collect_peers_info_cb(void *cls,
434 uint64_t num_peers, 429 uint64_t num_peers,
435 const struct GNUNET_PeerIdentity *peers) 430 const struct GNUNET_PeerIdentity *peers)
436{ 431{
437 struct GNUNET_RPS_Request_Handle_Single_Info *rhs = cls; 432 struct GNUNET_RPS_Request_Handle_Single_Info *rhs = cls;
438 433
439 LOG (GNUNET_ERROR_TYPE_DEBUG, 434 LOG(GNUNET_ERROR_TYPE_DEBUG,
440 "Service sent %" PRIu64 " peers from stream\n", 435 "Service sent %" PRIu64 " peers from stream\n",
441 num_peers); 436 num_peers);
442 for (uint64_t i = 0; i < num_peers; i++) 437 for (uint64_t i = 0; i < num_peers; i++)
443 { 438 {
444 RPS_sampler_update (rhs->sampler, &peers[i]); 439 RPS_sampler_update(rhs->sampler, &peers[i]);
445 } 440 }
446} 441}
447 442
448 443
@@ -458,37 +453,37 @@ collect_peers_info_cb (void *cls,
458 * @param ready_cb the callback called when the peers are available 453 * @param ready_cb the callback called when the peers are available
459 */ 454 */
460void 455void
461GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, 456GNUNET_RPS_view_request(struct GNUNET_RPS_Handle *rps_handle,
462 uint32_t num_updates, 457 uint32_t num_updates,
463 GNUNET_RPS_NotifyReadyCB view_update_cb, 458 GNUNET_RPS_NotifyReadyCB view_update_cb,
464 void *cls) 459 void *cls)
465{ 460{
466 struct GNUNET_MQ_Envelope *ev; 461 struct GNUNET_MQ_Envelope *ev;
467 struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg; 462 struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg;
468 463
469 LOG (GNUNET_ERROR_TYPE_DEBUG, 464 LOG(GNUNET_ERROR_TYPE_DEBUG,
470 "Client requests %" PRIu32 " view updates\n", 465 "Client requests %" PRIu32 " view updates\n",
471 num_updates); 466 num_updates);
472 rps_handle->view_update_cb = view_update_cb; 467 rps_handle->view_update_cb = view_update_cb;
473 rps_handle->view_update_cls = cls; 468 rps_handle->view_update_cls = cls;
474 469
475 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST); 470 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST);
476 msg->num_updates = htonl (num_updates); 471 msg->num_updates = htonl(num_updates);
477 GNUNET_MQ_send (rps_handle->mq, ev); 472 GNUNET_MQ_send(rps_handle->mq, ev);
478} 473}
479 474
480 475
481void 476void
482GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle) 477GNUNET_RPS_view_request_cancel(struct GNUNET_RPS_Handle *rps_handle)
483{ 478{
484 struct GNUNET_MQ_Envelope *ev; 479 struct GNUNET_MQ_Envelope *ev;
485 480
486 GNUNET_assert (NULL != rps_handle->view_update_cb); 481 GNUNET_assert(NULL != rps_handle->view_update_cb);
487 482
488 rps_handle->view_update_cb = NULL; 483 rps_handle->view_update_cb = NULL;
489 484
490 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL); 485 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL);
491 GNUNET_MQ_send (rps_handle->mq, ev); 486 GNUNET_MQ_send(rps_handle->mq, ev);
492} 487}
493 488
494 489
@@ -500,21 +495,21 @@ GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle)
500 * @param ready_cb the callback called when the peers are available 495 * @param ready_cb the callback called when the peers are available
501 */ 496 */
502struct GNUNET_RPS_StreamRequestHandle * 497struct GNUNET_RPS_StreamRequestHandle *
503GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, 498GNUNET_RPS_stream_request(struct GNUNET_RPS_Handle *rps_handle,
504 GNUNET_RPS_NotifyReadyCB stream_input_cb, 499 GNUNET_RPS_NotifyReadyCB stream_input_cb,
505 void *cls) 500 void *cls)
506{ 501{
507 struct GNUNET_RPS_StreamRequestHandle *srh; 502 struct GNUNET_RPS_StreamRequestHandle *srh;
508 struct GNUNET_MQ_Envelope *ev; 503 struct GNUNET_MQ_Envelope *ev;
509 struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; 504 struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
510 505
511 srh = new_stream_request (rps_handle, 506 srh = new_stream_request(rps_handle,
512 stream_input_cb, 507 stream_input_cb,
513 cls); 508 cls);
514 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n"); 509 LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n");
515 510
516 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); 511 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
517 GNUNET_MQ_send (rps_handle->mq, ev); 512 GNUNET_MQ_send(rps_handle->mq, ev);
518 return srh; 513 return srh;
519} 514}
520 515
@@ -528,20 +523,21 @@ GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
528 * @return #GNUNET_OK if @a msg is well-formed 523 * @return #GNUNET_OK if @a msg is well-formed
529 */ 524 */
530static int 525static int
531check_view_update (void *cls, 526check_view_update(void *cls,
532 const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg) 527 const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
533{ 528{
534 uint16_t msize = ntohs (msg->header.size); 529 uint16_t msize = ntohs(msg->header.size);
535 uint32_t num_peers = ntohl (msg->num_peers); 530 uint32_t num_peers = ntohl(msg->num_peers);
536 (void) cls; 531
537 532 (void)cls;
538 msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_ViewReply); 533
539 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || 534 msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_ViewReply);
540 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) 535 if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
541 { 536 (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
542 GNUNET_break (0); 537 {
543 return GNUNET_SYSERR; 538 GNUNET_break(0);
544 } 539 return GNUNET_SYSERR;
540 }
545 return GNUNET_OK; 541 return GNUNET_OK;
546} 542}
547 543
@@ -554,21 +550,21 @@ check_view_update (void *cls,
554 * @param msg the message 550 * @param msg the message
555 */ 551 */
556static void 552static void
557handle_view_update (void *cls, 553handle_view_update(void *cls,
558 const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg) 554 const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
559{ 555{
560 struct GNUNET_RPS_Handle *h = cls; 556 struct GNUNET_RPS_Handle *h = cls;
561 struct GNUNET_PeerIdentity *peers; 557 struct GNUNET_PeerIdentity *peers;
562 558
563 /* Give the peers back */ 559 /* Give the peers back */
564 LOG (GNUNET_ERROR_TYPE_DEBUG, 560 LOG(GNUNET_ERROR_TYPE_DEBUG,
565 "New view of %" PRIu32 " peers:\n", 561 "New view of %" PRIu32 " peers:\n",
566 ntohl (msg->num_peers)); 562 ntohl(msg->num_peers));
567 563
568 peers = (struct GNUNET_PeerIdentity *) &msg[1]; 564 peers = (struct GNUNET_PeerIdentity *)&msg[1];
569 GNUNET_assert (NULL != h); 565 GNUNET_assert(NULL != h);
570 GNUNET_assert (NULL != h->view_update_cb); 566 GNUNET_assert(NULL != h->view_update_cb);
571 h->view_update_cb (h->view_update_cls, ntohl (msg->num_peers), peers); 567 h->view_update_cb(h->view_update_cls, ntohl(msg->num_peers), peers);
572} 568}
573 569
574 570
@@ -579,12 +575,12 @@ handle_view_update (void *cls,
579 * @param rps_handle The handle representing the service to the client 575 * @param rps_handle The handle representing the service to the client
580 */ 576 */
581static void 577static void
582cancel_stream (struct GNUNET_RPS_Handle *rps_handle) 578cancel_stream(struct GNUNET_RPS_Handle *rps_handle)
583{ 579{
584 struct GNUNET_MQ_Envelope *ev; 580 struct GNUNET_MQ_Envelope *ev;
585 581
586 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL); 582 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL);
587 GNUNET_MQ_send (rps_handle->mq, ev); 583 GNUNET_MQ_send(rps_handle->mq, ev);
588} 584}
589 585
590 586
@@ -594,14 +590,14 @@ cancel_stream (struct GNUNET_RPS_Handle *rps_handle)
594 * @param srh The request handle to cancel 590 * @param srh The request handle to cancel
595 */ 591 */
596void 592void
597GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh) 593GNUNET_RPS_stream_cancel(struct GNUNET_RPS_StreamRequestHandle *srh)
598{ 594{
599 struct GNUNET_RPS_Handle *rps_handle; 595 struct GNUNET_RPS_Handle *rps_handle;
600 596
601 rps_handle = srh->rps_handle; 597 rps_handle = srh->rps_handle;
602 remove_stream_request (srh); 598 remove_stream_request(srh);
603 if (NULL == rps_handle->stream_requests_head) 599 if (NULL == rps_handle->stream_requests_head)
604 cancel_stream (rps_handle); 600 cancel_stream(rps_handle);
605} 601}
606 602
607 603
@@ -616,20 +612,21 @@ GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
616 * @param msg the message 612 * @param msg the message
617 */ 613 */
618static int 614static int
619check_stream_input (void *cls, 615check_stream_input(void *cls,
620 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) 616 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
621{ 617{
622 uint16_t msize = ntohs (msg->header.size); 618 uint16_t msize = ntohs(msg->header.size);
623 uint32_t num_peers = ntohl (msg->num_peers); 619 uint32_t num_peers = ntohl(msg->num_peers);
624 (void) cls; 620
625 621 (void)cls;
626 msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply); 622
627 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || 623 msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_StreamReply);
628 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) 624 if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
629 { 625 (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
630 GNUNET_break (0); 626 {
631 return GNUNET_SYSERR; 627 GNUNET_break(0);
632 } 628 return GNUNET_SYSERR;
629 }
633 return GNUNET_OK; 630 return GNUNET_OK;
634} 631}
635 632
@@ -640,14 +637,14 @@ check_stream_input (void *cls,
640 * @param cls Stream request handle 637 * @param cls Stream request handle
641 */ 638 */
642static void 639static void
643srh_callback_scheduled (void *cls) 640srh_callback_scheduled(void *cls)
644{ 641{
645 struct GNUNET_RPS_StreamRequestHandle *srh = cls; 642 struct GNUNET_RPS_StreamRequestHandle *srh = cls;
646 643
647 srh->callback_task = NULL; 644 srh->callback_task = NULL;
648 srh->ready_cb (srh->ready_cb_cls, 645 srh->ready_cb(srh->ready_cb_cls,
649 srh_callback_num_peers, 646 srh_callback_num_peers,
650 srh_callback_peers); 647 srh_callback_peers);
651} 648}
652 649
653 650
@@ -660,8 +657,8 @@ srh_callback_scheduled (void *cls)
660 * @param msg the message 657 * @param msg the message
661 */ 658 */
662static void 659static void
663handle_stream_input (void *cls, 660handle_stream_input(void *cls,
664 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) 661 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
665{ 662{
666 struct GNUNET_RPS_Handle *h = cls; 663 struct GNUNET_RPS_Handle *h = cls;
667 //const struct GNUNET_PeerIdentity *peers; 664 //const struct GNUNET_PeerIdentity *peers;
@@ -670,35 +667,35 @@ handle_stream_input (void *cls,
670 struct GNUNET_RPS_StreamRequestHandle *srh_next; 667 struct GNUNET_RPS_StreamRequestHandle *srh_next;
671 668
672 //peers = (struct GNUNET_PeerIdentity *) &msg[1]; 669 //peers = (struct GNUNET_PeerIdentity *) &msg[1];
673 num_peers = ntohl (msg->num_peers); 670 num_peers = ntohl(msg->num_peers);
674 srh_callback_num_peers = num_peers; 671 srh_callback_num_peers = num_peers;
675 GNUNET_free_non_null (srh_callback_peers); 672 GNUNET_free_non_null(srh_callback_peers);
676 srh_callback_peers = GNUNET_new_array (num_peers, 673 srh_callback_peers = GNUNET_new_array(num_peers,
677 struct GNUNET_PeerIdentity); 674 struct GNUNET_PeerIdentity);
678 GNUNET_memcpy (srh_callback_peers, 675 GNUNET_memcpy(srh_callback_peers,
679 &msg[1], 676 &msg[1],
680 num_peers * sizeof (struct GNUNET_PeerIdentity)); 677 num_peers * sizeof(struct GNUNET_PeerIdentity));
681 LOG (GNUNET_ERROR_TYPE_DEBUG, 678 LOG(GNUNET_ERROR_TYPE_DEBUG,
682 "Received %" PRIu64 " peer(s) from stream input.\n", 679 "Received %" PRIu64 " peer(s) from stream input.\n",
683 num_peers); 680 num_peers);
684 for (srh_iter = h->stream_requests_head; 681 for (srh_iter = h->stream_requests_head;
685 NULL != srh_iter; 682 NULL != srh_iter;
686 srh_iter = srh_next) 683 srh_iter = srh_next)
687 { 684 {
688 LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n"); 685 LOG(GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
689 /* Store next pointer - srh might be removed/freed in callback */ 686 /* Store next pointer - srh might be removed/freed in callback */
690 srh_next = srh_iter->next; 687 srh_next = srh_iter->next;
691 if (NULL != srh_iter->callback_task) 688 if (NULL != srh_iter->callback_task)
692 GNUNET_SCHEDULER_cancel (srh_iter->callback_task); 689 GNUNET_SCHEDULER_cancel(srh_iter->callback_task);
693 srh_iter->callback_task = 690 srh_iter->callback_task =
694 GNUNET_SCHEDULER_add_now (&srh_callback_scheduled, 691 GNUNET_SCHEDULER_add_now(&srh_callback_scheduled,
695 srh_iter); 692 srh_iter);
696 } 693 }
697 694
698 if (NULL == h->stream_requests_head) 695 if (NULL == h->stream_requests_head)
699 { 696 {
700 cancel_stream (h); 697 cancel_stream(h);
701 } 698 }
702} 699}
703 700
704 701
@@ -706,7 +703,7 @@ handle_stream_input (void *cls,
706 * Reconnect to the service 703 * Reconnect to the service
707 */ 704 */
708static void 705static void
709reconnect (struct GNUNET_RPS_Handle *h); 706reconnect(struct GNUNET_RPS_Handle *h);
710 707
711 708
712/** 709/**
@@ -719,18 +716,19 @@ reconnect (struct GNUNET_RPS_Handle *h);
719 * @param error error code without specyfied meaning 716 * @param error error code without specyfied meaning
720 */ 717 */
721static void 718static void
722mq_error_handler (void *cls, 719mq_error_handler(void *cls,
723 enum GNUNET_MQ_Error error) 720 enum GNUNET_MQ_Error error)
724{ 721{
725 struct GNUNET_RPS_Handle *h = cls; 722 struct GNUNET_RPS_Handle *h = cls;
723
726 //TODO LOG 724 //TODO LOG
727 LOG (GNUNET_ERROR_TYPE_WARNING, "Problem with message queue. error: %i\n\ 725 LOG(GNUNET_ERROR_TYPE_WARNING, "Problem with message queue. error: %i\n\
728 1: READ,\n\ 726 1: READ,\n\
729 2: WRITE,\n\ 727 2: WRITE,\n\
730 4: TIMEOUT\n", 728 4: TIMEOUT\n",
731 // TODO: write GNUNET_MQ_strerror (error) 729 // TODO: write GNUNET_MQ_strerror (error)
732 error); 730 error);
733 reconnect (h); 731 reconnect(h);
734 /* Resend all pending request as the service destroyed its knowledge 732 /* Resend all pending request as the service destroyed its knowledge
735 * about them */ 733 * about them */
736} 734}
@@ -744,16 +742,16 @@ mq_error_handler (void *cls,
744 * @param hash[out] Pointer to the location in which the hash will be stored. 742 * @param hash[out] Pointer to the location in which the hash will be stored.
745 */ 743 */
746static void 744static void
747hash_from_share_val (const char *share_val, 745hash_from_share_val(const char *share_val,
748 struct GNUNET_HashCode *hash) 746 struct GNUNET_HashCode *hash)
749{ 747{
750 GNUNET_CRYPTO_kdf (hash, 748 GNUNET_CRYPTO_kdf(hash,
751 sizeof (struct GNUNET_HashCode), 749 sizeof(struct GNUNET_HashCode),
752 "rps", 750 "rps",
753 strlen ("rps"), 751 strlen("rps"),
754 share_val, 752 share_val,
755 strlen (share_val), 753 strlen(share_val),
756 NULL, 0); 754 NULL, 0);
757} 755}
758 756
759 757
@@ -769,29 +767,30 @@ hash_from_share_val (const char *share_val,
769 * @param std_dev the standard distribution 767 * @param std_dev the standard distribution
770 */ 768 */
771static void 769static void
772nse_cb (void *cls, 770nse_cb(void *cls,
773 struct GNUNET_TIME_Absolute timestamp, 771 struct GNUNET_TIME_Absolute timestamp,
774 double logestimate, 772 double logestimate,
775 double std_dev) 773 double std_dev)
776{ 774{
777 struct GNUNET_RPS_Handle *h = cls; 775 struct GNUNET_RPS_Handle *h = cls;
778 (void) timestamp; 776
779 (void) std_dev; 777 (void)timestamp;
778 (void)std_dev;
780 779
781 for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head; 780 for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
782 NULL != rh_iter && NULL != rh_iter->next; 781 NULL != rh_iter && NULL != rh_iter->next;
783 rh_iter = rh_iter->next) 782 rh_iter = rh_iter->next)
784 { 783 {
785 RPS_sampler_update_with_nw_size (rh_iter->sampler, 784 RPS_sampler_update_with_nw_size(rh_iter->sampler,
786 GNUNET_NSE_log_estimate_to_n (logestimate)); 785 GNUNET_NSE_log_estimate_to_n(logestimate));
787 } 786 }
788 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head; 787 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
789 NULL != rhs_iter && NULL != rhs_iter->next; 788 NULL != rhs_iter && NULL != rhs_iter->next;
790 rhs_iter = rhs_iter->next) 789 rhs_iter = rhs_iter->next)
791 { 790 {
792 RPS_sampler_update_with_nw_size (rhs_iter->sampler, 791 RPS_sampler_update_with_nw_size(rhs_iter->sampler,
793 GNUNET_NSE_log_estimate_to_n (logestimate)); 792 GNUNET_NSE_log_estimate_to_n(logestimate));
794 } 793 }
795} 794}
796 795
797 796
@@ -799,30 +798,30 @@ nse_cb (void *cls,
799 * Reconnect to the service 798 * Reconnect to the service
800 */ 799 */
801static void 800static void
802reconnect (struct GNUNET_RPS_Handle *h) 801reconnect(struct GNUNET_RPS_Handle *h)
803{ 802{
804 struct GNUNET_MQ_MessageHandler mq_handlers[] = { 803 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
805 GNUNET_MQ_hd_var_size (view_update, 804 GNUNET_MQ_hd_var_size(view_update,
806 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, 805 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
807 struct GNUNET_RPS_CS_DEBUG_ViewReply, 806 struct GNUNET_RPS_CS_DEBUG_ViewReply,
808 h), 807 h),
809 GNUNET_MQ_hd_var_size (stream_input, 808 GNUNET_MQ_hd_var_size(stream_input,
810 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY, 809 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
811 struct GNUNET_RPS_CS_DEBUG_StreamReply, 810 struct GNUNET_RPS_CS_DEBUG_StreamReply,
812 h), 811 h),
813 GNUNET_MQ_handler_end () 812 GNUNET_MQ_handler_end()
814 }; 813 };
815 814
816 if (NULL != h->mq) 815 if (NULL != h->mq)
817 GNUNET_MQ_destroy (h->mq); 816 GNUNET_MQ_destroy(h->mq);
818 h->mq = GNUNET_CLIENT_connect (h->cfg, 817 h->mq = GNUNET_CLIENT_connect(h->cfg,
819 "rps", 818 "rps",
820 mq_handlers, 819 mq_handlers,
821 &mq_error_handler, 820 &mq_error_handler,
822 h); 821 h);
823 if (NULL != h->nse) 822 if (NULL != h->nse)
824 GNUNET_NSE_disconnect (h->nse); 823 GNUNET_NSE_disconnect(h->nse);
825 h->nse = GNUNET_NSE_connect (h->cfg, &nse_cb, h); 824 h->nse = GNUNET_NSE_connect(h->cfg, &nse_cb, h);
826} 825}
827 826
828 827
@@ -833,56 +832,56 @@ reconnect (struct GNUNET_RPS_Handle *h)
833 * @return a handle to the service, NULL on error 832 * @return a handle to the service, NULL on error
834 */ 833 */
835struct GNUNET_RPS_Handle * 834struct GNUNET_RPS_Handle *
836GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) 835GNUNET_RPS_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
837{ 836{
838 struct GNUNET_RPS_Handle *h; 837 struct GNUNET_RPS_Handle *h;
839 838
840 h = GNUNET_new (struct GNUNET_RPS_Handle); 839 h = GNUNET_new(struct GNUNET_RPS_Handle);
841 h->cfg = cfg; 840 h->cfg = cfg;
842 if (GNUNET_OK != 841 if (GNUNET_OK !=
843 GNUNET_CONFIGURATION_get_value_float (cfg, 842 GNUNET_CONFIGURATION_get_value_float(cfg,
844 "RPS", 843 "RPS",
845 "DESIRED_PROBABILITY", 844 "DESIRED_PROBABILITY",
846 &h->desired_probability)) 845 &h->desired_probability))
847 { 846 {
848 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 847 GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR,
849 "RPS", "DESIRED_PROBABILITY"); 848 "RPS", "DESIRED_PROBABILITY");
850 GNUNET_free (h); 849 GNUNET_free(h);
851 return NULL; 850 return NULL;
852 } 851 }
853 if (0 > h->desired_probability || 852 if (0 > h->desired_probability ||
854 1 < h->desired_probability) 853 1 < h->desired_probability)
855 { 854 {
856 LOG (GNUNET_ERROR_TYPE_ERROR, 855 LOG(GNUNET_ERROR_TYPE_ERROR,
857 "The desired probability must be in the interval [0;1]\n"); 856 "The desired probability must be in the interval [0;1]\n");
858 GNUNET_free (h); 857 GNUNET_free(h);
859 return NULL; 858 return NULL;
860 } 859 }
861 if (GNUNET_OK != 860 if (GNUNET_OK !=
862 GNUNET_CONFIGURATION_get_value_float (cfg, 861 GNUNET_CONFIGURATION_get_value_float(cfg,
863 "RPS", 862 "RPS",
864 "DEFICIENCY_FACTOR", 863 "DEFICIENCY_FACTOR",
865 &h->deficiency_factor)) 864 &h->deficiency_factor))
866 { 865 {
867 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 866 GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR,
868 "RPS", "DEFICIENCY_FACTOR"); 867 "RPS", "DEFICIENCY_FACTOR");
869 GNUNET_free (h); 868 GNUNET_free(h);
870 return NULL; 869 return NULL;
871 } 870 }
872 if (0 > h->desired_probability || 871 if (0 > h->desired_probability ||
873 1 < h->desired_probability) 872 1 < h->desired_probability)
874 { 873 {
875 LOG (GNUNET_ERROR_TYPE_ERROR, 874 LOG(GNUNET_ERROR_TYPE_ERROR,
876 "The deficiency factor must be in the interval [0;1]\n"); 875 "The deficiency factor must be in the interval [0;1]\n");
877 GNUNET_free (h); 876 GNUNET_free(h);
878 return NULL; 877 return NULL;
879 } 878 }
880 reconnect (h); 879 reconnect(h);
881 if (NULL == h->mq) 880 if (NULL == h->mq)
882 { 881 {
883 GNUNET_free (h); 882 GNUNET_free(h);
884 return NULL; 883 return NULL;
885 } 884 }
886 return h; 885 return h;
887} 886}
888 887
@@ -894,19 +893,19 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
894 * @param shared_value The shared value that defines the members of the sub (-gorup) 893 * @param shared_value The shared value that defines the members of the sub (-gorup)
895 */ 894 */
896void 895void
897GNUNET_RPS_sub_start (struct GNUNET_RPS_Handle *h, 896GNUNET_RPS_sub_start(struct GNUNET_RPS_Handle *h,
898 const char *shared_value) 897 const char *shared_value)
899{ 898{
900 struct GNUNET_RPS_CS_SubStartMessage *msg; 899 struct GNUNET_RPS_CS_SubStartMessage *msg;
901 struct GNUNET_MQ_Envelope *ev; 900 struct GNUNET_MQ_Envelope *ev;
902 901
903 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START); 902 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START);
904 hash_from_share_val (shared_value, &msg->hash); 903 hash_from_share_val(shared_value, &msg->hash);
905 msg->round_interval = GNUNET_TIME_relative_hton (// TODO read from config! 904 msg->round_interval = GNUNET_TIME_relative_hton( // TODO read from config!
906 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)); 905 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30));
907 GNUNET_assert (0 != msg->round_interval.rel_value_us__); 906 GNUNET_assert(0 != msg->round_interval.rel_value_us__);
908 907
909 GNUNET_MQ_send (h->mq, ev); 908 GNUNET_MQ_send(h->mq, ev);
910} 909}
911 910
912 911
@@ -917,16 +916,16 @@ GNUNET_RPS_sub_start (struct GNUNET_RPS_Handle *h,
917 * @param shared_value The shared value that defines the members of the sub (-gorup) 916 * @param shared_value The shared value that defines the members of the sub (-gorup)
918 */ 917 */
919void 918void
920GNUNET_RPS_sub_stop (struct GNUNET_RPS_Handle *h, 919GNUNET_RPS_sub_stop(struct GNUNET_RPS_Handle *h,
921 const char *shared_value) 920 const char *shared_value)
922{ 921{
923 struct GNUNET_RPS_CS_SubStopMessage *msg; 922 struct GNUNET_RPS_CS_SubStopMessage *msg;
924 struct GNUNET_MQ_Envelope *ev; 923 struct GNUNET_MQ_Envelope *ev;
925 924
926 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP); 925 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP);
927 hash_from_share_val (shared_value, &msg->hash); 926 hash_from_share_val(shared_value, &msg->hash);
928 927
929 GNUNET_MQ_send (h->mq, ev); 928 GNUNET_MQ_send(h->mq, ev);
930} 929}
931 930
932 931
@@ -940,37 +939,37 @@ GNUNET_RPS_sub_stop (struct GNUNET_RPS_Handle *h,
940 * @return a handle to cancel this request 939 * @return a handle to cancel this request
941 */ 940 */
942struct GNUNET_RPS_Request_Handle * 941struct GNUNET_RPS_Request_Handle *
943GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, 942GNUNET_RPS_request_peers(struct GNUNET_RPS_Handle *rps_handle,
944 uint32_t num_req_peers, 943 uint32_t num_req_peers,
945 GNUNET_RPS_NotifyReadyCB ready_cb, 944 GNUNET_RPS_NotifyReadyCB ready_cb,
946 void *cls) 945 void *cls)
947{ 946{
948 struct GNUNET_RPS_Request_Handle *rh; 947 struct GNUNET_RPS_Request_Handle *rh;
949 948
950 LOG (GNUNET_ERROR_TYPE_INFO, 949 LOG(GNUNET_ERROR_TYPE_INFO,
951 "Client requested %" PRIu32 " peers\n", 950 "Client requested %" PRIu32 " peers\n",
952 num_req_peers); 951 num_req_peers);
953 rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); 952 rh = GNUNET_new(struct GNUNET_RPS_Request_Handle);
954 rh->rps_handle = rps_handle; 953 rh->rps_handle = rps_handle;
955 rh->num_requests = num_req_peers; 954 rh->num_requests = num_req_peers;
956 rh->sampler = RPS_sampler_mod_init (num_req_peers, 955 rh->sampler = RPS_sampler_mod_init(num_req_peers,
957 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff 956 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
958 RPS_sampler_set_desired_probability (rh->sampler, 957 RPS_sampler_set_desired_probability(rh->sampler,
959 rps_handle->desired_probability); 958 rps_handle->desired_probability);
960 RPS_sampler_set_deficiency_factor (rh->sampler, 959 RPS_sampler_set_deficiency_factor(rh->sampler,
961 rps_handle->deficiency_factor); 960 rps_handle->deficiency_factor);
962 rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler, 961 rh->sampler_rh = RPS_sampler_get_n_rand_peers(rh->sampler,
963 num_req_peers, 962 num_req_peers,
964 peers_ready_cb, 963 peers_ready_cb,
965 rh); 964 rh);
966 rh->srh = GNUNET_RPS_stream_request (rps_handle, 965 rh->srh = GNUNET_RPS_stream_request(rps_handle,
967 collect_peers_cb, 966 collect_peers_cb,
968 rh); /* cls */ 967 rh); /* cls */
969 rh->ready_cb = ready_cb; 968 rh->ready_cb = ready_cb;
970 rh->ready_cb_cls = cls; 969 rh->ready_cb_cls = cls;
971 GNUNET_CONTAINER_DLL_insert (rps_handle->rh_head, 970 GNUNET_CONTAINER_DLL_insert(rps_handle->rh_head,
972 rps_handle->rh_tail, 971 rps_handle->rh_tail,
973 rh); 972 rh);
974 973
975 return rh; 974 return rh;
976} 975}
@@ -985,34 +984,34 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
985 * @return a handle to cancel this request 984 * @return a handle to cancel this request
986 */ 985 */
987struct GNUNET_RPS_Request_Handle_Single_Info * 986struct GNUNET_RPS_Request_Handle_Single_Info *
988GNUNET_RPS_request_peer_info (struct GNUNET_RPS_Handle *rps_handle, 987GNUNET_RPS_request_peer_info(struct GNUNET_RPS_Handle *rps_handle,
989 GNUNET_RPS_NotifyReadySingleInfoCB ready_cb, 988 GNUNET_RPS_NotifyReadySingleInfoCB ready_cb,
990 void *cls) 989 void *cls)
991{ 990{
992 struct GNUNET_RPS_Request_Handle_Single_Info *rhs; 991 struct GNUNET_RPS_Request_Handle_Single_Info *rhs;
993 uint32_t num_req_peers = 1; 992 uint32_t num_req_peers = 1;
994 993
995 LOG (GNUNET_ERROR_TYPE_INFO, 994 LOG(GNUNET_ERROR_TYPE_INFO,
996 "Client requested peer with additional info\n"); 995 "Client requested peer with additional info\n");
997 rhs = GNUNET_new (struct GNUNET_RPS_Request_Handle_Single_Info); 996 rhs = GNUNET_new(struct GNUNET_RPS_Request_Handle_Single_Info);
998 rhs->rps_handle = rps_handle; 997 rhs->rps_handle = rps_handle;
999 rhs->sampler = RPS_sampler_mod_init (num_req_peers, 998 rhs->sampler = RPS_sampler_mod_init(num_req_peers,
1000 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff 999 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
1001 RPS_sampler_set_desired_probability (rhs->sampler, 1000 RPS_sampler_set_desired_probability(rhs->sampler,
1002 rps_handle->desired_probability); 1001 rps_handle->desired_probability);
1003 RPS_sampler_set_deficiency_factor (rhs->sampler, 1002 RPS_sampler_set_deficiency_factor(rhs->sampler,
1004 rps_handle->deficiency_factor); 1003 rps_handle->deficiency_factor);
1005 rhs->sampler_rh = RPS_sampler_get_rand_peer_info (rhs->sampler, 1004 rhs->sampler_rh = RPS_sampler_get_rand_peer_info(rhs->sampler,
1006 peer_info_ready_cb, 1005 peer_info_ready_cb,
1007 rhs); 1006 rhs);
1008 rhs->srh = GNUNET_RPS_stream_request (rps_handle, 1007 rhs->srh = GNUNET_RPS_stream_request(rps_handle,
1009 collect_peers_info_cb, 1008 collect_peers_info_cb,
1010 rhs); /* cls */ 1009 rhs); /* cls */
1011 rhs->ready_cb = ready_cb; 1010 rhs->ready_cb = ready_cb;
1012 rhs->ready_cb_cls = cls; 1011 rhs->ready_cb_cls = cls;
1013 GNUNET_CONTAINER_DLL_insert (rps_handle->rhs_head, 1012 GNUNET_CONTAINER_DLL_insert(rps_handle->rhs_head,
1014 rps_handle->rhs_tail, 1013 rps_handle->rhs_tail,
1015 rhs); 1014 rhs);
1016 1015
1017 return rhs; 1016 return rhs;
1018} 1017}
@@ -1026,9 +1025,9 @@ GNUNET_RPS_request_peer_info (struct GNUNET_RPS_Handle *rps_handle,
1026 * @param ids the ids of the peers seeded 1025 * @param ids the ids of the peers seeded
1027 */ 1026 */
1028void 1027void
1029GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h, 1028GNUNET_RPS_seed_ids(struct GNUNET_RPS_Handle *h,
1030 uint32_t n, 1029 uint32_t n,
1031 const struct GNUNET_PeerIdentity *ids) 1030 const struct GNUNET_PeerIdentity *ids)
1032{ 1031{
1033 size_t size_needed; 1032 size_t size_needed;
1034 uint32_t num_peers_max; 1033 uint32_t num_peers_max;
@@ -1036,52 +1035,52 @@ GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
1036 struct GNUNET_MQ_Envelope *ev; 1035 struct GNUNET_MQ_Envelope *ev;
1037 struct GNUNET_RPS_CS_SeedMessage *msg; 1036 struct GNUNET_RPS_CS_SeedMessage *msg;
1038 1037
1039 LOG (GNUNET_ERROR_TYPE_DEBUG, 1038 LOG(GNUNET_ERROR_TYPE_DEBUG,
1040 "Client wants to seed %" PRIu32 " peers:\n", 1039 "Client wants to seed %" PRIu32 " peers:\n",
1041 n); 1040 n);
1042 for (unsigned int i = 0 ; i < n ; i++) 1041 for (unsigned int i = 0; i < n; i++)
1043 LOG (GNUNET_ERROR_TYPE_DEBUG, 1042 LOG(GNUNET_ERROR_TYPE_DEBUG,
1044 "%u. peer: %s\n", 1043 "%u. peer: %s\n",
1045 i, 1044 i,
1046 GNUNET_i2s (&ids[i])); 1045 GNUNET_i2s(&ids[i]));
1047 1046
1048 /* The actual size the message occupies */ 1047 /* The actual size the message occupies */
1049 size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) + 1048 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) +
1050 n * sizeof (struct GNUNET_PeerIdentity); 1049 n * sizeof(struct GNUNET_PeerIdentity);
1051 /* The number of peers that fits in one message together with 1050 /* The number of peers that fits in one message together with
1052 * the respective header */ 1051 * the respective header */
1053 num_peers_max = (GNUNET_MAX_MESSAGE_SIZE - 1052 num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
1054 sizeof (struct GNUNET_RPS_CS_SeedMessage)) / 1053 sizeof(struct GNUNET_RPS_CS_SeedMessage)) /
1055 sizeof (struct GNUNET_PeerIdentity); 1054 sizeof(struct GNUNET_PeerIdentity);
1056 tmp_peer_pointer = ids; 1055 tmp_peer_pointer = ids;
1057 1056
1058 while (GNUNET_MAX_MESSAGE_SIZE < size_needed) 1057 while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
1059 { 1058 {
1060 ev = GNUNET_MQ_msg_extra (msg, 1059 ev = GNUNET_MQ_msg_extra(msg,
1061 num_peers_max * sizeof (struct GNUNET_PeerIdentity), 1060 num_peers_max * sizeof(struct GNUNET_PeerIdentity),
1062 GNUNET_MESSAGE_TYPE_RPS_CS_SEED); 1061 GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
1063 msg->num_peers = htonl (num_peers_max); 1062 msg->num_peers = htonl(num_peers_max);
1064 GNUNET_memcpy (&msg[1], 1063 GNUNET_memcpy(&msg[1],
1065 tmp_peer_pointer, 1064 tmp_peer_pointer,
1066 num_peers_max * sizeof (struct GNUNET_PeerIdentity)); 1065 num_peers_max * sizeof(struct GNUNET_PeerIdentity));
1067 GNUNET_MQ_send (h->mq, 1066 GNUNET_MQ_send(h->mq,
1068 ev); 1067 ev);
1069 n -= num_peers_max; 1068 n -= num_peers_max;
1070 size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) + 1069 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) +
1071 n * sizeof (struct GNUNET_PeerIdentity); 1070 n * sizeof(struct GNUNET_PeerIdentity);
1072 /* Set pointer to beginning of next block of num_peers_max peers */ 1071 /* Set pointer to beginning of next block of num_peers_max peers */
1073 tmp_peer_pointer = &ids[num_peers_max]; 1072 tmp_peer_pointer = &ids[num_peers_max];
1074 } 1073 }
1075 1074
1076 ev = GNUNET_MQ_msg_extra (msg, 1075 ev = GNUNET_MQ_msg_extra(msg,
1077 n * sizeof (struct GNUNET_PeerIdentity), 1076 n * sizeof(struct GNUNET_PeerIdentity),
1078 GNUNET_MESSAGE_TYPE_RPS_CS_SEED); 1077 GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
1079 msg->num_peers = htonl (n); 1078 msg->num_peers = htonl(n);
1080 GNUNET_memcpy (&msg[1], 1079 GNUNET_memcpy(&msg[1],
1081 tmp_peer_pointer, 1080 tmp_peer_pointer,
1082 n * sizeof (struct GNUNET_PeerIdentity)); 1081 n * sizeof(struct GNUNET_PeerIdentity));
1083 GNUNET_MQ_send (h->mq, 1082 GNUNET_MQ_send(h->mq,
1084 ev); 1083 ev);
1085} 1084}
1086 1085
1087 1086
@@ -1101,11 +1100,11 @@ GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
1101 * peer to be isolated from the rest 1100 * peer to be isolated from the rest
1102 */ 1101 */
1103void 1102void
1104GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h, 1103GNUNET_RPS_act_malicious(struct GNUNET_RPS_Handle *h,
1105 uint32_t type, 1104 uint32_t type,
1106 uint32_t num_peers, 1105 uint32_t num_peers,
1107 const struct GNUNET_PeerIdentity *peer_ids, 1106 const struct GNUNET_PeerIdentity *peer_ids,
1108 const struct GNUNET_PeerIdentity *target_peer) 1107 const struct GNUNET_PeerIdentity *target_peer)
1109{ 1108{
1110 size_t size_needed; 1109 size_t size_needed;
1111 uint32_t num_peers_max; 1110 uint32_t num_peers_max;
@@ -1115,65 +1114,65 @@ GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
1115 1114
1116 unsigned int i; 1115 unsigned int i;
1117 1116
1118 LOG (GNUNET_ERROR_TYPE_DEBUG, 1117 LOG(GNUNET_ERROR_TYPE_DEBUG,
1119 "Client turns malicious (type %" PRIu32 ") with %" PRIu32 " other peers:\n", 1118 "Client turns malicious (type %" PRIu32 ") with %" PRIu32 " other peers:\n",
1120 type, 1119 type,
1121 num_peers); 1120 num_peers);
1122 for (i = 0 ; i < num_peers ; i++) 1121 for (i = 0; i < num_peers; i++)
1123 LOG (GNUNET_ERROR_TYPE_DEBUG, 1122 LOG(GNUNET_ERROR_TYPE_DEBUG,
1124 "%u. peer: %s\n", 1123 "%u. peer: %s\n",
1125 i, 1124 i,
1126 GNUNET_i2s (&peer_ids[i])); 1125 GNUNET_i2s(&peer_ids[i]));
1127 1126
1128 /* The actual size the message would occupy */ 1127 /* The actual size the message would occupy */
1129 size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) + 1128 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) +
1130 num_peers * sizeof (struct GNUNET_PeerIdentity); 1129 num_peers * sizeof(struct GNUNET_PeerIdentity);
1131 /* The number of peers that fit in one message together with 1130 /* The number of peers that fit in one message together with
1132 * the respective header */ 1131 * the respective header */
1133 num_peers_max = (GNUNET_MAX_MESSAGE_SIZE - 1132 num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
1134 sizeof (struct GNUNET_RPS_CS_SeedMessage)) / 1133 sizeof(struct GNUNET_RPS_CS_SeedMessage)) /
1135 sizeof (struct GNUNET_PeerIdentity); 1134 sizeof(struct GNUNET_PeerIdentity);
1136 tmp_peer_pointer = peer_ids; 1135 tmp_peer_pointer = peer_ids;
1137 1136
1138 while (GNUNET_MAX_MESSAGE_SIZE < size_needed) 1137 while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
1139 { 1138 {
1140 LOG (GNUNET_ERROR_TYPE_DEBUG, 1139 LOG(GNUNET_ERROR_TYPE_DEBUG,
1141 "Too many peers to send at once, sending %" PRIu32 " (all we can so far)\n", 1140 "Too many peers to send at once, sending %" PRIu32 " (all we can so far)\n",
1142 num_peers_max); 1141 num_peers_max);
1143 ev = GNUNET_MQ_msg_extra (msg, 1142 ev = GNUNET_MQ_msg_extra(msg,
1144 num_peers_max * sizeof (struct GNUNET_PeerIdentity), 1143 num_peers_max * sizeof(struct GNUNET_PeerIdentity),
1145 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS); 1144 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
1146 msg->type = htonl (type); 1145 msg->type = htonl(type);
1147 msg->num_peers = htonl (num_peers_max); 1146 msg->num_peers = htonl(num_peers_max);
1148 if ( (2 == type) || 1147 if ((2 == type) ||
1149 (3 == type) ) 1148 (3 == type))
1150 msg->attacked_peer = peer_ids[num_peers]; 1149 msg->attacked_peer = peer_ids[num_peers];
1151 GNUNET_memcpy (&msg[1], 1150 GNUNET_memcpy(&msg[1],
1152 tmp_peer_pointer, 1151 tmp_peer_pointer,
1153 num_peers_max * sizeof (struct GNUNET_PeerIdentity)); 1152 num_peers_max * sizeof(struct GNUNET_PeerIdentity));
1154 1153
1155 GNUNET_MQ_send (h->mq, ev); 1154 GNUNET_MQ_send(h->mq, ev);
1156 1155
1157 num_peers -= num_peers_max; 1156 num_peers -= num_peers_max;
1158 size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) + 1157 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) +
1159 num_peers * sizeof (struct GNUNET_PeerIdentity); 1158 num_peers * sizeof(struct GNUNET_PeerIdentity);
1160 /* Set pointer to beginning of next block of num_peers_max peers */ 1159 /* Set pointer to beginning of next block of num_peers_max peers */
1161 tmp_peer_pointer = &peer_ids[num_peers_max]; 1160 tmp_peer_pointer = &peer_ids[num_peers_max];
1162 } 1161 }
1163 1162
1164 ev = GNUNET_MQ_msg_extra (msg, 1163 ev = GNUNET_MQ_msg_extra(msg,
1165 num_peers * sizeof (struct GNUNET_PeerIdentity), 1164 num_peers * sizeof(struct GNUNET_PeerIdentity),
1166 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS); 1165 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
1167 msg->type = htonl (type); 1166 msg->type = htonl(type);
1168 msg->num_peers = htonl (num_peers); 1167 msg->num_peers = htonl(num_peers);
1169 if ( (2 == type) || 1168 if ((2 == type) ||
1170 (3 == type) ) 1169 (3 == type))
1171 msg->attacked_peer = *target_peer; 1170 msg->attacked_peer = *target_peer;
1172 GNUNET_memcpy (&msg[1], 1171 GNUNET_memcpy(&msg[1],
1173 tmp_peer_pointer, 1172 tmp_peer_pointer,
1174 num_peers * sizeof (struct GNUNET_PeerIdentity)); 1173 num_peers * sizeof(struct GNUNET_PeerIdentity));
1175 1174
1176 GNUNET_MQ_send (h->mq, ev); 1175 GNUNET_MQ_send(h->mq, ev);
1177} 1176}
1178#endif /* ENABLE_MALICIOUS */ 1177#endif /* ENABLE_MALICIOUS */
1179 1178
@@ -1184,27 +1183,28 @@ GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
1184 * @param rh request handle of request to cancle 1183 * @param rh request handle of request to cancle
1185 */ 1184 */
1186void 1185void
1187GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) 1186GNUNET_RPS_request_cancel(struct GNUNET_RPS_Request_Handle *rh)
1188{ 1187{
1189 struct GNUNET_RPS_Handle *h; 1188 struct GNUNET_RPS_Handle *h;
1190 1189
1191 h = rh->rps_handle; 1190 h = rh->rps_handle;
1192 GNUNET_assert (NULL != rh); 1191 GNUNET_assert(NULL != rh);
1193 GNUNET_assert (NULL != rh->srh); 1192 GNUNET_assert(NULL != rh->srh);
1194 GNUNET_assert (h == rh->srh->rps_handle); 1193 GNUNET_assert(h == rh->srh->rps_handle);
1195 GNUNET_RPS_stream_cancel (rh->srh); 1194 GNUNET_RPS_stream_cancel(rh->srh);
1196 rh->srh = NULL; 1195 rh->srh = NULL;
1197 if (NULL == h->stream_requests_head) cancel_stream(h); 1196 if (NULL == h->stream_requests_head)
1197 cancel_stream(h);
1198 if (NULL != rh->sampler_rh) 1198 if (NULL != rh->sampler_rh)
1199 { 1199 {
1200 RPS_sampler_request_cancel (rh->sampler_rh); 1200 RPS_sampler_request_cancel(rh->sampler_rh);
1201 } 1201 }
1202 RPS_sampler_destroy (rh->sampler); 1202 RPS_sampler_destroy(rh->sampler);
1203 rh->sampler = NULL; 1203 rh->sampler = NULL;
1204 GNUNET_CONTAINER_DLL_remove (h->rh_head, 1204 GNUNET_CONTAINER_DLL_remove(h->rh_head,
1205 h->rh_tail, 1205 h->rh_tail,
1206 rh); 1206 rh);
1207 GNUNET_free (rh); 1207 GNUNET_free(rh);
1208} 1208}
1209 1209
1210 1210
@@ -1214,28 +1214,29 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
1214 * @param rhs request handle of request to cancle 1214 * @param rhs request handle of request to cancle
1215 */ 1215 */
1216void 1216void
1217GNUNET_RPS_request_single_info_cancel ( 1217GNUNET_RPS_request_single_info_cancel(
1218 struct GNUNET_RPS_Request_Handle_Single_Info *rhs) 1218 struct GNUNET_RPS_Request_Handle_Single_Info *rhs)
1219{ 1219{
1220 struct GNUNET_RPS_Handle *h; 1220 struct GNUNET_RPS_Handle *h;
1221 1221
1222 h = rhs->rps_handle; 1222 h = rhs->rps_handle;
1223 GNUNET_assert (NULL != rhs); 1223 GNUNET_assert(NULL != rhs);
1224 GNUNET_assert (NULL != rhs->srh); 1224 GNUNET_assert(NULL != rhs->srh);
1225 GNUNET_assert (h == rhs->srh->rps_handle); 1225 GNUNET_assert(h == rhs->srh->rps_handle);
1226 GNUNET_RPS_stream_cancel (rhs->srh); 1226 GNUNET_RPS_stream_cancel(rhs->srh);
1227 rhs->srh = NULL; 1227 rhs->srh = NULL;
1228 if (NULL == h->stream_requests_head) cancel_stream(h); 1228 if (NULL == h->stream_requests_head)
1229 cancel_stream(h);
1229 if (NULL != rhs->sampler_rh) 1230 if (NULL != rhs->sampler_rh)
1230 { 1231 {
1231 RPS_sampler_request_single_info_cancel (rhs->sampler_rh); 1232 RPS_sampler_request_single_info_cancel(rhs->sampler_rh);
1232 } 1233 }
1233 RPS_sampler_destroy (rhs->sampler); 1234 RPS_sampler_destroy(rhs->sampler);
1234 rhs->sampler = NULL; 1235 rhs->sampler = NULL;
1235 GNUNET_CONTAINER_DLL_remove (h->rhs_head, 1236 GNUNET_CONTAINER_DLL_remove(h->rhs_head,
1236 h->rhs_tail, 1237 h->rhs_tail,
1237 rhs); 1238 rhs);
1238 GNUNET_free (rhs); 1239 GNUNET_free(rhs);
1239} 1240}
1240 1241
1241 1242
@@ -1245,59 +1246,59 @@ GNUNET_RPS_request_single_info_cancel (
1245 * @param h the handle to the rps service 1246 * @param h the handle to the rps service
1246 */ 1247 */
1247void 1248void
1248GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) 1249GNUNET_RPS_disconnect(struct GNUNET_RPS_Handle *h)
1249{ 1250{
1250 if (NULL != h->stream_requests_head) 1251 if (NULL != h->stream_requests_head)
1251 {
1252 struct GNUNET_RPS_StreamRequestHandle *srh_next;
1253
1254 LOG (GNUNET_ERROR_TYPE_WARNING,
1255 "Still waiting for replies\n");
1256 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
1257 NULL != srh_iter;
1258 srh_iter = srh_next)
1259 { 1252 {
1260 srh_next = srh_iter->next; 1253 struct GNUNET_RPS_StreamRequestHandle *srh_next;
1261 GNUNET_RPS_stream_cancel (srh_iter); 1254
1255 LOG(GNUNET_ERROR_TYPE_WARNING,
1256 "Still waiting for replies\n");
1257 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
1258 NULL != srh_iter;
1259 srh_iter = srh_next)
1260 {
1261 srh_next = srh_iter->next;
1262 GNUNET_RPS_stream_cancel(srh_iter);
1263 }
1262 } 1264 }
1263 }
1264 if (NULL != h->rh_head) 1265 if (NULL != h->rh_head)
1265 {
1266 LOG (GNUNET_ERROR_TYPE_WARNING,
1267 "Not all requests were cancelled!\n");
1268 for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
1269 h->rh_head != NULL;
1270 rh_iter = h->rh_head)
1271 { 1266 {
1272 GNUNET_RPS_request_cancel (rh_iter); 1267 LOG(GNUNET_ERROR_TYPE_WARNING,
1268 "Not all requests were cancelled!\n");
1269 for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
1270 h->rh_head != NULL;
1271 rh_iter = h->rh_head)
1272 {
1273 GNUNET_RPS_request_cancel(rh_iter);
1274 }
1273 } 1275 }
1274 }
1275 if (NULL != h->rhs_head) 1276 if (NULL != h->rhs_head)
1276 {
1277 LOG (GNUNET_ERROR_TYPE_WARNING,
1278 "Not all requests were cancelled!\n");
1279 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
1280 h->rhs_head != NULL;
1281 rhs_iter = h->rhs_head)
1282 { 1277 {
1283 GNUNET_RPS_request_single_info_cancel (rhs_iter); 1278 LOG(GNUNET_ERROR_TYPE_WARNING,
1279 "Not all requests were cancelled!\n");
1280 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
1281 h->rhs_head != NULL;
1282 rhs_iter = h->rhs_head)
1283 {
1284 GNUNET_RPS_request_single_info_cancel(rhs_iter);
1285 }
1284 } 1286 }
1285 }
1286 if (NULL != srh_callback_peers) 1287 if (NULL != srh_callback_peers)
1287 { 1288 {
1288 GNUNET_free (srh_callback_peers); 1289 GNUNET_free(srh_callback_peers);
1289 srh_callback_peers = NULL; 1290 srh_callback_peers = NULL;
1290 } 1291 }
1291 if (NULL != h->view_update_cb) 1292 if (NULL != h->view_update_cb)
1292 { 1293 {
1293 LOG (GNUNET_ERROR_TYPE_WARNING, 1294 LOG(GNUNET_ERROR_TYPE_WARNING,
1294 "Still waiting for view updates\n"); 1295 "Still waiting for view updates\n");
1295 GNUNET_RPS_view_request_cancel (h); 1296 GNUNET_RPS_view_request_cancel(h);
1296 } 1297 }
1297 if (NULL != h->nse) 1298 if (NULL != h->nse)
1298 GNUNET_NSE_disconnect (h->nse); 1299 GNUNET_NSE_disconnect(h->nse);
1299 GNUNET_MQ_destroy (h->mq); 1300 GNUNET_MQ_destroy(h->mq);
1300 GNUNET_free (h); 1301 GNUNET_free(h);
1301} 1302}
1302 1303
1303 1304