aboutsummaryrefslogtreecommitdiff
path: root/src/statistics/statistics_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/statistics/statistics_api.c')
-rw-r--r--src/statistics/statistics_api.c365
1 files changed, 333 insertions, 32 deletions
diff --git a/src/statistics/statistics_api.c b/src/statistics/statistics_api.c
index 9de9f78fd..a5dde0e55 100644
--- a/src/statistics/statistics_api.c
+++ b/src/statistics/statistics_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2009 Christian Grothoff (and other contributing authors) 3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -25,6 +25,7 @@
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_client_lib.h" 27#include "gnunet_client_lib.h"
28#include "gnunet_constants.h"
28#include "gnunet_container_lib.h" 29#include "gnunet_container_lib.h"
29#include "gnunet_protocols.h" 30#include "gnunet_protocols.h"
30#include "gnunet_server_lib.h" 31#include "gnunet_server_lib.h"
@@ -47,7 +48,37 @@ enum ActionType
47{ 48{
48 ACTION_GET, 49 ACTION_GET,
49 ACTION_SET, 50 ACTION_SET,
50 ACTION_UPDATE 51 ACTION_UPDATE,
52 ACTION_WATCH
53};
54
55
56/**
57 * Entry kept for each value we are watching.
58 */
59struct GNUNET_STATISTICS_WatchEntry
60{
61
62 /**
63 * What subsystem is this action about? (never NULL)
64 */
65 char *subsystem;
66
67 /**
68 * What value is this action about? (never NULL)
69 */
70 char *name;
71
72 /**
73 * Function to call
74 */
75 GNUNET_STATISTICS_Iterator proc;
76
77 /**
78 * Closure for proc
79 */
80 void *proc_cls;
81
51}; 82};
52 83
53 84
@@ -118,7 +149,7 @@ struct GNUNET_STATISTICS_GetHandle
118 int aborted; 149 int aborted;
119 150
120 /** 151 /**
121 * Is this a GET, SET or UPDATE? 152 * Is this a GET, SET, UPDATE or WATCH?
122 */ 153 */
123 enum ActionType type; 154 enum ActionType type;
124 155
@@ -179,14 +210,103 @@ struct GNUNET_STATISTICS_Handle
179 struct GNUNET_STATISTICS_GetHandle *current; 210 struct GNUNET_STATISTICS_GetHandle *current;
180 211
181 /** 212 /**
213 * Array of watch entries.
214 */
215 struct GNUNET_STATISTICS_WatchEntry **watches;
216
217 /**
218 * Task doing exponential back-off trying to reconnect.
219 */
220 GNUNET_SCHEDULER_TaskIdentifier backoff_task;
221
222 /**
223 * Time for next connect retry.
224 */
225 struct GNUNET_TIME_Relative backoff;
226
227 /**
228 * Size of the 'watches' array.
229 */
230 unsigned int watches_size;
231
232 /**
182 * Should this handle auto-destruct once all actions have 233 * Should this handle auto-destruct once all actions have
183 * been processed? 234 * been processed?
184 */ 235 */
185 int do_destroy; 236 int do_destroy;
186 237
238 /**
239 * Are we currently receiving from the service?
240 */
241 int receiving;
242
187}; 243};
188 244
189 245
246
247/**
248 * Schedule the next action to be performed.
249 */
250static void schedule_action (struct GNUNET_STATISTICS_Handle *h);
251
252/**
253 * Try to (re)connect to the statistics service.
254 *
255 * @return GNUNET_YES on success, GNUNET_NO on failure.
256 */
257static int
258try_connect (struct GNUNET_STATISTICS_Handle *ret);
259
260
261static void
262insert_ai (struct GNUNET_STATISTICS_Handle *h, struct GNUNET_STATISTICS_GetHandle *ai)
263{
264 GNUNET_CONTAINER_DLL_insert_after (h->action_head,
265 h->action_tail,
266 h->action_tail,
267 ai);
268 if (h->action_head == ai)
269 schedule_action (h);
270}
271
272
273static void
274schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
275 struct GNUNET_STATISTICS_WatchEntry *watch)
276{
277
278 struct GNUNET_STATISTICS_GetHandle *ai;
279 size_t slen;
280 size_t nlen;
281 size_t nsize;
282
283 GNUNET_assert (h != NULL);
284 if (GNUNET_YES != try_connect (h))
285 {
286 schedule_action (h);
287 return;
288 }
289 slen = strlen (watch->subsystem) + 1;
290 nlen = strlen (watch->name) + 1;
291 nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
292 if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
293 {
294 GNUNET_break (0);
295 return;
296 }
297 ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
298 ai->sh = h;
299 ai->subsystem = GNUNET_strdup (watch->subsystem);
300 ai->name = GNUNET_strdup (watch->name);
301 ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
302 ai->msize = nsize;
303 ai->type = ACTION_WATCH;
304 ai->proc = watch->proc;
305 ai->cls = watch->proc_cls;
306 insert_ai (h, ai);
307}
308
309
190/** 310/**
191 * Try to (re)connect to the statistics service. 311 * Try to (re)connect to the statistics service.
192 * 312 *
@@ -195,11 +315,16 @@ struct GNUNET_STATISTICS_Handle
195static int 315static int
196try_connect (struct GNUNET_STATISTICS_Handle *ret) 316try_connect (struct GNUNET_STATISTICS_Handle *ret)
197{ 317{
318 unsigned int i;
198 if (ret->client != NULL) 319 if (ret->client != NULL)
199 return GNUNET_YES; 320 return GNUNET_YES;
200 ret->client = GNUNET_CLIENT_connect (ret->sched, "statistics", ret->cfg); 321 ret->client = GNUNET_CLIENT_connect (ret->sched, "statistics", ret->cfg);
201 if (ret->client != NULL) 322 if (ret->client != NULL)
202 return GNUNET_YES; 323 {
324 for (i=0;i<ret->watches_size;i++)
325 schedule_watch_request (ret, ret->watches[i]);
326 return GNUNET_YES;
327 }
203#if DEBUG_STATISTICS 328#if DEBUG_STATISTICS
204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 329 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
205 _("Failed to connect to statistics service!\n")); 330 _("Failed to connect to statistics service!\n"));
@@ -221,12 +346,6 @@ free_action_item (struct GNUNET_STATISTICS_GetHandle *ai)
221 346
222 347
223/** 348/**
224 * Schedule the next action to be performed.
225 */
226static void schedule_action (struct GNUNET_STATISTICS_Handle *h);
227
228
229/**
230 * GET processing is complete, tell client about it. 349 * GET processing is complete, tell client about it.
231 */ 350 */
232static void 351static void
@@ -259,7 +378,13 @@ process_message (struct GNUNET_STATISTICS_Handle *h,
259 uint16_t size; 378 uint16_t size;
260 379
261 if (h->current->aborted) 380 if (h->current->aborted)
262 return GNUNET_OK; /* don't bother */ 381 {
382#if DEBUG_STATISTICS
383 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384 "Iteration was aborted, ignoring VALUE\n");
385#endif
386 return GNUNET_OK; /* don't bother */
387 }
263 size = ntohs (msg->size); 388 size = ntohs (msg->size);
264 if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage)) 389 if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
265 { 390 {
@@ -293,6 +418,42 @@ process_message (struct GNUNET_STATISTICS_Handle *h,
293#endif 418#endif
294 h->current->aborted = GNUNET_YES; 419 h->current->aborted = GNUNET_YES;
295 } 420 }
421#if DEBUG_STATISTICS
422 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423 "VALUE processed successfully\n");
424#endif
425 return GNUNET_OK;
426}
427
428
429static int
430process_watch_value (struct GNUNET_STATISTICS_Handle *h,
431 const struct GNUNET_MessageHeader *msg)
432{
433 const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
434 struct GNUNET_STATISTICS_WatchEntry *w;
435 uint32_t wid;
436
437 if (sizeof(struct GNUNET_STATISTICS_WatchValueMessage) !=
438 ntohs (msg->size))
439 {
440 GNUNET_break (0);
441 return GNUNET_SYSERR;
442 }
443 wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *)msg;
444 wid = ntohl (wvm->wid);
445 if (wid >= h->watches_size)
446 {
447 GNUNET_break (0);
448 return GNUNET_SYSERR;
449 }
450 w = h->watches[wid];
451 (void) w->proc (w->proc_cls,
452 w->subsystem,
453 w->name,
454 GNUNET_ntohll (wvm->value),
455 0 !=
456 (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
296 return GNUNET_OK; 457 return GNUNET_OK;
297} 458}
298 459
@@ -329,21 +490,53 @@ receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
329 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 490 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
330 "Received end of statistics marker\n"); 491 "Received end of statistics marker\n");
331#endif 492#endif
493 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
332 finish (h, GNUNET_OK); 494 finish (h, GNUNET_OK);
495 if (h->watches_size > 0)
496 {
497 GNUNET_CLIENT_receive (h->client,
498 &receive_stats,
499 h,
500 GNUNET_TIME_UNIT_FOREVER_REL);
501 }
502 else
503 {
504 h->receiving = GNUNET_NO;
505 }
333 return; 506 return;
334 case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE: 507 case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
335 if (GNUNET_OK == process_message (h, msg)) 508 if (GNUNET_OK == process_message (h, msg))
336 { 509 {
337 /* finally, look for more! */ 510 /* finally, look for more! */
511#if DEBUG_STATISTICS
512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513 "Processing VALUE done, now reading more\n");
514#endif
338 GNUNET_CLIENT_receive (h->client, 515 GNUNET_CLIENT_receive (h->client,
339 &receive_stats, 516 &receive_stats,
340 h, 517 h,
341 GNUNET_TIME_absolute_get_remaining 518 GNUNET_TIME_absolute_get_remaining
342 (h->current->timeout)); 519 (h->current->timeout));
520 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
343 return; 521 return;
344 } 522 }
345 GNUNET_break (0); 523 GNUNET_break (0);
346 break; 524 break;
525 case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
526 if (GNUNET_OK ==
527 process_watch_value (h,
528 msg))
529 {
530 h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
531 GNUNET_assert (h->watches_size > 0);
532 GNUNET_CLIENT_receive (h->client,
533 &receive_stats,
534 h,
535 GNUNET_TIME_UNIT_FOREVER_REL);
536 return;
537 }
538 GNUNET_break (0);
539 break;
347 default: 540 default:
348 GNUNET_break (0); 541 GNUNET_break (0);
349 break; 542 break;
@@ -392,15 +585,69 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
392 2, 585 2,
393 handle->current->subsystem, 586 handle->current->subsystem,
394 handle->current->name)); 587 handle->current->name));
395 GNUNET_CLIENT_receive (handle->client, 588 if (! handle->receiving)
396 &receive_stats, 589 {
397 handle, 590#if DEBUG_STATISTICS
398 GNUNET_TIME_absolute_get_remaining (handle-> 591 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399 current->timeout)); 592 "Transmission of GET done, now reading response\n");
593#endif
594 handle->receiving = GNUNET_YES;
595 GNUNET_CLIENT_receive (handle->client,
596 &receive_stats,
597 handle,
598 GNUNET_TIME_absolute_get_remaining (handle->
599 current->timeout));
600 }
400 return msize; 601 return msize;
401} 602}
402 603
403 604
605/**
606 * Transmit a WATCH request (and if successful, start to receive
607 * the response).
608 */
609static size_t
610transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
611{
612 struct GNUNET_MessageHeader *hdr;
613 size_t slen1;
614 size_t slen2;
615 uint16_t msize;
616
617 if (buf == NULL)
618 {
619 /* timeout / error */
620#if DEBUG_STATISTICS
621 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622 "Transmission of request for statistics failed!\n");
623#endif
624 finish (handle, GNUNET_SYSERR);
625 return 0;
626 }
627 slen1 = strlen (handle->current->subsystem) + 1;
628 slen2 = strlen (handle->current->name) + 1;
629 msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
630 GNUNET_assert (msize <= size);
631 hdr = (struct GNUNET_MessageHeader *) buf;
632 hdr->size = htons (msize);
633 hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
634 GNUNET_assert (slen1 + slen2 ==
635 GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
636 slen1 + slen2,
637 2,
638 handle->current->subsystem,
639 handle->current->name));
640 if (! handle->receiving)
641 {
642 handle->receiving = GNUNET_YES;
643 GNUNET_CLIENT_receive (handle->client,
644 &receive_stats,
645 handle,
646 GNUNET_TIME_UNIT_FOREVER_REL);
647 }
648 return msize;
649}
650
404 651
405/** 652/**
406 * Transmit a SET/UPDATE request. 653 * Transmit a SET/UPDATE request.
@@ -464,6 +711,9 @@ transmit_action (void *cls, size_t size, void *buf)
464 case ACTION_UPDATE: 711 case ACTION_UPDATE:
465 ret = transmit_set (handle, size, buf); 712 ret = transmit_set (handle, size, buf);
466 break; 713 break;
714 case ACTION_WATCH:
715 ret = transmit_watch (handle, size, buf);
716 break;
467 default: 717 default:
468 ret = 0; 718 ret = 0;
469 GNUNET_break (0); 719 GNUNET_break (0);
@@ -495,6 +745,7 @@ GNUNET_STATISTICS_create (struct GNUNET_SCHEDULER_Handle *sched,
495 ret->sched = sched; 745 ret->sched = sched;
496 ret->cfg = cfg; 746 ret->cfg = cfg;
497 ret->subsystem = GNUNET_strdup (subsystem); 747 ret->subsystem = GNUNET_strdup (subsystem);
748 ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
498 try_connect (ret); 749 try_connect (ret);
499 return ret; 750 return ret;
500} 751}
@@ -516,7 +767,11 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
516 struct GNUNET_STATISTICS_GetHandle *next; 767 struct GNUNET_STATISTICS_GetHandle *next;
517 struct GNUNET_STATISTICS_GetHandle *prev; 768 struct GNUNET_STATISTICS_GetHandle *prev;
518 struct GNUNET_TIME_Relative timeout; 769 struct GNUNET_TIME_Relative timeout;
770 int i;
519 771
772 if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
773 GNUNET_SCHEDULER_cancel (h->sched,
774 h->backoff_task);
520 if (sync_first) 775 if (sync_first)
521 { 776 {
522 if (h->current != NULL) 777 if (h->current != NULL)
@@ -591,11 +846,30 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
591 GNUNET_CLIENT_disconnect (h->client, GNUNET_YES); 846 GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
592 h->client = NULL; 847 h->client = NULL;
593 } 848 }
849 for (i=0;i<h->watches_size;i++)
850 {
851 GNUNET_free (h->watches[i]->subsystem);
852 GNUNET_free (h->watches[i]->name);
853 GNUNET_free (h->watches[i]);
854 }
855 GNUNET_array_grow (h->watches,
856 h->watches_size,
857 0);
594 GNUNET_free (h->subsystem); 858 GNUNET_free (h->subsystem);
595 GNUNET_free (h); 859 GNUNET_free (h);
596} 860}
597 861
598 862
863static void
864finish_task (void *cls,
865 const struct GNUNET_SCHEDULER_TaskContext *tc)
866{
867 struct GNUNET_STATISTICS_Handle *h = cls;
868
869 h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
870 finish (h, GNUNET_SYSERR);
871}
872
599 873
600/** 874/**
601 * Schedule the next action to be performed. 875 * Schedule the next action to be performed.
@@ -609,7 +883,13 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h)
609 return; /* action already pending */ 883 return; /* action already pending */
610 if (GNUNET_YES != try_connect (h)) 884 if (GNUNET_YES != try_connect (h))
611 { 885 {
612 finish (h, GNUNET_SYSERR); 886 h->backoff_task = GNUNET_SCHEDULER_add_delayed (h->sched,
887 h->backoff,
888 &finish_task,
889 h);
890 h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
891 h->backoff = GNUNET_TIME_relative_min (h->backoff,
892 GNUNET_CONSTANTS_SERVICE_TIMEOUT);
613 return; 893 return;
614 } 894 }
615 895
@@ -643,19 +923,6 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h)
643 } 923 }
644} 924}
645 925
646
647static void
648insert_ai (struct GNUNET_STATISTICS_Handle *h, struct GNUNET_STATISTICS_GetHandle *ai)
649{
650 GNUNET_CONTAINER_DLL_insert_after (h->action_head,
651 h->action_tail,
652 h->action_tail,
653 ai);
654 if (h->action_head == ai)
655 schedule_action (h);
656}
657
658
659/** 926/**
660 * Get statistic from the peer. 927 * Get statistic from the peer.
661 * 928 *
@@ -742,6 +1009,40 @@ GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
742} 1009}
743 1010
744 1011
1012/**
1013 * Watch statistics from the peer (be notified whenever they change).
1014 * Note that the only way to cancel a "watch" request is to destroy
1015 * the statistics handle given as the first argument to this call.
1016 *
1017 * @param handle identification of the statistics service
1018 * @param subsystem limit to the specified subsystem, never NULL
1019 * @param name name of the statistic value, never NULL
1020 * @param proc function to call on each value
1021 * @param proc_cls closure for proc
1022 * @return GNUNET_OK on success, GNUNET_SYSERR on error
1023 */
1024int
1025GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1026 const char *subsystem,
1027 const char *name,
1028 GNUNET_STATISTICS_Iterator proc,
1029 void *proc_cls)
1030{
1031 struct GNUNET_STATISTICS_WatchEntry *w;
1032
1033 w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
1034 w->subsystem = GNUNET_strdup (subsystem);
1035 w->name = GNUNET_strdup (name);
1036 w->proc = proc;
1037 w->proc_cls = proc_cls;
1038 GNUNET_array_append (handle->watches,
1039 handle->watches_size,
1040 w);
1041 schedule_watch_request (handle, w);
1042 return GNUNET_OK;
1043}
1044
1045
745static void 1046static void
746add_setter_action (struct GNUNET_STATISTICS_Handle *h, 1047add_setter_action (struct GNUNET_STATISTICS_Handle *h,
747 const char *name, 1048 const char *name,
@@ -809,7 +1110,8 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h,
809 ai->type = type; 1110 ai->type = type;
810 } 1111 }
811 } 1112 }
812 ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT); 1113 ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1114 ai->make_persistent = make_persistent;
813 return; 1115 return;
814 } 1116 }
815 ai = ai->next; 1117 ai = ai->next;
@@ -824,7 +1126,6 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h,
824 ai->value = value; 1126 ai->value = value;
825 ai->type = type; 1127 ai->type = type;
826 insert_ai (h, ai); 1128 insert_ai (h, ai);
827 schedule_action (h);
828} 1129}
829 1130
830 1131