diff options
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r-- | src/peerstore/peerstore_api.c | 948 |
1 files changed, 0 insertions, 948 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c deleted file mode 100644 index d0c72acf1..000000000 --- a/src/peerstore/peerstore_api.c +++ /dev/null | |||
@@ -1,948 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2013-2016, 2019 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | /** | ||
21 | * @file peerstore/peerstore_api.c | ||
22 | * @brief API for peerstore | ||
23 | * @author Omar Tarabai | ||
24 | * @author Christian Grothoff | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_util_lib.h" | ||
28 | #include "peerstore.h" | ||
29 | #include "peerstore_common.h" | ||
30 | |||
31 | #define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__) | ||
32 | |||
33 | /******************************************************************************/ | ||
34 | /************************ DATA STRUCTURES ****************************/ | ||
35 | /******************************************************************************/ | ||
36 | |||
37 | /** | ||
38 | * Handle to the PEERSTORE service. | ||
39 | */ | ||
40 | struct GNUNET_PEERSTORE_Handle | ||
41 | { | ||
42 | /** | ||
43 | * Our configuration. | ||
44 | */ | ||
45 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
46 | |||
47 | /** | ||
48 | * Message queue | ||
49 | */ | ||
50 | struct GNUNET_MQ_Handle *mq; | ||
51 | |||
52 | /** | ||
53 | * Head of active STORE requests. | ||
54 | */ | ||
55 | struct GNUNET_PEERSTORE_StoreContext *store_head; | ||
56 | |||
57 | /** | ||
58 | * Tail of active STORE requests. | ||
59 | */ | ||
60 | struct GNUNET_PEERSTORE_StoreContext *store_tail; | ||
61 | |||
62 | /** | ||
63 | * Head of active ITERATE requests. | ||
64 | */ | ||
65 | struct GNUNET_PEERSTORE_IterateContext *iterate_head; | ||
66 | |||
67 | /** | ||
68 | * Tail of active ITERATE requests. | ||
69 | */ | ||
70 | struct GNUNET_PEERSTORE_IterateContext *iterate_tail; | ||
71 | |||
72 | /** | ||
73 | * Hashmap of watch requests | ||
74 | */ | ||
75 | struct GNUNET_CONTAINER_MultiHashMap *watches; | ||
76 | |||
77 | /** | ||
78 | * ID of the task trying to reconnect to the service. | ||
79 | */ | ||
80 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
81 | |||
82 | /** | ||
83 | * Delay until we try to reconnect. | ||
84 | */ | ||
85 | struct GNUNET_TIME_Relative reconnect_delay; | ||
86 | |||
87 | /** | ||
88 | * Are we in the process of disconnecting but need to sync first? | ||
89 | */ | ||
90 | int disconnecting; | ||
91 | }; | ||
92 | |||
93 | /** | ||
94 | * Context for a store request | ||
95 | */ | ||
96 | struct GNUNET_PEERSTORE_StoreContext | ||
97 | { | ||
98 | /** | ||
99 | * Kept in a DLL. | ||
100 | */ | ||
101 | struct GNUNET_PEERSTORE_StoreContext *next; | ||
102 | |||
103 | /** | ||
104 | * Kept in a DLL. | ||
105 | */ | ||
106 | struct GNUNET_PEERSTORE_StoreContext *prev; | ||
107 | |||
108 | /** | ||
109 | * Handle to the PEERSTORE service. | ||
110 | */ | ||
111 | struct GNUNET_PEERSTORE_Handle *h; | ||
112 | |||
113 | /** | ||
114 | * Continuation called with service response | ||
115 | */ | ||
116 | GNUNET_PEERSTORE_Continuation cont; | ||
117 | |||
118 | /** | ||
119 | * Closure for @e cont | ||
120 | */ | ||
121 | void *cont_cls; | ||
122 | |||
123 | /** | ||
124 | * Which subsystem does the store? | ||
125 | */ | ||
126 | char *sub_system; | ||
127 | |||
128 | /** | ||
129 | * Key for the store operation. | ||
130 | */ | ||
131 | char *key; | ||
132 | |||
133 | /** | ||
134 | * Contains @e size bytes. | ||
135 | */ | ||
136 | void *value; | ||
137 | |||
138 | /** | ||
139 | * Peer the store is for. | ||
140 | */ | ||
141 | struct GNUNET_PeerIdentity peer; | ||
142 | |||
143 | /** | ||
144 | * Number of bytes in @e value. | ||
145 | */ | ||
146 | size_t size; | ||
147 | |||
148 | /** | ||
149 | * When does the value expire? | ||
150 | */ | ||
151 | struct GNUNET_TIME_Absolute expiry; | ||
152 | |||
153 | /** | ||
154 | * Options for the store operation. | ||
155 | */ | ||
156 | enum GNUNET_PEERSTORE_StoreOption options; | ||
157 | }; | ||
158 | |||
159 | /** | ||
160 | * Context for a iterate request | ||
161 | */ | ||
162 | struct GNUNET_PEERSTORE_IterateContext | ||
163 | { | ||
164 | /** | ||
165 | * Kept in a DLL. | ||
166 | */ | ||
167 | struct GNUNET_PEERSTORE_IterateContext *next; | ||
168 | |||
169 | /** | ||
170 | * Kept in a DLL. | ||
171 | */ | ||
172 | struct GNUNET_PEERSTORE_IterateContext *prev; | ||
173 | |||
174 | /** | ||
175 | * Handle to the PEERSTORE service. | ||
176 | */ | ||
177 | struct GNUNET_PEERSTORE_Handle *h; | ||
178 | |||
179 | /** | ||
180 | * Which subsystem does the store? | ||
181 | */ | ||
182 | char *sub_system; | ||
183 | |||
184 | /** | ||
185 | * Peer the store is for. | ||
186 | */ | ||
187 | struct GNUNET_PeerIdentity peer; | ||
188 | |||
189 | /** | ||
190 | * Key for the store operation. | ||
191 | */ | ||
192 | char *key; | ||
193 | |||
194 | /** | ||
195 | * Callback with each matching record | ||
196 | */ | ||
197 | GNUNET_PEERSTORE_Processor callback; | ||
198 | |||
199 | /** | ||
200 | * Closure for @e callback | ||
201 | */ | ||
202 | void *callback_cls; | ||
203 | |||
204 | /** | ||
205 | * #GNUNET_YES if we are currently processing records. | ||
206 | */ | ||
207 | int iterating; | ||
208 | }; | ||
209 | |||
210 | /** | ||
211 | * Context for a watch request | ||
212 | */ | ||
213 | struct GNUNET_PEERSTORE_WatchContext | ||
214 | { | ||
215 | /** | ||
216 | * Kept in a DLL. | ||
217 | */ | ||
218 | struct GNUNET_PEERSTORE_WatchContext *next; | ||
219 | |||
220 | /** | ||
221 | * Kept in a DLL. | ||
222 | */ | ||
223 | struct GNUNET_PEERSTORE_WatchContext *prev; | ||
224 | |||
225 | /** | ||
226 | * Handle to the PEERSTORE service. | ||
227 | */ | ||
228 | struct GNUNET_PEERSTORE_Handle *h; | ||
229 | |||
230 | /** | ||
231 | * Callback with each record received | ||
232 | */ | ||
233 | GNUNET_PEERSTORE_Processor callback; | ||
234 | |||
235 | /** | ||
236 | * Closure for @e callback | ||
237 | */ | ||
238 | void *callback_cls; | ||
239 | |||
240 | /** | ||
241 | * Hash of the combined key | ||
242 | */ | ||
243 | struct GNUNET_HashCode keyhash; | ||
244 | }; | ||
245 | |||
246 | /******************************************************************************/ | ||
247 | /******************* DECLARATIONS *********************/ | ||
248 | /******************************************************************************/ | ||
249 | |||
250 | /** | ||
251 | * Close the existing connection to PEERSTORE and reconnect. | ||
252 | * | ||
253 | * @param cls a `struct GNUNET_PEERSTORE_Handle *h` | ||
254 | */ | ||
255 | static void | ||
256 | reconnect (void *cls); | ||
257 | |||
258 | |||
259 | /** | ||
260 | * Disconnect from the peerstore service. | ||
261 | * | ||
262 | * @param h peerstore handle to disconnect | ||
263 | */ | ||
264 | static void | ||
265 | disconnect (struct GNUNET_PEERSTORE_Handle *h) | ||
266 | { | ||
267 | struct GNUNET_PEERSTORE_IterateContext *next; | ||
268 | |||
269 | for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic; | ||
270 | ic = next) | ||
271 | { | ||
272 | next = ic->next; | ||
273 | if (GNUNET_YES == ic->iterating) | ||
274 | { | ||
275 | GNUNET_PEERSTORE_Processor icb; | ||
276 | void *icb_cls; | ||
277 | |||
278 | icb = ic->callback; | ||
279 | icb_cls = ic->callback_cls; | ||
280 | GNUNET_PEERSTORE_iterate_cancel (ic); | ||
281 | if (NULL != icb) | ||
282 | icb (icb_cls, NULL, "Iteration canceled due to reconnection"); | ||
283 | } | ||
284 | } | ||
285 | |||
286 | if (NULL != h->mq) | ||
287 | { | ||
288 | GNUNET_MQ_destroy (h->mq); | ||
289 | h->mq = NULL; | ||
290 | } | ||
291 | } | ||
292 | |||
293 | |||
294 | /** | ||
295 | * Function that will schedule the job that will try | ||
296 | * to connect us again to the client. | ||
297 | * | ||
298 | * @param h peerstore to reconnect | ||
299 | */ | ||
300 | static void | ||
301 | disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h) | ||
302 | { | ||
303 | GNUNET_assert (NULL == h->reconnect_task); | ||
304 | disconnect (h); | ||
305 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
306 | "Scheduling task to reconnect to PEERSTORE service in %s.\n", | ||
307 | GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES)); | ||
308 | h->reconnect_task = | ||
309 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); | ||
310 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | ||
311 | } | ||
312 | |||
313 | |||
314 | /** | ||
315 | * Callback after MQ envelope is sent | ||
316 | * | ||
317 | * @param cls a `struct GNUNET_PEERSTORE_StoreContext *` | ||
318 | */ | ||
319 | static void | ||
320 | store_request_sent (void *cls) | ||
321 | { | ||
322 | struct GNUNET_PEERSTORE_StoreContext *sc = cls; | ||
323 | GNUNET_PEERSTORE_Continuation cont; | ||
324 | void *cont_cls; | ||
325 | |||
326 | if (NULL != sc) | ||
327 | { | ||
328 | cont = sc->cont; | ||
329 | cont_cls = sc->cont_cls; | ||
330 | GNUNET_PEERSTORE_store_cancel (sc); | ||
331 | if (NULL != cont) | ||
332 | cont (cont_cls, GNUNET_OK); | ||
333 | } | ||
334 | } | ||
335 | |||
336 | |||
337 | /******************************************************************************/ | ||
338 | /******************* CONNECTION FUNCTIONS *********************/ | ||
339 | /******************************************************************************/ | ||
340 | |||
341 | |||
342 | /** | ||
343 | * Function called when we had trouble talking to the service. | ||
344 | */ | ||
345 | static void | ||
346 | handle_client_error (void *cls, enum GNUNET_MQ_Error error) | ||
347 | { | ||
348 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
349 | |||
350 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
351 | "Received an error notification from MQ of type: %d\n", | ||
352 | error); | ||
353 | disconnect_and_schedule_reconnect (h); | ||
354 | } | ||
355 | |||
356 | |||
357 | /** | ||
358 | * Iterator over previous watches to resend them | ||
359 | * | ||
360 | * @param cls the `struct GNUNET_PEERSTORE_Handle` | ||
361 | * @param key key for the watch | ||
362 | * @param value the `struct GNUNET_PEERSTORE_WatchContext *` | ||
363 | * @return #GNUNET_YES (continue to iterate) | ||
364 | */ | ||
365 | static int | ||
366 | rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
367 | { | ||
368 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
369 | struct GNUNET_PEERSTORE_WatchContext *wc = value; | ||
370 | struct StoreKeyHashMessage *hm; | ||
371 | struct GNUNET_MQ_Envelope *ev; | ||
372 | |||
373 | ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); | ||
374 | hm->keyhash = wc->keyhash; | ||
375 | GNUNET_MQ_send (h->mq, ev); | ||
376 | return GNUNET_YES; | ||
377 | } | ||
378 | |||
379 | |||
380 | /** | ||
381 | * Iterator over watch requests to cancel them. | ||
382 | * | ||
383 | * @param cls unused | ||
384 | * @param key key to the watch request | ||
385 | * @param value watch context | ||
386 | * @return #GNUNET_YES to continue iteration | ||
387 | */ | ||
388 | static int | ||
389 | destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
390 | { | ||
391 | struct GNUNET_PEERSTORE_WatchContext *wc = value; | ||
392 | |||
393 | GNUNET_PEERSTORE_watch_cancel (wc); | ||
394 | return GNUNET_YES; | ||
395 | } | ||
396 | |||
397 | |||
398 | /** | ||
399 | * Kill the connection to the service. This can be delayed in case of pending | ||
400 | * STORE requests and the user explicitly asked to sync first. Otherwise it is | ||
401 | * performed instantly. | ||
402 | * | ||
403 | * @param h Handle to the service. | ||
404 | */ | ||
405 | static void | ||
406 | final_disconnect (struct GNUNET_PEERSTORE_Handle *h) | ||
407 | { | ||
408 | if (NULL != h->mq) | ||
409 | { | ||
410 | GNUNET_MQ_destroy (h->mq); | ||
411 | h->mq = NULL; | ||
412 | } | ||
413 | GNUNET_free (h); | ||
414 | } | ||
415 | |||
416 | |||
417 | /** | ||
418 | * Connect to the PEERSTORE service. | ||
419 | * | ||
420 | * @param cfg configuration to use | ||
421 | * @return NULL on error | ||
422 | */ | ||
423 | struct GNUNET_PEERSTORE_Handle * | ||
424 | GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | ||
425 | { | ||
426 | struct GNUNET_PEERSTORE_Handle *h; | ||
427 | |||
428 | h = GNUNET_new (struct GNUNET_PEERSTORE_Handle); | ||
429 | h->cfg = cfg; | ||
430 | h->disconnecting = GNUNET_NO; | ||
431 | reconnect (h); | ||
432 | if (NULL == h->mq) | ||
433 | { | ||
434 | GNUNET_free (h); | ||
435 | return NULL; | ||
436 | } | ||
437 | return h; | ||
438 | } | ||
439 | |||
440 | |||
441 | /** | ||
442 | * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests | ||
443 | * will be canceled. | ||
444 | * Any pending STORE requests will depend on @e snyc_first flag. | ||
445 | * | ||
446 | * @param h handle to disconnect | ||
447 | * @param sync_first send any pending STORE requests before disconnecting | ||
448 | */ | ||
449 | void | ||
450 | GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first) | ||
451 | { | ||
452 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
453 | struct GNUNET_PEERSTORE_StoreContext *sc; | ||
454 | |||
455 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n"); | ||
456 | if (NULL != h->watches) | ||
457 | { | ||
458 | GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL); | ||
459 | GNUNET_CONTAINER_multihashmap_destroy (h->watches); | ||
460 | h->watches = NULL; | ||
461 | } | ||
462 | while (NULL != (ic = h->iterate_head)) | ||
463 | { | ||
464 | GNUNET_break (0); | ||
465 | GNUNET_PEERSTORE_iterate_cancel (ic); | ||
466 | } | ||
467 | if (NULL != h->store_head) | ||
468 | { | ||
469 | if (GNUNET_YES == sync_first) | ||
470 | { | ||
471 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
472 | "Delaying disconnection due to pending store requests.\n"); | ||
473 | h->disconnecting = GNUNET_YES; | ||
474 | return; | ||
475 | } | ||
476 | while (NULL != (sc = h->store_head)) | ||
477 | GNUNET_PEERSTORE_store_cancel (sc); | ||
478 | } | ||
479 | final_disconnect (h); | ||
480 | } | ||
481 | |||
482 | |||
483 | /******************************************************************************/ | ||
484 | /******************* STORE FUNCTIONS *********************/ | ||
485 | /******************************************************************************/ | ||
486 | |||
487 | |||
488 | /** | ||
489 | * Cancel a store request | ||
490 | * | ||
491 | * @param sc Store request context | ||
492 | */ | ||
493 | void | ||
494 | GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) | ||
495 | { | ||
496 | struct GNUNET_PEERSTORE_Handle *h = sc->h; | ||
497 | |||
498 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
499 | "store cancel with sc %p \n", | ||
500 | sc); | ||
501 | GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc); | ||
502 | GNUNET_free (sc->sub_system); | ||
503 | GNUNET_free (sc->value); | ||
504 | GNUNET_free (sc->key); | ||
505 | GNUNET_free (sc); | ||
506 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
507 | "store cancel with sc %p is null\n", | ||
508 | sc); | ||
509 | if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head)) | ||
510 | final_disconnect (h); | ||
511 | } | ||
512 | |||
513 | |||
514 | /** | ||
515 | * Store a new entry in the PEERSTORE. | ||
516 | * Note that stored entries can be lost in some cases | ||
517 | * such as power failure. | ||
518 | * | ||
519 | * @param h Handle to the PEERSTORE service | ||
520 | * @param sub_system name of the sub system | ||
521 | * @param peer Peer Identity | ||
522 | * @param key entry key | ||
523 | * @param value entry value BLOB | ||
524 | * @param size size of @e value | ||
525 | * @param expiry absolute time after which the entry is (possibly) deleted | ||
526 | * @param options options specific to the storage operation | ||
527 | * @param cont Continuation function after the store request is sent | ||
528 | * @param cont_cls Closure for @a cont | ||
529 | */ | ||
530 | struct GNUNET_PEERSTORE_StoreContext * | ||
531 | GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, | ||
532 | const char *sub_system, | ||
533 | const struct GNUNET_PeerIdentity *peer, | ||
534 | const char *key, | ||
535 | const void *value, | ||
536 | size_t size, | ||
537 | struct GNUNET_TIME_Absolute expiry, | ||
538 | enum GNUNET_PEERSTORE_StoreOption options, | ||
539 | GNUNET_PEERSTORE_Continuation cont, | ||
540 | void *cont_cls) | ||
541 | { | ||
542 | struct GNUNET_MQ_Envelope *ev; | ||
543 | struct GNUNET_PEERSTORE_StoreContext *sc; | ||
544 | |||
545 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
546 | "Storing value (size: %lu) for subsystem `%s', peer `%s', key `%s'\n", | ||
547 | size, | ||
548 | sub_system, | ||
549 | GNUNET_i2s (peer), | ||
550 | key); | ||
551 | ev = | ||
552 | PEERSTORE_create_record_mq_envelope (sub_system, | ||
553 | peer, | ||
554 | key, | ||
555 | value, | ||
556 | size, | ||
557 | expiry, | ||
558 | options, | ||
559 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); | ||
560 | sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext); | ||
561 | |||
562 | sc->sub_system = GNUNET_strdup (sub_system); | ||
563 | sc->peer = *peer; | ||
564 | sc->key = GNUNET_strdup (key); | ||
565 | sc->value = GNUNET_memdup (value, size); | ||
566 | sc->size = size; | ||
567 | sc->expiry = expiry; | ||
568 | sc->options = options; | ||
569 | sc->cont = cont; | ||
570 | sc->cont_cls = cont_cls; | ||
571 | sc->h = h; | ||
572 | |||
573 | GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc); | ||
574 | GNUNET_MQ_notify_sent (ev, &store_request_sent, sc); | ||
575 | GNUNET_MQ_send (h->mq, ev); | ||
576 | return sc; | ||
577 | } | ||
578 | |||
579 | |||
580 | /******************************************************************************/ | ||
581 | /******************* ITERATE FUNCTIONS *********************/ | ||
582 | /******************************************************************************/ | ||
583 | |||
584 | |||
585 | /** | ||
586 | * When a response for iterate request is received | ||
587 | * | ||
588 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
589 | * @param msg message received | ||
590 | */ | ||
591 | static void | ||
592 | handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg) | ||
593 | { | ||
594 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
595 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
596 | GNUNET_PEERSTORE_Processor callback; | ||
597 | void *callback_cls; | ||
598 | |||
599 | ic = h->iterate_head; | ||
600 | if (NULL == ic) | ||
601 | { | ||
602 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
603 | _ ("Unexpected iteration response, this should not happen.\n")); | ||
604 | disconnect_and_schedule_reconnect (h); | ||
605 | return; | ||
606 | } | ||
607 | callback = ic->callback; | ||
608 | callback_cls = ic->callback_cls; | ||
609 | ic->iterating = GNUNET_NO; | ||
610 | GNUNET_PEERSTORE_iterate_cancel (ic); | ||
611 | if (NULL != callback) | ||
612 | callback (callback_cls, NULL, NULL); | ||
613 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | ||
614 | } | ||
615 | |||
616 | |||
617 | /** | ||
618 | * When a response for iterate request is received, check the | ||
619 | * message is well-formed. | ||
620 | * | ||
621 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
622 | * @param msg message received | ||
623 | */ | ||
624 | static int | ||
625 | check_iterate_result (void *cls, const struct StoreRecordMessage *msg) | ||
626 | { | ||
627 | /* we defer validation to #handle_iterate_result */ | ||
628 | return GNUNET_OK; | ||
629 | } | ||
630 | |||
631 | |||
632 | /** | ||
633 | * When a response for iterate request is received | ||
634 | * | ||
635 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
636 | * @param msg message received | ||
637 | */ | ||
638 | static void | ||
639 | handle_iterate_result (void *cls, const struct StoreRecordMessage *msg) | ||
640 | { | ||
641 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
642 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
643 | GNUNET_PEERSTORE_Processor callback; | ||
644 | void *callback_cls; | ||
645 | struct GNUNET_PEERSTORE_Record *record; | ||
646 | |||
647 | ic = h->iterate_head; | ||
648 | if (NULL == ic) | ||
649 | { | ||
650 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
651 | _ ("Unexpected iteration response, this should not happen.\n")); | ||
652 | disconnect_and_schedule_reconnect (h); | ||
653 | return; | ||
654 | } | ||
655 | ic->iterating = GNUNET_YES; | ||
656 | callback = ic->callback; | ||
657 | callback_cls = ic->callback_cls; | ||
658 | if (NULL == callback) | ||
659 | return; | ||
660 | record = PEERSTORE_parse_record_message (msg); | ||
661 | if (NULL == record) | ||
662 | { | ||
663 | callback (callback_cls, | ||
664 | NULL, | ||
665 | _ ("Received a malformed response from service.")); | ||
666 | } | ||
667 | else | ||
668 | { | ||
669 | callback (callback_cls, record, NULL); | ||
670 | PEERSTORE_destroy_record (record); | ||
671 | } | ||
672 | } | ||
673 | |||
674 | |||
675 | /** | ||
676 | * Cancel an iterate request | ||
677 | * Please do not call after the iterate request is done | ||
678 | * | ||
679 | * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate() | ||
680 | */ | ||
681 | void | ||
682 | GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic) | ||
683 | { | ||
684 | if (GNUNET_NO == ic->iterating) | ||
685 | { | ||
686 | GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic); | ||
687 | GNUNET_free (ic->sub_system); | ||
688 | GNUNET_free (ic->key); | ||
689 | GNUNET_free (ic); | ||
690 | } | ||
691 | else | ||
692 | ic->callback = NULL; | ||
693 | } | ||
694 | |||
695 | |||
696 | /** | ||
697 | * Iterate over records matching supplied key information | ||
698 | * | ||
699 | * @param h handle to the PEERSTORE service | ||
700 | * @param sub_system name of sub system | ||
701 | * @param peer Peer identity (can be NULL) | ||
702 | * @param key entry key string (can be NULL) | ||
703 | * @param callback function called with each matching record, all NULL's on end | ||
704 | * @param callback_cls closure for @a callback | ||
705 | * @return Handle to iteration request | ||
706 | */ | ||
707 | struct GNUNET_PEERSTORE_IterateContext * | ||
708 | GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | ||
709 | const char *sub_system, | ||
710 | const struct GNUNET_PeerIdentity *peer, | ||
711 | const char *key, | ||
712 | GNUNET_PEERSTORE_Processor callback, | ||
713 | void *callback_cls) | ||
714 | { | ||
715 | struct GNUNET_MQ_Envelope *ev; | ||
716 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
717 | |||
718 | ev = | ||
719 | PEERSTORE_create_record_mq_envelope (sub_system, | ||
720 | peer, | ||
721 | key, | ||
722 | NULL, | ||
723 | 0, | ||
724 | GNUNET_TIME_UNIT_FOREVER_ABS, | ||
725 | 0, | ||
726 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); | ||
727 | ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext); | ||
728 | ic->callback = callback; | ||
729 | ic->callback_cls = callback_cls; | ||
730 | ic->h = h; | ||
731 | ic->sub_system = GNUNET_strdup (sub_system); | ||
732 | if (NULL != peer) | ||
733 | ic->peer = *peer; | ||
734 | if (NULL != key) | ||
735 | ic->key = GNUNET_strdup (key); | ||
736 | GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic); | ||
737 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
738 | "Sending an iterate request for sub system `%s'\n", | ||
739 | sub_system); | ||
740 | GNUNET_MQ_send (h->mq, ev); | ||
741 | return ic; | ||
742 | } | ||
743 | |||
744 | |||
745 | /******************************************************************************/ | ||
746 | /******************* WATCH FUNCTIONS *********************/ | ||
747 | /******************************************************************************/ | ||
748 | |||
749 | /** | ||
750 | * When a watch record is received, validate it is well-formed. | ||
751 | * | ||
752 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
753 | * @param msg message received | ||
754 | */ | ||
755 | static int | ||
756 | check_watch_record (void *cls, const struct StoreRecordMessage *msg) | ||
757 | { | ||
758 | /* we defer validation to #handle_watch_result */ | ||
759 | return GNUNET_OK; | ||
760 | } | ||
761 | |||
762 | |||
763 | /** | ||
764 | * When a watch record is received, process it. | ||
765 | * | ||
766 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
767 | * @param msg message received | ||
768 | */ | ||
769 | static void | ||
770 | handle_watch_record (void *cls, const struct StoreRecordMessage *msg) | ||
771 | { | ||
772 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
773 | struct GNUNET_PEERSTORE_Record *record; | ||
774 | struct GNUNET_HashCode keyhash; | ||
775 | struct GNUNET_PEERSTORE_WatchContext *wc; | ||
776 | |||
777 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n"); | ||
778 | record = PEERSTORE_parse_record_message (msg); | ||
779 | if (NULL == record) | ||
780 | { | ||
781 | disconnect_and_schedule_reconnect (h); | ||
782 | return; | ||
783 | } | ||
784 | PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash); | ||
785 | // FIXME: what if there are multiple watches for the same key? | ||
786 | wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash); | ||
787 | if (NULL == wc) | ||
788 | { | ||
789 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
790 | _ ("Received a watch result for a non existing watch.\n")); | ||
791 | PEERSTORE_destroy_record (record); | ||
792 | disconnect_and_schedule_reconnect (h); | ||
793 | return; | ||
794 | } | ||
795 | if (NULL != wc->callback) | ||
796 | wc->callback (wc->callback_cls, record, NULL); | ||
797 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | ||
798 | PEERSTORE_destroy_record (record); | ||
799 | } | ||
800 | |||
801 | |||
802 | /** | ||
803 | * Close the existing connection to PEERSTORE and reconnect. | ||
804 | * | ||
805 | * @param cls a `struct GNUNET_PEERSTORE_Handle *` | ||
806 | */ | ||
807 | static void | ||
808 | reconnect (void *cls) | ||
809 | { | ||
810 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
811 | struct GNUNET_MQ_MessageHandler mq_handlers[] = | ||
812 | { GNUNET_MQ_hd_fixed_size (iterate_end, | ||
813 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, | ||
814 | struct GNUNET_MessageHeader, | ||
815 | h), | ||
816 | GNUNET_MQ_hd_var_size (iterate_result, | ||
817 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, | ||
818 | struct StoreRecordMessage, | ||
819 | h), | ||
820 | GNUNET_MQ_hd_var_size (watch_record, | ||
821 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, | ||
822 | struct StoreRecordMessage, | ||
823 | h), | ||
824 | GNUNET_MQ_handler_end () }; | ||
825 | struct GNUNET_MQ_Envelope *ev; | ||
826 | |||
827 | h->reconnect_task = NULL; | ||
828 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); | ||
829 | h->mq = GNUNET_CLIENT_connect (h->cfg, | ||
830 | "peerstore", | ||
831 | mq_handlers, | ||
832 | &handle_client_error, | ||
833 | h); | ||
834 | if (NULL == h->mq) | ||
835 | { | ||
836 | h->reconnect_task = | ||
837 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); | ||
838 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | ||
839 | return; | ||
840 | } | ||
841 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
842 | "Resending pending requests after reconnect.\n"); | ||
843 | if (NULL != h->watches) | ||
844 | GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h); | ||
845 | for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic; | ||
846 | ic = ic->next) | ||
847 | { | ||
848 | ev = | ||
849 | PEERSTORE_create_record_mq_envelope (ic->sub_system, | ||
850 | &ic->peer, | ||
851 | ic->key, | ||
852 | NULL, | ||
853 | 0, | ||
854 | GNUNET_TIME_UNIT_FOREVER_ABS, | ||
855 | 0, | ||
856 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); | ||
857 | GNUNET_MQ_send (h->mq, ev); | ||
858 | } | ||
859 | for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc; | ||
860 | sc = sc->next) | ||
861 | { | ||
862 | ev = | ||
863 | PEERSTORE_create_record_mq_envelope (sc->sub_system, | ||
864 | &sc->peer, | ||
865 | sc->key, | ||
866 | sc->value, | ||
867 | sc->size, | ||
868 | sc->expiry, | ||
869 | sc->options, | ||
870 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); | ||
871 | GNUNET_MQ_notify_sent (ev, &store_request_sent, sc); | ||
872 | GNUNET_MQ_send (h->mq, ev); | ||
873 | } | ||
874 | } | ||
875 | |||
876 | |||
877 | /** | ||
878 | * Cancel a watch request | ||
879 | * | ||
880 | * @param wc handle to the watch request | ||
881 | */ | ||
882 | void | ||
883 | GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc) | ||
884 | { | ||
885 | struct GNUNET_PEERSTORE_Handle *h = wc->h; | ||
886 | struct GNUNET_MQ_Envelope *ev; | ||
887 | struct StoreKeyHashMessage *hm; | ||
888 | |||
889 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n"); | ||
890 | ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); | ||
891 | hm->keyhash = wc->keyhash; | ||
892 | GNUNET_MQ_send (h->mq, ev); | ||
893 | GNUNET_assert ( | ||
894 | GNUNET_YES == | ||
895 | GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc)); | ||
896 | GNUNET_free (wc); | ||
897 | } | ||
898 | |||
899 | |||
900 | /** | ||
901 | * Request watching a given key | ||
902 | * User will be notified with any new values added to key | ||
903 | * | ||
904 | * @param h handle to the PEERSTORE service | ||
905 | * @param sub_system name of sub system | ||
906 | * @param peer Peer identity | ||
907 | * @param key entry key string | ||
908 | * @param callback function called with each new value | ||
909 | * @param callback_cls closure for @a callback | ||
910 | * @return Handle to watch request | ||
911 | */ | ||
912 | struct GNUNET_PEERSTORE_WatchContext * | ||
913 | GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, | ||
914 | const char *sub_system, | ||
915 | const struct GNUNET_PeerIdentity *peer, | ||
916 | const char *key, | ||
917 | GNUNET_PEERSTORE_Processor callback, | ||
918 | void *callback_cls) | ||
919 | { | ||
920 | struct GNUNET_MQ_Envelope *ev; | ||
921 | struct StoreKeyHashMessage *hm; | ||
922 | struct GNUNET_PEERSTORE_WatchContext *wc; | ||
923 | |||
924 | ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); | ||
925 | PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash); | ||
926 | wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext); | ||
927 | wc->callback = callback; | ||
928 | wc->callback_cls = callback_cls; | ||
929 | wc->h = h; | ||
930 | wc->keyhash = hm->keyhash; | ||
931 | if (NULL == h->watches) | ||
932 | h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO); | ||
933 | GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put ( | ||
934 | h->watches, | ||
935 | &wc->keyhash, | ||
936 | wc, | ||
937 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | ||
938 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
939 | "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n", | ||
940 | sub_system, | ||
941 | GNUNET_i2s (peer), | ||
942 | key); | ||
943 | GNUNET_MQ_send (h->mq, ev); | ||
944 | return wc; | ||
945 | } | ||
946 | |||
947 | |||
948 | /* end of peerstore_api.c */ | ||