aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/plugin_datastore_mysql.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/plugin_datastore_mysql.c')
-rw-r--r--src/datastore/plugin_datastore_mysql.c790
1 files changed, 252 insertions, 538 deletions
diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c
index c3d9212d3..2eefd9b04 100644
--- a/src/datastore/plugin_datastore_mysql.c
+++ b/src/datastore/plugin_datastore_mysql.c
@@ -160,58 +160,6 @@ struct GNUNET_MysqlStatementHandle
160 160
161}; 161};
162 162
163/**
164 * Context for the universal iterator.
165 */
166struct NextRequestClosure;
167
168/**
169 * Type of a function that will prepare
170 * the next iteration.
171 *
172 * @param cls closure
173 * @param nc the next context; NULL for the last
174 * call which gives the callback a chance to
175 * clean up the closure
176 * @return GNUNET_OK on success, GNUNET_NO if there are
177 * no more values, GNUNET_SYSERR on error
178 */
179typedef int (*PrepareFunction)(void *cls,
180 struct NextRequestClosure *nc);
181
182
183struct NextRequestClosure
184{
185 struct Plugin *plugin;
186
187 struct GNUNET_TIME_Absolute now;
188
189 /**
190 * Function to call to prepare the next
191 * iteration.
192 */
193 PrepareFunction prep;
194
195 /**
196 * Closure for prep.
197 */
198 void *prep_cls;
199
200 MYSQL_BIND rbind[7];
201
202 enum GNUNET_BLOCK_Type type;
203
204 PluginIterator dviter;
205
206 void *dviter_cls;
207
208 unsigned int count;
209
210 int end_it;
211
212 int one_shot;
213};
214
215 163
216/** 164/**
217 * Context for all functions in this plugin. 165 * Context for all functions in this plugin.
@@ -244,16 +192,6 @@ struct Plugin
244 char *cnffile; 192 char *cnffile;
245 193
246 /** 194 /**
247 * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
248 */
249 struct NextRequestClosure *next_task_nc;
250
251 /**
252 * Pending task with scheduler for running the next request.
253 */
254 GNUNET_SCHEDULER_TaskIdentifier next_task;
255
256 /**
257 * Prepared statements. 195 * Prepared statements.
258 */ 196 */
259#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)" 197#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
@@ -295,7 +233,7 @@ struct Plugin
295#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090" 233#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
296 struct GNUNET_MysqlStatementHandle *get_size; 234 struct GNUNET_MysqlStatementHandle *get_size;
297 235
298#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?" 236#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 AND type=? ORDER BY uid DESC LIMIT 1 OFFSET ?"
299 struct GNUNET_MysqlStatementHandle *zero_iter; 237 struct GNUNET_MysqlStatementHandle *zero_iter;
300 238
301#define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\ 239#define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
@@ -372,7 +310,6 @@ get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
372} 310}
373 311
374 312
375
376/** 313/**
377 * Free a prepared statement. 314 * Free a prepared statement.
378 * 315 *
@@ -381,8 +318,7 @@ get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
381 */ 318 */
382static void 319static void
383prepared_statement_destroy (struct Plugin *plugin, 320prepared_statement_destroy (struct Plugin *plugin,
384 struct GNUNET_MysqlStatementHandle 321 struct GNUNET_MysqlStatementHandle *s)
385 *s)
386{ 322{
387 GNUNET_CONTAINER_DLL_remove (plugin->shead, 323 GNUNET_CONTAINER_DLL_remove (plugin->shead,
388 plugin->stail, 324 plugin->stail,
@@ -397,6 +333,8 @@ prepared_statement_destroy (struct Plugin *plugin,
397/** 333/**
398 * Close database connection and all prepared statements (we got a DB 334 * Close database connection and all prepared statements (we got a DB
399 * disconnect error). 335 * disconnect error).
336 *
337 * @param plugin plugin context
400 */ 338 */
401static int 339static int
402iclose (struct Plugin *plugin) 340iclose (struct Plugin *plugin)
@@ -420,10 +358,11 @@ iclose (struct Plugin *plugin)
420 * Open the connection with the database (and initialize 358 * Open the connection with the database (and initialize
421 * our default options). 359 * our default options).
422 * 360 *
361 * @param plugin plugin context
423 * @return GNUNET_OK on success 362 * @return GNUNET_OK on success
424 */ 363 */
425static int 364static int
426iopen (struct Plugin *ret) 365iopen (struct Plugin *plugin)
427{ 366{
428 char *mysql_dbname; 367 char *mysql_dbname;
429 char *mysql_server; 368 char *mysql_server;
@@ -433,67 +372,67 @@ iopen (struct Plugin *ret)
433 my_bool reconnect; 372 my_bool reconnect;
434 unsigned int timeout; 373 unsigned int timeout;
435 374
436 ret->dbf = mysql_init (NULL); 375 plugin->dbf = mysql_init (NULL);
437 if (ret->dbf == NULL) 376 if (plugin->dbf == NULL)
438 return GNUNET_SYSERR; 377 return GNUNET_SYSERR;
439 if (ret->cnffile != NULL) 378 if (plugin->cnffile != NULL)
440 mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile); 379 mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_FILE, plugin->cnffile);
441 mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client"); 380 mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
442 reconnect = 0; 381 reconnect = 0;
443 mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect); 382 mysql_options (plugin->dbf, MYSQL_OPT_RECONNECT, &reconnect);
444 mysql_options (ret->dbf, 383 mysql_options (plugin->dbf,
445 MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout); 384 MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
446 mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8"); 385 mysql_options(plugin->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
447 timeout = 60; /* in seconds */ 386 timeout = 60; /* in seconds */
448 mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout); 387 mysql_options (plugin->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
449 mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout); 388 mysql_options (plugin->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
450 mysql_dbname = NULL; 389 mysql_dbname = NULL;
451 if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, 390 if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
452 "datastore-mysql", "DATABASE")) 391 "datastore-mysql", "DATABASE"))
453 GNUNET_assert (GNUNET_OK == 392 GNUNET_assert (GNUNET_OK ==
454 GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, 393 GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
455 "datastore-mysql", "DATABASE", 394 "datastore-mysql", "DATABASE",
456 &mysql_dbname)); 395 &mysql_dbname));
457 else 396 else
458 mysql_dbname = GNUNET_strdup ("gnunet"); 397 mysql_dbname = GNUNET_strdup ("gnunet");
459 mysql_user = NULL; 398 mysql_user = NULL;
460 if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, 399 if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
461 "datastore-mysql", "USER")) 400 "datastore-mysql", "USER"))
462 { 401 {
463 GNUNET_assert (GNUNET_OK == 402 GNUNET_assert (GNUNET_OK ==
464 GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, 403 GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
465 "datastore-mysql", "USER", 404 "datastore-mysql", "USER",
466 &mysql_user)); 405 &mysql_user));
467 } 406 }
468 mysql_password = NULL; 407 mysql_password = NULL;
469 if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, 408 if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
470 "datastore-mysql", "PASSWORD")) 409 "datastore-mysql", "PASSWORD"))
471 { 410 {
472 GNUNET_assert (GNUNET_OK == 411 GNUNET_assert (GNUNET_OK ==
473 GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, 412 GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
474 "datastore-mysql", "PASSWORD", 413 "datastore-mysql", "PASSWORD",
475 &mysql_password)); 414 &mysql_password));
476 } 415 }
477 mysql_server = NULL; 416 mysql_server = NULL;
478 if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, 417 if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
479 "datastore-mysql", "HOST")) 418 "datastore-mysql", "HOST"))
480 { 419 {
481 GNUNET_assert (GNUNET_OK == 420 GNUNET_assert (GNUNET_OK ==
482 GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, 421 GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
483 "datastore-mysql", "HOST", 422 "datastore-mysql", "HOST",
484 &mysql_server)); 423 &mysql_server));
485 } 424 }
486 mysql_port = 0; 425 mysql_port = 0;
487 if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, 426 if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
488 "datastore-mysql", "PORT")) 427 "datastore-mysql", "PORT"))
489 { 428 {
490 GNUNET_assert (GNUNET_OK == 429 GNUNET_assert (GNUNET_OK ==
491 GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql", 430 GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg, "datastore-mysql",
492 "PORT", &mysql_port)); 431 "PORT", &mysql_port));
493 } 432 }
494 433
495 GNUNET_assert (mysql_dbname != NULL); 434 GNUNET_assert (mysql_dbname != NULL);
496 mysql_real_connect (ret->dbf, 435 mysql_real_connect (plugin->dbf,
497 mysql_server, 436 mysql_server,
498 mysql_user, mysql_password, 437 mysql_user, mysql_password,
499 mysql_dbname, 438 mysql_dbname,
@@ -503,10 +442,10 @@ iopen (struct Plugin *ret)
503 GNUNET_free_non_null (mysql_user); 442 GNUNET_free_non_null (mysql_user);
504 GNUNET_free_non_null (mysql_password); 443 GNUNET_free_non_null (mysql_password);
505 GNUNET_free (mysql_dbname); 444 GNUNET_free (mysql_dbname);
506 if (mysql_error (ret->dbf)[0]) 445 if (mysql_error (plugin->dbf)[0])
507 { 446 {
508 LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, 447 LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
509 "mysql_real_connect", ret); 448 "mysql_real_connect", plugin);
510 return GNUNET_SYSERR; 449 return GNUNET_SYSERR;
511 } 450 }
512 return GNUNET_OK; 451 return GNUNET_OK;
@@ -686,19 +625,6 @@ init_params (struct Plugin *plugin,
686 return GNUNET_OK; 625 return GNUNET_OK;
687} 626}
688 627
689/**
690 * Type of a callback that will be called for each
691 * data set returned from MySQL.
692 *
693 * @param cls user-defined argument
694 * @param num_values number of elements in values
695 * @param values values returned by MySQL
696 * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
697 */
698typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
699 unsigned int num_values,
700 MYSQL_BIND *values);
701
702 628
703/** 629/**
704 * Run a prepared SELECT statement. 630 * Run a prepared SELECT statement.
@@ -708,40 +634,31 @@ typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
708 * @param result_size number of elements in results array 634 * @param result_size number of elements in results array
709 * @param results pointer to already initialized MYSQL_BIND 635 * @param results pointer to already initialized MYSQL_BIND
710 * array (of sufficient size) for passing results 636 * array (of sufficient size) for passing results
711 * @param processor function to call on each result 637 * @param ap pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
712 * @param processor_cls extra argument to processor
713 * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
714 * values (size + buffer-reference for pointers); terminated 638 * values (size + buffer-reference for pointers); terminated
715 * with "-1" 639 * with "-1"
716 * @return GNUNET_SYSERR on error, otherwise 640 * @return GNUNET_SYSERR on error, otherwise GNUNET_OK or GNUNET_NO (no result)
717 * the number of successfully affected (or queried) rows
718 */ 641 */
719static int 642static int
720prepared_statement_run_select (struct Plugin *plugin, 643prepared_statement_run_select_va (struct Plugin *plugin,
721 struct GNUNET_MysqlStatementHandle *s, 644 struct GNUNET_MysqlStatementHandle *s,
722 unsigned int result_size, 645 unsigned int result_size,
723 MYSQL_BIND *results, 646 MYSQL_BIND *results,
724 GNUNET_MysqlDataProcessor processor, void *processor_cls, 647 va_list ap)
725 ...)
726{ 648{
727 va_list ap;
728 int ret; 649 int ret;
729 unsigned int rsize; 650 unsigned int rsize;
730 int total;
731 651
732 if (GNUNET_OK != prepare_statement (plugin, s)) 652 if (GNUNET_OK != prepare_statement (plugin, s))
733 { 653 {
734 GNUNET_break (0); 654 GNUNET_break (0);
735 return GNUNET_SYSERR; 655 return GNUNET_SYSERR;
736 } 656 }
737 va_start (ap, processor_cls);
738 if (GNUNET_OK != init_params (plugin, s, ap)) 657 if (GNUNET_OK != init_params (plugin, s, ap))
739 { 658 {
740 GNUNET_break (0); 659 GNUNET_break (0);
741 va_end (ap);
742 return GNUNET_SYSERR; 660 return GNUNET_SYSERR;
743 } 661 }
744 va_end (ap);
745 rsize = mysql_stmt_field_count (s->statement); 662 rsize = mysql_stmt_field_count (s->statement);
746 if (rsize > result_size) 663 if (rsize > result_size)
747 { 664 {
@@ -757,29 +674,53 @@ prepared_statement_run_select (struct Plugin *plugin,
757 iclose (plugin); 674 iclose (plugin);
758 return GNUNET_SYSERR; 675 return GNUNET_SYSERR;
759 } 676 }
760 677 ret = mysql_stmt_fetch (s->statement);
761 total = 0; 678 if (ret == MYSQL_NO_DATA)
762 while (1) 679 return GNUNET_NO;
680 if (ret != 0)
763 { 681 {
764 ret = mysql_stmt_fetch (s->statement); 682 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
765 if (ret == MYSQL_NO_DATA) 683 _("`%s' failed at %s:%d with error: %s\n"),
766 break; 684 "mysql_stmt_fetch",
767 if (ret != 0) 685 __FILE__, __LINE__, mysql_stmt_error (s->statement));
768 { 686 iclose (plugin);
769 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 687 return GNUNET_SYSERR;
770 _("`%s' failed at %s:%d with error: %s\n"),
771 "mysql_stmt_fetch",
772 __FILE__, __LINE__, mysql_stmt_error (s->statement));
773 iclose (plugin);
774 return GNUNET_SYSERR;
775 }
776 if (processor != NULL)
777 if (GNUNET_OK != processor (processor_cls, rsize, results))
778 break;
779 total++;
780 } 688 }
781 mysql_stmt_reset (s->statement); 689 mysql_stmt_reset (s->statement);
782 return total; 690 return GNUNET_OK;
691}
692
693
694/**
695 * Run a prepared SELECT statement.
696 *
697 * @param plugin plugin context
698 * @param s statement to run
699 * @param result_size number of elements in results array
700 * @param results pointer to already initialized MYSQL_BIND
701 * array (of sufficient size) for passing results
702 * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
703 * values (size + buffer-reference for pointers); terminated
704 * with "-1"
705 * @return GNUNET_SYSERR on error, otherwise
706 * the number of successfully affected (or queried) rows
707 */
708static int
709prepared_statement_run_select (struct Plugin *plugin,
710 struct GNUNET_MysqlStatementHandle *s,
711 unsigned int result_size,
712 MYSQL_BIND *results,
713 ...)
714{
715 va_list ap;
716 int ret;
717
718 va_start (ap, results);
719 ret = prepared_statement_run_select_va (plugin, s,
720 result_size, results,
721 ap);
722 va_end (ap);
723 return ret;
783} 724}
784 725
785 726
@@ -854,23 +795,6 @@ do_delete_entry (struct Plugin *plugin,
854 795
855 796
856/** 797/**
857 * Function that simply returns GNUNET_OK
858 *
859 * @param cls closure, not used
860 * @param num_values not used
861 * @param values not used
862 * @return GNUNET_OK
863 */
864static int
865return_ok (void *cls,
866 unsigned int num_values,
867 MYSQL_BIND *values)
868{
869 return GNUNET_OK;
870}
871
872
873/**
874 * Get an estimate of how much space the database is 798 * Get an estimate of how much space the database is
875 * currently using. 799 * currently using.
876 * 800 *
@@ -878,7 +802,7 @@ return_ok (void *cls,
878 * @return number of bytes used on disk 802 * @return number of bytes used on disk
879 */ 803 */
880static unsigned long long 804static unsigned long long
881mysql_plugin_get_size (void *cls) 805mysql_plugin_estimate_size (void *cls)
882{ 806{
883 struct Plugin *plugin = cls; 807 struct Plugin *plugin = cls;
884 MYSQL_BIND cbind[1]; 808 MYSQL_BIND cbind[1];
@@ -893,7 +817,6 @@ mysql_plugin_get_size (void *cls)
893 prepared_statement_run_select (plugin, 817 prepared_statement_run_select (plugin,
894 plugin->get_size, 818 plugin->get_size,
895 1, cbind, 819 1, cbind,
896 &return_ok, NULL,
897 -1)) 820 -1))
898 return 0; 821 return 0;
899 return total; 822 return total;
@@ -929,7 +852,6 @@ mysql_plugin_put (void *cls,
929{ 852{
930 struct Plugin *plugin = cls; 853 struct Plugin *plugin = cls;
931 unsigned int irepl = replication; 854 unsigned int irepl = replication;
932 unsigned int itype = type;
933 unsigned int ipriority = priority; 855 unsigned int ipriority = priority;
934 unsigned int ianonymity = anonymity; 856 unsigned int ianonymity = anonymity;
935 unsigned long long lexpiration = expiration.abs_value; 857 unsigned long long lexpiration = expiration.abs_value;
@@ -952,7 +874,7 @@ mysql_plugin_put (void *cls,
952 plugin->insert_entry, 874 plugin->insert_entry,
953 NULL, 875 NULL,
954 MYSQL_TYPE_LONG, &irepl, GNUNET_YES, 876 MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
955 MYSQL_TYPE_LONG, &itype, GNUNET_YES, 877 MYSQL_TYPE_LONG, &type, GNUNET_YES,
956 MYSQL_TYPE_LONG, &ipriority, GNUNET_YES, 878 MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
957 MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES, 879 MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
958 MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES, 880 MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
@@ -1034,20 +956,23 @@ mysql_plugin_update (void *cls,
1034} 956}
1035 957
1036 958
1037
1038
1039/** 959/**
1040 * Continuation of "mysql_next_request". 960 * Run the given select statement and call 'proc' on the resulting
961 * values (which must be in particular positions).
1041 * 962 *
1042 * @param next_cls the next context 963 * @param plugin the plugin handle
1043 * @param tc the task context (unused) 964 * @param stmt select statement to run
965 * @param proc function to call on result
966 * @param proc_cls closure for proc
967 * @param ... arguments to initialize stmt
1044 */ 968 */
1045static void 969static void
1046mysql_next_request_cont (void *next_cls, 970execute_select (struct Plugin *plugin,
1047 const struct GNUNET_SCHEDULER_TaskContext *tc) 971 struct GNUNET_MysqlStatementHandle *stmt,
972 PluginDatumProcessor proc, void *proc_cls,
973 ...)
1048{ 974{
1049 struct NextRequestClosure *nrc = next_cls; 975 va_list ap;
1050 struct Plugin *plugin;
1051 int ret; 976 int ret;
1052 unsigned int type; 977 unsigned int type;
1053 unsigned int priority; 978 unsigned int priority;
@@ -1059,19 +984,10 @@ mysql_next_request_cont (void *next_cls,
1059 char value[GNUNET_DATASTORE_MAX_VALUE_SIZE]; 984 char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
1060 GNUNET_HashCode key; 985 GNUNET_HashCode key;
1061 struct GNUNET_TIME_Absolute expiration; 986 struct GNUNET_TIME_Absolute expiration;
1062 MYSQL_BIND *rbind = nrc->rbind; 987 MYSQL_BIND rbind[7];
1063
1064 plugin = nrc->plugin;
1065 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1066 plugin->next_task_nc = NULL;
1067 988
1068 if (GNUNET_YES == nrc->end_it)
1069 goto END_SET;
1070 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1071 nrc->now = GNUNET_TIME_absolute_get ();
1072 hashSize = sizeof (GNUNET_HashCode); 989 hashSize = sizeof (GNUNET_HashCode);
1073 memset (nrc->rbind, 0, sizeof (nrc->rbind)); 990 memset (rbind, 0, sizeof (rbind));
1074 rbind = nrc->rbind;
1075 rbind[0].buffer_type = MYSQL_TYPE_LONG; 991 rbind[0].buffer_type = MYSQL_TYPE_LONG;
1076 rbind[0].buffer = &type; 992 rbind[0].buffer = &type;
1077 rbind[0].is_unsigned = 1; 993 rbind[0].is_unsigned = 1;
@@ -1096,16 +1012,28 @@ mysql_next_request_cont (void *next_cls,
1096 rbind[6].buffer = &uid; 1012 rbind[6].buffer = &uid;
1097 rbind[6].is_unsigned = 1; 1013 rbind[6].is_unsigned = 1;
1098 1014
1099 if (GNUNET_OK != nrc->prep (nrc->prep_cls, 1015 va_start (ap, proc_cls);
1100 nrc)) 1016 ret = prepared_statement_run_select_va (plugin,
1101 goto END_SET; 1017 stmt,
1102 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); 1018 7, rbind,
1019 ap);
1020 va_end (ap);
1021 if (ret <= 0)
1022 {
1023 proc (proc_cls,
1024 NULL, 0, NULL, 0, 0, 0,
1025 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1026 return;
1027 }
1103 GNUNET_assert (size <= sizeof(value)); 1028 GNUNET_assert (size <= sizeof(value));
1104 if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) || 1029 if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
1105 (hashSize != sizeof (GNUNET_HashCode)) ) 1030 (hashSize != sizeof (GNUNET_HashCode)) )
1106 { 1031 {
1107 GNUNET_break (0); 1032 GNUNET_break (0);
1108 goto END_SET; 1033 proc (proc_cls,
1034 NULL, 0, NULL, 0, 0, 0,
1035 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1036 return;
1109 } 1037 }
1110#if DEBUG_MYSQL 1038#if DEBUG_MYSQL
1111 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1039 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1116,18 +1044,13 @@ mysql_next_request_cont (void *next_cls,
1116 anonymity, 1044 anonymity,
1117 exp); 1045 exp);
1118#endif 1046#endif
1047 GNUNET_assert (size < MAX_DATUM_SIZE);
1119 expiration.abs_value = exp; 1048 expiration.abs_value = exp;
1120 ret = nrc->dviter (nrc->dviter_cls, 1049 ret = proc (proc_cls,
1121 (nrc->one_shot == GNUNET_YES) ? NULL : nrc, 1050 &key,
1122 &key, 1051 size, value,
1123 size, value, 1052 type, priority, anonymity, expiration,
1124 type, priority, anonymity, expiration, 1053 uid);
1125 uid);
1126 if (ret == GNUNET_SYSERR)
1127 {
1128 nrc->end_it = GNUNET_YES;
1129 return;
1130 }
1131 if (ret == GNUNET_NO) 1054 if (ret == GNUNET_NO)
1132 { 1055 {
1133 do_delete_entry (plugin, uid); 1056 do_delete_entry (plugin, uid);
@@ -1135,189 +1058,50 @@ mysql_next_request_cont (void *next_cls,
1135 plugin->env->duc (plugin->env->cls, 1058 plugin->env->duc (plugin->env->cls,
1136 - size); 1059 - size);
1137 } 1060 }
1138 if (nrc->one_shot == GNUNET_YES)
1139 GNUNET_free (nrc);
1140 return;
1141 END_SET:
1142 /* call dviter with "end of set" */
1143 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1144 nrc->dviter (nrc->dviter_cls,
1145 NULL, NULL, 0, NULL, 0, 0, 0,
1146 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1147 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1148 nrc->prep (nrc->prep_cls, NULL);
1149 GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
1150 GNUNET_free (nrc);
1151} 1061}
1152 1062
1153 1063
1154/**
1155 * Function invoked on behalf of a "PluginIterator"
1156 * asking the database plugin to call the iterator
1157 * with the next item.
1158 *
1159 * @param next_cls whatever argument was given
1160 * to the PluginIterator as "next_cls".
1161 * @param end_it set to GNUNET_YES if we
1162 * should terminate the iteration early
1163 * (iterator should be still called once more
1164 * to signal the end of the iteration).
1165 */
1166static void
1167mysql_plugin_next_request (void *next_cls,
1168 int end_it)
1169{
1170 struct NextRequestClosure *nrc = next_cls;
1171
1172 if (GNUNET_YES == end_it)
1173 nrc->end_it = GNUNET_YES;
1174 nrc->plugin->next_task_nc = nrc;
1175 nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
1176 nrc);
1177}
1178
1179 1064
1180/** 1065/**
1181 * Context for 'get_statement_prepare'. 1066 * Get one of the results for a particular key in the datastore.
1182 */
1183struct GetContext
1184{
1185 GNUNET_HashCode key;
1186 GNUNET_HashCode vhash;
1187
1188 unsigned int prio;
1189 unsigned int anonymity;
1190 unsigned long long expiration;
1191 unsigned long long vkey;
1192 unsigned long long total;
1193 unsigned int off;
1194 unsigned int count;
1195 int have_vhash;
1196};
1197
1198
1199static int
1200get_statement_prepare (void *cls,
1201 struct NextRequestClosure *nrc)
1202{
1203 struct GetContext *gc = cls;
1204 struct Plugin *plugin;
1205 int ret;
1206 unsigned long hashSize;
1207
1208 if (NULL == nrc)
1209 {
1210 GNUNET_free (gc);
1211 return GNUNET_NO;
1212 }
1213 if (gc->count == gc->total)
1214 return GNUNET_NO;
1215 plugin = nrc->plugin;
1216 hashSize = sizeof (GNUNET_HashCode);
1217 if (++gc->off >= gc->total)
1218 gc->off = 0;
1219#if DEBUG_MYSQL
1220 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221 "Obtaining result number %d/%lld at offset %u for GET `%s'\n",
1222 gc->count+1,
1223 gc->total,
1224 gc->off,
1225 GNUNET_h2s (&gc->key));
1226#endif
1227 if (nrc->type != 0)
1228 {
1229 if (gc->have_vhash)
1230 {
1231 ret = prepared_statement_run_select (plugin,
1232 plugin->select_entry_by_hash_vhash_and_type,
1233 7, nrc->rbind,
1234 &return_ok, NULL,
1235 MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1236 MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
1237 MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES,
1238 MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1239 -1);
1240 }
1241 else
1242 {
1243 ret =
1244 prepared_statement_run_select (plugin,
1245 plugin->select_entry_by_hash_and_type,
1246 7, nrc->rbind,
1247 &return_ok, NULL,
1248 MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1249 MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES,
1250 MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1251 -1);
1252 }
1253 }
1254 else
1255 {
1256 if (gc->have_vhash)
1257 {
1258 ret =
1259 prepared_statement_run_select (plugin,
1260 plugin->select_entry_by_hash_and_vhash,
1261 7, nrc->rbind,
1262 &return_ok, NULL,
1263 MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1264 MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
1265 MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1266 -1);
1267 }
1268 else
1269 {
1270 ret =
1271 prepared_statement_run_select (plugin,
1272 plugin->select_entry_by_hash,
1273 7, nrc->rbind,
1274 &return_ok, NULL,
1275 MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
1276 MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
1277 -1);
1278 }
1279 }
1280 gc->count++;
1281 return ret;
1282}
1283
1284
1285/**
1286 * Iterate over the results for a particular key in the datastore.
1287 * 1067 *
1288 * @param cls closure 1068 * @param cls closure
1289 * @param key maybe NULL (to match all entries) 1069 * @param offset offset of the result (mod #num-results);
1070 * specific ordering does not matter for the offset
1071 * @param key key to match, never NULL
1290 * @param vhash hash of the value, maybe NULL (to 1072 * @param vhash hash of the value, maybe NULL (to
1291 * match all values that have the right key). 1073 * match all values that have the right key).
1292 * Note that for DBlocks there is no difference 1074 * Note that for DBlocks there is no difference
1293 * betwen key and vhash, but for other blocks 1075 * betwen key and vhash, but for other blocks
1294 * there may be! 1076 * there may be!
1295 * @param type entries of which type are relevant? 1077 * @param type entries of which type are relevant?
1296 * Use 0 for any type. 1078 * Use 0 for any type.
1297 * @param iter function to call on each matching value; 1079 * @param proc function to call on each matching value; however,
1298 * will be called once with a NULL value at the end 1080 * after the first call to "proc", the plugin must wait
1299 * @param iter_cls closure for iter 1081 * until "NextRequest" was called before giving the processor
1082 * the next item; finally, the "proc" should be called once
1083 * once with a NULL value at the end ("next_cls" should be NULL
1084 * for that last call)
1085 * @param proc_cls closure for proc
1300 */ 1086 */
1301static void 1087static void
1302mysql_plugin_get (void *cls, 1088mysql_plugin_get_key (void *cls,
1303 const GNUNET_HashCode *key, 1089 uint64_t offset,
1304 const GNUNET_HashCode *vhash, 1090 const GNUNET_HashCode *key,
1305 enum GNUNET_BLOCK_Type type, 1091 const GNUNET_HashCode *vhash,
1306 PluginIterator iter, void *iter_cls) 1092 enum GNUNET_BLOCK_Type type,
1093 PluginDatumProcessor proc, void *proc_cls)
1307{ 1094{
1308 struct Plugin *plugin = cls; 1095 struct Plugin *plugin = cls;
1309 unsigned int itype = type;
1310 int ret; 1096 int ret;
1311 MYSQL_BIND cbind[1]; 1097 MYSQL_BIND cbind[1];
1312 struct GetContext *gc;
1313 struct NextRequestClosure *nrc;
1314 long long total; 1098 long long total;
1315 unsigned long hashSize; 1099 unsigned long hashSize;
1316 unsigned long hashSize2; 1100 unsigned long hashSize2;
1101 unsigned long long off;
1317 1102
1318 GNUNET_assert (key != NULL); 1103 GNUNET_assert (key != NULL);
1319 if (iter == NULL) 1104 GNUNET_assert (NULL != proc);
1320 return;
1321 hashSize = sizeof (GNUNET_HashCode); 1105 hashSize = sizeof (GNUNET_HashCode);
1322 hashSize2 = sizeof (GNUNET_HashCode); 1106 hashSize2 = sizeof (GNUNET_HashCode);
1323 memset (cbind, 0, sizeof (cbind)); 1107 memset (cbind, 0, sizeof (cbind));
@@ -1333,10 +1117,9 @@ mysql_plugin_get (void *cls,
1333 prepared_statement_run_select (plugin, 1117 prepared_statement_run_select (plugin,
1334 plugin->count_entry_by_hash_vhash_and_type, 1118 plugin->count_entry_by_hash_vhash_and_type,
1335 1, cbind, 1119 1, cbind,
1336 &return_ok, NULL,
1337 MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 1120 MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1338 MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 1121 MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2,
1339 MYSQL_TYPE_LONG, &itype, GNUNET_YES, 1122 MYSQL_TYPE_LONG, &type, GNUNET_YES,
1340 -1); 1123 -1);
1341 } 1124 }
1342 else 1125 else
@@ -1345,9 +1128,8 @@ mysql_plugin_get (void *cls,
1345 prepared_statement_run_select (plugin, 1128 prepared_statement_run_select (plugin,
1346 plugin->count_entry_by_hash_and_type, 1129 plugin->count_entry_by_hash_and_type,
1347 1, cbind, 1130 1, cbind,
1348 &return_ok, NULL,
1349 MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 1131 MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1350 MYSQL_TYPE_LONG, &itype, GNUNET_YES, 1132 MYSQL_TYPE_LONG, &type, GNUNET_YES,
1351 -1); 1133 -1);
1352 } 1134 }
1353 } 1135 }
@@ -1359,7 +1141,6 @@ mysql_plugin_get (void *cls,
1359 prepared_statement_run_select (plugin, 1141 prepared_statement_run_select (plugin,
1360 plugin->count_entry_by_hash_and_vhash, 1142 plugin->count_entry_by_hash_and_vhash,
1361 1, cbind, 1143 1, cbind,
1362 &return_ok, NULL,
1363 MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 1144 MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1364 MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 1145 MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2,
1365 -1); 1146 -1);
@@ -1371,79 +1152,81 @@ mysql_plugin_get (void *cls,
1371 prepared_statement_run_select (plugin, 1152 prepared_statement_run_select (plugin,
1372 plugin->count_entry_by_hash, 1153 plugin->count_entry_by_hash,
1373 1, cbind, 1154 1, cbind,
1374 &return_ok, NULL,
1375 MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 1155 MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1376 -1); 1156 -1);
1377 } 1157 }
1378 } 1158 }
1379 if ((ret != GNUNET_OK) || (0 >= total)) 1159 if ((ret != GNUNET_OK) || (0 >= total))
1380 { 1160 {
1381 iter (iter_cls, 1161 proc (proc_cls,
1382 NULL, NULL, 0, NULL, 0, 0, 0, 1162 NULL, 0, NULL, 0, 0, 0,
1383 GNUNET_TIME_UNIT_ZERO_ABS, 0); 1163 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1384 return; 1164 return;
1385 } 1165 }
1166 offset = offset % total;
1167 off = (unsigned long long) offset;
1386#if DEBUG_MYSQL 1168#if DEBUG_MYSQL
1387 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1169 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1388 "Iterating over %lld results for GET `%s'\n", 1170 "Obtaining %llu/%lld result for GET `%s'\n",
1171 off,
1389 total, 1172 total,
1390 GNUNET_h2s (key)); 1173 GNUNET_h2s (key));
1391#endif 1174#endif
1392 gc = GNUNET_malloc (sizeof (struct GetContext)); 1175
1393 gc->key = *key; 1176 if (type != GNUNET_BLOCK_TYPE_ANY)
1394 if (vhash != NULL)
1395 { 1177 {
1396 gc->have_vhash = GNUNET_YES; 1178 if (NULL != vhash)
1397 gc->vhash = *vhash; 1179 {
1180 execute_select (plugin,
1181 plugin->select_entry_by_hash_vhash_and_type,
1182 proc, proc_cls,
1183 MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1184 MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
1185 MYSQL_TYPE_LONG, &type, GNUNET_YES,
1186 MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
1187 -1);
1188 }
1189 else
1190 {
1191 execute_select (plugin,
1192 plugin->select_entry_by_hash_and_type,
1193 proc, proc_cls,
1194 MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1195 MYSQL_TYPE_LONG, &type, GNUNET_YES,
1196 MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
1197 -1);
1198 }
1199 }
1200 else
1201 {
1202 if (NULL != vhash)
1203 {
1204 execute_select (plugin,
1205 plugin->select_entry_by_hash_and_vhash,
1206 proc, proc_cls,
1207 MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1208 MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
1209 MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
1210 -1);
1211 }
1212 else
1213 {
1214 execute_select (plugin,
1215 plugin->select_entry_by_hash,
1216 proc, proc_cls,
1217 MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
1218 MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
1219 -1);
1220 }
1398 } 1221 }
1399 gc->total = total;
1400 gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1401
1402
1403 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1404 nrc->plugin = plugin;
1405 nrc->type = type;
1406 nrc->dviter = iter;
1407 nrc->dviter_cls = iter_cls;
1408 nrc->prep = &get_statement_prepare;
1409 nrc->prep_cls = gc;
1410 mysql_plugin_next_request (nrc, GNUNET_NO);
1411}
1412
1413
1414/**
1415 * Run the prepared statement to get the next data item ready.
1416 *
1417 * @param cls not used
1418 * @param nrc closure for the next request iterator
1419 * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
1420 */
1421static int
1422iterator_zero_prepare (void *cls,
1423 struct NextRequestClosure *nrc)
1424{
1425 struct Plugin *plugin;
1426 int ret;
1427
1428 if (nrc == NULL)
1429 return GNUNET_NO;
1430 plugin = nrc->plugin;
1431 ret = prepared_statement_run_select (plugin,
1432 plugin->zero_iter,
1433 7, nrc->rbind,
1434 &return_ok, NULL,
1435 MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES,
1436 -1);
1437 nrc->count++;
1438 return ret;
1439} 1222}
1440 1223
1441 1224
1442/** 1225/**
1443 * Select a subset of the items in the datastore and call 1226 * Get a zero-anonymity datum from the datastore.
1444 * the given iterator for each of them.
1445 * 1227 *
1446 * @param cls our "struct Plugin*" 1228 * @param cls our "struct Plugin*"
1229 * @param offset offset of the result
1447 * @param type entries of which type should be considered? 1230 * @param type entries of which type should be considered?
1448 * Use 0 for any type. 1231 * Use 0 for any type.
1449 * @param iter function to call on each matching value; 1232 * @param iter function to call on each matching value;
@@ -1451,47 +1234,27 @@ iterator_zero_prepare (void *cls,
1451 * @param iter_cls closure for iter 1234 * @param iter_cls closure for iter
1452 */ 1235 */
1453static void 1236static void
1454mysql_plugin_iter_zero_anonymity (void *cls, 1237mysql_plugin_get_zero_anonymity (void *cls,
1455 enum GNUNET_BLOCK_Type type, 1238 uint64_t offset,
1456 PluginIterator iter, 1239 enum GNUNET_BLOCK_Type type,
1457 void *iter_cls) 1240 PluginDatumProcessor proc, void *proc_cls)
1458{ 1241{
1459 struct Plugin *plugin = cls; 1242 struct Plugin *plugin = cls;
1460 struct NextRequestClosure *nrc; 1243 unsigned long long off;
1461
1462 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
1463 nrc->plugin = plugin;
1464 nrc->type = type;
1465 nrc->dviter = iter;
1466 nrc->dviter_cls = iter_cls;
1467 nrc->prep = &iterator_zero_prepare;
1468 mysql_plugin_next_request (nrc, GNUNET_NO);
1469}
1470 1244
1245 off = (unsigned long long) offset;
1246 execute_select (plugin,
1247 plugin->zero_iter,
1248 proc, proc_cls,
1249 MYSQL_TYPE_LONG, &type, GNUNET_YES,
1250 MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
1251 -1);
1471 1252
1472/**
1473 * Run the SELECT statement for the replication function.
1474 *
1475 * @param cls the 'struct Plugin'
1476 * @param nrc the context (not used)
1477 */
1478static int
1479replication_prepare (void *cls,
1480 struct NextRequestClosure *nrc)
1481{
1482 struct Plugin *plugin = cls;
1483
1484 return prepared_statement_run_select (plugin,
1485 plugin->select_replication,
1486 7, nrc->rbind,
1487 &return_ok, NULL,
1488 -1);
1489} 1253}
1490 1254
1491 1255
1492
1493/** 1256/**
1494 * Context for 'repl_iter' function. 1257 * Context for 'repl_proc' function.
1495 */ 1258 */
1496struct ReplCtx 1259struct ReplCtx
1497{ 1260{
@@ -1504,22 +1267,21 @@ struct ReplCtx
1504 /** 1267 /**
1505 * Function to call for the result (or the NULL). 1268 * Function to call for the result (or the NULL).
1506 */ 1269 */
1507 PluginIterator iter; 1270 PluginDatumProcessor proc;
1508 1271
1509 /** 1272 /**
1510 * Closure for iter. 1273 * Closure for proc.
1511 */ 1274 */
1512 void *iter_cls; 1275 void *proc_cls;
1513}; 1276};
1514 1277
1515 1278
1516/** 1279/**
1517 * Wrapper for the iterator for 'sqlite_plugin_replication_get'. 1280 * Wrapper for the processor for 'mysql_plugin_get_replication'.
1518 * Decrements the replication counter and calls the original 1281 * Decrements the replication counter and calls the original
1519 * iterator. 1282 * iterator.
1520 * 1283 *
1521 * @param cls closure 1284 * @param cls closure
1522 * @param next_cls closure to pass to the "next" function.
1523 * @param key key for the content 1285 * @param key key for the content
1524 * @param size number of bytes in data 1286 * @param size number of bytes in data
1525 * @param data content stored 1287 * @param data content stored
@@ -1535,8 +1297,7 @@ struct ReplCtx
1535 * GNUNET_NO to delete the item and continue (if supported) 1297 * GNUNET_NO to delete the item and continue (if supported)
1536 */ 1298 */
1537static int 1299static int
1538repl_iter (void *cls, 1300repl_proc (void *cls,
1539 void *next_cls,
1540 const GNUNET_HashCode *key, 1301 const GNUNET_HashCode *key,
1541 uint32_t size, 1302 uint32_t size,
1542 const void *data, 1303 const void *data,
@@ -1552,8 +1313,8 @@ repl_iter (void *cls,
1552 int ret; 1313 int ret;
1553 int iret; 1314 int iret;
1554 1315
1555 ret = rc->iter (rc->iter_cls, 1316 ret = rc->proc (rc->proc_cls,
1556 next_cls, key, 1317 key,
1557 size, data, 1318 size, data,
1558 type, priority, anonymity, expiration, 1319 type, priority, anonymity, expiration,
1559 uid); 1320 uid);
@@ -1561,10 +1322,10 @@ repl_iter (void *cls,
1561 { 1322 {
1562 oid = (unsigned long long) uid; 1323 oid = (unsigned long long) uid;
1563 iret = prepared_statement_run (plugin, 1324 iret = prepared_statement_run (plugin,
1564 plugin->dec_repl, 1325 plugin->dec_repl,
1565 NULL, 1326 NULL,
1566 MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, 1327 MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES,
1567 -1); 1328 -1);
1568 if (iret == GNUNET_SYSERR) 1329 if (iret == GNUNET_SYSERR)
1569 { 1330 {
1570 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1331 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -1577,94 +1338,56 @@ repl_iter (void *cls,
1577 1338
1578 1339
1579/** 1340/**
1580 * Get a random item for replication. Returns a single, not expired, random item 1341 * Get a random item for replication. Returns a single, not expired,
1581 * from those with the highest replication counters. The item's 1342 * random item from those with the highest replication counters. The
1582 * replication counter is decremented by one IF it was positive before. 1343 * item's replication counter is decremented by one IF it was positive
1583 * Call 'iter' with all values ZERO or NULL if the datastore is empty. 1344 * before. Call 'proc' with all values ZERO or NULL if the datastore
1345 * is empty.
1584 * 1346 *
1585 * @param cls closure 1347 * @param cls closure
1586 * @param iter function to call the value (once only). 1348 * @param proc function to call the value (once only).
1587 * @param iter_cls closure for iter 1349 * @param iter_cls closure for proc
1588 */ 1350 */
1589static void 1351static void
1590mysql_plugin_replication_get (void *cls, 1352mysql_plugin_get_replication (void *cls,
1591 PluginIterator iter, void *iter_cls) 1353 PluginDatumProcessor proc, void *proc_cls)
1592{ 1354{
1593 struct Plugin *plugin = cls; 1355 struct Plugin *plugin = cls;
1594 struct NextRequestClosure *nrc;
1595 struct ReplCtx rc; 1356 struct ReplCtx rc;
1596 1357
1597 rc.plugin = plugin; 1358 rc.plugin = plugin;
1598 rc.iter = iter; 1359 rc.proc = proc;
1599 rc.iter_cls = iter_cls; 1360 rc.proc_cls = proc_cls;
1600 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); 1361 execute_select (plugin,
1601 nrc->plugin = plugin; 1362 plugin->select_replication,
1602 nrc->now = GNUNET_TIME_absolute_get (); 1363 &repl_proc, &rc,
1603 nrc->prep = &replication_prepare; 1364 -1);
1604 nrc->prep_cls = plugin;
1605 nrc->type = 0;
1606 nrc->dviter = &repl_iter;
1607 nrc->dviter_cls = &rc;
1608 nrc->end_it = GNUNET_NO;
1609 nrc->one_shot = GNUNET_YES;
1610 mysql_next_request_cont (nrc, NULL);
1611}
1612
1613 1365
1614/**
1615 * Run the SELECT statement for the expiration function.
1616 *
1617 * @param cls the 'struct Plugin'
1618 * @param nrc the context (not used)
1619 * @return GNUNET_OK on success, GNUNET_NO if there are
1620 * no more values, GNUNET_SYSERR on error
1621 */
1622static int
1623expiration_prepare (void *cls,
1624 struct NextRequestClosure *nrc)
1625{
1626 struct Plugin *plugin = cls;
1627 long long nt;
1628
1629 if (NULL == nrc)
1630 return GNUNET_NO;
1631 nt = (long long) nrc->now.abs_value;
1632 return prepared_statement_run_select
1633 (plugin,
1634 plugin->select_expiration,
1635 7, nrc->rbind,
1636 &return_ok, NULL,
1637 MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES,
1638 -1);
1639} 1366}
1640 1367
1641 1368
1642/** 1369/**
1643 * Get a random item for expiration. 1370 * Get a random item for expiration.
1644 * Call 'iter' with all values ZERO or NULL if the datastore is empty. 1371 * Call 'proc' with all values ZERO or NULL if the datastore is empty.
1645 * 1372 *
1646 * @param cls closure 1373 * @param cls closure
1647 * @param iter function to call the value (once only). 1374 * @param proc function to call the value (once only).
1648 * @param iter_cls closure for iter 1375 * @param proc_cls closure for proc
1649 */ 1376 */
1650static void 1377static void
1651mysql_plugin_expiration_get (void *cls, 1378mysql_plugin_get_expiration (void *cls,
1652 PluginIterator iter, void *iter_cls) 1379 PluginDatumProcessor proc, void *proc_cls)
1653{ 1380{
1654 struct Plugin *plugin = cls; 1381 struct Plugin *plugin = cls;
1655 struct NextRequestClosure *nrc; 1382 long long nt;
1656 1383
1657 nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); 1384 nt = (long long) GNUNET_TIME_absolute_get().abs_value;
1658 nrc->plugin = plugin; 1385 execute_select (plugin,
1659 nrc->now = GNUNET_TIME_absolute_get (); 1386 plugin->select_expiration,
1660 nrc->prep = &expiration_prepare; 1387 proc, proc_cls,
1661 nrc->prep_cls = plugin; 1388 MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES,
1662 nrc->type = 0; 1389 -1);
1663 nrc->dviter = iter; 1390
1664 nrc->dviter_cls = iter_cls;
1665 nrc->end_it = GNUNET_NO;
1666 nrc->one_shot = GNUNET_YES;
1667 mysql_next_request_cont (nrc, NULL);
1668} 1391}
1669 1392
1670 1393
@@ -1760,14 +1483,13 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
1760 1483
1761 api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions)); 1484 api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1762 api->cls = plugin; 1485 api->cls = plugin;
1763 api->get_size = &mysql_plugin_get_size; 1486 api->estimate_size = &mysql_plugin_estimate_size;
1764 api->put = &mysql_plugin_put; 1487 api->put = &mysql_plugin_put;
1765 api->next_request = &mysql_plugin_next_request;
1766 api->get = &mysql_plugin_get;
1767 api->replication_get = &mysql_plugin_replication_get;
1768 api->expiration_get = &mysql_plugin_expiration_get;
1769 api->update = &mysql_plugin_update; 1488 api->update = &mysql_plugin_update;
1770 api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity; 1489 api->get_key = &mysql_plugin_get_key;
1490 api->get_replication = &mysql_plugin_get_replication;
1491 api->get_expiration = &mysql_plugin_get_expiration;
1492 api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
1771 api->drop = &mysql_plugin_drop; 1493 api->drop = &mysql_plugin_drop;
1772 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, 1494 GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1773 "mysql", _("Mysql database running\n")); 1495 "mysql", _("Mysql database running\n"));
@@ -1787,14 +1509,6 @@ libgnunet_plugin_datastore_mysql_done (void *cls)
1787 struct Plugin *plugin = api->cls; 1509 struct Plugin *plugin = api->cls;
1788 1510
1789 iclose (plugin); 1511 iclose (plugin);
1790 if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1791 {
1792 GNUNET_SCHEDULER_cancel (plugin->next_task);
1793 plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1794 plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1795 GNUNET_free (plugin->next_task_nc);
1796 plugin->next_task_nc = NULL;
1797 }
1798 GNUNET_free_non_null (plugin->cnffile); 1512 GNUNET_free_non_null (plugin->cnffile);
1799 GNUNET_free (plugin); 1513 GNUNET_free (plugin);
1800 GNUNET_free (api); 1514 GNUNET_free (api);