aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/datastore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r--src/datastore/datastore_api.c1315
1 files changed, 653 insertions, 662 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index 636dbdd1a..7069975e4 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -11,12 +11,12 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
18 SPDX-License-Identifier: AGPL3.0-or-later 18 SPDX-License-Identifier: AGPL3.0-or-later
19*/ 19 */
20 20
21/** 21/**
22 * @file datastore/datastore_api.c 22 * @file datastore/datastore_api.c
@@ -31,7 +31,7 @@
31#include "gnunet_statistics_service.h" 31#include "gnunet_statistics_service.h"
32#include "datastore.h" 32#include "datastore.h"
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__) 34#define LOG(kind, ...) GNUNET_log_from(kind, "datastore-api", __VA_ARGS__)
35 35
36#define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES 36#define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37 37
@@ -51,8 +51,7 @@
51/** 51/**
52 * Context for processing status messages. 52 * Context for processing status messages.
53 */ 53 */
54struct StatusContext 54struct StatusContext {
55{
56 /** 55 /**
57 * Continuation to call with the status. 56 * Continuation to call with the status.
58 */ 57 */
@@ -62,15 +61,13 @@ struct StatusContext
62 * Closure for @e cont. 61 * Closure for @e cont.
63 */ 62 */
64 void *cont_cls; 63 void *cont_cls;
65
66}; 64};
67 65
68 66
69/** 67/**
70 * Context for processing result messages. 68 * Context for processing result messages.
71 */ 69 */
72struct ResultContext 70struct ResultContext {
73{
74 /** 71 /**
75 * Function to call with the result. 72 * Function to call with the result.
76 */ 73 */
@@ -80,29 +77,23 @@ struct ResultContext
80 * Closure for @e proc. 77 * Closure for @e proc.
81 */ 78 */
82 void *proc_cls; 79 void *proc_cls;
83
84}; 80};
85 81
86 82
87/** 83/**
88 * Context for a queue operation. 84 * Context for a queue operation.
89 */ 85 */
90union QueueContext 86union QueueContext {
91{
92
93 struct StatusContext sc; 87 struct StatusContext sc;
94 88
95 struct ResultContext rc; 89 struct ResultContext rc;
96
97}; 90};
98 91
99 92
100/** 93/**
101 * Entry in our priority queue. 94 * Entry in our priority queue.
102 */ 95 */
103struct GNUNET_DATASTORE_QueueEntry 96struct GNUNET_DATASTORE_QueueEntry {
104{
105
106 /** 97 /**
107 * This is a linked list. 98 * This is a linked list.
108 */ 99 */
@@ -160,16 +151,13 @@ struct GNUNET_DATASTORE_QueueEntry
160 * Expected response type. 151 * Expected response type.
161 */ 152 */
162 uint16_t response_type; 153 uint16_t response_type;
163
164}; 154};
165 155
166 156
167/** 157/**
168 * Handle to the datastore service. 158 * Handle to the datastore service.
169 */ 159 */
170struct GNUNET_DATASTORE_Handle 160struct GNUNET_DATASTORE_Handle {
171{
172
173 /** 161 /**
174 * Our configuration. 162 * Our configuration.
175 */ 163 */
@@ -222,7 +210,6 @@ struct GNUNET_DATASTORE_Handle
222 * We should ignore the next message(s) from the service. 210 * We should ignore the next message(s) from the service.
223 */ 211 */
224 unsigned int skip_next_messages; 212 unsigned int skip_next_messages;
225
226}; 213};
227 214
228 215
@@ -232,7 +219,7 @@ struct GNUNET_DATASTORE_Handle
232 * @param cls the `struct GNUNET_DATASTORE_Handle` 219 * @param cls the `struct GNUNET_DATASTORE_Handle`
233 */ 220 */
234static void 221static void
235try_reconnect (void *cls); 222try_reconnect(void *cls);
236 223
237 224
238/** 225/**
@@ -242,20 +229,20 @@ try_reconnect (void *cls);
242 * @param h handle to datastore to disconnect and reconnect 229 * @param h handle to datastore to disconnect and reconnect
243 */ 230 */
244static void 231static void
245do_disconnect (struct GNUNET_DATASTORE_Handle *h) 232do_disconnect(struct GNUNET_DATASTORE_Handle *h)
246{ 233{
247 if (NULL == h->mq) 234 if (NULL == h->mq)
248 { 235 {
249 GNUNET_break (0); 236 GNUNET_break(0);
250 return; 237 return;
251 } 238 }
252 GNUNET_MQ_destroy (h->mq); 239 GNUNET_MQ_destroy(h->mq);
253 h->mq = NULL; 240 h->mq = NULL;
254 h->skip_next_messages = 0; 241 h->skip_next_messages = 0;
255 h->reconnect_task 242 h->reconnect_task
256 = GNUNET_SCHEDULER_add_delayed (h->retry_time, 243 = GNUNET_SCHEDULER_add_delayed(h->retry_time,
257 &try_reconnect, 244 &try_reconnect,
258 h); 245 h);
259} 246}
260 247
261 248
@@ -267,19 +254,19 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h)
267 * @param qe entry to free. 254 * @param qe entry to free.
268 */ 255 */
269static void 256static void
270free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) 257free_queue_entry(struct GNUNET_DATASTORE_QueueEntry *qe)
271{ 258{
272 struct GNUNET_DATASTORE_Handle *h = qe->h; 259 struct GNUNET_DATASTORE_Handle *h = qe->h;
273 260
274 GNUNET_CONTAINER_DLL_remove (h->queue_head, 261 GNUNET_CONTAINER_DLL_remove(h->queue_head,
275 h->queue_tail, 262 h->queue_tail,
276 qe); 263 qe);
277 h->queue_size--; 264 h->queue_size--;
278 if (NULL != qe->env) 265 if (NULL != qe->env)
279 GNUNET_MQ_discard (qe->env); 266 GNUNET_MQ_discard(qe->env);
280 if (NULL != qe->delay_warn_task) 267 if (NULL != qe->delay_warn_task)
281 GNUNET_SCHEDULER_cancel (qe->delay_warn_task); 268 GNUNET_SCHEDULER_cancel(qe->delay_warn_task);
282 GNUNET_free (qe); 269 GNUNET_free(qe);
283} 270}
284 271
285 272
@@ -289,20 +276,20 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
289 * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is 276 * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
290 */ 277 */
291static void 278static void
292delay_warning (void *cls) 279delay_warning(void *cls)
293{ 280{
294 struct GNUNET_DATASTORE_QueueEntry *qe = cls; 281 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
295 282
296 qe->delay_warn_task = NULL; 283 qe->delay_warn_task = NULL;
297 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 284 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
298 "Request %p of type %u at head of datastore queue for more than %s\n", 285 "Request %p of type %u at head of datastore queue for more than %s\n",
299 qe, 286 qe,
300 (unsigned int) qe->response_type, 287 (unsigned int)qe->response_type,
301 GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT, 288 GNUNET_STRINGS_relative_time_to_string(DELAY_WARN_TIMEOUT,
302 GNUNET_YES)); 289 GNUNET_YES));
303 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT, 290 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed(DELAY_WARN_TIMEOUT,
304 &delay_warning, 291 &delay_warning,
305 qe); 292 qe);
306} 293}
307 294
308 295
@@ -313,57 +300,59 @@ delay_warning (void *cls)
313 * @param error error code 300 * @param error error code
314 */ 301 */
315static void 302static void
316mq_error_handler (void *cls, 303mq_error_handler(void *cls,
317 enum GNUNET_MQ_Error error) 304 enum GNUNET_MQ_Error error)
318{ 305{
319 struct GNUNET_DATASTORE_Handle *h = cls; 306 struct GNUNET_DATASTORE_Handle *h = cls;
320 struct GNUNET_DATASTORE_QueueEntry *qe; 307 struct GNUNET_DATASTORE_QueueEntry *qe;
321 308
322 LOG (GNUNET_ERROR_TYPE_DEBUG, 309 LOG(GNUNET_ERROR_TYPE_DEBUG,
323 "MQ error, reconnecting to DATASTORE\n"); 310 "MQ error, reconnecting to DATASTORE\n");
324 do_disconnect (h); 311 do_disconnect(h);
325 qe = h->queue_head; 312 qe = h->queue_head;
326 if (NULL == qe) 313 if (NULL == qe)
327 return; 314 return;
328 if (NULL != qe->delay_warn_task) 315 if (NULL != qe->delay_warn_task)
329 { 316 {
330 GNUNET_SCHEDULER_cancel (qe->delay_warn_task); 317 GNUNET_SCHEDULER_cancel(qe->delay_warn_task);
331 qe->delay_warn_task = NULL; 318 qe->delay_warn_task = NULL;
332 } 319 }
333 if (NULL == qe->env) 320 if (NULL == qe->env)
334 {
335 union QueueContext qc = qe->qc;
336 uint16_t rt = qe->response_type;
337
338 LOG (GNUNET_ERROR_TYPE_DEBUG,
339 "Failed to receive response from database.\n");
340 free_queue_entry (qe);
341 switch (rt)
342 { 321 {
343 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: 322 union QueueContext qc = qe->qc;
344 if (NULL != qc.sc.cont) 323 uint16_t rt = qe->response_type;
345 qc.sc.cont (qc.sc.cont_cls, 324
346 GNUNET_SYSERR, 325 LOG(GNUNET_ERROR_TYPE_DEBUG,
347 GNUNET_TIME_UNIT_ZERO_ABS, 326 "Failed to receive response from database.\n");
348 _("DATASTORE disconnected")); 327 free_queue_entry(qe);
349 break; 328 switch (rt)
350 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: 329 {
351 if (NULL != qc.rc.proc) 330 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
352 qc.rc.proc (qc.rc.proc_cls, 331 if (NULL != qc.sc.cont)
353 NULL, 332 qc.sc.cont(qc.sc.cont_cls,
354 0, 333 GNUNET_SYSERR,
355 NULL, 334 GNUNET_TIME_UNIT_ZERO_ABS,
356 0, 335 _("DATASTORE disconnected"));
357 0, 336 break;
358 0, 337
359 0, 338 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
360 GNUNET_TIME_UNIT_ZERO_ABS, 339 if (NULL != qc.rc.proc)
361 0); 340 qc.rc.proc(qc.rc.proc_cls,
362 break; 341 NULL,
363 default: 342 0,
364 GNUNET_break (0); 343 NULL,
344 0,
345 0,
346 0,
347 0,
348 GNUNET_TIME_UNIT_ZERO_ABS,
349 0);
350 break;
351
352 default:
353 GNUNET_break(0);
354 }
365 } 355 }
366 }
367} 356}
368 357
369 358
@@ -374,22 +363,22 @@ mq_error_handler (void *cls,
374 * @return handle to use to access the service 363 * @return handle to use to access the service
375 */ 364 */
376struct GNUNET_DATASTORE_Handle * 365struct GNUNET_DATASTORE_Handle *
377GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) 366GNUNET_DATASTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
378{ 367{
379 struct GNUNET_DATASTORE_Handle *h; 368 struct GNUNET_DATASTORE_Handle *h;
380 369
381 LOG (GNUNET_ERROR_TYPE_DEBUG, 370 LOG(GNUNET_ERROR_TYPE_DEBUG,
382 "Establishing DATASTORE connection!\n"); 371 "Establishing DATASTORE connection!\n");
383 h = GNUNET_new (struct GNUNET_DATASTORE_Handle); 372 h = GNUNET_new(struct GNUNET_DATASTORE_Handle);
384 h->cfg = cfg; 373 h->cfg = cfg;
385 try_reconnect (h); 374 try_reconnect(h);
386 if (NULL == h->mq) 375 if (NULL == h->mq)
387 { 376 {
388 GNUNET_free (h); 377 GNUNET_free(h);
389 return NULL; 378 return NULL;
390 } 379 }
391 h->stats = GNUNET_STATISTICS_create ("datastore-api", 380 h->stats = GNUNET_STATISTICS_create("datastore-api",
392 cfg); 381 cfg);
393 return h; 382 return h;
394} 383}
395 384
@@ -401,14 +390,14 @@ GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
401 * @param cls the datastore handle 390 * @param cls the datastore handle
402 */ 391 */
403static void 392static void
404disconnect_after_drop (void *cls) 393disconnect_after_drop(void *cls)
405{ 394{
406 struct GNUNET_DATASTORE_Handle *h = cls; 395 struct GNUNET_DATASTORE_Handle *h = cls;
407 396
408 LOG (GNUNET_ERROR_TYPE_DEBUG, 397 LOG(GNUNET_ERROR_TYPE_DEBUG,
409 "Drop sent, disconnecting\n"); 398 "Drop sent, disconnecting\n");
410 GNUNET_DATASTORE_disconnect (h, 399 GNUNET_DATASTORE_disconnect(h,
411 GNUNET_NO); 400 GNUNET_NO);
412} 401}
413 402
414 403
@@ -419,15 +408,15 @@ disconnect_after_drop (void *cls)
419 * @param error error code 408 * @param error error code
420 */ 409 */
421static void 410static void
422disconnect_on_mq_error (void *cls, 411disconnect_on_mq_error(void *cls,
423 enum GNUNET_MQ_Error error) 412 enum GNUNET_MQ_Error error)
424{ 413{
425 struct GNUNET_DATASTORE_Handle *h = cls; 414 struct GNUNET_DATASTORE_Handle *h = cls;
426 415
427 LOG (GNUNET_ERROR_TYPE_ERROR, 416 LOG(GNUNET_ERROR_TYPE_ERROR,
428 "Failed to ask datastore to drop tables\n"); 417 "Failed to ask datastore to drop tables\n");
429 GNUNET_DATASTORE_disconnect (h, 418 GNUNET_DATASTORE_disconnect(h,
430 GNUNET_NO); 419 GNUNET_NO);
431} 420}
432 421
433 422
@@ -439,82 +428,84 @@ disconnect_on_mq_error (void *cls,
439 * @param drop set to #GNUNET_YES to delete all data in datastore (!) 428 * @param drop set to #GNUNET_YES to delete all data in datastore (!)
440 */ 429 */
441void 430void
442GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, 431GNUNET_DATASTORE_disconnect(struct GNUNET_DATASTORE_Handle *h,
443 int drop) 432 int drop)
444{ 433{
445 struct GNUNET_DATASTORE_QueueEntry *qe; 434 struct GNUNET_DATASTORE_QueueEntry *qe;
446 435
447 LOG (GNUNET_ERROR_TYPE_DEBUG, 436 LOG(GNUNET_ERROR_TYPE_DEBUG,
448 "Datastore disconnect\n"); 437 "Datastore disconnect\n");
449 if (NULL != h->mq) 438 if (NULL != h->mq)
450 { 439 {
451 GNUNET_MQ_destroy (h->mq); 440 GNUNET_MQ_destroy(h->mq);
452 h->mq = NULL; 441 h->mq = NULL;
453 } 442 }
454 if (NULL != h->reconnect_task) 443 if (NULL != h->reconnect_task)
455 { 444 {
456 GNUNET_SCHEDULER_cancel (h->reconnect_task); 445 GNUNET_SCHEDULER_cancel(h->reconnect_task);
457 h->reconnect_task = NULL; 446 h->reconnect_task = NULL;
458 } 447 }
459 while (NULL != (qe = h->queue_head)) 448 while (NULL != (qe = h->queue_head))
460 {
461 switch (qe->response_type)
462 { 449 {
463 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: 450 switch (qe->response_type)
464 if (NULL != qe->qc.sc.cont) 451 {
465 qe->qc.sc.cont (qe->qc.sc.cont_cls, 452 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
466 GNUNET_SYSERR, 453 if (NULL != qe->qc.sc.cont)
467 GNUNET_TIME_UNIT_ZERO_ABS, 454 qe->qc.sc.cont(qe->qc.sc.cont_cls,
468 _("Disconnected from DATASTORE")); 455 GNUNET_SYSERR,
469 break; 456 GNUNET_TIME_UNIT_ZERO_ABS,
470 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: 457 _("Disconnected from DATASTORE"));
471 if (NULL != qe->qc.rc.proc) 458 break;
472 qe->qc.rc.proc (qe->qc.rc.proc_cls, 459
473 NULL, 460 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
474 0, 461 if (NULL != qe->qc.rc.proc)
475 NULL, 462 qe->qc.rc.proc(qe->qc.rc.proc_cls,
476 0, 463 NULL,
477 0, 464 0,
478 0, 465 NULL,
479 0, 466 0,
480 GNUNET_TIME_UNIT_ZERO_ABS, 467 0,
481 0); 468 0,
482 break; 469 0,
483 default: 470 GNUNET_TIME_UNIT_ZERO_ABS,
484 GNUNET_break (0); 471 0);
472 break;
473
474 default:
475 GNUNET_break(0);
476 }
477 free_queue_entry(qe);
485 } 478 }
486 free_queue_entry (qe);
487 }
488 if (GNUNET_YES == drop) 479 if (GNUNET_YES == drop)
489 {
490 LOG (GNUNET_ERROR_TYPE_DEBUG,
491 "Re-connecting to issue DROP!\n");
492 GNUNET_assert (NULL == h->mq);
493 h->mq = GNUNET_CLIENT_connect (h->cfg,
494 "datastore",
495 NULL,
496 &disconnect_on_mq_error,
497 h);
498 if (NULL != h->mq)
499 { 480 {
500 struct GNUNET_MessageHeader *hdr; 481 LOG(GNUNET_ERROR_TYPE_DEBUG,
501 struct GNUNET_MQ_Envelope *env; 482 "Re-connecting to issue DROP!\n");
502 483 GNUNET_assert(NULL == h->mq);
503 env = GNUNET_MQ_msg (hdr, 484 h->mq = GNUNET_CLIENT_connect(h->cfg,
504 GNUNET_MESSAGE_TYPE_DATASTORE_DROP); 485 "datastore",
505 GNUNET_MQ_notify_sent (env, 486 NULL,
506 &disconnect_after_drop, 487 &disconnect_on_mq_error,
507 h); 488 h);
508 GNUNET_MQ_send (h->mq, 489 if (NULL != h->mq)
509 env); 490 {
510 return; 491 struct GNUNET_MessageHeader *hdr;
492 struct GNUNET_MQ_Envelope *env;
493
494 env = GNUNET_MQ_msg(hdr,
495 GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
496 GNUNET_MQ_notify_sent(env,
497 &disconnect_after_drop,
498 h);
499 GNUNET_MQ_send(h->mq,
500 env);
501 return;
502 }
503 GNUNET_break(0);
511 } 504 }
512 GNUNET_break (0); 505 GNUNET_STATISTICS_destroy(h->stats,
513 } 506 GNUNET_NO);
514 GNUNET_STATISTICS_destroy (h->stats,
515 GNUNET_NO);
516 h->stats = NULL; 507 h->stats = NULL;
517 GNUNET_free (h); 508 GNUNET_free(h);
518} 509}
519 510
520 511
@@ -534,45 +525,45 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
534 * @return NULL if the queue is full 525 * @return NULL if the queue is full
535 */ 526 */
536static struct GNUNET_DATASTORE_QueueEntry * 527static struct GNUNET_DATASTORE_QueueEntry *
537make_queue_entry (struct GNUNET_DATASTORE_Handle *h, 528make_queue_entry(struct GNUNET_DATASTORE_Handle *h,
538 struct GNUNET_MQ_Envelope *env, 529 struct GNUNET_MQ_Envelope *env,
539 unsigned int queue_priority, 530 unsigned int queue_priority,
540 unsigned int max_queue_size, 531 unsigned int max_queue_size,
541 uint16_t expected_type, 532 uint16_t expected_type,
542 const union QueueContext *qc) 533 const union QueueContext *qc)
543{ 534{
544 struct GNUNET_DATASTORE_QueueEntry *qe; 535 struct GNUNET_DATASTORE_QueueEntry *qe;
545 struct GNUNET_DATASTORE_QueueEntry *pos; 536 struct GNUNET_DATASTORE_QueueEntry *pos;
546 unsigned int c; 537 unsigned int c;
547 538
548 if ( (NULL != h->queue_tail) && 539 if ((NULL != h->queue_tail) &&
549 (h->queue_tail->priority >= queue_priority) ) 540 (h->queue_tail->priority >= queue_priority))
550 { 541 {
551 c = h->queue_size; 542 c = h->queue_size;
552 pos = NULL; 543 pos = NULL;
553 } 544 }
554 else 545 else
555 { 546 {
556 c = 0; 547 c = 0;
557 pos = h->queue_head; 548 pos = h->queue_head;
558 } 549 }
559 while ( (NULL != pos) && 550 while ((NULL != pos) &&
560 (c < max_queue_size) && 551 (c < max_queue_size) &&
561 (pos->priority >= queue_priority) ) 552 (pos->priority >= queue_priority))
562 { 553 {
563 c++; 554 c++;
564 pos = pos->next; 555 pos = pos->next;
565 } 556 }
566 if (c >= max_queue_size) 557 if (c >= max_queue_size)
567 { 558 {
568 GNUNET_STATISTICS_update (h->stats, 559 GNUNET_STATISTICS_update(h->stats,
569 gettext_noop ("# queue overflows"), 560 gettext_noop("# queue overflows"),
570 1, 561 1,
571 GNUNET_NO); 562 GNUNET_NO);
572 GNUNET_MQ_discard (env); 563 GNUNET_MQ_discard(env);
573 return NULL; 564 return NULL;
574 } 565 }
575 qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry); 566 qe = GNUNET_new(struct GNUNET_DATASTORE_QueueEntry);
576 qe->h = h; 567 qe->h = h;
577 qe->env = env; 568 qe->env = env;
578 qe->response_type = expected_type; 569 qe->response_type = expected_type;
@@ -580,30 +571,30 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
580 qe->priority = queue_priority; 571 qe->priority = queue_priority;
581 qe->max_queue = max_queue_size; 572 qe->max_queue = max_queue_size;
582 if (NULL == pos) 573 if (NULL == pos)
583 { 574 {
584 /* append at the tail */ 575 /* append at the tail */
585 pos = h->queue_tail; 576 pos = h->queue_tail;
586 } 577 }
587 else 578 else
588 { 579 {
589 pos = pos->prev; 580 pos = pos->prev;
590 /* do not insert at HEAD if HEAD query was already 581 /* do not insert at HEAD if HEAD query was already
591 * transmitted and we are still receiving replies! */ 582 * transmitted and we are still receiving replies! */
592 if ( (NULL == pos) && 583 if ((NULL == pos) &&
593 (NULL == h->queue_head->env) ) 584 (NULL == h->queue_head->env))
594 pos = h->queue_head; 585 pos = h->queue_head;
595 } 586 }
596 c++; 587 c++;
597#if INSANE_STATISTICS 588#if INSANE_STATISTICS
598 GNUNET_STATISTICS_update (h->stats, 589 GNUNET_STATISTICS_update(h->stats,
599 gettext_noop ("# queue entries created"), 590 gettext_noop("# queue entries created"),
600 1, 591 1,
601 GNUNET_NO); 592 GNUNET_NO);
602#endif 593#endif
603 GNUNET_CONTAINER_DLL_insert_after (h->queue_head, 594 GNUNET_CONTAINER_DLL_insert_after(h->queue_head,
604 h->queue_tail, 595 h->queue_tail,
605 pos, 596 pos,
606 qe); 597 qe);
607 h->queue_size++; 598 h->queue_size++;
608 return qe; 599 return qe;
609} 600}
@@ -616,37 +607,37 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
616 * @param h handle to the datastore 607 * @param h handle to the datastore
617 */ 608 */
618static void 609static void
619process_queue (struct GNUNET_DATASTORE_Handle *h) 610process_queue(struct GNUNET_DATASTORE_Handle *h)
620{ 611{
621 struct GNUNET_DATASTORE_QueueEntry *qe; 612 struct GNUNET_DATASTORE_QueueEntry *qe;
622 613
623 if (NULL == (qe = h->queue_head)) 614 if (NULL == (qe = h->queue_head))
624 { 615 {
625 /* no entry in queue */ 616 /* no entry in queue */
626 LOG (GNUNET_ERROR_TYPE_DEBUG, 617 LOG(GNUNET_ERROR_TYPE_DEBUG,
627 "Queue empty\n"); 618 "Queue empty\n");
628 return; 619 return;
629 } 620 }
630 if (NULL == qe->env) 621 if (NULL == qe->env)
631 { 622 {
632 /* waiting for replies */ 623 /* waiting for replies */
633 LOG (GNUNET_ERROR_TYPE_DEBUG, 624 LOG(GNUNET_ERROR_TYPE_DEBUG,
634 "Head request already transmitted\n"); 625 "Head request already transmitted\n");
635 return; 626 return;
636 } 627 }
637 if (NULL == h->mq) 628 if (NULL == h->mq)
638 { 629 {
639 /* waiting for reconnect */ 630 /* waiting for reconnect */
640 LOG (GNUNET_ERROR_TYPE_DEBUG, 631 LOG(GNUNET_ERROR_TYPE_DEBUG,
641 "Not connected\n"); 632 "Not connected\n");
642 return; 633 return;
643 } 634 }
644 GNUNET_assert (NULL == qe->delay_warn_task); 635 GNUNET_assert(NULL == qe->delay_warn_task);
645 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT, 636 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed(DELAY_WARN_TIMEOUT,
646 &delay_warning, 637 &delay_warning,
647 qe); 638 qe);
648 GNUNET_MQ_send (h->mq, 639 GNUNET_MQ_send(h->mq,
649 qe->env); 640 qe->env);
650 qe->env = NULL; 641 qe->env = NULL;
651} 642}
652 643
@@ -659,36 +650,36 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
659 * @return the queue entry 650 * @return the queue entry
660 */ 651 */
661static struct GNUNET_DATASTORE_QueueEntry * 652static struct GNUNET_DATASTORE_QueueEntry *
662get_queue_head (struct GNUNET_DATASTORE_Handle *h, 653get_queue_head(struct GNUNET_DATASTORE_Handle *h,
663 uint16_t response_type) 654 uint16_t response_type)
664{ 655{
665 struct GNUNET_DATASTORE_QueueEntry *qe; 656 struct GNUNET_DATASTORE_QueueEntry *qe;
666 657
667 if (h->skip_next_messages > 0) 658 if (h->skip_next_messages > 0)
668 { 659 {
669 h->skip_next_messages--; 660 h->skip_next_messages--;
670 process_queue (h); 661 process_queue(h);
671 return NULL; 662 return NULL;
672 } 663 }
673 qe = h->queue_head; 664 qe = h->queue_head;
674 if (NULL == qe) 665 if (NULL == qe)
675 { 666 {
676 GNUNET_break (0); 667 GNUNET_break(0);
677 do_disconnect (h); 668 do_disconnect(h);
678 return NULL; 669 return NULL;
679 } 670 }
680 if (NULL != qe->env) 671 if (NULL != qe->env)
681 { 672 {
682 GNUNET_break (0); 673 GNUNET_break(0);
683 do_disconnect (h); 674 do_disconnect(h);
684 return NULL; 675 return NULL;
685 } 676 }
686 if (response_type != qe->response_type) 677 if (response_type != qe->response_type)
687 { 678 {
688 GNUNET_break (0); 679 GNUNET_break(0);
689 do_disconnect (h); 680 do_disconnect(h);
690 return NULL; 681 return NULL;
691 } 682 }
692 return qe; 683 return qe;
693} 684}
694 685
@@ -701,27 +692,27 @@ get_queue_head (struct GNUNET_DATASTORE_Handle *h,
701 * @return #GNUNET_OK if the message is well-formed 692 * @return #GNUNET_OK if the message is well-formed
702 */ 693 */
703static int 694static int
704check_status (void *cls, 695check_status(void *cls,
705 const struct StatusMessage *sm) 696 const struct StatusMessage *sm)
706{ 697{
707 uint16_t msize = ntohs (sm->header.size) - sizeof (*sm); 698 uint16_t msize = ntohs(sm->header.size) - sizeof(*sm);
708 int32_t status = ntohl (sm->status); 699 int32_t status = ntohl(sm->status);
709 700
710 if (msize > 0) 701 if (msize > 0)
711 { 702 {
712 const char *emsg = (const char *) &sm[1]; 703 const char *emsg = (const char *)&sm[1];
713 704
714 if ('\0' != emsg[msize - 1]) 705 if ('\0' != emsg[msize - 1])
706 {
707 GNUNET_break(0);
708 return GNUNET_SYSERR;
709 }
710 }
711 else if (GNUNET_SYSERR == status)
715 { 712 {
716 GNUNET_break (0); 713 GNUNET_break(0);
717 return GNUNET_SYSERR; 714 return GNUNET_SYSERR;
718 } 715 }
719 }
720 else if (GNUNET_SYSERR == status)
721 {
722 GNUNET_break (0);
723 return GNUNET_SYSERR;
724 }
725 return GNUNET_OK; 716 return GNUNET_OK;
726} 717}
727 718
@@ -733,40 +724,40 @@ check_status (void *cls,
733 * @param sm status message received 724 * @param sm status message received
734 */ 725 */
735static void 726static void
736handle_status (void *cls, 727handle_status(void *cls,
737 const struct StatusMessage *sm) 728 const struct StatusMessage *sm)
738{ 729{
739 struct GNUNET_DATASTORE_Handle *h = cls; 730 struct GNUNET_DATASTORE_Handle *h = cls;
740 struct GNUNET_DATASTORE_QueueEntry *qe; 731 struct GNUNET_DATASTORE_QueueEntry *qe;
741 struct StatusContext rc; 732 struct StatusContext rc;
742 const char *emsg; 733 const char *emsg;
743 int32_t status = ntohl (sm->status); 734 int32_t status = ntohl(sm->status);
744 735
745 qe = get_queue_head (h, 736 qe = get_queue_head(h,
746 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS); 737 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
747 if (NULL == qe) 738 if (NULL == qe)
748 return; 739 return;
749 rc = qe->qc.sc; 740 rc = qe->qc.sc;
750 free_queue_entry (qe); 741 free_queue_entry(qe);
751 if (ntohs (sm->header.size) > sizeof (struct StatusMessage)) 742 if (ntohs(sm->header.size) > sizeof(struct StatusMessage))
752 emsg = (const char *) &sm[1]; 743 emsg = (const char *)&sm[1];
753 else 744 else
754 emsg = NULL; 745 emsg = NULL;
755 LOG (GNUNET_ERROR_TYPE_DEBUG, 746 LOG(GNUNET_ERROR_TYPE_DEBUG,
756 "Received status %d/%s\n", 747 "Received status %d/%s\n",
757 (int) status, 748 (int)status,
758 emsg); 749 emsg);
759 GNUNET_STATISTICS_update (h->stats, 750 GNUNET_STATISTICS_update(h->stats,
760 gettext_noop ("# status messages received"), 751 gettext_noop("# status messages received"),
761 1, 752 1,
762 GNUNET_NO); 753 GNUNET_NO);
763 h->retry_time = GNUNET_TIME_UNIT_ZERO; 754 h->retry_time = GNUNET_TIME_UNIT_ZERO;
764 process_queue (h); 755 process_queue(h);
765 if (NULL != rc.cont) 756 if (NULL != rc.cont)
766 rc.cont (rc.cont_cls, 757 rc.cont(rc.cont_cls,
767 status, 758 status,
768 GNUNET_TIME_absolute_ntoh (sm->min_expiration), 759 GNUNET_TIME_absolute_ntoh(sm->min_expiration),
769 emsg); 760 emsg);
770} 761}
771 762
772 763
@@ -777,16 +768,16 @@ handle_status (void *cls,
777 * @param dm message received 768 * @param dm message received
778 */ 769 */
779static int 770static int
780check_data (void *cls, 771check_data(void *cls,
781 const struct DataMessage *dm) 772 const struct DataMessage *dm)
782{ 773{
783 uint16_t msize = ntohs (dm->header.size) - sizeof (*dm); 774 uint16_t msize = ntohs(dm->header.size) - sizeof(*dm);
784 775
785 if (msize != ntohl (dm->size)) 776 if (msize != ntohl(dm->size))
786 { 777 {
787 GNUNET_break (0); 778 GNUNET_break(0);
788 return GNUNET_SYSERR; 779 return GNUNET_SYSERR;
789 } 780 }
790 return GNUNET_OK; 781 return GNUNET_OK;
791} 782}
792 783
@@ -798,44 +789,44 @@ check_data (void *cls,
798 * @param dm message received 789 * @param dm message received
799 */ 790 */
800static void 791static void
801handle_data (void *cls, 792handle_data(void *cls,
802 const struct DataMessage *dm) 793 const struct DataMessage *dm)
803{ 794{
804 struct GNUNET_DATASTORE_Handle *h = cls; 795 struct GNUNET_DATASTORE_Handle *h = cls;
805 struct GNUNET_DATASTORE_QueueEntry *qe; 796 struct GNUNET_DATASTORE_QueueEntry *qe;
806 struct ResultContext rc; 797 struct ResultContext rc;
807 798
808 qe = get_queue_head (h, 799 qe = get_queue_head(h,
809 GNUNET_MESSAGE_TYPE_DATASTORE_DATA); 800 GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
810 if (NULL == qe) 801 if (NULL == qe)
811 return; 802 return;
812#if INSANE_STATISTICS 803#if INSANE_STATISTICS
813 GNUNET_STATISTICS_update (h->stats, 804 GNUNET_STATISTICS_update(h->stats,
814 gettext_noop ("# Results received"), 805 gettext_noop("# Results received"),
815 1, 806 1,
816 GNUNET_NO); 807 GNUNET_NO);
817#endif 808#endif
818 LOG (GNUNET_ERROR_TYPE_DEBUG, 809 LOG(GNUNET_ERROR_TYPE_DEBUG,
819 "Received result %llu with type %u and size %u with key %s\n", 810 "Received result %llu with type %u and size %u with key %s\n",
820 (unsigned long long) GNUNET_ntohll (dm->uid), 811 (unsigned long long)GNUNET_ntohll(dm->uid),
821 ntohl (dm->type), 812 ntohl(dm->type),
822 ntohl (dm->size), 813 ntohl(dm->size),
823 GNUNET_h2s (&dm->key)); 814 GNUNET_h2s(&dm->key));
824 rc = qe->qc.rc; 815 rc = qe->qc.rc;
825 free_queue_entry (qe); 816 free_queue_entry(qe);
826 h->retry_time = GNUNET_TIME_UNIT_ZERO; 817 h->retry_time = GNUNET_TIME_UNIT_ZERO;
827 process_queue (h); 818 process_queue(h);
828 if (NULL != rc.proc) 819 if (NULL != rc.proc)
829 rc.proc (rc.proc_cls, 820 rc.proc(rc.proc_cls,
830 &dm->key, 821 &dm->key,
831 ntohl (dm->size), 822 ntohl(dm->size),
832 &dm[1], 823 &dm[1],
833 ntohl (dm->type), 824 ntohl(dm->type),
834 ntohl (dm->priority), 825 ntohl(dm->priority),
835 ntohl (dm->anonymity), 826 ntohl(dm->anonymity),
836 ntohl (dm->replication), 827 ntohl(dm->replication),
837 GNUNET_TIME_absolute_ntoh (dm->expiration), 828 GNUNET_TIME_absolute_ntoh(dm->expiration),
838 GNUNET_ntohll (dm->uid)); 829 GNUNET_ntohll(dm->uid));
839} 830}
840 831
841 832
@@ -847,37 +838,37 @@ handle_data (void *cls,
847 * @param msg message received 838 * @param msg message received
848 */ 839 */
849static void 840static void
850handle_data_end (void *cls, 841handle_data_end(void *cls,
851 const struct GNUNET_MessageHeader *msg) 842 const struct GNUNET_MessageHeader *msg)
852{ 843{
853 struct GNUNET_DATASTORE_Handle *h = cls; 844 struct GNUNET_DATASTORE_Handle *h = cls;
854 struct GNUNET_DATASTORE_QueueEntry *qe; 845 struct GNUNET_DATASTORE_QueueEntry *qe;
855 struct ResultContext rc; 846 struct ResultContext rc;
856 847
857 qe = get_queue_head (h, 848 qe = get_queue_head(h,
858 GNUNET_MESSAGE_TYPE_DATASTORE_DATA); 849 GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
859 if (NULL == qe) 850 if (NULL == qe)
860 return; 851 return;
861 rc = qe->qc.rc; 852 rc = qe->qc.rc;
862 free_queue_entry (qe); 853 free_queue_entry(qe);
863 LOG (GNUNET_ERROR_TYPE_DEBUG, 854 LOG(GNUNET_ERROR_TYPE_DEBUG,
864 "Received end of result set, new queue size is %u\n", 855 "Received end of result set, new queue size is %u\n",
865 h->queue_size); 856 h->queue_size);
866 h->retry_time = GNUNET_TIME_UNIT_ZERO; 857 h->retry_time = GNUNET_TIME_UNIT_ZERO;
867 h->result_count = 0; 858 h->result_count = 0;
868 process_queue (h); 859 process_queue(h);
869 /* signal end of iteration */ 860 /* signal end of iteration */
870 if (NULL != rc.proc) 861 if (NULL != rc.proc)
871 rc.proc (rc.proc_cls, 862 rc.proc(rc.proc_cls,
872 NULL, 863 NULL,
873 0, 864 0,
874 NULL, 865 NULL,
875 0, 866 0,
876 0, 867 0,
877 0, 868 0,
878 0, 869 0,
879 GNUNET_TIME_UNIT_ZERO_ABS, 870 GNUNET_TIME_UNIT_ZERO_ABS,
880 0); 871 0);
881} 872}
882 873
883 874
@@ -887,42 +878,42 @@ handle_data_end (void *cls,
887 * @param cls the `struct GNUNET_DATASTORE_Handle` 878 * @param cls the `struct GNUNET_DATASTORE_Handle`
888 */ 879 */
889static void 880static void
890try_reconnect (void *cls) 881try_reconnect(void *cls)
891{ 882{
892 struct GNUNET_DATASTORE_Handle *h = cls; 883 struct GNUNET_DATASTORE_Handle *h = cls;
893 struct GNUNET_MQ_MessageHandler handlers[] = { 884 struct GNUNET_MQ_MessageHandler handlers[] = {
894 GNUNET_MQ_hd_var_size (status, 885 GNUNET_MQ_hd_var_size(status,
895 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, 886 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
896 struct StatusMessage, 887 struct StatusMessage,
897 h), 888 h),
898 GNUNET_MQ_hd_var_size (data, 889 GNUNET_MQ_hd_var_size(data,
899 GNUNET_MESSAGE_TYPE_DATASTORE_DATA, 890 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
900 struct DataMessage, 891 struct DataMessage,
901 h), 892 h),
902 GNUNET_MQ_hd_fixed_size (data_end, 893 GNUNET_MQ_hd_fixed_size(data_end,
903 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END, 894 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
904 struct GNUNET_MessageHeader, 895 struct GNUNET_MessageHeader,
905 h), 896 h),
906 GNUNET_MQ_handler_end () 897 GNUNET_MQ_handler_end()
907 }; 898 };
908 899
909 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); 900 h->retry_time = GNUNET_TIME_STD_BACKOFF(h->retry_time);
910 h->reconnect_task = NULL; 901 h->reconnect_task = NULL;
911 GNUNET_assert (NULL == h->mq); 902 GNUNET_assert(NULL == h->mq);
912 h->mq = GNUNET_CLIENT_connect (h->cfg, 903 h->mq = GNUNET_CLIENT_connect(h->cfg,
913 "datastore", 904 "datastore",
914 handlers, 905 handlers,
915 &mq_error_handler, 906 &mq_error_handler,
916 h); 907 h);
917 if (NULL == h->mq) 908 if (NULL == h->mq)
918 return; 909 return;
919 GNUNET_STATISTICS_update (h->stats, 910 GNUNET_STATISTICS_update(h->stats,
920 gettext_noop ("# datastore connections (re)created"), 911 gettext_noop("# datastore connections (re)created"),
921 1, 912 1,
922 GNUNET_NO); 913 GNUNET_NO);
923 LOG (GNUNET_ERROR_TYPE_DEBUG, 914 LOG(GNUNET_ERROR_TYPE_DEBUG,
924 "Reconnected to DATASTORE\n"); 915 "Reconnected to DATASTORE\n");
925 process_queue (h); 916 process_queue(h);
926} 917}
927 918
928 919
@@ -935,10 +926,10 @@ try_reconnect (void *cls)
935 * @param emsg error message 926 * @param emsg error message
936 */ 927 */
937static void 928static void
938drop_status_cont (void *cls, 929drop_status_cont(void *cls,
939 int32_t result, 930 int32_t result,
940 struct GNUNET_TIME_Absolute min_expiration, 931 struct GNUNET_TIME_Absolute min_expiration,
941 const char *emsg) 932 const char *emsg)
942{ 933{
943 /* do nothing */ 934 /* do nothing */
944} 935}
@@ -970,71 +961,71 @@ drop_status_cont (void *cls,
970 * (or rather, will already have been invoked) 961 * (or rather, will already have been invoked)
971 */ 962 */
972struct GNUNET_DATASTORE_QueueEntry * 963struct GNUNET_DATASTORE_QueueEntry *
973GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, 964GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h,
974 uint32_t rid, 965 uint32_t rid,
975 const struct GNUNET_HashCode *key, 966 const struct GNUNET_HashCode *key,
976 size_t size, 967 size_t size,
977 const void *data, 968 const void *data,
978 enum GNUNET_BLOCK_Type type, 969 enum GNUNET_BLOCK_Type type,
979 uint32_t priority, 970 uint32_t priority,
980 uint32_t anonymity, 971 uint32_t anonymity,
981 uint32_t replication, 972 uint32_t replication,
982 struct GNUNET_TIME_Absolute expiration, 973 struct GNUNET_TIME_Absolute expiration,
983 unsigned int queue_priority, 974 unsigned int queue_priority,
984 unsigned int max_queue_size, 975 unsigned int max_queue_size,
985 GNUNET_DATASTORE_ContinuationWithStatus cont, 976 GNUNET_DATASTORE_ContinuationWithStatus cont,
986 void *cont_cls) 977 void *cont_cls)
987{ 978{
988 struct GNUNET_DATASTORE_QueueEntry *qe; 979 struct GNUNET_DATASTORE_QueueEntry *qe;
989 struct GNUNET_MQ_Envelope *env; 980 struct GNUNET_MQ_Envelope *env;
990 struct DataMessage *dm; 981 struct DataMessage *dm;
991 union QueueContext qc; 982 union QueueContext qc;
992 983
993 if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE) 984 if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE)
994 { 985 {
995 GNUNET_break (0); 986 GNUNET_break(0);
996 return NULL; 987 return NULL;
997 } 988 }
998 989
999 LOG (GNUNET_ERROR_TYPE_DEBUG, 990 LOG(GNUNET_ERROR_TYPE_DEBUG,
1000 "Asked to put %u bytes of data under key `%s' for %s\n", 991 "Asked to put %u bytes of data under key `%s' for %s\n",
1001 size, 992 size,
1002 GNUNET_h2s (key), 993 GNUNET_h2s(key),
1003 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), 994 GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining(expiration),
1004 GNUNET_YES)); 995 GNUNET_YES));
1005 env = GNUNET_MQ_msg_extra (dm, 996 env = GNUNET_MQ_msg_extra(dm,
1006 size, 997 size,
1007 GNUNET_MESSAGE_TYPE_DATASTORE_PUT); 998 GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
1008 dm->rid = htonl (rid); 999 dm->rid = htonl(rid);
1009 dm->size = htonl ((uint32_t) size); 1000 dm->size = htonl((uint32_t)size);
1010 dm->type = htonl (type); 1001 dm->type = htonl(type);
1011 dm->priority = htonl (priority); 1002 dm->priority = htonl(priority);
1012 dm->anonymity = htonl (anonymity); 1003 dm->anonymity = htonl(anonymity);
1013 dm->replication = htonl (replication); 1004 dm->replication = htonl(replication);
1014 dm->expiration = GNUNET_TIME_absolute_hton (expiration); 1005 dm->expiration = GNUNET_TIME_absolute_hton(expiration);
1015 dm->key = *key; 1006 dm->key = *key;
1016 GNUNET_memcpy (&dm[1], 1007 GNUNET_memcpy(&dm[1],
1017 data, 1008 data,
1018 size); 1009 size);
1019 qc.sc.cont = cont; 1010 qc.sc.cont = cont;
1020 qc.sc.cont_cls = cont_cls; 1011 qc.sc.cont_cls = cont_cls;
1021 qe = make_queue_entry (h, 1012 qe = make_queue_entry(h,
1022 env, 1013 env,
1023 queue_priority, 1014 queue_priority,
1024 max_queue_size, 1015 max_queue_size,
1025 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, 1016 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1026 &qc); 1017 &qc);
1027 if (NULL == qe) 1018 if (NULL == qe)
1028 { 1019 {
1029 LOG (GNUNET_ERROR_TYPE_DEBUG, 1020 LOG(GNUNET_ERROR_TYPE_DEBUG,
1030 "Could not create queue entry for PUT\n"); 1021 "Could not create queue entry for PUT\n");
1031 return NULL; 1022 return NULL;
1032 } 1023 }
1033 GNUNET_STATISTICS_update (h->stats, 1024 GNUNET_STATISTICS_update(h->stats,
1034 gettext_noop ("# PUT requests executed"), 1025 gettext_noop("# PUT requests executed"),
1035 1, 1026 1,
1036 GNUNET_NO); 1027 GNUNET_NO);
1037 process_queue (h); 1028 process_queue(h);
1038 return qe; 1029 return qe;
1039} 1030}
1040 1031
@@ -1055,11 +1046,11 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
1055 * (or rather, will already have been invoked) 1046 * (or rather, will already have been invoked)
1056 */ 1047 */
1057struct GNUNET_DATASTORE_QueueEntry * 1048struct GNUNET_DATASTORE_QueueEntry *
1058GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, 1049GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h,
1059 uint64_t amount, 1050 uint64_t amount,
1060 uint32_t entries, 1051 uint32_t entries,
1061 GNUNET_DATASTORE_ContinuationWithStatus cont, 1052 GNUNET_DATASTORE_ContinuationWithStatus cont,
1062 void *cont_cls) 1053 void *cont_cls)
1063{ 1054{
1064 struct GNUNET_DATASTORE_QueueEntry *qe; 1055 struct GNUNET_DATASTORE_QueueEntry *qe;
1065 struct GNUNET_MQ_Envelope *env; 1056 struct GNUNET_MQ_Envelope *env;
@@ -1068,34 +1059,34 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1068 1059
1069 if (NULL == cont) 1060 if (NULL == cont)
1070 cont = &drop_status_cont; 1061 cont = &drop_status_cont;
1071 LOG (GNUNET_ERROR_TYPE_DEBUG, 1062 LOG(GNUNET_ERROR_TYPE_DEBUG,
1072 "Asked to reserve %llu bytes of data and %u entries\n", 1063 "Asked to reserve %llu bytes of data and %u entries\n",
1073 (unsigned long long) amount, 1064 (unsigned long long)amount,
1074 (unsigned int) entries); 1065 (unsigned int)entries);
1075 env = GNUNET_MQ_msg (rm, 1066 env = GNUNET_MQ_msg(rm,
1076 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); 1067 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1077 rm->entries = htonl (entries); 1068 rm->entries = htonl(entries);
1078 rm->amount = GNUNET_htonll (amount); 1069 rm->amount = GNUNET_htonll(amount);
1079 1070
1080 qc.sc.cont = cont; 1071 qc.sc.cont = cont;
1081 qc.sc.cont_cls = cont_cls; 1072 qc.sc.cont_cls = cont_cls;
1082 qe = make_queue_entry (h, 1073 qe = make_queue_entry(h,
1083 env, 1074 env,
1084 UINT_MAX, 1075 UINT_MAX,
1085 UINT_MAX, 1076 UINT_MAX,
1086 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, 1077 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1087 &qc); 1078 &qc);
1088 if (NULL == qe) 1079 if (NULL == qe)
1089 { 1080 {
1090 LOG (GNUNET_ERROR_TYPE_DEBUG, 1081 LOG(GNUNET_ERROR_TYPE_DEBUG,
1091 "Could not create queue entry to reserve\n"); 1082 "Could not create queue entry to reserve\n");
1092 return NULL; 1083 return NULL;
1093 } 1084 }
1094 GNUNET_STATISTICS_update (h->stats, 1085 GNUNET_STATISTICS_update(h->stats,
1095 gettext_noop ("# RESERVE requests executed"), 1086 gettext_noop("# RESERVE requests executed"),
1096 1, 1087 1,
1097 GNUNET_NO); 1088 GNUNET_NO);
1098 process_queue (h); 1089 process_queue(h);
1099 return qe; 1090 return qe;
1100} 1091}
1101 1092
@@ -1121,12 +1112,12 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1121 * (or rather, will already have been invoked) 1112 * (or rather, will already have been invoked)
1122 */ 1113 */
1123struct GNUNET_DATASTORE_QueueEntry * 1114struct GNUNET_DATASTORE_QueueEntry *
1124GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, 1115GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h,
1125 uint32_t rid, 1116 uint32_t rid,
1126 unsigned int queue_priority, 1117 unsigned int queue_priority,
1127 unsigned int max_queue_size, 1118 unsigned int max_queue_size,
1128 GNUNET_DATASTORE_ContinuationWithStatus cont, 1119 GNUNET_DATASTORE_ContinuationWithStatus cont,
1129 void *cont_cls) 1120 void *cont_cls)
1130{ 1121{
1131 struct GNUNET_DATASTORE_QueueEntry *qe; 1122 struct GNUNET_DATASTORE_QueueEntry *qe;
1132 struct GNUNET_MQ_Envelope *env; 1123 struct GNUNET_MQ_Envelope *env;
@@ -1135,31 +1126,31 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1135 1126
1136 if (NULL == cont) 1127 if (NULL == cont)
1137 cont = &drop_status_cont; 1128 cont = &drop_status_cont;
1138 LOG (GNUNET_ERROR_TYPE_DEBUG, 1129 LOG(GNUNET_ERROR_TYPE_DEBUG,
1139 "Asked to release reserve %d\n", 1130 "Asked to release reserve %d\n",
1140 rid); 1131 rid);
1141 env = GNUNET_MQ_msg (rrm, 1132 env = GNUNET_MQ_msg(rrm,
1142 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); 1133 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1143 rrm->rid = htonl (rid); 1134 rrm->rid = htonl(rid);
1144 qc.sc.cont = cont; 1135 qc.sc.cont = cont;
1145 qc.sc.cont_cls = cont_cls; 1136 qc.sc.cont_cls = cont_cls;
1146 qe = make_queue_entry (h, 1137 qe = make_queue_entry(h,
1147 env, 1138 env,
1148 queue_priority, 1139 queue_priority,
1149 max_queue_size, 1140 max_queue_size,
1150 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, 1141 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1151 &qc); 1142 &qc);
1152 if (NULL == qe) 1143 if (NULL == qe)
1153 { 1144 {
1154 LOG (GNUNET_ERROR_TYPE_DEBUG, 1145 LOG(GNUNET_ERROR_TYPE_DEBUG,
1155 "Could not create queue entry to release reserve\n"); 1146 "Could not create queue entry to release reserve\n");
1156 return NULL; 1147 return NULL;
1157 } 1148 }
1158 GNUNET_STATISTICS_update (h->stats, 1149 GNUNET_STATISTICS_update(h->stats,
1159 gettext_noop 1150 gettext_noop
1160 ("# RELEASE RESERVE requests executed"), 1, 1151 ("# RELEASE RESERVE requests executed"), 1,
1161 GNUNET_NO); 1152 GNUNET_NO);
1162 process_queue (h); 1153 process_queue(h);
1163 return qe; 1154 return qe;
1164} 1155}
1165 1156
@@ -1185,60 +1176,60 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1185 * (or rather, will already have been invoked) 1176 * (or rather, will already have been invoked)
1186 */ 1177 */
1187struct GNUNET_DATASTORE_QueueEntry * 1178struct GNUNET_DATASTORE_QueueEntry *
1188GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, 1179GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h,
1189 const struct GNUNET_HashCode *key, 1180 const struct GNUNET_HashCode *key,
1190 size_t size, 1181 size_t size,
1191 const void *data, 1182 const void *data,
1192 unsigned int queue_priority, 1183 unsigned int queue_priority,
1193 unsigned int max_queue_size, 1184 unsigned int max_queue_size,
1194 GNUNET_DATASTORE_ContinuationWithStatus cont, 1185 GNUNET_DATASTORE_ContinuationWithStatus cont,
1195 void *cont_cls) 1186 void *cont_cls)
1196{ 1187{
1197 struct GNUNET_DATASTORE_QueueEntry *qe; 1188 struct GNUNET_DATASTORE_QueueEntry *qe;
1198 struct DataMessage *dm; 1189 struct DataMessage *dm;
1199 struct GNUNET_MQ_Envelope *env; 1190 struct GNUNET_MQ_Envelope *env;
1200 union QueueContext qc; 1191 union QueueContext qc;
1201 1192
1202 if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE) 1193 if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1203 { 1194 {
1204 GNUNET_break (0); 1195 GNUNET_break(0);
1205 return NULL; 1196 return NULL;
1206 } 1197 }
1207 if (NULL == cont) 1198 if (NULL == cont)
1208 cont = &drop_status_cont; 1199 cont = &drop_status_cont;
1209 LOG (GNUNET_ERROR_TYPE_DEBUG, 1200 LOG(GNUNET_ERROR_TYPE_DEBUG,
1210 "Asked to remove %u bytes under key `%s'\n", 1201 "Asked to remove %u bytes under key `%s'\n",
1211 size, 1202 size,
1212 GNUNET_h2s (key)); 1203 GNUNET_h2s(key));
1213 env = GNUNET_MQ_msg_extra (dm, 1204 env = GNUNET_MQ_msg_extra(dm,
1214 size, 1205 size,
1215 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); 1206 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1216 dm->size = htonl (size); 1207 dm->size = htonl(size);
1217 dm->key = *key; 1208 dm->key = *key;
1218 GNUNET_memcpy (&dm[1], 1209 GNUNET_memcpy(&dm[1],
1219 data, 1210 data,
1220 size); 1211 size);
1221 1212
1222 qc.sc.cont = cont; 1213 qc.sc.cont = cont;
1223 qc.sc.cont_cls = cont_cls; 1214 qc.sc.cont_cls = cont_cls;
1224 1215
1225 qe = make_queue_entry (h, 1216 qe = make_queue_entry(h,
1226 env, 1217 env,
1227 queue_priority, 1218 queue_priority,
1228 max_queue_size, 1219 max_queue_size,
1229 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, 1220 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1230 &qc); 1221 &qc);
1231 if (NULL == qe) 1222 if (NULL == qe)
1232 { 1223 {
1233 LOG (GNUNET_ERROR_TYPE_DEBUG, 1224 LOG(GNUNET_ERROR_TYPE_DEBUG,
1234 "Could not create queue entry for REMOVE\n"); 1225 "Could not create queue entry for REMOVE\n");
1235 return NULL; 1226 return NULL;
1236 } 1227 }
1237 GNUNET_STATISTICS_update (h->stats, 1228 GNUNET_STATISTICS_update(h->stats,
1238 gettext_noop ("# REMOVE requests executed"), 1229 gettext_noop("# REMOVE requests executed"),
1239 1, 1230 1,
1240 GNUNET_NO); 1231 GNUNET_NO);
1241 process_queue (h); 1232 process_queue(h);
1242 return qe; 1233 return qe;
1243} 1234}
1244 1235
@@ -1263,41 +1254,41 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1263 * cancel 1254 * cancel
1264 */ 1255 */
1265struct GNUNET_DATASTORE_QueueEntry * 1256struct GNUNET_DATASTORE_QueueEntry *
1266GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, 1257GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h,
1267 unsigned int queue_priority, 1258 unsigned int queue_priority,
1268 unsigned int max_queue_size, 1259 unsigned int max_queue_size,
1269 GNUNET_DATASTORE_DatumProcessor proc, 1260 GNUNET_DATASTORE_DatumProcessor proc,
1270 void *proc_cls) 1261 void *proc_cls)
1271{ 1262{
1272 struct GNUNET_DATASTORE_QueueEntry *qe; 1263 struct GNUNET_DATASTORE_QueueEntry *qe;
1273 struct GNUNET_MQ_Envelope *env; 1264 struct GNUNET_MQ_Envelope *env;
1274 struct GNUNET_MessageHeader *m; 1265 struct GNUNET_MessageHeader *m;
1275 union QueueContext qc; 1266 union QueueContext qc;
1276 1267
1277 GNUNET_assert (NULL != proc); 1268 GNUNET_assert(NULL != proc);
1278 LOG (GNUNET_ERROR_TYPE_DEBUG, 1269 LOG(GNUNET_ERROR_TYPE_DEBUG,
1279 "Asked to get replication entry\n"); 1270 "Asked to get replication entry\n");
1280 env = GNUNET_MQ_msg (m, 1271 env = GNUNET_MQ_msg(m,
1281 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); 1272 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1282 qc.rc.proc = proc; 1273 qc.rc.proc = proc;
1283 qc.rc.proc_cls = proc_cls; 1274 qc.rc.proc_cls = proc_cls;
1284 qe = make_queue_entry (h, 1275 qe = make_queue_entry(h,
1285 env, 1276 env,
1286 queue_priority, 1277 queue_priority,
1287 max_queue_size, 1278 max_queue_size,
1288 GNUNET_MESSAGE_TYPE_DATASTORE_DATA, 1279 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1289 &qc); 1280 &qc);
1290 if (NULL == qe) 1281 if (NULL == qe)
1291 { 1282 {
1292 LOG (GNUNET_ERROR_TYPE_DEBUG, 1283 LOG(GNUNET_ERROR_TYPE_DEBUG,
1293 "Could not create queue entry for GET REPLICATION\n"); 1284 "Could not create queue entry for GET REPLICATION\n");
1294 return NULL; 1285 return NULL;
1295 } 1286 }
1296 GNUNET_STATISTICS_update (h->stats, 1287 GNUNET_STATISTICS_update(h->stats,
1297 gettext_noop 1288 gettext_noop
1298 ("# GET REPLICATION requests executed"), 1, 1289 ("# GET REPLICATION requests executed"), 1,
1299 GNUNET_NO); 1290 GNUNET_NO);
1300 process_queue (h); 1291 process_queue(h);
1301 return qe; 1292 return qe;
1302} 1293}
1303 1294
@@ -1319,47 +1310,47 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1319 * cancel 1310 * cancel
1320 */ 1311 */
1321struct GNUNET_DATASTORE_QueueEntry * 1312struct GNUNET_DATASTORE_QueueEntry *
1322GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, 1313GNUNET_DATASTORE_get_zero_anonymity(struct GNUNET_DATASTORE_Handle *h,
1323 uint64_t next_uid, 1314 uint64_t next_uid,
1324 unsigned int queue_priority, 1315 unsigned int queue_priority,
1325 unsigned int max_queue_size, 1316 unsigned int max_queue_size,
1326 enum GNUNET_BLOCK_Type type, 1317 enum GNUNET_BLOCK_Type type,
1327 GNUNET_DATASTORE_DatumProcessor proc, 1318 GNUNET_DATASTORE_DatumProcessor proc,
1328 void *proc_cls) 1319 void *proc_cls)
1329{ 1320{
1330 struct GNUNET_DATASTORE_QueueEntry *qe; 1321 struct GNUNET_DATASTORE_QueueEntry *qe;
1331 struct GNUNET_MQ_Envelope *env; 1322 struct GNUNET_MQ_Envelope *env;
1332 struct GetZeroAnonymityMessage *m; 1323 struct GetZeroAnonymityMessage *m;
1333 union QueueContext qc; 1324 union QueueContext qc;
1334 1325
1335 GNUNET_assert (NULL != proc); 1326 GNUNET_assert(NULL != proc);
1336 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); 1327 GNUNET_assert(type != GNUNET_BLOCK_TYPE_ANY);
1337 LOG (GNUNET_ERROR_TYPE_DEBUG, 1328 LOG(GNUNET_ERROR_TYPE_DEBUG,
1338 "Asked to get a zero-anonymity entry of type %d\n", 1329 "Asked to get a zero-anonymity entry of type %d\n",
1339 type); 1330 type);
1340 env = GNUNET_MQ_msg (m, 1331 env = GNUNET_MQ_msg(m,
1341 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); 1332 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1342 m->type = htonl ((uint32_t) type); 1333 m->type = htonl((uint32_t)type);
1343 m->next_uid = GNUNET_htonll (next_uid); 1334 m->next_uid = GNUNET_htonll(next_uid);
1344 qc.rc.proc = proc; 1335 qc.rc.proc = proc;
1345 qc.rc.proc_cls = proc_cls; 1336 qc.rc.proc_cls = proc_cls;
1346 qe = make_queue_entry (h, 1337 qe = make_queue_entry(h,
1347 env, 1338 env,
1348 queue_priority, 1339 queue_priority,
1349 max_queue_size, 1340 max_queue_size,
1350 GNUNET_MESSAGE_TYPE_DATASTORE_DATA, 1341 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1351 &qc); 1342 &qc);
1352 if (NULL == qe) 1343 if (NULL == qe)
1353 { 1344 {
1354 LOG (GNUNET_ERROR_TYPE_DEBUG, 1345 LOG(GNUNET_ERROR_TYPE_DEBUG,
1355 "Could not create queue entry for zero-anonymity procation\n"); 1346 "Could not create queue entry for zero-anonymity procation\n");
1356 return NULL; 1347 return NULL;
1357 } 1348 }
1358 GNUNET_STATISTICS_update (h->stats, 1349 GNUNET_STATISTICS_update(h->stats,
1359 gettext_noop 1350 gettext_noop
1360 ("# GET ZERO ANONYMITY requests executed"), 1, 1351 ("# GET ZERO ANONYMITY requests executed"), 1,
1361 GNUNET_NO); 1352 GNUNET_NO);
1362 process_queue (h); 1353 process_queue(h);
1363 return qe; 1354 return qe;
1364} 1355}
1365 1356
@@ -1383,15 +1374,15 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1383 * cancel 1374 * cancel
1384 */ 1375 */
1385struct GNUNET_DATASTORE_QueueEntry * 1376struct GNUNET_DATASTORE_QueueEntry *
1386GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, 1377GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h,
1387 uint64_t next_uid, 1378 uint64_t next_uid,
1388 bool random, 1379 bool random,
1389 const struct GNUNET_HashCode *key, 1380 const struct GNUNET_HashCode *key,
1390 enum GNUNET_BLOCK_Type type, 1381 enum GNUNET_BLOCK_Type type,
1391 unsigned int queue_priority, 1382 unsigned int queue_priority,
1392 unsigned int max_queue_size, 1383 unsigned int max_queue_size,
1393 GNUNET_DATASTORE_DatumProcessor proc, 1384 GNUNET_DATASTORE_DatumProcessor proc,
1394 void *proc_cls) 1385 void *proc_cls)
1395{ 1386{
1396 struct GNUNET_DATASTORE_QueueEntry *qe; 1387 struct GNUNET_DATASTORE_QueueEntry *qe;
1397 struct GNUNET_MQ_Envelope *env; 1388 struct GNUNET_MQ_Envelope *env;
@@ -1399,50 +1390,50 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1399 struct GetMessage *gm; 1390 struct GetMessage *gm;
1400 union QueueContext qc; 1391 union QueueContext qc;
1401 1392
1402 GNUNET_assert (NULL != proc); 1393 GNUNET_assert(NULL != proc);
1403 LOG (GNUNET_ERROR_TYPE_DEBUG, 1394 LOG(GNUNET_ERROR_TYPE_DEBUG,
1404 "Asked to look for data of type %u under key `%s'\n", 1395 "Asked to look for data of type %u under key `%s'\n",
1405 (unsigned int) type, 1396 (unsigned int)type,
1406 GNUNET_h2s (key)); 1397 GNUNET_h2s(key));
1407 if (NULL == key) 1398 if (NULL == key)
1408 { 1399 {
1409 env = GNUNET_MQ_msg (gm, 1400 env = GNUNET_MQ_msg(gm,
1410 GNUNET_MESSAGE_TYPE_DATASTORE_GET); 1401 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1411 gm->type = htonl (type); 1402 gm->type = htonl(type);
1412 gm->next_uid = GNUNET_htonll (next_uid); 1403 gm->next_uid = GNUNET_htonll(next_uid);
1413 gm->random = random; 1404 gm->random = random;
1414 } 1405 }
1415 else 1406 else
1416 { 1407 {
1417 env = GNUNET_MQ_msg (gkm, 1408 env = GNUNET_MQ_msg(gkm,
1418 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); 1409 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1419 gkm->type = htonl (type); 1410 gkm->type = htonl(type);
1420 gkm->next_uid = GNUNET_htonll (next_uid); 1411 gkm->next_uid = GNUNET_htonll(next_uid);
1421 gkm->random = random; 1412 gkm->random = random;
1422 gkm->key = *key; 1413 gkm->key = *key;
1423 } 1414 }
1424 qc.rc.proc = proc; 1415 qc.rc.proc = proc;
1425 qc.rc.proc_cls = proc_cls; 1416 qc.rc.proc_cls = proc_cls;
1426 qe = make_queue_entry (h, 1417 qe = make_queue_entry(h,
1427 env, 1418 env,
1428 queue_priority, 1419 queue_priority,
1429 max_queue_size, 1420 max_queue_size,
1430 GNUNET_MESSAGE_TYPE_DATASTORE_DATA, 1421 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1431 &qc); 1422 &qc);
1432 if (NULL == qe) 1423 if (NULL == qe)
1433 { 1424 {
1434 LOG (GNUNET_ERROR_TYPE_DEBUG, 1425 LOG(GNUNET_ERROR_TYPE_DEBUG,
1435 "Could not queue request for `%s'\n", 1426 "Could not queue request for `%s'\n",
1436 GNUNET_h2s (key)); 1427 GNUNET_h2s(key));
1437 return NULL; 1428 return NULL;
1438 } 1429 }
1439#if INSANE_STATISTICS 1430#if INSANE_STATISTICS
1440 GNUNET_STATISTICS_update (h->stats, 1431 GNUNET_STATISTICS_update(h->stats,
1441 gettext_noop ("# GET requests executed"), 1432 gettext_noop("# GET requests executed"),
1442 1, 1433 1,
1443 GNUNET_NO); 1434 GNUNET_NO);
1444#endif 1435#endif
1445 process_queue (h); 1436 process_queue(h);
1446 return qe; 1437 return qe;
1447} 1438}
1448 1439
@@ -1454,23 +1445,23 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1454 * @param qe operation to cancel 1445 * @param qe operation to cancel
1455 */ 1446 */
1456void 1447void
1457GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) 1448GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe)
1458{ 1449{
1459 struct GNUNET_DATASTORE_Handle *h = qe->h; 1450 struct GNUNET_DATASTORE_Handle *h = qe->h;
1460 1451
1461 LOG (GNUNET_ERROR_TYPE_DEBUG, 1452 LOG(GNUNET_ERROR_TYPE_DEBUG,
1462 "Pending DATASTORE request %p cancelled (%d, %d)\n", 1453 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1463 qe, 1454 qe,
1464 NULL == qe->env, 1455 NULL == qe->env,
1465 h->queue_head == qe); 1456 h->queue_head == qe);
1466 if (NULL == qe->env) 1457 if (NULL == qe->env)
1467 { 1458 {
1468 free_queue_entry (qe); 1459 free_queue_entry(qe);
1469 h->skip_next_messages++; 1460 h->skip_next_messages++;
1470 return; 1461 return;
1471 } 1462 }
1472 free_queue_entry (qe); 1463 free_queue_entry(qe);
1473 process_queue (h); 1464 process_queue(h);
1474} 1465}
1475 1466
1476 1467