aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/datastore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r--src/datastore/datastore_api.c78
1 files changed, 59 insertions, 19 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index 916e6acae..26e1e501d 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -33,6 +33,8 @@
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35 35
36#define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
36/** 38/**
37 * Collect an instane number of statistics? May cause excessive IPC. 39 * Collect an instane number of statistics? May cause excessive IPC.
38 */ 40 */
@@ -138,6 +140,12 @@ struct GNUNET_DATASTORE_QueueEntry
138 struct GNUNET_MQ_Envelope *env; 140 struct GNUNET_MQ_Envelope *env;
139 141
140 /** 142 /**
143 * Task we run if this entry stalls the queue and we
144 * need to warn the user.
145 */
146 struct GNUNET_SCHEDULER_Task *delay_warn_task;
147
148 /**
141 * Priority in the queue. 149 * Priority in the queue.
142 */ 150 */
143 unsigned int priority; 151 unsigned int priority;
@@ -269,11 +277,36 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
269 h->queue_size--; 277 h->queue_size--;
270 if (NULL != qe->env) 278 if (NULL != qe->env)
271 GNUNET_MQ_discard (qe->env); 279 GNUNET_MQ_discard (qe->env);
280 if (NULL != qe->delay_warn_task)
281 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
272 GNUNET_free (qe); 282 GNUNET_free (qe);
273} 283}
274 284
275 285
276/** 286/**
287 * Task that logs an error after some time.
288 *
289 * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
290 */
291static void
292delay_warning (void *cls)
293{
294 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
295
296 qe->delay_warn_task = NULL;
297 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
298 "Request %p of type %u at head of datastore queue for more than %s\n",
299 qe,
300 (unsigned int) qe->response_type,
301 GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
302 GNUNET_YES));
303 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
304 &delay_warning,
305 qe);
306}
307
308
309/**
277 * Handle error in sending drop request to datastore. 310 * Handle error in sending drop request to datastore.
278 * 311 *
279 * @param cls closure with the datastore handle 312 * @param cls closure with the datastore handle
@@ -290,8 +323,14 @@ mq_error_handler (void *cls,
290 "MQ error, reconnecting to DATASTORE\n"); 323 "MQ error, reconnecting to DATASTORE\n");
291 do_disconnect (h); 324 do_disconnect (h);
292 qe = h->queue_head; 325 qe = h->queue_head;
293 if ( (NULL != qe) && 326 if (NULL == qe)
294 (NULL == qe->env) ) 327 return;
328 if (NULL != qe->delay_warn_task)
329 {
330 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
331 qe->delay_warn_task = NULL;
332 }
333 if (NULL == qe->env)
295 { 334 {
296 union QueueContext qc = qe->qc; 335 union QueueContext qc = qe->qc;
297 uint16_t rt = qe->response_type; 336 uint16_t rt = qe->response_type;
@@ -594,6 +633,10 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
594 "Not connected\n"); 633 "Not connected\n");
595 return; 634 return;
596 } 635 }
636 GNUNET_assert (NULL == qe->delay_warn_task);
637 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
638 &delay_warning,
639 qe);
597 GNUNET_MQ_send (h->mq, 640 GNUNET_MQ_send (h->mq,
598 qe->env); 641 qe->env);
599 qe->env = NULL; 642 qe->env = NULL;
@@ -958,7 +1001,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
958 struct DataMessage *dm; 1001 struct DataMessage *dm;
959 union QueueContext qc; 1002 union QueueContext qc;
960 1003
961 if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 1004 if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE)
962 { 1005 {
963 GNUNET_break (0); 1006 GNUNET_break (0);
964 return NULL; 1007 return NULL;
@@ -1169,7 +1212,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1169 struct GNUNET_MQ_Envelope *env; 1212 struct GNUNET_MQ_Envelope *env;
1170 union QueueContext qc; 1213 union QueueContext qc;
1171 1214
1172 if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 1215 if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1173 { 1216 {
1174 GNUNET_break (0); 1217 GNUNET_break (0);
1175 return NULL; 1218 return NULL;
@@ -1282,10 +1325,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1282 * Get a single zero-anonymity value from the datastore. 1325 * Get a single zero-anonymity value from the datastore.
1283 * 1326 *
1284 * @param h handle to the datastore 1327 * @param h handle to the datastore
1285 * @param offset offset of the result (modulo num-results); set to 1328 * @param next_uid return the result with lowest uid >= next_uid
1286 * a random 64-bit value initially; then increment by
1287 * one each time; detect that all results have been found by uid
1288 * being again the first uid ever returned.
1289 * @param queue_priority ranking of this request in the priority queue 1329 * @param queue_priority ranking of this request in the priority queue
1290 * @param max_queue_size at what queue size should this request be dropped 1330 * @param max_queue_size at what queue size should this request be dropped
1291 * (if other requests of higher priority are in the queue) 1331 * (if other requests of higher priority are in the queue)
@@ -1299,7 +1339,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1299 */ 1339 */
1300struct GNUNET_DATASTORE_QueueEntry * 1340struct GNUNET_DATASTORE_QueueEntry *
1301GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, 1341GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1302 uint64_t offset, 1342 uint64_t next_uid,
1303 unsigned int queue_priority, 1343 unsigned int queue_priority,
1304 unsigned int max_queue_size, 1344 unsigned int max_queue_size,
1305 enum GNUNET_BLOCK_Type type, 1345 enum GNUNET_BLOCK_Type type,
@@ -1314,13 +1354,12 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1314 GNUNET_assert (NULL != proc); 1354 GNUNET_assert (NULL != proc);
1315 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); 1355 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1316 LOG (GNUNET_ERROR_TYPE_DEBUG, 1356 LOG (GNUNET_ERROR_TYPE_DEBUG,
1317 "Asked to get %llu-th zero-anonymity entry of type %d\n", 1357 "Asked to get a zero-anonymity entry of type %d\n",
1318 (unsigned long long) offset,
1319 type); 1358 type);
1320 env = GNUNET_MQ_msg (m, 1359 env = GNUNET_MQ_msg (m,
1321 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); 1360 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1322 m->type = htonl ((uint32_t) type); 1361 m->type = htonl ((uint32_t) type);
1323 m->offset = GNUNET_htonll (offset); 1362 m->next_uid = GNUNET_htonll (next_uid);
1324 qc.rc.proc = proc; 1363 qc.rc.proc = proc;
1325 qc.rc.proc_cls = proc_cls; 1364 qc.rc.proc_cls = proc_cls;
1326 qe = make_queue_entry (h, 1365 qe = make_queue_entry (h,
@@ -1349,10 +1388,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1349 * will only be called once. 1388 * will only be called once.
1350 * 1389 *
1351 * @param h handle to the datastore 1390 * @param h handle to the datastore
1352 * @param offset offset of the result (modulo num-results); set to 1391 * @param next_uid return the result with lowest uid >= next_uid
1353 * a random 64-bit value initially; then increment by 1392 * @param random if true, return a random result instead of using next_uid
1354 * one each time; detect that all results have been found by uid
1355 * being again the first uid ever returned.
1356 * @param key maybe NULL (to match all entries) 1393 * @param key maybe NULL (to match all entries)
1357 * @param type desired type, 0 for any 1394 * @param type desired type, 0 for any
1358 * @param queue_priority ranking of this request in the priority queue 1395 * @param queue_priority ranking of this request in the priority queue
@@ -1366,7 +1403,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1366 */ 1403 */
1367struct GNUNET_DATASTORE_QueueEntry * 1404struct GNUNET_DATASTORE_QueueEntry *
1368GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, 1405GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1369 uint64_t offset, 1406 uint64_t next_uid,
1407 bool random,
1370 const struct GNUNET_HashCode *key, 1408 const struct GNUNET_HashCode *key,
1371 enum GNUNET_BLOCK_Type type, 1409 enum GNUNET_BLOCK_Type type,
1372 unsigned int queue_priority, 1410 unsigned int queue_priority,
@@ -1390,14 +1428,16 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1390 env = GNUNET_MQ_msg (gm, 1428 env = GNUNET_MQ_msg (gm,
1391 GNUNET_MESSAGE_TYPE_DATASTORE_GET); 1429 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1392 gm->type = htonl (type); 1430 gm->type = htonl (type);
1393 gm->offset = GNUNET_htonll (offset); 1431 gm->next_uid = GNUNET_htonll (next_uid);
1432 gm->random = random;
1394 } 1433 }
1395 else 1434 else
1396 { 1435 {
1397 env = GNUNET_MQ_msg (gkm, 1436 env = GNUNET_MQ_msg (gkm,
1398 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); 1437 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1399 gkm->type = htonl (type); 1438 gkm->type = htonl (type);
1400 gkm->offset = GNUNET_htonll (offset); 1439 gkm->next_uid = GNUNET_htonll (next_uid);
1440 gkm->random = random;
1401 gkm->key = *key; 1441 gkm->key = *key;
1402 } 1442 }
1403 qc.rc.proc = proc; 1443 qc.rc.proc = proc;