diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-06-18 20:26:06 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-06-18 20:26:06 +0000 |
commit | 4c59a7e576e8ac8a287bba8f180c1ea87678230a (patch) | |
tree | a5212b2e0ea7254ace384dd88ba12866dc4678f8 /src/peerstore | |
parent | e35e4dbeb9c349981d0092542aa1e3813bf844c9 (diff) | |
download | gnunet-4c59a7e576e8ac8a287bba8f180c1ea87678230a.tar.gz gnunet-4c59a7e576e8ac8a287bba8f180c1ea87678230a.zip |
adapt peerstore API to new MQ API
Diffstat (limited to 'src/peerstore')
-rw-r--r-- | src/peerstore/peerstore_api.c | 391 | ||||
-rw-r--r-- | src/peerstore/test_plugin_peerstore_flat.conf | 2 |
2 files changed, 224 insertions, 169 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index 0339ff93a..b1f4695ec 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c | |||
@@ -259,24 +259,6 @@ struct GNUNET_PEERSTORE_WatchContext | |||
259 | /******************************************************************************/ | 259 | /******************************************************************************/ |
260 | 260 | ||
261 | /** | 261 | /** |
262 | * When a response for iterate request is received | ||
263 | * | ||
264 | * @param cls a 'struct GNUNET_PEERSTORE_Handle *' | ||
265 | * @param msg message received, NULL on timeout or fatal error | ||
266 | */ | ||
267 | static void | ||
268 | handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg); | ||
269 | |||
270 | /** | ||
271 | * When a watch record is received | ||
272 | * | ||
273 | * @param cls a 'struct GNUNET_PEERSTORE_Handle *' | ||
274 | * @param msg message received, NULL on timeout or fatal error | ||
275 | */ | ||
276 | static void | ||
277 | handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg); | ||
278 | |||
279 | /** | ||
280 | * Close the existing connection to PEERSTORE and reconnect. | 262 | * Close the existing connection to PEERSTORE and reconnect. |
281 | * | 263 | * |
282 | * @param h handle to the service | 264 | * @param h handle to the service |
@@ -305,28 +287,19 @@ store_request_sent (void *cls) | |||
305 | } | 287 | } |
306 | 288 | ||
307 | 289 | ||
308 | /** | ||
309 | * MQ message handlers | ||
310 | */ | ||
311 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { | ||
312 | {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0}, | ||
313 | {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, | ||
314 | sizeof (struct GNUNET_MessageHeader)}, | ||
315 | {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0}, | ||
316 | GNUNET_MQ_HANDLERS_END | ||
317 | }; | ||
318 | |||
319 | /******************************************************************************/ | 290 | /******************************************************************************/ |
320 | /******************* CONNECTION FUNCTIONS *********************/ | 291 | /******************* CONNECTION FUNCTIONS *********************/ |
321 | /******************************************************************************/ | 292 | /******************************************************************************/ |
322 | 293 | ||
323 | static void | 294 | static void |
324 | handle_client_error (void *cls, enum GNUNET_MQ_Error error) | 295 | handle_client_error (void *cls, |
296 | enum GNUNET_MQ_Error error) | ||
325 | { | 297 | { |
326 | struct GNUNET_PEERSTORE_Handle *h = cls; | 298 | struct GNUNET_PEERSTORE_Handle *h = cls; |
327 | 299 | ||
328 | LOG (GNUNET_ERROR_TYPE_ERROR, | 300 | LOG (GNUNET_ERROR_TYPE_ERROR, |
329 | _("Received an error notification from MQ of type: %d\n"), error); | 301 | _("Received an error notification from MQ of type: %d\n"), |
302 | error); | ||
330 | reconnect (h); | 303 | reconnect (h); |
331 | } | 304 | } |
332 | 305 | ||
@@ -378,80 +351,6 @@ iterate_timeout (void *cls) | |||
378 | 351 | ||
379 | 352 | ||
380 | /** | 353 | /** |
381 | * Close the existing connection to PEERSTORE and reconnect. | ||
382 | * | ||
383 | * @param h handle to the service | ||
384 | */ | ||
385 | static void | ||
386 | reconnect (struct GNUNET_PEERSTORE_Handle *h) | ||
387 | { | ||
388 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
389 | struct GNUNET_PEERSTORE_IterateContext *next; | ||
390 | GNUNET_PEERSTORE_Processor icb; | ||
391 | void *icb_cls; | ||
392 | struct GNUNET_PEERSTORE_StoreContext *sc; | ||
393 | struct GNUNET_MQ_Envelope *ev; | ||
394 | |||
395 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
396 | "Reconnecting...\n"); | ||
397 | for (ic = h->iterate_head; NULL != ic; ic = next) | ||
398 | { | ||
399 | next = ic->next; | ||
400 | if (GNUNET_YES == ic->iterating) | ||
401 | { | ||
402 | icb = ic->callback; | ||
403 | icb_cls = ic->callback_cls; | ||
404 | GNUNET_PEERSTORE_iterate_cancel (ic); | ||
405 | if (NULL != icb) | ||
406 | icb (icb_cls, | ||
407 | NULL, | ||
408 | "Iteration canceled due to reconnection"); | ||
409 | } | ||
410 | } | ||
411 | if (NULL != h->mq) | ||
412 | { | ||
413 | GNUNET_MQ_destroy (h->mq); | ||
414 | h->mq = NULL; | ||
415 | } | ||
416 | if (NULL != h->client) | ||
417 | { | ||
418 | GNUNET_CLIENT_disconnect (h->client); | ||
419 | h->client = NULL; | ||
420 | } | ||
421 | |||
422 | h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); | ||
423 | GNUNET_assert (NULL != h->client); | ||
424 | h->mq = | ||
425 | GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers, | ||
426 | &handle_client_error, h); | ||
427 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
428 | "Resending pending requests after reconnect.\n"); | ||
429 | if (NULL != h->watches) | ||
430 | GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h); | ||
431 | for (ic = h->iterate_head; NULL != ic; ic = ic->next) | ||
432 | { | ||
433 | ev = | ||
434 | PEERSTORE_create_record_mq_envelope (ic->sub_system, &ic->peer, ic->key, | ||
435 | NULL, 0, NULL, 0, | ||
436 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); | ||
437 | GNUNET_MQ_send (h->mq, ev); | ||
438 | ic->timeout_task = | ||
439 | GNUNET_SCHEDULER_add_delayed (ic->timeout, &iterate_timeout, ic); | ||
440 | } | ||
441 | for (sc = h->store_head; NULL != sc; sc = sc->next) | ||
442 | { | ||
443 | ev = | ||
444 | PEERSTORE_create_record_mq_envelope (sc->sub_system, &sc->peer, sc->key, | ||
445 | sc->value, sc->size, &sc->expiry, | ||
446 | sc->options, | ||
447 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); | ||
448 | GNUNET_MQ_notify_sent (ev, &store_request_sent, sc); | ||
449 | GNUNET_MQ_send (h->mq, ev); | ||
450 | } | ||
451 | } | ||
452 | |||
453 | |||
454 | /** | ||
455 | * Iterator over watch requests to cancel them. | 354 | * Iterator over watch requests to cancel them. |
456 | * | 355 | * |
457 | * @param cls unsused | 356 | * @param cls unsused |
@@ -460,7 +359,9 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h) | |||
460 | * @return #GNUNET_YES to continue iteration | 359 | * @return #GNUNET_YES to continue iteration |
461 | */ | 360 | */ |
462 | static int | 361 | static int |
463 | destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value) | 362 | destroy_watch (void *cls, |
363 | const struct GNUNET_HashCode *key, | ||
364 | void *value) | ||
464 | { | 365 | { |
465 | struct GNUNET_PEERSTORE_WatchContext *wc = value; | 366 | struct GNUNET_PEERSTORE_WatchContext *wc = value; |
466 | 367 | ||
@@ -514,16 +415,7 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
514 | } | 415 | } |
515 | h->cfg = cfg; | 416 | h->cfg = cfg; |
516 | h->disconnecting = GNUNET_NO; | 417 | h->disconnecting = GNUNET_NO; |
517 | h->mq = | 418 | reconnect (h); |
518 | GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers, | ||
519 | &handle_client_error, h); | ||
520 | if (NULL == h->mq) | ||
521 | { | ||
522 | GNUNET_CLIENT_disconnect (h->client); | ||
523 | GNUNET_free (h); | ||
524 | return NULL; | ||
525 | } | ||
526 | LOG (GNUNET_ERROR_TYPE_DEBUG, "New connection created\n"); | ||
527 | return h; | 419 | return h; |
528 | } | 420 | } |
529 | 421 | ||
@@ -615,11 +507,13 @@ GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) | |||
615 | struct GNUNET_PEERSTORE_StoreContext * | 507 | struct GNUNET_PEERSTORE_StoreContext * |
616 | GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, | 508 | GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, |
617 | const char *sub_system, | 509 | const char *sub_system, |
618 | const struct GNUNET_PeerIdentity *peer, const char *key, | 510 | const struct GNUNET_PeerIdentity *peer, |
511 | const char *key, | ||
619 | const void *value, size_t size, | 512 | const void *value, size_t size, |
620 | struct GNUNET_TIME_Absolute expiry, | 513 | struct GNUNET_TIME_Absolute expiry, |
621 | enum GNUNET_PEERSTORE_StoreOption options, | 514 | enum GNUNET_PEERSTORE_StoreOption options, |
622 | GNUNET_PEERSTORE_Continuation cont, void *cont_cls) | 515 | GNUNET_PEERSTORE_Continuation cont, |
516 | void *cont_cls) | ||
623 | { | 517 | { |
624 | struct GNUNET_MQ_Envelope *ev; | 518 | struct GNUNET_MQ_Envelope *ev; |
625 | struct GNUNET_PEERSTORE_StoreContext *sc; | 519 | struct GNUNET_PEERSTORE_StoreContext *sc; |
@@ -655,21 +549,21 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, | |||
655 | /******************* ITERATE FUNCTIONS *********************/ | 549 | /******************* ITERATE FUNCTIONS *********************/ |
656 | /******************************************************************************/ | 550 | /******************************************************************************/ |
657 | 551 | ||
552 | |||
658 | /** | 553 | /** |
659 | * When a response for iterate request is received | 554 | * When a response for iterate request is received |
660 | * | 555 | * |
661 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | 556 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` |
662 | * @param msg message received, NULL on timeout or fatal error | 557 | * @param msg message received |
663 | */ | 558 | */ |
664 | static void | 559 | static void |
665 | handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg) | 560 | handle_iterate_end (void *cls, |
561 | const struct GNUNET_MessageHeader *msg) | ||
666 | { | 562 | { |
667 | struct GNUNET_PEERSTORE_Handle *h = cls; | 563 | struct GNUNET_PEERSTORE_Handle *h = cls; |
668 | struct GNUNET_PEERSTORE_IterateContext *ic; | 564 | struct GNUNET_PEERSTORE_IterateContext *ic; |
669 | GNUNET_PEERSTORE_Processor callback; | 565 | GNUNET_PEERSTORE_Processor callback; |
670 | void *callback_cls; | 566 | void *callback_cls; |
671 | uint16_t msg_type; | ||
672 | struct GNUNET_PEERSTORE_Record *record; | ||
673 | 567 | ||
674 | ic = h->iterate_head; | 568 | ic = h->iterate_head; |
675 | if (NULL == ic) | 569 | if (NULL == ic) |
@@ -679,37 +573,73 @@ handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg) | |||
679 | reconnect (h); | 573 | reconnect (h); |
680 | return; | 574 | return; |
681 | } | 575 | } |
682 | ic->iterating = GNUNET_YES; | ||
683 | callback = ic->callback; | 576 | callback = ic->callback; |
684 | callback_cls = ic->callback_cls; | 577 | callback_cls = ic->callback_cls; |
685 | if (NULL == msg) /* Connection error */ | 578 | ic->iterating = GNUNET_NO; |
579 | GNUNET_PEERSTORE_iterate_cancel (ic); | ||
580 | if (NULL != callback) | ||
581 | callback (callback_cls, NULL, NULL); | ||
582 | } | ||
583 | |||
584 | |||
585 | /** | ||
586 | * When a response for iterate request is received, check the | ||
587 | * message is well-formed. | ||
588 | * | ||
589 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
590 | * @param msg message received | ||
591 | */ | ||
592 | static int | ||
593 | check_iterate_result (void *cls, | ||
594 | const struct GNUNET_MessageHeader *msg) | ||
595 | { | ||
596 | /* we defer validation to #handle_iterate_result */ | ||
597 | return GNUNET_OK; | ||
598 | } | ||
599 | |||
600 | |||
601 | /** | ||
602 | * When a response for iterate request is received | ||
603 | * | ||
604 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
605 | * @param msg message received | ||
606 | */ | ||
607 | static void | ||
608 | handle_iterate_result (void *cls, | ||
609 | const struct GNUNET_MessageHeader *msg) | ||
610 | { | ||
611 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
612 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
613 | GNUNET_PEERSTORE_Processor callback; | ||
614 | void *callback_cls; | ||
615 | struct GNUNET_PEERSTORE_Record *record; | ||
616 | |||
617 | ic = h->iterate_head; | ||
618 | if (NULL == ic) | ||
686 | { | 619 | { |
687 | if (NULL != callback) | 620 | LOG (GNUNET_ERROR_TYPE_ERROR, |
688 | callback (callback_cls, NULL, | 621 | _("Unexpected iteration response, this should not happen.\n")); |
689 | _("Error communicating with `PEERSTORE' service.")); | ||
690 | reconnect (h); | 622 | reconnect (h); |
691 | return; | 623 | return; |
692 | } | 624 | } |
693 | msg_type = ntohs (msg->type); | 625 | ic->iterating = GNUNET_YES; |
694 | if (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type) | 626 | callback = ic->callback; |
695 | { | 627 | callback_cls = ic->callback_cls; |
696 | ic->iterating = GNUNET_NO; | 628 | if (NULL == callback) |
697 | GNUNET_PEERSTORE_iterate_cancel (ic); | ||
698 | if (NULL != callback) | ||
699 | callback (callback_cls, NULL, NULL); | ||
700 | return; | 629 | return; |
630 | record = PEERSTORE_parse_record_message (msg); | ||
631 | if (NULL == record) | ||
632 | { | ||
633 | callback (callback_cls, | ||
634 | NULL, | ||
635 | _("Received a malformed response from service.")); | ||
701 | } | 636 | } |
702 | if (NULL != callback) | 637 | else |
703 | { | 638 | { |
704 | record = PEERSTORE_parse_record_message (msg); | 639 | callback (callback_cls, |
705 | if (NULL == record) | 640 | record, |
706 | callback (callback_cls, NULL, | 641 | NULL); |
707 | _("Received a malformed response from service.")); | 642 | PEERSTORE_destroy_record (record); |
708 | else | ||
709 | { | ||
710 | callback (callback_cls, record, NULL); | ||
711 | PEERSTORE_destroy_record (record); | ||
712 | } | ||
713 | } | 643 | } |
714 | } | 644 | } |
715 | 645 | ||
@@ -734,8 +664,7 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic) | |||
734 | ic->h->iterate_tail, | 664 | ic->h->iterate_tail, |
735 | ic); | 665 | ic); |
736 | GNUNET_free (ic->sub_system); | 666 | GNUNET_free (ic->sub_system); |
737 | if (NULL != ic->key) | 667 | GNUNET_free_non_null (ic->key); |
738 | GNUNET_free (ic->key); | ||
739 | GNUNET_free (ic); | 668 | GNUNET_free (ic); |
740 | } | 669 | } |
741 | else | 670 | else |
@@ -759,7 +688,8 @@ struct GNUNET_PEERSTORE_IterateContext * | |||
759 | GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | 688 | GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, |
760 | const char *sub_system, | 689 | const char *sub_system, |
761 | const struct GNUNET_PeerIdentity *peer, | 690 | const struct GNUNET_PeerIdentity *peer, |
762 | const char *key, struct GNUNET_TIME_Relative timeout, | 691 | const char *key, |
692 | struct GNUNET_TIME_Relative timeout, | ||
763 | GNUNET_PEERSTORE_Processor callback, | 693 | GNUNET_PEERSTORE_Processor callback, |
764 | void *callback_cls) | 694 | void *callback_cls) |
765 | { | 695 | { |
@@ -784,10 +714,13 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | |||
784 | h->iterate_tail, | 714 | h->iterate_tail, |
785 | ic); | 715 | ic); |
786 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 716 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
787 | "Sending an iterate request for sub system `%s'\n", sub_system); | 717 | "Sending an iterate request for sub system `%s'\n", |
718 | sub_system); | ||
788 | GNUNET_MQ_send (h->mq, ev); | 719 | GNUNET_MQ_send (h->mq, ev); |
789 | ic->timeout_task = | 720 | ic->timeout_task = |
790 | GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic); | 721 | GNUNET_SCHEDULER_add_delayed (timeout, |
722 | &iterate_timeout, | ||
723 | ic); | ||
791 | return ic; | 724 | return ic; |
792 | } | 725 | } |
793 | 726 | ||
@@ -797,32 +730,50 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | |||
797 | /******************************************************************************/ | 730 | /******************************************************************************/ |
798 | 731 | ||
799 | /** | 732 | /** |
800 | * When a watch record is received | 733 | * When a watch record is received, validate it is well-formed. |
734 | * | ||
735 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
736 | * @param msg message received | ||
737 | */ | ||
738 | static int | ||
739 | check_watch_record (void *cls, | ||
740 | const struct GNUNET_MessageHeader *msg) | ||
741 | { | ||
742 | /* we defer validation to #handle_watch_result */ | ||
743 | return GNUNET_OK; | ||
744 | } | ||
745 | |||
746 | |||
747 | /** | ||
748 | * When a watch record is received, process it. | ||
801 | * | 749 | * |
802 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | 750 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` |
803 | * @param msg message received, NULL on timeout or fatal error | 751 | * @param msg message received |
804 | */ | 752 | */ |
805 | static void | 753 | static void |
806 | handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) | 754 | handle_watch_record (void *cls, |
755 | const struct GNUNET_MessageHeader *msg) | ||
807 | { | 756 | { |
808 | struct GNUNET_PEERSTORE_Handle *h = cls; | 757 | struct GNUNET_PEERSTORE_Handle *h = cls; |
809 | struct GNUNET_PEERSTORE_Record *record; | 758 | struct GNUNET_PEERSTORE_Record *record; |
810 | struct GNUNET_HashCode keyhash; | 759 | struct GNUNET_HashCode keyhash; |
811 | struct GNUNET_PEERSTORE_WatchContext *wc; | 760 | struct GNUNET_PEERSTORE_WatchContext *wc; |
812 | 761 | ||
813 | if (NULL == msg) | 762 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
763 | "Received a watch record from service.\n"); | ||
764 | record = PEERSTORE_parse_record_message (msg); | ||
765 | if (NULL == record) | ||
814 | { | 766 | { |
815 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
816 | _ | ||
817 | ("Problem receiving a watch response, no way to determine which request.\n")); | ||
818 | reconnect (h); | 767 | reconnect (h); |
819 | return; | 768 | return; |
820 | } | 769 | } |
821 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n"); | 770 | PEERSTORE_hash_key (record->sub_system, |
822 | record = PEERSTORE_parse_record_message (msg); | 771 | record->peer, |
823 | PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash); | 772 | record->key, |
773 | &keyhash); | ||
824 | // FIXME: what if there are multiple watches for the same key? | 774 | // FIXME: what if there are multiple watches for the same key? |
825 | wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash); | 775 | wc = GNUNET_CONTAINER_multihashmap_get (h->watches, |
776 | &keyhash); | ||
826 | if (NULL == wc) | 777 | if (NULL == wc) |
827 | { | 778 | { |
828 | LOG (GNUNET_ERROR_TYPE_ERROR, | 779 | LOG (GNUNET_ERROR_TYPE_ERROR, |
@@ -832,12 +783,115 @@ handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) | |||
832 | return; | 783 | return; |
833 | } | 784 | } |
834 | if (NULL != wc->callback) | 785 | if (NULL != wc->callback) |
835 | wc->callback (wc->callback_cls, record, NULL); | 786 | wc->callback (wc->callback_cls, |
787 | record, | ||
788 | NULL); | ||
836 | PEERSTORE_destroy_record (record); | 789 | PEERSTORE_destroy_record (record); |
837 | } | 790 | } |
838 | 791 | ||
839 | 792 | ||
840 | /** | 793 | /** |
794 | * Close the existing connection to PEERSTORE and reconnect. | ||
795 | * | ||
796 | * @param h handle to the service | ||
797 | */ | ||
798 | static void | ||
799 | reconnect (struct GNUNET_PEERSTORE_Handle *h) | ||
800 | { | ||
801 | GNUNET_MQ_hd_fixed_size (iterate_end, | ||
802 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, | ||
803 | struct GNUNET_MessageHeader); | ||
804 | GNUNET_MQ_hd_var_size (iterate_result, | ||
805 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, | ||
806 | struct GNUNET_MessageHeader); | ||
807 | GNUNET_MQ_hd_var_size (watch_record, | ||
808 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, | ||
809 | struct GNUNET_MessageHeader); | ||
810 | struct GNUNET_MQ_MessageHandler mq_handlers[] = { | ||
811 | make_iterate_end_handler (h), | ||
812 | make_iterate_result_handler (h), | ||
813 | make_watch_record_handler (h), | ||
814 | GNUNET_MQ_handler_end () | ||
815 | }; | ||
816 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
817 | struct GNUNET_PEERSTORE_IterateContext *next; | ||
818 | GNUNET_PEERSTORE_Processor icb; | ||
819 | void *icb_cls; | ||
820 | struct GNUNET_PEERSTORE_StoreContext *sc; | ||
821 | struct GNUNET_MQ_Envelope *ev; | ||
822 | |||
823 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
824 | "Reconnecting...\n"); | ||
825 | for (ic = h->iterate_head; NULL != ic; ic = next) | ||
826 | { | ||
827 | next = ic->next; | ||
828 | if (GNUNET_YES == ic->iterating) | ||
829 | { | ||
830 | icb = ic->callback; | ||
831 | icb_cls = ic->callback_cls; | ||
832 | GNUNET_PEERSTORE_iterate_cancel (ic); | ||
833 | if (NULL != icb) | ||
834 | icb (icb_cls, | ||
835 | NULL, | ||
836 | "Iteration canceled due to reconnection"); | ||
837 | } | ||
838 | } | ||
839 | if (NULL != h->mq) | ||
840 | { | ||
841 | GNUNET_MQ_destroy (h->mq); | ||
842 | h->mq = NULL; | ||
843 | } | ||
844 | if (NULL != h->client) | ||
845 | { | ||
846 | GNUNET_CLIENT_disconnect (h->client); | ||
847 | h->client = NULL; | ||
848 | } | ||
849 | h->client = GNUNET_CLIENT_connect ("peerstore", | ||
850 | h->cfg); | ||
851 | GNUNET_assert (NULL != h->client); | ||
852 | h->mq = GNUNET_MQ_queue_for_connection_client (h->client, | ||
853 | mq_handlers, | ||
854 | &handle_client_error, h); | ||
855 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
856 | "Resending pending requests after reconnect.\n"); | ||
857 | if (NULL != h->watches) | ||
858 | GNUNET_CONTAINER_multihashmap_iterate (h->watches, | ||
859 | &rewatch_it, | ||
860 | h); | ||
861 | for (ic = h->iterate_head; NULL != ic; ic = ic->next) | ||
862 | { | ||
863 | ev = PEERSTORE_create_record_mq_envelope (ic->sub_system, | ||
864 | &ic->peer, | ||
865 | ic->key, | ||
866 | NULL, 0, | ||
867 | NULL, 0, | ||
868 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); | ||
869 | GNUNET_MQ_send (h->mq, ev); | ||
870 | ic->timeout_task | ||
871 | = GNUNET_SCHEDULER_add_delayed (ic->timeout, | ||
872 | &iterate_timeout, | ||
873 | ic); | ||
874 | } | ||
875 | for (sc = h->store_head; NULL != sc; sc = sc->next) | ||
876 | { | ||
877 | ev = PEERSTORE_create_record_mq_envelope (sc->sub_system, | ||
878 | &sc->peer, | ||
879 | sc->key, | ||
880 | sc->value, | ||
881 | sc->size, | ||
882 | &sc->expiry, | ||
883 | sc->options, | ||
884 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); | ||
885 | GNUNET_MQ_notify_sent (ev, | ||
886 | &store_request_sent, | ||
887 | sc); | ||
888 | GNUNET_MQ_send (h->mq, | ||
889 | ev); | ||
890 | } | ||
891 | } | ||
892 | |||
893 | |||
894 | /** | ||
841 | * Cancel a watch request | 895 | * Cancel a watch request |
842 | * | 896 | * |
843 | * @param wc handle to the watch request | 897 | * @param wc handle to the watch request |
@@ -851,7 +905,8 @@ GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc) | |||
851 | 905 | ||
852 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 906 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
853 | "Canceling watch.\n"); | 907 | "Canceling watch.\n"); |
854 | ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); | 908 | ev = GNUNET_MQ_msg (hm, |
909 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); | ||
855 | hm->keyhash = wc->keyhash; | 910 | hm->keyhash = wc->keyhash; |
856 | GNUNET_MQ_send (h->mq, ev); | 911 | GNUNET_MQ_send (h->mq, ev); |
857 | GNUNET_CONTAINER_multihashmap_remove (h->watches, | 912 | GNUNET_CONTAINER_multihashmap_remove (h->watches, |
@@ -876,17 +931,17 @@ GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc) | |||
876 | struct GNUNET_PEERSTORE_WatchContext * | 931 | struct GNUNET_PEERSTORE_WatchContext * |
877 | GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, | 932 | GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, |
878 | const char *sub_system, | 933 | const char *sub_system, |
879 | const struct GNUNET_PeerIdentity *peer, const char *key, | 934 | const struct GNUNET_PeerIdentity *peer, |
880 | GNUNET_PEERSTORE_Processor callback, void *callback_cls) | 935 | const char *key, |
936 | GNUNET_PEERSTORE_Processor callback, | ||
937 | void *callback_cls) | ||
881 | { | 938 | { |
882 | struct GNUNET_MQ_Envelope *ev; | 939 | struct GNUNET_MQ_Envelope *ev; |
883 | struct StoreKeyHashMessage *hm; | 940 | struct StoreKeyHashMessage *hm; |
884 | struct GNUNET_PEERSTORE_WatchContext *wc; | 941 | struct GNUNET_PEERSTORE_WatchContext *wc; |
885 | 942 | ||
886 | GNUNET_assert (NULL != sub_system); | 943 | ev = GNUNET_MQ_msg (hm, |
887 | GNUNET_assert (NULL != peer); | 944 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); |
888 | GNUNET_assert (NULL != key); | ||
889 | ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); | ||
890 | PEERSTORE_hash_key (sub_system, | 945 | PEERSTORE_hash_key (sub_system, |
891 | peer, | 946 | peer, |
892 | key, | 947 | key, |
diff --git a/src/peerstore/test_plugin_peerstore_flat.conf b/src/peerstore/test_plugin_peerstore_flat.conf index 48cc9940d..f0548e5c8 100644 --- a/src/peerstore/test_plugin_peerstore_flat.conf +++ b/src/peerstore/test_plugin_peerstore_flat.conf | |||
@@ -2,4 +2,4 @@ | |||
2 | FILENAME = /tmp/gnunet-test-plugin-namestore-flat/flatdb | 2 | FILENAME = /tmp/gnunet-test-plugin-namestore-flat/flatdb |
3 | 3 | ||
4 | [peerstore] | 4 | [peerstore] |
5 | PREFIX = valgrind --log-file=/home/schanzen/dev/gnunet/src/peerstore/vg_log | 5 | # PREFIX = valgrind --log-file=/home/schanzen/dev/gnunet/src/peerstore/vg_log |