aboutsummaryrefslogtreecommitdiff
path: root/src/service/datastore/datastore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/datastore/datastore_api.c')
-rw-r--r--src/service/datastore/datastore_api.c1393
1 files changed, 1393 insertions, 0 deletions
diff --git a/src/service/datastore/datastore_api.c b/src/service/datastore/datastore_api.c
new file mode 100644
index 000000000..4d27efb4e
--- /dev/null
+++ b/src/service/datastore/datastore_api.c
@@ -0,0 +1,1393 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2004-2013, 2016 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file datastore/datastore_api.c
23 * @brief Management for the datastore for files stored on a GNUnet node. Implements
24 * a priority queue for requests
25 * @author Christian Grothoff
26 */
27#include "platform.h"
28#include "gnunet_arm_service.h"
29#include "gnunet_constants.h"
30#include "gnunet_datastore_service.h"
31#include "gnunet_statistics_service.h"
32#include "datastore.h"
33
34#define LOG(kind, ...) GNUNET_log_from (kind, "datastore-api", __VA_ARGS__)
35
36#define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
38/**
39 * Collect an instance number of statistics? May cause excessive IPC.
40 */
41#define INSANE_STATISTICS GNUNET_NO
42
43/**
44 * If a client stopped asking for more results, how many more do
45 * we receive from the DB before killing the connection? Trade-off
46 * between re-doing TCP handshakes and (needlessly) receiving
47 * useless results.
48 */
49#define MAX_EXCESS_RESULTS 8
50
51/**
52 * Context for processing status messages.
53 */
54struct StatusContext
55{
56 /**
57 * Continuation to call with the status.
58 */
59 GNUNET_DATASTORE_ContinuationWithStatus cont;
60
61 /**
62 * Closure for @e cont.
63 */
64 void *cont_cls;
65};
66
67
68/**
69 * Context for processing result messages.
70 */
71struct ResultContext
72{
73 /**
74 * Function to call with the result.
75 */
76 GNUNET_DATASTORE_DatumProcessor proc;
77
78 /**
79 * Closure for @e proc.
80 */
81 void *proc_cls;
82};
83
84
85/**
86 * Context for a queue operation.
87 */
88union QueueContext
89{
90 struct StatusContext sc;
91
92 struct ResultContext rc;
93};
94
95
96/**
97 * Entry in our priority queue.
98 */
99struct GNUNET_DATASTORE_QueueEntry
100{
101 /**
102 * This is a linked list.
103 */
104 struct GNUNET_DATASTORE_QueueEntry *next;
105
106 /**
107 * This is a linked list.
108 */
109 struct GNUNET_DATASTORE_QueueEntry *prev;
110
111 /**
112 * Handle to the master context.
113 */
114 struct GNUNET_DATASTORE_Handle *h;
115
116 /**
117 * Function to call after transmission of the request.
118 */
119 GNUNET_DATASTORE_ContinuationWithStatus cont;
120
121 /**
122 * Closure for @e cont.
123 */
124 void *cont_cls;
125
126 /**
127 * Context for the operation.
128 */
129 union QueueContext qc;
130
131 /**
132 * Envelope of the request to transmit, NULL after
133 * transmission.
134 */
135 struct GNUNET_MQ_Envelope *env;
136
137 /**
138 * Task we run if this entry stalls the queue and we
139 * need to warn the user.
140 */
141 struct GNUNET_SCHEDULER_Task *delay_warn_task;
142
143 /**
144 * Priority in the queue.
145 */
146 unsigned int priority;
147
148 /**
149 * Maximum allowed length of queue (otherwise
150 * this request should be discarded).
151 */
152 unsigned int max_queue;
153
154 /**
155 * Expected response type.
156 */
157 uint16_t response_type;
158};
159
160
161/**
162 * Handle to the datastore service.
163 */
164struct GNUNET_DATASTORE_Handle
165{
166 /**
167 * Our configuration.
168 */
169 const struct GNUNET_CONFIGURATION_Handle *cfg;
170
171 /**
172 * Current connection to the datastore service.
173 */
174 struct GNUNET_MQ_Handle *mq;
175
176 /**
177 * Handle for statistics.
178 */
179 struct GNUNET_STATISTICS_Handle *stats;
180
181 /**
182 * Current head of priority queue.
183 */
184 struct GNUNET_DATASTORE_QueueEntry *queue_head;
185
186 /**
187 * Current tail of priority queue.
188 */
189 struct GNUNET_DATASTORE_QueueEntry *queue_tail;
190
191 /**
192 * Task for trying to reconnect.
193 */
194 struct GNUNET_SCHEDULER_Task *reconnect_task;
195
196 /**
197 * How quickly should we retry? Used for exponential back-off on
198 * connect-errors.
199 */
200 struct GNUNET_TIME_Relative retry_time;
201
202 /**
203 * Number of entries in the queue.
204 */
205 unsigned int queue_size;
206
207 /**
208 * Number of results we're receiving for the current query
209 * after application stopped to care. Used to determine when
210 * to reset the connection.
211 */
212 unsigned int result_count;
213
214 /**
215 * We should ignore the next message(s) from the service.
216 */
217 unsigned int skip_next_messages;
218};
219
220
221/**
222 * Try reconnecting to the datastore service.
223 *
224 * @param cls the `struct GNUNET_DATASTORE_Handle`
225 */
226static void
227try_reconnect (void *cls);
228
229
230/**
231 * Disconnect from the service and then try reconnecting to the datastore service
232 * after some delay.
233 *
234 * @param h handle to datastore to disconnect and reconnect
235 */
236static void
237do_disconnect (struct GNUNET_DATASTORE_Handle *h)
238{
239 if (NULL == h->mq)
240 {
241 GNUNET_break (0);
242 return;
243 }
244 GNUNET_MQ_destroy (h->mq);
245 h->mq = NULL;
246 h->skip_next_messages = 0;
247 h->reconnect_task
248 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
249 &try_reconnect,
250 h);
251}
252
253
254/**
255 * Free a queue entry. Removes the given entry from the
256 * queue and releases associated resources. Does NOT
257 * call the callback.
258 *
259 * @param qe entry to free.
260 */
261static void
262free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
263{
264 struct GNUNET_DATASTORE_Handle *h = qe->h;
265
266 GNUNET_CONTAINER_DLL_remove (h->queue_head,
267 h->queue_tail,
268 qe);
269 h->queue_size--;
270 if (NULL != qe->env)
271 GNUNET_MQ_discard (qe->env);
272 if (NULL != qe->delay_warn_task)
273 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
274 GNUNET_free (qe);
275}
276
277
278/**
279 * Task that logs an error after some time.
280 *
281 * @param cls `struct GNUNET_DATASTORE_QueueEntry` about which the error is
282 */
283static void
284delay_warning (void *cls)
285{
286 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
287
288 qe->delay_warn_task = NULL;
289 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
290 "Request %p of type %u at head of datastore queue for more than %s\n",
291 qe,
292 (unsigned int) qe->response_type,
293 GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
294 GNUNET_YES));
295 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
296 &delay_warning,
297 qe);
298}
299
300
301/**
302 * Handle error in sending drop request to datastore.
303 *
304 * @param cls closure with the datastore handle
305 * @param error error code
306 */
307static void
308mq_error_handler (void *cls,
309 enum GNUNET_MQ_Error error)
310{
311 struct GNUNET_DATASTORE_Handle *h = cls;
312 struct GNUNET_DATASTORE_QueueEntry *qe;
313
314 LOG (GNUNET_ERROR_TYPE_DEBUG,
315 "MQ error, reconnecting to DATASTORE\n");
316 do_disconnect (h);
317 qe = h->queue_head;
318 if (NULL == qe)
319 return;
320 if (NULL != qe->delay_warn_task)
321 {
322 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
323 qe->delay_warn_task = NULL;
324 }
325 if (NULL == qe->env)
326 {
327 union QueueContext qc = qe->qc;
328 uint16_t rt = qe->response_type;
329
330 LOG (GNUNET_ERROR_TYPE_DEBUG,
331 "Failed to receive response from database.\n");
332 free_queue_entry (qe);
333 switch (rt)
334 {
335 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
336 if (NULL != qc.sc.cont)
337 qc.sc.cont (qc.sc.cont_cls,
338 GNUNET_SYSERR,
339 GNUNET_TIME_UNIT_ZERO_ABS,
340 _ ("DATASTORE disconnected"));
341 break;
342
343 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
344 if (NULL != qc.rc.proc)
345 qc.rc.proc (qc.rc.proc_cls,
346 NULL,
347 0,
348 NULL,
349 0,
350 0,
351 0,
352 0,
353 GNUNET_TIME_UNIT_ZERO_ABS,
354 0);
355 break;
356
357 default:
358 GNUNET_break (0);
359 }
360 }
361}
362
363
364/**
365 * Connect to the datastore service.
366 *
367 * @param cfg configuration to use
368 * @return handle to use to access the service
369 */
370struct GNUNET_DATASTORE_Handle *
371GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
372{
373 struct GNUNET_DATASTORE_Handle *h;
374
375 LOG (GNUNET_ERROR_TYPE_DEBUG,
376 "Establishing DATASTORE connection!\n");
377 h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
378 h->cfg = cfg;
379 try_reconnect (h);
380 if (NULL == h->mq)
381 {
382 GNUNET_free (h);
383 return NULL;
384 }
385 h->stats = GNUNET_STATISTICS_create ("datastore-api",
386 cfg);
387 return h;
388}
389
390
391/**
392 * Task used by to disconnect from the datastore after
393 * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
394 *
395 * @param cls the datastore handle
396 */
397static void
398disconnect_after_drop (void *cls)
399{
400 struct GNUNET_DATASTORE_Handle *h = cls;
401
402 LOG (GNUNET_ERROR_TYPE_DEBUG,
403 "Drop sent, disconnecting\n");
404 GNUNET_DATASTORE_disconnect (h,
405 GNUNET_NO);
406}
407
408
409/**
410 * Handle error in sending drop request to datastore.
411 *
412 * @param cls closure with the datastore handle
413 * @param error error code
414 */
415static void
416disconnect_on_mq_error (void *cls,
417 enum GNUNET_MQ_Error error)
418{
419 struct GNUNET_DATASTORE_Handle *h = cls;
420
421 LOG (GNUNET_ERROR_TYPE_ERROR,
422 "Failed to ask datastore to drop tables\n");
423 GNUNET_DATASTORE_disconnect (h,
424 GNUNET_NO);
425}
426
427
428/**
429 * Disconnect from the datastore service (and free
430 * associated resources).
431 *
432 * @param h handle to the datastore
433 * @param drop set to #GNUNET_YES to delete all data in datastore (!)
434 */
435void
436GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
437 int drop)
438{
439 struct GNUNET_DATASTORE_QueueEntry *qe;
440
441 LOG (GNUNET_ERROR_TYPE_DEBUG,
442 "Datastore disconnect\n");
443 if (NULL != h->mq)
444 {
445 GNUNET_MQ_destroy (h->mq);
446 h->mq = NULL;
447 }
448 if (NULL != h->reconnect_task)
449 {
450 GNUNET_SCHEDULER_cancel (h->reconnect_task);
451 h->reconnect_task = NULL;
452 }
453 while (NULL != (qe = h->queue_head))
454 {
455 switch (qe->response_type)
456 {
457 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
458 if (NULL != qe->qc.sc.cont)
459 qe->qc.sc.cont (qe->qc.sc.cont_cls,
460 GNUNET_SYSERR,
461 GNUNET_TIME_UNIT_ZERO_ABS,
462 _ ("Disconnected from DATASTORE"));
463 break;
464
465 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
466 if (NULL != qe->qc.rc.proc)
467 qe->qc.rc.proc (qe->qc.rc.proc_cls,
468 NULL,
469 0,
470 NULL,
471 0,
472 0,
473 0,
474 0,
475 GNUNET_TIME_UNIT_ZERO_ABS,
476 0);
477 break;
478
479 default:
480 GNUNET_break (0);
481 }
482 free_queue_entry (qe);
483 }
484 if (GNUNET_YES == drop)
485 {
486 LOG (GNUNET_ERROR_TYPE_DEBUG,
487 "Re-connecting to issue DROP!\n");
488 GNUNET_assert (NULL == h->mq);
489 h->mq = GNUNET_CLIENT_connect (h->cfg,
490 "datastore",
491 NULL,
492 &disconnect_on_mq_error,
493 h);
494 if (NULL != h->mq)
495 {
496 struct GNUNET_MessageHeader *hdr;
497 struct GNUNET_MQ_Envelope *env;
498
499 env = GNUNET_MQ_msg (hdr,
500 GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
501 GNUNET_MQ_notify_sent (env,
502 &disconnect_after_drop,
503 h);
504 GNUNET_MQ_send (h->mq,
505 env);
506 return;
507 }
508 GNUNET_break (0);
509 }
510 GNUNET_STATISTICS_destroy (h->stats,
511 GNUNET_NO);
512 h->stats = NULL;
513 GNUNET_free (h);
514}
515
516
517/**
518 * Create a new entry for our priority queue (and possibly discard other entries if
519 * the queue is getting too long).
520 *
521 * @param h handle to the datastore
522 * @param env envelope with the message to queue
523 * @param queue_priority priority of the entry
524 * @param max_queue_size at what queue size should this request be dropped
525 * (if other requests of higher priority are in the queue)
526 * @param expected_type which type of response do we expect,
527 * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
528 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
529 * @param qc client context (NOT a closure for @a response_proc)
530 * @return NULL if the queue is full
531 */
532static struct GNUNET_DATASTORE_QueueEntry *
533make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
534 struct GNUNET_MQ_Envelope *env,
535 unsigned int queue_priority,
536 unsigned int max_queue_size,
537 uint16_t expected_type,
538 const union QueueContext *qc)
539{
540 struct GNUNET_DATASTORE_QueueEntry *qe;
541 struct GNUNET_DATASTORE_QueueEntry *pos;
542 unsigned int c;
543
544 if ((NULL != h->queue_tail) &&
545 (h->queue_tail->priority >= queue_priority))
546 {
547 c = h->queue_size;
548 pos = NULL;
549 }
550 else
551 {
552 c = 0;
553 pos = h->queue_head;
554 }
555 while ((NULL != pos) &&
556 (c < max_queue_size) &&
557 (pos->priority >= queue_priority))
558 {
559 c++;
560 pos = pos->next;
561 }
562 if (c >= max_queue_size)
563 {
564 GNUNET_STATISTICS_update (h->stats,
565 gettext_noop ("# queue overflows"),
566 1,
567 GNUNET_NO);
568 GNUNET_MQ_discard (env);
569 return NULL;
570 }
571 qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
572 qe->h = h;
573 qe->env = env;
574 qe->response_type = expected_type;
575 qe->qc = *qc;
576 qe->priority = queue_priority;
577 qe->max_queue = max_queue_size;
578 if (NULL == pos)
579 {
580 /* append at the tail */
581 pos = h->queue_tail;
582 }
583 else
584 {
585 pos = pos->prev;
586 /* do not insert at HEAD if HEAD query was already
587 * transmitted and we are still receiving replies! */
588 if ((NULL == pos) &&
589 (NULL == h->queue_head->env))
590 pos = h->queue_head;
591 }
592 c++;
593#if INSANE_STATISTICS
594 GNUNET_STATISTICS_update (h->stats,
595 gettext_noop ("# queue entries created"),
596 1,
597 GNUNET_NO);
598#endif
599 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
600 h->queue_tail,
601 pos,
602 qe);
603 h->queue_size++;
604 return qe;
605}
606
607
608/**
609 * Process entries in the queue (or do nothing if we are already
610 * doing so).
611 *
612 * @param h handle to the datastore
613 */
614static void
615process_queue (struct GNUNET_DATASTORE_Handle *h)
616{
617 struct GNUNET_DATASTORE_QueueEntry *qe;
618
619 if (NULL == (qe = h->queue_head))
620 {
621 /* no entry in queue */
622 LOG (GNUNET_ERROR_TYPE_DEBUG,
623 "Queue empty\n");
624 return;
625 }
626 if (NULL == qe->env)
627 {
628 /* waiting for replies */
629 LOG (GNUNET_ERROR_TYPE_DEBUG,
630 "Head request already transmitted\n");
631 return;
632 }
633 if (NULL == h->mq)
634 {
635 /* waiting for reconnect */
636 LOG (GNUNET_ERROR_TYPE_DEBUG,
637 "Not connected\n");
638 return;
639 }
640 GNUNET_assert (NULL == qe->delay_warn_task);
641 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
642 &delay_warning,
643 qe);
644 GNUNET_MQ_send (h->mq,
645 qe->env);
646 qe->env = NULL;
647}
648
649
650/**
651 * Get the entry at the head of the message queue.
652 *
653 * @param h handle to the datastore
654 * @param response_type the expected response type
655 * @return the queue entry
656 */
657static struct GNUNET_DATASTORE_QueueEntry *
658get_queue_head (struct GNUNET_DATASTORE_Handle *h,
659 uint16_t response_type)
660{
661 struct GNUNET_DATASTORE_QueueEntry *qe;
662
663 if (h->skip_next_messages > 0)
664 {
665 h->skip_next_messages--;
666 process_queue (h);
667 return NULL;
668 }
669 qe = h->queue_head;
670 if (NULL == qe)
671 {
672 GNUNET_break (0);
673 do_disconnect (h);
674 return NULL;
675 }
676 if (NULL != qe->env)
677 {
678 GNUNET_break (0);
679 do_disconnect (h);
680 return NULL;
681 }
682 if (response_type != qe->response_type)
683 {
684 GNUNET_break (0);
685 do_disconnect (h);
686 return NULL;
687 }
688 return qe;
689}
690
691
692/**
693 * Function called to check status message from the service.
694 *
695 * @param cls closure
696 * @param sm status message received
697 * @return #GNUNET_OK if the message is well-formed
698 */
699static int
700check_status (void *cls,
701 const struct StatusMessage *sm)
702{
703 uint16_t msize = ntohs (sm->header.size) - sizeof(*sm);
704 int32_t status = ntohl (sm->status);
705
706 if (msize > 0)
707 {
708 const char *emsg = (const char *) &sm[1];
709
710 if ('\0' != emsg[msize - 1])
711 {
712 GNUNET_break (0);
713 return GNUNET_SYSERR;
714 }
715 }
716 else if (GNUNET_SYSERR == status)
717 {
718 GNUNET_break (0);
719 return GNUNET_SYSERR;
720 }
721 return GNUNET_OK;
722}
723
724
725/**
726 * Function called to handle status message from the service.
727 *
728 * @param cls closure
729 * @param sm status message received
730 */
731static void
732handle_status (void *cls,
733 const struct StatusMessage *sm)
734{
735 struct GNUNET_DATASTORE_Handle *h = cls;
736 struct GNUNET_DATASTORE_QueueEntry *qe;
737 struct StatusContext rc;
738 const char *emsg;
739 int32_t status = ntohl (sm->status);
740
741 qe = get_queue_head (h,
742 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
743 if (NULL == qe)
744 return;
745 rc = qe->qc.sc;
746 free_queue_entry (qe);
747 if (ntohs (sm->header.size) > sizeof(struct StatusMessage))
748 emsg = (const char *) &sm[1];
749 else
750 emsg = NULL;
751 LOG (GNUNET_ERROR_TYPE_DEBUG,
752 "Received status %d/%s\n",
753 (int) status,
754 emsg);
755 GNUNET_STATISTICS_update (h->stats,
756 gettext_noop ("# status messages received"),
757 1,
758 GNUNET_NO);
759 h->retry_time = GNUNET_TIME_UNIT_ZERO;
760 process_queue (h);
761 if (NULL != rc.cont)
762 rc.cont (rc.cont_cls,
763 status,
764 GNUNET_TIME_absolute_ntoh (sm->min_expiration),
765 emsg);
766}
767
768
769/**
770 * Check data message we received from the service.
771 *
772 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
773 * @param dm message received
774 */
775static int
776check_data (void *cls,
777 const struct DataMessage *dm)
778{
779 uint16_t msize = ntohs (dm->header.size) - sizeof(*dm);
780
781 if (msize != ntohl (dm->size))
782 {
783 GNUNET_break (0);
784 return GNUNET_SYSERR;
785 }
786 return GNUNET_OK;
787}
788
789
790/**
791 * Handle data message we got from the service.
792 *
793 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
794 * @param dm message received
795 */
796static void
797handle_data (void *cls,
798 const struct DataMessage *dm)
799{
800 struct GNUNET_DATASTORE_Handle *h = cls;
801 struct GNUNET_DATASTORE_QueueEntry *qe;
802 struct ResultContext rc;
803
804 qe = get_queue_head (h,
805 GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
806 if (NULL == qe)
807 return;
808#if INSANE_STATISTICS
809 GNUNET_STATISTICS_update (h->stats,
810 gettext_noop ("# Results received"),
811 1,
812 GNUNET_NO);
813#endif
814 LOG (GNUNET_ERROR_TYPE_DEBUG,
815 "Received result %llu with type %u and size %u with key %s\n",
816 (unsigned long long) GNUNET_ntohll (dm->uid),
817 ntohl (dm->type),
818 ntohl (dm->size),
819 GNUNET_h2s (&dm->key));
820 rc = qe->qc.rc;
821 free_queue_entry (qe);
822 h->retry_time = GNUNET_TIME_UNIT_ZERO;
823 process_queue (h);
824 if (NULL != rc.proc)
825 rc.proc (rc.proc_cls,
826 &dm->key,
827 ntohl (dm->size),
828 &dm[1],
829 ntohl (dm->type),
830 ntohl (dm->priority),
831 ntohl (dm->anonymity),
832 ntohl (dm->replication),
833 GNUNET_TIME_absolute_ntoh (dm->expiration),
834 GNUNET_ntohll (dm->uid));
835}
836
837
838/**
839 * Type of a function to call when we receive a
840 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
841 *
842 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
843 * @param msg message received
844 */
845static void
846handle_data_end (void *cls,
847 const struct GNUNET_MessageHeader *msg)
848{
849 struct GNUNET_DATASTORE_Handle *h = cls;
850 struct GNUNET_DATASTORE_QueueEntry *qe;
851 struct ResultContext rc;
852
853 qe = get_queue_head (h,
854 GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
855 if (NULL == qe)
856 return;
857 rc = qe->qc.rc;
858 free_queue_entry (qe);
859 LOG (GNUNET_ERROR_TYPE_DEBUG,
860 "Received end of result set, new queue size is %u\n",
861 h->queue_size);
862 h->retry_time = GNUNET_TIME_UNIT_ZERO;
863 h->result_count = 0;
864 process_queue (h);
865 /* signal end of iteration */
866 if (NULL != rc.proc)
867 rc.proc (rc.proc_cls,
868 NULL,
869 0,
870 NULL,
871 0,
872 0,
873 0,
874 0,
875 GNUNET_TIME_UNIT_ZERO_ABS,
876 0);
877}
878
879
880/**
881 * Try reconnecting to the datastore service.
882 *
883 * @param cls the `struct GNUNET_DATASTORE_Handle`
884 */
885static void
886try_reconnect (void *cls)
887{
888 struct GNUNET_DATASTORE_Handle *h = cls;
889 struct GNUNET_MQ_MessageHandler handlers[] = {
890 GNUNET_MQ_hd_var_size (status,
891 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
892 struct StatusMessage,
893 h),
894 GNUNET_MQ_hd_var_size (data,
895 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
896 struct DataMessage,
897 h),
898 GNUNET_MQ_hd_fixed_size (data_end,
899 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
900 struct GNUNET_MessageHeader,
901 h),
902 GNUNET_MQ_handler_end ()
903 };
904
905 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
906 h->reconnect_task = NULL;
907 GNUNET_assert (NULL == h->mq);
908 h->mq = GNUNET_CLIENT_connect (h->cfg,
909 "datastore",
910 handlers,
911 &mq_error_handler,
912 h);
913 if (NULL == h->mq)
914 return;
915 GNUNET_STATISTICS_update (h->stats,
916 gettext_noop (
917 "# datastore connections (re)created"),
918 1,
919 GNUNET_NO);
920 LOG (GNUNET_ERROR_TYPE_DEBUG,
921 "Reconnected to DATASTORE\n");
922 process_queue (h);
923}
924
925
926/**
927 * Dummy continuation used to do nothing (but be non-zero).
928 *
929 * @param cls closure
930 * @param result result
931 * @param min_expiration expiration time
932 * @param emsg error message
933 */
934static void
935drop_status_cont (void *cls,
936 int32_t result,
937 struct GNUNET_TIME_Absolute min_expiration,
938 const char *emsg)
939{
940 /* do nothing */
941}
942
943
944struct GNUNET_DATASTORE_QueueEntry *
945GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
946 uint32_t rid,
947 const struct GNUNET_HashCode *key,
948 size_t size,
949 const void *data,
950 enum GNUNET_BLOCK_Type type,
951 uint32_t priority,
952 uint32_t anonymity,
953 uint32_t replication,
954 struct GNUNET_TIME_Absolute expiration,
955 unsigned int queue_priority,
956 unsigned int max_queue_size,
957 GNUNET_DATASTORE_ContinuationWithStatus cont,
958 void *cont_cls)
959{
960 struct GNUNET_DATASTORE_QueueEntry *qe;
961 struct GNUNET_MQ_Envelope *env;
962 struct DataMessage *dm;
963 union QueueContext qc;
964
965 if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE)
966 {
967 GNUNET_break (0);
968 return NULL;
969 }
970
971 LOG (GNUNET_ERROR_TYPE_DEBUG,
972 "Asked to put %lu bytes of data under key `%s' for %s\n",
973 (unsigned long) size,
974 GNUNET_h2s (key),
975 GNUNET_STRINGS_relative_time_to_string (
976 GNUNET_TIME_absolute_get_remaining (expiration),
977 GNUNET_YES));
978 env = GNUNET_MQ_msg_extra (dm,
979 size,
980 GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
981 dm->rid = htonl (rid);
982 dm->size = htonl ((uint32_t) size);
983 dm->type = htonl (type);
984 dm->priority = htonl (priority);
985 dm->anonymity = htonl (anonymity);
986 dm->replication = htonl (replication);
987 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
988 dm->key = *key;
989 GNUNET_memcpy (&dm[1],
990 data,
991 size);
992 qc.sc.cont = cont;
993 qc.sc.cont_cls = cont_cls;
994 qe = make_queue_entry (h,
995 env,
996 queue_priority,
997 max_queue_size,
998 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
999 &qc);
1000 if (NULL == qe)
1001 {
1002 LOG (GNUNET_ERROR_TYPE_DEBUG,
1003 "Could not create queue entry for PUT\n");
1004 return NULL;
1005 }
1006 GNUNET_STATISTICS_update (h->stats,
1007 gettext_noop ("# PUT requests executed"),
1008 1,
1009 GNUNET_NO);
1010 process_queue (h);
1011 return qe;
1012}
1013
1014
1015/**
1016 * Reserve space in the datastore. This function should be used
1017 * to avoid "out of space" failures during a longer sequence of "put"
1018 * operations (for example, when a file is being inserted).
1019 *
1020 * @param h handle to the datastore
1021 * @param amount how much space (in bytes) should be reserved (for content only)
1022 * @param entries how many entries will be created (to calculate per-entry overhead)
1023 * @param cont continuation to call when done; "success" will be set to
1024 * a positive reservation value if space could be reserved.
1025 * @param cont_cls closure for @a cont
1026 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1027 * cancel; note that even if NULL is returned, the callback will be invoked
1028 * (or rather, will already have been invoked)
1029 */
1030struct GNUNET_DATASTORE_QueueEntry *
1031GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1032 uint64_t amount,
1033 uint32_t entries,
1034 GNUNET_DATASTORE_ContinuationWithStatus cont,
1035 void *cont_cls)
1036{
1037 struct GNUNET_DATASTORE_QueueEntry *qe;
1038 struct GNUNET_MQ_Envelope *env;
1039 struct ReserveMessage *rm;
1040 union QueueContext qc;
1041
1042 if (NULL == cont)
1043 cont = &drop_status_cont;
1044 LOG (GNUNET_ERROR_TYPE_DEBUG,
1045 "Asked to reserve %llu bytes of data and %u entries\n",
1046 (unsigned long long) amount,
1047 (unsigned int) entries);
1048 env = GNUNET_MQ_msg (rm,
1049 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1050 rm->entries = htonl (entries);
1051 rm->amount = GNUNET_htonll (amount);
1052
1053 qc.sc.cont = cont;
1054 qc.sc.cont_cls = cont_cls;
1055 qe = make_queue_entry (h,
1056 env,
1057 UINT_MAX,
1058 UINT_MAX,
1059 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1060 &qc);
1061 if (NULL == qe)
1062 {
1063 LOG (GNUNET_ERROR_TYPE_DEBUG,
1064 "Could not create queue entry to reserve\n");
1065 return NULL;
1066 }
1067 GNUNET_STATISTICS_update (h->stats,
1068 gettext_noop ("# RESERVE requests executed"),
1069 1,
1070 GNUNET_NO);
1071 process_queue (h);
1072 return qe;
1073}
1074
1075
1076struct GNUNET_DATASTORE_QueueEntry *
1077GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1078 uint32_t rid,
1079 unsigned int queue_priority,
1080 unsigned int max_queue_size,
1081 GNUNET_DATASTORE_ContinuationWithStatus cont,
1082 void *cont_cls)
1083{
1084 struct GNUNET_DATASTORE_QueueEntry *qe;
1085 struct GNUNET_MQ_Envelope *env;
1086 struct ReleaseReserveMessage *rrm;
1087 union QueueContext qc;
1088
1089 if (NULL == cont)
1090 cont = &drop_status_cont;
1091 LOG (GNUNET_ERROR_TYPE_DEBUG,
1092 "Asked to release reserve %d\n",
1093 rid);
1094 env = GNUNET_MQ_msg (rrm,
1095 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1096 rrm->rid = htonl (rid);
1097 qc.sc.cont = cont;
1098 qc.sc.cont_cls = cont_cls;
1099 qe = make_queue_entry (h,
1100 env,
1101 queue_priority,
1102 max_queue_size,
1103 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1104 &qc);
1105 if (NULL == qe)
1106 {
1107 LOG (GNUNET_ERROR_TYPE_DEBUG,
1108 "Could not create queue entry to release reserve\n");
1109 return NULL;
1110 }
1111 GNUNET_STATISTICS_update (h->stats,
1112 gettext_noop
1113 ("# RELEASE RESERVE requests executed"), 1,
1114 GNUNET_NO);
1115 process_queue (h);
1116 return qe;
1117}
1118
1119
1120struct GNUNET_DATASTORE_QueueEntry *
1121GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1122 const struct GNUNET_HashCode *key,
1123 size_t size,
1124 const void *data,
1125 unsigned int queue_priority,
1126 unsigned int max_queue_size,
1127 GNUNET_DATASTORE_ContinuationWithStatus cont,
1128 void *cont_cls)
1129{
1130 struct GNUNET_DATASTORE_QueueEntry *qe;
1131 struct DataMessage *dm;
1132 struct GNUNET_MQ_Envelope *env;
1133 union QueueContext qc;
1134
1135 if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1136 {
1137 GNUNET_break (0);
1138 return NULL;
1139 }
1140 if (NULL == cont)
1141 cont = &drop_status_cont;
1142 LOG (GNUNET_ERROR_TYPE_DEBUG,
1143 "Asked to remove %lu bytes under key `%s'\n",
1144 (unsigned long) size,
1145 GNUNET_h2s (key));
1146 env = GNUNET_MQ_msg_extra (dm,
1147 size,
1148 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1149 dm->size = htonl (size);
1150 dm->key = *key;
1151 GNUNET_memcpy (&dm[1],
1152 data,
1153 size);
1154
1155 qc.sc.cont = cont;
1156 qc.sc.cont_cls = cont_cls;
1157
1158 qe = make_queue_entry (h,
1159 env,
1160 queue_priority,
1161 max_queue_size,
1162 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1163 &qc);
1164 if (NULL == qe)
1165 {
1166 LOG (GNUNET_ERROR_TYPE_DEBUG,
1167 "Could not create queue entry for REMOVE\n");
1168 return NULL;
1169 }
1170 GNUNET_STATISTICS_update (h->stats,
1171 gettext_noop ("# REMOVE requests executed"),
1172 1,
1173 GNUNET_NO);
1174 process_queue (h);
1175 return qe;
1176}
1177
1178
1179/**
1180 * Get a random value from the datastore for content replication.
1181 * Returns a single, random value among those with the highest
1182 * replication score, lowering positive replication scores by one for
1183 * the chosen value (if only content with a replication score exists,
1184 * a random value is returned and replication scores are not changed).
1185 *
1186 * @param h handle to the datastore
1187 * @param queue_priority ranking of this request in the priority queue
1188 * @param max_queue_size at what queue size should this request be dropped
1189 * (if other requests of higher priority are in the queue)
1190 * @param proc function to call on a random value; it
1191 * will be called once with a value (if available)
1192 * and always once with a value of NULL.
1193 * @param proc_cls closure for @a proc
1194 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1195 * cancel
1196 */
1197struct GNUNET_DATASTORE_QueueEntry *
1198GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1199 unsigned int queue_priority,
1200 unsigned int max_queue_size,
1201 GNUNET_DATASTORE_DatumProcessor proc,
1202 void *proc_cls)
1203{
1204 struct GNUNET_DATASTORE_QueueEntry *qe;
1205 struct GNUNET_MQ_Envelope *env;
1206 struct GNUNET_MessageHeader *m;
1207 union QueueContext qc;
1208
1209 GNUNET_assert (NULL != proc);
1210 LOG (GNUNET_ERROR_TYPE_DEBUG,
1211 "Asked to get replication entry\n");
1212 env = GNUNET_MQ_msg (m,
1213 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1214 qc.rc.proc = proc;
1215 qc.rc.proc_cls = proc_cls;
1216 qe = make_queue_entry (h,
1217 env,
1218 queue_priority,
1219 max_queue_size,
1220 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1221 &qc);
1222 if (NULL == qe)
1223 {
1224 LOG (GNUNET_ERROR_TYPE_DEBUG,
1225 "Could not create queue entry for GET REPLICATION\n");
1226 return NULL;
1227 }
1228 GNUNET_STATISTICS_update (h->stats,
1229 gettext_noop
1230 ("# GET REPLICATION requests executed"), 1,
1231 GNUNET_NO);
1232 process_queue (h);
1233 return qe;
1234}
1235
1236
1237struct GNUNET_DATASTORE_QueueEntry *
1238GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1239 uint64_t next_uid,
1240 unsigned int queue_priority,
1241 unsigned int max_queue_size,
1242 enum GNUNET_BLOCK_Type type,
1243 GNUNET_DATASTORE_DatumProcessor proc,
1244 void *proc_cls)
1245{
1246 struct GNUNET_DATASTORE_QueueEntry *qe;
1247 struct GNUNET_MQ_Envelope *env;
1248 struct GetZeroAnonymityMessage *m;
1249 union QueueContext qc;
1250
1251 GNUNET_assert (NULL != proc);
1252 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1253 LOG (GNUNET_ERROR_TYPE_DEBUG,
1254 "Asked to get a zero-anonymity entry of type %d\n",
1255 type);
1256 env = GNUNET_MQ_msg (m,
1257 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1258 m->type = htonl ((uint32_t) type);
1259 m->next_uid = GNUNET_htonll (next_uid);
1260 qc.rc.proc = proc;
1261 qc.rc.proc_cls = proc_cls;
1262 qe = make_queue_entry (h,
1263 env,
1264 queue_priority,
1265 max_queue_size,
1266 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1267 &qc);
1268 if (NULL == qe)
1269 {
1270 LOG (GNUNET_ERROR_TYPE_DEBUG,
1271 "Could not create queue entry for zero-anonymity procation\n");
1272 return NULL;
1273 }
1274 GNUNET_STATISTICS_update (h->stats,
1275 gettext_noop
1276 ("# GET ZERO ANONYMITY requests executed"), 1,
1277 GNUNET_NO);
1278 process_queue (h);
1279 return qe;
1280}
1281
1282
1283/**
1284 * Get a result for a particular key from the datastore. The processor
1285 * will only be called once.
1286 *
1287 * @param h handle to the datastore
1288 * @param next_uid return the result with lowest uid >= next_uid
1289 * @param random if true, return a random result instead of using next_uid
1290 * @param key maybe NULL (to match all entries)
1291 * @param type desired type, 0 for any
1292 * @param queue_priority ranking of this request in the priority queue
1293 * @param max_queue_size at what queue size should this request be dropped
1294 * (if other requests of higher priority are in the queue)
1295 * @param proc function to call on each matching value;
1296 * will be called once with a NULL value at the end
1297 * @param proc_cls closure for @a proc
1298 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1299 * cancel
1300 */
1301struct GNUNET_DATASTORE_QueueEntry *
1302GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1303 uint64_t next_uid,
1304 bool random,
1305 const struct GNUNET_HashCode *key,
1306 enum GNUNET_BLOCK_Type type,
1307 unsigned int queue_priority,
1308 unsigned int max_queue_size,
1309 GNUNET_DATASTORE_DatumProcessor proc,
1310 void *proc_cls)
1311{
1312 struct GNUNET_DATASTORE_QueueEntry *qe;
1313 struct GNUNET_MQ_Envelope *env;
1314 struct GetKeyMessage *gkm;
1315 struct GetMessage *gm;
1316 union QueueContext qc;
1317
1318 GNUNET_assert (NULL != proc);
1319 LOG (GNUNET_ERROR_TYPE_DEBUG,
1320 "Asked to look for data of type %u under key `%s'\n",
1321 (unsigned int) type,
1322 (NULL != key) ? GNUNET_h2s (key) : "NULL");
1323 if (NULL == key)
1324 {
1325 env = GNUNET_MQ_msg (gm,
1326 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1327 gm->type = htonl (type);
1328 gm->next_uid = GNUNET_htonll (next_uid);
1329 gm->random = random;
1330 }
1331 else
1332 {
1333 env = GNUNET_MQ_msg (gkm,
1334 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1335 gkm->type = htonl (type);
1336 gkm->next_uid = GNUNET_htonll (next_uid);
1337 gkm->random = random;
1338 gkm->key = *key;
1339 }
1340 qc.rc.proc = proc;
1341 qc.rc.proc_cls = proc_cls;
1342 qe = make_queue_entry (h,
1343 env,
1344 queue_priority,
1345 max_queue_size,
1346 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1347 &qc);
1348 if (NULL == qe)
1349 {
1350 LOG (GNUNET_ERROR_TYPE_DEBUG,
1351 "Could not queue request for `%s'\n",
1352 (NULL != key) ? GNUNET_h2s (key): "NULL");
1353 return NULL;
1354 }
1355#if INSANE_STATISTICS
1356 GNUNET_STATISTICS_update (h->stats,
1357 gettext_noop ("# GET requests executed"),
1358 1,
1359 GNUNET_NO);
1360#endif
1361 process_queue (h);
1362 return qe;
1363}
1364
1365
1366/**
1367 * Cancel a datastore operation. The final callback from the
1368 * operation must not have been done yet.
1369 *
1370 * @param qe operation to cancel
1371 */
1372void
1373GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1374{
1375 struct GNUNET_DATASTORE_Handle *h = qe->h;
1376
1377 LOG (GNUNET_ERROR_TYPE_DEBUG,
1378 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1379 qe,
1380 NULL == qe->env,
1381 h->queue_head == qe);
1382 if (NULL == qe->env)
1383 {
1384 free_queue_entry (qe);
1385 h->skip_next_messages++;
1386 return;
1387 }
1388 free_queue_entry (qe);
1389 process_queue (h);
1390}
1391
1392
1393/* end of datastore_api.c */