aboutsummaryrefslogtreecommitdiff
path: root/src/service/peerstore/peerstore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/peerstore/peerstore_api.c')
-rw-r--r--src/service/peerstore/peerstore_api.c1391
1 files changed, 1391 insertions, 0 deletions
diff --git a/src/service/peerstore/peerstore_api.c b/src/service/peerstore/peerstore_api.c
new file mode 100644
index 000000000..8770c36e4
--- /dev/null
+++ b/src/service/peerstore/peerstore_api.c
@@ -0,0 +1,1391 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2013-2016, 2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file peerstore/peerstore_api.c
22 * @brief API for peerstore
23 * @author Omar Tarabai
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_hello_uri_lib.h"
29#include "peerstore.h"
30#include "peerstore_common.h"
31#include "gnunet_peerstore_service.h"
32
33#define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__)
34
35/******************************************************************************/
36/************************ DATA STRUCTURES ****************************/
37/******************************************************************************/
38
39/**
40 * Handle to the PEERSTORE service.
41 */
42struct GNUNET_PEERSTORE_Handle
43{
44 /**
45 * Our configuration.
46 */
47 const struct GNUNET_CONFIGURATION_Handle *cfg;
48
49 /**
50 * Message queue
51 */
52 struct GNUNET_MQ_Handle *mq;
53
54 /**
55 * Head of active STORE requests.
56 */
57 struct GNUNET_PEERSTORE_StoreContext *store_head;
58
59 /**
60 * Tail of active STORE requests.
61 */
62 struct GNUNET_PEERSTORE_StoreContext *store_tail;
63
64 /**
65 * Head of active ITERATE requests.
66 */
67 struct GNUNET_PEERSTORE_IterateContext *iterate_head;
68
69 /**
70 * Tail of active ITERATE requests.
71 */
72 struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
73
74 /**
75 * Hashmap of watch requests
76 */
77 struct GNUNET_CONTAINER_MultiHashMap *watches;
78
79 /**
80 * ID of the task trying to reconnect to the service.
81 */
82 struct GNUNET_SCHEDULER_Task *reconnect_task;
83
84 /**
85 * Delay until we try to reconnect.
86 */
87 struct GNUNET_TIME_Relative reconnect_delay;
88
89 /**
90 * Are we in the process of disconnecting but need to sync first?
91 */
92 int disconnecting;
93};
94
95/**
96 * Context for a store request
97 */
98struct GNUNET_PEERSTORE_StoreContext
99{
100 /**
101 * Kept in a DLL.
102 */
103 struct GNUNET_PEERSTORE_StoreContext *next;
104
105 /**
106 * Kept in a DLL.
107 */
108 struct GNUNET_PEERSTORE_StoreContext *prev;
109
110 /**
111 * Handle to the PEERSTORE service.
112 */
113 struct GNUNET_PEERSTORE_Handle *h;
114
115 /**
116 * Continuation called with service response
117 */
118 GNUNET_PEERSTORE_Continuation cont;
119
120 /**
121 * Closure for @e cont
122 */
123 void *cont_cls;
124
125 /**
126 * Which subsystem does the store?
127 */
128 char *sub_system;
129
130 /**
131 * Key for the store operation.
132 */
133 char *key;
134
135 /**
136 * Contains @e size bytes.
137 */
138 void *value;
139
140 /**
141 * Peer the store is for.
142 */
143 struct GNUNET_PeerIdentity peer;
144
145 /**
146 * Number of bytes in @e value.
147 */
148 size_t size;
149
150 /**
151 * When does the value expire?
152 */
153 struct GNUNET_TIME_Absolute expiry;
154
155 /**
156 * Options for the store operation.
157 */
158 enum GNUNET_PEERSTORE_StoreOption options;
159};
160
161/**
162 * Closure for store callback when storing hello uris.
163 */
164struct StoreHelloCls
165{
166 /**
167 * The corresponding store context.
168 */
169 struct GNUNET_PEERSTORE_StoreContext *sc;
170
171 /**
172 * The corresponding hello uri add request.
173 */
174 struct GNUNET_PEERSTORE_StoreHelloContext *huc;
175};
176
177/**
178 * Context for a iterate request
179 */
180struct GNUNET_PEERSTORE_IterateContext
181{
182 /**
183 * Kept in a DLL.
184 */
185 struct GNUNET_PEERSTORE_IterateContext *next;
186
187 /**
188 * Kept in a DLL.
189 */
190 struct GNUNET_PEERSTORE_IterateContext *prev;
191
192 /**
193 * Handle to the PEERSTORE service.
194 */
195 struct GNUNET_PEERSTORE_Handle *h;
196
197 /**
198 * Which subsystem does the store?
199 */
200 char *sub_system;
201
202 /**
203 * Peer the store is for.
204 */
205 struct GNUNET_PeerIdentity peer;
206
207 /**
208 * Key for the store operation.
209 */
210 char *key;
211
212 /**
213 * Callback with each matching record
214 */
215 GNUNET_PEERSTORE_Processor callback;
216
217 /**
218 * Closure for @e callback
219 */
220 void *callback_cls;
221
222 /**
223 * #GNUNET_YES if we are currently processing records.
224 */
225 int iterating;
226};
227
228/**
229 * Context for a watch request
230 */
231struct GNUNET_PEERSTORE_WatchContext
232{
233 /**
234 * Kept in a DLL.
235 */
236 struct GNUNET_PEERSTORE_WatchContext *next;
237
238 /**
239 * Kept in a DLL.
240 */
241 struct GNUNET_PEERSTORE_WatchContext *prev;
242
243 /**
244 * Handle to the PEERSTORE service.
245 */
246 struct GNUNET_PEERSTORE_Handle *h;
247
248 /**
249 * Callback with each record received
250 */
251 GNUNET_PEERSTORE_Processor callback;
252
253 /**
254 * Closure for @e callback
255 */
256 void *callback_cls;
257
258 /**
259 * Hash of the combined key
260 */
261 struct GNUNET_HashCode keyhash;
262
263 /**
264 * The iteration context to deliver the actual values for the key.
265 */
266 struct GNUNET_PEERSTORE_IterateContext *ic;
267
268 /**
269 * The peer we are watching for values.
270 */
271 const struct GNUNET_PeerIdentity *peer;
272
273 /**
274 * The key we like to watch for values.
275 */
276 const char *key;
277
278 /**
279 * The sub system requested the watch.
280 */
281 const char *sub_system;
282};
283
284/**
285 * Context for the info handler.
286 */
287struct GNUNET_PEERSTORE_NotifyContext
288{
289 /**
290 * Peerstore handle.
291 */
292 struct GNUNET_PEERSTORE_Handle *h;
293
294 /**
295 * Function to call with information.
296 */
297 GNUNET_PEERSTORE_hello_notify_cb callback;
298
299 /**
300 * Closure for @e callback.
301 */
302 void *callback_cls;
303
304 /**
305 * The watch for this context.
306 */
307 struct GNUNET_PEERSTORE_WatchContext *wc;
308
309 /**
310 * Is this request canceled.
311 */
312 unsigned int canceled;
313};
314
315/**
316 * Context for a add hello uri request.
317 */
318struct GNUNET_PEERSTORE_StoreHelloContext
319{
320 /**
321 * Peerstore handle.
322 */
323 struct GNUNET_PEERSTORE_Handle *h;
324
325 /**
326 * Function to call with information.
327 */
328 GNUNET_PEERSTORE_Continuation cont;
329
330 /**
331 * Closure for @e callback.
332 */
333 void *cont_cls;
334
335 /**
336 * Map with all store contexts started during adding hello.
337 */
338 struct GNUNET_CONTAINER_MultiPeerMap *store_context_map;
339
340 /**
341 * Active watch to be notified about conflicting hello uri add requests.
342 */
343 struct GNUNET_PEERSTORE_WatchContext *wc;
344
345 /**
346 * Hello uri which was request for storing.
347 */
348 struct GNUNET_MessageHeader *hello;
349
350 /**
351 * The peer id for the hello.
352 */
353 struct GNUNET_PeerIdentity *pid;
354
355 /**
356 * Was this request successful.
357 */
358 int success;
359};
360
361/******************************************************************************/
362/******************* DECLARATIONS *********************/
363/******************************************************************************/
364
365/**
366 * Close the existing connection to PEERSTORE and reconnect.
367 *
368 * @param cls a `struct GNUNET_PEERSTORE_Handle *h`
369 */
370static void
371reconnect (void *cls);
372
373
374/**
375 * Disconnect from the peerstore service.
376 *
377 * @param h peerstore handle to disconnect
378 */
379static void
380disconnect (struct GNUNET_PEERSTORE_Handle *h)
381{
382 struct GNUNET_PEERSTORE_IterateContext *next;
383
384 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
385 ic = next)
386 {
387 next = ic->next;
388 if (GNUNET_YES == ic->iterating)
389 {
390 GNUNET_PEERSTORE_Processor icb;
391 void *icb_cls;
392
393 icb = ic->callback;
394 icb_cls = ic->callback_cls;
395 GNUNET_PEERSTORE_iterate_cancel (ic);
396 if (NULL != icb)
397 icb (icb_cls, NULL, "Iteration canceled due to reconnection");
398 }
399 }
400
401 if (NULL != h->mq)
402 {
403 GNUNET_MQ_destroy (h->mq);
404 h->mq = NULL;
405 }
406}
407
408
409/**
410 * Function that will schedule the job that will try
411 * to connect us again to the client.
412 *
413 * @param h peerstore to reconnect
414 */
415static void
416disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
417{
418 GNUNET_assert (NULL == h->reconnect_task);
419 disconnect (h);
420 LOG (GNUNET_ERROR_TYPE_DEBUG,
421 "Scheduling task to reconnect to PEERSTORE service in %s.\n",
422 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
423 h->reconnect_task =
424 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
425 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
426}
427
428
429/**
430 * Callback after MQ envelope is sent
431 *
432 * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
433 */
434static void
435store_request_sent (void *cls)
436{
437 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
438 GNUNET_PEERSTORE_Continuation cont;
439 void *cont_cls;
440
441 if (NULL != sc)
442 {
443 cont = sc->cont;
444 cont_cls = sc->cont_cls;
445 GNUNET_PEERSTORE_store_cancel (sc);
446 if (NULL != cont)
447 cont (cont_cls, GNUNET_OK);
448 }
449}
450
451
452/******************************************************************************/
453/******************* CONNECTION FUNCTIONS *********************/
454/******************************************************************************/
455
456
457/**
458 * Function called when we had trouble talking to the service.
459 */
460static void
461handle_client_error (void *cls, enum GNUNET_MQ_Error error)
462{
463 struct GNUNET_PEERSTORE_Handle *h = cls;
464
465 LOG (GNUNET_ERROR_TYPE_ERROR,
466 "Received an error notification from MQ of type: %d\n",
467 error);
468 disconnect_and_schedule_reconnect (h);
469}
470
471
472/**
473 * Iterator over previous watches to resend them
474 *
475 * @param cls the `struct GNUNET_PEERSTORE_Handle`
476 * @param key key for the watch
477 * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
478 * @return #GNUNET_YES (continue to iterate)
479 */
480static int
481rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
482{
483 struct GNUNET_PEERSTORE_Handle *h = cls;
484 struct GNUNET_PEERSTORE_WatchContext *wc = value;
485 struct StoreKeyHashMessage *hm;
486 struct GNUNET_MQ_Envelope *ev;
487
488 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
489 hm->keyhash = wc->keyhash;
490 GNUNET_MQ_send (h->mq, ev);
491 return GNUNET_YES;
492}
493
494
495/**
496 * Iterator over watch requests to cancel them.
497 *
498 * @param cls unused
499 * @param key key to the watch request
500 * @param value watch context
501 * @return #GNUNET_YES to continue iteration
502 */
503static int
504destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
505{
506 struct GNUNET_PEERSTORE_WatchContext *wc = value;
507
508 GNUNET_PEERSTORE_watch_cancel (wc);
509 return GNUNET_YES;
510}
511
512
513/**
514 * Kill the connection to the service. This can be delayed in case of pending
515 * STORE requests and the user explicitly asked to sync first. Otherwise it is
516 * performed instantly.
517 *
518 * @param h Handle to the service.
519 */
520static void
521final_disconnect (struct GNUNET_PEERSTORE_Handle *h)
522{
523 if (NULL != h->mq)
524 {
525 GNUNET_MQ_destroy (h->mq);
526 h->mq = NULL;
527 }
528 GNUNET_free (h);
529}
530
531
532/**
533 * Connect to the PEERSTORE service.
534 *
535 * @param cfg configuration to use
536 * @return NULL on error
537 */
538struct GNUNET_PEERSTORE_Handle *
539GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
540{
541 struct GNUNET_PEERSTORE_Handle *h;
542
543 h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
544 h->cfg = cfg;
545 h->disconnecting = GNUNET_NO;
546 reconnect (h);
547 if (NULL == h->mq)
548 {
549 GNUNET_free (h);
550 return NULL;
551 }
552 return h;
553}
554
555
556/**
557 * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
558 * will be canceled.
559 * Any pending STORE requests will depend on @e snyc_first flag.
560 *
561 * @param h handle to disconnect
562 * @param sync_first send any pending STORE requests before disconnecting
563 */
564void
565GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
566{
567 struct GNUNET_PEERSTORE_IterateContext *ic;
568 struct GNUNET_PEERSTORE_StoreContext *sc;
569
570 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
571 if (NULL != h->watches)
572 {
573 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
574 GNUNET_CONTAINER_multihashmap_destroy (h->watches);
575 h->watches = NULL;
576 }
577 while (NULL != (ic = h->iterate_head))
578 {
579 GNUNET_break (0);
580 GNUNET_PEERSTORE_iterate_cancel (ic);
581 }
582 if (NULL != h->store_head)
583 {
584 if (GNUNET_YES == sync_first)
585 {
586 LOG (GNUNET_ERROR_TYPE_DEBUG,
587 "Delaying disconnection due to pending store requests.\n");
588 h->disconnecting = GNUNET_YES;
589 return;
590 }
591 while (NULL != (sc = h->store_head))
592 GNUNET_PEERSTORE_store_cancel (sc);
593 }
594 final_disconnect (h);
595}
596
597
598/******************************************************************************/
599/******************* STORE FUNCTIONS *********************/
600/******************************************************************************/
601
602
603/**
604 * Cancel a store request
605 *
606 * @param sc Store request context
607 */
608void
609GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
610{
611 struct GNUNET_PEERSTORE_Handle *h = sc->h;
612
613 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
614 "store cancel with sc %p \n",
615 sc);
616 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
617 GNUNET_free (sc->sub_system);
618 GNUNET_free (sc->value);
619 GNUNET_free (sc->key);
620 GNUNET_free (sc);
621 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622 "store cancel with sc %p is null\n",
623 sc);
624 if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
625 final_disconnect (h);
626}
627
628
629/**
630 * Store a new entry in the PEERSTORE.
631 * Note that stored entries can be lost in some cases
632 * such as power failure.
633 *
634 * @param h Handle to the PEERSTORE service
635 * @param sub_system name of the sub system
636 * @param peer Peer Identity
637 * @param key entry key
638 * @param value entry value BLOB
639 * @param size size of @e value
640 * @param expiry absolute time after which the entry is (possibly) deleted
641 * @param options options specific to the storage operation
642 * @param cont Continuation function after the store request is sent
643 * @param cont_cls Closure for @a cont
644 */
645struct GNUNET_PEERSTORE_StoreContext *
646GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
647 const char *sub_system,
648 const struct GNUNET_PeerIdentity *peer,
649 const char *key,
650 const void *value,
651 size_t size,
652 struct GNUNET_TIME_Absolute expiry,
653 enum GNUNET_PEERSTORE_StoreOption options,
654 GNUNET_PEERSTORE_Continuation cont,
655 void *cont_cls)
656{
657 struct GNUNET_MQ_Envelope *ev;
658 struct GNUNET_PEERSTORE_StoreContext *sc;
659
660 LOG (GNUNET_ERROR_TYPE_DEBUG,
661 "Storing value (size: %lu) for subsystem `%s', peer `%s', key `%s'\n",
662 size,
663 sub_system,
664 GNUNET_i2s (peer),
665 key);
666 ev =
667 PEERSTORE_create_record_mq_envelope (sub_system,
668 peer,
669 key,
670 value,
671 size,
672 expiry,
673 options,
674 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
675 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
676
677 sc->sub_system = GNUNET_strdup (sub_system);
678 sc->peer = *peer;
679 sc->key = GNUNET_strdup (key);
680 sc->value = GNUNET_memdup (value, size);
681 sc->size = size;
682 sc->expiry = expiry;
683 sc->options = options;
684 sc->cont = cont;
685 sc->cont_cls = cont_cls;
686 sc->h = h;
687
688 GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
689 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
690 GNUNET_MQ_send (h->mq, ev);
691 return sc;
692}
693
694
695/******************************************************************************/
696/******************* ITERATE FUNCTIONS *********************/
697/******************************************************************************/
698
699
700/**
701 * When a response for iterate request is received
702 *
703 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
704 * @param msg message received
705 */
706static void
707handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg)
708{
709 struct GNUNET_PEERSTORE_Handle *h = cls;
710 struct GNUNET_PEERSTORE_IterateContext *ic;
711 GNUNET_PEERSTORE_Processor callback;
712 void *callback_cls;
713
714 ic = h->iterate_head;
715 if (NULL == ic)
716 {
717 LOG (GNUNET_ERROR_TYPE_ERROR,
718 _ ("Unexpected iteration response, this should not happen.\n"));
719 disconnect_and_schedule_reconnect (h);
720 return;
721 }
722 callback = ic->callback;
723 callback_cls = ic->callback_cls;
724 ic->iterating = GNUNET_NO;
725 GNUNET_PEERSTORE_iterate_cancel (ic);
726 /* NOTE: set this here and not after callback because callback may free h */
727 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
728 if (NULL != callback)
729 callback (callback_cls, NULL, NULL);
730}
731
732
733/**
734 * When a response for iterate request is received, check the
735 * message is well-formed.
736 *
737 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
738 * @param msg message received
739 */
740static int
741check_iterate_result (void *cls, const struct StoreRecordMessage *msg)
742{
743 /* we defer validation to #handle_iterate_result */
744 return GNUNET_OK;
745}
746
747
748/**
749 * When a response for iterate request is received
750 *
751 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
752 * @param msg message received
753 */
754static void
755handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
756{
757 struct GNUNET_PEERSTORE_Handle *h = cls;
758 struct GNUNET_PEERSTORE_IterateContext *ic;
759 GNUNET_PEERSTORE_Processor callback;
760 void *callback_cls;
761 struct GNUNET_PEERSTORE_Record *record;
762
763 ic = h->iterate_head;
764 if (NULL == ic)
765 {
766 LOG (GNUNET_ERROR_TYPE_ERROR,
767 _ ("Unexpected iteration response, this should not happen.\n"));
768 disconnect_and_schedule_reconnect (h);
769 return;
770 }
771 ic->iterating = GNUNET_YES;
772 callback = ic->callback;
773 callback_cls = ic->callback_cls;
774 if (NULL == callback)
775 return;
776 record = PEERSTORE_parse_record_message (msg);
777 if (NULL == record)
778 {
779 callback (callback_cls,
780 NULL,
781 _ ("Received a malformed response from service."));
782 }
783 else
784 {
785 callback (callback_cls, record, NULL);
786 PEERSTORE_destroy_record (record);
787 }
788}
789
790
791/**
792 * Cancel an iterate request
793 * Please do not call after the iterate request is done
794 *
795 * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
796 */
797void
798GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
799{
800 if (GNUNET_NO == ic->iterating)
801 {
802 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
803 GNUNET_free (ic->sub_system);
804 GNUNET_free (ic->key);
805 GNUNET_free (ic);
806 }
807 else
808 ic->callback = NULL;
809}
810
811
812struct GNUNET_PEERSTORE_IterateContext *
813GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
814 const char *sub_system,
815 const struct GNUNET_PeerIdentity *peer,
816 const char *key,
817 GNUNET_PEERSTORE_Processor callback,
818 void *callback_cls)
819{
820 struct GNUNET_MQ_Envelope *ev;
821 struct GNUNET_PEERSTORE_IterateContext *ic;
822
823 ev =
824 PEERSTORE_create_record_mq_envelope (sub_system,
825 peer,
826 key,
827 NULL,
828 0,
829 GNUNET_TIME_UNIT_FOREVER_ABS,
830 0,
831 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
832 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
833 ic->callback = callback;
834 ic->callback_cls = callback_cls;
835 ic->h = h;
836 ic->sub_system = GNUNET_strdup (sub_system);
837 if (NULL != peer)
838 ic->peer = *peer;
839 if (NULL != key)
840 ic->key = GNUNET_strdup (key);
841 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
842 LOG (GNUNET_ERROR_TYPE_DEBUG,
843 "Sending an iterate request for sub system `%s'\n",
844 sub_system);
845 GNUNET_MQ_send (h->mq, ev);
846 return ic;
847}
848
849
850/******************************************************************************/
851/******************* WATCH FUNCTIONS *********************/
852/******************************************************************************/
853
854/**
855 * When a watch record is received, validate it is well-formed.
856 *
857 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
858 * @param msg message received
859 */
860static int
861check_watch_record (void *cls, const struct StoreRecordMessage *msg)
862{
863 /* we defer validation to #handle_watch_result */
864 return GNUNET_OK;
865}
866
867
868/**
869 * When a watch record is received, process it.
870 *
871 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
872 * @param msg message received
873 */
874static void
875handle_watch_record (void *cls, const struct StoreRecordMessage *msg)
876{
877 struct GNUNET_PEERSTORE_Handle *h = cls;
878 struct GNUNET_PEERSTORE_Record *record;
879 struct GNUNET_HashCode keyhash;
880 struct GNUNET_PEERSTORE_WatchContext *wc;
881
882 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
883 record = PEERSTORE_parse_record_message (msg);
884 if (NULL == record)
885 {
886 disconnect_and_schedule_reconnect (h);
887 return;
888 }
889 PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash);
890 // FIXME: what if there are multiple watches for the same key?
891 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
892 if (NULL == wc)
893 {
894 LOG (GNUNET_ERROR_TYPE_ERROR,
895 _ ("Received a watch result for a non existing watch.\n"));
896 PEERSTORE_destroy_record (record);
897 disconnect_and_schedule_reconnect (h);
898 return;
899 }
900 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
901 if (NULL != wc->callback)
902 wc->callback (wc->callback_cls, record, NULL);
903 PEERSTORE_destroy_record (record);
904}
905
906
907/**
908 * Close the existing connection to PEERSTORE and reconnect.
909 *
910 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
911 */
912static void
913reconnect (void *cls)
914{
915 struct GNUNET_PEERSTORE_Handle *h = cls;
916 struct GNUNET_MQ_MessageHandler mq_handlers[] =
917 { GNUNET_MQ_hd_fixed_size (iterate_end,
918 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
919 struct GNUNET_MessageHeader,
920 h),
921 GNUNET_MQ_hd_var_size (iterate_result,
922 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
923 struct StoreRecordMessage,
924 h),
925 GNUNET_MQ_hd_var_size (watch_record,
926 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
927 struct StoreRecordMessage,
928 h),
929 GNUNET_MQ_handler_end () };
930 struct GNUNET_MQ_Envelope *ev;
931
932 h->reconnect_task = NULL;
933 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
934 h->mq = GNUNET_CLIENT_connect (h->cfg,
935 "peerstore",
936 mq_handlers,
937 &handle_client_error,
938 h);
939 if (NULL == h->mq)
940 {
941 h->reconnect_task =
942 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
943 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
944 return;
945 }
946 LOG (GNUNET_ERROR_TYPE_DEBUG,
947 "Resending pending requests after reconnect.\n");
948 if (NULL != h->watches)
949 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
950 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
951 ic = ic->next)
952 {
953 ev =
954 PEERSTORE_create_record_mq_envelope (ic->sub_system,
955 &ic->peer,
956 ic->key,
957 NULL,
958 0,
959 GNUNET_TIME_UNIT_FOREVER_ABS,
960 0,
961 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
962 GNUNET_MQ_send (h->mq, ev);
963 }
964 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
965 sc = sc->next)
966 {
967 ev =
968 PEERSTORE_create_record_mq_envelope (sc->sub_system,
969 &sc->peer,
970 sc->key,
971 sc->value,
972 sc->size,
973 sc->expiry,
974 sc->options,
975 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
976 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
977 GNUNET_MQ_send (h->mq, ev);
978 }
979}
980
981
982/**
983 * Cancel a watch request
984 *
985 * @param wc handle to the watch request
986 */
987void
988GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
989{
990 struct GNUNET_PEERSTORE_Handle *h = wc->h;
991 struct GNUNET_MQ_Envelope *ev;
992 struct StoreKeyHashMessage *hm;
993
994 LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
995 if (NULL != wc->ic)
996 {
997 GNUNET_PEERSTORE_iterate_cancel (wc->ic);
998 GNUNET_free (wc);
999 return;
1000 }
1001
1002 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
1003 hm->keyhash = wc->keyhash;
1004 GNUNET_MQ_send (h->mq, ev);
1005 GNUNET_assert (
1006 GNUNET_YES ==
1007 GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc));
1008 GNUNET_free (wc);
1009}
1010
1011
1012static void
1013watch_iterate (void *cls,
1014 const struct GNUNET_PEERSTORE_Record *record,
1015 const char *emsg)
1016{
1017 struct GNUNET_PEERSTORE_WatchContext *wc = cls;
1018 struct GNUNET_PEERSTORE_Handle *h = wc->h;
1019 struct StoreKeyHashMessage *hm;
1020
1021 if (NULL != emsg)
1022 {
1023 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1024 "Got failure from PEERSTORE: %s\n",
1025 emsg);
1026 wc->callback (wc->callback_cls, NULL, emsg);
1027 return;
1028 }
1029 if (NULL == record)
1030 {
1031 struct GNUNET_MQ_Envelope *ev;
1032 const struct GNUNET_PeerIdentity *peer;
1033
1034 if (NULL == wc->peer)
1035 peer = GNUNET_new (struct GNUNET_PeerIdentity);
1036 else
1037 peer = wc->peer;
1038 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
1039 PEERSTORE_hash_key (wc->sub_system, peer, wc->key, &hm->keyhash);
1040 LOG (GNUNET_ERROR_TYPE_DEBUG,
1041 "Hash key we watch for %s\n",
1042 GNUNET_h2s_full (&hm->keyhash));
1043 wc->keyhash = hm->keyhash;
1044 if (NULL == h->watches)
1045 h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
1046 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
1047 h->watches,
1048 &wc->keyhash,
1049 wc,
1050 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1051 LOG (GNUNET_ERROR_TYPE_DEBUG,
1052 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
1053 wc->sub_system,
1054 GNUNET_i2s (peer),
1055 wc->key);
1056 GNUNET_MQ_send (h->mq, ev);
1057 wc->ic = NULL;
1058 if (NULL != wc->callback)
1059 wc->callback (wc->callback_cls, record, NULL);
1060 return;
1061 }
1062
1063 if (NULL != wc->callback)
1064 wc->callback (wc->callback_cls, record, NULL);
1065}
1066
1067
1068/**
1069 * Request watching a given key
1070 * User will be notified with any new values added to key,
1071 * all existing entries are supplied beforehand.
1072 *
1073 * @param h handle to the PEERSTORE service
1074 * @param sub_system name of sub system
1075 * @param peer Peer identity
1076 * @param key entry key string
1077 * @param callback function called with each new value
1078 * @param callback_cls closure for @a callback
1079 * @return Handle to watch request
1080 */
1081struct GNUNET_PEERSTORE_WatchContext *
1082GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
1083 const char *sub_system,
1084 const struct GNUNET_PeerIdentity *peer,
1085 const char *key,
1086 GNUNET_PEERSTORE_Processor callback,
1087 void *callback_cls)
1088{
1089 struct GNUNET_PEERSTORE_IterateContext *ic;
1090 struct GNUNET_PEERSTORE_WatchContext *wc;
1091
1092 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
1093 wc->callback = callback;
1094 wc->callback_cls = callback_cls;
1095 wc->h = h;
1096 wc->ic = ic;
1097 wc->key = key;
1098 wc->peer = peer;
1099 wc->sub_system = sub_system;
1100
1101 ic = GNUNET_PEERSTORE_iterate (h,
1102 sub_system,
1103 peer,
1104 key,
1105 &watch_iterate,
1106 wc);
1107
1108 return wc;
1109}
1110
1111
1112/******************************************************************************/
1113/******************* HELLO FUNCTIONS *********************/
1114/******************************************************************************/
1115
1116
1117static void
1118hello_updated (void *cls,
1119 const struct GNUNET_PEERSTORE_Record *record,
1120 const char *emsg)
1121{
1122 struct GNUNET_PEERSTORE_NotifyContext *nc = cls;
1123 const struct GNUNET_MessageHeader *hello;
1124 struct GNUNET_HELLO_Builder *builder;
1125
1126 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1127 "hello_updated\n");
1128 if (NULL != emsg)
1129 {
1130 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1131 "Got failure from PEERSTORE: %s\n",
1132 emsg);
1133 nc->callback (nc->callback_cls, NULL, NULL, emsg);
1134 return;
1135 }
1136 if (NULL == record)
1137 return;
1138 hello = record->value;
1139 builder = GNUNET_HELLO_builder_from_msg (hello);
1140 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1141 "hello_updated with expired %s and size %u for peer %s\n",
1142 GNUNET_STRINGS_absolute_time_to_string (GNUNET_HELLO_builder_get_expiration_time (hello)),
1143 ntohs (hello->size),
1144 GNUNET_i2s (&record->peer));
1145 if ((0 == record->value_size))
1146 {
1147 GNUNET_break (0);
1148 return;
1149 }
1150 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1151 "hello_updated call callback\n");
1152 nc->callback (nc->callback_cls, &record->peer, hello, NULL);
1153}
1154
1155
1156struct GNUNET_PEERSTORE_NotifyContext *
1157GNUNET_PEERSTORE_hello_changed_notify (struct GNUNET_PEERSTORE_Handle *h,
1158 int include_friend_only,
1159 GNUNET_PEERSTORE_hello_notify_cb callback,
1160 void *callback_cls)
1161{
1162 struct GNUNET_PEERSTORE_NotifyContext *nc;
1163
1164 nc = GNUNET_new (struct GNUNET_PEERSTORE_NotifyContext);
1165 nc->callback = callback;
1166 nc->callback_cls = callback_cls;
1167 nc->h = h;
1168
1169 nc->wc = GNUNET_PEERSTORE_watch (h,
1170 "peerstore",
1171 NULL,
1172 GNUNET_PEERSTORE_HELLO_KEY,
1173 &hello_updated,
1174 nc);
1175
1176 return nc;
1177}
1178
1179
1180/**
1181 * Stop notifying about changes.
1182 *
1183 * @param nc context to stop notifying
1184 */
1185void
1186GNUNET_PEERSTORE_hello_changed_notify_cancel (struct
1187 GNUNET_PEERSTORE_NotifyContext *nc)
1188{
1189 if (NULL != nc->wc)
1190 {
1191 GNUNET_PEERSTORE_watch_cancel (nc->wc);
1192 nc->wc = NULL;
1193 }
1194}
1195
1196
1197static void
1198merge_success (void *cls, int success)
1199{
1200 struct StoreHelloCls *shu_cls = cls;
1201 struct GNUNET_PEERSTORE_StoreHelloContext *huc = shu_cls->huc;
1202
1203 if (GNUNET_OK != success)
1204 {
1205 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1206 "Storing hello uri failed\n");
1207 huc->cont (huc->cont_cls, success);
1208 GNUNET_free (huc->hello);
1209 GNUNET_free (huc->pid);
1210 GNUNET_free (huc);
1211 return;
1212 }
1213 GNUNET_CONTAINER_multipeermap_remove (huc->store_context_map, huc->pid, shu_cls->sc);
1214 if (0 == GNUNET_CONTAINER_multipeermap_size (huc->store_context_map))
1215 {
1216 GNUNET_PEERSTORE_watch_cancel (huc->wc);
1217 huc->wc = NULL;
1218 huc->cont (huc->cont_cls, GNUNET_OK);
1219 huc->success = GNUNET_OK;
1220 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221 "Storing hello uri succeeded for peer %s!\n",
1222 GNUNET_i2s (huc->pid));
1223 GNUNET_free (huc->hello);
1224 GNUNET_free (huc->pid);
1225 GNUNET_free (huc);
1226 return;
1227 }
1228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1229 "Got notified during storing hello uri for peer %s!\n",
1230 GNUNET_i2s (huc->pid));
1231}
1232
1233
1234static void
1235store_hello (struct GNUNET_PEERSTORE_StoreHelloContext *huc,
1236 const struct GNUNET_MessageHeader *hello)
1237{
1238 struct GNUNET_PEERSTORE_Handle *h = huc->h;
1239 struct GNUNET_PEERSTORE_StoreContext *sc;
1240 struct StoreHelloCls *shu_cls = GNUNET_new (struct StoreHelloCls);
1241 struct GNUNET_TIME_Absolute hello_exp;
1242
1243 shu_cls->huc = huc;
1244 hello_exp = GNUNET_HELLO_builder_get_expiration_time (hello);
1245 sc = GNUNET_PEERSTORE_store (h,
1246 "peerstore",
1247 huc->pid,
1248 GNUNET_PEERSTORE_HELLO_KEY,
1249 hello,
1250 ntohs (hello->size),
1251 hello_exp,
1252 GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
1253 merge_success,
1254 shu_cls);
1255 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1256 "store_hello with expiration %s\n",
1257 GNUNET_STRINGS_absolute_time_to_string (hello_exp));
1258 GNUNET_CONTAINER_multipeermap_put (huc->store_context_map,
1259 huc->pid,
1260 sc,
1261 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1262 shu_cls->sc = sc;
1263}
1264
1265
1266static void
1267merge_uri (void *cls,
1268 const struct GNUNET_PEERSTORE_Record *record,
1269 const char *emsg)
1270{
1271 struct GNUNET_PEERSTORE_StoreHelloContext *huc = cls;
1272 struct GNUNET_MessageHeader *hello;
1273 struct GNUNET_TIME_Absolute huc_hello_exp_time;
1274 struct GNUNET_TIME_Absolute record_hello_exp_time;
1275
1276 if (NULL != emsg)
1277 {
1278 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1279 "Got failure from PEERSTORE: %s\n",
1280 emsg);
1281 return;
1282 }
1283
1284 if (NULL == record && GNUNET_NO == huc->success)
1285 {
1286 huc_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (huc->hello);
1287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1288 "merge_uri just store for peer %s with expiration %s\n",
1289 GNUNET_i2s (huc->pid),
1290 GNUNET_STRINGS_absolute_time_to_string (huc_hello_exp_time));
1291 store_hello (huc, huc->hello);
1292 }
1293 else if (GNUNET_NO == huc->success && 0 == GNUNET_memcmp (huc->pid, &record->peer))
1294 {
1295 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1296 "merge_uri record for peer %s\n",
1297 GNUNET_i2s (&record->peer));
1298 hello = record->value;
1299 if ((0 == record->value_size))
1300 {
1301 GNUNET_break (0);
1302 return;
1303 }
1304
1305 huc_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (huc->hello);
1306 record_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (hello);
1307
1308 if (GNUNET_TIME_absolute_cmp (huc_hello_exp_time, >, record_hello_exp_time))
1309 store_hello (huc, huc->hello);
1310 }
1311}
1312
1313
1314struct GNUNET_PEERSTORE_StoreHelloContext *
1315GNUNET_PEERSTORE_hello_add (struct GNUNET_PEERSTORE_Handle *h,
1316 const struct GNUNET_MessageHeader *msg,
1317 GNUNET_PEERSTORE_Continuation cont,
1318 void *cont_cls)
1319{
1320 struct GNUNET_HELLO_Builder *builder = GNUNET_HELLO_builder_from_msg (msg);
1321 struct GNUNET_PEERSTORE_StoreHelloContext *huc;
1322 struct GNUNET_PeerIdentity *pid;
1323 struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
1324 struct GNUNET_TIME_Absolute hello_exp =
1325 GNUNET_HELLO_builder_get_expiration_time (msg);
1326 struct GNUNET_TIME_Absolute huc_exp;
1327 uint16_t pid_size;
1328 uint16_t size_msg = ntohs (msg->size);
1329
1330 if (NULL == builder)
1331 return NULL;
1332 if (GNUNET_TIME_absolute_cmp (hello_exp, <, now))
1333 return NULL;
1334
1335 huc = GNUNET_new (struct GNUNET_PEERSTORE_StoreHelloContext);
1336 huc->store_context_map = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1337 huc->h = h;
1338 huc->cont = cont;
1339 huc->cont_cls = cont_cls;
1340 huc->hello = GNUNET_malloc (size_msg);
1341 GNUNET_memcpy (huc->hello, msg, size_msg);
1342 huc_exp =
1343 GNUNET_HELLO_builder_get_expiration_time (huc->hello);
1344 pid = GNUNET_HELLO_builder_get_id (builder);
1345 pid_size = sizeof (struct GNUNET_PeerIdentity);
1346 huc->pid = GNUNET_malloc (pid_size);
1347 GNUNET_memcpy (huc->pid, pid, pid_size);
1348 LOG (GNUNET_ERROR_TYPE_DEBUG,
1349 "Adding hello for peer %s with expiration %s msg size %u\n",
1350 GNUNET_i2s (huc->pid),
1351 GNUNET_STRINGS_absolute_time_to_string (huc_exp),
1352 size_msg);
1353 huc->wc = GNUNET_PEERSTORE_watch (h,
1354 "peerstore",
1355 NULL,
1356 GNUNET_PEERSTORE_HELLO_KEY,
1357 &merge_uri,
1358 huc);
1359 GNUNET_HELLO_builder_free (builder);
1360
1361 return huc;
1362}
1363
1364
1365static enum GNUNET_GenericReturnValue
1366free_store_context(void *cls,
1367 const struct GNUNET_PeerIdentity *key,
1368 void *value)
1369{
1370 (void) cls;
1371
1372 GNUNET_PEERSTORE_store_cancel ((struct GNUNET_PEERSTORE_StoreContext *) value);
1373 return GNUNET_YES; // FIXME why is this a map anyway
1374}
1375
1376
1377void
1378GNUNET_PEERSTORE_hello_add_cancel (struct
1379 GNUNET_PEERSTORE_StoreHelloContext *huc)
1380{
1381 GNUNET_PEERSTORE_watch_cancel (huc->wc);
1382 GNUNET_CONTAINER_multipeermap_iterate (huc->store_context_map,
1383 free_store_context,
1384 NULL);
1385 GNUNET_free (huc->hello);
1386 GNUNET_free (huc->pid);
1387 GNUNET_free (huc);
1388}
1389
1390
1391/* end of peerstore_api.c */