aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/plugin_datastore_mysql.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/plugin_datastore_mysql.c')
-rw-r--r--src/datastore/plugin_datastore_mysql.c454
1 files changed, 280 insertions, 174 deletions
diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c
index bae40d17d..0ae5c1a2e 100644
--- a/src/datastore/plugin_datastore_mysql.c
+++ b/src/datastore/plugin_datastore_mysql.c
@@ -180,7 +180,7 @@ struct Plugin
180#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?" 180#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
181 struct GNUNET_MYSQL_StatementHandle *dec_repl; 181 struct GNUNET_MYSQL_StatementHandle *dec_repl;
182 182
183#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090" 183#define SELECT_SIZE "SELECT SUM(LENGTH(value)+256) FROM gn090"
184 struct GNUNET_MYSQL_StatementHandle *get_size; 184 struct GNUNET_MYSQL_StatementHandle *get_size;
185 185
186#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\ 186#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
@@ -221,23 +221,22 @@ struct Plugin
221 * 221 *
222 * @param plugin plugin context 222 * @param plugin plugin context
223 * @param uid unique ID of the entry to delete 223 * @param uid unique ID of the entry to delete
224 * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error 224 * @return #GNUNET_OK on success, #GNUNET_NO if no such value exists, #GNUNET_SYSERR on error
225 */ 225 */
226static int 226static int
227do_delete_entry (struct Plugin *plugin, unsigned long long uid) 227do_delete_entry (struct Plugin *plugin,
228 unsigned long long uid)
228{ 229{
229 int ret; 230 int ret;
230 uint64_t uid64 = (uint64_t) uid; 231 uint64_t uid64 = (uint64_t) uid;
231
232 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
233 "Deleting value %llu from gn090 table\n",
234 uid);
235
236 struct GNUNET_MY_QueryParam params_delete[] = { 232 struct GNUNET_MY_QueryParam params_delete[] = {
237 GNUNET_MY_query_param_uint64 (&uid64), 233 GNUNET_MY_query_param_uint64 (&uid64),
238 GNUNET_MY_query_param_end 234 GNUNET_MY_query_param_end
239 }; 235 };
240 236
237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238 "Deleting value %llu from gn090 table\n",
239 uid);
241 ret = GNUNET_MY_exec_prepared (plugin->mc, 240 ret = GNUNET_MY_exec_prepared (plugin->mc,
242 plugin->delete_entry_by_uid, 241 plugin->delete_entry_by_uid,
243 params_delete); 242 params_delete);
@@ -247,7 +246,7 @@ do_delete_entry (struct Plugin *plugin, unsigned long long uid)
247 } 246 }
248 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 247 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
249 "Deleting value %llu from gn090 table failed\n", 248 "Deleting value %llu from gn090 table failed\n",
250 uid); 249 (unsigned long long) uid);
251 return ret; 250 return ret;
252} 251}
253 252
@@ -256,7 +255,7 @@ do_delete_entry (struct Plugin *plugin, unsigned long long uid)
256 * Get an estimate of how much space the database is 255 * Get an estimate of how much space the database is
257 * currently using. 256 * currently using.
258 * 257 *
259 * @param cls our "struct Plugin *" 258 * @param cls our `struct Plugin *`
260 * @return number of bytes used on disk 259 * @return number of bytes used on disk
261 */ 260 */
262static void 261static void
@@ -266,26 +265,33 @@ mysql_plugin_estimate_size (void *cls,
266 struct Plugin *plugin = cls; 265 struct Plugin *plugin = cls;
267 uint64_t total; 266 uint64_t total;
268 int ret; 267 int ret;
269
270 struct GNUNET_MY_QueryParam params_get[] = { 268 struct GNUNET_MY_QueryParam params_get[] = {
271 GNUNET_MY_query_param_end 269 GNUNET_MY_query_param_end
272 }; 270 };
273
274 struct GNUNET_MY_ResultSpec results_get[] = { 271 struct GNUNET_MY_ResultSpec results_get[] = {
275 GNUNET_MY_result_spec_uint64 (&total), 272 GNUNET_MY_result_spec_uint64 (&total),
276 GNUNET_MY_result_spec_end 273 GNUNET_MY_result_spec_end
277 }; 274 };
278 275
279 ret = GNUNET_MY_exec_prepared (plugin->mc, plugin->get_size, params_get); 276 ret = GNUNET_MY_exec_prepared (plugin->mc,
280 if (GNUNET_OK == ret) 277 plugin->get_size,
281 { 278 params_get);
282 if (GNUNET_OK == GNUNET_MY_extract_result (plugin->get_size, results_get)) 279 *estimate = 0;
283 { 280 total = UINT64_MAX;
284 *estimate = (unsigned long long)total; 281 if ( (GNUNET_OK == ret) &&
285 } 282 (GNUNET_OK ==
286 } 283 GNUNET_MY_extract_result (plugin->get_size,
287 else 284 results_get)) )
288 *estimate = 0; 285 {
286 *estimate = (unsigned long long) total;
287 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
288 "Size estimate for MySQL payload is %lld\n",
289 (long long) total);
290 GNUNET_assert (UINT64_MAX != total);
291 GNUNET_break (GNUNET_NO ==
292 GNUNET_MY_extract_result (plugin->get_size,
293 NULL));
294 }
289} 295}
290 296
291 297
@@ -294,7 +300,7 @@ mysql_plugin_estimate_size (void *cls,
294 * 300 *
295 * @param cls closure 301 * @param cls closure
296 * @param key key for the item 302 * @param key key for the item
297 * @param size number of bytes in data 303 * @param size number of bytes in @a data
298 * @param data content stored 304 * @param data content stored
299 * @param type type of the content 305 * @param type type of the content
300 * @param priority priority of the content 306 * @param priority priority of the content
@@ -302,7 +308,7 @@ mysql_plugin_estimate_size (void *cls,
302 * @param replication replication-level for the content 308 * @param replication replication-level for the content
303 * @param expiration expiration time for the content 309 * @param expiration expiration time for the content
304 * @param cont continuation called with success or failure status 310 * @param cont continuation called with success or failure status
305 * @param cont_cls continuation closure 311 * @param cont_cls closure for @a cont
306 */ 312 */
307static void 313static void
308mysql_plugin_put (void *cls, 314mysql_plugin_put (void *cls,
@@ -318,12 +324,9 @@ mysql_plugin_put (void *cls,
318 void *cont_cls) 324 void *cont_cls)
319{ 325{
320 struct Plugin *plugin = cls; 326 struct Plugin *plugin = cls;
321
322 uint64_t lexpiration = expiration.abs_value_us; 327 uint64_t lexpiration = expiration.abs_value_us;
323 uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, 328 uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
324 UINT64_MAX); 329 UINT64_MAX);
325 unsigned long lsize = 0;
326
327 struct GNUNET_HashCode vhash; 330 struct GNUNET_HashCode vhash;
328 struct GNUNET_MY_QueryParam params_insert[] = { 331 struct GNUNET_MY_QueryParam params_insert[] = {
329 GNUNET_MY_query_param_uint32 (&replication), 332 GNUNET_MY_query_param_uint32 (&replication),
@@ -334,7 +337,7 @@ mysql_plugin_put (void *cls,
334 GNUNET_MY_query_param_uint64 (&lrvalue), 337 GNUNET_MY_query_param_uint64 (&lrvalue),
335 GNUNET_MY_query_param_auto_from_type (key), 338 GNUNET_MY_query_param_auto_from_type (key),
336 GNUNET_MY_query_param_auto_from_type (&vhash), 339 GNUNET_MY_query_param_auto_from_type (&vhash),
337 GNUNET_MY_query_param_fixed_size (data, lsize), 340 GNUNET_MY_query_param_fixed_size (data, size),
338 GNUNET_MY_query_param_end 341 GNUNET_MY_query_param_end
339 }; 342 };
340 343
@@ -344,7 +347,6 @@ mysql_plugin_put (void *cls,
344 cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large")); 347 cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
345 return; 348 return;
346 } 349 }
347 lsize = size;
348 GNUNET_CRYPTO_hash (data, 350 GNUNET_CRYPTO_hash (data,
349 size, 351 size,
350 &vhash); 352 &vhash);
@@ -354,15 +356,28 @@ mysql_plugin_put (void *cls,
354 plugin->insert_entry, 356 plugin->insert_entry,
355 params_insert)) 357 params_insert))
356 { 358 {
357 cont (cont_cls, key, size, GNUNET_SYSERR, _("MySQL statement run failure")); 359 cont (cont_cls,
360 key,
361 size,
362 GNUNET_SYSERR,
363 _("MySQL statement run failure"));
358 return; 364 return;
359 } 365 }
360 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 366 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361 "Inserted value `%s' with size %u into gn090 table\n", 367 "Inserted value `%s' with size %u into gn090 table\n",
362 GNUNET_h2s (key), (unsigned int) size); 368 GNUNET_h2s (key),
369 (unsigned int) size);
363 if (size > 0) 370 if (size > 0)
364 plugin->env->duc (plugin->env->cls, size); 371 plugin->env->duc (plugin->env->cls,
365 cont (cont_cls, key, size, GNUNET_OK, NULL); 372 size);
373 GNUNET_break (GNUNET_NO ==
374 GNUNET_MY_extract_result (plugin->insert_entry,
375 NULL));
376 cont (cont_cls,
377 key,
378 size,
379 GNUNET_OK,
380 NULL);
366} 381}
367 382
368 383
@@ -390,18 +405,22 @@ mysql_plugin_put (void *cls,
390 * @param cons_cls continuation closure 405 * @param cons_cls continuation closure
391 */ 406 */
392static void 407static void
393mysql_plugin_update (void *cls, uint64_t uid, int delta, 408mysql_plugin_update (void *cls,
409 uint64_t uid,
410 int delta,
394 struct GNUNET_TIME_Absolute expire, 411 struct GNUNET_TIME_Absolute expire,
395 PluginUpdateCont cont, void *cont_cls) 412 PluginUpdateCont cont,
413 void *cont_cls)
396{ 414{
397 struct Plugin *plugin = cls; 415 struct Plugin *plugin = cls;
398 uint32_t idelta = (uint32_t)delta; 416 uint32_t idelta = (uint32_t) delta;
399 uint64_t lexpire = expire.abs_value_us; 417 uint64_t lexpire = expire.abs_value_us;
400 int ret; 418 int ret;
401 419
402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403 "Updating value %llu adding %d to priority and maxing exp at %s\n", 421 "Updating value %llu adding %d to priority and maxing exp at %s\n",
404 (unsigned long long)uid, delta, 422 (unsigned long long) uid,
423 delta,
405 GNUNET_STRINGS_absolute_time_to_string (expire)); 424 GNUNET_STRINGS_absolute_time_to_string (expire));
406 425
407 struct GNUNET_MY_QueryParam params_update[] = { 426 struct GNUNET_MY_QueryParam params_update[] = {
@@ -416,12 +435,21 @@ mysql_plugin_update (void *cls, uint64_t uid, int delta,
416 plugin->update_entry, 435 plugin->update_entry,
417 params_update); 436 params_update);
418 437
419 if (ret != GNUNET_OK) 438 if (GNUNET_OK != ret)
420 { 439 {
421 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n", 440 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
441 "Failed to update value %llu\n",
422 (unsigned long long) uid); 442 (unsigned long long) uid);
423 } 443 }
424 cont (cont_cls, ret, NULL); 444 else
445 {
446 GNUNET_break (GNUNET_NO ==
447 GNUNET_MY_extract_result (plugin->update_entry,
448 NULL));
449 }
450 cont (cont_cls,
451 ret,
452 NULL);
425} 453}
426 454
427 455
@@ -432,7 +460,7 @@ mysql_plugin_update (void *cls, uint64_t uid, int delta,
432 * @param plugin the plugin handle 460 * @param plugin the plugin handle
433 * @param stmt select statement to run 461 * @param stmt select statement to run
434 * @param proc function to call on result 462 * @param proc function to call on result
435 * @param proc_cls closure for proc 463 * @param proc_cls closure for @a proc
436 * @param params_select arguments to initialize stmt 464 * @param params_select arguments to initialize stmt
437 */ 465 */
438static void 466static void
@@ -474,7 +502,7 @@ execute_select (struct Plugin *plugin,
474 502
475 ret = GNUNET_MY_extract_result (stmt, 503 ret = GNUNET_MY_extract_result (stmt,
476 results_select); 504 results_select);
477 if (ret <= 0) 505 if (GNUNET_OK != ret)
478 { 506 {
479 proc (proc_cls, 507 proc (proc_cls,
480 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 508 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
@@ -489,6 +517,9 @@ execute_select (struct Plugin *plugin,
489 (unsigned int) anonymity, 517 (unsigned int) anonymity,
490 GNUNET_STRINGS_absolute_time_to_string (expiration)); 518 GNUNET_STRINGS_absolute_time_to_string (expiration));
491 GNUNET_assert (value_size < MAX_DATUM_SIZE); 519 GNUNET_assert (value_size < MAX_DATUM_SIZE);
520 GNUNET_break (GNUNET_NO ==
521 GNUNET_MY_extract_result (stmt,
522 NULL));
492 ret = proc (proc_cls, 523 ret = proc (proc_cls,
493 &key, 524 &key,
494 value_size, 525 value_size,
@@ -498,7 +529,8 @@ execute_select (struct Plugin *plugin,
498 anonymity, 529 anonymity,
499 expiration, 530 expiration,
500 uid); 531 uid);
501 if (ret == GNUNET_NO) 532 GNUNET_MY_cleanup_result (results_select);
533 if (GNUNET_NO == ret)
502 { 534 {
503 do_delete_entry (plugin, uid); 535 do_delete_entry (plugin, uid);
504 if (0 != value_size) 536 if (0 != value_size)
@@ -538,16 +570,15 @@ mysql_plugin_get_key (void *cls,
538 struct Plugin *plugin = cls; 570 struct Plugin *plugin = cls;
539 int ret; 571 int ret;
540 uint64_t total; 572 uint64_t total;
541
542 total = -1;
543 struct GNUNET_MY_ResultSpec results_get[] = { 573 struct GNUNET_MY_ResultSpec results_get[] = {
544 GNUNET_MY_result_spec_uint64 (&total), 574 GNUNET_MY_result_spec_uint64 (&total),
545 GNUNET_MY_result_spec_end 575 GNUNET_MY_result_spec_end
546 }; 576 };
547 577
548 if (type != 0) 578 total = UINT64_MAX;
579 if (0 != type)
549 { 580 {
550 if (vhash != NULL) 581 if (NULL != vhash)
551 { 582 {
552 struct GNUNET_MY_QueryParam params_get[] = { 583 struct GNUNET_MY_QueryParam params_get[] = {
553 GNUNET_MY_query_param_auto_from_type (key), 584 GNUNET_MY_query_param_auto_from_type (key),
@@ -560,9 +591,15 @@ mysql_plugin_get_key (void *cls,
560 GNUNET_MY_exec_prepared (plugin->mc, 591 GNUNET_MY_exec_prepared (plugin->mc,
561 plugin->count_entry_by_hash_vhash_and_type, 592 plugin->count_entry_by_hash_vhash_and_type,
562 params_get); 593 params_get);
563 ret = 594 GNUNET_break (GNUNET_OK == ret);
564 GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type, 595 if (GNUNET_OK == ret)
565 results_get); 596 ret =
597 GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
598 results_get);
599 if (GNUNET_OK == ret)
600 GNUNET_break (GNUNET_NO ==
601 GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
602 NULL));
566 } 603 }
567 else 604 else
568 { 605 {
@@ -576,14 +613,20 @@ mysql_plugin_get_key (void *cls,
576 GNUNET_MY_exec_prepared (plugin->mc, 613 GNUNET_MY_exec_prepared (plugin->mc,
577 plugin->count_entry_by_hash_and_type, 614 plugin->count_entry_by_hash_and_type,
578 params_get); 615 params_get);
579 ret = 616 GNUNET_break (GNUNET_OK == ret);
580 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type, 617 if (GNUNET_OK == ret)
581 results_get); 618 ret =
619 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
620 results_get);
621 if (GNUNET_OK == ret)
622 GNUNET_break (GNUNET_NO ==
623 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
624 NULL));
582 } 625 }
583 } 626 }
584 else 627 else
585 { 628 {
586 if (vhash != NULL) 629 if (NULL != vhash)
587 { 630 {
588 struct GNUNET_MY_QueryParam params_get[] = { 631 struct GNUNET_MY_QueryParam params_get[] = {
589 GNUNET_MY_query_param_auto_from_type (key), 632 GNUNET_MY_query_param_auto_from_type (key),
@@ -595,9 +638,15 @@ mysql_plugin_get_key (void *cls,
595 GNUNET_MY_exec_prepared (plugin->mc, 638 GNUNET_MY_exec_prepared (plugin->mc,
596 plugin->count_entry_by_hash_and_vhash, 639 plugin->count_entry_by_hash_and_vhash,
597 params_get); 640 params_get);
598 ret = 641 GNUNET_break (GNUNET_OK == ret);
599 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash, 642 if (GNUNET_OK == ret)
600 results_get); 643 ret =
644 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
645 results_get);
646 if (GNUNET_OK == ret)
647 GNUNET_break (GNUNET_NO ==
648 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
649 NULL));
601 } 650 }
602 else 651 else
603 { 652 {
@@ -610,12 +659,19 @@ mysql_plugin_get_key (void *cls,
610 GNUNET_MY_exec_prepared (plugin->mc, 659 GNUNET_MY_exec_prepared (plugin->mc,
611 plugin->count_entry_by_hash, 660 plugin->count_entry_by_hash,
612 params_get); 661 params_get);
613 ret = 662 GNUNET_break (GNUNET_OK == ret);
614 GNUNET_MY_extract_result (plugin->count_entry_by_hash, 663 if (GNUNET_OK == ret)
615 results_get); 664 ret =
665 GNUNET_MY_extract_result (plugin->count_entry_by_hash,
666 results_get);
667 if (GNUNET_OK == ret)
668 GNUNET_break (GNUNET_NO ==
669 GNUNET_MY_extract_result (plugin->count_entry_by_hash,
670 NULL));
616 } 671 }
617 } 672 }
618 if ((ret != GNUNET_OK) || (0 >= total)) 673 if ( (GNUNET_OK != ret) ||
674 (0 >= total) )
619 { 675 {
620 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 676 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
621 return; 677 return;
@@ -640,7 +696,8 @@ mysql_plugin_get_key (void *cls,
640 696
641 execute_select (plugin, 697 execute_select (plugin,
642 plugin->select_entry_by_hash_vhash_and_type, 698 plugin->select_entry_by_hash_vhash_and_type,
643 proc, proc_cls, 699 proc,
700 proc_cls,
644 params_select); 701 params_select);
645 } 702 }
646 else 703 else
@@ -654,7 +711,8 @@ mysql_plugin_get_key (void *cls,
654 711
655 execute_select (plugin, 712 execute_select (plugin,
656 plugin->select_entry_by_hash_and_type, 713 plugin->select_entry_by_hash_and_type,
657 proc, proc_cls, 714 proc,
715 proc_cls,
658 params_select); 716 params_select);
659 } 717 }
660 } 718 }
@@ -671,7 +729,8 @@ mysql_plugin_get_key (void *cls,
671 729
672 execute_select (plugin, 730 execute_select (plugin,
673 plugin->select_entry_by_hash_and_vhash, 731 plugin->select_entry_by_hash_and_vhash,
674 proc, proc_cls, 732 proc,
733 proc_cls,
675 params_select); 734 params_select);
676 } 735 }
677 else 736 else
@@ -684,28 +743,31 @@ mysql_plugin_get_key (void *cls,
684 743
685 execute_select (plugin, 744 execute_select (plugin,
686 plugin->select_entry_by_hash, 745 plugin->select_entry_by_hash,
687 proc, proc_cls, 746 proc,
747 proc_cls,
688 params_select); 748 params_select);
689 } 749 }
690 } 750 }
691 751
692} 752}
693 753
694 754
695/** 755/**
696 * Get a zero-anonymity datum from the datastore. 756 * Get a zero-anonymity datum from the datastore.
697 * 757 *
698 * @param cls our "struct Plugin*" 758 * @param cls our `struct Plugin *`
699 * @param offset offset of the result 759 * @param offset offset of the result
700 * @param type entries of which type should be considered? 760 * @param type entries of which type should be considered?
701 * Use 0 for any type. 761 * Use 0 for any type.
702 * @param proc function to call on a matching value or NULL 762 * @param proc function to call on a matching value or NULL
703 * @param proc_cls closure for iter 763 * @param proc_cls closure for @a proc
704 */ 764 */
705static void 765static void
706mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset, 766mysql_plugin_get_zero_anonymity (void *cls,
767 uint64_t offset,
707 enum GNUNET_BLOCK_Type type, 768 enum GNUNET_BLOCK_Type type,
708 PluginDatumProcessor proc, void *proc_cls) 769 PluginDatumProcessor proc,
770 void *proc_cls)
709{ 771{
710 struct Plugin *plugin = cls; 772 struct Plugin *plugin = cls;
711 uint32_t typei = (uint32_t) type; 773 uint32_t typei = (uint32_t) type;
@@ -719,8 +781,10 @@ mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
719 GNUNET_MY_query_param_end 781 GNUNET_MY_query_param_end
720 }; 782 };
721 783
722 execute_select (plugin, plugin->zero_iter, 784 execute_select (plugin,
723 proc, proc_cls, 785 plugin->zero_iter,
786 proc,
787 proc_cls,
724 params_zero_iter); 788 params_zero_iter);
725} 789}
726 790
@@ -749,13 +813,13 @@ struct ReplCtx
749 813
750 814
751/** 815/**
752 * Wrapper for the processor for 'mysql_plugin_get_replication'. 816 * Wrapper for the processor for #mysql_plugin_get_replication().
753 * Decrements the replication counter and calls the original 817 * Decrements the replication counter and calls the original
754 * iterator. 818 * iterator.
755 * 819 *
756 * @param cls closure 820 * @param cls closure
757 * @param key key for the content 821 * @param key key for the content
758 * @param size number of bytes in data 822 * @param size number of bytes in @a data
759 * @param data content stored 823 * @param data content stored
760 * @param type type of the content 824 * @param type type of the content
761 * @param priority priority of the content 825 * @param priority priority of the content
@@ -763,19 +827,18 @@ struct ReplCtx
763 * @param expiration expiration time for the content 827 * @param expiration expiration time for the content
764 * @param uid unique identifier for the datum; 828 * @param uid unique identifier for the datum;
765 * maybe 0 if no unique identifier is available 829 * maybe 0 if no unique identifier is available
766 * 830 * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
767 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
768 * (continue on call to "next", of course), 831 * (continue on call to "next", of course),
769 * GNUNET_NO to delete the item and continue (if supported) 832 * #GNUNET_NO to delete the item and continue (if supported)
770 */ 833 */
771static int 834static int
772repl_proc (void *cls, 835repl_proc (void *cls,
773 const struct GNUNET_HashCode * key, 836 const struct GNUNET_HashCode *key,
774 uint32_t size, 837 uint32_t size,
775 const void *data, 838 const void *data,
776 enum GNUNET_BLOCK_Type type, 839 enum GNUNET_BLOCK_Type type,
777 uint32_t priority, 840 uint32_t priority,
778 uint32_t anonymity, 841 uint32_t anonymity,
779 struct GNUNET_TIME_Absolute expiration, 842 struct GNUNET_TIME_Absolute expiration,
780 uint64_t uid) 843 uint64_t uid)
781{ 844{
@@ -784,21 +847,26 @@ repl_proc (void *cls,
784 int ret; 847 int ret;
785 int iret; 848 int iret;
786 849
787 ret = 850 ret = rc->proc (rc->proc_cls,
788 rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity, 851 key,
789 expiration, uid); 852 size,
853 data,
854 type,
855 priority,
856 anonymity,
857 expiration,
858 uid);
790 if (NULL != key) 859 if (NULL != key)
791 { 860 {
792 struct GNUNET_MY_QueryParam params_proc[] = { 861 struct GNUNET_MY_QueryParam params_proc[] = {
793 GNUNET_MY_query_param_uint64 (&uid), 862 GNUNET_MY_query_param_uint64 (&uid),
794 GNUNET_MY_query_param_end 863 GNUNET_MY_query_param_end
795 }; 864 };
796 865
797 iret = 866 iret = GNUNET_MY_exec_prepared (plugin->mc,
798 GNUNET_MY_exec_prepared (plugin->mc, 867 plugin->dec_repl,
799 plugin->dec_repl, 868 params_proc);
800 params_proc); 869 if (GNUNET_SYSERR == iret)
801 if (iret == GNUNET_SYSERR)
802 { 870 {
803 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 871 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
804 "Failed to reduce replication counter\n"); 872 "Failed to reduce replication counter\n");
@@ -813,35 +881,29 @@ repl_proc (void *cls,
813 * Get a random item for replication. Returns a single, not expired, 881 * Get a random item for replication. Returns a single, not expired,
814 * random item from those with the highest replication counters. The 882 * random item from those with the highest replication counters. The
815 * item's replication counter is decremented by one IF it was positive 883 * item's replication counter is decremented by one IF it was positive
816 * before. Call 'proc' with all values ZERO or NULL if the datastore 884 * before. Call @a proc with all values ZERO or NULL if the datastore
817 * is empty. 885 * is empty.
818 * 886 *
819 * @param cls closure 887 * @param cls closure
820 * @param proc function to call the value (once only). 888 * @param proc function to call the value (once only).
821 * @param proc_cls closure for proc 889 * @param proc_cls closure for @a proc
822 */ 890 */
823static void 891static void
824mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc, 892mysql_plugin_get_replication (void *cls,
893 PluginDatumProcessor proc,
825 void *proc_cls) 894 void *proc_cls)
826{ 895{
827 struct Plugin *plugin = cls; 896 struct Plugin *plugin = cls;
828 uint64_t rvalue; 897 uint64_t rvalue;
829 uint32_t repl; 898 uint32_t repl;
830
831 struct ReplCtx rc; 899 struct ReplCtx rc;
832 rc.plugin = plugin;
833 rc.proc = proc;
834 rc.proc_cls = proc_cls;
835
836 struct GNUNET_MY_QueryParam params_get[] = { 900 struct GNUNET_MY_QueryParam params_get[] = {
837 GNUNET_MY_query_param_end 901 GNUNET_MY_query_param_end
838 }; 902 };
839
840 struct GNUNET_MY_ResultSpec results_get[] = { 903 struct GNUNET_MY_ResultSpec results_get[] = {
841 GNUNET_MY_result_spec_uint32 (&repl), 904 GNUNET_MY_result_spec_uint32 (&repl),
842 GNUNET_MY_result_spec_end 905 GNUNET_MY_result_spec_end
843 }; 906 };
844
845 struct GNUNET_MY_QueryParam params_select[] = { 907 struct GNUNET_MY_QueryParam params_select[] = {
846 GNUNET_MY_query_param_uint32 (&repl), 908 GNUNET_MY_query_param_uint32 (&repl),
847 GNUNET_MY_query_param_uint64 (&rvalue), 909 GNUNET_MY_query_param_uint64 (&rvalue),
@@ -850,27 +912,36 @@ mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
850 GNUNET_MY_query_param_end 912 GNUNET_MY_query_param_end
851 }; 913 };
852 914
915 rc.plugin = plugin;
916 rc.proc = proc;
917 rc.proc_cls = proc_cls;
918
853 if (1 != 919 if (1 !=
854 GNUNET_MY_exec_prepared (plugin->mc, plugin->max_repl, params_get)) 920 GNUNET_MY_exec_prepared (plugin->mc,
921 plugin->max_repl,
922 params_get))
855 { 923 {
856 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 924 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
857 return; 925 return;
858 } 926 }
859 927
860 if (1 != 928 if (GNUNET_OK !=
861 GNUNET_MY_extract_result (plugin->max_repl, results_get)) 929 GNUNET_MY_extract_result (plugin->max_repl,
930 results_get))
862 { 931 {
863 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 932 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
864 return; 933 return;
865 } 934 }
935 GNUNET_break (GNUNET_NO ==
936 GNUNET_MY_extract_result (plugin->max_repl,
937 NULL));
938 rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
939 UINT64_MAX);
866 940
867 rvalue = 941 execute_select (plugin,
868 (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
869 UINT64_MAX);
870
871 execute_select (plugin,
872 plugin->select_replication, 942 plugin->select_replication,
873 &repl_proc, &rc, 943 &repl_proc,
944 &rc,
874 params_select); 945 params_select);
875} 946}
876 947
@@ -880,69 +951,91 @@ mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
880 * 951 *
881 * @param cls closure 952 * @param cls closure
882 * @param proc function to call on each key 953 * @param proc function to call on each key
883 * @param proc_cls closure for proc 954 * @param proc_cls closure for @a proc
884 */ 955 */
885static void 956static void
886mysql_plugin_get_keys (void *cls, 957mysql_plugin_get_keys (void *cls,
887 PluginKeyProcessor proc, 958 PluginKeyProcessor proc,
888 void *proc_cls) 959 void *proc_cls)
889{ 960{
890 struct Plugin *plugin = cls; 961 struct Plugin *plugin = cls;
891 char *query = "SELECT hash FROM gn090";
892 int ret; 962 int ret;
893 MYSQL_STMT *statement; 963 MYSQL_STMT *statement;
894 struct GNUNET_MYSQL_StatementHandle *statements_handle_select = NULL; 964 unsigned int cnt;
895
896
897 struct GNUNET_HashCode key; 965 struct GNUNET_HashCode key;
898 966 struct GNUNET_HashCode last;
899 statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
900
901 statements_handle_select = GNUNET_MYSQL_statement_prepare (plugin->mc,
902 query);
903 GNUNET_assert (proc != NULL);
904
905 struct GNUNET_MY_QueryParam params_select[] = { 967 struct GNUNET_MY_QueryParam params_select[] = {
906 GNUNET_MY_query_param_end 968 GNUNET_MY_query_param_end
907 }; 969 };
908
909 struct GNUNET_MY_ResultSpec results_select[] = { 970 struct GNUNET_MY_ResultSpec results_select[] = {
910 GNUNET_MY_result_spec_auto_from_type (&key), 971 GNUNET_MY_result_spec_auto_from_type (&key),
911 GNUNET_MY_result_spec_end 972 GNUNET_MY_result_spec_end
912 }; 973 };
913 974
914 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, 975 GNUNET_assert (NULL != proc);
915 statements_handle_select, 976 statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
916 params_select)) 977 if (GNUNET_OK !=
978 GNUNET_MY_exec_prepared (plugin->mc,
979 plugin->get_all_keys,
980 params_select))
917 { 981 {
918 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 982 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
919 _("`%s' for `%s' failed at %s:%d with error: %s\n"), 983 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
920 "mysql_stmt_execute", query, __FILE__, __LINE__, 984 "mysql_stmt_execute",
985 GET_ALL_KEYS,
986 __FILE__,
987 __LINE__,
921 mysql_stmt_error (statement)); 988 mysql_stmt_error (statement));
922 GNUNET_MYSQL_statements_invalidate (plugin->mc); 989 GNUNET_MYSQL_statements_invalidate (plugin->mc);
923 proc (proc_cls, NULL, 0); 990 proc (proc_cls, NULL, 0);
924 return; 991 return;
925 } 992 }
926 993 ret = GNUNET_YES;
927 ret = GNUNET_MY_extract_result (statements_handle_select, 994 cnt = 0;
928 results_select); 995 while (ret == GNUNET_YES)
929 996 {
930 if (ret != MYSQL_NO_DATA) 997 ret = GNUNET_MY_extract_result (plugin->get_all_keys,
998 results_select);
999 if (0 != memcmp (&last,
1000 &key,
1001 sizeof (key)))
1002 {
1003 if (0 != cnt)
1004 proc (proc_cls,
1005 &last,
1006 cnt);
1007 cnt = 1;
1008 last = key;
1009 }
1010 else
1011 {
1012 cnt++;
1013 }
1014 }
1015 if (0 != cnt)
1016 proc (proc_cls,
1017 &last,
1018 cnt);
1019 /* finally, let app know we are done */
1020 proc (proc_cls,
1021 NULL,
1022 0);
1023 if (GNUNET_SYSERR == ret)
931 { 1024 {
932 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1025 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
933 _("`%s' failed at %s:%d with error: %s\n"), 1026 _("`%s' failed at %s:%d with error: %s\n"),
934 "mysql_stmt_fetch", __FILE__, __LINE__, 1027 "mysql_stmt_fetch",
935 mysql_stmt_error (statement)); 1028 __FILE__,
1029 __LINE__,
1030 mysql_stmt_error (statement));
936 GNUNET_MYSQL_statements_invalidate (plugin->mc); 1031 GNUNET_MYSQL_statements_invalidate (plugin->mc);
937 return; 1032 return;
938 } 1033 }
939
940 mysql_stmt_reset (statement);
941} 1034}
942 1035
943 1036
944/** 1037/**
945 * Context for 'expi_proc' function. 1038 * Context for #expi_proc() function.
946 */ 1039 */
947struct ExpiCtx 1040struct ExpiCtx
948{ 1041{
@@ -958,7 +1051,7 @@ struct ExpiCtx
958 PluginDatumProcessor proc; 1051 PluginDatumProcessor proc;
959 1052
960 /** 1053 /**
961 * Closure for proc. 1054 * Closure for @e proc.
962 */ 1055 */
963 void *proc_cls; 1056 void *proc_cls;
964}; 1057};
@@ -966,7 +1059,7 @@ struct ExpiCtx
966 1059
967 1060
968/** 1061/**
969 * Wrapper for the processor for 'mysql_plugin_get_expiration'. 1062 * Wrapper for the processor for #mysql_plugin_get_expiration().
970 * If no expired value was found, we do a second query for 1063 * If no expired value was found, we do a second query for
971 * low-priority content. 1064 * low-priority content.
972 * 1065 *
@@ -980,83 +1073,94 @@ struct ExpiCtx
980 * @param expiration expiration time for the content 1073 * @param expiration expiration time for the content
981 * @param uid unique identifier for the datum; 1074 * @param uid unique identifier for the datum;
982 * maybe 0 if no unique identifier is available 1075 * maybe 0 if no unique identifier is available
983 * 1076 * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
984 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
985 * (continue on call to "next", of course), 1077 * (continue on call to "next", of course),
986 * GNUNET_NO to delete the item and continue (if supported) 1078 * #GNUNET_NO to delete the item and continue (if supported)
987 */ 1079 */
988static int 1080static int
989expi_proc (void *cls, 1081expi_proc (void *cls,
990 const struct GNUNET_HashCode * key, 1082 const struct GNUNET_HashCode *key,
991 uint32_t size, 1083 uint32_t size,
992 const void *data, 1084 const void *data,
993 enum GNUNET_BLOCK_Type type, 1085 enum GNUNET_BLOCK_Type type,
994 uint32_t priority, 1086 uint32_t priority,
995 uint32_t anonymity, 1087 uint32_t anonymity,
996 struct GNUNET_TIME_Absolute expiration, 1088 struct GNUNET_TIME_Absolute expiration,
997 uint64_t uid) 1089 uint64_t uid)
998{ 1090{
999 struct ExpiCtx *rc = cls; 1091 struct ExpiCtx *rc = cls;
1000 struct Plugin *plugin = rc->plugin; 1092 struct Plugin *plugin = rc->plugin;
1001
1002 struct GNUNET_MY_QueryParam params_select[] = { 1093 struct GNUNET_MY_QueryParam params_select[] = {
1003 GNUNET_MY_query_param_end 1094 GNUNET_MY_query_param_end
1004 }; 1095 };
1005 1096
1006 if (NULL == key) 1097 if (NULL == key)
1007 { 1098 {
1008 execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls, 1099 execute_select (plugin,
1100 plugin->select_priority,
1101 rc->proc,
1102 rc->proc_cls,
1009 params_select); 1103 params_select);
1010 return GNUNET_SYSERR; 1104 return GNUNET_SYSERR;
1011 } 1105 }
1012 return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity, 1106 return rc->proc (rc->proc_cls,
1013 expiration, uid); 1107 key,
1108 size,
1109 data,
1110 type,
1111 priority,
1112 anonymity,
1113 expiration,
1114 uid);
1014} 1115}
1015 1116
1016 1117
1017/** 1118/**
1018 * Get a random item for expiration. 1119 * Get a random item for expiration.
1019 * Call 'proc' with all values ZERO or NULL if the datastore is empty. 1120 * Call @a proc with all values ZERO or NULL if the datastore is empty.
1020 * 1121 *
1021 * @param cls closure 1122 * @param cls closure
1022 * @param proc function to call the value (once only). 1123 * @param proc function to call the value (once only).
1023 * @param proc_cls closure for proc 1124 * @param proc_cls closure for @a proc
1024 */ 1125 */
1025static void 1126static void
1026mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc, 1127mysql_plugin_get_expiration (void *cls,
1128 PluginDatumProcessor proc,
1027 void *proc_cls) 1129 void *proc_cls)
1028{ 1130{
1029 struct Plugin *plugin = cls; 1131 struct Plugin *plugin = cls;
1030 uint64_t nt; 1132 struct GNUNET_TIME_Absolute now;
1133 struct GNUNET_MY_QueryParam params_select[] = {
1134 GNUNET_MY_query_param_absolute_time (&now),
1135 GNUNET_MY_query_param_end
1136 };
1031 struct ExpiCtx rc; 1137 struct ExpiCtx rc;
1032 1138
1033 rc.plugin = plugin; 1139 rc.plugin = plugin;
1034 rc.proc = proc; 1140 rc.proc = proc;
1035 rc.proc_cls = proc_cls; 1141 rc.proc_cls = proc_cls;
1036 nt = GNUNET_TIME_absolute_get ().abs_value_us; 1142 now = GNUNET_TIME_absolute_get ();
1037 1143 execute_select (plugin,
1038 struct GNUNET_MY_QueryParam params_select[] = { 1144 plugin->select_expiration,
1039 GNUNET_MY_query_param_uint64 (&nt), 1145 expi_proc,
1040 GNUNET_MY_query_param_end 1146 &rc,
1041 };
1042
1043 execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
1044 params_select); 1147 params_select);
1045
1046} 1148}
1047 1149
1048 1150
1049/** 1151/**
1050 * Drop database. 1152 * Drop database.
1051 * 1153 *
1052 * @param cls the "struct Plugin*" 1154 * @param cls the `struct Plugin *`
1053 */ 1155 */
1054static void 1156static void
1055mysql_plugin_drop (void *cls) 1157mysql_plugin_drop (void *cls)
1056{ 1158{
1057 struct Plugin *plugin = cls; 1159 struct Plugin *plugin = cls;
1058 1160
1059 if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090")) 1161 if (GNUNET_OK !=
1162 GNUNET_MYSQL_statement_run (plugin->mc,
1163 "DROP TABLE gn090"))
1060 return; /* error */ 1164 return; /* error */
1061 plugin->env->duc (plugin->env->cls, 0); 1165 plugin->env->duc (plugin->env->cls, 0);
1062} 1166}
@@ -1065,8 +1169,8 @@ mysql_plugin_drop (void *cls)
1065/** 1169/**
1066 * Entry point for the plugin. 1170 * Entry point for the plugin.
1067 * 1171 *
1068 * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*" 1172 * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment *`
1069 * @return our "struct Plugin*" 1173 * @return our `struct Plugin *`
1070 */ 1174 */
1071void * 1175void *
1072libgnunet_plugin_datastore_mysql_init (void *cls) 1176libgnunet_plugin_datastore_mysql_init (void *cls)
@@ -1077,7 +1181,8 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
1077 1181
1078 plugin = GNUNET_new (struct Plugin); 1182 plugin = GNUNET_new (struct Plugin);
1079 plugin->env = env; 1183 plugin->env = env;
1080 plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql"); 1184 plugin->mc = GNUNET_MYSQL_context_create (env->cfg,
1185 "datastore-mysql");
1081 if (NULL == plugin->mc) 1186 if (NULL == plugin->mc)
1082 { 1187 {
1083 GNUNET_free (plugin); 1188 GNUNET_free (plugin);
@@ -1155,7 +1260,8 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
1155 1260
1156/** 1261/**
1157 * Exit point from the plugin. 1262 * Exit point from the plugin.
1158 * @param cls our "struct Plugin*" 1263 *
1264 * @param cls our `struct Plugin *`
1159 * @return always NULL 1265 * @return always NULL
1160 */ 1266 */
1161void * 1267void *