aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/datastore_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-24 20:17:39 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-24 20:17:39 +0000
commit3140154d46212e08e0d73ed891a66213a6813075 (patch)
tree018a55a3899207664b388fcf47a679ca54ca6fbf /src/datastore/datastore_api.c
parentd5fd881c2a044474b54ddf03b6ab8be8d2b75927 (diff)
downloadgnunet-3140154d46212e08e0d73ed891a66213a6813075.tar.gz
gnunet-3140154d46212e08e0d73ed891a66213a6813075.zip
refactoring datastore API to use MQ API, also fixing misc. bugs in new mysql backend
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r--src/datastore/datastore_api.c1213
1 files changed, 579 insertions, 634 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index b2de3d35d..285634759 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -21,7 +21,7 @@
21/** 21/**
22 * @file datastore/datastore_api.c 22 * @file datastore/datastore_api.c
23 * @brief Management for the datastore for files stored on a GNUnet node. Implements 23 * @brief Management for the datastore for files stored on a GNUnet node. Implements
24 * a priority queue for requests (with timeouts). 24 * a priority queue for requests
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 */ 26 */
27#include "platform.h" 27#include "platform.h"
@@ -95,7 +95,6 @@ union QueueContext
95}; 95};
96 96
97 97
98
99/** 98/**
100 * Entry in our priority queue. 99 * Entry in our priority queue.
101 */ 100 */
@@ -118,13 +117,6 @@ struct GNUNET_DATASTORE_QueueEntry
118 struct GNUNET_DATASTORE_Handle *h; 117 struct GNUNET_DATASTORE_Handle *h;
119 118
120 /** 119 /**
121 * Response processor (NULL if we are not waiting for a response).
122 * This struct should be used for the closure, function-specific
123 * arguments can be passed via 'qc'.
124 */
125 GNUNET_CLIENT_MessageHandler response_proc;
126
127 /**
128 * Function to call after transmission of the request. 120 * Function to call after transmission of the request.
129 */ 121 */
130 GNUNET_DATASTORE_ContinuationWithStatus cont; 122 GNUNET_DATASTORE_ContinuationWithStatus cont;
@@ -140,14 +132,10 @@ struct GNUNET_DATASTORE_QueueEntry
140 union QueueContext qc; 132 union QueueContext qc;
141 133
142 /** 134 /**
143 * Task for timeout signalling. 135 * Envelope of the request to transmit, NULL after
136 * transmission.
144 */ 137 */
145 struct GNUNET_SCHEDULER_Task *task; 138 struct GNUNET_MQ_Envelope *env;
146
147 /**
148 * Timeout for the current operation.
149 */
150 struct GNUNET_TIME_Absolute timeout;
151 139
152 /** 140 /**
153 * Priority in the queue. 141 * Priority in the queue.
@@ -161,22 +149,13 @@ struct GNUNET_DATASTORE_QueueEntry
161 unsigned int max_queue; 149 unsigned int max_queue;
162 150
163 /** 151 /**
164 * Number of bytes in the request message following 152 * Expected response type.
165 * this struct. 32-bit value for nicer memory
166 * access (and overall struct alignment).
167 */ 153 */
168 uint32_t message_size; 154 uint16_t response_type;
169
170 /**
171 * Has this message been transmitted to the service?
172 * Only ever #GNUNET_YES for the head of the queue.
173 * Note that the overall struct should end at a
174 * multiple of 64 bits.
175 */
176 int was_transmitted;
177 155
178}; 156};
179 157
158
180/** 159/**
181 * Handle to the datastore service. 160 * Handle to the datastore service.
182 */ 161 */
@@ -191,7 +170,7 @@ struct GNUNET_DATASTORE_Handle
191 /** 170 /**
192 * Current connection to the datastore service. 171 * Current connection to the datastore service.
193 */ 172 */
194 struct GNUNET_CLIENT_Connection *client; 173 struct GNUNET_MQ_Handle *mq;
195 174
196 /** 175 /**
197 * Handle for statistics. 176 * Handle for statistics.
@@ -199,11 +178,6 @@ struct GNUNET_DATASTORE_Handle
199 struct GNUNET_STATISTICS_Handle *stats; 178 struct GNUNET_STATISTICS_Handle *stats;
200 179
201 /** 180 /**
202 * Current transmit handle.
203 */
204 struct GNUNET_CLIENT_TransmitHandle *th;
205
206 /**
207 * Current head of priority queue. 181 * Current head of priority queue.
208 */ 182 */
209 struct GNUNET_DATASTORE_QueueEntry *queue_head; 183 struct GNUNET_DATASTORE_QueueEntry *queue_head;
@@ -216,7 +190,7 @@ struct GNUNET_DATASTORE_Handle
216 /** 190 /**
217 * Task for trying to reconnect. 191 * Task for trying to reconnect.
218 */ 192 */
219 struct GNUNET_SCHEDULER_Task * reconnect_task; 193 struct GNUNET_SCHEDULER_Task *reconnect_task;
220 194
221 /** 195 /**
222 * How quickly should we retry? Used for exponential back-off on 196 * How quickly should we retry? Used for exponential back-off on
@@ -237,11 +211,6 @@ struct GNUNET_DATASTORE_Handle
237 unsigned int result_count; 211 unsigned int result_count;
238 212
239 /** 213 /**
240 * Are we currently trying to receive from the service?
241 */
242 int in_receive;
243
244 /**
245 * We should ignore the next message(s) from the service. 214 * We should ignore the next message(s) from the service.
246 */ 215 */
247 unsigned int skip_next_messages; 216 unsigned int skip_next_messages;
@@ -250,6 +219,110 @@ struct GNUNET_DATASTORE_Handle
250 219
251 220
252/** 221/**
222 * Try reconnecting to the datastore service.
223 *
224 * @param cls the `struct GNUNET_DATASTORE_Handle`
225 */
226static void
227try_reconnect (void *cls);
228
229
230/**
231 * Disconnect from the service and then try reconnecting to the datastore service
232 * after some delay.
233 *
234 * @param h handle to datastore to disconnect and reconnect
235 */
236static void
237do_disconnect (struct GNUNET_DATASTORE_Handle *h)
238{
239 if (NULL == h->mq)
240 {
241 GNUNET_break (0);
242 return;
243 }
244 GNUNET_MQ_destroy (h->mq);
245 h->mq = NULL;
246 h->skip_next_messages = 0;
247 h->reconnect_task
248 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
249 &try_reconnect,
250 h);
251}
252
253
254/**
255 * Free a queue entry. Removes the given entry from the
256 * queue and releases associated resources. Does NOT
257 * call the callback.
258 *
259 * @param qe entry to free.
260 */
261static void
262free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
263{
264 struct GNUNET_DATASTORE_Handle *h = qe->h;
265
266 GNUNET_CONTAINER_DLL_remove (h->queue_head,
267 h->queue_tail,
268 qe);
269 h->queue_size--;
270 GNUNET_free (qe);
271}
272
273
274/**
275 * Handle error in sending drop request to datastore.
276 *
277 * @param cls closure with the datastore handle
278 * @param error error code
279 */
280static void
281mq_error_handler (void *cls,
282 enum GNUNET_MQ_Error error)
283{
284 struct GNUNET_DATASTORE_Handle *h = cls;
285 struct GNUNET_DATASTORE_QueueEntry *qe;
286
287 LOG (GNUNET_ERROR_TYPE_DEBUG,
288 "MQ error, reconnecting to DATASTORE\n");
289 do_disconnect (h);
290 qe = h->queue_head;
291 if ( (NULL != qe) &&
292 (NULL == qe->env) )
293 {
294 union QueueContext qc = qe->qc;
295 uint16_t rt = qe->response_type;
296
297 LOG (GNUNET_ERROR_TYPE_DEBUG,
298 "Failed to receive response from database.\n");
299 free_queue_entry (qe);
300 switch (rt)
301 {
302 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
303 if (NULL != qc.sc.cont)
304 qc.sc.cont (qc.sc.cont_cls,
305 GNUNET_SYSERR,
306 GNUNET_TIME_UNIT_ZERO_ABS,
307 _("DATASTORE disconnected"));
308 break;
309 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
310 if (NULL != qc.rc.proc)
311 qc.rc.proc (qc.rc.proc_cls,
312 NULL,
313 0,
314 NULL, 0, 0, 0,
315 GNUNET_TIME_UNIT_ZERO_ABS,
316 0);
317 break;
318 default:
319 GNUNET_break (0);
320 }
321 }
322}
323
324
325/**
253 * Connect to the datastore service. 326 * Connect to the datastore service.
254 * 327 *
255 * @param cfg configuration to use 328 * @param cfg configuration to use
@@ -258,22 +331,27 @@ struct GNUNET_DATASTORE_Handle
258struct GNUNET_DATASTORE_Handle * 331struct GNUNET_DATASTORE_Handle *
259GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) 332GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
260{ 333{
261 struct GNUNET_CLIENT_Connection *c;
262 struct GNUNET_DATASTORE_Handle *h; 334 struct GNUNET_DATASTORE_Handle *h;
263 335
264 c = GNUNET_CLIENT_connect ("datastore", cfg); 336 LOG (GNUNET_ERROR_TYPE_DEBUG,
265 if (c == NULL) 337 "Establishing DATASTORE connection!\n");
266 return NULL; /* oops */
267 h = GNUNET_new (struct GNUNET_DATASTORE_Handle); 338 h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
268 h->client = c;
269 h->cfg = cfg; 339 h->cfg = cfg;
270 h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg); 340 try_reconnect (h);
341 if (NULL == h->mq)
342 {
343 GNUNET_free (h);
344 return NULL;
345 }
346 h->stats = GNUNET_STATISTICS_create ("datastore-api",
347 cfg);
271 return h; 348 return h;
272} 349}
273 350
274 351
275/** 352/**
276 * Task used by 'transmit_drop' to disconnect the datastore. 353 * Task used by to disconnect from the datastore after
354 * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
277 * 355 *
278 * @param cls the datastore handle 356 * @param cls the datastore handle
279 */ 357 */
@@ -282,39 +360,29 @@ disconnect_after_drop (void *cls)
282{ 360{
283 struct GNUNET_DATASTORE_Handle *h = cls; 361 struct GNUNET_DATASTORE_Handle *h = cls;
284 362
363 LOG (GNUNET_ERROR_TYPE_DEBUG,
364 "Drop sent, disconnecting\n");
285 GNUNET_DATASTORE_disconnect (h, 365 GNUNET_DATASTORE_disconnect (h,
286 GNUNET_NO); 366 GNUNET_NO);
287} 367}
288 368
289 369
290/** 370/**
291 * Transmit DROP message to datastore service. 371 * Handle error in sending drop request to datastore.
292 * 372 *
293 * @param cls the `struct GNUNET_DATASTORE_Handle` 373 * @param cls closure with the datastore handle
294 * @param size number of bytes that can be copied to @a buf 374 * @param error error code
295 * @param buf where to copy the drop message
296 * @return number of bytes written to @a buf
297 */ 375 */
298static size_t 376static void
299transmit_drop (void *cls, size_t size, void *buf) 377disconnect_on_mq_error (void *cls,
378 enum GNUNET_MQ_Error error)
300{ 379{
301 struct GNUNET_DATASTORE_Handle *h = cls; 380 struct GNUNET_DATASTORE_Handle *h = cls;
302 struct GNUNET_MessageHeader *hdr;
303 381
304 if (buf == NULL) 382 LOG (GNUNET_ERROR_TYPE_ERROR,
305 { 383 "Failed to ask datastore to drop tables\n");
306 LOG (GNUNET_ERROR_TYPE_WARNING, 384 GNUNET_DATASTORE_disconnect (h,
307 _("Failed to transmit request to drop database.\n")); 385 GNUNET_NO);
308 GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h);
309 return 0;
310 }
311 GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
312 hdr = buf;
313 hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
314 hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
315 GNUNET_SCHEDULER_add_now (&disconnect_after_drop,
316 h);
317 return sizeof (struct GNUNET_MessageHeader);
318} 386}
319 387
320 388
@@ -333,15 +401,10 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
333 401
334 LOG (GNUNET_ERROR_TYPE_DEBUG, 402 LOG (GNUNET_ERROR_TYPE_DEBUG,
335 "Datastore disconnect\n"); 403 "Datastore disconnect\n");
336 if (NULL != h->th) 404 if (NULL != h->mq)
337 {
338 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
339 h->th = NULL;
340 }
341 if (NULL != h->client)
342 { 405 {
343 GNUNET_CLIENT_disconnect (h->client); 406 GNUNET_MQ_destroy (h->mq);
344 h->client = NULL; 407 h->mq = NULL;
345 } 408 }
346 if (NULL != h->reconnect_task) 409 if (NULL != h->reconnect_task)
347 { 410 {
@@ -350,25 +413,52 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
350 } 413 }
351 while (NULL != (qe = h->queue_head)) 414 while (NULL != (qe = h->queue_head))
352 { 415 {
353 GNUNET_assert (NULL != qe->response_proc); 416 switch (qe->response_type)
354 qe->response_proc (h, NULL); 417 {
418 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
419 if (NULL != qe->qc.sc.cont)
420 qe->qc.sc.cont (qe->qc.sc.cont_cls,
421 GNUNET_SYSERR,
422 GNUNET_TIME_UNIT_ZERO_ABS,
423 _("Disconnected from DATASTORE"));
424 break;
425 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
426 if (NULL != qe->qc.rc.proc)
427 qe->qc.rc.proc (qe->qc.rc.proc_cls,
428 NULL,
429 0,
430 NULL, 0, 0, 0,
431 GNUNET_TIME_UNIT_ZERO_ABS,
432 0);
433 break;
434 default:
435 GNUNET_break (0);
436 }
437 free_queue_entry (qe);
355 } 438 }
356 if (GNUNET_YES == drop) 439 if (GNUNET_YES == drop)
357 { 440 {
358 h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); 441 LOG (GNUNET_ERROR_TYPE_DEBUG,
359 if (NULL != h->client) 442 "Re-connecting to issue DROP!\n");
443 GNUNET_assert (NULL == h->mq);
444 h->mq = GNUNET_CLIENT_connecT (h->cfg,
445 "datastore",
446 NULL,
447 &disconnect_on_mq_error,
448 h);
449 if (NULL != h->mq)
360 { 450 {
361 if (NULL != 451 struct GNUNET_MessageHeader *hdr;
362 GNUNET_CLIENT_notify_transmit_ready (h->client, 452 struct GNUNET_MQ_Envelope *env;
363 sizeof (struct 453
364 GNUNET_MessageHeader), 454 env = GNUNET_MQ_msg (hdr,
365 GNUNET_TIME_UNIT_SECONDS, 455 GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
366 GNUNET_YES, 456 GNUNET_MQ_notify_sent (env,
367 &transmit_drop, 457 &disconnect_after_drop,
368 h)) 458 h);
369 return; 459 GNUNET_MQ_send (h->mq,
370 GNUNET_CLIENT_disconnect (h->client); 460 env);
371 h->client = NULL; 461 return;
372 } 462 }
373 GNUNET_break (0); 463 GNUNET_break (0);
374 } 464 }
@@ -380,67 +470,37 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
380 470
381 471
382/** 472/**
383 * A request has timed out (before being transmitted to the service).
384 *
385 * @param cls the `struct GNUNET_DATASTORE_QueueEntry`
386 */
387static void
388timeout_queue_entry (void *cls)
389{
390 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
391 struct GNUNET_DATASTORE_Handle *h = qe->h;
392
393 GNUNET_STATISTICS_update (h->stats,
394 gettext_noop ("# queue entry timeouts"),
395 1,
396 GNUNET_NO);
397 qe->task = NULL;
398 GNUNET_assert (GNUNET_NO == qe->was_transmitted);
399 LOG (GNUNET_ERROR_TYPE_DEBUG,
400 "Timeout of request in datastore queue\n");
401 /* response_proc's expect request at the head of the queue! */
402 GNUNET_CONTAINER_DLL_remove (h->queue_head,
403 h->queue_tail,
404 qe);
405 GNUNET_CONTAINER_DLL_insert (h->queue_head,
406 h->queue_tail,
407 qe);
408 GNUNET_assert (h->queue_head == qe);
409 qe->response_proc (qe->h, NULL);
410}
411
412
413/**
414 * Create a new entry for our priority queue (and possibly discard other entires if 473 * Create a new entry for our priority queue (and possibly discard other entires if
415 * the queue is getting too long). 474 * the queue is getting too long).
416 * 475 *
417 * @param h handle to the datastore 476 * @param h handle to the datastore
418 * @param msize size of the message to queue 477 * @param env envelope with the message to queue
419 * @param queue_priority priority of the entry 478 * @param queue_priority priority of the entry
420 * @param max_queue_size at what queue size should this request be dropped 479 * @param max_queue_size at what queue size should this request be dropped
421 * (if other requests of higher priority are in the queue) 480 * (if other requests of higher priority are in the queue)
422 * @param timeout timeout for the operation 481 * @param expected_type which type of response do we expect,
423 * @param response_proc function to call with replies (can be NULL) 482 * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
483 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
424 * @param qc client context (NOT a closure for @a response_proc) 484 * @param qc client context (NOT a closure for @a response_proc)
425 * @return NULL if the queue is full 485 * @return NULL if the queue is full
426 */ 486 */
427static struct GNUNET_DATASTORE_QueueEntry * 487static struct GNUNET_DATASTORE_QueueEntry *
428make_queue_entry (struct GNUNET_DATASTORE_Handle *h, 488make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
429 size_t msize, 489 struct GNUNET_MQ_Envelope *env,
430 unsigned int queue_priority, 490 unsigned int queue_priority,
431 unsigned int max_queue_size, 491 unsigned int max_queue_size,
432 struct GNUNET_TIME_Relative timeout, 492 uint16_t expected_type,
433 GNUNET_CLIENT_MessageHandler response_proc,
434 const union QueueContext *qc) 493 const union QueueContext *qc)
435{ 494{
436 struct GNUNET_DATASTORE_QueueEntry *ret; 495 struct GNUNET_DATASTORE_QueueEntry *qe;
437 struct GNUNET_DATASTORE_QueueEntry *pos; 496 struct GNUNET_DATASTORE_QueueEntry *pos;
438 unsigned int c; 497 unsigned int c;
439 498
440 c = 0; 499 c = 0;
441 pos = h->queue_head; 500 pos = h->queue_head;
442 while ((pos != NULL) && (c < max_queue_size) && 501 while ( (NULL != pos) &&
443 (pos->priority >= queue_priority)) 502 (c < max_queue_size) &&
503 (pos->priority >= queue_priority) )
444 { 504 {
445 c++; 505 c++;
446 pos = pos->next; 506 pos = pos->next;
@@ -451,18 +511,17 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
451 gettext_noop ("# queue overflows"), 511 gettext_noop ("# queue overflows"),
452 1, 512 1,
453 GNUNET_NO); 513 GNUNET_NO);
514 GNUNET_MQ_discard (env);
454 return NULL; 515 return NULL;
455 } 516 }
456 ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); 517 qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
457 ret->h = h; 518 qe->h = h;
458 ret->response_proc = response_proc; 519 qe->env = env;
459 ret->qc = *qc; 520 qe->response_type = expected_type;
460 ret->timeout = GNUNET_TIME_relative_to_absolute (timeout); 521 qe->qc = *qc;
461 ret->priority = queue_priority; 522 qe->priority = queue_priority;
462 ret->max_queue = max_queue_size; 523 qe->max_queue = max_queue_size;
463 ret->message_size = msize; 524 if (NULL == pos)
464 ret->was_transmitted = GNUNET_NO;
465 if (pos == NULL)
466 { 525 {
467 /* append at the tail */ 526 /* append at the tail */
468 pos = h->queue_tail; 527 pos = h->queue_tail;
@@ -472,49 +531,23 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
472 pos = pos->prev; 531 pos = pos->prev;
473 /* do not insert at HEAD if HEAD query was already 532 /* do not insert at HEAD if HEAD query was already
474 * transmitted and we are still receiving replies! */ 533 * transmitted and we are still receiving replies! */
475 if ((pos == NULL) && (h->queue_head->was_transmitted)) 534 if ( (NULL == pos) &&
535 (NULL == h->queue_head->env) )
476 pos = h->queue_head; 536 pos = h->queue_head;
477 } 537 }
478 c++; 538 c++;
479#if INSANE_STATISTICS 539#if INSANE_STATISTICS
480 GNUNET_STATISTICS_update (h->stats, 540 GNUNET_STATISTICS_update (h->stats,
481 gettext_noop ("# queue entries created"), 541 gettext_noop ("# queue entries created"),
482 1, GNUNET_NO); 542 1,
543 GNUNET_NO);
483#endif 544#endif
484 GNUNET_CONTAINER_DLL_insert_after (h->queue_head, 545 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
485 h->queue_tail, 546 h->queue_tail,
486 pos, 547 pos,
487 ret); 548 qe);
488 h->queue_size++; 549 h->queue_size++;
489 ret->task = GNUNET_SCHEDULER_add_delayed (timeout, 550 return qe;
490 &timeout_queue_entry,
491 ret);
492 for (pos = ret->next; NULL != pos; pos = pos->next)
493 {
494 if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
495 {
496 GNUNET_assert (NULL != pos->response_proc);
497 /* move 'pos' element to head so that it will be
498 * killed on 'NULL' call below */
499 LOG (GNUNET_ERROR_TYPE_DEBUG,
500 "Dropping request from datastore queue\n");
501 /* response_proc's expect request at the head of the queue! */
502 GNUNET_CONTAINER_DLL_remove (h->queue_head,
503 h->queue_tail,
504 pos);
505 GNUNET_CONTAINER_DLL_insert (h->queue_head,
506 h->queue_tail,
507 pos);
508 GNUNET_STATISTICS_update (h->stats,
509 gettext_noop
510 ("# Requests dropped from datastore queue"), 1,
511 GNUNET_NO);
512 GNUNET_assert (h->queue_head == pos);
513 pos->response_proc (h, NULL);
514 break;
515 }
516 }
517 return ret;
518} 551}
519 552
520 553
@@ -525,78 +558,88 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
525 * @param h handle to the datastore 558 * @param h handle to the datastore
526 */ 559 */
527static void 560static void
528process_queue (struct GNUNET_DATASTORE_Handle *h); 561process_queue (struct GNUNET_DATASTORE_Handle *h)
529
530
531/**
532 * Try reconnecting to the datastore service.
533 *
534 * @param cls the `struct GNUNET_DATASTORE_Handle`
535 */
536static void
537try_reconnect (void *cls)
538{ 562{
539 struct GNUNET_DATASTORE_Handle *h = cls; 563 struct GNUNET_DATASTORE_QueueEntry *qe;
540 564
541 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); 565 if (NULL == (qe = h->queue_head))
542 h->reconnect_task = NULL;
543 h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
544 if (h->client == NULL)
545 { 566 {
546 LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n"); 567 /* no entry in queue */
568 LOG (GNUNET_ERROR_TYPE_DEBUG,
569 "Queue empty\n");
547 return; 570 return;
548 } 571 }
549 GNUNET_STATISTICS_update (h->stats, 572 if (NULL == qe->env)
550 gettext_noop 573 {
551 ("# datastore connections (re)created"), 1, 574 /* waiting for replies */
552 GNUNET_NO); 575 LOG (GNUNET_ERROR_TYPE_DEBUG,
553 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n"); 576 "Head request already transmitted\n");
554 process_queue (h); 577 return;
578 }
579 if (NULL == h->mq)
580 {
581 /* waiting for reconnect */
582 LOG (GNUNET_ERROR_TYPE_DEBUG,
583 "Not connected\n");
584 return;
585 }
586 GNUNET_MQ_send (h->mq,
587 qe->env);
588 qe->env = NULL;
555} 589}
556 590
557 591
592
593
558/** 594/**
559 * Disconnect from the service and then try reconnecting to the datastore service 595 * Function called to check status message from the service.
560 * after some delay.
561 * 596 *
562 * @param h handle to datastore to disconnect and reconnect 597 * @param cls closure
598 * @param sm status message received
599 * @return #GNUNET_OK if the message is well-formed
563 */ 600 */
564static void 601static int
565do_disconnect (struct GNUNET_DATASTORE_Handle *h) 602check_status (void *cls,
603 const struct StatusMessage *sm)
566{ 604{
567 if (NULL == h->client) 605 uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
606 int32_t status = ntohl (sm->status);
607
608 if (msize > 0)
568 { 609 {
569 LOG (GNUNET_ERROR_TYPE_DEBUG, 610 const char *emsg = (const char *) &sm[1];
570 "Client NULL in disconnect, will not try to reconnect\n"); 611
571 return; 612 if ('\0' != emsg[msize - 1])
613 {
614 GNUNET_break (0);
615 return GNUNET_SYSERR;
616 }
572 } 617 }
573 GNUNET_CLIENT_disconnect (h->client); 618 else if (GNUNET_SYSERR == status)
574 h->skip_next_messages = 0; 619 {
575 h->client = NULL; 620 GNUNET_break (0);
576 h->reconnect_task = 621 return GNUNET_SYSERR;
577 GNUNET_SCHEDULER_add_delayed (h->retry_time, 622 }
578 &try_reconnect, 623 return GNUNET_OK;
579 h);
580} 624}
581 625
582 626
583/** 627/**
584 * Function called whenever we receive a message from 628 * Function called to handle status message from the service.
585 * the service. Calls the appropriate handler.
586 * 629 *
587 * @param cls the `struct GNUNET_DATASTORE_Handle` 630 * @param cls closure
588 * @param msg the received message 631 * @param sm status message received
589 */ 632 */
590static void 633static void
591receive_cb (void *cls, 634handle_status (void *cls,
592 const struct GNUNET_MessageHeader *msg) 635 const struct StatusMessage *sm)
593{ 636{
594 struct GNUNET_DATASTORE_Handle *h = cls; 637 struct GNUNET_DATASTORE_Handle *h = cls;
595 struct GNUNET_DATASTORE_QueueEntry *qe; 638 struct GNUNET_DATASTORE_QueueEntry *qe;
639 struct StatusContext rc;
640 const char *emsg;
641 int32_t status = ntohl (sm->status);
596 642
597 h->in_receive = GNUNET_NO;
598 LOG (GNUNET_ERROR_TYPE_DEBUG,
599 "Receiving reply from datastore\n");
600 if (h->skip_next_messages > 0) 643 if (h->skip_next_messages > 0)
601 { 644 {
602 h->skip_next_messages--; 645 h->skip_next_messages--;
@@ -606,252 +649,255 @@ receive_cb (void *cls,
606 if (NULL == (qe = h->queue_head)) 649 if (NULL == (qe = h->queue_head))
607 { 650 {
608 GNUNET_break (0); 651 GNUNET_break (0);
609 process_queue (h); 652 do_disconnect (h);
653 return;
654 }
655 if (NULL != qe->env)
656 {
657 GNUNET_break (0);
658 do_disconnect (h);
610 return; 659 return;
611 } 660 }
612 qe->response_proc (h, msg); 661 if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
662 {
663 GNUNET_break (0);
664 do_disconnect (h);
665 return;
666 }
667 rc = qe->qc.sc;
668 free_queue_entry (qe);
669 if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
670 emsg = (const char *) &sm[1];
671 else
672 emsg = NULL;
673 LOG (GNUNET_ERROR_TYPE_DEBUG,
674 "Received status %d/%s\n",
675 (int) status,
676 emsg);
677 GNUNET_STATISTICS_update (h->stats,
678 gettext_noop ("# status messages received"),
679 1,
680 GNUNET_NO);
681 h->retry_time = GNUNET_TIME_UNIT_ZERO;
682 process_queue (h);
683 if (NULL != rc.cont)
684 rc.cont (rc.cont_cls,
685 status,
686 GNUNET_TIME_absolute_ntoh (sm->min_expiration),
687 emsg);
613} 688}
614 689
615 690
616/** 691/**
617 * Transmit request from queue to datastore service. 692 * Check data message we received from the service.
618 * 693 *
619 * @param cls the `struct GNUNET_DATASTORE_Handle` 694 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
620 * @param size number of bytes that can be copied to @a buf 695 * @param dm message received
621 * @param buf where to copy the drop message
622 * @return number of bytes written to @a buf
623 */ 696 */
624static size_t 697static int
625transmit_request (void *cls, 698check_data (void *cls,
626 size_t size, 699 const struct DataMessage *dm)
627 void *buf)
628{ 700{
629 struct GNUNET_DATASTORE_Handle *h = cls; 701 uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
630 struct GNUNET_DATASTORE_QueueEntry *qe;
631 size_t msize;
632 702
633 h->th = NULL; 703 if (msize != ntohl (dm->size))
634 if (NULL == (qe = h->queue_head))
635 return 0; /* no entry in queue */
636 if (NULL == buf)
637 { 704 {
638 LOG (GNUNET_ERROR_TYPE_DEBUG, 705 GNUNET_break (0);
639 "Failed to transmit request to DATASTORE.\n"); 706 return GNUNET_SYSERR;
640 GNUNET_STATISTICS_update (h->stats,
641 gettext_noop ("# transmission request failures"),
642 1, GNUNET_NO);
643 do_disconnect (h);
644 return 0;
645 }
646 if (size < (msize = qe->message_size))
647 {
648 process_queue (h);
649 return 0;
650 } 707 }
651 LOG (GNUNET_ERROR_TYPE_DEBUG, 708 return GNUNET_OK;
652 "Transmitting %u byte request to DATASTORE\n",
653 msize);
654 memcpy (buf, &qe[1], msize);
655 qe->was_transmitted = GNUNET_YES;
656 GNUNET_SCHEDULER_cancel (qe->task);
657 qe->task = NULL;
658 GNUNET_assert (GNUNET_NO == h->in_receive);
659 h->in_receive = GNUNET_YES;
660 GNUNET_CLIENT_receive (h->client,
661 &receive_cb, h,
662 GNUNET_TIME_absolute_get_remaining (qe->timeout));
663#if INSANE_STATISTICS
664 GNUNET_STATISTICS_update (h->stats,
665 gettext_noop ("# bytes sent to datastore"), msize,
666 GNUNET_NO);
667#endif
668 return msize;
669} 709}
670 710
671 711
672/** 712/**
673 * Process entries in the queue (or do nothing if we are already 713 * Handle data message we got from the service.
674 * doing so).
675 * 714 *
676 * @param h handle to the datastore 715 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
716 * @param dm message received
677 */ 717 */
678static void 718static void
679process_queue (struct GNUNET_DATASTORE_Handle *h) 719handle_data (void *cls,
720 const struct DataMessage *dm)
680{ 721{
722 struct GNUNET_DATASTORE_Handle *h = cls;
681 struct GNUNET_DATASTORE_QueueEntry *qe; 723 struct GNUNET_DATASTORE_QueueEntry *qe;
724 struct ResultContext rc;
682 725
683 if (NULL == (qe = h->queue_head)) 726 if (h->skip_next_messages > 0)
684 {
685 /* no entry in queue */
686 LOG (GNUNET_ERROR_TYPE_DEBUG,
687 "Queue empty\n");
688 return;
689 }
690 if (GNUNET_YES == qe->was_transmitted)
691 { 727 {
692 /* waiting for replies */ 728 process_queue (h);
693 LOG (GNUNET_ERROR_TYPE_DEBUG,
694 "Head request already transmitted\n");
695 return; 729 return;
696 } 730 }
697 if (NULL != h->th) 731 qe = h->queue_head;
732 if (NULL == qe)
698 { 733 {
699 /* request pending */ 734 GNUNET_break (0);
700 LOG (GNUNET_ERROR_TYPE_DEBUG, 735 do_disconnect (h);
701 "Pending transmission request\n");
702 return; 736 return;
703 } 737 }
704 if (NULL == h->client) 738 if (NULL != qe->env)
705 { 739 {
706 /* waiting for reconnect */ 740 GNUNET_break (0);
707 LOG (GNUNET_ERROR_TYPE_DEBUG, 741 do_disconnect (h);
708 "Not connected\n");
709 return; 742 return;
710 } 743 }
711 if (GNUNET_YES == h->in_receive) 744 if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
712 { 745 {
713 /* wait for response to previous query */ 746 GNUNET_break (0);
747 do_disconnect (h);
714 return; 748 return;
715 } 749 }
750#if INSANE_STATISTICS
751 GNUNET_STATISTICS_update (h->stats,
752 gettext_noop ("# Results received"),
753 1,
754 GNUNET_NO);
755#endif
716 LOG (GNUNET_ERROR_TYPE_DEBUG, 756 LOG (GNUNET_ERROR_TYPE_DEBUG,
717 "Queueing %u byte request to DATASTORE\n", 757 "Received result %llu with type %u and size %u with key %s\n",
718 qe->message_size); 758 (unsigned long long) GNUNET_ntohll (dm->uid),
719 h->th 759 ntohl (dm->type),
720 = GNUNET_CLIENT_notify_transmit_ready (h->client, 760 ntohl (dm->size),
721 qe->message_size, 761 GNUNET_h2s (&dm->key));
722 GNUNET_TIME_absolute_get_remaining (qe->timeout), 762 rc = qe->qc.rc;
723 GNUNET_YES, 763 free_queue_entry (qe);
724 &transmit_request, 764 h->retry_time = GNUNET_TIME_UNIT_ZERO;
725 h); 765 process_queue (h);
726 GNUNET_assert (GNUNET_NO == h->in_receive); 766 if (NULL != rc.proc)
727 GNUNET_break (NULL != h->th); 767 rc.proc (rc.proc_cls,
728} 768 &dm->key,
729 769 ntohl (dm->size),
730 770 &dm[1],
731/** 771 ntohl (dm->type),
732 * Dummy continuation used to do nothing (but be non-zero). 772 ntohl (dm->priority),
733 * 773 ntohl (dm->anonymity),
734 * @param cls closure 774 GNUNET_TIME_absolute_ntoh (dm->expiration),
735 * @param result result 775 GNUNET_ntohll (dm->uid));
736 * @param min_expiration expiration time
737 * @param emsg error message
738 */
739static void
740drop_status_cont (void *cls, int32_t result,
741 struct GNUNET_TIME_Absolute min_expiration,
742 const char *emsg)
743{
744 /* do nothing */
745}
746
747
748/**
749 * Free a queue entry. Removes the given entry from the
750 * queue and releases associated resources. Does NOT
751 * call the callback.
752 *
753 * @param qe entry to free.
754 */
755static void
756free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
757{
758 struct GNUNET_DATASTORE_Handle *h = qe->h;
759
760 GNUNET_CONTAINER_DLL_remove (h->queue_head,
761 h->queue_tail,
762 qe);
763 if (qe->task != NULL)
764 {
765 GNUNET_SCHEDULER_cancel (qe->task);
766 qe->task = NULL;
767 }
768 h->queue_size--;
769 qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
770 GNUNET_free (qe);
771} 776}
772 777
773 778
774/** 779/**
775 * Type of a function to call when we receive a message 780 * Type of a function to call when we receive a
776 * from the service. 781 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
777 * 782 *
778 * @param cls closure 783 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
779 * @param msg message received, NULL on timeout or fatal error 784 * @param msg message received
780 */ 785 */
781static void 786static void
782process_status_message (void *cls, 787handle_data_end (void *cls,
783 const struct GNUNET_MessageHeader *msg) 788 const struct GNUNET_MessageHeader *msg)
784{ 789{
785 struct GNUNET_DATASTORE_Handle *h = cls; 790 struct GNUNET_DATASTORE_Handle *h = cls;
786 struct GNUNET_DATASTORE_QueueEntry *qe; 791 struct GNUNET_DATASTORE_QueueEntry *qe;
787 struct StatusContext rc; 792 struct ResultContext rc;
788 const struct StatusMessage *sm;
789 const char *emsg;
790 int32_t status;
791 int was_transmitted;
792 793
793 if (NULL == (qe = h->queue_head)) 794 if (h->skip_next_messages > 0)
794 { 795 {
795 GNUNET_break (0); 796 h->skip_next_messages--;
796 do_disconnect (h); 797 process_queue (h);
797 return; 798 return;
798 } 799 }
799 rc = qe->qc.sc; 800 qe = h->queue_head;
800 if (NULL == msg) 801 if (NULL == qe)
801 { 802 {
802 was_transmitted = qe->was_transmitted; 803 GNUNET_break (0);
803 free_queue_entry (qe); 804 do_disconnect (h);
804 if (was_transmitted == GNUNET_YES)
805 do_disconnect (h);
806 else
807 process_queue (h);
808 if (NULL != rc.cont)
809 rc.cont (rc.cont_cls, GNUNET_SYSERR,
810 GNUNET_TIME_UNIT_ZERO_ABS,
811 _("Failed to receive status response from database."));
812 return; 805 return;
813 } 806 }
814 GNUNET_assert (GNUNET_YES == qe->was_transmitted); 807 if (NULL != qe->env)
815 free_queue_entry (qe);
816 if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
817 (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
818 { 808 {
819 GNUNET_break (0); 809 GNUNET_break (0);
820 h->retry_time = GNUNET_TIME_UNIT_ZERO;
821 do_disconnect (h); 810 do_disconnect (h);
822 if (rc.cont != NULL)
823 rc.cont (rc.cont_cls, GNUNET_SYSERR,
824 GNUNET_TIME_UNIT_ZERO_ABS,
825 _("Error reading response from datastore service"));
826 return; 811 return;
827 } 812 }
828 sm = (const struct StatusMessage *) msg; 813 if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
829 status = ntohl (sm->status);
830 emsg = NULL;
831 if (ntohs (msg->size) > sizeof (struct StatusMessage))
832 {
833 emsg = (const char *) &sm[1];
834 if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
835 {
836 GNUNET_break (0);
837 emsg = _("Invalid error message received from datastore service");
838 }
839 }
840 if ((status == GNUNET_SYSERR) && (emsg == NULL))
841 { 814 {
842 GNUNET_break (0); 815 GNUNET_break (0);
843 emsg = _("Invalid error message received from datastore service"); 816 do_disconnect (h);
817 return;
844 } 818 }
845 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg); 819 rc = qe->qc.rc;
820 free_queue_entry (qe);
821 LOG (GNUNET_ERROR_TYPE_DEBUG,
822 "Received end of result set, new queue size is %u\n",
823 h->queue_size);
824 h->retry_time = GNUNET_TIME_UNIT_ZERO;
825 h->result_count = 0;
826 process_queue (h);
827 /* signal end of iteration */
828 if (NULL != rc.proc)
829 rc.proc (rc.proc_cls,
830 NULL,
831 0,
832 NULL,
833 0,
834 0,
835 0,
836 GNUNET_TIME_UNIT_ZERO_ABS,
837 0);
838}
839
840
841/**
842 * Try reconnecting to the datastore service.
843 *
844 * @param cls the `struct GNUNET_DATASTORE_Handle`
845 */
846static void
847try_reconnect (void *cls)
848{
849 GNUNET_MQ_hd_var_size (status,
850 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
851 struct StatusMessage);
852 GNUNET_MQ_hd_var_size (data,
853 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
854 struct DataMessage);
855 GNUNET_MQ_hd_fixed_size (data_end,
856 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
857 struct GNUNET_MessageHeader);
858 struct GNUNET_DATASTORE_Handle *h = cls;
859 struct GNUNET_MQ_MessageHandler handlers[] = {
860 make_status_handler (h),
861 make_data_handler (h),
862 make_data_end_handler (h),
863 GNUNET_MQ_handler_end ()
864 };
865
866 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
867 h->reconnect_task = NULL;
868 GNUNET_assert (NULL == h->mq);
869 h->mq = GNUNET_CLIENT_connecT (h->cfg,
870 "datastore",
871 handlers,
872 &mq_error_handler,
873 h);
874 if (NULL == h->mq)
875 return;
846 GNUNET_STATISTICS_update (h->stats, 876 GNUNET_STATISTICS_update (h->stats,
847 gettext_noop ("# status messages received"), 1, 877 gettext_noop ("# datastore connections (re)created"),
878 1,
848 GNUNET_NO); 879 GNUNET_NO);
849 h->retry_time = GNUNET_TIME_UNIT_ZERO; 880 LOG (GNUNET_ERROR_TYPE_DEBUG,
881 "Reconnected to DATASTORE\n");
850 process_queue (h); 882 process_queue (h);
851 if (rc.cont != NULL) 883}
852 rc.cont (rc.cont_cls, status, 884
853 GNUNET_TIME_absolute_ntoh (sm->min_expiration), 885
854 emsg); 886/**
887 * Dummy continuation used to do nothing (but be non-zero).
888 *
889 * @param cls closure
890 * @param result result
891 * @param min_expiration expiration time
892 * @param emsg error message
893 */
894static void
895drop_status_cont (void *cls,
896 int32_t result,
897 struct GNUNET_TIME_Absolute min_expiration,
898 const char *emsg)
899{
900 /* do nothing */
855} 901}
856 902
857 903
@@ -874,7 +920,6 @@ process_status_message (void *cls,
874 * @param queue_priority ranking of this request in the priority queue 920 * @param queue_priority ranking of this request in the priority queue
875 * @param max_queue_size at what queue size should this request be dropped 921 * @param max_queue_size at what queue size should this request be dropped
876 * (if other requests of higher priority are in the queue) 922 * (if other requests of higher priority are in the queue)
877 * @param timeout timeout for the operation
878 * @param cont continuation to call when done 923 * @param cont continuation to call when done
879 * @param cont_cls closure for @a cont 924 * @param cont_cls closure for @a cont
880 * @return NULL if the entry was not queued, otherwise a handle that can be used to 925 * @return NULL if the entry was not queued, otherwise a handle that can be used to
@@ -894,33 +939,51 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
894 struct GNUNET_TIME_Absolute expiration, 939 struct GNUNET_TIME_Absolute expiration,
895 unsigned int queue_priority, 940 unsigned int queue_priority,
896 unsigned int max_queue_size, 941 unsigned int max_queue_size,
897 struct GNUNET_TIME_Relative timeout,
898 GNUNET_DATASTORE_ContinuationWithStatus cont, 942 GNUNET_DATASTORE_ContinuationWithStatus cont,
899 void *cont_cls) 943 void *cont_cls)
900{ 944{
901 struct GNUNET_DATASTORE_QueueEntry *qe; 945 struct GNUNET_DATASTORE_QueueEntry *qe;
946 struct GNUNET_MQ_Envelope *env;
902 struct DataMessage *dm; 947 struct DataMessage *dm;
903 size_t msize;
904 union QueueContext qc; 948 union QueueContext qc;
905 949
950 if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
951 {
952 GNUNET_break (0);
953 return NULL;
954 }
955
906 LOG (GNUNET_ERROR_TYPE_DEBUG, 956 LOG (GNUNET_ERROR_TYPE_DEBUG,
907 "Asked to put %u bytes of data under key `%s' for %s\n", 957 "Asked to put %u bytes of data under key `%s' for %s\n",
908 size, 958 size,
909 GNUNET_h2s (key), 959 GNUNET_h2s (key),
910 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), 960 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
911 GNUNET_YES)); 961 GNUNET_YES));
912 msize = sizeof (struct DataMessage) + size; 962 env = GNUNET_MQ_msg_extra (dm,
913 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); 963 size,
964 GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
965 dm->rid = htonl (rid);
966 dm->size = htonl ((uint32_t) size);
967 dm->type = htonl (type);
968 dm->priority = htonl (priority);
969 dm->anonymity = htonl (anonymity);
970 dm->replication = htonl (replication);
971 dm->reserved = htonl (0);
972 dm->uid = GNUNET_htonll (0);
973 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
974 dm->key = *key;
975 memcpy (&dm[1],
976 data,
977 size);
914 qc.sc.cont = cont; 978 qc.sc.cont = cont;
915 qc.sc.cont_cls = cont_cls; 979 qc.sc.cont_cls = cont_cls;
916 qe = make_queue_entry (h, 980 qe = make_queue_entry (h,
917 msize, 981 env,
918 queue_priority, 982 queue_priority,
919 max_queue_size, 983 max_queue_size,
920 timeout, 984 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
921 &process_status_message,
922 &qc); 985 &qc);
923 if (qe == NULL) 986 if (NULL == qe)
924 { 987 {
925 LOG (GNUNET_ERROR_TYPE_DEBUG, 988 LOG (GNUNET_ERROR_TYPE_DEBUG,
926 "Could not create queue entry for PUT\n"); 989 "Could not create queue entry for PUT\n");
@@ -930,20 +993,6 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
930 gettext_noop ("# PUT requests executed"), 993 gettext_noop ("# PUT requests executed"),
931 1, 994 1,
932 GNUNET_NO); 995 GNUNET_NO);
933 dm = (struct DataMessage *) &qe[1];
934 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
935 dm->header.size = htons (msize);
936 dm->rid = htonl (rid);
937 dm->size = htonl ((uint32_t) size);
938 dm->type = htonl (type);
939 dm->priority = htonl (priority);
940 dm->anonymity = htonl (anonymity);
941 dm->replication = htonl (replication);
942 dm->reserved = htonl (0);
943 dm->uid = GNUNET_htonll (0);
944 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
945 dm->key = *key;
946 memcpy (&dm[1], data, size);
947 process_queue (h); 996 process_queue (h);
948 return qe; 997 return qe;
949} 998}
@@ -972,6 +1021,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
972 void *cont_cls) 1021 void *cont_cls)
973{ 1022{
974 struct GNUNET_DATASTORE_QueueEntry *qe; 1023 struct GNUNET_DATASTORE_QueueEntry *qe;
1024 struct GNUNET_MQ_Envelope *env;
975 struct ReserveMessage *rm; 1025 struct ReserveMessage *rm;
976 union QueueContext qc; 1026 union QueueContext qc;
977 1027
@@ -981,14 +1031,18 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
981 "Asked to reserve %llu bytes of data and %u entries\n", 1031 "Asked to reserve %llu bytes of data and %u entries\n",
982 (unsigned long long) amount, 1032 (unsigned long long) amount,
983 (unsigned int) entries); 1033 (unsigned int) entries);
1034 env = GNUNET_MQ_msg (rm,
1035 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1036 rm->entries = htonl (entries);
1037 rm->amount = GNUNET_htonll (amount);
1038
984 qc.sc.cont = cont; 1039 qc.sc.cont = cont;
985 qc.sc.cont_cls = cont_cls; 1040 qc.sc.cont_cls = cont_cls;
986 qe = make_queue_entry (h, 1041 qe = make_queue_entry (h,
987 sizeof (struct ReserveMessage), 1042 env,
988 UINT_MAX, 1043 UINT_MAX,
989 UINT_MAX, 1044 UINT_MAX,
990 GNUNET_TIME_UNIT_FOREVER_REL, 1045 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
991 &process_status_message,
992 &qc); 1046 &qc);
993 if (NULL == qe) 1047 if (NULL == qe)
994 { 1048 {
@@ -1000,11 +1054,6 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1000 gettext_noop ("# RESERVE requests executed"), 1054 gettext_noop ("# RESERVE requests executed"),
1001 1, 1055 1,
1002 GNUNET_NO); 1056 GNUNET_NO);
1003 rm = (struct ReserveMessage *) &qe[1];
1004 rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1005 rm->header.size = htons (sizeof (struct ReserveMessage));
1006 rm->entries = htonl (entries);
1007 rm->amount = GNUNET_htonll (amount);
1008 process_queue (h); 1057 process_queue (h);
1009 return qe; 1058 return qe;
1010} 1059}
@@ -1024,7 +1073,6 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1024 * @param queue_priority ranking of this request in the priority queue 1073 * @param queue_priority ranking of this request in the priority queue
1025 * @param max_queue_size at what queue size should this request be dropped 1074 * @param max_queue_size at what queue size should this request be dropped
1026 * (if other requests of higher priority are in the queue) 1075 * (if other requests of higher priority are in the queue)
1027 * @param timeout how long to wait at most for a response
1028 * @param cont continuation to call when done 1076 * @param cont continuation to call when done
1029 * @param cont_cls closure for @a cont 1077 * @param cont_cls closure for @a cont
1030 * @return NULL if the entry was not queued, otherwise a handle that can be used to 1078 * @return NULL if the entry was not queued, otherwise a handle that can be used to
@@ -1036,29 +1084,31 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1036 uint32_t rid, 1084 uint32_t rid,
1037 unsigned int queue_priority, 1085 unsigned int queue_priority,
1038 unsigned int max_queue_size, 1086 unsigned int max_queue_size,
1039 struct GNUNET_TIME_Relative timeout,
1040 GNUNET_DATASTORE_ContinuationWithStatus cont, 1087 GNUNET_DATASTORE_ContinuationWithStatus cont,
1041 void *cont_cls) 1088 void *cont_cls)
1042{ 1089{
1043 struct GNUNET_DATASTORE_QueueEntry *qe; 1090 struct GNUNET_DATASTORE_QueueEntry *qe;
1091 struct GNUNET_MQ_Envelope *env;
1044 struct ReleaseReserveMessage *rrm; 1092 struct ReleaseReserveMessage *rrm;
1045 union QueueContext qc; 1093 union QueueContext qc;
1046 1094
1047 if (cont == NULL) 1095 if (NULL == cont)
1048 cont = &drop_status_cont; 1096 cont = &drop_status_cont;
1049 LOG (GNUNET_ERROR_TYPE_DEBUG, 1097 LOG (GNUNET_ERROR_TYPE_DEBUG,
1050 "Asked to release reserve %d\n", 1098 "Asked to release reserve %d\n",
1051 rid); 1099 rid);
1100 env = GNUNET_MQ_msg (rrm,
1101 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1102 rrm->rid = htonl (rid);
1052 qc.sc.cont = cont; 1103 qc.sc.cont = cont;
1053 qc.sc.cont_cls = cont_cls; 1104 qc.sc.cont_cls = cont_cls;
1054 qe = make_queue_entry (h, 1105 qe = make_queue_entry (h,
1055 sizeof (struct ReleaseReserveMessage), 1106 env,
1056 queue_priority, 1107 queue_priority,
1057 max_queue_size, 1108 max_queue_size,
1058 timeout, 1109 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1059 &process_status_message,
1060 &qc); 1110 &qc);
1061 if (qe == NULL) 1111 if (NULL == qe)
1062 { 1112 {
1063 LOG (GNUNET_ERROR_TYPE_DEBUG, 1113 LOG (GNUNET_ERROR_TYPE_DEBUG,
1064 "Could not create queue entry to release reserve\n"); 1114 "Could not create queue entry to release reserve\n");
@@ -1068,10 +1118,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1068 gettext_noop 1118 gettext_noop
1069 ("# RELEASE RESERVE requests executed"), 1, 1119 ("# RELEASE RESERVE requests executed"), 1,
1070 GNUNET_NO); 1120 GNUNET_NO);
1071 rrm = (struct ReleaseReserveMessage *) &qe[1];
1072 rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1073 rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1074 rrm->rid = htonl (rid);
1075 process_queue (h); 1121 process_queue (h);
1076 return qe; 1122 return qe;
1077} 1123}
@@ -1087,7 +1133,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1087 * @param queue_priority ranking of this request in the priority queue 1133 * @param queue_priority ranking of this request in the priority queue
1088 * @param max_queue_size at what queue size should this request be dropped 1134 * @param max_queue_size at what queue size should this request be dropped
1089 * (if other requests of higher priority are in the queue) 1135 * (if other requests of higher priority are in the queue)
1090 * @param timeout how long to wait at most for a response
1091 * @param cont continuation to call when done 1136 * @param cont continuation to call when done
1092 * @param cont_cls closure for @a cont 1137 * @param cont_cls closure for @a cont
1093 * @return NULL if the entry was not queued, otherwise a handle that can be used to 1138 * @return NULL if the entry was not queued, otherwise a handle that can be used to
@@ -1101,26 +1146,36 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1101 struct GNUNET_TIME_Absolute expiration, 1146 struct GNUNET_TIME_Absolute expiration,
1102 unsigned int queue_priority, 1147 unsigned int queue_priority,
1103 unsigned int max_queue_size, 1148 unsigned int max_queue_size,
1104 struct GNUNET_TIME_Relative timeout,
1105 GNUNET_DATASTORE_ContinuationWithStatus cont, 1149 GNUNET_DATASTORE_ContinuationWithStatus cont,
1106 void *cont_cls) 1150 void *cont_cls)
1107{ 1151{
1108 struct GNUNET_DATASTORE_QueueEntry *qe; 1152 struct GNUNET_DATASTORE_QueueEntry *qe;
1153 struct GNUNET_MQ_Envelope *env;
1109 struct UpdateMessage *um; 1154 struct UpdateMessage *um;
1110 union QueueContext qc; 1155 union QueueContext qc;
1111 1156
1112 if (cont == NULL) 1157 if (NULL == cont)
1113 cont = &drop_status_cont; 1158 cont = &drop_status_cont;
1114 LOG (GNUNET_ERROR_TYPE_DEBUG, 1159 LOG (GNUNET_ERROR_TYPE_DEBUG,
1115 "Asked to update entry %llu raising priority by %u and expiration to %s\n", 1160 "Asked to update entry %llu raising priority by %u and expiration to %s\n",
1116 uid, 1161 uid,
1117 (unsigned int) priority, 1162 (unsigned int) priority,
1118 GNUNET_STRINGS_absolute_time_to_string (expiration)); 1163 GNUNET_STRINGS_absolute_time_to_string (expiration));
1164 env = GNUNET_MQ_msg (um,
1165 GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1166 um->priority = htonl (priority);
1167 um->expiration = GNUNET_TIME_absolute_hton (expiration);
1168 um->uid = GNUNET_htonll (uid);
1169
1119 qc.sc.cont = cont; 1170 qc.sc.cont = cont;
1120 qc.sc.cont_cls = cont_cls; 1171 qc.sc.cont_cls = cont_cls;
1121 qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority, 1172 qe = make_queue_entry (h,
1122 max_queue_size, timeout, &process_status_message, &qc); 1173 env,
1123 if (qe == NULL) 1174 queue_priority,
1175 max_queue_size,
1176 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1177 &qc);
1178 if (NULL == qe)
1124 { 1179 {
1125 LOG (GNUNET_ERROR_TYPE_DEBUG, 1180 LOG (GNUNET_ERROR_TYPE_DEBUG,
1126 "Could not create queue entry for UPDATE\n"); 1181 "Could not create queue entry for UPDATE\n");
@@ -1129,12 +1184,6 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1129 GNUNET_STATISTICS_update (h->stats, 1184 GNUNET_STATISTICS_update (h->stats,
1130 gettext_noop ("# UPDATE requests executed"), 1, 1185 gettext_noop ("# UPDATE requests executed"), 1,
1131 GNUNET_NO); 1186 GNUNET_NO);
1132 um = (struct UpdateMessage *) &qe[1];
1133 um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1134 um->header.size = htons (sizeof (struct UpdateMessage));
1135 um->priority = htonl (priority);
1136 um->expiration = GNUNET_TIME_absolute_hton (expiration);
1137 um->uid = GNUNET_htonll (uid);
1138 process_queue (h); 1187 process_queue (h);
1139 return qe; 1188 return qe;
1140} 1189}
@@ -1154,7 +1203,6 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1154 * @param queue_priority ranking of this request in the priority queue 1203 * @param queue_priority ranking of this request in the priority queue
1155 * @param max_queue_size at what queue size should this request be dropped 1204 * @param max_queue_size at what queue size should this request be dropped
1156 * (if other requests of higher priority are in the queue) 1205 * (if other requests of higher priority are in the queue)
1157 * @param timeout how long to wait at most for a response
1158 * @param cont continuation to call when done 1206 * @param cont continuation to call when done
1159 * @param cont_cls closure for @a cont 1207 * @param cont_cls closure for @a cont
1160 * @return NULL if the entry was not queued, otherwise a handle that can be used to 1208 * @return NULL if the entry was not queued, otherwise a handle that can be used to
@@ -1168,161 +1216,64 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1168 const void *data, 1216 const void *data,
1169 unsigned int queue_priority, 1217 unsigned int queue_priority,
1170 unsigned int max_queue_size, 1218 unsigned int max_queue_size,
1171 struct GNUNET_TIME_Relative timeout,
1172 GNUNET_DATASTORE_ContinuationWithStatus cont, 1219 GNUNET_DATASTORE_ContinuationWithStatus cont,
1173 void *cont_cls) 1220 void *cont_cls)
1174{ 1221{
1175 struct GNUNET_DATASTORE_QueueEntry *qe; 1222 struct GNUNET_DATASTORE_QueueEntry *qe;
1176 struct DataMessage *dm; 1223 struct DataMessage *dm;
1177 size_t msize; 1224 struct GNUNET_MQ_Envelope *env;
1178 union QueueContext qc; 1225 union QueueContext qc;
1179 1226
1180 if (cont == NULL) 1227 if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1228 {
1229 GNUNET_break (0);
1230 return NULL;
1231 }
1232 if (NULL == cont)
1181 cont = &drop_status_cont; 1233 cont = &drop_status_cont;
1182 LOG (GNUNET_ERROR_TYPE_DEBUG, 1234 LOG (GNUNET_ERROR_TYPE_DEBUG,
1183 "Asked to remove %u bytes under key `%s'\n", 1235 "Asked to remove %u bytes under key `%s'\n",
1184 size, 1236 size,
1185 GNUNET_h2s (key)); 1237 GNUNET_h2s (key));
1238 env = GNUNET_MQ_msg_extra (dm,
1239 size,
1240 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1241 dm->rid = htonl (0);
1242 dm->size = htonl (size);
1243 dm->type = htonl (0);
1244 dm->priority = htonl (0);
1245 dm->anonymity = htonl (0);
1246 dm->uid = GNUNET_htonll (0);
1247 dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1248 dm->key = *key;
1249 memcpy (&dm[1],
1250 data,
1251 size);
1252
1186 qc.sc.cont = cont; 1253 qc.sc.cont = cont;
1187 qc.sc.cont_cls = cont_cls; 1254 qc.sc.cont_cls = cont_cls;
1188 msize = sizeof (struct DataMessage) + size; 1255
1189 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1190 qe = make_queue_entry (h, 1256 qe = make_queue_entry (h,
1191 msize, 1257 env,
1192 queue_priority, 1258 queue_priority,
1193 max_queue_size, 1259 max_queue_size,
1194 timeout, 1260 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1195 &process_status_message,
1196 &qc); 1261 &qc);
1197 if (qe == NULL) 1262 if (NULL == qe)
1198 { 1263 {
1199 LOG (GNUNET_ERROR_TYPE_DEBUG, 1264 LOG (GNUNET_ERROR_TYPE_DEBUG,
1200 "Could not create queue entry for REMOVE\n"); 1265 "Could not create queue entry for REMOVE\n");
1201 return NULL; 1266 return NULL;
1202 } 1267 }
1203 GNUNET_STATISTICS_update (h->stats, 1268 GNUNET_STATISTICS_update (h->stats,
1204 gettext_noop ("# REMOVE requests executed"), 1, 1269 gettext_noop ("# REMOVE requests executed"),
1270 1,
1205 GNUNET_NO); 1271 GNUNET_NO);
1206 dm = (struct DataMessage *) &qe[1];
1207 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1208 dm->header.size = htons (msize);
1209 dm->rid = htonl (0);
1210 dm->size = htonl (size);
1211 dm->type = htonl (0);
1212 dm->priority = htonl (0);
1213 dm->anonymity = htonl (0);
1214 dm->uid = GNUNET_htonll (0);
1215 dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1216 dm->key = *key;
1217 memcpy (&dm[1], data, size);
1218 process_queue (h); 1272 process_queue (h);
1219 return qe; 1273 return qe;
1220} 1274}
1221 1275
1222 1276
1223/**
1224 * Type of a function to call when we receive a message
1225 * from the service.
1226 *
1227 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
1228 * @param msg message received, NULL on timeout or fatal error
1229 */
1230static void
1231process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1232{
1233 struct GNUNET_DATASTORE_Handle *h = cls;
1234 struct GNUNET_DATASTORE_QueueEntry *qe;
1235 struct ResultContext rc;
1236 const struct DataMessage *dm;
1237 int was_transmitted;
1238
1239 if (NULL == msg)
1240 {
1241 qe = h->queue_head;
1242 GNUNET_assert (NULL != qe);
1243 rc = qe->qc.rc;
1244 was_transmitted = qe->was_transmitted;
1245 free_queue_entry (qe);
1246 if (GNUNET_YES == was_transmitted)
1247 {
1248 LOG (GNUNET_ERROR_TYPE_DEBUG,
1249 "Failed to receive response from database.\n");
1250 do_disconnect (h);
1251 }
1252 else
1253 {
1254 process_queue (h);
1255 }
1256 if (NULL != rc.proc)
1257 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1258 0);
1259 return;
1260 }
1261 if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1262 {
1263 GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1264 qe = h->queue_head;
1265 rc = qe->qc.rc;
1266 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1267 free_queue_entry (qe);
1268 LOG (GNUNET_ERROR_TYPE_DEBUG,
1269 "Received end of result set, new queue size is %u\n", h->queue_size);
1270 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1271 h->result_count = 0;
1272 process_queue (h);
1273 if (NULL != rc.proc)
1274 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1275 0);
1276 return;
1277 }
1278 qe = h->queue_head;
1279 GNUNET_assert (NULL != qe);
1280 rc = qe->qc.rc;
1281 if (GNUNET_YES != qe->was_transmitted)
1282 {
1283 GNUNET_break (0);
1284 free_queue_entry (qe);
1285 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1286 do_disconnect (h);
1287 if (rc.proc != NULL)
1288 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1289 0);
1290 return;
1291 }
1292 if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1293 (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1294 (ntohs (msg->size) !=
1295 sizeof (struct DataMessage) +
1296 ntohl (((const struct DataMessage *) msg)->size)))
1297 {
1298 GNUNET_break (0);
1299 free_queue_entry (qe);
1300 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1301 do_disconnect (h);
1302 if (rc.proc != NULL)
1303 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1304 0);
1305 return;
1306 }
1307#if INSANE_STATISTICS
1308 GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1309 GNUNET_NO);
1310#endif
1311 dm = (const struct DataMessage *) msg;
1312 LOG (GNUNET_ERROR_TYPE_DEBUG,
1313 "Received result %llu with type %u and size %u with key %s\n",
1314 (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1315 ntohl (dm->size), GNUNET_h2s (&dm->key));
1316 free_queue_entry (qe);
1317 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1318 process_queue (h);
1319 if (rc.proc != NULL)
1320 rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1321 ntohl (dm->priority), ntohl (dm->anonymity),
1322 GNUNET_TIME_absolute_ntoh (dm->expiration),
1323 GNUNET_ntohll (dm->uid));
1324}
1325
1326 1277
1327/** 1278/**
1328 * Get a random value from the datastore for content replication. 1279 * Get a random value from the datastore for content replication.
@@ -1335,7 +1286,6 @@ process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1335 * @param queue_priority ranking of this request in the priority queue 1286 * @param queue_priority ranking of this request in the priority queue
1336 * @param max_queue_size at what queue size should this request be dropped 1287 * @param max_queue_size at what queue size should this request be dropped
1337 * (if other requests of higher priority are in the queue) 1288 * (if other requests of higher priority are in the queue)
1338 * @param timeout how long to wait at most for a response
1339 * @param proc function to call on a random value; it 1289 * @param proc function to call on a random value; it
1340 * will be called once with a value (if available) 1290 * will be called once with a value (if available)
1341 * and always once with a value of NULL. 1291 * and always once with a value of NULL.
@@ -1347,23 +1297,27 @@ struct GNUNET_DATASTORE_QueueEntry *
1347GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, 1297GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1348 unsigned int queue_priority, 1298 unsigned int queue_priority,
1349 unsigned int max_queue_size, 1299 unsigned int max_queue_size,
1350 struct GNUNET_TIME_Relative timeout,
1351 GNUNET_DATASTORE_DatumProcessor proc, 1300 GNUNET_DATASTORE_DatumProcessor proc,
1352 void *proc_cls) 1301 void *proc_cls)
1353{ 1302{
1354 struct GNUNET_DATASTORE_QueueEntry *qe; 1303 struct GNUNET_DATASTORE_QueueEntry *qe;
1304 struct GNUNET_MQ_Envelope *env;
1355 struct GNUNET_MessageHeader *m; 1305 struct GNUNET_MessageHeader *m;
1356 union QueueContext qc; 1306 union QueueContext qc;
1357 1307
1358 GNUNET_assert (NULL != proc); 1308 GNUNET_assert (NULL != proc);
1359 LOG (GNUNET_ERROR_TYPE_DEBUG, 1309 LOG (GNUNET_ERROR_TYPE_DEBUG,
1360 "Asked to get replication entry in %s\n", 1310 "Asked to get replication entry\n");
1361 GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES)); 1311 env = GNUNET_MQ_msg (m,
1312 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1362 qc.rc.proc = proc; 1313 qc.rc.proc = proc;
1363 qc.rc.proc_cls = proc_cls; 1314 qc.rc.proc_cls = proc_cls;
1364 qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader), 1315 qe = make_queue_entry (h,
1365 queue_priority, max_queue_size, timeout, 1316 env,
1366 &process_result_message, &qc); 1317 queue_priority,
1318 max_queue_size,
1319 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1320 &qc);
1367 if (NULL == qe) 1321 if (NULL == qe)
1368 { 1322 {
1369 LOG (GNUNET_ERROR_TYPE_DEBUG, 1323 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1374,9 +1328,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1374 gettext_noop 1328 gettext_noop
1375 ("# GET REPLICATION requests executed"), 1, 1329 ("# GET REPLICATION requests executed"), 1,
1376 GNUNET_NO); 1330 GNUNET_NO);
1377 m = (struct GNUNET_MessageHeader *) &qe[1];
1378 m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1379 m->size = htons (sizeof (struct GNUNET_MessageHeader));
1380 process_queue (h); 1331 process_queue (h);
1381 return qe; 1332 return qe;
1382} 1333}
@@ -1393,7 +1344,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1393 * @param queue_priority ranking of this request in the priority queue 1344 * @param queue_priority ranking of this request in the priority queue
1394 * @param max_queue_size at what queue size should this request be dropped 1345 * @param max_queue_size at what queue size should this request be dropped
1395 * (if other requests of higher priority are in the queue) 1346 * (if other requests of higher priority are in the queue)
1396 * @param timeout how long to wait at most for a response
1397 * @param type allowed type for the operation (never zero) 1347 * @param type allowed type for the operation (never zero)
1398 * @param proc function to call on a random value; it 1348 * @param proc function to call on a random value; it
1399 * will be called once with a value (if available) 1349 * will be called once with a value (if available)
@@ -1407,31 +1357,32 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1407 uint64_t offset, 1357 uint64_t offset,
1408 unsigned int queue_priority, 1358 unsigned int queue_priority,
1409 unsigned int max_queue_size, 1359 unsigned int max_queue_size,
1410 struct GNUNET_TIME_Relative timeout,
1411 enum GNUNET_BLOCK_Type type, 1360 enum GNUNET_BLOCK_Type type,
1412 GNUNET_DATASTORE_DatumProcessor proc, 1361 GNUNET_DATASTORE_DatumProcessor proc,
1413 void *proc_cls) 1362 void *proc_cls)
1414{ 1363{
1415 struct GNUNET_DATASTORE_QueueEntry *qe; 1364 struct GNUNET_DATASTORE_QueueEntry *qe;
1365 struct GNUNET_MQ_Envelope *env;
1416 struct GetZeroAnonymityMessage *m; 1366 struct GetZeroAnonymityMessage *m;
1417 union QueueContext qc; 1367 union QueueContext qc;
1418 1368
1419 GNUNET_assert (NULL != proc); 1369 GNUNET_assert (NULL != proc);
1420 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); 1370 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1421 LOG (GNUNET_ERROR_TYPE_DEBUG, 1371 LOG (GNUNET_ERROR_TYPE_DEBUG,
1422 "Asked to get %llu-th zero-anonymity entry of type %d in %s\n", 1372 "Asked to get %llu-th zero-anonymity entry of type %d\n",
1423 (unsigned long long) offset, 1373 (unsigned long long) offset,
1424 type, 1374 type);
1425 GNUNET_STRINGS_relative_time_to_string (timeout, 1375 env = GNUNET_MQ_msg (m,
1426 GNUNET_YES)); 1376 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1377 m->type = htonl ((uint32_t) type);
1378 m->offset = GNUNET_htonll (offset);
1427 qc.rc.proc = proc; 1379 qc.rc.proc = proc;
1428 qc.rc.proc_cls = proc_cls; 1380 qc.rc.proc_cls = proc_cls;
1429 qe = make_queue_entry (h, 1381 qe = make_queue_entry (h,
1430 sizeof (struct GetZeroAnonymityMessage), 1382 env,
1431 queue_priority, 1383 queue_priority,
1432 max_queue_size, 1384 max_queue_size,
1433 timeout, 1385 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1434 &process_result_message,
1435 &qc); 1386 &qc);
1436 if (NULL == qe) 1387 if (NULL == qe)
1437 { 1388 {
@@ -1443,11 +1394,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1443 gettext_noop 1394 gettext_noop
1444 ("# GET ZERO ANONYMITY requests executed"), 1, 1395 ("# GET ZERO ANONYMITY requests executed"), 1,
1445 GNUNET_NO); 1396 GNUNET_NO);
1446 m = (struct GetZeroAnonymityMessage *) &qe[1];
1447 m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1448 m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1449 m->type = htonl ((uint32_t) type);
1450 m->offset = GNUNET_htonll (offset);
1451 process_queue (h); 1397 process_queue (h);
1452 return qe; 1398 return qe;
1453} 1399}
@@ -1467,7 +1413,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1467 * @param queue_priority ranking of this request in the priority queue 1413 * @param queue_priority ranking of this request in the priority queue
1468 * @param max_queue_size at what queue size should this request be dropped 1414 * @param max_queue_size at what queue size should this request be dropped
1469 * (if other requests of higher priority are in the queue) 1415 * (if other requests of higher priority are in the queue)
1470 * @param timeout how long to wait at most for a response
1471 * @param proc function to call on each matching value; 1416 * @param proc function to call on each matching value;
1472 * will be called once with a NULL value at the end 1417 * will be called once with a NULL value at the end
1473 * @param proc_cls closure for @a proc 1418 * @param proc_cls closure for @a proc
@@ -1481,28 +1426,44 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1481 enum GNUNET_BLOCK_Type type, 1426 enum GNUNET_BLOCK_Type type,
1482 unsigned int queue_priority, 1427 unsigned int queue_priority,
1483 unsigned int max_queue_size, 1428 unsigned int max_queue_size,
1484 struct GNUNET_TIME_Relative timeout,
1485 GNUNET_DATASTORE_DatumProcessor proc, 1429 GNUNET_DATASTORE_DatumProcessor proc,
1486 void *proc_cls) 1430 void *proc_cls)
1487{ 1431{
1488 struct GNUNET_DATASTORE_QueueEntry *qe; 1432 struct GNUNET_DATASTORE_QueueEntry *qe;
1433 struct GNUNET_MQ_Envelope *env;
1434 struct GetKeyMessage *gkm;
1489 struct GetMessage *gm; 1435 struct GetMessage *gm;
1490 union QueueContext qc; 1436 union QueueContext qc;
1491 1437
1492 GNUNET_assert (NULL != proc); 1438 GNUNET_assert (NULL != proc);
1493 LOG (GNUNET_ERROR_TYPE_DEBUG, 1439 LOG (GNUNET_ERROR_TYPE_DEBUG,
1494 "Asked to look for data of type %u under key `%s'\n", 1440 "Asked to look for data of type %u under key `%s'\n",
1495 (unsigned int) type, GNUNET_h2s (key)); 1441 (unsigned int) type,
1442 GNUNET_h2s (key));
1443 if (NULL == key)
1444 {
1445 env = GNUNET_MQ_msg (gm,
1446 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1447 gm->type = htonl (type);
1448 gm->offset = GNUNET_htonll (offset);
1449 }
1450 else
1451 {
1452 env = GNUNET_MQ_msg (gkm,
1453 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1454 gkm->type = htonl (type);
1455 gkm->offset = GNUNET_htonll (offset);
1456 gkm->key = *key;
1457 }
1496 qc.rc.proc = proc; 1458 qc.rc.proc = proc;
1497 qc.rc.proc_cls = proc_cls; 1459 qc.rc.proc_cls = proc_cls;
1498 qe = make_queue_entry (h, 1460 qe = make_queue_entry (h,
1499 sizeof (struct GetMessage), 1461 env,
1500 queue_priority, 1462 queue_priority,
1501 max_queue_size, 1463 max_queue_size,
1502 timeout, 1464 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1503 &process_result_message,
1504 &qc); 1465 &qc);
1505 if (qe == NULL) 1466 if (NULL == qe)
1506 { 1467 {
1507 LOG (GNUNET_ERROR_TYPE_DEBUG, 1468 LOG (GNUNET_ERROR_TYPE_DEBUG,
1508 "Could not queue request for `%s'\n", 1469 "Could not queue request for `%s'\n",
@@ -1515,20 +1476,6 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1515 1, 1476 1,
1516 GNUNET_NO); 1477 GNUNET_NO);
1517#endif 1478#endif
1518 gm = (struct GetMessage *) &qe[1];
1519 gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1520 gm->type = htonl (type);
1521 gm->offset = GNUNET_htonll (offset);
1522 if (key != NULL)
1523 {
1524 gm->header.size = htons (sizeof (struct GetMessage));
1525 gm->key = *key;
1526 }
1527 else
1528 {
1529 gm->header.size =
1530 htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode));
1531 }
1532 process_queue (h); 1479 process_queue (h);
1533 return qe; 1480 return qe;
1534} 1481}
@@ -1543,16 +1490,14 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1543void 1490void
1544GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) 1491GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1545{ 1492{
1546 struct GNUNET_DATASTORE_Handle *h; 1493 struct GNUNET_DATASTORE_Handle *h = qe->h;
1547 1494
1548 GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1549 h = qe->h;
1550 LOG (GNUNET_ERROR_TYPE_DEBUG, 1495 LOG (GNUNET_ERROR_TYPE_DEBUG,
1551 "Pending DATASTORE request %p cancelled (%d, %d)\n", 1496 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1552 qe, 1497 qe,
1553 qe->was_transmitted, 1498 NULL == qe->env,
1554 h->queue_head == qe); 1499 h->queue_head == qe);
1555 if (GNUNET_YES == qe->was_transmitted) 1500 if (NULL == qe->env)
1556 { 1501 {
1557 free_queue_entry (qe); 1502 free_queue_entry (qe);
1558 h->skip_next_messages++; 1503 h->skip_next_messages++;