diff options
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r-- | src/datastore/datastore_api.c | 179 |
1 files changed, 82 insertions, 97 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index db485364e..31f7a997f 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c | |||
@@ -33,6 +33,8 @@ | |||
33 | 33 | ||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__) | 34 | #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__) |
35 | 35 | ||
36 | #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES | ||
37 | |||
36 | /** | 38 | /** |
37 | * Collect an instane number of statistics? May cause excessive IPC. | 39 | * Collect an instane number of statistics? May cause excessive IPC. |
38 | */ | 40 | */ |
@@ -138,6 +140,12 @@ struct GNUNET_DATASTORE_QueueEntry | |||
138 | struct GNUNET_MQ_Envelope *env; | 140 | struct GNUNET_MQ_Envelope *env; |
139 | 141 | ||
140 | /** | 142 | /** |
143 | * Task we run if this entry stalls the queue and we | ||
144 | * need to warn the user. | ||
145 | */ | ||
146 | struct GNUNET_SCHEDULER_Task *delay_warn_task; | ||
147 | |||
148 | /** | ||
141 | * Priority in the queue. | 149 | * Priority in the queue. |
142 | */ | 150 | */ |
143 | unsigned int priority; | 151 | unsigned int priority; |
@@ -269,11 +277,36 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) | |||
269 | h->queue_size--; | 277 | h->queue_size--; |
270 | if (NULL != qe->env) | 278 | if (NULL != qe->env) |
271 | GNUNET_MQ_discard (qe->env); | 279 | GNUNET_MQ_discard (qe->env); |
280 | if (NULL != qe->delay_warn_task) | ||
281 | GNUNET_SCHEDULER_cancel (qe->delay_warn_task); | ||
272 | GNUNET_free (qe); | 282 | GNUNET_free (qe); |
273 | } | 283 | } |
274 | 284 | ||
275 | 285 | ||
276 | /** | 286 | /** |
287 | * Task that logs an error after some time. | ||
288 | * | ||
289 | * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is | ||
290 | */ | ||
291 | static void | ||
292 | delay_warning (void *cls) | ||
293 | { | ||
294 | struct GNUNET_DATASTORE_QueueEntry *qe = cls; | ||
295 | |||
296 | qe->delay_warn_task = NULL; | ||
297 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
298 | "Request %p of type %u at head of datastore queue for more than %s\n", | ||
299 | qe, | ||
300 | (unsigned int) qe->response_type, | ||
301 | GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT, | ||
302 | GNUNET_YES)); | ||
303 | qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT, | ||
304 | &delay_warning, | ||
305 | qe); | ||
306 | } | ||
307 | |||
308 | |||
309 | /** | ||
277 | * Handle error in sending drop request to datastore. | 310 | * Handle error in sending drop request to datastore. |
278 | * | 311 | * |
279 | * @param cls closure with the datastore handle | 312 | * @param cls closure with the datastore handle |
@@ -290,8 +323,14 @@ mq_error_handler (void *cls, | |||
290 | "MQ error, reconnecting to DATASTORE\n"); | 323 | "MQ error, reconnecting to DATASTORE\n"); |
291 | do_disconnect (h); | 324 | do_disconnect (h); |
292 | qe = h->queue_head; | 325 | qe = h->queue_head; |
293 | if ( (NULL != qe) && | 326 | if (NULL == qe) |
294 | (NULL == qe->env) ) | 327 | return; |
328 | if (NULL != qe->delay_warn_task) | ||
329 | { | ||
330 | GNUNET_SCHEDULER_cancel (qe->delay_warn_task); | ||
331 | qe->delay_warn_task = NULL; | ||
332 | } | ||
333 | if (NULL == qe->env) | ||
295 | { | 334 | { |
296 | union QueueContext qc = qe->qc; | 335 | union QueueContext qc = qe->qc; |
297 | uint16_t rt = qe->response_type; | 336 | uint16_t rt = qe->response_type; |
@@ -313,7 +352,11 @@ mq_error_handler (void *cls, | |||
313 | qc.rc.proc (qc.rc.proc_cls, | 352 | qc.rc.proc (qc.rc.proc_cls, |
314 | NULL, | 353 | NULL, |
315 | 0, | 354 | 0, |
316 | NULL, 0, 0, 0, | 355 | NULL, |
356 | 0, | ||
357 | 0, | ||
358 | 0, | ||
359 | 0, | ||
317 | GNUNET_TIME_UNIT_ZERO_ABS, | 360 | GNUNET_TIME_UNIT_ZERO_ABS, |
318 | 0); | 361 | 0); |
319 | break; | 362 | break; |
@@ -429,7 +472,11 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, | |||
429 | qe->qc.rc.proc (qe->qc.rc.proc_cls, | 472 | qe->qc.rc.proc (qe->qc.rc.proc_cls, |
430 | NULL, | 473 | NULL, |
431 | 0, | 474 | 0, |
432 | NULL, 0, 0, 0, | 475 | NULL, |
476 | 0, | ||
477 | 0, | ||
478 | 0, | ||
479 | 0, | ||
433 | GNUNET_TIME_UNIT_ZERO_ABS, | 480 | GNUNET_TIME_UNIT_ZERO_ABS, |
434 | 0); | 481 | 0); |
435 | break; | 482 | break; |
@@ -498,8 +545,17 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | |||
498 | struct GNUNET_DATASTORE_QueueEntry *pos; | 545 | struct GNUNET_DATASTORE_QueueEntry *pos; |
499 | unsigned int c; | 546 | unsigned int c; |
500 | 547 | ||
501 | c = 0; | 548 | if ( (NULL != h->queue_tail) && |
502 | pos = h->queue_head; | 549 | (h->queue_tail->priority >= queue_priority) ) |
550 | { | ||
551 | c = h->queue_size; | ||
552 | pos = NULL; | ||
553 | } | ||
554 | else | ||
555 | { | ||
556 | c = 0; | ||
557 | pos = h->queue_head; | ||
558 | } | ||
503 | while ( (NULL != pos) && | 559 | while ( (NULL != pos) && |
504 | (c < max_queue_size) && | 560 | (c < max_queue_size) && |
505 | (pos->priority >= queue_priority) ) | 561 | (pos->priority >= queue_priority) ) |
@@ -585,6 +641,10 @@ process_queue (struct GNUNET_DATASTORE_Handle *h) | |||
585 | "Not connected\n"); | 641 | "Not connected\n"); |
586 | return; | 642 | return; |
587 | } | 643 | } |
644 | GNUNET_assert (NULL == qe->delay_warn_task); | ||
645 | qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT, | ||
646 | &delay_warning, | ||
647 | qe); | ||
588 | GNUNET_MQ_send (h->mq, | 648 | GNUNET_MQ_send (h->mq, |
589 | qe->env); | 649 | qe->env); |
590 | qe->env = NULL; | 650 | qe->env = NULL; |
@@ -773,6 +833,7 @@ handle_data (void *cls, | |||
773 | ntohl (dm->type), | 833 | ntohl (dm->type), |
774 | ntohl (dm->priority), | 834 | ntohl (dm->priority), |
775 | ntohl (dm->anonymity), | 835 | ntohl (dm->anonymity), |
836 | ntohl (dm->replication), | ||
776 | GNUNET_TIME_absolute_ntoh (dm->expiration), | 837 | GNUNET_TIME_absolute_ntoh (dm->expiration), |
777 | GNUNET_ntohll (dm->uid)); | 838 | GNUNET_ntohll (dm->uid)); |
778 | } | 839 | } |
@@ -835,6 +896,7 @@ handle_data_end (void *cls, | |||
835 | 0, | 896 | 0, |
836 | 0, | 897 | 0, |
837 | 0, | 898 | 0, |
899 | 0, | ||
838 | GNUNET_TIME_UNIT_ZERO_ABS, | 900 | GNUNET_TIME_UNIT_ZERO_ABS, |
839 | 0); | 901 | 0); |
840 | } | 902 | } |
@@ -949,7 +1011,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, | |||
949 | struct DataMessage *dm; | 1011 | struct DataMessage *dm; |
950 | union QueueContext qc; | 1012 | union QueueContext qc; |
951 | 1013 | ||
952 | if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | 1014 | if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE) |
953 | { | 1015 | { |
954 | GNUNET_break (0); | 1016 | GNUNET_break (0); |
955 | return NULL; | 1017 | return NULL; |
@@ -970,8 +1032,6 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, | |||
970 | dm->priority = htonl (priority); | 1032 | dm->priority = htonl (priority); |
971 | dm->anonymity = htonl (anonymity); | 1033 | dm->anonymity = htonl (anonymity); |
972 | dm->replication = htonl (replication); | 1034 | dm->replication = htonl (replication); |
973 | dm->reserved = htonl (0); | ||
974 | dm->uid = GNUNET_htonll (0); | ||
975 | dm->expiration = GNUNET_TIME_absolute_hton (expiration); | 1035 | dm->expiration = GNUNET_TIME_absolute_hton (expiration); |
976 | dm->key = *key; | 1036 | dm->key = *key; |
977 | GNUNET_memcpy (&dm[1], | 1037 | GNUNET_memcpy (&dm[1], |
@@ -1126,72 +1186,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
1126 | 1186 | ||
1127 | 1187 | ||
1128 | /** | 1188 | /** |
1129 | * Update a value in the datastore. | ||
1130 | * | ||
1131 | * @param h handle to the datastore | ||
1132 | * @param uid identifier for the value | ||
1133 | * @param priority how much to increase the priority of the value | ||
1134 | * @param expiration new expiration value should be MAX of existing and this argument | ||
1135 | * @param queue_priority ranking of this request in the priority queue | ||
1136 | * @param max_queue_size at what queue size should this request be dropped | ||
1137 | * (if other requests of higher priority are in the queue) | ||
1138 | * @param cont continuation to call when done | ||
1139 | * @param cont_cls closure for @a cont | ||
1140 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | ||
1141 | * cancel; note that even if NULL is returned, the callback will be invoked | ||
1142 | * (or rather, will already have been invoked) | ||
1143 | */ | ||
1144 | struct GNUNET_DATASTORE_QueueEntry * | ||
1145 | GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, | ||
1146 | uint64_t uid, | ||
1147 | uint32_t priority, | ||
1148 | struct GNUNET_TIME_Absolute expiration, | ||
1149 | unsigned int queue_priority, | ||
1150 | unsigned int max_queue_size, | ||
1151 | GNUNET_DATASTORE_ContinuationWithStatus cont, | ||
1152 | void *cont_cls) | ||
1153 | { | ||
1154 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
1155 | struct GNUNET_MQ_Envelope *env; | ||
1156 | struct UpdateMessage *um; | ||
1157 | union QueueContext qc; | ||
1158 | |||
1159 | if (NULL == cont) | ||
1160 | cont = &drop_status_cont; | ||
1161 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1162 | "Asked to update entry %llu raising priority by %u and expiration to %s\n", | ||
1163 | uid, | ||
1164 | (unsigned int) priority, | ||
1165 | GNUNET_STRINGS_absolute_time_to_string (expiration)); | ||
1166 | env = GNUNET_MQ_msg (um, | ||
1167 | GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); | ||
1168 | um->priority = htonl (priority); | ||
1169 | um->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
1170 | um->uid = GNUNET_htonll (uid); | ||
1171 | |||
1172 | qc.sc.cont = cont; | ||
1173 | qc.sc.cont_cls = cont_cls; | ||
1174 | qe = make_queue_entry (h, | ||
1175 | env, | ||
1176 | queue_priority, | ||
1177 | max_queue_size, | ||
1178 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | ||
1179 | &qc); | ||
1180 | if (NULL == qe) | ||
1181 | { | ||
1182 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1183 | "Could not create queue entry for UPDATE\n"); | ||
1184 | return NULL; | ||
1185 | } | ||
1186 | GNUNET_STATISTICS_update (h->stats, | ||
1187 | gettext_noop ("# UPDATE requests executed"), 1, | ||
1188 | GNUNET_NO); | ||
1189 | process_queue (h); | ||
1190 | return qe; | ||
1191 | } | ||
1192 | |||
1193 | |||
1194 | /** | ||
1195 | * Explicitly remove some content from the database. | 1189 | * Explicitly remove some content from the database. |
1196 | * The @a cont continuation will be called with `status` | 1190 | * The @a cont continuation will be called with `status` |
1197 | * #GNUNET_OK" if content was removed, #GNUNET_NO | 1191 | * #GNUNET_OK" if content was removed, #GNUNET_NO |
@@ -1226,7 +1220,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, | |||
1226 | struct GNUNET_MQ_Envelope *env; | 1220 | struct GNUNET_MQ_Envelope *env; |
1227 | union QueueContext qc; | 1221 | union QueueContext qc; |
1228 | 1222 | ||
1229 | if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | 1223 | if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE) |
1230 | { | 1224 | { |
1231 | GNUNET_break (0); | 1225 | GNUNET_break (0); |
1232 | return NULL; | 1226 | return NULL; |
@@ -1240,13 +1234,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, | |||
1240 | env = GNUNET_MQ_msg_extra (dm, | 1234 | env = GNUNET_MQ_msg_extra (dm, |
1241 | size, | 1235 | size, |
1242 | GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); | 1236 | GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); |
1243 | dm->rid = htonl (0); | ||
1244 | dm->size = htonl (size); | 1237 | dm->size = htonl (size); |
1245 | dm->type = htonl (0); | ||
1246 | dm->priority = htonl (0); | ||
1247 | dm->anonymity = htonl (0); | ||
1248 | dm->uid = GNUNET_htonll (0); | ||
1249 | dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); | ||
1250 | dm->key = *key; | 1238 | dm->key = *key; |
1251 | GNUNET_memcpy (&dm[1], | 1239 | GNUNET_memcpy (&dm[1], |
1252 | data, | 1240 | data, |
@@ -1339,10 +1327,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | |||
1339 | * Get a single zero-anonymity value from the datastore. | 1327 | * Get a single zero-anonymity value from the datastore. |
1340 | * | 1328 | * |
1341 | * @param h handle to the datastore | 1329 | * @param h handle to the datastore |
1342 | * @param offset offset of the result (modulo num-results); set to | 1330 | * @param next_uid return the result with lowest uid >= next_uid |
1343 | * a random 64-bit value initially; then increment by | ||
1344 | * one each time; detect that all results have been found by uid | ||
1345 | * being again the first uid ever returned. | ||
1346 | * @param queue_priority ranking of this request in the priority queue | 1331 | * @param queue_priority ranking of this request in the priority queue |
1347 | * @param max_queue_size at what queue size should this request be dropped | 1332 | * @param max_queue_size at what queue size should this request be dropped |
1348 | * (if other requests of higher priority are in the queue) | 1333 | * (if other requests of higher priority are in the queue) |
@@ -1356,7 +1341,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | |||
1356 | */ | 1341 | */ |
1357 | struct GNUNET_DATASTORE_QueueEntry * | 1342 | struct GNUNET_DATASTORE_QueueEntry * |
1358 | GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | 1343 | GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, |
1359 | uint64_t offset, | 1344 | uint64_t next_uid, |
1360 | unsigned int queue_priority, | 1345 | unsigned int queue_priority, |
1361 | unsigned int max_queue_size, | 1346 | unsigned int max_queue_size, |
1362 | enum GNUNET_BLOCK_Type type, | 1347 | enum GNUNET_BLOCK_Type type, |
@@ -1371,13 +1356,12 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1371 | GNUNET_assert (NULL != proc); | 1356 | GNUNET_assert (NULL != proc); |
1372 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); | 1357 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); |
1373 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1358 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1374 | "Asked to get %llu-th zero-anonymity entry of type %d\n", | 1359 | "Asked to get a zero-anonymity entry of type %d\n", |
1375 | (unsigned long long) offset, | ||
1376 | type); | 1360 | type); |
1377 | env = GNUNET_MQ_msg (m, | 1361 | env = GNUNET_MQ_msg (m, |
1378 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); | 1362 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); |
1379 | m->type = htonl ((uint32_t) type); | 1363 | m->type = htonl ((uint32_t) type); |
1380 | m->offset = GNUNET_htonll (offset); | 1364 | m->next_uid = GNUNET_htonll (next_uid); |
1381 | qc.rc.proc = proc; | 1365 | qc.rc.proc = proc; |
1382 | qc.rc.proc_cls = proc_cls; | 1366 | qc.rc.proc_cls = proc_cls; |
1383 | qe = make_queue_entry (h, | 1367 | qe = make_queue_entry (h, |
@@ -1406,10 +1390,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1406 | * will only be called once. | 1390 | * will only be called once. |
1407 | * | 1391 | * |
1408 | * @param h handle to the datastore | 1392 | * @param h handle to the datastore |
1409 | * @param offset offset of the result (modulo num-results); set to | 1393 | * @param next_uid return the result with lowest uid >= next_uid |
1410 | * a random 64-bit value initially; then increment by | 1394 | * @param random if true, return a random result instead of using next_uid |
1411 | * one each time; detect that all results have been found by uid | ||
1412 | * being again the first uid ever returned. | ||
1413 | * @param key maybe NULL (to match all entries) | 1395 | * @param key maybe NULL (to match all entries) |
1414 | * @param type desired type, 0 for any | 1396 | * @param type desired type, 0 for any |
1415 | * @param queue_priority ranking of this request in the priority queue | 1397 | * @param queue_priority ranking of this request in the priority queue |
@@ -1423,7 +1405,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1423 | */ | 1405 | */ |
1424 | struct GNUNET_DATASTORE_QueueEntry * | 1406 | struct GNUNET_DATASTORE_QueueEntry * |
1425 | GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, | 1407 | GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, |
1426 | uint64_t offset, | 1408 | uint64_t next_uid, |
1409 | bool random, | ||
1427 | const struct GNUNET_HashCode *key, | 1410 | const struct GNUNET_HashCode *key, |
1428 | enum GNUNET_BLOCK_Type type, | 1411 | enum GNUNET_BLOCK_Type type, |
1429 | unsigned int queue_priority, | 1412 | unsigned int queue_priority, |
@@ -1447,14 +1430,16 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, | |||
1447 | env = GNUNET_MQ_msg (gm, | 1430 | env = GNUNET_MQ_msg (gm, |
1448 | GNUNET_MESSAGE_TYPE_DATASTORE_GET); | 1431 | GNUNET_MESSAGE_TYPE_DATASTORE_GET); |
1449 | gm->type = htonl (type); | 1432 | gm->type = htonl (type); |
1450 | gm->offset = GNUNET_htonll (offset); | 1433 | gm->next_uid = GNUNET_htonll (next_uid); |
1434 | gm->random = random; | ||
1451 | } | 1435 | } |
1452 | else | 1436 | else |
1453 | { | 1437 | { |
1454 | env = GNUNET_MQ_msg (gkm, | 1438 | env = GNUNET_MQ_msg (gkm, |
1455 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); | 1439 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); |
1456 | gkm->type = htonl (type); | 1440 | gkm->type = htonl (type); |
1457 | gkm->offset = GNUNET_htonll (offset); | 1441 | gkm->next_uid = GNUNET_htonll (next_uid); |
1442 | gkm->random = random; | ||
1458 | gkm->key = *key; | 1443 | gkm->key = *key; |
1459 | } | 1444 | } |
1460 | qc.rc.proc = proc; | 1445 | qc.rc.proc = proc; |