aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/datastore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r--src/datastore/datastore_api.c1150
1 files changed, 537 insertions, 613 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index f355dfab5..85e402a4d 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -82,7 +82,7 @@ union QueueContext
82{ 82{
83 83
84 struct StatusContext sc; 84 struct StatusContext sc;
85 85
86 struct ResultContext rc; 86 struct ResultContext rc;
87 87
88}; 88};
@@ -121,7 +121,7 @@ struct GNUNET_DATASTORE_QueueEntry
121 * Function to call after transmission of the request. 121 * Function to call after transmission of the request.
122 */ 122 */
123 GNUNET_DATASTORE_ContinuationWithStatus cont; 123 GNUNET_DATASTORE_ContinuationWithStatus cont;
124 124
125 /** 125 /**
126 * Closure for 'cont'. 126 * Closure for 'cont'.
127 */ 127 */
@@ -167,7 +167,7 @@ struct GNUNET_DATASTORE_QueueEntry
167 * multiple of 64 bits. 167 * multiple of 64 bits.
168 */ 168 */
169 int was_transmitted; 169 int was_transmitted;
170 170
171}; 171};
172 172
173/** 173/**
@@ -250,22 +250,19 @@ struct GNUNET_DATASTORE_Handle
250 * @return handle to use to access the service 250 * @return handle to use to access the service
251 */ 251 */
252struct GNUNET_DATASTORE_Handle * 252struct GNUNET_DATASTORE_Handle *
253GNUNET_DATASTORE_connect (const struct 253GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
254 GNUNET_CONFIGURATION_Handle
255 *cfg)
256{ 254{
257 struct GNUNET_CLIENT_Connection *c; 255 struct GNUNET_CLIENT_Connection *c;
258 struct GNUNET_DATASTORE_Handle *h; 256 struct GNUNET_DATASTORE_Handle *h;
259 257
260 c = GNUNET_CLIENT_connect ("datastore", cfg); 258 c = GNUNET_CLIENT_connect ("datastore", cfg);
261 if (c == NULL) 259 if (c == NULL)
262 return NULL; /* oops */ 260 return NULL; /* oops */
263 h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 261 h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
264 GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); 262 GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265 h->client = c; 263 h->client = c;
266 h->cfg = cfg; 264 h->cfg = cfg;
267 h->stats = GNUNET_STATISTICS_create ("datastore-api", 265 h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
268 cfg);
269 return h; 266 return h;
270} 267}
271 268
@@ -279,26 +276,24 @@ GNUNET_DATASTORE_connect (const struct
279 * @return number of bytes written to buf 276 * @return number of bytes written to buf
280 */ 277 */
281static size_t 278static size_t
282transmit_drop (void *cls, 279transmit_drop (void *cls, size_t size, void *buf)
283 size_t size,
284 void *buf)
285{ 280{
286 struct GNUNET_DATASTORE_Handle *h = cls; 281 struct GNUNET_DATASTORE_Handle *h = cls;
287 struct GNUNET_MessageHeader *hdr; 282 struct GNUNET_MessageHeader *hdr;
288 283
289 if (buf == NULL) 284 if (buf == NULL)
290 { 285 {
291 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 286 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
292 _("Failed to transmit request to drop database.\n")); 287 _("Failed to transmit request to drop database.\n"));
293 GNUNET_DATASTORE_disconnect (h, GNUNET_NO); 288 GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
294 return 0; 289 return 0;
295 } 290 }
296 GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader)); 291 GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
297 hdr = buf; 292 hdr = buf;
298 hdr->size = htons(sizeof(struct GNUNET_MessageHeader)); 293 hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
299 hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP); 294 hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
300 GNUNET_DATASTORE_disconnect (h, GNUNET_NO); 295 GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
301 return sizeof(struct GNUNET_MessageHeader); 296 return sizeof (struct GNUNET_MessageHeader);
302} 297}
303 298
304 299
@@ -310,55 +305,51 @@ transmit_drop (void *cls,
310 * @param drop set to GNUNET_YES to delete all data in datastore (!) 305 * @param drop set to GNUNET_YES to delete all data in datastore (!)
311 */ 306 */
312void 307void
313GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, 308GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
314 int drop)
315{ 309{
316 struct GNUNET_DATASTORE_QueueEntry *qe; 310 struct GNUNET_DATASTORE_QueueEntry *qe;
317 311
318#if DEBUG_DATASTORE 312#if DEBUG_DATASTORE
319 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 313 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
320 "Datastore disconnect\n");
321#endif 314#endif
322 if (NULL != h->th) 315 if (NULL != h->th)
323 { 316 {
324 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); 317 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
325 h->th = NULL; 318 h->th = NULL;
326 } 319 }
327 if (h->client != NULL) 320 if (h->client != NULL)
328 { 321 {
329 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); 322 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
330 h->client = NULL; 323 h->client = NULL;
331 } 324 }
332 if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK) 325 if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
333 { 326 {
334 GNUNET_SCHEDULER_cancel (h->reconnect_task); 327 GNUNET_SCHEDULER_cancel (h->reconnect_task);
335 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; 328 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
336 } 329 }
337 while (NULL != (qe = h->queue_head)) 330 while (NULL != (qe = h->queue_head))
331 {
332 GNUNET_assert (NULL != qe->response_proc);
333 qe->response_proc (h, NULL);
334 }
335 if (GNUNET_YES == drop)
336 {
337 h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
338 if (h->client != NULL)
338 { 339 {
339 GNUNET_assert (NULL != qe->response_proc); 340 if (NULL !=
340 qe->response_proc (h, NULL); 341 GNUNET_CLIENT_notify_transmit_ready (h->client,
341 } 342 sizeof (struct
342 if (GNUNET_YES == drop) 343 GNUNET_MessageHeader),
343 { 344 GNUNET_TIME_UNIT_MINUTES,
344 h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); 345 GNUNET_YES, &transmit_drop, h))
345 if (h->client != NULL) 346 return;
346 { 347 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
347 if (NULL != 348 h->client = NULL;
348 GNUNET_CLIENT_notify_transmit_ready (h->client,
349 sizeof(struct GNUNET_MessageHeader),
350 GNUNET_TIME_UNIT_MINUTES,
351 GNUNET_YES,
352 &transmit_drop,
353 h))
354 return;
355 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
356 h->client = NULL;
357 }
358 GNUNET_break (0);
359 } 349 }
360 GNUNET_STATISTICS_destroy (h->stats, 350 GNUNET_break (0);
361 GNUNET_NO); 351 }
352 GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
362 h->stats = NULL; 353 h->stats = NULL;
363 GNUNET_free (h); 354 GNUNET_free (h);
364} 355}
@@ -371,20 +362,18 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
371 * @param tc scheduler context 362 * @param tc scheduler context
372 */ 363 */
373static void 364static void
374timeout_queue_entry (void *cls, 365timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
375 const struct GNUNET_SCHEDULER_TaskContext *tc)
376{ 366{
377 struct GNUNET_DATASTORE_QueueEntry *qe = cls; 367 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
378 368
379 GNUNET_STATISTICS_update (qe->h->stats, 369 GNUNET_STATISTICS_update (qe->h->stats,
380 gettext_noop ("# queue entry timeouts"), 370 gettext_noop ("# queue entry timeouts"),
381 1, 371 1, GNUNET_NO);
382 GNUNET_NO);
383 qe->task = GNUNET_SCHEDULER_NO_TASK; 372 qe->task = GNUNET_SCHEDULER_NO_TASK;
384 GNUNET_assert (qe->was_transmitted == GNUNET_NO); 373 GNUNET_assert (qe->was_transmitted == GNUNET_NO);
385#if DEBUG_DATASTORE 374#if DEBUG_DATASTORE
386 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 375 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
387 "Timeout of request in datastore queue\n"); 376 "Timeout of request in datastore queue\n");
388#endif 377#endif
389 qe->response_proc (qe->h, NULL); 378 qe->response_proc (qe->h, NULL);
390} 379}
@@ -406,12 +395,12 @@ timeout_queue_entry (void *cls,
406 */ 395 */
407static struct GNUNET_DATASTORE_QueueEntry * 396static struct GNUNET_DATASTORE_QueueEntry *
408make_queue_entry (struct GNUNET_DATASTORE_Handle *h, 397make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
409 size_t msize, 398 size_t msize,
410 unsigned int queue_priority, 399 unsigned int queue_priority,
411 unsigned int max_queue_size, 400 unsigned int max_queue_size,
412 struct GNUNET_TIME_Relative timeout, 401 struct GNUNET_TIME_Relative timeout,
413 GNUNET_CLIENT_MessageHandler response_proc, 402 GNUNET_CLIENT_MessageHandler response_proc,
414 const union QueueContext *qc) 403 const union QueueContext *qc)
415{ 404{
416 struct GNUNET_DATASTORE_QueueEntry *ret; 405 struct GNUNET_DATASTORE_QueueEntry *ret;
417 struct GNUNET_DATASTORE_QueueEntry *pos; 406 struct GNUNET_DATASTORE_QueueEntry *pos;
@@ -419,21 +408,18 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
419 408
420 c = 0; 409 c = 0;
421 pos = h->queue_head; 410 pos = h->queue_head;
422 while ( (pos != NULL) && 411 while ((pos != NULL) &&
423 (c < max_queue_size) && 412 (c < max_queue_size) && (pos->priority >= queue_priority))
424 (pos->priority >= queue_priority) ) 413 {
425 { 414 c++;
426 c++; 415 pos = pos->next;
427 pos = pos->next; 416 }
428 }
429 if (c >= max_queue_size) 417 if (c >= max_queue_size)
430 { 418 {
431 GNUNET_STATISTICS_update (h->stats, 419 GNUNET_STATISTICS_update (h->stats,
432 gettext_noop ("# queue overflows"), 420 gettext_noop ("# queue overflows"), 1, GNUNET_NO);
433 1, 421 return NULL;
434 GNUNET_NO); 422 }
435 return NULL;
436 }
437 ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); 423 ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
438 ret->h = h; 424 ret->h = h;
439 ret->response_proc = response_proc; 425 ret->response_proc = response_proc;
@@ -444,61 +430,49 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
444 ret->message_size = msize; 430 ret->message_size = msize;
445 ret->was_transmitted = GNUNET_NO; 431 ret->was_transmitted = GNUNET_NO;
446 if (pos == NULL) 432 if (pos == NULL)
447 { 433 {
448 /* append at the tail */ 434 /* append at the tail */
449 pos = h->queue_tail; 435 pos = h->queue_tail;
450 } 436 }
451 else 437 else
452 { 438 {
453 pos = pos->prev; 439 pos = pos->prev;
454 /* do not insert at HEAD if HEAD query was already 440 /* do not insert at HEAD if HEAD query was already
455 transmitted and we are still receiving replies! */ 441 * transmitted and we are still receiving replies! */
456 if ( (pos == NULL) && 442 if ((pos == NULL) && (h->queue_head->was_transmitted))
457 (h->queue_head->was_transmitted) ) 443 pos = h->queue_head;
458 pos = h->queue_head; 444 }
459 }
460 c++; 445 c++;
461 GNUNET_STATISTICS_update (h->stats, 446 GNUNET_STATISTICS_update (h->stats,
462 gettext_noop ("# queue entries created"), 447 gettext_noop ("# queue entries created"),
463 1, 448 1, GNUNET_NO);
464 GNUNET_NO); 449 GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
465 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
466 h->queue_tail,
467 pos,
468 ret);
469 h->queue_size++; 450 h->queue_size++;
470 ret->task = GNUNET_SCHEDULER_add_delayed (timeout, 451 ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
471 &timeout_queue_entry,
472 ret);
473 pos = ret->next; 452 pos = ret->next;
474 while (pos != NULL) 453 while (pos != NULL)
454 {
455 if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
475 { 456 {
476 if ( (pos->max_queue < h->queue_size) && 457 GNUNET_assert (pos->response_proc != NULL);
477 (pos->was_transmitted == GNUNET_NO) ) 458 /* move 'pos' element to head so that it will be
478 { 459 * killed on 'NULL' call below */
479 GNUNET_assert (pos->response_proc != NULL);
480 /* move 'pos' element to head so that it will be
481 killed on 'NULL' call below */
482#if DEBUG_DATASTORE 460#if DEBUG_DATASTORE
483 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 461 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
484 "Dropping request from datastore queue\n"); 462 "Dropping request from datastore queue\n");
485#endif 463#endif
486 GNUNET_CONTAINER_DLL_remove (h->queue_head, 464 GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
487 h->queue_tail, 465 GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
488 pos); 466 GNUNET_STATISTICS_update (h->stats,
489 GNUNET_CONTAINER_DLL_insert (h->queue_head, 467 gettext_noop
490 h->queue_tail, 468 ("# Requests dropped from datastore queue"), 1,
491 pos); 469 GNUNET_NO);
492 GNUNET_STATISTICS_update (h->stats, 470 GNUNET_assert (h->queue_head == pos);
493 gettext_noop ("# Requests dropped from datastore queue"), 471 pos->response_proc (h, NULL);
494 1, 472 break;
495 GNUNET_NO);
496 GNUNET_assert (h->queue_head == pos);
497 pos->response_proc (h, NULL);
498 break;
499 }
500 pos = pos->next;
501 } 473 }
474 pos = pos->next;
475 }
502 return ret; 476 return ret;
503} 477}
504 478
@@ -509,8 +483,7 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
509 * 483 *
510 * @param h handle to the datastore 484 * @param h handle to the datastore
511 */ 485 */
512static void 486static void process_queue (struct GNUNET_DATASTORE_Handle *h);
513process_queue (struct GNUNET_DATASTORE_Handle *h);
514 487
515 488
516/** 489/**
@@ -520,8 +493,7 @@ process_queue (struct GNUNET_DATASTORE_Handle *h);
520 * @param tc scheduler context 493 * @param tc scheduler context
521 */ 494 */
522static void 495static void
523try_reconnect (void *cls, 496try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
524 const struct GNUNET_SCHEDULER_TaskContext *tc)
525{ 497{
526 struct GNUNET_DATASTORE_Handle *h = cls; 498 struct GNUNET_DATASTORE_Handle *h = cls;
527 499
@@ -534,18 +506,17 @@ try_reconnect (void *cls,
534 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; 506 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
535 h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); 507 h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
536 if (h->client == NULL) 508 if (h->client == NULL)
537 { 509 {
538 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 510 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
539 "DATASTORE reconnect failed (fatally)\n"); 511 "DATASTORE reconnect failed (fatally)\n");
540 return; 512 return;
541 } 513 }
542 GNUNET_STATISTICS_update (h->stats, 514 GNUNET_STATISTICS_update (h->stats,
543 gettext_noop ("# datastore connections (re)created"), 515 gettext_noop
544 1, 516 ("# datastore connections (re)created"), 1,
545 GNUNET_NO); 517 GNUNET_NO);
546#if DEBUG_DATASTORE 518#if DEBUG_DATASTORE
547 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 519 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
548 "Reconnected to DATASTORE\n");
549#endif 520#endif
550 process_queue (h); 521 process_queue (h);
551} 522}
@@ -561,25 +532,23 @@ static void
561do_disconnect (struct GNUNET_DATASTORE_Handle *h) 532do_disconnect (struct GNUNET_DATASTORE_Handle *h)
562{ 533{
563 if (h->client == NULL) 534 if (h->client == NULL)
564 { 535 {
565#if DEBUG_DATASTORE 536#if DEBUG_DATASTORE
566 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 537 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
567 "client NULL in disconnect, will not try to reconnect\n"); 538 "client NULL in disconnect, will not try to reconnect\n");
568#endif 539#endif
569 return; 540 return;
570 } 541 }
571#if 0 542#if 0
572 GNUNET_STATISTICS_update (stats, 543 GNUNET_STATISTICS_update (stats,
573 gettext_noop ("# reconnected to DATASTORE"), 544 gettext_noop ("# reconnected to DATASTORE"),
574 1, 545 1, GNUNET_NO);
575 GNUNET_NO);
576#endif 546#endif
577 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); 547 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
578 h->skip_next_messages = 0; 548 h->skip_next_messages = 0;
579 h->client = NULL; 549 h->client = NULL;
580 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time, 550 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
581 &try_reconnect, 551 &try_reconnect, h);
582 h);
583} 552}
584 553
585 554
@@ -590,30 +559,28 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h)
590 * @param cls the 'struct GNUNET_DATASTORE_Handle' 559 * @param cls the 'struct GNUNET_DATASTORE_Handle'
591 * @param msg the received message 560 * @param msg the received message
592 */ 561 */
593static void 562static void
594receive_cb (void *cls, 563receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
595 const struct GNUNET_MessageHeader *msg)
596{ 564{
597 struct GNUNET_DATASTORE_Handle *h = cls; 565 struct GNUNET_DATASTORE_Handle *h = cls;
598 struct GNUNET_DATASTORE_QueueEntry *qe; 566 struct GNUNET_DATASTORE_QueueEntry *qe;
599 567
600 h->in_receive = GNUNET_NO; 568 h->in_receive = GNUNET_NO;
601#if DEBUG_DATASTORE 569#if DEBUG_DATASTORE
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 570 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
603 "Receiving reply from datastore\n");
604#endif 571#endif
605 if (h->skip_next_messages > 0) 572 if (h->skip_next_messages > 0)
606 { 573 {
607 h->skip_next_messages--; 574 h->skip_next_messages--;
608 process_queue (h); 575 process_queue (h);
609 return; 576 return;
610 } 577 }
611 if (NULL == (qe = h->queue_head)) 578 if (NULL == (qe = h->queue_head))
612 { 579 {
613 GNUNET_break (0); 580 GNUNET_break (0);
614 process_queue (h); 581 process_queue (h);
615 return; 582 return;
616 } 583 }
617 qe->response_proc (h, msg); 584 qe->response_proc (h, msg);
618} 585}
619 586
@@ -627,9 +594,7 @@ receive_cb (void *cls,
627 * @return number of bytes written to buf 594 * @return number of bytes written to buf
628 */ 595 */
629static size_t 596static size_t
630transmit_request (void *cls, 597transmit_request (void *cls, size_t size, void *buf)
631 size_t size,
632 void *buf)
633{ 598{
634 struct GNUNET_DATASTORE_Handle *h = cls; 599 struct GNUNET_DATASTORE_Handle *h = cls;
635 struct GNUNET_DATASTORE_QueueEntry *qe; 600 struct GNUNET_DATASTORE_QueueEntry *qe;
@@ -637,27 +602,25 @@ transmit_request (void *cls,
637 602
638 h->th = NULL; 603 h->th = NULL;
639 if (NULL == (qe = h->queue_head)) 604 if (NULL == (qe = h->queue_head))
640 return 0; /* no entry in queue */ 605 return 0; /* no entry in queue */
641 if (buf == NULL) 606 if (buf == NULL)
642 { 607 {
643 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 608 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
644 _("Failed to transmit request to DATASTORE.\n")); 609 _("Failed to transmit request to DATASTORE.\n"));
645 GNUNET_STATISTICS_update (h->stats, 610 GNUNET_STATISTICS_update (h->stats,
646 gettext_noop ("# transmission request failures"), 611 gettext_noop ("# transmission request failures"),
647 1, 612 1, GNUNET_NO);
648 GNUNET_NO); 613 do_disconnect (h);
649 do_disconnect (h); 614 return 0;
650 return 0; 615 }
651 }
652 if (size < (msize = qe->message_size)) 616 if (size < (msize = qe->message_size))
653 { 617 {
654 process_queue (h); 618 process_queue (h);
655 return 0; 619 return 0;
656 } 620 }
657 #if DEBUG_DATASTORE 621#if DEBUG_DATASTORE
658 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 622 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
659 "Transmitting %u byte request to DATASTORE\n", 623 "Transmitting %u byte request to DATASTORE\n", msize);
660 msize);
661#endif 624#endif
662 memcpy (buf, &qe[1], msize); 625 memcpy (buf, &qe[1], msize);
663 qe->was_transmitted = GNUNET_YES; 626 qe->was_transmitted = GNUNET_YES;
@@ -666,13 +629,11 @@ transmit_request (void *cls,
666 GNUNET_assert (GNUNET_NO == h->in_receive); 629 GNUNET_assert (GNUNET_NO == h->in_receive);
667 h->in_receive = GNUNET_YES; 630 h->in_receive = GNUNET_YES;
668 GNUNET_CLIENT_receive (h->client, 631 GNUNET_CLIENT_receive (h->client,
669 &receive_cb, 632 &receive_cb,
670 h, 633 h, GNUNET_TIME_absolute_get_remaining (qe->timeout));
671 GNUNET_TIME_absolute_get_remaining (qe->timeout));
672 GNUNET_STATISTICS_update (h->stats, 634 GNUNET_STATISTICS_update (h->stats,
673 gettext_noop ("# bytes sent to datastore"), 635 gettext_noop ("# bytes sent to datastore"),
674 1, 636 1, GNUNET_NO);
675 GNUNET_NO);
676 return msize; 637 return msize;
677} 638}
678 639
@@ -689,53 +650,47 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
689 struct GNUNET_DATASTORE_QueueEntry *qe; 650 struct GNUNET_DATASTORE_QueueEntry *qe;
690 651
691 if (NULL == (qe = h->queue_head)) 652 if (NULL == (qe = h->queue_head))
692 { 653 {
693#if DEBUG_DATASTORE > 1 654#if DEBUG_DATASTORE > 1
694 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 655 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
695 "Queue empty\n");
696#endif 656#endif
697 return; /* no entry in queue */ 657 return; /* no entry in queue */
698 } 658 }
699 if (qe->was_transmitted == GNUNET_YES) 659 if (qe->was_transmitted == GNUNET_YES)
700 { 660 {
701#if DEBUG_DATASTORE > 1 661#if DEBUG_DATASTORE > 1
702 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 662 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
703 "Head request already transmitted\n");
704#endif 663#endif
705 return; /* waiting for replies */ 664 return; /* waiting for replies */
706 } 665 }
707 if (h->th != NULL) 666 if (h->th != NULL)
708 { 667 {
709#if DEBUG_DATASTORE > 1 668#if DEBUG_DATASTORE > 1
710 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
711 "Pending transmission request\n");
712#endif 670#endif
713 return; /* request pending */ 671 return; /* request pending */
714 } 672 }
715 if (h->client == NULL) 673 if (h->client == NULL)
716 { 674 {
717#if DEBUG_DATASTORE > 1 675#if DEBUG_DATASTORE > 1
718 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 676 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
719 "Not connected\n");
720#endif 677#endif
721 return; /* waiting for reconnect */ 678 return; /* waiting for reconnect */
722 } 679 }
723 if (GNUNET_YES == h->in_receive) 680 if (GNUNET_YES == h->in_receive)
724 { 681 {
725 /* wait for response to previous query */ 682 /* wait for response to previous query */
726 return; 683 return;
727 } 684 }
728#if DEBUG_DATASTORE 685#if DEBUG_DATASTORE
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 686 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "Queueing %u byte request to DATASTORE\n", 687 "Queueing %u byte request to DATASTORE\n", qe->message_size);
731 qe->message_size);
732#endif 688#endif
733 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, 689 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
734 qe->message_size, 690 qe->message_size,
735 GNUNET_TIME_absolute_get_remaining (qe->timeout), 691 GNUNET_TIME_absolute_get_remaining
736 GNUNET_YES, 692 (qe->timeout), GNUNET_YES,
737 &transmit_request, 693 &transmit_request, h);
738 h);
739 GNUNET_assert (GNUNET_NO == h->in_receive); 694 GNUNET_assert (GNUNET_NO == h->in_receive);
740 GNUNET_break (NULL != h->th); 695 GNUNET_break (NULL != h->th);
741} 696}
@@ -767,16 +722,14 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
767{ 722{
768 struct GNUNET_DATASTORE_Handle *h = qe->h; 723 struct GNUNET_DATASTORE_Handle *h = qe->h;
769 724
770 GNUNET_CONTAINER_DLL_remove (h->queue_head, 725 GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
771 h->queue_tail,
772 qe);
773 if (qe->task != GNUNET_SCHEDULER_NO_TASK) 726 if (qe->task != GNUNET_SCHEDULER_NO_TASK)
774 { 727 {
775 GNUNET_SCHEDULER_cancel (qe->task); 728 GNUNET_SCHEDULER_cancel (qe->task);
776 qe->task = GNUNET_SCHEDULER_NO_TASK; 729 qe->task = GNUNET_SCHEDULER_NO_TASK;
777 } 730 }
778 h->queue_size--; 731 h->queue_size--;
779 qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ 732 qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
780 GNUNET_free (qe); 733 GNUNET_free (qe);
781} 734}
782 735
@@ -788,10 +741,8 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
788 * @param cls closure 741 * @param cls closure
789 * @param msg message received, NULL on timeout or fatal error 742 * @param msg message received, NULL on timeout or fatal error
790 */ 743 */
791static void 744static void
792process_status_message (void *cls, 745process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
793 const struct
794 GNUNET_MessageHeader * msg)
795{ 746{
796 struct GNUNET_DATASTORE_Handle *h = cls; 747 struct GNUNET_DATASTORE_Handle *h = cls;
797 struct GNUNET_DATASTORE_QueueEntry *qe; 748 struct GNUNET_DATASTORE_QueueEntry *qe;
@@ -802,74 +753,68 @@ process_status_message (void *cls,
802 int was_transmitted; 753 int was_transmitted;
803 754
804 if (NULL == (qe = h->queue_head)) 755 if (NULL == (qe = h->queue_head))
805 { 756 {
806 GNUNET_break (0); 757 GNUNET_break (0);
807 do_disconnect (h); 758 do_disconnect (h);
808 return; 759 return;
809 } 760 }
810 rc = qe->qc.sc; 761 rc = qe->qc.sc;
811 if (msg == NULL) 762 if (msg == NULL)
812 { 763 {
813 was_transmitted = qe->was_transmitted; 764 was_transmitted = qe->was_transmitted;
814 free_queue_entry (qe); 765 free_queue_entry (qe);
815 if (was_transmitted == GNUNET_YES) 766 if (was_transmitted == GNUNET_YES)
816 do_disconnect (h); 767 do_disconnect (h);
817 else 768 else
818 process_queue (h); 769 process_queue (h);
819 if (rc.cont != NULL) 770 if (rc.cont != NULL)
820 rc.cont (rc.cont_cls, 771 rc.cont (rc.cont_cls,
821 GNUNET_SYSERR, 772 GNUNET_SYSERR,
822 _("Failed to receive status response from database.")); 773 _("Failed to receive status response from database."));
823 return; 774 return;
824 } 775 }
825 GNUNET_assert (GNUNET_YES == qe->was_transmitted); 776 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
826 free_queue_entry (qe); 777 free_queue_entry (qe);
827 if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || 778 if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
828 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 779 (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
829 { 780 {
830 GNUNET_break (0); 781 GNUNET_break (0);
831 h->retry_time = GNUNET_TIME_UNIT_ZERO; 782 h->retry_time = GNUNET_TIME_UNIT_ZERO;
832 do_disconnect (h); 783 do_disconnect (h);
833 if (rc.cont != NULL) 784 if (rc.cont != NULL)
834 rc.cont (rc.cont_cls, 785 rc.cont (rc.cont_cls,
835 GNUNET_SYSERR, 786 GNUNET_SYSERR,
836 _("Error reading response from datastore service")); 787 _("Error reading response from datastore service"));
837 return; 788 return;
838 } 789 }
839 sm = (const struct StatusMessage*) msg; 790 sm = (const struct StatusMessage *) msg;
840 status = ntohl(sm->status); 791 status = ntohl (sm->status);
841 emsg = NULL; 792 emsg = NULL;
842 if (ntohs(msg->size) > sizeof(struct StatusMessage)) 793 if (ntohs (msg->size) > sizeof (struct StatusMessage))
843 { 794 {
844 emsg = (const char*) &sm[1]; 795 emsg = (const char *) &sm[1];
845 if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0') 796 if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
846 {
847 GNUNET_break (0);
848 emsg = _("Invalid error message received from datastore service");
849 }
850 }
851 if ( (status == GNUNET_SYSERR) &&
852 (emsg == NULL) )
853 { 797 {
854 GNUNET_break (0); 798 GNUNET_break (0);
855 emsg = _("Invalid error message received from datastore service"); 799 emsg = _("Invalid error message received from datastore service");
856 } 800 }
801 }
802 if ((status == GNUNET_SYSERR) && (emsg == NULL))
803 {
804 GNUNET_break (0);
805 emsg = _("Invalid error message received from datastore service");
806 }
857#if DEBUG_DATASTORE 807#if DEBUG_DATASTORE
858 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 808 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859 "Received status %d/%s\n", 809 "Received status %d/%s\n", (int) status, emsg);
860 (int) status,
861 emsg);
862#endif 810#endif
863 GNUNET_STATISTICS_update (h->stats, 811 GNUNET_STATISTICS_update (h->stats,
864 gettext_noop ("# status messages received"), 812 gettext_noop ("# status messages received"),
865 1, 813 1, GNUNET_NO);
866 GNUNET_NO);
867 h->retry_time.rel_value = 0; 814 h->retry_time.rel_value = 0;
868 process_queue (h); 815 process_queue (h);
869 if (rc.cont != NULL) 816 if (rc.cont != NULL)
870 rc.cont (rc.cont_cls, 817 rc.cont (rc.cont_cls, status, emsg);
871 status,
872 emsg);
873} 818}
874 819
875 820
@@ -901,20 +846,20 @@ process_status_message (void *cls,
901 */ 846 */
902struct GNUNET_DATASTORE_QueueEntry * 847struct GNUNET_DATASTORE_QueueEntry *
903GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, 848GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
904 uint32_t rid, 849 uint32_t rid,
905 const GNUNET_HashCode * key, 850 const GNUNET_HashCode * key,
906 size_t size, 851 size_t size,
907 const void *data, 852 const void *data,
908 enum GNUNET_BLOCK_Type type, 853 enum GNUNET_BLOCK_Type type,
909 uint32_t priority, 854 uint32_t priority,
910 uint32_t anonymity, 855 uint32_t anonymity,
911 uint32_t replication, 856 uint32_t replication,
912 struct GNUNET_TIME_Absolute expiration, 857 struct GNUNET_TIME_Absolute expiration,
913 unsigned int queue_priority, 858 unsigned int queue_priority,
914 unsigned int max_queue_size, 859 unsigned int max_queue_size,
915 struct GNUNET_TIME_Relative timeout, 860 struct GNUNET_TIME_Relative timeout,
916 GNUNET_DATASTORE_ContinuationWithStatus cont, 861 GNUNET_DATASTORE_ContinuationWithStatus cont,
917 void *cont_cls) 862 void *cont_cls)
918{ 863{
919 struct GNUNET_DATASTORE_QueueEntry *qe; 864 struct GNUNET_DATASTORE_QueueEntry *qe;
920 struct DataMessage *dm; 865 struct DataMessage *dm;
@@ -923,42 +868,41 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
923 868
924#if DEBUG_DATASTORE 869#if DEBUG_DATASTORE
925 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 870 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926 "Asked to put %u bytes of data under key `%s' for %llu ms\n", 871 "Asked to put %u bytes of data under key `%s' for %llu ms\n",
927 size, 872 size,
928 GNUNET_h2s (key), 873 GNUNET_h2s (key),
929 GNUNET_TIME_absolute_get_remaining (expiration).rel_value); 874 GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
930#endif 875#endif
931 msize = sizeof(struct DataMessage) + size; 876 msize = sizeof (struct DataMessage) + size;
932 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); 877 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
933 qc.sc.cont = cont; 878 qc.sc.cont = cont;
934 qc.sc.cont_cls = cont_cls; 879 qc.sc.cont_cls = cont_cls;
935 qe = make_queue_entry (h, msize, 880 qe = make_queue_entry (h, msize,
936 queue_priority, max_queue_size, timeout, 881 queue_priority, max_queue_size, timeout,
937 &process_status_message, &qc); 882 &process_status_message, &qc);
938 if (qe == NULL) 883 if (qe == NULL)
939 { 884 {
940#if DEBUG_DATASTORE 885#if DEBUG_DATASTORE
941 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 886 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942 "Could not create queue entry for PUT\n"); 887 "Could not create queue entry for PUT\n");
943#endif 888#endif
944 return NULL; 889 return NULL;
945 } 890 }
946 GNUNET_STATISTICS_update (h->stats, 891 GNUNET_STATISTICS_update (h->stats,
947 gettext_noop ("# PUT requests executed"), 892 gettext_noop ("# PUT requests executed"),
948 1, 893 1, GNUNET_NO);
949 GNUNET_NO); 894 dm = (struct DataMessage *) &qe[1];
950 dm = (struct DataMessage* ) &qe[1]; 895 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
951 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); 896 dm->header.size = htons (msize);
952 dm->header.size = htons(msize); 897 dm->rid = htonl (rid);
953 dm->rid = htonl(rid); 898 dm->size = htonl ((uint32_t) size);
954 dm->size = htonl( (uint32_t) size); 899 dm->type = htonl (type);
955 dm->type = htonl(type); 900 dm->priority = htonl (priority);
956 dm->priority = htonl(priority); 901 dm->anonymity = htonl (anonymity);
957 dm->anonymity = htonl(anonymity);
958 dm->replication = htonl (replication); 902 dm->replication = htonl (replication);
959 dm->reserved = htonl (0); 903 dm->reserved = htonl (0);
960 dm->uid = GNUNET_htonll(0); 904 dm->uid = GNUNET_htonll (0);
961 dm->expiration = GNUNET_TIME_absolute_hton(expiration); 905 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
962 dm->key = *key; 906 dm->key = *key;
963 memcpy (&dm[1], data, size); 907 memcpy (&dm[1], data, size);
964 process_queue (h); 908 process_queue (h);
@@ -987,13 +931,13 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
987 */ 931 */
988struct GNUNET_DATASTORE_QueueEntry * 932struct GNUNET_DATASTORE_QueueEntry *
989GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, 933GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
990 uint64_t amount, 934 uint64_t amount,
991 uint32_t entries, 935 uint32_t entries,
992 unsigned int queue_priority, 936 unsigned int queue_priority,
993 unsigned int max_queue_size, 937 unsigned int max_queue_size,
994 struct GNUNET_TIME_Relative timeout, 938 struct GNUNET_TIME_Relative timeout,
995 GNUNET_DATASTORE_ContinuationWithStatus cont, 939 GNUNET_DATASTORE_ContinuationWithStatus cont,
996 void *cont_cls) 940 void *cont_cls)
997{ 941{
998 struct GNUNET_DATASTORE_QueueEntry *qe; 942 struct GNUNET_DATASTORE_QueueEntry *qe;
999 struct ReserveMessage *rm; 943 struct ReserveMessage *rm;
@@ -1003,32 +947,30 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1003 cont = &drop_status_cont; 947 cont = &drop_status_cont;
1004#if DEBUG_DATASTORE 948#if DEBUG_DATASTORE
1005 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 949 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006 "Asked to reserve %llu bytes of data and %u entries\n", 950 "Asked to reserve %llu bytes of data and %u entries\n",
1007 (unsigned long long) amount, 951 (unsigned long long) amount, (unsigned int) entries);
1008 (unsigned int) entries);
1009#endif 952#endif
1010 qc.sc.cont = cont; 953 qc.sc.cont = cont;
1011 qc.sc.cont_cls = cont_cls; 954 qc.sc.cont_cls = cont_cls;
1012 qe = make_queue_entry (h, sizeof(struct ReserveMessage), 955 qe = make_queue_entry (h, sizeof (struct ReserveMessage),
1013 queue_priority, max_queue_size, timeout, 956 queue_priority, max_queue_size, timeout,
1014 &process_status_message, &qc); 957 &process_status_message, &qc);
1015 if (qe == NULL) 958 if (qe == NULL)
1016 { 959 {
1017#if DEBUG_DATASTORE 960#if DEBUG_DATASTORE
1018 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 961 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1019 "Could not create queue entry to reserve\n"); 962 "Could not create queue entry to reserve\n");
1020#endif 963#endif
1021 return NULL; 964 return NULL;
1022 } 965 }
1023 GNUNET_STATISTICS_update (h->stats, 966 GNUNET_STATISTICS_update (h->stats,
1024 gettext_noop ("# RESERVE requests executed"), 967 gettext_noop ("# RESERVE requests executed"),
1025 1, 968 1, GNUNET_NO);
1026 GNUNET_NO); 969 rm = (struct ReserveMessage *) &qe[1];
1027 rm = (struct ReserveMessage*) &qe[1]; 970 rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1028 rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); 971 rm->header.size = htons (sizeof (struct ReserveMessage));
1029 rm->header.size = htons(sizeof (struct ReserveMessage)); 972 rm->entries = htonl (entries);
1030 rm->entries = htonl(entries); 973 rm->amount = GNUNET_htonll (amount);
1031 rm->amount = GNUNET_htonll(amount);
1032 process_queue (h); 974 process_queue (h);
1033 return qe; 975 return qe;
1034} 976}
@@ -1057,12 +999,12 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1057 */ 999 */
1058struct GNUNET_DATASTORE_QueueEntry * 1000struct GNUNET_DATASTORE_QueueEntry *
1059GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, 1001GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1060 uint32_t rid, 1002 uint32_t rid,
1061 unsigned int queue_priority, 1003 unsigned int queue_priority,
1062 unsigned int max_queue_size, 1004 unsigned int max_queue_size,
1063 struct GNUNET_TIME_Relative timeout, 1005 struct GNUNET_TIME_Relative timeout,
1064 GNUNET_DATASTORE_ContinuationWithStatus cont, 1006 GNUNET_DATASTORE_ContinuationWithStatus cont,
1065 void *cont_cls) 1007 void *cont_cls)
1066{ 1008{
1067 struct GNUNET_DATASTORE_QueueEntry *qe; 1009 struct GNUNET_DATASTORE_QueueEntry *qe;
1068 struct ReleaseReserveMessage *rrm; 1010 struct ReleaseReserveMessage *rrm;
@@ -1071,31 +1013,29 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1071 if (cont == NULL) 1013 if (cont == NULL)
1072 cont = &drop_status_cont; 1014 cont = &drop_status_cont;
1073#if DEBUG_DATASTORE 1015#if DEBUG_DATASTORE
1074 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1016 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
1075 "Asked to release reserve %d\n",
1076 rid);
1077#endif 1017#endif
1078 qc.sc.cont = cont; 1018 qc.sc.cont = cont;
1079 qc.sc.cont_cls = cont_cls; 1019 qc.sc.cont_cls = cont_cls;
1080 qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage), 1020 qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
1081 queue_priority, max_queue_size, timeout, 1021 queue_priority, max_queue_size, timeout,
1082 &process_status_message, &qc); 1022 &process_status_message, &qc);
1083 if (qe == NULL) 1023 if (qe == NULL)
1084 { 1024 {
1085#if DEBUG_DATASTORE 1025#if DEBUG_DATASTORE
1086 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1026 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1087 "Could not create queue entry to release reserve\n"); 1027 "Could not create queue entry to release reserve\n");
1088#endif 1028#endif
1089 return NULL; 1029 return NULL;
1090 } 1030 }
1091 GNUNET_STATISTICS_update (h->stats, 1031 GNUNET_STATISTICS_update (h->stats,
1092 gettext_noop ("# RELEASE RESERVE requests executed"), 1032 gettext_noop
1093 1, 1033 ("# RELEASE RESERVE requests executed"), 1,
1094 GNUNET_NO); 1034 GNUNET_NO);
1095 rrm = (struct ReleaseReserveMessage*) &qe[1]; 1035 rrm = (struct ReleaseReserveMessage *) &qe[1];
1096 rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); 1036 rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1097 rrm->header.size = htons(sizeof (struct ReleaseReserveMessage)); 1037 rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1098 rrm->rid = htonl(rid); 1038 rrm->rid = htonl (rid);
1099 process_queue (h); 1039 process_queue (h);
1100 return qe; 1040 return qe;
1101} 1041}
@@ -1120,14 +1060,14 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1120 */ 1060 */
1121struct GNUNET_DATASTORE_QueueEntry * 1061struct GNUNET_DATASTORE_QueueEntry *
1122GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, 1062GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1123 uint64_t uid, 1063 uint64_t uid,
1124 uint32_t priority, 1064 uint32_t priority,
1125 struct GNUNET_TIME_Absolute expiration, 1065 struct GNUNET_TIME_Absolute expiration,
1126 unsigned int queue_priority, 1066 unsigned int queue_priority,
1127 unsigned int max_queue_size, 1067 unsigned int max_queue_size,
1128 struct GNUNET_TIME_Relative timeout, 1068 struct GNUNET_TIME_Relative timeout,
1129 GNUNET_DATASTORE_ContinuationWithStatus cont, 1069 GNUNET_DATASTORE_ContinuationWithStatus cont,
1130 void *cont_cls) 1070 void *cont_cls)
1131{ 1071{
1132 struct GNUNET_DATASTORE_QueueEntry *qe; 1072 struct GNUNET_DATASTORE_QueueEntry *qe;
1133 struct UpdateMessage *um; 1073 struct UpdateMessage *um;
@@ -1137,34 +1077,33 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1137 cont = &drop_status_cont; 1077 cont = &drop_status_cont;
1138#if DEBUG_DATASTORE 1078#if DEBUG_DATASTORE
1139 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1079 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140 "Asked to update entry %llu raising priority by %u and expiration to %llu\n", 1080 "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1141 uid, 1081 uid,
1142 (unsigned int) priority, 1082 (unsigned int) priority,
1143 (unsigned long long) expiration.abs_value); 1083 (unsigned long long) expiration.abs_value);
1144#endif 1084#endif
1145 qc.sc.cont = cont; 1085 qc.sc.cont = cont;
1146 qc.sc.cont_cls = cont_cls; 1086 qc.sc.cont_cls = cont_cls;
1147 qe = make_queue_entry (h, sizeof(struct UpdateMessage), 1087 qe = make_queue_entry (h, sizeof (struct UpdateMessage),
1148 queue_priority, max_queue_size, timeout, 1088 queue_priority, max_queue_size, timeout,
1149 &process_status_message, &qc); 1089 &process_status_message, &qc);
1150 if (qe == NULL) 1090 if (qe == NULL)
1151 { 1091 {
1152#if DEBUG_DATASTORE 1092#if DEBUG_DATASTORE
1153 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1093 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154 "Could not create queue entry for UPDATE\n"); 1094 "Could not create queue entry for UPDATE\n");
1155#endif 1095#endif
1156 return NULL; 1096 return NULL;
1157 } 1097 }
1158 GNUNET_STATISTICS_update (h->stats, 1098 GNUNET_STATISTICS_update (h->stats,
1159 gettext_noop ("# UPDATE requests executed"), 1099 gettext_noop ("# UPDATE requests executed"),
1160 1, 1100 1, GNUNET_NO);
1161 GNUNET_NO); 1101 um = (struct UpdateMessage *) &qe[1];
1162 um = (struct UpdateMessage*) &qe[1]; 1102 um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1163 um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); 1103 um->header.size = htons (sizeof (struct UpdateMessage));
1164 um->header.size = htons(sizeof (struct UpdateMessage)); 1104 um->priority = htonl (priority);
1165 um->priority = htonl(priority); 1105 um->expiration = GNUNET_TIME_absolute_hton (expiration);
1166 um->expiration = GNUNET_TIME_absolute_hton(expiration); 1106 um->uid = GNUNET_htonll (uid);
1167 um->uid = GNUNET_htonll(uid);
1168 process_queue (h); 1107 process_queue (h);
1169 return qe; 1108 return qe;
1170} 1109}
@@ -1193,14 +1132,14 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1193 */ 1132 */
1194struct GNUNET_DATASTORE_QueueEntry * 1133struct GNUNET_DATASTORE_QueueEntry *
1195GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, 1134GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1196 const GNUNET_HashCode *key, 1135 const GNUNET_HashCode * key,
1197 size_t size, 1136 size_t size,
1198 const void *data, 1137 const void *data,
1199 unsigned int queue_priority, 1138 unsigned int queue_priority,
1200 unsigned int max_queue_size, 1139 unsigned int max_queue_size,
1201 struct GNUNET_TIME_Relative timeout, 1140 struct GNUNET_TIME_Relative timeout,
1202 GNUNET_DATASTORE_ContinuationWithStatus cont, 1141 GNUNET_DATASTORE_ContinuationWithStatus cont,
1203 void *cont_cls) 1142 void *cont_cls)
1204{ 1143{
1205 struct GNUNET_DATASTORE_QueueEntry *qe; 1144 struct GNUNET_DATASTORE_QueueEntry *qe;
1206 struct DataMessage *dm; 1145 struct DataMessage *dm;
@@ -1211,39 +1150,37 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1211 cont = &drop_status_cont; 1150 cont = &drop_status_cont;
1212#if DEBUG_DATASTORE 1151#if DEBUG_DATASTORE
1213 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1152 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214 "Asked to remove %u bytes under key `%s'\n", 1153 "Asked to remove %u bytes under key `%s'\n",
1215 size, 1154 size, GNUNET_h2s (key));
1216 GNUNET_h2s (key));
1217#endif 1155#endif
1218 qc.sc.cont = cont; 1156 qc.sc.cont = cont;
1219 qc.sc.cont_cls = cont_cls; 1157 qc.sc.cont_cls = cont_cls;
1220 msize = sizeof(struct DataMessage) + size; 1158 msize = sizeof (struct DataMessage) + size;
1221 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); 1159 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1222 qe = make_queue_entry (h, msize, 1160 qe = make_queue_entry (h, msize,
1223 queue_priority, max_queue_size, timeout, 1161 queue_priority, max_queue_size, timeout,
1224 &process_status_message, &qc); 1162 &process_status_message, &qc);
1225 if (qe == NULL) 1163 if (qe == NULL)
1226 { 1164 {
1227#if DEBUG_DATASTORE 1165#if DEBUG_DATASTORE
1228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1166 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1229 "Could not create queue entry for REMOVE\n"); 1167 "Could not create queue entry for REMOVE\n");
1230#endif 1168#endif
1231 return NULL; 1169 return NULL;
1232 } 1170 }
1233 GNUNET_STATISTICS_update (h->stats, 1171 GNUNET_STATISTICS_update (h->stats,
1234 gettext_noop ("# REMOVE requests executed"), 1172 gettext_noop ("# REMOVE requests executed"),
1235 1, 1173 1, GNUNET_NO);
1236 GNUNET_NO); 1174 dm = (struct DataMessage *) &qe[1];
1237 dm = (struct DataMessage*) &qe[1]; 1175 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1238 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); 1176 dm->header.size = htons (msize);
1239 dm->header.size = htons(msize); 1177 dm->rid = htonl (0);
1240 dm->rid = htonl(0); 1178 dm->size = htonl (size);
1241 dm->size = htonl(size); 1179 dm->type = htonl (0);
1242 dm->type = htonl(0); 1180 dm->priority = htonl (0);
1243 dm->priority = htonl(0); 1181 dm->anonymity = htonl (0);
1244 dm->anonymity = htonl(0); 1182 dm->uid = GNUNET_htonll (0);
1245 dm->uid = GNUNET_htonll(0); 1183 dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1246 dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1247 dm->key = *key; 1184 dm->key = *key;
1248 memcpy (&dm[1], data, size); 1185 memcpy (&dm[1], data, size);
1249 process_queue (h); 1186 process_queue (h);
@@ -1258,9 +1195,8 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1258 * @param cls closure 1195 * @param cls closure
1259 * @param msg message received, NULL on timeout or fatal error 1196 * @param msg message received, NULL on timeout or fatal error
1260 */ 1197 */
1261static void 1198static void
1262process_result_message (void *cls, 1199process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1263 const struct GNUNET_MessageHeader *msg)
1264{ 1200{
1265 struct GNUNET_DATASTORE_Handle *h = cls; 1201 struct GNUNET_DATASTORE_Handle *h = cls;
1266 struct GNUNET_DATASTORE_QueueEntry *qe; 1202 struct GNUNET_DATASTORE_QueueEntry *qe;
@@ -1269,104 +1205,98 @@ process_result_message (void *cls,
1269 int was_transmitted; 1205 int was_transmitted;
1270 1206
1271 if (msg == NULL) 1207 if (msg == NULL)
1208 {
1209 qe = h->queue_head;
1210 GNUNET_assert (NULL != qe);
1211 rc = qe->qc.rc;
1212 was_transmitted = qe->was_transmitted;
1213 free_queue_entry (qe);
1214 if (was_transmitted == GNUNET_YES)
1272 { 1215 {
1273 qe = h->queue_head; 1216 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1274 GNUNET_assert (NULL != qe); 1217 _("Failed to receive response from database.\n"));
1275 rc = qe->qc.rc; 1218 do_disconnect (h);
1276 was_transmitted = qe->was_transmitted;
1277 free_queue_entry (qe);
1278 if (was_transmitted == GNUNET_YES)
1279 {
1280 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1281 _("Failed to receive response from database.\n"));
1282 do_disconnect (h);
1283 }
1284 else
1285 {
1286 process_queue (h);
1287 }
1288 if (rc.proc != NULL)
1289 rc.proc (rc.proc_cls,
1290 NULL, 0, NULL, 0, 0, 0,
1291 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1292 return;
1293 } 1219 }
1294 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 1220 else
1295 { 1221 {
1296 GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1297 qe = h->queue_head;
1298 rc = qe->qc.rc;
1299 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1300 free_queue_entry (qe);
1301#if DEBUG_DATASTORE
1302 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303 "Received end of result set, new queue size is %u\n",
1304 h->queue_size);
1305#endif
1306 if (rc.proc != NULL)
1307 rc.proc (rc.proc_cls,
1308 NULL, 0, NULL, 0, 0, 0,
1309 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1310 h->retry_time.rel_value = 0;
1311 h->result_count = 0;
1312 process_queue (h); 1222 process_queue (h);
1313 return;
1314 } 1223 }
1224 if (rc.proc != NULL)
1225 rc.proc (rc.proc_cls,
1226 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1227 return;
1228 }
1229 if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1230 {
1231 GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1232 qe = h->queue_head;
1233 rc = qe->qc.rc;
1234 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1235 free_queue_entry (qe);
1236#if DEBUG_DATASTORE
1237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1238 "Received end of result set, new queue size is %u\n",
1239 h->queue_size);
1240#endif
1241 if (rc.proc != NULL)
1242 rc.proc (rc.proc_cls,
1243 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1244 h->retry_time.rel_value = 0;
1245 h->result_count = 0;
1246 process_queue (h);
1247 return;
1248 }
1315 qe = h->queue_head; 1249 qe = h->queue_head;
1316 GNUNET_assert (NULL != qe); 1250 GNUNET_assert (NULL != qe);
1317 rc = qe->qc.rc; 1251 rc = qe->qc.rc;
1318 if (GNUNET_YES != qe->was_transmitted) 1252 if (GNUNET_YES != qe->was_transmitted)
1319 { 1253 {
1320 GNUNET_break (0); 1254 GNUNET_break (0);
1321 free_queue_entry (qe); 1255 free_queue_entry (qe);
1322 h->retry_time = GNUNET_TIME_UNIT_ZERO; 1256 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1323 do_disconnect (h); 1257 do_disconnect (h);
1324 if (rc.proc != NULL) 1258 if (rc.proc != NULL)
1325 rc.proc (rc.proc_cls, 1259 rc.proc (rc.proc_cls,
1326 NULL, 0, NULL, 0, 0, 0, 1260 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1327 GNUNET_TIME_UNIT_ZERO_ABS, 0); 1261 return;
1328 return; 1262 }
1329 } 1263 if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1330 if ( (ntohs(msg->size) < sizeof(struct DataMessage)) || 1264 (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1331 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || 1265 (ntohs (msg->size) !=
1332 (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) ) 1266 sizeof (struct DataMessage) +
1333 { 1267 ntohl (((const struct DataMessage *) msg)->size)))
1334 GNUNET_break (0); 1268 {
1335 free_queue_entry (qe); 1269 GNUNET_break (0);
1336 h->retry_time = GNUNET_TIME_UNIT_ZERO; 1270 free_queue_entry (qe);
1337 do_disconnect (h); 1271 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1338 if (rc.proc != NULL) 1272 do_disconnect (h);
1339 rc.proc (rc.proc_cls, 1273 if (rc.proc != NULL)
1340 NULL, 0, NULL, 0, 0, 0, 1274 rc.proc (rc.proc_cls,
1341 GNUNET_TIME_UNIT_ZERO_ABS, 0); 1275 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1342 return; 1276 return;
1343 } 1277 }
1344 GNUNET_STATISTICS_update (h->stats, 1278 GNUNET_STATISTICS_update (h->stats,
1345 gettext_noop ("# Results received"), 1279 gettext_noop ("# Results received"), 1, GNUNET_NO);
1346 1, 1280 dm = (const struct DataMessage *) msg;
1347 GNUNET_NO);
1348 dm = (const struct DataMessage*) msg;
1349#if DEBUG_DATASTORE 1281#if DEBUG_DATASTORE
1350 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1282 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1351 "Received result %llu with type %u and size %u with key %s\n", 1283 "Received result %llu with type %u and size %u with key %s\n",
1352 (unsigned long long) GNUNET_ntohll(dm->uid), 1284 (unsigned long long) GNUNET_ntohll (dm->uid),
1353 ntohl(dm->type), 1285 ntohl (dm->type), ntohl (dm->size), GNUNET_h2s (&dm->key));
1354 ntohl(dm->size),
1355 GNUNET_h2s(&dm->key));
1356#endif 1286#endif
1357 free_queue_entry (qe); 1287 free_queue_entry (qe);
1358 h->retry_time.rel_value = 0; 1288 h->retry_time.rel_value = 0;
1359 process_queue (h); 1289 process_queue (h);
1360 if (rc.proc != NULL) 1290 if (rc.proc != NULL)
1361 rc.proc (rc.proc_cls, 1291 rc.proc (rc.proc_cls,
1362 &dm->key, 1292 &dm->key,
1363 ntohl(dm->size), 1293 ntohl (dm->size),
1364 &dm[1], 1294 &dm[1],
1365 ntohl(dm->type), 1295 ntohl (dm->type),
1366 ntohl(dm->priority), 1296 ntohl (dm->priority),
1367 ntohl(dm->anonymity), 1297 ntohl (dm->anonymity),
1368 GNUNET_TIME_absolute_ntoh(dm->expiration), 1298 GNUNET_TIME_absolute_ntoh (dm->expiration),
1369 GNUNET_ntohll(dm->uid)); 1299 GNUNET_ntohll (dm->uid));
1370} 1300}
1371 1301
1372 1302
@@ -1391,11 +1321,11 @@ process_result_message (void *cls,
1391 */ 1321 */
1392struct GNUNET_DATASTORE_QueueEntry * 1322struct GNUNET_DATASTORE_QueueEntry *
1393GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, 1323GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1394 unsigned int queue_priority, 1324 unsigned int queue_priority,
1395 unsigned int max_queue_size, 1325 unsigned int max_queue_size,
1396 struct GNUNET_TIME_Relative timeout, 1326 struct GNUNET_TIME_Relative timeout,
1397 GNUNET_DATASTORE_DatumProcessor proc, 1327 GNUNET_DATASTORE_DatumProcessor proc,
1398 void *proc_cls) 1328 void *proc_cls)
1399{ 1329{
1400 struct GNUNET_DATASTORE_QueueEntry *qe; 1330 struct GNUNET_DATASTORE_QueueEntry *qe;
1401 struct GNUNET_MessageHeader *m; 1331 struct GNUNET_MessageHeader *m;
@@ -1404,29 +1334,29 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1404 GNUNET_assert (NULL != proc); 1334 GNUNET_assert (NULL != proc);
1405#if DEBUG_DATASTORE 1335#if DEBUG_DATASTORE
1406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407 "Asked to get replication entry in %llu ms\n", 1337 "Asked to get replication entry in %llu ms\n",
1408 (unsigned long long) timeout.rel_value); 1338 (unsigned long long) timeout.rel_value);
1409#endif 1339#endif
1410 qc.rc.proc = proc; 1340 qc.rc.proc = proc;
1411 qc.rc.proc_cls = proc_cls; 1341 qc.rc.proc_cls = proc_cls;
1412 qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader), 1342 qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1413 queue_priority, max_queue_size, timeout, 1343 queue_priority, max_queue_size, timeout,
1414 &process_result_message, &qc); 1344 &process_result_message, &qc);
1415 if (qe == NULL) 1345 if (qe == NULL)
1416 { 1346 {
1417#if DEBUG_DATASTORE 1347#if DEBUG_DATASTORE
1418 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1348 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1419 "Could not create queue entry for GET REPLICATION\n"); 1349 "Could not create queue entry for GET REPLICATION\n");
1420#endif 1350#endif
1421 return NULL; 1351 return NULL;
1422 } 1352 }
1423 GNUNET_STATISTICS_update (h->stats, 1353 GNUNET_STATISTICS_update (h->stats,
1424 gettext_noop ("# GET REPLICATION requests executed"), 1354 gettext_noop
1425 1, 1355 ("# GET REPLICATION requests executed"), 1,
1426 GNUNET_NO); 1356 GNUNET_NO);
1427 m = (struct GNUNET_MessageHeader*) &qe[1]; 1357 m = (struct GNUNET_MessageHeader *) &qe[1];
1428 m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); 1358 m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1429 m->size = htons(sizeof (struct GNUNET_MessageHeader)); 1359 m->size = htons (sizeof (struct GNUNET_MessageHeader));
1430 process_queue (h); 1360 process_queue (h);
1431 return qe; 1361 return qe;
1432} 1362}
@@ -1454,13 +1384,13 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1454 */ 1384 */
1455struct GNUNET_DATASTORE_QueueEntry * 1385struct GNUNET_DATASTORE_QueueEntry *
1456GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, 1386GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1457 uint64_t offset, 1387 uint64_t offset,
1458 unsigned int queue_priority, 1388 unsigned int queue_priority,
1459 unsigned int max_queue_size, 1389 unsigned int max_queue_size,
1460 struct GNUNET_TIME_Relative timeout, 1390 struct GNUNET_TIME_Relative timeout,
1461 enum GNUNET_BLOCK_Type type, 1391 enum GNUNET_BLOCK_Type type,
1462 GNUNET_DATASTORE_DatumProcessor proc, 1392 GNUNET_DATASTORE_DatumProcessor proc,
1463 void *proc_cls) 1393 void *proc_cls)
1464{ 1394{
1465 struct GNUNET_DATASTORE_QueueEntry *qe; 1395 struct GNUNET_DATASTORE_QueueEntry *qe;
1466 struct GetZeroAnonymityMessage *m; 1396 struct GetZeroAnonymityMessage *m;
@@ -1470,31 +1400,30 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1470 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); 1400 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1471#if DEBUG_DATASTORE 1401#if DEBUG_DATASTORE
1472 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1473 "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n", 1403 "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1474 (unsigned long long) offset, 1404 (unsigned long long) offset,
1475 type, 1405 type, (unsigned long long) timeout.rel_value);
1476 (unsigned long long) timeout.rel_value);
1477#endif 1406#endif
1478 qc.rc.proc = proc; 1407 qc.rc.proc = proc;
1479 qc.rc.proc_cls = proc_cls; 1408 qc.rc.proc_cls = proc_cls;
1480 qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage), 1409 qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1481 queue_priority, max_queue_size, timeout, 1410 queue_priority, max_queue_size, timeout,
1482 &process_result_message, &qc); 1411 &process_result_message, &qc);
1483 if (qe == NULL) 1412 if (qe == NULL)
1484 { 1413 {
1485#if DEBUG_DATASTORE 1414#if DEBUG_DATASTORE
1486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1487 "Could not create queue entry for zero-anonymity procation\n"); 1416 "Could not create queue entry for zero-anonymity procation\n");
1488#endif 1417#endif
1489 return NULL; 1418 return NULL;
1490 } 1419 }
1491 GNUNET_STATISTICS_update (h->stats, 1420 GNUNET_STATISTICS_update (h->stats,
1492 gettext_noop ("# GET ZERO ANONYMITY requests executed"), 1421 gettext_noop
1493 1, 1422 ("# GET ZERO ANONYMITY requests executed"), 1,
1494 GNUNET_NO); 1423 GNUNET_NO);
1495 m = (struct GetZeroAnonymityMessage*) &qe[1]; 1424 m = (struct GetZeroAnonymityMessage *) &qe[1];
1496 m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); 1425 m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1497 m->header.size = htons(sizeof (struct GetZeroAnonymityMessage)); 1426 m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1498 m->type = htonl ((uint32_t) type); 1427 m->type = htonl ((uint32_t) type);
1499 m->offset = GNUNET_htonll (offset); 1428 m->offset = GNUNET_htonll (offset);
1500 process_queue (h); 1429 process_queue (h);
@@ -1525,14 +1454,13 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1525 */ 1454 */
1526struct GNUNET_DATASTORE_QueueEntry * 1455struct GNUNET_DATASTORE_QueueEntry *
1527GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, 1456GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1528 uint64_t offset, 1457 uint64_t offset,
1529 const GNUNET_HashCode * key, 1458 const GNUNET_HashCode * key,
1530 enum GNUNET_BLOCK_Type type, 1459 enum GNUNET_BLOCK_Type type,
1531 unsigned int queue_priority, 1460 unsigned int queue_priority,
1532 unsigned int max_queue_size, 1461 unsigned int max_queue_size,
1533 struct GNUNET_TIME_Relative timeout, 1462 struct GNUNET_TIME_Relative timeout,
1534 GNUNET_DATASTORE_DatumProcessor proc, 1463 GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
1535 void *proc_cls)
1536{ 1464{
1537 struct GNUNET_DATASTORE_QueueEntry *qe; 1465 struct GNUNET_DATASTORE_QueueEntry *qe;
1538 struct GetMessage *gm; 1466 struct GetMessage *gm;
@@ -1541,41 +1469,39 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1541 GNUNET_assert (NULL != proc); 1469 GNUNET_assert (NULL != proc);
1542#if DEBUG_DATASTORE 1470#if DEBUG_DATASTORE
1543 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1471 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1544 "Asked to look for data of type %u under key `%s'\n", 1472 "Asked to look for data of type %u under key `%s'\n",
1545 (unsigned int) type, 1473 (unsigned int) type, GNUNET_h2s (key));
1546 GNUNET_h2s (key));
1547#endif 1474#endif
1548 qc.rc.proc = proc; 1475 qc.rc.proc = proc;
1549 qc.rc.proc_cls = proc_cls; 1476 qc.rc.proc_cls = proc_cls;
1550 qe = make_queue_entry (h, sizeof(struct GetMessage), 1477 qe = make_queue_entry (h, sizeof (struct GetMessage),
1551 queue_priority, max_queue_size, timeout, 1478 queue_priority, max_queue_size, timeout,
1552 &process_result_message, &qc); 1479 &process_result_message, &qc);
1553 if (qe == NULL) 1480 if (qe == NULL)
1554 { 1481 {
1555#if DEBUG_DATASTORE 1482#if DEBUG_DATASTORE
1556 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1483 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1557 "Could not queue request for `%s'\n", 1484 "Could not queue request for `%s'\n", GNUNET_h2s (key));
1558 GNUNET_h2s (key));
1559#endif 1485#endif
1560 return NULL; 1486 return NULL;
1561 } 1487 }
1562 GNUNET_STATISTICS_update (h->stats, 1488 GNUNET_STATISTICS_update (h->stats,
1563 gettext_noop ("# GET requests executed"), 1489 gettext_noop ("# GET requests executed"),
1564 1, 1490 1, GNUNET_NO);
1565 GNUNET_NO); 1491 gm = (struct GetMessage *) &qe[1];
1566 gm = (struct GetMessage*) &qe[1]; 1492 gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1567 gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); 1493 gm->type = htonl (type);
1568 gm->type = htonl(type);
1569 gm->offset = GNUNET_htonll (offset); 1494 gm->offset = GNUNET_htonll (offset);
1570 if (key != NULL) 1495 if (key != NULL)
1571 { 1496 {
1572 gm->header.size = htons(sizeof (struct GetMessage)); 1497 gm->header.size = htons (sizeof (struct GetMessage));
1573 gm->key = *key; 1498 gm->key = *key;
1574 } 1499 }
1575 else 1500 else
1576 { 1501 {
1577 gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode)); 1502 gm->header.size =
1578 } 1503 htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode));
1504 }
1579 process_queue (h); 1505 process_queue (h);
1580 return qe; 1506 return qe;
1581} 1507}
@@ -1595,18 +1521,16 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1595 GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); 1521 GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1596 h = qe->h; 1522 h = qe->h;
1597#if DEBUG_DATASTORE 1523#if DEBUG_DATASTORE
1598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1599 "Pending DATASTORE request %p cancelled (%d, %d)\n", 1525 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1600 qe, 1526 qe, qe->was_transmitted, h->queue_head == qe);
1601 qe->was_transmitted,
1602 h->queue_head == qe);
1603#endif 1527#endif
1604 if (GNUNET_YES == qe->was_transmitted) 1528 if (GNUNET_YES == qe->was_transmitted)
1605 { 1529 {
1606 free_queue_entry (qe); 1530 free_queue_entry (qe);
1607 h->skip_next_messages++; 1531 h->skip_next_messages++;
1608 return; 1532 return;
1609 } 1533 }
1610 free_queue_entry (qe); 1534 free_queue_entry (qe);
1611 process_queue (h); 1535 process_queue (h);
1612} 1536}