aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/datastore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r--src/datastore/datastore_api.c1295
1 files changed, 651 insertions, 644 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index 7069975e4..b10c43944 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -31,7 +31,7 @@
31#include "gnunet_statistics_service.h" 31#include "gnunet_statistics_service.h"
32#include "datastore.h" 32#include "datastore.h"
33 33
34#define LOG(kind, ...) GNUNET_log_from(kind, "datastore-api", __VA_ARGS__) 34#define LOG(kind, ...) GNUNET_log_from (kind, "datastore-api", __VA_ARGS__)
35 35
36#define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES 36#define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37 37
@@ -51,7 +51,8 @@
51/** 51/**
52 * Context for processing status messages. 52 * Context for processing status messages.
53 */ 53 */
54struct StatusContext { 54struct StatusContext
55{
55 /** 56 /**
56 * Continuation to call with the status. 57 * Continuation to call with the status.
57 */ 58 */
@@ -67,7 +68,8 @@ struct StatusContext {
67/** 68/**
68 * Context for processing result messages. 69 * Context for processing result messages.
69 */ 70 */
70struct ResultContext { 71struct ResultContext
72{
71 /** 73 /**
72 * Function to call with the result. 74 * Function to call with the result.
73 */ 75 */
@@ -83,7 +85,8 @@ struct ResultContext {
83/** 85/**
84 * Context for a queue operation. 86 * Context for a queue operation.
85 */ 87 */
86union QueueContext { 88union QueueContext
89{
87 struct StatusContext sc; 90 struct StatusContext sc;
88 91
89 struct ResultContext rc; 92 struct ResultContext rc;
@@ -93,7 +96,8 @@ union QueueContext {
93/** 96/**
94 * Entry in our priority queue. 97 * Entry in our priority queue.
95 */ 98 */
96struct GNUNET_DATASTORE_QueueEntry { 99struct GNUNET_DATASTORE_QueueEntry
100{
97 /** 101 /**
98 * This is a linked list. 102 * This is a linked list.
99 */ 103 */
@@ -157,7 +161,8 @@ struct GNUNET_DATASTORE_QueueEntry {
157/** 161/**
158 * Handle to the datastore service. 162 * Handle to the datastore service.
159 */ 163 */
160struct GNUNET_DATASTORE_Handle { 164struct GNUNET_DATASTORE_Handle
165{
161 /** 166 /**
162 * Our configuration. 167 * Our configuration.
163 */ 168 */
@@ -219,7 +224,7 @@ struct GNUNET_DATASTORE_Handle {
219 * @param cls the `struct GNUNET_DATASTORE_Handle` 224 * @param cls the `struct GNUNET_DATASTORE_Handle`
220 */ 225 */
221static void 226static void
222try_reconnect(void *cls); 227try_reconnect (void *cls);
223 228
224 229
225/** 230/**
@@ -229,20 +234,20 @@ try_reconnect(void *cls);
229 * @param h handle to datastore to disconnect and reconnect 234 * @param h handle to datastore to disconnect and reconnect
230 */ 235 */
231static void 236static void
232do_disconnect(struct GNUNET_DATASTORE_Handle *h) 237do_disconnect (struct GNUNET_DATASTORE_Handle *h)
233{ 238{
234 if (NULL == h->mq) 239 if (NULL == h->mq)
235 { 240 {
236 GNUNET_break(0); 241 GNUNET_break (0);
237 return; 242 return;
238 } 243 }
239 GNUNET_MQ_destroy(h->mq); 244 GNUNET_MQ_destroy (h->mq);
240 h->mq = NULL; 245 h->mq = NULL;
241 h->skip_next_messages = 0; 246 h->skip_next_messages = 0;
242 h->reconnect_task 247 h->reconnect_task
243 = GNUNET_SCHEDULER_add_delayed(h->retry_time, 248 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
244 &try_reconnect, 249 &try_reconnect,
245 h); 250 h);
246} 251}
247 252
248 253
@@ -254,19 +259,19 @@ do_disconnect(struct GNUNET_DATASTORE_Handle *h)
254 * @param qe entry to free. 259 * @param qe entry to free.
255 */ 260 */
256static void 261static void
257free_queue_entry(struct GNUNET_DATASTORE_QueueEntry *qe) 262free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
258{ 263{
259 struct GNUNET_DATASTORE_Handle *h = qe->h; 264 struct GNUNET_DATASTORE_Handle *h = qe->h;
260 265
261 GNUNET_CONTAINER_DLL_remove(h->queue_head, 266 GNUNET_CONTAINER_DLL_remove (h->queue_head,
262 h->queue_tail, 267 h->queue_tail,
263 qe); 268 qe);
264 h->queue_size--; 269 h->queue_size--;
265 if (NULL != qe->env) 270 if (NULL != qe->env)
266 GNUNET_MQ_discard(qe->env); 271 GNUNET_MQ_discard (qe->env);
267 if (NULL != qe->delay_warn_task) 272 if (NULL != qe->delay_warn_task)
268 GNUNET_SCHEDULER_cancel(qe->delay_warn_task); 273 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
269 GNUNET_free(qe); 274 GNUNET_free (qe);
270} 275}
271 276
272 277
@@ -276,20 +281,20 @@ free_queue_entry(struct GNUNET_DATASTORE_QueueEntry *qe)
276 * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is 281 * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
277 */ 282 */
278static void 283static void
279delay_warning(void *cls) 284delay_warning (void *cls)
280{ 285{
281 struct GNUNET_DATASTORE_QueueEntry *qe = cls; 286 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
282 287
283 qe->delay_warn_task = NULL; 288 qe->delay_warn_task = NULL;
284 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, 289 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
285 "Request %p of type %u at head of datastore queue for more than %s\n", 290 "Request %p of type %u at head of datastore queue for more than %s\n",
286 qe, 291 qe,
287 (unsigned int)qe->response_type, 292 (unsigned int) qe->response_type,
288 GNUNET_STRINGS_relative_time_to_string(DELAY_WARN_TIMEOUT, 293 GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
289 GNUNET_YES)); 294 GNUNET_YES));
290 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed(DELAY_WARN_TIMEOUT, 295 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
291 &delay_warning, 296 &delay_warning,
292 qe); 297 qe);
293} 298}
294 299
295 300
@@ -300,59 +305,59 @@ delay_warning(void *cls)
300 * @param error error code 305 * @param error error code
301 */ 306 */
302static void 307static void
303mq_error_handler(void *cls, 308mq_error_handler (void *cls,
304 enum GNUNET_MQ_Error error) 309 enum GNUNET_MQ_Error error)
305{ 310{
306 struct GNUNET_DATASTORE_Handle *h = cls; 311 struct GNUNET_DATASTORE_Handle *h = cls;
307 struct GNUNET_DATASTORE_QueueEntry *qe; 312 struct GNUNET_DATASTORE_QueueEntry *qe;
308 313
309 LOG(GNUNET_ERROR_TYPE_DEBUG, 314 LOG (GNUNET_ERROR_TYPE_DEBUG,
310 "MQ error, reconnecting to DATASTORE\n"); 315 "MQ error, reconnecting to DATASTORE\n");
311 do_disconnect(h); 316 do_disconnect (h);
312 qe = h->queue_head; 317 qe = h->queue_head;
313 if (NULL == qe) 318 if (NULL == qe)
314 return; 319 return;
315 if (NULL != qe->delay_warn_task) 320 if (NULL != qe->delay_warn_task)
316 { 321 {
317 GNUNET_SCHEDULER_cancel(qe->delay_warn_task); 322 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
318 qe->delay_warn_task = NULL; 323 qe->delay_warn_task = NULL;
319 } 324 }
320 if (NULL == qe->env) 325 if (NULL == qe->env)
326 {
327 union QueueContext qc = qe->qc;
328 uint16_t rt = qe->response_type;
329
330 LOG (GNUNET_ERROR_TYPE_DEBUG,
331 "Failed to receive response from database.\n");
332 free_queue_entry (qe);
333 switch (rt)
321 { 334 {
322 union QueueContext qc = qe->qc; 335 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
323 uint16_t rt = qe->response_type; 336 if (NULL != qc.sc.cont)
324 337 qc.sc.cont (qc.sc.cont_cls,
325 LOG(GNUNET_ERROR_TYPE_DEBUG, 338 GNUNET_SYSERR,
326 "Failed to receive response from database.\n"); 339 GNUNET_TIME_UNIT_ZERO_ABS,
327 free_queue_entry(qe); 340 _ ("DATASTORE disconnected"));
328 switch (rt) 341 break;
329 { 342
330 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: 343 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
331 if (NULL != qc.sc.cont) 344 if (NULL != qc.rc.proc)
332 qc.sc.cont(qc.sc.cont_cls, 345 qc.rc.proc (qc.rc.proc_cls,
333 GNUNET_SYSERR, 346 NULL,
334 GNUNET_TIME_UNIT_ZERO_ABS, 347 0,
335 _("DATASTORE disconnected")); 348 NULL,
336 break; 349 0,
337 350 0,
338 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: 351 0,
339 if (NULL != qc.rc.proc) 352 0,
340 qc.rc.proc(qc.rc.proc_cls, 353 GNUNET_TIME_UNIT_ZERO_ABS,
341 NULL, 354 0);
342 0, 355 break;
343 NULL, 356
344 0, 357 default:
345 0, 358 GNUNET_break (0);
346 0,
347 0,
348 GNUNET_TIME_UNIT_ZERO_ABS,
349 0);
350 break;
351
352 default:
353 GNUNET_break(0);
354 }
355 } 359 }
360 }
356} 361}
357 362
358 363
@@ -363,22 +368,22 @@ mq_error_handler(void *cls,
363 * @return handle to use to access the service 368 * @return handle to use to access the service
364 */ 369 */
365struct GNUNET_DATASTORE_Handle * 370struct GNUNET_DATASTORE_Handle *
366GNUNET_DATASTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg) 371GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
367{ 372{
368 struct GNUNET_DATASTORE_Handle *h; 373 struct GNUNET_DATASTORE_Handle *h;
369 374
370 LOG(GNUNET_ERROR_TYPE_DEBUG, 375 LOG (GNUNET_ERROR_TYPE_DEBUG,
371 "Establishing DATASTORE connection!\n"); 376 "Establishing DATASTORE connection!\n");
372 h = GNUNET_new(struct GNUNET_DATASTORE_Handle); 377 h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
373 h->cfg = cfg; 378 h->cfg = cfg;
374 try_reconnect(h); 379 try_reconnect (h);
375 if (NULL == h->mq) 380 if (NULL == h->mq)
376 { 381 {
377 GNUNET_free(h); 382 GNUNET_free (h);
378 return NULL; 383 return NULL;
379 } 384 }
380 h->stats = GNUNET_STATISTICS_create("datastore-api", 385 h->stats = GNUNET_STATISTICS_create ("datastore-api",
381 cfg); 386 cfg);
382 return h; 387 return h;
383} 388}
384 389
@@ -390,14 +395,14 @@ GNUNET_DATASTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
390 * @param cls the datastore handle 395 * @param cls the datastore handle
391 */ 396 */
392static void 397static void
393disconnect_after_drop(void *cls) 398disconnect_after_drop (void *cls)
394{ 399{
395 struct GNUNET_DATASTORE_Handle *h = cls; 400 struct GNUNET_DATASTORE_Handle *h = cls;
396 401
397 LOG(GNUNET_ERROR_TYPE_DEBUG, 402 LOG (GNUNET_ERROR_TYPE_DEBUG,
398 "Drop sent, disconnecting\n"); 403 "Drop sent, disconnecting\n");
399 GNUNET_DATASTORE_disconnect(h, 404 GNUNET_DATASTORE_disconnect (h,
400 GNUNET_NO); 405 GNUNET_NO);
401} 406}
402 407
403 408
@@ -408,15 +413,15 @@ disconnect_after_drop(void *cls)
408 * @param error error code 413 * @param error error code
409 */ 414 */
410static void 415static void
411disconnect_on_mq_error(void *cls, 416disconnect_on_mq_error (void *cls,
412 enum GNUNET_MQ_Error error) 417 enum GNUNET_MQ_Error error)
413{ 418{
414 struct GNUNET_DATASTORE_Handle *h = cls; 419 struct GNUNET_DATASTORE_Handle *h = cls;
415 420
416 LOG(GNUNET_ERROR_TYPE_ERROR, 421 LOG (GNUNET_ERROR_TYPE_ERROR,
417 "Failed to ask datastore to drop tables\n"); 422 "Failed to ask datastore to drop tables\n");
418 GNUNET_DATASTORE_disconnect(h, 423 GNUNET_DATASTORE_disconnect (h,
419 GNUNET_NO); 424 GNUNET_NO);
420} 425}
421 426
422 427
@@ -428,84 +433,84 @@ disconnect_on_mq_error(void *cls,
428 * @param drop set to #GNUNET_YES to delete all data in datastore (!) 433 * @param drop set to #GNUNET_YES to delete all data in datastore (!)
429 */ 434 */
430void 435void
431GNUNET_DATASTORE_disconnect(struct GNUNET_DATASTORE_Handle *h, 436GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
432 int drop) 437 int drop)
433{ 438{
434 struct GNUNET_DATASTORE_QueueEntry *qe; 439 struct GNUNET_DATASTORE_QueueEntry *qe;
435 440
436 LOG(GNUNET_ERROR_TYPE_DEBUG, 441 LOG (GNUNET_ERROR_TYPE_DEBUG,
437 "Datastore disconnect\n"); 442 "Datastore disconnect\n");
438 if (NULL != h->mq) 443 if (NULL != h->mq)
439 { 444 {
440 GNUNET_MQ_destroy(h->mq); 445 GNUNET_MQ_destroy (h->mq);
441 h->mq = NULL; 446 h->mq = NULL;
442 } 447 }
443 if (NULL != h->reconnect_task) 448 if (NULL != h->reconnect_task)
444 { 449 {
445 GNUNET_SCHEDULER_cancel(h->reconnect_task); 450 GNUNET_SCHEDULER_cancel (h->reconnect_task);
446 h->reconnect_task = NULL; 451 h->reconnect_task = NULL;
447 } 452 }
448 while (NULL != (qe = h->queue_head)) 453 while (NULL != (qe = h->queue_head))
454 {
455 switch (qe->response_type)
449 { 456 {
450 switch (qe->response_type) 457 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
451 { 458 if (NULL != qe->qc.sc.cont)
452 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: 459 qe->qc.sc.cont (qe->qc.sc.cont_cls,
453 if (NULL != qe->qc.sc.cont) 460 GNUNET_SYSERR,
454 qe->qc.sc.cont(qe->qc.sc.cont_cls, 461 GNUNET_TIME_UNIT_ZERO_ABS,
455 GNUNET_SYSERR, 462 _ ("Disconnected from DATASTORE"));
456 GNUNET_TIME_UNIT_ZERO_ABS, 463 break;
457 _("Disconnected from DATASTORE")); 464
458 break; 465 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
459 466 if (NULL != qe->qc.rc.proc)
460 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: 467 qe->qc.rc.proc (qe->qc.rc.proc_cls,
461 if (NULL != qe->qc.rc.proc) 468 NULL,
462 qe->qc.rc.proc(qe->qc.rc.proc_cls, 469 0,
463 NULL, 470 NULL,
464 0, 471 0,
465 NULL, 472 0,
466 0, 473 0,
467 0, 474 0,
468 0, 475 GNUNET_TIME_UNIT_ZERO_ABS,
469 0, 476 0);
470 GNUNET_TIME_UNIT_ZERO_ABS, 477 break;
471 0); 478
472 break; 479 default:
473 480 GNUNET_break (0);
474 default:
475 GNUNET_break(0);
476 }
477 free_queue_entry(qe);
478 } 481 }
482 free_queue_entry (qe);
483 }
479 if (GNUNET_YES == drop) 484 if (GNUNET_YES == drop)
485 {
486 LOG (GNUNET_ERROR_TYPE_DEBUG,
487 "Re-connecting to issue DROP!\n");
488 GNUNET_assert (NULL == h->mq);
489 h->mq = GNUNET_CLIENT_connect (h->cfg,
490 "datastore",
491 NULL,
492 &disconnect_on_mq_error,
493 h);
494 if (NULL != h->mq)
480 { 495 {
481 LOG(GNUNET_ERROR_TYPE_DEBUG, 496 struct GNUNET_MessageHeader *hdr;
482 "Re-connecting to issue DROP!\n"); 497 struct GNUNET_MQ_Envelope *env;
483 GNUNET_assert(NULL == h->mq); 498
484 h->mq = GNUNET_CLIENT_connect(h->cfg, 499 env = GNUNET_MQ_msg (hdr,
485 "datastore", 500 GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
486 NULL, 501 GNUNET_MQ_notify_sent (env,
487 &disconnect_on_mq_error, 502 &disconnect_after_drop,
488 h); 503 h);
489 if (NULL != h->mq) 504 GNUNET_MQ_send (h->mq,
490 { 505 env);
491 struct GNUNET_MessageHeader *hdr; 506 return;
492 struct GNUNET_MQ_Envelope *env;
493
494 env = GNUNET_MQ_msg(hdr,
495 GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
496 GNUNET_MQ_notify_sent(env,
497 &disconnect_after_drop,
498 h);
499 GNUNET_MQ_send(h->mq,
500 env);
501 return;
502 }
503 GNUNET_break(0);
504 } 507 }
505 GNUNET_STATISTICS_destroy(h->stats, 508 GNUNET_break (0);
506 GNUNET_NO); 509 }
510 GNUNET_STATISTICS_destroy (h->stats,
511 GNUNET_NO);
507 h->stats = NULL; 512 h->stats = NULL;
508 GNUNET_free(h); 513 GNUNET_free (h);
509} 514}
510 515
511 516
@@ -525,12 +530,12 @@ GNUNET_DATASTORE_disconnect(struct GNUNET_DATASTORE_Handle *h,
525 * @return NULL if the queue is full 530 * @return NULL if the queue is full
526 */ 531 */
527static struct GNUNET_DATASTORE_QueueEntry * 532static struct GNUNET_DATASTORE_QueueEntry *
528make_queue_entry(struct GNUNET_DATASTORE_Handle *h, 533make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
529 struct GNUNET_MQ_Envelope *env, 534 struct GNUNET_MQ_Envelope *env,
530 unsigned int queue_priority, 535 unsigned int queue_priority,
531 unsigned int max_queue_size, 536 unsigned int max_queue_size,
532 uint16_t expected_type, 537 uint16_t expected_type,
533 const union QueueContext *qc) 538 const union QueueContext *qc)
534{ 539{
535 struct GNUNET_DATASTORE_QueueEntry *qe; 540 struct GNUNET_DATASTORE_QueueEntry *qe;
536 struct GNUNET_DATASTORE_QueueEntry *pos; 541 struct GNUNET_DATASTORE_QueueEntry *pos;
@@ -538,32 +543,32 @@ make_queue_entry(struct GNUNET_DATASTORE_Handle *h,
538 543
539 if ((NULL != h->queue_tail) && 544 if ((NULL != h->queue_tail) &&
540 (h->queue_tail->priority >= queue_priority)) 545 (h->queue_tail->priority >= queue_priority))
541 { 546 {
542 c = h->queue_size; 547 c = h->queue_size;
543 pos = NULL; 548 pos = NULL;
544 } 549 }
545 else 550 else
546 { 551 {
547 c = 0; 552 c = 0;
548 pos = h->queue_head; 553 pos = h->queue_head;
549 } 554 }
550 while ((NULL != pos) && 555 while ((NULL != pos) &&
551 (c < max_queue_size) && 556 (c < max_queue_size) &&
552 (pos->priority >= queue_priority)) 557 (pos->priority >= queue_priority))
553 { 558 {
554 c++; 559 c++;
555 pos = pos->next; 560 pos = pos->next;
556 } 561 }
557 if (c >= max_queue_size) 562 if (c >= max_queue_size)
558 { 563 {
559 GNUNET_STATISTICS_update(h->stats, 564 GNUNET_STATISTICS_update (h->stats,
560 gettext_noop("# queue overflows"), 565 gettext_noop ("# queue overflows"),
561 1, 566 1,
562 GNUNET_NO); 567 GNUNET_NO);
563 GNUNET_MQ_discard(env); 568 GNUNET_MQ_discard (env);
564 return NULL; 569 return NULL;
565 } 570 }
566 qe = GNUNET_new(struct GNUNET_DATASTORE_QueueEntry); 571 qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
567 qe->h = h; 572 qe->h = h;
568 qe->env = env; 573 qe->env = env;
569 qe->response_type = expected_type; 574 qe->response_type = expected_type;
@@ -571,30 +576,30 @@ make_queue_entry(struct GNUNET_DATASTORE_Handle *h,
571 qe->priority = queue_priority; 576 qe->priority = queue_priority;
572 qe->max_queue = max_queue_size; 577 qe->max_queue = max_queue_size;
573 if (NULL == pos) 578 if (NULL == pos)
574 { 579 {
575 /* append at the tail */ 580 /* append at the tail */
576 pos = h->queue_tail; 581 pos = h->queue_tail;
577 } 582 }
578 else 583 else
579 { 584 {
580 pos = pos->prev; 585 pos = pos->prev;
581 /* do not insert at HEAD if HEAD query was already 586 /* do not insert at HEAD if HEAD query was already
582 * transmitted and we are still receiving replies! */ 587 * transmitted and we are still receiving replies! */
583 if ((NULL == pos) && 588 if ((NULL == pos) &&
584 (NULL == h->queue_head->env)) 589 (NULL == h->queue_head->env))
585 pos = h->queue_head; 590 pos = h->queue_head;
586 } 591 }
587 c++; 592 c++;
588#if INSANE_STATISTICS 593#if INSANE_STATISTICS
589 GNUNET_STATISTICS_update(h->stats, 594 GNUNET_STATISTICS_update (h->stats,
590 gettext_noop("# queue entries created"), 595 gettext_noop ("# queue entries created"),
591 1, 596 1,
592 GNUNET_NO); 597 GNUNET_NO);
593#endif 598#endif
594 GNUNET_CONTAINER_DLL_insert_after(h->queue_head, 599 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
595 h->queue_tail, 600 h->queue_tail,
596 pos, 601 pos,
597 qe); 602 qe);
598 h->queue_size++; 603 h->queue_size++;
599 return qe; 604 return qe;
600} 605}
@@ -607,37 +612,37 @@ make_queue_entry(struct GNUNET_DATASTORE_Handle *h,
607 * @param h handle to the datastore 612 * @param h handle to the datastore
608 */ 613 */
609static void 614static void
610process_queue(struct GNUNET_DATASTORE_Handle *h) 615process_queue (struct GNUNET_DATASTORE_Handle *h)
611{ 616{
612 struct GNUNET_DATASTORE_QueueEntry *qe; 617 struct GNUNET_DATASTORE_QueueEntry *qe;
613 618
614 if (NULL == (qe = h->queue_head)) 619 if (NULL == (qe = h->queue_head))
615 { 620 {
616 /* no entry in queue */ 621 /* no entry in queue */
617 LOG(GNUNET_ERROR_TYPE_DEBUG, 622 LOG (GNUNET_ERROR_TYPE_DEBUG,
618 "Queue empty\n"); 623 "Queue empty\n");
619 return; 624 return;
620 } 625 }
621 if (NULL == qe->env) 626 if (NULL == qe->env)
622 { 627 {
623 /* waiting for replies */ 628 /* waiting for replies */
624 LOG(GNUNET_ERROR_TYPE_DEBUG, 629 LOG (GNUNET_ERROR_TYPE_DEBUG,
625 "Head request already transmitted\n"); 630 "Head request already transmitted\n");
626 return; 631 return;
627 } 632 }
628 if (NULL == h->mq) 633 if (NULL == h->mq)
629 { 634 {
630 /* waiting for reconnect */ 635 /* waiting for reconnect */
631 LOG(GNUNET_ERROR_TYPE_DEBUG, 636 LOG (GNUNET_ERROR_TYPE_DEBUG,
632 "Not connected\n"); 637 "Not connected\n");
633 return; 638 return;
634 } 639 }
635 GNUNET_assert(NULL == qe->delay_warn_task); 640 GNUNET_assert (NULL == qe->delay_warn_task);
636 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed(DELAY_WARN_TIMEOUT, 641 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
637 &delay_warning, 642 &delay_warning,
638 qe); 643 qe);
639 GNUNET_MQ_send(h->mq, 644 GNUNET_MQ_send (h->mq,
640 qe->env); 645 qe->env);
641 qe->env = NULL; 646 qe->env = NULL;
642} 647}
643 648
@@ -650,36 +655,36 @@ process_queue(struct GNUNET_DATASTORE_Handle *h)
650 * @return the queue entry 655 * @return the queue entry
651 */ 656 */
652static struct GNUNET_DATASTORE_QueueEntry * 657static struct GNUNET_DATASTORE_QueueEntry *
653get_queue_head(struct GNUNET_DATASTORE_Handle *h, 658get_queue_head (struct GNUNET_DATASTORE_Handle *h,
654 uint16_t response_type) 659 uint16_t response_type)
655{ 660{
656 struct GNUNET_DATASTORE_QueueEntry *qe; 661 struct GNUNET_DATASTORE_QueueEntry *qe;
657 662
658 if (h->skip_next_messages > 0) 663 if (h->skip_next_messages > 0)
659 { 664 {
660 h->skip_next_messages--; 665 h->skip_next_messages--;
661 process_queue(h); 666 process_queue (h);
662 return NULL; 667 return NULL;
663 } 668 }
664 qe = h->queue_head; 669 qe = h->queue_head;
665 if (NULL == qe) 670 if (NULL == qe)
666 { 671 {
667 GNUNET_break(0); 672 GNUNET_break (0);
668 do_disconnect(h); 673 do_disconnect (h);
669 return NULL; 674 return NULL;
670 } 675 }
671 if (NULL != qe->env) 676 if (NULL != qe->env)
672 { 677 {
673 GNUNET_break(0); 678 GNUNET_break (0);
674 do_disconnect(h); 679 do_disconnect (h);
675 return NULL; 680 return NULL;
676 } 681 }
677 if (response_type != qe->response_type) 682 if (response_type != qe->response_type)
678 { 683 {
679 GNUNET_break(0); 684 GNUNET_break (0);
680 do_disconnect(h); 685 do_disconnect (h);
681 return NULL; 686 return NULL;
682 } 687 }
683 return qe; 688 return qe;
684} 689}
685 690
@@ -692,27 +697,27 @@ get_queue_head(struct GNUNET_DATASTORE_Handle *h,
692 * @return #GNUNET_OK if the message is well-formed 697 * @return #GNUNET_OK if the message is well-formed
693 */ 698 */
694static int 699static int
695check_status(void *cls, 700check_status (void *cls,
696 const struct StatusMessage *sm) 701 const struct StatusMessage *sm)
697{ 702{
698 uint16_t msize = ntohs(sm->header.size) - sizeof(*sm); 703 uint16_t msize = ntohs (sm->header.size) - sizeof(*sm);
699 int32_t status = ntohl(sm->status); 704 int32_t status = ntohl (sm->status);
700 705
701 if (msize > 0) 706 if (msize > 0)
702 { 707 {
703 const char *emsg = (const char *)&sm[1]; 708 const char *emsg = (const char *) &sm[1];
704 709
705 if ('\0' != emsg[msize - 1]) 710 if ('\0' != emsg[msize - 1])
706 {
707 GNUNET_break(0);
708 return GNUNET_SYSERR;
709 }
710 }
711 else if (GNUNET_SYSERR == status)
712 { 711 {
713 GNUNET_break(0); 712 GNUNET_break (0);
714 return GNUNET_SYSERR; 713 return GNUNET_SYSERR;
715 } 714 }
715 }
716 else if (GNUNET_SYSERR == status)
717 {
718 GNUNET_break (0);
719 return GNUNET_SYSERR;
720 }
716 return GNUNET_OK; 721 return GNUNET_OK;
717} 722}
718 723
@@ -724,40 +729,40 @@ check_status(void *cls,
724 * @param sm status message received 729 * @param sm status message received
725 */ 730 */
726static void 731static void
727handle_status(void *cls, 732handle_status (void *cls,
728 const struct StatusMessage *sm) 733 const struct StatusMessage *sm)
729{ 734{
730 struct GNUNET_DATASTORE_Handle *h = cls; 735 struct GNUNET_DATASTORE_Handle *h = cls;
731 struct GNUNET_DATASTORE_QueueEntry *qe; 736 struct GNUNET_DATASTORE_QueueEntry *qe;
732 struct StatusContext rc; 737 struct StatusContext rc;
733 const char *emsg; 738 const char *emsg;
734 int32_t status = ntohl(sm->status); 739 int32_t status = ntohl (sm->status);
735 740
736 qe = get_queue_head(h, 741 qe = get_queue_head (h,
737 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS); 742 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
738 if (NULL == qe) 743 if (NULL == qe)
739 return; 744 return;
740 rc = qe->qc.sc; 745 rc = qe->qc.sc;
741 free_queue_entry(qe); 746 free_queue_entry (qe);
742 if (ntohs(sm->header.size) > sizeof(struct StatusMessage)) 747 if (ntohs (sm->header.size) > sizeof(struct StatusMessage))
743 emsg = (const char *)&sm[1]; 748 emsg = (const char *) &sm[1];
744 else 749 else
745 emsg = NULL; 750 emsg = NULL;
746 LOG(GNUNET_ERROR_TYPE_DEBUG, 751 LOG (GNUNET_ERROR_TYPE_DEBUG,
747 "Received status %d/%s\n", 752 "Received status %d/%s\n",
748 (int)status, 753 (int) status,
749 emsg); 754 emsg);
750 GNUNET_STATISTICS_update(h->stats, 755 GNUNET_STATISTICS_update (h->stats,
751 gettext_noop("# status messages received"), 756 gettext_noop ("# status messages received"),
752 1, 757 1,
753 GNUNET_NO); 758 GNUNET_NO);
754 h->retry_time = GNUNET_TIME_UNIT_ZERO; 759 h->retry_time = GNUNET_TIME_UNIT_ZERO;
755 process_queue(h); 760 process_queue (h);
756 if (NULL != rc.cont) 761 if (NULL != rc.cont)
757 rc.cont(rc.cont_cls, 762 rc.cont (rc.cont_cls,
758 status, 763 status,
759 GNUNET_TIME_absolute_ntoh(sm->min_expiration), 764 GNUNET_TIME_absolute_ntoh (sm->min_expiration),
760 emsg); 765 emsg);
761} 766}
762 767
763 768
@@ -768,16 +773,16 @@ handle_status(void *cls,
768 * @param dm message received 773 * @param dm message received
769 */ 774 */
770static int 775static int
771check_data(void *cls, 776check_data (void *cls,
772 const struct DataMessage *dm) 777 const struct DataMessage *dm)
773{ 778{
774 uint16_t msize = ntohs(dm->header.size) - sizeof(*dm); 779 uint16_t msize = ntohs (dm->header.size) - sizeof(*dm);
775 780
776 if (msize != ntohl(dm->size)) 781 if (msize != ntohl (dm->size))
777 { 782 {
778 GNUNET_break(0); 783 GNUNET_break (0);
779 return GNUNET_SYSERR; 784 return GNUNET_SYSERR;
780 } 785 }
781 return GNUNET_OK; 786 return GNUNET_OK;
782} 787}
783 788
@@ -789,44 +794,44 @@ check_data(void *cls,
789 * @param dm message received 794 * @param dm message received
790 */ 795 */
791static void 796static void
792handle_data(void *cls, 797handle_data (void *cls,
793 const struct DataMessage *dm) 798 const struct DataMessage *dm)
794{ 799{
795 struct GNUNET_DATASTORE_Handle *h = cls; 800 struct GNUNET_DATASTORE_Handle *h = cls;
796 struct GNUNET_DATASTORE_QueueEntry *qe; 801 struct GNUNET_DATASTORE_QueueEntry *qe;
797 struct ResultContext rc; 802 struct ResultContext rc;
798 803
799 qe = get_queue_head(h, 804 qe = get_queue_head (h,
800 GNUNET_MESSAGE_TYPE_DATASTORE_DATA); 805 GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
801 if (NULL == qe) 806 if (NULL == qe)
802 return; 807 return;
803#if INSANE_STATISTICS 808#if INSANE_STATISTICS
804 GNUNET_STATISTICS_update(h->stats, 809 GNUNET_STATISTICS_update (h->stats,
805 gettext_noop("# Results received"), 810 gettext_noop ("# Results received"),
806 1, 811 1,
807 GNUNET_NO); 812 GNUNET_NO);
808#endif 813#endif
809 LOG(GNUNET_ERROR_TYPE_DEBUG, 814 LOG (GNUNET_ERROR_TYPE_DEBUG,
810 "Received result %llu with type %u and size %u with key %s\n", 815 "Received result %llu with type %u and size %u with key %s\n",
811 (unsigned long long)GNUNET_ntohll(dm->uid), 816 (unsigned long long) GNUNET_ntohll (dm->uid),
812 ntohl(dm->type), 817 ntohl (dm->type),
813 ntohl(dm->size), 818 ntohl (dm->size),
814 GNUNET_h2s(&dm->key)); 819 GNUNET_h2s (&dm->key));
815 rc = qe->qc.rc; 820 rc = qe->qc.rc;
816 free_queue_entry(qe); 821 free_queue_entry (qe);
817 h->retry_time = GNUNET_TIME_UNIT_ZERO; 822 h->retry_time = GNUNET_TIME_UNIT_ZERO;
818 process_queue(h); 823 process_queue (h);
819 if (NULL != rc.proc) 824 if (NULL != rc.proc)
820 rc.proc(rc.proc_cls, 825 rc.proc (rc.proc_cls,
821 &dm->key, 826 &dm->key,
822 ntohl(dm->size), 827 ntohl (dm->size),
823 &dm[1], 828 &dm[1],
824 ntohl(dm->type), 829 ntohl (dm->type),
825 ntohl(dm->priority), 830 ntohl (dm->priority),
826 ntohl(dm->anonymity), 831 ntohl (dm->anonymity),
827 ntohl(dm->replication), 832 ntohl (dm->replication),
828 GNUNET_TIME_absolute_ntoh(dm->expiration), 833 GNUNET_TIME_absolute_ntoh (dm->expiration),
829 GNUNET_ntohll(dm->uid)); 834 GNUNET_ntohll (dm->uid));
830} 835}
831 836
832 837
@@ -838,37 +843,37 @@ handle_data(void *cls,
838 * @param msg message received 843 * @param msg message received
839 */ 844 */
840static void 845static void
841handle_data_end(void *cls, 846handle_data_end (void *cls,
842 const struct GNUNET_MessageHeader *msg) 847 const struct GNUNET_MessageHeader *msg)
843{ 848{
844 struct GNUNET_DATASTORE_Handle *h = cls; 849 struct GNUNET_DATASTORE_Handle *h = cls;
845 struct GNUNET_DATASTORE_QueueEntry *qe; 850 struct GNUNET_DATASTORE_QueueEntry *qe;
846 struct ResultContext rc; 851 struct ResultContext rc;
847 852
848 qe = get_queue_head(h, 853 qe = get_queue_head (h,
849 GNUNET_MESSAGE_TYPE_DATASTORE_DATA); 854 GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
850 if (NULL == qe) 855 if (NULL == qe)
851 return; 856 return;
852 rc = qe->qc.rc; 857 rc = qe->qc.rc;
853 free_queue_entry(qe); 858 free_queue_entry (qe);
854 LOG(GNUNET_ERROR_TYPE_DEBUG, 859 LOG (GNUNET_ERROR_TYPE_DEBUG,
855 "Received end of result set, new queue size is %u\n", 860 "Received end of result set, new queue size is %u\n",
856 h->queue_size); 861 h->queue_size);
857 h->retry_time = GNUNET_TIME_UNIT_ZERO; 862 h->retry_time = GNUNET_TIME_UNIT_ZERO;
858 h->result_count = 0; 863 h->result_count = 0;
859 process_queue(h); 864 process_queue (h);
860 /* signal end of iteration */ 865 /* signal end of iteration */
861 if (NULL != rc.proc) 866 if (NULL != rc.proc)
862 rc.proc(rc.proc_cls, 867 rc.proc (rc.proc_cls,
863 NULL, 868 NULL,
864 0, 869 0,
865 NULL, 870 NULL,
866 0, 871 0,
867 0, 872 0,
868 0, 873 0,
869 0, 874 0,
870 GNUNET_TIME_UNIT_ZERO_ABS, 875 GNUNET_TIME_UNIT_ZERO_ABS,
871 0); 876 0);
872} 877}
873 878
874 879
@@ -878,42 +883,43 @@ handle_data_end(void *cls,
878 * @param cls the `struct GNUNET_DATASTORE_Handle` 883 * @param cls the `struct GNUNET_DATASTORE_Handle`
879 */ 884 */
880static void 885static void
881try_reconnect(void *cls) 886try_reconnect (void *cls)
882{ 887{
883 struct GNUNET_DATASTORE_Handle *h = cls; 888 struct GNUNET_DATASTORE_Handle *h = cls;
884 struct GNUNET_MQ_MessageHandler handlers[] = { 889 struct GNUNET_MQ_MessageHandler handlers[] = {
885 GNUNET_MQ_hd_var_size(status, 890 GNUNET_MQ_hd_var_size (status,
886 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, 891 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
887 struct StatusMessage, 892 struct StatusMessage,
888 h), 893 h),
889 GNUNET_MQ_hd_var_size(data, 894 GNUNET_MQ_hd_var_size (data,
890 GNUNET_MESSAGE_TYPE_DATASTORE_DATA, 895 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
891 struct DataMessage, 896 struct DataMessage,
892 h), 897 h),
893 GNUNET_MQ_hd_fixed_size(data_end, 898 GNUNET_MQ_hd_fixed_size (data_end,
894 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END, 899 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
895 struct GNUNET_MessageHeader, 900 struct GNUNET_MessageHeader,
896 h), 901 h),
897 GNUNET_MQ_handler_end() 902 GNUNET_MQ_handler_end ()
898 }; 903 };
899 904
900 h->retry_time = GNUNET_TIME_STD_BACKOFF(h->retry_time); 905 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
901 h->reconnect_task = NULL; 906 h->reconnect_task = NULL;
902 GNUNET_assert(NULL == h->mq); 907 GNUNET_assert (NULL == h->mq);
903 h->mq = GNUNET_CLIENT_connect(h->cfg, 908 h->mq = GNUNET_CLIENT_connect (h->cfg,
904 "datastore", 909 "datastore",
905 handlers, 910 handlers,
906 &mq_error_handler, 911 &mq_error_handler,
907 h); 912 h);
908 if (NULL == h->mq) 913 if (NULL == h->mq)
909 return; 914 return;
910 GNUNET_STATISTICS_update(h->stats, 915 GNUNET_STATISTICS_update (h->stats,
911 gettext_noop("# datastore connections (re)created"), 916 gettext_noop (
912 1, 917 "# datastore connections (re)created"),
913 GNUNET_NO); 918 1,
914 LOG(GNUNET_ERROR_TYPE_DEBUG, 919 GNUNET_NO);
915 "Reconnected to DATASTORE\n"); 920 LOG (GNUNET_ERROR_TYPE_DEBUG,
916 process_queue(h); 921 "Reconnected to DATASTORE\n");
922 process_queue (h);
917} 923}
918 924
919 925
@@ -926,10 +932,10 @@ try_reconnect(void *cls)
926 * @param emsg error message 932 * @param emsg error message
927 */ 933 */
928static void 934static void
929drop_status_cont(void *cls, 935drop_status_cont (void *cls,
930 int32_t result, 936 int32_t result,
931 struct GNUNET_TIME_Absolute min_expiration, 937 struct GNUNET_TIME_Absolute min_expiration,
932 const char *emsg) 938 const char *emsg)
933{ 939{
934 /* do nothing */ 940 /* do nothing */
935} 941}
@@ -961,20 +967,20 @@ drop_status_cont(void *cls,
961 * (or rather, will already have been invoked) 967 * (or rather, will already have been invoked)
962 */ 968 */
963struct GNUNET_DATASTORE_QueueEntry * 969struct GNUNET_DATASTORE_QueueEntry *
964GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, 970GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
965 uint32_t rid, 971 uint32_t rid,
966 const struct GNUNET_HashCode *key, 972 const struct GNUNET_HashCode *key,
967 size_t size, 973 size_t size,
968 const void *data, 974 const void *data,
969 enum GNUNET_BLOCK_Type type, 975 enum GNUNET_BLOCK_Type type,
970 uint32_t priority, 976 uint32_t priority,
971 uint32_t anonymity, 977 uint32_t anonymity,
972 uint32_t replication, 978 uint32_t replication,
973 struct GNUNET_TIME_Absolute expiration, 979 struct GNUNET_TIME_Absolute expiration,
974 unsigned int queue_priority, 980 unsigned int queue_priority,
975 unsigned int max_queue_size, 981 unsigned int max_queue_size,
976 GNUNET_DATASTORE_ContinuationWithStatus cont, 982 GNUNET_DATASTORE_ContinuationWithStatus cont,
977 void *cont_cls) 983 void *cont_cls)
978{ 984{
979 struct GNUNET_DATASTORE_QueueEntry *qe; 985 struct GNUNET_DATASTORE_QueueEntry *qe;
980 struct GNUNET_MQ_Envelope *env; 986 struct GNUNET_MQ_Envelope *env;
@@ -982,50 +988,51 @@ GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h,
982 union QueueContext qc; 988 union QueueContext qc;
983 989
984 if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE) 990 if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE)
985 { 991 {
986 GNUNET_break(0); 992 GNUNET_break (0);
987 return NULL; 993 return NULL;
988 } 994 }
989 995
990 LOG(GNUNET_ERROR_TYPE_DEBUG, 996 LOG (GNUNET_ERROR_TYPE_DEBUG,
991 "Asked to put %u bytes of data under key `%s' for %s\n", 997 "Asked to put %u bytes of data under key `%s' for %s\n",
992 size, 998 size,
993 GNUNET_h2s(key), 999 GNUNET_h2s (key),
994 GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining(expiration), 1000 GNUNET_STRINGS_relative_time_to_string (
995 GNUNET_YES)); 1001 GNUNET_TIME_absolute_get_remaining (expiration),
996 env = GNUNET_MQ_msg_extra(dm, 1002 GNUNET_YES));
997 size, 1003 env = GNUNET_MQ_msg_extra (dm,
998 GNUNET_MESSAGE_TYPE_DATASTORE_PUT); 1004 size,
999 dm->rid = htonl(rid); 1005 GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
1000 dm->size = htonl((uint32_t)size); 1006 dm->rid = htonl (rid);
1001 dm->type = htonl(type); 1007 dm->size = htonl ((uint32_t) size);
1002 dm->priority = htonl(priority); 1008 dm->type = htonl (type);
1003 dm->anonymity = htonl(anonymity); 1009 dm->priority = htonl (priority);
1004 dm->replication = htonl(replication); 1010 dm->anonymity = htonl (anonymity);
1005 dm->expiration = GNUNET_TIME_absolute_hton(expiration); 1011 dm->replication = htonl (replication);
1012 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
1006 dm->key = *key; 1013 dm->key = *key;
1007 GNUNET_memcpy(&dm[1], 1014 GNUNET_memcpy (&dm[1],
1008 data, 1015 data,
1009 size); 1016 size);
1010 qc.sc.cont = cont; 1017 qc.sc.cont = cont;
1011 qc.sc.cont_cls = cont_cls; 1018 qc.sc.cont_cls = cont_cls;
1012 qe = make_queue_entry(h, 1019 qe = make_queue_entry (h,
1013 env, 1020 env,
1014 queue_priority, 1021 queue_priority,
1015 max_queue_size, 1022 max_queue_size,
1016 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, 1023 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1017 &qc); 1024 &qc);
1018 if (NULL == qe) 1025 if (NULL == qe)
1019 { 1026 {
1020 LOG(GNUNET_ERROR_TYPE_DEBUG, 1027 LOG (GNUNET_ERROR_TYPE_DEBUG,
1021 "Could not create queue entry for PUT\n"); 1028 "Could not create queue entry for PUT\n");
1022 return NULL; 1029 return NULL;
1023 } 1030 }
1024 GNUNET_STATISTICS_update(h->stats, 1031 GNUNET_STATISTICS_update (h->stats,
1025 gettext_noop("# PUT requests executed"), 1032 gettext_noop ("# PUT requests executed"),
1026 1, 1033 1,
1027 GNUNET_NO); 1034 GNUNET_NO);
1028 process_queue(h); 1035 process_queue (h);
1029 return qe; 1036 return qe;
1030} 1037}
1031 1038
@@ -1046,11 +1053,11 @@ GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h,
1046 * (or rather, will already have been invoked) 1053 * (or rather, will already have been invoked)
1047 */ 1054 */
1048struct GNUNET_DATASTORE_QueueEntry * 1055struct GNUNET_DATASTORE_QueueEntry *
1049GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h, 1056GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1050 uint64_t amount, 1057 uint64_t amount,
1051 uint32_t entries, 1058 uint32_t entries,
1052 GNUNET_DATASTORE_ContinuationWithStatus cont, 1059 GNUNET_DATASTORE_ContinuationWithStatus cont,
1053 void *cont_cls) 1060 void *cont_cls)
1054{ 1061{
1055 struct GNUNET_DATASTORE_QueueEntry *qe; 1062 struct GNUNET_DATASTORE_QueueEntry *qe;
1056 struct GNUNET_MQ_Envelope *env; 1063 struct GNUNET_MQ_Envelope *env;
@@ -1059,34 +1066,34 @@ GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h,
1059 1066
1060 if (NULL == cont) 1067 if (NULL == cont)
1061 cont = &drop_status_cont; 1068 cont = &drop_status_cont;
1062 LOG(GNUNET_ERROR_TYPE_DEBUG, 1069 LOG (GNUNET_ERROR_TYPE_DEBUG,
1063 "Asked to reserve %llu bytes of data and %u entries\n", 1070 "Asked to reserve %llu bytes of data and %u entries\n",
1064 (unsigned long long)amount, 1071 (unsigned long long) amount,
1065 (unsigned int)entries); 1072 (unsigned int) entries);
1066 env = GNUNET_MQ_msg(rm, 1073 env = GNUNET_MQ_msg (rm,
1067 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); 1074 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1068 rm->entries = htonl(entries); 1075 rm->entries = htonl (entries);
1069 rm->amount = GNUNET_htonll(amount); 1076 rm->amount = GNUNET_htonll (amount);
1070 1077
1071 qc.sc.cont = cont; 1078 qc.sc.cont = cont;
1072 qc.sc.cont_cls = cont_cls; 1079 qc.sc.cont_cls = cont_cls;
1073 qe = make_queue_entry(h, 1080 qe = make_queue_entry (h,
1074 env, 1081 env,
1075 UINT_MAX, 1082 UINT_MAX,
1076 UINT_MAX, 1083 UINT_MAX,
1077 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, 1084 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1078 &qc); 1085 &qc);
1079 if (NULL == qe) 1086 if (NULL == qe)
1080 { 1087 {
1081 LOG(GNUNET_ERROR_TYPE_DEBUG, 1088 LOG (GNUNET_ERROR_TYPE_DEBUG,
1082 "Could not create queue entry to reserve\n"); 1089 "Could not create queue entry to reserve\n");
1083 return NULL; 1090 return NULL;
1084 } 1091 }
1085 GNUNET_STATISTICS_update(h->stats, 1092 GNUNET_STATISTICS_update (h->stats,
1086 gettext_noop("# RESERVE requests executed"), 1093 gettext_noop ("# RESERVE requests executed"),
1087 1, 1094 1,
1088 GNUNET_NO); 1095 GNUNET_NO);
1089 process_queue(h); 1096 process_queue (h);
1090 return qe; 1097 return qe;
1091} 1098}
1092 1099
@@ -1112,12 +1119,12 @@ GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h,
1112 * (or rather, will already have been invoked) 1119 * (or rather, will already have been invoked)
1113 */ 1120 */
1114struct GNUNET_DATASTORE_QueueEntry * 1121struct GNUNET_DATASTORE_QueueEntry *
1115GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h, 1122GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1116 uint32_t rid, 1123 uint32_t rid,
1117 unsigned int queue_priority, 1124 unsigned int queue_priority,
1118 unsigned int max_queue_size, 1125 unsigned int max_queue_size,
1119 GNUNET_DATASTORE_ContinuationWithStatus cont, 1126 GNUNET_DATASTORE_ContinuationWithStatus cont,
1120 void *cont_cls) 1127 void *cont_cls)
1121{ 1128{
1122 struct GNUNET_DATASTORE_QueueEntry *qe; 1129 struct GNUNET_DATASTORE_QueueEntry *qe;
1123 struct GNUNET_MQ_Envelope *env; 1130 struct GNUNET_MQ_Envelope *env;
@@ -1126,31 +1133,31 @@ GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h,
1126 1133
1127 if (NULL == cont) 1134 if (NULL == cont)
1128 cont = &drop_status_cont; 1135 cont = &drop_status_cont;
1129 LOG(GNUNET_ERROR_TYPE_DEBUG, 1136 LOG (GNUNET_ERROR_TYPE_DEBUG,
1130 "Asked to release reserve %d\n", 1137 "Asked to release reserve %d\n",
1131 rid); 1138 rid);
1132 env = GNUNET_MQ_msg(rrm, 1139 env = GNUNET_MQ_msg (rrm,
1133 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); 1140 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1134 rrm->rid = htonl(rid); 1141 rrm->rid = htonl (rid);
1135 qc.sc.cont = cont; 1142 qc.sc.cont = cont;
1136 qc.sc.cont_cls = cont_cls; 1143 qc.sc.cont_cls = cont_cls;
1137 qe = make_queue_entry(h, 1144 qe = make_queue_entry (h,
1138 env, 1145 env,
1139 queue_priority, 1146 queue_priority,
1140 max_queue_size, 1147 max_queue_size,
1141 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, 1148 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1142 &qc); 1149 &qc);
1143 if (NULL == qe) 1150 if (NULL == qe)
1144 { 1151 {
1145 LOG(GNUNET_ERROR_TYPE_DEBUG, 1152 LOG (GNUNET_ERROR_TYPE_DEBUG,
1146 "Could not create queue entry to release reserve\n"); 1153 "Could not create queue entry to release reserve\n");
1147 return NULL; 1154 return NULL;
1148 } 1155 }
1149 GNUNET_STATISTICS_update(h->stats, 1156 GNUNET_STATISTICS_update (h->stats,
1150 gettext_noop 1157 gettext_noop
1151 ("# RELEASE RESERVE requests executed"), 1, 1158 ("# RELEASE RESERVE requests executed"), 1,
1152 GNUNET_NO); 1159 GNUNET_NO);
1153 process_queue(h); 1160 process_queue (h);
1154 return qe; 1161 return qe;
1155} 1162}
1156 1163
@@ -1176,14 +1183,14 @@ GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h,
1176 * (or rather, will already have been invoked) 1183 * (or rather, will already have been invoked)
1177 */ 1184 */
1178struct GNUNET_DATASTORE_QueueEntry * 1185struct GNUNET_DATASTORE_QueueEntry *
1179GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, 1186GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1180 const struct GNUNET_HashCode *key, 1187 const struct GNUNET_HashCode *key,
1181 size_t size, 1188 size_t size,
1182 const void *data, 1189 const void *data,
1183 unsigned int queue_priority, 1190 unsigned int queue_priority,
1184 unsigned int max_queue_size, 1191 unsigned int max_queue_size,
1185 GNUNET_DATASTORE_ContinuationWithStatus cont, 1192 GNUNET_DATASTORE_ContinuationWithStatus cont,
1186 void *cont_cls) 1193 void *cont_cls)
1187{ 1194{
1188 struct GNUNET_DATASTORE_QueueEntry *qe; 1195 struct GNUNET_DATASTORE_QueueEntry *qe;
1189 struct DataMessage *dm; 1196 struct DataMessage *dm;
@@ -1191,45 +1198,45 @@ GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h,
1191 union QueueContext qc; 1198 union QueueContext qc;
1192 1199
1193 if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE) 1200 if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1194 { 1201 {
1195 GNUNET_break(0); 1202 GNUNET_break (0);
1196 return NULL; 1203 return NULL;
1197 } 1204 }
1198 if (NULL == cont) 1205 if (NULL == cont)
1199 cont = &drop_status_cont; 1206 cont = &drop_status_cont;
1200 LOG(GNUNET_ERROR_TYPE_DEBUG, 1207 LOG (GNUNET_ERROR_TYPE_DEBUG,
1201 "Asked to remove %u bytes under key `%s'\n", 1208 "Asked to remove %u bytes under key `%s'\n",
1202 size, 1209 size,
1203 GNUNET_h2s(key)); 1210 GNUNET_h2s (key));
1204 env = GNUNET_MQ_msg_extra(dm, 1211 env = GNUNET_MQ_msg_extra (dm,
1205 size, 1212 size,
1206 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); 1213 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1207 dm->size = htonl(size); 1214 dm->size = htonl (size);
1208 dm->key = *key; 1215 dm->key = *key;
1209 GNUNET_memcpy(&dm[1], 1216 GNUNET_memcpy (&dm[1],
1210 data, 1217 data,
1211 size); 1218 size);
1212 1219
1213 qc.sc.cont = cont; 1220 qc.sc.cont = cont;
1214 qc.sc.cont_cls = cont_cls; 1221 qc.sc.cont_cls = cont_cls;
1215 1222
1216 qe = make_queue_entry(h, 1223 qe = make_queue_entry (h,
1217 env, 1224 env,
1218 queue_priority, 1225 queue_priority,
1219 max_queue_size, 1226 max_queue_size,
1220 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, 1227 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1221 &qc); 1228 &qc);
1222 if (NULL == qe) 1229 if (NULL == qe)
1223 { 1230 {
1224 LOG(GNUNET_ERROR_TYPE_DEBUG, 1231 LOG (GNUNET_ERROR_TYPE_DEBUG,
1225 "Could not create queue entry for REMOVE\n"); 1232 "Could not create queue entry for REMOVE\n");
1226 return NULL; 1233 return NULL;
1227 } 1234 }
1228 GNUNET_STATISTICS_update(h->stats, 1235 GNUNET_STATISTICS_update (h->stats,
1229 gettext_noop("# REMOVE requests executed"), 1236 gettext_noop ("# REMOVE requests executed"),
1230 1, 1237 1,
1231 GNUNET_NO); 1238 GNUNET_NO);
1232 process_queue(h); 1239 process_queue (h);
1233 return qe; 1240 return qe;
1234} 1241}
1235 1242
@@ -1254,41 +1261,41 @@ GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h,
1254 * cancel 1261 * cancel
1255 */ 1262 */
1256struct GNUNET_DATASTORE_QueueEntry * 1263struct GNUNET_DATASTORE_QueueEntry *
1257GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h, 1264GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1258 unsigned int queue_priority, 1265 unsigned int queue_priority,
1259 unsigned int max_queue_size, 1266 unsigned int max_queue_size,
1260 GNUNET_DATASTORE_DatumProcessor proc, 1267 GNUNET_DATASTORE_DatumProcessor proc,
1261 void *proc_cls) 1268 void *proc_cls)
1262{ 1269{
1263 struct GNUNET_DATASTORE_QueueEntry *qe; 1270 struct GNUNET_DATASTORE_QueueEntry *qe;
1264 struct GNUNET_MQ_Envelope *env; 1271 struct GNUNET_MQ_Envelope *env;
1265 struct GNUNET_MessageHeader *m; 1272 struct GNUNET_MessageHeader *m;
1266 union QueueContext qc; 1273 union QueueContext qc;
1267 1274
1268 GNUNET_assert(NULL != proc); 1275 GNUNET_assert (NULL != proc);
1269 LOG(GNUNET_ERROR_TYPE_DEBUG, 1276 LOG (GNUNET_ERROR_TYPE_DEBUG,
1270 "Asked to get replication entry\n"); 1277 "Asked to get replication entry\n");
1271 env = GNUNET_MQ_msg(m, 1278 env = GNUNET_MQ_msg (m,
1272 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); 1279 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1273 qc.rc.proc = proc; 1280 qc.rc.proc = proc;
1274 qc.rc.proc_cls = proc_cls; 1281 qc.rc.proc_cls = proc_cls;
1275 qe = make_queue_entry(h, 1282 qe = make_queue_entry (h,
1276 env, 1283 env,
1277 queue_priority, 1284 queue_priority,
1278 max_queue_size, 1285 max_queue_size,
1279 GNUNET_MESSAGE_TYPE_DATASTORE_DATA, 1286 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1280 &qc); 1287 &qc);
1281 if (NULL == qe) 1288 if (NULL == qe)
1282 { 1289 {
1283 LOG(GNUNET_ERROR_TYPE_DEBUG, 1290 LOG (GNUNET_ERROR_TYPE_DEBUG,
1284 "Could not create queue entry for GET REPLICATION\n"); 1291 "Could not create queue entry for GET REPLICATION\n");
1285 return NULL; 1292 return NULL;
1286 } 1293 }
1287 GNUNET_STATISTICS_update(h->stats, 1294 GNUNET_STATISTICS_update (h->stats,
1288 gettext_noop 1295 gettext_noop
1289 ("# GET REPLICATION requests executed"), 1, 1296 ("# GET REPLICATION requests executed"), 1,
1290 GNUNET_NO); 1297 GNUNET_NO);
1291 process_queue(h); 1298 process_queue (h);
1292 return qe; 1299 return qe;
1293} 1300}
1294 1301
@@ -1310,47 +1317,47 @@ GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h,
1310 * cancel 1317 * cancel
1311 */ 1318 */
1312struct GNUNET_DATASTORE_QueueEntry * 1319struct GNUNET_DATASTORE_QueueEntry *
1313GNUNET_DATASTORE_get_zero_anonymity(struct GNUNET_DATASTORE_Handle *h, 1320GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1314 uint64_t next_uid, 1321 uint64_t next_uid,
1315 unsigned int queue_priority, 1322 unsigned int queue_priority,
1316 unsigned int max_queue_size, 1323 unsigned int max_queue_size,
1317 enum GNUNET_BLOCK_Type type, 1324 enum GNUNET_BLOCK_Type type,
1318 GNUNET_DATASTORE_DatumProcessor proc, 1325 GNUNET_DATASTORE_DatumProcessor proc,
1319 void *proc_cls) 1326 void *proc_cls)
1320{ 1327{
1321 struct GNUNET_DATASTORE_QueueEntry *qe; 1328 struct GNUNET_DATASTORE_QueueEntry *qe;
1322 struct GNUNET_MQ_Envelope *env; 1329 struct GNUNET_MQ_Envelope *env;
1323 struct GetZeroAnonymityMessage *m; 1330 struct GetZeroAnonymityMessage *m;
1324 union QueueContext qc; 1331 union QueueContext qc;
1325 1332
1326 GNUNET_assert(NULL != proc); 1333 GNUNET_assert (NULL != proc);
1327 GNUNET_assert(type != GNUNET_BLOCK_TYPE_ANY); 1334 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1328 LOG(GNUNET_ERROR_TYPE_DEBUG, 1335 LOG (GNUNET_ERROR_TYPE_DEBUG,
1329 "Asked to get a zero-anonymity entry of type %d\n", 1336 "Asked to get a zero-anonymity entry of type %d\n",
1330 type); 1337 type);
1331 env = GNUNET_MQ_msg(m, 1338 env = GNUNET_MQ_msg (m,
1332 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); 1339 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1333 m->type = htonl((uint32_t)type); 1340 m->type = htonl ((uint32_t) type);
1334 m->next_uid = GNUNET_htonll(next_uid); 1341 m->next_uid = GNUNET_htonll (next_uid);
1335 qc.rc.proc = proc; 1342 qc.rc.proc = proc;
1336 qc.rc.proc_cls = proc_cls; 1343 qc.rc.proc_cls = proc_cls;
1337 qe = make_queue_entry(h, 1344 qe = make_queue_entry (h,
1338 env, 1345 env,
1339 queue_priority, 1346 queue_priority,
1340 max_queue_size, 1347 max_queue_size,
1341 GNUNET_MESSAGE_TYPE_DATASTORE_DATA, 1348 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1342 &qc); 1349 &qc);
1343 if (NULL == qe) 1350 if (NULL == qe)
1344 { 1351 {
1345 LOG(GNUNET_ERROR_TYPE_DEBUG, 1352 LOG (GNUNET_ERROR_TYPE_DEBUG,
1346 "Could not create queue entry for zero-anonymity procation\n"); 1353 "Could not create queue entry for zero-anonymity procation\n");
1347 return NULL; 1354 return NULL;
1348 } 1355 }
1349 GNUNET_STATISTICS_update(h->stats, 1356 GNUNET_STATISTICS_update (h->stats,
1350 gettext_noop 1357 gettext_noop
1351 ("# GET ZERO ANONYMITY requests executed"), 1, 1358 ("# GET ZERO ANONYMITY requests executed"), 1,
1352 GNUNET_NO); 1359 GNUNET_NO);
1353 process_queue(h); 1360 process_queue (h);
1354 return qe; 1361 return qe;
1355} 1362}
1356 1363
@@ -1374,15 +1381,15 @@ GNUNET_DATASTORE_get_zero_anonymity(struct GNUNET_DATASTORE_Handle *h,
1374 * cancel 1381 * cancel
1375 */ 1382 */
1376struct GNUNET_DATASTORE_QueueEntry * 1383struct GNUNET_DATASTORE_QueueEntry *
1377GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, 1384GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1378 uint64_t next_uid, 1385 uint64_t next_uid,
1379 bool random, 1386 bool random,
1380 const struct GNUNET_HashCode *key, 1387 const struct GNUNET_HashCode *key,
1381 enum GNUNET_BLOCK_Type type, 1388 enum GNUNET_BLOCK_Type type,
1382 unsigned int queue_priority, 1389 unsigned int queue_priority,
1383 unsigned int max_queue_size, 1390 unsigned int max_queue_size,
1384 GNUNET_DATASTORE_DatumProcessor proc, 1391 GNUNET_DATASTORE_DatumProcessor proc,
1385 void *proc_cls) 1392 void *proc_cls)
1386{ 1393{
1387 struct GNUNET_DATASTORE_QueueEntry *qe; 1394 struct GNUNET_DATASTORE_QueueEntry *qe;
1388 struct GNUNET_MQ_Envelope *env; 1395 struct GNUNET_MQ_Envelope *env;
@@ -1390,50 +1397,50 @@ GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h,
1390 struct GetMessage *gm; 1397 struct GetMessage *gm;
1391 union QueueContext qc; 1398 union QueueContext qc;
1392 1399
1393 GNUNET_assert(NULL != proc); 1400 GNUNET_assert (NULL != proc);
1394 LOG(GNUNET_ERROR_TYPE_DEBUG, 1401 LOG (GNUNET_ERROR_TYPE_DEBUG,
1395 "Asked to look for data of type %u under key `%s'\n", 1402 "Asked to look for data of type %u under key `%s'\n",
1396 (unsigned int)type, 1403 (unsigned int) type,
1397 GNUNET_h2s(key)); 1404 GNUNET_h2s (key));
1398 if (NULL == key) 1405 if (NULL == key)
1399 { 1406 {
1400 env = GNUNET_MQ_msg(gm, 1407 env = GNUNET_MQ_msg (gm,
1401 GNUNET_MESSAGE_TYPE_DATASTORE_GET); 1408 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1402 gm->type = htonl(type); 1409 gm->type = htonl (type);
1403 gm->next_uid = GNUNET_htonll(next_uid); 1410 gm->next_uid = GNUNET_htonll (next_uid);
1404 gm->random = random; 1411 gm->random = random;
1405 } 1412 }
1406 else 1413 else
1407 { 1414 {
1408 env = GNUNET_MQ_msg(gkm, 1415 env = GNUNET_MQ_msg (gkm,
1409 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); 1416 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1410 gkm->type = htonl(type); 1417 gkm->type = htonl (type);
1411 gkm->next_uid = GNUNET_htonll(next_uid); 1418 gkm->next_uid = GNUNET_htonll (next_uid);
1412 gkm->random = random; 1419 gkm->random = random;
1413 gkm->key = *key; 1420 gkm->key = *key;
1414 } 1421 }
1415 qc.rc.proc = proc; 1422 qc.rc.proc = proc;
1416 qc.rc.proc_cls = proc_cls; 1423 qc.rc.proc_cls = proc_cls;
1417 qe = make_queue_entry(h, 1424 qe = make_queue_entry (h,
1418 env, 1425 env,
1419 queue_priority, 1426 queue_priority,
1420 max_queue_size, 1427 max_queue_size,
1421 GNUNET_MESSAGE_TYPE_DATASTORE_DATA, 1428 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1422 &qc); 1429 &qc);
1423 if (NULL == qe) 1430 if (NULL == qe)
1424 { 1431 {
1425 LOG(GNUNET_ERROR_TYPE_DEBUG, 1432 LOG (GNUNET_ERROR_TYPE_DEBUG,
1426 "Could not queue request for `%s'\n", 1433 "Could not queue request for `%s'\n",
1427 GNUNET_h2s(key)); 1434 GNUNET_h2s (key));
1428 return NULL; 1435 return NULL;
1429 } 1436 }
1430#if INSANE_STATISTICS 1437#if INSANE_STATISTICS
1431 GNUNET_STATISTICS_update(h->stats, 1438 GNUNET_STATISTICS_update (h->stats,
1432 gettext_noop("# GET requests executed"), 1439 gettext_noop ("# GET requests executed"),
1433 1, 1440 1,
1434 GNUNET_NO); 1441 GNUNET_NO);
1435#endif 1442#endif
1436 process_queue(h); 1443 process_queue (h);
1437 return qe; 1444 return qe;
1438} 1445}
1439 1446
@@ -1445,23 +1452,23 @@ GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h,
1445 * @param qe operation to cancel 1452 * @param qe operation to cancel
1446 */ 1453 */
1447void 1454void
1448GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe) 1455GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1449{ 1456{
1450 struct GNUNET_DATASTORE_Handle *h = qe->h; 1457 struct GNUNET_DATASTORE_Handle *h = qe->h;
1451 1458
1452 LOG(GNUNET_ERROR_TYPE_DEBUG, 1459 LOG (GNUNET_ERROR_TYPE_DEBUG,
1453 "Pending DATASTORE request %p cancelled (%d, %d)\n", 1460 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1454 qe, 1461 qe,
1455 NULL == qe->env, 1462 NULL == qe->env,
1456 h->queue_head == qe); 1463 h->queue_head == qe);
1457 if (NULL == qe->env) 1464 if (NULL == qe->env)
1458 { 1465 {
1459 free_queue_entry(qe); 1466 free_queue_entry (qe);
1460 h->skip_next_messages++; 1467 h->skip_next_messages++;
1461 return; 1468 return;
1462 } 1469 }
1463 free_queue_entry(qe); 1470 free_queue_entry (qe);
1464 process_queue(h); 1471 process_queue (h);
1465} 1472}
1466 1473
1467 1474