aboutsummaryrefslogtreecommitdiff
path: root/src/datastore
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2015-03-21 03:38:29 +0000
committerDavid Barksdale <amatus.amongus@gmail.com>2015-03-21 03:38:29 +0000
commitc77d4e5c69ac54ffddf5bd60c18bcb0504389311 (patch)
treebb40b73db6ed428d6ab44ffee91ca0ed6f16b592 /src/datastore
parentce6f1156a58aafed6563585b3be560ec0b4eabe7 (diff)
downloadgnunet-c77d4e5c69ac54ffddf5bd60c18bcb0504389311.tar.gz
gnunet-c77d4e5c69ac54ffddf5bd60c18bcb0504389311.zip
Convert datastore plugin API to asynchronous
Diffstat (limited to 'src/datastore')
-rw-r--r--src/datastore/gnunet-service-datastore.c153
-rw-r--r--src/datastore/perf_plugin_datastore.c107
-rw-r--r--src/datastore/plugin_datastore_heap.c25
-rw-r--r--src/datastore/plugin_datastore_mysql.c35
-rw-r--r--src/datastore/plugin_datastore_postgres.c33
-rw-r--r--src/datastore/plugin_datastore_sqlite.c67
-rw-r--r--src/datastore/plugin_datastore_template.c25
-rw-r--r--src/datastore/test_plugin_datastore.c107
8 files changed, 338 insertions, 214 deletions
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index eb0373039..ea24ca543 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -829,38 +829,24 @@ struct PutContext
829}; 829};
830 830
831 831
832/**
833 * Actually put the data message.
834 *
835 * @param client sender of the message
836 * @param dm message with the data to store
837 */
838static void 832static void
839execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm) 833put_continuation (void *cls, const struct GNUNET_HashCode *key, uint32_t size,
834 int status, char *msg)
840{ 835{
841 uint32_t size; 836 struct GNUNET_SERVER_Client *client = cls;
842 char *msg;
843 int ret;
844 837
845 size = ntohl (dm->size); 838 if (GNUNET_OK == status)
846 msg = NULL;
847 ret =
848 plugin->api->put (plugin->api->cls, &dm->key, size, &dm[1],
849 ntohl (dm->type), ntohl (dm->priority),
850 ntohl (dm->anonymity), ntohl (dm->replication),
851 GNUNET_TIME_absolute_ntoh (dm->expiration), &msg);
852 if (GNUNET_OK == ret)
853 { 839 {
854 GNUNET_STATISTICS_update (stats, 840 GNUNET_STATISTICS_update (stats,
855 gettext_noop ("# bytes stored"), size, 841 gettext_noop ("# bytes stored"), size,
856 GNUNET_YES); 842 GNUNET_YES);
857 GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key); 843 GNUNET_CONTAINER_bloomfilter_add (filter, key);
858 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 844 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859 "Successfully stored %u bytes of type %u under key `%s'\n", 845 "Successfully stored %u bytes under key `%s'\n",
860 size, ntohl (dm->type), GNUNET_h2s (&dm->key)); 846 size, GNUNET_h2s (key));
861 } 847 }
862 transmit_status (client, ret, msg); 848 transmit_status (client, status, msg);
863 GNUNET_free_non_null (msg); 849 GNUNET_SERVER_client_drop (client);
864 if (quota - reserved - cache_size < payload) 850 if (quota - reserved - cache_size < payload)
865 { 851 {
866 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 852 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -872,6 +858,33 @@ execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
872 } 858 }
873} 859}
874 860
861/**
862 * Actually put the data message.
863 *
864 * @param client sender of the message
865 * @param dm message with the data to store
866 */
867static void
868execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
869{
870 GNUNET_SERVER_client_keep (client);
871 plugin->api->put (plugin->api->cls, &dm->key, ntohl (dm->size), &dm[1],
872 ntohl (dm->type), ntohl (dm->priority),
873 ntohl (dm->anonymity), ntohl (dm->replication),
874 GNUNET_TIME_absolute_ntoh (dm->expiration),
875 &put_continuation, client);
876}
877
878
879static void
880check_present_continuation (void *cls, int status, char *msg)
881{
882 struct GNUNET_SERVER_Client *client = cls;
883
884 transmit_status (client, GNUNET_NO, NULL);
885 GNUNET_SERVER_client_drop (client);
886}
887
875 888
876/** 889/**
877 * Function that will check if the given datastore entry 890 * Function that will check if the given datastore entry
@@ -921,9 +934,13 @@ check_present (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
921 expiration.abs_value_us)) 934 expiration.abs_value_us))
922 plugin->api->update (plugin->api->cls, uid, 935 plugin->api->update (plugin->api->cls, uid,
923 (int32_t) ntohl (dm->priority), 936 (int32_t) ntohl (dm->priority),
924 GNUNET_TIME_absolute_ntoh (dm->expiration), NULL); 937 GNUNET_TIME_absolute_ntoh (dm->expiration),
925 transmit_status (pc->client, GNUNET_NO, NULL); 938 check_present_continuation, pc->client);
926 GNUNET_SERVER_client_drop (pc->client); 939 else
940 {
941 transmit_status (pc->client, GNUNET_NO, NULL);
942 GNUNET_SERVER_client_drop (pc->client);
943 }
927 GNUNET_free (pc); 944 GNUNET_free (pc);
928 } 945 }
929 else 946 else
@@ -1051,6 +1068,16 @@ handle_get (void *cls, struct GNUNET_SERVER_Client *client,
1051} 1068}
1052 1069
1053 1070
1071static void
1072update_continuation (void *cls, int status, char *msg)
1073{
1074 struct GNUNET_SERVER_Client *client = cls;
1075
1076 transmit_status (client, status, msg);
1077 GNUNET_SERVER_client_drop (client);
1078}
1079
1080
1054/** 1081/**
1055 * Handle UPDATE-message. 1082 * Handle UPDATE-message.
1056 * 1083 *
@@ -1063,21 +1090,17 @@ handle_update (void *cls, struct GNUNET_SERVER_Client *client,
1063 const struct GNUNET_MessageHeader *message) 1090 const struct GNUNET_MessageHeader *message)
1064{ 1091{
1065 const struct UpdateMessage *msg; 1092 const struct UpdateMessage *msg;
1066 int ret;
1067 char *emsg;
1068 1093
1069 GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"), 1094 GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"),
1070 1, GNUNET_NO); 1095 1, GNUNET_NO);
1071 msg = (const struct UpdateMessage *) message; 1096 msg = (const struct UpdateMessage *) message;
1072 emsg = NULL;
1073 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n", 1097 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n",
1074 "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid)); 1098 "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid));
1075 ret = 1099 GNUNET_SERVER_client_keep (client);
1076 plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid), 1100 plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid),
1077 (int32_t) ntohl (msg->priority), 1101 (int32_t) ntohl (msg->priority),
1078 GNUNET_TIME_absolute_ntoh (msg->expiration), &emsg); 1102 GNUNET_TIME_absolute_ntoh (msg->expiration),
1079 transmit_status (client, ret, emsg); 1103 update_continuation, client);
1080 GNUNET_free_non_null (emsg);
1081} 1104}
1082 1105
1083 1106
@@ -1336,6 +1359,29 @@ unload_plugin (struct DatastorePlugin *plug)
1336} 1359}
1337 1360
1338 1361
1362static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1363 {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1364 sizeof (struct ReserveMessage)},
1365 {&handle_release_reserve, NULL,
1366 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1367 sizeof (struct ReleaseReserveMessage)},
1368 {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1369 {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1370 sizeof (struct UpdateMessage)},
1371 {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
1372 {&handle_get_replication, NULL,
1373 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1374 sizeof (struct GNUNET_MessageHeader)},
1375 {&handle_get_zero_anonymity, NULL,
1376 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1377 sizeof (struct GetZeroAnonymityMessage)},
1378 {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
1379 {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1380 sizeof (struct GNUNET_MessageHeader)},
1381 {NULL, NULL, 0, 0}
1382};
1383
1384
1339/** 1385/**
1340 * Adds a given @a key to the bloomfilter in @a cls @a count times. 1386 * Adds a given @a key to the bloomfilter in @a cls @a count times.
1341 * 1387 *
@@ -1350,6 +1396,19 @@ add_key_to_bloomfilter (void *cls,
1350{ 1396{
1351 struct GNUNET_CONTAINER_BloomFilter *bf = cls; 1397 struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1352 1398
1399 if (NULL == key)
1400 {
1401 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1402 _("Bloomfilter construction complete.\n"));
1403 GNUNET_SERVER_add_handlers (server, handlers);
1404 GNUNET_SERVER_resume (server);
1405 expired_kill_task
1406 = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1407 &delete_expired,
1408 NULL);
1409 return;
1410 }
1411
1353 while (0 < count--) 1412 while (0 < count--)
1354 GNUNET_CONTAINER_bloomfilter_add (bf, key); 1413 GNUNET_CONTAINER_bloomfilter_add (bf, key);
1355} 1414}
@@ -1365,27 +1424,6 @@ add_key_to_bloomfilter (void *cls,
1365static void 1424static void
1366process_stat_done (void *cls, int success) 1425process_stat_done (void *cls, int success)
1367{ 1426{
1368 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1369 {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1370 sizeof (struct ReserveMessage)},
1371 {&handle_release_reserve, NULL,
1372 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1373 sizeof (struct ReleaseReserveMessage)},
1374 {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1375 {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1376 sizeof (struct UpdateMessage)},
1377 {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
1378 {&handle_get_replication, NULL,
1379 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1380 sizeof (struct GNUNET_MessageHeader)},
1381 {&handle_get_zero_anonymity, NULL,
1382 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1383 sizeof (struct GetZeroAnonymityMessage)},
1384 {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
1385 {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1386 sizeof (struct GNUNET_MessageHeader)},
1387 {NULL, NULL, 0, 0}
1388 };
1389 1427
1390 stat_get = NULL; 1428 stat_get = NULL;
1391 plugin = load_plugin (); 1429 plugin = load_plugin ();
@@ -1411,9 +1449,12 @@ process_stat_done (void *cls, int success)
1411 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1449 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1412 _("Rebuilding bloomfilter. Please be patient.\n")); 1450 _("Rebuilding bloomfilter. Please be patient.\n"));
1413 if (NULL != plugin->api->get_keys) 1451 if (NULL != plugin->api->get_keys)
1452 {
1414 plugin->api->get_keys (plugin->api->cls, 1453 plugin->api->get_keys (plugin->api->cls,
1415 &add_key_to_bloomfilter, 1454 &add_key_to_bloomfilter,
1416 filter); 1455 filter);
1456 return;
1457 }
1417 else 1458 else
1418 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1459 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1419 _("Plugin does not support get_keys function. Please fix!\n")); 1460 _("Plugin does not support get_keys function. Please fix!\n"));
diff --git a/src/datastore/perf_plugin_datastore.c b/src/datastore/perf_plugin_datastore.c
index f845b5a59..2ff8340f3 100644
--- a/src/datastore/perf_plugin_datastore.c
+++ b/src/datastore/perf_plugin_datastore.c
@@ -99,15 +99,60 @@ disk_utilization_change_cb (void *cls, int delta)
99 99
100 100
101static void 101static void
102putValue (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k) 102test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
103
104
105static void
106put_continuation (void *cls, const struct GNUNET_HashCode *key,
107 uint32_t size, int status, char *msg)
108{
109 struct CpsRunContext *crc = cls;
110
111 if (GNUNET_OK != status)
112 {
113 FPRINTF (stderr, "ERROR: `%s'\n", msg);
114 }
115 else
116 {
117 stored_bytes += size;
118 stored_ops++;
119 stored_entries++;
120 }
121 GNUNET_SCHEDULER_add_now (&test, crc);
122}
123
124static void
125do_put (struct CpsRunContext *crc)
103{ 126{
104 char value[65536]; 127 char value[65536];
105 size_t size; 128 size_t size;
106 static struct GNUNET_HashCode key; 129 static struct GNUNET_HashCode key;
107 static int ic; 130 static int i;
108 char *msg;
109 unsigned int prio; 131 unsigned int prio;
110 132
133 if (0 == i)
134 crc->start = GNUNET_TIME_absolute_get ();
135 if (PUT_10 == i)
136 {
137 i = 0;
138 crc->end = GNUNET_TIME_absolute_get ();
139 {
140 printf ("%s took %s for %llu items\n", "Storing an item",
141 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_difference (crc->start,
142 crc->end),
143 GNUNET_YES),
144 PUT_10);
145 if (PUT_10 > 0)
146 GAUGER (category, "Storing an item",
147 (crc->end.abs_value_us - crc->start.abs_value_us) / 1000LL / PUT_10,
148 "ms/item");
149 }
150 crc->i++;
151 crc->start = GNUNET_TIME_absolute_get ();
152 crc->phase++;
153 GNUNET_SCHEDULER_add_now (&test, crc);
154 return;
155 }
111 /* most content is 32k */ 156 /* most content is 32k */
112 size = 32 * 1024; 157 size = 32 * 1024;
113 if (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 16) == 0) /* but some of it is less! */ 158 if (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 16) == 0) /* but some of it is less! */
@@ -120,33 +165,22 @@ putValue (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k)
120 memset (value, i, size); 165 memset (value, i, size);
121 if (i > 255) 166 if (i > 255)
122 memset (value, i - 255, size / 2); 167 memset (value, i - 255, size / 2);
123 value[0] = k; 168 value[0] = crc->i;
124 memcpy (&value[4], &i, sizeof (i)); 169 memcpy (&value[4], &i, sizeof (i));
125 msg = NULL;
126 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100); 170 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100);
127 if (GNUNET_OK != api->put (api->cls, &key, size, value, 1 + i % 4 /* type */ , 171 crc->api->put (crc->api->cls, &key, size, value, 1 + i % 4 /* type */ ,
128 prio, i % 4 /* anonymity */ , 172 prio, i % 4 /* anonymity */ ,
129 0 /* replication */ , 173 0 /* replication */ ,
130 GNUNET_TIME_relative_to_absolute 174 GNUNET_TIME_relative_to_absolute
131 (GNUNET_TIME_relative_multiply 175 (GNUNET_TIME_relative_multiply
132 (GNUNET_TIME_UNIT_MILLISECONDS, 176 (GNUNET_TIME_UNIT_MILLISECONDS,
133 60 * 60 * 60 * 1000 + 177 60 * 60 * 60 * 1000 +
134 GNUNET_CRYPTO_random_u32 178 GNUNET_CRYPTO_random_u32
135 (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), &msg)) 179 (GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
136 { 180 put_continuation, crc);
137 FPRINTF (stderr, "ERROR: `%s'\n", msg); 181 i++;
138 GNUNET_free_non_null (msg);
139 return;
140 }
141 ic++;
142 stored_bytes += size;
143 stored_ops++;
144 stored_entries++;
145} 182}
146 183
147static void
148test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
149
150 184
151static int 185static int
152iterate_zeros (void *cls, const struct GNUNET_HashCode * key, uint32_t size, 186iterate_zeros (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
@@ -342,7 +376,6 @@ static void
342test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 376test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
343{ 377{
344 struct CpsRunContext *crc = cls; 378 struct CpsRunContext *crc = cls;
345 int j;
346 379
347 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 380 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
348 { 381 {
@@ -361,25 +394,7 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
361 &cleaning_task, crc); 394 &cleaning_task, crc);
362 break; 395 break;
363 case RP_PUT: 396 case RP_PUT:
364 crc->start = GNUNET_TIME_absolute_get (); 397 do_put (crc);
365 for (j = 0; j < PUT_10; j++)
366 putValue (crc->api, j, crc->i);
367 crc->end = GNUNET_TIME_absolute_get ();
368 {
369 printf ("%s took %s for %llu items\n", "Storing an item",
370 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_difference (crc->start,
371 crc->end),
372 GNUNET_YES),
373 PUT_10);
374 if (PUT_10 > 0)
375 GAUGER (category, "Storing an item",
376 (crc->end.abs_value_us - crc->start.abs_value_us) / 1000LL / PUT_10,
377 "ms/item");
378 }
379 crc->i++;
380 crc->start = GNUNET_TIME_absolute_get ();
381 crc->phase++;
382 GNUNET_SCHEDULER_add_now (&test, crc);
383 break; 398 break;
384 case RP_REP_GET: 399 case RP_REP_GET:
385 crc->api->get_replication (crc->api->cls, &replication_get, crc); 400 crc->api->get_replication (crc->api->cls, &replication_get, crc);
diff --git a/src/datastore/plugin_datastore_heap.c b/src/datastore/plugin_datastore_heap.c
index 111a852b4..821fbb4e8 100644
--- a/src/datastore/plugin_datastore_heap.c
+++ b/src/datastore/plugin_datastore_heap.c
@@ -206,10 +206,10 @@ heap_plugin_estimate_size (void *cls, unsigned long long *estimate)
206 * @param anonymity anonymity-level for the content 206 * @param anonymity anonymity-level for the content
207 * @param replication replication-level for the content 207 * @param replication replication-level for the content
208 * @param expiration expiration time for the content 208 * @param expiration expiration time for the content
209 * @param msg set to error message 209 * @param cont continuation called with success or failure status
210 * @return GNUNET_OK on success 210 * @param cont_cls continuation closure
211 */ 211 */
212static int 212static void
213heap_plugin_put (void *cls, 213heap_plugin_put (void *cls,
214 const struct GNUNET_HashCode * key, 214 const struct GNUNET_HashCode * key,
215 uint32_t size, 215 uint32_t size,
@@ -217,7 +217,9 @@ heap_plugin_put (void *cls,
217 enum GNUNET_BLOCK_Type type, 217 enum GNUNET_BLOCK_Type type,
218 uint32_t priority, uint32_t anonymity, 218 uint32_t priority, uint32_t anonymity,
219 uint32_t replication, 219 uint32_t replication,
220 struct GNUNET_TIME_Absolute expiration, char **msg) 220 struct GNUNET_TIME_Absolute expiration,
221 PluginPutCont cont,
222 void *cont_cls)
221{ 223{
222 struct Plugin *plugin = cls; 224 struct Plugin *plugin = cls;
223 struct Value *value; 225 struct Value *value;
@@ -267,7 +269,7 @@ heap_plugin_put (void *cls,
267 value, 269 value,
268 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 270 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
269 plugin->size += size; 271 plugin->size += size;
270 return GNUNET_OK; 272 cont (cont_cls, key, size, GNUNET_OK, NULL);
271} 273}
272 274
273 275
@@ -615,14 +617,16 @@ heap_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
615 * @param expire new expiration time should be the 617 * @param expire new expiration time should be the
616 * MAX of any existing expiration time and 618 * MAX of any existing expiration time and
617 * this value 619 * this value
618 * @param msg set to error message 620 * @param cont continuation called with success or failure status
619 * @return GNUNET_OK on success 621 * @param cons_cls continuation closure
620 */ 622 */
621static int 623static void
622heap_plugin_update (void *cls, 624heap_plugin_update (void *cls,
623 uint64_t uid, 625 uint64_t uid,
624 int delta, 626 int delta,
625 struct GNUNET_TIME_Absolute expire, char **msg) 627 struct GNUNET_TIME_Absolute expire,
628 PluginUpdateCont cont,
629 void *cont_cls)
626{ 630{
627 struct Plugin *plugin = cls; 631 struct Plugin *plugin = cls;
628 struct Value *value; 632 struct Value *value;
@@ -640,7 +644,7 @@ heap_plugin_update (void *cls,
640 value->priority = 0; 644 value->priority = 0;
641 else 645 else
642 value->priority += delta; 646 value->priority += delta;
643 return GNUNET_OK; 647 cont (cont_cls, GNUNET_OK, NULL);
644} 648}
645 649
646 650
@@ -778,6 +782,7 @@ heap_get_keys (void *cls,
778 GNUNET_CONTAINER_multihashmap_iterate (plugin->keyvalue, 782 GNUNET_CONTAINER_multihashmap_iterate (plugin->keyvalue,
779 &return_value, 783 &return_value,
780 &gac); 784 &gac);
785 proc (proc_cls, NULL, 0);
781} 786}
782 787
783 788
diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c
index 972a51417..5100adc34 100644
--- a/src/datastore/plugin_datastore_mysql.c
+++ b/src/datastore/plugin_datastore_mysql.c
@@ -280,14 +280,15 @@ mysql_plugin_estimate_size (void *cls, unsigned long long *estimate)
280 * @param anonymity anonymity-level for the content 280 * @param anonymity anonymity-level for the content
281 * @param replication replication-level for the content 281 * @param replication replication-level for the content
282 * @param expiration expiration time for the content 282 * @param expiration expiration time for the content
283 * @param msg set to error message 283 * @param cont continuation called with success or failure status
284 * @return GNUNET_OK on success 284 * @param cont_cls continuation closure
285 */ 285 */
286static int 286static void
287mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size, 287mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
288 const void *data, enum GNUNET_BLOCK_Type type, 288 const void *data, enum GNUNET_BLOCK_Type type,
289 uint32_t priority, uint32_t anonymity, uint32_t replication, 289 uint32_t priority, uint32_t anonymity, uint32_t replication,
290 struct GNUNET_TIME_Absolute expiration, char **msg) 290 struct GNUNET_TIME_Absolute expiration, PluginPutCont cont,
291 void *cont_cls)
291{ 292{
292 struct Plugin *plugin = cls; 293 struct Plugin *plugin = cls;
293 unsigned int irepl = replication; 294 unsigned int irepl = replication;
@@ -305,7 +306,8 @@ mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
305 if (size > MAX_DATUM_SIZE) 306 if (size > MAX_DATUM_SIZE)
306 { 307 {
307 GNUNET_break (0); 308 GNUNET_break (0);
308 return GNUNET_SYSERR; 309 cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
310 return;
309 } 311 }
310 hashSize = sizeof (struct GNUNET_HashCode); 312 hashSize = sizeof (struct GNUNET_HashCode);
311 hashSize2 = sizeof (struct GNUNET_HashCode); 313 hashSize2 = sizeof (struct GNUNET_HashCode);
@@ -322,13 +324,16 @@ mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
322 MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 324 MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
323 MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2, 325 MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
324 MYSQL_TYPE_BLOB, data, lsize, &lsize, -1)) 326 MYSQL_TYPE_BLOB, data, lsize, &lsize, -1))
325 return GNUNET_SYSERR; 327 {
328 cont (cont_cls, key, size, GNUNET_SYSERR, _("MySQL statement run failure"));
329 return;
330 }
326 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 331 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327 "Inserted value `%s' with size %u into gn090 table\n", 332 "Inserted value `%s' with size %u into gn090 table\n",
328 GNUNET_h2s (key), (unsigned int) size); 333 GNUNET_h2s (key), (unsigned int) size);
329 if (size > 0) 334 if (size > 0)
330 plugin->env->duc (plugin->env->cls, size); 335 plugin->env->duc (plugin->env->cls, size);
331 return GNUNET_OK; 336 cont (cont_cls, key, size, GNUNET_OK, NULL);
332} 337}
333 338
334 339
@@ -352,12 +357,13 @@ mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
352 * @param expire new expiration time should be the 357 * @param expire new expiration time should be the
353 * MAX of any existing expiration time and 358 * MAX of any existing expiration time and
354 * this value 359 * this value
355 * @param msg set to error message 360 * @param cont continuation called with success or failure status
356 * @return GNUNET_OK on success 361 * @param cons_cls continuation closure
357 */ 362 */
358static int 363static void
359mysql_plugin_update (void *cls, uint64_t uid, int delta, 364mysql_plugin_update (void *cls, uint64_t uid, int delta,
360 struct GNUNET_TIME_Absolute expire, char **msg) 365 struct GNUNET_TIME_Absolute expire,
366 PluginUpdateCont cont, void *cont_cls)
361{ 367{
362 struct Plugin *plugin = cls; 368 struct Plugin *plugin = cls;
363 unsigned long long vkey = uid; 369 unsigned long long vkey = uid;
@@ -379,7 +385,7 @@ mysql_plugin_update (void *cls, uint64_t uid, int delta,
379 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n", 385 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
380 vkey); 386 vkey);
381 } 387 }
382 return ret; 388 cont (cont_cls, ret, NULL);
383} 389}
384 390
385 391
@@ -778,6 +784,7 @@ mysql_plugin_get_keys (void *cls,
778 if (statement == NULL) 784 if (statement == NULL)
779 { 785 {
780 GNUNET_MYSQL_statements_invalidate (plugin->mc); 786 GNUNET_MYSQL_statements_invalidate (plugin->mc);
787 proc (proc_cls, NULL, 0);
781 return; 788 return;
782 } 789 }
783 if (mysql_stmt_prepare (statement, query, strlen (query))) 790 if (mysql_stmt_prepare (statement, query, strlen (query)))
@@ -785,6 +792,7 @@ mysql_plugin_get_keys (void *cls,
785 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql", 792 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
786 _("Failed to prepare statement `%s'\n"), query); 793 _("Failed to prepare statement `%s'\n"), query);
787 GNUNET_MYSQL_statements_invalidate (plugin->mc); 794 GNUNET_MYSQL_statements_invalidate (plugin->mc);
795 proc (proc_cls, NULL, 0);
788 return; 796 return;
789 } 797 }
790 GNUNET_assert (proc != NULL); 798 GNUNET_assert (proc != NULL);
@@ -795,6 +803,7 @@ mysql_plugin_get_keys (void *cls,
795 "mysql_stmt_execute", query, __FILE__, __LINE__, 803 "mysql_stmt_execute", query, __FILE__, __LINE__,
796 mysql_stmt_error (statement)); 804 mysql_stmt_error (statement));
797 GNUNET_MYSQL_statements_invalidate (plugin->mc); 805 GNUNET_MYSQL_statements_invalidate (plugin->mc);
806 proc (proc_cls, NULL, 0);
798 return; 807 return;
799 } 808 }
800 memset (cbind, 0, sizeof (cbind)); 809 memset (cbind, 0, sizeof (cbind));
@@ -810,6 +819,7 @@ mysql_plugin_get_keys (void *cls,
810 "mysql_stmt_bind_result", __FILE__, __LINE__, 819 "mysql_stmt_bind_result", __FILE__, __LINE__,
811 mysql_stmt_error (statement)); 820 mysql_stmt_error (statement));
812 GNUNET_MYSQL_statements_invalidate (plugin->mc); 821 GNUNET_MYSQL_statements_invalidate (plugin->mc);
822 proc (proc_cls, NULL, 0);
813 return; 823 return;
814 } 824 }
815 while (0 == (ret = mysql_stmt_fetch (statement))) 825 while (0 == (ret = mysql_stmt_fetch (statement)))
@@ -817,6 +827,7 @@ mysql_plugin_get_keys (void *cls,
817 if (sizeof (struct GNUNET_HashCode) == length) 827 if (sizeof (struct GNUNET_HashCode) == length)
818 proc (proc_cls, &key, 1); 828 proc (proc_cls, &key, 1);
819 } 829 }
830 proc (proc_cls, NULL, 0);
820 if (ret != MYSQL_NO_DATA) 831 if (ret != MYSQL_NO_DATA)
821 { 832 {
822 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 833 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
index eb2a69c27..e0decc024 100644
--- a/src/datastore/plugin_datastore_postgres.c
+++ b/src/datastore/plugin_datastore_postgres.c
@@ -279,15 +279,16 @@ postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
279 * @param anonymity anonymity-level for the content 279 * @param anonymity anonymity-level for the content
280 * @param replication replication-level for the content 280 * @param replication replication-level for the content
281 * @param expiration expiration time for the content 281 * @param expiration expiration time for the content
282 * @param msg set to error message 282 * @param cont continuation called with success or failure status
283 * @return #GNUNET_OK on success 283 * @param cont_cls continuation closure
284 */ 284 */
285static int 285static void
286postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size, 286postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
287 const void *data, enum GNUNET_BLOCK_Type type, 287 const void *data, enum GNUNET_BLOCK_Type type,
288 uint32_t priority, uint32_t anonymity, 288 uint32_t priority, uint32_t anonymity,
289 uint32_t replication, 289 uint32_t replication,
290 struct GNUNET_TIME_Absolute expiration, char **msg) 290 struct GNUNET_TIME_Absolute expiration, PluginPutCont cont,
291 void *cont_cls)
291{ 292{
292 struct Plugin *plugin = cls; 293 struct Plugin *plugin = cls;
293 struct GNUNET_HashCode vhash; 294 struct GNUNET_HashCode vhash;
@@ -326,12 +327,15 @@ postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t siz
326 paramFormats, 1); 327 paramFormats, 1);
327 if (GNUNET_OK != 328 if (GNUNET_OK !=
328 GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put")) 329 GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put"))
329 return GNUNET_SYSERR; 330 {
331 cont (cont_cls, key, size, GNUNET_SYSERR, _("Postgress exec failure"));
332 return;
333 }
330 PQclear (ret); 334 PQclear (ret);
331 plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD); 335 plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
332 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres", 336 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
333 "Stored %u bytes in database\n", (unsigned int) size); 337 "Stored %u bytes in database\n", (unsigned int) size);
334 return GNUNET_OK; 338 cont (cont_cls, key, size, GNUNET_OK, NULL);
335} 339}
336 340
337 341
@@ -753,12 +757,13 @@ postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
753 * @param expire new expiration time should be the 757 * @param expire new expiration time should be the
754 * MAX of any existing expiration time and 758 * MAX of any existing expiration time and
755 * this value 759 * this value
756 * @param msg set to error message 760 * @param cont continuation called with success or failure status
757 * @return GNUNET_OK on success 761 * @param cons_cls continuation closure
758 */ 762 */
759static int 763static void
760postgres_plugin_update (void *cls, uint64_t uid, int delta, 764postgres_plugin_update (void *cls, uint64_t uid, int delta,
761 struct GNUNET_TIME_Absolute expire, char **msg) 765 struct GNUNET_TIME_Absolute expire,
766 PluginUpdateCont cont, void *cont_cls)
762{ 767{
763 struct Plugin *plugin = cls; 768 struct Plugin *plugin = cls;
764 PGresult *ret; 769 PGresult *ret;
@@ -783,9 +788,12 @@ postgres_plugin_update (void *cls, uint64_t uid, int delta,
783 paramFormats, 1); 788 paramFormats, 1);
784 if (GNUNET_OK != 789 if (GNUNET_OK !=
785 GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update")) 790 GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update"))
786 return GNUNET_SYSERR; 791 {
792 cont (cont_cls, GNUNET_SYSERR, NULL);
793 return;
794 }
787 PQclear (ret); 795 PQclear (ret);
788 return GNUNET_OK; 796 cont (cont_cls, GNUNET_OK, NULL);
789} 797}
790 798
791 799
@@ -819,6 +827,7 @@ postgres_plugin_get_keys (void *cls,
819 } 827 }
820 } 828 }
821 PQclear (res); 829 PQclear (res);
830 proc (proc_cls, NULL, 0);
822} 831}
823 832
824 833
diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c
index b27aab8c1..ea00e7df7 100644
--- a/src/datastore/plugin_datastore_sqlite.c
+++ b/src/datastore/plugin_datastore_sqlite.c
@@ -470,10 +470,10 @@ delete_by_rowid (struct Plugin *plugin,
470 * @param anonymity anonymity-level for the content 470 * @param anonymity anonymity-level for the content
471 * @param replication replication-level for the content 471 * @param replication replication-level for the content
472 * @param expiration expiration time for the content 472 * @param expiration expiration time for the content
473 * @param msg set to an error message 473 * @param cont continuation called with success or failure status
474 * @return #GNUNET_OK on success 474 * @param cont_cls continuation closure
475 */ 475 */
476static int 476static void
477sqlite_plugin_put (void *cls, 477sqlite_plugin_put (void *cls,
478 const struct GNUNET_HashCode *key, 478 const struct GNUNET_HashCode *key,
479 uint32_t size, 479 uint32_t size,
@@ -483,7 +483,8 @@ sqlite_plugin_put (void *cls,
483 uint32_t anonymity, 483 uint32_t anonymity,
484 uint32_t replication, 484 uint32_t replication,
485 struct GNUNET_TIME_Absolute expiration, 485 struct GNUNET_TIME_Absolute expiration,
486 char **msg) 486 PluginPutCont cont,
487 void *cont_cls)
487{ 488{
488 struct Plugin *plugin = cls; 489 struct Plugin *plugin = cls;
489 int n; 490 int n;
@@ -491,9 +492,13 @@ sqlite_plugin_put (void *cls,
491 sqlite3_stmt *stmt; 492 sqlite3_stmt *stmt;
492 struct GNUNET_HashCode vhash; 493 struct GNUNET_HashCode vhash;
493 uint64_t rvalue; 494 uint64_t rvalue;
495 char *msg = NULL;
494 496
495 if (size > MAX_ITEM_SIZE) 497 if (size > MAX_ITEM_SIZE)
496 return GNUNET_SYSERR; 498 {
499 cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
500 return;
501 }
497 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", 502 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite",
498 "Storing in database block with type %u/key `%s'/priority %u/expiration in %s (%s).\n", 503 "Storing in database block with type %u/key `%s'/priority %u/expiration in %s (%s).\n",
499 type, 504 type,
@@ -519,13 +524,15 @@ sqlite_plugin_put (void *cls,
519 SQLITE_TRANSIENT)) || 524 SQLITE_TRANSIENT)) ||
520 (SQLITE_OK != sqlite3_bind_blob (stmt, 9, data, size, SQLITE_TRANSIENT))) 525 (SQLITE_OK != sqlite3_bind_blob (stmt, 9, data, size, SQLITE_TRANSIENT)))
521 { 526 {
522 LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 527 LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
523 "sqlite3_bind_XXXX"); 528 "sqlite3_bind_XXXX");
524 if (SQLITE_OK != sqlite3_reset (stmt)) 529 if (SQLITE_OK != sqlite3_reset (stmt))
525 LOG_SQLITE (plugin, NULL, 530 LOG_SQLITE (plugin, NULL,
526 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 531 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
527 "sqlite3_reset"); 532 "sqlite3_reset");
528 return GNUNET_SYSERR; 533 cont (cont_cls, key, size, GNUNET_SYSERR, msg);
534 GNUNET_free_non_null(msg);
535 return;
529 } 536 }
530 n = sqlite3_step (stmt); 537 n = sqlite3_step (stmt);
531 switch (n) 538 switch (n)
@@ -539,12 +546,12 @@ sqlite_plugin_put (void *cls,
539 break; 546 break;
540 case SQLITE_BUSY: 547 case SQLITE_BUSY:
541 GNUNET_break (0); 548 GNUNET_break (0);
542 LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 549 LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
543 "sqlite3_step"); 550 "sqlite3_step");
544 ret = GNUNET_SYSERR; 551 ret = GNUNET_SYSERR;
545 break; 552 break;
546 default: 553 default:
547 LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 554 LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
548 "sqlite3_step"); 555 "sqlite3_step");
549 if (SQLITE_OK != sqlite3_reset (stmt)) 556 if (SQLITE_OK != sqlite3_reset (stmt))
550 LOG_SQLITE (plugin, NULL, 557 LOG_SQLITE (plugin, NULL,
@@ -552,12 +559,15 @@ sqlite_plugin_put (void *cls,
552 "sqlite3_reset"); 559 "sqlite3_reset");
553 database_shutdown (plugin); 560 database_shutdown (plugin);
554 database_setup (plugin->env->cfg, plugin); 561 database_setup (plugin->env->cfg, plugin);
555 return GNUNET_SYSERR; 562 cont (cont_cls, key, size, GNUNET_SYSERR, msg);
563 GNUNET_free_non_null(msg);
564 return;
556 } 565 }
557 if (SQLITE_OK != sqlite3_reset (stmt)) 566 if (SQLITE_OK != sqlite3_reset (stmt))
558 LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 567 LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
559 "sqlite3_reset"); 568 "sqlite3_reset");
560 return ret; 569 cont (cont_cls, key, size, ret, msg);
570 GNUNET_free_non_null(msg);
561} 571}
562 572
563 573
@@ -581,31 +591,34 @@ sqlite_plugin_put (void *cls,
581 * @param expire new expiration time should be the 591 * @param expire new expiration time should be the
582 * MAX of any existing expiration time and 592 * MAX of any existing expiration time and
583 * this value 593 * this value
584 * @param msg set to an error message 594 * @param cont continuation called with success or failure status
585 * @return #GNUNET_OK on success 595 * @param cons_cls continuation closure
586 */ 596 */
587static int 597static void
588sqlite_plugin_update (void *cls, 598sqlite_plugin_update (void *cls,
589 uint64_t uid, 599 uint64_t uid,
590 int delta, 600 int delta,
591 struct GNUNET_TIME_Absolute expire, 601 struct GNUNET_TIME_Absolute expire,
592 char **msg) 602 PluginUpdateCont cont,
603 void *cont_cls)
593{ 604{
594 struct Plugin *plugin = cls; 605 struct Plugin *plugin = cls;
595 int n; 606 int n;
607 char *msg = NULL;
596 608
597 if ((SQLITE_OK != sqlite3_bind_int (plugin->updPrio, 1, delta)) || 609 if ((SQLITE_OK != sqlite3_bind_int (plugin->updPrio, 1, delta)) ||
598 (SQLITE_OK != sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value_us)) 610 (SQLITE_OK != sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value_us))
599 || (SQLITE_OK != sqlite3_bind_int64 (plugin->updPrio, 3, uid))) 611 || (SQLITE_OK != sqlite3_bind_int64 (plugin->updPrio, 3, uid)))
600 { 612 {
601 LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 613 LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
602 "sqlite3_bind_XXXX"); 614 "sqlite3_bind_XXXX");
603 if (SQLITE_OK != sqlite3_reset (plugin->updPrio)) 615 if (SQLITE_OK != sqlite3_reset (plugin->updPrio))
604 LOG_SQLITE (plugin, NULL, 616 LOG_SQLITE (plugin, NULL,
605 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 617 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
606 "sqlite3_reset"); 618 "sqlite3_reset");
607 return GNUNET_SYSERR; 619 cont (cont_cls, GNUNET_SYSERR, msg);
608 620 GNUNET_free_non_null(msg);
621 return;
609 } 622 }
610 n = sqlite3_step (plugin->updPrio); 623 n = sqlite3_step (plugin->updPrio);
611 if (SQLITE_OK != sqlite3_reset (plugin->updPrio)) 624 if (SQLITE_OK != sqlite3_reset (plugin->updPrio))
@@ -615,15 +628,21 @@ sqlite_plugin_update (void *cls,
615 { 628 {
616 case SQLITE_DONE: 629 case SQLITE_DONE:
617 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", "Block updated\n"); 630 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", "Block updated\n");
618 return GNUNET_OK; 631 cont (cont_cls, GNUNET_OK, NULL);
632 return;
619 case SQLITE_BUSY: 633 case SQLITE_BUSY:
620 LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, 634 LOG_SQLITE (plugin, &msg,
635 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
621 "sqlite3_step"); 636 "sqlite3_step");
622 return GNUNET_NO; 637 cont (cont_cls, GNUNET_NO, msg);
638 GNUNET_free_non_null(msg);
639 return;
623 default: 640 default:
624 LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 641 LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
625 "sqlite3_step"); 642 "sqlite3_step");
626 return GNUNET_SYSERR; 643 cont (cont_cls, GNUNET_SYSERR, msg);
644 GNUNET_free_non_null(msg);
645 return;
627 } 646 }
628} 647}
629 648
@@ -1098,6 +1117,7 @@ sqlite_plugin_get_keys (void *cls,
1098 { 1117 {
1099 LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1118 LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1100 "sqlite_prepare"); 1119 "sqlite_prepare");
1120 proc (proc_cls, NULL, 0);
1101 return; 1121 return;
1102 } 1122 }
1103 while (SQLITE_ROW == (ret = sqlite3_step (stmt))) 1123 while (SQLITE_ROW == (ret = sqlite3_step (stmt)))
@@ -1111,6 +1131,7 @@ sqlite_plugin_get_keys (void *cls,
1111 if (SQLITE_DONE != ret) 1131 if (SQLITE_DONE != ret)
1112 LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR, "sqlite_step"); 1132 LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR, "sqlite_step");
1113 sqlite3_finalize (stmt); 1133 sqlite3_finalize (stmt);
1134 proc (proc_cls, NULL, 0);
1114} 1135}
1115 1136
1116 1137
diff --git a/src/datastore/plugin_datastore_template.c b/src/datastore/plugin_datastore_template.c
index e9d10a25c..4b8c9c016 100644
--- a/src/datastore/plugin_datastore_template.c
+++ b/src/datastore/plugin_datastore_template.c
@@ -69,19 +69,19 @@ template_plugin_estimate_size (void *cls, unsigned long long *estimate)
69 * @param anonymity anonymity-level for the content 69 * @param anonymity anonymity-level for the content
70 * @param replication replication-level for the content 70 * @param replication replication-level for the content
71 * @param expiration expiration time for the content 71 * @param expiration expiration time for the content
72 * @param msg set to error message 72 * @param cont continuation called with success or failure status
73 * @return GNUNET_OK on success 73 * @param cont_cls continuation closure
74 */ 74 */
75static int 75static void
76template_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size, 76template_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
77 const void *data, enum GNUNET_BLOCK_Type type, 77 const void *data, enum GNUNET_BLOCK_Type type,
78 uint32_t priority, uint32_t anonymity, 78 uint32_t priority, uint32_t anonymity,
79 uint32_t replication, 79 uint32_t replication,
80 struct GNUNET_TIME_Absolute expiration, char **msg) 80 struct GNUNET_TIME_Absolute expiration, PluginPutCont cont,
81 void *cont_cls)
81{ 82{
82 GNUNET_break (0); 83 GNUNET_break (0);
83 *msg = GNUNET_strdup ("not implemented"); 84 cont (cont_cls, key, size, GNUNET_SYSERR, "not implemented");
84 return GNUNET_SYSERR;
85} 85}
86 86
87 87
@@ -170,16 +170,16 @@ template_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
170 * @param expire new expiration time should be the 170 * @param expire new expiration time should be the
171 * MAX of any existing expiration time and 171 * MAX of any existing expiration time and
172 * this value 172 * this value
173 * @param msg set to error message 173 * @param cont continuation called with success or failure status
174 * @return GNUNET_OK on success 174 * @param cons_cls continuation closure
175 */ 175 */
176static int 176static void
177template_plugin_update (void *cls, uint64_t uid, int delta, 177template_plugin_update (void *cls, uint64_t uid, int delta,
178 struct GNUNET_TIME_Absolute expire, char **msg) 178 struct GNUNET_TIME_Absolute expire,
179 PluginUpdateCont cont, void *cont_cls)
179{ 180{
180 GNUNET_break (0); 181 GNUNET_break (0);
181 *msg = GNUNET_strdup ("not implemented"); 182 cont (cont_cls, GNUNET_SYSERR, "not implemented");
182 return GNUNET_SYSERR;
183} 183}
184 184
185 185
@@ -226,6 +226,7 @@ template_get_keys (void *cls,
226 PluginKeyProcessor proc, 226 PluginKeyProcessor proc,
227 void *proc_cls) 227 void *proc_cls)
228{ 228{
229 proc (proc_cls, NULL, 0);
229} 230}
230 231
231 232
diff --git a/src/datastore/test_plugin_datastore.c b/src/datastore/test_plugin_datastore.c
index ee582a97e..64e88b819 100644
--- a/src/datastore/test_plugin_datastore.c
+++ b/src/datastore/test_plugin_datastore.c
@@ -84,6 +84,35 @@ disk_utilization_change_cb (void *cls, int delta)
84 84
85 85
86static void 86static void
87test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
88
89
90static void
91put_continuation (void *cls, const struct GNUNET_HashCode *key,
92 uint32_t size, int status, char *msg)
93{
94 struct CpsRunContext *crc = cls;
95 static unsigned long long os;
96 unsigned long long cs;
97
98 if (GNUNET_OK != status)
99 {
100 FPRINTF (stderr, "ERROR: `%s'\n", msg);
101 }
102 else
103 {
104 crc->api->estimate_size (crc->api->cls, &cs);
105 GNUNET_assert (os <= cs);
106 os = cs;
107 stored_bytes += size;
108 stored_ops++;
109 stored_entries++;
110 }
111 GNUNET_SCHEDULER_add_now (&test, crc);
112}
113
114
115static void
87gen_key (int i, struct GNUNET_HashCode * key) 116gen_key (int i, struct GNUNET_HashCode * key)
88{ 117{
89 memset (key, 0, sizeof (struct GNUNET_HashCode)); 118 memset (key, 0, sizeof (struct GNUNET_HashCode));
@@ -93,14 +122,21 @@ gen_key (int i, struct GNUNET_HashCode * key)
93 122
94 123
95static void 124static void
96put_value (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k) 125do_put (struct CpsRunContext *crc)
97{ 126{
98 char value[65536]; 127 char value[65536];
99 size_t size; 128 size_t size;
100 struct GNUNET_HashCode key; 129 struct GNUNET_HashCode key;
101 char *msg;
102 unsigned int prio; 130 unsigned int prio;
131 static int i;
103 132
133 if (PUT_10 == i)
134 {
135 i = 0;
136 crc->phase++;
137 GNUNET_SCHEDULER_add_now (&test, crc);
138 return;
139 }
104 /* most content is 32k */ 140 /* most content is 32k */
105 size = 32 * 1024; 141 size = 32 * 1024;
106 142
@@ -113,36 +149,25 @@ put_value (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k)
113 memset (value, i, size); 149 memset (value, i, size);
114 if (i > 255) 150 if (i > 255)
115 memset (value, i - 255, size / 2); 151 memset (value, i - 255, size / 2);
116 value[0] = k; 152 value[0] = crc->i;
117 msg = NULL;
118 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100); 153 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100);
119 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 154 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
120 "putting type %u, anon %u under key %s\n", i + 1, i, 155 "putting type %u, anon %u under key %s\n", i + 1, i,
121 GNUNET_h2s (&key)); 156 GNUNET_h2s (&key));
122 if (GNUNET_OK != api->put (api->cls, &key, size, value, i + 1 /* type */ , 157 crc->api->put (crc->api->cls, &key, size, value, i + 1 /* type */ ,
123 prio, i /* anonymity */ , 158 prio, i /* anonymity */ ,
124 0 /* replication */ , 159 0 /* replication */ ,
125 GNUNET_TIME_relative_to_absolute 160 GNUNET_TIME_relative_to_absolute
126 (GNUNET_TIME_relative_multiply 161 (GNUNET_TIME_relative_multiply
127 (GNUNET_TIME_UNIT_MILLISECONDS, 162 (GNUNET_TIME_UNIT_MILLISECONDS,
128 60 * 60 * 60 * 1000 + 163 60 * 60 * 60 * 1000 +
129 GNUNET_CRYPTO_random_u32 164 GNUNET_CRYPTO_random_u32
130 (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), &msg)) 165 (GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
131 { 166 put_continuation, crc);
132 FPRINTF (stderr, "ERROR: `%s'\n", msg); 167 i++;
133 GNUNET_free_non_null (msg);
134 return;
135 }
136 stored_bytes += size;
137 stored_ops++;
138 stored_entries++;
139} 168}
140 169
141 170
142static void
143test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
144
145
146static uint64_t guid; 171static uint64_t guid;
147 172
148 173
@@ -213,12 +238,20 @@ cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
213 238
214 239
215static void 240static void
241update_continuation (void *cls, int status, char *msg)
242{
243 struct CpsRunContext *crc = cls;
244
245 GNUNET_assert (GNUNET_OK == status);
246 crc->phase++;
247 GNUNET_SCHEDULER_add_now (&test, crc);
248}
249
250
251static void
216test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 252test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
217{ 253{
218 struct CpsRunContext *crc = cls; 254 struct CpsRunContext *crc = cls;
219 int j;
220 unsigned long long os;
221 unsigned long long cs;
222 struct GNUNET_HashCode key; 255 struct GNUNET_HashCode key;
223 256
224 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 257 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
@@ -237,16 +270,7 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
237 GNUNET_SCHEDULER_add_now (&cleaning_task, crc); 270 GNUNET_SCHEDULER_add_now (&cleaning_task, crc);
238 break; 271 break;
239 case RP_PUT: 272 case RP_PUT:
240 os = 0; 273 do_put (crc);
241 for (j = 0; j < PUT_10; j++)
242 {
243 put_value (crc->api, j, crc->i);
244 crc->api->estimate_size (crc->api->cls, &cs);
245 GNUNET_assert (os <= cs);
246 os = cs;
247 }
248 crc->phase++;
249 GNUNET_SCHEDULER_add_now (&test, crc);
250 break; 274 break;
251 case RP_GET: 275 case RP_GET:
252 if (crc->cnt == 1) 276 if (crc->cnt == 1)
@@ -261,11 +285,8 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
261 GNUNET_BLOCK_TYPE_ANY, &iterate_one_shot, crc); 285 GNUNET_BLOCK_TYPE_ANY, &iterate_one_shot, crc);
262 break; 286 break;
263 case RP_UPDATE: 287 case RP_UPDATE:
264 GNUNET_assert (GNUNET_OK == 288 crc->api->update (crc->api->cls, guid, 1, GNUNET_TIME_UNIT_ZERO_ABS,
265 crc->api->update (crc->api->cls, guid, 1, 289 update_continuation, crc);
266 GNUNET_TIME_UNIT_ZERO_ABS, NULL));
267 crc->phase++;
268 GNUNET_SCHEDULER_add_now (&test, crc);
269 break; 290 break;
270 291
271 case RP_ITER_ZERO: 292 case RP_ITER_ZERO: