aboutsummaryrefslogtreecommitdiff
path: root/src/rps/rps_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r--src/rps/rps_api.c925
1 files changed, 469 insertions, 456 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index 2b54297c3..19cbdcf8a 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -33,12 +33,13 @@
33 33
34#include <inttypes.h> 34#include <inttypes.h>
35 35
36#define LOG(kind, ...) GNUNET_log_from(kind, "rps-api", __VA_ARGS__) 36#define LOG(kind, ...) GNUNET_log_from (kind, "rps-api", __VA_ARGS__)
37 37
38/** 38/**
39 * Handle for a request to get peers from biased stream of ids 39 * Handle for a request to get peers from biased stream of ids
40 */ 40 */
41struct GNUNET_RPS_StreamRequestHandle { 41struct GNUNET_RPS_StreamRequestHandle
42{
42 /** 43 /**
43 * The client issuing the request. 44 * The client issuing the request.
44 */ 45 */
@@ -74,7 +75,8 @@ struct GNUNET_RPS_StreamRequestHandle {
74/** 75/**
75 * Handler to handle requests from a client. 76 * Handler to handle requests from a client.
76 */ 77 */
77struct GNUNET_RPS_Handle { 78struct GNUNET_RPS_Handle
79{
78 /** 80 /**
79 * The handle to the client configuration. 81 * The handle to the client configuration.
80 */ 82 */
@@ -154,7 +156,8 @@ struct GNUNET_RPS_Handle {
154/** 156/**
155 * Handler for a single request from a client. 157 * Handler for a single request from a client.
156 */ 158 */
157struct GNUNET_RPS_Request_Handle { 159struct GNUNET_RPS_Request_Handle
160{
158 /** 161 /**
159 * The client issuing the request. 162 * The client issuing the request.
160 */ 163 */
@@ -206,7 +209,8 @@ struct GNUNET_RPS_Request_Handle {
206/** 209/**
207 * Handler for a single request from a client. 210 * Handler for a single request from a client.
208 */ 211 */
209struct GNUNET_RPS_Request_Handle_Single_Info { 212struct GNUNET_RPS_Request_Handle_Single_Info
213{
210 /** 214 /**
211 * The client issuing the request. 215 * The client issuing the request.
212 */ 216 */
@@ -254,7 +258,8 @@ struct GNUNET_RPS_Request_Handle_Single_Info {
254 * Struct used to pack the callback, its closure (provided by the caller) 258 * Struct used to pack the callback, its closure (provided by the caller)
255 * and the connection handler to the service to pass it to a callback function. 259 * and the connection handler to the service to pass it to a callback function.
256 */ 260 */
257struct cb_cls_pack { 261struct cb_cls_pack
262{
258 /** 263 /**
259 * Callback provided by the client 264 * Callback provided by the client
260 */ 265 */
@@ -296,19 +301,19 @@ static uint64_t srh_callback_num_peers;
296 * @return The handle to the stream request 301 * @return The handle to the stream request
297 */ 302 */
298static struct GNUNET_RPS_StreamRequestHandle * 303static struct GNUNET_RPS_StreamRequestHandle *
299new_stream_request(struct GNUNET_RPS_Handle *rps_handle, 304new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
300 GNUNET_RPS_NotifyReadyCB ready_cb, 305 GNUNET_RPS_NotifyReadyCB ready_cb,
301 void *cls) 306 void *cls)
302{ 307{
303 struct GNUNET_RPS_StreamRequestHandle *srh; 308 struct GNUNET_RPS_StreamRequestHandle *srh;
304 309
305 srh = GNUNET_new(struct GNUNET_RPS_StreamRequestHandle); 310 srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
306 srh->rps_handle = rps_handle; 311 srh->rps_handle = rps_handle;
307 srh->ready_cb = ready_cb; 312 srh->ready_cb = ready_cb;
308 srh->ready_cb_cls = cls; 313 srh->ready_cb_cls = cls;
309 GNUNET_CONTAINER_DLL_insert(rps_handle->stream_requests_head, 314 GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head,
310 rps_handle->stream_requests_tail, 315 rps_handle->stream_requests_tail,
311 srh); 316 srh);
312 317
313 return srh; 318 return srh;
314} 319}
@@ -320,20 +325,20 @@ new_stream_request(struct GNUNET_RPS_Handle *rps_handle,
320 * @param srh The request to be removed 325 * @param srh The request to be removed
321 */ 326 */
322static void 327static void
323remove_stream_request(struct GNUNET_RPS_StreamRequestHandle *srh) 328remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh)
324{ 329{
325 struct GNUNET_RPS_Handle *rps_handle = srh->rps_handle; 330 struct GNUNET_RPS_Handle *rps_handle = srh->rps_handle;
326 331
327 GNUNET_assert(NULL != srh); 332 GNUNET_assert (NULL != srh);
328 if (NULL != srh->callback_task) 333 if (NULL != srh->callback_task)
329 { 334 {
330 GNUNET_SCHEDULER_cancel(srh->callback_task); 335 GNUNET_SCHEDULER_cancel (srh->callback_task);
331 srh->callback_task = NULL; 336 srh->callback_task = NULL;
332 } 337 }
333 GNUNET_CONTAINER_DLL_remove(rps_handle->stream_requests_head, 338 GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
334 rps_handle->stream_requests_tail, 339 rps_handle->stream_requests_tail,
335 srh); 340 srh);
336 GNUNET_free(srh); 341 GNUNET_free (srh);
337} 342}
338 343
339 344
@@ -347,17 +352,17 @@ remove_stream_request(struct GNUNET_RPS_StreamRequestHandle *srh)
347 * @param cls The #GNUNET_RPS_Request_Handle 352 * @param cls The #GNUNET_RPS_Request_Handle
348 */ 353 */
349static void 354static void
350peers_ready_cb(const struct GNUNET_PeerIdentity *peers, 355peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
351 uint32_t num_peers, 356 uint32_t num_peers,
352 void *cls) 357 void *cls)
353{ 358{
354 struct GNUNET_RPS_Request_Handle *rh = cls; 359 struct GNUNET_RPS_Request_Handle *rh = cls;
355 360
356 rh->sampler_rh = NULL; 361 rh->sampler_rh = NULL;
357 rh->ready_cb(rh->ready_cb_cls, 362 rh->ready_cb (rh->ready_cb_cls,
358 num_peers, 363 num_peers,
359 peers); 364 peers);
360 GNUNET_RPS_request_cancel(rh); 365 GNUNET_RPS_request_cancel (rh);
361} 366}
362 367
363 368
@@ -373,19 +378,19 @@ peers_ready_cb(const struct GNUNET_PeerIdentity *peers,
373 * @param num_observed Number of observed IDs 378 * @param num_observed Number of observed IDs
374 */ 379 */
375static void 380static void
376peer_info_ready_cb(const struct GNUNET_PeerIdentity *peers, 381peer_info_ready_cb (const struct GNUNET_PeerIdentity *peers,
377 void *cls, 382 void *cls,
378 double probability, 383 double probability,
379 uint32_t num_observed) 384 uint32_t num_observed)
380{ 385{
381 struct GNUNET_RPS_Request_Handle_Single_Info *rh = cls; 386 struct GNUNET_RPS_Request_Handle_Single_Info *rh = cls;
382 387
383 rh->sampler_rh = NULL; 388 rh->sampler_rh = NULL;
384 rh->ready_cb(rh->ready_cb_cls, 389 rh->ready_cb (rh->ready_cb_cls,
385 peers, 390 peers,
386 probability, 391 probability,
387 num_observed); 392 num_observed);
388 GNUNET_RPS_request_single_info_cancel(rh); 393 GNUNET_RPS_request_single_info_cancel (rh);
389} 394}
390 395
391 396
@@ -398,19 +403,19 @@ peer_info_ready_cb(const struct GNUNET_PeerIdentity *peers,
398 * @param peers The array of @a num_peers that have been returned 403 * @param peers The array of @a num_peers that have been returned
399 */ 404 */
400static void 405static void
401collect_peers_cb(void *cls, 406collect_peers_cb (void *cls,
402 uint64_t num_peers, 407 uint64_t num_peers,
403 const struct GNUNET_PeerIdentity *peers) 408 const struct GNUNET_PeerIdentity *peers)
404{ 409{
405 struct GNUNET_RPS_Request_Handle *rh = cls; 410 struct GNUNET_RPS_Request_Handle *rh = cls;
406 411
407 LOG(GNUNET_ERROR_TYPE_DEBUG, 412 LOG (GNUNET_ERROR_TYPE_DEBUG,
408 "Service sent %" PRIu64 " peers from stream\n", 413 "Service sent %" PRIu64 " peers from stream\n",
409 num_peers); 414 num_peers);
410 for (uint64_t i = 0; i < num_peers; i++) 415 for (uint64_t i = 0; i < num_peers; i++)
411 { 416 {
412 RPS_sampler_update(rh->sampler, &peers[i]); 417 RPS_sampler_update (rh->sampler, &peers[i]);
413 } 418 }
414} 419}
415 420
416 421
@@ -425,19 +430,19 @@ collect_peers_cb(void *cls,
425 * @param peers The array of @a num_peers that have been returned 430 * @param peers The array of @a num_peers that have been returned
426 */ 431 */
427static void 432static void
428collect_peers_info_cb(void *cls, 433collect_peers_info_cb (void *cls,
429 uint64_t num_peers, 434 uint64_t num_peers,
430 const struct GNUNET_PeerIdentity *peers) 435 const struct GNUNET_PeerIdentity *peers)
431{ 436{
432 struct GNUNET_RPS_Request_Handle_Single_Info *rhs = cls; 437 struct GNUNET_RPS_Request_Handle_Single_Info *rhs = cls;
433 438
434 LOG(GNUNET_ERROR_TYPE_DEBUG, 439 LOG (GNUNET_ERROR_TYPE_DEBUG,
435 "Service sent %" PRIu64 " peers from stream\n", 440 "Service sent %" PRIu64 " peers from stream\n",
436 num_peers); 441 num_peers);
437 for (uint64_t i = 0; i < num_peers; i++) 442 for (uint64_t i = 0; i < num_peers; i++)
438 { 443 {
439 RPS_sampler_update(rhs->sampler, &peers[i]); 444 RPS_sampler_update (rhs->sampler, &peers[i]);
440 } 445 }
441} 446}
442 447
443 448
@@ -453,37 +458,37 @@ collect_peers_info_cb(void *cls,
453 * @param ready_cb the callback called when the peers are available 458 * @param ready_cb the callback called when the peers are available
454 */ 459 */
455void 460void
456GNUNET_RPS_view_request(struct GNUNET_RPS_Handle *rps_handle, 461GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
457 uint32_t num_updates, 462 uint32_t num_updates,
458 GNUNET_RPS_NotifyReadyCB view_update_cb, 463 GNUNET_RPS_NotifyReadyCB view_update_cb,
459 void *cls) 464 void *cls)
460{ 465{
461 struct GNUNET_MQ_Envelope *ev; 466 struct GNUNET_MQ_Envelope *ev;
462 struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg; 467 struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg;
463 468
464 LOG(GNUNET_ERROR_TYPE_DEBUG, 469 LOG (GNUNET_ERROR_TYPE_DEBUG,
465 "Client requests %" PRIu32 " view updates\n", 470 "Client requests %" PRIu32 " view updates\n",
466 num_updates); 471 num_updates);
467 rps_handle->view_update_cb = view_update_cb; 472 rps_handle->view_update_cb = view_update_cb;
468 rps_handle->view_update_cls = cls; 473 rps_handle->view_update_cls = cls;
469 474
470 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST); 475 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST);
471 msg->num_updates = htonl(num_updates); 476 msg->num_updates = htonl (num_updates);
472 GNUNET_MQ_send(rps_handle->mq, ev); 477 GNUNET_MQ_send (rps_handle->mq, ev);
473} 478}
474 479
475 480
476void 481void
477GNUNET_RPS_view_request_cancel(struct GNUNET_RPS_Handle *rps_handle) 482GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle)
478{ 483{
479 struct GNUNET_MQ_Envelope *ev; 484 struct GNUNET_MQ_Envelope *ev;
480 485
481 GNUNET_assert(NULL != rps_handle->view_update_cb); 486 GNUNET_assert (NULL != rps_handle->view_update_cb);
482 487
483 rps_handle->view_update_cb = NULL; 488 rps_handle->view_update_cb = NULL;
484 489
485 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL); 490 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL);
486 GNUNET_MQ_send(rps_handle->mq, ev); 491 GNUNET_MQ_send (rps_handle->mq, ev);
487} 492}
488 493
489 494
@@ -495,21 +500,21 @@ GNUNET_RPS_view_request_cancel(struct GNUNET_RPS_Handle *rps_handle)
495 * @param ready_cb the callback called when the peers are available 500 * @param ready_cb the callback called when the peers are available
496 */ 501 */
497struct GNUNET_RPS_StreamRequestHandle * 502struct GNUNET_RPS_StreamRequestHandle *
498GNUNET_RPS_stream_request(struct GNUNET_RPS_Handle *rps_handle, 503GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
499 GNUNET_RPS_NotifyReadyCB stream_input_cb, 504 GNUNET_RPS_NotifyReadyCB stream_input_cb,
500 void *cls) 505 void *cls)
501{ 506{
502 struct GNUNET_RPS_StreamRequestHandle *srh; 507 struct GNUNET_RPS_StreamRequestHandle *srh;
503 struct GNUNET_MQ_Envelope *ev; 508 struct GNUNET_MQ_Envelope *ev;
504 struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; 509 struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
505 510
506 srh = new_stream_request(rps_handle, 511 srh = new_stream_request (rps_handle,
507 stream_input_cb, 512 stream_input_cb,
508 cls); 513 cls);
509 LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n"); 514 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n");
510 515
511 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); 516 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
512 GNUNET_MQ_send(rps_handle->mq, ev); 517 GNUNET_MQ_send (rps_handle->mq, ev);
513 return srh; 518 return srh;
514} 519}
515 520
@@ -523,21 +528,21 @@ GNUNET_RPS_stream_request(struct GNUNET_RPS_Handle *rps_handle,
523 * @return #GNUNET_OK if @a msg is well-formed 528 * @return #GNUNET_OK if @a msg is well-formed
524 */ 529 */
525static int 530static int
526check_view_update(void *cls, 531check_view_update (void *cls,
527 const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg) 532 const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
528{ 533{
529 uint16_t msize = ntohs(msg->header.size); 534 uint16_t msize = ntohs (msg->header.size);
530 uint32_t num_peers = ntohl(msg->num_peers); 535 uint32_t num_peers = ntohl (msg->num_peers);
531 536
532 (void)cls; 537 (void) cls;
533 538
534 msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_ViewReply); 539 msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_ViewReply);
535 if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) || 540 if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
536 (msize % sizeof(struct GNUNET_PeerIdentity) != 0)) 541 (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
537 { 542 {
538 GNUNET_break(0); 543 GNUNET_break (0);
539 return GNUNET_SYSERR; 544 return GNUNET_SYSERR;
540 } 545 }
541 return GNUNET_OK; 546 return GNUNET_OK;
542} 547}
543 548
@@ -550,21 +555,21 @@ check_view_update(void *cls,
550 * @param msg the message 555 * @param msg the message
551 */ 556 */
552static void 557static void
553handle_view_update(void *cls, 558handle_view_update (void *cls,
554 const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg) 559 const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
555{ 560{
556 struct GNUNET_RPS_Handle *h = cls; 561 struct GNUNET_RPS_Handle *h = cls;
557 struct GNUNET_PeerIdentity *peers; 562 struct GNUNET_PeerIdentity *peers;
558 563
559 /* Give the peers back */ 564 /* Give the peers back */
560 LOG(GNUNET_ERROR_TYPE_DEBUG, 565 LOG (GNUNET_ERROR_TYPE_DEBUG,
561 "New view of %" PRIu32 " peers:\n", 566 "New view of %" PRIu32 " peers:\n",
562 ntohl(msg->num_peers)); 567 ntohl (msg->num_peers));
563 568
564 peers = (struct GNUNET_PeerIdentity *)&msg[1]; 569 peers = (struct GNUNET_PeerIdentity *) &msg[1];
565 GNUNET_assert(NULL != h); 570 GNUNET_assert (NULL != h);
566 GNUNET_assert(NULL != h->view_update_cb); 571 GNUNET_assert (NULL != h->view_update_cb);
567 h->view_update_cb(h->view_update_cls, ntohl(msg->num_peers), peers); 572 h->view_update_cb (h->view_update_cls, ntohl (msg->num_peers), peers);
568} 573}
569 574
570 575
@@ -575,12 +580,12 @@ handle_view_update(void *cls,
575 * @param rps_handle The handle representing the service to the client 580 * @param rps_handle The handle representing the service to the client
576 */ 581 */
577static void 582static void
578cancel_stream(struct GNUNET_RPS_Handle *rps_handle) 583cancel_stream (struct GNUNET_RPS_Handle *rps_handle)
579{ 584{
580 struct GNUNET_MQ_Envelope *ev; 585 struct GNUNET_MQ_Envelope *ev;
581 586
582 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL); 587 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL);
583 GNUNET_MQ_send(rps_handle->mq, ev); 588 GNUNET_MQ_send (rps_handle->mq, ev);
584} 589}
585 590
586 591
@@ -590,14 +595,14 @@ cancel_stream(struct GNUNET_RPS_Handle *rps_handle)
590 * @param srh The request handle to cancel 595 * @param srh The request handle to cancel
591 */ 596 */
592void 597void
593GNUNET_RPS_stream_cancel(struct GNUNET_RPS_StreamRequestHandle *srh) 598GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
594{ 599{
595 struct GNUNET_RPS_Handle *rps_handle; 600 struct GNUNET_RPS_Handle *rps_handle;
596 601
597 rps_handle = srh->rps_handle; 602 rps_handle = srh->rps_handle;
598 remove_stream_request(srh); 603 remove_stream_request (srh);
599 if (NULL == rps_handle->stream_requests_head) 604 if (NULL == rps_handle->stream_requests_head)
600 cancel_stream(rps_handle); 605 cancel_stream (rps_handle);
601} 606}
602 607
603 608
@@ -612,21 +617,21 @@ GNUNET_RPS_stream_cancel(struct GNUNET_RPS_StreamRequestHandle *srh)
612 * @param msg the message 617 * @param msg the message
613 */ 618 */
614static int 619static int
615check_stream_input(void *cls, 620check_stream_input (void *cls,
616 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) 621 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
617{ 622{
618 uint16_t msize = ntohs(msg->header.size); 623 uint16_t msize = ntohs (msg->header.size);
619 uint32_t num_peers = ntohl(msg->num_peers); 624 uint32_t num_peers = ntohl (msg->num_peers);
620 625
621 (void)cls; 626 (void) cls;
622 627
623 msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_StreamReply); 628 msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_StreamReply);
624 if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) || 629 if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
625 (msize % sizeof(struct GNUNET_PeerIdentity) != 0)) 630 (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
626 { 631 {
627 GNUNET_break(0); 632 GNUNET_break (0);
628 return GNUNET_SYSERR; 633 return GNUNET_SYSERR;
629 } 634 }
630 return GNUNET_OK; 635 return GNUNET_OK;
631} 636}
632 637
@@ -637,14 +642,14 @@ check_stream_input(void *cls,
637 * @param cls Stream request handle 642 * @param cls Stream request handle
638 */ 643 */
639static void 644static void
640srh_callback_scheduled(void *cls) 645srh_callback_scheduled (void *cls)
641{ 646{
642 struct GNUNET_RPS_StreamRequestHandle *srh = cls; 647 struct GNUNET_RPS_StreamRequestHandle *srh = cls;
643 648
644 srh->callback_task = NULL; 649 srh->callback_task = NULL;
645 srh->ready_cb(srh->ready_cb_cls, 650 srh->ready_cb (srh->ready_cb_cls,
646 srh_callback_num_peers, 651 srh_callback_num_peers,
647 srh_callback_peers); 652 srh_callback_peers);
648} 653}
649 654
650 655
@@ -657,45 +662,45 @@ srh_callback_scheduled(void *cls)
657 * @param msg the message 662 * @param msg the message
658 */ 663 */
659static void 664static void
660handle_stream_input(void *cls, 665handle_stream_input (void *cls,
661 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg) 666 const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
662{ 667{
663 struct GNUNET_RPS_Handle *h = cls; 668 struct GNUNET_RPS_Handle *h = cls;
664 //const struct GNUNET_PeerIdentity *peers; 669 // const struct GNUNET_PeerIdentity *peers;
665 uint64_t num_peers; 670 uint64_t num_peers;
666 struct GNUNET_RPS_StreamRequestHandle *srh_iter; 671 struct GNUNET_RPS_StreamRequestHandle *srh_iter;
667 struct GNUNET_RPS_StreamRequestHandle *srh_next; 672 struct GNUNET_RPS_StreamRequestHandle *srh_next;
668 673
669 //peers = (struct GNUNET_PeerIdentity *) &msg[1]; 674 // peers = (struct GNUNET_PeerIdentity *) &msg[1];
670 num_peers = ntohl(msg->num_peers); 675 num_peers = ntohl (msg->num_peers);
671 srh_callback_num_peers = num_peers; 676 srh_callback_num_peers = num_peers;
672 GNUNET_free_non_null(srh_callback_peers); 677 GNUNET_free_non_null (srh_callback_peers);
673 srh_callback_peers = GNUNET_new_array(num_peers, 678 srh_callback_peers = GNUNET_new_array (num_peers,
674 struct GNUNET_PeerIdentity); 679 struct GNUNET_PeerIdentity);
675 GNUNET_memcpy(srh_callback_peers, 680 GNUNET_memcpy (srh_callback_peers,
676 &msg[1], 681 &msg[1],
677 num_peers * sizeof(struct GNUNET_PeerIdentity)); 682 num_peers * sizeof(struct GNUNET_PeerIdentity));
678 LOG(GNUNET_ERROR_TYPE_DEBUG, 683 LOG (GNUNET_ERROR_TYPE_DEBUG,
679 "Received %" PRIu64 " peer(s) from stream input.\n", 684 "Received %" PRIu64 " peer(s) from stream input.\n",
680 num_peers); 685 num_peers);
681 for (srh_iter = h->stream_requests_head; 686 for (srh_iter = h->stream_requests_head;
682 NULL != srh_iter; 687 NULL != srh_iter;
683 srh_iter = srh_next) 688 srh_iter = srh_next)
684 { 689 {
685 LOG(GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n"); 690 LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
686 /* Store next pointer - srh might be removed/freed in callback */ 691 /* Store next pointer - srh might be removed/freed in callback */
687 srh_next = srh_iter->next; 692 srh_next = srh_iter->next;
688 if (NULL != srh_iter->callback_task) 693 if (NULL != srh_iter->callback_task)
689 GNUNET_SCHEDULER_cancel(srh_iter->callback_task); 694 GNUNET_SCHEDULER_cancel (srh_iter->callback_task);
690 srh_iter->callback_task = 695 srh_iter->callback_task =
691 GNUNET_SCHEDULER_add_now(&srh_callback_scheduled, 696 GNUNET_SCHEDULER_add_now (&srh_callback_scheduled,
692 srh_iter); 697 srh_iter);
693 } 698 }
694 699
695 if (NULL == h->stream_requests_head) 700 if (NULL == h->stream_requests_head)
696 { 701 {
697 cancel_stream(h); 702 cancel_stream (h);
698 } 703 }
699} 704}
700 705
701 706
@@ -703,7 +708,7 @@ handle_stream_input(void *cls,
703 * Reconnect to the service 708 * Reconnect to the service
704 */ 709 */
705static void 710static void
706reconnect(struct GNUNET_RPS_Handle *h); 711reconnect (struct GNUNET_RPS_Handle *h);
707 712
708 713
709/** 714/**
@@ -716,19 +721,20 @@ reconnect(struct GNUNET_RPS_Handle *h);
716 * @param error error code without specyfied meaning 721 * @param error error code without specyfied meaning
717 */ 722 */
718static void 723static void
719mq_error_handler(void *cls, 724mq_error_handler (void *cls,
720 enum GNUNET_MQ_Error error) 725 enum GNUNET_MQ_Error error)
721{ 726{
722 struct GNUNET_RPS_Handle *h = cls; 727 struct GNUNET_RPS_Handle *h = cls;
723 728
724 //TODO LOG 729 // TODO LOG
725 LOG(GNUNET_ERROR_TYPE_WARNING, "Problem with message queue. error: %i\n\ 730 LOG (GNUNET_ERROR_TYPE_WARNING,
731 "Problem with message queue. error: %i\n\
726 1: READ,\n\ 732 1: READ,\n\
727 2: WRITE,\n\ 733 2: WRITE,\n\
728 4: TIMEOUT\n", 734 4: TIMEOUT\n",
729 // TODO: write GNUNET_MQ_strerror (error) 735 // TODO: write GNUNET_MQ_strerror (error)
730 error); 736 error);
731 reconnect(h); 737 reconnect (h);
732 /* Resend all pending request as the service destroyed its knowledge 738 /* Resend all pending request as the service destroyed its knowledge
733 * about them */ 739 * about them */
734} 740}
@@ -742,16 +748,16 @@ mq_error_handler(void *cls,
742 * @param hash[out] Pointer to the location in which the hash will be stored. 748 * @param hash[out] Pointer to the location in which the hash will be stored.
743 */ 749 */
744static void 750static void
745hash_from_share_val(const char *share_val, 751hash_from_share_val (const char *share_val,
746 struct GNUNET_HashCode *hash) 752 struct GNUNET_HashCode *hash)
747{ 753{
748 GNUNET_CRYPTO_kdf(hash, 754 GNUNET_CRYPTO_kdf (hash,
749 sizeof(struct GNUNET_HashCode), 755 sizeof(struct GNUNET_HashCode),
750 "rps", 756 "rps",
751 strlen("rps"), 757 strlen ("rps"),
752 share_val, 758 share_val,
753 strlen(share_val), 759 strlen (share_val),
754 NULL, 0); 760 NULL, 0);
755} 761}
756 762
757 763
@@ -767,30 +773,32 @@ hash_from_share_val(const char *share_val,
767 * @param std_dev the standard distribution 773 * @param std_dev the standard distribution
768 */ 774 */
769static void 775static void
770nse_cb(void *cls, 776nse_cb (void *cls,
771 struct GNUNET_TIME_Absolute timestamp, 777 struct GNUNET_TIME_Absolute timestamp,
772 double logestimate, 778 double logestimate,
773 double std_dev) 779 double std_dev)
774{ 780{
775 struct GNUNET_RPS_Handle *h = cls; 781 struct GNUNET_RPS_Handle *h = cls;
776 782
777 (void)timestamp; 783 (void) timestamp;
778 (void)std_dev; 784 (void) std_dev;
779 785
780 for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head; 786 for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
781 NULL != rh_iter && NULL != rh_iter->next; 787 NULL != rh_iter && NULL != rh_iter->next;
782 rh_iter = rh_iter->next) 788 rh_iter = rh_iter->next)
783 { 789 {
784 RPS_sampler_update_with_nw_size(rh_iter->sampler, 790 RPS_sampler_update_with_nw_size (rh_iter->sampler,
785 GNUNET_NSE_log_estimate_to_n(logestimate)); 791 GNUNET_NSE_log_estimate_to_n (
786 } 792 logestimate));
793 }
787 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head; 794 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
788 NULL != rhs_iter && NULL != rhs_iter->next; 795 NULL != rhs_iter && NULL != rhs_iter->next;
789 rhs_iter = rhs_iter->next) 796 rhs_iter = rhs_iter->next)
790 { 797 {
791 RPS_sampler_update_with_nw_size(rhs_iter->sampler, 798 RPS_sampler_update_with_nw_size (rhs_iter->sampler,
792 GNUNET_NSE_log_estimate_to_n(logestimate)); 799 GNUNET_NSE_log_estimate_to_n (
793 } 800 logestimate));
801 }
794} 802}
795 803
796 804
@@ -798,30 +806,30 @@ nse_cb(void *cls,
798 * Reconnect to the service 806 * Reconnect to the service
799 */ 807 */
800static void 808static void
801reconnect(struct GNUNET_RPS_Handle *h) 809reconnect (struct GNUNET_RPS_Handle *h)
802{ 810{
803 struct GNUNET_MQ_MessageHandler mq_handlers[] = { 811 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
804 GNUNET_MQ_hd_var_size(view_update, 812 GNUNET_MQ_hd_var_size (view_update,
805 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, 813 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
806 struct GNUNET_RPS_CS_DEBUG_ViewReply, 814 struct GNUNET_RPS_CS_DEBUG_ViewReply,
807 h), 815 h),
808 GNUNET_MQ_hd_var_size(stream_input, 816 GNUNET_MQ_hd_var_size (stream_input,
809 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY, 817 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
810 struct GNUNET_RPS_CS_DEBUG_StreamReply, 818 struct GNUNET_RPS_CS_DEBUG_StreamReply,
811 h), 819 h),
812 GNUNET_MQ_handler_end() 820 GNUNET_MQ_handler_end ()
813 }; 821 };
814 822
815 if (NULL != h->mq) 823 if (NULL != h->mq)
816 GNUNET_MQ_destroy(h->mq); 824 GNUNET_MQ_destroy (h->mq);
817 h->mq = GNUNET_CLIENT_connect(h->cfg, 825 h->mq = GNUNET_CLIENT_connect (h->cfg,
818 "rps", 826 "rps",
819 mq_handlers, 827 mq_handlers,
820 &mq_error_handler, 828 &mq_error_handler,
821 h); 829 h);
822 if (NULL != h->nse) 830 if (NULL != h->nse)
823 GNUNET_NSE_disconnect(h->nse); 831 GNUNET_NSE_disconnect (h->nse);
824 h->nse = GNUNET_NSE_connect(h->cfg, &nse_cb, h); 832 h->nse = GNUNET_NSE_connect (h->cfg, &nse_cb, h);
825} 833}
826 834
827 835
@@ -832,56 +840,56 @@ reconnect(struct GNUNET_RPS_Handle *h)
832 * @return a handle to the service, NULL on error 840 * @return a handle to the service, NULL on error
833 */ 841 */
834struct GNUNET_RPS_Handle * 842struct GNUNET_RPS_Handle *
835GNUNET_RPS_connect(const struct GNUNET_CONFIGURATION_Handle *cfg) 843GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
836{ 844{
837 struct GNUNET_RPS_Handle *h; 845 struct GNUNET_RPS_Handle *h;
838 846
839 h = GNUNET_new(struct GNUNET_RPS_Handle); 847 h = GNUNET_new (struct GNUNET_RPS_Handle);
840 h->cfg = cfg; 848 h->cfg = cfg;
841 if (GNUNET_OK != 849 if (GNUNET_OK !=
842 GNUNET_CONFIGURATION_get_value_float(cfg, 850 GNUNET_CONFIGURATION_get_value_float (cfg,
843 "RPS", 851 "RPS",
844 "DESIRED_PROBABILITY", 852 "DESIRED_PROBABILITY",
845 &h->desired_probability)) 853 &h->desired_probability))
846 { 854 {
847 GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR, 855 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
848 "RPS", "DESIRED_PROBABILITY"); 856 "RPS", "DESIRED_PROBABILITY");
849 GNUNET_free(h); 857 GNUNET_free (h);
850 return NULL; 858 return NULL;
851 } 859 }
852 if (0 > h->desired_probability || 860 if ((0 > h->desired_probability)||
853 1 < h->desired_probability) 861 (1 < h->desired_probability) )
854 { 862 {
855 LOG(GNUNET_ERROR_TYPE_ERROR, 863 LOG (GNUNET_ERROR_TYPE_ERROR,
856 "The desired probability must be in the interval [0;1]\n"); 864 "The desired probability must be in the interval [0;1]\n");
857 GNUNET_free(h); 865 GNUNET_free (h);
858 return NULL; 866 return NULL;
859 } 867 }
860 if (GNUNET_OK != 868 if (GNUNET_OK !=
861 GNUNET_CONFIGURATION_get_value_float(cfg, 869 GNUNET_CONFIGURATION_get_value_float (cfg,
862 "RPS", 870 "RPS",
863 "DEFICIENCY_FACTOR", 871 "DEFICIENCY_FACTOR",
864 &h->deficiency_factor)) 872 &h->deficiency_factor))
865 { 873 {
866 GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR, 874 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
867 "RPS", "DEFICIENCY_FACTOR"); 875 "RPS", "DEFICIENCY_FACTOR");
868 GNUNET_free(h); 876 GNUNET_free (h);
869 return NULL; 877 return NULL;
870 } 878 }
871 if (0 > h->desired_probability || 879 if ((0 > h->desired_probability)||
872 1 < h->desired_probability) 880 (1 < h->desired_probability) )
873 { 881 {
874 LOG(GNUNET_ERROR_TYPE_ERROR, 882 LOG (GNUNET_ERROR_TYPE_ERROR,
875 "The deficiency factor must be in the interval [0;1]\n"); 883 "The deficiency factor must be in the interval [0;1]\n");
876 GNUNET_free(h); 884 GNUNET_free (h);
877 return NULL; 885 return NULL;
878 } 886 }
879 reconnect(h); 887 reconnect (h);
880 if (NULL == h->mq) 888 if (NULL == h->mq)
881 { 889 {
882 GNUNET_free(h); 890 GNUNET_free (h);
883 return NULL; 891 return NULL;
884 } 892 }
885 return h; 893 return h;
886} 894}
887 895
@@ -893,19 +901,19 @@ GNUNET_RPS_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
893 * @param shared_value The shared value that defines the members of the sub (-gorup) 901 * @param shared_value The shared value that defines the members of the sub (-gorup)
894 */ 902 */
895void 903void
896GNUNET_RPS_sub_start(struct GNUNET_RPS_Handle *h, 904GNUNET_RPS_sub_start (struct GNUNET_RPS_Handle *h,
897 const char *shared_value) 905 const char *shared_value)
898{ 906{
899 struct GNUNET_RPS_CS_SubStartMessage *msg; 907 struct GNUNET_RPS_CS_SubStartMessage *msg;
900 struct GNUNET_MQ_Envelope *ev; 908 struct GNUNET_MQ_Envelope *ev;
901 909
902 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START); 910 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START);
903 hash_from_share_val(shared_value, &msg->hash); 911 hash_from_share_val (shared_value, &msg->hash);
904 msg->round_interval = GNUNET_TIME_relative_hton( // TODO read from config! 912 msg->round_interval = GNUNET_TIME_relative_hton ( // TODO read from config!
905 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)); 913 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30));
906 GNUNET_assert(0 != msg->round_interval.rel_value_us__); 914 GNUNET_assert (0 != msg->round_interval.rel_value_us__);
907 915
908 GNUNET_MQ_send(h->mq, ev); 916 GNUNET_MQ_send (h->mq, ev);
909} 917}
910 918
911 919
@@ -916,16 +924,16 @@ GNUNET_RPS_sub_start(struct GNUNET_RPS_Handle *h,
916 * @param shared_value The shared value that defines the members of the sub (-gorup) 924 * @param shared_value The shared value that defines the members of the sub (-gorup)
917 */ 925 */
918void 926void
919GNUNET_RPS_sub_stop(struct GNUNET_RPS_Handle *h, 927GNUNET_RPS_sub_stop (struct GNUNET_RPS_Handle *h,
920 const char *shared_value) 928 const char *shared_value)
921{ 929{
922 struct GNUNET_RPS_CS_SubStopMessage *msg; 930 struct GNUNET_RPS_CS_SubStopMessage *msg;
923 struct GNUNET_MQ_Envelope *ev; 931 struct GNUNET_MQ_Envelope *ev;
924 932
925 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP); 933 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP);
926 hash_from_share_val(shared_value, &msg->hash); 934 hash_from_share_val (shared_value, &msg->hash);
927 935
928 GNUNET_MQ_send(h->mq, ev); 936 GNUNET_MQ_send (h->mq, ev);
929} 937}
930 938
931 939
@@ -939,37 +947,37 @@ GNUNET_RPS_sub_stop(struct GNUNET_RPS_Handle *h,
939 * @return a handle to cancel this request 947 * @return a handle to cancel this request
940 */ 948 */
941struct GNUNET_RPS_Request_Handle * 949struct GNUNET_RPS_Request_Handle *
942GNUNET_RPS_request_peers(struct GNUNET_RPS_Handle *rps_handle, 950GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
943 uint32_t num_req_peers, 951 uint32_t num_req_peers,
944 GNUNET_RPS_NotifyReadyCB ready_cb, 952 GNUNET_RPS_NotifyReadyCB ready_cb,
945 void *cls) 953 void *cls)
946{ 954{
947 struct GNUNET_RPS_Request_Handle *rh; 955 struct GNUNET_RPS_Request_Handle *rh;
948 956
949 LOG(GNUNET_ERROR_TYPE_INFO, 957 LOG (GNUNET_ERROR_TYPE_INFO,
950 "Client requested %" PRIu32 " peers\n", 958 "Client requested %" PRIu32 " peers\n",
951 num_req_peers); 959 num_req_peers);
952 rh = GNUNET_new(struct GNUNET_RPS_Request_Handle); 960 rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
953 rh->rps_handle = rps_handle; 961 rh->rps_handle = rps_handle;
954 rh->num_requests = num_req_peers; 962 rh->num_requests = num_req_peers;
955 rh->sampler = RPS_sampler_mod_init(num_req_peers, 963 rh->sampler = RPS_sampler_mod_init (num_req_peers,
956 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff 964 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
957 RPS_sampler_set_desired_probability(rh->sampler, 965 RPS_sampler_set_desired_probability (rh->sampler,
958 rps_handle->desired_probability); 966 rps_handle->desired_probability);
959 RPS_sampler_set_deficiency_factor(rh->sampler, 967 RPS_sampler_set_deficiency_factor (rh->sampler,
960 rps_handle->deficiency_factor); 968 rps_handle->deficiency_factor);
961 rh->sampler_rh = RPS_sampler_get_n_rand_peers(rh->sampler, 969 rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
962 num_req_peers, 970 num_req_peers,
963 peers_ready_cb, 971 peers_ready_cb,
964 rh); 972 rh);
965 rh->srh = GNUNET_RPS_stream_request(rps_handle, 973 rh->srh = GNUNET_RPS_stream_request (rps_handle,
966 collect_peers_cb, 974 collect_peers_cb,
967 rh); /* cls */ 975 rh); /* cls */
968 rh->ready_cb = ready_cb; 976 rh->ready_cb = ready_cb;
969 rh->ready_cb_cls = cls; 977 rh->ready_cb_cls = cls;
970 GNUNET_CONTAINER_DLL_insert(rps_handle->rh_head, 978 GNUNET_CONTAINER_DLL_insert (rps_handle->rh_head,
971 rps_handle->rh_tail, 979 rps_handle->rh_tail,
972 rh); 980 rh);
973 981
974 return rh; 982 return rh;
975} 983}
@@ -984,34 +992,34 @@ GNUNET_RPS_request_peers(struct GNUNET_RPS_Handle *rps_handle,
984 * @return a handle to cancel this request 992 * @return a handle to cancel this request
985 */ 993 */
986struct GNUNET_RPS_Request_Handle_Single_Info * 994struct GNUNET_RPS_Request_Handle_Single_Info *
987GNUNET_RPS_request_peer_info(struct GNUNET_RPS_Handle *rps_handle, 995GNUNET_RPS_request_peer_info (struct GNUNET_RPS_Handle *rps_handle,
988 GNUNET_RPS_NotifyReadySingleInfoCB ready_cb, 996 GNUNET_RPS_NotifyReadySingleInfoCB ready_cb,
989 void *cls) 997 void *cls)
990{ 998{
991 struct GNUNET_RPS_Request_Handle_Single_Info *rhs; 999 struct GNUNET_RPS_Request_Handle_Single_Info *rhs;
992 uint32_t num_req_peers = 1; 1000 uint32_t num_req_peers = 1;
993 1001
994 LOG(GNUNET_ERROR_TYPE_INFO, 1002 LOG (GNUNET_ERROR_TYPE_INFO,
995 "Client requested peer with additional info\n"); 1003 "Client requested peer with additional info\n");
996 rhs = GNUNET_new(struct GNUNET_RPS_Request_Handle_Single_Info); 1004 rhs = GNUNET_new (struct GNUNET_RPS_Request_Handle_Single_Info);
997 rhs->rps_handle = rps_handle; 1005 rhs->rps_handle = rps_handle;
998 rhs->sampler = RPS_sampler_mod_init(num_req_peers, 1006 rhs->sampler = RPS_sampler_mod_init (num_req_peers,
999 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff 1007 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
1000 RPS_sampler_set_desired_probability(rhs->sampler, 1008 RPS_sampler_set_desired_probability (rhs->sampler,
1001 rps_handle->desired_probability); 1009 rps_handle->desired_probability);
1002 RPS_sampler_set_deficiency_factor(rhs->sampler, 1010 RPS_sampler_set_deficiency_factor (rhs->sampler,
1003 rps_handle->deficiency_factor); 1011 rps_handle->deficiency_factor);
1004 rhs->sampler_rh = RPS_sampler_get_rand_peer_info(rhs->sampler, 1012 rhs->sampler_rh = RPS_sampler_get_rand_peer_info (rhs->sampler,
1005 peer_info_ready_cb, 1013 peer_info_ready_cb,
1006 rhs); 1014 rhs);
1007 rhs->srh = GNUNET_RPS_stream_request(rps_handle, 1015 rhs->srh = GNUNET_RPS_stream_request (rps_handle,
1008 collect_peers_info_cb, 1016 collect_peers_info_cb,
1009 rhs); /* cls */ 1017 rhs); /* cls */
1010 rhs->ready_cb = ready_cb; 1018 rhs->ready_cb = ready_cb;
1011 rhs->ready_cb_cls = cls; 1019 rhs->ready_cb_cls = cls;
1012 GNUNET_CONTAINER_DLL_insert(rps_handle->rhs_head, 1020 GNUNET_CONTAINER_DLL_insert (rps_handle->rhs_head,
1013 rps_handle->rhs_tail, 1021 rps_handle->rhs_tail,
1014 rhs); 1022 rhs);
1015 1023
1016 return rhs; 1024 return rhs;
1017} 1025}
@@ -1025,9 +1033,9 @@ GNUNET_RPS_request_peer_info(struct GNUNET_RPS_Handle *rps_handle,
1025 * @param ids the ids of the peers seeded 1033 * @param ids the ids of the peers seeded
1026 */ 1034 */
1027void 1035void
1028GNUNET_RPS_seed_ids(struct GNUNET_RPS_Handle *h, 1036GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
1029 uint32_t n, 1037 uint32_t n,
1030 const struct GNUNET_PeerIdentity *ids) 1038 const struct GNUNET_PeerIdentity *ids)
1031{ 1039{
1032 size_t size_needed; 1040 size_t size_needed;
1033 uint32_t num_peers_max; 1041 uint32_t num_peers_max;
@@ -1035,52 +1043,53 @@ GNUNET_RPS_seed_ids(struct GNUNET_RPS_Handle *h,
1035 struct GNUNET_MQ_Envelope *ev; 1043 struct GNUNET_MQ_Envelope *ev;
1036 struct GNUNET_RPS_CS_SeedMessage *msg; 1044 struct GNUNET_RPS_CS_SeedMessage *msg;
1037 1045
1038 LOG(GNUNET_ERROR_TYPE_DEBUG, 1046 LOG (GNUNET_ERROR_TYPE_DEBUG,
1039 "Client wants to seed %" PRIu32 " peers:\n", 1047 "Client wants to seed %" PRIu32 " peers:\n",
1040 n); 1048 n);
1041 for (unsigned int i = 0; i < n; i++) 1049 for (unsigned int i = 0; i < n; i++)
1042 LOG(GNUNET_ERROR_TYPE_DEBUG, 1050 LOG (GNUNET_ERROR_TYPE_DEBUG,
1043 "%u. peer: %s\n", 1051 "%u. peer: %s\n",
1044 i, 1052 i,
1045 GNUNET_i2s(&ids[i])); 1053 GNUNET_i2s (&ids[i]));
1046 1054
1047 /* The actual size the message occupies */ 1055 /* The actual size the message occupies */
1048 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) + 1056 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage)
1049 n * sizeof(struct GNUNET_PeerIdentity); 1057 + n * sizeof(struct GNUNET_PeerIdentity);
1050 /* The number of peers that fits in one message together with 1058 /* The number of peers that fits in one message together with
1051 * the respective header */ 1059 * the respective header */
1052 num_peers_max = (GNUNET_MAX_MESSAGE_SIZE - 1060 num_peers_max = (GNUNET_MAX_MESSAGE_SIZE
1053 sizeof(struct GNUNET_RPS_CS_SeedMessage)) / 1061 - sizeof(struct GNUNET_RPS_CS_SeedMessage))
1054 sizeof(struct GNUNET_PeerIdentity); 1062 / sizeof(struct GNUNET_PeerIdentity);
1055 tmp_peer_pointer = ids; 1063 tmp_peer_pointer = ids;
1056 1064
1057 while (GNUNET_MAX_MESSAGE_SIZE < size_needed) 1065 while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
1058 { 1066 {
1059 ev = GNUNET_MQ_msg_extra(msg, 1067 ev = GNUNET_MQ_msg_extra (msg,
1060 num_peers_max * sizeof(struct GNUNET_PeerIdentity), 1068 num_peers_max * sizeof(struct
1061 GNUNET_MESSAGE_TYPE_RPS_CS_SEED); 1069 GNUNET_PeerIdentity),
1062 msg->num_peers = htonl(num_peers_max); 1070 GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
1063 GNUNET_memcpy(&msg[1], 1071 msg->num_peers = htonl (num_peers_max);
1064 tmp_peer_pointer, 1072 GNUNET_memcpy (&msg[1],
1065 num_peers_max * sizeof(struct GNUNET_PeerIdentity)); 1073 tmp_peer_pointer,
1066 GNUNET_MQ_send(h->mq, 1074 num_peers_max * sizeof(struct GNUNET_PeerIdentity));
1067 ev); 1075 GNUNET_MQ_send (h->mq,
1068 n -= num_peers_max; 1076 ev);
1069 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) + 1077 n -= num_peers_max;
1070 n * sizeof(struct GNUNET_PeerIdentity); 1078 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage)
1071 /* Set pointer to beginning of next block of num_peers_max peers */ 1079 + n * sizeof(struct GNUNET_PeerIdentity);
1072 tmp_peer_pointer = &ids[num_peers_max]; 1080 /* Set pointer to beginning of next block of num_peers_max peers */
1073 } 1081 tmp_peer_pointer = &ids[num_peers_max];
1074 1082 }
1075 ev = GNUNET_MQ_msg_extra(msg, 1083
1076 n * sizeof(struct GNUNET_PeerIdentity), 1084 ev = GNUNET_MQ_msg_extra (msg,
1077 GNUNET_MESSAGE_TYPE_RPS_CS_SEED); 1085 n * sizeof(struct GNUNET_PeerIdentity),
1078 msg->num_peers = htonl(n); 1086 GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
1079 GNUNET_memcpy(&msg[1], 1087 msg->num_peers = htonl (n);
1080 tmp_peer_pointer, 1088 GNUNET_memcpy (&msg[1],
1081 n * sizeof(struct GNUNET_PeerIdentity)); 1089 tmp_peer_pointer,
1082 GNUNET_MQ_send(h->mq, 1090 n * sizeof(struct GNUNET_PeerIdentity));
1083 ev); 1091 GNUNET_MQ_send (h->mq,
1092 ev);
1084} 1093}
1085 1094
1086 1095
@@ -1100,11 +1109,11 @@ GNUNET_RPS_seed_ids(struct GNUNET_RPS_Handle *h,
1100 * peer to be isolated from the rest 1109 * peer to be isolated from the rest
1101 */ 1110 */
1102void 1111void
1103GNUNET_RPS_act_malicious(struct GNUNET_RPS_Handle *h, 1112GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
1104 uint32_t type, 1113 uint32_t type,
1105 uint32_t num_peers, 1114 uint32_t num_peers,
1106 const struct GNUNET_PeerIdentity *peer_ids, 1115 const struct GNUNET_PeerIdentity *peer_ids,
1107 const struct GNUNET_PeerIdentity *target_peer) 1116 const struct GNUNET_PeerIdentity *target_peer)
1108{ 1117{
1109 size_t size_needed; 1118 size_t size_needed;
1110 uint32_t num_peers_max; 1119 uint32_t num_peers_max;
@@ -1114,65 +1123,68 @@ GNUNET_RPS_act_malicious(struct GNUNET_RPS_Handle *h,
1114 1123
1115 unsigned int i; 1124 unsigned int i;
1116 1125
1117 LOG(GNUNET_ERROR_TYPE_DEBUG, 1126 LOG (GNUNET_ERROR_TYPE_DEBUG,
1118 "Client turns malicious (type %" PRIu32 ") with %" PRIu32 " other peers:\n", 1127 "Client turns malicious (type %" PRIu32 ") with %" PRIu32
1119 type, 1128 " other peers:\n",
1120 num_peers); 1129 type,
1130 num_peers);
1121 for (i = 0; i < num_peers; i++) 1131 for (i = 0; i < num_peers; i++)
1122 LOG(GNUNET_ERROR_TYPE_DEBUG, 1132 LOG (GNUNET_ERROR_TYPE_DEBUG,
1123 "%u. peer: %s\n", 1133 "%u. peer: %s\n",
1124 i, 1134 i,
1125 GNUNET_i2s(&peer_ids[i])); 1135 GNUNET_i2s (&peer_ids[i]));
1126 1136
1127 /* The actual size the message would occupy */ 1137 /* The actual size the message would occupy */
1128 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) + 1138 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage)
1129 num_peers * sizeof(struct GNUNET_PeerIdentity); 1139 + num_peers * sizeof(struct GNUNET_PeerIdentity);
1130 /* The number of peers that fit in one message together with 1140 /* The number of peers that fit in one message together with
1131 * the respective header */ 1141 * the respective header */
1132 num_peers_max = (GNUNET_MAX_MESSAGE_SIZE - 1142 num_peers_max = (GNUNET_MAX_MESSAGE_SIZE
1133 sizeof(struct GNUNET_RPS_CS_SeedMessage)) / 1143 - sizeof(struct GNUNET_RPS_CS_SeedMessage))
1134 sizeof(struct GNUNET_PeerIdentity); 1144 / sizeof(struct GNUNET_PeerIdentity);
1135 tmp_peer_pointer = peer_ids; 1145 tmp_peer_pointer = peer_ids;
1136 1146
1137 while (GNUNET_MAX_MESSAGE_SIZE < size_needed) 1147 while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
1138 { 1148 {
1139 LOG(GNUNET_ERROR_TYPE_DEBUG, 1149 LOG (GNUNET_ERROR_TYPE_DEBUG,
1140 "Too many peers to send at once, sending %" PRIu32 " (all we can so far)\n", 1150 "Too many peers to send at once, sending %" PRIu32
1141 num_peers_max); 1151 " (all we can so far)\n",
1142 ev = GNUNET_MQ_msg_extra(msg, 1152 num_peers_max);
1143 num_peers_max * sizeof(struct GNUNET_PeerIdentity), 1153 ev = GNUNET_MQ_msg_extra (msg,
1144 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS); 1154 num_peers_max * sizeof(struct
1145 msg->type = htonl(type); 1155 GNUNET_PeerIdentity),
1146 msg->num_peers = htonl(num_peers_max); 1156 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
1147 if ((2 == type) || 1157 msg->type = htonl (type);
1148 (3 == type)) 1158 msg->num_peers = htonl (num_peers_max);
1149 msg->attacked_peer = peer_ids[num_peers]; 1159 if ((2 == type) ||
1150 GNUNET_memcpy(&msg[1], 1160 (3 == type))
1151 tmp_peer_pointer, 1161 msg->attacked_peer = peer_ids[num_peers];
1152 num_peers_max * sizeof(struct GNUNET_PeerIdentity)); 1162 GNUNET_memcpy (&msg[1],
1153 1163 tmp_peer_pointer,
1154 GNUNET_MQ_send(h->mq, ev); 1164 num_peers_max * sizeof(struct GNUNET_PeerIdentity));
1155 1165
1156 num_peers -= num_peers_max; 1166 GNUNET_MQ_send (h->mq, ev);
1157 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) + 1167
1158 num_peers * sizeof(struct GNUNET_PeerIdentity); 1168 num_peers -= num_peers_max;
1159 /* Set pointer to beginning of next block of num_peers_max peers */ 1169 size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage)
1160 tmp_peer_pointer = &peer_ids[num_peers_max]; 1170 + num_peers * sizeof(struct GNUNET_PeerIdentity);
1161 } 1171 /* Set pointer to beginning of next block of num_peers_max peers */
1162 1172 tmp_peer_pointer = &peer_ids[num_peers_max];
1163 ev = GNUNET_MQ_msg_extra(msg, 1173 }
1164 num_peers * sizeof(struct GNUNET_PeerIdentity), 1174
1165 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS); 1175 ev = GNUNET_MQ_msg_extra (msg,
1166 msg->type = htonl(type); 1176 num_peers * sizeof(struct GNUNET_PeerIdentity),
1167 msg->num_peers = htonl(num_peers); 1177 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
1178 msg->type = htonl (type);
1179 msg->num_peers = htonl (num_peers);
1168 if ((2 == type) || 1180 if ((2 == type) ||
1169 (3 == type)) 1181 (3 == type))
1170 msg->attacked_peer = *target_peer; 1182 msg->attacked_peer = *target_peer;
1171 GNUNET_memcpy(&msg[1], 1183 GNUNET_memcpy (&msg[1],
1172 tmp_peer_pointer, 1184 tmp_peer_pointer,
1173 num_peers * sizeof(struct GNUNET_PeerIdentity)); 1185 num_peers * sizeof(struct GNUNET_PeerIdentity));
1174 1186
1175 GNUNET_MQ_send(h->mq, ev); 1187 GNUNET_MQ_send (h->mq, ev);
1176} 1188}
1177#endif /* ENABLE_MALICIOUS */ 1189#endif /* ENABLE_MALICIOUS */
1178 1190
@@ -1183,28 +1195,28 @@ GNUNET_RPS_act_malicious(struct GNUNET_RPS_Handle *h,
1183 * @param rh request handle of request to cancle 1195 * @param rh request handle of request to cancle
1184 */ 1196 */
1185void 1197void
1186GNUNET_RPS_request_cancel(struct GNUNET_RPS_Request_Handle *rh) 1198GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
1187{ 1199{
1188 struct GNUNET_RPS_Handle *h; 1200 struct GNUNET_RPS_Handle *h;
1189 1201
1190 h = rh->rps_handle; 1202 h = rh->rps_handle;
1191 GNUNET_assert(NULL != rh); 1203 GNUNET_assert (NULL != rh);
1192 GNUNET_assert(NULL != rh->srh); 1204 GNUNET_assert (NULL != rh->srh);
1193 GNUNET_assert(h == rh->srh->rps_handle); 1205 GNUNET_assert (h == rh->srh->rps_handle);
1194 GNUNET_RPS_stream_cancel(rh->srh); 1206 GNUNET_RPS_stream_cancel (rh->srh);
1195 rh->srh = NULL; 1207 rh->srh = NULL;
1196 if (NULL == h->stream_requests_head) 1208 if (NULL == h->stream_requests_head)
1197 cancel_stream(h); 1209 cancel_stream (h);
1198 if (NULL != rh->sampler_rh) 1210 if (NULL != rh->sampler_rh)
1199 { 1211 {
1200 RPS_sampler_request_cancel(rh->sampler_rh); 1212 RPS_sampler_request_cancel (rh->sampler_rh);
1201 } 1213 }
1202 RPS_sampler_destroy(rh->sampler); 1214 RPS_sampler_destroy (rh->sampler);
1203 rh->sampler = NULL; 1215 rh->sampler = NULL;
1204 GNUNET_CONTAINER_DLL_remove(h->rh_head, 1216 GNUNET_CONTAINER_DLL_remove (h->rh_head,
1205 h->rh_tail, 1217 h->rh_tail,
1206 rh); 1218 rh);
1207 GNUNET_free(rh); 1219 GNUNET_free (rh);
1208} 1220}
1209 1221
1210 1222
@@ -1214,29 +1226,29 @@ GNUNET_RPS_request_cancel(struct GNUNET_RPS_Request_Handle *rh)
1214 * @param rhs request handle of request to cancle 1226 * @param rhs request handle of request to cancle
1215 */ 1227 */
1216void 1228void
1217GNUNET_RPS_request_single_info_cancel( 1229GNUNET_RPS_request_single_info_cancel (
1218 struct GNUNET_RPS_Request_Handle_Single_Info *rhs) 1230 struct GNUNET_RPS_Request_Handle_Single_Info *rhs)
1219{ 1231{
1220 struct GNUNET_RPS_Handle *h; 1232 struct GNUNET_RPS_Handle *h;
1221 1233
1222 h = rhs->rps_handle; 1234 h = rhs->rps_handle;
1223 GNUNET_assert(NULL != rhs); 1235 GNUNET_assert (NULL != rhs);
1224 GNUNET_assert(NULL != rhs->srh); 1236 GNUNET_assert (NULL != rhs->srh);
1225 GNUNET_assert(h == rhs->srh->rps_handle); 1237 GNUNET_assert (h == rhs->srh->rps_handle);
1226 GNUNET_RPS_stream_cancel(rhs->srh); 1238 GNUNET_RPS_stream_cancel (rhs->srh);
1227 rhs->srh = NULL; 1239 rhs->srh = NULL;
1228 if (NULL == h->stream_requests_head) 1240 if (NULL == h->stream_requests_head)
1229 cancel_stream(h); 1241 cancel_stream (h);
1230 if (NULL != rhs->sampler_rh) 1242 if (NULL != rhs->sampler_rh)
1231 { 1243 {
1232 RPS_sampler_request_single_info_cancel(rhs->sampler_rh); 1244 RPS_sampler_request_single_info_cancel (rhs->sampler_rh);
1233 } 1245 }
1234 RPS_sampler_destroy(rhs->sampler); 1246 RPS_sampler_destroy (rhs->sampler);
1235 rhs->sampler = NULL; 1247 rhs->sampler = NULL;
1236 GNUNET_CONTAINER_DLL_remove(h->rhs_head, 1248 GNUNET_CONTAINER_DLL_remove (h->rhs_head,
1237 h->rhs_tail, 1249 h->rhs_tail,
1238 rhs); 1250 rhs);
1239 GNUNET_free(rhs); 1251 GNUNET_free (rhs);
1240} 1252}
1241 1253
1242 1254
@@ -1246,59 +1258,60 @@ GNUNET_RPS_request_single_info_cancel(
1246 * @param h the handle to the rps service 1258 * @param h the handle to the rps service
1247 */ 1259 */
1248void 1260void
1249GNUNET_RPS_disconnect(struct GNUNET_RPS_Handle *h) 1261GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
1250{ 1262{
1251 if (NULL != h->stream_requests_head) 1263 if (NULL != h->stream_requests_head)
1264 {
1265 struct GNUNET_RPS_StreamRequestHandle *srh_next;
1266
1267 LOG (GNUNET_ERROR_TYPE_WARNING,
1268 "Still waiting for replies\n");
1269 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter =
1270 h->stream_requests_head;
1271 NULL != srh_iter;
1272 srh_iter = srh_next)
1252 { 1273 {
1253 struct GNUNET_RPS_StreamRequestHandle *srh_next; 1274 srh_next = srh_iter->next;
1254 1275 GNUNET_RPS_stream_cancel (srh_iter);
1255 LOG(GNUNET_ERROR_TYPE_WARNING,
1256 "Still waiting for replies\n");
1257 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
1258 NULL != srh_iter;
1259 srh_iter = srh_next)
1260 {
1261 srh_next = srh_iter->next;
1262 GNUNET_RPS_stream_cancel(srh_iter);
1263 }
1264 } 1276 }
1277 }
1265 if (NULL != h->rh_head) 1278 if (NULL != h->rh_head)
1279 {
1280 LOG (GNUNET_ERROR_TYPE_WARNING,
1281 "Not all requests were cancelled!\n");
1282 for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
1283 h->rh_head != NULL;
1284 rh_iter = h->rh_head)
1266 { 1285 {
1267 LOG(GNUNET_ERROR_TYPE_WARNING, 1286 GNUNET_RPS_request_cancel (rh_iter);
1268 "Not all requests were cancelled!\n");
1269 for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
1270 h->rh_head != NULL;
1271 rh_iter = h->rh_head)
1272 {
1273 GNUNET_RPS_request_cancel(rh_iter);
1274 }
1275 } 1287 }
1288 }
1276 if (NULL != h->rhs_head) 1289 if (NULL != h->rhs_head)
1290 {
1291 LOG (GNUNET_ERROR_TYPE_WARNING,
1292 "Not all requests were cancelled!\n");
1293 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
1294 h->rhs_head != NULL;
1295 rhs_iter = h->rhs_head)
1277 { 1296 {
1278 LOG(GNUNET_ERROR_TYPE_WARNING, 1297 GNUNET_RPS_request_single_info_cancel (rhs_iter);
1279 "Not all requests were cancelled!\n");
1280 for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
1281 h->rhs_head != NULL;
1282 rhs_iter = h->rhs_head)
1283 {
1284 GNUNET_RPS_request_single_info_cancel(rhs_iter);
1285 }
1286 } 1298 }
1299 }
1287 if (NULL != srh_callback_peers) 1300 if (NULL != srh_callback_peers)
1288 { 1301 {
1289 GNUNET_free(srh_callback_peers); 1302 GNUNET_free (srh_callback_peers);
1290 srh_callback_peers = NULL; 1303 srh_callback_peers = NULL;
1291 } 1304 }
1292 if (NULL != h->view_update_cb) 1305 if (NULL != h->view_update_cb)
1293 { 1306 {
1294 LOG(GNUNET_ERROR_TYPE_WARNING, 1307 LOG (GNUNET_ERROR_TYPE_WARNING,
1295 "Still waiting for view updates\n"); 1308 "Still waiting for view updates\n");
1296 GNUNET_RPS_view_request_cancel(h); 1309 GNUNET_RPS_view_request_cancel (h);
1297 } 1310 }
1298 if (NULL != h->nse) 1311 if (NULL != h->nse)
1299 GNUNET_NSE_disconnect(h->nse); 1312 GNUNET_NSE_disconnect (h->nse);
1300 GNUNET_MQ_destroy(h->mq); 1313 GNUNET_MQ_destroy (h->mq);
1301 GNUNET_free(h); 1314 GNUNET_free (h);
1302} 1315}
1303 1316
1304 1317