aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/datastore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r--src/datastore/datastore_api.c279
1 files changed, 118 insertions, 161 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index 2bba2e8ee..99060bd60 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -63,14 +63,14 @@ struct StatusContext
63struct ResultContext 63struct ResultContext
64{ 64{
65 /** 65 /**
66 * Iterator to call with the result. 66 * Function to call with the result.
67 */ 67 */
68 GNUNET_DATASTORE_Iterator iter; 68 GNUNET_DATASTORE_DatumProcessor proc;
69 69
70 /** 70 /**
71 * Closure for iter. 71 * Closure for proc.
72 */ 72 */
73 void *iter_cls; 73 void *proc_cls;
74 74
75}; 75};
76 76
@@ -168,12 +168,6 @@ struct GNUNET_DATASTORE_QueueEntry
168 */ 168 */
169 int was_transmitted; 169 int was_transmitted;
170 170
171 /**
172 * Are we expecting a single message in response to this
173 * request (and, if it is data, no 'END' message)?
174 */
175 int one_shot;
176
177}; 171};
178 172
179/** 173/**
@@ -241,10 +235,9 @@ struct GNUNET_DATASTORE_Handle
241 int in_receive; 235 int in_receive;
242 236
243 /** 237 /**
244 * We should either receive (and ignore) an 'END' message or force a 238 * We should ignore the next message(s) from the service.
245 * disconnect for the next message from the service.
246 */ 239 */
247 unsigned int expect_end_or_disconnect; 240 unsigned int skip_next_messages;
248 241
249}; 242};
250 243
@@ -335,7 +328,7 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
335 while (NULL != (qe = h->queue_head)) 328 while (NULL != (qe = h->queue_head))
336 { 329 {
337 GNUNET_assert (NULL != qe->response_proc); 330 GNUNET_assert (NULL != qe->response_proc);
338 qe->response_proc (qe, NULL); 331 qe->response_proc (h, NULL);
339 } 332 }
340 if (GNUNET_YES == drop) 333 if (GNUNET_YES == drop)
341 { 334 {
@@ -378,7 +371,7 @@ timeout_queue_entry (void *cls,
378 GNUNET_NO); 371 GNUNET_NO);
379 qe->task = GNUNET_SCHEDULER_NO_TASK; 372 qe->task = GNUNET_SCHEDULER_NO_TASK;
380 GNUNET_assert (qe->was_transmitted == GNUNET_NO); 373 GNUNET_assert (qe->was_transmitted == GNUNET_NO);
381 qe->response_proc (qe, NULL); 374 qe->response_proc (qe->h, NULL);
382} 375}
383 376
384 377
@@ -394,7 +387,7 @@ timeout_queue_entry (void *cls,
394 * @param timeout timeout for the operation 387 * @param timeout timeout for the operation
395 * @param response_proc function to call with replies (can be NULL) 388 * @param response_proc function to call with replies (can be NULL)
396 * @param qc client context (NOT a closure for response_proc) 389 * @param qc client context (NOT a closure for response_proc)
397 * @return NULL if the queue is full (and this entry was dropped) 390 * @return NULL if the queue is full
398 */ 391 */
399static struct GNUNET_DATASTORE_QueueEntry * 392static struct GNUNET_DATASTORE_QueueEntry *
400make_queue_entry (struct GNUNET_DATASTORE_Handle *h, 393make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
@@ -418,6 +411,14 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
418 c++; 411 c++;
419 pos = pos->next; 412 pos = pos->next;
420 } 413 }
414 if (c >= max_queue_size)
415 {
416 GNUNET_STATISTICS_update (h->stats,
417 gettext_noop ("# queue overflows"),
418 1,
419 GNUNET_NO);
420 return NULL;
421 }
421 ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); 422 ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
422 ret->h = h; 423 ret->h = h;
423 ret->response_proc = response_proc; 424 ret->response_proc = response_proc;
@@ -451,15 +452,6 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
451 pos, 452 pos,
452 ret); 453 ret);
453 h->queue_size++; 454 h->queue_size++;
454 if (c > max_queue_size)
455 {
456 GNUNET_STATISTICS_update (h->stats,
457 gettext_noop ("# queue overflows"),
458 1,
459 GNUNET_NO);
460 response_proc (ret, NULL);
461 return NULL;
462 }
463 ret->task = GNUNET_SCHEDULER_add_delayed (timeout, 455 ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
464 &timeout_queue_entry, 456 &timeout_queue_entry,
465 ret); 457 ret);
@@ -469,7 +461,15 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
469 if (pos->max_queue < h->queue_size) 461 if (pos->max_queue < h->queue_size)
470 { 462 {
471 GNUNET_assert (pos->response_proc != NULL); 463 GNUNET_assert (pos->response_proc != NULL);
472 pos->response_proc (pos, NULL); 464 /* move 'pos' element to head so that it will be
465 killed on 'NULL' call below */
466 GNUNET_CONTAINER_DLL_remove (h->queue_head,
467 h->queue_tail,
468 pos);
469 GNUNET_CONTAINER_DLL_insert (h->queue_head,
470 h->queue_tail,
471 pos);
472 pos->response_proc (h, NULL);
473 break; 473 break;
474 } 474 }
475 pos = pos->next; 475 pos = pos->next;
@@ -550,6 +550,7 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h)
550 GNUNET_NO); 550 GNUNET_NO);
551#endif 551#endif
552 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); 552 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
553 h->skip_next_messages = 0;
553 h->client = NULL; 554 h->client = NULL;
554 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time, 555 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
555 &try_reconnect, 556 &try_reconnect,
@@ -700,6 +701,7 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
700 qe->task = GNUNET_SCHEDULER_NO_TASK; 701 qe->task = GNUNET_SCHEDULER_NO_TASK;
701 } 702 }
702 h->queue_size--; 703 h->queue_size--;
704 qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
703 GNUNET_free (qe); 705 GNUNET_free (qe);
704} 706}
705 707
@@ -724,16 +726,22 @@ process_status_message (void *cls,
724 int was_transmitted; 726 int was_transmitted;
725 727
726 h->in_receive = GNUNET_NO; 728 h->in_receive = GNUNET_NO;
729 if (h->skip_next_messages > 0)
730 {
731 h->skip_next_messages--;
732 process_queue (h);
733 return;
734 }
727 if (NULL == (qe = h->queue_head)) 735 if (NULL == (qe = h->queue_head))
728 { 736 {
729 GNUNET_break (0); 737 GNUNET_break (0);
730 do_disconnect (h); 738 do_disconnect (h);
731 return; 739 return;
732 } 740 }
733 was_transmitted = qe->was_transmitted;
734 rc = qe->qc.sc; 741 rc = qe->qc.sc;
735 if (msg == NULL) 742 if (msg == NULL)
736 { 743 {
744 was_transmitted = qe->was_transmitted;
737 free_queue_entry (qe); 745 free_queue_entry (qe);
738 if (NULL == h->client) 746 if (NULL == h->client)
739 return; /* forced disconnect */ 747 return; /* forced disconnect */
@@ -1114,7 +1122,7 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1114struct GNUNET_DATASTORE_QueueEntry * 1122struct GNUNET_DATASTORE_QueueEntry *
1115GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, 1123GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1116 const GNUNET_HashCode *key, 1124 const GNUNET_HashCode *key,
1117 size_t size, 1125 size_t size,
1118 const void *data, 1126 const void *data,
1119 unsigned int queue_priority, 1127 unsigned int queue_priority,
1120 unsigned int max_queue_size, 1128 unsigned int max_queue_size,
@@ -1186,45 +1194,35 @@ process_result_message (void *cls,
1186 struct GNUNET_DATASTORE_QueueEntry *qe; 1194 struct GNUNET_DATASTORE_QueueEntry *qe;
1187 struct ResultContext rc; 1195 struct ResultContext rc;
1188 const struct DataMessage *dm; 1196 const struct DataMessage *dm;
1189 int was_transmitted;
1190 1197
1191 h->in_receive = GNUNET_NO; 1198 h->in_receive = GNUNET_NO;
1199 if (h->skip_next_messages > 0)
1200 {
1201 h->skip_next_messages--;
1202 process_queue (h);
1203 return;
1204 }
1192 if (msg == NULL) 1205 if (msg == NULL)
1193 { 1206 {
1194 if (NULL != (qe = h->queue_head)) 1207 qe = h->queue_head;
1208 GNUNET_assert (NULL != qe);
1209 if (qe->was_transmitted == GNUNET_YES)
1195 { 1210 {
1196 was_transmitted = qe->was_transmitted;
1197 free_queue_entry (qe);
1198 rc = qe->qc.rc; 1211 rc = qe->qc.rc;
1199 if (was_transmitted == GNUNET_YES) 1212 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1200 { 1213 _("Failed to receive response from database.\n"));
1201 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1214 do_disconnect (h);
1202 _("Failed to receive response from database.\n"));
1203 do_disconnect (h);
1204 }
1205 else
1206 {
1207#if DEBUG_DATASTORE
1208 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1209 "Request dropped due to finite datastore queue length.\n");
1210#endif
1211 }
1212 if (rc.iter != NULL)
1213 rc.iter (rc.iter_cls,
1214 NULL, 0, NULL, 0, 0, 0,
1215 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1216 } 1215 }
1216 free_queue_entry (qe);
1217 if (rc.proc != NULL)
1218 rc.proc (rc.proc_cls,
1219 NULL, 0, NULL, 0, 0, 0,
1220 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1217 return; 1221 return;
1218 } 1222 }
1219 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 1223 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1220 { 1224 {
1221 GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); 1225 GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1222 if (h->expect_end_or_disconnect > 0)
1223 {
1224 h->expect_end_or_disconnect--;
1225 process_queue (h);
1226 return;
1227 }
1228 qe = h->queue_head; 1226 qe = h->queue_head;
1229 rc = qe->qc.rc; 1227 rc = qe->qc.rc;
1230 GNUNET_assert (GNUNET_YES == qe->was_transmitted); 1228 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
@@ -1234,8 +1232,8 @@ process_result_message (void *cls,
1234 "Received end of result set, new queue size is %u\n", 1232 "Received end of result set, new queue size is %u\n",
1235 h->queue_size); 1233 h->queue_size);
1236#endif 1234#endif
1237 if (rc.iter != NULL) 1235 if (rc.proc != NULL)
1238 rc.iter (rc.iter_cls, 1236 rc.proc (rc.proc_cls,
1239 NULL, 0, NULL, 0, 0, 0, 1237 NULL, 0, NULL, 0, 0, 0,
1240 GNUNET_TIME_UNIT_ZERO_ABS, 0); 1238 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1241 h->retry_time.rel_value = 0; 1239 h->retry_time.rel_value = 0;
@@ -1243,13 +1241,6 @@ process_result_message (void *cls,
1243 process_queue (h); 1241 process_queue (h);
1244 return; 1242 return;
1245 } 1243 }
1246 if (h->expect_end_or_disconnect > 0)
1247 {
1248 /* only 'END' allowed, must reconnect */
1249 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1250 do_disconnect (h);
1251 return;
1252 }
1253 qe = h->queue_head; 1244 qe = h->queue_head;
1254 rc = qe->qc.rc; 1245 rc = qe->qc.rc;
1255 GNUNET_assert (GNUNET_YES == qe->was_transmitted); 1246 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
@@ -1261,40 +1252,16 @@ process_result_message (void *cls,
1261 free_queue_entry (qe); 1252 free_queue_entry (qe);
1262 h->retry_time = GNUNET_TIME_UNIT_ZERO; 1253 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1263 do_disconnect (h); 1254 do_disconnect (h);
1264 if (rc.iter != NULL) 1255 if (rc.proc != NULL)
1265 rc.iter (rc.iter_cls, 1256 rc.proc (rc.proc_cls,
1266 NULL, 0, NULL, 0, 0, 0, 1257 NULL, 0, NULL, 0, 0, 0,
1267 GNUNET_TIME_UNIT_ZERO_ABS, 0); 1258 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1268 return; 1259 return;
1269 } 1260 }
1270 GNUNET_STATISTICS_update (h->stats, 1261 GNUNET_STATISTICS_update (h->stats,
1271 gettext_noop ("# Results received"), 1262 gettext_noop ("# Results received"),
1272 1, 1263 1,
1273 GNUNET_NO); 1264 GNUNET_NO);
1274 if (rc.iter == NULL)
1275 {
1276 h->result_count++;
1277 GNUNET_STATISTICS_update (h->stats,
1278 gettext_noop ("# Excess results received"),
1279 1,
1280 GNUNET_NO);
1281 if (h->result_count > MAX_EXCESS_RESULTS)
1282 {
1283 free_queue_entry (qe);
1284 GNUNET_STATISTICS_update (h->stats,
1285 gettext_noop ("# Forced database connection resets"),
1286 1,
1287 GNUNET_NO);
1288 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1289 do_disconnect (h);
1290 return;
1291 }
1292 if (GNUNET_YES == qe->one_shot)
1293 free_queue_entry (qe);
1294 else
1295 GNUNET_DATASTORE_iterate_get_next (h);
1296 return;
1297 }
1298 dm = (const struct DataMessage*) msg; 1265 dm = (const struct DataMessage*) msg;
1299#if DEBUG_DATASTORE 1266#if DEBUG_DATASTORE
1300 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1267 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1304,10 +1271,9 @@ process_result_message (void *cls,
1304 ntohl(dm->size), 1271 ntohl(dm->size),
1305 GNUNET_h2s(&dm->key)); 1272 GNUNET_h2s(&dm->key));
1306#endif 1273#endif
1307 if (GNUNET_YES == qe->one_shot) 1274 free_queue_entry (qe);
1308 free_queue_entry (qe);
1309 h->retry_time.rel_value = 0; 1275 h->retry_time.rel_value = 0;
1310 rc.iter (rc.iter_cls, 1276 rc.proc (rc.proc_cls,
1311 &dm->key, 1277 &dm->key,
1312 ntohl(dm->size), 1278 ntohl(dm->size),
1313 &dm[1], 1279 &dm[1],
@@ -1331,33 +1297,33 @@ process_result_message (void *cls,
1331 * @param max_queue_size at what queue size should this request be dropped 1297 * @param max_queue_size at what queue size should this request be dropped
1332 * (if other requests of higher priority are in the queue) 1298 * (if other requests of higher priority are in the queue)
1333 * @param timeout how long to wait at most for a response 1299 * @param timeout how long to wait at most for a response
1334 * @param iter function to call on a random value; it 1300 * @param proc function to call on a random value; it
1335 * will be called once with a value (if available) 1301 * will be called once with a value (if available)
1336 * and always once with a value of NULL. 1302 * and always once with a value of NULL.
1337 * @param iter_cls closure for iter 1303 * @param proc_cls closure for proc
1338 * @return NULL if the entry was not queued, otherwise a handle that can be used to 1304 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1339 * cancel; note that even if NULL is returned, the callback will be invoked 1305 * cancel
1340 * (or rather, will already have been invoked)
1341 */ 1306 */
1342struct GNUNET_DATASTORE_QueueEntry * 1307struct GNUNET_DATASTORE_QueueEntry *
1343GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, 1308GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1344 unsigned int queue_priority, 1309 unsigned int queue_priority,
1345 unsigned int max_queue_size, 1310 unsigned int max_queue_size,
1346 struct GNUNET_TIME_Relative timeout, 1311 struct GNUNET_TIME_Relative timeout,
1347 GNUNET_DATASTORE_Iterator iter, 1312 GNUNET_DATASTORE_DatumProcessor proc,
1348 void *iter_cls) 1313 void *proc_cls)
1349{ 1314{
1350 struct GNUNET_DATASTORE_QueueEntry *qe; 1315 struct GNUNET_DATASTORE_QueueEntry *qe;
1351 struct GNUNET_MessageHeader *m; 1316 struct GNUNET_MessageHeader *m;
1352 union QueueContext qc; 1317 union QueueContext qc;
1353 1318
1319 GNUNET_assert (NULL != proc);
1354#if DEBUG_DATASTORE 1320#if DEBUG_DATASTORE
1355 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1321 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1356 "Asked to get replication entry in %llu ms\n", 1322 "Asked to get replication entry in %llu ms\n",
1357 (unsigned long long) timeout.rel_value); 1323 (unsigned long long) timeout.rel_value);
1358#endif 1324#endif
1359 qc.rc.iter = iter; 1325 qc.rc.proc = proc;
1360 qc.rc.iter_cls = iter_cls; 1326 qc.rc.proc_cls = proc_cls;
1361 qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader), 1327 qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1362 queue_priority, max_queue_size, timeout, 1328 queue_priority, max_queue_size, timeout,
1363 &process_result_message, &qc); 1329 &process_result_message, &qc);
@@ -1369,7 +1335,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1369#endif 1335#endif
1370 return NULL; 1336 return NULL;
1371 } 1337 }
1372 qe->one_shot = GNUNET_YES;
1373 GNUNET_STATISTICS_update (h->stats, 1338 GNUNET_STATISTICS_update (h->stats,
1374 gettext_noop ("# GET REPLICATION requests executed"), 1339 gettext_noop ("# GET REPLICATION requests executed"),
1375 1, 1340 1,
@@ -1383,43 +1348,50 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1383 1348
1384 1349
1385/** 1350/**
1386 * Get a zero-anonymity value from the datastore. 1351 * Get a single zero-anonymity value from the datastore.
1387 * 1352 *
1388 * @param h handle to the datastore 1353 * @param h handle to the datastore
1354 * @param offset offset of the result (mod #num-results); set to
1355 * a random 64-bit value initially; then increment by
1356 * one each time; detect that all results have been found by uid
1357 * being again the first uid ever returned.
1389 * @param queue_priority ranking of this request in the priority queue 1358 * @param queue_priority ranking of this request in the priority queue
1390 * @param max_queue_size at what queue size should this request be dropped 1359 * @param max_queue_size at what queue size should this request be dropped
1391 * (if other requests of higher priority are in the queue) 1360 * (if other requests of higher priority are in the queue)
1392 * @param timeout how long to wait at most for a response 1361 * @param timeout how long to wait at most for a response
1393 * @param type allowed type for the operation 1362 * @param type allowed type for the operation (never zero)
1394 * @param iter function to call on a random value; it 1363 * @param proc function to call on a random value; it
1395 * will be called once with a value (if available) 1364 * will be called once with a value (if available)
1396 * and always once with a value of NULL. 1365 * or with NULL if none value exists.
1397 * @param iter_cls closure for iter 1366 * @param proc_cls closure for proc
1398 * @return NULL if the entry was not queued, otherwise a handle that can be used to 1367 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1399 * cancel; note that even if NULL is returned, the callback will be invoked 1368 * cancel
1400 * (or rather, will already have been invoked)
1401 */ 1369 */
1402struct GNUNET_DATASTORE_QueueEntry * 1370struct GNUNET_DATASTORE_QueueEntry *
1403GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, 1371GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1404 unsigned int queue_priority, 1372 uint64_t offset,
1405 unsigned int max_queue_size, 1373 unsigned int queue_priority,
1406 struct GNUNET_TIME_Relative timeout, 1374 unsigned int max_queue_size,
1407 enum GNUNET_BLOCK_Type type, 1375 struct GNUNET_TIME_Relative timeout,
1408 GNUNET_DATASTORE_Iterator iter, 1376 enum GNUNET_BLOCK_Type type,
1409 void *iter_cls) 1377 GNUNET_DATASTORE_DatumProcessor proc,
1378 void *proc_cls)
1410{ 1379{
1411 struct GNUNET_DATASTORE_QueueEntry *qe; 1380 struct GNUNET_DATASTORE_QueueEntry *qe;
1412 struct GetZeroAnonymityMessage *m; 1381 struct GetZeroAnonymityMessage *m;
1413 union QueueContext qc; 1382 union QueueContext qc;
1414 1383
1384 GNUNET_assert (NULL != proc);
1415 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); 1385 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1416#if DEBUG_DATASTORE 1386#if DEBUG_DATASTORE
1417 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1387 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1418 "Asked to get zero-anonymity entry in %llu ms\n", 1388 "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1389 (unsigned long long) offset,
1390 type,
1419 (unsigned long long) timeout.rel_value); 1391 (unsigned long long) timeout.rel_value);
1420#endif 1392#endif
1421 qc.rc.iter = iter; 1393 qc.rc.proc = proc;
1422 qc.rc.iter_cls = iter_cls; 1394 qc.rc.proc_cls = proc_cls;
1423 qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage), 1395 qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1424 queue_priority, max_queue_size, timeout, 1396 queue_priority, max_queue_size, timeout,
1425 &process_result_message, &qc); 1397 &process_result_message, &qc);
@@ -1427,7 +1399,7 @@ GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1427 { 1399 {
1428#if DEBUG_DATASTORE 1400#if DEBUG_DATASTORE
1429 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1401 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1430 "Could not create queue entry for zero-anonymity iteration\n"); 1402 "Could not create queue entry for zero-anonymity procation\n");
1431#endif 1403#endif
1432 return NULL; 1404 return NULL;
1433 } 1405 }
@@ -1439,55 +1411,57 @@ GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1439 m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); 1411 m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1440 m->header.size = htons(sizeof (struct GetZeroAnonymityMessage)); 1412 m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1441 m->type = htonl ((uint32_t) type); 1413 m->type = htonl ((uint32_t) type);
1414 m->offset = GNUNET_htonll (offset);
1442 process_queue (h); 1415 process_queue (h);
1443 return qe; 1416 return qe;
1444} 1417}
1445 1418
1446 1419
1447
1448/** 1420/**
1449 * Iterate over the results for a particular key 1421 * Get a result for a particular key from the datastore. The processor
1450 * in the datastore. The iterator will only be called 1422 * will only be called once.
1451 * once initially; if the first call did contain a
1452 * result, further results can be obtained by calling
1453 * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
1454 * 1423 *
1455 * @param h handle to the datastore 1424 * @param h handle to the datastore
1425 * @param offset offset of the result (mod #num-results); set to
1426 * a random 64-bit value initially; then increment by
1427 * one each time; detect that all results have been found by uid
1428 * being again the first uid ever returned.
1456 * @param key maybe NULL (to match all entries) 1429 * @param key maybe NULL (to match all entries)
1457 * @param type desired type, 0 for any 1430 * @param type desired type, 0 for any
1458 * @param queue_priority ranking of this request in the priority queue 1431 * @param queue_priority ranking of this request in the priority queue
1459 * @param max_queue_size at what queue size should this request be dropped 1432 * @param max_queue_size at what queue size should this request be dropped
1460 * (if other requests of higher priority are in the queue) 1433 * (if other requests of higher priority are in the queue)
1461 * @param timeout how long to wait at most for a response 1434 * @param timeout how long to wait at most for a response
1462 * @param iter function to call on each matching value; 1435 * @param proc function to call on each matching value;
1463 * will be called once with a NULL value at the end 1436 * will be called once with a NULL value at the end
1464 * @param iter_cls closure for iter 1437 * @param proc_cls closure for proc
1465 * @return NULL if the entry was not queued, otherwise a handle that can be used to 1438 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1466 * cancel; note that even if NULL is returned, the callback will be invoked 1439 * cancel
1467 * (or rather, will already have been invoked)
1468 */ 1440 */
1469struct GNUNET_DATASTORE_QueueEntry * 1441struct GNUNET_DATASTORE_QueueEntry *
1470GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h, 1442GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1471 const GNUNET_HashCode * key, 1443 uint64_t offset,
1472 enum GNUNET_BLOCK_Type type, 1444 const GNUNET_HashCode * key,
1473 unsigned int queue_priority, 1445 enum GNUNET_BLOCK_Type type,
1474 unsigned int max_queue_size, 1446 unsigned int queue_priority,
1475 struct GNUNET_TIME_Relative timeout, 1447 unsigned int max_queue_size,
1476 GNUNET_DATASTORE_Iterator iter, 1448 struct GNUNET_TIME_Relative timeout,
1477 void *iter_cls) 1449 GNUNET_DATASTORE_DatumProcessor proc,
1450 void *proc_cls)
1478{ 1451{
1479 struct GNUNET_DATASTORE_QueueEntry *qe; 1452 struct GNUNET_DATASTORE_QueueEntry *qe;
1480 struct GetMessage *gm; 1453 struct GetMessage *gm;
1481 union QueueContext qc; 1454 union QueueContext qc;
1482 1455
1456 GNUNET_assert (NULL != proc);
1483#if DEBUG_DATASTORE 1457#if DEBUG_DATASTORE
1484 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1458 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1485 "Asked to look for data of type %u under key `%s'\n", 1459 "Asked to look for data of type %u under key `%s'\n",
1486 (unsigned int) type, 1460 (unsigned int) type,
1487 GNUNET_h2s (key)); 1461 GNUNET_h2s (key));
1488#endif 1462#endif
1489 qc.rc.iter = iter; 1463 qc.rc.proc = proc;
1490 qc.rc.iter_cls = iter_cls; 1464 qc.rc.proc_cls = proc_cls;
1491 qe = make_queue_entry (h, sizeof(struct GetMessage), 1465 qe = make_queue_entry (h, sizeof(struct GetMessage),
1492 queue_priority, max_queue_size, timeout, 1466 queue_priority, max_queue_size, timeout,
1493 &process_result_message, &qc); 1467 &process_result_message, &qc);
@@ -1507,6 +1481,7 @@ GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
1507 gm = (struct GetMessage*) &qe[1]; 1481 gm = (struct GetMessage*) &qe[1];
1508 gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); 1482 gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1509 gm->type = htonl(type); 1483 gm->type = htonl(type);
1484 gm->offset = GNUNET_htonll (offset);
1510 if (key != NULL) 1485 if (key != NULL)
1511 { 1486 {
1512 gm->header.size = htons(sizeof (struct GetMessage)); 1487 gm->header.size = htons(sizeof (struct GetMessage));
@@ -1522,25 +1497,6 @@ GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
1522 1497
1523 1498
1524/** 1499/**
1525 * Function called to trigger obtaining the next result
1526 * from the datastore.
1527 *
1528 * @param h handle to the datastore
1529 */
1530void
1531GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h)
1532{
1533 struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1534
1535 h->in_receive = GNUNET_YES;
1536 GNUNET_CLIENT_receive (h->client,
1537 &process_result_message,
1538 h,
1539 GNUNET_TIME_absolute_get_remaining (qe->timeout));
1540}
1541
1542
1543/**
1544 * Cancel a datastore operation. The final callback from the 1500 * Cancel a datastore operation. The final callback from the
1545 * operation must not have been done yet. 1501 * operation must not have been done yet.
1546 * 1502 *
@@ -1551,6 +1507,7 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1551{ 1507{
1552 struct GNUNET_DATASTORE_Handle *h; 1508 struct GNUNET_DATASTORE_Handle *h;
1553 1509
1510 GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1554 h = qe->h; 1511 h = qe->h;
1555#if DEBUG_DATASTORE 1512#if DEBUG_DATASTORE
1556 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1513 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1562,7 +1519,7 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1562 if (GNUNET_YES == qe->was_transmitted) 1519 if (GNUNET_YES == qe->was_transmitted)
1563 { 1520 {
1564 free_queue_entry (qe); 1521 free_queue_entry (qe);
1565 h->expect_end_or_disconnect++; 1522 h->skip_next_messages++;
1566 return; 1523 return;
1567 } 1524 }
1568 free_queue_entry (qe); 1525 free_queue_entry (qe);