aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/gnunet-service-datastore.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-09-23 13:14:25 +0000
committerChristian Grothoff <christian@grothoff.org>2016-09-23 13:14:25 +0000
commit3e872864741aaa83be947b614eafb4fef2c74253 (patch)
tree1e08eb75c1c05394f1f7cf42b2fca69ba59aa2f0 /src/datastore/gnunet-service-datastore.c
parent194a99b2c76f07f1d58b6b24f514237db23d4bcb (diff)
downloadgnunet-3e872864741aaa83be947b614eafb4fef2c74253.tar.gz
gnunet-3e872864741aaa83be947b614eafb4fef2c74253.zip
converting datastore to new MQ API
Diffstat (limited to 'src/datastore/gnunet-service-datastore.c')
-rw-r--r--src/datastore/gnunet-service-datastore.c726
1 files changed, 334 insertions, 392 deletions
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index 5853d447d..e632e33e0 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2004-2014 GNUnet e.V. 3 Copyright (C) 2004-2014, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -115,7 +115,7 @@ struct ReservationList
115 /** 115 /**
116 * Client that made the reservation. 116 * Client that made the reservation.
117 */ 117 */
118 struct GNUNET_SERVER_Client *client; 118 struct GNUNET_SERVICE_Client *client;
119 119
120 /** 120 /**
121 * Number of bytes (still) reserved. 121 * Number of bytes (still) reserved.
@@ -247,50 +247,6 @@ sync_stats ()
247 247
248 248
249/** 249/**
250 * Context for transmitting replies to clients.
251 */
252struct TransmitCallbackContext
253{
254
255 /**
256 * We keep these in a doubly-linked list (for cleanup).
257 */
258 struct TransmitCallbackContext *next;
259
260 /**
261 * We keep these in a doubly-linked list (for cleanup).
262 */
263 struct TransmitCallbackContext *prev;
264
265 /**
266 * The message that we're asked to transmit.
267 */
268 struct GNUNET_MessageHeader *msg;
269
270 /**
271 * Handle for the transmission request.
272 */
273 struct GNUNET_SERVER_TransmitHandle *th;
274
275 /**
276 * Client that we are transmitting to.
277 */
278 struct GNUNET_SERVER_Client *client;
279
280};
281
282
283/**
284 * Head of the doubly-linked list (for cleanup).
285 */
286static struct TransmitCallbackContext *tcc_head;
287
288/**
289 * Tail of the doubly-linked list (for cleanup).
290 */
291static struct TransmitCallbackContext *tcc_tail;
292
293/**
294 * Have we already cleaned up the TCCs and are hence no longer 250 * Have we already cleaned up the TCCs and are hence no longer
295 * willing (or able) to transmit anything to anyone? 251 * willing (or able) to transmit anything to anyone?
296 */ 252 */
@@ -304,7 +260,7 @@ static struct GNUNET_STATISTICS_GetHandle *stat_get;
304/** 260/**
305 * Handle to our server. 261 * Handle to our server.
306 */ 262 */
307static struct GNUNET_SERVER_Handle *server; 263static struct GNUNET_SERVICE_Handle *service;
308 264
309/** 265/**
310 * Task that is used to remove expired entries from 266 * Task that is used to remove expired entries from
@@ -496,86 +452,6 @@ manage_space (unsigned long long need)
496 452
497 453
498/** 454/**
499 * Function called to notify a client about the socket
500 * begin ready to queue more data. "buf" will be
501 * NULL and "size" zero if the socket was closed for
502 * writing in the meantime.
503 *
504 * @param cls closure
505 * @param size number of bytes available in buf
506 * @param buf where the callee should write the message
507 * @return number of bytes written to buf
508 */
509static size_t
510transmit_callback (void *cls, size_t size, void *buf)
511{
512 struct TransmitCallbackContext *tcc = cls;
513 size_t msize;
514
515 tcc->th = NULL;
516 GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
517 msize = ntohs (tcc->msg->size);
518 if (size == 0)
519 {
520 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
521 _("Transmission to client failed!\n"));
522 GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
523 GNUNET_SERVER_client_drop (tcc->client);
524 GNUNET_free (tcc->msg);
525 GNUNET_free (tcc);
526 return 0;
527 }
528 GNUNET_assert (size >= msize);
529 GNUNET_memcpy (buf, tcc->msg, msize);
530 GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
531 GNUNET_SERVER_client_drop (tcc->client);
532 GNUNET_free (tcc->msg);
533 GNUNET_free (tcc);
534 return msize;
535}
536
537
538/**
539 * Transmit the given message to the client.
540 *
541 * @param client target of the message
542 * @param msg message to transmit, will be freed!
543 */
544static void
545transmit (struct GNUNET_SERVER_Client *client,
546 struct GNUNET_MessageHeader *msg)
547{
548 struct TransmitCallbackContext *tcc;
549
550 if (GNUNET_YES == cleaning_done)
551 {
552 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
553 _("Shutdown in progress, aborting transmission.\n"));
554 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
555 GNUNET_free (msg);
556 return;
557 }
558 tcc = GNUNET_new (struct TransmitCallbackContext);
559 tcc->msg = msg;
560 tcc->client = client;
561 if (NULL ==
562 (tcc->th =
563 GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
564 GNUNET_TIME_UNIT_FOREVER_REL,
565 &transmit_callback, tcc)))
566 {
567 GNUNET_break (0);
568 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
569 GNUNET_free (msg);
570 GNUNET_free (tcc);
571 return;
572 }
573 GNUNET_SERVER_client_keep (client);
574 GNUNET_CONTAINER_DLL_insert (tcc_head, tcc_tail, tcc);
575}
576
577
578/**
579 * Transmit a status code to the client. 455 * Transmit a status code to the client.
580 * 456 *
581 * @param client receiver of the response 457 * @param client receiver of the response
@@ -583,8 +459,11 @@ transmit (struct GNUNET_SERVER_Client *client,
583 * @param msg optional error message (can be NULL) 459 * @param msg optional error message (can be NULL)
584 */ 460 */
585static void 461static void
586transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg) 462transmit_status (struct GNUNET_SERVICE_Client *client,
463 int code,
464 const char *msg)
587{ 465{
466 struct GNUNET_MQ_Envelope *env;
588 struct StatusMessage *sm; 467 struct StatusMessage *sm;
589 size_t slen; 468 size_t slen;
590 469
@@ -592,14 +471,16 @@ transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg)
592 "Transmitting `%s' message with value %d and message `%s'\n", 471 "Transmitting `%s' message with value %d and message `%s'\n",
593 "STATUS", code, msg != NULL ? msg : "(none)"); 472 "STATUS", code, msg != NULL ? msg : "(none)");
594 slen = (msg == NULL) ? 0 : strlen (msg) + 1; 473 slen = (msg == NULL) ? 0 : strlen (msg) + 1;
595 sm = GNUNET_malloc (sizeof (struct StatusMessage) + slen); 474 env = GNUNET_MQ_msg_extra (sm,
596 sm->header.size = htons (sizeof (struct StatusMessage) + slen); 475 slen,
597 sm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS); 476 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
598 sm->status = htonl (code); 477 sm->status = htonl (code);
599 sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration); 478 sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
600 if (slen > 0) 479 GNUNET_memcpy (&sm[1],
601 GNUNET_memcpy (&sm[1], msg, slen); 480 msg,
602 transmit (client, &sm->header); 481 slen);
482 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
483 env);
603} 484}
604 485
605 486
@@ -607,7 +488,7 @@ transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg)
607 * Function that will transmit the given datastore entry 488 * Function that will transmit the given datastore entry
608 * to the client. 489 * to the client.
609 * 490 *
610 * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client). 491 * @param cls closure, pointer to the client (of type `struct GNUNET_SERVICE_Client`).
611 * @param key key for the content 492 * @param key key for the content
612 * @param size number of bytes in data 493 * @param size number of bytes in data
613 * @param data content stored 494 * @param data content stored
@@ -631,27 +512,27 @@ transmit_item (void *cls,
631 struct GNUNET_TIME_Absolute expiration, 512 struct GNUNET_TIME_Absolute expiration,
632 uint64_t uid) 513 uint64_t uid)
633{ 514{
634 struct GNUNET_SERVER_Client *client = cls; 515 struct GNUNET_SERVICE_Client *client = cls;
516 struct GNUNET_MQ_Envelope *env;
635 struct GNUNET_MessageHeader *end; 517 struct GNUNET_MessageHeader *end;
636 struct DataMessage *dm; 518 struct DataMessage *dm;
637 519
638 if (key == NULL) 520 if (NULL == key)
639 { 521 {
640 /* transmit 'DATA_END' */ 522 /* transmit 'DATA_END' */
641 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' message\n", 523 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642 "DATA_END"); 524 "Transmitting DATA_END message\n");
643 end = GNUNET_new (struct GNUNET_MessageHeader); 525 env = GNUNET_MQ_msg (end,
644 end->size = htons (sizeof (struct GNUNET_MessageHeader)); 526 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
645 end->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END); 527 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
646 transmit (client, end); 528 env);
647 GNUNET_SERVER_client_drop (client);
648 return GNUNET_OK; 529 return GNUNET_OK;
649 } 530 }
650 GNUNET_assert (sizeof (struct DataMessage) + size < 531 GNUNET_assert (sizeof (struct DataMessage) + size <
651 GNUNET_SERVER_MAX_MESSAGE_SIZE); 532 GNUNET_SERVER_MAX_MESSAGE_SIZE);
652 dm = GNUNET_malloc (sizeof (struct DataMessage) + size); 533 env = GNUNET_MQ_msg_extra (dm,
653 dm->header.size = htons (sizeof (struct DataMessage) + size); 534 size,
654 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA); 535 GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
655 dm->rid = htonl (0); 536 dm->rid = htonl (0);
656 dm->size = htonl (size); 537 dm->size = htonl (size);
657 dm->type = htonl (type); 538 dm->type = htonl (type);
@@ -662,10 +543,13 @@ transmit_item (void *cls,
662 dm->expiration = GNUNET_TIME_absolute_hton (expiration); 543 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
663 dm->uid = GNUNET_htonll (uid); 544 dm->uid = GNUNET_htonll (uid);
664 dm->key = *key; 545 dm->key = *key;
665 GNUNET_memcpy (&dm[1], data, size); 546 GNUNET_memcpy (&dm[1],
547 data,
548 size);
666 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
667 "Transmitting `%s' message for `%s' of type %u with expiration %s (in: %s)\n", 550 "Transmitting DATA message for `%s' of type %u with expiration %s (in: %s)\n",
668 "DATA", GNUNET_h2s (key), type, 551 GNUNET_h2s (key),
552 type,
669 GNUNET_STRINGS_absolute_time_to_string (expiration), 553 GNUNET_STRINGS_absolute_time_to_string (expiration),
670 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), 554 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
671 GNUNET_YES)); 555 GNUNET_YES));
@@ -673,8 +557,8 @@ transmit_item (void *cls,
673 gettext_noop ("# results found"), 557 gettext_noop ("# results found"),
674 1, 558 1,
675 GNUNET_NO); 559 GNUNET_NO);
676 transmit (client, &dm->header); 560 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
677 GNUNET_SERVER_client_drop (client); 561 env);
678 return GNUNET_OK; 562 return GNUNET_OK;
679} 563}
680 564
@@ -682,20 +566,18 @@ transmit_item (void *cls,
682/** 566/**
683 * Handle RESERVE-message. 567 * Handle RESERVE-message.
684 * 568 *
685 * @param cls closure 569 * @param cls identification of the client
686 * @param client identification of the client
687 * @param message the actual message 570 * @param message the actual message
688 */ 571 */
689static void 572static void
690handle_reserve (void *cls, struct GNUNET_SERVER_Client *client, 573handle_reserve (void *cls,
691 const struct GNUNET_MessageHeader *message) 574 const struct ReserveMessage *msg)
692{ 575{
693 /** 576 /**
694 * Static counter to produce reservation identifiers. 577 * Static counter to produce reservation identifiers.
695 */ 578 */
696 static int reservation_gen; 579 static int reservation_gen;
697 580 struct GNUNET_SERVICE_Client *client = cls;
698 const struct ReserveMessage *msg = (const struct ReserveMessage *) message;
699 struct ReservationList *e; 581 struct ReservationList *e;
700 unsigned long long used; 582 unsigned long long used;
701 unsigned long long req; 583 unsigned long long req;
@@ -707,16 +589,15 @@ handle_reserve (void *cls, struct GNUNET_SERVER_Client *client,
707 amount = GNUNET_ntohll (msg->amount); 589 amount = GNUNET_ntohll (msg->amount);
708 entries = ntohl (msg->entries); 590 entries = ntohl (msg->entries);
709 used = payload + reserved; 591 used = payload + reserved;
710 req = 592 req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
711 amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
712 if (used + req > quota) 593 if (used + req > quota)
713 { 594 {
714 if (quota < used) 595 if (quota < used)
715 used = quota; /* cheat a bit for error message (to avoid negative numbers) */ 596 used = quota; /* cheat a bit for error message (to avoid negative numbers) */
716 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 597 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
717 _ 598 _("Insufficient space (%llu bytes are available) to satisfy RESERVE request for %llu bytes\n"),
718 ("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"), 599 quota - used,
719 quota - used, "RESERVE", req); 600 req);
720 if (cache_size < req) 601 if (cache_size < req)
721 { 602 {
722 /* TODO: document this in the FAQ; essentially, if this 603 /* TODO: document this in the FAQ; essentially, if this
@@ -725,19 +606,22 @@ handle_reserve (void *cls, struct GNUNET_SERVER_Client *client,
725 * larger than 1/8th of the overall available space, and 606 * larger than 1/8th of the overall available space, and
726 * we only reserve 1/8th for "fresh" insertions */ 607 * we only reserve 1/8th for "fresh" insertions */
727 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 608 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
728 _ 609 _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
729 ("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"), 610 req,
730 req, cache_size); 611 cache_size);
731 transmit_status (client, 0, 612 transmit_status (client,
613 0,
732 gettext_noop 614 gettext_noop
733 ("Insufficient space to satisfy request and " 615 ("Insufficient space to satisfy request and "
734 "requested amount is larger than cache size")); 616 "requested amount is larger than cache size"));
735 } 617 }
736 else 618 else
737 { 619 {
738 transmit_status (client, 0, 620 transmit_status (client,
621 0,
739 gettext_noop ("Insufficient space to satisfy request")); 622 gettext_noop ("Insufficient space to satisfy request"));
740 } 623 }
624 GNUNET_SERVICE_client_continue (client);
741 return; 625 return;
742 } 626 }
743 reserved += req; 627 reserved += req;
@@ -754,24 +638,24 @@ handle_reserve (void *cls, struct GNUNET_SERVER_Client *client,
754 e->rid = ++reservation_gen; 638 e->rid = ++reservation_gen;
755 if (reservation_gen < 0) 639 if (reservation_gen < 0)
756 reservation_gen = 0; /* wrap around */ 640 reservation_gen = 0; /* wrap around */
757 transmit_status (client, e->rid, NULL); 641 transmit_status (client,
642 e->rid,
643 NULL);
644 GNUNET_SERVICE_client_continue (client);
758} 645}
759 646
760 647
761/** 648/**
762 * Handle RELEASE_RESERVE-message. 649 * Handle RELEASE_RESERVE-message.
763 * 650 *
764 * @param cls closure 651 * @param cls identification of the client
765 * @param client identification of the client
766 * @param message the actual message 652 * @param message the actual message
767 */ 653 */
768static void 654static void
769handle_release_reserve (void *cls, 655handle_release_reserve (void *cls,
770 struct GNUNET_SERVER_Client *client, 656 const struct ReleaseReserveMessage *msg)
771 const struct GNUNET_MessageHeader *message)
772{ 657{
773 const struct ReleaseReserveMessage *msg = 658 struct GNUNET_SERVICE_Client *client = cls;
774 (const struct ReleaseReserveMessage *) message;
775 struct ReservationList *pos; 659 struct ReservationList *pos;
776 struct ReservationList *prev; 660 struct ReservationList *prev;
777 struct ReservationList *next; 661 struct ReservationList *next;
@@ -804,43 +688,42 @@ handle_release_reserve (void *cls,
804 "Returning %llu remaining reserved bytes to storage pool\n", 688 "Returning %llu remaining reserved bytes to storage pool\n",
805 rem); 689 rem);
806 GNUNET_free (pos); 690 GNUNET_free (pos);
807 transmit_status (client, GNUNET_OK, NULL); 691 transmit_status (client,
692 GNUNET_OK,
693 NULL);
694 GNUNET_SERVICE_client_continue (client);
808 return; 695 return;
809 } 696 }
810 prev = pos; 697 prev = pos;
811 } 698 }
812 GNUNET_break (0); 699 GNUNET_break (0);
813 transmit_status (client, GNUNET_SYSERR, 700 transmit_status (client,
701 GNUNET_SYSERR,
814 gettext_noop ("Could not find matching reservation")); 702 gettext_noop ("Could not find matching reservation"));
703 GNUNET_SERVICE_client_continue (client);
815} 704}
816 705
817 706
818/** 707/**
819 * Check that the given message is a valid data message. 708 * Check that the given message is a valid data message.
820 * 709 *
821 * @return NULL if the message is not well-formed, otherwise the message 710 * @param dm message to check
711 * @return #GNUNET_SYSERR is not well-formed, otherwise #GNUNET_OK
822 */ 712 */
823static const struct DataMessage * 713static int
824check_data (const struct GNUNET_MessageHeader *message) 714check_data (const struct DataMessage *dm)
825{ 715{
826 uint16_t size; 716 uint16_t size;
827 uint32_t dsize; 717 uint32_t dsize;
828 const struct DataMessage *dm;
829 718
830 size = ntohs (message->size); 719 size = ntohs (dm->header.size);
831 if (size < sizeof (struct DataMessage))
832 {
833 GNUNET_break (0);
834 return NULL;
835 }
836 dm = (const struct DataMessage *) message;
837 dsize = ntohl (dm->size); 720 dsize = ntohl (dm->size);
838 if (size != dsize + sizeof (struct DataMessage)) 721 if (size != dsize + sizeof (struct DataMessage))
839 { 722 {
840 GNUNET_break (0); 723 GNUNET_break (0);
841 return NULL; 724 return GNUNET_SYSERR;
842 } 725 }
843 return dm; 726 return GNUNET_OK;
844} 727}
845 728
846 729
@@ -853,7 +736,7 @@ struct PutContext
853 /** 736 /**
854 * Client to notify on completion. 737 * Client to notify on completion.
855 */ 738 */
856 struct GNUNET_SERVER_Client *client; 739 struct GNUNET_SERVICE_Client *client;
857 740
858#if ! HAVE_UNALIGNED_64_ACCESS 741#if ! HAVE_UNALIGNED_64_ACCESS
859 void *reserved; 742 void *reserved;
@@ -887,13 +770,16 @@ put_continuation (void *cls,
887 gettext_noop ("# bytes stored"), 770 gettext_noop ("# bytes stored"),
888 size, 771 size,
889 GNUNET_YES); 772 GNUNET_YES);
890 GNUNET_CONTAINER_bloomfilter_add (filter, key); 773 GNUNET_CONTAINER_bloomfilter_add (filter,
774 key);
891 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
892 "Successfully stored %u bytes under key `%s'\n", 776 "Successfully stored %u bytes under key `%s'\n",
893 size, GNUNET_h2s (key)); 777 size,
778 GNUNET_h2s (key));
894 } 779 }
895 transmit_status (pc->client, status, msg); 780 transmit_status (pc->client,
896 GNUNET_SERVER_client_drop (pc->client); 781 status,
782 msg);
897 GNUNET_free (pc); 783 GNUNET_free (pc);
898 if (quota - reserved - cache_size < payload) 784 if (quota - reserved - cache_size < payload)
899 { 785 {
@@ -918,11 +804,17 @@ execute_put (struct PutContext *pc)
918 const struct DataMessage *dm; 804 const struct DataMessage *dm;
919 805
920 dm = (const struct DataMessage *) &pc[1]; 806 dm = (const struct DataMessage *) &pc[1];
921 plugin->api->put (plugin->api->cls, &dm->key, ntohl (dm->size), &dm[1], 807 plugin->api->put (plugin->api->cls,
922 ntohl (dm->type), ntohl (dm->priority), 808 &dm->key,
923 ntohl (dm->anonymity), ntohl (dm->replication), 809 ntohl (dm->size),
810 &dm[1],
811 ntohl (dm->type),
812 ntohl (dm->priority),
813 ntohl (dm->anonymity),
814 ntohl (dm->replication),
924 GNUNET_TIME_absolute_ntoh (dm->expiration), 815 GNUNET_TIME_absolute_ntoh (dm->expiration),
925 &put_continuation, pc); 816 &put_continuation,
817 pc);
926} 818}
927 819
928 820
@@ -937,10 +829,11 @@ check_present_continuation (void *cls,
937 int status, 829 int status,
938 const char *msg) 830 const char *msg)
939{ 831{
940 struct GNUNET_SERVER_Client *client = cls; 832 struct GNUNET_SERVICE_Client *client = cls;
941 833
942 transmit_status (client, GNUNET_NO, NULL); 834 transmit_status (client,
943 GNUNET_SERVER_client_drop (client); 835 GNUNET_NO,
836 NULL);
944} 837}
945 838
946 839
@@ -984,7 +877,9 @@ check_present (void *cls,
984 if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) || 877 if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
985 (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) || 878 (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
986 ( (size == ntohl (dm->size)) && 879 ( (size == ntohl (dm->size)) &&
987 (0 == memcmp (&dm[1], data, size)) ) ) 880 (0 == memcmp (&dm[1],
881 data,
882 size)) ) )
988 { 883 {
989 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 884 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990 "Result already present in datastore\n"); 885 "Result already present in datastore\n");
@@ -1000,8 +895,9 @@ check_present (void *cls,
1000 pc->client); 895 pc->client);
1001 else 896 else
1002 { 897 {
1003 transmit_status (pc->client, GNUNET_NO, NULL); 898 transmit_status (pc->client,
1004 GNUNET_SERVER_client_drop (pc->client); 899 GNUNET_NO,
900 NULL);
1005 } 901 }
1006 GNUNET_free (pc); 902 GNUNET_free (pc);
1007 } 903 }
@@ -1014,30 +910,42 @@ check_present (void *cls,
1014 910
1015 911
1016/** 912/**
913 * Verify PUT-message.
914 *
915 * @param cls identification of the client
916 * @param message the actual message
917 * @return #GNUNET_OK if @a dm is well-formed
918 */
919static int
920check_put (void *cls,
921 const struct DataMessage *dm)
922{
923 if (GNUNET_OK != check_data (dm))
924 {
925 GNUNET_break (0);
926 return GNUNET_SYSERR;
927 }
928 return GNUNET_OK;
929}
930
931
932/**
1017 * Handle PUT-message. 933 * Handle PUT-message.
1018 * 934 *
1019 * @param cls closure 935 * @param cls identification of the client
1020 * @param client identification of the client
1021 * @param message the actual message 936 * @param message the actual message
1022 */ 937 */
1023static void 938static void
1024handle_put (void *cls, 939handle_put (void *cls,
1025 struct GNUNET_SERVER_Client *client, 940 const struct DataMessage *dm)
1026 const struct GNUNET_MessageHeader *message)
1027{ 941{
1028 const struct DataMessage *dm = check_data (message); 942 struct GNUNET_SERVICE_Client *client = cls;
1029 int rid; 943 int rid;
1030 struct ReservationList *pos; 944 struct ReservationList *pos;
1031 struct PutContext *pc; 945 struct PutContext *pc;
1032 struct GNUNET_HashCode vhash; 946 struct GNUNET_HashCode vhash;
1033 uint32_t size; 947 uint32_t size;
1034 948
1035 if ((dm == NULL) || (ntohl (dm->type) == 0))
1036 {
1037 GNUNET_break (0);
1038 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1039 return;
1040 }
1041 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 949 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1042 "Processing PUT request for `%s' of type %u\n", 950 "Processing PUT request for `%s' of type %u\n",
1043 GNUNET_h2s (&dm->key), 951 GNUNET_h2s (&dm->key),
@@ -1066,11 +974,15 @@ handle_put (void *cls,
1066 pc = GNUNET_malloc (sizeof (struct PutContext) + size + 974 pc = GNUNET_malloc (sizeof (struct PutContext) + size +
1067 sizeof (struct DataMessage)); 975 sizeof (struct DataMessage));
1068 pc->client = client; 976 pc->client = client;
1069 GNUNET_SERVER_client_keep (client); 977 GNUNET_memcpy (&pc[1],
1070 GNUNET_memcpy (&pc[1], dm, size + sizeof (struct DataMessage)); 978 dm,
1071 if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key)) 979 size + sizeof (struct DataMessage));
980 if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
981 &dm->key))
1072 { 982 {
1073 GNUNET_CRYPTO_hash (&dm[1], size, &vhash); 983 GNUNET_CRYPTO_hash (&dm[1],
984 size,
985 &vhash);
1074 plugin->api->get_key (plugin->api->cls, 986 plugin->api->get_key (plugin->api->cls,
1075 0, 987 0,
1076 &dm->key, 988 &dm->key,
@@ -1078,27 +990,26 @@ handle_put (void *cls,
1078 ntohl (dm->type), 990 ntohl (dm->type),
1079 &check_present, 991 &check_present,
1080 pc); 992 pc);
993 GNUNET_SERVICE_client_continue (client);
1081 return; 994 return;
1082 } 995 }
1083 execute_put (pc); 996 execute_put (pc);
997 GNUNET_SERVICE_client_continue (client);
1084} 998}
1085 999
1086 1000
1087/** 1001/**
1088 * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message. 1002 * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message.
1089 * 1003 *
1090 * @param cls closure 1004 * @param cls identification of the client
1091 * @param client identification of the client 1005 * @param msg the actual message
1092 * @param message the actual message
1093 */ 1006 */
1094static void 1007static void
1095handle_get (void *cls, 1008handle_get (void *cls,
1096 struct GNUNET_SERVER_Client *client, 1009 const struct GetMessage *msg)
1097 const struct GNUNET_MessageHeader *message)
1098{ 1010{
1099 const struct GetMessage *msg; 1011 struct GNUNET_SERVICE_Client *client = cls;
1100 1012
1101 msg = (const struct GetMessage *) message;
1102 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1013 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103 "Processing GET request of type %u\n", 1014 "Processing GET request of type %u\n",
1104 ntohl (msg->type)); 1015 ntohl (msg->type));
@@ -1106,7 +1017,6 @@ handle_get (void *cls,
1106 gettext_noop ("# GET requests received"), 1017 gettext_noop ("# GET requests received"),
1107 1, 1018 1,
1108 GNUNET_NO); 1019 GNUNET_NO);
1109 GNUNET_SERVER_client_keep (client);
1110 plugin->api->get_key (plugin->api->cls, 1020 plugin->api->get_key (plugin->api->cls,
1111 GNUNET_ntohll (msg->offset), 1021 GNUNET_ntohll (msg->offset),
1112 NULL, 1022 NULL,
@@ -1114,23 +1024,22 @@ handle_get (void *cls,
1114 ntohl (msg->type), 1024 ntohl (msg->type),
1115 &transmit_item, 1025 &transmit_item,
1116 client); 1026 client);
1027 GNUNET_SERVICE_client_continue (client);
1117} 1028}
1118 1029
1030
1119/** 1031/**
1120 * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message. 1032 * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message.
1121 * 1033 *
1122 * @param cls closure 1034 * @param cls closure
1123 * @param client identification of the client 1035 * @param msg the actual message
1124 * @param message the actual message
1125 */ 1036 */
1126static void 1037static void
1127handle_get_key (void *cls, 1038handle_get_key (void *cls,
1128 struct GNUNET_SERVER_Client *client, 1039 const struct GetKeyMessage *msg)
1129 const struct GNUNET_MessageHeader *message)
1130{ 1040{
1131 const struct GetKeyMessage *msg; 1041 struct GNUNET_SERVICE_Client *client = cls;
1132 1042
1133 msg = (const struct GetKeyMessage *) message;
1134 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1043 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135 "Processing GET request for `%s' of type %u\n", 1044 "Processing GET request for `%s' of type %u\n",
1136 GNUNET_h2s (&msg->key), 1045 GNUNET_h2s (&msg->key),
@@ -1139,7 +1048,6 @@ handle_get_key (void *cls,
1139 gettext_noop ("# GET KEY requests received"), 1048 gettext_noop ("# GET KEY requests received"),
1140 1, 1049 1,
1141 GNUNET_NO); 1050 GNUNET_NO);
1142 GNUNET_SERVER_client_keep (client);
1143 if (GNUNET_YES != 1051 if (GNUNET_YES !=
1144 GNUNET_CONTAINER_bloomfilter_test (filter, 1052 GNUNET_CONTAINER_bloomfilter_test (filter,
1145 &msg->key)) 1053 &msg->key))
@@ -1157,6 +1065,7 @@ handle_get_key (void *cls,
1157 NULL, 0, NULL, 0, 0, 0, 1065 NULL, 0, NULL, 0, 0, 0,
1158 GNUNET_TIME_UNIT_ZERO_ABS, 1066 GNUNET_TIME_UNIT_ZERO_ABS,
1159 0); 1067 0);
1068 GNUNET_SERVICE_client_continue (client);
1160 return; 1069 return;
1161 } 1070 }
1162 plugin->api->get_key (plugin->api->cls, 1071 plugin->api->get_key (plugin->api->cls,
@@ -1166,6 +1075,7 @@ handle_get_key (void *cls,
1166 ntohl (msg->type), 1075 ntohl (msg->type),
1167 &transmit_item, 1076 &transmit_item,
1168 client); 1077 client);
1078 GNUNET_SERVICE_client_continue (client);
1169} 1079}
1170 1080
1171 1081
@@ -1181,104 +1091,100 @@ update_continuation (void *cls,
1181 int status, 1091 int status,
1182 const char *msg) 1092 const char *msg)
1183{ 1093{
1184 struct GNUNET_SERVER_Client *client = cls; 1094 struct GNUNET_SERVICE_Client *client = cls;
1185 1095
1186 transmit_status (client, status, msg); 1096 transmit_status (client,
1187 GNUNET_SERVER_client_drop (client); 1097 status,
1098 msg);
1188} 1099}
1189 1100
1190 1101
1191/** 1102/**
1192 * Handle UPDATE-message. 1103 * Handle UPDATE-message.
1193 * 1104 *
1194 * @param cls closure 1105 * @param cls client identification of the client
1195 * @param client identification of the client
1196 * @param message the actual message 1106 * @param message the actual message
1197 */ 1107 */
1198static void 1108static void
1199handle_update (void *cls, 1109handle_update (void *cls,
1200 struct GNUNET_SERVER_Client *client, 1110 const struct UpdateMessage *msg)
1201 const struct GNUNET_MessageHeader *message)
1202{ 1111{
1203 const struct UpdateMessage *msg; 1112 struct GNUNET_SERVICE_Client *client = cls;
1204 1113
1205 GNUNET_STATISTICS_update (stats, 1114 GNUNET_STATISTICS_update (stats,
1206 gettext_noop ("# UPDATE requests received"), 1115 gettext_noop ("# UPDATE requests received"),
1207 1, 1116 1,
1208 GNUNET_NO); 1117 GNUNET_NO);
1209 msg = (const struct UpdateMessage *) message;
1210 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1118 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1211 "Processing UPDATE request for %llu\n", 1119 "Processing UPDATE request for %llu\n",
1212 (unsigned long long) GNUNET_ntohll (msg->uid)); 1120 (unsigned long long) GNUNET_ntohll (msg->uid));
1213 GNUNET_SERVER_client_keep (client);
1214 plugin->api->update (plugin->api->cls, 1121 plugin->api->update (plugin->api->cls,
1215 GNUNET_ntohll (msg->uid), 1122 GNUNET_ntohll (msg->uid),
1216 (int32_t) ntohl (msg->priority), 1123 (int32_t) ntohl (msg->priority),
1217 GNUNET_TIME_absolute_ntoh (msg->expiration), 1124 GNUNET_TIME_absolute_ntoh (msg->expiration),
1218 &update_continuation, client); 1125 &update_continuation,
1126 client);
1127 GNUNET_SERVICE_client_continue (client);
1219} 1128}
1220 1129
1221 1130
1222/** 1131/**
1223 * Handle GET_REPLICATION-message. 1132 * Handle GET_REPLICATION-message.
1224 * 1133 *
1225 * @param cls closure 1134 * @param cls identification of the client
1226 * @param client identification of the client
1227 * @param message the actual message 1135 * @param message the actual message
1228 */ 1136 */
1229static void 1137static void
1230handle_get_replication (void *cls, 1138handle_get_replication (void *cls,
1231 struct GNUNET_SERVER_Client *client,
1232 const struct GNUNET_MessageHeader *message) 1139 const struct GNUNET_MessageHeader *message)
1233{ 1140{
1141 struct GNUNET_SERVICE_Client *client = cls;
1142
1234 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1143 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1235 "Processing `%s' request\n", 1144 "Processing GET_REPLICATION request\n");
1236 "GET_REPLICATION");
1237 GNUNET_STATISTICS_update (stats, 1145 GNUNET_STATISTICS_update (stats,
1238 gettext_noop ("# GET REPLICATION requests received"), 1146 gettext_noop ("# GET REPLICATION requests received"),
1239 1, 1147 1,
1240 GNUNET_NO); 1148 GNUNET_NO);
1241 GNUNET_SERVER_client_keep (client);
1242 plugin->api->get_replication (plugin->api->cls, 1149 plugin->api->get_replication (plugin->api->cls,
1243 &transmit_item, client); 1150 &transmit_item,
1151 client);
1152 GNUNET_SERVICE_client_continue (client);
1244} 1153}
1245 1154
1246 1155
1247/** 1156/**
1248 * Handle GET_ZERO_ANONYMITY-message. 1157 * Handle GET_ZERO_ANONYMITY-message.
1249 * 1158 *
1250 * @param cls closure 1159 * @param cls client identification of the client
1251 * @param client identification of the client
1252 * @param message the actual message 1160 * @param message the actual message
1253 */ 1161 */
1254static void 1162static void
1255handle_get_zero_anonymity (void *cls, 1163handle_get_zero_anonymity (void *cls,
1256 struct GNUNET_SERVER_Client *client, 1164 const struct GetZeroAnonymityMessage *msg)
1257 const struct GNUNET_MessageHeader *message)
1258{ 1165{
1259 const struct GetZeroAnonymityMessage *msg = 1166 struct GNUNET_SERVICE_Client *client = cls;
1260 (const struct GetZeroAnonymityMessage *) message;
1261 enum GNUNET_BLOCK_Type type; 1167 enum GNUNET_BLOCK_Type type;
1262 1168
1263 type = (enum GNUNET_BLOCK_Type) ntohl (msg->type); 1169 type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1264 if (type == GNUNET_BLOCK_TYPE_ANY) 1170 if (type == GNUNET_BLOCK_TYPE_ANY)
1265 { 1171 {
1266 GNUNET_break (0); 1172 GNUNET_break (0);
1267 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1173 GNUNET_SERVICE_client_drop (client);
1268 return; 1174 return;
1269 } 1175 }
1270 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1271 "Processing `%s' request\n", 1177 "Processing GET_ZERO_ANONYMITY request\n");
1272 "GET_ZERO_ANONYMITY");
1273 GNUNET_STATISTICS_update (stats, 1178 GNUNET_STATISTICS_update (stats,
1274 gettext_noop ("# GET ZERO ANONYMITY requests received"), 1179 gettext_noop ("# GET ZERO ANONYMITY requests received"),
1275 1, 1180 1,
1276 GNUNET_NO); 1181 GNUNET_NO);
1277 GNUNET_SERVER_client_keep (client);
1278 plugin->api->get_zero_anonymity (plugin->api->cls, 1182 plugin->api->get_zero_anonymity (plugin->api->cls,
1279 GNUNET_ntohll (msg->offset), 1183 GNUNET_ntohll (msg->offset),
1280 type, 1184 type,
1281 &transmit_item, client); 1185 &transmit_item,
1186 client);
1187 GNUNET_SERVICE_client_continue (client);
1282} 1188}
1283 1189
1284 1190
@@ -1309,7 +1215,7 @@ remove_callback (void *cls,
1309 struct GNUNET_TIME_Absolute expiration, 1215 struct GNUNET_TIME_Absolute expiration,
1310 uint64_t uid) 1216 uint64_t uid)
1311{ 1217{
1312 struct GNUNET_SERVER_Client *client = cls; 1218 struct GNUNET_SERVICE_Client *client = cls;
1313 1219
1314 if (NULL == key) 1220 if (NULL == key)
1315 { 1221 {
@@ -1318,7 +1224,6 @@ remove_callback (void *cls,
1318 transmit_status (client, 1224 transmit_status (client,
1319 GNUNET_NO, 1225 GNUNET_NO,
1320 _("Content not found")); 1226 _("Content not found"));
1321 GNUNET_SERVER_client_drop (client);
1322 return GNUNET_OK; /* last item */ 1227 return GNUNET_OK; /* last item */
1323 } 1228 }
1324 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1229 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1330,14 +1235,36 @@ remove_callback (void *cls,
1330 gettext_noop ("# bytes removed (explicit request)"), 1235 gettext_noop ("# bytes removed (explicit request)"),
1331 size, 1236 size,
1332 GNUNET_YES); 1237 GNUNET_YES);
1333 GNUNET_CONTAINER_bloomfilter_remove (filter, key); 1238 GNUNET_CONTAINER_bloomfilter_remove (filter,
1334 transmit_status (client, GNUNET_OK, NULL); 1239 key);
1335 GNUNET_SERVER_client_drop (client); 1240 transmit_status (client,
1241 GNUNET_OK,
1242 NULL);
1336 return GNUNET_NO; 1243 return GNUNET_NO;
1337} 1244}
1338 1245
1339 1246
1340/** 1247/**
1248 * Verify REMOVE-message.
1249 *
1250 * @param cls identification of the client
1251 * @param message the actual message
1252 * @return #GNUNET_OK if @a dm is well-formed
1253 */
1254static int
1255check_remove (void *cls,
1256 const struct DataMessage *dm)
1257{
1258 if (GNUNET_OK != check_data (dm))
1259 {
1260 GNUNET_break (0);
1261 return GNUNET_SYSERR;
1262 }
1263 return GNUNET_OK;
1264}
1265
1266
1267/**
1341 * Handle REMOVE-message. 1268 * Handle REMOVE-message.
1342 * 1269 *
1343 * @param cls closure 1270 * @param cls closure
@@ -1346,22 +1273,14 @@ remove_callback (void *cls,
1346 */ 1273 */
1347static void 1274static void
1348handle_remove (void *cls, 1275handle_remove (void *cls,
1349 struct GNUNET_SERVER_Client *client, 1276 const struct DataMessage *dm)
1350 const struct GNUNET_MessageHeader *message)
1351{ 1277{
1352 const struct DataMessage *dm = check_data (message); 1278 struct GNUNET_SERVICE_Client *client = cls;
1353 struct GNUNET_HashCode vhash; 1279 struct GNUNET_HashCode vhash;
1354 1280
1355 if (NULL == dm)
1356 {
1357 GNUNET_break (0);
1358 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1359 return;
1360 }
1361 GNUNET_STATISTICS_update (stats, 1281 GNUNET_STATISTICS_update (stats,
1362 gettext_noop ("# REMOVE requests received"), 1282 gettext_noop ("# REMOVE requests received"),
1363 1, GNUNET_NO); 1283 1, GNUNET_NO);
1364 GNUNET_SERVER_client_keep (client);
1365 GNUNET_CRYPTO_hash (&dm[1], 1284 GNUNET_CRYPTO_hash (&dm[1],
1366 ntohl (dm->size), 1285 ntohl (dm->size),
1367 &vhash); 1286 &vhash);
@@ -1374,26 +1293,28 @@ handle_remove (void *cls,
1374 &dm->key, 1293 &dm->key,
1375 &vhash, 1294 &vhash,
1376 (enum GNUNET_BLOCK_Type) ntohl (dm->type), 1295 (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1377 &remove_callback, client); 1296 &remove_callback,
1297 client);
1298 GNUNET_SERVICE_client_continue (client);
1378} 1299}
1379 1300
1380 1301
1381/** 1302/**
1382 * Handle DROP-message. 1303 * Handle DROP-message.
1383 * 1304 *
1384 * @param cls closure 1305 * @param cls identification of the client
1385 * @param client identification of the client
1386 * @param message the actual message 1306 * @param message the actual message
1387 */ 1307 */
1388static void 1308static void
1389handle_drop (void *cls, 1309handle_drop (void *cls,
1390 struct GNUNET_SERVER_Client *client,
1391 const struct GNUNET_MessageHeader *message) 1310 const struct GNUNET_MessageHeader *message)
1392{ 1311{
1312 struct GNUNET_SERVICE_Client *client = cls;
1313
1393 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1394 "Processing DROP request\n"); 1315 "Processing DROP request\n");
1395 do_drop = GNUNET_YES; 1316 do_drop = GNUNET_YES;
1396 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1317 GNUNET_SERVICE_client_continue (client);
1397} 1318}
1398 1319
1399 1320
@@ -1479,7 +1400,8 @@ load_plugin ()
1479 plugin_name); 1400 plugin_name);
1480 ret->short_name = GNUNET_strdup (plugin_name); 1401 ret->short_name = GNUNET_strdup (plugin_name);
1481 ret->lib_name = libname; 1402 ret->lib_name = libname;
1482 ret->api = GNUNET_PLUGIN_load (libname, &ret->env); 1403 ret->api = GNUNET_PLUGIN_load (libname,
1404 &ret->env);
1483 if (NULL == ret->api) 1405 if (NULL == ret->api)
1484 { 1406 {
1485 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1407 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1512,30 +1434,18 @@ unload_plugin (struct DatastorePlugin *plug)
1512} 1434}
1513 1435
1514 1436
1515static const struct GNUNET_SERVER_MessageHandler handlers[] = { 1437/**
1516 {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 1438 * Initialization complete, start operating the service.
1517 sizeof (struct ReserveMessage)}, 1439 */
1518 {&handle_release_reserve, NULL, 1440static void
1519 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 1441begin_service ()
1520 sizeof (struct ReleaseReserveMessage)}, 1442{
1521 {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0}, 1443 GNUNET_SERVICE_resume (service);
1522 {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 1444 expired_kill_task
1523 sizeof (struct UpdateMessage)}, 1445 = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1524 {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 1446 &delete_expired,
1525 sizeof (struct GetMessage) }, 1447 NULL);
1526 {&handle_get_key, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY, 1448}
1527 sizeof (struct GetKeyMessage) },
1528 {&handle_get_replication, NULL,
1529 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1530 sizeof (struct GNUNET_MessageHeader)},
1531 {&handle_get_zero_anonymity, NULL,
1532 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1533 sizeof (struct GetZeroAnonymityMessage)},
1534 {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
1535 {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1536 sizeof (struct GNUNET_MessageHeader)},
1537 {NULL, NULL, 0, 0}
1538};
1539 1449
1540 1450
1541/** 1451/**
@@ -1556,17 +1466,13 @@ add_key_to_bloomfilter (void *cls,
1556 { 1466 {
1557 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1467 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1558 _("Bloomfilter construction complete.\n")); 1468 _("Bloomfilter construction complete.\n"));
1559 GNUNET_SERVER_add_handlers (server, handlers); 1469 begin_service ();
1560 GNUNET_SERVER_resume (server);
1561 expired_kill_task
1562 = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1563 &delete_expired,
1564 NULL);
1565 return; 1470 return;
1566 } 1471 }
1567 1472
1568 while (0 < count--) 1473 while (0 < count--)
1569 GNUNET_CONTAINER_bloomfilter_add (bf, key); 1474 GNUNET_CONTAINER_bloomfilter_add (bf,
1475 key);
1570} 1476}
1571 1477
1572 1478
@@ -1594,11 +1500,13 @@ process_stat_done (void *cls,
1594 filter = NULL; 1500 filter = NULL;
1595 if (NULL != stats) 1501 if (NULL != stats)
1596 { 1502 {
1597 GNUNET_STATISTICS_destroy (stats, GNUNET_YES); 1503 GNUNET_STATISTICS_destroy (stats,
1504 GNUNET_YES);
1598 stats = NULL; 1505 stats = NULL;
1599 } 1506 }
1600 return; 1507 return;
1601 } 1508 }
1509
1602 if (GNUNET_NO == stats_worked) 1510 if (GNUNET_NO == stats_worked)
1603 { 1511 {
1604 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1608,8 +1516,8 @@ process_stat_done (void *cls,
1608 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1516 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1609 _("New payload: %lld\n"), 1517 _("New payload: %lld\n"),
1610 (long long) payload); 1518 (long long) payload);
1611
1612 } 1519 }
1520
1613 if (GNUNET_YES == refresh_bf) 1521 if (GNUNET_YES == refresh_bf)
1614 { 1522 {
1615 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1523 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -1622,19 +1530,12 @@ process_stat_done (void *cls,
1622 return; 1530 return;
1623 } 1531 }
1624 else 1532 else
1533 {
1625 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1534 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1626 _("Plugin does not support get_keys function. Please fix!\n")); 1535 _("Plugin does not support get_keys function. Please fix!\n"));
1627 1536 }
1628 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1629 _("Bloomfilter construction complete.\n"));
1630 } 1537 }
1631 1538 begin_service ();
1632 GNUNET_SERVER_add_handlers (server, handlers);
1633 GNUNET_SERVER_resume (server);
1634 expired_kill_task
1635 = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1636 &delete_expired,
1637 NULL);
1638} 1539}
1639 1540
1640 1541
@@ -1648,7 +1549,8 @@ stat_timeout (void *cls)
1648{ 1549{
1649 stat_timeout_task = NULL; 1550 stat_timeout_task = NULL;
1650 GNUNET_STATISTICS_get_cancel (stat_get); 1551 GNUNET_STATISTICS_get_cancel (stat_get);
1651 process_stat_done (NULL, GNUNET_NO); 1552 process_stat_done (NULL,
1553 GNUNET_NO);
1652} 1554}
1653 1555
1654 1556
@@ -1658,20 +1560,7 @@ stat_timeout (void *cls)
1658static void 1560static void
1659cleaning_task (void *cls) 1561cleaning_task (void *cls)
1660{ 1562{
1661 struct TransmitCallbackContext *tcc;
1662
1663 cleaning_done = GNUNET_YES; 1563 cleaning_done = GNUNET_YES;
1664 while (NULL != (tcc = tcc_head))
1665 {
1666 GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
1667 if (tcc->th != NULL)
1668 {
1669 GNUNET_SERVER_notify_transmit_ready_cancel (tcc->th);
1670 GNUNET_SERVER_client_drop (tcc->client);
1671 }
1672 GNUNET_free (tcc->msg);
1673 GNUNET_free (tcc);
1674 }
1675 if (NULL != expired_kill_task) 1564 if (NULL != expired_kill_task)
1676 { 1565 {
1677 GNUNET_SCHEDULER_cancel (expired_kill_task); 1566 GNUNET_SCHEDULER_cancel (expired_kill_task);
@@ -1721,23 +1610,40 @@ cleaning_task (void *cls)
1721 1610
1722 1611
1723/** 1612/**
1724 * Function that removes all active reservations made 1613 * Add a client to our list of active clients.
1725 * by the given client and releases the space for other 1614 *
1726 * requests. 1615 * @param cls NULL
1616 * @param client client to add
1617 * @param mq message queue for @a client
1618 * @return @a client
1619 */
1620static void *
1621client_connect_cb (void *cls,
1622 struct GNUNET_SERVICE_Client *client,
1623 struct GNUNET_MQ_Handle *mq)
1624{
1625 return client;
1626}
1627
1628
1629/**
1630 * Called whenever a client is disconnected.
1631 * Frees our resources associated with that client.
1727 * 1632 *
1728 * @param cls closure 1633 * @param cls closure
1729 * @param client identification of the client 1634 * @param client identification of the client
1635 * @param app_ctx must match @a client
1730 */ 1636 */
1731static void 1637static void
1732cleanup_reservations (void *cls, 1638client_disconnect_cb (void *cls,
1733 struct GNUNET_SERVER_Client *client) 1639 struct GNUNET_SERVICE_Client *client,
1640 void *app_ctx)
1734{ 1641{
1735 struct ReservationList *pos; 1642 struct ReservationList *pos;
1736 struct ReservationList *prev; 1643 struct ReservationList *prev;
1737 struct ReservationList *next; 1644 struct ReservationList *next;
1738 1645
1739 if (NULL == client) 1646 GNUNET_assert (app_ctx == client);
1740 return;
1741 prev = NULL; 1647 prev = NULL;
1742 pos = reservations; 1648 pos = reservations;
1743 while (NULL != pos) 1649 while (NULL != pos)
@@ -1762,6 +1668,7 @@ cleanup_reservations (void *cls,
1762 gettext_noop ("# reserved"), 1668 gettext_noop ("# reserved"),
1763 reserved, 1669 reserved,
1764 GNUNET_NO); 1670 GNUNET_NO);
1671
1765} 1672}
1766 1673
1767 1674
@@ -1769,19 +1676,19 @@ cleanup_reservations (void *cls,
1769 * Process datastore requests. 1676 * Process datastore requests.
1770 * 1677 *
1771 * @param cls closure 1678 * @param cls closure
1772 * @param serv the initialized server 1679 * @param serv the initialized service
1773 * @param c configuration to use 1680 * @param c configuration to use
1774 */ 1681 */
1775static void 1682static void
1776run (void *cls, 1683run (void *cls,
1777 struct GNUNET_SERVER_Handle *serv, 1684 const struct GNUNET_CONFIGURATION_Handle *c,
1778 const struct GNUNET_CONFIGURATION_Handle *c) 1685 struct GNUNET_SERVICE_Handle *serv)
1779{ 1686{
1780 char *fn; 1687 char *fn;
1781 char *pfn; 1688 char *pfn;
1782 unsigned int bf_size; 1689 unsigned int bf_size;
1783 1690
1784 server = serv; 1691 service = serv;
1785 cfg = c; 1692 cfg = c;
1786 if (GNUNET_OK != 1693 if (GNUNET_OK !=
1787 GNUNET_CONFIGURATION_get_value_string (cfg, 1694 GNUNET_CONFIGURATION_get_value_string (cfg,
@@ -1789,25 +1696,27 @@ run (void *cls,
1789 "DATABASE", 1696 "DATABASE",
1790 &plugin_name)) 1697 &plugin_name))
1791 { 1698 {
1792 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1699 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1793 _("No `%s' specified for `%s' in configuration!\n"), 1700 "DATABASE",
1794 "DATABASE", 1701 "DATASTORE");
1795 "DATASTORE");
1796 return; 1702 return;
1797 } 1703 }
1798 GNUNET_asprintf (&quota_stat_name, 1704 GNUNET_asprintf (&quota_stat_name,
1799 _("# bytes used in file-sharing datastore `%s'"), 1705 _("# bytes used in file-sharing datastore `%s'"),
1800 plugin_name); 1706 plugin_name);
1801 if (GNUNET_OK != 1707 if (GNUNET_OK !=
1802 GNUNET_CONFIGURATION_get_value_size (cfg, "DATASTORE", "QUOTA", &quota)) 1708 GNUNET_CONFIGURATION_get_value_size (cfg,
1709 "DATASTORE",
1710 "QUOTA",
1711 &quota))
1803 { 1712 {
1804 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1713 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1805 _("No `%s' specified for `%s' in configuration!\n"), 1714 "QUOTA",
1806 "QUOTA", 1715 "DATASTORE");
1807 "DATASTORE");
1808 return; 1716 return;
1809 } 1717 }
1810 stats = GNUNET_STATISTICS_create ("datastore", cfg); 1718 stats = GNUNET_STATISTICS_create ("datastore",
1719 cfg);
1811 GNUNET_STATISTICS_set (stats, 1720 GNUNET_STATISTICS_set (stats,
1812 gettext_noop ("# quota"), 1721 gettext_noop ("# quota"),
1813 quota, 1722 quota,
@@ -1887,7 +1796,9 @@ run (void *cls,
1887 } 1796 }
1888 else 1797 else
1889 { 1798 {
1890 filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5); /* approx. 3% false positives at max use */ 1799 filter = GNUNET_CONTAINER_bloomfilter_init (NULL,
1800 bf_size,
1801 5); /* approx. 3% false positives at max use */
1891 refresh_bf = GNUNET_YES; 1802 refresh_bf = GNUNET_YES;
1892 } 1803 }
1893 GNUNET_free_non_null (fn); 1804 GNUNET_free_non_null (fn);
@@ -1897,12 +1808,13 @@ run (void *cls,
1897 _("Failed to initialize bloomfilter.\n")); 1808 _("Failed to initialize bloomfilter.\n"));
1898 if (NULL != stats) 1809 if (NULL != stats)
1899 { 1810 {
1900 GNUNET_STATISTICS_destroy (stats, GNUNET_YES); 1811 GNUNET_STATISTICS_destroy (stats,
1812 GNUNET_YES);
1901 stats = NULL; 1813 stats = NULL;
1902 } 1814 }
1903 return; 1815 return;
1904 } 1816 }
1905 GNUNET_SERVER_suspend (server); 1817 GNUNET_SERVICE_suspend (service);
1906 stat_get = 1818 stat_get =
1907 GNUNET_STATISTICS_get (stats, 1819 GNUNET_STATISTICS_get (stats,
1908 "datastore", 1820 "datastore",
@@ -1911,39 +1823,69 @@ run (void *cls,
1911 &process_stat_in, 1823 &process_stat_in,
1912 NULL); 1824 NULL);
1913 if (NULL == stat_get) 1825 if (NULL == stat_get)
1914 process_stat_done (NULL, GNUNET_SYSERR); 1826 process_stat_done (NULL,
1827 GNUNET_SYSERR);
1915 else 1828 else
1916 stat_timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, 1829 stat_timeout_task
1917 &stat_timeout, 1830 = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1918 NULL); 1831 &stat_timeout,
1919 GNUNET_SERVER_disconnect_notify (server, 1832 NULL);
1920 &cleanup_reservations,
1921 NULL);
1922 GNUNET_SCHEDULER_add_shutdown (&cleaning_task, 1833 GNUNET_SCHEDULER_add_shutdown (&cleaning_task,
1923 NULL); 1834 NULL);
1924} 1835}
1925 1836
1926 1837
1927/** 1838/**
1928 * The main function for the datastore service. 1839 * Define "main" method using service macro.
1929 * 1840 */
1930 * @param argc number of arguments from the command line 1841GNUNET_SERVICE_MAIN
1931 * @param argv command line arguments 1842("datastore",
1932 * @return 0 ok, 1 on error 1843 GNUNET_SERVICE_OPTION_NONE,
1933 */ 1844 &run,
1934int 1845 &client_connect_cb,
1935main (int argc, 1846 &client_disconnect_cb,
1936 char *const *argv) 1847 NULL,
1937{ 1848 GNUNET_MQ_hd_fixed_size (reserve,
1938 int ret; 1849 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1939 1850 struct ReserveMessage,
1940 ret = 1851 NULL),
1941 (GNUNET_OK == 1852 GNUNET_MQ_hd_fixed_size (release_reserve,
1942 GNUNET_SERVICE_run (argc, argv, "datastore", 1853 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1943 GNUNET_SERVICE_OPTION_NONE, 1854 struct ReleaseReserveMessage,
1944 &run, NULL)) ? 0 : 1; 1855 NULL),
1945 return ret; 1856 GNUNET_MQ_hd_var_size (put,
1946} 1857 GNUNET_MESSAGE_TYPE_DATASTORE_PUT,
1858 struct DataMessage,
1859 NULL),
1860 GNUNET_MQ_hd_fixed_size (update,
1861 GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1862 struct UpdateMessage,
1863 NULL),
1864 GNUNET_MQ_hd_fixed_size (get,
1865 GNUNET_MESSAGE_TYPE_DATASTORE_GET,
1866 struct GetMessage,
1867 NULL),
1868 GNUNET_MQ_hd_fixed_size (get_key,
1869 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY,
1870 struct GetKeyMessage,
1871 NULL),
1872 GNUNET_MQ_hd_fixed_size (get_replication,
1873 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1874 struct GNUNET_MessageHeader,
1875 NULL),
1876 GNUNET_MQ_hd_fixed_size (get_zero_anonymity,
1877 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1878 struct GetZeroAnonymityMessage,
1879 NULL),
1880 GNUNET_MQ_hd_var_size (remove,
1881 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE,
1882 struct DataMessage,
1883 NULL),
1884 GNUNET_MQ_hd_fixed_size (drop,
1885 GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1886 struct GNUNET_MessageHeader,
1887 NULL),
1888 GNUNET_MQ_handler_end ());
1947 1889
1948 1890
1949/* end of gnunet-service-datastore.c */ 1891/* end of gnunet-service-datastore.c */