aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/datastore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r--src/datastore/datastore_api.c179
1 files changed, 82 insertions, 97 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index db485364e..31f7a997f 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -33,6 +33,8 @@
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35 35
36#define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
36/** 38/**
37 * Collect an instane number of statistics? May cause excessive IPC. 39 * Collect an instane number of statistics? May cause excessive IPC.
38 */ 40 */
@@ -138,6 +140,12 @@ struct GNUNET_DATASTORE_QueueEntry
138 struct GNUNET_MQ_Envelope *env; 140 struct GNUNET_MQ_Envelope *env;
139 141
140 /** 142 /**
143 * Task we run if this entry stalls the queue and we
144 * need to warn the user.
145 */
146 struct GNUNET_SCHEDULER_Task *delay_warn_task;
147
148 /**
141 * Priority in the queue. 149 * Priority in the queue.
142 */ 150 */
143 unsigned int priority; 151 unsigned int priority;
@@ -269,11 +277,36 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
269 h->queue_size--; 277 h->queue_size--;
270 if (NULL != qe->env) 278 if (NULL != qe->env)
271 GNUNET_MQ_discard (qe->env); 279 GNUNET_MQ_discard (qe->env);
280 if (NULL != qe->delay_warn_task)
281 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
272 GNUNET_free (qe); 282 GNUNET_free (qe);
273} 283}
274 284
275 285
276/** 286/**
287 * Task that logs an error after some time.
288 *
289 * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
290 */
291static void
292delay_warning (void *cls)
293{
294 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
295
296 qe->delay_warn_task = NULL;
297 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
298 "Request %p of type %u at head of datastore queue for more than %s\n",
299 qe,
300 (unsigned int) qe->response_type,
301 GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
302 GNUNET_YES));
303 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
304 &delay_warning,
305 qe);
306}
307
308
309/**
277 * Handle error in sending drop request to datastore. 310 * Handle error in sending drop request to datastore.
278 * 311 *
279 * @param cls closure with the datastore handle 312 * @param cls closure with the datastore handle
@@ -290,8 +323,14 @@ mq_error_handler (void *cls,
290 "MQ error, reconnecting to DATASTORE\n"); 323 "MQ error, reconnecting to DATASTORE\n");
291 do_disconnect (h); 324 do_disconnect (h);
292 qe = h->queue_head; 325 qe = h->queue_head;
293 if ( (NULL != qe) && 326 if (NULL == qe)
294 (NULL == qe->env) ) 327 return;
328 if (NULL != qe->delay_warn_task)
329 {
330 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
331 qe->delay_warn_task = NULL;
332 }
333 if (NULL == qe->env)
295 { 334 {
296 union QueueContext qc = qe->qc; 335 union QueueContext qc = qe->qc;
297 uint16_t rt = qe->response_type; 336 uint16_t rt = qe->response_type;
@@ -313,7 +352,11 @@ mq_error_handler (void *cls,
313 qc.rc.proc (qc.rc.proc_cls, 352 qc.rc.proc (qc.rc.proc_cls,
314 NULL, 353 NULL,
315 0, 354 0,
316 NULL, 0, 0, 0, 355 NULL,
356 0,
357 0,
358 0,
359 0,
317 GNUNET_TIME_UNIT_ZERO_ABS, 360 GNUNET_TIME_UNIT_ZERO_ABS,
318 0); 361 0);
319 break; 362 break;
@@ -429,7 +472,11 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
429 qe->qc.rc.proc (qe->qc.rc.proc_cls, 472 qe->qc.rc.proc (qe->qc.rc.proc_cls,
430 NULL, 473 NULL,
431 0, 474 0,
432 NULL, 0, 0, 0, 475 NULL,
476 0,
477 0,
478 0,
479 0,
433 GNUNET_TIME_UNIT_ZERO_ABS, 480 GNUNET_TIME_UNIT_ZERO_ABS,
434 0); 481 0);
435 break; 482 break;
@@ -498,8 +545,17 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
498 struct GNUNET_DATASTORE_QueueEntry *pos; 545 struct GNUNET_DATASTORE_QueueEntry *pos;
499 unsigned int c; 546 unsigned int c;
500 547
501 c = 0; 548 if ( (NULL != h->queue_tail) &&
502 pos = h->queue_head; 549 (h->queue_tail->priority >= queue_priority) )
550 {
551 c = h->queue_size;
552 pos = NULL;
553 }
554 else
555 {
556 c = 0;
557 pos = h->queue_head;
558 }
503 while ( (NULL != pos) && 559 while ( (NULL != pos) &&
504 (c < max_queue_size) && 560 (c < max_queue_size) &&
505 (pos->priority >= queue_priority) ) 561 (pos->priority >= queue_priority) )
@@ -585,6 +641,10 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
585 "Not connected\n"); 641 "Not connected\n");
586 return; 642 return;
587 } 643 }
644 GNUNET_assert (NULL == qe->delay_warn_task);
645 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
646 &delay_warning,
647 qe);
588 GNUNET_MQ_send (h->mq, 648 GNUNET_MQ_send (h->mq,
589 qe->env); 649 qe->env);
590 qe->env = NULL; 650 qe->env = NULL;
@@ -773,6 +833,7 @@ handle_data (void *cls,
773 ntohl (dm->type), 833 ntohl (dm->type),
774 ntohl (dm->priority), 834 ntohl (dm->priority),
775 ntohl (dm->anonymity), 835 ntohl (dm->anonymity),
836 ntohl (dm->replication),
776 GNUNET_TIME_absolute_ntoh (dm->expiration), 837 GNUNET_TIME_absolute_ntoh (dm->expiration),
777 GNUNET_ntohll (dm->uid)); 838 GNUNET_ntohll (dm->uid));
778} 839}
@@ -835,6 +896,7 @@ handle_data_end (void *cls,
835 0, 896 0,
836 0, 897 0,
837 0, 898 0,
899 0,
838 GNUNET_TIME_UNIT_ZERO_ABS, 900 GNUNET_TIME_UNIT_ZERO_ABS,
839 0); 901 0);
840} 902}
@@ -949,7 +1011,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
949 struct DataMessage *dm; 1011 struct DataMessage *dm;
950 union QueueContext qc; 1012 union QueueContext qc;
951 1013
952 if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 1014 if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE)
953 { 1015 {
954 GNUNET_break (0); 1016 GNUNET_break (0);
955 return NULL; 1017 return NULL;
@@ -970,8 +1032,6 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
970 dm->priority = htonl (priority); 1032 dm->priority = htonl (priority);
971 dm->anonymity = htonl (anonymity); 1033 dm->anonymity = htonl (anonymity);
972 dm->replication = htonl (replication); 1034 dm->replication = htonl (replication);
973 dm->reserved = htonl (0);
974 dm->uid = GNUNET_htonll (0);
975 dm->expiration = GNUNET_TIME_absolute_hton (expiration); 1035 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
976 dm->key = *key; 1036 dm->key = *key;
977 GNUNET_memcpy (&dm[1], 1037 GNUNET_memcpy (&dm[1],
@@ -1126,72 +1186,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1126 1186
1127 1187
1128/** 1188/**
1129 * Update a value in the datastore.
1130 *
1131 * @param h handle to the datastore
1132 * @param uid identifier for the value
1133 * @param priority how much to increase the priority of the value
1134 * @param expiration new expiration value should be MAX of existing and this argument
1135 * @param queue_priority ranking of this request in the priority queue
1136 * @param max_queue_size at what queue size should this request be dropped
1137 * (if other requests of higher priority are in the queue)
1138 * @param cont continuation to call when done
1139 * @param cont_cls closure for @a cont
1140 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1141 * cancel; note that even if NULL is returned, the callback will be invoked
1142 * (or rather, will already have been invoked)
1143 */
1144struct GNUNET_DATASTORE_QueueEntry *
1145GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1146 uint64_t uid,
1147 uint32_t priority,
1148 struct GNUNET_TIME_Absolute expiration,
1149 unsigned int queue_priority,
1150 unsigned int max_queue_size,
1151 GNUNET_DATASTORE_ContinuationWithStatus cont,
1152 void *cont_cls)
1153{
1154 struct GNUNET_DATASTORE_QueueEntry *qe;
1155 struct GNUNET_MQ_Envelope *env;
1156 struct UpdateMessage *um;
1157 union QueueContext qc;
1158
1159 if (NULL == cont)
1160 cont = &drop_status_cont;
1161 LOG (GNUNET_ERROR_TYPE_DEBUG,
1162 "Asked to update entry %llu raising priority by %u and expiration to %s\n",
1163 uid,
1164 (unsigned int) priority,
1165 GNUNET_STRINGS_absolute_time_to_string (expiration));
1166 env = GNUNET_MQ_msg (um,
1167 GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1168 um->priority = htonl (priority);
1169 um->expiration = GNUNET_TIME_absolute_hton (expiration);
1170 um->uid = GNUNET_htonll (uid);
1171
1172 qc.sc.cont = cont;
1173 qc.sc.cont_cls = cont_cls;
1174 qe = make_queue_entry (h,
1175 env,
1176 queue_priority,
1177 max_queue_size,
1178 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1179 &qc);
1180 if (NULL == qe)
1181 {
1182 LOG (GNUNET_ERROR_TYPE_DEBUG,
1183 "Could not create queue entry for UPDATE\n");
1184 return NULL;
1185 }
1186 GNUNET_STATISTICS_update (h->stats,
1187 gettext_noop ("# UPDATE requests executed"), 1,
1188 GNUNET_NO);
1189 process_queue (h);
1190 return qe;
1191}
1192
1193
1194/**
1195 * Explicitly remove some content from the database. 1189 * Explicitly remove some content from the database.
1196 * The @a cont continuation will be called with `status` 1190 * The @a cont continuation will be called with `status`
1197 * #GNUNET_OK" if content was removed, #GNUNET_NO 1191 * #GNUNET_OK" if content was removed, #GNUNET_NO
@@ -1226,7 +1220,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1226 struct GNUNET_MQ_Envelope *env; 1220 struct GNUNET_MQ_Envelope *env;
1227 union QueueContext qc; 1221 union QueueContext qc;
1228 1222
1229 if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 1223 if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1230 { 1224 {
1231 GNUNET_break (0); 1225 GNUNET_break (0);
1232 return NULL; 1226 return NULL;
@@ -1240,13 +1234,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1240 env = GNUNET_MQ_msg_extra (dm, 1234 env = GNUNET_MQ_msg_extra (dm,
1241 size, 1235 size,
1242 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); 1236 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1243 dm->rid = htonl (0);
1244 dm->size = htonl (size); 1237 dm->size = htonl (size);
1245 dm->type = htonl (0);
1246 dm->priority = htonl (0);
1247 dm->anonymity = htonl (0);
1248 dm->uid = GNUNET_htonll (0);
1249 dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1250 dm->key = *key; 1238 dm->key = *key;
1251 GNUNET_memcpy (&dm[1], 1239 GNUNET_memcpy (&dm[1],
1252 data, 1240 data,
@@ -1339,10 +1327,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1339 * Get a single zero-anonymity value from the datastore. 1327 * Get a single zero-anonymity value from the datastore.
1340 * 1328 *
1341 * @param h handle to the datastore 1329 * @param h handle to the datastore
1342 * @param offset offset of the result (modulo num-results); set to 1330 * @param next_uid return the result with lowest uid >= next_uid
1343 * a random 64-bit value initially; then increment by
1344 * one each time; detect that all results have been found by uid
1345 * being again the first uid ever returned.
1346 * @param queue_priority ranking of this request in the priority queue 1331 * @param queue_priority ranking of this request in the priority queue
1347 * @param max_queue_size at what queue size should this request be dropped 1332 * @param max_queue_size at what queue size should this request be dropped
1348 * (if other requests of higher priority are in the queue) 1333 * (if other requests of higher priority are in the queue)
@@ -1356,7 +1341,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1356 */ 1341 */
1357struct GNUNET_DATASTORE_QueueEntry * 1342struct GNUNET_DATASTORE_QueueEntry *
1358GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, 1343GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1359 uint64_t offset, 1344 uint64_t next_uid,
1360 unsigned int queue_priority, 1345 unsigned int queue_priority,
1361 unsigned int max_queue_size, 1346 unsigned int max_queue_size,
1362 enum GNUNET_BLOCK_Type type, 1347 enum GNUNET_BLOCK_Type type,
@@ -1371,13 +1356,12 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1371 GNUNET_assert (NULL != proc); 1356 GNUNET_assert (NULL != proc);
1372 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); 1357 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1373 LOG (GNUNET_ERROR_TYPE_DEBUG, 1358 LOG (GNUNET_ERROR_TYPE_DEBUG,
1374 "Asked to get %llu-th zero-anonymity entry of type %d\n", 1359 "Asked to get a zero-anonymity entry of type %d\n",
1375 (unsigned long long) offset,
1376 type); 1360 type);
1377 env = GNUNET_MQ_msg (m, 1361 env = GNUNET_MQ_msg (m,
1378 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); 1362 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1379 m->type = htonl ((uint32_t) type); 1363 m->type = htonl ((uint32_t) type);
1380 m->offset = GNUNET_htonll (offset); 1364 m->next_uid = GNUNET_htonll (next_uid);
1381 qc.rc.proc = proc; 1365 qc.rc.proc = proc;
1382 qc.rc.proc_cls = proc_cls; 1366 qc.rc.proc_cls = proc_cls;
1383 qe = make_queue_entry (h, 1367 qe = make_queue_entry (h,
@@ -1406,10 +1390,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1406 * will only be called once. 1390 * will only be called once.
1407 * 1391 *
1408 * @param h handle to the datastore 1392 * @param h handle to the datastore
1409 * @param offset offset of the result (modulo num-results); set to 1393 * @param next_uid return the result with lowest uid >= next_uid
1410 * a random 64-bit value initially; then increment by 1394 * @param random if true, return a random result instead of using next_uid
1411 * one each time; detect that all results have been found by uid
1412 * being again the first uid ever returned.
1413 * @param key maybe NULL (to match all entries) 1395 * @param key maybe NULL (to match all entries)
1414 * @param type desired type, 0 for any 1396 * @param type desired type, 0 for any
1415 * @param queue_priority ranking of this request in the priority queue 1397 * @param queue_priority ranking of this request in the priority queue
@@ -1423,7 +1405,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1423 */ 1405 */
1424struct GNUNET_DATASTORE_QueueEntry * 1406struct GNUNET_DATASTORE_QueueEntry *
1425GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, 1407GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1426 uint64_t offset, 1408 uint64_t next_uid,
1409 bool random,
1427 const struct GNUNET_HashCode *key, 1410 const struct GNUNET_HashCode *key,
1428 enum GNUNET_BLOCK_Type type, 1411 enum GNUNET_BLOCK_Type type,
1429 unsigned int queue_priority, 1412 unsigned int queue_priority,
@@ -1447,14 +1430,16 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1447 env = GNUNET_MQ_msg (gm, 1430 env = GNUNET_MQ_msg (gm,
1448 GNUNET_MESSAGE_TYPE_DATASTORE_GET); 1431 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1449 gm->type = htonl (type); 1432 gm->type = htonl (type);
1450 gm->offset = GNUNET_htonll (offset); 1433 gm->next_uid = GNUNET_htonll (next_uid);
1434 gm->random = random;
1451 } 1435 }
1452 else 1436 else
1453 { 1437 {
1454 env = GNUNET_MQ_msg (gkm, 1438 env = GNUNET_MQ_msg (gkm,
1455 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); 1439 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1456 gkm->type = htonl (type); 1440 gkm->type = htonl (type);
1457 gkm->offset = GNUNET_htonll (offset); 1441 gkm->next_uid = GNUNET_htonll (next_uid);
1442 gkm->random = random;
1458 gkm->key = *key; 1443 gkm->key = *key;
1459 } 1444 }
1460 qc.rc.proc = proc; 1445 qc.rc.proc = proc;