diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-06-21 18:29:03 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-06-21 18:29:03 +0000 |
commit | 3d7b29ec1c5d1c2de96cf4c9badaa112e86ef899 (patch) | |
tree | b586e13615fe58377cef5c0a238a677e5fd8f609 /src/statistics/statistics_api.c | |
parent | 1732154b8c021e7ee0e34c28cf3b1a843454727a (diff) | |
download | gnunet-3d7b29ec1c5d1c2de96cf4c9badaa112e86ef899.tar.gz gnunet-3d7b29ec1c5d1c2de96cf4c9badaa112e86ef899.zip |
update statistics API to use new MQ API style, also get rid of timeout argument
Diffstat (limited to 'src/statistics/statistics_api.c')
-rw-r--r-- | src/statistics/statistics_api.c | 934 |
1 files changed, 391 insertions, 543 deletions
diff --git a/src/statistics/statistics_api.c b/src/statistics/statistics_api.c index 32b973eec..37aa99017 100644 --- a/src/statistics/statistics_api.c +++ b/src/statistics/statistics_api.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2009, 2010, 2011 GNUnet e.V. | 3 | Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -88,7 +88,7 @@ struct GNUNET_STATISTICS_WatchEntry | |||
88 | GNUNET_STATISTICS_Iterator proc; | 88 | GNUNET_STATISTICS_Iterator proc; |
89 | 89 | ||
90 | /** | 90 | /** |
91 | * Closure for proc | 91 | * Closure for @e proc |
92 | */ | 92 | */ |
93 | void *proc_cls; | 93 | void *proc_cls; |
94 | 94 | ||
@@ -137,7 +137,7 @@ struct GNUNET_STATISTICS_GetHandle | |||
137 | GNUNET_STATISTICS_Iterator proc; | 137 | GNUNET_STATISTICS_Iterator proc; |
138 | 138 | ||
139 | /** | 139 | /** |
140 | * Closure for proc and cont. | 140 | * Closure for @e proc and @e cont. |
141 | */ | 141 | */ |
142 | void *cls; | 142 | void *cls; |
143 | 143 | ||
@@ -147,11 +147,6 @@ struct GNUNET_STATISTICS_GetHandle | |||
147 | struct GNUNET_TIME_Absolute timeout; | 147 | struct GNUNET_TIME_Absolute timeout; |
148 | 148 | ||
149 | /** | 149 | /** |
150 | * Task run on timeout. | ||
151 | */ | ||
152 | struct GNUNET_SCHEDULER_Task * timeout_task; | ||
153 | |||
154 | /** | ||
155 | * Associated value. | 150 | * Associated value. |
156 | */ | 151 | */ |
157 | uint64_t value; | 152 | uint64_t value; |
@@ -167,7 +162,7 @@ struct GNUNET_STATISTICS_GetHandle | |||
167 | int aborted; | 162 | int aborted; |
168 | 163 | ||
169 | /** | 164 | /** |
170 | * Is this a GET, SET, UPDATE or WATCH? | 165 | * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH? |
171 | */ | 166 | */ |
172 | enum ActionType type; | 167 | enum ActionType type; |
173 | 168 | ||
@@ -195,14 +190,9 @@ struct GNUNET_STATISTICS_Handle | |||
195 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 190 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
196 | 191 | ||
197 | /** | 192 | /** |
198 | * Socket (if available). | 193 | * Message queue to the service. |
199 | */ | 194 | */ |
200 | struct GNUNET_CLIENT_Connection *client; | 195 | struct GNUNET_MQ_Handle *mq; |
201 | |||
202 | /** | ||
203 | * Currently pending transmission request. | ||
204 | */ | ||
205 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
206 | 196 | ||
207 | /** | 197 | /** |
208 | * Head of the linked list of pending actions (first action | 198 | * Head of the linked list of pending actions (first action |
@@ -230,7 +220,7 @@ struct GNUNET_STATISTICS_Handle | |||
230 | /** | 220 | /** |
231 | * Task doing exponential back-off trying to reconnect. | 221 | * Task doing exponential back-off trying to reconnect. |
232 | */ | 222 | */ |
233 | struct GNUNET_SCHEDULER_Task * backoff_task; | 223 | struct GNUNET_SCHEDULER_Task *backoff_task; |
234 | 224 | ||
235 | /** | 225 | /** |
236 | * Time for next connect retry. | 226 | * Time for next connect retry. |
@@ -248,7 +238,7 @@ struct GNUNET_STATISTICS_Handle | |||
248 | uint64_t peak_rss; | 238 | uint64_t peak_rss; |
249 | 239 | ||
250 | /** | 240 | /** |
251 | * Size of the 'watches' array. | 241 | * Size of the @e watches array. |
252 | */ | 242 | */ |
253 | unsigned int watches_size; | 243 | unsigned int watches_size; |
254 | 244 | ||
@@ -321,6 +311,15 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h); | |||
321 | 311 | ||
322 | 312 | ||
323 | /** | 313 | /** |
314 | * Reconnect at a later time, respecting back-off. | ||
315 | * | ||
316 | * @param h statistics handle | ||
317 | */ | ||
318 | static void | ||
319 | reconnect_later (struct GNUNET_STATISTICS_Handle *h); | ||
320 | |||
321 | |||
322 | /** | ||
324 | * Transmit request to service that we want to watch | 323 | * Transmit request to service that we want to watch |
325 | * the development of a particular value. | 324 | * the development of a particular value. |
326 | * | 325 | * |
@@ -353,7 +352,8 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h, | |||
353 | ai->type = ACTION_WATCH; | 352 | ai->type = ACTION_WATCH; |
354 | ai->proc = watch->proc; | 353 | ai->proc = watch->proc; |
355 | ai->cls = watch->proc_cls; | 354 | ai->cls = watch->proc_cls; |
356 | GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail, | 355 | GNUNET_CONTAINER_DLL_insert_tail (h->action_head, |
356 | h->action_tail, | ||
357 | ai); | 357 | ai); |
358 | schedule_action (h); | 358 | schedule_action (h); |
359 | } | 359 | } |
@@ -367,11 +367,6 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h, | |||
367 | static void | 367 | static void |
368 | free_action_item (struct GNUNET_STATISTICS_GetHandle *gh) | 368 | free_action_item (struct GNUNET_STATISTICS_GetHandle *gh) |
369 | { | 369 | { |
370 | if (NULL != gh->timeout_task) | ||
371 | { | ||
372 | GNUNET_SCHEDULER_cancel (gh->timeout_task); | ||
373 | gh->timeout_task = NULL; | ||
374 | } | ||
375 | GNUNET_free_non_null (gh->subsystem); | 370 | GNUNET_free_non_null (gh->subsystem); |
376 | GNUNET_free_non_null (gh->name); | 371 | GNUNET_free_non_null (gh->name); |
377 | GNUNET_free (gh); | 372 | GNUNET_free (gh); |
@@ -388,11 +383,6 @@ do_disconnect (struct GNUNET_STATISTICS_Handle *h) | |||
388 | { | 383 | { |
389 | struct GNUNET_STATISTICS_GetHandle *c; | 384 | struct GNUNET_STATISTICS_GetHandle *c; |
390 | 385 | ||
391 | if (NULL != h->th) | ||
392 | { | ||
393 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | ||
394 | h->th = NULL; | ||
395 | } | ||
396 | h->receiving = GNUNET_NO; | 386 | h->receiving = GNUNET_NO; |
397 | if (NULL != (c = h->current)) | 387 | if (NULL != (c = h->current)) |
398 | { | 388 | { |
@@ -400,393 +390,400 @@ do_disconnect (struct GNUNET_STATISTICS_Handle *h) | |||
400 | if ( (NULL != c->cont) && | 390 | if ( (NULL != c->cont) && |
401 | (GNUNET_YES != c->aborted) ) | 391 | (GNUNET_YES != c->aborted) ) |
402 | { | 392 | { |
403 | c->cont (c->cls, GNUNET_SYSERR); | 393 | c->cont (c->cls, |
394 | GNUNET_SYSERR); | ||
404 | c->cont = NULL; | 395 | c->cont = NULL; |
405 | } | 396 | } |
406 | free_action_item (c); | 397 | free_action_item (c); |
407 | } | 398 | } |
408 | if (NULL != h->client) | 399 | if (NULL != h->mq) |
409 | { | 400 | { |
410 | GNUNET_CLIENT_disconnect (h->client); | 401 | GNUNET_MQ_destroy (h->mq); |
411 | h->client = NULL; | 402 | h->mq = NULL; |
412 | } | 403 | } |
413 | } | 404 | } |
414 | 405 | ||
415 | 406 | ||
416 | /** | 407 | /** |
417 | * Try to (re)connect to the statistics service. | 408 | * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message. |
418 | * | 409 | * |
419 | * @param h statistics handle to reconnect | 410 | * @param cls statistics handle |
420 | * @return #GNUNET_YES on success, #GNUNET_NO on failure. | 411 | * @param smsg message received from the service, never NULL |
412 | * @return #GNUNET_OK if the message was well-formed | ||
421 | */ | 413 | */ |
422 | static int | 414 | static int |
423 | try_connect (struct GNUNET_STATISTICS_Handle *h) | 415 | check_statistics_value (void *cls, |
416 | const struct GNUNET_STATISTICS_ReplyMessage *smsg) | ||
424 | { | 417 | { |
425 | struct GNUNET_STATISTICS_GetHandle *gh; | 418 | const char *service; |
426 | struct GNUNET_STATISTICS_GetHandle *gn; | 419 | const char *name; |
427 | unsigned int i; | 420 | uint16_t size; |
428 | 421 | ||
429 | if (NULL != h->backoff_task) | 422 | size = ntohs (smsg->header.size); |
430 | return GNUNET_NO; | 423 | size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage); |
431 | if (NULL != h->client) | 424 | if (size != |
432 | return GNUNET_YES; | 425 | GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], |
433 | h->client = GNUNET_CLIENT_connect ("statistics", h->cfg); | 426 | size, |
434 | if (NULL != h->client) | 427 | 2, |
428 | &service, | ||
429 | &name)) | ||
435 | { | 430 | { |
436 | gn = h->action_head; | 431 | GNUNET_break (0); |
437 | while (NULL != (gh = gn)) | 432 | return GNUNET_SYSERR; |
438 | { | ||
439 | gn = gh->next; | ||
440 | if (gh->type == ACTION_WATCH) | ||
441 | { | ||
442 | GNUNET_CONTAINER_DLL_remove (h->action_head, | ||
443 | h->action_tail, | ||
444 | gh); | ||
445 | free_action_item (gh); | ||
446 | } | ||
447 | } | ||
448 | for (i = 0; i < h->watches_size; i++) | ||
449 | { | ||
450 | if (NULL != h->watches[i]) | ||
451 | schedule_watch_request (h, h->watches[i]); | ||
452 | } | ||
453 | return GNUNET_YES; | ||
454 | } | 433 | } |
455 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 434 | return GNUNET_OK; |
456 | "Failed to connect to statistics service!\n"); | ||
457 | return GNUNET_NO; | ||
458 | } | 435 | } |
459 | 436 | ||
460 | 437 | ||
461 | /** | 438 | /** |
462 | * We've waited long enough, reconnect now. | 439 | * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message. |
463 | * | 440 | * |
464 | * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect | 441 | * @param cls statistics handle |
442 | * @param msg message received from the service, never NULL | ||
443 | * @return #GNUNET_OK if the message was well-formed | ||
465 | */ | 444 | */ |
466 | static void | 445 | static void |
467 | reconnect_task (void *cls) | 446 | handle_statistics_value (void *cls, |
447 | const struct GNUNET_STATISTICS_ReplyMessage *smsg) | ||
468 | { | 448 | { |
469 | struct GNUNET_STATISTICS_Handle *h = cls; | 449 | struct GNUNET_STATISTICS_Handle *h = cls; |
450 | const char *service; | ||
451 | const char *name; | ||
452 | uint16_t size; | ||
470 | 453 | ||
471 | h->backoff_task = NULL; | 454 | if (h->current->aborted) |
472 | schedule_action (h); | 455 | return; /* iteration aborted, don't bother */ |
456 | |||
457 | size = ntohs (smsg->header.size); | ||
458 | size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage); | ||
459 | GNUNET_assert (size == | ||
460 | GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], | ||
461 | size, | ||
462 | 2, | ||
463 | &service, | ||
464 | &name)); | ||
465 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
466 | "Received valid statistic on `%s:%s': %llu\n", | ||
467 | service, name, | ||
468 | GNUNET_ntohll (smsg->value)); | ||
469 | if (GNUNET_OK != | ||
470 | h->current->proc (h->current->cls, | ||
471 | service, | ||
472 | name, | ||
473 | GNUNET_ntohll (smsg->value), | ||
474 | 0 != | ||
475 | (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT))) | ||
476 | { | ||
477 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
478 | "Processing of remaining statistics aborted by client.\n"); | ||
479 | h->current->aborted = GNUNET_YES; | ||
480 | } | ||
473 | } | 481 | } |
474 | 482 | ||
475 | 483 | ||
476 | /** | 484 | /** |
477 | * Task used by 'reconnect_later' to shutdown the handle | 485 | * We have received a watch value from the service. Process it. |
478 | * | 486 | * |
479 | * @param cls the statistics handle | 487 | * @param cls statistics handle |
488 | * @param msg the watch value message | ||
480 | */ | 489 | */ |
481 | static void | 490 | static void |
482 | do_destroy (void *cls) | 491 | handle_statistics_watch_value (void *cls, |
492 | const struct GNUNET_STATISTICS_WatchValueMessage *wvm) | ||
483 | { | 493 | { |
484 | struct GNUNET_STATISTICS_Handle *h = cls; | 494 | struct GNUNET_STATISTICS_Handle *h = cls; |
495 | struct GNUNET_STATISTICS_WatchEntry *w; | ||
496 | uint32_t wid; | ||
485 | 497 | ||
486 | GNUNET_STATISTICS_destroy (h, GNUNET_NO); | 498 | GNUNET_break (0 == ntohl (wvm->reserved)); |
499 | wid = ntohl (wvm->wid); | ||
500 | if (wid >= h->watches_size) | ||
501 | { | ||
502 | do_disconnect (h); | ||
503 | reconnect_later (h); | ||
504 | return; | ||
505 | } | ||
506 | w = h->watches[wid]; | ||
507 | if (NULL == w) | ||
508 | return; | ||
509 | (void) w->proc (w->proc_cls, | ||
510 | w->subsystem, | ||
511 | w->name, | ||
512 | GNUNET_ntohll (wvm->value), | ||
513 | 0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT)); | ||
487 | } | 514 | } |
488 | 515 | ||
489 | 516 | ||
490 | /** | 517 | /** |
491 | * Reconnect at a later time, respecting back-off. | 518 | * Generic error handler, called with the appropriate error code and |
519 | * the same closure specified at the creation of the message queue. | ||
520 | * Not every message queue implementation supports an error handler. | ||
492 | * | 521 | * |
493 | * @param h statistics handle | 522 | * @param cls closure with the `struct GNUNET_STATISTICS_Handle *` |
523 | * @param error error code | ||
494 | */ | 524 | */ |
495 | static void | 525 | static void |
496 | reconnect_later (struct GNUNET_STATISTICS_Handle *h) | 526 | mq_error_handler (void *cls, |
527 | enum GNUNET_MQ_Error error) | ||
497 | { | 528 | { |
498 | int loss; | 529 | struct GNUNET_STATISTICS_Handle *h = cls; |
499 | struct GNUNET_STATISTICS_GetHandle *gh; | ||
500 | 530 | ||
501 | GNUNET_assert (NULL == h->backoff_task); | 531 | if (GNUNET_NO != h->do_destroy) |
502 | if (GNUNET_YES == h->do_destroy) | ||
503 | { | 532 | { |
504 | /* So we are shutting down and the service is not reachable. | ||
505 | * Chances are that it's down for good and we are not going to connect to | ||
506 | * it anymore. | ||
507 | * Give up and don't sync the rest of the data. | ||
508 | */ | ||
509 | loss = GNUNET_NO; | ||
510 | for (gh = h->action_head; NULL != gh; gh = gh->next) | ||
511 | if ( (gh->make_persistent) && (ACTION_SET == gh->type) ) | ||
512 | loss = GNUNET_YES; | ||
513 | if (GNUNET_YES == loss) | ||
514 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
515 | _("Could not save some persistent statistics\n")); | ||
516 | h->do_destroy = GNUNET_NO; | 533 | h->do_destroy = GNUNET_NO; |
517 | GNUNET_SCHEDULER_add_now (&do_destroy, h); | 534 | GNUNET_STATISTICS_destroy (h, |
535 | GNUNET_NO); | ||
518 | return; | 536 | return; |
519 | } | 537 | } |
520 | h->backoff_task = | 538 | do_disconnect (h); |
521 | GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h); | 539 | reconnect_later (h); |
522 | h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff); | ||
523 | } | 540 | } |
524 | 541 | ||
525 | 542 | ||
526 | /** | 543 | /** |
527 | * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message. | 544 | * Task used to destroy the statistics handle. |
528 | * | 545 | * |
529 | * @param h statistics handle | 546 | * @param cls the `struct GNUNET_STATISTICS_Handle` |
530 | * @param msg message received from the service, never NULL | ||
531 | * @return #GNUNET_OK if the message was well-formed | ||
532 | */ | 547 | */ |
533 | static int | 548 | static void |
534 | process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h, | 549 | destroy_task (void *cls) |
535 | const struct GNUNET_MessageHeader *msg) | ||
536 | { | 550 | { |
537 | char *service; | 551 | struct GNUNET_STATISTICS_Handle *h = cls; |
538 | char *name; | ||
539 | const struct GNUNET_STATISTICS_ReplyMessage *smsg; | ||
540 | uint16_t size; | ||
541 | 552 | ||
542 | if (h->current->aborted) | 553 | GNUNET_STATISTICS_destroy (h, GNUNET_NO); |
543 | { | 554 | } |
544 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 555 | |
545 | "Iteration was aborted, ignoring VALUE\n"); | 556 | |
546 | return GNUNET_OK; /* don't bother */ | 557 | /** |
547 | } | 558 | * Handle a #GNUNET_MESSAGE_TYPE_TEST (sic) message. We receive this |
548 | size = ntohs (msg->size); | 559 | * message at the end of the shutdown when the service confirms that |
549 | if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage)) | 560 | * all data has been written to disk. |
561 | * | ||
562 | * @param cls our `struct GNUNET_STATISTICS_Handle *` | ||
563 | * @param msg the message | ||
564 | */ | ||
565 | static void | ||
566 | handle_test (void *cls, | ||
567 | const struct GNUNET_MessageHeader *msg) | ||
568 | { | ||
569 | struct GNUNET_STATISTICS_Handle *h = cls; | ||
570 | |||
571 | if (GNUNET_SYSERR != h->do_destroy) | ||
550 | { | 572 | { |
573 | /* not in shutdown, why do we get 'TEST'? */ | ||
551 | GNUNET_break (0); | 574 | GNUNET_break (0); |
552 | return GNUNET_SYSERR; | 575 | do_disconnect (h); |
576 | reconnect_later (h); | ||
577 | return; | ||
553 | } | 578 | } |
554 | smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg; | 579 | h->do_destroy = GNUNET_NO; |
555 | size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage); | 580 | GNUNET_SCHEDULER_add_now (&destroy_task, |
556 | if (size != | 581 | h); |
557 | GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2, | 582 | } |
558 | &service, &name)) | 583 | |
584 | |||
585 | /** | ||
586 | * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive | ||
587 | * this message in response to a query to indicate that there are no | ||
588 | * further matching results. | ||
589 | * | ||
590 | * @param cls our `struct GNUNET_STATISTICS_Handle *` | ||
591 | * @param msg the message | ||
592 | */ | ||
593 | static void | ||
594 | handle_statistics_end (void *cls, | ||
595 | const struct GNUNET_MessageHeader *msg) | ||
596 | { | ||
597 | struct GNUNET_STATISTICS_Handle *h = cls; | ||
598 | struct GNUNET_STATISTICS_GetHandle *c; | ||
599 | |||
600 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
601 | "Received end of statistics marker\n"); | ||
602 | if (NULL == (c = h->current)) | ||
559 | { | 603 | { |
560 | GNUNET_break (0); | 604 | GNUNET_break (0); |
561 | return GNUNET_SYSERR; | 605 | do_disconnect (h); |
606 | reconnect_later (h); | ||
607 | return; | ||
562 | } | 608 | } |
563 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 609 | h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; |
564 | "Received valid statistic on `%s:%s': %llu\n", | 610 | h->current = NULL; |
565 | service, name, | 611 | schedule_action (h); |
566 | GNUNET_ntohll (smsg->value)); | 612 | if (NULL != c->cont) |
567 | if (GNUNET_OK != | ||
568 | h->current->proc (h->current->cls, service, name, | ||
569 | GNUNET_ntohll (smsg->value), | ||
570 | 0 != | ||
571 | (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT))) | ||
572 | { | 613 | { |
573 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 614 | c->cont (c->cls, |
574 | "Processing of remaining statistics aborted by client.\n"); | 615 | GNUNET_OK); |
575 | h->current->aborted = GNUNET_YES; | 616 | c->cont = NULL; |
576 | } | 617 | } |
577 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 618 | free_action_item (c); |
578 | "VALUE processed successfully\n"); | ||
579 | return GNUNET_OK; | ||
580 | } | 619 | } |
581 | 620 | ||
582 | 621 | ||
583 | /** | 622 | /** |
584 | * We have received a watch value from the service. Process it. | 623 | * Try to (re)connect to the statistics service. |
585 | * | 624 | * |
586 | * @param h statistics handle | 625 | * @param h statistics handle to reconnect |
587 | * @param msg the watch value message | 626 | * @return #GNUNET_YES on success, #GNUNET_NO on failure. |
588 | * @return #GNUNET_OK if the message was well-formed, #GNUNET_SYSERR if not, | ||
589 | * #GNUNET_NO if this watch has been cancelled | ||
590 | */ | 627 | */ |
591 | static int | 628 | static int |
592 | process_watch_value (struct GNUNET_STATISTICS_Handle *h, | 629 | try_connect (struct GNUNET_STATISTICS_Handle *h) |
593 | const struct GNUNET_MessageHeader *msg) | ||
594 | { | 630 | { |
595 | const struct GNUNET_STATISTICS_WatchValueMessage *wvm; | 631 | GNUNET_MQ_hd_fixed_size (test, |
596 | struct GNUNET_STATISTICS_WatchEntry *w; | 632 | GNUNET_MESSAGE_TYPE_TEST, |
597 | uint32_t wid; | 633 | struct GNUNET_MessageHeader); |
634 | GNUNET_MQ_hd_fixed_size (statistics_end, | ||
635 | GNUNET_MESSAGE_TYPE_STATISTICS_END, | ||
636 | struct GNUNET_MessageHeader); | ||
637 | GNUNET_MQ_hd_var_size (statistics_value, | ||
638 | GNUNET_MESSAGE_TYPE_STATISTICS_VALUE, | ||
639 | struct GNUNET_STATISTICS_ReplyMessage); | ||
640 | GNUNET_MQ_hd_fixed_size (statistics_watch_value, | ||
641 | GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE, | ||
642 | struct GNUNET_STATISTICS_WatchValueMessage); | ||
643 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
644 | make_test_handler (h), | ||
645 | make_statistics_end_handler (h), | ||
646 | make_statistics_value_handler (h), | ||
647 | make_statistics_watch_value_handler (h), | ||
648 | GNUNET_MQ_handler_end () | ||
649 | }; | ||
650 | struct GNUNET_STATISTICS_GetHandle *gh; | ||
651 | struct GNUNET_STATISTICS_GetHandle *gn; | ||
598 | 652 | ||
599 | if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size)) | 653 | if (NULL != h->backoff_task) |
654 | return GNUNET_NO; | ||
655 | if (NULL != h->mq) | ||
656 | return GNUNET_YES; | ||
657 | h->mq = GNUNET_CLIENT_connecT (h->cfg, | ||
658 | "statistics", | ||
659 | handlers, | ||
660 | &mq_error_handler, | ||
661 | h); | ||
662 | if (NULL == h->mq) | ||
600 | { | 663 | { |
601 | GNUNET_break (0); | 664 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
602 | return GNUNET_SYSERR; | 665 | "Failed to connect to statistics service!\n"); |
666 | return GNUNET_NO; | ||
603 | } | 667 | } |
604 | wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg; | 668 | gn = h->action_head; |
605 | GNUNET_break (0 == ntohl (wvm->reserved)); | 669 | while (NULL != (gh = gn)) |
606 | wid = ntohl (wvm->wid); | ||
607 | if (wid >= h->watches_size) | ||
608 | { | 670 | { |
609 | GNUNET_break (0); | 671 | gn = gh->next; |
610 | return GNUNET_SYSERR; | 672 | if (gh->type == ACTION_WATCH) |
673 | { | ||
674 | GNUNET_CONTAINER_DLL_remove (h->action_head, | ||
675 | h->action_tail, | ||
676 | gh); | ||
677 | free_action_item (gh); | ||
678 | } | ||
611 | } | 679 | } |
612 | w = h->watches[wid]; | 680 | for (unsigned int i = 0; i < h->watches_size; i++) |
613 | if (NULL == w) | 681 | if (NULL != h->watches[i]) |
614 | return GNUNET_NO; | 682 | schedule_watch_request (h, |
615 | (void) w->proc (w->proc_cls, w->subsystem, w->name, | 683 | h->watches[i]); |
616 | GNUNET_ntohll (wvm->value), | 684 | return GNUNET_YES; |
617 | 0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT)); | ||
618 | return GNUNET_OK; | ||
619 | } | 685 | } |
620 | 686 | ||
621 | 687 | ||
622 | /** | 688 | /** |
623 | * Task used to destroy the statistics handle. | 689 | * We've waited long enough, reconnect now. |
624 | * | 690 | * |
625 | * @param cls the `struct GNUNET_STATISTICS_Handle` | 691 | * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect |
626 | */ | 692 | */ |
627 | static void | 693 | static void |
628 | destroy_task (void *cls) | 694 | reconnect_task (void *cls) |
629 | { | 695 | { |
630 | struct GNUNET_STATISTICS_Handle *h = cls; | 696 | struct GNUNET_STATISTICS_Handle *h = cls; |
631 | 697 | ||
632 | GNUNET_STATISTICS_destroy (h, GNUNET_NO); | 698 | h->backoff_task = NULL; |
699 | schedule_action (h); | ||
633 | } | 700 | } |
634 | 701 | ||
635 | 702 | ||
636 | /** | 703 | /** |
637 | * Function called with messages from stats service. | 704 | * Task used by #reconnect_later() to shutdown the handle |
638 | * | 705 | * |
639 | * @param cls closure | 706 | * @param cls the statistics handle |
640 | * @param msg message received, NULL on timeout or fatal error | ||
641 | */ | 707 | */ |
642 | static void | 708 | static void |
643 | receive_stats (void *cls, | 709 | do_destroy (void *cls) |
644 | const struct GNUNET_MessageHeader *msg) | ||
645 | { | 710 | { |
646 | struct GNUNET_STATISTICS_Handle *h = cls; | 711 | struct GNUNET_STATISTICS_Handle *h = cls; |
647 | struct GNUNET_STATISTICS_GetHandle *c; | ||
648 | int ret; | ||
649 | 712 | ||
650 | if (NULL == msg) | 713 | GNUNET_STATISTICS_destroy (h, |
651 | { | 714 | GNUNET_NO); |
652 | LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, | 715 | } |
653 | "Error receiving statistics from service, is the service running?\n"); | 716 | |
654 | do_disconnect (h); | 717 | |
655 | reconnect_later (h); | 718 | /** |
656 | return; | 719 | * Reconnect at a later time, respecting back-off. |
657 | } | 720 | * |
658 | switch (ntohs (msg->type)) | 721 | * @param h statistics handle |
722 | */ | ||
723 | static void | ||
724 | reconnect_later (struct GNUNET_STATISTICS_Handle *h) | ||
725 | { | ||
726 | int loss; | ||
727 | struct GNUNET_STATISTICS_GetHandle *gh; | ||
728 | |||
729 | GNUNET_assert (NULL == h->backoff_task); | ||
730 | if (GNUNET_YES == h->do_destroy) | ||
659 | { | 731 | { |
660 | case GNUNET_MESSAGE_TYPE_TEST: | 732 | /* So we are shutting down and the service is not reachable. |
661 | if (GNUNET_SYSERR != h->do_destroy) | 733 | * Chances are that it's down for good and we are not going to connect to |
662 | { | 734 | * it anymore. |
663 | /* not in shutdown, why do we get 'TEST'? */ | 735 | * Give up and don't sync the rest of the data. |
664 | GNUNET_break (0); | 736 | */ |
665 | do_disconnect (h); | 737 | loss = GNUNET_NO; |
666 | reconnect_later (h); | 738 | for (gh = h->action_head; NULL != gh; gh = gh->next) |
667 | return; | 739 | if ( (gh->make_persistent) && (ACTION_SET == gh->type) ) |
668 | } | 740 | loss = GNUNET_YES; |
741 | if (GNUNET_YES == loss) | ||
742 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
743 | _("Could not save some persistent statistics\n")); | ||
669 | h->do_destroy = GNUNET_NO; | 744 | h->do_destroy = GNUNET_NO; |
670 | GNUNET_SCHEDULER_add_now (&destroy_task, h); | 745 | GNUNET_SCHEDULER_add_now (&do_destroy, |
671 | break; | 746 | h); |
672 | case GNUNET_MESSAGE_TYPE_STATISTICS_END: | ||
673 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
674 | "Received end of statistics marker\n"); | ||
675 | if (NULL == (c = h->current)) | ||
676 | { | ||
677 | GNUNET_break (0); | ||
678 | do_disconnect (h); | ||
679 | reconnect_later (h); | ||
680 | return; | ||
681 | } | ||
682 | h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
683 | if (h->watches_size > 0) | ||
684 | { | ||
685 | GNUNET_CLIENT_receive (h->client, &receive_stats, h, | ||
686 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
687 | } | ||
688 | else | ||
689 | { | ||
690 | h->receiving = GNUNET_NO; | ||
691 | } | ||
692 | h->current = NULL; | ||
693 | schedule_action (h); | ||
694 | if (NULL != c->cont) | ||
695 | { | ||
696 | c->cont (c->cls, GNUNET_OK); | ||
697 | c->cont = NULL; | ||
698 | } | ||
699 | free_action_item (c); | ||
700 | return; | ||
701 | case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE: | ||
702 | if (GNUNET_OK != process_statistics_value_message (h, msg)) | ||
703 | { | ||
704 | do_disconnect (h); | ||
705 | reconnect_later (h); | ||
706 | return; | ||
707 | } | ||
708 | /* finally, look for more! */ | ||
709 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
710 | "Processing VALUE done, now reading more\n"); | ||
711 | GNUNET_CLIENT_receive (h->client, &receive_stats, h, | ||
712 | GNUNET_TIME_absolute_get_remaining (h-> | ||
713 | current->timeout)); | ||
714 | h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
715 | return; | ||
716 | case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE: | ||
717 | if (GNUNET_OK != | ||
718 | (ret = process_watch_value (h, msg))) | ||
719 | { | ||
720 | do_disconnect (h); | ||
721 | if (GNUNET_NO == ret) | ||
722 | h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
723 | reconnect_later (h); | ||
724 | return; | ||
725 | } | ||
726 | h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
727 | GNUNET_assert (h->watches_size > 0); | ||
728 | GNUNET_CLIENT_receive (h->client, &receive_stats, h, | ||
729 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
730 | return; | ||
731 | default: | ||
732 | GNUNET_break (0); | ||
733 | do_disconnect (h); | ||
734 | reconnect_later (h); | ||
735 | return; | 747 | return; |
736 | } | 748 | } |
749 | h->backoff_task | ||
750 | = GNUNET_SCHEDULER_add_delayed (h->backoff, | ||
751 | &reconnect_task, | ||
752 | h); | ||
753 | h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff); | ||
737 | } | 754 | } |
738 | 755 | ||
739 | 756 | ||
757 | |||
740 | /** | 758 | /** |
741 | * Transmit a GET request (and if successful, start to receive | 759 | * Transmit a GET request (and if successful, start to receive |
742 | * the response). | 760 | * the response). |
743 | * | 761 | * |
744 | * @param handle statistics handle | 762 | * @param handle statistics handle |
745 | * @param size how many bytes can we write to @a buf | ||
746 | * @param buf where to write requests to the service | ||
747 | * @return number of bytes written to @a buf | ||
748 | */ | 763 | */ |
749 | static size_t | 764 | static void |
750 | transmit_get (struct GNUNET_STATISTICS_Handle *handle, | 765 | transmit_get (struct GNUNET_STATISTICS_Handle *handle) |
751 | size_t size, | ||
752 | void *buf) | ||
753 | { | 766 | { |
754 | struct GNUNET_STATISTICS_GetHandle *c; | 767 | struct GNUNET_STATISTICS_GetHandle *c; |
755 | struct GNUNET_MessageHeader *hdr; | 768 | struct GNUNET_MessageHeader *hdr; |
769 | struct GNUNET_MQ_Envelope *env; | ||
756 | size_t slen1; | 770 | size_t slen1; |
757 | size_t slen2; | 771 | size_t slen2; |
758 | uint16_t msize; | ||
759 | 772 | ||
760 | GNUNET_assert (NULL != (c = handle->current)); | 773 | GNUNET_assert (NULL != (c = handle->current)); |
761 | if (NULL == buf) | ||
762 | { | ||
763 | /* timeout / error */ | ||
764 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
765 | "Transmission of request for statistics failed!\n"); | ||
766 | do_disconnect (handle); | ||
767 | reconnect_later (handle); | ||
768 | return 0; | ||
769 | } | ||
770 | slen1 = strlen (c->subsystem) + 1; | 774 | slen1 = strlen (c->subsystem) + 1; |
771 | slen2 = strlen (c->name) + 1; | 775 | slen2 = strlen (c->name) + 1; |
772 | msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); | 776 | env = GNUNET_MQ_msg_extra (hdr, |
773 | GNUNET_assert (msize <= size); | 777 | slen1 + slen2, |
774 | hdr = (struct GNUNET_MessageHeader *) buf; | 778 | GNUNET_MESSAGE_TYPE_STATISTICS_GET); |
775 | hdr->size = htons (msize); | ||
776 | hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET); | ||
777 | GNUNET_assert (slen1 + slen2 == | 779 | GNUNET_assert (slen1 + slen2 == |
778 | GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2, | 780 | GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], |
781 | slen1 + slen2, | ||
782 | 2, | ||
779 | c->subsystem, | 783 | c->subsystem, |
780 | c->name)); | 784 | c->name)); |
781 | if (GNUNET_YES != handle->receiving) | 785 | GNUNET_MQ_send (handle->mq, |
782 | { | 786 | env); |
783 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
784 | "Transmission of GET done, now reading response\n"); | ||
785 | handle->receiving = GNUNET_YES; | ||
786 | GNUNET_CLIENT_receive (handle->client, &receive_stats, handle, | ||
787 | GNUNET_TIME_absolute_get_remaining (c->timeout)); | ||
788 | } | ||
789 | return msize; | ||
790 | } | 787 | } |
791 | 788 | ||
792 | 789 | ||
@@ -795,53 +792,34 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle, | |||
795 | * the response). | 792 | * the response). |
796 | * | 793 | * |
797 | * @param handle statistics handle | 794 | * @param handle statistics handle |
798 | * @param size how many bytes can we write to @a buf | ||
799 | * @param buf where to write requests to the service | ||
800 | * @return number of bytes written to @a buf | ||
801 | */ | 795 | */ |
802 | static size_t | 796 | static void |
803 | transmit_watch (struct GNUNET_STATISTICS_Handle *handle, | 797 | transmit_watch (struct GNUNET_STATISTICS_Handle *handle) |
804 | size_t size, | ||
805 | void *buf) | ||
806 | { | 798 | { |
807 | struct GNUNET_MessageHeader *hdr; | 799 | struct GNUNET_MessageHeader *hdr; |
800 | struct GNUNET_MQ_Envelope *env; | ||
808 | size_t slen1; | 801 | size_t slen1; |
809 | size_t slen2; | 802 | size_t slen2; |
810 | uint16_t msize; | ||
811 | 803 | ||
812 | if (NULL == buf) | ||
813 | { | ||
814 | /* timeout / error */ | ||
815 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
816 | "Transmission of request for statistics failed!\n"); | ||
817 | do_disconnect (handle); | ||
818 | reconnect_later (handle); | ||
819 | return 0; | ||
820 | } | ||
821 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 804 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
822 | "Transmitting watch request for `%s'\n", | 805 | "Transmitting watch request for `%s'\n", |
823 | handle->current->name); | 806 | handle->current->name); |
824 | slen1 = strlen (handle->current->subsystem) + 1; | 807 | slen1 = strlen (handle->current->subsystem) + 1; |
825 | slen2 = strlen (handle->current->name) + 1; | 808 | slen2 = strlen (handle->current->name) + 1; |
826 | msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); | 809 | env = GNUNET_MQ_msg_extra (hdr, |
827 | GNUNET_assert (msize <= size); | 810 | slen1 + slen2, |
828 | hdr = (struct GNUNET_MessageHeader *) buf; | 811 | GNUNET_MESSAGE_TYPE_STATISTICS_WATCH); |
829 | hdr->size = htons (msize); | ||
830 | hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH); | ||
831 | GNUNET_assert (slen1 + slen2 == | 812 | GNUNET_assert (slen1 + slen2 == |
832 | GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2, | 813 | GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], |
814 | slen1 + slen2, | ||
815 | 2, | ||
833 | handle->current->subsystem, | 816 | handle->current->subsystem, |
834 | handle->current->name)); | 817 | handle->current->name)); |
835 | if (GNUNET_YES != handle->receiving) | 818 | GNUNET_MQ_send (handle->mq, |
836 | { | 819 | env); |
837 | handle->receiving = GNUNET_YES; | ||
838 | GNUNET_CLIENT_receive (handle->client, &receive_stats, handle, | ||
839 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
840 | } | ||
841 | GNUNET_assert (NULL == handle->current->cont); | 820 | GNUNET_assert (NULL == handle->current->cont); |
842 | free_action_item (handle->current); | 821 | free_action_item (handle->current); |
843 | handle->current = NULL; | 822 | handle->current = NULL; |
844 | return msize; | ||
845 | } | 823 | } |
846 | 824 | ||
847 | 825 | ||
@@ -849,39 +827,20 @@ transmit_watch (struct GNUNET_STATISTICS_Handle *handle, | |||
849 | * Transmit a SET/UPDATE request. | 827 | * Transmit a SET/UPDATE request. |
850 | * | 828 | * |
851 | * @param handle statistics handle | 829 | * @param handle statistics handle |
852 | * @param size how many bytes can we write to @a buf | ||
853 | * @param buf where to write requests to the service | ||
854 | * @return number of bytes written to @a buf | ||
855 | */ | 830 | */ |
856 | static size_t | 831 | static void |
857 | transmit_set (struct GNUNET_STATISTICS_Handle *handle, | 832 | transmit_set (struct GNUNET_STATISTICS_Handle *handle) |
858 | size_t size, | ||
859 | void *buf) | ||
860 | { | 833 | { |
861 | struct GNUNET_STATISTICS_SetMessage *r; | 834 | struct GNUNET_STATISTICS_SetMessage *r; |
835 | struct GNUNET_MQ_Envelope *env; | ||
862 | size_t slen; | 836 | size_t slen; |
863 | size_t nlen; | 837 | size_t nlen; |
864 | size_t nsize; | ||
865 | 838 | ||
866 | if (NULL == buf) | ||
867 | { | ||
868 | do_disconnect (handle); | ||
869 | reconnect_later (handle); | ||
870 | return 0; | ||
871 | } | ||
872 | slen = strlen (handle->current->subsystem) + 1; | 839 | slen = strlen (handle->current->subsystem) + 1; |
873 | nlen = strlen (handle->current->name) + 1; | 840 | nlen = strlen (handle->current->name) + 1; |
874 | nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen; | 841 | env = GNUNET_MQ_msg_extra (r, |
875 | if (size < nsize) | 842 | slen + nlen, |
876 | { | 843 | GNUNET_MESSAGE_TYPE_STATISTICS_SET); |
877 | GNUNET_break (0); | ||
878 | do_disconnect (handle); | ||
879 | reconnect_later (handle); | ||
880 | return 0; | ||
881 | } | ||
882 | r = buf; | ||
883 | r->header.size = htons (nsize); | ||
884 | r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET); | ||
885 | r->flags = 0; | 844 | r->flags = 0; |
886 | r->value = GNUNET_htonll (handle->current->value); | 845 | r->value = GNUNET_htonll (handle->current->value); |
887 | if (handle->current->make_persistent) | 846 | if (handle->current->make_persistent) |
@@ -889,52 +848,17 @@ transmit_set (struct GNUNET_STATISTICS_Handle *handle, | |||
889 | if (handle->current->type == ACTION_UPDATE) | 848 | if (handle->current->type == ACTION_UPDATE) |
890 | r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE); | 849 | r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE); |
891 | GNUNET_assert (slen + nlen == | 850 | GNUNET_assert (slen + nlen == |
892 | GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2, | 851 | GNUNET_STRINGS_buffer_fill ((char *) &r[1], |
852 | slen + nlen, | ||
853 | 2, | ||
893 | handle->current->subsystem, | 854 | handle->current->subsystem, |
894 | handle->current->name)); | 855 | handle->current->name)); |
895 | GNUNET_assert (NULL == handle->current->cont); | 856 | GNUNET_assert (NULL == handle->current->cont); |
896 | free_action_item (handle->current); | 857 | free_action_item (handle->current); |
897 | handle->current = NULL; | 858 | handle->current = NULL; |
898 | update_memory_statistics (handle); | 859 | update_memory_statistics (handle); |
899 | return nsize; | 860 | GNUNET_MQ_send (handle->mq, |
900 | } | 861 | env); |
901 | |||
902 | |||
903 | /** | ||
904 | * Function called when we are ready to transmit a request to the service. | ||
905 | * | ||
906 | * @param cls the `struct GNUNET_STATISTICS_Handle` | ||
907 | * @param size how many bytes can we write to @a buf | ||
908 | * @param buf where to write requests to the service | ||
909 | * @return number of bytes written to @a buf | ||
910 | */ | ||
911 | static size_t | ||
912 | transmit_action (void *cls, size_t size, void *buf) | ||
913 | { | ||
914 | struct GNUNET_STATISTICS_Handle *h = cls; | ||
915 | size_t ret; | ||
916 | |||
917 | h->th = NULL; | ||
918 | ret = 0; | ||
919 | if (NULL != h->current) | ||
920 | switch (h->current->type) | ||
921 | { | ||
922 | case ACTION_GET: | ||
923 | ret = transmit_get (h, size, buf); | ||
924 | break; | ||
925 | case ACTION_SET: | ||
926 | case ACTION_UPDATE: | ||
927 | ret = transmit_set (h, size, buf); | ||
928 | break; | ||
929 | case ACTION_WATCH: | ||
930 | ret = transmit_watch (h, size, buf); | ||
931 | break; | ||
932 | default: | ||
933 | GNUNET_assert (0); | ||
934 | break; | ||
935 | } | ||
936 | schedule_action (h); | ||
937 | return ret; | ||
938 | } | 862 | } |
939 | 863 | ||
940 | 864 | ||
@@ -952,10 +876,10 @@ GNUNET_STATISTICS_create (const char *subsystem, | |||
952 | struct GNUNET_STATISTICS_Handle *ret; | 876 | struct GNUNET_STATISTICS_Handle *ret; |
953 | 877 | ||
954 | if (GNUNET_YES == | 878 | if (GNUNET_YES == |
955 | GNUNET_CONFIGURATION_get_value_yesno (cfg, "statistics", "DISABLE")) | 879 | GNUNET_CONFIGURATION_get_value_yesno (cfg, |
880 | "statistics", | ||
881 | "DISABLE")) | ||
956 | return NULL; | 882 | return NULL; |
957 | GNUNET_assert (NULL != subsystem); | ||
958 | GNUNET_assert (NULL != cfg); | ||
959 | ret = GNUNET_new (struct GNUNET_STATISTICS_Handle); | 883 | ret = GNUNET_new (struct GNUNET_STATISTICS_Handle); |
960 | ret->cfg = cfg; | 884 | ret->cfg = cfg; |
961 | ret->subsystem = GNUNET_strdup (subsystem); | 885 | ret->subsystem = GNUNET_strdup (subsystem); |
@@ -978,8 +902,6 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, | |||
978 | { | 902 | { |
979 | struct GNUNET_STATISTICS_GetHandle *pos; | 903 | struct GNUNET_STATISTICS_GetHandle *pos; |
980 | struct GNUNET_STATISTICS_GetHandle *next; | 904 | struct GNUNET_STATISTICS_GetHandle *next; |
981 | struct GNUNET_TIME_Relative timeout; | ||
982 | int i; | ||
983 | 905 | ||
984 | if (NULL == h) | 906 | if (NULL == h) |
985 | return; | 907 | return; |
@@ -989,26 +911,19 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, | |||
989 | GNUNET_SCHEDULER_cancel (h->backoff_task); | 911 | GNUNET_SCHEDULER_cancel (h->backoff_task); |
990 | h->backoff_task = NULL; | 912 | h->backoff_task = NULL; |
991 | } | 913 | } |
992 | if (sync_first) | 914 | if ( (sync_first) && |
915 | (GNUNET_YES == try_connect (h)) ) | ||
993 | { | 916 | { |
994 | if (NULL != h->current) | 917 | if ( (NULL != h->current) && |
995 | { | 918 | (ACTION_GET == h->current->type) ) |
996 | if (ACTION_GET == h->current->type) | 919 | h->current->aborted = GNUNET_YES; |
997 | { | ||
998 | if (NULL != h->th) | ||
999 | { | ||
1000 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | ||
1001 | h->th = NULL; | ||
1002 | } | ||
1003 | free_action_item (h->current); | ||
1004 | h->current = NULL; | ||
1005 | } | ||
1006 | } | ||
1007 | next = h->action_head; | 920 | next = h->action_head; |
1008 | while (NULL != (pos = next)) | 921 | while (NULL != (pos = next)) |
1009 | { | 922 | { |
1010 | next = pos->next; | 923 | next = pos->next; |
1011 | if (ACTION_GET == pos->type) | 924 | if ( (ACTION_GET == pos->type) || |
925 | (ACTION_WATCH == pos->type) || | ||
926 | (GNUNET_NO == pos->make_persistent) ) | ||
1012 | { | 927 | { |
1013 | GNUNET_CONTAINER_DLL_remove (h->action_head, | 928 | GNUNET_CONTAINER_DLL_remove (h->action_head, |
1014 | h->action_tail, | 929 | h->action_tail, |
@@ -1016,25 +931,11 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, | |||
1016 | free_action_item (pos); | 931 | free_action_item (pos); |
1017 | } | 932 | } |
1018 | } | 933 | } |
1019 | if ( (NULL == h->current) && | ||
1020 | (NULL != (h->current = h->action_head)) ) | ||
1021 | GNUNET_CONTAINER_DLL_remove (h->action_head, | ||
1022 | h->action_tail, | ||
1023 | h->current); | ||
1024 | h->do_destroy = GNUNET_YES; | 934 | h->do_destroy = GNUNET_YES; |
1025 | if ((NULL != h->current) && (NULL == h->th) && | 935 | schedule_action (h); |
1026 | (NULL != h->client)) | 936 | return; /* do not finish destruction just yet */ |
1027 | { | ||
1028 | timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout); | ||
1029 | h->th = | ||
1030 | GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize, | ||
1031 | timeout, GNUNET_YES, | ||
1032 | &transmit_action, h); | ||
1033 | GNUNET_assert (NULL != h->th); | ||
1034 | } | ||
1035 | if (NULL != h->th) | ||
1036 | return; /* do not finish destruction just yet */ | ||
1037 | } | 937 | } |
938 | /* do clean up all */ | ||
1038 | while (NULL != (pos = h->action_head)) | 939 | while (NULL != (pos = h->action_head)) |
1039 | { | 940 | { |
1040 | GNUNET_CONTAINER_DLL_remove (h->action_head, | 941 | GNUNET_CONTAINER_DLL_remove (h->action_head, |
@@ -1043,7 +944,7 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, | |||
1043 | free_action_item (pos); | 944 | free_action_item (pos); |
1044 | } | 945 | } |
1045 | do_disconnect (h); | 946 | do_disconnect (h); |
1046 | for (i = 0; i < h->watches_size; i++) | 947 | for (unsigned int i = 0; i < h->watches_size; i++) |
1047 | { | 948 | { |
1048 | if (NULL == h->watches[i]) | 949 | if (NULL == h->watches[i]) |
1049 | continue; | 950 | continue; |
@@ -1051,53 +952,15 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, | |||
1051 | GNUNET_free (h->watches[i]->name); | 952 | GNUNET_free (h->watches[i]->name); |
1052 | GNUNET_free (h->watches[i]); | 953 | GNUNET_free (h->watches[i]); |
1053 | } | 954 | } |
1054 | GNUNET_array_grow (h->watches, h->watches_size, 0); | 955 | GNUNET_array_grow (h->watches, |
956 | h->watches_size, | ||
957 | 0); | ||
1055 | GNUNET_free (h->subsystem); | 958 | GNUNET_free (h->subsystem); |
1056 | GNUNET_free (h); | 959 | GNUNET_free (h); |
1057 | } | 960 | } |
1058 | 961 | ||
1059 | 962 | ||
1060 | /** | 963 | /** |
1061 | * Function called to transmit TEST message to service to | ||
1062 | * confirm that the service has received all of our 'SET' | ||
1063 | * messages (during statistics disconnect/shutdown). | ||
1064 | * | ||
1065 | * @param cls the `struct GNUNET_STATISTICS_Handle` | ||
1066 | * @param size how many bytes can we write to @a buf | ||
1067 | * @param buf where to write requests to the service | ||
1068 | * @return number of bytes written to @a buf | ||
1069 | */ | ||
1070 | static size_t | ||
1071 | transmit_test_on_shutdown (void *cls, | ||
1072 | size_t size, | ||
1073 | void *buf) | ||
1074 | { | ||
1075 | struct GNUNET_STATISTICS_Handle *h = cls; | ||
1076 | struct GNUNET_MessageHeader hdr; | ||
1077 | |||
1078 | h->th = NULL; | ||
1079 | if (NULL == buf) | ||
1080 | { | ||
1081 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1082 | _("Failed to receive acknowledgement from statistics service, some statistics might have been lost!\n")); | ||
1083 | h->do_destroy = GNUNET_NO; | ||
1084 | GNUNET_SCHEDULER_add_now (&destroy_task, h); | ||
1085 | return 0; | ||
1086 | } | ||
1087 | hdr.type = htons (GNUNET_MESSAGE_TYPE_TEST); | ||
1088 | hdr.size = htons (sizeof (struct GNUNET_MessageHeader)); | ||
1089 | memcpy (buf, &hdr, sizeof (hdr)); | ||
1090 | if (GNUNET_YES != h->receiving) | ||
1091 | { | ||
1092 | h->receiving = GNUNET_YES; | ||
1093 | GNUNET_CLIENT_receive (h->client, &receive_stats, h, | ||
1094 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1095 | } | ||
1096 | return sizeof (struct GNUNET_MessageHeader); | ||
1097 | } | ||
1098 | |||
1099 | |||
1100 | /** | ||
1101 | * Schedule the next action to be performed. | 964 | * Schedule the next action to be performed. |
1102 | * | 965 | * |
1103 | * @param h statistics handle | 966 | * @param h statistics handle |
@@ -1105,76 +968,61 @@ transmit_test_on_shutdown (void *cls, | |||
1105 | static void | 968 | static void |
1106 | schedule_action (struct GNUNET_STATISTICS_Handle *h) | 969 | schedule_action (struct GNUNET_STATISTICS_Handle *h) |
1107 | { | 970 | { |
1108 | struct GNUNET_TIME_Relative timeout; | 971 | if (NULL != h->backoff_task) |
1109 | |||
1110 | if ( (NULL != h->th) || | ||
1111 | (NULL != h->backoff_task) ) | ||
1112 | return; /* action already pending */ | 972 | return; /* action already pending */ |
1113 | if (GNUNET_YES != try_connect (h)) | 973 | if (GNUNET_YES != try_connect (h)) |
1114 | { | 974 | { |
1115 | reconnect_later (h); | 975 | reconnect_later (h); |
1116 | return; | 976 | return; |
1117 | } | 977 | } |
1118 | if (NULL != h->current) | ||
1119 | return; /* action already pending */ | ||
1120 | /* schedule next action */ | 978 | /* schedule next action */ |
1121 | h->current = h->action_head; | 979 | while (NULL == h->current) |
1122 | if (NULL == h->current) | ||
1123 | { | 980 | { |
1124 | if (GNUNET_YES == h->do_destroy) | 981 | h->current = h->action_head; |
982 | if (NULL == h->current) | ||
1125 | { | 983 | { |
984 | struct GNUNET_MessageHeader *hdr; | ||
985 | struct GNUNET_MQ_Envelope *env; | ||
986 | |||
987 | if (GNUNET_YES != h->do_destroy) | ||
988 | return; /* nothing to do */ | ||
989 | /* let service know that we're done */ | ||
1126 | h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */ | 990 | h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */ |
1127 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, | 991 | env = GNUNET_MQ_msg (hdr, |
1128 | sizeof (struct GNUNET_MessageHeader), | 992 | GNUNET_MESSAGE_TYPE_TEST); |
1129 | SET_TRANSMIT_TIMEOUT, | 993 | GNUNET_MQ_send (h->mq, |
1130 | GNUNET_NO, | 994 | env); |
1131 | &transmit_test_on_shutdown, h); | 995 | return; |
996 | } | ||
997 | GNUNET_CONTAINER_DLL_remove (h->action_head, | ||
998 | h->action_tail, | ||
999 | h->current); | ||
1000 | switch (h->current->type) | ||
1001 | { | ||
1002 | case ACTION_GET: | ||
1003 | transmit_get (h); | ||
1004 | break; | ||
1005 | case ACTION_SET: | ||
1006 | case ACTION_UPDATE: | ||
1007 | transmit_set (h); | ||
1008 | break; | ||
1009 | case ACTION_WATCH: | ||
1010 | transmit_watch (h); | ||
1011 | break; | ||
1012 | default: | ||
1013 | GNUNET_assert (0); | ||
1014 | break; | ||
1132 | } | 1015 | } |
1133 | return; | ||
1134 | } | ||
1135 | GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current); | ||
1136 | timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout); | ||
1137 | if (NULL == | ||
1138 | (h->th = | ||
1139 | GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize, | ||
1140 | timeout, GNUNET_YES, | ||
1141 | &transmit_action, h))) | ||
1142 | { | ||
1143 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1144 | "Failed to transmit request to statistics service.\n"); | ||
1145 | do_disconnect (h); | ||
1146 | reconnect_later (h); | ||
1147 | } | 1016 | } |
1148 | } | 1017 | } |
1149 | 1018 | ||
1150 | 1019 | ||
1151 | /** | 1020 | /** |
1152 | * We have run into a timeout on a #GNUNET_STATISTICS_get() operation, | ||
1153 | * call the continuation. | ||
1154 | * | ||
1155 | * @param cls the `struct GNUNET_STATISTICS_GetHandle` | ||
1156 | */ | ||
1157 | static void | ||
1158 | run_get_timeout (void *cls) | ||
1159 | { | ||
1160 | struct GNUNET_STATISTICS_GetHandle *gh = cls; | ||
1161 | GNUNET_STATISTICS_Callback cont = gh->cont; | ||
1162 | void *cont_cls = gh->cls; | ||
1163 | |||
1164 | gh->timeout_task = NULL; | ||
1165 | GNUNET_STATISTICS_get_cancel (gh); | ||
1166 | cont (cont_cls, GNUNET_SYSERR); | ||
1167 | } | ||
1168 | |||
1169 | |||
1170 | /** | ||
1171 | * Get statistic from the peer. | 1021 | * Get statistic from the peer. |
1172 | * | 1022 | * |
1173 | * @param handle identification of the statistics service | 1023 | * @param handle identification of the statistics service |
1174 | * @param subsystem limit to the specified subsystem, NULL for our subsystem | 1024 | * @param subsystem limit to the specified subsystem, NULL for our subsystem |
1175 | * @param name name of the statistic value, NULL for all values | 1025 | * @param name name of the statistic value, NULL for all values |
1176 | * @param timeout after how long should we give up (and call | ||
1177 | * cont with an error code)? | ||
1178 | * @param cont continuation to call when done (can be NULL) | 1026 | * @param cont continuation to call when done (can be NULL) |
1179 | * This callback CANNOT destroy the statistics handle in the same call. | 1027 | * This callback CANNOT destroy the statistics handle in the same call. |
1180 | * @param proc function to call on each value | 1028 | * @param proc function to call on each value |
@@ -1183,10 +1031,11 @@ run_get_timeout (void *cls) | |||
1183 | */ | 1031 | */ |
1184 | struct GNUNET_STATISTICS_GetHandle * | 1032 | struct GNUNET_STATISTICS_GetHandle * |
1185 | GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle, | 1033 | GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle, |
1186 | const char *subsystem, const char *name, | 1034 | const char *subsystem, |
1187 | struct GNUNET_TIME_Relative timeout, | 1035 | const char *name, |
1188 | GNUNET_STATISTICS_Callback cont, | 1036 | GNUNET_STATISTICS_Callback cont, |
1189 | GNUNET_STATISTICS_Iterator proc, void *cls) | 1037 | GNUNET_STATISTICS_Iterator proc, |
1038 | void *cls) | ||
1190 | { | 1039 | { |
1191 | size_t slen1; | 1040 | size_t slen1; |
1192 | size_t slen2; | 1041 | size_t slen2; |
@@ -1211,12 +1060,8 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle, | |||
1211 | ai->cont = cont; | 1060 | ai->cont = cont; |
1212 | ai->proc = proc; | 1061 | ai->proc = proc; |
1213 | ai->cls = cls; | 1062 | ai->cls = cls; |
1214 | ai->timeout = GNUNET_TIME_relative_to_absolute (timeout); | ||
1215 | ai->type = ACTION_GET; | 1063 | ai->type = ACTION_GET; |
1216 | ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); | 1064 | ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); |
1217 | ai->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, | ||
1218 | &run_get_timeout, | ||
1219 | ai); | ||
1220 | GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, | 1065 | GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, |
1221 | handle->action_tail, | 1066 | handle->action_tail, |
1222 | ai); | 1067 | ai); |
@@ -1236,23 +1081,18 @@ GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh) | |||
1236 | { | 1081 | { |
1237 | if (NULL == gh) | 1082 | if (NULL == gh) |
1238 | return; | 1083 | return; |
1239 | if (NULL != gh->timeout_task) | ||
1240 | { | ||
1241 | GNUNET_SCHEDULER_cancel (gh->timeout_task); | ||
1242 | gh->timeout_task = NULL; | ||
1243 | } | ||
1244 | gh->cont = NULL; | 1084 | gh->cont = NULL; |
1245 | if (gh->sh->current == gh) | 1085 | if (gh->sh->current == gh) |
1246 | { | 1086 | { |
1247 | gh->aborted = GNUNET_YES; | 1087 | gh->aborted = GNUNET_YES; |
1088 | return; | ||
1248 | } | 1089 | } |
1249 | else | 1090 | GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, |
1250 | { | 1091 | gh->sh->action_tail, |
1251 | GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh); | 1092 | gh); |
1252 | GNUNET_free (gh->name); | 1093 | GNUNET_free (gh->name); |
1253 | GNUNET_free (gh->subsystem); | 1094 | GNUNET_free (gh->subsystem); |
1254 | GNUNET_free (gh); | 1095 | GNUNET_free (gh); |
1255 | } | ||
1256 | } | 1096 | } |
1257 | 1097 | ||
1258 | 1098 | ||
@@ -1268,8 +1108,10 @@ GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh) | |||
1268 | */ | 1108 | */ |
1269 | int | 1109 | int |
1270 | GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle, | 1110 | GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle, |
1271 | const char *subsystem, const char *name, | 1111 | const char *subsystem, |
1272 | GNUNET_STATISTICS_Iterator proc, void *proc_cls) | 1112 | const char *name, |
1113 | GNUNET_STATISTICS_Iterator proc, | ||
1114 | void *proc_cls) | ||
1273 | { | 1115 | { |
1274 | struct GNUNET_STATISTICS_WatchEntry *w; | 1116 | struct GNUNET_STATISTICS_WatchEntry *w; |
1275 | 1117 | ||
@@ -1280,8 +1122,11 @@ GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle, | |||
1280 | w->name = GNUNET_strdup (name); | 1122 | w->name = GNUNET_strdup (name); |
1281 | w->proc = proc; | 1123 | w->proc = proc; |
1282 | w->proc_cls = proc_cls; | 1124 | w->proc_cls = proc_cls; |
1283 | GNUNET_array_append (handle->watches, handle->watches_size, w); | 1125 | GNUNET_array_append (handle->watches, |
1284 | schedule_watch_request (handle, w); | 1126 | handle->watches_size, |
1127 | w); | ||
1128 | schedule_watch_request (handle, | ||
1129 | w); | ||
1285 | return GNUNET_OK; | 1130 | return GNUNET_OK; |
1286 | } | 1131 | } |
1287 | 1132 | ||
@@ -1304,11 +1149,10 @@ GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle, | |||
1304 | void *proc_cls) | 1149 | void *proc_cls) |
1305 | { | 1150 | { |
1306 | struct GNUNET_STATISTICS_WatchEntry *w; | 1151 | struct GNUNET_STATISTICS_WatchEntry *w; |
1307 | unsigned int i; | ||
1308 | 1152 | ||
1309 | if (NULL == handle) | 1153 | if (NULL == handle) |
1310 | return GNUNET_SYSERR; | 1154 | return GNUNET_SYSERR; |
1311 | for (i=0;i<handle->watches_size;i++) | 1155 | for (unsigned int i=0;i<handle->watches_size;i++) |
1312 | { | 1156 | { |
1313 | w = handle->watches[i]; | 1157 | w = handle->watches[i]; |
1314 | if (NULL == w) | 1158 | if (NULL == w) |
@@ -1329,7 +1173,6 @@ GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle, | |||
1329 | } | 1173 | } |
1330 | 1174 | ||
1331 | 1175 | ||
1332 | |||
1333 | /** | 1176 | /** |
1334 | * Queue a request to change a statistic. | 1177 | * Queue a request to change a statistic. |
1335 | * | 1178 | * |
@@ -1421,7 +1264,8 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, | |||
1421 | ai->msize = nsize; | 1264 | ai->msize = nsize; |
1422 | ai->value = value; | 1265 | ai->value = value; |
1423 | ai->type = type; | 1266 | ai->type = type; |
1424 | GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail, | 1267 | GNUNET_CONTAINER_DLL_insert_tail (h->action_head, |
1268 | h->action_tail, | ||
1425 | ai); | 1269 | ai); |
1426 | schedule_action (h); | 1270 | schedule_action (h); |
1427 | } | 1271 | } |
@@ -1445,7 +1289,11 @@ GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle, | |||
1445 | if (NULL == handle) | 1289 | if (NULL == handle) |
1446 | return; | 1290 | return; |
1447 | GNUNET_assert (GNUNET_NO == handle->do_destroy); | 1291 | GNUNET_assert (GNUNET_NO == handle->do_destroy); |
1448 | add_setter_action (handle, name, make_persistent, value, ACTION_SET); | 1292 | add_setter_action (handle, |
1293 | name, | ||
1294 | make_persistent, | ||
1295 | value, | ||
1296 | ACTION_SET); | ||
1449 | } | 1297 | } |
1450 | 1298 | ||
1451 | 1299 | ||