diff options
author | David Barksdale <amatus.amongus@gmail.com> | 2015-03-21 03:38:29 +0000 |
---|---|---|
committer | David Barksdale <amatus.amongus@gmail.com> | 2015-03-21 03:38:29 +0000 |
commit | c77d4e5c69ac54ffddf5bd60c18bcb0504389311 (patch) | |
tree | bb40b73db6ed428d6ab44ffee91ca0ed6f16b592 /src/datastore/gnunet-service-datastore.c | |
parent | ce6f1156a58aafed6563585b3be560ec0b4eabe7 (diff) | |
download | gnunet-c77d4e5c69ac54ffddf5bd60c18bcb0504389311.tar.gz gnunet-c77d4e5c69ac54ffddf5bd60c18bcb0504389311.zip |
Convert datastore plugin API to asynchronous
Diffstat (limited to 'src/datastore/gnunet-service-datastore.c')
-rw-r--r-- | src/datastore/gnunet-service-datastore.c | 153 |
1 files changed, 97 insertions, 56 deletions
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c index eb0373039..ea24ca543 100644 --- a/src/datastore/gnunet-service-datastore.c +++ b/src/datastore/gnunet-service-datastore.c | |||
@@ -829,38 +829,24 @@ struct PutContext | |||
829 | }; | 829 | }; |
830 | 830 | ||
831 | 831 | ||
832 | /** | ||
833 | * Actually put the data message. | ||
834 | * | ||
835 | * @param client sender of the message | ||
836 | * @param dm message with the data to store | ||
837 | */ | ||
838 | static void | 832 | static void |
839 | execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm) | 833 | put_continuation (void *cls, const struct GNUNET_HashCode *key, uint32_t size, |
834 | int status, char *msg) | ||
840 | { | 835 | { |
841 | uint32_t size; | 836 | struct GNUNET_SERVER_Client *client = cls; |
842 | char *msg; | ||
843 | int ret; | ||
844 | 837 | ||
845 | size = ntohl (dm->size); | 838 | if (GNUNET_OK == status) |
846 | msg = NULL; | ||
847 | ret = | ||
848 | plugin->api->put (plugin->api->cls, &dm->key, size, &dm[1], | ||
849 | ntohl (dm->type), ntohl (dm->priority), | ||
850 | ntohl (dm->anonymity), ntohl (dm->replication), | ||
851 | GNUNET_TIME_absolute_ntoh (dm->expiration), &msg); | ||
852 | if (GNUNET_OK == ret) | ||
853 | { | 839 | { |
854 | GNUNET_STATISTICS_update (stats, | 840 | GNUNET_STATISTICS_update (stats, |
855 | gettext_noop ("# bytes stored"), size, | 841 | gettext_noop ("# bytes stored"), size, |
856 | GNUNET_YES); | 842 | GNUNET_YES); |
857 | GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key); | 843 | GNUNET_CONTAINER_bloomfilter_add (filter, key); |
858 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 844 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
859 | "Successfully stored %u bytes of type %u under key `%s'\n", | 845 | "Successfully stored %u bytes under key `%s'\n", |
860 | size, ntohl (dm->type), GNUNET_h2s (&dm->key)); | 846 | size, GNUNET_h2s (key)); |
861 | } | 847 | } |
862 | transmit_status (client, ret, msg); | 848 | transmit_status (client, status, msg); |
863 | GNUNET_free_non_null (msg); | 849 | GNUNET_SERVER_client_drop (client); |
864 | if (quota - reserved - cache_size < payload) | 850 | if (quota - reserved - cache_size < payload) |
865 | { | 851 | { |
866 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 852 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
@@ -872,6 +858,33 @@ execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm) | |||
872 | } | 858 | } |
873 | } | 859 | } |
874 | 860 | ||
861 | /** | ||
862 | * Actually put the data message. | ||
863 | * | ||
864 | * @param client sender of the message | ||
865 | * @param dm message with the data to store | ||
866 | */ | ||
867 | static void | ||
868 | execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm) | ||
869 | { | ||
870 | GNUNET_SERVER_client_keep (client); | ||
871 | plugin->api->put (plugin->api->cls, &dm->key, ntohl (dm->size), &dm[1], | ||
872 | ntohl (dm->type), ntohl (dm->priority), | ||
873 | ntohl (dm->anonymity), ntohl (dm->replication), | ||
874 | GNUNET_TIME_absolute_ntoh (dm->expiration), | ||
875 | &put_continuation, client); | ||
876 | } | ||
877 | |||
878 | |||
879 | static void | ||
880 | check_present_continuation (void *cls, int status, char *msg) | ||
881 | { | ||
882 | struct GNUNET_SERVER_Client *client = cls; | ||
883 | |||
884 | transmit_status (client, GNUNET_NO, NULL); | ||
885 | GNUNET_SERVER_client_drop (client); | ||
886 | } | ||
887 | |||
875 | 888 | ||
876 | /** | 889 | /** |
877 | * Function that will check if the given datastore entry | 890 | * Function that will check if the given datastore entry |
@@ -921,9 +934,13 @@ check_present (void *cls, const struct GNUNET_HashCode * key, uint32_t size, | |||
921 | expiration.abs_value_us)) | 934 | expiration.abs_value_us)) |
922 | plugin->api->update (plugin->api->cls, uid, | 935 | plugin->api->update (plugin->api->cls, uid, |
923 | (int32_t) ntohl (dm->priority), | 936 | (int32_t) ntohl (dm->priority), |
924 | GNUNET_TIME_absolute_ntoh (dm->expiration), NULL); | 937 | GNUNET_TIME_absolute_ntoh (dm->expiration), |
925 | transmit_status (pc->client, GNUNET_NO, NULL); | 938 | check_present_continuation, pc->client); |
926 | GNUNET_SERVER_client_drop (pc->client); | 939 | else |
940 | { | ||
941 | transmit_status (pc->client, GNUNET_NO, NULL); | ||
942 | GNUNET_SERVER_client_drop (pc->client); | ||
943 | } | ||
927 | GNUNET_free (pc); | 944 | GNUNET_free (pc); |
928 | } | 945 | } |
929 | else | 946 | else |
@@ -1051,6 +1068,16 @@ handle_get (void *cls, struct GNUNET_SERVER_Client *client, | |||
1051 | } | 1068 | } |
1052 | 1069 | ||
1053 | 1070 | ||
1071 | static void | ||
1072 | update_continuation (void *cls, int status, char *msg) | ||
1073 | { | ||
1074 | struct GNUNET_SERVER_Client *client = cls; | ||
1075 | |||
1076 | transmit_status (client, status, msg); | ||
1077 | GNUNET_SERVER_client_drop (client); | ||
1078 | } | ||
1079 | |||
1080 | |||
1054 | /** | 1081 | /** |
1055 | * Handle UPDATE-message. | 1082 | * Handle UPDATE-message. |
1056 | * | 1083 | * |
@@ -1063,21 +1090,17 @@ handle_update (void *cls, struct GNUNET_SERVER_Client *client, | |||
1063 | const struct GNUNET_MessageHeader *message) | 1090 | const struct GNUNET_MessageHeader *message) |
1064 | { | 1091 | { |
1065 | const struct UpdateMessage *msg; | 1092 | const struct UpdateMessage *msg; |
1066 | int ret; | ||
1067 | char *emsg; | ||
1068 | 1093 | ||
1069 | GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"), | 1094 | GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"), |
1070 | 1, GNUNET_NO); | 1095 | 1, GNUNET_NO); |
1071 | msg = (const struct UpdateMessage *) message; | 1096 | msg = (const struct UpdateMessage *) message; |
1072 | emsg = NULL; | ||
1073 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n", | 1097 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n", |
1074 | "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid)); | 1098 | "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid)); |
1075 | ret = | 1099 | GNUNET_SERVER_client_keep (client); |
1076 | plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid), | 1100 | plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid), |
1077 | (int32_t) ntohl (msg->priority), | 1101 | (int32_t) ntohl (msg->priority), |
1078 | GNUNET_TIME_absolute_ntoh (msg->expiration), &emsg); | 1102 | GNUNET_TIME_absolute_ntoh (msg->expiration), |
1079 | transmit_status (client, ret, emsg); | 1103 | update_continuation, client); |
1080 | GNUNET_free_non_null (emsg); | ||
1081 | } | 1104 | } |
1082 | 1105 | ||
1083 | 1106 | ||
@@ -1336,6 +1359,29 @@ unload_plugin (struct DatastorePlugin *plug) | |||
1336 | } | 1359 | } |
1337 | 1360 | ||
1338 | 1361 | ||
1362 | static const struct GNUNET_SERVER_MessageHandler handlers[] = { | ||
1363 | {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, | ||
1364 | sizeof (struct ReserveMessage)}, | ||
1365 | {&handle_release_reserve, NULL, | ||
1366 | GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, | ||
1367 | sizeof (struct ReleaseReserveMessage)}, | ||
1368 | {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0}, | ||
1369 | {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, | ||
1370 | sizeof (struct UpdateMessage)}, | ||
1371 | {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0}, | ||
1372 | {&handle_get_replication, NULL, | ||
1373 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, | ||
1374 | sizeof (struct GNUNET_MessageHeader)}, | ||
1375 | {&handle_get_zero_anonymity, NULL, | ||
1376 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, | ||
1377 | sizeof (struct GetZeroAnonymityMessage)}, | ||
1378 | {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0}, | ||
1379 | {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, | ||
1380 | sizeof (struct GNUNET_MessageHeader)}, | ||
1381 | {NULL, NULL, 0, 0} | ||
1382 | }; | ||
1383 | |||
1384 | |||
1339 | /** | 1385 | /** |
1340 | * Adds a given @a key to the bloomfilter in @a cls @a count times. | 1386 | * Adds a given @a key to the bloomfilter in @a cls @a count times. |
1341 | * | 1387 | * |
@@ -1350,6 +1396,19 @@ add_key_to_bloomfilter (void *cls, | |||
1350 | { | 1396 | { |
1351 | struct GNUNET_CONTAINER_BloomFilter *bf = cls; | 1397 | struct GNUNET_CONTAINER_BloomFilter *bf = cls; |
1352 | 1398 | ||
1399 | if (NULL == key) | ||
1400 | { | ||
1401 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
1402 | _("Bloomfilter construction complete.\n")); | ||
1403 | GNUNET_SERVER_add_handlers (server, handlers); | ||
1404 | GNUNET_SERVER_resume (server); | ||
1405 | expired_kill_task | ||
1406 | = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, | ||
1407 | &delete_expired, | ||
1408 | NULL); | ||
1409 | return; | ||
1410 | } | ||
1411 | |||
1353 | while (0 < count--) | 1412 | while (0 < count--) |
1354 | GNUNET_CONTAINER_bloomfilter_add (bf, key); | 1413 | GNUNET_CONTAINER_bloomfilter_add (bf, key); |
1355 | } | 1414 | } |
@@ -1365,27 +1424,6 @@ add_key_to_bloomfilter (void *cls, | |||
1365 | static void | 1424 | static void |
1366 | process_stat_done (void *cls, int success) | 1425 | process_stat_done (void *cls, int success) |
1367 | { | 1426 | { |
1368 | static const struct GNUNET_SERVER_MessageHandler handlers[] = { | ||
1369 | {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, | ||
1370 | sizeof (struct ReserveMessage)}, | ||
1371 | {&handle_release_reserve, NULL, | ||
1372 | GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, | ||
1373 | sizeof (struct ReleaseReserveMessage)}, | ||
1374 | {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0}, | ||
1375 | {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, | ||
1376 | sizeof (struct UpdateMessage)}, | ||
1377 | {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0}, | ||
1378 | {&handle_get_replication, NULL, | ||
1379 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, | ||
1380 | sizeof (struct GNUNET_MessageHeader)}, | ||
1381 | {&handle_get_zero_anonymity, NULL, | ||
1382 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, | ||
1383 | sizeof (struct GetZeroAnonymityMessage)}, | ||
1384 | {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0}, | ||
1385 | {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, | ||
1386 | sizeof (struct GNUNET_MessageHeader)}, | ||
1387 | {NULL, NULL, 0, 0} | ||
1388 | }; | ||
1389 | 1427 | ||
1390 | stat_get = NULL; | 1428 | stat_get = NULL; |
1391 | plugin = load_plugin (); | 1429 | plugin = load_plugin (); |
@@ -1411,9 +1449,12 @@ process_stat_done (void *cls, int success) | |||
1411 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1449 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
1412 | _("Rebuilding bloomfilter. Please be patient.\n")); | 1450 | _("Rebuilding bloomfilter. Please be patient.\n")); |
1413 | if (NULL != plugin->api->get_keys) | 1451 | if (NULL != plugin->api->get_keys) |
1452 | { | ||
1414 | plugin->api->get_keys (plugin->api->cls, | 1453 | plugin->api->get_keys (plugin->api->cls, |
1415 | &add_key_to_bloomfilter, | 1454 | &add_key_to_bloomfilter, |
1416 | filter); | 1455 | filter); |
1456 | return; | ||
1457 | } | ||
1417 | else | 1458 | else |
1418 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 1459 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
1419 | _("Plugin does not support get_keys function. Please fix!\n")); | 1460 | _("Plugin does not support get_keys function. Please fix!\n")); |