aboutsummaryrefslogtreecommitdiff
path: root/src/statistics
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-12-16 22:19:26 +0000
committerChristian Grothoff <christian@grothoff.org>2011-12-16 22:19:26 +0000
commitfca68d261172f147382f00b685faf52a9b022846 (patch)
treeaf9c4f7fb4373a0a9a42ffe4511c63dc7c7608e1 /src/statistics
parentb4ad23dde8a344c9adccdb00a9e6f53ca26fa1de (diff)
downloadgnunet-fca68d261172f147382f00b685faf52a9b022846.tar.gz
gnunet-fca68d261172f147382f00b685faf52a9b022846.zip
code cleanup, also trying to fix #2011
Diffstat (limited to 'src/statistics')
-rw-r--r--src/statistics/statistics_api.c516
-rw-r--r--src/statistics/test_statistics_api_watch.c2
2 files changed, 306 insertions, 212 deletions
diff --git a/src/statistics/statistics_api.c b/src/statistics/statistics_api.c
index 281ba8a9b..59703ac1f 100644
--- a/src/statistics/statistics_api.c
+++ b/src/statistics/statistics_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors) 3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -47,9 +47,24 @@
47 */ 47 */
48enum ActionType 48enum ActionType
49{ 49{
50 /**
51 * Get a value.
52 */
50 ACTION_GET, 53 ACTION_GET,
54
55 /**
56 * Set a value.
57 */
51 ACTION_SET, 58 ACTION_SET,
59
60 /**
61 * Update a value.
62 */
52 ACTION_UPDATE, 63 ACTION_UPDATE,
64
65 /**
66 * Watch a value.
67 */
53 ACTION_WATCH 68 ACTION_WATCH
54}; 69};
55 70
@@ -239,33 +254,22 @@ struct GNUNET_STATISTICS_Handle
239}; 254};
240 255
241 256
242
243/** 257/**
244 * Schedule the next action to be performed. 258 * Schedule the next action to be performed.
259 *
260 * @param h statistics handle to reconnect
245 */ 261 */
246static void 262static void
247schedule_action (struct GNUNET_STATISTICS_Handle *h); 263schedule_action (struct GNUNET_STATISTICS_Handle *h);
248 264
265
249/** 266/**
250 * Try to (re)connect to the statistics service. 267 * Transmit request to service that we want to watch
268 * the development of a particular value.
251 * 269 *
252 * @return GNUNET_YES on success, GNUNET_NO on failure. 270 * @param h statistics handle
271 * @param watch watch entry of the value to watch
253 */ 272 */
254static int
255try_connect (struct GNUNET_STATISTICS_Handle *ret);
256
257
258static void
259insert_ai (struct GNUNET_STATISTICS_Handle *h,
260 struct GNUNET_STATISTICS_GetHandle *ai)
261{
262 GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
263 ai);
264 if (h->action_head == ai)
265 schedule_action (h);
266}
267
268
269static void 273static void
270schedule_watch_request (struct GNUNET_STATISTICS_Handle *h, 274schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
271 struct GNUNET_STATISTICS_WatchEntry *watch) 275 struct GNUNET_STATISTICS_WatchEntry *watch)
@@ -277,11 +281,6 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
277 size_t nsize; 281 size_t nsize;
278 282
279 GNUNET_assert (h != NULL); 283 GNUNET_assert (h != NULL);
280 if (GNUNET_YES != try_connect (h))
281 {
282 schedule_action (h);
283 return;
284 }
285 slen = strlen (watch->subsystem) + 1; 284 slen = strlen (watch->subsystem) + 1;
286 nlen = strlen (watch->name) + 1; 285 nlen = strlen (watch->name) + 1;
287 nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen; 286 nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
@@ -299,27 +298,88 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
299 ai->type = ACTION_WATCH; 298 ai->type = ACTION_WATCH;
300 ai->proc = watch->proc; 299 ai->proc = watch->proc;
301 ai->cls = watch->proc_cls; 300 ai->cls = watch->proc_cls;
302 insert_ai (h, ai); 301 GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
302 ai);
303 schedule_action (h);
304}
305
306
307/**
308 * Free memory associated with the given action item.
309 *
310 * @param gh action item to free
311 */
312static void
313free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
314{
315 GNUNET_free_non_null (gh->subsystem);
316 GNUNET_free_non_null (gh->name);
317 GNUNET_free (gh);
318}
319
320
321/**
322 * Disconnect from the statistics service.
323 *
324 * @param h statistics handle to disconnect from
325 */
326static void
327do_disconnect (struct GNUNET_STATISTICS_Handle *h)
328{
329 struct GNUNET_STATISTICS_GetHandle *c;
330
331 if (NULL != h->th)
332 {
333 h->th = NULL;
334 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
335 }
336 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
337 h->client = NULL;
338 h->receiving = GNUNET_NO;
339 if (NULL != (c = h->current))
340 {
341 h->current = NULL;
342 if (c->cont != NULL)
343 c->cont (c->cls, GNUNET_SYSERR);
344 free_action_item (c);
345 }
303} 346}
304 347
305 348
306/** 349/**
307 * Try to (re)connect to the statistics service. 350 * Try to (re)connect to the statistics service.
308 * 351 *
352 * @param h statistics handle to reconnect
309 * @return GNUNET_YES on success, GNUNET_NO on failure. 353 * @return GNUNET_YES on success, GNUNET_NO on failure.
310 */ 354 */
311static int 355static int
312try_connect (struct GNUNET_STATISTICS_Handle *ret) 356try_connect (struct GNUNET_STATISTICS_Handle *h)
313{ 357{
358 struct GNUNET_STATISTICS_GetHandle *gh;
359 struct GNUNET_STATISTICS_GetHandle *gn;
314 unsigned int i; 360 unsigned int i;
315 361
316 if (ret->client != NULL) 362 if (h->backoff_task != GNUNET_SCHEDULER_NO_TASK)
363 return GNUNET_NO;
364 if (h->client != NULL)
317 return GNUNET_YES; 365 return GNUNET_YES;
318 ret->client = GNUNET_CLIENT_connect ("statistics", ret->cfg); 366 h->client = GNUNET_CLIENT_connect ("statistics", h->cfg);
319 if (ret->client != NULL) 367 if (h->client != NULL)
320 { 368 {
321 for (i = 0; i < ret->watches_size; i++) 369 gn = h->action_head;
322 schedule_watch_request (ret, ret->watches[i]); 370 while (NULL != (gh = gn))
371 {
372 gn = gh->next;
373 if (gh->type == ACTION_WATCH)
374 {
375 GNUNET_CONTAINER_DLL_remove (h->action_head,
376 h->action_tail,
377 gh);
378 free_action_item (gh);
379 }
380 }
381 for (i = 0; i < h->watches_size; i++)
382 schedule_watch_request (h, h->watches[i]);
323 return GNUNET_YES; 383 return GNUNET_YES;
324 } 384 }
325#if DEBUG_STATISTICS 385#if DEBUG_STATISTICS
@@ -331,44 +391,48 @@ try_connect (struct GNUNET_STATISTICS_Handle *ret)
331 391
332 392
333/** 393/**
334 * Free memory associated with the given action item. 394 * We've waited long enough, reconnect now.
395 *
396 * @param cls the 'struct GNUNET_STATISTICS_Handle' to reconnect
397 * @param tc scheduler context (unused)
335 */ 398 */
336static void 399static void
337free_action_item (struct GNUNET_STATISTICS_GetHandle *ai) 400reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
338{ 401{
339 GNUNET_free_non_null (ai->subsystem); 402 struct GNUNET_STATISTICS_Handle *h = cls;
340 GNUNET_free_non_null (ai->name); 403
341 GNUNET_free (ai); 404 h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
405 schedule_action (h);
342} 406}
343 407
344 408
345/** 409/**
346 * GET processing is complete, tell client about it. 410 * Reconnect at a later time, respecting back-off.
411 *
412 * @param h statistics handle
347 */ 413 */
348static void 414static void
349finish (struct GNUNET_STATISTICS_Handle *h, int code) 415reconnect_later (struct GNUNET_STATISTICS_Handle *h)
350{ 416{
351 struct GNUNET_STATISTICS_GetHandle *pos = h->current; 417 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->backoff_task);
352 418 h->backoff_task =
353 h->current = NULL; 419 GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h);
354 schedule_action (h); 420 h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
355 if (pos != NULL) 421 h->backoff =
356 { 422 GNUNET_TIME_relative_min (h->backoff, GNUNET_CONSTANTS_SERVICE_TIMEOUT);
357 if (pos->cont != NULL)
358 pos->cont (pos->cls, code);
359 free_action_item (pos);
360 }
361} 423}
362 424
363 425
364/** 426/**
365 * Process the message. 427 * Process a 'GNUNET_MESSAGE_TYPE_STATISTICS_VALUE' message.
366 * 428 *
429 * @param h statistics handle
430 * @param msg message received from the service, never NULL
367 * @return GNUNET_OK if the message was well-formed 431 * @return GNUNET_OK if the message was well-formed
368 */ 432 */
369static int 433static int
370process_message (struct GNUNET_STATISTICS_Handle *h, 434process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h,
371 const struct GNUNET_MessageHeader *msg) 435 const struct GNUNET_MessageHeader *msg)
372{ 436{
373 char *service; 437 char *service;
374 char *name; 438 char *name;
@@ -420,6 +484,13 @@ process_message (struct GNUNET_STATISTICS_Handle *h,
420} 484}
421 485
422 486
487/**
488 * We have received a watch value from the service. Process it.
489 *
490 * @param h statistics handle
491 * @param msg the watch value message
492 * @return GNUNET_OK if the message was well-formed, GNUNET_SYSERR if not
493 */
423static int 494static int
424process_watch_value (struct GNUNET_STATISTICS_Handle *h, 495process_watch_value (struct GNUNET_STATISTICS_Handle *h,
425 const struct GNUNET_MessageHeader *msg) 496 const struct GNUNET_MessageHeader *msg)
@@ -459,19 +530,17 @@ static void
459receive_stats (void *cls, const struct GNUNET_MessageHeader *msg) 530receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
460{ 531{
461 struct GNUNET_STATISTICS_Handle *h = cls; 532 struct GNUNET_STATISTICS_Handle *h = cls;
533 struct GNUNET_STATISTICS_GetHandle *c;
534
462 535
463 if (msg == NULL) 536 if (msg == NULL)
464 { 537 {
465 if (NULL != h->client)
466 {
467 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
468 h->client = NULL;
469 }
470#if DEBUG_STATISTICS 538#if DEBUG_STATISTICS
471 LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, 539 LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
472 "Error receiving statistics from service, is the service running?\n"); 540 "Error receiving statistics from service, is the service running?\n");
473#endif 541#endif
474 finish (h, GNUNET_SYSERR); 542 do_disconnect (h);
543 reconnect_later (h);
475 return; 544 return;
476 } 545 }
477 switch (ntohs (msg->type)) 546 switch (ntohs (msg->type))
@@ -480,6 +549,13 @@ receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
480#if DEBUG_STATISTICS 549#if DEBUG_STATISTICS
481 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n"); 550 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n");
482#endif 551#endif
552 if (NULL == (c = h->current))
553 {
554 GNUNET_break (0);
555 do_disconnect (h);
556 reconnect_later (h);
557 return;
558 }
483 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; 559 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
484 if (h->watches_size > 0) 560 if (h->watches_size > 0)
485 { 561 {
@@ -489,73 +565,84 @@ receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
489 else 565 else
490 { 566 {
491 h->receiving = GNUNET_NO; 567 h->receiving = GNUNET_NO;
492 } 568 }
493 finish (h, GNUNET_OK); 569 h->current = NULL;
570 schedule_action (h);
571 if (c->cont != NULL)
572 c->cont (c->cls, GNUNET_OK);
573 free_action_item (c);
494 return; 574 return;
495 case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE: 575 case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
496 if (GNUNET_OK == process_message (h, msg)) 576 if (GNUNET_OK != process_statistics_value_message (h, msg))
497 { 577 {
498 /* finally, look for more! */ 578 do_disconnect (h);
579 reconnect_later (h);
580 return;
581 }
582 /* finally, look for more! */
499#if DEBUG_STATISTICS 583#if DEBUG_STATISTICS
500 LOG (GNUNET_ERROR_TYPE_DEBUG, 584 LOG (GNUNET_ERROR_TYPE_DEBUG,
501 "Processing VALUE done, now reading more\n"); 585 "Processing VALUE done, now reading more\n");
502#endif 586#endif
503 GNUNET_CLIENT_receive (h->client, &receive_stats, h, 587 GNUNET_CLIENT_receive (h->client, &receive_stats, h,
504 GNUNET_TIME_absolute_get_remaining (h-> 588 GNUNET_TIME_absolute_get_remaining (h->
505 current->timeout)); 589 current->timeout));
506 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; 590 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
507 return; 591 return;
508 }
509 GNUNET_break (0);
510 break;
511 case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE: 592 case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
512 if (GNUNET_OK == process_watch_value (h, msg)) 593 if (GNUNET_OK !=
594 process_watch_value (h, msg))
513 { 595 {
514 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; 596 do_disconnect (h);
515 GNUNET_assert (h->watches_size > 0); 597 reconnect_later (h);
516 GNUNET_CLIENT_receive (h->client, &receive_stats, h,
517 GNUNET_TIME_UNIT_FOREVER_REL);
518 return; 598 return;
519 } 599 }
520 GNUNET_break (0); 600 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
521 break; 601 GNUNET_assert (h->watches_size > 0);
602 GNUNET_CLIENT_receive (h->client, &receive_stats, h,
603 GNUNET_TIME_UNIT_FOREVER_REL);
604 return;
522 default: 605 default:
523 GNUNET_break (0); 606 GNUNET_break (0);
524 break; 607 do_disconnect (h);
525 } 608 reconnect_later (h);
526 if (NULL != h->client) 609 return;
527 {
528 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
529 h->client = NULL;
530 } 610 }
531 finish (h, GNUNET_SYSERR);
532} 611}
533 612
534 613
535/** 614/**
536 * Transmit a GET request (and if successful, start to receive 615 * Transmit a GET request (and if successful, start to receive
537 * the response). 616 * the response).
617 *
618 * @param handle statistics handle
619 * @param size how many bytes can we write to buf
620 * @param buf where to write requests to the service
621 * @return number of bytes written to buf
538 */ 622 */
539static size_t 623static size_t
540transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf) 624transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
541{ 625{
626 struct GNUNET_STATISTICS_GetHandle *c;
542 struct GNUNET_MessageHeader *hdr; 627 struct GNUNET_MessageHeader *hdr;
543 size_t slen1; 628 size_t slen1;
544 size_t slen2; 629 size_t slen2;
545 uint16_t msize; 630 uint16_t msize;
546 631
632 GNUNET_assert (NULL != (c = handle->current));
547 if (buf == NULL) 633 if (buf == NULL)
548 { 634 {
549 /* timeout / error */ 635 /* timeout / error */
550#if DEBUG_STATISTICS 636#if DEBUG_STATISTICS
551 LOG (GNUNET_ERROR_TYPE_DEBUG, 637 LOG (GNUNET_ERROR_TYPE_DEBUG,
552 "Transmission of request for statistics failed!\n"); 638 "Transmission of request for statistics failed!\n");
553#endif 639#endif
554 finish (handle, GNUNET_SYSERR); 640 do_disconnect (handle);
641 reconnect_later (handle);
555 return 0; 642 return 0;
556 } 643 }
557 slen1 = strlen (handle->current->subsystem) + 1; 644 slen1 = strlen (c->subsystem) + 1;
558 slen2 = strlen (handle->current->name) + 1; 645 slen2 = strlen (c->name) + 1;
559 msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); 646 msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
560 GNUNET_assert (msize <= size); 647 GNUNET_assert (msize <= size);
561 hdr = (struct GNUNET_MessageHeader *) buf; 648 hdr = (struct GNUNET_MessageHeader *) buf;
@@ -563,9 +650,9 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
563 hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET); 650 hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
564 GNUNET_assert (slen1 + slen2 == 651 GNUNET_assert (slen1 + slen2 ==
565 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2, 652 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
566 handle->current->subsystem, 653 c->subsystem,
567 handle->current->name)); 654 c->name));
568 if (!handle->receiving) 655 if (GNUNET_YES != handle->receiving)
569 { 656 {
570#if DEBUG_STATISTICS 657#if DEBUG_STATISTICS
571 LOG (GNUNET_ERROR_TYPE_DEBUG, 658 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -573,8 +660,7 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
573#endif 660#endif
574 handle->receiving = GNUNET_YES; 661 handle->receiving = GNUNET_YES;
575 GNUNET_CLIENT_receive (handle->client, &receive_stats, handle, 662 GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
576 GNUNET_TIME_absolute_get_remaining (handle-> 663 GNUNET_TIME_absolute_get_remaining (c->timeout));
577 current->timeout));
578 } 664 }
579 return msize; 665 return msize;
580} 666}
@@ -583,6 +669,11 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
583/** 669/**
584 * Transmit a WATCH request (and if successful, start to receive 670 * Transmit a WATCH request (and if successful, start to receive
585 * the response). 671 * the response).
672 *
673 * @param handle statistics handle
674 * @param size how many bytes can we write to buf
675 * @param buf where to write requests to the service
676 * @return number of bytes written to buf
586 */ 677 */
587static size_t 678static size_t
588transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf) 679transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
@@ -599,7 +690,8 @@ transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
599 LOG (GNUNET_ERROR_TYPE_DEBUG, 690 LOG (GNUNET_ERROR_TYPE_DEBUG,
600 "Transmission of request for statistics failed!\n"); 691 "Transmission of request for statistics failed!\n");
601#endif 692#endif
602 finish (handle, GNUNET_SYSERR); 693 do_disconnect (handle);
694 reconnect_later (handle);
603 return 0; 695 return 0;
604 } 696 }
605#if DEBUG_STATISTICS 697#if DEBUG_STATISTICS
@@ -623,13 +715,20 @@ transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
623 GNUNET_CLIENT_receive (handle->client, &receive_stats, handle, 715 GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
624 GNUNET_TIME_UNIT_FOREVER_REL); 716 GNUNET_TIME_UNIT_FOREVER_REL);
625 } 717 }
626 finish (handle, GNUNET_OK); 718 GNUNET_assert (NULL == handle->current->cont);
719 free_action_item (handle->current);
720 handle->current = NULL;
627 return msize; 721 return msize;
628} 722}
629 723
630 724
631/** 725/**
632 * Transmit a SET/UPDATE request. 726 * Transmit a SET/UPDATE request.
727 *
728 * @param handle statistics handle
729 * @param size how many bytes can we write to buf
730 * @param buf where to write requests to the service
731 * @return number of bytes written to buf
633 */ 732 */
634static size_t 733static size_t
635transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf) 734transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
@@ -641,17 +740,18 @@ transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
641 740
642 if (NULL == buf) 741 if (NULL == buf)
643 { 742 {
644 finish (handle, GNUNET_SYSERR); 743 do_disconnect (handle);
744 reconnect_later (handle);
645 return 0; 745 return 0;
646 } 746 }
647
648 slen = strlen (handle->current->subsystem) + 1; 747 slen = strlen (handle->current->subsystem) + 1;
649 nlen = strlen (handle->current->name) + 1; 748 nlen = strlen (handle->current->name) + 1;
650 nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen; 749 nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
651 if (size < nsize) 750 if (size < nsize)
652 { 751 {
653 GNUNET_break (0); 752 GNUNET_break (0);
654 finish (handle, GNUNET_SYSERR); 753 do_disconnect (handle);
754 reconnect_later (handle);
655 return 0; 755 return 0;
656 } 756 }
657 r = buf; 757 r = buf;
@@ -667,37 +767,47 @@ transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
667 GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2, 767 GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2,
668 handle->current->subsystem, 768 handle->current->subsystem,
669 handle->current->name)); 769 handle->current->name));
670 finish (handle, GNUNET_OK); 770 GNUNET_assert (NULL == handle->current->cont);
771 free_action_item (handle->current);
772 handle->current = NULL;
671 return nsize; 773 return nsize;
672} 774}
673 775
674 776
777/**
778 * Function called when we are ready to transmit a request to the service.
779 *
780 * @param cls the 'struct GNUNET_STATISTICS_Handle'
781 * @param size how many bytes can we write to buf
782 * @param buf where to write requests to the service
783 * @return number of bytes written to buf
784 */
675static size_t 785static size_t
676transmit_action (void *cls, size_t size, void *buf) 786transmit_action (void *cls, size_t size, void *buf)
677{ 787{
678 struct GNUNET_STATISTICS_Handle *handle = cls; 788 struct GNUNET_STATISTICS_Handle *h = cls;
679 size_t ret; 789 size_t ret;
680 790
681 handle->th = NULL; 791 h->th = NULL;
682 if (NULL == handle->current) 792 ret = 0;
683 return 0; 793 if (NULL != h->current)
684 switch (handle->current->type) 794 switch (h->current->type)
685 { 795 {
686 case ACTION_GET: 796 case ACTION_GET:
687 ret = transmit_get (handle, size, buf); 797 ret = transmit_get (h, size, buf);
688 break; 798 break;
689 case ACTION_SET: 799 case ACTION_SET:
690 case ACTION_UPDATE: 800 case ACTION_UPDATE:
691 ret = transmit_set (handle, size, buf); 801 ret = transmit_set (h, size, buf);
692 break; 802 break;
693 case ACTION_WATCH: 803 case ACTION_WATCH:
694 ret = transmit_watch (handle, size, buf); 804 ret = transmit_watch (h, size, buf);
695 break; 805 break;
696 default: 806 default:
697 ret = 0; 807 GNUNET_assert (0);
698 GNUNET_break (0); 808 break;
699 break; 809 }
700 } 810 schedule_action (h);
701 return ret; 811 return ret;
702} 812}
703 813
@@ -721,12 +831,6 @@ GNUNET_STATISTICS_create (const char *subsystem,
721 ret->cfg = cfg; 831 ret->cfg = cfg;
722 ret->subsystem = GNUNET_strdup (subsystem); 832 ret->subsystem = GNUNET_strdup (subsystem);
723 ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS; 833 ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
724 if (GNUNET_YES != try_connect (ret))
725 {
726 GNUNET_free (ret->subsystem);
727 GNUNET_free (ret);
728 return NULL;
729 }
730 return ret; 834 return ret;
731} 835}
732 836
@@ -807,25 +911,18 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
807 if (h->th != NULL) 911 if (h->th != NULL)
808 return; 912 return;
809 } 913 }
810 if (NULL != h->th)
811 {
812 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
813 h->th = NULL;
814 }
815 if (h->current != NULL)
816 free_action_item (h->current);
817 while (NULL != (pos = h->action_head)) 914 while (NULL != (pos = h->action_head))
818 { 915 {
819 h->action_head = pos->next; 916 GNUNET_CONTAINER_DLL_remove (h->action_head,
917 h->action_tail,
918 pos);
820 free_action_item (pos); 919 free_action_item (pos);
821 } 920 }
822 if (h->client != NULL) 921 do_disconnect (h);
823 {
824 GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
825 h->client = NULL;
826 }
827 for (i = 0; i < h->watches_size; i++) 922 for (i = 0; i < h->watches_size; i++)
828 { 923 {
924 if (NULL == h->watches[i])
925 continue;
829 GNUNET_free (h->watches[i]->subsystem); 926 GNUNET_free (h->watches[i]->subsystem);
830 GNUNET_free (h->watches[i]->name); 927 GNUNET_free (h->watches[i]->name);
831 GNUNET_free (h->watches[i]); 928 GNUNET_free (h->watches[i]);
@@ -836,36 +933,26 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
836} 933}
837 934
838 935
839static void
840finish_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
841{
842 struct GNUNET_STATISTICS_Handle *h = cls;
843
844 h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
845 finish (h, GNUNET_SYSERR);
846}
847
848
849/** 936/**
850 * Schedule the next action to be performed. 937 * Schedule the next action to be performed.
938 *
939 * @param h statistics handle
851 */ 940 */
852static void 941static void
853schedule_action (struct GNUNET_STATISTICS_Handle *h) 942schedule_action (struct GNUNET_STATISTICS_Handle *h)
854{ 943{
855 struct GNUNET_TIME_Relative timeout; 944 struct GNUNET_TIME_Relative timeout;
856 945
857 if (h->current != NULL) 946 if ( (h->th != NULL) ||
947 (h->backoff_task != GNUNET_SCHEDULER_NO_TASK) )
858 return; /* action already pending */ 948 return; /* action already pending */
859 if (GNUNET_YES != try_connect (h)) 949 if (GNUNET_YES != try_connect (h))
860 { 950 {
861 h->backoff_task = 951 reconnect_later (h);
862 GNUNET_SCHEDULER_add_delayed (h->backoff, &finish_task, h);
863 h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
864 h->backoff =
865 GNUNET_TIME_relative_min (h->backoff, GNUNET_CONSTANTS_SERVICE_TIMEOUT);
866 return; 952 return;
867 } 953 }
868 954 if (NULL != h->current)
955 return; /* action already pending */
869 /* schedule next action */ 956 /* schedule next action */
870 h->current = h->action_head; 957 h->current = h->action_head;
871 if (NULL == h->current) 958 if (NULL == h->current)
@@ -879,7 +966,6 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h)
879 } 966 }
880 GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current); 967 GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
881 timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout); 968 timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
882 GNUNET_assert (NULL == h->th);
883 if (NULL == 969 if (NULL ==
884 (h->th = 970 (h->th =
885 GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize, 971 GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
@@ -890,7 +976,8 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h)
890 LOG (GNUNET_ERROR_TYPE_DEBUG, 976 LOG (GNUNET_ERROR_TYPE_DEBUG,
891 "Failed to transmit request to statistics service.\n"); 977 "Failed to transmit request to statistics service.\n");
892#endif 978#endif
893 finish (h, GNUNET_SYSERR); 979 do_disconnect (h);
980 reconnect_later (h);
894 } 981 }
895} 982}
896 983
@@ -919,18 +1006,10 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
919 size_t slen2; 1006 size_t slen2;
920 struct GNUNET_STATISTICS_GetHandle *ai; 1007 struct GNUNET_STATISTICS_GetHandle *ai;
921 1008
922 GNUNET_assert (handle != NULL); 1009 if (NULL == handle)
1010 return NULL;
923 GNUNET_assert (proc != NULL); 1011 GNUNET_assert (proc != NULL);
924 GNUNET_assert (GNUNET_NO == handle->do_destroy); 1012 GNUNET_assert (GNUNET_NO == handle->do_destroy);
925 if (GNUNET_YES != try_connect (handle))
926 {
927#if DEBUG_STATISTICS
928 LOG (GNUNET_ERROR_TYPE_DEBUG,
929 "Failed to connect to statistics service, can not get value `%s:%s'.\n",
930 strlen (subsystem) ? subsystem : "*", strlen (name) ? name : "*");
931#endif
932 return NULL;
933 }
934 if (subsystem == NULL) 1013 if (subsystem == NULL)
935 subsystem = ""; 1014 subsystem = "";
936 if (name == NULL) 1015 if (name == NULL)
@@ -949,7 +1028,9 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
949 ai->timeout = GNUNET_TIME_relative_to_absolute (timeout); 1028 ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
950 ai->type = ACTION_GET; 1029 ai->type = ACTION_GET;
951 ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); 1030 ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
952 insert_ai (handle, ai); 1031 GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, handle->action_tail,
1032 ai);
1033 schedule_action (handle);
953 return ai; 1034 return ai;
954} 1035}
955 1036
@@ -963,6 +1044,8 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
963void 1044void
964GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh) 1045GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
965{ 1046{
1047 if (NULL == gh)
1048 return;
966 if (gh->sh->current == gh) 1049 if (gh->sh->current == gh)
967 { 1050 {
968 gh->aborted = GNUNET_YES; 1051 gh->aborted = GNUNET_YES;
@@ -1009,6 +1092,15 @@ GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1009} 1092}
1010 1093
1011 1094
1095/**
1096 * Queue a request to change a statistic.
1097 *
1098 * @param h statistics handle
1099 * @param name name of the value
1100 * @param make_persistent should the value be kept across restarts?
1101 * @param value new value or change
1102 * @param type type of the action (ACTION_SET or ACTION_UPDATE)
1103 */
1012static void 1104static void
1013add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name, 1105add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
1014 int make_persistent, uint64_t value, enum ActionType type) 1106 int make_persistent, uint64_t value, enum ActionType type)
@@ -1021,8 +1113,6 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
1021 1113
1022 GNUNET_assert (h != NULL); 1114 GNUNET_assert (h != NULL);
1023 GNUNET_assert (name != NULL); 1115 GNUNET_assert (name != NULL);
1024 if (GNUNET_YES != try_connect (h))
1025 return;
1026 slen = strlen (h->subsystem) + 1; 1116 slen = strlen (h->subsystem) + 1;
1027 nlen = strlen (name) + 1; 1117 nlen = strlen (name) + 1;
1028 nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen; 1118 nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
@@ -1031,54 +1121,58 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
1031 GNUNET_break (0); 1121 GNUNET_break (0);
1032 return; 1122 return;
1033 } 1123 }
1034 ai = h->action_head; 1124 for (ai = h->action_head; ai != NULL; ai = ai->next)
1035 while (ai != NULL)
1036 { 1125 {
1037 if ((0 == strcmp (ai->subsystem, h->subsystem)) && 1126 if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1038 (0 == strcmp (ai->name, name)) && ((ai->type == ACTION_UPDATE) || 1127 (0 == strcmp (ai->name, name)) &&
1039 (ai->type == ACTION_SET))) 1128 ( (ai->type == ACTION_UPDATE) ||
1129 (ai->type == ACTION_SET) ) ) )
1130 continue;
1131 if (ai->type == ACTION_SET)
1040 { 1132 {
1041 if (ai->type == ACTION_SET) 1133 if (type == ACTION_UPDATE)
1042 { 1134 {
1043 if (type == ACTION_UPDATE) 1135 delta = (int64_t) value;
1136 if (delta > 0)
1044 { 1137 {
1045 delta = (int64_t) value; 1138 /* update old set by new delta */
1046 if (delta > 0) 1139 ai->value += delta;
1047 { 1140 }
1048 ai->value += delta; 1141 else
1049 }
1050 else
1051 {
1052 if (ai->value < -delta)
1053 ai->value = 0;
1054 else
1055 ai->value += delta;
1056 }
1057 }
1058 else
1059 { 1142 {
1060 ai->value = value; 1143 /* update old set by new delta, but never go negative */
1061 } 1144 if (ai->value < -delta)
1145 ai->value = 0;
1146 else
1147 ai->value += delta;
1148 }
1062 } 1149 }
1063 else 1150 else
1064 { 1151 {
1065 if (type == ACTION_UPDATE) 1152 /* new set overrides old set */
1066 { 1153 ai->value = value;
1067 delta = (int64_t) value;
1068 ai->value += delta;
1069 }
1070 else
1071 {
1072 ai->value = value;
1073 ai->type = type;
1074 }
1075 } 1154 }
1076 ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1077 ai->make_persistent = make_persistent;
1078 return;
1079 } 1155 }
1080 ai = ai->next; 1156 else
1157 {
1158 if (type == ACTION_UPDATE)
1159 {
1160 /* make delta cummulative */
1161 delta = (int64_t) value;
1162 ai->value += delta;
1163 }
1164 else
1165 {
1166 /* drop old 'update', use new 'set' instead */
1167 ai->value = value;
1168 ai->type = type;
1169 }
1170 }
1171 ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1172 ai->make_persistent = make_persistent;
1173 return;
1081 } 1174 }
1175 /* no existing entry matches, create a fresh one */
1082 ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle)); 1176 ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1083 ai->sh = h; 1177 ai->sh = h;
1084 ai->subsystem = GNUNET_strdup (h->subsystem); 1178 ai->subsystem = GNUNET_strdup (h->subsystem);
@@ -1088,7 +1182,9 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
1088 ai->msize = nsize; 1182 ai->msize = nsize;
1089 ai->value = value; 1183 ai->value = value;
1090 ai->type = type; 1184 ai->type = type;
1091 insert_ai (h, ai); 1185 GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
1186 ai);
1187 schedule_action (h);
1092} 1188}
1093 1189
1094 1190
diff --git a/src/statistics/test_statistics_api_watch.c b/src/statistics/test_statistics_api_watch.c
index 14e7e1704..1323e3ad1 100644
--- a/src/statistics/test_statistics_api_watch.c
+++ b/src/statistics/test_statistics_api_watch.c
@@ -33,8 +33,6 @@
33 33
34#define START_SERVICE GNUNET_YES 34#define START_SERVICE GNUNET_YES
35 35
36#define ROUNDS (1024 * 1024)
37
38static int ok; 36static int ok;
39 37
40static struct GNUNET_STATISTICS_Handle *h; 38static struct GNUNET_STATISTICS_Handle *h;