diff options
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r-- | src/datastore/datastore_api.c | 78 |
1 files changed, 59 insertions, 19 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 916e6acae..26e1e501d 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c | |||
@@ -33,6 +33,8 @@ | |||
33 | 33 | ||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__) | 34 | #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__) |
35 | 35 | ||
36 | #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES | ||
37 | |||
36 | /** | 38 | /** |
37 | * Collect an instane number of statistics? May cause excessive IPC. | 39 | * Collect an instane number of statistics? May cause excessive IPC. |
38 | */ | 40 | */ |
@@ -138,6 +140,12 @@ struct GNUNET_DATASTORE_QueueEntry | |||
138 | struct GNUNET_MQ_Envelope *env; | 140 | struct GNUNET_MQ_Envelope *env; |
139 | 141 | ||
140 | /** | 142 | /** |
143 | * Task we run if this entry stalls the queue and we | ||
144 | * need to warn the user. | ||
145 | */ | ||
146 | struct GNUNET_SCHEDULER_Task *delay_warn_task; | ||
147 | |||
148 | /** | ||
141 | * Priority in the queue. | 149 | * Priority in the queue. |
142 | */ | 150 | */ |
143 | unsigned int priority; | 151 | unsigned int priority; |
@@ -269,11 +277,36 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) | |||
269 | h->queue_size--; | 277 | h->queue_size--; |
270 | if (NULL != qe->env) | 278 | if (NULL != qe->env) |
271 | GNUNET_MQ_discard (qe->env); | 279 | GNUNET_MQ_discard (qe->env); |
280 | if (NULL != qe->delay_warn_task) | ||
281 | GNUNET_SCHEDULER_cancel (qe->delay_warn_task); | ||
272 | GNUNET_free (qe); | 282 | GNUNET_free (qe); |
273 | } | 283 | } |
274 | 284 | ||
275 | 285 | ||
276 | /** | 286 | /** |
287 | * Task that logs an error after some time. | ||
288 | * | ||
289 | * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is | ||
290 | */ | ||
291 | static void | ||
292 | delay_warning (void *cls) | ||
293 | { | ||
294 | struct GNUNET_DATASTORE_QueueEntry *qe = cls; | ||
295 | |||
296 | qe->delay_warn_task = NULL; | ||
297 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
298 | "Request %p of type %u at head of datastore queue for more than %s\n", | ||
299 | qe, | ||
300 | (unsigned int) qe->response_type, | ||
301 | GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT, | ||
302 | GNUNET_YES)); | ||
303 | qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT, | ||
304 | &delay_warning, | ||
305 | qe); | ||
306 | } | ||
307 | |||
308 | |||
309 | /** | ||
277 | * Handle error in sending drop request to datastore. | 310 | * Handle error in sending drop request to datastore. |
278 | * | 311 | * |
279 | * @param cls closure with the datastore handle | 312 | * @param cls closure with the datastore handle |
@@ -290,8 +323,14 @@ mq_error_handler (void *cls, | |||
290 | "MQ error, reconnecting to DATASTORE\n"); | 323 | "MQ error, reconnecting to DATASTORE\n"); |
291 | do_disconnect (h); | 324 | do_disconnect (h); |
292 | qe = h->queue_head; | 325 | qe = h->queue_head; |
293 | if ( (NULL != qe) && | 326 | if (NULL == qe) |
294 | (NULL == qe->env) ) | 327 | return; |
328 | if (NULL != qe->delay_warn_task) | ||
329 | { | ||
330 | GNUNET_SCHEDULER_cancel (qe->delay_warn_task); | ||
331 | qe->delay_warn_task = NULL; | ||
332 | } | ||
333 | if (NULL == qe->env) | ||
295 | { | 334 | { |
296 | union QueueContext qc = qe->qc; | 335 | union QueueContext qc = qe->qc; |
297 | uint16_t rt = qe->response_type; | 336 | uint16_t rt = qe->response_type; |
@@ -594,6 +633,10 @@ process_queue (struct GNUNET_DATASTORE_Handle *h) | |||
594 | "Not connected\n"); | 633 | "Not connected\n"); |
595 | return; | 634 | return; |
596 | } | 635 | } |
636 | GNUNET_assert (NULL == qe->delay_warn_task); | ||
637 | qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT, | ||
638 | &delay_warning, | ||
639 | qe); | ||
597 | GNUNET_MQ_send (h->mq, | 640 | GNUNET_MQ_send (h->mq, |
598 | qe->env); | 641 | qe->env); |
599 | qe->env = NULL; | 642 | qe->env = NULL; |
@@ -958,7 +1001,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, | |||
958 | struct DataMessage *dm; | 1001 | struct DataMessage *dm; |
959 | union QueueContext qc; | 1002 | union QueueContext qc; |
960 | 1003 | ||
961 | if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | 1004 | if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE) |
962 | { | 1005 | { |
963 | GNUNET_break (0); | 1006 | GNUNET_break (0); |
964 | return NULL; | 1007 | return NULL; |
@@ -1169,7 +1212,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, | |||
1169 | struct GNUNET_MQ_Envelope *env; | 1212 | struct GNUNET_MQ_Envelope *env; |
1170 | union QueueContext qc; | 1213 | union QueueContext qc; |
1171 | 1214 | ||
1172 | if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | 1215 | if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE) |
1173 | { | 1216 | { |
1174 | GNUNET_break (0); | 1217 | GNUNET_break (0); |
1175 | return NULL; | 1218 | return NULL; |
@@ -1282,10 +1325,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | |||
1282 | * Get a single zero-anonymity value from the datastore. | 1325 | * Get a single zero-anonymity value from the datastore. |
1283 | * | 1326 | * |
1284 | * @param h handle to the datastore | 1327 | * @param h handle to the datastore |
1285 | * @param offset offset of the result (modulo num-results); set to | 1328 | * @param next_uid return the result with lowest uid >= next_uid |
1286 | * a random 64-bit value initially; then increment by | ||
1287 | * one each time; detect that all results have been found by uid | ||
1288 | * being again the first uid ever returned. | ||
1289 | * @param queue_priority ranking of this request in the priority queue | 1329 | * @param queue_priority ranking of this request in the priority queue |
1290 | * @param max_queue_size at what queue size should this request be dropped | 1330 | * @param max_queue_size at what queue size should this request be dropped |
1291 | * (if other requests of higher priority are in the queue) | 1331 | * (if other requests of higher priority are in the queue) |
@@ -1299,7 +1339,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | |||
1299 | */ | 1339 | */ |
1300 | struct GNUNET_DATASTORE_QueueEntry * | 1340 | struct GNUNET_DATASTORE_QueueEntry * |
1301 | GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | 1341 | GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, |
1302 | uint64_t offset, | 1342 | uint64_t next_uid, |
1303 | unsigned int queue_priority, | 1343 | unsigned int queue_priority, |
1304 | unsigned int max_queue_size, | 1344 | unsigned int max_queue_size, |
1305 | enum GNUNET_BLOCK_Type type, | 1345 | enum GNUNET_BLOCK_Type type, |
@@ -1314,13 +1354,12 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1314 | GNUNET_assert (NULL != proc); | 1354 | GNUNET_assert (NULL != proc); |
1315 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); | 1355 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); |
1316 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1356 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1317 | "Asked to get %llu-th zero-anonymity entry of type %d\n", | 1357 | "Asked to get a zero-anonymity entry of type %d\n", |
1318 | (unsigned long long) offset, | ||
1319 | type); | 1358 | type); |
1320 | env = GNUNET_MQ_msg (m, | 1359 | env = GNUNET_MQ_msg (m, |
1321 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); | 1360 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); |
1322 | m->type = htonl ((uint32_t) type); | 1361 | m->type = htonl ((uint32_t) type); |
1323 | m->offset = GNUNET_htonll (offset); | 1362 | m->next_uid = GNUNET_htonll (next_uid); |
1324 | qc.rc.proc = proc; | 1363 | qc.rc.proc = proc; |
1325 | qc.rc.proc_cls = proc_cls; | 1364 | qc.rc.proc_cls = proc_cls; |
1326 | qe = make_queue_entry (h, | 1365 | qe = make_queue_entry (h, |
@@ -1349,10 +1388,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1349 | * will only be called once. | 1388 | * will only be called once. |
1350 | * | 1389 | * |
1351 | * @param h handle to the datastore | 1390 | * @param h handle to the datastore |
1352 | * @param offset offset of the result (modulo num-results); set to | 1391 | * @param next_uid return the result with lowest uid >= next_uid |
1353 | * a random 64-bit value initially; then increment by | 1392 | * @param random if true, return a random result instead of using next_uid |
1354 | * one each time; detect that all results have been found by uid | ||
1355 | * being again the first uid ever returned. | ||
1356 | * @param key maybe NULL (to match all entries) | 1393 | * @param key maybe NULL (to match all entries) |
1357 | * @param type desired type, 0 for any | 1394 | * @param type desired type, 0 for any |
1358 | * @param queue_priority ranking of this request in the priority queue | 1395 | * @param queue_priority ranking of this request in the priority queue |
@@ -1366,7 +1403,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1366 | */ | 1403 | */ |
1367 | struct GNUNET_DATASTORE_QueueEntry * | 1404 | struct GNUNET_DATASTORE_QueueEntry * |
1368 | GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, | 1405 | GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, |
1369 | uint64_t offset, | 1406 | uint64_t next_uid, |
1407 | bool random, | ||
1370 | const struct GNUNET_HashCode *key, | 1408 | const struct GNUNET_HashCode *key, |
1371 | enum GNUNET_BLOCK_Type type, | 1409 | enum GNUNET_BLOCK_Type type, |
1372 | unsigned int queue_priority, | 1410 | unsigned int queue_priority, |
@@ -1390,14 +1428,16 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, | |||
1390 | env = GNUNET_MQ_msg (gm, | 1428 | env = GNUNET_MQ_msg (gm, |
1391 | GNUNET_MESSAGE_TYPE_DATASTORE_GET); | 1429 | GNUNET_MESSAGE_TYPE_DATASTORE_GET); |
1392 | gm->type = htonl (type); | 1430 | gm->type = htonl (type); |
1393 | gm->offset = GNUNET_htonll (offset); | 1431 | gm->next_uid = GNUNET_htonll (next_uid); |
1432 | gm->random = random; | ||
1394 | } | 1433 | } |
1395 | else | 1434 | else |
1396 | { | 1435 | { |
1397 | env = GNUNET_MQ_msg (gkm, | 1436 | env = GNUNET_MQ_msg (gkm, |
1398 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); | 1437 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); |
1399 | gkm->type = htonl (type); | 1438 | gkm->type = htonl (type); |
1400 | gkm->offset = GNUNET_htonll (offset); | 1439 | gkm->next_uid = GNUNET_htonll (next_uid); |
1440 | gkm->random = random; | ||
1401 | gkm->key = *key; | 1441 | gkm->key = *key; |
1402 | } | 1442 | } |
1403 | qc.rc.proc = proc; | 1443 | qc.rc.proc = proc; |