diff options
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r-- | src/datastore/datastore_api.c | 279 |
1 files changed, 118 insertions, 161 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 2bba2e8ee..99060bd60 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c | |||
@@ -63,14 +63,14 @@ struct StatusContext | |||
63 | struct ResultContext | 63 | struct ResultContext |
64 | { | 64 | { |
65 | /** | 65 | /** |
66 | * Iterator to call with the result. | 66 | * Function to call with the result. |
67 | */ | 67 | */ |
68 | GNUNET_DATASTORE_Iterator iter; | 68 | GNUNET_DATASTORE_DatumProcessor proc; |
69 | 69 | ||
70 | /** | 70 | /** |
71 | * Closure for iter. | 71 | * Closure for proc. |
72 | */ | 72 | */ |
73 | void *iter_cls; | 73 | void *proc_cls; |
74 | 74 | ||
75 | }; | 75 | }; |
76 | 76 | ||
@@ -168,12 +168,6 @@ struct GNUNET_DATASTORE_QueueEntry | |||
168 | */ | 168 | */ |
169 | int was_transmitted; | 169 | int was_transmitted; |
170 | 170 | ||
171 | /** | ||
172 | * Are we expecting a single message in response to this | ||
173 | * request (and, if it is data, no 'END' message)? | ||
174 | */ | ||
175 | int one_shot; | ||
176 | |||
177 | }; | 171 | }; |
178 | 172 | ||
179 | /** | 173 | /** |
@@ -241,10 +235,9 @@ struct GNUNET_DATASTORE_Handle | |||
241 | int in_receive; | 235 | int in_receive; |
242 | 236 | ||
243 | /** | 237 | /** |
244 | * We should either receive (and ignore) an 'END' message or force a | 238 | * We should ignore the next message(s) from the service. |
245 | * disconnect for the next message from the service. | ||
246 | */ | 239 | */ |
247 | unsigned int expect_end_or_disconnect; | 240 | unsigned int skip_next_messages; |
248 | 241 | ||
249 | }; | 242 | }; |
250 | 243 | ||
@@ -335,7 +328,7 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, | |||
335 | while (NULL != (qe = h->queue_head)) | 328 | while (NULL != (qe = h->queue_head)) |
336 | { | 329 | { |
337 | GNUNET_assert (NULL != qe->response_proc); | 330 | GNUNET_assert (NULL != qe->response_proc); |
338 | qe->response_proc (qe, NULL); | 331 | qe->response_proc (h, NULL); |
339 | } | 332 | } |
340 | if (GNUNET_YES == drop) | 333 | if (GNUNET_YES == drop) |
341 | { | 334 | { |
@@ -378,7 +371,7 @@ timeout_queue_entry (void *cls, | |||
378 | GNUNET_NO); | 371 | GNUNET_NO); |
379 | qe->task = GNUNET_SCHEDULER_NO_TASK; | 372 | qe->task = GNUNET_SCHEDULER_NO_TASK; |
380 | GNUNET_assert (qe->was_transmitted == GNUNET_NO); | 373 | GNUNET_assert (qe->was_transmitted == GNUNET_NO); |
381 | qe->response_proc (qe, NULL); | 374 | qe->response_proc (qe->h, NULL); |
382 | } | 375 | } |
383 | 376 | ||
384 | 377 | ||
@@ -394,7 +387,7 @@ timeout_queue_entry (void *cls, | |||
394 | * @param timeout timeout for the operation | 387 | * @param timeout timeout for the operation |
395 | * @param response_proc function to call with replies (can be NULL) | 388 | * @param response_proc function to call with replies (can be NULL) |
396 | * @param qc client context (NOT a closure for response_proc) | 389 | * @param qc client context (NOT a closure for response_proc) |
397 | * @return NULL if the queue is full (and this entry was dropped) | 390 | * @return NULL if the queue is full |
398 | */ | 391 | */ |
399 | static struct GNUNET_DATASTORE_QueueEntry * | 392 | static struct GNUNET_DATASTORE_QueueEntry * |
400 | make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | 393 | make_queue_entry (struct GNUNET_DATASTORE_Handle *h, |
@@ -418,6 +411,14 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | |||
418 | c++; | 411 | c++; |
419 | pos = pos->next; | 412 | pos = pos->next; |
420 | } | 413 | } |
414 | if (c >= max_queue_size) | ||
415 | { | ||
416 | GNUNET_STATISTICS_update (h->stats, | ||
417 | gettext_noop ("# queue overflows"), | ||
418 | 1, | ||
419 | GNUNET_NO); | ||
420 | return NULL; | ||
421 | } | ||
421 | ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); | 422 | ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); |
422 | ret->h = h; | 423 | ret->h = h; |
423 | ret->response_proc = response_proc; | 424 | ret->response_proc = response_proc; |
@@ -451,15 +452,6 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | |||
451 | pos, | 452 | pos, |
452 | ret); | 453 | ret); |
453 | h->queue_size++; | 454 | h->queue_size++; |
454 | if (c > max_queue_size) | ||
455 | { | ||
456 | GNUNET_STATISTICS_update (h->stats, | ||
457 | gettext_noop ("# queue overflows"), | ||
458 | 1, | ||
459 | GNUNET_NO); | ||
460 | response_proc (ret, NULL); | ||
461 | return NULL; | ||
462 | } | ||
463 | ret->task = GNUNET_SCHEDULER_add_delayed (timeout, | 455 | ret->task = GNUNET_SCHEDULER_add_delayed (timeout, |
464 | &timeout_queue_entry, | 456 | &timeout_queue_entry, |
465 | ret); | 457 | ret); |
@@ -469,7 +461,15 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | |||
469 | if (pos->max_queue < h->queue_size) | 461 | if (pos->max_queue < h->queue_size) |
470 | { | 462 | { |
471 | GNUNET_assert (pos->response_proc != NULL); | 463 | GNUNET_assert (pos->response_proc != NULL); |
472 | pos->response_proc (pos, NULL); | 464 | /* move 'pos' element to head so that it will be |
465 | killed on 'NULL' call below */ | ||
466 | GNUNET_CONTAINER_DLL_remove (h->queue_head, | ||
467 | h->queue_tail, | ||
468 | pos); | ||
469 | GNUNET_CONTAINER_DLL_insert (h->queue_head, | ||
470 | h->queue_tail, | ||
471 | pos); | ||
472 | pos->response_proc (h, NULL); | ||
473 | break; | 473 | break; |
474 | } | 474 | } |
475 | pos = pos->next; | 475 | pos = pos->next; |
@@ -550,6 +550,7 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h) | |||
550 | GNUNET_NO); | 550 | GNUNET_NO); |
551 | #endif | 551 | #endif |
552 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); | 552 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); |
553 | h->skip_next_messages = 0; | ||
553 | h->client = NULL; | 554 | h->client = NULL; |
554 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time, | 555 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time, |
555 | &try_reconnect, | 556 | &try_reconnect, |
@@ -700,6 +701,7 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) | |||
700 | qe->task = GNUNET_SCHEDULER_NO_TASK; | 701 | qe->task = GNUNET_SCHEDULER_NO_TASK; |
701 | } | 702 | } |
702 | h->queue_size--; | 703 | h->queue_size--; |
704 | qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ | ||
703 | GNUNET_free (qe); | 705 | GNUNET_free (qe); |
704 | } | 706 | } |
705 | 707 | ||
@@ -724,16 +726,22 @@ process_status_message (void *cls, | |||
724 | int was_transmitted; | 726 | int was_transmitted; |
725 | 727 | ||
726 | h->in_receive = GNUNET_NO; | 728 | h->in_receive = GNUNET_NO; |
729 | if (h->skip_next_messages > 0) | ||
730 | { | ||
731 | h->skip_next_messages--; | ||
732 | process_queue (h); | ||
733 | return; | ||
734 | } | ||
727 | if (NULL == (qe = h->queue_head)) | 735 | if (NULL == (qe = h->queue_head)) |
728 | { | 736 | { |
729 | GNUNET_break (0); | 737 | GNUNET_break (0); |
730 | do_disconnect (h); | 738 | do_disconnect (h); |
731 | return; | 739 | return; |
732 | } | 740 | } |
733 | was_transmitted = qe->was_transmitted; | ||
734 | rc = qe->qc.sc; | 741 | rc = qe->qc.sc; |
735 | if (msg == NULL) | 742 | if (msg == NULL) |
736 | { | 743 | { |
744 | was_transmitted = qe->was_transmitted; | ||
737 | free_queue_entry (qe); | 745 | free_queue_entry (qe); |
738 | if (NULL == h->client) | 746 | if (NULL == h->client) |
739 | return; /* forced disconnect */ | 747 | return; /* forced disconnect */ |
@@ -1114,7 +1122,7 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, | |||
1114 | struct GNUNET_DATASTORE_QueueEntry * | 1122 | struct GNUNET_DATASTORE_QueueEntry * |
1115 | GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, | 1123 | GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, |
1116 | const GNUNET_HashCode *key, | 1124 | const GNUNET_HashCode *key, |
1117 | size_t size, | 1125 | size_t size, |
1118 | const void *data, | 1126 | const void *data, |
1119 | unsigned int queue_priority, | 1127 | unsigned int queue_priority, |
1120 | unsigned int max_queue_size, | 1128 | unsigned int max_queue_size, |
@@ -1186,45 +1194,35 @@ process_result_message (void *cls, | |||
1186 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1194 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1187 | struct ResultContext rc; | 1195 | struct ResultContext rc; |
1188 | const struct DataMessage *dm; | 1196 | const struct DataMessage *dm; |
1189 | int was_transmitted; | ||
1190 | 1197 | ||
1191 | h->in_receive = GNUNET_NO; | 1198 | h->in_receive = GNUNET_NO; |
1199 | if (h->skip_next_messages > 0) | ||
1200 | { | ||
1201 | h->skip_next_messages--; | ||
1202 | process_queue (h); | ||
1203 | return; | ||
1204 | } | ||
1192 | if (msg == NULL) | 1205 | if (msg == NULL) |
1193 | { | 1206 | { |
1194 | if (NULL != (qe = h->queue_head)) | 1207 | qe = h->queue_head; |
1208 | GNUNET_assert (NULL != qe); | ||
1209 | if (qe->was_transmitted == GNUNET_YES) | ||
1195 | { | 1210 | { |
1196 | was_transmitted = qe->was_transmitted; | ||
1197 | free_queue_entry (qe); | ||
1198 | rc = qe->qc.rc; | 1211 | rc = qe->qc.rc; |
1199 | if (was_transmitted == GNUNET_YES) | 1212 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1200 | { | 1213 | _("Failed to receive response from database.\n")); |
1201 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1214 | do_disconnect (h); |
1202 | _("Failed to receive response from database.\n")); | ||
1203 | do_disconnect (h); | ||
1204 | } | ||
1205 | else | ||
1206 | { | ||
1207 | #if DEBUG_DATASTORE | ||
1208 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1209 | "Request dropped due to finite datastore queue length.\n"); | ||
1210 | #endif | ||
1211 | } | ||
1212 | if (rc.iter != NULL) | ||
1213 | rc.iter (rc.iter_cls, | ||
1214 | NULL, 0, NULL, 0, 0, 0, | ||
1215 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
1216 | } | 1215 | } |
1216 | free_queue_entry (qe); | ||
1217 | if (rc.proc != NULL) | ||
1218 | rc.proc (rc.proc_cls, | ||
1219 | NULL, 0, NULL, 0, 0, 0, | ||
1220 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
1217 | return; | 1221 | return; |
1218 | } | 1222 | } |
1219 | if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) | 1223 | if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) |
1220 | { | 1224 | { |
1221 | GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); | 1225 | GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); |
1222 | if (h->expect_end_or_disconnect > 0) | ||
1223 | { | ||
1224 | h->expect_end_or_disconnect--; | ||
1225 | process_queue (h); | ||
1226 | return; | ||
1227 | } | ||
1228 | qe = h->queue_head; | 1226 | qe = h->queue_head; |
1229 | rc = qe->qc.rc; | 1227 | rc = qe->qc.rc; |
1230 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); | 1228 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); |
@@ -1234,8 +1232,8 @@ process_result_message (void *cls, | |||
1234 | "Received end of result set, new queue size is %u\n", | 1232 | "Received end of result set, new queue size is %u\n", |
1235 | h->queue_size); | 1233 | h->queue_size); |
1236 | #endif | 1234 | #endif |
1237 | if (rc.iter != NULL) | 1235 | if (rc.proc != NULL) |
1238 | rc.iter (rc.iter_cls, | 1236 | rc.proc (rc.proc_cls, |
1239 | NULL, 0, NULL, 0, 0, 0, | 1237 | NULL, 0, NULL, 0, 0, 0, |
1240 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 1238 | GNUNET_TIME_UNIT_ZERO_ABS, 0); |
1241 | h->retry_time.rel_value = 0; | 1239 | h->retry_time.rel_value = 0; |
@@ -1243,13 +1241,6 @@ process_result_message (void *cls, | |||
1243 | process_queue (h); | 1241 | process_queue (h); |
1244 | return; | 1242 | return; |
1245 | } | 1243 | } |
1246 | if (h->expect_end_or_disconnect > 0) | ||
1247 | { | ||
1248 | /* only 'END' allowed, must reconnect */ | ||
1249 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
1250 | do_disconnect (h); | ||
1251 | return; | ||
1252 | } | ||
1253 | qe = h->queue_head; | 1244 | qe = h->queue_head; |
1254 | rc = qe->qc.rc; | 1245 | rc = qe->qc.rc; |
1255 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); | 1246 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); |
@@ -1261,40 +1252,16 @@ process_result_message (void *cls, | |||
1261 | free_queue_entry (qe); | 1252 | free_queue_entry (qe); |
1262 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | 1253 | h->retry_time = GNUNET_TIME_UNIT_ZERO; |
1263 | do_disconnect (h); | 1254 | do_disconnect (h); |
1264 | if (rc.iter != NULL) | 1255 | if (rc.proc != NULL) |
1265 | rc.iter (rc.iter_cls, | 1256 | rc.proc (rc.proc_cls, |
1266 | NULL, 0, NULL, 0, 0, 0, | 1257 | NULL, 0, NULL, 0, 0, 0, |
1267 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 1258 | GNUNET_TIME_UNIT_ZERO_ABS, 0); |
1268 | return; | 1259 | return; |
1269 | } | 1260 | } |
1270 | GNUNET_STATISTICS_update (h->stats, | 1261 | GNUNET_STATISTICS_update (h->stats, |
1271 | gettext_noop ("# Results received"), | 1262 | gettext_noop ("# Results received"), |
1272 | 1, | 1263 | 1, |
1273 | GNUNET_NO); | 1264 | GNUNET_NO); |
1274 | if (rc.iter == NULL) | ||
1275 | { | ||
1276 | h->result_count++; | ||
1277 | GNUNET_STATISTICS_update (h->stats, | ||
1278 | gettext_noop ("# Excess results received"), | ||
1279 | 1, | ||
1280 | GNUNET_NO); | ||
1281 | if (h->result_count > MAX_EXCESS_RESULTS) | ||
1282 | { | ||
1283 | free_queue_entry (qe); | ||
1284 | GNUNET_STATISTICS_update (h->stats, | ||
1285 | gettext_noop ("# Forced database connection resets"), | ||
1286 | 1, | ||
1287 | GNUNET_NO); | ||
1288 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
1289 | do_disconnect (h); | ||
1290 | return; | ||
1291 | } | ||
1292 | if (GNUNET_YES == qe->one_shot) | ||
1293 | free_queue_entry (qe); | ||
1294 | else | ||
1295 | GNUNET_DATASTORE_iterate_get_next (h); | ||
1296 | return; | ||
1297 | } | ||
1298 | dm = (const struct DataMessage*) msg; | 1265 | dm = (const struct DataMessage*) msg; |
1299 | #if DEBUG_DATASTORE | 1266 | #if DEBUG_DATASTORE |
1300 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1267 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1304,10 +1271,9 @@ process_result_message (void *cls, | |||
1304 | ntohl(dm->size), | 1271 | ntohl(dm->size), |
1305 | GNUNET_h2s(&dm->key)); | 1272 | GNUNET_h2s(&dm->key)); |
1306 | #endif | 1273 | #endif |
1307 | if (GNUNET_YES == qe->one_shot) | 1274 | free_queue_entry (qe); |
1308 | free_queue_entry (qe); | ||
1309 | h->retry_time.rel_value = 0; | 1275 | h->retry_time.rel_value = 0; |
1310 | rc.iter (rc.iter_cls, | 1276 | rc.proc (rc.proc_cls, |
1311 | &dm->key, | 1277 | &dm->key, |
1312 | ntohl(dm->size), | 1278 | ntohl(dm->size), |
1313 | &dm[1], | 1279 | &dm[1], |
@@ -1331,33 +1297,33 @@ process_result_message (void *cls, | |||
1331 | * @param max_queue_size at what queue size should this request be dropped | 1297 | * @param max_queue_size at what queue size should this request be dropped |
1332 | * (if other requests of higher priority are in the queue) | 1298 | * (if other requests of higher priority are in the queue) |
1333 | * @param timeout how long to wait at most for a response | 1299 | * @param timeout how long to wait at most for a response |
1334 | * @param iter function to call on a random value; it | 1300 | * @param proc function to call on a random value; it |
1335 | * will be called once with a value (if available) | 1301 | * will be called once with a value (if available) |
1336 | * and always once with a value of NULL. | 1302 | * and always once with a value of NULL. |
1337 | * @param iter_cls closure for iter | 1303 | * @param proc_cls closure for proc |
1338 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | 1304 | * @return NULL if the entry was not queued, otherwise a handle that can be used to |
1339 | * cancel; note that even if NULL is returned, the callback will be invoked | 1305 | * cancel |
1340 | * (or rather, will already have been invoked) | ||
1341 | */ | 1306 | */ |
1342 | struct GNUNET_DATASTORE_QueueEntry * | 1307 | struct GNUNET_DATASTORE_QueueEntry * |
1343 | GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | 1308 | GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, |
1344 | unsigned int queue_priority, | 1309 | unsigned int queue_priority, |
1345 | unsigned int max_queue_size, | 1310 | unsigned int max_queue_size, |
1346 | struct GNUNET_TIME_Relative timeout, | 1311 | struct GNUNET_TIME_Relative timeout, |
1347 | GNUNET_DATASTORE_Iterator iter, | 1312 | GNUNET_DATASTORE_DatumProcessor proc, |
1348 | void *iter_cls) | 1313 | void *proc_cls) |
1349 | { | 1314 | { |
1350 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1315 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1351 | struct GNUNET_MessageHeader *m; | 1316 | struct GNUNET_MessageHeader *m; |
1352 | union QueueContext qc; | 1317 | union QueueContext qc; |
1353 | 1318 | ||
1319 | GNUNET_assert (NULL != proc); | ||
1354 | #if DEBUG_DATASTORE | 1320 | #if DEBUG_DATASTORE |
1355 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1321 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1356 | "Asked to get replication entry in %llu ms\n", | 1322 | "Asked to get replication entry in %llu ms\n", |
1357 | (unsigned long long) timeout.rel_value); | 1323 | (unsigned long long) timeout.rel_value); |
1358 | #endif | 1324 | #endif |
1359 | qc.rc.iter = iter; | 1325 | qc.rc.proc = proc; |
1360 | qc.rc.iter_cls = iter_cls; | 1326 | qc.rc.proc_cls = proc_cls; |
1361 | qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader), | 1327 | qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader), |
1362 | queue_priority, max_queue_size, timeout, | 1328 | queue_priority, max_queue_size, timeout, |
1363 | &process_result_message, &qc); | 1329 | &process_result_message, &qc); |
@@ -1369,7 +1335,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | |||
1369 | #endif | 1335 | #endif |
1370 | return NULL; | 1336 | return NULL; |
1371 | } | 1337 | } |
1372 | qe->one_shot = GNUNET_YES; | ||
1373 | GNUNET_STATISTICS_update (h->stats, | 1338 | GNUNET_STATISTICS_update (h->stats, |
1374 | gettext_noop ("# GET REPLICATION requests executed"), | 1339 | gettext_noop ("# GET REPLICATION requests executed"), |
1375 | 1, | 1340 | 1, |
@@ -1383,43 +1348,50 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | |||
1383 | 1348 | ||
1384 | 1349 | ||
1385 | /** | 1350 | /** |
1386 | * Get a zero-anonymity value from the datastore. | 1351 | * Get a single zero-anonymity value from the datastore. |
1387 | * | 1352 | * |
1388 | * @param h handle to the datastore | 1353 | * @param h handle to the datastore |
1354 | * @param offset offset of the result (mod #num-results); set to | ||
1355 | * a random 64-bit value initially; then increment by | ||
1356 | * one each time; detect that all results have been found by uid | ||
1357 | * being again the first uid ever returned. | ||
1389 | * @param queue_priority ranking of this request in the priority queue | 1358 | * @param queue_priority ranking of this request in the priority queue |
1390 | * @param max_queue_size at what queue size should this request be dropped | 1359 | * @param max_queue_size at what queue size should this request be dropped |
1391 | * (if other requests of higher priority are in the queue) | 1360 | * (if other requests of higher priority are in the queue) |
1392 | * @param timeout how long to wait at most for a response | 1361 | * @param timeout how long to wait at most for a response |
1393 | * @param type allowed type for the operation | 1362 | * @param type allowed type for the operation (never zero) |
1394 | * @param iter function to call on a random value; it | 1363 | * @param proc function to call on a random value; it |
1395 | * will be called once with a value (if available) | 1364 | * will be called once with a value (if available) |
1396 | * and always once with a value of NULL. | 1365 | * or with NULL if none value exists. |
1397 | * @param iter_cls closure for iter | 1366 | * @param proc_cls closure for proc |
1398 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | 1367 | * @return NULL if the entry was not queued, otherwise a handle that can be used to |
1399 | * cancel; note that even if NULL is returned, the callback will be invoked | 1368 | * cancel |
1400 | * (or rather, will already have been invoked) | ||
1401 | */ | 1369 | */ |
1402 | struct GNUNET_DATASTORE_QueueEntry * | 1370 | struct GNUNET_DATASTORE_QueueEntry * |
1403 | GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | 1371 | GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, |
1404 | unsigned int queue_priority, | 1372 | uint64_t offset, |
1405 | unsigned int max_queue_size, | 1373 | unsigned int queue_priority, |
1406 | struct GNUNET_TIME_Relative timeout, | 1374 | unsigned int max_queue_size, |
1407 | enum GNUNET_BLOCK_Type type, | 1375 | struct GNUNET_TIME_Relative timeout, |
1408 | GNUNET_DATASTORE_Iterator iter, | 1376 | enum GNUNET_BLOCK_Type type, |
1409 | void *iter_cls) | 1377 | GNUNET_DATASTORE_DatumProcessor proc, |
1378 | void *proc_cls) | ||
1410 | { | 1379 | { |
1411 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1380 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1412 | struct GetZeroAnonymityMessage *m; | 1381 | struct GetZeroAnonymityMessage *m; |
1413 | union QueueContext qc; | 1382 | union QueueContext qc; |
1414 | 1383 | ||
1384 | GNUNET_assert (NULL != proc); | ||
1415 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); | 1385 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); |
1416 | #if DEBUG_DATASTORE | 1386 | #if DEBUG_DATASTORE |
1417 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1387 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1418 | "Asked to get zero-anonymity entry in %llu ms\n", | 1388 | "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n", |
1389 | (unsigned long long) offset, | ||
1390 | type, | ||
1419 | (unsigned long long) timeout.rel_value); | 1391 | (unsigned long long) timeout.rel_value); |
1420 | #endif | 1392 | #endif |
1421 | qc.rc.iter = iter; | 1393 | qc.rc.proc = proc; |
1422 | qc.rc.iter_cls = iter_cls; | 1394 | qc.rc.proc_cls = proc_cls; |
1423 | qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage), | 1395 | qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage), |
1424 | queue_priority, max_queue_size, timeout, | 1396 | queue_priority, max_queue_size, timeout, |
1425 | &process_result_message, &qc); | 1397 | &process_result_message, &qc); |
@@ -1427,7 +1399,7 @@ GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1427 | { | 1399 | { |
1428 | #if DEBUG_DATASTORE | 1400 | #if DEBUG_DATASTORE |
1429 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1401 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1430 | "Could not create queue entry for zero-anonymity iteration\n"); | 1402 | "Could not create queue entry for zero-anonymity procation\n"); |
1431 | #endif | 1403 | #endif |
1432 | return NULL; | 1404 | return NULL; |
1433 | } | 1405 | } |
@@ -1439,55 +1411,57 @@ GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1439 | m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); | 1411 | m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); |
1440 | m->header.size = htons(sizeof (struct GetZeroAnonymityMessage)); | 1412 | m->header.size = htons(sizeof (struct GetZeroAnonymityMessage)); |
1441 | m->type = htonl ((uint32_t) type); | 1413 | m->type = htonl ((uint32_t) type); |
1414 | m->offset = GNUNET_htonll (offset); | ||
1442 | process_queue (h); | 1415 | process_queue (h); |
1443 | return qe; | 1416 | return qe; |
1444 | } | 1417 | } |
1445 | 1418 | ||
1446 | 1419 | ||
1447 | |||
1448 | /** | 1420 | /** |
1449 | * Iterate over the results for a particular key | 1421 | * Get a result for a particular key from the datastore. The processor |
1450 | * in the datastore. The iterator will only be called | 1422 | * will only be called once. |
1451 | * once initially; if the first call did contain a | ||
1452 | * result, further results can be obtained by calling | ||
1453 | * "GNUNET_DATASTORE_iterate_get_next" with the given argument. | ||
1454 | * | 1423 | * |
1455 | * @param h handle to the datastore | 1424 | * @param h handle to the datastore |
1425 | * @param offset offset of the result (mod #num-results); set to | ||
1426 | * a random 64-bit value initially; then increment by | ||
1427 | * one each time; detect that all results have been found by uid | ||
1428 | * being again the first uid ever returned. | ||
1456 | * @param key maybe NULL (to match all entries) | 1429 | * @param key maybe NULL (to match all entries) |
1457 | * @param type desired type, 0 for any | 1430 | * @param type desired type, 0 for any |
1458 | * @param queue_priority ranking of this request in the priority queue | 1431 | * @param queue_priority ranking of this request in the priority queue |
1459 | * @param max_queue_size at what queue size should this request be dropped | 1432 | * @param max_queue_size at what queue size should this request be dropped |
1460 | * (if other requests of higher priority are in the queue) | 1433 | * (if other requests of higher priority are in the queue) |
1461 | * @param timeout how long to wait at most for a response | 1434 | * @param timeout how long to wait at most for a response |
1462 | * @param iter function to call on each matching value; | 1435 | * @param proc function to call on each matching value; |
1463 | * will be called once with a NULL value at the end | 1436 | * will be called once with a NULL value at the end |
1464 | * @param iter_cls closure for iter | 1437 | * @param proc_cls closure for proc |
1465 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | 1438 | * @return NULL if the entry was not queued, otherwise a handle that can be used to |
1466 | * cancel; note that even if NULL is returned, the callback will be invoked | 1439 | * cancel |
1467 | * (or rather, will already have been invoked) | ||
1468 | */ | 1440 | */ |
1469 | struct GNUNET_DATASTORE_QueueEntry * | 1441 | struct GNUNET_DATASTORE_QueueEntry * |
1470 | GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h, | 1442 | GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, |
1471 | const GNUNET_HashCode * key, | 1443 | uint64_t offset, |
1472 | enum GNUNET_BLOCK_Type type, | 1444 | const GNUNET_HashCode * key, |
1473 | unsigned int queue_priority, | 1445 | enum GNUNET_BLOCK_Type type, |
1474 | unsigned int max_queue_size, | 1446 | unsigned int queue_priority, |
1475 | struct GNUNET_TIME_Relative timeout, | 1447 | unsigned int max_queue_size, |
1476 | GNUNET_DATASTORE_Iterator iter, | 1448 | struct GNUNET_TIME_Relative timeout, |
1477 | void *iter_cls) | 1449 | GNUNET_DATASTORE_DatumProcessor proc, |
1450 | void *proc_cls) | ||
1478 | { | 1451 | { |
1479 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1452 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1480 | struct GetMessage *gm; | 1453 | struct GetMessage *gm; |
1481 | union QueueContext qc; | 1454 | union QueueContext qc; |
1482 | 1455 | ||
1456 | GNUNET_assert (NULL != proc); | ||
1483 | #if DEBUG_DATASTORE | 1457 | #if DEBUG_DATASTORE |
1484 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1458 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1485 | "Asked to look for data of type %u under key `%s'\n", | 1459 | "Asked to look for data of type %u under key `%s'\n", |
1486 | (unsigned int) type, | 1460 | (unsigned int) type, |
1487 | GNUNET_h2s (key)); | 1461 | GNUNET_h2s (key)); |
1488 | #endif | 1462 | #endif |
1489 | qc.rc.iter = iter; | 1463 | qc.rc.proc = proc; |
1490 | qc.rc.iter_cls = iter_cls; | 1464 | qc.rc.proc_cls = proc_cls; |
1491 | qe = make_queue_entry (h, sizeof(struct GetMessage), | 1465 | qe = make_queue_entry (h, sizeof(struct GetMessage), |
1492 | queue_priority, max_queue_size, timeout, | 1466 | queue_priority, max_queue_size, timeout, |
1493 | &process_result_message, &qc); | 1467 | &process_result_message, &qc); |
@@ -1507,6 +1481,7 @@ GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h, | |||
1507 | gm = (struct GetMessage*) &qe[1]; | 1481 | gm = (struct GetMessage*) &qe[1]; |
1508 | gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); | 1482 | gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); |
1509 | gm->type = htonl(type); | 1483 | gm->type = htonl(type); |
1484 | gm->offset = GNUNET_htonll (offset); | ||
1510 | if (key != NULL) | 1485 | if (key != NULL) |
1511 | { | 1486 | { |
1512 | gm->header.size = htons(sizeof (struct GetMessage)); | 1487 | gm->header.size = htons(sizeof (struct GetMessage)); |
@@ -1522,25 +1497,6 @@ GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h, | |||
1522 | 1497 | ||
1523 | 1498 | ||
1524 | /** | 1499 | /** |
1525 | * Function called to trigger obtaining the next result | ||
1526 | * from the datastore. | ||
1527 | * | ||
1528 | * @param h handle to the datastore | ||
1529 | */ | ||
1530 | void | ||
1531 | GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h) | ||
1532 | { | ||
1533 | struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head; | ||
1534 | |||
1535 | h->in_receive = GNUNET_YES; | ||
1536 | GNUNET_CLIENT_receive (h->client, | ||
1537 | &process_result_message, | ||
1538 | h, | ||
1539 | GNUNET_TIME_absolute_get_remaining (qe->timeout)); | ||
1540 | } | ||
1541 | |||
1542 | |||
1543 | /** | ||
1544 | * Cancel a datastore operation. The final callback from the | 1500 | * Cancel a datastore operation. The final callback from the |
1545 | * operation must not have been done yet. | 1501 | * operation must not have been done yet. |
1546 | * | 1502 | * |
@@ -1551,6 +1507,7 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) | |||
1551 | { | 1507 | { |
1552 | struct GNUNET_DATASTORE_Handle *h; | 1508 | struct GNUNET_DATASTORE_Handle *h; |
1553 | 1509 | ||
1510 | GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); | ||
1554 | h = qe->h; | 1511 | h = qe->h; |
1555 | #if DEBUG_DATASTORE | 1512 | #if DEBUG_DATASTORE |
1556 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1513 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1562,7 +1519,7 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) | |||
1562 | if (GNUNET_YES == qe->was_transmitted) | 1519 | if (GNUNET_YES == qe->was_transmitted) |
1563 | { | 1520 | { |
1564 | free_queue_entry (qe); | 1521 | free_queue_entry (qe); |
1565 | h->expect_end_or_disconnect++; | 1522 | h->skip_next_messages++; |
1566 | return; | 1523 | return; |
1567 | } | 1524 | } |
1568 | free_queue_entry (qe); | 1525 | free_queue_entry (qe); |