aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/gnunet-service-datastore.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/gnunet-service-datastore.c')
-rw-r--r--src/datastore/gnunet-service-datastore.c1289
1 files changed, 587 insertions, 702 deletions
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index 39354acaf..c01de5891 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -92,7 +92,7 @@ struct DatastorePlugin
92/** 92/**
93 * Linked list of active reservations. 93 * Linked list of active reservations.
94 */ 94 */
95struct ReservationList 95struct ReservationList
96{ 96{
97 97
98 /** 98 /**
@@ -161,7 +161,7 @@ static unsigned long long cache_size;
161 * How much space have we currently reserved? 161 * How much space have we currently reserved?
162 */ 162 */
163static unsigned long long reserved; 163static unsigned long long reserved;
164 164
165/** 165/**
166 * How much data are we currently storing 166 * How much data are we currently storing
167 * in the database? 167 * in the database?
@@ -202,13 +202,10 @@ static struct GNUNET_STATISTICS_Handle *stats;
202 * Synchronize our utilization statistics with the 202 * Synchronize our utilization statistics with the
203 * statistics service. 203 * statistics service.
204 */ 204 */
205static void 205static void
206sync_stats () 206sync_stats ()
207{ 207{
208 GNUNET_STATISTICS_set (stats, 208 GNUNET_STATISTICS_set (stats, QUOTA_STAT_NAME, payload, GNUNET_YES);
209 QUOTA_STAT_NAME,
210 payload,
211 GNUNET_YES);
212 lastSync = 0; 209 lastSync = 0;
213} 210}
214 211
@@ -217,24 +214,24 @@ sync_stats ()
217/** 214/**
218 * Context for transmitting replies to clients. 215 * Context for transmitting replies to clients.
219 */ 216 */
220struct TransmitCallbackContext 217struct TransmitCallbackContext
221{ 218{
222 219
223 /** 220 /**
224 * We keep these in a doubly-linked list (for cleanup). 221 * We keep these in a doubly-linked list (for cleanup).
225 */ 222 */
226 struct TransmitCallbackContext *next; 223 struct TransmitCallbackContext *next;
227 224
228 /** 225 /**
229 * We keep these in a doubly-linked list (for cleanup). 226 * We keep these in a doubly-linked list (for cleanup).
230 */ 227 */
231 struct TransmitCallbackContext *prev; 228 struct TransmitCallbackContext *prev;
232 229
233 /** 230 /**
234 * The message that we're asked to transmit. 231 * The message that we're asked to transmit.
235 */ 232 */
236 struct GNUNET_MessageHeader *msg; 233 struct GNUNET_MessageHeader *msg;
237 234
238 /** 235 /**
239 * Handle for the transmission request. 236 * Handle for the transmission request.
240 */ 237 */
@@ -247,7 +244,7 @@ struct TransmitCallbackContext
247 244
248}; 245};
249 246
250 247
251/** 248/**
252 * Head of the doubly-linked list (for cleanup). 249 * Head of the doubly-linked list (for cleanup).
253 */ 250 */
@@ -278,10 +275,9 @@ static struct GNUNET_STATISTICS_GetHandle *stat_get;
278 * 275 *
279 * @param cls not used 276 * @param cls not used
280 * @param tc task context 277 * @param tc task context
281 */ 278 */
282static void 279static void
283delete_expired (void *cls, 280delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
284 const struct GNUNET_SCHEDULER_TaskContext *tc);
285 281
286 282
287/** 283/**
@@ -304,55 +300,46 @@ delete_expired (void *cls,
304 * (continue on call to "next", of course), 300 * (continue on call to "next", of course),
305 * GNUNET_NO to delete the item and continue (if supported) 301 * GNUNET_NO to delete the item and continue (if supported)
306 */ 302 */
307static int 303static int
308expired_processor (void *cls, 304expired_processor (void *cls,
309 const GNUNET_HashCode * key, 305 const GNUNET_HashCode * key,
310 uint32_t size, 306 uint32_t size,
311 const void *data, 307 const void *data,
312 enum GNUNET_BLOCK_Type type, 308 enum GNUNET_BLOCK_Type type,
313 uint32_t priority, 309 uint32_t priority,
314 uint32_t anonymity, 310 uint32_t anonymity,
315 struct GNUNET_TIME_Absolute 311 struct GNUNET_TIME_Absolute expiration, uint64_t uid)
316 expiration,
317 uint64_t uid)
318{ 312{
319 struct GNUNET_TIME_Absolute now; 313 struct GNUNET_TIME_Absolute now;
320 314
321 if (key == NULL) 315 if (key == NULL)
322 { 316 {
323 expired_kill_task 317 expired_kill_task
324 = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY, 318 = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
325 &delete_expired, 319 &delete_expired, NULL);
326 NULL); 320 return GNUNET_SYSERR;
327 return GNUNET_SYSERR; 321 }
328 }
329 now = GNUNET_TIME_absolute_get (); 322 now = GNUNET_TIME_absolute_get ();
330 if (expiration.abs_value > now.abs_value) 323 if (expiration.abs_value > now.abs_value)
331 { 324 {
332 /* finished processing */ 325 /* finished processing */
333 expired_kill_task 326 expired_kill_task
334 = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY, 327 = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
335 &delete_expired, 328 &delete_expired, NULL);
336 NULL); 329 return GNUNET_SYSERR;
337 return GNUNET_SYSERR; 330 }
338 }
339#if DEBUG_DATASTORE 331#if DEBUG_DATASTORE
340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 332 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
341 "Deleting content `%s' of type %u that expired %llu ms ago\n", 333 "Deleting content `%s' of type %u that expired %llu ms ago\n",
342 GNUNET_h2s (key), 334 GNUNET_h2s (key),
343 type, 335 type,
344 (unsigned long long) (now.abs_value - expiration.abs_value)); 336 (unsigned long long) (now.abs_value - expiration.abs_value));
345#endif 337#endif
346 GNUNET_STATISTICS_update (stats, 338 GNUNET_STATISTICS_update (stats,
347 gettext_noop ("# bytes expired"), 339 gettext_noop ("# bytes expired"), size, GNUNET_YES);
348 size, 340 GNUNET_CONTAINER_bloomfilter_remove (filter, key);
349 GNUNET_YES); 341 expired_kill_task
350 GNUNET_CONTAINER_bloomfilter_remove (filter, 342 = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY, &delete_expired, NULL);
351 key);
352 expired_kill_task
353 = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY,
354 &delete_expired,
355 NULL);
356 return GNUNET_NO; 343 return GNUNET_NO;
357} 344}
358 345
@@ -365,15 +352,12 @@ expired_processor (void *cls,
365 * 352 *
366 * @param cls not used 353 * @param cls not used
367 * @param tc task context 354 * @param tc task context
368 */ 355 */
369static void 356static void
370delete_expired (void *cls, 357delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
371 const struct GNUNET_SCHEDULER_TaskContext *tc)
372{ 358{
373 expired_kill_task = GNUNET_SCHEDULER_NO_TASK; 359 expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
374 plugin->api->get_expiration (plugin->api->cls, 360 plugin->api->get_expiration (plugin->api->cls, &expired_processor, NULL);
375 &expired_processor,
376 NULL);
377} 361}
378 362
379 363
@@ -396,39 +380,34 @@ delete_expired (void *cls,
396 * (continue on call to "next", of course), 380 * (continue on call to "next", of course),
397 * GNUNET_NO to delete the item and continue (if supported) 381 * GNUNET_NO to delete the item and continue (if supported)
398 */ 382 */
399static int 383static int
400quota_processor (void *cls, 384quota_processor (void *cls,
401 const GNUNET_HashCode * key, 385 const GNUNET_HashCode * key,
402 uint32_t size, 386 uint32_t size,
403 const void *data, 387 const void *data,
404 enum GNUNET_BLOCK_Type type, 388 enum GNUNET_BLOCK_Type type,
405 uint32_t priority, 389 uint32_t priority,
406 uint32_t anonymity, 390 uint32_t anonymity,
407 struct GNUNET_TIME_Absolute expiration, 391 struct GNUNET_TIME_Absolute expiration, uint64_t uid)
408 uint64_t uid)
409{ 392{
410 unsigned long long *need = cls; 393 unsigned long long *need = cls;
411 394
412 if (NULL == key) 395 if (NULL == key)
413 return GNUNET_SYSERR; 396 return GNUNET_SYSERR;
414#if DEBUG_DATASTORE 397#if DEBUG_DATASTORE
415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 398 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
416 "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n", 399 "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
417 (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD), 400 (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
418 GNUNET_h2s (key), 401 GNUNET_h2s (key), type, *need);
419 type,
420 *need);
421#endif 402#endif
422 if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need) 403 if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
423 *need = 0; 404 *need = 0;
424 else 405 else
425 *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD; 406 *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
426 GNUNET_STATISTICS_update (stats, 407 GNUNET_STATISTICS_update (stats,
427 gettext_noop ("# bytes purged (low-priority)"), 408 gettext_noop ("# bytes purged (low-priority)"),
428 size, 409 size, GNUNET_YES);
429 GNUNET_YES); 410 GNUNET_CONTAINER_bloomfilter_remove (filter, key);
430 GNUNET_CONTAINER_bloomfilter_remove (filter,
431 key);
432 return GNUNET_NO; 411 return GNUNET_NO;
433} 412}
434 413
@@ -452,18 +431,14 @@ manage_space (unsigned long long need)
452 431
453#if DEBUG_DATASTORE 432#if DEBUG_DATASTORE
454 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 433 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
455 "Asked to free up %llu bytes of cache space\n", 434 "Asked to free up %llu bytes of cache space\n", need);
456 need);
457#endif 435#endif
458 last = 0; 436 last = 0;
459 while ( (need > 0) && 437 while ((need > 0) && (last != need))
460 (last != need) ) 438 {
461 { 439 last = need;
462 last = need; 440 plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need);
463 plugin->api->get_expiration (plugin->api->cls, 441 }
464 &quota_processor,
465 &need);
466 }
467} 442}
468 443
469 444
@@ -479,27 +454,24 @@ manage_space (unsigned long long need)
479 * @return number of bytes written to buf 454 * @return number of bytes written to buf
480 */ 455 */
481static size_t 456static size_t
482transmit_callback (void *cls, 457transmit_callback (void *cls, size_t size, void *buf)
483 size_t size, void *buf)
484{ 458{
485 struct TransmitCallbackContext *tcc = cls; 459 struct TransmitCallbackContext *tcc = cls;
486 size_t msize; 460 size_t msize;
487 461
488 tcc->th = NULL; 462 tcc->th = NULL;
489 GNUNET_CONTAINER_DLL_remove (tcc_head, 463 GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
490 tcc_tail, 464 msize = ntohs (tcc->msg->size);
491 tcc);
492 msize = ntohs(tcc->msg->size);
493 if (size == 0) 465 if (size == 0)
494 { 466 {
495 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 467 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
496 _("Transmission to client failed!\n")); 468 _("Transmission to client failed!\n"));
497 GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR); 469 GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
498 GNUNET_SERVER_client_drop (tcc->client); 470 GNUNET_SERVER_client_drop (tcc->client);
499 GNUNET_free (tcc->msg); 471 GNUNET_free (tcc->msg);
500 GNUNET_free (tcc); 472 GNUNET_free (tcc);
501 return 0; 473 return 0;
502 } 474 }
503 GNUNET_assert (size >= msize); 475 GNUNET_assert (size >= msize);
504 memcpy (buf, tcc->msg, msize); 476 memcpy (buf, tcc->msg, msize);
505 GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK); 477 GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
@@ -517,41 +489,37 @@ transmit_callback (void *cls,
517 * @param msg message to transmit, will be freed! 489 * @param msg message to transmit, will be freed!
518 */ 490 */
519static void 491static void
520transmit (struct GNUNET_SERVER_Client *client, 492transmit (struct GNUNET_SERVER_Client *client, struct GNUNET_MessageHeader *msg)
521 struct GNUNET_MessageHeader *msg)
522{ 493{
523 struct TransmitCallbackContext *tcc; 494 struct TransmitCallbackContext *tcc;
524 495
525 if (GNUNET_YES == cleaning_done) 496 if (GNUNET_YES == cleaning_done)
526 { 497 {
527#if DEBUG_DATASTORE 498#if DEBUG_DATASTORE
528 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 499 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
529 "Shutdown in progress, aborting transmission.\n"); 500 "Shutdown in progress, aborting transmission.\n");
530#endif 501#endif
531 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 502 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
532 GNUNET_free (msg); 503 GNUNET_free (msg);
533 return; 504 return;
534 } 505 }
535 tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext)); 506 tcc = GNUNET_malloc (sizeof (struct TransmitCallbackContext));
536 tcc->msg = msg; 507 tcc->msg = msg;
537 tcc->client = client; 508 tcc->client = client;
538 if (NULL == 509 if (NULL ==
539 (tcc->th = GNUNET_SERVER_notify_transmit_ready (client, 510 (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
540 ntohs(msg->size), 511 ntohs (msg->size),
541 GNUNET_TIME_UNIT_FOREVER_REL, 512 GNUNET_TIME_UNIT_FOREVER_REL,
542 &transmit_callback, 513 &transmit_callback, tcc)))
543 tcc))) 514 {
544 { 515 GNUNET_break (0);
545 GNUNET_break (0); 516 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
546 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 517 GNUNET_free (msg);
547 GNUNET_free (msg); 518 GNUNET_free (tcc);
548 GNUNET_free (tcc); 519 return;
549 return; 520 }
550 }
551 GNUNET_SERVER_client_keep (client); 521 GNUNET_SERVER_client_keep (client);
552 GNUNET_CONTAINER_DLL_insert (tcc_head, 522 GNUNET_CONTAINER_DLL_insert (tcc_head, tcc_tail, tcc);
553 tcc_tail,
554 tcc);
555} 523}
556 524
557 525
@@ -563,27 +531,23 @@ transmit (struct GNUNET_SERVER_Client *client,
563 * @param msg optional error message (can be NULL) 531 * @param msg optional error message (can be NULL)
564 */ 532 */
565static void 533static void
566transmit_status (struct GNUNET_SERVER_Client *client, 534transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg)
567 int code,
568 const char *msg)
569{ 535{
570 struct StatusMessage *sm; 536 struct StatusMessage *sm;
571 size_t slen; 537 size_t slen;
572 538
573#if DEBUG_DATASTORE 539#if DEBUG_DATASTORE
574 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 540 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
575 "Transmitting `%s' message with value %d and message `%s'\n", 541 "Transmitting `%s' message with value %d and message `%s'\n",
576 "STATUS", 542 "STATUS", code, msg != NULL ? msg : "(none)");
577 code,
578 msg != NULL ? msg : "(none)");
579#endif 543#endif
580 slen = (msg == NULL) ? 0 : strlen(msg) + 1; 544 slen = (msg == NULL) ? 0 : strlen (msg) + 1;
581 sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen); 545 sm = GNUNET_malloc (sizeof (struct StatusMessage) + slen);
582 sm->header.size = htons(sizeof(struct StatusMessage) + slen); 546 sm->header.size = htons (sizeof (struct StatusMessage) + slen);
583 sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS); 547 sm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
584 sm->status = htonl(code); 548 sm->status = htonl (code);
585 if (slen > 0) 549 if (slen > 0)
586 memcpy (&sm[1], msg, slen); 550 memcpy (&sm[1], msg, slen);
587 transmit (client, &sm->header); 551 transmit (client, &sm->header);
588} 552}
589 553
@@ -609,62 +573,59 @@ transmit_status (struct GNUNET_SERVER_Client *client,
609 */ 573 */
610static int 574static int
611transmit_item (void *cls, 575transmit_item (void *cls,
612 const GNUNET_HashCode * key, 576 const GNUNET_HashCode * key,
613 uint32_t size, 577 uint32_t size,
614 const void *data, 578 const void *data,
615 enum GNUNET_BLOCK_Type type, 579 enum GNUNET_BLOCK_Type type,
616 uint32_t priority, 580 uint32_t priority,
617 uint32_t anonymity, 581 uint32_t anonymity,
618 struct GNUNET_TIME_Absolute 582 struct GNUNET_TIME_Absolute expiration, uint64_t uid)
619 expiration, uint64_t uid)
620{ 583{
621 struct GNUNET_SERVER_Client *client = cls; 584 struct GNUNET_SERVER_Client *client = cls;
622 struct GNUNET_MessageHeader *end; 585 struct GNUNET_MessageHeader *end;
623 struct DataMessage *dm; 586 struct DataMessage *dm;
624 587
625 if (key == NULL) 588 if (key == NULL)
626 { 589 {
627 /* transmit 'DATA_END' */ 590 /* transmit 'DATA_END' */
628#if DEBUG_DATASTORE 591#if DEBUG_DATASTORE
629 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 592 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
630 "Transmitting `%s' message\n", 593 "Transmitting `%s' message\n", "DATA_END");
631 "DATA_END");
632#endif 594#endif
633 end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader)); 595 end = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
634 end->size = htons(sizeof(struct GNUNET_MessageHeader)); 596 end->size = htons (sizeof (struct GNUNET_MessageHeader));
635 end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END); 597 end->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
636 transmit (client, end); 598 transmit (client, end);
637 GNUNET_SERVER_client_drop (client); 599 GNUNET_SERVER_client_drop (client);
638 return GNUNET_OK; 600 return GNUNET_OK;
639 } 601 }
640 GNUNET_assert (sizeof (struct DataMessage) + size < GNUNET_SERVER_MAX_MESSAGE_SIZE); 602 GNUNET_assert (sizeof (struct DataMessage) + size <
641 dm = GNUNET_malloc (sizeof(struct DataMessage) + size); 603 GNUNET_SERVER_MAX_MESSAGE_SIZE);
642 dm->header.size = htons(sizeof(struct DataMessage) + size); 604 dm = GNUNET_malloc (sizeof (struct DataMessage) + size);
643 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA); 605 dm->header.size = htons (sizeof (struct DataMessage) + size);
644 dm->rid = htonl(0); 606 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
645 dm->size = htonl(size); 607 dm->rid = htonl (0);
646 dm->type = htonl(type); 608 dm->size = htonl (size);
647 dm->priority = htonl(priority); 609 dm->type = htonl (type);
648 dm->anonymity = htonl(anonymity); 610 dm->priority = htonl (priority);
611 dm->anonymity = htonl (anonymity);
649 dm->replication = htonl (0); 612 dm->replication = htonl (0);
650 dm->reserved = htonl (0); 613 dm->reserved = htonl (0);
651 dm->expiration = GNUNET_TIME_absolute_hton(expiration); 614 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
652 dm->uid = GNUNET_htonll(uid); 615 dm->uid = GNUNET_htonll (uid);
653 dm->key = *key; 616 dm->key = *key;
654 memcpy (&dm[1], data, size); 617 memcpy (&dm[1], data, size);
655#if DEBUG_DATASTORE 618#if DEBUG_DATASTORE
656 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 619 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
657 "Transmitting `%s' message for `%s' of type %u with expiration %llu (now: %llu)\n", 620 "Transmitting `%s' message for `%s' of type %u with expiration %llu (now: %llu)\n",
658 "DATA", 621 "DATA",
659 GNUNET_h2s (key), 622 GNUNET_h2s (key),
660 type, 623 type,
661 (unsigned long long) expiration.abs_value, 624 (unsigned long long) expiration.abs_value,
662 (unsigned long long) GNUNET_TIME_absolute_get ().abs_value); 625 (unsigned long long) GNUNET_TIME_absolute_get ().abs_value);
663#endif 626#endif
664 GNUNET_STATISTICS_update (stats, 627 GNUNET_STATISTICS_update (stats,
665 gettext_noop ("# results found"), 628 gettext_noop ("# results found"), 1, GNUNET_NO);
666 1,
667 GNUNET_NO);
668 transmit (client, &dm->header); 629 transmit (client, &dm->header);
669 GNUNET_SERVER_client_drop (client); 630 GNUNET_SERVER_client_drop (client);
670 return GNUNET_OK; 631 return GNUNET_OK;
@@ -680,15 +641,15 @@ transmit_item (void *cls,
680 */ 641 */
681static void 642static void
682handle_reserve (void *cls, 643handle_reserve (void *cls,
683 struct GNUNET_SERVER_Client *client, 644 struct GNUNET_SERVER_Client *client,
684 const struct GNUNET_MessageHeader *message) 645 const struct GNUNET_MessageHeader *message)
685{ 646{
686 /** 647 /**
687 * Static counter to produce reservation identifiers. 648 * Static counter to produce reservation identifiers.
688 */ 649 */
689 static int reservation_gen; 650 static int reservation_gen;
690 651
691 const struct ReserveMessage *msg = (const struct ReserveMessage*) message; 652 const struct ReserveMessage *msg = (const struct ReserveMessage *) message;
692 struct ReservationList *e; 653 struct ReservationList *e;
693 unsigned long long used; 654 unsigned long long used;
694 unsigned long long req; 655 unsigned long long req;
@@ -696,51 +657,48 @@ handle_reserve (void *cls,
696 uint32_t entries; 657 uint32_t entries;
697 658
698#if DEBUG_DATASTORE 659#if DEBUG_DATASTORE
699 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 660 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "RESERVE");
700 "Processing `%s' request\n",
701 "RESERVE");
702#endif 661#endif
703 amount = GNUNET_ntohll(msg->amount); 662 amount = GNUNET_ntohll (msg->amount);
704 entries = ntohl(msg->entries); 663 entries = ntohl (msg->entries);
705 used = payload + reserved; 664 used = payload + reserved;
706 req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries; 665 req =
666 amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
707 if (used + req > quota) 667 if (used + req > quota)
668 {
669 if (quota < used)
670 used = quota; /* cheat a bit for error message (to avoid negative numbers) */
671 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
672 _
673 ("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
674 quota - used, "RESERVE", req);
675 if (cache_size < req)
708 { 676 {
709 if (quota < used) 677 /* TODO: document this in the FAQ; essentially, if this
710 used = quota; /* cheat a bit for error message (to avoid negative numbers) */ 678 * message happens, the insertion request could be blocked
679 * by less-important content from migration because it is
680 * larger than 1/8th of the overall available space, and
681 * we only reserve 1/8th for "fresh" insertions */
711 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 682 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
712 _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"), 683 _
713 quota - used, 684 ("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
714 "RESERVE", 685 req, cache_size);
715 req); 686 transmit_status (client, 0,
716 if (cache_size < req) 687 gettext_noop
717 { 688 ("Insufficient space to satisfy request and "
718 /* TODO: document this in the FAQ; essentially, if this 689 "requested amount is larger than cache size"));
719 message happens, the insertion request could be blocked
720 by less-important content from migration because it is
721 larger than 1/8th of the overall available space, and
722 we only reserve 1/8th for "fresh" insertions */
723 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
724 _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
725 req,
726 cache_size);
727 transmit_status (client, 0,
728 gettext_noop ("Insufficient space to satisfy request and "
729 "requested amount is larger than cache size"));
730 }
731 else
732 {
733 transmit_status (client, 0,
734 gettext_noop ("Insufficient space to satisfy request"));
735 }
736 return;
737 } 690 }
691 else
692 {
693 transmit_status (client, 0,
694 gettext_noop ("Insufficient space to satisfy request"));
695 }
696 return;
697 }
738 reserved += req; 698 reserved += req;
739 GNUNET_STATISTICS_set (stats, 699 GNUNET_STATISTICS_set (stats,
740 gettext_noop ("# reserved"), 700 gettext_noop ("# reserved"), reserved, GNUNET_NO);
741 reserved, 701 e = GNUNET_malloc (sizeof (struct ReservationList));
742 GNUNET_NO);
743 e = GNUNET_malloc (sizeof(struct ReservationList));
744 e->next = reservations; 702 e->next = reservations;
745 reservations = e; 703 reservations = e;
746 e->client = client; 704 e->client = client;
@@ -748,7 +706,7 @@ handle_reserve (void *cls,
748 e->entries = entries; 706 e->entries = entries;
749 e->rid = ++reservation_gen; 707 e->rid = ++reservation_gen;
750 if (reservation_gen < 0) 708 if (reservation_gen < 0)
751 reservation_gen = 0; /* wrap around */ 709 reservation_gen = 0; /* wrap around */
752 transmit_status (client, e->rid, NULL); 710 transmit_status (client, e->rid, NULL);
753} 711}
754 712
@@ -762,52 +720,53 @@ handle_reserve (void *cls,
762 */ 720 */
763static void 721static void
764handle_release_reserve (void *cls, 722handle_release_reserve (void *cls,
765 struct GNUNET_SERVER_Client *client, 723 struct GNUNET_SERVER_Client *client,
766 const struct GNUNET_MessageHeader *message) 724 const struct GNUNET_MessageHeader *message)
767{ 725{
768 const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message; 726 const struct ReleaseReserveMessage *msg =
727 (const struct ReleaseReserveMessage *) message;
769 struct ReservationList *pos; 728 struct ReservationList *pos;
770 struct ReservationList *prev; 729 struct ReservationList *prev;
771 struct ReservationList *next; 730 struct ReservationList *next;
772 int rid = ntohl(msg->rid); 731 int rid = ntohl (msg->rid);
773 unsigned long long rem; 732 unsigned long long rem;
774 733
775#if DEBUG_DATASTORE 734#if DEBUG_DATASTORE
776 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 735 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
777 "Processing `%s' request\n", 736 "Processing `%s' request\n", "RELEASE_RESERVE");
778 "RELEASE_RESERVE");
779#endif 737#endif
780 next = reservations; 738 next = reservations;
781 prev = NULL; 739 prev = NULL;
782 while (NULL != (pos = next)) 740 while (NULL != (pos = next))
741 {
742 next = pos->next;
743 if (rid == pos->rid)
783 { 744 {
784 next = pos->next; 745 if (prev == NULL)
785 if (rid == pos->rid) 746 reservations = next;
786 { 747 else
787 if (prev == NULL) 748 prev->next = next;
788 reservations = next; 749 rem =
789 else 750 pos->amount +
790 prev->next = next; 751 ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
791 rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries; 752 GNUNET_assert (reserved >= rem);
792 GNUNET_assert (reserved >= rem); 753 reserved -= rem;
793 reserved -= rem; 754 GNUNET_STATISTICS_set (stats,
794 GNUNET_STATISTICS_set (stats, 755 gettext_noop ("# reserved"), reserved, GNUNET_NO);
795 gettext_noop ("# reserved"),
796 reserved,
797 GNUNET_NO);
798#if DEBUG_DATASTORE 756#if DEBUG_DATASTORE
799 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 757 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800 "Returning %llu remaining reserved bytes to storage pool\n", 758 "Returning %llu remaining reserved bytes to storage pool\n",
801 rem); 759 rem);
802#endif 760#endif
803 GNUNET_free (pos); 761 GNUNET_free (pos);
804 transmit_status (client, GNUNET_OK, NULL); 762 transmit_status (client, GNUNET_OK, NULL);
805 return; 763 return;
806 }
807 prev = pos;
808 } 764 }
765 prev = pos;
766 }
809 GNUNET_break (0); 767 GNUNET_break (0);
810 transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation")); 768 transmit_status (client, GNUNET_SYSERR,
769 gettext_noop ("Could not find matching reservation"));
811} 770}
812 771
813 772
@@ -823,19 +782,19 @@ check_data (const struct GNUNET_MessageHeader *message)
823 uint32_t dsize; 782 uint32_t dsize;
824 const struct DataMessage *dm; 783 const struct DataMessage *dm;
825 784
826 size = ntohs(message->size); 785 size = ntohs (message->size);
827 if (size < sizeof(struct DataMessage)) 786 if (size < sizeof (struct DataMessage))
828 { 787 {
829 GNUNET_break (0); 788 GNUNET_break (0);
830 return NULL; 789 return NULL;
831 } 790 }
832 dm = (const struct DataMessage *) message; 791 dm = (const struct DataMessage *) message;
833 dsize = ntohl(dm->size); 792 dsize = ntohl (dm->size);
834 if (size != dsize + sizeof(struct DataMessage)) 793 if (size != dsize + sizeof (struct DataMessage))
835 { 794 {
836 GNUNET_break (0); 795 GNUNET_break (0);
837 return NULL; 796 return NULL;
838 } 797 }
839 return dm; 798 return dm;
840} 799}
841 800
@@ -853,8 +812,8 @@ struct PutContext
853 812
854#if ! HAVE_UNALIGNED_64_ACCESS 813#if ! HAVE_UNALIGNED_64_ACCESS
855 void *reserved; 814 void *reserved;
856#endif 815#endif
857 816
858 /* followed by the 'struct DataMessage' */ 817 /* followed by the 'struct DataMessage' */
859}; 818};
860 819
@@ -863,54 +822,46 @@ struct PutContext
863 * Actually put the data message. 822 * Actually put the data message.
864 */ 823 */
865static void 824static void
866execute_put (struct GNUNET_SERVER_Client *client, 825execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
867 const struct DataMessage *dm)
868{ 826{
869 uint32_t size; 827 uint32_t size;
870 char *msg; 828 char *msg;
871 int ret; 829 int ret;
872 830
873 size = ntohl(dm->size); 831 size = ntohl (dm->size);
874 msg = NULL; 832 msg = NULL;
875 ret = plugin->api->put (plugin->api->cls, 833 ret = plugin->api->put (plugin->api->cls,
876 &dm->key, 834 &dm->key,
877 size, 835 size,
878 &dm[1], 836 &dm[1],
879 ntohl(dm->type), 837 ntohl (dm->type),
880 ntohl(dm->priority), 838 ntohl (dm->priority),
881 ntohl(dm->anonymity), 839 ntohl (dm->anonymity),
882 ntohl(dm->replication), 840 ntohl (dm->replication),
883 GNUNET_TIME_absolute_ntoh(dm->expiration), 841 GNUNET_TIME_absolute_ntoh (dm->expiration), &msg);
884 &msg);
885 if (GNUNET_OK == ret) 842 if (GNUNET_OK == ret)
886 { 843 {
887 GNUNET_STATISTICS_update (stats, 844 GNUNET_STATISTICS_update (stats,
888 gettext_noop ("# bytes stored"), 845 gettext_noop ("# bytes stored"),
889 size, 846 size, GNUNET_YES);
890 GNUNET_YES); 847 GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key);
891 GNUNET_CONTAINER_bloomfilter_add (filter,
892 &dm->key);
893#if DEBUG_DATASTORE 848#if DEBUG_DATASTORE
894 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 849 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895 "Successfully stored %u bytes of type %u under key `%s'\n", 850 "Successfully stored %u bytes of type %u under key `%s'\n",
896 size, 851 size, ntohl (dm->type), GNUNET_h2s (&dm->key));
897 ntohl(dm->type),
898 GNUNET_h2s (&dm->key));
899#endif 852#endif
900 } 853 }
901 transmit_status (client, 854 transmit_status (client, ret, msg);
902 ret,
903 msg);
904 GNUNET_free_non_null (msg); 855 GNUNET_free_non_null (msg);
905 if (quota - reserved - cache_size < payload) 856 if (quota - reserved - cache_size < payload)
906 { 857 {
907 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 858 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
908 _("Need %llu bytes more space (%llu allowed, using %llu)\n"), 859 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
909 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD, 860 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
910 (unsigned long long) (quota - reserved - cache_size), 861 (unsigned long long) (quota - reserved - cache_size),
911 (unsigned long long) payload); 862 (unsigned long long) payload);
912 manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD); 863 manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
913 } 864 }
914} 865}
915 866
916 867
@@ -934,56 +885,51 @@ execute_put (struct GNUNET_SERVER_Client *client,
934 */ 885 */
935static int 886static int
936check_present (void *cls, 887check_present (void *cls,
937 const GNUNET_HashCode * key, 888 const GNUNET_HashCode * key,
938 uint32_t size, 889 uint32_t size,
939 const void *data, 890 const void *data,
940 enum GNUNET_BLOCK_Type type, 891 enum GNUNET_BLOCK_Type type,
941 uint32_t priority, 892 uint32_t priority,
942 uint32_t anonymity, 893 uint32_t anonymity,
943 struct GNUNET_TIME_Absolute 894 struct GNUNET_TIME_Absolute expiration, uint64_t uid)
944 expiration, uint64_t uid)
945{ 895{
946 struct PutContext *pc = cls; 896 struct PutContext *pc = cls;
947 const struct DataMessage *dm; 897 const struct DataMessage *dm;
948 898
949 dm = (const struct DataMessage*) &pc[1]; 899 dm = (const struct DataMessage *) &pc[1];
950 if (key == NULL) 900 if (key == NULL)
951 { 901 {
952 execute_put (pc->client, dm); 902 execute_put (pc->client, dm);
953 GNUNET_SERVER_client_drop (pc->client); 903 GNUNET_SERVER_client_drop (pc->client);
954 GNUNET_free (pc); 904 GNUNET_free (pc);
955 return GNUNET_OK; 905 return GNUNET_OK;
956 } 906 }
957 if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) || 907 if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
958 (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) || 908 (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
959 ( (size == ntohl(dm->size)) && 909 ((size == ntohl (dm->size)) && (0 == memcmp (&dm[1], data, size))))
960 (0 == memcmp (&dm[1], 910 {
961 data,
962 size)) ) )
963 {
964#if DEBUG_MYSQL 911#if DEBUG_MYSQL
965 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 912 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
966 "Result already present in datastore\n"); 913 "Result already present in datastore\n");
967#endif 914#endif
968 /* FIXME: change API to allow increasing 'replication' counter */ 915 /* FIXME: change API to allow increasing 'replication' counter */
969 if ( (ntohl (dm->priority) > 0) || 916 if ((ntohl (dm->priority) > 0) ||
970 (GNUNET_TIME_absolute_ntoh(dm->expiration).abs_value > 917 (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value >
971 expiration.abs_value) ) 918 expiration.abs_value))
972 plugin->api->update (plugin->api->cls, 919 plugin->api->update (plugin->api->cls,
973 uid, 920 uid,
974 (int32_t) ntohl(dm->priority), 921 (int32_t) ntohl (dm->priority),
975 GNUNET_TIME_absolute_ntoh(dm->expiration), 922 GNUNET_TIME_absolute_ntoh (dm->expiration), NULL);
976 NULL); 923 transmit_status (pc->client, GNUNET_NO, NULL);
977 transmit_status (pc->client, GNUNET_NO, NULL); 924 GNUNET_SERVER_client_drop (pc->client);
978 GNUNET_SERVER_client_drop (pc->client); 925 GNUNET_free (pc);
979 GNUNET_free (pc); 926 }
980 }
981 else 927 else
982 { 928 {
983 execute_put (pc->client, dm); 929 execute_put (pc->client, dm);
984 GNUNET_SERVER_client_drop (pc->client); 930 GNUNET_SERVER_client_drop (pc->client);
985 GNUNET_free (pc); 931 GNUNET_free (pc);
986 } 932 }
987 return GNUNET_OK; 933 return GNUNET_OK;
988} 934}
989 935
@@ -997,8 +943,8 @@ check_present (void *cls,
997 */ 943 */
998static void 944static void
999handle_put (void *cls, 945handle_put (void *cls,
1000 struct GNUNET_SERVER_Client *client, 946 struct GNUNET_SERVER_Client *client,
1001 const struct GNUNET_MessageHeader *message) 947 const struct GNUNET_MessageHeader *message)
1002{ 948{
1003 const struct DataMessage *dm = check_data (message); 949 const struct DataMessage *dm = check_data (message);
1004 int rid; 950 int rid;
@@ -1007,59 +953,50 @@ handle_put (void *cls,
1007 GNUNET_HashCode vhash; 953 GNUNET_HashCode vhash;
1008 uint32_t size; 954 uint32_t size;
1009 955
1010 if ( (dm == NULL) || 956 if ((dm == NULL) || (ntohl (dm->type) == 0))
1011 (ntohl(dm->type) == 0) ) 957 {
1012 { 958 GNUNET_break (0);
1013 GNUNET_break (0); 959 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1014 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 960 return;
1015 return; 961 }
1016 }
1017#if DEBUG_DATASTORE 962#if DEBUG_DATASTORE
1018 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 963 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1019 "Processing `%s' request for `%s' of type %u\n", 964 "Processing `%s' request for `%s' of type %u\n",
1020 "PUT", 965 "PUT", GNUNET_h2s (&dm->key), ntohl (dm->type));
1021 GNUNET_h2s (&dm->key),
1022 ntohl (dm->type));
1023#endif 966#endif
1024 rid = ntohl(dm->rid); 967 rid = ntohl (dm->rid);
1025 size = ntohl(dm->size); 968 size = ntohl (dm->size);
1026 if (rid > 0) 969 if (rid > 0)
970 {
971 pos = reservations;
972 while ((NULL != pos) && (rid != pos->rid))
973 pos = pos->next;
974 GNUNET_break (pos != NULL);
975 if (NULL != pos)
1027 { 976 {
1028 pos = reservations; 977 GNUNET_break (pos->entries > 0);
1029 while ( (NULL != pos) && 978 GNUNET_break (pos->amount >= size);
1030 (rid != pos->rid) ) 979 pos->entries--;
1031 pos = pos->next; 980 pos->amount -= size;
1032 GNUNET_break (pos != NULL); 981 reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1033 if (NULL != pos) 982 GNUNET_STATISTICS_set (stats,
1034 { 983 gettext_noop ("# reserved"), reserved, GNUNET_NO);
1035 GNUNET_break (pos->entries > 0);
1036 GNUNET_break (pos->amount >= size);
1037 pos->entries--;
1038 pos->amount -= size;
1039 reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1040 GNUNET_STATISTICS_set (stats,
1041 gettext_noop ("# reserved"),
1042 reserved,
1043 GNUNET_NO);
1044 }
1045 }
1046 if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1047 &dm->key))
1048 {
1049 GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
1050 pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1051 pc->client = client;
1052 GNUNET_SERVER_client_keep (client);
1053 memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1054 plugin->api->get_key (plugin->api->cls,
1055 0,
1056 &dm->key,
1057 &vhash,
1058 ntohl (dm->type),
1059 &check_present,
1060 pc);
1061 return;
1062 } 984 }
985 }
986 if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key))
987 {
988 GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
989 pc = GNUNET_malloc (sizeof (struct PutContext) + size +
990 sizeof (struct DataMessage));
991 pc->client = client;
992 GNUNET_SERVER_client_keep (client);
993 memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
994 plugin->api->get_key (plugin->api->cls,
995 0,
996 &dm->key,
997 &vhash, ntohl (dm->type), &check_present, pc);
998 return;
999 }
1063 execute_put (client, dm); 1000 execute_put (client, dm);
1064} 1001}
1065 1002
@@ -1073,60 +1010,52 @@ handle_put (void *cls,
1073 */ 1010 */
1074static void 1011static void
1075handle_get (void *cls, 1012handle_get (void *cls,
1076 struct GNUNET_SERVER_Client *client, 1013 struct GNUNET_SERVER_Client *client,
1077 const struct GNUNET_MessageHeader *message) 1014 const struct GNUNET_MessageHeader *message)
1078{ 1015{
1079 const struct GetMessage *msg; 1016 const struct GetMessage *msg;
1080 uint16_t size; 1017 uint16_t size;
1081 1018
1082 size = ntohs(message->size); 1019 size = ntohs (message->size);
1083 if ( (size != sizeof(struct GetMessage)) && 1020 if ((size != sizeof (struct GetMessage)) &&
1084 (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) ) 1021 (size != sizeof (struct GetMessage) - sizeof (GNUNET_HashCode)))
1085 { 1022 {
1086 GNUNET_break (0); 1023 GNUNET_break (0);
1087 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1024 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1088 return; 1025 return;
1089 } 1026 }
1090 msg = (const struct GetMessage*) message; 1027 msg = (const struct GetMessage *) message;
1091#if DEBUG_DATASTORE 1028#if DEBUG_DATASTORE
1092 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1093 "Processing `%s' request for `%s' of type %u\n", 1030 "Processing `%s' request for `%s' of type %u\n",
1094 "GET", 1031 "GET", GNUNET_h2s (&msg->key), ntohl (msg->type));
1095 GNUNET_h2s (&msg->key),
1096 ntohl (msg->type));
1097#endif 1032#endif
1098 GNUNET_STATISTICS_update (stats, 1033 GNUNET_STATISTICS_update (stats,
1099 gettext_noop ("# GET requests received"), 1034 gettext_noop ("# GET requests received"),
1100 1, 1035 1, GNUNET_NO);
1101 GNUNET_NO);
1102 GNUNET_SERVER_client_keep (client); 1036 GNUNET_SERVER_client_keep (client);
1103 if ( (size == sizeof(struct GetMessage)) && 1037 if ((size == sizeof (struct GetMessage)) &&
1104 (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, 1038 (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)))
1105 &msg->key)) ) 1039 {
1106 { 1040 /* don't bother database... */
1107 /* don't bother database... */
1108#if DEBUG_DATASTORE 1041#if DEBUG_DATASTORE
1109 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1042 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110 "Empty result set for `%s' request for `%s' (bloomfilter).\n", 1043 "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1111 "GET", 1044 "GET", GNUNET_h2s (&msg->key));
1112 GNUNET_h2s (&msg->key)); 1045#endif
1113#endif 1046 GNUNET_STATISTICS_update (stats,
1114 GNUNET_STATISTICS_update (stats, 1047 gettext_noop
1115 gettext_noop ("# requests filtered by bloomfilter"), 1048 ("# requests filtered by bloomfilter"), 1,
1116 1, 1049 GNUNET_NO);
1117 GNUNET_NO); 1050 transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1118 transmit_item (client, 1051 0);
1119 NULL, 0, NULL, 0, 0, 0, 1052 return;
1120 GNUNET_TIME_UNIT_ZERO_ABS, 0); 1053 }
1121 return;
1122 }
1123 plugin->api->get_key (plugin->api->cls, 1054 plugin->api->get_key (plugin->api->cls,
1124 GNUNET_ntohll (msg->offset), 1055 GNUNET_ntohll (msg->offset),
1125 ((size == sizeof(struct GetMessage)) ? &msg->key : NULL), 1056 ((size ==
1126 NULL, 1057 sizeof (struct GetMessage)) ? &msg->key : NULL), NULL,
1127 ntohl(msg->type), 1058 ntohl (msg->type), &transmit_item, client);
1128 &transmit_item,
1129 client);
1130} 1059}
1131 1060
1132 1061
@@ -1139,30 +1068,28 @@ handle_get (void *cls,
1139 */ 1068 */
1140static void 1069static void
1141handle_update (void *cls, 1070handle_update (void *cls,
1142 struct GNUNET_SERVER_Client *client, 1071 struct GNUNET_SERVER_Client *client,
1143 const struct GNUNET_MessageHeader *message) 1072 const struct GNUNET_MessageHeader *message)
1144{ 1073{
1145 const struct UpdateMessage *msg; 1074 const struct UpdateMessage *msg;
1146 int ret; 1075 int ret;
1147 char *emsg; 1076 char *emsg;
1148 1077
1149 GNUNET_STATISTICS_update (stats, 1078 GNUNET_STATISTICS_update (stats,
1150 gettext_noop ("# UPDATE requests received"), 1079 gettext_noop ("# UPDATE requests received"),
1151 1, 1080 1, GNUNET_NO);
1152 GNUNET_NO); 1081 msg = (const struct UpdateMessage *) message;
1153 msg = (const struct UpdateMessage*) message;
1154 emsg = NULL; 1082 emsg = NULL;
1155#if DEBUG_DATASTORE 1083#if DEBUG_DATASTORE
1156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1084 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157 "Processing `%s' request for %llu\n", 1085 "Processing `%s' request for %llu\n",
1158 "UPDATE", 1086 "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid));
1159 (unsigned long long) GNUNET_ntohll (msg->uid));
1160#endif 1087#endif
1161 ret = plugin->api->update (plugin->api->cls, 1088 ret = plugin->api->update (plugin->api->cls,
1162 GNUNET_ntohll(msg->uid), 1089 GNUNET_ntohll (msg->uid),
1163 (int32_t) ntohl(msg->priority), 1090 (int32_t) ntohl (msg->priority),
1164 GNUNET_TIME_absolute_ntoh(msg->expiration), 1091 GNUNET_TIME_absolute_ntoh (msg->expiration),
1165 &emsg); 1092 &emsg);
1166 transmit_status (client, ret, emsg); 1093 transmit_status (client, ret, emsg);
1167 GNUNET_free_non_null (emsg); 1094 GNUNET_free_non_null (emsg);
1168} 1095}
@@ -1177,22 +1104,19 @@ handle_update (void *cls,
1177 */ 1104 */
1178static void 1105static void
1179handle_get_replication (void *cls, 1106handle_get_replication (void *cls,
1180 struct GNUNET_SERVER_Client *client, 1107 struct GNUNET_SERVER_Client *client,
1181 const struct GNUNET_MessageHeader *message) 1108 const struct GNUNET_MessageHeader *message)
1182{ 1109{
1183#if DEBUG_DATASTORE 1110#if DEBUG_DATASTORE
1184 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1111 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185 "Processing `%s' request\n", 1112 "Processing `%s' request\n", "GET_REPLICATION");
1186 "GET_REPLICATION");
1187#endif 1113#endif
1188 GNUNET_STATISTICS_update (stats, 1114 GNUNET_STATISTICS_update (stats,
1189 gettext_noop ("# GET REPLICATION requests received"), 1115 gettext_noop
1190 1, 1116 ("# GET REPLICATION requests received"), 1,
1191 GNUNET_NO); 1117 GNUNET_NO);
1192 GNUNET_SERVER_client_keep (client); 1118 GNUNET_SERVER_client_keep (client);
1193 plugin->api->get_replication (plugin->api->cls, 1119 plugin->api->get_replication (plugin->api->cls, &transmit_item, client);
1194 &transmit_item,
1195 client);
1196} 1120}
1197 1121
1198 1122
@@ -1205,34 +1129,32 @@ handle_get_replication (void *cls,
1205 */ 1129 */
1206static void 1130static void
1207handle_get_zero_anonymity (void *cls, 1131handle_get_zero_anonymity (void *cls,
1208 struct GNUNET_SERVER_Client *client, 1132 struct GNUNET_SERVER_Client *client,
1209 const struct GNUNET_MessageHeader *message) 1133 const struct GNUNET_MessageHeader *message)
1210{ 1134{
1211 const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message; 1135 const struct GetZeroAnonymityMessage *msg =
1136 (const struct GetZeroAnonymityMessage *) message;
1212 enum GNUNET_BLOCK_Type type; 1137 enum GNUNET_BLOCK_Type type;
1213 1138
1214 type = (enum GNUNET_BLOCK_Type) ntohl (msg->type); 1139 type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1215 if (type == GNUNET_BLOCK_TYPE_ANY) 1140 if (type == GNUNET_BLOCK_TYPE_ANY)
1216 { 1141 {
1217 GNUNET_break (0); 1142 GNUNET_break (0);
1218 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1143 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1219 return; 1144 return;
1220 } 1145 }
1221#if DEBUG_DATASTORE 1146#if DEBUG_DATASTORE
1222 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1147 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1223 "Processing `%s' request\n", 1148 "Processing `%s' request\n", "GET_ZERO_ANONYMITY");
1224 "GET_ZERO_ANONYMITY");
1225#endif 1149#endif
1226 GNUNET_STATISTICS_update (stats, 1150 GNUNET_STATISTICS_update (stats,
1227 gettext_noop ("# GET ZERO ANONYMITY requests received"), 1151 gettext_noop
1228 1, 1152 ("# GET ZERO ANONYMITY requests received"), 1,
1229 GNUNET_NO); 1153 GNUNET_NO);
1230 GNUNET_SERVER_client_keep (client); 1154 GNUNET_SERVER_client_keep (client);
1231 plugin->api->get_zero_anonymity (plugin->api->cls, 1155 plugin->api->get_zero_anonymity (plugin->api->cls,
1232 GNUNET_ntohll (msg->offset), 1156 GNUNET_ntohll (msg->offset),
1233 type, 1157 type, &transmit_item, client);
1234 &transmit_item,
1235 client);
1236} 1158}
1237 1159
1238 1160
@@ -1242,43 +1164,36 @@ handle_get_zero_anonymity (void *cls,
1242 */ 1164 */
1243static int 1165static int
1244remove_callback (void *cls, 1166remove_callback (void *cls,
1245 const GNUNET_HashCode * key, 1167 const GNUNET_HashCode * key,
1246 uint32_t size, 1168 uint32_t size,
1247 const void *data, 1169 const void *data,
1248 enum GNUNET_BLOCK_Type type, 1170 enum GNUNET_BLOCK_Type type,
1249 uint32_t priority, 1171 uint32_t priority,
1250 uint32_t anonymity, 1172 uint32_t anonymity,
1251 struct GNUNET_TIME_Absolute 1173 struct GNUNET_TIME_Absolute expiration, uint64_t uid)
1252 expiration, uint64_t uid)
1253{ 1174{
1254 struct GNUNET_SERVER_Client *client = cls; 1175 struct GNUNET_SERVER_Client *client = cls;
1255 1176
1256 if (key == NULL) 1177 if (key == NULL)
1257 { 1178 {
1258#if DEBUG_DATASTORE 1179#if DEBUG_DATASTORE
1259 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1180 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1260 "No further matches for `%s' request.\n", 1181 "No further matches for `%s' request.\n", "REMOVE");
1261 "REMOVE"); 1182#endif
1262#endif 1183 transmit_status (client, GNUNET_NO, _("Content not found"));
1263 transmit_status (client, GNUNET_NO, _("Content not found")); 1184 GNUNET_SERVER_client_drop (client);
1264 GNUNET_SERVER_client_drop (client); 1185 return GNUNET_OK; /* last item */
1265 return GNUNET_OK; /* last item */ 1186 }
1266 }
1267#if DEBUG_DATASTORE 1187#if DEBUG_DATASTORE
1268 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1188 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269 "Item %llu matches `%s' request for key `%s' and type %u.\n", 1189 "Item %llu matches `%s' request for key `%s' and type %u.\n",
1270 (unsigned long long) uid, 1190 (unsigned long long) uid, "REMOVE", GNUNET_h2s (key), type);
1271 "REMOVE", 1191#endif
1272 GNUNET_h2s (key),
1273 type);
1274#endif
1275 GNUNET_STATISTICS_update (stats, 1192 GNUNET_STATISTICS_update (stats,
1276 gettext_noop ("# bytes removed (explicit request)"), 1193 gettext_noop ("# bytes removed (explicit request)"),
1277 size, 1194 size, GNUNET_YES);
1278 GNUNET_YES); 1195 GNUNET_CONTAINER_bloomfilter_remove (filter, key);
1279 GNUNET_CONTAINER_bloomfilter_remove (filter, 1196 transmit_status (client, GNUNET_OK, NULL);
1280 key);
1281 transmit_status (client, GNUNET_OK, NULL);
1282 GNUNET_SERVER_client_drop (client); 1197 GNUNET_SERVER_client_drop (client);
1283 return GNUNET_NO; 1198 return GNUNET_NO;
1284} 1199}
@@ -1293,40 +1208,34 @@ remove_callback (void *cls,
1293 */ 1208 */
1294static void 1209static void
1295handle_remove (void *cls, 1210handle_remove (void *cls,
1296 struct GNUNET_SERVER_Client *client, 1211 struct GNUNET_SERVER_Client *client,
1297 const struct GNUNET_MessageHeader *message) 1212 const struct GNUNET_MessageHeader *message)
1298{ 1213{
1299 const struct DataMessage *dm = check_data (message); 1214 const struct DataMessage *dm = check_data (message);
1300 GNUNET_HashCode vhash; 1215 GNUNET_HashCode vhash;
1301 1216
1302 if (dm == NULL) 1217 if (dm == NULL)
1303 { 1218 {
1304 GNUNET_break (0); 1219 GNUNET_break (0);
1305 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1220 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1306 return; 1221 return;
1307 } 1222 }
1308#if DEBUG_DATASTORE 1223#if DEBUG_DATASTORE
1309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1310 "Processing `%s' request for `%s' of type %u\n", 1225 "Processing `%s' request for `%s' of type %u\n",
1311 "REMOVE", 1226 "REMOVE", GNUNET_h2s (&dm->key), ntohl (dm->type));
1312 GNUNET_h2s (&dm->key),
1313 ntohl (dm->type));
1314#endif 1227#endif
1315 GNUNET_STATISTICS_update (stats, 1228 GNUNET_STATISTICS_update (stats,
1316 gettext_noop ("# REMOVE requests received"), 1229 gettext_noop ("# REMOVE requests received"),
1317 1, 1230 1, GNUNET_NO);
1318 GNUNET_NO);
1319 GNUNET_SERVER_client_keep (client); 1231 GNUNET_SERVER_client_keep (client);
1320 GNUNET_CRYPTO_hash (&dm[1], 1232 GNUNET_CRYPTO_hash (&dm[1], ntohl (dm->size), &vhash);
1321 ntohl(dm->size),
1322 &vhash);
1323 plugin->api->get_key (plugin->api->cls, 1233 plugin->api->get_key (plugin->api->cls,
1324 0, 1234 0,
1325 &dm->key, 1235 &dm->key,
1326 &vhash, 1236 &vhash,
1327 (enum GNUNET_BLOCK_Type) ntohl(dm->type), 1237 (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1328 &remove_callback, 1238 &remove_callback, client);
1329 client);
1330} 1239}
1331 1240
1332 1241
@@ -1339,13 +1248,11 @@ handle_remove (void *cls,
1339 */ 1248 */
1340static void 1249static void
1341handle_drop (void *cls, 1250handle_drop (void *cls,
1342 struct GNUNET_SERVER_Client *client, 1251 struct GNUNET_SERVER_Client *client,
1343 const struct GNUNET_MessageHeader *message) 1252 const struct GNUNET_MessageHeader *message)
1344{ 1253{
1345#if DEBUG_DATASTORE 1254#if DEBUG_DATASTORE
1346 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1255 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "DROP");
1347 "Processing `%s' request\n",
1348 "DROP");
1349#endif 1256#endif
1350 do_drop = GNUNET_YES; 1257 do_drop = GNUNET_YES;
1351 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1258 GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1361,20 +1268,18 @@ handle_drop (void *cls,
1361 * 0 for "reset to empty" 1268 * 0 for "reset to empty"
1362 */ 1269 */
1363static void 1270static void
1364disk_utilization_change_cb (void *cls, 1271disk_utilization_change_cb (void *cls, int delta)
1365 int delta)
1366{ 1272{
1367 if ( (delta < 0) && 1273 if ((delta < 0) && (payload < -delta))
1368 (payload < -delta) ) 1274 {
1369 { 1275 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1370 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1276 _
1371 _("Datastore payload inaccurate (%lld < %lld). Trying to fix.\n"), 1277 ("Datastore payload inaccurate (%lld < %lld). Trying to fix.\n"),
1372 (long long) payload, 1278 (long long) payload, (long long) -delta);
1373 (long long) -delta); 1279 payload = plugin->api->estimate_size (plugin->api->cls);
1374 payload = plugin->api->estimate_size (plugin->api->cls); 1280 sync_stats ();
1375 sync_stats (); 1281 return;
1376 return; 1282 }
1377 }
1378 payload += delta; 1283 payload += delta;
1379 lastSync++; 1284 lastSync++;
1380 if (lastSync >= MAX_STAT_SYNC_LAG) 1285 if (lastSync >= MAX_STAT_SYNC_LAG)
@@ -1394,32 +1299,28 @@ disk_utilization_change_cb (void *cls,
1394 */ 1299 */
1395static int 1300static int
1396process_stat_in (void *cls, 1301process_stat_in (void *cls,
1397 const char *subsystem, 1302 const char *subsystem,
1398 const char *name, 1303 const char *name, uint64_t value, int is_persistent)
1399 uint64_t value,
1400 int is_persistent)
1401{ 1304{
1402 GNUNET_assert (stats_worked == GNUNET_NO); 1305 GNUNET_assert (stats_worked == GNUNET_NO);
1403 stats_worked = GNUNET_YES; 1306 stats_worked = GNUNET_YES;
1404 payload += value; 1307 payload += value;
1405#if DEBUG_SQLITE 1308#if DEBUG_SQLITE
1406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407 "Notification from statistics about existing payload (%llu), new payload is %llu\n", 1310 "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1408 abs_value, 1311 abs_value, payload);
1409 payload);
1410#endif 1312#endif
1411 return GNUNET_OK; 1313 return GNUNET_OK;
1412} 1314}
1413 1315
1414 1316
1415static void 1317static void
1416process_stat_done (void *cls, 1318process_stat_done (void *cls, int success)
1417 int success)
1418{ 1319{
1419 struct DatastorePlugin *plugin = cls; 1320 struct DatastorePlugin *plugin = cls;
1420 1321
1421 stat_get = NULL; 1322 stat_get = NULL;
1422 if (stats_worked == GNUNET_NO) 1323 if (stats_worked == GNUNET_NO)
1423 payload = plugin->api->estimate_size (plugin->api->cls); 1324 payload = plugin->api->estimate_size (plugin->api->cls);
1424} 1325}
1425 1326
@@ -1428,7 +1329,7 @@ process_stat_done (void *cls,
1428 * Load the datastore plugin. 1329 * Load the datastore plugin.
1429 */ 1330 */
1430static struct DatastorePlugin * 1331static struct DatastorePlugin *
1431load_plugin () 1332load_plugin ()
1432{ 1333{
1433 struct DatastorePlugin *ret; 1334 struct DatastorePlugin *ret;
1434 char *libname; 1335 char *libname;
@@ -1437,14 +1338,13 @@ load_plugin ()
1437 if (GNUNET_OK != 1338 if (GNUNET_OK !=
1438 GNUNET_CONFIGURATION_get_value_string (cfg, 1339 GNUNET_CONFIGURATION_get_value_string (cfg,
1439 "DATASTORE", "DATABASE", &name)) 1340 "DATASTORE", "DATABASE", &name))
1440 { 1341 {
1441 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1342 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1442 _("No `%s' specified for `%s' in configuration!\n"), 1343 _("No `%s' specified for `%s' in configuration!\n"),
1443 "DATABASE", 1344 "DATABASE", "DATASTORE");
1444 "DATASTORE"); 1345 return NULL;
1445 return NULL; 1346 }
1446 } 1347 ret = GNUNET_malloc (sizeof (struct DatastorePlugin));
1447 ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1448 ret->env.cfg = cfg; 1348 ret->env.cfg = cfg;
1449 ret->env.duc = &disk_utilization_change_cb; 1349 ret->env.duc = &disk_utilization_change_cb;
1450 ret->env.cls = NULL; 1350 ret->env.cls = NULL;
@@ -1455,14 +1355,14 @@ load_plugin ()
1455 ret->lib_name = libname; 1355 ret->lib_name = libname;
1456 ret->api = GNUNET_PLUGIN_load (libname, &ret->env); 1356 ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1457 if (ret->api == NULL) 1357 if (ret->api == NULL)
1458 { 1358 {
1459 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1359 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1460 _("Failed to load datastore plugin for `%s'\n"), name); 1360 _("Failed to load datastore plugin for `%s'\n"), name);
1461 GNUNET_free (ret->short_name); 1361 GNUNET_free (ret->short_name);
1462 GNUNET_free (libname); 1362 GNUNET_free (libname);
1463 GNUNET_free (ret); 1363 GNUNET_free (ret);
1464 return NULL; 1364 return NULL;
1465 } 1365 }
1466 return ret; 1366 return ret;
1467} 1367}
1468 1368
@@ -1492,30 +1392,29 @@ unload_plugin (struct DatastorePlugin *plug)
1492 * statistics. 1392 * statistics.
1493 */ 1393 */
1494static void 1394static void
1495unload_task (void *cls, 1395unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1496 const struct GNUNET_SCHEDULER_TaskContext *tc)
1497{ 1396{
1498 if (GNUNET_YES == do_drop) 1397 if (GNUNET_YES == do_drop)
1499 plugin->api->drop (plugin->api->cls); 1398 plugin->api->drop (plugin->api->cls);
1500 unload_plugin (plugin); 1399 unload_plugin (plugin);
1501 plugin = NULL; 1400 plugin = NULL;
1502 if (filter != NULL) 1401 if (filter != NULL)
1503 { 1402 {
1504 GNUNET_CONTAINER_bloomfilter_free (filter); 1403 GNUNET_CONTAINER_bloomfilter_free (filter);
1505 filter = NULL; 1404 filter = NULL;
1506 } 1405 }
1507 if (lastSync > 0) 1406 if (lastSync > 0)
1508 sync_stats (); 1407 sync_stats ();
1509 if (stat_get != NULL) 1408 if (stat_get != NULL)
1510 { 1409 {
1511 GNUNET_STATISTICS_get_cancel (stat_get); 1410 GNUNET_STATISTICS_get_cancel (stat_get);
1512 stat_get = NULL; 1411 stat_get = NULL;
1513 } 1412 }
1514 if (stats != NULL) 1413 if (stats != NULL)
1515 { 1414 {
1516 GNUNET_STATISTICS_destroy (stats, GNUNET_YES); 1415 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1517 stats = NULL; 1416 stats = NULL;
1518 } 1417 }
1519} 1418}
1520 1419
1521 1420
@@ -1524,33 +1423,29 @@ unload_task (void *cls,
1524 * the transport and core. 1423 * the transport and core.
1525 */ 1424 */
1526static void 1425static void
1527cleaning_task (void *cls, 1426cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1528 const struct GNUNET_SCHEDULER_TaskContext *tc)
1529{ 1427{
1530 struct TransmitCallbackContext *tcc; 1428 struct TransmitCallbackContext *tcc;
1531 1429
1532 cleaning_done = GNUNET_YES; 1430 cleaning_done = GNUNET_YES;
1533 while (NULL != (tcc = tcc_head)) 1431 while (NULL != (tcc = tcc_head))
1432 {
1433 GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
1434 if (tcc->th != NULL)
1534 { 1435 {
1535 GNUNET_CONTAINER_DLL_remove (tcc_head, 1436 GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1536 tcc_tail, 1437 GNUNET_SERVER_client_drop (tcc->client);
1537 tcc);
1538 if (tcc->th != NULL)
1539 {
1540 GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1541 GNUNET_SERVER_client_drop (tcc->client);
1542 }
1543 GNUNET_free (tcc->msg);
1544 GNUNET_free (tcc);
1545 } 1438 }
1439 GNUNET_free (tcc->msg);
1440 GNUNET_free (tcc);
1441 }
1546 if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK) 1442 if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1547 { 1443 {
1548 GNUNET_SCHEDULER_cancel (expired_kill_task); 1444 GNUNET_SCHEDULER_cancel (expired_kill_task);
1549 expired_kill_task = GNUNET_SCHEDULER_NO_TASK; 1445 expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1550 } 1446 }
1551 GNUNET_SCHEDULER_add_continuation (&unload_task, 1447 GNUNET_SCHEDULER_add_continuation (&unload_task,
1552 NULL, 1448 NULL, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1553 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1554} 1449}
1555 1450
1556 1451
@@ -1563,8 +1458,7 @@ cleaning_task (void *cls,
1563 * @param client identification of the client 1458 * @param client identification of the client
1564 */ 1459 */
1565static void 1460static void
1566cleanup_reservations (void *cls, 1461cleanup_reservations (void *cls, struct GNUNET_SERVER_Client *client)
1567 struct GNUNET_SERVER_Client *client)
1568{ 1462{
1569 struct ReservationList *pos; 1463 struct ReservationList *pos;
1570 struct ReservationList *prev; 1464 struct ReservationList *prev;
@@ -1575,27 +1469,25 @@ cleanup_reservations (void *cls,
1575 prev = NULL; 1469 prev = NULL;
1576 pos = reservations; 1470 pos = reservations;
1577 while (NULL != pos) 1471 while (NULL != pos)
1472 {
1473 next = pos->next;
1474 if (pos->client == client)
1578 { 1475 {
1579 next = pos->next; 1476 if (prev == NULL)
1580 if (pos->client == client) 1477 reservations = next;
1581 {
1582 if (prev == NULL)
1583 reservations = next;
1584 else
1585 prev->next = next;
1586 reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1587 GNUNET_free (pos);
1588 }
1589 else 1478 else
1590 { 1479 prev->next = next;
1591 prev = pos; 1480 reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1592 } 1481 GNUNET_free (pos);
1593 pos = next; 1482 }
1483 else
1484 {
1485 prev = pos;
1594 } 1486 }
1487 pos = next;
1488 }
1595 GNUNET_STATISTICS_set (stats, 1489 GNUNET_STATISTICS_set (stats,
1596 gettext_noop ("# reserved"), 1490 gettext_noop ("# reserved"), reserved, GNUNET_NO);
1597 reserved,
1598 GNUNET_NO);
1599} 1491}
1600 1492
1601 1493
@@ -1612,21 +1504,24 @@ run (void *cls,
1612 const struct GNUNET_CONFIGURATION_Handle *c) 1504 const struct GNUNET_CONFIGURATION_Handle *c)
1613{ 1505{
1614 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 1506 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1615 {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 1507 {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1616 sizeof(struct ReserveMessage) }, 1508 sizeof (struct ReserveMessage)},
1617 {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 1509 {&handle_release_reserve, NULL,
1618 sizeof(struct ReleaseReserveMessage) }, 1510 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1619 {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 1511 sizeof (struct ReleaseReserveMessage)},
1620 {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 1512 {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1621 sizeof (struct UpdateMessage) }, 1513 {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1622 {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 1514 sizeof (struct UpdateMessage)},
1623 {&handle_get_replication, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, 1515 {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
1624 sizeof(struct GNUNET_MessageHeader) }, 1516 {&handle_get_replication, NULL,
1625 {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, 1517 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1626 sizeof(struct GetZeroAnonymityMessage) }, 1518 sizeof (struct GNUNET_MessageHeader)},
1627 {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 1519 {&handle_get_zero_anonymity, NULL,
1628 {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 1520 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1629 sizeof(struct GNUNET_MessageHeader) }, 1521 sizeof (struct GetZeroAnonymityMessage)},
1522 {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
1523 {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1524 sizeof (struct GNUNET_MessageHeader)},
1630 {NULL, NULL, 0, 0} 1525 {NULL, NULL, 0, 0}
1631 }; 1526 };
1632 char *fn; 1527 char *fn;
@@ -1634,81 +1529,72 @@ run (void *cls,
1634 1529
1635 cfg = c; 1530 cfg = c;
1636 if (GNUNET_OK != 1531 if (GNUNET_OK !=
1637 GNUNET_CONFIGURATION_get_value_number (cfg, 1532 GNUNET_CONFIGURATION_get_value_number (cfg, "DATASTORE", "QUOTA", &quota))
1638 "DATASTORE", "QUOTA", &quota)) 1533 {
1639 { 1534 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1640 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1535 _("No `%s' specified for `%s' in configuration!\n"),
1641 _("No `%s' specified for `%s' in configuration!\n"), 1536 "QUOTA", "DATASTORE");
1642 "QUOTA", 1537 return;
1643 "DATASTORE"); 1538 }
1644 return;
1645 }
1646 stats = GNUNET_STATISTICS_create ("datastore", cfg); 1539 stats = GNUNET_STATISTICS_create ("datastore", cfg);
1540 GNUNET_STATISTICS_set (stats, gettext_noop ("# quota"), quota, GNUNET_NO);
1541 cache_size = quota / 8; /* Or should we make this an option? */
1647 GNUNET_STATISTICS_set (stats, 1542 GNUNET_STATISTICS_set (stats,
1648 gettext_noop ("# quota"), 1543 gettext_noop ("# cache size"), cache_size, GNUNET_NO);
1649 quota, 1544 bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1650 GNUNET_NO);
1651 cache_size = quota / 8; /* Or should we make this an option? */
1652 GNUNET_STATISTICS_set (stats,
1653 gettext_noop ("# cache size"),
1654 cache_size,
1655 GNUNET_NO);
1656 bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1657 fn = NULL; 1545 fn = NULL;
1658 if ( (GNUNET_OK != 1546 if ((GNUNET_OK !=
1659 GNUNET_CONFIGURATION_get_value_filename (cfg, 1547 GNUNET_CONFIGURATION_get_value_filename (cfg,
1660 "DATASTORE", 1548 "DATASTORE",
1661 "BLOOMFILTER", 1549 "BLOOMFILTER",
1662 &fn)) || 1550 &fn)) ||
1663 (GNUNET_OK != 1551 (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1664 GNUNET_DISK_directory_create_for_file (fn)) ) 1552 {
1665 { 1553 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1666 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1554 _("Could not use specified filename `%s' for bloomfilter.\n"),
1667 _("Could not use specified filename `%s' for bloomfilter.\n"), 1555 fn != NULL ? fn : "");
1668 fn != NULL ? fn : ""); 1556 GNUNET_free_non_null (fn);
1669 GNUNET_free_non_null (fn); 1557 fn = NULL;
1670 fn = NULL; 1558 }
1671 }
1672 if (fn != NULL) 1559 if (fn != NULL)
1673 filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5); /* approx. 3% false positives at max use */ 1560 filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5); /* approx. 3% false positives at max use */
1674 else 1561 else
1675 filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5); /* approx. 3% false positives at max use */ 1562 filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5); /* approx. 3% false positives at max use */
1676 GNUNET_free_non_null (fn); 1563 GNUNET_free_non_null (fn);
1677 if (filter == NULL) 1564 if (filter == NULL)
1565 {
1566 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1567 _("Failed to initialize bloomfilter.\n"));
1568 if (stats != NULL)
1678 { 1569 {
1679 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1570 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1680 _("Failed to initialize bloomfilter.\n")); 1571 stats = NULL;
1681 if (stats != NULL)
1682 {
1683 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1684 stats = NULL;
1685 }
1686 return;
1687 } 1572 }
1573 return;
1574 }
1688 plugin = load_plugin (); 1575 plugin = load_plugin ();
1689 if (NULL == plugin) 1576 if (NULL == plugin)
1577 {
1578 GNUNET_CONTAINER_bloomfilter_free (filter);
1579 filter = NULL;
1580 if (stats != NULL)
1690 { 1581 {
1691 GNUNET_CONTAINER_bloomfilter_free (filter); 1582 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1692 filter = NULL; 1583 stats = NULL;
1693 if (stats != NULL)
1694 {
1695 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1696 stats = NULL;
1697 }
1698 return;
1699 } 1584 }
1585 return;
1586 }
1700 stat_get = GNUNET_STATISTICS_get (stats, 1587 stat_get = GNUNET_STATISTICS_get (stats,
1701 "datastore", 1588 "datastore",
1702 QUOTA_STAT_NAME, 1589 QUOTA_STAT_NAME,
1703 GNUNET_TIME_UNIT_SECONDS, 1590 GNUNET_TIME_UNIT_SECONDS,
1704 &process_stat_done, 1591 &process_stat_done,
1705 &process_stat_in, 1592 &process_stat_in, plugin);
1706 plugin);
1707 GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL); 1593 GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1708 GNUNET_SERVER_add_handlers (server, handlers); 1594 GNUNET_SERVER_add_handlers (server, handlers);
1709 expired_kill_task 1595 expired_kill_task
1710 = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, 1596 = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1711 &delete_expired, NULL); 1597 &delete_expired, NULL);
1712 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, 1598 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1713 &cleaning_task, NULL); 1599 &cleaning_task, NULL);
1714} 1600}
@@ -1730,8 +1616,7 @@ main (int argc, char *const *argv)
1730 GNUNET_SERVICE_run (argc, 1616 GNUNET_SERVICE_run (argc,
1731 argv, 1617 argv,
1732 "datastore", 1618 "datastore",
1733 GNUNET_SERVICE_OPTION_NONE, 1619 GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1;
1734 &run, NULL)) ? 0 : 1;
1735 return ret; 1620 return ret;
1736} 1621}
1737 1622