diff options
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r-- | src/datastore/datastore_api.c | 1474 |
1 files changed, 0 insertions, 1474 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c deleted file mode 100644 index a49bc8586..000000000 --- a/src/datastore/datastore_api.c +++ /dev/null | |||
@@ -1,1474 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | Copyright (C) 2004-2013, 2016 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file datastore/datastore_api.c | ||
23 | * @brief Management for the datastore for files stored on a GNUnet node. Implements | ||
24 | * a priority queue for requests | ||
25 | * @author Christian Grothoff | ||
26 | */ | ||
27 | #include "platform.h" | ||
28 | #include "gnunet_arm_service.h" | ||
29 | #include "gnunet_constants.h" | ||
30 | #include "gnunet_datastore_service.h" | ||
31 | #include "gnunet_statistics_service.h" | ||
32 | #include "datastore.h" | ||
33 | |||
34 | #define LOG(kind, ...) GNUNET_log_from (kind, "datastore-api", __VA_ARGS__) | ||
35 | |||
36 | #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES | ||
37 | |||
38 | /** | ||
39 | * Collect an instance number of statistics? May cause excessive IPC. | ||
40 | */ | ||
41 | #define INSANE_STATISTICS GNUNET_NO | ||
42 | |||
43 | /** | ||
44 | * If a client stopped asking for more results, how many more do | ||
45 | * we receive from the DB before killing the connection? Trade-off | ||
46 | * between re-doing TCP handshakes and (needlessly) receiving | ||
47 | * useless results. | ||
48 | */ | ||
49 | #define MAX_EXCESS_RESULTS 8 | ||
50 | |||
51 | /** | ||
52 | * Context for processing status messages. | ||
53 | */ | ||
54 | struct StatusContext | ||
55 | { | ||
56 | /** | ||
57 | * Continuation to call with the status. | ||
58 | */ | ||
59 | GNUNET_DATASTORE_ContinuationWithStatus cont; | ||
60 | |||
61 | /** | ||
62 | * Closure for @e cont. | ||
63 | */ | ||
64 | void *cont_cls; | ||
65 | }; | ||
66 | |||
67 | |||
68 | /** | ||
69 | * Context for processing result messages. | ||
70 | */ | ||
71 | struct ResultContext | ||
72 | { | ||
73 | /** | ||
74 | * Function to call with the result. | ||
75 | */ | ||
76 | GNUNET_DATASTORE_DatumProcessor proc; | ||
77 | |||
78 | /** | ||
79 | * Closure for @e proc. | ||
80 | */ | ||
81 | void *proc_cls; | ||
82 | }; | ||
83 | |||
84 | |||
85 | /** | ||
86 | * Context for a queue operation. | ||
87 | */ | ||
88 | union QueueContext | ||
89 | { | ||
90 | struct StatusContext sc; | ||
91 | |||
92 | struct ResultContext rc; | ||
93 | }; | ||
94 | |||
95 | |||
96 | /** | ||
97 | * Entry in our priority queue. | ||
98 | */ | ||
99 | struct GNUNET_DATASTORE_QueueEntry | ||
100 | { | ||
101 | /** | ||
102 | * This is a linked list. | ||
103 | */ | ||
104 | struct GNUNET_DATASTORE_QueueEntry *next; | ||
105 | |||
106 | /** | ||
107 | * This is a linked list. | ||
108 | */ | ||
109 | struct GNUNET_DATASTORE_QueueEntry *prev; | ||
110 | |||
111 | /** | ||
112 | * Handle to the master context. | ||
113 | */ | ||
114 | struct GNUNET_DATASTORE_Handle *h; | ||
115 | |||
116 | /** | ||
117 | * Function to call after transmission of the request. | ||
118 | */ | ||
119 | GNUNET_DATASTORE_ContinuationWithStatus cont; | ||
120 | |||
121 | /** | ||
122 | * Closure for @e cont. | ||
123 | */ | ||
124 | void *cont_cls; | ||
125 | |||
126 | /** | ||
127 | * Context for the operation. | ||
128 | */ | ||
129 | union QueueContext qc; | ||
130 | |||
131 | /** | ||
132 | * Envelope of the request to transmit, NULL after | ||
133 | * transmission. | ||
134 | */ | ||
135 | struct GNUNET_MQ_Envelope *env; | ||
136 | |||
137 | /** | ||
138 | * Task we run if this entry stalls the queue and we | ||
139 | * need to warn the user. | ||
140 | */ | ||
141 | struct GNUNET_SCHEDULER_Task *delay_warn_task; | ||
142 | |||
143 | /** | ||
144 | * Priority in the queue. | ||
145 | */ | ||
146 | unsigned int priority; | ||
147 | |||
148 | /** | ||
149 | * Maximum allowed length of queue (otherwise | ||
150 | * this request should be discarded). | ||
151 | */ | ||
152 | unsigned int max_queue; | ||
153 | |||
154 | /** | ||
155 | * Expected response type. | ||
156 | */ | ||
157 | uint16_t response_type; | ||
158 | }; | ||
159 | |||
160 | |||
161 | /** | ||
162 | * Handle to the datastore service. | ||
163 | */ | ||
164 | struct GNUNET_DATASTORE_Handle | ||
165 | { | ||
166 | /** | ||
167 | * Our configuration. | ||
168 | */ | ||
169 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
170 | |||
171 | /** | ||
172 | * Current connection to the datastore service. | ||
173 | */ | ||
174 | struct GNUNET_MQ_Handle *mq; | ||
175 | |||
176 | /** | ||
177 | * Handle for statistics. | ||
178 | */ | ||
179 | struct GNUNET_STATISTICS_Handle *stats; | ||
180 | |||
181 | /** | ||
182 | * Current head of priority queue. | ||
183 | */ | ||
184 | struct GNUNET_DATASTORE_QueueEntry *queue_head; | ||
185 | |||
186 | /** | ||
187 | * Current tail of priority queue. | ||
188 | */ | ||
189 | struct GNUNET_DATASTORE_QueueEntry *queue_tail; | ||
190 | |||
191 | /** | ||
192 | * Task for trying to reconnect. | ||
193 | */ | ||
194 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
195 | |||
196 | /** | ||
197 | * How quickly should we retry? Used for exponential back-off on | ||
198 | * connect-errors. | ||
199 | */ | ||
200 | struct GNUNET_TIME_Relative retry_time; | ||
201 | |||
202 | /** | ||
203 | * Number of entries in the queue. | ||
204 | */ | ||
205 | unsigned int queue_size; | ||
206 | |||
207 | /** | ||
208 | * Number of results we're receiving for the current query | ||
209 | * after application stopped to care. Used to determine when | ||
210 | * to reset the connection. | ||
211 | */ | ||
212 | unsigned int result_count; | ||
213 | |||
214 | /** | ||
215 | * We should ignore the next message(s) from the service. | ||
216 | */ | ||
217 | unsigned int skip_next_messages; | ||
218 | }; | ||
219 | |||
220 | |||
221 | /** | ||
222 | * Try reconnecting to the datastore service. | ||
223 | * | ||
224 | * @param cls the `struct GNUNET_DATASTORE_Handle` | ||
225 | */ | ||
226 | static void | ||
227 | try_reconnect (void *cls); | ||
228 | |||
229 | |||
230 | /** | ||
231 | * Disconnect from the service and then try reconnecting to the datastore service | ||
232 | * after some delay. | ||
233 | * | ||
234 | * @param h handle to datastore to disconnect and reconnect | ||
235 | */ | ||
236 | static void | ||
237 | do_disconnect (struct GNUNET_DATASTORE_Handle *h) | ||
238 | { | ||
239 | if (NULL == h->mq) | ||
240 | { | ||
241 | GNUNET_break (0); | ||
242 | return; | ||
243 | } | ||
244 | GNUNET_MQ_destroy (h->mq); | ||
245 | h->mq = NULL; | ||
246 | h->skip_next_messages = 0; | ||
247 | h->reconnect_task | ||
248 | = GNUNET_SCHEDULER_add_delayed (h->retry_time, | ||
249 | &try_reconnect, | ||
250 | h); | ||
251 | } | ||
252 | |||
253 | |||
254 | /** | ||
255 | * Free a queue entry. Removes the given entry from the | ||
256 | * queue and releases associated resources. Does NOT | ||
257 | * call the callback. | ||
258 | * | ||
259 | * @param qe entry to free. | ||
260 | */ | ||
261 | static void | ||
262 | free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) | ||
263 | { | ||
264 | struct GNUNET_DATASTORE_Handle *h = qe->h; | ||
265 | |||
266 | GNUNET_CONTAINER_DLL_remove (h->queue_head, | ||
267 | h->queue_tail, | ||
268 | qe); | ||
269 | h->queue_size--; | ||
270 | if (NULL != qe->env) | ||
271 | GNUNET_MQ_discard (qe->env); | ||
272 | if (NULL != qe->delay_warn_task) | ||
273 | GNUNET_SCHEDULER_cancel (qe->delay_warn_task); | ||
274 | GNUNET_free (qe); | ||
275 | } | ||
276 | |||
277 | |||
278 | /** | ||
279 | * Task that logs an error after some time. | ||
280 | * | ||
281 | * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is | ||
282 | */ | ||
283 | static void | ||
284 | delay_warning (void *cls) | ||
285 | { | ||
286 | struct GNUNET_DATASTORE_QueueEntry *qe = cls; | ||
287 | |||
288 | qe->delay_warn_task = NULL; | ||
289 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
290 | "Request %p of type %u at head of datastore queue for more than %s\n", | ||
291 | qe, | ||
292 | (unsigned int) qe->response_type, | ||
293 | GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT, | ||
294 | GNUNET_YES)); | ||
295 | qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT, | ||
296 | &delay_warning, | ||
297 | qe); | ||
298 | } | ||
299 | |||
300 | |||
301 | /** | ||
302 | * Handle error in sending drop request to datastore. | ||
303 | * | ||
304 | * @param cls closure with the datastore handle | ||
305 | * @param error error code | ||
306 | */ | ||
307 | static void | ||
308 | mq_error_handler (void *cls, | ||
309 | enum GNUNET_MQ_Error error) | ||
310 | { | ||
311 | struct GNUNET_DATASTORE_Handle *h = cls; | ||
312 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
313 | |||
314 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
315 | "MQ error, reconnecting to DATASTORE\n"); | ||
316 | do_disconnect (h); | ||
317 | qe = h->queue_head; | ||
318 | if (NULL == qe) | ||
319 | return; | ||
320 | if (NULL != qe->delay_warn_task) | ||
321 | { | ||
322 | GNUNET_SCHEDULER_cancel (qe->delay_warn_task); | ||
323 | qe->delay_warn_task = NULL; | ||
324 | } | ||
325 | if (NULL == qe->env) | ||
326 | { | ||
327 | union QueueContext qc = qe->qc; | ||
328 | uint16_t rt = qe->response_type; | ||
329 | |||
330 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
331 | "Failed to receive response from database.\n"); | ||
332 | free_queue_entry (qe); | ||
333 | switch (rt) | ||
334 | { | ||
335 | case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: | ||
336 | if (NULL != qc.sc.cont) | ||
337 | qc.sc.cont (qc.sc.cont_cls, | ||
338 | GNUNET_SYSERR, | ||
339 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
340 | _ ("DATASTORE disconnected")); | ||
341 | break; | ||
342 | |||
343 | case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: | ||
344 | if (NULL != qc.rc.proc) | ||
345 | qc.rc.proc (qc.rc.proc_cls, | ||
346 | NULL, | ||
347 | 0, | ||
348 | NULL, | ||
349 | 0, | ||
350 | 0, | ||
351 | 0, | ||
352 | 0, | ||
353 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
354 | 0); | ||
355 | break; | ||
356 | |||
357 | default: | ||
358 | GNUNET_break (0); | ||
359 | } | ||
360 | } | ||
361 | } | ||
362 | |||
363 | |||
364 | /** | ||
365 | * Connect to the datastore service. | ||
366 | * | ||
367 | * @param cfg configuration to use | ||
368 | * @return handle to use to access the service | ||
369 | */ | ||
370 | struct GNUNET_DATASTORE_Handle * | ||
371 | GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | ||
372 | { | ||
373 | struct GNUNET_DATASTORE_Handle *h; | ||
374 | |||
375 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
376 | "Establishing DATASTORE connection!\n"); | ||
377 | h = GNUNET_new (struct GNUNET_DATASTORE_Handle); | ||
378 | h->cfg = cfg; | ||
379 | try_reconnect (h); | ||
380 | if (NULL == h->mq) | ||
381 | { | ||
382 | GNUNET_free (h); | ||
383 | return NULL; | ||
384 | } | ||
385 | h->stats = GNUNET_STATISTICS_create ("datastore-api", | ||
386 | cfg); | ||
387 | return h; | ||
388 | } | ||
389 | |||
390 | |||
391 | /** | ||
392 | * Task used by to disconnect from the datastore after | ||
393 | * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message. | ||
394 | * | ||
395 | * @param cls the datastore handle | ||
396 | */ | ||
397 | static void | ||
398 | disconnect_after_drop (void *cls) | ||
399 | { | ||
400 | struct GNUNET_DATASTORE_Handle *h = cls; | ||
401 | |||
402 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
403 | "Drop sent, disconnecting\n"); | ||
404 | GNUNET_DATASTORE_disconnect (h, | ||
405 | GNUNET_NO); | ||
406 | } | ||
407 | |||
408 | |||
409 | /** | ||
410 | * Handle error in sending drop request to datastore. | ||
411 | * | ||
412 | * @param cls closure with the datastore handle | ||
413 | * @param error error code | ||
414 | */ | ||
415 | static void | ||
416 | disconnect_on_mq_error (void *cls, | ||
417 | enum GNUNET_MQ_Error error) | ||
418 | { | ||
419 | struct GNUNET_DATASTORE_Handle *h = cls; | ||
420 | |||
421 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
422 | "Failed to ask datastore to drop tables\n"); | ||
423 | GNUNET_DATASTORE_disconnect (h, | ||
424 | GNUNET_NO); | ||
425 | } | ||
426 | |||
427 | |||
428 | /** | ||
429 | * Disconnect from the datastore service (and free | ||
430 | * associated resources). | ||
431 | * | ||
432 | * @param h handle to the datastore | ||
433 | * @param drop set to #GNUNET_YES to delete all data in datastore (!) | ||
434 | */ | ||
435 | void | ||
436 | GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, | ||
437 | int drop) | ||
438 | { | ||
439 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
440 | |||
441 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
442 | "Datastore disconnect\n"); | ||
443 | if (NULL != h->mq) | ||
444 | { | ||
445 | GNUNET_MQ_destroy (h->mq); | ||
446 | h->mq = NULL; | ||
447 | } | ||
448 | if (NULL != h->reconnect_task) | ||
449 | { | ||
450 | GNUNET_SCHEDULER_cancel (h->reconnect_task); | ||
451 | h->reconnect_task = NULL; | ||
452 | } | ||
453 | while (NULL != (qe = h->queue_head)) | ||
454 | { | ||
455 | switch (qe->response_type) | ||
456 | { | ||
457 | case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: | ||
458 | if (NULL != qe->qc.sc.cont) | ||
459 | qe->qc.sc.cont (qe->qc.sc.cont_cls, | ||
460 | GNUNET_SYSERR, | ||
461 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
462 | _ ("Disconnected from DATASTORE")); | ||
463 | break; | ||
464 | |||
465 | case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: | ||
466 | if (NULL != qe->qc.rc.proc) | ||
467 | qe->qc.rc.proc (qe->qc.rc.proc_cls, | ||
468 | NULL, | ||
469 | 0, | ||
470 | NULL, | ||
471 | 0, | ||
472 | 0, | ||
473 | 0, | ||
474 | 0, | ||
475 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
476 | 0); | ||
477 | break; | ||
478 | |||
479 | default: | ||
480 | GNUNET_break (0); | ||
481 | } | ||
482 | free_queue_entry (qe); | ||
483 | } | ||
484 | if (GNUNET_YES == drop) | ||
485 | { | ||
486 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
487 | "Re-connecting to issue DROP!\n"); | ||
488 | GNUNET_assert (NULL == h->mq); | ||
489 | h->mq = GNUNET_CLIENT_connect (h->cfg, | ||
490 | "datastore", | ||
491 | NULL, | ||
492 | &disconnect_on_mq_error, | ||
493 | h); | ||
494 | if (NULL != h->mq) | ||
495 | { | ||
496 | struct GNUNET_MessageHeader *hdr; | ||
497 | struct GNUNET_MQ_Envelope *env; | ||
498 | |||
499 | env = GNUNET_MQ_msg (hdr, | ||
500 | GNUNET_MESSAGE_TYPE_DATASTORE_DROP); | ||
501 | GNUNET_MQ_notify_sent (env, | ||
502 | &disconnect_after_drop, | ||
503 | h); | ||
504 | GNUNET_MQ_send (h->mq, | ||
505 | env); | ||
506 | return; | ||
507 | } | ||
508 | GNUNET_break (0); | ||
509 | } | ||
510 | GNUNET_STATISTICS_destroy (h->stats, | ||
511 | GNUNET_NO); | ||
512 | h->stats = NULL; | ||
513 | GNUNET_free (h); | ||
514 | } | ||
515 | |||
516 | |||
517 | /** | ||
518 | * Create a new entry for our priority queue (and possibly discard other entries if | ||
519 | * the queue is getting too long). | ||
520 | * | ||
521 | * @param h handle to the datastore | ||
522 | * @param env envelope with the message to queue | ||
523 | * @param queue_priority priority of the entry | ||
524 | * @param max_queue_size at what queue size should this request be dropped | ||
525 | * (if other requests of higher priority are in the queue) | ||
526 | * @param expected_type which type of response do we expect, | ||
527 | * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or | ||
528 | * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA | ||
529 | * @param qc client context (NOT a closure for @a response_proc) | ||
530 | * @return NULL if the queue is full | ||
531 | */ | ||
532 | static struct GNUNET_DATASTORE_QueueEntry * | ||
533 | make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | ||
534 | struct GNUNET_MQ_Envelope *env, | ||
535 | unsigned int queue_priority, | ||
536 | unsigned int max_queue_size, | ||
537 | uint16_t expected_type, | ||
538 | const union QueueContext *qc) | ||
539 | { | ||
540 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
541 | struct GNUNET_DATASTORE_QueueEntry *pos; | ||
542 | unsigned int c; | ||
543 | |||
544 | if ((NULL != h->queue_tail) && | ||
545 | (h->queue_tail->priority >= queue_priority)) | ||
546 | { | ||
547 | c = h->queue_size; | ||
548 | pos = NULL; | ||
549 | } | ||
550 | else | ||
551 | { | ||
552 | c = 0; | ||
553 | pos = h->queue_head; | ||
554 | } | ||
555 | while ((NULL != pos) && | ||
556 | (c < max_queue_size) && | ||
557 | (pos->priority >= queue_priority)) | ||
558 | { | ||
559 | c++; | ||
560 | pos = pos->next; | ||
561 | } | ||
562 | if (c >= max_queue_size) | ||
563 | { | ||
564 | GNUNET_STATISTICS_update (h->stats, | ||
565 | gettext_noop ("# queue overflows"), | ||
566 | 1, | ||
567 | GNUNET_NO); | ||
568 | GNUNET_MQ_discard (env); | ||
569 | return NULL; | ||
570 | } | ||
571 | qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry); | ||
572 | qe->h = h; | ||
573 | qe->env = env; | ||
574 | qe->response_type = expected_type; | ||
575 | qe->qc = *qc; | ||
576 | qe->priority = queue_priority; | ||
577 | qe->max_queue = max_queue_size; | ||
578 | if (NULL == pos) | ||
579 | { | ||
580 | /* append at the tail */ | ||
581 | pos = h->queue_tail; | ||
582 | } | ||
583 | else | ||
584 | { | ||
585 | pos = pos->prev; | ||
586 | /* do not insert at HEAD if HEAD query was already | ||
587 | * transmitted and we are still receiving replies! */ | ||
588 | if ((NULL == pos) && | ||
589 | (NULL == h->queue_head->env)) | ||
590 | pos = h->queue_head; | ||
591 | } | ||
592 | c++; | ||
593 | #if INSANE_STATISTICS | ||
594 | GNUNET_STATISTICS_update (h->stats, | ||
595 | gettext_noop ("# queue entries created"), | ||
596 | 1, | ||
597 | GNUNET_NO); | ||
598 | #endif | ||
599 | GNUNET_CONTAINER_DLL_insert_after (h->queue_head, | ||
600 | h->queue_tail, | ||
601 | pos, | ||
602 | qe); | ||
603 | h->queue_size++; | ||
604 | return qe; | ||
605 | } | ||
606 | |||
607 | |||
608 | /** | ||
609 | * Process entries in the queue (or do nothing if we are already | ||
610 | * doing so). | ||
611 | * | ||
612 | * @param h handle to the datastore | ||
613 | */ | ||
614 | static void | ||
615 | process_queue (struct GNUNET_DATASTORE_Handle *h) | ||
616 | { | ||
617 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
618 | |||
619 | if (NULL == (qe = h->queue_head)) | ||
620 | { | ||
621 | /* no entry in queue */ | ||
622 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
623 | "Queue empty\n"); | ||
624 | return; | ||
625 | } | ||
626 | if (NULL == qe->env) | ||
627 | { | ||
628 | /* waiting for replies */ | ||
629 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
630 | "Head request already transmitted\n"); | ||
631 | return; | ||
632 | } | ||
633 | if (NULL == h->mq) | ||
634 | { | ||
635 | /* waiting for reconnect */ | ||
636 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
637 | "Not connected\n"); | ||
638 | return; | ||
639 | } | ||
640 | GNUNET_assert (NULL == qe->delay_warn_task); | ||
641 | qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT, | ||
642 | &delay_warning, | ||
643 | qe); | ||
644 | GNUNET_MQ_send (h->mq, | ||
645 | qe->env); | ||
646 | qe->env = NULL; | ||
647 | } | ||
648 | |||
649 | |||
650 | /** | ||
651 | * Get the entry at the head of the message queue. | ||
652 | * | ||
653 | * @param h handle to the datastore | ||
654 | * @param response_type the expected response type | ||
655 | * @return the queue entry | ||
656 | */ | ||
657 | static struct GNUNET_DATASTORE_QueueEntry * | ||
658 | get_queue_head (struct GNUNET_DATASTORE_Handle *h, | ||
659 | uint16_t response_type) | ||
660 | { | ||
661 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
662 | |||
663 | if (h->skip_next_messages > 0) | ||
664 | { | ||
665 | h->skip_next_messages--; | ||
666 | process_queue (h); | ||
667 | return NULL; | ||
668 | } | ||
669 | qe = h->queue_head; | ||
670 | if (NULL == qe) | ||
671 | { | ||
672 | GNUNET_break (0); | ||
673 | do_disconnect (h); | ||
674 | return NULL; | ||
675 | } | ||
676 | if (NULL != qe->env) | ||
677 | { | ||
678 | GNUNET_break (0); | ||
679 | do_disconnect (h); | ||
680 | return NULL; | ||
681 | } | ||
682 | if (response_type != qe->response_type) | ||
683 | { | ||
684 | GNUNET_break (0); | ||
685 | do_disconnect (h); | ||
686 | return NULL; | ||
687 | } | ||
688 | return qe; | ||
689 | } | ||
690 | |||
691 | |||
692 | /** | ||
693 | * Function called to check status message from the service. | ||
694 | * | ||
695 | * @param cls closure | ||
696 | * @param sm status message received | ||
697 | * @return #GNUNET_OK if the message is well-formed | ||
698 | */ | ||
699 | static int | ||
700 | check_status (void *cls, | ||
701 | const struct StatusMessage *sm) | ||
702 | { | ||
703 | uint16_t msize = ntohs (sm->header.size) - sizeof(*sm); | ||
704 | int32_t status = ntohl (sm->status); | ||
705 | |||
706 | if (msize > 0) | ||
707 | { | ||
708 | const char *emsg = (const char *) &sm[1]; | ||
709 | |||
710 | if ('\0' != emsg[msize - 1]) | ||
711 | { | ||
712 | GNUNET_break (0); | ||
713 | return GNUNET_SYSERR; | ||
714 | } | ||
715 | } | ||
716 | else if (GNUNET_SYSERR == status) | ||
717 | { | ||
718 | GNUNET_break (0); | ||
719 | return GNUNET_SYSERR; | ||
720 | } | ||
721 | return GNUNET_OK; | ||
722 | } | ||
723 | |||
724 | |||
725 | /** | ||
726 | * Function called to handle status message from the service. | ||
727 | * | ||
728 | * @param cls closure | ||
729 | * @param sm status message received | ||
730 | */ | ||
731 | static void | ||
732 | handle_status (void *cls, | ||
733 | const struct StatusMessage *sm) | ||
734 | { | ||
735 | struct GNUNET_DATASTORE_Handle *h = cls; | ||
736 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
737 | struct StatusContext rc; | ||
738 | const char *emsg; | ||
739 | int32_t status = ntohl (sm->status); | ||
740 | |||
741 | qe = get_queue_head (h, | ||
742 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS); | ||
743 | if (NULL == qe) | ||
744 | return; | ||
745 | rc = qe->qc.sc; | ||
746 | free_queue_entry (qe); | ||
747 | if (ntohs (sm->header.size) > sizeof(struct StatusMessage)) | ||
748 | emsg = (const char *) &sm[1]; | ||
749 | else | ||
750 | emsg = NULL; | ||
751 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
752 | "Received status %d/%s\n", | ||
753 | (int) status, | ||
754 | emsg); | ||
755 | GNUNET_STATISTICS_update (h->stats, | ||
756 | gettext_noop ("# status messages received"), | ||
757 | 1, | ||
758 | GNUNET_NO); | ||
759 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
760 | process_queue (h); | ||
761 | if (NULL != rc.cont) | ||
762 | rc.cont (rc.cont_cls, | ||
763 | status, | ||
764 | GNUNET_TIME_absolute_ntoh (sm->min_expiration), | ||
765 | emsg); | ||
766 | } | ||
767 | |||
768 | |||
769 | /** | ||
770 | * Check data message we received from the service. | ||
771 | * | ||
772 | * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` | ||
773 | * @param dm message received | ||
774 | */ | ||
775 | static int | ||
776 | check_data (void *cls, | ||
777 | const struct DataMessage *dm) | ||
778 | { | ||
779 | uint16_t msize = ntohs (dm->header.size) - sizeof(*dm); | ||
780 | |||
781 | if (msize != ntohl (dm->size)) | ||
782 | { | ||
783 | GNUNET_break (0); | ||
784 | return GNUNET_SYSERR; | ||
785 | } | ||
786 | return GNUNET_OK; | ||
787 | } | ||
788 | |||
789 | |||
790 | /** | ||
791 | * Handle data message we got from the service. | ||
792 | * | ||
793 | * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` | ||
794 | * @param dm message received | ||
795 | */ | ||
796 | static void | ||
797 | handle_data (void *cls, | ||
798 | const struct DataMessage *dm) | ||
799 | { | ||
800 | struct GNUNET_DATASTORE_Handle *h = cls; | ||
801 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
802 | struct ResultContext rc; | ||
803 | |||
804 | qe = get_queue_head (h, | ||
805 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA); | ||
806 | if (NULL == qe) | ||
807 | return; | ||
808 | #if INSANE_STATISTICS | ||
809 | GNUNET_STATISTICS_update (h->stats, | ||
810 | gettext_noop ("# Results received"), | ||
811 | 1, | ||
812 | GNUNET_NO); | ||
813 | #endif | ||
814 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
815 | "Received result %llu with type %u and size %u with key %s\n", | ||
816 | (unsigned long long) GNUNET_ntohll (dm->uid), | ||
817 | ntohl (dm->type), | ||
818 | ntohl (dm->size), | ||
819 | GNUNET_h2s (&dm->key)); | ||
820 | rc = qe->qc.rc; | ||
821 | free_queue_entry (qe); | ||
822 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
823 | process_queue (h); | ||
824 | if (NULL != rc.proc) | ||
825 | rc.proc (rc.proc_cls, | ||
826 | &dm->key, | ||
827 | ntohl (dm->size), | ||
828 | &dm[1], | ||
829 | ntohl (dm->type), | ||
830 | ntohl (dm->priority), | ||
831 | ntohl (dm->anonymity), | ||
832 | ntohl (dm->replication), | ||
833 | GNUNET_TIME_absolute_ntoh (dm->expiration), | ||
834 | GNUNET_ntohll (dm->uid)); | ||
835 | } | ||
836 | |||
837 | |||
838 | /** | ||
839 | * Type of a function to call when we receive a | ||
840 | * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service. | ||
841 | * | ||
842 | * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` | ||
843 | * @param msg message received | ||
844 | */ | ||
845 | static void | ||
846 | handle_data_end (void *cls, | ||
847 | const struct GNUNET_MessageHeader *msg) | ||
848 | { | ||
849 | struct GNUNET_DATASTORE_Handle *h = cls; | ||
850 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
851 | struct ResultContext rc; | ||
852 | |||
853 | qe = get_queue_head (h, | ||
854 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA); | ||
855 | if (NULL == qe) | ||
856 | return; | ||
857 | rc = qe->qc.rc; | ||
858 | free_queue_entry (qe); | ||
859 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
860 | "Received end of result set, new queue size is %u\n", | ||
861 | h->queue_size); | ||
862 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
863 | h->result_count = 0; | ||
864 | process_queue (h); | ||
865 | /* signal end of iteration */ | ||
866 | if (NULL != rc.proc) | ||
867 | rc.proc (rc.proc_cls, | ||
868 | NULL, | ||
869 | 0, | ||
870 | NULL, | ||
871 | 0, | ||
872 | 0, | ||
873 | 0, | ||
874 | 0, | ||
875 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
876 | 0); | ||
877 | } | ||
878 | |||
879 | |||
880 | /** | ||
881 | * Try reconnecting to the datastore service. | ||
882 | * | ||
883 | * @param cls the `struct GNUNET_DATASTORE_Handle` | ||
884 | */ | ||
885 | static void | ||
886 | try_reconnect (void *cls) | ||
887 | { | ||
888 | struct GNUNET_DATASTORE_Handle *h = cls; | ||
889 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
890 | GNUNET_MQ_hd_var_size (status, | ||
891 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | ||
892 | struct StatusMessage, | ||
893 | h), | ||
894 | GNUNET_MQ_hd_var_size (data, | ||
895 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, | ||
896 | struct DataMessage, | ||
897 | h), | ||
898 | GNUNET_MQ_hd_fixed_size (data_end, | ||
899 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END, | ||
900 | struct GNUNET_MessageHeader, | ||
901 | h), | ||
902 | GNUNET_MQ_handler_end () | ||
903 | }; | ||
904 | |||
905 | h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); | ||
906 | h->reconnect_task = NULL; | ||
907 | GNUNET_assert (NULL == h->mq); | ||
908 | h->mq = GNUNET_CLIENT_connect (h->cfg, | ||
909 | "datastore", | ||
910 | handlers, | ||
911 | &mq_error_handler, | ||
912 | h); | ||
913 | if (NULL == h->mq) | ||
914 | return; | ||
915 | GNUNET_STATISTICS_update (h->stats, | ||
916 | gettext_noop ( | ||
917 | "# datastore connections (re)created"), | ||
918 | 1, | ||
919 | GNUNET_NO); | ||
920 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
921 | "Reconnected to DATASTORE\n"); | ||
922 | process_queue (h); | ||
923 | } | ||
924 | |||
925 | |||
926 | /** | ||
927 | * Dummy continuation used to do nothing (but be non-zero). | ||
928 | * | ||
929 | * @param cls closure | ||
930 | * @param result result | ||
931 | * @param min_expiration expiration time | ||
932 | * @param emsg error message | ||
933 | */ | ||
934 | static void | ||
935 | drop_status_cont (void *cls, | ||
936 | int32_t result, | ||
937 | struct GNUNET_TIME_Absolute min_expiration, | ||
938 | const char *emsg) | ||
939 | { | ||
940 | /* do nothing */ | ||
941 | } | ||
942 | |||
943 | |||
944 | /** | ||
945 | * Store an item in the datastore. If the item is already present, | ||
946 | * the priorities are summed up and the higher expiration time and | ||
947 | * lower anonymity level is used. | ||
948 | * | ||
949 | * @param h handle to the datastore | ||
950 | * @param rid reservation ID to use (from "reserve"); use 0 if no | ||
951 | * prior reservation was made | ||
952 | * @param key key for the value | ||
953 | * @param size number of bytes in data | ||
954 | * @param data content stored | ||
955 | * @param type type of the content | ||
956 | * @param priority priority of the content | ||
957 | * @param anonymity anonymity-level for the content | ||
958 | * @param replication how often should the content be replicated to other peers? | ||
959 | * @param expiration expiration time for the content | ||
960 | * @param queue_priority ranking of this request in the priority queue | ||
961 | * @param max_queue_size at what queue size should this request be dropped | ||
962 | * (if other requests of higher priority are in the queue) | ||
963 | * @param cont continuation to call when done | ||
964 | * @param cont_cls closure for @a cont | ||
965 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | ||
966 | * cancel; note that even if NULL is returned, the callback will be invoked | ||
967 | * (or rather, will already have been invoked) | ||
968 | */ | ||
969 | struct GNUNET_DATASTORE_QueueEntry * | ||
970 | GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, | ||
971 | uint32_t rid, | ||
972 | const struct GNUNET_HashCode *key, | ||
973 | size_t size, | ||
974 | const void *data, | ||
975 | enum GNUNET_BLOCK_Type type, | ||
976 | uint32_t priority, | ||
977 | uint32_t anonymity, | ||
978 | uint32_t replication, | ||
979 | struct GNUNET_TIME_Absolute expiration, | ||
980 | unsigned int queue_priority, | ||
981 | unsigned int max_queue_size, | ||
982 | GNUNET_DATASTORE_ContinuationWithStatus cont, | ||
983 | void *cont_cls) | ||
984 | { | ||
985 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
986 | struct GNUNET_MQ_Envelope *env; | ||
987 | struct DataMessage *dm; | ||
988 | union QueueContext qc; | ||
989 | |||
990 | if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE) | ||
991 | { | ||
992 | GNUNET_break (0); | ||
993 | return NULL; | ||
994 | } | ||
995 | |||
996 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
997 | "Asked to put %lu bytes of data under key `%s' for %s\n", | ||
998 | (unsigned long) size, | ||
999 | GNUNET_h2s (key), | ||
1000 | GNUNET_STRINGS_relative_time_to_string ( | ||
1001 | GNUNET_TIME_absolute_get_remaining (expiration), | ||
1002 | GNUNET_YES)); | ||
1003 | env = GNUNET_MQ_msg_extra (dm, | ||
1004 | size, | ||
1005 | GNUNET_MESSAGE_TYPE_DATASTORE_PUT); | ||
1006 | dm->rid = htonl (rid); | ||
1007 | dm->size = htonl ((uint32_t) size); | ||
1008 | dm->type = htonl (type); | ||
1009 | dm->priority = htonl (priority); | ||
1010 | dm->anonymity = htonl (anonymity); | ||
1011 | dm->replication = htonl (replication); | ||
1012 | dm->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
1013 | dm->key = *key; | ||
1014 | GNUNET_memcpy (&dm[1], | ||
1015 | data, | ||
1016 | size); | ||
1017 | qc.sc.cont = cont; | ||
1018 | qc.sc.cont_cls = cont_cls; | ||
1019 | qe = make_queue_entry (h, | ||
1020 | env, | ||
1021 | queue_priority, | ||
1022 | max_queue_size, | ||
1023 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | ||
1024 | &qc); | ||
1025 | if (NULL == qe) | ||
1026 | { | ||
1027 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1028 | "Could not create queue entry for PUT\n"); | ||
1029 | return NULL; | ||
1030 | } | ||
1031 | GNUNET_STATISTICS_update (h->stats, | ||
1032 | gettext_noop ("# PUT requests executed"), | ||
1033 | 1, | ||
1034 | GNUNET_NO); | ||
1035 | process_queue (h); | ||
1036 | return qe; | ||
1037 | } | ||
1038 | |||
1039 | |||
1040 | /** | ||
1041 | * Reserve space in the datastore. This function should be used | ||
1042 | * to avoid "out of space" failures during a longer sequence of "put" | ||
1043 | * operations (for example, when a file is being inserted). | ||
1044 | * | ||
1045 | * @param h handle to the datastore | ||
1046 | * @param amount how much space (in bytes) should be reserved (for content only) | ||
1047 | * @param entries how many entries will be created (to calculate per-entry overhead) | ||
1048 | * @param cont continuation to call when done; "success" will be set to | ||
1049 | * a positive reservation value if space could be reserved. | ||
1050 | * @param cont_cls closure for @a cont | ||
1051 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | ||
1052 | * cancel; note that even if NULL is returned, the callback will be invoked | ||
1053 | * (or rather, will already have been invoked) | ||
1054 | */ | ||
1055 | struct GNUNET_DATASTORE_QueueEntry * | ||
1056 | GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, | ||
1057 | uint64_t amount, | ||
1058 | uint32_t entries, | ||
1059 | GNUNET_DATASTORE_ContinuationWithStatus cont, | ||
1060 | void *cont_cls) | ||
1061 | { | ||
1062 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
1063 | struct GNUNET_MQ_Envelope *env; | ||
1064 | struct ReserveMessage *rm; | ||
1065 | union QueueContext qc; | ||
1066 | |||
1067 | if (NULL == cont) | ||
1068 | cont = &drop_status_cont; | ||
1069 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1070 | "Asked to reserve %llu bytes of data and %u entries\n", | ||
1071 | (unsigned long long) amount, | ||
1072 | (unsigned int) entries); | ||
1073 | env = GNUNET_MQ_msg (rm, | ||
1074 | GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); | ||
1075 | rm->entries = htonl (entries); | ||
1076 | rm->amount = GNUNET_htonll (amount); | ||
1077 | |||
1078 | qc.sc.cont = cont; | ||
1079 | qc.sc.cont_cls = cont_cls; | ||
1080 | qe = make_queue_entry (h, | ||
1081 | env, | ||
1082 | UINT_MAX, | ||
1083 | UINT_MAX, | ||
1084 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | ||
1085 | &qc); | ||
1086 | if (NULL == qe) | ||
1087 | { | ||
1088 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1089 | "Could not create queue entry to reserve\n"); | ||
1090 | return NULL; | ||
1091 | } | ||
1092 | GNUNET_STATISTICS_update (h->stats, | ||
1093 | gettext_noop ("# RESERVE requests executed"), | ||
1094 | 1, | ||
1095 | GNUNET_NO); | ||
1096 | process_queue (h); | ||
1097 | return qe; | ||
1098 | } | ||
1099 | |||
1100 | |||
1101 | /** | ||
1102 | * Signal that all of the data for which a reservation was made has | ||
1103 | * been stored and that whatever excess space might have been reserved | ||
1104 | * can now be released. | ||
1105 | * | ||
1106 | * @param h handle to the datastore | ||
1107 | * @param rid reservation ID (value of "success" in original continuation | ||
1108 | * from the "reserve" function). | ||
1109 | * @param queue_priority ranking of this request in the priority queue | ||
1110 | * @param max_queue_size at what queue size should this request be dropped | ||
1111 | * (if other requests of higher priority are in the queue) | ||
1112 | * @param queue_priority ranking of this request in the priority queue | ||
1113 | * @param max_queue_size at what queue size should this request be dropped | ||
1114 | * (if other requests of higher priority are in the queue) | ||
1115 | * @param cont continuation to call when done | ||
1116 | * @param cont_cls closure for @a cont | ||
1117 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | ||
1118 | * cancel; note that even if NULL is returned, the callback will be invoked | ||
1119 | * (or rather, will already have been invoked) | ||
1120 | */ | ||
1121 | struct GNUNET_DATASTORE_QueueEntry * | ||
1122 | GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, | ||
1123 | uint32_t rid, | ||
1124 | unsigned int queue_priority, | ||
1125 | unsigned int max_queue_size, | ||
1126 | GNUNET_DATASTORE_ContinuationWithStatus cont, | ||
1127 | void *cont_cls) | ||
1128 | { | ||
1129 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
1130 | struct GNUNET_MQ_Envelope *env; | ||
1131 | struct ReleaseReserveMessage *rrm; | ||
1132 | union QueueContext qc; | ||
1133 | |||
1134 | if (NULL == cont) | ||
1135 | cont = &drop_status_cont; | ||
1136 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1137 | "Asked to release reserve %d\n", | ||
1138 | rid); | ||
1139 | env = GNUNET_MQ_msg (rrm, | ||
1140 | GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); | ||
1141 | rrm->rid = htonl (rid); | ||
1142 | qc.sc.cont = cont; | ||
1143 | qc.sc.cont_cls = cont_cls; | ||
1144 | qe = make_queue_entry (h, | ||
1145 | env, | ||
1146 | queue_priority, | ||
1147 | max_queue_size, | ||
1148 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | ||
1149 | &qc); | ||
1150 | if (NULL == qe) | ||
1151 | { | ||
1152 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1153 | "Could not create queue entry to release reserve\n"); | ||
1154 | return NULL; | ||
1155 | } | ||
1156 | GNUNET_STATISTICS_update (h->stats, | ||
1157 | gettext_noop | ||
1158 | ("# RELEASE RESERVE requests executed"), 1, | ||
1159 | GNUNET_NO); | ||
1160 | process_queue (h); | ||
1161 | return qe; | ||
1162 | } | ||
1163 | |||
1164 | |||
1165 | /** | ||
1166 | * Explicitly remove some content from the database. | ||
1167 | * The @a cont continuation will be called with `status` | ||
1168 | * #GNUNET_OK" if content was removed, #GNUNET_NO | ||
1169 | * if no matching entry was found and #GNUNET_SYSERR | ||
1170 | * on all other types of errors. | ||
1171 | * | ||
1172 | * @param h handle to the datastore | ||
1173 | * @param key key for the value | ||
1174 | * @param size number of bytes in data | ||
1175 | * @param data content stored | ||
1176 | * @param queue_priority ranking of this request in the priority queue | ||
1177 | * @param max_queue_size at what queue size should this request be dropped | ||
1178 | * (if other requests of higher priority are in the queue) | ||
1179 | * @param cont continuation to call when done | ||
1180 | * @param cont_cls closure for @a cont | ||
1181 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | ||
1182 | * cancel; note that even if NULL is returned, the callback will be invoked | ||
1183 | * (or rather, will already have been invoked) | ||
1184 | */ | ||
1185 | struct GNUNET_DATASTORE_QueueEntry * | ||
1186 | GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, | ||
1187 | const struct GNUNET_HashCode *key, | ||
1188 | size_t size, | ||
1189 | const void *data, | ||
1190 | unsigned int queue_priority, | ||
1191 | unsigned int max_queue_size, | ||
1192 | GNUNET_DATASTORE_ContinuationWithStatus cont, | ||
1193 | void *cont_cls) | ||
1194 | { | ||
1195 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
1196 | struct DataMessage *dm; | ||
1197 | struct GNUNET_MQ_Envelope *env; | ||
1198 | union QueueContext qc; | ||
1199 | |||
1200 | if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE) | ||
1201 | { | ||
1202 | GNUNET_break (0); | ||
1203 | return NULL; | ||
1204 | } | ||
1205 | if (NULL == cont) | ||
1206 | cont = &drop_status_cont; | ||
1207 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1208 | "Asked to remove %lu bytes under key `%s'\n", | ||
1209 | (unsigned long) size, | ||
1210 | GNUNET_h2s (key)); | ||
1211 | env = GNUNET_MQ_msg_extra (dm, | ||
1212 | size, | ||
1213 | GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); | ||
1214 | dm->size = htonl (size); | ||
1215 | dm->key = *key; | ||
1216 | GNUNET_memcpy (&dm[1], | ||
1217 | data, | ||
1218 | size); | ||
1219 | |||
1220 | qc.sc.cont = cont; | ||
1221 | qc.sc.cont_cls = cont_cls; | ||
1222 | |||
1223 | qe = make_queue_entry (h, | ||
1224 | env, | ||
1225 | queue_priority, | ||
1226 | max_queue_size, | ||
1227 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | ||
1228 | &qc); | ||
1229 | if (NULL == qe) | ||
1230 | { | ||
1231 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1232 | "Could not create queue entry for REMOVE\n"); | ||
1233 | return NULL; | ||
1234 | } | ||
1235 | GNUNET_STATISTICS_update (h->stats, | ||
1236 | gettext_noop ("# REMOVE requests executed"), | ||
1237 | 1, | ||
1238 | GNUNET_NO); | ||
1239 | process_queue (h); | ||
1240 | return qe; | ||
1241 | } | ||
1242 | |||
1243 | |||
1244 | /** | ||
1245 | * Get a random value from the datastore for content replication. | ||
1246 | * Returns a single, random value among those with the highest | ||
1247 | * replication score, lowering positive replication scores by one for | ||
1248 | * the chosen value (if only content with a replication score exists, | ||
1249 | * a random value is returned and replication scores are not changed). | ||
1250 | * | ||
1251 | * @param h handle to the datastore | ||
1252 | * @param queue_priority ranking of this request in the priority queue | ||
1253 | * @param max_queue_size at what queue size should this request be dropped | ||
1254 | * (if other requests of higher priority are in the queue) | ||
1255 | * @param proc function to call on a random value; it | ||
1256 | * will be called once with a value (if available) | ||
1257 | * and always once with a value of NULL. | ||
1258 | * @param proc_cls closure for @a proc | ||
1259 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | ||
1260 | * cancel | ||
1261 | */ | ||
1262 | struct GNUNET_DATASTORE_QueueEntry * | ||
1263 | GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | ||
1264 | unsigned int queue_priority, | ||
1265 | unsigned int max_queue_size, | ||
1266 | GNUNET_DATASTORE_DatumProcessor proc, | ||
1267 | void *proc_cls) | ||
1268 | { | ||
1269 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
1270 | struct GNUNET_MQ_Envelope *env; | ||
1271 | struct GNUNET_MessageHeader *m; | ||
1272 | union QueueContext qc; | ||
1273 | |||
1274 | GNUNET_assert (NULL != proc); | ||
1275 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1276 | "Asked to get replication entry\n"); | ||
1277 | env = GNUNET_MQ_msg (m, | ||
1278 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); | ||
1279 | qc.rc.proc = proc; | ||
1280 | qc.rc.proc_cls = proc_cls; | ||
1281 | qe = make_queue_entry (h, | ||
1282 | env, | ||
1283 | queue_priority, | ||
1284 | max_queue_size, | ||
1285 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, | ||
1286 | &qc); | ||
1287 | if (NULL == qe) | ||
1288 | { | ||
1289 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1290 | "Could not create queue entry for GET REPLICATION\n"); | ||
1291 | return NULL; | ||
1292 | } | ||
1293 | GNUNET_STATISTICS_update (h->stats, | ||
1294 | gettext_noop | ||
1295 | ("# GET REPLICATION requests executed"), 1, | ||
1296 | GNUNET_NO); | ||
1297 | process_queue (h); | ||
1298 | return qe; | ||
1299 | } | ||
1300 | |||
1301 | |||
1302 | /** | ||
1303 | * Get a single zero-anonymity value from the datastore. | ||
1304 | * | ||
1305 | * @param h handle to the datastore | ||
1306 | * @param next_uid return the result with lowest uid >= next_uid | ||
1307 | * @param queue_priority ranking of this request in the priority queue | ||
1308 | * @param max_queue_size at what queue size should this request be dropped | ||
1309 | * (if other requests of higher priority are in the queue) | ||
1310 | * @param type allowed type for the operation (never zero) | ||
1311 | * @param proc function to call on a random value; it | ||
1312 | * will be called once with a value (if available) | ||
1313 | * or with NULL if none value exists. | ||
1314 | * @param proc_cls closure for @a proc | ||
1315 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | ||
1316 | * cancel | ||
1317 | */ | ||
1318 | struct GNUNET_DATASTORE_QueueEntry * | ||
1319 | GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | ||
1320 | uint64_t next_uid, | ||
1321 | unsigned int queue_priority, | ||
1322 | unsigned int max_queue_size, | ||
1323 | enum GNUNET_BLOCK_Type type, | ||
1324 | GNUNET_DATASTORE_DatumProcessor proc, | ||
1325 | void *proc_cls) | ||
1326 | { | ||
1327 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
1328 | struct GNUNET_MQ_Envelope *env; | ||
1329 | struct GetZeroAnonymityMessage *m; | ||
1330 | union QueueContext qc; | ||
1331 | |||
1332 | GNUNET_assert (NULL != proc); | ||
1333 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); | ||
1334 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1335 | "Asked to get a zero-anonymity entry of type %d\n", | ||
1336 | type); | ||
1337 | env = GNUNET_MQ_msg (m, | ||
1338 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); | ||
1339 | m->type = htonl ((uint32_t) type); | ||
1340 | m->next_uid = GNUNET_htonll (next_uid); | ||
1341 | qc.rc.proc = proc; | ||
1342 | qc.rc.proc_cls = proc_cls; | ||
1343 | qe = make_queue_entry (h, | ||
1344 | env, | ||
1345 | queue_priority, | ||
1346 | max_queue_size, | ||
1347 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, | ||
1348 | &qc); | ||
1349 | if (NULL == qe) | ||
1350 | { | ||
1351 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1352 | "Could not create queue entry for zero-anonymity procation\n"); | ||
1353 | return NULL; | ||
1354 | } | ||
1355 | GNUNET_STATISTICS_update (h->stats, | ||
1356 | gettext_noop | ||
1357 | ("# GET ZERO ANONYMITY requests executed"), 1, | ||
1358 | GNUNET_NO); | ||
1359 | process_queue (h); | ||
1360 | return qe; | ||
1361 | } | ||
1362 | |||
1363 | |||
1364 | /** | ||
1365 | * Get a result for a particular key from the datastore. The processor | ||
1366 | * will only be called once. | ||
1367 | * | ||
1368 | * @param h handle to the datastore | ||
1369 | * @param next_uid return the result with lowest uid >= next_uid | ||
1370 | * @param random if true, return a random result instead of using next_uid | ||
1371 | * @param key maybe NULL (to match all entries) | ||
1372 | * @param type desired type, 0 for any | ||
1373 | * @param queue_priority ranking of this request in the priority queue | ||
1374 | * @param max_queue_size at what queue size should this request be dropped | ||
1375 | * (if other requests of higher priority are in the queue) | ||
1376 | * @param proc function to call on each matching value; | ||
1377 | * will be called once with a NULL value at the end | ||
1378 | * @param proc_cls closure for @a proc | ||
1379 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | ||
1380 | * cancel | ||
1381 | */ | ||
1382 | struct GNUNET_DATASTORE_QueueEntry * | ||
1383 | GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, | ||
1384 | uint64_t next_uid, | ||
1385 | bool random, | ||
1386 | const struct GNUNET_HashCode *key, | ||
1387 | enum GNUNET_BLOCK_Type type, | ||
1388 | unsigned int queue_priority, | ||
1389 | unsigned int max_queue_size, | ||
1390 | GNUNET_DATASTORE_DatumProcessor proc, | ||
1391 | void *proc_cls) | ||
1392 | { | ||
1393 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
1394 | struct GNUNET_MQ_Envelope *env; | ||
1395 | struct GetKeyMessage *gkm; | ||
1396 | struct GetMessage *gm; | ||
1397 | union QueueContext qc; | ||
1398 | |||
1399 | GNUNET_assert (NULL != proc); | ||
1400 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1401 | "Asked to look for data of type %u under key `%s'\n", | ||
1402 | (unsigned int) type, | ||
1403 | (NULL != key) ? GNUNET_h2s (key) : "NULL"); | ||
1404 | if (NULL == key) | ||
1405 | { | ||
1406 | env = GNUNET_MQ_msg (gm, | ||
1407 | GNUNET_MESSAGE_TYPE_DATASTORE_GET); | ||
1408 | gm->type = htonl (type); | ||
1409 | gm->next_uid = GNUNET_htonll (next_uid); | ||
1410 | gm->random = random; | ||
1411 | } | ||
1412 | else | ||
1413 | { | ||
1414 | env = GNUNET_MQ_msg (gkm, | ||
1415 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); | ||
1416 | gkm->type = htonl (type); | ||
1417 | gkm->next_uid = GNUNET_htonll (next_uid); | ||
1418 | gkm->random = random; | ||
1419 | gkm->key = *key; | ||
1420 | } | ||
1421 | qc.rc.proc = proc; | ||
1422 | qc.rc.proc_cls = proc_cls; | ||
1423 | qe = make_queue_entry (h, | ||
1424 | env, | ||
1425 | queue_priority, | ||
1426 | max_queue_size, | ||
1427 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, | ||
1428 | &qc); | ||
1429 | if (NULL == qe) | ||
1430 | { | ||
1431 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1432 | "Could not queue request for `%s'\n", | ||
1433 | (NULL != key) ? GNUNET_h2s (key): "NULL"); | ||
1434 | return NULL; | ||
1435 | } | ||
1436 | #if INSANE_STATISTICS | ||
1437 | GNUNET_STATISTICS_update (h->stats, | ||
1438 | gettext_noop ("# GET requests executed"), | ||
1439 | 1, | ||
1440 | GNUNET_NO); | ||
1441 | #endif | ||
1442 | process_queue (h); | ||
1443 | return qe; | ||
1444 | } | ||
1445 | |||
1446 | |||
1447 | /** | ||
1448 | * Cancel a datastore operation. The final callback from the | ||
1449 | * operation must not have been done yet. | ||
1450 | * | ||
1451 | * @param qe operation to cancel | ||
1452 | */ | ||
1453 | void | ||
1454 | GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) | ||
1455 | { | ||
1456 | struct GNUNET_DATASTORE_Handle *h = qe->h; | ||
1457 | |||
1458 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1459 | "Pending DATASTORE request %p cancelled (%d, %d)\n", | ||
1460 | qe, | ||
1461 | NULL == qe->env, | ||
1462 | h->queue_head == qe); | ||
1463 | if (NULL == qe->env) | ||
1464 | { | ||
1465 | free_queue_entry (qe); | ||
1466 | h->skip_next_messages++; | ||
1467 | return; | ||
1468 | } | ||
1469 | free_queue_entry (qe); | ||
1470 | process_queue (h); | ||
1471 | } | ||
1472 | |||
1473 | |||
1474 | /* end of datastore_api.c */ | ||