aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-18 20:26:06 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-18 20:26:06 +0000
commit4c59a7e576e8ac8a287bba8f180c1ea87678230a (patch)
treea5212b2e0ea7254ace384dd88ba12866dc4678f8 /src/peerstore
parente35e4dbeb9c349981d0092542aa1e3813bf844c9 (diff)
downloadgnunet-4c59a7e576e8ac8a287bba8f180c1ea87678230a.tar.gz
gnunet-4c59a7e576e8ac8a287bba8f180c1ea87678230a.zip
adapt peerstore API to new MQ API
Diffstat (limited to 'src/peerstore')
-rw-r--r--src/peerstore/peerstore_api.c391
-rw-r--r--src/peerstore/test_plugin_peerstore_flat.conf2
2 files changed, 224 insertions, 169 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index 0339ff93a..b1f4695ec 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -259,24 +259,6 @@ struct GNUNET_PEERSTORE_WatchContext
259/******************************************************************************/ 259/******************************************************************************/
260 260
261/** 261/**
262 * When a response for iterate request is received
263 *
264 * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
265 * @param msg message received, NULL on timeout or fatal error
266 */
267static void
268handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
269
270/**
271 * When a watch record is received
272 *
273 * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
274 * @param msg message received, NULL on timeout or fatal error
275 */
276static void
277handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
278
279/**
280 * Close the existing connection to PEERSTORE and reconnect. 262 * Close the existing connection to PEERSTORE and reconnect.
281 * 263 *
282 * @param h handle to the service 264 * @param h handle to the service
@@ -305,28 +287,19 @@ store_request_sent (void *cls)
305} 287}
306 288
307 289
308/**
309 * MQ message handlers
310 */
311static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
312 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
313 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
314 sizeof (struct GNUNET_MessageHeader)},
315 {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
316 GNUNET_MQ_HANDLERS_END
317};
318
319/******************************************************************************/ 290/******************************************************************************/
320/******************* CONNECTION FUNCTIONS *********************/ 291/******************* CONNECTION FUNCTIONS *********************/
321/******************************************************************************/ 292/******************************************************************************/
322 293
323static void 294static void
324handle_client_error (void *cls, enum GNUNET_MQ_Error error) 295handle_client_error (void *cls,
296 enum GNUNET_MQ_Error error)
325{ 297{
326 struct GNUNET_PEERSTORE_Handle *h = cls; 298 struct GNUNET_PEERSTORE_Handle *h = cls;
327 299
328 LOG (GNUNET_ERROR_TYPE_ERROR, 300 LOG (GNUNET_ERROR_TYPE_ERROR,
329 _("Received an error notification from MQ of type: %d\n"), error); 301 _("Received an error notification from MQ of type: %d\n"),
302 error);
330 reconnect (h); 303 reconnect (h);
331} 304}
332 305
@@ -378,80 +351,6 @@ iterate_timeout (void *cls)
378 351
379 352
380/** 353/**
381 * Close the existing connection to PEERSTORE and reconnect.
382 *
383 * @param h handle to the service
384 */
385static void
386reconnect (struct GNUNET_PEERSTORE_Handle *h)
387{
388 struct GNUNET_PEERSTORE_IterateContext *ic;
389 struct GNUNET_PEERSTORE_IterateContext *next;
390 GNUNET_PEERSTORE_Processor icb;
391 void *icb_cls;
392 struct GNUNET_PEERSTORE_StoreContext *sc;
393 struct GNUNET_MQ_Envelope *ev;
394
395 LOG (GNUNET_ERROR_TYPE_DEBUG,
396 "Reconnecting...\n");
397 for (ic = h->iterate_head; NULL != ic; ic = next)
398 {
399 next = ic->next;
400 if (GNUNET_YES == ic->iterating)
401 {
402 icb = ic->callback;
403 icb_cls = ic->callback_cls;
404 GNUNET_PEERSTORE_iterate_cancel (ic);
405 if (NULL != icb)
406 icb (icb_cls,
407 NULL,
408 "Iteration canceled due to reconnection");
409 }
410 }
411 if (NULL != h->mq)
412 {
413 GNUNET_MQ_destroy (h->mq);
414 h->mq = NULL;
415 }
416 if (NULL != h->client)
417 {
418 GNUNET_CLIENT_disconnect (h->client);
419 h->client = NULL;
420 }
421
422 h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
423 GNUNET_assert (NULL != h->client);
424 h->mq =
425 GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
426 &handle_client_error, h);
427 LOG (GNUNET_ERROR_TYPE_DEBUG,
428 "Resending pending requests after reconnect.\n");
429 if (NULL != h->watches)
430 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
431 for (ic = h->iterate_head; NULL != ic; ic = ic->next)
432 {
433 ev =
434 PEERSTORE_create_record_mq_envelope (ic->sub_system, &ic->peer, ic->key,
435 NULL, 0, NULL, 0,
436 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
437 GNUNET_MQ_send (h->mq, ev);
438 ic->timeout_task =
439 GNUNET_SCHEDULER_add_delayed (ic->timeout, &iterate_timeout, ic);
440 }
441 for (sc = h->store_head; NULL != sc; sc = sc->next)
442 {
443 ev =
444 PEERSTORE_create_record_mq_envelope (sc->sub_system, &sc->peer, sc->key,
445 sc->value, sc->size, &sc->expiry,
446 sc->options,
447 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
448 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
449 GNUNET_MQ_send (h->mq, ev);
450 }
451}
452
453
454/**
455 * Iterator over watch requests to cancel them. 354 * Iterator over watch requests to cancel them.
456 * 355 *
457 * @param cls unsused 356 * @param cls unsused
@@ -460,7 +359,9 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
460 * @return #GNUNET_YES to continue iteration 359 * @return #GNUNET_YES to continue iteration
461 */ 360 */
462static int 361static int
463destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value) 362destroy_watch (void *cls,
363 const struct GNUNET_HashCode *key,
364 void *value)
464{ 365{
465 struct GNUNET_PEERSTORE_WatchContext *wc = value; 366 struct GNUNET_PEERSTORE_WatchContext *wc = value;
466 367
@@ -514,16 +415,7 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
514 } 415 }
515 h->cfg = cfg; 416 h->cfg = cfg;
516 h->disconnecting = GNUNET_NO; 417 h->disconnecting = GNUNET_NO;
517 h->mq = 418 reconnect (h);
518 GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
519 &handle_client_error, h);
520 if (NULL == h->mq)
521 {
522 GNUNET_CLIENT_disconnect (h->client);
523 GNUNET_free (h);
524 return NULL;
525 }
526 LOG (GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
527 return h; 419 return h;
528} 420}
529 421
@@ -615,11 +507,13 @@ GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
615struct GNUNET_PEERSTORE_StoreContext * 507struct GNUNET_PEERSTORE_StoreContext *
616GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, 508GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
617 const char *sub_system, 509 const char *sub_system,
618 const struct GNUNET_PeerIdentity *peer, const char *key, 510 const struct GNUNET_PeerIdentity *peer,
511 const char *key,
619 const void *value, size_t size, 512 const void *value, size_t size,
620 struct GNUNET_TIME_Absolute expiry, 513 struct GNUNET_TIME_Absolute expiry,
621 enum GNUNET_PEERSTORE_StoreOption options, 514 enum GNUNET_PEERSTORE_StoreOption options,
622 GNUNET_PEERSTORE_Continuation cont, void *cont_cls) 515 GNUNET_PEERSTORE_Continuation cont,
516 void *cont_cls)
623{ 517{
624 struct GNUNET_MQ_Envelope *ev; 518 struct GNUNET_MQ_Envelope *ev;
625 struct GNUNET_PEERSTORE_StoreContext *sc; 519 struct GNUNET_PEERSTORE_StoreContext *sc;
@@ -655,21 +549,21 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
655/******************* ITERATE FUNCTIONS *********************/ 549/******************* ITERATE FUNCTIONS *********************/
656/******************************************************************************/ 550/******************************************************************************/
657 551
552
658/** 553/**
659 * When a response for iterate request is received 554 * When a response for iterate request is received
660 * 555 *
661 * @param cls a `struct GNUNET_PEERSTORE_Handle *` 556 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
662 * @param msg message received, NULL on timeout or fatal error 557 * @param msg message received
663 */ 558 */
664static void 559static void
665handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg) 560handle_iterate_end (void *cls,
561 const struct GNUNET_MessageHeader *msg)
666{ 562{
667 struct GNUNET_PEERSTORE_Handle *h = cls; 563 struct GNUNET_PEERSTORE_Handle *h = cls;
668 struct GNUNET_PEERSTORE_IterateContext *ic; 564 struct GNUNET_PEERSTORE_IterateContext *ic;
669 GNUNET_PEERSTORE_Processor callback; 565 GNUNET_PEERSTORE_Processor callback;
670 void *callback_cls; 566 void *callback_cls;
671 uint16_t msg_type;
672 struct GNUNET_PEERSTORE_Record *record;
673 567
674 ic = h->iterate_head; 568 ic = h->iterate_head;
675 if (NULL == ic) 569 if (NULL == ic)
@@ -679,37 +573,73 @@ handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
679 reconnect (h); 573 reconnect (h);
680 return; 574 return;
681 } 575 }
682 ic->iterating = GNUNET_YES;
683 callback = ic->callback; 576 callback = ic->callback;
684 callback_cls = ic->callback_cls; 577 callback_cls = ic->callback_cls;
685 if (NULL == msg) /* Connection error */ 578 ic->iterating = GNUNET_NO;
579 GNUNET_PEERSTORE_iterate_cancel (ic);
580 if (NULL != callback)
581 callback (callback_cls, NULL, NULL);
582}
583
584
585/**
586 * When a response for iterate request is received, check the
587 * message is well-formed.
588 *
589 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
590 * @param msg message received
591 */
592static int
593check_iterate_result (void *cls,
594 const struct GNUNET_MessageHeader *msg)
595{
596 /* we defer validation to #handle_iterate_result */
597 return GNUNET_OK;
598}
599
600
601/**
602 * When a response for iterate request is received
603 *
604 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
605 * @param msg message received
606 */
607static void
608handle_iterate_result (void *cls,
609 const struct GNUNET_MessageHeader *msg)
610{
611 struct GNUNET_PEERSTORE_Handle *h = cls;
612 struct GNUNET_PEERSTORE_IterateContext *ic;
613 GNUNET_PEERSTORE_Processor callback;
614 void *callback_cls;
615 struct GNUNET_PEERSTORE_Record *record;
616
617 ic = h->iterate_head;
618 if (NULL == ic)
686 { 619 {
687 if (NULL != callback) 620 LOG (GNUNET_ERROR_TYPE_ERROR,
688 callback (callback_cls, NULL, 621 _("Unexpected iteration response, this should not happen.\n"));
689 _("Error communicating with `PEERSTORE' service."));
690 reconnect (h); 622 reconnect (h);
691 return; 623 return;
692 } 624 }
693 msg_type = ntohs (msg->type); 625 ic->iterating = GNUNET_YES;
694 if (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type) 626 callback = ic->callback;
695 { 627 callback_cls = ic->callback_cls;
696 ic->iterating = GNUNET_NO; 628 if (NULL == callback)
697 GNUNET_PEERSTORE_iterate_cancel (ic);
698 if (NULL != callback)
699 callback (callback_cls, NULL, NULL);
700 return; 629 return;
630 record = PEERSTORE_parse_record_message (msg);
631 if (NULL == record)
632 {
633 callback (callback_cls,
634 NULL,
635 _("Received a malformed response from service."));
701 } 636 }
702 if (NULL != callback) 637 else
703 { 638 {
704 record = PEERSTORE_parse_record_message (msg); 639 callback (callback_cls,
705 if (NULL == record) 640 record,
706 callback (callback_cls, NULL, 641 NULL);
707 _("Received a malformed response from service.")); 642 PEERSTORE_destroy_record (record);
708 else
709 {
710 callback (callback_cls, record, NULL);
711 PEERSTORE_destroy_record (record);
712 }
713 } 643 }
714} 644}
715 645
@@ -734,8 +664,7 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
734 ic->h->iterate_tail, 664 ic->h->iterate_tail,
735 ic); 665 ic);
736 GNUNET_free (ic->sub_system); 666 GNUNET_free (ic->sub_system);
737 if (NULL != ic->key) 667 GNUNET_free_non_null (ic->key);
738 GNUNET_free (ic->key);
739 GNUNET_free (ic); 668 GNUNET_free (ic);
740 } 669 }
741 else 670 else
@@ -759,7 +688,8 @@ struct GNUNET_PEERSTORE_IterateContext *
759GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, 688GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
760 const char *sub_system, 689 const char *sub_system,
761 const struct GNUNET_PeerIdentity *peer, 690 const struct GNUNET_PeerIdentity *peer,
762 const char *key, struct GNUNET_TIME_Relative timeout, 691 const char *key,
692 struct GNUNET_TIME_Relative timeout,
763 GNUNET_PEERSTORE_Processor callback, 693 GNUNET_PEERSTORE_Processor callback,
764 void *callback_cls) 694 void *callback_cls)
765{ 695{
@@ -784,10 +714,13 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
784 h->iterate_tail, 714 h->iterate_tail,
785 ic); 715 ic);
786 LOG (GNUNET_ERROR_TYPE_DEBUG, 716 LOG (GNUNET_ERROR_TYPE_DEBUG,
787 "Sending an iterate request for sub system `%s'\n", sub_system); 717 "Sending an iterate request for sub system `%s'\n",
718 sub_system);
788 GNUNET_MQ_send (h->mq, ev); 719 GNUNET_MQ_send (h->mq, ev);
789 ic->timeout_task = 720 ic->timeout_task =
790 GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic); 721 GNUNET_SCHEDULER_add_delayed (timeout,
722 &iterate_timeout,
723 ic);
791 return ic; 724 return ic;
792} 725}
793 726
@@ -797,32 +730,50 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
797/******************************************************************************/ 730/******************************************************************************/
798 731
799/** 732/**
800 * When a watch record is received 733 * When a watch record is received, validate it is well-formed.
734 *
735 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
736 * @param msg message received
737 */
738static int
739check_watch_record (void *cls,
740 const struct GNUNET_MessageHeader *msg)
741{
742 /* we defer validation to #handle_watch_result */
743 return GNUNET_OK;
744}
745
746
747/**
748 * When a watch record is received, process it.
801 * 749 *
802 * @param cls a `struct GNUNET_PEERSTORE_Handle *` 750 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
803 * @param msg message received, NULL on timeout or fatal error 751 * @param msg message received
804 */ 752 */
805static void 753static void
806handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) 754handle_watch_record (void *cls,
755 const struct GNUNET_MessageHeader *msg)
807{ 756{
808 struct GNUNET_PEERSTORE_Handle *h = cls; 757 struct GNUNET_PEERSTORE_Handle *h = cls;
809 struct GNUNET_PEERSTORE_Record *record; 758 struct GNUNET_PEERSTORE_Record *record;
810 struct GNUNET_HashCode keyhash; 759 struct GNUNET_HashCode keyhash;
811 struct GNUNET_PEERSTORE_WatchContext *wc; 760 struct GNUNET_PEERSTORE_WatchContext *wc;
812 761
813 if (NULL == msg) 762 LOG (GNUNET_ERROR_TYPE_DEBUG,
763 "Received a watch record from service.\n");
764 record = PEERSTORE_parse_record_message (msg);
765 if (NULL == record)
814 { 766 {
815 LOG (GNUNET_ERROR_TYPE_ERROR,
816 _
817 ("Problem receiving a watch response, no way to determine which request.\n"));
818 reconnect (h); 767 reconnect (h);
819 return; 768 return;
820 } 769 }
821 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n"); 770 PEERSTORE_hash_key (record->sub_system,
822 record = PEERSTORE_parse_record_message (msg); 771 record->peer,
823 PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash); 772 record->key,
773 &keyhash);
824 // FIXME: what if there are multiple watches for the same key? 774 // FIXME: what if there are multiple watches for the same key?
825 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash); 775 wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
776 &keyhash);
826 if (NULL == wc) 777 if (NULL == wc)
827 { 778 {
828 LOG (GNUNET_ERROR_TYPE_ERROR, 779 LOG (GNUNET_ERROR_TYPE_ERROR,
@@ -832,12 +783,115 @@ handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
832 return; 783 return;
833 } 784 }
834 if (NULL != wc->callback) 785 if (NULL != wc->callback)
835 wc->callback (wc->callback_cls, record, NULL); 786 wc->callback (wc->callback_cls,
787 record,
788 NULL);
836 PEERSTORE_destroy_record (record); 789 PEERSTORE_destroy_record (record);
837} 790}
838 791
839 792
840/** 793/**
794 * Close the existing connection to PEERSTORE and reconnect.
795 *
796 * @param h handle to the service
797 */
798static void
799reconnect (struct GNUNET_PEERSTORE_Handle *h)
800{
801 GNUNET_MQ_hd_fixed_size (iterate_end,
802 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
803 struct GNUNET_MessageHeader);
804 GNUNET_MQ_hd_var_size (iterate_result,
805 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
806 struct GNUNET_MessageHeader);
807 GNUNET_MQ_hd_var_size (watch_record,
808 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
809 struct GNUNET_MessageHeader);
810 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
811 make_iterate_end_handler (h),
812 make_iterate_result_handler (h),
813 make_watch_record_handler (h),
814 GNUNET_MQ_handler_end ()
815 };
816 struct GNUNET_PEERSTORE_IterateContext *ic;
817 struct GNUNET_PEERSTORE_IterateContext *next;
818 GNUNET_PEERSTORE_Processor icb;
819 void *icb_cls;
820 struct GNUNET_PEERSTORE_StoreContext *sc;
821 struct GNUNET_MQ_Envelope *ev;
822
823 LOG (GNUNET_ERROR_TYPE_DEBUG,
824 "Reconnecting...\n");
825 for (ic = h->iterate_head; NULL != ic; ic = next)
826 {
827 next = ic->next;
828 if (GNUNET_YES == ic->iterating)
829 {
830 icb = ic->callback;
831 icb_cls = ic->callback_cls;
832 GNUNET_PEERSTORE_iterate_cancel (ic);
833 if (NULL != icb)
834 icb (icb_cls,
835 NULL,
836 "Iteration canceled due to reconnection");
837 }
838 }
839 if (NULL != h->mq)
840 {
841 GNUNET_MQ_destroy (h->mq);
842 h->mq = NULL;
843 }
844 if (NULL != h->client)
845 {
846 GNUNET_CLIENT_disconnect (h->client);
847 h->client = NULL;
848 }
849 h->client = GNUNET_CLIENT_connect ("peerstore",
850 h->cfg);
851 GNUNET_assert (NULL != h->client);
852 h->mq = GNUNET_MQ_queue_for_connection_client (h->client,
853 mq_handlers,
854 &handle_client_error, h);
855 LOG (GNUNET_ERROR_TYPE_DEBUG,
856 "Resending pending requests after reconnect.\n");
857 if (NULL != h->watches)
858 GNUNET_CONTAINER_multihashmap_iterate (h->watches,
859 &rewatch_it,
860 h);
861 for (ic = h->iterate_head; NULL != ic; ic = ic->next)
862 {
863 ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
864 &ic->peer,
865 ic->key,
866 NULL, 0,
867 NULL, 0,
868 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
869 GNUNET_MQ_send (h->mq, ev);
870 ic->timeout_task
871 = GNUNET_SCHEDULER_add_delayed (ic->timeout,
872 &iterate_timeout,
873 ic);
874 }
875 for (sc = h->store_head; NULL != sc; sc = sc->next)
876 {
877 ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
878 &sc->peer,
879 sc->key,
880 sc->value,
881 sc->size,
882 &sc->expiry,
883 sc->options,
884 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
885 GNUNET_MQ_notify_sent (ev,
886 &store_request_sent,
887 sc);
888 GNUNET_MQ_send (h->mq,
889 ev);
890 }
891}
892
893
894/**
841 * Cancel a watch request 895 * Cancel a watch request
842 * 896 *
843 * @param wc handle to the watch request 897 * @param wc handle to the watch request
@@ -851,7 +905,8 @@ GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
851 905
852 LOG (GNUNET_ERROR_TYPE_DEBUG, 906 LOG (GNUNET_ERROR_TYPE_DEBUG,
853 "Canceling watch.\n"); 907 "Canceling watch.\n");
854 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); 908 ev = GNUNET_MQ_msg (hm,
909 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
855 hm->keyhash = wc->keyhash; 910 hm->keyhash = wc->keyhash;
856 GNUNET_MQ_send (h->mq, ev); 911 GNUNET_MQ_send (h->mq, ev);
857 GNUNET_CONTAINER_multihashmap_remove (h->watches, 912 GNUNET_CONTAINER_multihashmap_remove (h->watches,
@@ -876,17 +931,17 @@ GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
876struct GNUNET_PEERSTORE_WatchContext * 931struct GNUNET_PEERSTORE_WatchContext *
877GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, 932GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
878 const char *sub_system, 933 const char *sub_system,
879 const struct GNUNET_PeerIdentity *peer, const char *key, 934 const struct GNUNET_PeerIdentity *peer,
880 GNUNET_PEERSTORE_Processor callback, void *callback_cls) 935 const char *key,
936 GNUNET_PEERSTORE_Processor callback,
937 void *callback_cls)
881{ 938{
882 struct GNUNET_MQ_Envelope *ev; 939 struct GNUNET_MQ_Envelope *ev;
883 struct StoreKeyHashMessage *hm; 940 struct StoreKeyHashMessage *hm;
884 struct GNUNET_PEERSTORE_WatchContext *wc; 941 struct GNUNET_PEERSTORE_WatchContext *wc;
885 942
886 GNUNET_assert (NULL != sub_system); 943 ev = GNUNET_MQ_msg (hm,
887 GNUNET_assert (NULL != peer); 944 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
888 GNUNET_assert (NULL != key);
889 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
890 PEERSTORE_hash_key (sub_system, 945 PEERSTORE_hash_key (sub_system,
891 peer, 946 peer,
892 key, 947 key,
diff --git a/src/peerstore/test_plugin_peerstore_flat.conf b/src/peerstore/test_plugin_peerstore_flat.conf
index 48cc9940d..f0548e5c8 100644
--- a/src/peerstore/test_plugin_peerstore_flat.conf
+++ b/src/peerstore/test_plugin_peerstore_flat.conf
@@ -2,4 +2,4 @@
2FILENAME = /tmp/gnunet-test-plugin-namestore-flat/flatdb 2FILENAME = /tmp/gnunet-test-plugin-namestore-flat/flatdb
3 3
4[peerstore] 4[peerstore]
5PREFIX = valgrind --log-file=/home/schanzen/dev/gnunet/src/peerstore/vg_log 5# PREFIX = valgrind --log-file=/home/schanzen/dev/gnunet/src/peerstore/vg_log