aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore/peerstore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r--src/peerstore/peerstore_api.c939
1 files changed, 0 insertions, 939 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
deleted file mode 100644
index 502b38646..000000000
--- a/src/peerstore/peerstore_api.c
+++ /dev/null
@@ -1,939 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2013-2016, 2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file peerstore/peerstore_api.c
22 * @brief API for peerstore
23 * @author Omar Tarabai
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "peerstore.h"
29#include "peerstore_common.h"
30
31#define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__)
32
33/******************************************************************************/
34/************************ DATA STRUCTURES ****************************/
35/******************************************************************************/
36
37/**
38 * Handle to the PEERSTORE service.
39 */
40struct GNUNET_PEERSTORE_Handle
41{
42 /**
43 * Our configuration.
44 */
45 const struct GNUNET_CONFIGURATION_Handle *cfg;
46
47 /**
48 * Message queue
49 */
50 struct GNUNET_MQ_Handle *mq;
51
52 /**
53 * Head of active STORE requests.
54 */
55 struct GNUNET_PEERSTORE_StoreContext *store_head;
56
57 /**
58 * Tail of active STORE requests.
59 */
60 struct GNUNET_PEERSTORE_StoreContext *store_tail;
61
62 /**
63 * Head of active ITERATE requests.
64 */
65 struct GNUNET_PEERSTORE_IterateContext *iterate_head;
66
67 /**
68 * Tail of active ITERATE requests.
69 */
70 struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
71
72 /**
73 * Hashmap of watch requests
74 */
75 struct GNUNET_CONTAINER_MultiHashMap *watches;
76
77 /**
78 * ID of the task trying to reconnect to the service.
79 */
80 struct GNUNET_SCHEDULER_Task *reconnect_task;
81
82 /**
83 * Delay until we try to reconnect.
84 */
85 struct GNUNET_TIME_Relative reconnect_delay;
86
87 /**
88 * Are we in the process of disconnecting but need to sync first?
89 */
90 int disconnecting;
91};
92
93/**
94 * Context for a store request
95 */
96struct GNUNET_PEERSTORE_StoreContext
97{
98 /**
99 * Kept in a DLL.
100 */
101 struct GNUNET_PEERSTORE_StoreContext *next;
102
103 /**
104 * Kept in a DLL.
105 */
106 struct GNUNET_PEERSTORE_StoreContext *prev;
107
108 /**
109 * Handle to the PEERSTORE service.
110 */
111 struct GNUNET_PEERSTORE_Handle *h;
112
113 /**
114 * Continuation called with service response
115 */
116 GNUNET_PEERSTORE_Continuation cont;
117
118 /**
119 * Closure for @e cont
120 */
121 void *cont_cls;
122
123 /**
124 * Which subsystem does the store?
125 */
126 char *sub_system;
127
128 /**
129 * Key for the store operation.
130 */
131 char *key;
132
133 /**
134 * Contains @e size bytes.
135 */
136 void *value;
137
138 /**
139 * Peer the store is for.
140 */
141 struct GNUNET_PeerIdentity peer;
142
143 /**
144 * Number of bytes in @e value.
145 */
146 size_t size;
147
148 /**
149 * When does the value expire?
150 */
151 struct GNUNET_TIME_Absolute expiry;
152
153 /**
154 * Options for the store operation.
155 */
156 enum GNUNET_PEERSTORE_StoreOption options;
157};
158
159/**
160 * Context for a iterate request
161 */
162struct GNUNET_PEERSTORE_IterateContext
163{
164 /**
165 * Kept in a DLL.
166 */
167 struct GNUNET_PEERSTORE_IterateContext *next;
168
169 /**
170 * Kept in a DLL.
171 */
172 struct GNUNET_PEERSTORE_IterateContext *prev;
173
174 /**
175 * Handle to the PEERSTORE service.
176 */
177 struct GNUNET_PEERSTORE_Handle *h;
178
179 /**
180 * Which subsystem does the store?
181 */
182 char *sub_system;
183
184 /**
185 * Peer the store is for.
186 */
187 struct GNUNET_PeerIdentity peer;
188
189 /**
190 * Key for the store operation.
191 */
192 char *key;
193
194 /**
195 * Callback with each matching record
196 */
197 GNUNET_PEERSTORE_Processor callback;
198
199 /**
200 * Closure for @e callback
201 */
202 void *callback_cls;
203
204 /**
205 * #GNUNET_YES if we are currently processing records.
206 */
207 int iterating;
208};
209
210/**
211 * Context for a watch request
212 */
213struct GNUNET_PEERSTORE_WatchContext
214{
215 /**
216 * Kept in a DLL.
217 */
218 struct GNUNET_PEERSTORE_WatchContext *next;
219
220 /**
221 * Kept in a DLL.
222 */
223 struct GNUNET_PEERSTORE_WatchContext *prev;
224
225 /**
226 * Handle to the PEERSTORE service.
227 */
228 struct GNUNET_PEERSTORE_Handle *h;
229
230 /**
231 * Callback with each record received
232 */
233 GNUNET_PEERSTORE_Processor callback;
234
235 /**
236 * Closure for @e callback
237 */
238 void *callback_cls;
239
240 /**
241 * Hash of the combined key
242 */
243 struct GNUNET_HashCode keyhash;
244};
245
246/******************************************************************************/
247/******************* DECLARATIONS *********************/
248/******************************************************************************/
249
250/**
251 * Close the existing connection to PEERSTORE and reconnect.
252 *
253 * @param cls a `struct GNUNET_PEERSTORE_Handle *h`
254 */
255static void
256reconnect (void *cls);
257
258
259/**
260 * Disconnect from the peerstore service.
261 *
262 * @param h peerstore handle to disconnect
263 */
264static void
265disconnect (struct GNUNET_PEERSTORE_Handle *h)
266{
267 struct GNUNET_PEERSTORE_IterateContext *next;
268
269 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
270 ic = next)
271 {
272 next = ic->next;
273 if (GNUNET_YES == ic->iterating)
274 {
275 GNUNET_PEERSTORE_Processor icb;
276 void *icb_cls;
277
278 icb = ic->callback;
279 icb_cls = ic->callback_cls;
280 GNUNET_PEERSTORE_iterate_cancel (ic);
281 if (NULL != icb)
282 icb (icb_cls, NULL, "Iteration canceled due to reconnection");
283 }
284 }
285
286 if (NULL != h->mq)
287 {
288 GNUNET_MQ_destroy (h->mq);
289 h->mq = NULL;
290 }
291}
292
293
294/**
295 * Function that will schedule the job that will try
296 * to connect us again to the client.
297 *
298 * @param h peerstore to reconnect
299 */
300static void
301disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
302{
303 GNUNET_assert (NULL == h->reconnect_task);
304 disconnect (h);
305 LOG (GNUNET_ERROR_TYPE_DEBUG,
306 "Scheduling task to reconnect to PEERSTORE service in %s.\n",
307 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
308 h->reconnect_task =
309 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
310 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
311}
312
313
314/**
315 * Callback after MQ envelope is sent
316 *
317 * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
318 */
319static void
320store_request_sent (void *cls)
321{
322 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
323 GNUNET_PEERSTORE_Continuation cont;
324 void *cont_cls;
325
326 cont = sc->cont;
327 cont_cls = sc->cont_cls;
328 GNUNET_PEERSTORE_store_cancel (sc);
329 if (NULL != cont)
330 cont (cont_cls, GNUNET_OK);
331}
332
333
334/******************************************************************************/
335/******************* CONNECTION FUNCTIONS *********************/
336/******************************************************************************/
337
338
339/**
340 * Function called when we had trouble talking to the service.
341 */
342static void
343handle_client_error (void *cls, enum GNUNET_MQ_Error error)
344{
345 struct GNUNET_PEERSTORE_Handle *h = cls;
346
347 LOG (GNUNET_ERROR_TYPE_ERROR,
348 "Received an error notification from MQ of type: %d\n",
349 error);
350 disconnect_and_schedule_reconnect (h);
351}
352
353
354/**
355 * Iterator over previous watches to resend them
356 *
357 * @param cls the `struct GNUNET_PEERSTORE_Handle`
358 * @param key key for the watch
359 * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
360 * @return #GNUNET_YES (continue to iterate)
361 */
362static int
363rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
364{
365 struct GNUNET_PEERSTORE_Handle *h = cls;
366 struct GNUNET_PEERSTORE_WatchContext *wc = value;
367 struct StoreKeyHashMessage *hm;
368 struct GNUNET_MQ_Envelope *ev;
369
370 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
371 hm->keyhash = wc->keyhash;
372 GNUNET_MQ_send (h->mq, ev);
373 return GNUNET_YES;
374}
375
376
377/**
378 * Iterator over watch requests to cancel them.
379 *
380 * @param cls unused
381 * @param key key to the watch request
382 * @param value watch context
383 * @return #GNUNET_YES to continue iteration
384 */
385static int
386destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
387{
388 struct GNUNET_PEERSTORE_WatchContext *wc = value;
389
390 GNUNET_PEERSTORE_watch_cancel (wc);
391 return GNUNET_YES;
392}
393
394
395/**
396 * Kill the connection to the service. This can be delayed in case of pending
397 * STORE requests and the user explicitly asked to sync first. Otherwise it is
398 * performed instantly.
399 *
400 * @param h Handle to the service.
401 */
402static void
403final_disconnect (struct GNUNET_PEERSTORE_Handle *h)
404{
405 if (NULL != h->mq)
406 {
407 GNUNET_MQ_destroy (h->mq);
408 h->mq = NULL;
409 }
410 GNUNET_free (h);
411}
412
413
414/**
415 * Connect to the PEERSTORE service.
416 *
417 * @param cfg configuration to use
418 * @return NULL on error
419 */
420struct GNUNET_PEERSTORE_Handle *
421GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
422{
423 struct GNUNET_PEERSTORE_Handle *h;
424
425 h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
426 h->cfg = cfg;
427 h->disconnecting = GNUNET_NO;
428 reconnect (h);
429 if (NULL == h->mq)
430 {
431 GNUNET_free (h);
432 return NULL;
433 }
434 return h;
435}
436
437
438/**
439 * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
440 * will be canceled.
441 * Any pending STORE requests will depend on @e snyc_first flag.
442 *
443 * @param h handle to disconnect
444 * @param sync_first send any pending STORE requests before disconnecting
445 */
446void
447GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
448{
449 struct GNUNET_PEERSTORE_IterateContext *ic;
450 struct GNUNET_PEERSTORE_StoreContext *sc;
451
452 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
453 if (NULL != h->watches)
454 {
455 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
456 GNUNET_CONTAINER_multihashmap_destroy (h->watches);
457 h->watches = NULL;
458 }
459 while (NULL != (ic = h->iterate_head))
460 {
461 GNUNET_break (0);
462 GNUNET_PEERSTORE_iterate_cancel (ic);
463 }
464 if (NULL != h->store_head)
465 {
466 if (GNUNET_YES == sync_first)
467 {
468 LOG (GNUNET_ERROR_TYPE_DEBUG,
469 "Delaying disconnection due to pending store requests.\n");
470 h->disconnecting = GNUNET_YES;
471 return;
472 }
473 while (NULL != (sc = h->store_head))
474 GNUNET_PEERSTORE_store_cancel (sc);
475 }
476 final_disconnect (h);
477}
478
479
480/******************************************************************************/
481/******************* STORE FUNCTIONS *********************/
482/******************************************************************************/
483
484
485/**
486 * Cancel a store request
487 *
488 * @param sc Store request context
489 */
490void
491GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
492{
493 struct GNUNET_PEERSTORE_Handle *h = sc->h;
494
495 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
496 GNUNET_free (sc->sub_system);
497 GNUNET_free (sc->value);
498 GNUNET_free (sc->key);
499 GNUNET_free (sc);
500 if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
501 final_disconnect (h);
502}
503
504
505/**
506 * Store a new entry in the PEERSTORE.
507 * Note that stored entries can be lost in some cases
508 * such as power failure.
509 *
510 * @param h Handle to the PEERSTORE service
511 * @param sub_system name of the sub system
512 * @param peer Peer Identity
513 * @param key entry key
514 * @param value entry value BLOB
515 * @param size size of @e value
516 * @param expiry absolute time after which the entry is (possibly) deleted
517 * @param options options specific to the storage operation
518 * @param cont Continuation function after the store request is sent
519 * @param cont_cls Closure for @a cont
520 */
521struct GNUNET_PEERSTORE_StoreContext *
522GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
523 const char *sub_system,
524 const struct GNUNET_PeerIdentity *peer,
525 const char *key,
526 const void *value,
527 size_t size,
528 struct GNUNET_TIME_Absolute expiry,
529 enum GNUNET_PEERSTORE_StoreOption options,
530 GNUNET_PEERSTORE_Continuation cont,
531 void *cont_cls)
532{
533 struct GNUNET_MQ_Envelope *ev;
534 struct GNUNET_PEERSTORE_StoreContext *sc;
535
536 LOG (GNUNET_ERROR_TYPE_DEBUG,
537 "Storing value (size: %lu) for subsystem `%s', peer `%s', key `%s'\n",
538 size,
539 sub_system,
540 GNUNET_i2s (peer),
541 key);
542 ev =
543 PEERSTORE_create_record_mq_envelope (sub_system,
544 peer,
545 key,
546 value,
547 size,
548 expiry,
549 options,
550 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
551 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
552
553 sc->sub_system = GNUNET_strdup (sub_system);
554 sc->peer = *peer;
555 sc->key = GNUNET_strdup (key);
556 sc->value = GNUNET_memdup (value, size);
557 sc->size = size;
558 sc->expiry = expiry;
559 sc->options = options;
560 sc->cont = cont;
561 sc->cont_cls = cont_cls;
562 sc->h = h;
563
564 GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
565 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
566 GNUNET_MQ_send (h->mq, ev);
567 return sc;
568}
569
570
571/******************************************************************************/
572/******************* ITERATE FUNCTIONS *********************/
573/******************************************************************************/
574
575
576/**
577 * When a response for iterate request is received
578 *
579 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
580 * @param msg message received
581 */
582static void
583handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg)
584{
585 struct GNUNET_PEERSTORE_Handle *h = cls;
586 struct GNUNET_PEERSTORE_IterateContext *ic;
587 GNUNET_PEERSTORE_Processor callback;
588 void *callback_cls;
589
590 ic = h->iterate_head;
591 if (NULL == ic)
592 {
593 LOG (GNUNET_ERROR_TYPE_ERROR,
594 _ ("Unexpected iteration response, this should not happen.\n"));
595 disconnect_and_schedule_reconnect (h);
596 return;
597 }
598 callback = ic->callback;
599 callback_cls = ic->callback_cls;
600 ic->iterating = GNUNET_NO;
601 GNUNET_PEERSTORE_iterate_cancel (ic);
602 if (NULL != callback)
603 callback (callback_cls, NULL, NULL);
604 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
605}
606
607
608/**
609 * When a response for iterate request is received, check the
610 * message is well-formed.
611 *
612 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
613 * @param msg message received
614 */
615static int
616check_iterate_result (void *cls, const struct StoreRecordMessage *msg)
617{
618 /* we defer validation to #handle_iterate_result */
619 return GNUNET_OK;
620}
621
622
623/**
624 * When a response for iterate request is received
625 *
626 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
627 * @param msg message received
628 */
629static void
630handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
631{
632 struct GNUNET_PEERSTORE_Handle *h = cls;
633 struct GNUNET_PEERSTORE_IterateContext *ic;
634 GNUNET_PEERSTORE_Processor callback;
635 void *callback_cls;
636 struct GNUNET_PEERSTORE_Record *record;
637
638 ic = h->iterate_head;
639 if (NULL == ic)
640 {
641 LOG (GNUNET_ERROR_TYPE_ERROR,
642 _ ("Unexpected iteration response, this should not happen.\n"));
643 disconnect_and_schedule_reconnect (h);
644 return;
645 }
646 ic->iterating = GNUNET_YES;
647 callback = ic->callback;
648 callback_cls = ic->callback_cls;
649 if (NULL == callback)
650 return;
651 record = PEERSTORE_parse_record_message (msg);
652 if (NULL == record)
653 {
654 callback (callback_cls,
655 NULL,
656 _ ("Received a malformed response from service."));
657 }
658 else
659 {
660 callback (callback_cls, record, NULL);
661 PEERSTORE_destroy_record (record);
662 }
663}
664
665
666/**
667 * Cancel an iterate request
668 * Please do not call after the iterate request is done
669 *
670 * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
671 */
672void
673GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
674{
675 if (GNUNET_NO == ic->iterating)
676 {
677 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
678 GNUNET_free (ic->sub_system);
679 GNUNET_free (ic->key);
680 GNUNET_free (ic);
681 }
682 else
683 ic->callback = NULL;
684}
685
686
687/**
688 * Iterate over records matching supplied key information
689 *
690 * @param h handle to the PEERSTORE service
691 * @param sub_system name of sub system
692 * @param peer Peer identity (can be NULL)
693 * @param key entry key string (can be NULL)
694 * @param callback function called with each matching record, all NULL's on end
695 * @param callback_cls closure for @a callback
696 * @return Handle to iteration request
697 */
698struct GNUNET_PEERSTORE_IterateContext *
699GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
700 const char *sub_system,
701 const struct GNUNET_PeerIdentity *peer,
702 const char *key,
703 GNUNET_PEERSTORE_Processor callback,
704 void *callback_cls)
705{
706 struct GNUNET_MQ_Envelope *ev;
707 struct GNUNET_PEERSTORE_IterateContext *ic;
708
709 ev =
710 PEERSTORE_create_record_mq_envelope (sub_system,
711 peer,
712 key,
713 NULL,
714 0,
715 GNUNET_TIME_UNIT_FOREVER_ABS,
716 0,
717 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
718 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
719 ic->callback = callback;
720 ic->callback_cls = callback_cls;
721 ic->h = h;
722 ic->sub_system = GNUNET_strdup (sub_system);
723 if (NULL != peer)
724 ic->peer = *peer;
725 if (NULL != key)
726 ic->key = GNUNET_strdup (key);
727 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
728 LOG (GNUNET_ERROR_TYPE_DEBUG,
729 "Sending an iterate request for sub system `%s'\n",
730 sub_system);
731 GNUNET_MQ_send (h->mq, ev);
732 return ic;
733}
734
735
736/******************************************************************************/
737/******************* WATCH FUNCTIONS *********************/
738/******************************************************************************/
739
740/**
741 * When a watch record is received, validate it is well-formed.
742 *
743 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
744 * @param msg message received
745 */
746static int
747check_watch_record (void *cls, const struct StoreRecordMessage *msg)
748{
749 /* we defer validation to #handle_watch_result */
750 return GNUNET_OK;
751}
752
753
754/**
755 * When a watch record is received, process it.
756 *
757 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
758 * @param msg message received
759 */
760static void
761handle_watch_record (void *cls, const struct StoreRecordMessage *msg)
762{
763 struct GNUNET_PEERSTORE_Handle *h = cls;
764 struct GNUNET_PEERSTORE_Record *record;
765 struct GNUNET_HashCode keyhash;
766 struct GNUNET_PEERSTORE_WatchContext *wc;
767
768 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
769 record = PEERSTORE_parse_record_message (msg);
770 if (NULL == record)
771 {
772 disconnect_and_schedule_reconnect (h);
773 return;
774 }
775 PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash);
776 // FIXME: what if there are multiple watches for the same key?
777 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
778 if (NULL == wc)
779 {
780 LOG (GNUNET_ERROR_TYPE_ERROR,
781 _ ("Received a watch result for a non existing watch.\n"));
782 PEERSTORE_destroy_record (record);
783 disconnect_and_schedule_reconnect (h);
784 return;
785 }
786 if (NULL != wc->callback)
787 wc->callback (wc->callback_cls, record, NULL);
788 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
789 PEERSTORE_destroy_record (record);
790}
791
792
793/**
794 * Close the existing connection to PEERSTORE and reconnect.
795 *
796 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
797 */
798static void
799reconnect (void *cls)
800{
801 struct GNUNET_PEERSTORE_Handle *h = cls;
802 struct GNUNET_MQ_MessageHandler mq_handlers[] =
803 { GNUNET_MQ_hd_fixed_size (iterate_end,
804 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
805 struct GNUNET_MessageHeader,
806 h),
807 GNUNET_MQ_hd_var_size (iterate_result,
808 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
809 struct StoreRecordMessage,
810 h),
811 GNUNET_MQ_hd_var_size (watch_record,
812 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
813 struct StoreRecordMessage,
814 h),
815 GNUNET_MQ_handler_end () };
816 struct GNUNET_MQ_Envelope *ev;
817
818 h->reconnect_task = NULL;
819 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
820 h->mq = GNUNET_CLIENT_connect (h->cfg,
821 "peerstore",
822 mq_handlers,
823 &handle_client_error,
824 h);
825 if (NULL == h->mq)
826 {
827 h->reconnect_task =
828 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
829 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
830 return;
831 }
832 LOG (GNUNET_ERROR_TYPE_DEBUG,
833 "Resending pending requests after reconnect.\n");
834 if (NULL != h->watches)
835 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
836 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
837 ic = ic->next)
838 {
839 ev =
840 PEERSTORE_create_record_mq_envelope (ic->sub_system,
841 &ic->peer,
842 ic->key,
843 NULL,
844 0,
845 GNUNET_TIME_UNIT_FOREVER_ABS,
846 0,
847 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
848 GNUNET_MQ_send (h->mq, ev);
849 }
850 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
851 sc = sc->next)
852 {
853 ev =
854 PEERSTORE_create_record_mq_envelope (sc->sub_system,
855 &sc->peer,
856 sc->key,
857 sc->value,
858 sc->size,
859 sc->expiry,
860 sc->options,
861 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
862 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
863 GNUNET_MQ_send (h->mq, ev);
864 }
865}
866
867
868/**
869 * Cancel a watch request
870 *
871 * @param wc handle to the watch request
872 */
873void
874GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
875{
876 struct GNUNET_PEERSTORE_Handle *h = wc->h;
877 struct GNUNET_MQ_Envelope *ev;
878 struct StoreKeyHashMessage *hm;
879
880 LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
881 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
882 hm->keyhash = wc->keyhash;
883 GNUNET_MQ_send (h->mq, ev);
884 GNUNET_assert (
885 GNUNET_YES ==
886 GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc));
887 GNUNET_free (wc);
888}
889
890
891/**
892 * Request watching a given key
893 * User will be notified with any new values added to key
894 *
895 * @param h handle to the PEERSTORE service
896 * @param sub_system name of sub system
897 * @param peer Peer identity
898 * @param key entry key string
899 * @param callback function called with each new value
900 * @param callback_cls closure for @a callback
901 * @return Handle to watch request
902 */
903struct GNUNET_PEERSTORE_WatchContext *
904GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
905 const char *sub_system,
906 const struct GNUNET_PeerIdentity *peer,
907 const char *key,
908 GNUNET_PEERSTORE_Processor callback,
909 void *callback_cls)
910{
911 struct GNUNET_MQ_Envelope *ev;
912 struct StoreKeyHashMessage *hm;
913 struct GNUNET_PEERSTORE_WatchContext *wc;
914
915 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
916 PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
917 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
918 wc->callback = callback;
919 wc->callback_cls = callback_cls;
920 wc->h = h;
921 wc->keyhash = hm->keyhash;
922 if (NULL == h->watches)
923 h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
924 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
925 h->watches,
926 &wc->keyhash,
927 wc,
928 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
929 LOG (GNUNET_ERROR_TYPE_DEBUG,
930 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
931 sub_system,
932 GNUNET_i2s (peer),
933 key);
934 GNUNET_MQ_send (h->mq, ev);
935 return wc;
936}
937
938
939/* end of peerstore_api.c */