summaryrefslogtreecommitdiff
path: root/src/peerstore/peerstore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r--src/peerstore/peerstore_api.c540
1 files changed, 272 insertions, 268 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index 3ce2c5533..e5881a6b4 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -28,7 +28,7 @@
28#include "peerstore.h" 28#include "peerstore.h"
29#include "peerstore_common.h" 29#include "peerstore_common.h"
30 30
31#define LOG(kind, ...) GNUNET_log_from(kind, "peerstore-api", __VA_ARGS__) 31#define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__)
32 32
33/******************************************************************************/ 33/******************************************************************************/
34/************************ DATA STRUCTURES ****************************/ 34/************************ DATA STRUCTURES ****************************/
@@ -37,7 +37,8 @@
37/** 37/**
38 * Handle to the PEERSTORE service. 38 * Handle to the PEERSTORE service.
39 */ 39 */
40struct GNUNET_PEERSTORE_Handle { 40struct GNUNET_PEERSTORE_Handle
41{
41 /** 42 /**
42 * Our configuration. 43 * Our configuration.
43 */ 44 */
@@ -92,7 +93,8 @@ struct GNUNET_PEERSTORE_Handle {
92/** 93/**
93 * Context for a store request 94 * Context for a store request
94 */ 95 */
95struct GNUNET_PEERSTORE_StoreContext { 96struct GNUNET_PEERSTORE_StoreContext
97{
96 /** 98 /**
97 * Kept in a DLL. 99 * Kept in a DLL.
98 */ 100 */
@@ -157,7 +159,8 @@ struct GNUNET_PEERSTORE_StoreContext {
157/** 159/**
158 * Context for a iterate request 160 * Context for a iterate request
159 */ 161 */
160struct GNUNET_PEERSTORE_IterateContext { 162struct GNUNET_PEERSTORE_IterateContext
163{
161 /** 164 /**
162 * Kept in a DLL. 165 * Kept in a DLL.
163 */ 166 */
@@ -207,7 +210,8 @@ struct GNUNET_PEERSTORE_IterateContext {
207/** 210/**
208 * Context for a watch request 211 * Context for a watch request
209 */ 212 */
210struct GNUNET_PEERSTORE_WatchContext { 213struct GNUNET_PEERSTORE_WatchContext
214{
211 /** 215 /**
212 * Kept in a DLL. 216 * Kept in a DLL.
213 */ 217 */
@@ -249,7 +253,7 @@ struct GNUNET_PEERSTORE_WatchContext {
249 * @param cls a `struct GNUNET_PEERSTORE_Handle *h` 253 * @param cls a `struct GNUNET_PEERSTORE_Handle *h`
250 */ 254 */
251static void 255static void
252reconnect(void *cls); 256reconnect (void *cls);
253 257
254 258
255/** 259/**
@@ -258,32 +262,32 @@ reconnect(void *cls);
258 * @param h peerstore handle to disconnect 262 * @param h peerstore handle to disconnect
259 */ 263 */
260static void 264static void
261disconnect(struct GNUNET_PEERSTORE_Handle *h) 265disconnect (struct GNUNET_PEERSTORE_Handle *h)
262{ 266{
263 struct GNUNET_PEERSTORE_IterateContext *next; 267 struct GNUNET_PEERSTORE_IterateContext *next;
264 268
265 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic; 269 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
266 ic = next) 270 ic = next)
271 {
272 next = ic->next;
273 if (GNUNET_YES == ic->iterating)
267 { 274 {
268 next = ic->next; 275 GNUNET_PEERSTORE_Processor icb;
269 if (GNUNET_YES == ic->iterating) 276 void *icb_cls;
270 { 277
271 GNUNET_PEERSTORE_Processor icb; 278 icb = ic->callback;
272 void *icb_cls; 279 icb_cls = ic->callback_cls;
273 280 GNUNET_PEERSTORE_iterate_cancel (ic);
274 icb = ic->callback; 281 if (NULL != icb)
275 icb_cls = ic->callback_cls; 282 icb (icb_cls, NULL, "Iteration canceled due to reconnection");
276 GNUNET_PEERSTORE_iterate_cancel(ic);
277 if (NULL != icb)
278 icb(icb_cls, NULL, "Iteration canceled due to reconnection");
279 }
280 } 283 }
284 }
281 285
282 if (NULL != h->mq) 286 if (NULL != h->mq)
283 { 287 {
284 GNUNET_MQ_destroy(h->mq); 288 GNUNET_MQ_destroy (h->mq);
285 h->mq = NULL; 289 h->mq = NULL;
286 } 290 }
287} 291}
288 292
289 293
@@ -294,16 +298,16 @@ disconnect(struct GNUNET_PEERSTORE_Handle *h)
294 * @param h peerstore to reconnect 298 * @param h peerstore to reconnect
295 */ 299 */
296static void 300static void
297disconnect_and_schedule_reconnect(struct GNUNET_PEERSTORE_Handle *h) 301disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
298{ 302{
299 GNUNET_assert(NULL == h->reconnect_task); 303 GNUNET_assert (NULL == h->reconnect_task);
300 disconnect(h); 304 disconnect (h);
301 LOG(GNUNET_ERROR_TYPE_DEBUG, 305 LOG (GNUNET_ERROR_TYPE_DEBUG,
302 "Scheduling task to reconnect to PEERSTORE service in %s.\n", 306 "Scheduling task to reconnect to PEERSTORE service in %s.\n",
303 GNUNET_STRINGS_relative_time_to_string(h->reconnect_delay, GNUNET_YES)); 307 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
304 h->reconnect_task = 308 h->reconnect_task =
305 GNUNET_SCHEDULER_add_delayed(h->reconnect_delay, &reconnect, h); 309 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
306 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF(h->reconnect_delay); 310 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
307} 311}
308 312
309 313
@@ -313,7 +317,7 @@ disconnect_and_schedule_reconnect(struct GNUNET_PEERSTORE_Handle *h)
313 * @param cls a `struct GNUNET_PEERSTORE_StoreContext *` 317 * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
314 */ 318 */
315static void 319static void
316store_request_sent(void *cls) 320store_request_sent (void *cls)
317{ 321{
318 struct GNUNET_PEERSTORE_StoreContext *sc = cls; 322 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
319 GNUNET_PEERSTORE_Continuation cont; 323 GNUNET_PEERSTORE_Continuation cont;
@@ -321,9 +325,9 @@ store_request_sent(void *cls)
321 325
322 cont = sc->cont; 326 cont = sc->cont;
323 cont_cls = sc->cont_cls; 327 cont_cls = sc->cont_cls;
324 GNUNET_PEERSTORE_store_cancel(sc); 328 GNUNET_PEERSTORE_store_cancel (sc);
325 if (NULL != cont) 329 if (NULL != cont)
326 cont(cont_cls, GNUNET_OK); 330 cont (cont_cls, GNUNET_OK);
327} 331}
328 332
329 333
@@ -336,14 +340,14 @@ store_request_sent(void *cls)
336 * Function called when we had trouble talking to the service. 340 * Function called when we had trouble talking to the service.
337 */ 341 */
338static void 342static void
339handle_client_error(void *cls, enum GNUNET_MQ_Error error) 343handle_client_error (void *cls, enum GNUNET_MQ_Error error)
340{ 344{
341 struct GNUNET_PEERSTORE_Handle *h = cls; 345 struct GNUNET_PEERSTORE_Handle *h = cls;
342 346
343 LOG(GNUNET_ERROR_TYPE_ERROR, 347 LOG (GNUNET_ERROR_TYPE_ERROR,
344 "Received an error notification from MQ of type: %d\n", 348 "Received an error notification from MQ of type: %d\n",
345 error); 349 error);
346 disconnect_and_schedule_reconnect(h); 350 disconnect_and_schedule_reconnect (h);
347} 351}
348 352
349 353
@@ -356,16 +360,16 @@ handle_client_error(void *cls, enum GNUNET_MQ_Error error)
356 * @return #GNUNET_YES (continue to iterate) 360 * @return #GNUNET_YES (continue to iterate)
357 */ 361 */
358static int 362static int
359rewatch_it(void *cls, const struct GNUNET_HashCode *key, void *value) 363rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
360{ 364{
361 struct GNUNET_PEERSTORE_Handle *h = cls; 365 struct GNUNET_PEERSTORE_Handle *h = cls;
362 struct GNUNET_PEERSTORE_WatchContext *wc = value; 366 struct GNUNET_PEERSTORE_WatchContext *wc = value;
363 struct StoreKeyHashMessage *hm; 367 struct StoreKeyHashMessage *hm;
364 struct GNUNET_MQ_Envelope *ev; 368 struct GNUNET_MQ_Envelope *ev;
365 369
366 ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); 370 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
367 hm->keyhash = wc->keyhash; 371 hm->keyhash = wc->keyhash;
368 GNUNET_MQ_send(h->mq, ev); 372 GNUNET_MQ_send (h->mq, ev);
369 return GNUNET_YES; 373 return GNUNET_YES;
370} 374}
371 375
@@ -379,11 +383,11 @@ rewatch_it(void *cls, const struct GNUNET_HashCode *key, void *value)
379 * @return #GNUNET_YES to continue iteration 383 * @return #GNUNET_YES to continue iteration
380 */ 384 */
381static int 385static int
382destroy_watch(void *cls, const struct GNUNET_HashCode *key, void *value) 386destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
383{ 387{
384 struct GNUNET_PEERSTORE_WatchContext *wc = value; 388 struct GNUNET_PEERSTORE_WatchContext *wc = value;
385 389
386 GNUNET_PEERSTORE_watch_cancel(wc); 390 GNUNET_PEERSTORE_watch_cancel (wc);
387 return GNUNET_YES; 391 return GNUNET_YES;
388} 392}
389 393
@@ -396,14 +400,14 @@ destroy_watch(void *cls, const struct GNUNET_HashCode *key, void *value)
396 * @param h Handle to the service. 400 * @param h Handle to the service.
397 */ 401 */
398static void 402static void
399final_disconnect(struct GNUNET_PEERSTORE_Handle *h) 403final_disconnect (struct GNUNET_PEERSTORE_Handle *h)
400{ 404{
401 if (NULL != h->mq) 405 if (NULL != h->mq)
402 { 406 {
403 GNUNET_MQ_destroy(h->mq); 407 GNUNET_MQ_destroy (h->mq);
404 h->mq = NULL; 408 h->mq = NULL;
405 } 409 }
406 GNUNET_free(h); 410 GNUNET_free (h);
407} 411}
408 412
409 413
@@ -414,19 +418,19 @@ final_disconnect(struct GNUNET_PEERSTORE_Handle *h)
414 * @return NULL on error 418 * @return NULL on error
415 */ 419 */
416struct GNUNET_PEERSTORE_Handle * 420struct GNUNET_PEERSTORE_Handle *
417GNUNET_PEERSTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg) 421GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
418{ 422{
419 struct GNUNET_PEERSTORE_Handle *h; 423 struct GNUNET_PEERSTORE_Handle *h;
420 424
421 h = GNUNET_new(struct GNUNET_PEERSTORE_Handle); 425 h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
422 h->cfg = cfg; 426 h->cfg = cfg;
423 h->disconnecting = GNUNET_NO; 427 h->disconnecting = GNUNET_NO;
424 reconnect(h); 428 reconnect (h);
425 if (NULL == h->mq) 429 if (NULL == h->mq)
426 { 430 {
427 GNUNET_free(h); 431 GNUNET_free (h);
428 return NULL; 432 return NULL;
429 } 433 }
430 return h; 434 return h;
431} 435}
432 436
@@ -440,36 +444,36 @@ GNUNET_PEERSTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
440 * @param sync_first send any pending STORE requests before disconnecting 444 * @param sync_first send any pending STORE requests before disconnecting
441 */ 445 */
442void 446void
443GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h, int sync_first) 447GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
444{ 448{
445 struct GNUNET_PEERSTORE_IterateContext *ic; 449 struct GNUNET_PEERSTORE_IterateContext *ic;
446 struct GNUNET_PEERSTORE_StoreContext *sc; 450 struct GNUNET_PEERSTORE_StoreContext *sc;
447 451
448 LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n"); 452 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
449 if (NULL != h->watches) 453 if (NULL != h->watches)
450 { 454 {
451 GNUNET_CONTAINER_multihashmap_iterate(h->watches, &destroy_watch, NULL); 455 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
452 GNUNET_CONTAINER_multihashmap_destroy(h->watches); 456 GNUNET_CONTAINER_multihashmap_destroy (h->watches);
453 h->watches = NULL; 457 h->watches = NULL;
454 } 458 }
455 while (NULL != (ic = h->iterate_head)) 459 while (NULL != (ic = h->iterate_head))
456 { 460 {
457 GNUNET_break(0); 461 GNUNET_break (0);
458 GNUNET_PEERSTORE_iterate_cancel(ic); 462 GNUNET_PEERSTORE_iterate_cancel (ic);
459 } 463 }
460 if (NULL != h->store_head) 464 if (NULL != h->store_head)
465 {
466 if (GNUNET_YES == sync_first)
461 { 467 {
462 if (GNUNET_YES == sync_first) 468 LOG (GNUNET_ERROR_TYPE_DEBUG,
463 { 469 "Delaying disconnection due to pending store requests.\n");
464 LOG(GNUNET_ERROR_TYPE_DEBUG, 470 h->disconnecting = GNUNET_YES;
465 "Delaying disconnection due to pending store requests.\n"); 471 return;
466 h->disconnecting = GNUNET_YES;
467 return;
468 }
469 while (NULL != (sc = h->store_head))
470 GNUNET_PEERSTORE_store_cancel(sc);
471 } 472 }
472 final_disconnect(h); 473 while (NULL != (sc = h->store_head))
474 GNUNET_PEERSTORE_store_cancel (sc);
475 }
476 final_disconnect (h);
473} 477}
474 478
475 479
@@ -484,17 +488,17 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h, int sync_first)
484 * @param sc Store request context 488 * @param sc Store request context
485 */ 489 */
486void 490void
487GNUNET_PEERSTORE_store_cancel(struct GNUNET_PEERSTORE_StoreContext *sc) 491GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
488{ 492{
489 struct GNUNET_PEERSTORE_Handle *h = sc->h; 493 struct GNUNET_PEERSTORE_Handle *h = sc->h;
490 494
491 GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc); 495 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
492 GNUNET_free(sc->sub_system); 496 GNUNET_free (sc->sub_system);
493 GNUNET_free(sc->value); 497 GNUNET_free (sc->value);
494 GNUNET_free(sc->key); 498 GNUNET_free (sc->key);
495 GNUNET_free(sc); 499 GNUNET_free (sc);
496 if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head)) 500 if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
497 final_disconnect(h); 501 final_disconnect (h);
498} 502}
499 503
500 504
@@ -515,41 +519,41 @@ GNUNET_PEERSTORE_store_cancel(struct GNUNET_PEERSTORE_StoreContext *sc)
515 * @param cont_cls Closure for @a cont 519 * @param cont_cls Closure for @a cont
516 */ 520 */
517struct GNUNET_PEERSTORE_StoreContext * 521struct GNUNET_PEERSTORE_StoreContext *
518GNUNET_PEERSTORE_store(struct GNUNET_PEERSTORE_Handle *h, 522GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
519 const char *sub_system, 523 const char *sub_system,
520 const struct GNUNET_PeerIdentity *peer, 524 const struct GNUNET_PeerIdentity *peer,
521 const char *key, 525 const char *key,
522 const void *value, 526 const void *value,
523 size_t size, 527 size_t size,
524 struct GNUNET_TIME_Absolute expiry, 528 struct GNUNET_TIME_Absolute expiry,
525 enum GNUNET_PEERSTORE_StoreOption options, 529 enum GNUNET_PEERSTORE_StoreOption options,
526 GNUNET_PEERSTORE_Continuation cont, 530 GNUNET_PEERSTORE_Continuation cont,
527 void *cont_cls) 531 void *cont_cls)
528{ 532{
529 struct GNUNET_MQ_Envelope *ev; 533 struct GNUNET_MQ_Envelope *ev;
530 struct GNUNET_PEERSTORE_StoreContext *sc; 534 struct GNUNET_PEERSTORE_StoreContext *sc;
531 535
532 LOG(GNUNET_ERROR_TYPE_DEBUG, 536 LOG (GNUNET_ERROR_TYPE_DEBUG,
533 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n", 537 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
534 size, 538 size,
535 sub_system, 539 sub_system,
536 GNUNET_i2s(peer), 540 GNUNET_i2s (peer),
537 key); 541 key);
538 ev = 542 ev =
539 PEERSTORE_create_record_mq_envelope(sub_system, 543 PEERSTORE_create_record_mq_envelope (sub_system,
540 peer, 544 peer,
541 key, 545 key,
542 value, 546 value,
543 size, 547 size,
544 expiry, 548 expiry,
545 options, 549 options,
546 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); 550 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
547 sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext); 551 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
548 552
549 sc->sub_system = GNUNET_strdup(sub_system); 553 sc->sub_system = GNUNET_strdup (sub_system);
550 sc->peer = *peer; 554 sc->peer = *peer;
551 sc->key = GNUNET_strdup(key); 555 sc->key = GNUNET_strdup (key);
552 sc->value = GNUNET_memdup(value, size); 556 sc->value = GNUNET_memdup (value, size);
553 sc->size = size; 557 sc->size = size;
554 sc->expiry = expiry; 558 sc->expiry = expiry;
555 sc->options = options; 559 sc->options = options;
@@ -557,9 +561,9 @@ GNUNET_PEERSTORE_store(struct GNUNET_PEERSTORE_Handle *h,
557 sc->cont_cls = cont_cls; 561 sc->cont_cls = cont_cls;
558 sc->h = h; 562 sc->h = h;
559 563
560 GNUNET_CONTAINER_DLL_insert_tail(h->store_head, h->store_tail, sc); 564 GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
561 GNUNET_MQ_notify_sent(ev, &store_request_sent, sc); 565 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
562 GNUNET_MQ_send(h->mq, ev); 566 GNUNET_MQ_send (h->mq, ev);
563 return sc; 567 return sc;
564} 568}
565 569
@@ -576,7 +580,7 @@ GNUNET_PEERSTORE_store(struct GNUNET_PEERSTORE_Handle *h,
576 * @param msg message received 580 * @param msg message received
577 */ 581 */
578static void 582static void
579handle_iterate_end(void *cls, const struct GNUNET_MessageHeader *msg) 583handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg)
580{ 584{
581 struct GNUNET_PEERSTORE_Handle *h = cls; 585 struct GNUNET_PEERSTORE_Handle *h = cls;
582 struct GNUNET_PEERSTORE_IterateContext *ic; 586 struct GNUNET_PEERSTORE_IterateContext *ic;
@@ -585,18 +589,18 @@ handle_iterate_end(void *cls, const struct GNUNET_MessageHeader *msg)
585 589
586 ic = h->iterate_head; 590 ic = h->iterate_head;
587 if (NULL == ic) 591 if (NULL == ic)
588 { 592 {
589 LOG(GNUNET_ERROR_TYPE_ERROR, 593 LOG (GNUNET_ERROR_TYPE_ERROR,
590 _("Unexpected iteration response, this should not happen.\n")); 594 _ ("Unexpected iteration response, this should not happen.\n"));
591 disconnect_and_schedule_reconnect(h); 595 disconnect_and_schedule_reconnect (h);
592 return; 596 return;
593 } 597 }
594 callback = ic->callback; 598 callback = ic->callback;
595 callback_cls = ic->callback_cls; 599 callback_cls = ic->callback_cls;
596 ic->iterating = GNUNET_NO; 600 ic->iterating = GNUNET_NO;
597 GNUNET_PEERSTORE_iterate_cancel(ic); 601 GNUNET_PEERSTORE_iterate_cancel (ic);
598 if (NULL != callback) 602 if (NULL != callback)
599 callback(callback_cls, NULL, NULL); 603 callback (callback_cls, NULL, NULL);
600 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 604 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
601} 605}
602 606
@@ -609,7 +613,7 @@ handle_iterate_end(void *cls, const struct GNUNET_MessageHeader *msg)
609 * @param msg message received 613 * @param msg message received
610 */ 614 */
611static int 615static int
612check_iterate_result(void *cls, const struct StoreRecordMessage *msg) 616check_iterate_result (void *cls, const struct StoreRecordMessage *msg)
613{ 617{
614 /* we defer validation to #handle_iterate_result */ 618 /* we defer validation to #handle_iterate_result */
615 return GNUNET_OK; 619 return GNUNET_OK;
@@ -623,7 +627,7 @@ check_iterate_result(void *cls, const struct StoreRecordMessage *msg)
623 * @param msg message received 627 * @param msg message received
624 */ 628 */
625static void 629static void
626handle_iterate_result(void *cls, const struct StoreRecordMessage *msg) 630handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
627{ 631{
628 struct GNUNET_PEERSTORE_Handle *h = cls; 632 struct GNUNET_PEERSTORE_Handle *h = cls;
629 struct GNUNET_PEERSTORE_IterateContext *ic; 633 struct GNUNET_PEERSTORE_IterateContext *ic;
@@ -633,29 +637,29 @@ handle_iterate_result(void *cls, const struct StoreRecordMessage *msg)
633 637
634 ic = h->iterate_head; 638 ic = h->iterate_head;
635 if (NULL == ic) 639 if (NULL == ic)
636 { 640 {
637 LOG(GNUNET_ERROR_TYPE_ERROR, 641 LOG (GNUNET_ERROR_TYPE_ERROR,
638 _("Unexpected iteration response, this should not happen.\n")); 642 _ ("Unexpected iteration response, this should not happen.\n"));
639 disconnect_and_schedule_reconnect(h); 643 disconnect_and_schedule_reconnect (h);
640 return; 644 return;
641 } 645 }
642 ic->iterating = GNUNET_YES; 646 ic->iterating = GNUNET_YES;
643 callback = ic->callback; 647 callback = ic->callback;
644 callback_cls = ic->callback_cls; 648 callback_cls = ic->callback_cls;
645 if (NULL == callback) 649 if (NULL == callback)
646 return; 650 return;
647 record = PEERSTORE_parse_record_message(msg); 651 record = PEERSTORE_parse_record_message (msg);
648 if (NULL == record) 652 if (NULL == record)
649 { 653 {
650 callback(callback_cls, 654 callback (callback_cls,
651 NULL, 655 NULL,
652 _("Received a malformed response from service.")); 656 _ ("Received a malformed response from service."));
653 } 657 }
654 else 658 else
655 { 659 {
656 callback(callback_cls, record, NULL); 660 callback (callback_cls, record, NULL);
657 PEERSTORE_destroy_record(record); 661 PEERSTORE_destroy_record (record);
658 } 662 }
659} 663}
660 664
661 665
@@ -666,15 +670,15 @@ handle_iterate_result(void *cls, const struct StoreRecordMessage *msg)
666 * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate() 670 * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
667 */ 671 */
668void 672void
669GNUNET_PEERSTORE_iterate_cancel(struct GNUNET_PEERSTORE_IterateContext *ic) 673GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
670{ 674{
671 if (GNUNET_NO == ic->iterating) 675 if (GNUNET_NO == ic->iterating)
672 { 676 {
673 GNUNET_CONTAINER_DLL_remove(ic->h->iterate_head, ic->h->iterate_tail, ic); 677 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
674 GNUNET_free(ic->sub_system); 678 GNUNET_free (ic->sub_system);
675 GNUNET_free_non_null(ic->key); 679 GNUNET_free_non_null (ic->key);
676 GNUNET_free(ic); 680 GNUNET_free (ic);
677 } 681 }
678 else 682 else
679 ic->callback = NULL; 683 ic->callback = NULL;
680} 684}
@@ -692,39 +696,39 @@ GNUNET_PEERSTORE_iterate_cancel(struct GNUNET_PEERSTORE_IterateContext *ic)
692 * @return Handle to iteration request 696 * @return Handle to iteration request
693 */ 697 */
694struct GNUNET_PEERSTORE_IterateContext * 698struct GNUNET_PEERSTORE_IterateContext *
695GNUNET_PEERSTORE_iterate(struct GNUNET_PEERSTORE_Handle *h, 699GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
696 const char *sub_system, 700 const char *sub_system,
697 const struct GNUNET_PeerIdentity *peer, 701 const struct GNUNET_PeerIdentity *peer,
698 const char *key, 702 const char *key,
699 GNUNET_PEERSTORE_Processor callback, 703 GNUNET_PEERSTORE_Processor callback,
700 void *callback_cls) 704 void *callback_cls)
701{ 705{
702 struct GNUNET_MQ_Envelope *ev; 706 struct GNUNET_MQ_Envelope *ev;
703 struct GNUNET_PEERSTORE_IterateContext *ic; 707 struct GNUNET_PEERSTORE_IterateContext *ic;
704 708
705 ev = 709 ev =
706 PEERSTORE_create_record_mq_envelope(sub_system, 710 PEERSTORE_create_record_mq_envelope (sub_system,
707 peer, 711 peer,
708 key, 712 key,
709 NULL, 713 NULL,
710 0, 714 0,
711 GNUNET_TIME_UNIT_FOREVER_ABS, 715 GNUNET_TIME_UNIT_FOREVER_ABS,
712 0, 716 0,
713 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); 717 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
714 ic = GNUNET_new(struct GNUNET_PEERSTORE_IterateContext); 718 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
715 ic->callback = callback; 719 ic->callback = callback;
716 ic->callback_cls = callback_cls; 720 ic->callback_cls = callback_cls;
717 ic->h = h; 721 ic->h = h;
718 ic->sub_system = GNUNET_strdup(sub_system); 722 ic->sub_system = GNUNET_strdup (sub_system);
719 if (NULL != peer) 723 if (NULL != peer)
720 ic->peer = *peer; 724 ic->peer = *peer;
721 if (NULL != key) 725 if (NULL != key)
722 ic->key = GNUNET_strdup(key); 726 ic->key = GNUNET_strdup (key);
723 GNUNET_CONTAINER_DLL_insert_tail(h->iterate_head, h->iterate_tail, ic); 727 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
724 LOG(GNUNET_ERROR_TYPE_DEBUG, 728 LOG (GNUNET_ERROR_TYPE_DEBUG,
725 "Sending an iterate request for sub system `%s'\n", 729 "Sending an iterate request for sub system `%s'\n",
726 sub_system); 730 sub_system);
727 GNUNET_MQ_send(h->mq, ev); 731 GNUNET_MQ_send (h->mq, ev);
728 return ic; 732 return ic;
729} 733}
730 734
@@ -740,7 +744,7 @@ GNUNET_PEERSTORE_iterate(struct GNUNET_PEERSTORE_Handle *h,
740 * @param msg message received 744 * @param msg message received
741 */ 745 */
742static int 746static int
743check_watch_record(void *cls, const struct StoreRecordMessage *msg) 747check_watch_record (void *cls, const struct StoreRecordMessage *msg)
744{ 748{
745 /* we defer validation to #handle_watch_result */ 749 /* we defer validation to #handle_watch_result */
746 return GNUNET_OK; 750 return GNUNET_OK;
@@ -754,35 +758,35 @@ check_watch_record(void *cls, const struct StoreRecordMessage *msg)
754 * @param msg message received 758 * @param msg message received
755 */ 759 */
756static void 760static void
757handle_watch_record(void *cls, const struct StoreRecordMessage *msg) 761handle_watch_record (void *cls, const struct StoreRecordMessage *msg)
758{ 762{
759 struct GNUNET_PEERSTORE_Handle *h = cls; 763 struct GNUNET_PEERSTORE_Handle *h = cls;
760 struct GNUNET_PEERSTORE_Record *record; 764 struct GNUNET_PEERSTORE_Record *record;
761 struct GNUNET_HashCode keyhash; 765 struct GNUNET_HashCode keyhash;
762 struct GNUNET_PEERSTORE_WatchContext *wc; 766 struct GNUNET_PEERSTORE_WatchContext *wc;
763 767
764 LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n"); 768 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
765 record = PEERSTORE_parse_record_message(msg); 769 record = PEERSTORE_parse_record_message (msg);
766 if (NULL == record) 770 if (NULL == record)
767 { 771 {
768 disconnect_and_schedule_reconnect(h); 772 disconnect_and_schedule_reconnect (h);
769 return; 773 return;
770 } 774 }
771 PEERSTORE_hash_key(record->sub_system, &record->peer, record->key, &keyhash); 775 PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash);
772 // FIXME: what if there are multiple watches for the same key? 776 // FIXME: what if there are multiple watches for the same key?
773 wc = GNUNET_CONTAINER_multihashmap_get(h->watches, &keyhash); 777 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
774 if (NULL == wc) 778 if (NULL == wc)
775 { 779 {
776 LOG(GNUNET_ERROR_TYPE_ERROR, 780 LOG (GNUNET_ERROR_TYPE_ERROR,
777 _("Received a watch result for a non existing watch.\n")); 781 _ ("Received a watch result for a non existing watch.\n"));
778 PEERSTORE_destroy_record(record); 782 PEERSTORE_destroy_record (record);
779 disconnect_and_schedule_reconnect(h); 783 disconnect_and_schedule_reconnect (h);
780 return; 784 return;
781 } 785 }
782 if (NULL != wc->callback) 786 if (NULL != wc->callback)
783 wc->callback(wc->callback_cls, record, NULL); 787 wc->callback (wc->callback_cls, record, NULL);
784 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 788 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
785 PEERSTORE_destroy_record(record); 789 PEERSTORE_destroy_record (record);
786} 790}
787 791
788 792
@@ -792,67 +796,67 @@ handle_watch_record(void *cls, const struct StoreRecordMessage *msg)
792 * @param cls a `struct GNUNET_PEERSTORE_Handle *` 796 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
793 */ 797 */
794static void 798static void
795reconnect(void *cls) 799reconnect (void *cls)
796{ 800{
797 struct GNUNET_PEERSTORE_Handle *h = cls; 801 struct GNUNET_PEERSTORE_Handle *h = cls;
798 struct GNUNET_MQ_MessageHandler mq_handlers[] = 802 struct GNUNET_MQ_MessageHandler mq_handlers[] =
799 { GNUNET_MQ_hd_fixed_size(iterate_end, 803 { GNUNET_MQ_hd_fixed_size (iterate_end,
800 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, 804 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
801 struct GNUNET_MessageHeader, 805 struct GNUNET_MessageHeader,
802 h), 806 h),
803 GNUNET_MQ_hd_var_size(iterate_result, 807 GNUNET_MQ_hd_var_size (iterate_result,
804 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 808 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
805 struct StoreRecordMessage, 809 struct StoreRecordMessage,
806 h), 810 h),
807 GNUNET_MQ_hd_var_size(watch_record, 811 GNUNET_MQ_hd_var_size (watch_record,
808 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 812 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
809 struct StoreRecordMessage, 813 struct StoreRecordMessage,
810 h), 814 h),
811 GNUNET_MQ_handler_end() }; 815 GNUNET_MQ_handler_end () };
812 struct GNUNET_MQ_Envelope *ev; 816 struct GNUNET_MQ_Envelope *ev;
813 817
814 h->reconnect_task = NULL; 818 h->reconnect_task = NULL;
815 LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); 819 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
816 h->mq = GNUNET_CLIENT_connect(h->cfg, 820 h->mq = GNUNET_CLIENT_connect (h->cfg,
817 "peerstore", 821 "peerstore",
818 mq_handlers, 822 mq_handlers,
819 &handle_client_error, 823 &handle_client_error,
820 h); 824 h);
821 if (NULL == h->mq) 825 if (NULL == h->mq)
822 return; 826 return;
823 LOG(GNUNET_ERROR_TYPE_DEBUG, 827 LOG (GNUNET_ERROR_TYPE_DEBUG,
824 "Resending pending requests after reconnect.\n"); 828 "Resending pending requests after reconnect.\n");
825 if (NULL != h->watches) 829 if (NULL != h->watches)
826 GNUNET_CONTAINER_multihashmap_iterate(h->watches, &rewatch_it, h); 830 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
827 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic; 831 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
828 ic = ic->next) 832 ic = ic->next)
829 { 833 {
830 ev = 834 ev =
831 PEERSTORE_create_record_mq_envelope(ic->sub_system, 835 PEERSTORE_create_record_mq_envelope (ic->sub_system,
832 &ic->peer, 836 &ic->peer,
833 ic->key, 837 ic->key,
834 NULL, 838 NULL,
835 0, 839 0,
836 GNUNET_TIME_UNIT_FOREVER_ABS, 840 GNUNET_TIME_UNIT_FOREVER_ABS,
837 0, 841 0,
838 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); 842 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
839 GNUNET_MQ_send(h->mq, ev); 843 GNUNET_MQ_send (h->mq, ev);
840 } 844 }
841 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc; 845 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
842 sc = sc->next) 846 sc = sc->next)
843 { 847 {
844 ev = 848 ev =
845 PEERSTORE_create_record_mq_envelope(sc->sub_system, 849 PEERSTORE_create_record_mq_envelope (sc->sub_system,
846 &sc->peer, 850 &sc->peer,
847 sc->key, 851 sc->key,
848 sc->value, 852 sc->value,
849 sc->size, 853 sc->size,
850 sc->expiry, 854 sc->expiry,
851 sc->options, 855 sc->options,
852 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); 856 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
853 GNUNET_MQ_notify_sent(ev, &store_request_sent, sc); 857 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
854 GNUNET_MQ_send(h->mq, ev); 858 GNUNET_MQ_send (h->mq, ev);
855 } 859 }
856} 860}
857 861
858 862
@@ -862,20 +866,20 @@ reconnect(void *cls)
862 * @param wc handle to the watch request 866 * @param wc handle to the watch request
863 */ 867 */
864void 868void
865GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc) 869GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
866{ 870{
867 struct GNUNET_PEERSTORE_Handle *h = wc->h; 871 struct GNUNET_PEERSTORE_Handle *h = wc->h;
868 struct GNUNET_MQ_Envelope *ev; 872 struct GNUNET_MQ_Envelope *ev;
869 struct StoreKeyHashMessage *hm; 873 struct StoreKeyHashMessage *hm;
870 874
871 LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n"); 875 LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
872 ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); 876 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
873 hm->keyhash = wc->keyhash; 877 hm->keyhash = wc->keyhash;
874 GNUNET_MQ_send(h->mq, ev); 878 GNUNET_MQ_send (h->mq, ev);
875 GNUNET_assert( 879 GNUNET_assert (
876 GNUNET_YES == 880 GNUNET_YES ==
877 GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc)); 881 GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc));
878 GNUNET_free(wc); 882 GNUNET_free (wc);
879} 883}
880 884
881 885
@@ -892,37 +896,37 @@ GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc)
892 * @return Handle to watch request 896 * @return Handle to watch request
893 */ 897 */
894struct GNUNET_PEERSTORE_WatchContext * 898struct GNUNET_PEERSTORE_WatchContext *
895GNUNET_PEERSTORE_watch(struct GNUNET_PEERSTORE_Handle *h, 899GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
896 const char *sub_system, 900 const char *sub_system,
897 const struct GNUNET_PeerIdentity *peer, 901 const struct GNUNET_PeerIdentity *peer,
898 const char *key, 902 const char *key,
899 GNUNET_PEERSTORE_Processor callback, 903 GNUNET_PEERSTORE_Processor callback,
900 void *callback_cls) 904 void *callback_cls)
901{ 905{
902 struct GNUNET_MQ_Envelope *ev; 906 struct GNUNET_MQ_Envelope *ev;
903 struct StoreKeyHashMessage *hm; 907 struct StoreKeyHashMessage *hm;
904 struct GNUNET_PEERSTORE_WatchContext *wc; 908 struct GNUNET_PEERSTORE_WatchContext *wc;
905 909
906 ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); 910 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
907 PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash); 911 PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
908 wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext); 912 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
909 wc->callback = callback; 913 wc->callback = callback;
910 wc->callback_cls = callback_cls; 914 wc->callback_cls = callback_cls;
911 wc->h = h; 915 wc->h = h;
912 wc->keyhash = hm->keyhash; 916 wc->keyhash = hm->keyhash;
913 if (NULL == h->watches) 917 if (NULL == h->watches)
914 h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO); 918 h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
915 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put( 919 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
916 h->watches, 920 h->watches,
917 &wc->keyhash, 921 &wc->keyhash,
918 wc, 922 wc,
919 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 923 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
920 LOG(GNUNET_ERROR_TYPE_DEBUG, 924 LOG (GNUNET_ERROR_TYPE_DEBUG,
921 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n", 925 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
922 sub_system, 926 sub_system,
923 GNUNET_i2s(peer), 927 GNUNET_i2s (peer),
924 key); 928 key);
925 GNUNET_MQ_send(h->mq, ev); 929 GNUNET_MQ_send (h->mq, ev);
926 return wc; 930 return wc;
927} 931}
928 932