aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore/peerstore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r--src/peerstore/peerstore_api.c938
1 files changed, 0 insertions, 938 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
deleted file mode 100644
index 1c13369cf..000000000
--- a/src/peerstore/peerstore_api.c
+++ /dev/null
@@ -1,938 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2013-2016, 2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file peerstore/peerstore_api.c
22 * @brief API for peerstore
23 * @author Omar Tarabai
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "peerstore.h"
29#include "peerstore_common.h"
30
31#define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__)
32
33/******************************************************************************/
34/************************ DATA STRUCTURES ****************************/
35/******************************************************************************/
36
37/**
38 * Handle to the PEERSTORE service.
39 */
40struct GNUNET_PEERSTORE_Handle
41{
42 /**
43 * Our configuration.
44 */
45 const struct GNUNET_CONFIGURATION_Handle *cfg;
46
47 /**
48 * Message queue
49 */
50 struct GNUNET_MQ_Handle *mq;
51
52 /**
53 * Head of active STORE requests.
54 */
55 struct GNUNET_PEERSTORE_StoreContext *store_head;
56
57 /**
58 * Tail of active STORE requests.
59 */
60 struct GNUNET_PEERSTORE_StoreContext *store_tail;
61
62 /**
63 * Head of active ITERATE requests.
64 */
65 struct GNUNET_PEERSTORE_IterateContext *iterate_head;
66
67 /**
68 * Tail of active ITERATE requests.
69 */
70 struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
71
72 /**
73 * Hashmap of watch requests
74 */
75 struct GNUNET_CONTAINER_MultiHashMap *watches;
76
77 /**
78 * ID of the task trying to reconnect to the service.
79 */
80 struct GNUNET_SCHEDULER_Task *reconnect_task;
81
82 /**
83 * Delay until we try to reconnect.
84 */
85 struct GNUNET_TIME_Relative reconnect_delay;
86
87 /**
88 * Are we in the process of disconnecting but need to sync first?
89 */
90 int disconnecting;
91};
92
93/**
94 * Context for a store request
95 */
96struct GNUNET_PEERSTORE_StoreContext
97{
98 /**
99 * Kept in a DLL.
100 */
101 struct GNUNET_PEERSTORE_StoreContext *next;
102
103 /**
104 * Kept in a DLL.
105 */
106 struct GNUNET_PEERSTORE_StoreContext *prev;
107
108 /**
109 * Handle to the PEERSTORE service.
110 */
111 struct GNUNET_PEERSTORE_Handle *h;
112
113 /**
114 * Continuation called with service response
115 */
116 GNUNET_PEERSTORE_Continuation cont;
117
118 /**
119 * Closure for @e cont
120 */
121 void *cont_cls;
122
123 /**
124 * Which subsystem does the store?
125 */
126 char *sub_system;
127
128 /**
129 * Key for the store operation.
130 */
131 char *key;
132
133 /**
134 * Contains @e size bytes.
135 */
136 void *value;
137
138 /**
139 * Peer the store is for.
140 */
141 struct GNUNET_PeerIdentity peer;
142
143 /**
144 * Number of bytes in @e value.
145 */
146 size_t size;
147
148 /**
149 * When does the value expire?
150 */
151 struct GNUNET_TIME_Absolute expiry;
152
153 /**
154 * Options for the store operation.
155 */
156 enum GNUNET_PEERSTORE_StoreOption options;
157};
158
159/**
160 * Context for a iterate request
161 */
162struct GNUNET_PEERSTORE_IterateContext
163{
164 /**
165 * Kept in a DLL.
166 */
167 struct GNUNET_PEERSTORE_IterateContext *next;
168
169 /**
170 * Kept in a DLL.
171 */
172 struct GNUNET_PEERSTORE_IterateContext *prev;
173
174 /**
175 * Handle to the PEERSTORE service.
176 */
177 struct GNUNET_PEERSTORE_Handle *h;
178
179 /**
180 * Which subsystem does the store?
181 */
182 char *sub_system;
183
184 /**
185 * Peer the store is for.
186 */
187 struct GNUNET_PeerIdentity peer;
188
189 /**
190 * Key for the store operation.
191 */
192 char *key;
193
194 /**
195 * Callback with each matching record
196 */
197 GNUNET_PEERSTORE_Processor callback;
198
199 /**
200 * Closure for @e callback
201 */
202 void *callback_cls;
203
204 /**
205 * #GNUNET_YES if we are currently processing records.
206 */
207 int iterating;
208};
209
210/**
211 * Context for a watch request
212 */
213struct GNUNET_PEERSTORE_WatchContext
214{
215 /**
216 * Kept in a DLL.
217 */
218 struct GNUNET_PEERSTORE_WatchContext *next;
219
220 /**
221 * Kept in a DLL.
222 */
223 struct GNUNET_PEERSTORE_WatchContext *prev;
224
225 /**
226 * Handle to the PEERSTORE service.
227 */
228 struct GNUNET_PEERSTORE_Handle *h;
229
230 /**
231 * Callback with each record received
232 */
233 GNUNET_PEERSTORE_Processor callback;
234
235 /**
236 * Closure for @e callback
237 */
238 void *callback_cls;
239
240 /**
241 * Hash of the combined key
242 */
243 struct GNUNET_HashCode keyhash;
244};
245
246/******************************************************************************/
247/******************* DECLARATIONS *********************/
248/******************************************************************************/
249
250/**
251 * Close the existing connection to PEERSTORE and reconnect.
252 *
253 * @param cls a `struct GNUNET_PEERSTORE_Handle *h`
254 */
255static void
256reconnect (void *cls);
257
258
259/**
260 * Disconnect from the peerstore service.
261 *
262 * @param h peerstore handle to disconnect
263 */
264static void
265disconnect (struct GNUNET_PEERSTORE_Handle *h)
266{
267 struct GNUNET_PEERSTORE_IterateContext *next;
268
269 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
270 ic = next)
271 {
272 next = ic->next;
273 if (GNUNET_YES == ic->iterating)
274 {
275 GNUNET_PEERSTORE_Processor icb;
276 void *icb_cls;
277
278 icb = ic->callback;
279 icb_cls = ic->callback_cls;
280 GNUNET_PEERSTORE_iterate_cancel (ic);
281 if (NULL != icb)
282 icb (icb_cls, NULL, "Iteration canceled due to reconnection");
283 }
284 }
285
286 if (NULL != h->mq)
287 {
288 GNUNET_MQ_destroy (h->mq);
289 h->mq = NULL;
290 }
291}
292
293
294/**
295 * Function that will schedule the job that will try
296 * to connect us again to the client.
297 *
298 * @param h peerstore to reconnect
299 */
300static void
301disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
302{
303 GNUNET_assert (NULL == h->reconnect_task);
304 disconnect (h);
305 LOG (GNUNET_ERROR_TYPE_DEBUG,
306 "Scheduling task to reconnect to PEERSTORE service in %s.\n",
307 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
308 h->reconnect_task =
309 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
310 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
311}
312
313
314/**
315 * Callback after MQ envelope is sent
316 *
317 * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
318 */
319static void
320store_request_sent (void *cls)
321{
322 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
323 GNUNET_PEERSTORE_Continuation cont;
324 void *cont_cls;
325
326 if (NULL != sc)
327 {
328 cont = sc->cont;
329 cont_cls = sc->cont_cls;
330 GNUNET_PEERSTORE_store_cancel (sc);
331 if (NULL != cont)
332 cont (cont_cls, GNUNET_OK);
333 }
334}
335
336
337/******************************************************************************/
338/******************* CONNECTION FUNCTIONS *********************/
339/******************************************************************************/
340
341
342/**
343 * Function called when we had trouble talking to the service.
344 */
345static void
346handle_client_error (void *cls, enum GNUNET_MQ_Error error)
347{
348 struct GNUNET_PEERSTORE_Handle *h = cls;
349
350 LOG (GNUNET_ERROR_TYPE_ERROR,
351 "Received an error notification from MQ of type: %d\n",
352 error);
353 disconnect_and_schedule_reconnect (h);
354}
355
356
357/**
358 * Iterator over previous watches to resend them
359 *
360 * @param cls the `struct GNUNET_PEERSTORE_Handle`
361 * @param key key for the watch
362 * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
363 * @return #GNUNET_YES (continue to iterate)
364 */
365static int
366rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
367{
368 struct GNUNET_PEERSTORE_Handle *h = cls;
369 struct GNUNET_PEERSTORE_WatchContext *wc = value;
370 struct StoreKeyHashMessage *hm;
371 struct GNUNET_MQ_Envelope *ev;
372
373 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
374 hm->keyhash = wc->keyhash;
375 GNUNET_MQ_send (h->mq, ev);
376 return GNUNET_YES;
377}
378
379
380/**
381 * Iterator over watch requests to cancel them.
382 *
383 * @param cls unused
384 * @param key key to the watch request
385 * @param value watch context
386 * @return #GNUNET_YES to continue iteration
387 */
388static int
389destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
390{
391 struct GNUNET_PEERSTORE_WatchContext *wc = value;
392
393 GNUNET_PEERSTORE_watch_cancel (wc);
394 return GNUNET_YES;
395}
396
397
398/**
399 * Kill the connection to the service. This can be delayed in case of pending
400 * STORE requests and the user explicitly asked to sync first. Otherwise it is
401 * performed instantly.
402 *
403 * @param h Handle to the service.
404 */
405static void
406final_disconnect (struct GNUNET_PEERSTORE_Handle *h)
407{
408 if (NULL != h->mq)
409 {
410 GNUNET_MQ_destroy (h->mq);
411 h->mq = NULL;
412 }
413 GNUNET_free (h);
414}
415
416
417/**
418 * Connect to the PEERSTORE service.
419 *
420 * @param cfg configuration to use
421 * @return NULL on error
422 */
423struct GNUNET_PEERSTORE_Handle *
424GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
425{
426 struct GNUNET_PEERSTORE_Handle *h;
427
428 h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
429 h->cfg = cfg;
430 h->disconnecting = GNUNET_NO;
431 reconnect (h);
432 if (NULL == h->mq)
433 {
434 GNUNET_free (h);
435 return NULL;
436 }
437 return h;
438}
439
440
441/**
442 * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
443 * will be canceled.
444 * Any pending STORE requests will depend on @e snyc_first flag.
445 *
446 * @param h handle to disconnect
447 * @param sync_first send any pending STORE requests before disconnecting
448 */
449void
450GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
451{
452 struct GNUNET_PEERSTORE_IterateContext *ic;
453 struct GNUNET_PEERSTORE_StoreContext *sc;
454
455 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
456 if (NULL != h->watches)
457 {
458 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
459 GNUNET_CONTAINER_multihashmap_destroy (h->watches);
460 h->watches = NULL;
461 }
462 while (NULL != (ic = h->iterate_head))
463 {
464 GNUNET_break (0);
465 GNUNET_PEERSTORE_iterate_cancel (ic);
466 }
467 if (NULL != h->store_head)
468 {
469 if (GNUNET_YES == sync_first)
470 {
471 LOG (GNUNET_ERROR_TYPE_DEBUG,
472 "Delaying disconnection due to pending store requests.\n");
473 h->disconnecting = GNUNET_YES;
474 return;
475 }
476 while (NULL != (sc = h->store_head))
477 GNUNET_PEERSTORE_store_cancel (sc);
478 }
479 final_disconnect (h);
480}
481
482
483/******************************************************************************/
484/******************* STORE FUNCTIONS *********************/
485/******************************************************************************/
486
487
488/**
489 * Cancel a store request
490 *
491 * @param sc Store request context
492 */
493void
494GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
495{
496 struct GNUNET_PEERSTORE_Handle *h = sc->h;
497
498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
499 "store cancel with sc %p \n",
500 sc);
501 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
502 GNUNET_free (sc->sub_system);
503 GNUNET_free (sc->value);
504 GNUNET_free (sc->key);
505 GNUNET_free (sc);
506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507 "store cancel with sc %p is null\n",
508 sc);
509 if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
510 final_disconnect (h);
511}
512
513
514/**
515 * Store a new entry in the PEERSTORE.
516 * Note that stored entries can be lost in some cases
517 * such as power failure.
518 *
519 * @param h Handle to the PEERSTORE service
520 * @param sub_system name of the sub system
521 * @param peer Peer Identity
522 * @param key entry key
523 * @param value entry value BLOB
524 * @param size size of @e value
525 * @param expiry absolute time after which the entry is (possibly) deleted
526 * @param options options specific to the storage operation
527 * @param cont Continuation function after the store request is sent
528 * @param cont_cls Closure for @a cont
529 */
530struct GNUNET_PEERSTORE_StoreContext *
531GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
532 const char *sub_system,
533 const struct GNUNET_PeerIdentity *peer,
534 const char *key,
535 const void *value,
536 size_t size,
537 struct GNUNET_TIME_Absolute expiry,
538 enum GNUNET_PEERSTORE_StoreOption options,
539 GNUNET_PEERSTORE_Continuation cont,
540 void *cont_cls)
541{
542 struct GNUNET_MQ_Envelope *ev;
543 struct GNUNET_PEERSTORE_StoreContext *sc;
544
545 LOG (GNUNET_ERROR_TYPE_DEBUG,
546 "Storing value (size: %lu) for subsystem `%s', peer `%s', key `%s'\n",
547 size,
548 sub_system,
549 GNUNET_i2s (peer),
550 key);
551 ev =
552 PEERSTORE_create_record_mq_envelope (sub_system,
553 peer,
554 key,
555 value,
556 size,
557 expiry,
558 options,
559 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
560 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
561
562 sc->sub_system = GNUNET_strdup (sub_system);
563 sc->peer = *peer;
564 sc->key = GNUNET_strdup (key);
565 sc->value = GNUNET_memdup (value, size);
566 sc->size = size;
567 sc->expiry = expiry;
568 sc->options = options;
569 sc->cont = cont;
570 sc->cont_cls = cont_cls;
571 sc->h = h;
572
573 GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
574 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
575 GNUNET_MQ_send (h->mq, ev);
576 return sc;
577}
578
579
580/******************************************************************************/
581/******************* ITERATE FUNCTIONS *********************/
582/******************************************************************************/
583
584
585/**
586 * When a response for iterate request is received
587 *
588 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
589 * @param msg message received
590 */
591static void
592handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg)
593{
594 struct GNUNET_PEERSTORE_Handle *h = cls;
595 struct GNUNET_PEERSTORE_IterateContext *ic;
596 GNUNET_PEERSTORE_Processor callback;
597 void *callback_cls;
598
599 ic = h->iterate_head;
600 if (NULL == ic)
601 {
602 LOG (GNUNET_ERROR_TYPE_ERROR,
603 _ ("Unexpected iteration response, this should not happen.\n"));
604 disconnect_and_schedule_reconnect (h);
605 return;
606 }
607 callback = ic->callback;
608 callback_cls = ic->callback_cls;
609 ic->iterating = GNUNET_NO;
610 GNUNET_PEERSTORE_iterate_cancel (ic);
611 /* NOTE: set this here and not after callback because callback may free h */
612 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
613 if (NULL != callback)
614 callback (callback_cls, NULL, NULL);
615}
616
617
618/**
619 * When a response for iterate request is received, check the
620 * message is well-formed.
621 *
622 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
623 * @param msg message received
624 */
625static int
626check_iterate_result (void *cls, const struct StoreRecordMessage *msg)
627{
628 /* we defer validation to #handle_iterate_result */
629 return GNUNET_OK;
630}
631
632
633/**
634 * When a response for iterate request is received
635 *
636 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
637 * @param msg message received
638 */
639static void
640handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
641{
642 struct GNUNET_PEERSTORE_Handle *h = cls;
643 struct GNUNET_PEERSTORE_IterateContext *ic;
644 GNUNET_PEERSTORE_Processor callback;
645 void *callback_cls;
646 struct GNUNET_PEERSTORE_Record *record;
647
648 ic = h->iterate_head;
649 if (NULL == ic)
650 {
651 LOG (GNUNET_ERROR_TYPE_ERROR,
652 _ ("Unexpected iteration response, this should not happen.\n"));
653 disconnect_and_schedule_reconnect (h);
654 return;
655 }
656 ic->iterating = GNUNET_YES;
657 callback = ic->callback;
658 callback_cls = ic->callback_cls;
659 if (NULL == callback)
660 return;
661 record = PEERSTORE_parse_record_message (msg);
662 if (NULL == record)
663 {
664 callback (callback_cls,
665 NULL,
666 _ ("Received a malformed response from service."));
667 }
668 else
669 {
670 callback (callback_cls, record, NULL);
671 PEERSTORE_destroy_record (record);
672 }
673}
674
675
676/**
677 * Cancel an iterate request
678 * Please do not call after the iterate request is done
679 *
680 * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
681 */
682void
683GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
684{
685 if (GNUNET_NO == ic->iterating)
686 {
687 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
688 GNUNET_free (ic->sub_system);
689 GNUNET_free (ic->key);
690 GNUNET_free (ic);
691 }
692 else
693 ic->callback = NULL;
694}
695
696
697struct GNUNET_PEERSTORE_IterateContext *
698GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
699 const char *sub_system,
700 const struct GNUNET_PeerIdentity *peer,
701 const char *key,
702 GNUNET_PEERSTORE_Processor callback,
703 void *callback_cls)
704{
705 struct GNUNET_MQ_Envelope *ev;
706 struct GNUNET_PEERSTORE_IterateContext *ic;
707
708 ev =
709 PEERSTORE_create_record_mq_envelope (sub_system,
710 peer,
711 key,
712 NULL,
713 0,
714 GNUNET_TIME_UNIT_FOREVER_ABS,
715 0,
716 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
717 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
718 ic->callback = callback;
719 ic->callback_cls = callback_cls;
720 ic->h = h;
721 ic->sub_system = GNUNET_strdup (sub_system);
722 if (NULL != peer)
723 ic->peer = *peer;
724 if (NULL != key)
725 ic->key = GNUNET_strdup (key);
726 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
727 LOG (GNUNET_ERROR_TYPE_DEBUG,
728 "Sending an iterate request for sub system `%s'\n",
729 sub_system);
730 GNUNET_MQ_send (h->mq, ev);
731 return ic;
732}
733
734
735/******************************************************************************/
736/******************* WATCH FUNCTIONS *********************/
737/******************************************************************************/
738
739/**
740 * When a watch record is received, validate it is well-formed.
741 *
742 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
743 * @param msg message received
744 */
745static int
746check_watch_record (void *cls, const struct StoreRecordMessage *msg)
747{
748 /* we defer validation to #handle_watch_result */
749 return GNUNET_OK;
750}
751
752
753/**
754 * When a watch record is received, process it.
755 *
756 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
757 * @param msg message received
758 */
759static void
760handle_watch_record (void *cls, const struct StoreRecordMessage *msg)
761{
762 struct GNUNET_PEERSTORE_Handle *h = cls;
763 struct GNUNET_PEERSTORE_Record *record;
764 struct GNUNET_HashCode keyhash;
765 struct GNUNET_PEERSTORE_WatchContext *wc;
766
767 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
768 record = PEERSTORE_parse_record_message (msg);
769 if (NULL == record)
770 {
771 disconnect_and_schedule_reconnect (h);
772 return;
773 }
774 PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash);
775 // FIXME: what if there are multiple watches for the same key?
776 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
777 if (NULL == wc)
778 {
779 LOG (GNUNET_ERROR_TYPE_ERROR,
780 _ ("Received a watch result for a non existing watch.\n"));
781 PEERSTORE_destroy_record (record);
782 disconnect_and_schedule_reconnect (h);
783 return;
784 }
785 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
786 if (NULL != wc->callback)
787 wc->callback (wc->callback_cls, record, NULL);
788 PEERSTORE_destroy_record (record);
789}
790
791
792/**
793 * Close the existing connection to PEERSTORE and reconnect.
794 *
795 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
796 */
797static void
798reconnect (void *cls)
799{
800 struct GNUNET_PEERSTORE_Handle *h = cls;
801 struct GNUNET_MQ_MessageHandler mq_handlers[] =
802 { GNUNET_MQ_hd_fixed_size (iterate_end,
803 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
804 struct GNUNET_MessageHeader,
805 h),
806 GNUNET_MQ_hd_var_size (iterate_result,
807 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
808 struct StoreRecordMessage,
809 h),
810 GNUNET_MQ_hd_var_size (watch_record,
811 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
812 struct StoreRecordMessage,
813 h),
814 GNUNET_MQ_handler_end () };
815 struct GNUNET_MQ_Envelope *ev;
816
817 h->reconnect_task = NULL;
818 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
819 h->mq = GNUNET_CLIENT_connect (h->cfg,
820 "peerstore",
821 mq_handlers,
822 &handle_client_error,
823 h);
824 if (NULL == h->mq)
825 {
826 h->reconnect_task =
827 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
828 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
829 return;
830 }
831 LOG (GNUNET_ERROR_TYPE_DEBUG,
832 "Resending pending requests after reconnect.\n");
833 if (NULL != h->watches)
834 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
835 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
836 ic = ic->next)
837 {
838 ev =
839 PEERSTORE_create_record_mq_envelope (ic->sub_system,
840 &ic->peer,
841 ic->key,
842 NULL,
843 0,
844 GNUNET_TIME_UNIT_FOREVER_ABS,
845 0,
846 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
847 GNUNET_MQ_send (h->mq, ev);
848 }
849 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
850 sc = sc->next)
851 {
852 ev =
853 PEERSTORE_create_record_mq_envelope (sc->sub_system,
854 &sc->peer,
855 sc->key,
856 sc->value,
857 sc->size,
858 sc->expiry,
859 sc->options,
860 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
861 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
862 GNUNET_MQ_send (h->mq, ev);
863 }
864}
865
866
867/**
868 * Cancel a watch request
869 *
870 * @param wc handle to the watch request
871 */
872void
873GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
874{
875 struct GNUNET_PEERSTORE_Handle *h = wc->h;
876 struct GNUNET_MQ_Envelope *ev;
877 struct StoreKeyHashMessage *hm;
878
879 LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
880 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
881 hm->keyhash = wc->keyhash;
882 GNUNET_MQ_send (h->mq, ev);
883 GNUNET_assert (
884 GNUNET_YES ==
885 GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc));
886 GNUNET_free (wc);
887}
888
889
890/**
891 * Request watching a given key
892 * User will be notified with any new values added to key
893 *
894 * @param h handle to the PEERSTORE service
895 * @param sub_system name of sub system
896 * @param peer Peer identity
897 * @param key entry key string
898 * @param callback function called with each new value
899 * @param callback_cls closure for @a callback
900 * @return Handle to watch request
901 */
902struct GNUNET_PEERSTORE_WatchContext *
903GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
904 const char *sub_system,
905 const struct GNUNET_PeerIdentity *peer,
906 const char *key,
907 GNUNET_PEERSTORE_Processor callback,
908 void *callback_cls)
909{
910 struct GNUNET_MQ_Envelope *ev;
911 struct StoreKeyHashMessage *hm;
912 struct GNUNET_PEERSTORE_WatchContext *wc;
913
914 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
915 PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
916 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
917 wc->callback = callback;
918 wc->callback_cls = callback_cls;
919 wc->h = h;
920 wc->keyhash = hm->keyhash;
921 if (NULL == h->watches)
922 h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
923 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
924 h->watches,
925 &wc->keyhash,
926 wc,
927 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
928 LOG (GNUNET_ERROR_TYPE_DEBUG,
929 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
930 sub_system,
931 GNUNET_i2s (peer),
932 key);
933 GNUNET_MQ_send (h->mq, ev);
934 return wc;
935}
936
937
938/* end of peerstore_api.c */