From c77d4e5c69ac54ffddf5bd60c18bcb0504389311 Mon Sep 17 00:00:00 2001 From: David Barksdale Date: Sat, 21 Mar 2015 03:38:29 +0000 Subject: Convert datastore plugin API to asynchronous --- src/datastore/gnunet-service-datastore.c | 153 +++++++++++++++++++----------- src/datastore/perf_plugin_datastore.c | 107 ++++++++++++--------- src/datastore/plugin_datastore_heap.c | 25 +++-- src/datastore/plugin_datastore_mysql.c | 35 ++++--- src/datastore/plugin_datastore_postgres.c | 33 ++++--- src/datastore/plugin_datastore_sqlite.c | 67 ++++++++----- src/datastore/plugin_datastore_template.c | 25 ++--- src/datastore/test_plugin_datastore.c | 107 ++++++++++++--------- src/include/gnunet_datastore_plugin.h | 51 +++++++--- 9 files changed, 374 insertions(+), 229 deletions(-) (limited to 'src') diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c index eb0373039..ea24ca543 100644 --- a/src/datastore/gnunet-service-datastore.c +++ b/src/datastore/gnunet-service-datastore.c @@ -829,38 +829,24 @@ struct PutContext }; -/** - * Actually put the data message. - * - * @param client sender of the message - * @param dm message with the data to store - */ static void -execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm) +put_continuation (void *cls, const struct GNUNET_HashCode *key, uint32_t size, + int status, char *msg) { - uint32_t size; - char *msg; - int ret; + struct GNUNET_SERVER_Client *client = cls; - size = ntohl (dm->size); - msg = NULL; - ret = - plugin->api->put (plugin->api->cls, &dm->key, size, &dm[1], - ntohl (dm->type), ntohl (dm->priority), - ntohl (dm->anonymity), ntohl (dm->replication), - GNUNET_TIME_absolute_ntoh (dm->expiration), &msg); - if (GNUNET_OK == ret) + if (GNUNET_OK == status) { GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes stored"), size, GNUNET_YES); - GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key); + GNUNET_CONTAINER_bloomfilter_add (filter, key); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Successfully stored %u bytes of type %u under key `%s'\n", - size, ntohl (dm->type), GNUNET_h2s (&dm->key)); + "Successfully stored %u bytes under key `%s'\n", + size, GNUNET_h2s (key)); } - transmit_status (client, ret, msg); - GNUNET_free_non_null (msg); + transmit_status (client, status, msg); + GNUNET_SERVER_client_drop (client); if (quota - reserved - cache_size < payload) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -872,6 +858,33 @@ execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm) } } +/** + * Actually put the data message. + * + * @param client sender of the message + * @param dm message with the data to store + */ +static void +execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm) +{ + GNUNET_SERVER_client_keep (client); + plugin->api->put (plugin->api->cls, &dm->key, ntohl (dm->size), &dm[1], + ntohl (dm->type), ntohl (dm->priority), + ntohl (dm->anonymity), ntohl (dm->replication), + GNUNET_TIME_absolute_ntoh (dm->expiration), + &put_continuation, client); +} + + +static void +check_present_continuation (void *cls, int status, char *msg) +{ + struct GNUNET_SERVER_Client *client = cls; + + transmit_status (client, GNUNET_NO, NULL); + GNUNET_SERVER_client_drop (client); +} + /** * Function that will check if the given datastore entry @@ -921,9 +934,13 @@ check_present (void *cls, const struct GNUNET_HashCode * key, uint32_t size, expiration.abs_value_us)) plugin->api->update (plugin->api->cls, uid, (int32_t) ntohl (dm->priority), - GNUNET_TIME_absolute_ntoh (dm->expiration), NULL); - transmit_status (pc->client, GNUNET_NO, NULL); - GNUNET_SERVER_client_drop (pc->client); + GNUNET_TIME_absolute_ntoh (dm->expiration), + check_present_continuation, pc->client); + else + { + transmit_status (pc->client, GNUNET_NO, NULL); + GNUNET_SERVER_client_drop (pc->client); + } GNUNET_free (pc); } else @@ -1051,6 +1068,16 @@ handle_get (void *cls, struct GNUNET_SERVER_Client *client, } +static void +update_continuation (void *cls, int status, char *msg) +{ + struct GNUNET_SERVER_Client *client = cls; + + transmit_status (client, status, msg); + GNUNET_SERVER_client_drop (client); +} + + /** * Handle UPDATE-message. * @@ -1063,21 +1090,17 @@ handle_update (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { const struct UpdateMessage *msg; - int ret; - char *emsg; GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"), 1, GNUNET_NO); msg = (const struct UpdateMessage *) message; - emsg = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n", "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid)); - ret = - plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid), - (int32_t) ntohl (msg->priority), - GNUNET_TIME_absolute_ntoh (msg->expiration), &emsg); - transmit_status (client, ret, emsg); - GNUNET_free_non_null (emsg); + GNUNET_SERVER_client_keep (client); + plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid), + (int32_t) ntohl (msg->priority), + GNUNET_TIME_absolute_ntoh (msg->expiration), + update_continuation, client); } @@ -1336,6 +1359,29 @@ unload_plugin (struct DatastorePlugin *plug) } +static const struct GNUNET_SERVER_MessageHandler handlers[] = { + {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, + sizeof (struct ReserveMessage)}, + {&handle_release_reserve, NULL, + GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, + sizeof (struct ReleaseReserveMessage)}, + {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0}, + {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, + sizeof (struct UpdateMessage)}, + {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0}, + {&handle_get_replication, NULL, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, + sizeof (struct GNUNET_MessageHeader)}, + {&handle_get_zero_anonymity, NULL, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, + sizeof (struct GetZeroAnonymityMessage)}, + {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0}, + {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, + sizeof (struct GNUNET_MessageHeader)}, + {NULL, NULL, 0, 0} +}; + + /** * Adds a given @a key to the bloomfilter in @a cls @a count times. * @@ -1350,6 +1396,19 @@ add_key_to_bloomfilter (void *cls, { struct GNUNET_CONTAINER_BloomFilter *bf = cls; + if (NULL == key) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Bloomfilter construction complete.\n")); + GNUNET_SERVER_add_handlers (server, handlers); + GNUNET_SERVER_resume (server); + expired_kill_task + = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, + &delete_expired, + NULL); + return; + } + while (0 < count--) GNUNET_CONTAINER_bloomfilter_add (bf, key); } @@ -1365,27 +1424,6 @@ add_key_to_bloomfilter (void *cls, static void process_stat_done (void *cls, int success) { - static const struct GNUNET_SERVER_MessageHandler handlers[] = { - {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, - sizeof (struct ReserveMessage)}, - {&handle_release_reserve, NULL, - GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, - sizeof (struct ReleaseReserveMessage)}, - {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0}, - {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, - sizeof (struct UpdateMessage)}, - {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0}, - {&handle_get_replication, NULL, - GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, - sizeof (struct GNUNET_MessageHeader)}, - {&handle_get_zero_anonymity, NULL, - GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, - sizeof (struct GetZeroAnonymityMessage)}, - {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0}, - {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, - sizeof (struct GNUNET_MessageHeader)}, - {NULL, NULL, 0, 0} - }; stat_get = NULL; plugin = load_plugin (); @@ -1411,9 +1449,12 @@ process_stat_done (void *cls, int success) GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Rebuilding bloomfilter. Please be patient.\n")); if (NULL != plugin->api->get_keys) + { plugin->api->get_keys (plugin->api->cls, &add_key_to_bloomfilter, filter); + return; + } else GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Plugin does not support get_keys function. Please fix!\n")); diff --git a/src/datastore/perf_plugin_datastore.c b/src/datastore/perf_plugin_datastore.c index f845b5a59..2ff8340f3 100644 --- a/src/datastore/perf_plugin_datastore.c +++ b/src/datastore/perf_plugin_datastore.c @@ -99,15 +99,60 @@ disk_utilization_change_cb (void *cls, int delta) static void -putValue (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k) +test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + + +static void +put_continuation (void *cls, const struct GNUNET_HashCode *key, + uint32_t size, int status, char *msg) +{ + struct CpsRunContext *crc = cls; + + if (GNUNET_OK != status) + { + FPRINTF (stderr, "ERROR: `%s'\n", msg); + } + else + { + stored_bytes += size; + stored_ops++; + stored_entries++; + } + GNUNET_SCHEDULER_add_now (&test, crc); +} + +static void +do_put (struct CpsRunContext *crc) { char value[65536]; size_t size; static struct GNUNET_HashCode key; - static int ic; - char *msg; + static int i; unsigned int prio; + if (0 == i) + crc->start = GNUNET_TIME_absolute_get (); + if (PUT_10 == i) + { + i = 0; + crc->end = GNUNET_TIME_absolute_get (); + { + printf ("%s took %s for %llu items\n", "Storing an item", + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_difference (crc->start, + crc->end), + GNUNET_YES), + PUT_10); + if (PUT_10 > 0) + GAUGER (category, "Storing an item", + (crc->end.abs_value_us - crc->start.abs_value_us) / 1000LL / PUT_10, + "ms/item"); + } + crc->i++; + crc->start = GNUNET_TIME_absolute_get (); + crc->phase++; + GNUNET_SCHEDULER_add_now (&test, crc); + return; + } /* most content is 32k */ size = 32 * 1024; if (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 16) == 0) /* but some of it is less! */ @@ -120,33 +165,22 @@ putValue (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k) memset (value, i, size); if (i > 255) memset (value, i - 255, size / 2); - value[0] = k; + value[0] = crc->i; memcpy (&value[4], &i, sizeof (i)); - msg = NULL; prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100); - if (GNUNET_OK != api->put (api->cls, &key, size, value, 1 + i % 4 /* type */ , - prio, i % 4 /* anonymity */ , - 0 /* replication */ , - GNUNET_TIME_relative_to_absolute - (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_MILLISECONDS, - 60 * 60 * 60 * 1000 + - GNUNET_CRYPTO_random_u32 - (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), &msg)) - { - FPRINTF (stderr, "ERROR: `%s'\n", msg); - GNUNET_free_non_null (msg); - return; - } - ic++; - stored_bytes += size; - stored_ops++; - stored_entries++; + crc->api->put (crc->api->cls, &key, size, value, 1 + i % 4 /* type */ , + prio, i % 4 /* anonymity */ , + 0 /* replication */ , + GNUNET_TIME_relative_to_absolute + (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_MILLISECONDS, + 60 * 60 * 60 * 1000 + + GNUNET_CRYPTO_random_u32 + (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), + put_continuation, crc); + i++; } -static void -test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); - static int iterate_zeros (void *cls, const struct GNUNET_HashCode * key, uint32_t size, @@ -342,7 +376,6 @@ static void test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct CpsRunContext *crc = cls; - int j; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) { @@ -361,25 +394,7 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) &cleaning_task, crc); break; case RP_PUT: - crc->start = GNUNET_TIME_absolute_get (); - for (j = 0; j < PUT_10; j++) - putValue (crc->api, j, crc->i); - crc->end = GNUNET_TIME_absolute_get (); - { - printf ("%s took %s for %llu items\n", "Storing an item", - GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_difference (crc->start, - crc->end), - GNUNET_YES), - PUT_10); - if (PUT_10 > 0) - GAUGER (category, "Storing an item", - (crc->end.abs_value_us - crc->start.abs_value_us) / 1000LL / PUT_10, - "ms/item"); - } - crc->i++; - crc->start = GNUNET_TIME_absolute_get (); - crc->phase++; - GNUNET_SCHEDULER_add_now (&test, crc); + do_put (crc); break; case RP_REP_GET: crc->api->get_replication (crc->api->cls, &replication_get, crc); diff --git a/src/datastore/plugin_datastore_heap.c b/src/datastore/plugin_datastore_heap.c index 111a852b4..821fbb4e8 100644 --- a/src/datastore/plugin_datastore_heap.c +++ b/src/datastore/plugin_datastore_heap.c @@ -206,10 +206,10 @@ heap_plugin_estimate_size (void *cls, unsigned long long *estimate) * @param anonymity anonymity-level for the content * @param replication replication-level for the content * @param expiration expiration time for the content - * @param msg set to error message - * @return GNUNET_OK on success + * @param cont continuation called with success or failure status + * @param cont_cls continuation closure */ -static int +static void heap_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size, @@ -217,7 +217,9 @@ heap_plugin_put (void *cls, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, - struct GNUNET_TIME_Absolute expiration, char **msg) + struct GNUNET_TIME_Absolute expiration, + PluginPutCont cont, + void *cont_cls) { struct Plugin *plugin = cls; struct Value *value; @@ -267,7 +269,7 @@ heap_plugin_put (void *cls, value, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); plugin->size += size; - return GNUNET_OK; + cont (cont_cls, key, size, GNUNET_OK, NULL); } @@ -615,14 +617,16 @@ heap_plugin_get_expiration (void *cls, PluginDatumProcessor proc, * @param expire new expiration time should be the * MAX of any existing expiration time and * this value - * @param msg set to error message - * @return GNUNET_OK on success + * @param cont continuation called with success or failure status + * @param cons_cls continuation closure */ -static int +static void heap_plugin_update (void *cls, uint64_t uid, int delta, - struct GNUNET_TIME_Absolute expire, char **msg) + struct GNUNET_TIME_Absolute expire, + PluginUpdateCont cont, + void *cont_cls) { struct Plugin *plugin = cls; struct Value *value; @@ -640,7 +644,7 @@ heap_plugin_update (void *cls, value->priority = 0; else value->priority += delta; - return GNUNET_OK; + cont (cont_cls, GNUNET_OK, NULL); } @@ -778,6 +782,7 @@ heap_get_keys (void *cls, GNUNET_CONTAINER_multihashmap_iterate (plugin->keyvalue, &return_value, &gac); + proc (proc_cls, NULL, 0); } diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c index 972a51417..5100adc34 100644 --- a/src/datastore/plugin_datastore_mysql.c +++ b/src/datastore/plugin_datastore_mysql.c @@ -280,14 +280,15 @@ mysql_plugin_estimate_size (void *cls, unsigned long long *estimate) * @param anonymity anonymity-level for the content * @param replication replication-level for the content * @param expiration expiration time for the content - * @param msg set to error message - * @return GNUNET_OK on success + * @param cont continuation called with success or failure status + * @param cont_cls continuation closure */ -static int +static void mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, - struct GNUNET_TIME_Absolute expiration, char **msg) + struct GNUNET_TIME_Absolute expiration, PluginPutCont cont, + void *cont_cls) { struct Plugin *plugin = cls; unsigned int irepl = replication; @@ -305,7 +306,8 @@ mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size, if (size > MAX_DATUM_SIZE) { GNUNET_break (0); - return GNUNET_SYSERR; + cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large")); + return; } hashSize = sizeof (struct GNUNET_HashCode); hashSize2 = sizeof (struct GNUNET_HashCode); @@ -322,13 +324,16 @@ mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size, MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2, MYSQL_TYPE_BLOB, data, lsize, &lsize, -1)) - return GNUNET_SYSERR; + { + cont (cont_cls, key, size, GNUNET_SYSERR, _("MySQL statement run failure")); + return; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Inserted value `%s' with size %u into gn090 table\n", GNUNET_h2s (key), (unsigned int) size); if (size > 0) plugin->env->duc (plugin->env->cls, size); - return GNUNET_OK; + cont (cont_cls, key, size, GNUNET_OK, NULL); } @@ -352,12 +357,13 @@ mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size, * @param expire new expiration time should be the * MAX of any existing expiration time and * this value - * @param msg set to error message - * @return GNUNET_OK on success + * @param cont continuation called with success or failure status + * @param cons_cls continuation closure */ -static int +static void mysql_plugin_update (void *cls, uint64_t uid, int delta, - struct GNUNET_TIME_Absolute expire, char **msg) + struct GNUNET_TIME_Absolute expire, + PluginUpdateCont cont, void *cont_cls) { struct Plugin *plugin = cls; unsigned long long vkey = uid; @@ -379,7 +385,7 @@ mysql_plugin_update (void *cls, uint64_t uid, int delta, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n", vkey); } - return ret; + cont (cont_cls, ret, NULL); } @@ -778,6 +784,7 @@ mysql_plugin_get_keys (void *cls, if (statement == NULL) { GNUNET_MYSQL_statements_invalidate (plugin->mc); + proc (proc_cls, NULL, 0); return; } if (mysql_stmt_prepare (statement, query, strlen (query))) @@ -785,6 +792,7 @@ mysql_plugin_get_keys (void *cls, GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql", _("Failed to prepare statement `%s'\n"), query); GNUNET_MYSQL_statements_invalidate (plugin->mc); + proc (proc_cls, NULL, 0); return; } GNUNET_assert (proc != NULL); @@ -795,6 +803,7 @@ mysql_plugin_get_keys (void *cls, "mysql_stmt_execute", query, __FILE__, __LINE__, mysql_stmt_error (statement)); GNUNET_MYSQL_statements_invalidate (plugin->mc); + proc (proc_cls, NULL, 0); return; } memset (cbind, 0, sizeof (cbind)); @@ -810,6 +819,7 @@ mysql_plugin_get_keys (void *cls, "mysql_stmt_bind_result", __FILE__, __LINE__, mysql_stmt_error (statement)); GNUNET_MYSQL_statements_invalidate (plugin->mc); + proc (proc_cls, NULL, 0); return; } while (0 == (ret = mysql_stmt_fetch (statement))) @@ -817,6 +827,7 @@ mysql_plugin_get_keys (void *cls, if (sizeof (struct GNUNET_HashCode) == length) proc (proc_cls, &key, 1); } + proc (proc_cls, NULL, 0); if (ret != MYSQL_NO_DATA) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c index eb2a69c27..e0decc024 100644 --- a/src/datastore/plugin_datastore_postgres.c +++ b/src/datastore/plugin_datastore_postgres.c @@ -279,15 +279,16 @@ postgres_plugin_estimate_size (void *cls, unsigned long long *estimate) * @param anonymity anonymity-level for the content * @param replication replication-level for the content * @param expiration expiration time for the content - * @param msg set to error message - * @return #GNUNET_OK on success + * @param cont continuation called with success or failure status + * @param cont_cls continuation closure */ -static int +static void postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, - struct GNUNET_TIME_Absolute expiration, char **msg) + struct GNUNET_TIME_Absolute expiration, PluginPutCont cont, + void *cont_cls) { struct Plugin *plugin = cls; struct GNUNET_HashCode vhash; @@ -326,12 +327,15 @@ postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t siz paramFormats, 1); if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put")) - return GNUNET_SYSERR; + { + cont (cont_cls, key, size, GNUNET_SYSERR, _("Postgress exec failure")); + return; + } PQclear (ret); plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD); GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres", "Stored %u bytes in database\n", (unsigned int) size); - return GNUNET_OK; + cont (cont_cls, key, size, GNUNET_OK, NULL); } @@ -753,12 +757,13 @@ postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc, * @param expire new expiration time should be the * MAX of any existing expiration time and * this value - * @param msg set to error message - * @return GNUNET_OK on success + * @param cont continuation called with success or failure status + * @param cons_cls continuation closure */ -static int +static void postgres_plugin_update (void *cls, uint64_t uid, int delta, - struct GNUNET_TIME_Absolute expire, char **msg) + struct GNUNET_TIME_Absolute expire, + PluginUpdateCont cont, void *cont_cls) { struct Plugin *plugin = cls; PGresult *ret; @@ -783,9 +788,12 @@ postgres_plugin_update (void *cls, uint64_t uid, int delta, paramFormats, 1); if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update")) - return GNUNET_SYSERR; + { + cont (cont_cls, GNUNET_SYSERR, NULL); + return; + } PQclear (ret); - return GNUNET_OK; + cont (cont_cls, GNUNET_OK, NULL); } @@ -819,6 +827,7 @@ postgres_plugin_get_keys (void *cls, } } PQclear (res); + proc (proc_cls, NULL, 0); } diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c index b27aab8c1..ea00e7df7 100644 --- a/src/datastore/plugin_datastore_sqlite.c +++ b/src/datastore/plugin_datastore_sqlite.c @@ -470,10 +470,10 @@ delete_by_rowid (struct Plugin *plugin, * @param anonymity anonymity-level for the content * @param replication replication-level for the content * @param expiration expiration time for the content - * @param msg set to an error message - * @return #GNUNET_OK on success + * @param cont continuation called with success or failure status + * @param cont_cls continuation closure */ -static int +static void sqlite_plugin_put (void *cls, const struct GNUNET_HashCode *key, uint32_t size, @@ -483,7 +483,8 @@ sqlite_plugin_put (void *cls, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, - char **msg) + PluginPutCont cont, + void *cont_cls) { struct Plugin *plugin = cls; int n; @@ -491,9 +492,13 @@ sqlite_plugin_put (void *cls, sqlite3_stmt *stmt; struct GNUNET_HashCode vhash; uint64_t rvalue; + char *msg = NULL; if (size > MAX_ITEM_SIZE) - return GNUNET_SYSERR; + { + cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large")); + return; + } GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", "Storing in database block with type %u/key `%s'/priority %u/expiration in %s (%s).\n", type, @@ -519,13 +524,15 @@ sqlite_plugin_put (void *cls, SQLITE_TRANSIENT)) || (SQLITE_OK != sqlite3_bind_blob (stmt, 9, data, size, SQLITE_TRANSIENT))) { - LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX"); if (SQLITE_OK != sqlite3_reset (stmt)) LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - return GNUNET_SYSERR; + cont (cont_cls, key, size, GNUNET_SYSERR, msg); + GNUNET_free_non_null(msg); + return; } n = sqlite3_step (stmt); switch (n) @@ -539,12 +546,12 @@ sqlite_plugin_put (void *cls, break; case SQLITE_BUSY: GNUNET_break (0); - LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); ret = GNUNET_SYSERR; break; default: - LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); if (SQLITE_OK != sqlite3_reset (stmt)) LOG_SQLITE (plugin, NULL, @@ -552,12 +559,15 @@ sqlite_plugin_put (void *cls, "sqlite3_reset"); database_shutdown (plugin); database_setup (plugin->env->cfg, plugin); - return GNUNET_SYSERR; + cont (cont_cls, key, size, GNUNET_SYSERR, msg); + GNUNET_free_non_null(msg); + return; } if (SQLITE_OK != sqlite3_reset (stmt)) LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - return ret; + cont (cont_cls, key, size, ret, msg); + GNUNET_free_non_null(msg); } @@ -581,31 +591,34 @@ sqlite_plugin_put (void *cls, * @param expire new expiration time should be the * MAX of any existing expiration time and * this value - * @param msg set to an error message - * @return #GNUNET_OK on success + * @param cont continuation called with success or failure status + * @param cons_cls continuation closure */ -static int +static void sqlite_plugin_update (void *cls, uint64_t uid, int delta, struct GNUNET_TIME_Absolute expire, - char **msg) + PluginUpdateCont cont, + void *cont_cls) { struct Plugin *plugin = cls; int n; + char *msg = NULL; if ((SQLITE_OK != sqlite3_bind_int (plugin->updPrio, 1, delta)) || (SQLITE_OK != sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value_us)) || (SQLITE_OK != sqlite3_bind_int64 (plugin->updPrio, 3, uid))) { - LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX"); if (SQLITE_OK != sqlite3_reset (plugin->updPrio)) LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - return GNUNET_SYSERR; - + cont (cont_cls, GNUNET_SYSERR, msg); + GNUNET_free_non_null(msg); + return; } n = sqlite3_step (plugin->updPrio); if (SQLITE_OK != sqlite3_reset (plugin->updPrio)) @@ -615,15 +628,21 @@ sqlite_plugin_update (void *cls, { case SQLITE_DONE: GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", "Block updated\n"); - return GNUNET_OK; + cont (cont_cls, GNUNET_OK, NULL); + return; case SQLITE_BUSY: - LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, + LOG_SQLITE (plugin, &msg, + GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); - return GNUNET_NO; + cont (cont_cls, GNUNET_NO, msg); + GNUNET_free_non_null(msg); + return; default: - LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); - return GNUNET_SYSERR; + cont (cont_cls, GNUNET_SYSERR, msg); + GNUNET_free_non_null(msg); + return; } } @@ -1098,6 +1117,7 @@ sqlite_plugin_get_keys (void *cls, { LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare"); + proc (proc_cls, NULL, 0); return; } while (SQLITE_ROW == (ret = sqlite3_step (stmt))) @@ -1111,6 +1131,7 @@ sqlite_plugin_get_keys (void *cls, if (SQLITE_DONE != ret) LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR, "sqlite_step"); sqlite3_finalize (stmt); + proc (proc_cls, NULL, 0); } diff --git a/src/datastore/plugin_datastore_template.c b/src/datastore/plugin_datastore_template.c index e9d10a25c..4b8c9c016 100644 --- a/src/datastore/plugin_datastore_template.c +++ b/src/datastore/plugin_datastore_template.c @@ -69,19 +69,19 @@ template_plugin_estimate_size (void *cls, unsigned long long *estimate) * @param anonymity anonymity-level for the content * @param replication replication-level for the content * @param expiration expiration time for the content - * @param msg set to error message - * @return GNUNET_OK on success + * @param cont continuation called with success or failure status + * @param cont_cls continuation closure */ -static int +static void template_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, - struct GNUNET_TIME_Absolute expiration, char **msg) + struct GNUNET_TIME_Absolute expiration, PluginPutCont cont, + void *cont_cls) { GNUNET_break (0); - *msg = GNUNET_strdup ("not implemented"); - return GNUNET_SYSERR; + cont (cont_cls, key, size, GNUNET_SYSERR, "not implemented"); } @@ -170,16 +170,16 @@ template_plugin_get_expiration (void *cls, PluginDatumProcessor proc, * @param expire new expiration time should be the * MAX of any existing expiration time and * this value - * @param msg set to error message - * @return GNUNET_OK on success + * @param cont continuation called with success or failure status + * @param cons_cls continuation closure */ -static int +static void template_plugin_update (void *cls, uint64_t uid, int delta, - struct GNUNET_TIME_Absolute expire, char **msg) + struct GNUNET_TIME_Absolute expire, + PluginUpdateCont cont, void *cont_cls) { GNUNET_break (0); - *msg = GNUNET_strdup ("not implemented"); - return GNUNET_SYSERR; + cont (cont_cls, GNUNET_SYSERR, "not implemented"); } @@ -226,6 +226,7 @@ template_get_keys (void *cls, PluginKeyProcessor proc, void *proc_cls) { + proc (proc_cls, NULL, 0); } diff --git a/src/datastore/test_plugin_datastore.c b/src/datastore/test_plugin_datastore.c index ee582a97e..64e88b819 100644 --- a/src/datastore/test_plugin_datastore.c +++ b/src/datastore/test_plugin_datastore.c @@ -83,6 +83,35 @@ disk_utilization_change_cb (void *cls, int delta) } +static void +test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + + +static void +put_continuation (void *cls, const struct GNUNET_HashCode *key, + uint32_t size, int status, char *msg) +{ + struct CpsRunContext *crc = cls; + static unsigned long long os; + unsigned long long cs; + + if (GNUNET_OK != status) + { + FPRINTF (stderr, "ERROR: `%s'\n", msg); + } + else + { + crc->api->estimate_size (crc->api->cls, &cs); + GNUNET_assert (os <= cs); + os = cs; + stored_bytes += size; + stored_ops++; + stored_entries++; + } + GNUNET_SCHEDULER_add_now (&test, crc); +} + + static void gen_key (int i, struct GNUNET_HashCode * key) { @@ -93,14 +122,21 @@ gen_key (int i, struct GNUNET_HashCode * key) static void -put_value (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k) +do_put (struct CpsRunContext *crc) { char value[65536]; size_t size; struct GNUNET_HashCode key; - char *msg; unsigned int prio; + static int i; + if (PUT_10 == i) + { + i = 0; + crc->phase++; + GNUNET_SCHEDULER_add_now (&test, crc); + return; + } /* most content is 32k */ size = 32 * 1024; @@ -113,36 +149,25 @@ put_value (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k) memset (value, i, size); if (i > 255) memset (value, i - 255, size / 2); - value[0] = k; - msg = NULL; + value[0] = crc->i; prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "putting type %u, anon %u under key %s\n", i + 1, i, GNUNET_h2s (&key)); - if (GNUNET_OK != api->put (api->cls, &key, size, value, i + 1 /* type */ , - prio, i /* anonymity */ , - 0 /* replication */ , - GNUNET_TIME_relative_to_absolute - (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_MILLISECONDS, - 60 * 60 * 60 * 1000 + - GNUNET_CRYPTO_random_u32 - (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), &msg)) - { - FPRINTF (stderr, "ERROR: `%s'\n", msg); - GNUNET_free_non_null (msg); - return; - } - stored_bytes += size; - stored_ops++; - stored_entries++; + crc->api->put (crc->api->cls, &key, size, value, i + 1 /* type */ , + prio, i /* anonymity */ , + 0 /* replication */ , + GNUNET_TIME_relative_to_absolute + (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_MILLISECONDS, + 60 * 60 * 60 * 1000 + + GNUNET_CRYPTO_random_u32 + (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), + put_continuation, crc); + i++; } -static void -test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); - - static uint64_t guid; @@ -212,13 +237,21 @@ cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +static void +update_continuation (void *cls, int status, char *msg) +{ + struct CpsRunContext *crc = cls; + + GNUNET_assert (GNUNET_OK == status); + crc->phase++; + GNUNET_SCHEDULER_add_now (&test, crc); +} + + static void test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct CpsRunContext *crc = cls; - int j; - unsigned long long os; - unsigned long long cs; struct GNUNET_HashCode key; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) @@ -237,16 +270,7 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_SCHEDULER_add_now (&cleaning_task, crc); break; case RP_PUT: - os = 0; - for (j = 0; j < PUT_10; j++) - { - put_value (crc->api, j, crc->i); - crc->api->estimate_size (crc->api->cls, &cs); - GNUNET_assert (os <= cs); - os = cs; - } - crc->phase++; - GNUNET_SCHEDULER_add_now (&test, crc); + do_put (crc); break; case RP_GET: if (crc->cnt == 1) @@ -261,11 +285,8 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_BLOCK_TYPE_ANY, &iterate_one_shot, crc); break; case RP_UPDATE: - GNUNET_assert (GNUNET_OK == - crc->api->update (crc->api->cls, guid, 1, - GNUNET_TIME_UNIT_ZERO_ABS, NULL)); - crc->phase++; - GNUNET_SCHEDULER_add_now (&test, crc); + crc->api->update (crc->api->cls, guid, 1, GNUNET_TIME_UNIT_ZERO_ABS, + update_continuation, crc); break; case RP_ITER_ZERO: diff --git a/src/include/gnunet_datastore_plugin.h b/src/include/gnunet_datastore_plugin.h index e81a38566..efca0baed 100644 --- a/src/include/gnunet_datastore_plugin.h +++ b/src/include/gnunet_datastore_plugin.h @@ -112,6 +112,19 @@ typedef int (*PluginDatumProcessor) (void *cls, const struct GNUNET_HashCode * k typedef void (*PluginEstimateSize) (void *cls, unsigned long long *estimate); +/** + * Put continuation. + * + * @param cls closure + * @param key key for the item stored + * @param size size of the item stored + * @param status GNUNET_OK or GNUNET_SYSERROR + * @param msg error message on error + */ +typedef void (*PluginPutCont) (void *cls, const struct GNUNET_HashCode *key, + uint32_t size, int status, char *msg); + + /** * Store an item in the datastore. If the item is already present, * the priorities and replication levels are summed up and the higher @@ -126,22 +139,23 @@ typedef void (*PluginEstimateSize) (void *cls, unsigned long long *estimate); * @param anonymity anonymity-level for the content * @param replication replication-level for the content * @param expiration expiration time for the content - * @param msg set to an error message (on failure) - * @return #GNUNET_OK on success, - * #GNUNET_SYSERR on failure + * @param cont continuation called with success or failure status + * @param cont_cls continuation closure */ -typedef int (*PluginPut) (void *cls, const struct GNUNET_HashCode * key, uint32_t size, - const void *data, enum GNUNET_BLOCK_Type type, - uint32_t priority, uint32_t anonymity, - uint32_t replication, - struct GNUNET_TIME_Absolute expiration, char **msg); +typedef void (*PluginPut) (void *cls, const struct GNUNET_HashCode * key, + uint32_t size, + const void *data, enum GNUNET_BLOCK_Type type, + uint32_t priority, uint32_t anonymity, + uint32_t replication, + struct GNUNET_TIME_Absolute expiration, + PluginPutCont cont, void *cont_cls); /** * An processor over a set of keys stored in the datastore. * * @param cls closure - * @param key key in the data store + * @param key key in the data store, if NULL iteration is finished * @param count how many values are stored under this key in the datastore */ typedef void (*PluginKeyProcessor) (void *cls, @@ -174,8 +188,6 @@ typedef void (*PluginGetKeys) (void *cls, * there may be! * @param type entries of which type are relevant? * Use 0 for any type. - * @param min find the smallest key that is larger than the given min, - * NULL for no minimum (return smallest key) * @param proc function to call on the matching value; * proc should be called with NULL if there is no result * @param proc_cls closure for @a proc @@ -201,6 +213,14 @@ typedef void (*PluginGetRandom) (void *cls, PluginDatumProcessor proc, void *proc_cls); +/** + * Update continuation. + * + * @param cls closure + * @param status GNUNET_OK or GNUNET_SYSERROR + * @param msg error message on error + */ +typedef void (*PluginUpdateCont) (void *cls, int status, char *msg); /** @@ -220,11 +240,12 @@ typedef void (*PluginGetRandom) (void *cls, PluginDatumProcessor proc, * @param expire new expiration time should be the * MAX of any existing expiration time and * this value - * @param msg set to an error message (on error) - * @return #GNUNET_OK on success + * @param cont continuation called with success or failure status + * @param cons_cls continuation closure */ -typedef int (*PluginUpdate) (void *cls, uint64_t uid, int delta, - struct GNUNET_TIME_Absolute expire, char **msg); +typedef void (*PluginUpdate) (void *cls, uint64_t uid, int delta, + struct GNUNET_TIME_Absolute expire, + PluginUpdateCont cont, void *cont_cls); /** -- cgit v1.2.3