aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore/peerstore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r--src/peerstore/peerstore_api.c948
1 files changed, 0 insertions, 948 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
deleted file mode 100644
index d0c72acf1..000000000
--- a/src/peerstore/peerstore_api.c
+++ /dev/null
@@ -1,948 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2013-2016, 2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file peerstore/peerstore_api.c
22 * @brief API for peerstore
23 * @author Omar Tarabai
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "peerstore.h"
29#include "peerstore_common.h"
30
31#define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__)
32
33/******************************************************************************/
34/************************ DATA STRUCTURES ****************************/
35/******************************************************************************/
36
37/**
38 * Handle to the PEERSTORE service.
39 */
40struct GNUNET_PEERSTORE_Handle
41{
42 /**
43 * Our configuration.
44 */
45 const struct GNUNET_CONFIGURATION_Handle *cfg;
46
47 /**
48 * Message queue
49 */
50 struct GNUNET_MQ_Handle *mq;
51
52 /**
53 * Head of active STORE requests.
54 */
55 struct GNUNET_PEERSTORE_StoreContext *store_head;
56
57 /**
58 * Tail of active STORE requests.
59 */
60 struct GNUNET_PEERSTORE_StoreContext *store_tail;
61
62 /**
63 * Head of active ITERATE requests.
64 */
65 struct GNUNET_PEERSTORE_IterateContext *iterate_head;
66
67 /**
68 * Tail of active ITERATE requests.
69 */
70 struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
71
72 /**
73 * Hashmap of watch requests
74 */
75 struct GNUNET_CONTAINER_MultiHashMap *watches;
76
77 /**
78 * ID of the task trying to reconnect to the service.
79 */
80 struct GNUNET_SCHEDULER_Task *reconnect_task;
81
82 /**
83 * Delay until we try to reconnect.
84 */
85 struct GNUNET_TIME_Relative reconnect_delay;
86
87 /**
88 * Are we in the process of disconnecting but need to sync first?
89 */
90 int disconnecting;
91};
92
93/**
94 * Context for a store request
95 */
96struct GNUNET_PEERSTORE_StoreContext
97{
98 /**
99 * Kept in a DLL.
100 */
101 struct GNUNET_PEERSTORE_StoreContext *next;
102
103 /**
104 * Kept in a DLL.
105 */
106 struct GNUNET_PEERSTORE_StoreContext *prev;
107
108 /**
109 * Handle to the PEERSTORE service.
110 */
111 struct GNUNET_PEERSTORE_Handle *h;
112
113 /**
114 * Continuation called with service response
115 */
116 GNUNET_PEERSTORE_Continuation cont;
117
118 /**
119 * Closure for @e cont
120 */
121 void *cont_cls;
122
123 /**
124 * Which subsystem does the store?
125 */
126 char *sub_system;
127
128 /**
129 * Key for the store operation.
130 */
131 char *key;
132
133 /**
134 * Contains @e size bytes.
135 */
136 void *value;
137
138 /**
139 * Peer the store is for.
140 */
141 struct GNUNET_PeerIdentity peer;
142
143 /**
144 * Number of bytes in @e value.
145 */
146 size_t size;
147
148 /**
149 * When does the value expire?
150 */
151 struct GNUNET_TIME_Absolute expiry;
152
153 /**
154 * Options for the store operation.
155 */
156 enum GNUNET_PEERSTORE_StoreOption options;
157};
158
159/**
160 * Context for a iterate request
161 */
162struct GNUNET_PEERSTORE_IterateContext
163{
164 /**
165 * Kept in a DLL.
166 */
167 struct GNUNET_PEERSTORE_IterateContext *next;
168
169 /**
170 * Kept in a DLL.
171 */
172 struct GNUNET_PEERSTORE_IterateContext *prev;
173
174 /**
175 * Handle to the PEERSTORE service.
176 */
177 struct GNUNET_PEERSTORE_Handle *h;
178
179 /**
180 * Which subsystem does the store?
181 */
182 char *sub_system;
183
184 /**
185 * Peer the store is for.
186 */
187 struct GNUNET_PeerIdentity peer;
188
189 /**
190 * Key for the store operation.
191 */
192 char *key;
193
194 /**
195 * Callback with each matching record
196 */
197 GNUNET_PEERSTORE_Processor callback;
198
199 /**
200 * Closure for @e callback
201 */
202 void *callback_cls;
203
204 /**
205 * #GNUNET_YES if we are currently processing records.
206 */
207 int iterating;
208};
209
210/**
211 * Context for a watch request
212 */
213struct GNUNET_PEERSTORE_WatchContext
214{
215 /**
216 * Kept in a DLL.
217 */
218 struct GNUNET_PEERSTORE_WatchContext *next;
219
220 /**
221 * Kept in a DLL.
222 */
223 struct GNUNET_PEERSTORE_WatchContext *prev;
224
225 /**
226 * Handle to the PEERSTORE service.
227 */
228 struct GNUNET_PEERSTORE_Handle *h;
229
230 /**
231 * Callback with each record received
232 */
233 GNUNET_PEERSTORE_Processor callback;
234
235 /**
236 * Closure for @e callback
237 */
238 void *callback_cls;
239
240 /**
241 * Hash of the combined key
242 */
243 struct GNUNET_HashCode keyhash;
244};
245
246/******************************************************************************/
247/******************* DECLARATIONS *********************/
248/******************************************************************************/
249
250/**
251 * Close the existing connection to PEERSTORE and reconnect.
252 *
253 * @param cls a `struct GNUNET_PEERSTORE_Handle *h`
254 */
255static void
256reconnect (void *cls);
257
258
259/**
260 * Disconnect from the peerstore service.
261 *
262 * @param h peerstore handle to disconnect
263 */
264static void
265disconnect (struct GNUNET_PEERSTORE_Handle *h)
266{
267 struct GNUNET_PEERSTORE_IterateContext *next;
268
269 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
270 ic = next)
271 {
272 next = ic->next;
273 if (GNUNET_YES == ic->iterating)
274 {
275 GNUNET_PEERSTORE_Processor icb;
276 void *icb_cls;
277
278 icb = ic->callback;
279 icb_cls = ic->callback_cls;
280 GNUNET_PEERSTORE_iterate_cancel (ic);
281 if (NULL != icb)
282 icb (icb_cls, NULL, "Iteration canceled due to reconnection");
283 }
284 }
285
286 if (NULL != h->mq)
287 {
288 GNUNET_MQ_destroy (h->mq);
289 h->mq = NULL;
290 }
291}
292
293
294/**
295 * Function that will schedule the job that will try
296 * to connect us again to the client.
297 *
298 * @param h peerstore to reconnect
299 */
300static void
301disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
302{
303 GNUNET_assert (NULL == h->reconnect_task);
304 disconnect (h);
305 LOG (GNUNET_ERROR_TYPE_DEBUG,
306 "Scheduling task to reconnect to PEERSTORE service in %s.\n",
307 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
308 h->reconnect_task =
309 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
310 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
311}
312
313
314/**
315 * Callback after MQ envelope is sent
316 *
317 * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
318 */
319static void
320store_request_sent (void *cls)
321{
322 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
323 GNUNET_PEERSTORE_Continuation cont;
324 void *cont_cls;
325
326 if (NULL != sc)
327 {
328 cont = sc->cont;
329 cont_cls = sc->cont_cls;
330 GNUNET_PEERSTORE_store_cancel (sc);
331 if (NULL != cont)
332 cont (cont_cls, GNUNET_OK);
333 }
334}
335
336
337/******************************************************************************/
338/******************* CONNECTION FUNCTIONS *********************/
339/******************************************************************************/
340
341
342/**
343 * Function called when we had trouble talking to the service.
344 */
345static void
346handle_client_error (void *cls, enum GNUNET_MQ_Error error)
347{
348 struct GNUNET_PEERSTORE_Handle *h = cls;
349
350 LOG (GNUNET_ERROR_TYPE_ERROR,
351 "Received an error notification from MQ of type: %d\n",
352 error);
353 disconnect_and_schedule_reconnect (h);
354}
355
356
357/**
358 * Iterator over previous watches to resend them
359 *
360 * @param cls the `struct GNUNET_PEERSTORE_Handle`
361 * @param key key for the watch
362 * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
363 * @return #GNUNET_YES (continue to iterate)
364 */
365static int
366rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
367{
368 struct GNUNET_PEERSTORE_Handle *h = cls;
369 struct GNUNET_PEERSTORE_WatchContext *wc = value;
370 struct StoreKeyHashMessage *hm;
371 struct GNUNET_MQ_Envelope *ev;
372
373 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
374 hm->keyhash = wc->keyhash;
375 GNUNET_MQ_send (h->mq, ev);
376 return GNUNET_YES;
377}
378
379
380/**
381 * Iterator over watch requests to cancel them.
382 *
383 * @param cls unused
384 * @param key key to the watch request
385 * @param value watch context
386 * @return #GNUNET_YES to continue iteration
387 */
388static int
389destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
390{
391 struct GNUNET_PEERSTORE_WatchContext *wc = value;
392
393 GNUNET_PEERSTORE_watch_cancel (wc);
394 return GNUNET_YES;
395}
396
397
398/**
399 * Kill the connection to the service. This can be delayed in case of pending
400 * STORE requests and the user explicitly asked to sync first. Otherwise it is
401 * performed instantly.
402 *
403 * @param h Handle to the service.
404 */
405static void
406final_disconnect (struct GNUNET_PEERSTORE_Handle *h)
407{
408 if (NULL != h->mq)
409 {
410 GNUNET_MQ_destroy (h->mq);
411 h->mq = NULL;
412 }
413 GNUNET_free (h);
414}
415
416
417/**
418 * Connect to the PEERSTORE service.
419 *
420 * @param cfg configuration to use
421 * @return NULL on error
422 */
423struct GNUNET_PEERSTORE_Handle *
424GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
425{
426 struct GNUNET_PEERSTORE_Handle *h;
427
428 h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
429 h->cfg = cfg;
430 h->disconnecting = GNUNET_NO;
431 reconnect (h);
432 if (NULL == h->mq)
433 {
434 GNUNET_free (h);
435 return NULL;
436 }
437 return h;
438}
439
440
441/**
442 * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
443 * will be canceled.
444 * Any pending STORE requests will depend on @e snyc_first flag.
445 *
446 * @param h handle to disconnect
447 * @param sync_first send any pending STORE requests before disconnecting
448 */
449void
450GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
451{
452 struct GNUNET_PEERSTORE_IterateContext *ic;
453 struct GNUNET_PEERSTORE_StoreContext *sc;
454
455 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
456 if (NULL != h->watches)
457 {
458 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
459 GNUNET_CONTAINER_multihashmap_destroy (h->watches);
460 h->watches = NULL;
461 }
462 while (NULL != (ic = h->iterate_head))
463 {
464 GNUNET_break (0);
465 GNUNET_PEERSTORE_iterate_cancel (ic);
466 }
467 if (NULL != h->store_head)
468 {
469 if (GNUNET_YES == sync_first)
470 {
471 LOG (GNUNET_ERROR_TYPE_DEBUG,
472 "Delaying disconnection due to pending store requests.\n");
473 h->disconnecting = GNUNET_YES;
474 return;
475 }
476 while (NULL != (sc = h->store_head))
477 GNUNET_PEERSTORE_store_cancel (sc);
478 }
479 final_disconnect (h);
480}
481
482
483/******************************************************************************/
484/******************* STORE FUNCTIONS *********************/
485/******************************************************************************/
486
487
488/**
489 * Cancel a store request
490 *
491 * @param sc Store request context
492 */
493void
494GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
495{
496 struct GNUNET_PEERSTORE_Handle *h = sc->h;
497
498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
499 "store cancel with sc %p \n",
500 sc);
501 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
502 GNUNET_free (sc->sub_system);
503 GNUNET_free (sc->value);
504 GNUNET_free (sc->key);
505 GNUNET_free (sc);
506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507 "store cancel with sc %p is null\n",
508 sc);
509 if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
510 final_disconnect (h);
511}
512
513
514/**
515 * Store a new entry in the PEERSTORE.
516 * Note that stored entries can be lost in some cases
517 * such as power failure.
518 *
519 * @param h Handle to the PEERSTORE service
520 * @param sub_system name of the sub system
521 * @param peer Peer Identity
522 * @param key entry key
523 * @param value entry value BLOB
524 * @param size size of @e value
525 * @param expiry absolute time after which the entry is (possibly) deleted
526 * @param options options specific to the storage operation
527 * @param cont Continuation function after the store request is sent
528 * @param cont_cls Closure for @a cont
529 */
530struct GNUNET_PEERSTORE_StoreContext *
531GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
532 const char *sub_system,
533 const struct GNUNET_PeerIdentity *peer,
534 const char *key,
535 const void *value,
536 size_t size,
537 struct GNUNET_TIME_Absolute expiry,
538 enum GNUNET_PEERSTORE_StoreOption options,
539 GNUNET_PEERSTORE_Continuation cont,
540 void *cont_cls)
541{
542 struct GNUNET_MQ_Envelope *ev;
543 struct GNUNET_PEERSTORE_StoreContext *sc;
544
545 LOG (GNUNET_ERROR_TYPE_DEBUG,
546 "Storing value (size: %lu) for subsystem `%s', peer `%s', key `%s'\n",
547 size,
548 sub_system,
549 GNUNET_i2s (peer),
550 key);
551 ev =
552 PEERSTORE_create_record_mq_envelope (sub_system,
553 peer,
554 key,
555 value,
556 size,
557 expiry,
558 options,
559 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
560 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
561
562 sc->sub_system = GNUNET_strdup (sub_system);
563 sc->peer = *peer;
564 sc->key = GNUNET_strdup (key);
565 sc->value = GNUNET_memdup (value, size);
566 sc->size = size;
567 sc->expiry = expiry;
568 sc->options = options;
569 sc->cont = cont;
570 sc->cont_cls = cont_cls;
571 sc->h = h;
572
573 GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
574 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
575 GNUNET_MQ_send (h->mq, ev);
576 return sc;
577}
578
579
580/******************************************************************************/
581/******************* ITERATE FUNCTIONS *********************/
582/******************************************************************************/
583
584
585/**
586 * When a response for iterate request is received
587 *
588 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
589 * @param msg message received
590 */
591static void
592handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg)
593{
594 struct GNUNET_PEERSTORE_Handle *h = cls;
595 struct GNUNET_PEERSTORE_IterateContext *ic;
596 GNUNET_PEERSTORE_Processor callback;
597 void *callback_cls;
598
599 ic = h->iterate_head;
600 if (NULL == ic)
601 {
602 LOG (GNUNET_ERROR_TYPE_ERROR,
603 _ ("Unexpected iteration response, this should not happen.\n"));
604 disconnect_and_schedule_reconnect (h);
605 return;
606 }
607 callback = ic->callback;
608 callback_cls = ic->callback_cls;
609 ic->iterating = GNUNET_NO;
610 GNUNET_PEERSTORE_iterate_cancel (ic);
611 if (NULL != callback)
612 callback (callback_cls, NULL, NULL);
613 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
614}
615
616
617/**
618 * When a response for iterate request is received, check the
619 * message is well-formed.
620 *
621 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
622 * @param msg message received
623 */
624static int
625check_iterate_result (void *cls, const struct StoreRecordMessage *msg)
626{
627 /* we defer validation to #handle_iterate_result */
628 return GNUNET_OK;
629}
630
631
632/**
633 * When a response for iterate request is received
634 *
635 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
636 * @param msg message received
637 */
638static void
639handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
640{
641 struct GNUNET_PEERSTORE_Handle *h = cls;
642 struct GNUNET_PEERSTORE_IterateContext *ic;
643 GNUNET_PEERSTORE_Processor callback;
644 void *callback_cls;
645 struct GNUNET_PEERSTORE_Record *record;
646
647 ic = h->iterate_head;
648 if (NULL == ic)
649 {
650 LOG (GNUNET_ERROR_TYPE_ERROR,
651 _ ("Unexpected iteration response, this should not happen.\n"));
652 disconnect_and_schedule_reconnect (h);
653 return;
654 }
655 ic->iterating = GNUNET_YES;
656 callback = ic->callback;
657 callback_cls = ic->callback_cls;
658 if (NULL == callback)
659 return;
660 record = PEERSTORE_parse_record_message (msg);
661 if (NULL == record)
662 {
663 callback (callback_cls,
664 NULL,
665 _ ("Received a malformed response from service."));
666 }
667 else
668 {
669 callback (callback_cls, record, NULL);
670 PEERSTORE_destroy_record (record);
671 }
672}
673
674
675/**
676 * Cancel an iterate request
677 * Please do not call after the iterate request is done
678 *
679 * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
680 */
681void
682GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
683{
684 if (GNUNET_NO == ic->iterating)
685 {
686 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
687 GNUNET_free (ic->sub_system);
688 GNUNET_free (ic->key);
689 GNUNET_free (ic);
690 }
691 else
692 ic->callback = NULL;
693}
694
695
696/**
697 * Iterate over records matching supplied key information
698 *
699 * @param h handle to the PEERSTORE service
700 * @param sub_system name of sub system
701 * @param peer Peer identity (can be NULL)
702 * @param key entry key string (can be NULL)
703 * @param callback function called with each matching record, all NULL's on end
704 * @param callback_cls closure for @a callback
705 * @return Handle to iteration request
706 */
707struct GNUNET_PEERSTORE_IterateContext *
708GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
709 const char *sub_system,
710 const struct GNUNET_PeerIdentity *peer,
711 const char *key,
712 GNUNET_PEERSTORE_Processor callback,
713 void *callback_cls)
714{
715 struct GNUNET_MQ_Envelope *ev;
716 struct GNUNET_PEERSTORE_IterateContext *ic;
717
718 ev =
719 PEERSTORE_create_record_mq_envelope (sub_system,
720 peer,
721 key,
722 NULL,
723 0,
724 GNUNET_TIME_UNIT_FOREVER_ABS,
725 0,
726 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
727 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
728 ic->callback = callback;
729 ic->callback_cls = callback_cls;
730 ic->h = h;
731 ic->sub_system = GNUNET_strdup (sub_system);
732 if (NULL != peer)
733 ic->peer = *peer;
734 if (NULL != key)
735 ic->key = GNUNET_strdup (key);
736 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
737 LOG (GNUNET_ERROR_TYPE_DEBUG,
738 "Sending an iterate request for sub system `%s'\n",
739 sub_system);
740 GNUNET_MQ_send (h->mq, ev);
741 return ic;
742}
743
744
745/******************************************************************************/
746/******************* WATCH FUNCTIONS *********************/
747/******************************************************************************/
748
749/**
750 * When a watch record is received, validate it is well-formed.
751 *
752 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
753 * @param msg message received
754 */
755static int
756check_watch_record (void *cls, const struct StoreRecordMessage *msg)
757{
758 /* we defer validation to #handle_watch_result */
759 return GNUNET_OK;
760}
761
762
763/**
764 * When a watch record is received, process it.
765 *
766 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
767 * @param msg message received
768 */
769static void
770handle_watch_record (void *cls, const struct StoreRecordMessage *msg)
771{
772 struct GNUNET_PEERSTORE_Handle *h = cls;
773 struct GNUNET_PEERSTORE_Record *record;
774 struct GNUNET_HashCode keyhash;
775 struct GNUNET_PEERSTORE_WatchContext *wc;
776
777 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
778 record = PEERSTORE_parse_record_message (msg);
779 if (NULL == record)
780 {
781 disconnect_and_schedule_reconnect (h);
782 return;
783 }
784 PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash);
785 // FIXME: what if there are multiple watches for the same key?
786 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
787 if (NULL == wc)
788 {
789 LOG (GNUNET_ERROR_TYPE_ERROR,
790 _ ("Received a watch result for a non existing watch.\n"));
791 PEERSTORE_destroy_record (record);
792 disconnect_and_schedule_reconnect (h);
793 return;
794 }
795 if (NULL != wc->callback)
796 wc->callback (wc->callback_cls, record, NULL);
797 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
798 PEERSTORE_destroy_record (record);
799}
800
801
802/**
803 * Close the existing connection to PEERSTORE and reconnect.
804 *
805 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
806 */
807static void
808reconnect (void *cls)
809{
810 struct GNUNET_PEERSTORE_Handle *h = cls;
811 struct GNUNET_MQ_MessageHandler mq_handlers[] =
812 { GNUNET_MQ_hd_fixed_size (iterate_end,
813 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
814 struct GNUNET_MessageHeader,
815 h),
816 GNUNET_MQ_hd_var_size (iterate_result,
817 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
818 struct StoreRecordMessage,
819 h),
820 GNUNET_MQ_hd_var_size (watch_record,
821 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
822 struct StoreRecordMessage,
823 h),
824 GNUNET_MQ_handler_end () };
825 struct GNUNET_MQ_Envelope *ev;
826
827 h->reconnect_task = NULL;
828 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
829 h->mq = GNUNET_CLIENT_connect (h->cfg,
830 "peerstore",
831 mq_handlers,
832 &handle_client_error,
833 h);
834 if (NULL == h->mq)
835 {
836 h->reconnect_task =
837 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
838 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
839 return;
840 }
841 LOG (GNUNET_ERROR_TYPE_DEBUG,
842 "Resending pending requests after reconnect.\n");
843 if (NULL != h->watches)
844 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
845 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
846 ic = ic->next)
847 {
848 ev =
849 PEERSTORE_create_record_mq_envelope (ic->sub_system,
850 &ic->peer,
851 ic->key,
852 NULL,
853 0,
854 GNUNET_TIME_UNIT_FOREVER_ABS,
855 0,
856 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
857 GNUNET_MQ_send (h->mq, ev);
858 }
859 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
860 sc = sc->next)
861 {
862 ev =
863 PEERSTORE_create_record_mq_envelope (sc->sub_system,
864 &sc->peer,
865 sc->key,
866 sc->value,
867 sc->size,
868 sc->expiry,
869 sc->options,
870 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
871 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
872 GNUNET_MQ_send (h->mq, ev);
873 }
874}
875
876
877/**
878 * Cancel a watch request
879 *
880 * @param wc handle to the watch request
881 */
882void
883GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
884{
885 struct GNUNET_PEERSTORE_Handle *h = wc->h;
886 struct GNUNET_MQ_Envelope *ev;
887 struct StoreKeyHashMessage *hm;
888
889 LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
890 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
891 hm->keyhash = wc->keyhash;
892 GNUNET_MQ_send (h->mq, ev);
893 GNUNET_assert (
894 GNUNET_YES ==
895 GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc));
896 GNUNET_free (wc);
897}
898
899
900/**
901 * Request watching a given key
902 * User will be notified with any new values added to key
903 *
904 * @param h handle to the PEERSTORE service
905 * @param sub_system name of sub system
906 * @param peer Peer identity
907 * @param key entry key string
908 * @param callback function called with each new value
909 * @param callback_cls closure for @a callback
910 * @return Handle to watch request
911 */
912struct GNUNET_PEERSTORE_WatchContext *
913GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
914 const char *sub_system,
915 const struct GNUNET_PeerIdentity *peer,
916 const char *key,
917 GNUNET_PEERSTORE_Processor callback,
918 void *callback_cls)
919{
920 struct GNUNET_MQ_Envelope *ev;
921 struct StoreKeyHashMessage *hm;
922 struct GNUNET_PEERSTORE_WatchContext *wc;
923
924 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
925 PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
926 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
927 wc->callback = callback;
928 wc->callback_cls = callback_cls;
929 wc->h = h;
930 wc->keyhash = hm->keyhash;
931 if (NULL == h->watches)
932 h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
933 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
934 h->watches,
935 &wc->keyhash,
936 wc,
937 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
938 LOG (GNUNET_ERROR_TYPE_DEBUG,
939 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
940 sub_system,
941 GNUNET_i2s (peer),
942 key);
943 GNUNET_MQ_send (h->mq, ev);
944 return wc;
945}
946
947
948/* end of peerstore_api.c */