diff options
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r-- | src/peerstore/peerstore_api.c | 939 |
1 files changed, 0 insertions, 939 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c deleted file mode 100644 index 502b38646..000000000 --- a/src/peerstore/peerstore_api.c +++ /dev/null | |||
@@ -1,939 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2013-2016, 2019 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | /** | ||
21 | * @file peerstore/peerstore_api.c | ||
22 | * @brief API for peerstore | ||
23 | * @author Omar Tarabai | ||
24 | * @author Christian Grothoff | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_util_lib.h" | ||
28 | #include "peerstore.h" | ||
29 | #include "peerstore_common.h" | ||
30 | |||
31 | #define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__) | ||
32 | |||
33 | /******************************************************************************/ | ||
34 | /************************ DATA STRUCTURES ****************************/ | ||
35 | /******************************************************************************/ | ||
36 | |||
37 | /** | ||
38 | * Handle to the PEERSTORE service. | ||
39 | */ | ||
40 | struct GNUNET_PEERSTORE_Handle | ||
41 | { | ||
42 | /** | ||
43 | * Our configuration. | ||
44 | */ | ||
45 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
46 | |||
47 | /** | ||
48 | * Message queue | ||
49 | */ | ||
50 | struct GNUNET_MQ_Handle *mq; | ||
51 | |||
52 | /** | ||
53 | * Head of active STORE requests. | ||
54 | */ | ||
55 | struct GNUNET_PEERSTORE_StoreContext *store_head; | ||
56 | |||
57 | /** | ||
58 | * Tail of active STORE requests. | ||
59 | */ | ||
60 | struct GNUNET_PEERSTORE_StoreContext *store_tail; | ||
61 | |||
62 | /** | ||
63 | * Head of active ITERATE requests. | ||
64 | */ | ||
65 | struct GNUNET_PEERSTORE_IterateContext *iterate_head; | ||
66 | |||
67 | /** | ||
68 | * Tail of active ITERATE requests. | ||
69 | */ | ||
70 | struct GNUNET_PEERSTORE_IterateContext *iterate_tail; | ||
71 | |||
72 | /** | ||
73 | * Hashmap of watch requests | ||
74 | */ | ||
75 | struct GNUNET_CONTAINER_MultiHashMap *watches; | ||
76 | |||
77 | /** | ||
78 | * ID of the task trying to reconnect to the service. | ||
79 | */ | ||
80 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
81 | |||
82 | /** | ||
83 | * Delay until we try to reconnect. | ||
84 | */ | ||
85 | struct GNUNET_TIME_Relative reconnect_delay; | ||
86 | |||
87 | /** | ||
88 | * Are we in the process of disconnecting but need to sync first? | ||
89 | */ | ||
90 | int disconnecting; | ||
91 | }; | ||
92 | |||
93 | /** | ||
94 | * Context for a store request | ||
95 | */ | ||
96 | struct GNUNET_PEERSTORE_StoreContext | ||
97 | { | ||
98 | /** | ||
99 | * Kept in a DLL. | ||
100 | */ | ||
101 | struct GNUNET_PEERSTORE_StoreContext *next; | ||
102 | |||
103 | /** | ||
104 | * Kept in a DLL. | ||
105 | */ | ||
106 | struct GNUNET_PEERSTORE_StoreContext *prev; | ||
107 | |||
108 | /** | ||
109 | * Handle to the PEERSTORE service. | ||
110 | */ | ||
111 | struct GNUNET_PEERSTORE_Handle *h; | ||
112 | |||
113 | /** | ||
114 | * Continuation called with service response | ||
115 | */ | ||
116 | GNUNET_PEERSTORE_Continuation cont; | ||
117 | |||
118 | /** | ||
119 | * Closure for @e cont | ||
120 | */ | ||
121 | void *cont_cls; | ||
122 | |||
123 | /** | ||
124 | * Which subsystem does the store? | ||
125 | */ | ||
126 | char *sub_system; | ||
127 | |||
128 | /** | ||
129 | * Key for the store operation. | ||
130 | */ | ||
131 | char *key; | ||
132 | |||
133 | /** | ||
134 | * Contains @e size bytes. | ||
135 | */ | ||
136 | void *value; | ||
137 | |||
138 | /** | ||
139 | * Peer the store is for. | ||
140 | */ | ||
141 | struct GNUNET_PeerIdentity peer; | ||
142 | |||
143 | /** | ||
144 | * Number of bytes in @e value. | ||
145 | */ | ||
146 | size_t size; | ||
147 | |||
148 | /** | ||
149 | * When does the value expire? | ||
150 | */ | ||
151 | struct GNUNET_TIME_Absolute expiry; | ||
152 | |||
153 | /** | ||
154 | * Options for the store operation. | ||
155 | */ | ||
156 | enum GNUNET_PEERSTORE_StoreOption options; | ||
157 | }; | ||
158 | |||
159 | /** | ||
160 | * Context for a iterate request | ||
161 | */ | ||
162 | struct GNUNET_PEERSTORE_IterateContext | ||
163 | { | ||
164 | /** | ||
165 | * Kept in a DLL. | ||
166 | */ | ||
167 | struct GNUNET_PEERSTORE_IterateContext *next; | ||
168 | |||
169 | /** | ||
170 | * Kept in a DLL. | ||
171 | */ | ||
172 | struct GNUNET_PEERSTORE_IterateContext *prev; | ||
173 | |||
174 | /** | ||
175 | * Handle to the PEERSTORE service. | ||
176 | */ | ||
177 | struct GNUNET_PEERSTORE_Handle *h; | ||
178 | |||
179 | /** | ||
180 | * Which subsystem does the store? | ||
181 | */ | ||
182 | char *sub_system; | ||
183 | |||
184 | /** | ||
185 | * Peer the store is for. | ||
186 | */ | ||
187 | struct GNUNET_PeerIdentity peer; | ||
188 | |||
189 | /** | ||
190 | * Key for the store operation. | ||
191 | */ | ||
192 | char *key; | ||
193 | |||
194 | /** | ||
195 | * Callback with each matching record | ||
196 | */ | ||
197 | GNUNET_PEERSTORE_Processor callback; | ||
198 | |||
199 | /** | ||
200 | * Closure for @e callback | ||
201 | */ | ||
202 | void *callback_cls; | ||
203 | |||
204 | /** | ||
205 | * #GNUNET_YES if we are currently processing records. | ||
206 | */ | ||
207 | int iterating; | ||
208 | }; | ||
209 | |||
210 | /** | ||
211 | * Context for a watch request | ||
212 | */ | ||
213 | struct GNUNET_PEERSTORE_WatchContext | ||
214 | { | ||
215 | /** | ||
216 | * Kept in a DLL. | ||
217 | */ | ||
218 | struct GNUNET_PEERSTORE_WatchContext *next; | ||
219 | |||
220 | /** | ||
221 | * Kept in a DLL. | ||
222 | */ | ||
223 | struct GNUNET_PEERSTORE_WatchContext *prev; | ||
224 | |||
225 | /** | ||
226 | * Handle to the PEERSTORE service. | ||
227 | */ | ||
228 | struct GNUNET_PEERSTORE_Handle *h; | ||
229 | |||
230 | /** | ||
231 | * Callback with each record received | ||
232 | */ | ||
233 | GNUNET_PEERSTORE_Processor callback; | ||
234 | |||
235 | /** | ||
236 | * Closure for @e callback | ||
237 | */ | ||
238 | void *callback_cls; | ||
239 | |||
240 | /** | ||
241 | * Hash of the combined key | ||
242 | */ | ||
243 | struct GNUNET_HashCode keyhash; | ||
244 | }; | ||
245 | |||
246 | /******************************************************************************/ | ||
247 | /******************* DECLARATIONS *********************/ | ||
248 | /******************************************************************************/ | ||
249 | |||
250 | /** | ||
251 | * Close the existing connection to PEERSTORE and reconnect. | ||
252 | * | ||
253 | * @param cls a `struct GNUNET_PEERSTORE_Handle *h` | ||
254 | */ | ||
255 | static void | ||
256 | reconnect (void *cls); | ||
257 | |||
258 | |||
259 | /** | ||
260 | * Disconnect from the peerstore service. | ||
261 | * | ||
262 | * @param h peerstore handle to disconnect | ||
263 | */ | ||
264 | static void | ||
265 | disconnect (struct GNUNET_PEERSTORE_Handle *h) | ||
266 | { | ||
267 | struct GNUNET_PEERSTORE_IterateContext *next; | ||
268 | |||
269 | for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic; | ||
270 | ic = next) | ||
271 | { | ||
272 | next = ic->next; | ||
273 | if (GNUNET_YES == ic->iterating) | ||
274 | { | ||
275 | GNUNET_PEERSTORE_Processor icb; | ||
276 | void *icb_cls; | ||
277 | |||
278 | icb = ic->callback; | ||
279 | icb_cls = ic->callback_cls; | ||
280 | GNUNET_PEERSTORE_iterate_cancel (ic); | ||
281 | if (NULL != icb) | ||
282 | icb (icb_cls, NULL, "Iteration canceled due to reconnection"); | ||
283 | } | ||
284 | } | ||
285 | |||
286 | if (NULL != h->mq) | ||
287 | { | ||
288 | GNUNET_MQ_destroy (h->mq); | ||
289 | h->mq = NULL; | ||
290 | } | ||
291 | } | ||
292 | |||
293 | |||
294 | /** | ||
295 | * Function that will schedule the job that will try | ||
296 | * to connect us again to the client. | ||
297 | * | ||
298 | * @param h peerstore to reconnect | ||
299 | */ | ||
300 | static void | ||
301 | disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h) | ||
302 | { | ||
303 | GNUNET_assert (NULL == h->reconnect_task); | ||
304 | disconnect (h); | ||
305 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
306 | "Scheduling task to reconnect to PEERSTORE service in %s.\n", | ||
307 | GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES)); | ||
308 | h->reconnect_task = | ||
309 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); | ||
310 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | ||
311 | } | ||
312 | |||
313 | |||
314 | /** | ||
315 | * Callback after MQ envelope is sent | ||
316 | * | ||
317 | * @param cls a `struct GNUNET_PEERSTORE_StoreContext *` | ||
318 | */ | ||
319 | static void | ||
320 | store_request_sent (void *cls) | ||
321 | { | ||
322 | struct GNUNET_PEERSTORE_StoreContext *sc = cls; | ||
323 | GNUNET_PEERSTORE_Continuation cont; | ||
324 | void *cont_cls; | ||
325 | |||
326 | cont = sc->cont; | ||
327 | cont_cls = sc->cont_cls; | ||
328 | GNUNET_PEERSTORE_store_cancel (sc); | ||
329 | if (NULL != cont) | ||
330 | cont (cont_cls, GNUNET_OK); | ||
331 | } | ||
332 | |||
333 | |||
334 | /******************************************************************************/ | ||
335 | /******************* CONNECTION FUNCTIONS *********************/ | ||
336 | /******************************************************************************/ | ||
337 | |||
338 | |||
339 | /** | ||
340 | * Function called when we had trouble talking to the service. | ||
341 | */ | ||
342 | static void | ||
343 | handle_client_error (void *cls, enum GNUNET_MQ_Error error) | ||
344 | { | ||
345 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
346 | |||
347 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
348 | "Received an error notification from MQ of type: %d\n", | ||
349 | error); | ||
350 | disconnect_and_schedule_reconnect (h); | ||
351 | } | ||
352 | |||
353 | |||
354 | /** | ||
355 | * Iterator over previous watches to resend them | ||
356 | * | ||
357 | * @param cls the `struct GNUNET_PEERSTORE_Handle` | ||
358 | * @param key key for the watch | ||
359 | * @param value the `struct GNUNET_PEERSTORE_WatchContext *` | ||
360 | * @return #GNUNET_YES (continue to iterate) | ||
361 | */ | ||
362 | static int | ||
363 | rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
364 | { | ||
365 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
366 | struct GNUNET_PEERSTORE_WatchContext *wc = value; | ||
367 | struct StoreKeyHashMessage *hm; | ||
368 | struct GNUNET_MQ_Envelope *ev; | ||
369 | |||
370 | ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); | ||
371 | hm->keyhash = wc->keyhash; | ||
372 | GNUNET_MQ_send (h->mq, ev); | ||
373 | return GNUNET_YES; | ||
374 | } | ||
375 | |||
376 | |||
377 | /** | ||
378 | * Iterator over watch requests to cancel them. | ||
379 | * | ||
380 | * @param cls unused | ||
381 | * @param key key to the watch request | ||
382 | * @param value watch context | ||
383 | * @return #GNUNET_YES to continue iteration | ||
384 | */ | ||
385 | static int | ||
386 | destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
387 | { | ||
388 | struct GNUNET_PEERSTORE_WatchContext *wc = value; | ||
389 | |||
390 | GNUNET_PEERSTORE_watch_cancel (wc); | ||
391 | return GNUNET_YES; | ||
392 | } | ||
393 | |||
394 | |||
395 | /** | ||
396 | * Kill the connection to the service. This can be delayed in case of pending | ||
397 | * STORE requests and the user explicitly asked to sync first. Otherwise it is | ||
398 | * performed instantly. | ||
399 | * | ||
400 | * @param h Handle to the service. | ||
401 | */ | ||
402 | static void | ||
403 | final_disconnect (struct GNUNET_PEERSTORE_Handle *h) | ||
404 | { | ||
405 | if (NULL != h->mq) | ||
406 | { | ||
407 | GNUNET_MQ_destroy (h->mq); | ||
408 | h->mq = NULL; | ||
409 | } | ||
410 | GNUNET_free (h); | ||
411 | } | ||
412 | |||
413 | |||
414 | /** | ||
415 | * Connect to the PEERSTORE service. | ||
416 | * | ||
417 | * @param cfg configuration to use | ||
418 | * @return NULL on error | ||
419 | */ | ||
420 | struct GNUNET_PEERSTORE_Handle * | ||
421 | GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | ||
422 | { | ||
423 | struct GNUNET_PEERSTORE_Handle *h; | ||
424 | |||
425 | h = GNUNET_new (struct GNUNET_PEERSTORE_Handle); | ||
426 | h->cfg = cfg; | ||
427 | h->disconnecting = GNUNET_NO; | ||
428 | reconnect (h); | ||
429 | if (NULL == h->mq) | ||
430 | { | ||
431 | GNUNET_free (h); | ||
432 | return NULL; | ||
433 | } | ||
434 | return h; | ||
435 | } | ||
436 | |||
437 | |||
438 | /** | ||
439 | * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests | ||
440 | * will be canceled. | ||
441 | * Any pending STORE requests will depend on @e snyc_first flag. | ||
442 | * | ||
443 | * @param h handle to disconnect | ||
444 | * @param sync_first send any pending STORE requests before disconnecting | ||
445 | */ | ||
446 | void | ||
447 | GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first) | ||
448 | { | ||
449 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
450 | struct GNUNET_PEERSTORE_StoreContext *sc; | ||
451 | |||
452 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n"); | ||
453 | if (NULL != h->watches) | ||
454 | { | ||
455 | GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL); | ||
456 | GNUNET_CONTAINER_multihashmap_destroy (h->watches); | ||
457 | h->watches = NULL; | ||
458 | } | ||
459 | while (NULL != (ic = h->iterate_head)) | ||
460 | { | ||
461 | GNUNET_break (0); | ||
462 | GNUNET_PEERSTORE_iterate_cancel (ic); | ||
463 | } | ||
464 | if (NULL != h->store_head) | ||
465 | { | ||
466 | if (GNUNET_YES == sync_first) | ||
467 | { | ||
468 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
469 | "Delaying disconnection due to pending store requests.\n"); | ||
470 | h->disconnecting = GNUNET_YES; | ||
471 | return; | ||
472 | } | ||
473 | while (NULL != (sc = h->store_head)) | ||
474 | GNUNET_PEERSTORE_store_cancel (sc); | ||
475 | } | ||
476 | final_disconnect (h); | ||
477 | } | ||
478 | |||
479 | |||
480 | /******************************************************************************/ | ||
481 | /******************* STORE FUNCTIONS *********************/ | ||
482 | /******************************************************************************/ | ||
483 | |||
484 | |||
485 | /** | ||
486 | * Cancel a store request | ||
487 | * | ||
488 | * @param sc Store request context | ||
489 | */ | ||
490 | void | ||
491 | GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) | ||
492 | { | ||
493 | struct GNUNET_PEERSTORE_Handle *h = sc->h; | ||
494 | |||
495 | GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc); | ||
496 | GNUNET_free (sc->sub_system); | ||
497 | GNUNET_free (sc->value); | ||
498 | GNUNET_free (sc->key); | ||
499 | GNUNET_free (sc); | ||
500 | if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head)) | ||
501 | final_disconnect (h); | ||
502 | } | ||
503 | |||
504 | |||
505 | /** | ||
506 | * Store a new entry in the PEERSTORE. | ||
507 | * Note that stored entries can be lost in some cases | ||
508 | * such as power failure. | ||
509 | * | ||
510 | * @param h Handle to the PEERSTORE service | ||
511 | * @param sub_system name of the sub system | ||
512 | * @param peer Peer Identity | ||
513 | * @param key entry key | ||
514 | * @param value entry value BLOB | ||
515 | * @param size size of @e value | ||
516 | * @param expiry absolute time after which the entry is (possibly) deleted | ||
517 | * @param options options specific to the storage operation | ||
518 | * @param cont Continuation function after the store request is sent | ||
519 | * @param cont_cls Closure for @a cont | ||
520 | */ | ||
521 | struct GNUNET_PEERSTORE_StoreContext * | ||
522 | GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, | ||
523 | const char *sub_system, | ||
524 | const struct GNUNET_PeerIdentity *peer, | ||
525 | const char *key, | ||
526 | const void *value, | ||
527 | size_t size, | ||
528 | struct GNUNET_TIME_Absolute expiry, | ||
529 | enum GNUNET_PEERSTORE_StoreOption options, | ||
530 | GNUNET_PEERSTORE_Continuation cont, | ||
531 | void *cont_cls) | ||
532 | { | ||
533 | struct GNUNET_MQ_Envelope *ev; | ||
534 | struct GNUNET_PEERSTORE_StoreContext *sc; | ||
535 | |||
536 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
537 | "Storing value (size: %lu) for subsystem `%s', peer `%s', key `%s'\n", | ||
538 | size, | ||
539 | sub_system, | ||
540 | GNUNET_i2s (peer), | ||
541 | key); | ||
542 | ev = | ||
543 | PEERSTORE_create_record_mq_envelope (sub_system, | ||
544 | peer, | ||
545 | key, | ||
546 | value, | ||
547 | size, | ||
548 | expiry, | ||
549 | options, | ||
550 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); | ||
551 | sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext); | ||
552 | |||
553 | sc->sub_system = GNUNET_strdup (sub_system); | ||
554 | sc->peer = *peer; | ||
555 | sc->key = GNUNET_strdup (key); | ||
556 | sc->value = GNUNET_memdup (value, size); | ||
557 | sc->size = size; | ||
558 | sc->expiry = expiry; | ||
559 | sc->options = options; | ||
560 | sc->cont = cont; | ||
561 | sc->cont_cls = cont_cls; | ||
562 | sc->h = h; | ||
563 | |||
564 | GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc); | ||
565 | GNUNET_MQ_notify_sent (ev, &store_request_sent, sc); | ||
566 | GNUNET_MQ_send (h->mq, ev); | ||
567 | return sc; | ||
568 | } | ||
569 | |||
570 | |||
571 | /******************************************************************************/ | ||
572 | /******************* ITERATE FUNCTIONS *********************/ | ||
573 | /******************************************************************************/ | ||
574 | |||
575 | |||
576 | /** | ||
577 | * When a response for iterate request is received | ||
578 | * | ||
579 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
580 | * @param msg message received | ||
581 | */ | ||
582 | static void | ||
583 | handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg) | ||
584 | { | ||
585 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
586 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
587 | GNUNET_PEERSTORE_Processor callback; | ||
588 | void *callback_cls; | ||
589 | |||
590 | ic = h->iterate_head; | ||
591 | if (NULL == ic) | ||
592 | { | ||
593 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
594 | _ ("Unexpected iteration response, this should not happen.\n")); | ||
595 | disconnect_and_schedule_reconnect (h); | ||
596 | return; | ||
597 | } | ||
598 | callback = ic->callback; | ||
599 | callback_cls = ic->callback_cls; | ||
600 | ic->iterating = GNUNET_NO; | ||
601 | GNUNET_PEERSTORE_iterate_cancel (ic); | ||
602 | if (NULL != callback) | ||
603 | callback (callback_cls, NULL, NULL); | ||
604 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | ||
605 | } | ||
606 | |||
607 | |||
608 | /** | ||
609 | * When a response for iterate request is received, check the | ||
610 | * message is well-formed. | ||
611 | * | ||
612 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
613 | * @param msg message received | ||
614 | */ | ||
615 | static int | ||
616 | check_iterate_result (void *cls, const struct StoreRecordMessage *msg) | ||
617 | { | ||
618 | /* we defer validation to #handle_iterate_result */ | ||
619 | return GNUNET_OK; | ||
620 | } | ||
621 | |||
622 | |||
623 | /** | ||
624 | * When a response for iterate request is received | ||
625 | * | ||
626 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
627 | * @param msg message received | ||
628 | */ | ||
629 | static void | ||
630 | handle_iterate_result (void *cls, const struct StoreRecordMessage *msg) | ||
631 | { | ||
632 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
633 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
634 | GNUNET_PEERSTORE_Processor callback; | ||
635 | void *callback_cls; | ||
636 | struct GNUNET_PEERSTORE_Record *record; | ||
637 | |||
638 | ic = h->iterate_head; | ||
639 | if (NULL == ic) | ||
640 | { | ||
641 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
642 | _ ("Unexpected iteration response, this should not happen.\n")); | ||
643 | disconnect_and_schedule_reconnect (h); | ||
644 | return; | ||
645 | } | ||
646 | ic->iterating = GNUNET_YES; | ||
647 | callback = ic->callback; | ||
648 | callback_cls = ic->callback_cls; | ||
649 | if (NULL == callback) | ||
650 | return; | ||
651 | record = PEERSTORE_parse_record_message (msg); | ||
652 | if (NULL == record) | ||
653 | { | ||
654 | callback (callback_cls, | ||
655 | NULL, | ||
656 | _ ("Received a malformed response from service.")); | ||
657 | } | ||
658 | else | ||
659 | { | ||
660 | callback (callback_cls, record, NULL); | ||
661 | PEERSTORE_destroy_record (record); | ||
662 | } | ||
663 | } | ||
664 | |||
665 | |||
666 | /** | ||
667 | * Cancel an iterate request | ||
668 | * Please do not call after the iterate request is done | ||
669 | * | ||
670 | * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate() | ||
671 | */ | ||
672 | void | ||
673 | GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic) | ||
674 | { | ||
675 | if (GNUNET_NO == ic->iterating) | ||
676 | { | ||
677 | GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic); | ||
678 | GNUNET_free (ic->sub_system); | ||
679 | GNUNET_free (ic->key); | ||
680 | GNUNET_free (ic); | ||
681 | } | ||
682 | else | ||
683 | ic->callback = NULL; | ||
684 | } | ||
685 | |||
686 | |||
687 | /** | ||
688 | * Iterate over records matching supplied key information | ||
689 | * | ||
690 | * @param h handle to the PEERSTORE service | ||
691 | * @param sub_system name of sub system | ||
692 | * @param peer Peer identity (can be NULL) | ||
693 | * @param key entry key string (can be NULL) | ||
694 | * @param callback function called with each matching record, all NULL's on end | ||
695 | * @param callback_cls closure for @a callback | ||
696 | * @return Handle to iteration request | ||
697 | */ | ||
698 | struct GNUNET_PEERSTORE_IterateContext * | ||
699 | GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | ||
700 | const char *sub_system, | ||
701 | const struct GNUNET_PeerIdentity *peer, | ||
702 | const char *key, | ||
703 | GNUNET_PEERSTORE_Processor callback, | ||
704 | void *callback_cls) | ||
705 | { | ||
706 | struct GNUNET_MQ_Envelope *ev; | ||
707 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
708 | |||
709 | ev = | ||
710 | PEERSTORE_create_record_mq_envelope (sub_system, | ||
711 | peer, | ||
712 | key, | ||
713 | NULL, | ||
714 | 0, | ||
715 | GNUNET_TIME_UNIT_FOREVER_ABS, | ||
716 | 0, | ||
717 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); | ||
718 | ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext); | ||
719 | ic->callback = callback; | ||
720 | ic->callback_cls = callback_cls; | ||
721 | ic->h = h; | ||
722 | ic->sub_system = GNUNET_strdup (sub_system); | ||
723 | if (NULL != peer) | ||
724 | ic->peer = *peer; | ||
725 | if (NULL != key) | ||
726 | ic->key = GNUNET_strdup (key); | ||
727 | GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic); | ||
728 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
729 | "Sending an iterate request for sub system `%s'\n", | ||
730 | sub_system); | ||
731 | GNUNET_MQ_send (h->mq, ev); | ||
732 | return ic; | ||
733 | } | ||
734 | |||
735 | |||
736 | /******************************************************************************/ | ||
737 | /******************* WATCH FUNCTIONS *********************/ | ||
738 | /******************************************************************************/ | ||
739 | |||
740 | /** | ||
741 | * When a watch record is received, validate it is well-formed. | ||
742 | * | ||
743 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
744 | * @param msg message received | ||
745 | */ | ||
746 | static int | ||
747 | check_watch_record (void *cls, const struct StoreRecordMessage *msg) | ||
748 | { | ||
749 | /* we defer validation to #handle_watch_result */ | ||
750 | return GNUNET_OK; | ||
751 | } | ||
752 | |||
753 | |||
754 | /** | ||
755 | * When a watch record is received, process it. | ||
756 | * | ||
757 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
758 | * @param msg message received | ||
759 | */ | ||
760 | static void | ||
761 | handle_watch_record (void *cls, const struct StoreRecordMessage *msg) | ||
762 | { | ||
763 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
764 | struct GNUNET_PEERSTORE_Record *record; | ||
765 | struct GNUNET_HashCode keyhash; | ||
766 | struct GNUNET_PEERSTORE_WatchContext *wc; | ||
767 | |||
768 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n"); | ||
769 | record = PEERSTORE_parse_record_message (msg); | ||
770 | if (NULL == record) | ||
771 | { | ||
772 | disconnect_and_schedule_reconnect (h); | ||
773 | return; | ||
774 | } | ||
775 | PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash); | ||
776 | // FIXME: what if there are multiple watches for the same key? | ||
777 | wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash); | ||
778 | if (NULL == wc) | ||
779 | { | ||
780 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
781 | _ ("Received a watch result for a non existing watch.\n")); | ||
782 | PEERSTORE_destroy_record (record); | ||
783 | disconnect_and_schedule_reconnect (h); | ||
784 | return; | ||
785 | } | ||
786 | if (NULL != wc->callback) | ||
787 | wc->callback (wc->callback_cls, record, NULL); | ||
788 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | ||
789 | PEERSTORE_destroy_record (record); | ||
790 | } | ||
791 | |||
792 | |||
793 | /** | ||
794 | * Close the existing connection to PEERSTORE and reconnect. | ||
795 | * | ||
796 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
797 | */ | ||
798 | static void | ||
799 | reconnect (void *cls) | ||
800 | { | ||
801 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
802 | struct GNUNET_MQ_MessageHandler mq_handlers[] = | ||
803 | { GNUNET_MQ_hd_fixed_size (iterate_end, | ||
804 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, | ||
805 | struct GNUNET_MessageHeader, | ||
806 | h), | ||
807 | GNUNET_MQ_hd_var_size (iterate_result, | ||
808 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, | ||
809 | struct StoreRecordMessage, | ||
810 | h), | ||
811 | GNUNET_MQ_hd_var_size (watch_record, | ||
812 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, | ||
813 | struct StoreRecordMessage, | ||
814 | h), | ||
815 | GNUNET_MQ_handler_end () }; | ||
816 | struct GNUNET_MQ_Envelope *ev; | ||
817 | |||
818 | h->reconnect_task = NULL; | ||
819 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); | ||
820 | h->mq = GNUNET_CLIENT_connect (h->cfg, | ||
821 | "peerstore", | ||
822 | mq_handlers, | ||
823 | &handle_client_error, | ||
824 | h); | ||
825 | if (NULL == h->mq) | ||
826 | { | ||
827 | h->reconnect_task = | ||
828 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); | ||
829 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | ||
830 | return; | ||
831 | } | ||
832 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
833 | "Resending pending requests after reconnect.\n"); | ||
834 | if (NULL != h->watches) | ||
835 | GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h); | ||
836 | for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic; | ||
837 | ic = ic->next) | ||
838 | { | ||
839 | ev = | ||
840 | PEERSTORE_create_record_mq_envelope (ic->sub_system, | ||
841 | &ic->peer, | ||
842 | ic->key, | ||
843 | NULL, | ||
844 | 0, | ||
845 | GNUNET_TIME_UNIT_FOREVER_ABS, | ||
846 | 0, | ||
847 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); | ||
848 | GNUNET_MQ_send (h->mq, ev); | ||
849 | } | ||
850 | for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc; | ||
851 | sc = sc->next) | ||
852 | { | ||
853 | ev = | ||
854 | PEERSTORE_create_record_mq_envelope (sc->sub_system, | ||
855 | &sc->peer, | ||
856 | sc->key, | ||
857 | sc->value, | ||
858 | sc->size, | ||
859 | sc->expiry, | ||
860 | sc->options, | ||
861 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); | ||
862 | GNUNET_MQ_notify_sent (ev, &store_request_sent, sc); | ||
863 | GNUNET_MQ_send (h->mq, ev); | ||
864 | } | ||
865 | } | ||
866 | |||
867 | |||
868 | /** | ||
869 | * Cancel a watch request | ||
870 | * | ||
871 | * @param wc handle to the watch request | ||
872 | */ | ||
873 | void | ||
874 | GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc) | ||
875 | { | ||
876 | struct GNUNET_PEERSTORE_Handle *h = wc->h; | ||
877 | struct GNUNET_MQ_Envelope *ev; | ||
878 | struct StoreKeyHashMessage *hm; | ||
879 | |||
880 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n"); | ||
881 | ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); | ||
882 | hm->keyhash = wc->keyhash; | ||
883 | GNUNET_MQ_send (h->mq, ev); | ||
884 | GNUNET_assert ( | ||
885 | GNUNET_YES == | ||
886 | GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc)); | ||
887 | GNUNET_free (wc); | ||
888 | } | ||
889 | |||
890 | |||
891 | /** | ||
892 | * Request watching a given key | ||
893 | * User will be notified with any new values added to key | ||
894 | * | ||
895 | * @param h handle to the PEERSTORE service | ||
896 | * @param sub_system name of sub system | ||
897 | * @param peer Peer identity | ||
898 | * @param key entry key string | ||
899 | * @param callback function called with each new value | ||
900 | * @param callback_cls closure for @a callback | ||
901 | * @return Handle to watch request | ||
902 | */ | ||
903 | struct GNUNET_PEERSTORE_WatchContext * | ||
904 | GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, | ||
905 | const char *sub_system, | ||
906 | const struct GNUNET_PeerIdentity *peer, | ||
907 | const char *key, | ||
908 | GNUNET_PEERSTORE_Processor callback, | ||
909 | void *callback_cls) | ||
910 | { | ||
911 | struct GNUNET_MQ_Envelope *ev; | ||
912 | struct StoreKeyHashMessage *hm; | ||
913 | struct GNUNET_PEERSTORE_WatchContext *wc; | ||
914 | |||
915 | ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); | ||
916 | PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash); | ||
917 | wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext); | ||
918 | wc->callback = callback; | ||
919 | wc->callback_cls = callback_cls; | ||
920 | wc->h = h; | ||
921 | wc->keyhash = hm->keyhash; | ||
922 | if (NULL == h->watches) | ||
923 | h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO); | ||
924 | GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put ( | ||
925 | h->watches, | ||
926 | &wc->keyhash, | ||
927 | wc, | ||
928 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | ||
929 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
930 | "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n", | ||
931 | sub_system, | ||
932 | GNUNET_i2s (peer), | ||
933 | key); | ||
934 | GNUNET_MQ_send (h->mq, ev); | ||
935 | return wc; | ||
936 | } | ||
937 | |||
938 | |||
939 | /* end of peerstore_api.c */ | ||