aboutsummaryrefslogtreecommitdiff
path: root/src/statistics/statistics_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-21 18:29:03 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-21 18:29:03 +0000
commit3d7b29ec1c5d1c2de96cf4c9badaa112e86ef899 (patch)
treeb586e13615fe58377cef5c0a238a677e5fd8f609 /src/statistics/statistics_api.c
parent1732154b8c021e7ee0e34c28cf3b1a843454727a (diff)
downloadgnunet-3d7b29ec1c5d1c2de96cf4c9badaa112e86ef899.tar.gz
gnunet-3d7b29ec1c5d1c2de96cf4c9badaa112e86ef899.zip
update statistics API to use new MQ API style, also get rid of timeout argument
Diffstat (limited to 'src/statistics/statistics_api.c')
-rw-r--r--src/statistics/statistics_api.c934
1 files changed, 391 insertions, 543 deletions
diff --git a/src/statistics/statistics_api.c b/src/statistics/statistics_api.c
index 32b973eec..37aa99017 100644
--- a/src/statistics/statistics_api.c
+++ b/src/statistics/statistics_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011 GNUnet e.V. 3 Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -88,7 +88,7 @@ struct GNUNET_STATISTICS_WatchEntry
88 GNUNET_STATISTICS_Iterator proc; 88 GNUNET_STATISTICS_Iterator proc;
89 89
90 /** 90 /**
91 * Closure for proc 91 * Closure for @e proc
92 */ 92 */
93 void *proc_cls; 93 void *proc_cls;
94 94
@@ -137,7 +137,7 @@ struct GNUNET_STATISTICS_GetHandle
137 GNUNET_STATISTICS_Iterator proc; 137 GNUNET_STATISTICS_Iterator proc;
138 138
139 /** 139 /**
140 * Closure for proc and cont. 140 * Closure for @e proc and @e cont.
141 */ 141 */
142 void *cls; 142 void *cls;
143 143
@@ -147,11 +147,6 @@ struct GNUNET_STATISTICS_GetHandle
147 struct GNUNET_TIME_Absolute timeout; 147 struct GNUNET_TIME_Absolute timeout;
148 148
149 /** 149 /**
150 * Task run on timeout.
151 */
152 struct GNUNET_SCHEDULER_Task * timeout_task;
153
154 /**
155 * Associated value. 150 * Associated value.
156 */ 151 */
157 uint64_t value; 152 uint64_t value;
@@ -167,7 +162,7 @@ struct GNUNET_STATISTICS_GetHandle
167 int aborted; 162 int aborted;
168 163
169 /** 164 /**
170 * Is this a GET, SET, UPDATE or WATCH? 165 * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
171 */ 166 */
172 enum ActionType type; 167 enum ActionType type;
173 168
@@ -195,14 +190,9 @@ struct GNUNET_STATISTICS_Handle
195 const struct GNUNET_CONFIGURATION_Handle *cfg; 190 const struct GNUNET_CONFIGURATION_Handle *cfg;
196 191
197 /** 192 /**
198 * Socket (if available). 193 * Message queue to the service.
199 */ 194 */
200 struct GNUNET_CLIENT_Connection *client; 195 struct GNUNET_MQ_Handle *mq;
201
202 /**
203 * Currently pending transmission request.
204 */
205 struct GNUNET_CLIENT_TransmitHandle *th;
206 196
207 /** 197 /**
208 * Head of the linked list of pending actions (first action 198 * Head of the linked list of pending actions (first action
@@ -230,7 +220,7 @@ struct GNUNET_STATISTICS_Handle
230 /** 220 /**
231 * Task doing exponential back-off trying to reconnect. 221 * Task doing exponential back-off trying to reconnect.
232 */ 222 */
233 struct GNUNET_SCHEDULER_Task * backoff_task; 223 struct GNUNET_SCHEDULER_Task *backoff_task;
234 224
235 /** 225 /**
236 * Time for next connect retry. 226 * Time for next connect retry.
@@ -248,7 +238,7 @@ struct GNUNET_STATISTICS_Handle
248 uint64_t peak_rss; 238 uint64_t peak_rss;
249 239
250 /** 240 /**
251 * Size of the 'watches' array. 241 * Size of the @e watches array.
252 */ 242 */
253 unsigned int watches_size; 243 unsigned int watches_size;
254 244
@@ -321,6 +311,15 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h);
321 311
322 312
323/** 313/**
314 * Reconnect at a later time, respecting back-off.
315 *
316 * @param h statistics handle
317 */
318static void
319reconnect_later (struct GNUNET_STATISTICS_Handle *h);
320
321
322/**
324 * Transmit request to service that we want to watch 323 * Transmit request to service that we want to watch
325 * the development of a particular value. 324 * the development of a particular value.
326 * 325 *
@@ -353,7 +352,8 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
353 ai->type = ACTION_WATCH; 352 ai->type = ACTION_WATCH;
354 ai->proc = watch->proc; 353 ai->proc = watch->proc;
355 ai->cls = watch->proc_cls; 354 ai->cls = watch->proc_cls;
356 GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail, 355 GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
356 h->action_tail,
357 ai); 357 ai);
358 schedule_action (h); 358 schedule_action (h);
359} 359}
@@ -367,11 +367,6 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
367static void 367static void
368free_action_item (struct GNUNET_STATISTICS_GetHandle *gh) 368free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
369{ 369{
370 if (NULL != gh->timeout_task)
371 {
372 GNUNET_SCHEDULER_cancel (gh->timeout_task);
373 gh->timeout_task = NULL;
374 }
375 GNUNET_free_non_null (gh->subsystem); 370 GNUNET_free_non_null (gh->subsystem);
376 GNUNET_free_non_null (gh->name); 371 GNUNET_free_non_null (gh->name);
377 GNUNET_free (gh); 372 GNUNET_free (gh);
@@ -388,11 +383,6 @@ do_disconnect (struct GNUNET_STATISTICS_Handle *h)
388{ 383{
389 struct GNUNET_STATISTICS_GetHandle *c; 384 struct GNUNET_STATISTICS_GetHandle *c;
390 385
391 if (NULL != h->th)
392 {
393 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
394 h->th = NULL;
395 }
396 h->receiving = GNUNET_NO; 386 h->receiving = GNUNET_NO;
397 if (NULL != (c = h->current)) 387 if (NULL != (c = h->current))
398 { 388 {
@@ -400,393 +390,400 @@ do_disconnect (struct GNUNET_STATISTICS_Handle *h)
400 if ( (NULL != c->cont) && 390 if ( (NULL != c->cont) &&
401 (GNUNET_YES != c->aborted) ) 391 (GNUNET_YES != c->aborted) )
402 { 392 {
403 c->cont (c->cls, GNUNET_SYSERR); 393 c->cont (c->cls,
394 GNUNET_SYSERR);
404 c->cont = NULL; 395 c->cont = NULL;
405 } 396 }
406 free_action_item (c); 397 free_action_item (c);
407 } 398 }
408 if (NULL != h->client) 399 if (NULL != h->mq)
409 { 400 {
410 GNUNET_CLIENT_disconnect (h->client); 401 GNUNET_MQ_destroy (h->mq);
411 h->client = NULL; 402 h->mq = NULL;
412 } 403 }
413} 404}
414 405
415 406
416/** 407/**
417 * Try to (re)connect to the statistics service. 408 * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
418 * 409 *
419 * @param h statistics handle to reconnect 410 * @param cls statistics handle
420 * @return #GNUNET_YES on success, #GNUNET_NO on failure. 411 * @param smsg message received from the service, never NULL
412 * @return #GNUNET_OK if the message was well-formed
421 */ 413 */
422static int 414static int
423try_connect (struct GNUNET_STATISTICS_Handle *h) 415check_statistics_value (void *cls,
416 const struct GNUNET_STATISTICS_ReplyMessage *smsg)
424{ 417{
425 struct GNUNET_STATISTICS_GetHandle *gh; 418 const char *service;
426 struct GNUNET_STATISTICS_GetHandle *gn; 419 const char *name;
427 unsigned int i; 420 uint16_t size;
428 421
429 if (NULL != h->backoff_task) 422 size = ntohs (smsg->header.size);
430 return GNUNET_NO; 423 size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
431 if (NULL != h->client) 424 if (size !=
432 return GNUNET_YES; 425 GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
433 h->client = GNUNET_CLIENT_connect ("statistics", h->cfg); 426 size,
434 if (NULL != h->client) 427 2,
428 &service,
429 &name))
435 { 430 {
436 gn = h->action_head; 431 GNUNET_break (0);
437 while (NULL != (gh = gn)) 432 return GNUNET_SYSERR;
438 {
439 gn = gh->next;
440 if (gh->type == ACTION_WATCH)
441 {
442 GNUNET_CONTAINER_DLL_remove (h->action_head,
443 h->action_tail,
444 gh);
445 free_action_item (gh);
446 }
447 }
448 for (i = 0; i < h->watches_size; i++)
449 {
450 if (NULL != h->watches[i])
451 schedule_watch_request (h, h->watches[i]);
452 }
453 return GNUNET_YES;
454 } 433 }
455 LOG (GNUNET_ERROR_TYPE_DEBUG, 434 return GNUNET_OK;
456 "Failed to connect to statistics service!\n");
457 return GNUNET_NO;
458} 435}
459 436
460 437
461/** 438/**
462 * We've waited long enough, reconnect now. 439 * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
463 * 440 *
464 * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect 441 * @param cls statistics handle
442 * @param msg message received from the service, never NULL
443 * @return #GNUNET_OK if the message was well-formed
465 */ 444 */
466static void 445static void
467reconnect_task (void *cls) 446handle_statistics_value (void *cls,
447 const struct GNUNET_STATISTICS_ReplyMessage *smsg)
468{ 448{
469 struct GNUNET_STATISTICS_Handle *h = cls; 449 struct GNUNET_STATISTICS_Handle *h = cls;
450 const char *service;
451 const char *name;
452 uint16_t size;
470 453
471 h->backoff_task = NULL; 454 if (h->current->aborted)
472 schedule_action (h); 455 return; /* iteration aborted, don't bother */
456
457 size = ntohs (smsg->header.size);
458 size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
459 GNUNET_assert (size ==
460 GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
461 size,
462 2,
463 &service,
464 &name));
465 LOG (GNUNET_ERROR_TYPE_DEBUG,
466 "Received valid statistic on `%s:%s': %llu\n",
467 service, name,
468 GNUNET_ntohll (smsg->value));
469 if (GNUNET_OK !=
470 h->current->proc (h->current->cls,
471 service,
472 name,
473 GNUNET_ntohll (smsg->value),
474 0 !=
475 (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
476 {
477 LOG (GNUNET_ERROR_TYPE_DEBUG,
478 "Processing of remaining statistics aborted by client.\n");
479 h->current->aborted = GNUNET_YES;
480 }
473} 481}
474 482
475 483
476/** 484/**
477 * Task used by 'reconnect_later' to shutdown the handle 485 * We have received a watch value from the service. Process it.
478 * 486 *
479 * @param cls the statistics handle 487 * @param cls statistics handle
488 * @param msg the watch value message
480 */ 489 */
481static void 490static void
482do_destroy (void *cls) 491handle_statistics_watch_value (void *cls,
492 const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
483{ 493{
484 struct GNUNET_STATISTICS_Handle *h = cls; 494 struct GNUNET_STATISTICS_Handle *h = cls;
495 struct GNUNET_STATISTICS_WatchEntry *w;
496 uint32_t wid;
485 497
486 GNUNET_STATISTICS_destroy (h, GNUNET_NO); 498 GNUNET_break (0 == ntohl (wvm->reserved));
499 wid = ntohl (wvm->wid);
500 if (wid >= h->watches_size)
501 {
502 do_disconnect (h);
503 reconnect_later (h);
504 return;
505 }
506 w = h->watches[wid];
507 if (NULL == w)
508 return;
509 (void) w->proc (w->proc_cls,
510 w->subsystem,
511 w->name,
512 GNUNET_ntohll (wvm->value),
513 0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
487} 514}
488 515
489 516
490/** 517/**
491 * Reconnect at a later time, respecting back-off. 518 * Generic error handler, called with the appropriate error code and
519 * the same closure specified at the creation of the message queue.
520 * Not every message queue implementation supports an error handler.
492 * 521 *
493 * @param h statistics handle 522 * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
523 * @param error error code
494 */ 524 */
495static void 525static void
496reconnect_later (struct GNUNET_STATISTICS_Handle *h) 526mq_error_handler (void *cls,
527 enum GNUNET_MQ_Error error)
497{ 528{
498 int loss; 529 struct GNUNET_STATISTICS_Handle *h = cls;
499 struct GNUNET_STATISTICS_GetHandle *gh;
500 530
501 GNUNET_assert (NULL == h->backoff_task); 531 if (GNUNET_NO != h->do_destroy)
502 if (GNUNET_YES == h->do_destroy)
503 { 532 {
504 /* So we are shutting down and the service is not reachable.
505 * Chances are that it's down for good and we are not going to connect to
506 * it anymore.
507 * Give up and don't sync the rest of the data.
508 */
509 loss = GNUNET_NO;
510 for (gh = h->action_head; NULL != gh; gh = gh->next)
511 if ( (gh->make_persistent) && (ACTION_SET == gh->type) )
512 loss = GNUNET_YES;
513 if (GNUNET_YES == loss)
514 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
515 _("Could not save some persistent statistics\n"));
516 h->do_destroy = GNUNET_NO; 533 h->do_destroy = GNUNET_NO;
517 GNUNET_SCHEDULER_add_now (&do_destroy, h); 534 GNUNET_STATISTICS_destroy (h,
535 GNUNET_NO);
518 return; 536 return;
519 } 537 }
520 h->backoff_task = 538 do_disconnect (h);
521 GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h); 539 reconnect_later (h);
522 h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
523} 540}
524 541
525 542
526/** 543/**
527 * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message. 544 * Task used to destroy the statistics handle.
528 * 545 *
529 * @param h statistics handle 546 * @param cls the `struct GNUNET_STATISTICS_Handle`
530 * @param msg message received from the service, never NULL
531 * @return #GNUNET_OK if the message was well-formed
532 */ 547 */
533static int 548static void
534process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h, 549destroy_task (void *cls)
535 const struct GNUNET_MessageHeader *msg)
536{ 550{
537 char *service; 551 struct GNUNET_STATISTICS_Handle *h = cls;
538 char *name;
539 const struct GNUNET_STATISTICS_ReplyMessage *smsg;
540 uint16_t size;
541 552
542 if (h->current->aborted) 553 GNUNET_STATISTICS_destroy (h, GNUNET_NO);
543 { 554}
544 LOG (GNUNET_ERROR_TYPE_DEBUG, 555
545 "Iteration was aborted, ignoring VALUE\n"); 556
546 return GNUNET_OK; /* don't bother */ 557/**
547 } 558 * Handle a #GNUNET_MESSAGE_TYPE_TEST (sic) message. We receive this
548 size = ntohs (msg->size); 559 * message at the end of the shutdown when the service confirms that
549 if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage)) 560 * all data has been written to disk.
561 *
562 * @param cls our `struct GNUNET_STATISTICS_Handle *`
563 * @param msg the message
564 */
565static void
566handle_test (void *cls,
567 const struct GNUNET_MessageHeader *msg)
568{
569 struct GNUNET_STATISTICS_Handle *h = cls;
570
571 if (GNUNET_SYSERR != h->do_destroy)
550 { 572 {
573 /* not in shutdown, why do we get 'TEST'? */
551 GNUNET_break (0); 574 GNUNET_break (0);
552 return GNUNET_SYSERR; 575 do_disconnect (h);
576 reconnect_later (h);
577 return;
553 } 578 }
554 smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg; 579 h->do_destroy = GNUNET_NO;
555 size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage); 580 GNUNET_SCHEDULER_add_now (&destroy_task,
556 if (size != 581 h);
557 GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2, 582}
558 &service, &name)) 583
584
585/**
586 * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
587 * this message in response to a query to indicate that there are no
588 * further matching results.
589 *
590 * @param cls our `struct GNUNET_STATISTICS_Handle *`
591 * @param msg the message
592 */
593static void
594handle_statistics_end (void *cls,
595 const struct GNUNET_MessageHeader *msg)
596{
597 struct GNUNET_STATISTICS_Handle *h = cls;
598 struct GNUNET_STATISTICS_GetHandle *c;
599
600 LOG (GNUNET_ERROR_TYPE_DEBUG,
601 "Received end of statistics marker\n");
602 if (NULL == (c = h->current))
559 { 603 {
560 GNUNET_break (0); 604 GNUNET_break (0);
561 return GNUNET_SYSERR; 605 do_disconnect (h);
606 reconnect_later (h);
607 return;
562 } 608 }
563 LOG (GNUNET_ERROR_TYPE_DEBUG, 609 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
564 "Received valid statistic on `%s:%s': %llu\n", 610 h->current = NULL;
565 service, name, 611 schedule_action (h);
566 GNUNET_ntohll (smsg->value)); 612 if (NULL != c->cont)
567 if (GNUNET_OK !=
568 h->current->proc (h->current->cls, service, name,
569 GNUNET_ntohll (smsg->value),
570 0 !=
571 (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
572 { 613 {
573 LOG (GNUNET_ERROR_TYPE_DEBUG, 614 c->cont (c->cls,
574 "Processing of remaining statistics aborted by client.\n"); 615 GNUNET_OK);
575 h->current->aborted = GNUNET_YES; 616 c->cont = NULL;
576 } 617 }
577 LOG (GNUNET_ERROR_TYPE_DEBUG, 618 free_action_item (c);
578 "VALUE processed successfully\n");
579 return GNUNET_OK;
580} 619}
581 620
582 621
583/** 622/**
584 * We have received a watch value from the service. Process it. 623 * Try to (re)connect to the statistics service.
585 * 624 *
586 * @param h statistics handle 625 * @param h statistics handle to reconnect
587 * @param msg the watch value message 626 * @return #GNUNET_YES on success, #GNUNET_NO on failure.
588 * @return #GNUNET_OK if the message was well-formed, #GNUNET_SYSERR if not,
589 * #GNUNET_NO if this watch has been cancelled
590 */ 627 */
591static int 628static int
592process_watch_value (struct GNUNET_STATISTICS_Handle *h, 629try_connect (struct GNUNET_STATISTICS_Handle *h)
593 const struct GNUNET_MessageHeader *msg)
594{ 630{
595 const struct GNUNET_STATISTICS_WatchValueMessage *wvm; 631 GNUNET_MQ_hd_fixed_size (test,
596 struct GNUNET_STATISTICS_WatchEntry *w; 632 GNUNET_MESSAGE_TYPE_TEST,
597 uint32_t wid; 633 struct GNUNET_MessageHeader);
634 GNUNET_MQ_hd_fixed_size (statistics_end,
635 GNUNET_MESSAGE_TYPE_STATISTICS_END,
636 struct GNUNET_MessageHeader);
637 GNUNET_MQ_hd_var_size (statistics_value,
638 GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
639 struct GNUNET_STATISTICS_ReplyMessage);
640 GNUNET_MQ_hd_fixed_size (statistics_watch_value,
641 GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
642 struct GNUNET_STATISTICS_WatchValueMessage);
643 struct GNUNET_MQ_MessageHandler handlers[] = {
644 make_test_handler (h),
645 make_statistics_end_handler (h),
646 make_statistics_value_handler (h),
647 make_statistics_watch_value_handler (h),
648 GNUNET_MQ_handler_end ()
649 };
650 struct GNUNET_STATISTICS_GetHandle *gh;
651 struct GNUNET_STATISTICS_GetHandle *gn;
598 652
599 if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size)) 653 if (NULL != h->backoff_task)
654 return GNUNET_NO;
655 if (NULL != h->mq)
656 return GNUNET_YES;
657 h->mq = GNUNET_CLIENT_connecT (h->cfg,
658 "statistics",
659 handlers,
660 &mq_error_handler,
661 h);
662 if (NULL == h->mq)
600 { 663 {
601 GNUNET_break (0); 664 LOG (GNUNET_ERROR_TYPE_DEBUG,
602 return GNUNET_SYSERR; 665 "Failed to connect to statistics service!\n");
666 return GNUNET_NO;
603 } 667 }
604 wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg; 668 gn = h->action_head;
605 GNUNET_break (0 == ntohl (wvm->reserved)); 669 while (NULL != (gh = gn))
606 wid = ntohl (wvm->wid);
607 if (wid >= h->watches_size)
608 { 670 {
609 GNUNET_break (0); 671 gn = gh->next;
610 return GNUNET_SYSERR; 672 if (gh->type == ACTION_WATCH)
673 {
674 GNUNET_CONTAINER_DLL_remove (h->action_head,
675 h->action_tail,
676 gh);
677 free_action_item (gh);
678 }
611 } 679 }
612 w = h->watches[wid]; 680 for (unsigned int i = 0; i < h->watches_size; i++)
613 if (NULL == w) 681 if (NULL != h->watches[i])
614 return GNUNET_NO; 682 schedule_watch_request (h,
615 (void) w->proc (w->proc_cls, w->subsystem, w->name, 683 h->watches[i]);
616 GNUNET_ntohll (wvm->value), 684 return GNUNET_YES;
617 0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
618 return GNUNET_OK;
619} 685}
620 686
621 687
622/** 688/**
623 * Task used to destroy the statistics handle. 689 * We've waited long enough, reconnect now.
624 * 690 *
625 * @param cls the `struct GNUNET_STATISTICS_Handle` 691 * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
626 */ 692 */
627static void 693static void
628destroy_task (void *cls) 694reconnect_task (void *cls)
629{ 695{
630 struct GNUNET_STATISTICS_Handle *h = cls; 696 struct GNUNET_STATISTICS_Handle *h = cls;
631 697
632 GNUNET_STATISTICS_destroy (h, GNUNET_NO); 698 h->backoff_task = NULL;
699 schedule_action (h);
633} 700}
634 701
635 702
636/** 703/**
637 * Function called with messages from stats service. 704 * Task used by #reconnect_later() to shutdown the handle
638 * 705 *
639 * @param cls closure 706 * @param cls the statistics handle
640 * @param msg message received, NULL on timeout or fatal error
641 */ 707 */
642static void 708static void
643receive_stats (void *cls, 709do_destroy (void *cls)
644 const struct GNUNET_MessageHeader *msg)
645{ 710{
646 struct GNUNET_STATISTICS_Handle *h = cls; 711 struct GNUNET_STATISTICS_Handle *h = cls;
647 struct GNUNET_STATISTICS_GetHandle *c;
648 int ret;
649 712
650 if (NULL == msg) 713 GNUNET_STATISTICS_destroy (h,
651 { 714 GNUNET_NO);
652 LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, 715}
653 "Error receiving statistics from service, is the service running?\n"); 716
654 do_disconnect (h); 717
655 reconnect_later (h); 718/**
656 return; 719 * Reconnect at a later time, respecting back-off.
657 } 720 *
658 switch (ntohs (msg->type)) 721 * @param h statistics handle
722 */
723static void
724reconnect_later (struct GNUNET_STATISTICS_Handle *h)
725{
726 int loss;
727 struct GNUNET_STATISTICS_GetHandle *gh;
728
729 GNUNET_assert (NULL == h->backoff_task);
730 if (GNUNET_YES == h->do_destroy)
659 { 731 {
660 case GNUNET_MESSAGE_TYPE_TEST: 732 /* So we are shutting down and the service is not reachable.
661 if (GNUNET_SYSERR != h->do_destroy) 733 * Chances are that it's down for good and we are not going to connect to
662 { 734 * it anymore.
663 /* not in shutdown, why do we get 'TEST'? */ 735 * Give up and don't sync the rest of the data.
664 GNUNET_break (0); 736 */
665 do_disconnect (h); 737 loss = GNUNET_NO;
666 reconnect_later (h); 738 for (gh = h->action_head; NULL != gh; gh = gh->next)
667 return; 739 if ( (gh->make_persistent) && (ACTION_SET == gh->type) )
668 } 740 loss = GNUNET_YES;
741 if (GNUNET_YES == loss)
742 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
743 _("Could not save some persistent statistics\n"));
669 h->do_destroy = GNUNET_NO; 744 h->do_destroy = GNUNET_NO;
670 GNUNET_SCHEDULER_add_now (&destroy_task, h); 745 GNUNET_SCHEDULER_add_now (&do_destroy,
671 break; 746 h);
672 case GNUNET_MESSAGE_TYPE_STATISTICS_END:
673 LOG (GNUNET_ERROR_TYPE_DEBUG,
674 "Received end of statistics marker\n");
675 if (NULL == (c = h->current))
676 {
677 GNUNET_break (0);
678 do_disconnect (h);
679 reconnect_later (h);
680 return;
681 }
682 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
683 if (h->watches_size > 0)
684 {
685 GNUNET_CLIENT_receive (h->client, &receive_stats, h,
686 GNUNET_TIME_UNIT_FOREVER_REL);
687 }
688 else
689 {
690 h->receiving = GNUNET_NO;
691 }
692 h->current = NULL;
693 schedule_action (h);
694 if (NULL != c->cont)
695 {
696 c->cont (c->cls, GNUNET_OK);
697 c->cont = NULL;
698 }
699 free_action_item (c);
700 return;
701 case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
702 if (GNUNET_OK != process_statistics_value_message (h, msg))
703 {
704 do_disconnect (h);
705 reconnect_later (h);
706 return;
707 }
708 /* finally, look for more! */
709 LOG (GNUNET_ERROR_TYPE_DEBUG,
710 "Processing VALUE done, now reading more\n");
711 GNUNET_CLIENT_receive (h->client, &receive_stats, h,
712 GNUNET_TIME_absolute_get_remaining (h->
713 current->timeout));
714 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
715 return;
716 case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
717 if (GNUNET_OK !=
718 (ret = process_watch_value (h, msg)))
719 {
720 do_disconnect (h);
721 if (GNUNET_NO == ret)
722 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
723 reconnect_later (h);
724 return;
725 }
726 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
727 GNUNET_assert (h->watches_size > 0);
728 GNUNET_CLIENT_receive (h->client, &receive_stats, h,
729 GNUNET_TIME_UNIT_FOREVER_REL);
730 return;
731 default:
732 GNUNET_break (0);
733 do_disconnect (h);
734 reconnect_later (h);
735 return; 747 return;
736 } 748 }
749 h->backoff_task
750 = GNUNET_SCHEDULER_add_delayed (h->backoff,
751 &reconnect_task,
752 h);
753 h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
737} 754}
738 755
739 756
757
740/** 758/**
741 * Transmit a GET request (and if successful, start to receive 759 * Transmit a GET request (and if successful, start to receive
742 * the response). 760 * the response).
743 * 761 *
744 * @param handle statistics handle 762 * @param handle statistics handle
745 * @param size how many bytes can we write to @a buf
746 * @param buf where to write requests to the service
747 * @return number of bytes written to @a buf
748 */ 763 */
749static size_t 764static void
750transmit_get (struct GNUNET_STATISTICS_Handle *handle, 765transmit_get (struct GNUNET_STATISTICS_Handle *handle)
751 size_t size,
752 void *buf)
753{ 766{
754 struct GNUNET_STATISTICS_GetHandle *c; 767 struct GNUNET_STATISTICS_GetHandle *c;
755 struct GNUNET_MessageHeader *hdr; 768 struct GNUNET_MessageHeader *hdr;
769 struct GNUNET_MQ_Envelope *env;
756 size_t slen1; 770 size_t slen1;
757 size_t slen2; 771 size_t slen2;
758 uint16_t msize;
759 772
760 GNUNET_assert (NULL != (c = handle->current)); 773 GNUNET_assert (NULL != (c = handle->current));
761 if (NULL == buf)
762 {
763 /* timeout / error */
764 LOG (GNUNET_ERROR_TYPE_DEBUG,
765 "Transmission of request for statistics failed!\n");
766 do_disconnect (handle);
767 reconnect_later (handle);
768 return 0;
769 }
770 slen1 = strlen (c->subsystem) + 1; 774 slen1 = strlen (c->subsystem) + 1;
771 slen2 = strlen (c->name) + 1; 775 slen2 = strlen (c->name) + 1;
772 msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); 776 env = GNUNET_MQ_msg_extra (hdr,
773 GNUNET_assert (msize <= size); 777 slen1 + slen2,
774 hdr = (struct GNUNET_MessageHeader *) buf; 778 GNUNET_MESSAGE_TYPE_STATISTICS_GET);
775 hdr->size = htons (msize);
776 hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
777 GNUNET_assert (slen1 + slen2 == 779 GNUNET_assert (slen1 + slen2 ==
778 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2, 780 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
781 slen1 + slen2,
782 2,
779 c->subsystem, 783 c->subsystem,
780 c->name)); 784 c->name));
781 if (GNUNET_YES != handle->receiving) 785 GNUNET_MQ_send (handle->mq,
782 { 786 env);
783 LOG (GNUNET_ERROR_TYPE_DEBUG,
784 "Transmission of GET done, now reading response\n");
785 handle->receiving = GNUNET_YES;
786 GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
787 GNUNET_TIME_absolute_get_remaining (c->timeout));
788 }
789 return msize;
790} 787}
791 788
792 789
@@ -795,53 +792,34 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle,
795 * the response). 792 * the response).
796 * 793 *
797 * @param handle statistics handle 794 * @param handle statistics handle
798 * @param size how many bytes can we write to @a buf
799 * @param buf where to write requests to the service
800 * @return number of bytes written to @a buf
801 */ 795 */
802static size_t 796static void
803transmit_watch (struct GNUNET_STATISTICS_Handle *handle, 797transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
804 size_t size,
805 void *buf)
806{ 798{
807 struct GNUNET_MessageHeader *hdr; 799 struct GNUNET_MessageHeader *hdr;
800 struct GNUNET_MQ_Envelope *env;
808 size_t slen1; 801 size_t slen1;
809 size_t slen2; 802 size_t slen2;
810 uint16_t msize;
811 803
812 if (NULL == buf)
813 {
814 /* timeout / error */
815 LOG (GNUNET_ERROR_TYPE_DEBUG,
816 "Transmission of request for statistics failed!\n");
817 do_disconnect (handle);
818 reconnect_later (handle);
819 return 0;
820 }
821 LOG (GNUNET_ERROR_TYPE_DEBUG, 804 LOG (GNUNET_ERROR_TYPE_DEBUG,
822 "Transmitting watch request for `%s'\n", 805 "Transmitting watch request for `%s'\n",
823 handle->current->name); 806 handle->current->name);
824 slen1 = strlen (handle->current->subsystem) + 1; 807 slen1 = strlen (handle->current->subsystem) + 1;
825 slen2 = strlen (handle->current->name) + 1; 808 slen2 = strlen (handle->current->name) + 1;
826 msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); 809 env = GNUNET_MQ_msg_extra (hdr,
827 GNUNET_assert (msize <= size); 810 slen1 + slen2,
828 hdr = (struct GNUNET_MessageHeader *) buf; 811 GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
829 hdr->size = htons (msize);
830 hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
831 GNUNET_assert (slen1 + slen2 == 812 GNUNET_assert (slen1 + slen2 ==
832 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2, 813 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
814 slen1 + slen2,
815 2,
833 handle->current->subsystem, 816 handle->current->subsystem,
834 handle->current->name)); 817 handle->current->name));
835 if (GNUNET_YES != handle->receiving) 818 GNUNET_MQ_send (handle->mq,
836 { 819 env);
837 handle->receiving = GNUNET_YES;
838 GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
839 GNUNET_TIME_UNIT_FOREVER_REL);
840 }
841 GNUNET_assert (NULL == handle->current->cont); 820 GNUNET_assert (NULL == handle->current->cont);
842 free_action_item (handle->current); 821 free_action_item (handle->current);
843 handle->current = NULL; 822 handle->current = NULL;
844 return msize;
845} 823}
846 824
847 825
@@ -849,39 +827,20 @@ transmit_watch (struct GNUNET_STATISTICS_Handle *handle,
849 * Transmit a SET/UPDATE request. 827 * Transmit a SET/UPDATE request.
850 * 828 *
851 * @param handle statistics handle 829 * @param handle statistics handle
852 * @param size how many bytes can we write to @a buf
853 * @param buf where to write requests to the service
854 * @return number of bytes written to @a buf
855 */ 830 */
856static size_t 831static void
857transmit_set (struct GNUNET_STATISTICS_Handle *handle, 832transmit_set (struct GNUNET_STATISTICS_Handle *handle)
858 size_t size,
859 void *buf)
860{ 833{
861 struct GNUNET_STATISTICS_SetMessage *r; 834 struct GNUNET_STATISTICS_SetMessage *r;
835 struct GNUNET_MQ_Envelope *env;
862 size_t slen; 836 size_t slen;
863 size_t nlen; 837 size_t nlen;
864 size_t nsize;
865 838
866 if (NULL == buf)
867 {
868 do_disconnect (handle);
869 reconnect_later (handle);
870 return 0;
871 }
872 slen = strlen (handle->current->subsystem) + 1; 839 slen = strlen (handle->current->subsystem) + 1;
873 nlen = strlen (handle->current->name) + 1; 840 nlen = strlen (handle->current->name) + 1;
874 nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen; 841 env = GNUNET_MQ_msg_extra (r,
875 if (size < nsize) 842 slen + nlen,
876 { 843 GNUNET_MESSAGE_TYPE_STATISTICS_SET);
877 GNUNET_break (0);
878 do_disconnect (handle);
879 reconnect_later (handle);
880 return 0;
881 }
882 r = buf;
883 r->header.size = htons (nsize);
884 r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
885 r->flags = 0; 844 r->flags = 0;
886 r->value = GNUNET_htonll (handle->current->value); 845 r->value = GNUNET_htonll (handle->current->value);
887 if (handle->current->make_persistent) 846 if (handle->current->make_persistent)
@@ -889,52 +848,17 @@ transmit_set (struct GNUNET_STATISTICS_Handle *handle,
889 if (handle->current->type == ACTION_UPDATE) 848 if (handle->current->type == ACTION_UPDATE)
890 r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE); 849 r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
891 GNUNET_assert (slen + nlen == 850 GNUNET_assert (slen + nlen ==
892 GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2, 851 GNUNET_STRINGS_buffer_fill ((char *) &r[1],
852 slen + nlen,
853 2,
893 handle->current->subsystem, 854 handle->current->subsystem,
894 handle->current->name)); 855 handle->current->name));
895 GNUNET_assert (NULL == handle->current->cont); 856 GNUNET_assert (NULL == handle->current->cont);
896 free_action_item (handle->current); 857 free_action_item (handle->current);
897 handle->current = NULL; 858 handle->current = NULL;
898 update_memory_statistics (handle); 859 update_memory_statistics (handle);
899 return nsize; 860 GNUNET_MQ_send (handle->mq,
900} 861 env);
901
902
903/**
904 * Function called when we are ready to transmit a request to the service.
905 *
906 * @param cls the `struct GNUNET_STATISTICS_Handle`
907 * @param size how many bytes can we write to @a buf
908 * @param buf where to write requests to the service
909 * @return number of bytes written to @a buf
910 */
911static size_t
912transmit_action (void *cls, size_t size, void *buf)
913{
914 struct GNUNET_STATISTICS_Handle *h = cls;
915 size_t ret;
916
917 h->th = NULL;
918 ret = 0;
919 if (NULL != h->current)
920 switch (h->current->type)
921 {
922 case ACTION_GET:
923 ret = transmit_get (h, size, buf);
924 break;
925 case ACTION_SET:
926 case ACTION_UPDATE:
927 ret = transmit_set (h, size, buf);
928 break;
929 case ACTION_WATCH:
930 ret = transmit_watch (h, size, buf);
931 break;
932 default:
933 GNUNET_assert (0);
934 break;
935 }
936 schedule_action (h);
937 return ret;
938} 862}
939 863
940 864
@@ -952,10 +876,10 @@ GNUNET_STATISTICS_create (const char *subsystem,
952 struct GNUNET_STATISTICS_Handle *ret; 876 struct GNUNET_STATISTICS_Handle *ret;
953 877
954 if (GNUNET_YES == 878 if (GNUNET_YES ==
955 GNUNET_CONFIGURATION_get_value_yesno (cfg, "statistics", "DISABLE")) 879 GNUNET_CONFIGURATION_get_value_yesno (cfg,
880 "statistics",
881 "DISABLE"))
956 return NULL; 882 return NULL;
957 GNUNET_assert (NULL != subsystem);
958 GNUNET_assert (NULL != cfg);
959 ret = GNUNET_new (struct GNUNET_STATISTICS_Handle); 883 ret = GNUNET_new (struct GNUNET_STATISTICS_Handle);
960 ret->cfg = cfg; 884 ret->cfg = cfg;
961 ret->subsystem = GNUNET_strdup (subsystem); 885 ret->subsystem = GNUNET_strdup (subsystem);
@@ -978,8 +902,6 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
978{ 902{
979 struct GNUNET_STATISTICS_GetHandle *pos; 903 struct GNUNET_STATISTICS_GetHandle *pos;
980 struct GNUNET_STATISTICS_GetHandle *next; 904 struct GNUNET_STATISTICS_GetHandle *next;
981 struct GNUNET_TIME_Relative timeout;
982 int i;
983 905
984 if (NULL == h) 906 if (NULL == h)
985 return; 907 return;
@@ -989,26 +911,19 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
989 GNUNET_SCHEDULER_cancel (h->backoff_task); 911 GNUNET_SCHEDULER_cancel (h->backoff_task);
990 h->backoff_task = NULL; 912 h->backoff_task = NULL;
991 } 913 }
992 if (sync_first) 914 if ( (sync_first) &&
915 (GNUNET_YES == try_connect (h)) )
993 { 916 {
994 if (NULL != h->current) 917 if ( (NULL != h->current) &&
995 { 918 (ACTION_GET == h->current->type) )
996 if (ACTION_GET == h->current->type) 919 h->current->aborted = GNUNET_YES;
997 {
998 if (NULL != h->th)
999 {
1000 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
1001 h->th = NULL;
1002 }
1003 free_action_item (h->current);
1004 h->current = NULL;
1005 }
1006 }
1007 next = h->action_head; 920 next = h->action_head;
1008 while (NULL != (pos = next)) 921 while (NULL != (pos = next))
1009 { 922 {
1010 next = pos->next; 923 next = pos->next;
1011 if (ACTION_GET == pos->type) 924 if ( (ACTION_GET == pos->type) ||
925 (ACTION_WATCH == pos->type) ||
926 (GNUNET_NO == pos->make_persistent) )
1012 { 927 {
1013 GNUNET_CONTAINER_DLL_remove (h->action_head, 928 GNUNET_CONTAINER_DLL_remove (h->action_head,
1014 h->action_tail, 929 h->action_tail,
@@ -1016,25 +931,11 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
1016 free_action_item (pos); 931 free_action_item (pos);
1017 } 932 }
1018 } 933 }
1019 if ( (NULL == h->current) &&
1020 (NULL != (h->current = h->action_head)) )
1021 GNUNET_CONTAINER_DLL_remove (h->action_head,
1022 h->action_tail,
1023 h->current);
1024 h->do_destroy = GNUNET_YES; 934 h->do_destroy = GNUNET_YES;
1025 if ((NULL != h->current) && (NULL == h->th) && 935 schedule_action (h);
1026 (NULL != h->client)) 936 return; /* do not finish destruction just yet */
1027 {
1028 timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
1029 h->th =
1030 GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
1031 timeout, GNUNET_YES,
1032 &transmit_action, h);
1033 GNUNET_assert (NULL != h->th);
1034 }
1035 if (NULL != h->th)
1036 return; /* do not finish destruction just yet */
1037 } 937 }
938 /* do clean up all */
1038 while (NULL != (pos = h->action_head)) 939 while (NULL != (pos = h->action_head))
1039 { 940 {
1040 GNUNET_CONTAINER_DLL_remove (h->action_head, 941 GNUNET_CONTAINER_DLL_remove (h->action_head,
@@ -1043,7 +944,7 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
1043 free_action_item (pos); 944 free_action_item (pos);
1044 } 945 }
1045 do_disconnect (h); 946 do_disconnect (h);
1046 for (i = 0; i < h->watches_size; i++) 947 for (unsigned int i = 0; i < h->watches_size; i++)
1047 { 948 {
1048 if (NULL == h->watches[i]) 949 if (NULL == h->watches[i])
1049 continue; 950 continue;
@@ -1051,53 +952,15 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
1051 GNUNET_free (h->watches[i]->name); 952 GNUNET_free (h->watches[i]->name);
1052 GNUNET_free (h->watches[i]); 953 GNUNET_free (h->watches[i]);
1053 } 954 }
1054 GNUNET_array_grow (h->watches, h->watches_size, 0); 955 GNUNET_array_grow (h->watches,
956 h->watches_size,
957 0);
1055 GNUNET_free (h->subsystem); 958 GNUNET_free (h->subsystem);
1056 GNUNET_free (h); 959 GNUNET_free (h);
1057} 960}
1058 961
1059 962
1060/** 963/**
1061 * Function called to transmit TEST message to service to
1062 * confirm that the service has received all of our 'SET'
1063 * messages (during statistics disconnect/shutdown).
1064 *
1065 * @param cls the `struct GNUNET_STATISTICS_Handle`
1066 * @param size how many bytes can we write to @a buf
1067 * @param buf where to write requests to the service
1068 * @return number of bytes written to @a buf
1069 */
1070static size_t
1071transmit_test_on_shutdown (void *cls,
1072 size_t size,
1073 void *buf)
1074{
1075 struct GNUNET_STATISTICS_Handle *h = cls;
1076 struct GNUNET_MessageHeader hdr;
1077
1078 h->th = NULL;
1079 if (NULL == buf)
1080 {
1081 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1082 _("Failed to receive acknowledgement from statistics service, some statistics might have been lost!\n"));
1083 h->do_destroy = GNUNET_NO;
1084 GNUNET_SCHEDULER_add_now (&destroy_task, h);
1085 return 0;
1086 }
1087 hdr.type = htons (GNUNET_MESSAGE_TYPE_TEST);
1088 hdr.size = htons (sizeof (struct GNUNET_MessageHeader));
1089 memcpy (buf, &hdr, sizeof (hdr));
1090 if (GNUNET_YES != h->receiving)
1091 {
1092 h->receiving = GNUNET_YES;
1093 GNUNET_CLIENT_receive (h->client, &receive_stats, h,
1094 GNUNET_TIME_UNIT_FOREVER_REL);
1095 }
1096 return sizeof (struct GNUNET_MessageHeader);
1097}
1098
1099
1100/**
1101 * Schedule the next action to be performed. 964 * Schedule the next action to be performed.
1102 * 965 *
1103 * @param h statistics handle 966 * @param h statistics handle
@@ -1105,76 +968,61 @@ transmit_test_on_shutdown (void *cls,
1105static void 968static void
1106schedule_action (struct GNUNET_STATISTICS_Handle *h) 969schedule_action (struct GNUNET_STATISTICS_Handle *h)
1107{ 970{
1108 struct GNUNET_TIME_Relative timeout; 971 if (NULL != h->backoff_task)
1109
1110 if ( (NULL != h->th) ||
1111 (NULL != h->backoff_task) )
1112 return; /* action already pending */ 972 return; /* action already pending */
1113 if (GNUNET_YES != try_connect (h)) 973 if (GNUNET_YES != try_connect (h))
1114 { 974 {
1115 reconnect_later (h); 975 reconnect_later (h);
1116 return; 976 return;
1117 } 977 }
1118 if (NULL != h->current)
1119 return; /* action already pending */
1120 /* schedule next action */ 978 /* schedule next action */
1121 h->current = h->action_head; 979 while (NULL == h->current)
1122 if (NULL == h->current)
1123 { 980 {
1124 if (GNUNET_YES == h->do_destroy) 981 h->current = h->action_head;
982 if (NULL == h->current)
1125 { 983 {
984 struct GNUNET_MessageHeader *hdr;
985 struct GNUNET_MQ_Envelope *env;
986
987 if (GNUNET_YES != h->do_destroy)
988 return; /* nothing to do */
989 /* let service know that we're done */
1126 h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */ 990 h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
1127 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, 991 env = GNUNET_MQ_msg (hdr,
1128 sizeof (struct GNUNET_MessageHeader), 992 GNUNET_MESSAGE_TYPE_TEST);
1129 SET_TRANSMIT_TIMEOUT, 993 GNUNET_MQ_send (h->mq,
1130 GNUNET_NO, 994 env);
1131 &transmit_test_on_shutdown, h); 995 return;
996 }
997 GNUNET_CONTAINER_DLL_remove (h->action_head,
998 h->action_tail,
999 h->current);
1000 switch (h->current->type)
1001 {
1002 case ACTION_GET:
1003 transmit_get (h);
1004 break;
1005 case ACTION_SET:
1006 case ACTION_UPDATE:
1007 transmit_set (h);
1008 break;
1009 case ACTION_WATCH:
1010 transmit_watch (h);
1011 break;
1012 default:
1013 GNUNET_assert (0);
1014 break;
1132 } 1015 }
1133 return;
1134 }
1135 GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
1136 timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
1137 if (NULL ==
1138 (h->th =
1139 GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
1140 timeout, GNUNET_YES,
1141 &transmit_action, h)))
1142 {
1143 LOG (GNUNET_ERROR_TYPE_DEBUG,
1144 "Failed to transmit request to statistics service.\n");
1145 do_disconnect (h);
1146 reconnect_later (h);
1147 } 1016 }
1148} 1017}
1149 1018
1150 1019
1151/** 1020/**
1152 * We have run into a timeout on a #GNUNET_STATISTICS_get() operation,
1153 * call the continuation.
1154 *
1155 * @param cls the `struct GNUNET_STATISTICS_GetHandle`
1156 */
1157static void
1158run_get_timeout (void *cls)
1159{
1160 struct GNUNET_STATISTICS_GetHandle *gh = cls;
1161 GNUNET_STATISTICS_Callback cont = gh->cont;
1162 void *cont_cls = gh->cls;
1163
1164 gh->timeout_task = NULL;
1165 GNUNET_STATISTICS_get_cancel (gh);
1166 cont (cont_cls, GNUNET_SYSERR);
1167}
1168
1169
1170/**
1171 * Get statistic from the peer. 1021 * Get statistic from the peer.
1172 * 1022 *
1173 * @param handle identification of the statistics service 1023 * @param handle identification of the statistics service
1174 * @param subsystem limit to the specified subsystem, NULL for our subsystem 1024 * @param subsystem limit to the specified subsystem, NULL for our subsystem
1175 * @param name name of the statistic value, NULL for all values 1025 * @param name name of the statistic value, NULL for all values
1176 * @param timeout after how long should we give up (and call
1177 * cont with an error code)?
1178 * @param cont continuation to call when done (can be NULL) 1026 * @param cont continuation to call when done (can be NULL)
1179 * This callback CANNOT destroy the statistics handle in the same call. 1027 * This callback CANNOT destroy the statistics handle in the same call.
1180 * @param proc function to call on each value 1028 * @param proc function to call on each value
@@ -1183,10 +1031,11 @@ run_get_timeout (void *cls)
1183 */ 1031 */
1184struct GNUNET_STATISTICS_GetHandle * 1032struct GNUNET_STATISTICS_GetHandle *
1185GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle, 1033GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1186 const char *subsystem, const char *name, 1034 const char *subsystem,
1187 struct GNUNET_TIME_Relative timeout, 1035 const char *name,
1188 GNUNET_STATISTICS_Callback cont, 1036 GNUNET_STATISTICS_Callback cont,
1189 GNUNET_STATISTICS_Iterator proc, void *cls) 1037 GNUNET_STATISTICS_Iterator proc,
1038 void *cls)
1190{ 1039{
1191 size_t slen1; 1040 size_t slen1;
1192 size_t slen2; 1041 size_t slen2;
@@ -1211,12 +1060,8 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1211 ai->cont = cont; 1060 ai->cont = cont;
1212 ai->proc = proc; 1061 ai->proc = proc;
1213 ai->cls = cls; 1062 ai->cls = cls;
1214 ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1215 ai->type = ACTION_GET; 1063 ai->type = ACTION_GET;
1216 ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); 1064 ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1217 ai->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
1218 &run_get_timeout,
1219 ai);
1220 GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, 1065 GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
1221 handle->action_tail, 1066 handle->action_tail,
1222 ai); 1067 ai);
@@ -1236,23 +1081,18 @@ GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1236{ 1081{
1237 if (NULL == gh) 1082 if (NULL == gh)
1238 return; 1083 return;
1239 if (NULL != gh->timeout_task)
1240 {
1241 GNUNET_SCHEDULER_cancel (gh->timeout_task);
1242 gh->timeout_task = NULL;
1243 }
1244 gh->cont = NULL; 1084 gh->cont = NULL;
1245 if (gh->sh->current == gh) 1085 if (gh->sh->current == gh)
1246 { 1086 {
1247 gh->aborted = GNUNET_YES; 1087 gh->aborted = GNUNET_YES;
1088 return;
1248 } 1089 }
1249 else 1090 GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
1250 { 1091 gh->sh->action_tail,
1251 GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh); 1092 gh);
1252 GNUNET_free (gh->name); 1093 GNUNET_free (gh->name);
1253 GNUNET_free (gh->subsystem); 1094 GNUNET_free (gh->subsystem);
1254 GNUNET_free (gh); 1095 GNUNET_free (gh);
1255 }
1256} 1096}
1257 1097
1258 1098
@@ -1268,8 +1108,10 @@ GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1268 */ 1108 */
1269int 1109int
1270GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle, 1110GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1271 const char *subsystem, const char *name, 1111 const char *subsystem,
1272 GNUNET_STATISTICS_Iterator proc, void *proc_cls) 1112 const char *name,
1113 GNUNET_STATISTICS_Iterator proc,
1114 void *proc_cls)
1273{ 1115{
1274 struct GNUNET_STATISTICS_WatchEntry *w; 1116 struct GNUNET_STATISTICS_WatchEntry *w;
1275 1117
@@ -1280,8 +1122,11 @@ GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1280 w->name = GNUNET_strdup (name); 1122 w->name = GNUNET_strdup (name);
1281 w->proc = proc; 1123 w->proc = proc;
1282 w->proc_cls = proc_cls; 1124 w->proc_cls = proc_cls;
1283 GNUNET_array_append (handle->watches, handle->watches_size, w); 1125 GNUNET_array_append (handle->watches,
1284 schedule_watch_request (handle, w); 1126 handle->watches_size,
1127 w);
1128 schedule_watch_request (handle,
1129 w);
1285 return GNUNET_OK; 1130 return GNUNET_OK;
1286} 1131}
1287 1132
@@ -1304,11 +1149,10 @@ GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1304 void *proc_cls) 1149 void *proc_cls)
1305{ 1150{
1306 struct GNUNET_STATISTICS_WatchEntry *w; 1151 struct GNUNET_STATISTICS_WatchEntry *w;
1307 unsigned int i;
1308 1152
1309 if (NULL == handle) 1153 if (NULL == handle)
1310 return GNUNET_SYSERR; 1154 return GNUNET_SYSERR;
1311 for (i=0;i<handle->watches_size;i++) 1155 for (unsigned int i=0;i<handle->watches_size;i++)
1312 { 1156 {
1313 w = handle->watches[i]; 1157 w = handle->watches[i];
1314 if (NULL == w) 1158 if (NULL == w)
@@ -1329,7 +1173,6 @@ GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1329} 1173}
1330 1174
1331 1175
1332
1333/** 1176/**
1334 * Queue a request to change a statistic. 1177 * Queue a request to change a statistic.
1335 * 1178 *
@@ -1421,7 +1264,8 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1421 ai->msize = nsize; 1264 ai->msize = nsize;
1422 ai->value = value; 1265 ai->value = value;
1423 ai->type = type; 1266 ai->type = type;
1424 GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail, 1267 GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
1268 h->action_tail,
1425 ai); 1269 ai);
1426 schedule_action (h); 1270 schedule_action (h);
1427} 1271}
@@ -1445,7 +1289,11 @@ GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1445 if (NULL == handle) 1289 if (NULL == handle)
1446 return; 1290 return;
1447 GNUNET_assert (GNUNET_NO == handle->do_destroy); 1291 GNUNET_assert (GNUNET_NO == handle->do_destroy);
1448 add_setter_action (handle, name, make_persistent, value, ACTION_SET); 1292 add_setter_action (handle,
1293 name,
1294 make_persistent,
1295 value,
1296 ACTION_SET);
1449} 1297}
1450 1298
1451 1299