summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-04-26 18:19:15 +0000
committerChristian Grothoff <christian@grothoff.org>2011-04-26 18:19:15 +0000
commit27ed8fcbc85a361864948edb517d47804c2b5a56 (patch)
tree01626713ea5b2ead4691f13eb66a1574b1c0c7fd /src
parentb6c71d97d2a4bb3cb0e0e0ac1cd2a4e145748cc6 (diff)
datastore and fs fixes from Easter
Diffstat (limited to 'src')
-rw-r--r--src/core/core_api.c2
-rw-r--r--src/datastore/datastore.h10
-rw-r--r--src/datastore/datastore_api.c279
-rw-r--r--src/datastore/gnunet-service-datastore.c242
-rw-r--r--src/datastore/perf_datastore_api.c1
-rw-r--r--src/datastore/perf_plugin_datastore.c78
-rw-r--r--src/datastore/perf_plugin_datastore_data_postgres.conf2
-rw-r--r--src/datastore/plugin_datastore_mysql.c790
-rw-r--r--src/datastore/plugin_datastore_postgres.c525
-rw-r--r--src/datastore/plugin_datastore_sqlite.c918
-rw-r--r--src/datastore/plugin_datastore_template.c110
-rw-r--r--src/datastore/test_datastore_api.c285
-rw-r--r--src/datastore/test_datastore_api_data_sqlite.conf2
-rw-r--r--src/datastore/test_datastore_api_management.c68
-rw-r--r--src/datastore/test_plugin_datastore.c89
-rw-r--r--src/fs/Makefile.am4
-rw-r--r--src/fs/fs_download.c19
-rw-r--r--src/fs/fs_test_lib_data.conf6
-rw-r--r--src/fs/gnunet-pseudonym.c2
-rw-r--r--src/fs/gnunet-service-fs_cp.c7
-rw-r--r--src/fs/gnunet-service-fs_indexing.c2
-rw-r--r--src/fs/gnunet-service-fs_indexing.h2
-rw-r--r--src/fs/gnunet-service-fs_pe.c2
-rw-r--r--src/fs/gnunet-service-fs_pr.c147
-rw-r--r--src/fs/gnunet-service-fs_put.c174
-rw-r--r--src/fs/test_fs_download_data.conf3
-rwxr-xr-xsrc/fs/test_gnunet_fs_idx.py.in2
-rw-r--r--src/fs/test_gnunet_fs_ns_data.conf2
-rw-r--r--src/fs/test_gnunet_service_fs_migration_data.conf2
-rw-r--r--src/include/gnunet_datastore_plugin.h143
-rw-r--r--src/include/gnunet_datastore_service.h109
31 files changed, 1428 insertions, 2599 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c
index 185e09d65..efb00c111 100644
--- a/src/core/core_api.c
+++ b/src/core/core_api.c
@@ -1842,6 +1842,7 @@ change_preference_send_continuation (void *cls,
struct GNUNET_CORE_InformationRequestContext *irc = cls;
irc->cm = NULL;
+ // FIXME: who frees 'irc'?
}
@@ -1901,6 +1902,7 @@ GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h,
irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext));
irc->h = h;
irc->pr = pr;
+ // FIXME: who frees 'irc'? (if not cancelled?)
cm = GNUNET_malloc (sizeof (struct ControlMessage) +
sizeof (struct RequestInfoMessage));
cm->cont = &change_preference_send_continuation;
diff --git a/src/datastore/datastore.h b/src/datastore/datastore.h
index 55ca7c8e5..d66ec0e95 100644
--- a/src/datastore/datastore.h
+++ b/src/datastore/datastore.h
@@ -114,6 +114,11 @@ struct GetMessage
uint32_t type GNUNET_PACKED;
/**
+ * Offset of the result.
+ */
+ uint64_t offset GNUNET_PACKED;
+
+ /**
* Desired key (optional). Check the "size" of the
* header to see if the key is actually present.
*/
@@ -138,6 +143,11 @@ struct GetZeroAnonymityMessage
*/
uint32_t type GNUNET_PACKED;
+ /**
+ * Offset of the result.
+ */
+ uint64_t offset GNUNET_PACKED;
+
};
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index 2bba2e8ee..99060bd60 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -63,14 +63,14 @@ struct StatusContext
struct ResultContext
{
/**
- * Iterator to call with the result.
+ * Function to call with the result.
*/
- GNUNET_DATASTORE_Iterator iter;
+ GNUNET_DATASTORE_DatumProcessor proc;
/**
- * Closure for iter.
+ * Closure for proc.
*/
- void *iter_cls;
+ void *proc_cls;
};
@@ -168,12 +168,6 @@ struct GNUNET_DATASTORE_QueueEntry
*/
int was_transmitted;
- /**
- * Are we expecting a single message in response to this
- * request (and, if it is data, no 'END' message)?
- */
- int one_shot;
-
};
/**
@@ -241,10 +235,9 @@ struct GNUNET_DATASTORE_Handle
int in_receive;
/**
- * We should either receive (and ignore) an 'END' message or force a
- * disconnect for the next message from the service.
+ * We should ignore the next message(s) from the service.
*/
- unsigned int expect_end_or_disconnect;
+ unsigned int skip_next_messages;
};
@@ -335,7 +328,7 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
while (NULL != (qe = h->queue_head))
{
GNUNET_assert (NULL != qe->response_proc);
- qe->response_proc (qe, NULL);
+ qe->response_proc (h, NULL);
}
if (GNUNET_YES == drop)
{
@@ -378,7 +371,7 @@ timeout_queue_entry (void *cls,
GNUNET_NO);
qe->task = GNUNET_SCHEDULER_NO_TASK;
GNUNET_assert (qe->was_transmitted == GNUNET_NO);
- qe->response_proc (qe, NULL);
+ qe->response_proc (qe->h, NULL);
}
@@ -394,7 +387,7 @@ timeout_queue_entry (void *cls,
* @param timeout timeout for the operation
* @param response_proc function to call with replies (can be NULL)
* @param qc client context (NOT a closure for response_proc)
- * @return NULL if the queue is full (and this entry was dropped)
+ * @return NULL if the queue is full
*/
static struct GNUNET_DATASTORE_QueueEntry *
make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
@@ -418,6 +411,14 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
c++;
pos = pos->next;
}
+ if (c >= max_queue_size)
+ {
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# queue overflows"),
+ 1,
+ GNUNET_NO);
+ return NULL;
+ }
ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
ret->h = h;
ret->response_proc = response_proc;
@@ -451,15 +452,6 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
pos,
ret);
h->queue_size++;
- if (c > max_queue_size)
- {
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop ("# queue overflows"),
- 1,
- GNUNET_NO);
- response_proc (ret, NULL);
- return NULL;
- }
ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
&timeout_queue_entry,
ret);
@@ -469,7 +461,15 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
if (pos->max_queue < h->queue_size)
{
GNUNET_assert (pos->response_proc != NULL);
- pos->response_proc (pos, NULL);
+ /* move 'pos' element to head so that it will be
+ killed on 'NULL' call below */
+ GNUNET_CONTAINER_DLL_remove (h->queue_head,
+ h->queue_tail,
+ pos);
+ GNUNET_CONTAINER_DLL_insert (h->queue_head,
+ h->queue_tail,
+ pos);
+ pos->response_proc (h, NULL);
break;
}
pos = pos->next;
@@ -550,6 +550,7 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h)
GNUNET_NO);
#endif
GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+ h->skip_next_messages = 0;
h->client = NULL;
h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
&try_reconnect,
@@ -700,6 +701,7 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
qe->task = GNUNET_SCHEDULER_NO_TASK;
}
h->queue_size--;
+ qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
GNUNET_free (qe);
}
@@ -724,16 +726,22 @@ process_status_message (void *cls,
int was_transmitted;
h->in_receive = GNUNET_NO;
+ if (h->skip_next_messages > 0)
+ {
+ h->skip_next_messages--;
+ process_queue (h);
+ return;
+ }
if (NULL == (qe = h->queue_head))
{
GNUNET_break (0);
do_disconnect (h);
return;
}
- was_transmitted = qe->was_transmitted;
rc = qe->qc.sc;
if (msg == NULL)
{
+ was_transmitted = qe->was_transmitted;
free_queue_entry (qe);
if (NULL == h->client)
return; /* forced disconnect */
@@ -1114,7 +1122,7 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
const GNUNET_HashCode *key,
- size_t size,
+ size_t size,
const void *data,
unsigned int queue_priority,
unsigned int max_queue_size,
@@ -1186,45 +1194,35 @@ process_result_message (void *cls,
struct GNUNET_DATASTORE_QueueEntry *qe;
struct ResultContext rc;
const struct DataMessage *dm;
- int was_transmitted;
h->in_receive = GNUNET_NO;
+ if (h->skip_next_messages > 0)
+ {
+ h->skip_next_messages--;
+ process_queue (h);
+ return;
+ }
if (msg == NULL)
{
- if (NULL != (qe = h->queue_head))
+ qe = h->queue_head;
+ GNUNET_assert (NULL != qe);
+ if (qe->was_transmitted == GNUNET_YES)
{
- was_transmitted = qe->was_transmitted;
- free_queue_entry (qe);
rc = qe->qc.rc;
- if (was_transmitted == GNUNET_YES)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Failed to receive response from database.\n"));
- do_disconnect (h);
- }
- else
- {
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Request dropped due to finite datastore queue length.\n");
-#endif
- }
- if (rc.iter != NULL)
- rc.iter (rc.iter_cls,
- NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Failed to receive response from database.\n"));
+ do_disconnect (h);
}
+ free_queue_entry (qe);
+ if (rc.proc != NULL)
+ rc.proc (rc.proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
{
GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
- if (h->expect_end_or_disconnect > 0)
- {
- h->expect_end_or_disconnect--;
- process_queue (h);
- return;
- }
qe = h->queue_head;
rc = qe->qc.rc;
GNUNET_assert (GNUNET_YES == qe->was_transmitted);
@@ -1234,8 +1232,8 @@ process_result_message (void *cls,
"Received end of result set, new queue size is %u\n",
h->queue_size);
#endif
- if (rc.iter != NULL)
- rc.iter (rc.iter_cls,
+ if (rc.proc != NULL)
+ rc.proc (rc.proc_cls,
NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
h->retry_time.rel_value = 0;
@@ -1243,13 +1241,6 @@ process_result_message (void *cls,
process_queue (h);
return;
}
- if (h->expect_end_or_disconnect > 0)
- {
- /* only 'END' allowed, must reconnect */
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
- return;
- }
qe = h->queue_head;
rc = qe->qc.rc;
GNUNET_assert (GNUNET_YES == qe->was_transmitted);
@@ -1261,40 +1252,16 @@ process_result_message (void *cls,
free_queue_entry (qe);
h->retry_time = GNUNET_TIME_UNIT_ZERO;
do_disconnect (h);
- if (rc.iter != NULL)
- rc.iter (rc.iter_cls,
+ if (rc.proc != NULL)
+ rc.proc (rc.proc_cls,
NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
GNUNET_STATISTICS_update (h->stats,
gettext_noop ("# Results received"),
1,
GNUNET_NO);
- if (rc.iter == NULL)
- {
- h->result_count++;
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop ("# Excess results received"),
- 1,
- GNUNET_NO);
- if (h->result_count > MAX_EXCESS_RESULTS)
- {
- free_queue_entry (qe);
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop ("# Forced database connection resets"),
- 1,
- GNUNET_NO);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
- return;
- }
- if (GNUNET_YES == qe->one_shot)
- free_queue_entry (qe);
- else
- GNUNET_DATASTORE_iterate_get_next (h);
- return;
- }
dm = (const struct DataMessage*) msg;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1304,10 +1271,9 @@ process_result_message (void *cls,
ntohl(dm->size),
GNUNET_h2s(&dm->key));
#endif
- if (GNUNET_YES == qe->one_shot)
- free_queue_entry (qe);
+ free_queue_entry (qe);
h->retry_time.rel_value = 0;
- rc.iter (rc.iter_cls,
+ rc.proc (rc.proc_cls,
&dm->key,
ntohl(dm->size),
&dm[1],
@@ -1331,33 +1297,33 @@ process_result_message (void *cls,
* @param max_queue_size at what queue size should this request be dropped
* (if other requests of higher priority are in the queue)
* @param timeout how long to wait at most for a response
- * @param iter function to call on a random value; it
+ * @param proc function to call on a random value; it
* will be called once with a value (if available)
* and always once with a value of NULL.
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
* @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
+ * cancel
*/
struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
unsigned int queue_priority,
unsigned int max_queue_size,
struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_Iterator iter,
- void *iter_cls)
+ GNUNET_DATASTORE_DatumProcessor proc,
+ void *proc_cls)
{
struct GNUNET_DATASTORE_QueueEntry *qe;
struct GNUNET_MessageHeader *m;
union QueueContext qc;
+ GNUNET_assert (NULL != proc);
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asked to get replication entry in %llu ms\n",
(unsigned long long) timeout.rel_value);
#endif
- qc.rc.iter = iter;
- qc.rc.iter_cls = iter_cls;
+ qc.rc.proc = proc;
+ qc.rc.proc_cls = proc_cls;
qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
queue_priority, max_queue_size, timeout,
&process_result_message, &qc);
@@ -1369,7 +1335,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
#endif
return NULL;
}
- qe->one_shot = GNUNET_YES;
GNUNET_STATISTICS_update (h->stats,
gettext_noop ("# GET REPLICATION requests executed"),
1,
@@ -1383,43 +1348,50 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
/**
- * Get a zero-anonymity value from the datastore.
+ * Get a single zero-anonymity value from the datastore.
*
* @param h handle to the datastore
+ * @param offset offset of the result (mod #num-results); set to
+ * a random 64-bit value initially; then increment by
+ * one each time; detect that all results have been found by uid
+ * being again the first uid ever returned.
* @param queue_priority ranking of this request in the priority queue
* @param max_queue_size at what queue size should this request be dropped
* (if other requests of higher priority are in the queue)
* @param timeout how long to wait at most for a response
- * @param type allowed type for the operation
- * @param iter function to call on a random value; it
+ * @param type allowed type for the operation (never zero)
+ * @param proc function to call on a random value; it
* will be called once with a value (if available)
- * and always once with a value of NULL.
- * @param iter_cls closure for iter
+ * or with NULL if none value exists.
+ * @param proc_cls closure for proc
* @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
+ * cancel
*/
struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
- unsigned int queue_priority,
- unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- enum GNUNET_BLOCK_Type type,
- GNUNET_DATASTORE_Iterator iter,
- void *iter_cls)
+GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
+ uint64_t offset,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
+ enum GNUNET_BLOCK_Type type,
+ GNUNET_DATASTORE_DatumProcessor proc,
+ void *proc_cls)
{
struct GNUNET_DATASTORE_QueueEntry *qe;
struct GetZeroAnonymityMessage *m;
union QueueContext qc;
+ GNUNET_assert (NULL != proc);
GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to get zero-anonymity entry in %llu ms\n",
+ "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
+ (unsigned long long) offset,
+ type,
(unsigned long long) timeout.rel_value);
#endif
- qc.rc.iter = iter;
- qc.rc.iter_cls = iter_cls;
+ qc.rc.proc = proc;
+ qc.rc.proc_cls = proc_cls;
qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
queue_priority, max_queue_size, timeout,
&process_result_message, &qc);
@@ -1427,7 +1399,7 @@ GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
{
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Could not create queue entry for zero-anonymity iteration\n");
+ "Could not create queue entry for zero-anonymity procation\n");
#endif
return NULL;
}
@@ -1439,55 +1411,57 @@ GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
m->type = htonl ((uint32_t) type);
+ m->offset = GNUNET_htonll (offset);
process_queue (h);
return qe;
}
-
/**
- * Iterate over the results for a particular key
- * in the datastore. The iterator will only be called
- * once initially; if the first call did contain a
- * result, further results can be obtained by calling
- * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
+ * Get a result for a particular key from the datastore. The processor
+ * will only be called once.
*
* @param h handle to the datastore
+ * @param offset offset of the result (mod #num-results); set to
+ * a random 64-bit value initially; then increment by
+ * one each time; detect that all results have been found by uid
+ * being again the first uid ever returned.
* @param key maybe NULL (to match all entries)
* @param type desired type, 0 for any
* @param queue_priority ranking of this request in the priority queue
* @param max_queue_size at what queue size should this request be dropped
* (if other requests of higher priority are in the queue)
* @param timeout how long to wait at most for a response
- * @param iter function to call on each matching value;
+ * @param proc function to call on each matching value;
* will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
* @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
+ * cancel
*/
struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
- const GNUNET_HashCode * key,
- enum GNUNET_BLOCK_Type type,
- unsigned int queue_priority,
- unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_Iterator iter,
- void *iter_cls)
+GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
+ uint64_t offset,
+ const GNUNET_HashCode * key,
+ enum GNUNET_BLOCK_Type type,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_DATASTORE_DatumProcessor proc,
+ void *proc_cls)
{
struct GNUNET_DATASTORE_QueueEntry *qe;
struct GetMessage *gm;
union QueueContext qc;
+ GNUNET_assert (NULL != proc);
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asked to look for data of type %u under key `%s'\n",
(unsigned int) type,
GNUNET_h2s (key));
#endif
- qc.rc.iter = iter;
- qc.rc.iter_cls = iter_cls;
+ qc.rc.proc = proc;
+ qc.rc.proc_cls = proc_cls;
qe = make_queue_entry (h, sizeof(struct GetMessage),
queue_priority, max_queue_size, timeout,
&process_result_message, &qc);
@@ -1507,6 +1481,7 @@ GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
gm = (struct GetMessage*) &qe[1];
gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
gm->type = htonl(type);
+ gm->offset = GNUNET_htonll (offset);
if (key != NULL)
{
gm->header.size = htons(sizeof (struct GetMessage));
@@ -1522,25 +1497,6 @@ GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
/**
- * Function called to trigger obtaining the next result
- * from the datastore.
- *
- * @param h handle to the datastore
- */
-void
-GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h)
-{
- struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
-
- h->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (h->client,
- &process_result_message,
- h,
- GNUNET_TIME_absolute_get_remaining (qe->timeout));
-}
-
-
-/**
* Cancel a datastore operation. The final callback from the
* operation must not have been done yet.
*
@@ -1551,6 +1507,7 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
{
struct GNUNET_DATASTORE_Handle *h;
+ GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
h = qe->h;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1562,7 +1519,7 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
if (GNUNET_YES == qe->was_transmitted)
{
free_queue_entry (qe);
- h->expect_end_or_disconnect++;
+ h->skip_next_messages++;
return;
}
free_queue_entry (qe);
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index 566a227c1..deab62dd0 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -209,18 +209,6 @@ sync_stats ()
-
-/**
- * Function called once the transmit operation has
- * either failed or succeeded.
- *
- * @param cls closure
- * @param status GNUNET_OK on success, GNUNET_SYSERR on error
- */
-typedef void (*TransmitContinuation)(void *cls,
- int status);
-
-
/**
* Context for transmitting replies to clients.
*/
@@ -252,22 +240,6 @@ struct TransmitCallbackContext
*/
struct GNUNET_SERVER_Client *client;
- /**
- * Function to call once msg has been transmitted
- * (or at least added to the buffer).
- */
- TransmitContinuation tc;
-
- /**
- * Closure for tc.
- */
- void *tc_cls;
-
- /**
- * GNUNET_YES if we are supposed to signal the server
- * completion of the client's request.
- */
- int end;
};
@@ -330,7 +302,6 @@ delete_expired (void *cls,
*/
static int
expired_processor (void *cls,
- void *next_cls,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
@@ -396,7 +367,7 @@ delete_expired (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
- plugin->api->expiration_get (plugin->api->cls,
+ plugin->api->get_expiration (plugin->api->cls,
&expired_processor,
NULL);
}
@@ -424,7 +395,6 @@ delete_expired (void *cls,
*/
static int
quota_processor (void *cls,
- void *next_cls,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
@@ -487,7 +457,7 @@ manage_space (unsigned long long need)
(last != need) )
{
last = need;
- plugin->api->expiration_get (plugin->api->cls,
+ plugin->api->get_expiration (plugin->api->cls,
&quota_processor,
&need);
}
@@ -521,14 +491,7 @@ transmit_callback (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Transmission to client failed!\n"));
- if (tcc->tc != NULL)
- tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
- if (GNUNET_YES == tcc->end)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Disconnecting client due to transmission failure!\n"));
- GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
- }
+ GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
GNUNET_SERVER_client_drop (tcc->client);
GNUNET_free (tcc->msg);
GNUNET_free (tcc);
@@ -536,23 +499,7 @@ transmit_callback (void *cls,
}
GNUNET_assert (size >= msize);
memcpy (buf, tcc->msg, msize);
- if (tcc->tc != NULL)
- tcc->tc (tcc->tc_cls, GNUNET_OK);
- if (GNUNET_YES == tcc->end)
- {
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Done processing client request\n");
-#endif
- GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
- }
- else
- {
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Response transmitted, more pending!\n");
-#endif
- }
+ GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
GNUNET_SERVER_client_drop (tcc->client);
GNUNET_free (tcc->msg);
GNUNET_free (tcc);
@@ -567,16 +514,10 @@ transmit_callback (void *cls,
* @param msg message to transmit, will be freed!
* @param tc function to call afterwards
* @param tc_cls closure for tc
- * @param end is this the last response (and we should
- * signal the server completion accodingly after
- * transmitting this message)?
*/
static void
transmit (struct GNUNET_SERVER_Client *client,
- struct GNUNET_MessageHeader *msg,
- TransmitContinuation tc,
- void *tc_cls,
- int end)
+ struct GNUNET_MessageHeader *msg)
{
struct TransmitCallbackContext *tcc;
@@ -586,17 +527,13 @@ transmit (struct GNUNET_SERVER_Client *client,
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Shutdown in progress, aborting transmission.\n");
#endif
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
GNUNET_free (msg);
- if (NULL != tc)
- tc (tc_cls, GNUNET_SYSERR);
return;
}
tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
tcc->msg = msg;
tcc->client = client;
- tcc->tc = tc;
- tcc->tc_cls = tc_cls;
- tcc->end = end;
if (NULL ==
(tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
ntohs(msg->size),
@@ -605,14 +542,7 @@ transmit (struct GNUNET_SERVER_Client *client,
tcc)))
{
GNUNET_break (0);
- if (GNUNET_YES == end)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Forcefully disconnecting client.\n"));
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- }
- if (NULL != tc)
- tc (tc_cls, GNUNET_SYSERR);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
GNUNET_free (msg);
GNUNET_free (tcc);
return;
@@ -653,33 +583,10 @@ transmit_status (struct GNUNET_SERVER_Client *client,
sm->status = htonl(code);
if (slen > 0)
memcpy (&sm[1], msg, slen);
- transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
+ transmit (client, &sm->header);
}
-/**
- * Function called once the transmit operation has
- * either failed or succeeded.
- *
- * @param next_cls closure for calling "next_request" callback
- * @param status GNUNET_OK on success, GNUNET_SYSERR on error
- */
-static void
-get_next(void *next_cls,
- int status)
-{
- if (status != GNUNET_OK)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _("Failed to transmit an item to the client; aborting iteration.\n"));
- if (plugin != NULL)
- plugin->api->next_request (next_cls, GNUNET_YES);
- return;
- }
- if (next_cls != NULL)
- plugin->api->next_request (next_cls, GNUNET_NO);
-}
-
/**
* Function that will transmit the given datastore entry
@@ -702,7 +609,6 @@ get_next(void *next_cls,
*/
static int
transmit_item (void *cls,
- void *next_cls,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
@@ -727,10 +633,11 @@ transmit_item (void *cls,
end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
end->size = htons(sizeof(struct GNUNET_MessageHeader));
end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
- transmit (client, end, NULL, NULL, GNUNET_YES);
+ transmit (client, end);
GNUNET_SERVER_client_drop (client);
return GNUNET_OK;
}
+ GNUNET_assert (sizeof (struct DataMessage) + size < GNUNET_SERVER_MAX_MESSAGE_SIZE);
dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
dm->header.size = htons(sizeof(struct DataMessage) + size);
dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
@@ -754,8 +661,7 @@ transmit_item (void *cls,
gettext_noop ("# results found"),
1,
GNUNET_NO);
- transmit (client, &dm->header, &get_next, next_cls,
- (next_cls != NULL) ? GNUNET_NO : GNUNET_YES);
+ transmit (client, &dm->header);
return GNUNET_OK;
}
@@ -939,11 +845,6 @@ struct PutContext
* Client to notify on completion.
*/
struct GNUNET_SERVER_Client *client;
-
- /**
- * Did we find the data already in the database?
- */
- int is_present;
/* followed by the 'struct DataMessage' */
};
@@ -1009,7 +910,6 @@ execute_put (struct GNUNET_SERVER_Client *client,
* matches the put and if none match executes the put.
*
* @param cls closure, pointer to the client (of type 'struct PutContext').
- * @param next_cls closure to use to ask for the next item
* @param key key for the content
* @param size number of bytes in data
* @param data content stored
@@ -1020,12 +920,11 @@ execute_put (struct GNUNET_SERVER_Client *client,
* @param uid unique identifier for the datum;
* maybe 0 if no unique identifier is available
*
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
- * GNUNET_NO to delete the item and continue (if supported)
+ * @return GNUNET_OK usually
+ * GNUNET_NO to delete the item
*/
static int
check_present (void *cls,
- void *next_cls,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
@@ -1041,13 +940,10 @@ check_present (void *cls,
dm = (const struct DataMessage*) &pc[1];
if (key == NULL)
{
- if (pc->is_present == GNUNET_YES)
- transmit_status (pc->client, GNUNET_NO, NULL);
- else
- execute_put (pc->client, dm);
+ execute_put (pc->client, dm);
GNUNET_SERVER_client_drop (pc->client);
GNUNET_free (pc);
- return GNUNET_SYSERR;
+ return GNUNET_OK;
}
if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
(GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
@@ -1056,12 +952,19 @@ check_present (void *cls,
data,
size)) ) )
{
- pc->is_present = GNUNET_YES;
- plugin->api->next_request (next_cls, GNUNET_YES);
+#if DEBUG_MYSQL
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Result already present in datastore\n");
+#endif
+ transmit_status (pc->client, GNUNET_NO, NULL);
+ GNUNET_SERVER_client_drop (pc->client);
+ GNUNET_free (pc);
}
else
{
- plugin->api->next_request (next_cls, GNUNET_NO);
+ execute_put (pc->client, dm);
+ GNUNET_SERVER_client_drop (pc->client);
+ GNUNET_free (pc);
}
return GNUNET_OK;
}
@@ -1083,6 +986,7 @@ handle_put (void *cls,
int rid;
struct ReservationList *pos;
struct PutContext *pc;
+ GNUNET_HashCode vhash;
uint32_t size;
if ( (dm == NULL) ||
@@ -1124,16 +1028,18 @@ handle_put (void *cls,
if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
&dm->key))
{
+ GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
pc->client = client;
GNUNET_SERVER_client_keep (client);
memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
- plugin->api->get (plugin->api->cls,
- &dm->key,
- NULL,
- ntohl (dm->type),
- &check_present,
- pc);
+ plugin->api->get_key (plugin->api->cls,
+ 0,
+ &dm->key,
+ &vhash,
+ ntohl (dm->type),
+ &check_present,
+ pc);
return;
}
execute_put (client, dm);
@@ -1192,16 +1098,17 @@ handle_get (void *cls,
1,
GNUNET_NO);
transmit_item (client,
- NULL, NULL, 0, NULL, 0, 0, 0,
+ NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- plugin->api->get (plugin->api->cls,
- ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
- NULL,
- ntohl(msg->type),
- &transmit_item,
- client);
+ plugin->api->get_key (plugin->api->cls,
+ GNUNET_ntohll (msg->offset),
+ ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
+ NULL,
+ ntohl(msg->type),
+ &transmit_item,
+ client);
}
@@ -1265,7 +1172,7 @@ handle_get_replication (void *cls,
1,
GNUNET_NO);
GNUNET_SERVER_client_keep (client);
- plugin->api->replication_get (plugin->api->cls,
+ plugin->api->get_replication (plugin->api->cls,
&transmit_item,
client);
}
@@ -1303,37 +1210,20 @@ handle_get_zero_anonymity (void *cls,
1,
GNUNET_NO);
GNUNET_SERVER_client_keep (client);
- plugin->api->iter_zero_anonymity (plugin->api->cls,
- type,
- &transmit_item,
- client);
+ plugin->api->get_zero_anonymity (plugin->api->cls,
+ GNUNET_ntohll (msg->offset),
+ type,
+ &transmit_item,
+ client);
}
/**
- * Context for the 'remove_callback'.
- */
-struct RemoveContext
-{
- /**
- * Client for whom we're doing the remvoing.
- */
- struct GNUNET_SERVER_Client *client;
-
- /**
- * GNUNET_YES if we managed to remove something.
- */
- int found;
-};
-
-
-/**
* Callback function that will cause the item that is passed
* in to be deleted (by returning GNUNET_NO).
*/
static int
remove_callback (void *cls,
- void *next_cls,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
@@ -1343,7 +1233,7 @@ remove_callback (void *cls,
struct GNUNET_TIME_Absolute
expiration, uint64_t uid)
{
- struct RemoveContext *rc = cls;
+ struct GNUNET_SERVER_Client *client = cls;
if (key == NULL)
{
@@ -1352,15 +1242,10 @@ remove_callback (void *cls,
"No further matches for `%s' request.\n",
"REMOVE");
#endif
- if (GNUNET_YES == rc->found)
- transmit_status (rc->client, GNUNET_OK, NULL);
- else
- transmit_status (rc->client, GNUNET_NO, _("Content not found"));
- GNUNET_SERVER_client_drop (rc->client);
- GNUNET_free (rc);
+ transmit_status (client, GNUNET_NO, _("Content not found"));
+ GNUNET_SERVER_client_drop (client);
return GNUNET_OK; /* last item */
}
- rc->found = GNUNET_YES;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Item %llu matches `%s' request for key `%s' and type %u.\n",
@@ -1375,7 +1260,8 @@ remove_callback (void *cls,
GNUNET_YES);
GNUNET_CONTAINER_bloomfilter_remove (filter,
key);
- plugin->api->next_request (next_cls, GNUNET_YES);
+ transmit_status (client, GNUNET_OK, NULL);
+ GNUNET_SERVER_client_drop (client);
return GNUNET_NO;
}
@@ -1394,7 +1280,6 @@ handle_remove (void *cls,
{
const struct DataMessage *dm = check_data (message);
GNUNET_HashCode vhash;
- struct RemoveContext *rc;
if (dm == NULL)
{
@@ -1413,18 +1298,17 @@ handle_remove (void *cls,
gettext_noop ("# REMOVE requests received"),
1,
GNUNET_NO);
- rc = GNUNET_malloc (sizeof(struct RemoveContext));
GNUNET_SERVER_client_keep (client);
- rc->client = client;
GNUNET_CRYPTO_hash (&dm[1],
ntohl(dm->size),
&vhash);
- plugin->api->get (plugin->api->cls,
- &dm->key,
- &vhash,
- (enum GNUNET_BLOCK_Type) ntohl(dm->type),
- &remove_callback,
- rc);
+ plugin->api->get_key (plugin->api->cls,
+ 0,
+ &dm->key,
+ &vhash,
+ (enum GNUNET_BLOCK_Type) ntohl(dm->type),
+ &remove_callback,
+ client);
}
@@ -1469,7 +1353,7 @@ disk_utilization_change_cb (void *cls,
_("Datastore payload inaccurate (%lld < %lld). Trying to fix.\n"),
(long long) payload,
(long long) -delta);
- payload = plugin->api->get_size (plugin->api->cls);
+ payload = plugin->api->estimate_size (plugin->api->cls);
sync_stats ();
return;
}
@@ -1518,7 +1402,7 @@ process_stat_done (void *cls,
stat_get = NULL;
if (stats_worked == GNUNET_NO)
- payload = plugin->api->get_size (plugin->api->cls);
+ payload = plugin->api->estimate_size (plugin->api->cls);
}
@@ -1636,8 +1520,6 @@ cleaning_task (void *cls,
GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
GNUNET_SERVER_client_drop (tcc->client);
}
- if (NULL != tcc->tc)
- tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
GNUNET_free (tcc->msg);
GNUNET_free (tcc);
}
diff --git a/src/datastore/perf_datastore_api.c b/src/datastore/perf_datastore_api.c
index 6ea65c68d..2f0eb0de9 100644
--- a/src/datastore/perf_datastore_api.c
+++ b/src/datastore/perf_datastore_api.c
@@ -385,6 +385,7 @@ check ()
GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1,
argv, "perf-datastore-api", "nohelp",
options, &run, NULL);
+ sleep (1); /* give datastore chance to process 'DROP' */
if (0 != GNUNET_OS_process_kill (proc, SIGTERM))
{
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
diff --git a/src/datastore/perf_plugin_datastore.c b/src/datastore/perf_plugin_datastore.c
index 2903a8f28..6befa120c 100644
--- a/src/datastore/perf_plugin_datastore.c
+++ b/src/datastore/perf_plugin_datastore.c
@@ -37,7 +37,7 @@
* those take too long to run them in the usual "make check"
* sequence. Hence the value used for shipping is tiny.
*/
-#define MAX_SIZE 1024LL * 1024 * 128
+#define MAX_SIZE 1024LL * 1024 * 32
#define ITERATIONS 2
@@ -81,6 +81,7 @@ struct CpsRunContext
enum RunPhase phase;
unsigned int cnt;
unsigned int iter;
+ uint64_t offset;
};
@@ -100,7 +101,8 @@ disk_utilization_change_cb (void *cls,
static void
-putValue (struct GNUNET_DATASTORE_PluginFunctions * api, int i, int k)
+putValue (struct GNUNET_DATASTORE_PluginFunctions * api,
+ int i, int k)
{
char value[65536];
size_t size;
@@ -156,7 +158,6 @@ test (void *cls,
static int
iterate_zeros (void *cls,
- void *next_cls,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
@@ -171,7 +172,18 @@ iterate_zeros (void *cls,
int i;
const char *cdata = data;
- if (key == NULL)
+ GNUNET_assert (key != NULL);
+ GNUNET_assert (size >= 8);
+ memcpy (&i, &cdata[4], sizeof (i));
+ hits[i/8] |= (1 << (i % 8));
+
+#if VERBOSE
+ fprintf (stderr, "Found result type=%u, priority=%u, size=%u, expire=%llu\n",
+ type, priority, size,
+ (unsigned long long) expiration.abs_value);
+#endif
+ crc->cnt++;
+ if (crc->cnt == PUT_10 / 4 - 1)
{
char buf[256];
unsigned int bc;
@@ -192,42 +204,17 @@ iterate_zeros (void *cls,
crc->cnt);
GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms");
memset (hits, 0, sizeof (hits));
- if ( (int) (PUT_10 / 4 - crc->cnt) > 2)
- {
- fprintf (stderr,
- "Got %d items, expected %d\n",
- (int) crc->cnt, (int) PUT_10 / 4);
- GNUNET_break (0);
- crc->phase = RP_ERROR;
- }
- else
- {
- crc->phase++;
- crc->cnt = 0;
- crc->start = GNUNET_TIME_absolute_get ();
- }
- GNUNET_SCHEDULER_add_now (&test, crc);
- return GNUNET_OK;
+ crc->phase++;
+ crc->cnt = 0;
+ crc->start = GNUNET_TIME_absolute_get ();
}
- GNUNET_assert (size >= 8);
- memcpy (&i, &cdata[4], sizeof (i));
- hits[i/8] |= (1 << (i % 8));
-
-#if VERBOSE
- fprintf (stderr, "Found result type=%u, priority=%u, size=%u, expire=%llu\n",
- type, priority, size,
- (unsigned long long) expiration.abs_value);
-#endif
- crc->cnt++;
- crc->api->next_request (next_cls,
- GNUNET_NO);
+ GNUNET_SCHEDULER_add_now (&test, crc);
return GNUNET_OK;
}
static int
expiration_get (void *cls,
- void *next_cls,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
@@ -281,7 +268,6 @@ expiration_get (void *cls,
static int
replication_get (void *cls,
- void *next_cls,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
@@ -323,6 +309,7 @@ replication_get (void *cls,
GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms");
memset (hits, 0, sizeof (hits));
crc->phase++;
+ crc->offset = 0;
crc->cnt = 0;
crc->start = GNUNET_TIME_absolute_get ();
}
@@ -386,7 +373,15 @@ test (void *cls,
int j;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- crc->phase = RP_ERROR;
+ {
+ GNUNET_break (0);
+ crc->phase = RP_ERROR;
+ }
+#if VERBOSE
+ fprintf (stderr, "In phase %d, iteration %u\n",
+ crc->phase,
+ crc->cnt);
+#endif
switch (crc->phase)
{
case RP_ERROR:
@@ -419,17 +414,19 @@ test (void *cls,
GNUNET_SCHEDULER_add_now (&test, crc);
break;
case RP_REP_GET:
- crc->api->replication_get (crc->api->cls,
+ crc->api->get_replication (crc->api->cls,
&replication_get,
crc);
break;
case RP_ZA_GET:
- crc->api->iter_zero_anonymity (crc->api->cls, 1,
- &iterate_zeros,
- crc);
+ crc->api->get_zero_anonymity (crc->api->cls,
+ crc->offset++,
+ 1,
+ &iterate_zeros,
+ crc);
break;
case RP_EXP_GET:
- crc->api->expiration_get (crc->api->cls,
+ crc->api->get_expiration (crc->api->cls,
&expiration_get,
crc);
break;
@@ -549,7 +546,6 @@ main (int argc, char *argv[])
char *pos;
char dir_name[128];
- if (1) return 0;
/* determine name of plugin to use */
plugin_name = argv[0];
while (NULL != (pos = strstr(plugin_name, "_")))
diff --git a/src/datastore/perf_plugin_datastore_data_postgres.conf b/src/datastore/perf_plugin_datastore_data_postgres.conf
index c2a181bc7..b7cf174e9 100644
--- a/src/datastore/perf_plugin_datastore_data_postgres.conf
+++ b/src/datastore/perf_plugin_datastore_data_postgres.conf
@@ -20,7 +20,7 @@ DATABASE = postgres
# REJECT_FROM =
# REJECT_FROM6 =
# PREFIX =
-
+# DEBUG = YES
[dht]
AUTOSTART = NO
diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c
index c3d9212d3..2eefd9b04 100644
--- a/src/datastore/plugin_datastore_mysql.c
+++ b/src/datastore/plugin_datastore_mysql.c
@@ -160,58 +160,6 @@ struct GNUNET_MysqlStatementHandle
};
-/**
- * Context for the universal iterator.
- */
-struct NextRequestClosure;
-
-/**
- * Type of a function that will prepare
- * the next iteration.
- *
- * @param cls closure
- * @param nc the next context; NULL for the last
- * call which gives the callback a chance to
- * clean up the closure
- * @return GNUNET_OK on success, GNUNET_NO if there are
- * no more values, GNUNET_SYSERR on error
- */
-typedef int (*PrepareFunction)(void *cls,
- struct NextRequestClosure *nc);
-
-
-struct NextRequestClosure
-{
- struct Plugin *plugin;
-
- struct GNUNET_TIME_Absolute now;
-
- /**
- * Function to call to prepare the next
- * iteration.
- */
- PrepareFunction prep;
-
- /**
- * Closure for prep.
- */
- void *prep_cls;
-
- MYSQL_BIND rbind[7];
-
- enum GNUNET_BLOCK_Type type;
-
- PluginIterator dviter;
-
- void *dviter_cls;
-
- unsigned int count;
-
- int end_it;
-
- int one_shot;
-};
-
/**
* Context for all functions in this plugin.
@@ -244,16 +192,6 @@ struct Plugin
char *cnffile;
/**
- * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
- */
- struct NextRequestClosure *next_task_nc;
-
- /**
- * Pending task with scheduler for running the next request.
- */
- GNUNET_SCHEDULER_TaskIdentifier next_task;
-
- /**
* Prepared statements.
*/
#define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
@@ -295,7 +233,7 @@ struct Plugin
#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
struct GNUNET_MysqlStatementHandle *get_size;
-#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?"
+#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 AND type=? ORDER BY uid DESC LIMIT 1 OFFSET ?"
struct GNUNET_MysqlStatementHandle *zero_iter;
#define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
@@ -372,7 +310,6 @@ get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
}
-
/**
* Free a prepared statement.
*
@@ -381,8 +318,7 @@ get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg)
*/
static void
prepared_statement_destroy (struct Plugin *plugin,
- struct GNUNET_MysqlStatementHandle
- *s)
+ struct GNUNET_MysqlStatementHandle *s)
{
GNUNET_CONTAINER_DLL_remove (plugin->shead,
plugin->stail,
@@ -397,6 +333,8 @@ prepared_statement_destroy (struct Plugin *plugin,
/**
* Close database connection and all prepared statements (we got a DB
* disconnect error).
+ *
+ * @param plugin plugin context
*/
static int
iclose (struct Plugin *plugin)
@@ -420,10 +358,11 @@ iclose (struct Plugin *plugin)
* Open the connection with the database (and initialize
* our default options).
*
+ * @param plugin plugin context
* @return GNUNET_OK on success
*/
static int
-iopen (struct Plugin *ret)
+iopen (struct Plugin *plugin)
{
char *mysql_dbname;
char *mysql_server;
@@ -433,67 +372,67 @@ iopen (struct Plugin *ret)
my_bool reconnect;
unsigned int timeout;
- ret->dbf = mysql_init (NULL);
- if (ret->dbf == NULL)
+ plugin->dbf = mysql_init (NULL);
+ if (plugin->dbf == NULL)
return GNUNET_SYSERR;
- if (ret->cnffile != NULL)
- mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile);
- mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
+ if (plugin->cnffile != NULL)
+ mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_FILE, plugin->cnffile);
+ mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
reconnect = 0;
- mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
- mysql_options (ret->dbf,
+ mysql_options (plugin->dbf, MYSQL_OPT_RECONNECT, &reconnect);
+ mysql_options (plugin->dbf,
MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
- mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
+ mysql_options(plugin->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
timeout = 60; /* in seconds */
- mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
- mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
+ mysql_options (plugin->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
+ mysql_options (plugin->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
mysql_dbname = NULL;
- if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+ if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
"datastore-mysql", "DATABASE"))
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
+ GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
"datastore-mysql", "DATABASE",
&mysql_dbname));
else
mysql_dbname = GNUNET_strdup ("gnunet");
mysql_user = NULL;
- if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+ if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
"datastore-mysql", "USER"))
{
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
+ GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
"datastore-mysql", "USER",
&mysql_user));
}
mysql_password = NULL;
- if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+ if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
"datastore-mysql", "PASSWORD"))
{
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
+ GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
"datastore-mysql", "PASSWORD",
&mysql_password));
}
mysql_server = NULL;
- if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+ if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
"datastore-mysql", "HOST"))
{
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
+ GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
"datastore-mysql", "HOST",
&mysql_server));
}
mysql_port = 0;
- if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+ if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
"datastore-mysql", "PORT"))
{
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql",
+ GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg, "datastore-mysql",
"PORT", &mysql_port));
}
GNUNET_assert (mysql_dbname != NULL);
- mysql_real_connect (ret->dbf,
+ mysql_real_connect (plugin->dbf,
mysql_server,
mysql_user, mysql_password,
mysql_dbname,
@@ -503,10 +442,10 @@ iopen (struct Plugin *ret)
GNUNET_free_non_null (mysql_user);
GNUNET_free_non_null (mysql_password);
GNUNET_free (mysql_dbname);
- if (mysql_error (ret->dbf)[0])
+ if (mysql_error (plugin->dbf)[0])
{
LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
- "mysql_real_connect", ret);
+ "mysql_real_connect", plugin);
return GNUNET_SYSERR;
}
return GNUNET_OK;
@@ -686,19 +625,6 @@ init_params (struct Plugin *plugin,
return GNUNET_OK;
}
-/**
- * Type of a callback that will be called for each
- * data set returned from MySQL.
- *
- * @param cls user-defined argument
- * @param num_values number of elements in values
- * @param values values returned by MySQL
- * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
- */
-typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
- unsigned int num_values,
- MYSQL_BIND *values);
-
/**
* Run a prepared SELECT statement.
@@ -708,40 +634,31 @@ typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
* @param result_size number of elements in results array
* @param results pointer to already initialized MYSQL_BIND
* array (of sufficient size) for passing results
- * @param processor function to call on each result
- * @param processor_cls extra argument to processor
- * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
+ * @param ap pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
* values (size + buffer-reference for pointers); terminated
* with "-1"
- * @return GNUNET_SYSERR on error, otherwise
- * the number of successfully affected (or queried) rows
+ * @return GNUNET_SYSERR on error, otherwise GNUNET_OK or GNUNET_NO (no result)
*/
static int
-prepared_statement_run_select (struct Plugin *plugin,
- struct GNUNET_MysqlStatementHandle *s,
- unsigned int result_size,
- MYSQL_BIND *results,
- GNUNET_MysqlDataProcessor processor, void *processor_cls,
- ...)
+prepared_statement_run_select_va (struct Plugin *plugin,
+ struct GNUNET_MysqlStatementHandle *s,
+ unsigned int result_size,
+ MYSQL_BIND *results,
+ va_list ap)
{
- va_list ap;
int ret;
unsigned int rsize;
- int total;
if (GNUNET_OK != prepare_statement (plugin, s))
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
- va_start (ap, processor_cls);
if (GNUNET_OK != init_params (plugin, s, ap))
{
GNUNET_break (0);
- va_end (ap);
return GNUNET_SYSERR;
}
- va_end (ap);
rsize = mysql_stmt_field_count (s->statement);
if (rsize > result_size)
{
@@ -757,29 +674,53 @@ prepared_statement_run_select (struct Plugin *plugin,
iclose (plugin);
return GNUNET_SYSERR;
}
-
- total = 0;
- while (1)
+ ret = mysql_stmt_fetch (s->statement);
+ if (ret == MYSQL_NO_DATA)
+ return GNUNET_NO;
+ if (ret != 0)
{
- ret = mysql_stmt_fetch (s->statement);
- if (ret == MYSQL_NO_DATA)
- break;
- if (ret != 0)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("`%s' failed at %s:%d with error: %s\n"),
- "mysql_stmt_fetch",
- __FILE__, __LINE__, mysql_stmt_error (s->statement));
- iclose (plugin);
- return GNUNET_SYSERR;
- }
- if (processor != NULL)
- if (GNUNET_OK != processor (processor_cls, rsize, results))
- break;
- total++;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("`%s' failed at %s:%d with error: %s\n"),
+ "mysql_stmt_fetch",
+ __FILE__, __LINE__, mysql_stmt_error (s->statement));
+ iclose (plugin);
+ return GNUNET_SYSERR;
}
mysql_stmt_reset (s->statement);
- return total;
+ return GNUNET_OK;
+}
+
+
+/**
+ * Run a prepared SELECT statement.
+ *
+ * @param plugin plugin context
+ * @param s statement to run
+ * @param result_size number of elements in results array
+ * @param results pointer to already initialized MYSQL_BIND
+ * array (of sufficient size) for passing results
+ * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
+ * values (size + buffer-reference for pointers); terminated
+ * with "-1"
+ * @return GNUNET_SYSERR on error, otherwise
+ * the number of successfully affected (or queried) rows
+ */
+static int
+prepared_statement_run_select (struct Plugin *plugin,
+ struct GNUNET_MysqlStatementHandle *s,
+ unsigned int result_size,
+ MYSQL_BIND *results,
+ ...)
+{
+ va_list ap;
+ int ret;
+
+ va_start (ap, results);
+ ret = prepared_statement_run_select_va (plugin, s,
+ result_size, results,
+ ap);
+ va_end (ap);
+ return ret;
}
@@ -854,23 +795,6 @@ do_delete_entry (struct Plugin *plugin,
/**
- * Function that simply returns GNUNET_OK
- *
- * @param cls closure, not used
- * @param num_values not used
- * @param values not used
- * @return GNUNET_OK
- */
-static int
-return_ok (void *cls,
- unsigned int num_values,
- MYSQL_BIND *values)
-{
- return GNUNET_OK;
-}
-
-
-/**
* Get an estimate of how much space the database is
* currently using.
*
@@ -878,7 +802,7 @@ return_ok (void *cls,
* @return number of bytes used on disk
*/
static unsigned long long
-mysql_plugin_get_size (void *cls)
+mysql_plugin_estimate_size (void *cls)
{
struct Plugin *plugin = cls;
MYSQL_BIND cbind[1];
@@ -893,7 +817,6 @@ mysql_plugin_get_size (void *cls)
prepared_statement_run_select (plugin,
plugin->get_size,
1, cbind,
- &return_ok, NULL,
-1))
return 0;
return total;
@@ -929,7 +852,6 @@ mysql_plugin_put (void *cls,
{
struct Plugin *plugin = cls;
unsigned int irepl = replication;
- unsigned int itype = type;
unsigned int ipriority = priority;
unsigned int ianonymity = anonymity;
unsigned long long lexpiration = expiration.abs_value;
@@ -952,7 +874,7 @@ mysql_plugin_put (void *cls,
plugin->insert_entry,
NULL,
MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
- MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+ MYSQL_TYPE_LONG, &type, GNUNET_YES,
MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
@@ -1034,20 +956,23 @@ mysql_plugin_update (void *cls,
}
-
-
/**
- * Continuation of "mysql_next_request".
+ * Run the given select statement and call 'proc' on the resulting
+ * values (which must be in particular positions).
*
- * @param next_cls the next context
- * @param tc the task context (unused)
+ * @param plugin the plugin handle
+ * @param stmt select statement to run
+ * @param proc function to call on result
+ * @param proc_cls closure for proc
+ * @param ... arguments to initialize stmt
*/
static void
-mysql_next_request_cont (void *next_cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+execute_select (struct Plugin *plugin,
+ struct GNUNET_MysqlStatementHandle *stmt,
+ PluginDatumProcessor proc, void *proc_cls,
+ ...)
{
- struct NextRequestClosure *nrc = next_cls;
- struct Plugin *plugin;
+ va_list ap;
int ret;
unsigned int type;
unsigned int priority;
@@ -1059,19 +984,10 @@ mysql_next_request_cont (void *next_cls,
char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
GNUNET_HashCode key;
struct GNUNET_TIME_Absolute expiration;
- MYSQL_BIND *rbind = nrc->rbind;
-
- plugin = nrc->plugin;
- plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
- plugin->next_task_nc = NULL;
+ MYSQL_BIND rbind[7];
- if (GNUNET_YES == nrc->end_it)
- goto END_SET;
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- nrc->now = GNUNET_TIME_absolute_get ();
hashSize = sizeof (GNUNET_HashCode);
- memset (nrc->rbind, 0, sizeof (nrc->rbind));
- rbind = nrc->rbind;
+ memset (rbind, 0, sizeof (rbind));
rbind[0].buffer_type = MYSQL_TYPE_LONG;
rbind[0].buffer = &type;
rbind[0].is_unsigned = 1;
@@ -1096,16 +1012,28 @@ mysql_next_request_cont (void *next_cls,
rbind[6].buffer = &uid;
rbind[6].is_unsigned = 1;
- if (GNUNET_OK != nrc->prep (nrc->prep_cls,
- nrc))
- goto END_SET;
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+ va_start (ap, proc_cls);
+ ret = prepared_statement_run_select_va (plugin,
+ stmt,
+ 7, rbind,
+ ap);
+ va_end (ap);
+ if (ret <= 0)
+ {
+ proc (proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
+ }
GNUNET_assert (size <= sizeof(value));
if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
(hashSize != sizeof (GNUNET_HashCode)) )
{
GNUNET_break (0);
- goto END_SET;
+ proc (proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
}
#if DEBUG_MYSQL
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1116,18 +1044,13 @@ mysql_next_request_cont (void *next_cls,
anonymity,
exp);
#endif
+ GNUNET_assert (size < MAX_DATUM_SIZE);
expiration.abs_value = exp;
- ret = nrc->dviter (nrc->dviter_cls,
- (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
- &key,
- size, value,
- type, priority, anonymity, expiration,
- uid);
- if (ret == GNUNET_SYSERR)
- {
- nrc->end_it = GNUNET_YES;
- return;
- }
+ ret = proc (proc_cls,
+ &key,
+ size, value,
+ type, priority, anonymity, expiration,
+ uid);
if (ret == GNUNET_NO)
{
do_delete_entry (plugin, uid);
@@ -1135,189 +1058,50 @@ mysql_next_request_cont (void *next_cls,
plugin->env->duc (plugin->env->cls,
- size);
}
- if (nrc->one_shot == GNUNET_YES)
- GNUNET_free (nrc);
- return;
- END_SET:
- /* call dviter with "end of set" */
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- nrc->dviter (nrc->dviter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- nrc->prep (nrc->prep_cls, NULL);
- GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
- GNUNET_free (nrc);
}
-/**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
- *
- * @param next_cls whatever argument was given
- * to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- * should terminate the iteration early
- * (iterator should be still called once more
- * to signal the end of the iteration).
- */
-static void
-mysql_plugin_next_request (void *next_cls,
- int end_it)
-{
- struct NextRequestClosure *nrc = next_cls;
-
- if (GNUNET_YES == end_it)
- nrc->end_it = GNUNET_YES;
- nrc->plugin->next_task_nc = nrc;
- nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
- nrc);
-}
-
/**
- * Context for 'get_statement_prepare'.
- */
-struct GetContext
-{
- GNUNET_HashCode key;
- GNUNET_HashCode vhash;
-
- unsigned int prio;
- unsigned int anonymity;
- unsigned long long expiration;
- unsigned long long vkey;
- unsigned long long total;
- unsigned int off;
- unsigned int count;
- int have_vhash;
-};
-
-
-static int
-get_statement_prepare (void *cls,
- struct NextRequestClosure *nrc)
-{
- struct GetContext *gc = cls;
- struct Plugin *plugin;
- int ret;
- unsigned long hashSize;
-
- if (NULL == nrc)
- {
- GNUNET_free (gc);
- return GNUNET_NO;
- }
- if (gc->count == gc->total)
- return GNUNET_NO;
- plugin = nrc->plugin;
- hashSize = sizeof (GNUNET_HashCode);
- if (++gc->off >= gc->total)
- gc->off = 0;
-#if DEBUG_MYSQL
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Obtaining result number %d/%lld at offset %u for GET `%s'\n",
- gc->count+1,
- gc->total,
- gc->off,
- GNUNET_h2s (&gc->key));
-#endif
- if (nrc->type != 0)
- {
- if (gc->have_vhash)
- {
- ret = prepared_statement_run_select (plugin,
- plugin->select_entry_by_hash_vhash_and_type,
- 7, nrc->rbind,
- &return_ok, NULL,
- MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
- MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
- MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES,
- MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
- -1);
- }
- else
- {
- ret =
- prepared_statement_run_select (plugin,
- plugin->select_entry_by_hash_and_type,
- 7, nrc->rbind,
- &return_ok, NULL,
- MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
- MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES,
- MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
- -1);
- }
- }
- else
- {
- if (gc->have_vhash)
- {
- ret =
- prepared_statement_run_select (plugin,
- plugin->select_entry_by_hash_and_vhash,
- 7, nrc->rbind,
- &return_ok, NULL,
- MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
- MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
- MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
- -1);
- }
- else
- {
- ret =
- prepared_statement_run_select (plugin,
- plugin->select_entry_by_hash,
- 7, nrc->rbind,
- &return_ok, NULL,
- MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
- MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
- -1);
- }
- }
- gc->count++;
- return ret;
-}
-
-
-/**
- * Iterate over the results for a particular key in the datastore.
+ * Get one of the results for a particular key in the datastore.
*
* @param cls closure
- * @param key maybe NULL (to match all entries)
+ * @param offset offset of the result (mod #num-results);
+ * specific ordering does not matter for the offset
+ * @param key key to match, never NULL
* @param vhash hash of the value, maybe NULL (to
* match all values that have the right key).
* Note that for DBlocks there is no difference
* betwen key and vhash, but for other blocks
* there may be!
* @param type entries of which type are relevant?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * Use 0 for any type.
+ * @param proc function to call on each matching value; however,
+ * after the first call to "proc", the plugin must wait
+ * until "NextRequest" was called before giving the processor
+ * the next item; finally, the "proc" should be called once
+ * once with a NULL value at the end ("next_cls" should be NULL
+ * for that last call)
+ * @param proc_cls closure for proc
*/
static void
-mysql_plugin_get (void *cls,
- const GNUNET_HashCode *key,
- const GNUNET_HashCode *vhash,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter, void *iter_cls)
+mysql_plugin_get_key (void *cls,
+ uint64_t offset,
+ const GNUNET_HashCode *key,
+ const GNUNET_HashCode *vhash,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
- unsigned int itype = type;
int ret;
MYSQL_BIND cbind[1];
- struct GetContext *gc;
- struct NextRequestClosure *nrc;
long long total;
unsigned long hashSize;
unsigned long hashSize2;
+ unsigned long long off;
GNUNET_assert (key != NULL);
- if (iter == NULL)
- return;
+ GNUNET_assert (NULL != proc);
hashSize = sizeof (GNUNET_HashCode);
hashSize2 = sizeof (GNUNET_HashCode);
memset (cbind, 0, sizeof (cbind));
@@ -1333,10 +1117,9 @@ mysql_plugin_get (void *cls,
prepared_statement_run_select (plugin,
plugin->count_entry_by_hash_vhash_and_type,
1, cbind,
- &return_ok, NULL,
MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2,
- MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+ MYSQL_TYPE_LONG, &type, GNUNET_YES,
-1);
}
else
@@ -1345,9 +1128,8 @@ mysql_plugin_get (void *cls,
prepared_statement_run_select (plugin,
plugin->count_entry_by_hash_and_type,
1, cbind,
- &return_ok, NULL,
MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
- MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+ MYSQL_TYPE_LONG, &type, GNUNET_YES,
-1);
}
}
@@ -1359,7 +1141,6 @@ mysql_plugin_get (void *cls,
prepared_statement_run_select (plugin,
plugin->count_entry_by_hash_and_vhash,
1, cbind,
- &return_ok, NULL,
MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2,
-1);
@@ -1371,79 +1152,81 @@ mysql_plugin_get (void *cls,
prepared_statement_run_select (plugin,
plugin->count_entry_by_hash,
1, cbind,
- &return_ok, NULL,
MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
-1);
}
}
if ((ret != GNUNET_OK) || (0 >= total))
{
- iter (iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
+ proc (proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
+ offset = offset % total;
+ off = (unsigned long long) offset;
#if DEBUG_MYSQL
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Iterating over %lld results for GET `%s'\n",
+ "Obtaining %llu/%lld result for GET `%s'\n",
+ off,
total,
GNUNET_h2s (key));
#endif
- gc = GNUNET_malloc (sizeof (struct GetContext));
- gc->key = *key;
- if (vhash != NULL)
+
+ if (type != GNUNET_BLOCK_TYPE_ANY)
{
- gc->have_vhash = GNUNET_YES;
- gc->vhash = *vhash;
+ if (NULL != vhash)
+ {
+ execute_select (plugin,
+ plugin->select_entry_by_hash_vhash_and_type,
+ proc, proc_cls,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
+ MYSQL_TYPE_LONG, &type, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
+ -1);
+ }
+ else
+ {
+ execute_select (plugin,
+ plugin->select_entry_by_hash_and_type,
+ proc, proc_cls,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_LONG, &type, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
+ -1);
+ }
+ }
+ else
+ {
+ if (NULL != vhash)
+ {
+ execute_select (plugin,
+ plugin->select_entry_by_hash_and_vhash,
+ proc, proc_cls,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
+ MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
+ -1);
+ }
+ else
+ {
+ execute_select (plugin,
+ plugin->select_entry_by_hash,
+ proc, proc_cls,
+ MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+ MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
+ -1);
+ }
}
- gc->total = total;
- gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
-
-
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->plugin = plugin;
- nrc->type = type;
- nrc->dviter = iter;
- nrc->dviter_cls = iter_cls;
- nrc->prep = &get_statement_prepare;
- nrc->prep_cls = gc;
- mysql_plugin_next_request (nrc, GNUNET_NO);
-}
-
-
-/**
- * Run the prepared statement to get the next data item ready.
- *
- * @param cls not used
- * @param nrc closure for the next request iterator
- * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
- */
-static int
-iterator_zero_prepare (void *cls,
- struct NextRequestClosure *nrc)
-{
- struct Plugin *plugin;
- int ret;
-
- if (nrc == NULL)
- return GNUNET_NO;
- plugin = nrc->plugin;
- ret = prepared_statement_run_select (plugin,
- plugin->zero_iter,
- 7, nrc->rbind,
- &return_ok, NULL,
- MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES,
- -1);
- nrc->count++;
- return ret;
}
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Get a zero-anonymity datum from the datastore.
*
* @param cls our "struct Plugin*"
+ * @param offset offset of the result
* @param type entries of which type should be considered?
* Use 0 for any type.
* @param iter function to call on each matching value;
@@ -1451,47 +1234,27 @@ iterator_zero_prepare (void *cls,
* @param iter_cls closure for iter
*/
static void
-mysql_plugin_iter_zero_anonymity (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+mysql_plugin_get_zero_anonymity (void *cls,
+ uint64_t offset,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
- struct NextRequestClosure *nrc;
-
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->plugin = plugin;
- nrc->type = type;
- nrc->dviter = iter;
- nrc->dviter_cls = iter_cls;
- nrc->prep = &iterator_zero_prepare;
- mysql_plugin_next_request (nrc, GNUNET_NO);
-}
+ unsigned long long off;
+ off = (unsigned long long) offset;
+ execute_select (plugin,
+ plugin->zero_iter,
+ proc, proc_cls,
+ MYSQL_TYPE_LONG, &type, GNUNET_YES,
+ MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
+ -1);
-/**
- * Run the SELECT statement for the replication function.
- *
- * @param cls the 'struct Plugin'
- * @param nrc the context (not used)
- */
-static int
-replication_prepare (void *cls,
- struct NextRequestClosure *nrc)
-{
- struct Plugin *plugin = cls;
-
- return prepared_statement_run_select (plugin,
- plugin->select_replication,
- 7, nrc->rbind,
- &return_ok, NULL,
- -1);
}
-
/**
- * Context for 'repl_iter' function.
+ * Context for 'repl_proc' function.
*/
struct ReplCtx
{
@@ -1504,22 +1267,21 @@ struct ReplCtx
/**
* Function to call for the result (or the NULL).
*/
- PluginIterator iter;
+ PluginDatumProcessor proc;
/**
- * Closure for iter.
+ * Closure for proc.
*/
- void *iter_cls;
+ void *proc_cls;
};
/**
- * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
+ * Wrapper for the processor for 'mysql_plugin_get_replication'.
* Decrements the replication counter and calls the original
* iterator.
*
* @param cls closure
- * @param next_cls closure to pass to the "next" function.
* @param key key for the content
* @param size number of bytes in data
* @param data content stored
@@ -1535,8 +1297,7 @@ struct ReplCtx
* GNUNET_NO to delete the item and continue (if supported)
*/
static int
-repl_iter (void *cls,
- void *next_cls,
+repl_proc (void *cls,
const GNUNET_HashCode *key,
uint32_t size,
const void *data,
@@ -1552,8 +1313,8 @@ repl_iter (void *cls,
int ret;
int iret;
- ret = rc->iter (rc->iter_cls,
- next_cls, key,
+ ret = rc->proc (rc->proc_cls,
+ key,
size, data,
type, priority, anonymity, expiration,
uid);
@@ -1561,10 +1322,10 @@ repl_iter (void *cls,
{
oid = (unsigned long long) uid;
iret = prepared_statement_run (plugin,
- plugin->dec_repl,
- NULL,
- MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES,
- -1);
+ plugin->dec_repl,
+ NULL,
+ MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES,
+ -1);
if (iret == GNUNET_SYSERR)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -1577,94 +1338,56 @@ repl_iter (void *cls,
/**
- * Get a random item for replication. Returns a single, not expired, random item
- * from those with the highest replication counters. The item's
- * replication counter is decremented by one IF it was positive before.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Get a random item for replication. Returns a single, not expired,
+ * random item from those with the highest replication counters. The
+ * item's replication counter is decremented by one IF it was positive
+ * before. Call 'proc' with all values ZERO or NULL if the datastore
+ * is empty.
*
* @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param iter_cls closure for proc
*/
static void
-mysql_plugin_replication_get (void *cls,
- PluginIterator iter, void *iter_cls)
+mysql_plugin_get_replication (void *cls,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
- struct NextRequestClosure *nrc;
struct ReplCtx rc;
rc.plugin = plugin;
- rc.iter = iter;
- rc.iter_cls = iter_cls;
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->plugin = plugin;
- nrc->now = GNUNET_TIME_absolute_get ();
- nrc->prep = &replication_prepare;
- nrc->prep_cls = plugin;
- nrc->type = 0;
- nrc->dviter = &repl_iter;
- nrc->dviter_cls = &rc;
- nrc->end_it = GNUNET_NO;
- nrc->one_shot = GNUNET_YES;
- mysql_next_request_cont (nrc, NULL);
-}
-
+ rc.proc = proc;
+ rc.proc_cls = proc_cls;
+ execute_select (plugin,
+ plugin->select_replication,
+ &repl_proc, &rc,
+ -1);
-/**
- * Run the SELECT statement for the expiration function.
- *
- * @param cls the 'struct Plugin'
- * @param nrc the context (not used)
- * @return GNUNET_OK on success, GNUNET_NO if there are
- * no more values, GNUNET_SYSERR on error
- */
-static int
-expiration_prepare (void *cls,
- struct NextRequestClosure *nrc)
-{
- struct Plugin *plugin = cls;
- long long nt;
-
- if (NULL == nrc)
- return GNUNET_NO;
- nt = (long long) nrc->now.abs_value;
- return prepared_statement_run_select
- (plugin,
- plugin->select_expiration,
- 7, nrc->rbind,
- &return_ok, NULL,
- MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES,
- -1);
}
/**
* Get a random item for expiration.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Call 'proc' with all values ZERO or NULL if the datastore is empty.
*
* @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
*/
static void
-mysql_plugin_expiration_get (void *cls,
- PluginIterator iter, void *iter_cls)
+mysql_plugin_get_expiration (void *cls,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
- struct NextRequestClosure *nrc;
-
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->plugin = plugin;
- nrc->now = GNUNET_TIME_absolute_get ();
- nrc->prep = &expiration_prepare;
- nrc->prep_cls = plugin;
- nrc->type = 0;
- nrc->dviter = iter;
- nrc->dviter_cls = iter_cls;
- nrc->end_it = GNUNET_NO;
- nrc->one_shot = GNUNET_YES;
- mysql_next_request_cont (nrc, NULL);
+ long long nt;
+
+ nt = (long long) GNUNET_TIME_absolute_get().abs_value;
+ execute_select (plugin,
+ plugin->select_expiration,
+ proc, proc_cls,
+ MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES,
+ -1);
+
}
@@ -1760,14 +1483,13 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
api->cls = plugin;
- api->get_size = &mysql_plugin_get_size;
+ api->estimate_size = &mysql_plugin_estimate_size;
api->put = &mysql_plugin_put;
- api->next_request = &mysql_plugin_next_request;
- api->get = &mysql_plugin_get;
- api->replication_get = &mysql_plugin_replication_get;
- api->expiration_get = &mysql_plugin_expiration_get;
api->update = &mysql_plugin_update;
- api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
+ api->get_key = &mysql_plugin_get_key;
+ api->get_replication = &mysql_plugin_get_replication;
+ api->get_expiration = &mysql_plugin_get_expiration;
+ api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
api->drop = &mysql_plugin_drop;
GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
"mysql", _("Mysql database running\n"));
@@ -1787,14 +1509,6 @@ libgnunet_plugin_datastore_mysql_done (void *cls)
struct Plugin *plugin = api->cls;
iclose (plugin);
- if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (plugin->next_task);
- plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
- plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
- GNUNET_free (plugin->next_task_nc);
- plugin->next_task_nc = NULL;
- }
GNUNET_free_non_null (plugin->cnffile);
GNUNET_free (plugin);
GNUNET_free (api);
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
index aea87fdf4..cb077f06a 100644
--- a/src/datastore/plugin_datastore_postgres.c
+++ b/src/datastore/plugin_datastore_postgres.c
@@ -44,103 +44,6 @@
/**
- * Closure for 'postgres_next_request_cont'.
- */
-struct NextRequestClosure
-{
- /**
- * Global plugin data.
- */
- struct Plugin *plugin;
-
- /**
- * Function to call for each matching entry.
- */
- PluginIterator iter;
-
- /**
- * Closure for 'iter'.
- */
- void *iter_cls;
-
- /**
- * Parameters for the prepared statement.
- */
- const char *paramValues[5];
-
- /**
- * Name of the prepared statement to run.
- */
- const char *pname;
-
- /**
- * Size of values pointed to by paramValues.
- */
- int paramLengths[5];
-
- /**
- * Number of paramters in paramValues/paramLengths.
- */
- int nparams;
-
- /**
- * Current time (possible parameter), big-endian.
- */
- uint64_t bnow;
-
- /**
- * Key (possible parameter)
- */
- GNUNET_HashCode key;
-
- /**
- * Hash of value (possible parameter)
- */
- GNUNET_HashCode vhash;
-
- /**
- * Number of entries found so far
- */
- unsigned long long count;
-
- /**
- * Offset this iteration starts at.
- */
- uint64_t off;
-
- /**
- * Current offset to use in query, big-endian.
- */
- uint64_t blimit_off;
-
- /**
- * Current total number of entries found so far, big-endian.
- */
- uint64_t bcount;
-
- /**
- * Overall number of matching entries.
- */
- unsigned long long total;
-
- /**
- * Type of block (possible paramter), big-endian.
- */
- uint32_t btype;
-
- /**
- * Flag set to GNUNET_YES to stop iteration.
- */
- int end_it;
-
- /**
- * Flag to indicate that there should only be one result.
- */
- int one_shot;
-};
-
-
-/**
* Context for all functions in this plugin.
*/
struct Plugin
@@ -155,16 +58,6 @@ struct Plugin
*/
PGconn *dbh;
- /**
- * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
- */
- struct NextRequestClosure *next_task_nc;
-
- /**
- * Pending task with scheduler for running the next request.
- */
- GNUNET_SCHEDULER_TaskIdentifier next_task;
-
};
@@ -434,7 +327,7 @@ init_connection (struct Plugin *plugin)
pq_prepare (plugin,
"select_non_anonymous",
"SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE anonLevel = 0 ORDER BY oid DESC LIMIT 1 OFFSET $1",
+ "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
1,
__LINE__)) ||
(GNUNET_OK !=
@@ -482,11 +375,13 @@ static int
delete_by_rowid (struct Plugin *plugin,
unsigned int rowid)
{
- const char *paramValues[] = { (const char *) &rowid };
- int paramLengths[] = { sizeof (rowid) };
+ uint32_t browid;
+ const char *paramValues[] = { (const char *) &browid };
+ int paramLengths[] = { sizeof (browid) };
const int paramFormats[] = { 1 };
PGresult *ret;
+ browid = htonl (rowid);
ret = PQexecPrepared (plugin->dbh,
"delrow",
1, paramValues, paramLengths, paramFormats, 1);
@@ -510,7 +405,7 @@ delete_by_rowid (struct Plugin *plugin,
* @return number of bytes used on disk
*/
static unsigned long long
-postgres_plugin_get_size (void *cls)
+postgres_plugin_estimate_size (void *cls)
{
struct Plugin *plugin = cls;
unsigned long long total;
@@ -619,22 +514,20 @@ postgres_plugin_put (void *cls,
/**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
+ * Function invoked to process the result and call
+ * the processor.
*
- * @param next_cls the 'struct NextRequestClosure'
- * @param tc scheduler context
+ * @param plugin global plugin data
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
+ * @param res result from exec
*/
static void
-postgres_next_request_cont (void *next_cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+process_result (struct Plugin *plugin,
+ PluginDatumProcessor proc, void *proc_cls,
+ PGresult *res)
{
- struct NextRequestClosure *nrc = next_cls;
- struct Plugin *plugin = nrc->plugin;
- const int paramFormats[] = { 1, 1, 1, 1, 1 };
int iret;
- PGresult *res;
enum GNUNET_BLOCK_Type type;
uint32_t anonymity;
uint32_t priority;
@@ -643,38 +536,11 @@ postgres_next_request_cont (void *next_cls,
struct GNUNET_TIME_Absolute expiration_time;
GNUNET_HashCode key;
- plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
- plugin->next_task_nc = NULL;
- if ( (GNUNET_YES == nrc->end_it) ||
- (nrc->count == nrc->total) )
- {
-#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "datastore-postgres",
- "Ending iteration (%s)\n",
- (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
-#endif
- nrc->iter (nrc->iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
- GNUNET_free (nrc);
- return;
- }
- if (nrc->off == nrc->total)
- nrc->off = 0;
- nrc->blimit_off = GNUNET_htonll (nrc->off);
- nrc->bcount = GNUNET_htonll ((uint64_t) nrc->count);
- res = PQexecPrepared (plugin->dbh,
- nrc->pname,
- nrc->nparams,
- nrc->paramValues,
- nrc->paramLengths,
- paramFormats, 1);
if (GNUNET_OK != check_result (plugin,
res,
PGRES_TUPLES_OK,
"PQexecPrepared",
- nrc->pname,
+ "select",
__LINE__))
{
#if DEBUG_POSTGRES
@@ -682,10 +548,9 @@ postgres_next_request_cont (void *next_cls,
"datastore-postgres",
"Ending iteration (postgres error)\n");
#endif
- nrc->iter (nrc->iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
- GNUNET_free (nrc);
+ proc (proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
@@ -697,11 +562,10 @@ postgres_next_request_cont (void *next_cls,
"datastore-postgres",
"Ending iteration (no more results)\n");
#endif
- nrc->iter (nrc->iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ proc (proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
PQclear (res);
- GNUNET_free (nrc);
return;
}
if ((1 != PQntuples (res)) ||
@@ -710,11 +574,10 @@ postgres_next_request_cont (void *next_cls,
(sizeof (uint32_t) != PQfsize (res, 6)))
{
GNUNET_break (0);
- nrc->iter (nrc->iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ proc (proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
PQclear (res);
- GNUNET_free (nrc);
return;
}
rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
@@ -727,10 +590,9 @@ postgres_next_request_cont (void *next_cls,
GNUNET_break (0);
PQclear (res);
delete_by_rowid (plugin, rowid);
- nrc->iter (nrc->iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
- GNUNET_free (nrc);
+ proc (proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
@@ -749,33 +611,23 @@ postgres_next_request_cont (void *next_cls,
(unsigned int) size,
(unsigned int) type);
#endif
- iret = nrc->iter (nrc->iter_cls,
- (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
- &key,
- size,
- PQgetvalue (res, 0, 5),
- (enum GNUNET_BLOCK_Type) type,
- priority,
- anonymity,
- expiration_time,
- rowid);
+ iret = proc (proc_cls,
+ &key,
+ size,
+ PQgetvalue (res, 0, 5),
+ (enum GNUNET_BLOCK_Type) type,
+ priority,
+ anonymity,
+ expiration_time,
+ rowid);
PQclear (res);
- if (iret != GNUNET_NO)
- {
- nrc->count++;
- nrc->off++;
- }
- if (iret == GNUNET_SYSERR)
+ if (iret == GNUNET_NO)
{
#if DEBUG_POSTGRES
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "datastore-postgres",
- "Ending iteration (client error)\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Processor asked for item %u to be removed.\n",
+ rowid);
#endif
- return;
- }
- if (iret == GNUNET_NO)
- {
if (GNUNET_OK == delete_by_rowid (plugin, rowid))
{
#if DEBUG_POSTGRES
@@ -794,34 +646,6 @@ postgres_next_request_cont (void *next_cls,
#endif
}
}
- if (nrc->one_shot == GNUNET_YES)
- GNUNET_free (nrc);
-}
-
-
-/**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
- *
- * @param next_cls whatever argument was given
- * to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- * should terminate the iteration early
- * (iterator should be still called once more
- * to signal the end of the iteration).
- */
-static void
-postgres_plugin_next_request (void *next_cls,
- int end_it)
-{
- struct NextRequestClosure *nrc = next_cls;
-
- if (GNUNET_YES == end_it)
- nrc->end_it = GNUNET_YES;
- nrc->plugin->next_task_nc = nrc;
- nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&postgres_next_request_cont,
- nrc);
}
@@ -843,62 +667,62 @@ postgres_plugin_next_request (void *next_cls,
* @param iter_cls closure for iter
*/
static void
-postgres_plugin_get (void *cls,
- const GNUNET_HashCode * key,
- const GNUNET_HashCode * vhash,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter, void *iter_cls)
+postgres_plugin_get_key (void *cls,
+ uint64_t offset,
+ const GNUNET_HashCode *key,
+ const GNUNET_HashCode *vhash,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
- struct NextRequestClosure *nrc;
const int paramFormats[] = { 1, 1, 1, 1, 1 };
+ int paramLengths[4];
+ const char *paramValues[4];
+ int nparams;
+ const char *pname;
PGresult *ret;
+ uint64_t total;
+ uint64_t blimit_off;
+ uint32_t btype;
GNUNET_assert (key != NULL);
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->plugin = plugin;
- nrc->iter = iter;
- nrc->iter_cls = iter_cls;
- nrc->key = *key;
- if (vhash != NULL)
- nrc->vhash = *vhash;
- nrc->paramValues[0] = (const char*) &nrc->key;
- nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
- nrc->btype = htonl (type);
+ paramValues[0] = (const char*) key;
+ paramLengths[0] = sizeof (GNUNET_HashCode);
+ btype = htonl (type);
if (type != 0)
{
if (vhash != NULL)
{
- nrc->paramValues[1] = (const char *) &nrc->vhash;
- nrc->paramLengths[1] = sizeof (nrc->vhash);
- nrc->paramValues[2] = (const char *) &nrc->btype;
- nrc->paramLengths[2] = sizeof (nrc->btype);
- nrc->paramValues[3] = (const char *) &nrc->blimit_off;
- nrc->paramLengths[3] = sizeof (nrc->blimit_off);
- nrc->nparams = 4;
- nrc->pname = "getvt";
+ paramValues[1] = (const char *) vhash;
+ paramLengths[1] = sizeof (GNUNET_HashCode);
+ paramValues[2] = (const char *) &btype;
+ paramLengths[2] = sizeof (btype);
+ paramValues[3] = (const char *) &blimit_off;
+ paramLengths[3] = sizeof (blimit_off);
+ nparams = 4;
+ pname = "getvt";
ret = PQexecParams (plugin->dbh,
"SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
3,
NULL,
- nrc->paramValues,
- nrc->paramLengths,
+ paramValues,
+ paramLengths,
paramFormats, 1);
}
else
{
- nrc->paramValues[1] = (const char *) &nrc->btype;
- nrc->paramLengths[1] = sizeof (nrc->btype);
- nrc->paramValues[2] = (const char *) &nrc->blimit_off;
- nrc->paramLengths[2] = sizeof (nrc->blimit_off);
- nrc->nparams = 3;
- nrc->pname = "gett";
+ paramValues[1] = (const char *) &btype;
+ paramLengths[1] = sizeof (btype);
+ paramValues[2] = (const char *) &blimit_off;
+ paramLengths[2] = sizeof (blimit_off);
+ nparams = 3;
+ pname = "gett";
ret = PQexecParams (plugin->dbh,
"SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
2,
NULL,
- nrc->paramValues,
- nrc->paramLengths,
+ paramValues,
+ paramLengths,
paramFormats, 1);
}
}
@@ -906,32 +730,32 @@ postgres_plugin_get (void *cls,
{
if (vhash != NULL)
{
- nrc->paramValues[1] = (const char *) &nrc->vhash;
- nrc->paramLengths[1] = sizeof (nrc->vhash);
- nrc->paramValues[2] = (const char *) &nrc->blimit_off;
- nrc->paramLengths[2] = sizeof (nrc->blimit_off);
- nrc->nparams = 3;
- nrc->pname = "getv";
+ paramValues[1] = (const char *) vhash;
+ paramLengths[1] = sizeof (GNUNET_HashCode);
+ paramValues[2] = (const char *) &blimit_off;
+ paramLengths[2] = sizeof (blimit_off);
+ nparams = 3;
+ pname = "getv";
ret = PQexecParams (plugin->dbh,
"SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
2,
NULL,
- nrc->paramValues,
- nrc->paramLengths,
+ paramValues,
+ paramLengths,
paramFormats, 1);
}
else
{
- nrc->paramValues[1] = (const char *) &nrc->blimit_off;
- nrc->paramLengths[1] = sizeof (nrc->blimit_off);
- nrc->nparams = 2;
- nrc->pname = "get";
+ paramValues[1] = (const char *) &blimit_off;
+ paramLengths[1] = sizeof (blimit_off);
+ nparams = 2;
+ pname = "get";
ret = PQexecParams (plugin->dbh,
"SELECT count(*) FROM gn090 WHERE hash=$1",
1,
NULL,
- nrc->paramValues,
- nrc->paramLengths,
+ paramValues,
+ paramLengths,
paramFormats, 1);
}
}
@@ -939,13 +763,12 @@ postgres_plugin_get (void *cls,
ret,
PGRES_TUPLES_OK,
"PQexecParams",
- nrc->pname,
+ pname,
__LINE__))
{
- iter (iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
+ proc (proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
- GNUNET_free (nrc);
return;
}
if ((PQntuples (ret) != 1) ||
@@ -954,26 +777,30 @@ postgres_plugin_get (void *cls,
{
GNUNET_break (0);
PQclear (ret);
- iter (iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
+ proc (proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
- GNUNET_free (nrc);
return;
}
- nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
+ total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
PQclear (ret);
- if (nrc->total == 0)
+ if (total == 0)
{
- iter (iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
+ proc (proc_cls,
+ NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
- GNUNET_free (nrc);
return;
}
- nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
- nrc->total);
- postgres_plugin_next_request (nrc,
- GNUNET_NO);
+ blimit_off = GNUNET_htonll (offset % total);
+ ret = PQexecPrepared (plugin->dbh,
+ pname,
+ nparams,
+ paramValues,
+ paramLengths,
+ paramFormats, 1);
+ process_result (plugin,
+ proc, proc_cls,
+ ret);
}
@@ -989,28 +816,33 @@ postgres_plugin_get (void *cls,
* @param iter_cls closure for iter
*/
static void
-postgres_plugin_iter_zero_anonymity (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+postgres_plugin_get_zero_anonymity (void *cls,
+ uint64_t offset,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
- struct NextRequestClosure *nrc;
+ uint32_t btype;
+ uint64_t boff;
+ const int paramFormats[] = { 1, 1 };
+ int paramLengths[] = { sizeof (btype), sizeof (boff) };
+ const char *paramValues[] = { (const char*) &btype, (const char*) &boff };
+ PGresult *ret;
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->total = UINT32_MAX;
- nrc->btype = htonl ((uint32_t) type);
- nrc->plugin = plugin;
- nrc->iter = iter;
- nrc->iter_cls = iter_cls;
- nrc->pname = "select_non_anonymous";
- nrc->nparams = 1;
- nrc->paramLengths[0] = sizeof (nrc->bcount);
- nrc->paramValues[0] = (const char*) &nrc->bcount;
- postgres_plugin_next_request (nrc,
- GNUNET_NO);
+ btype = htonl ((uint32_t) type);
+ boff = GNUNET_htonll (offset);
+ ret = PQexecPrepared (plugin->dbh,
+ "select_non_anonymous",
+ 2,
+ paramValues,
+ paramLengths,
+ paramFormats, 1);
+ process_result (plugin,
+ proc, proc_cls,
+ ret);
}
+
/**
* Context for 'repl_iter' function.
*/
@@ -1025,12 +857,12 @@ struct ReplCtx
/**
* Function to call for the result (or the NULL).
*/
- PluginIterator iter;
+ PluginDatumProcessor proc;
/**
- * Closure for iter.
+ * Closure for proc.
*/
- void *iter_cls;
+ void *proc_cls;
};
@@ -1056,8 +888,7 @@ struct ReplCtx
* GNUNET_NO to delete the item and continue (if supported)
*/
static int
-repl_iter (void *cls,
- void *next_cls,
+repl_proc (void *cls,
const GNUNET_HashCode *key,
uint32_t size,
const void *data,
@@ -1073,8 +904,8 @@ repl_iter (void *cls,
PGresult *qret;
uint32_t boid;
- ret = rc->iter (rc->iter_cls,
- next_cls, key,
+ ret = rc->proc (rc->proc_cls,
+ key,
size, data,
type, priority, anonymity, expiration,
uid);
@@ -1107,32 +938,30 @@ repl_iter (void *cls,
* Get a random item for replication. Returns a single, not expired, random item
* from those with the highest replication counters. The item's
* replication counter is decremented by one IF it was positive before.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Call 'proc' with all values ZERO or NULL if the datastore is empty.
*
* @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for iter
*/
static void
-postgres_plugin_replication_get (void *cls,
- PluginIterator iter, void *iter_cls)
+postgres_plugin_get_replication (void *cls,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
- struct NextRequestClosure *nrc;
struct ReplCtx rc;
+ PGresult *ret;
rc.plugin = plugin;
- rc.iter = iter;
- rc.iter_cls = iter_cls;
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->one_shot = GNUNET_YES;
- nrc->total = 1;
- nrc->plugin = plugin;
- nrc->iter = &repl_iter;
- nrc->iter_cls = &rc;
- nrc->pname = "select_replication_order";
- nrc->nparams = 0;
- postgres_next_request_cont (nrc, NULL);
+ rc.proc = proc;
+ rc.proc_cls = proc_cls;
+ ret = PQexecPrepared (plugin->dbh,
+ "select_replication_order",
+ 0,
+ NULL, NULL, NULL, 1);
+ process_result (plugin,
+ &repl_proc, &rc,
+ ret);
}
@@ -1141,29 +970,31 @@ postgres_plugin_replication_get (void *cls,
* Call 'iter' with all values ZERO or NULL if the datastore is empty.
*
* @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for iter
*/
static void
-postgres_plugin_expiration_get (void *cls,
- PluginIterator iter, void *iter_cls)
+postgres_plugin_get_expiration (void *cls,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
- struct NextRequestClosure *nrc;
uint64_t btime;
+ const int paramFormats[] = { 1 };
+ int paramLengths[] = { sizeof (btime) };
+ const char *paramValues[] = { (const char*) &btime };
+ PGresult *ret;
btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->one_shot = GNUNET_YES;
- nrc->total = 1;
- nrc->plugin = plugin;
- nrc->iter = iter;
- nrc->iter_cls = iter_cls;
- nrc->pname = "select_expiration_order";
- nrc->nparams = 1;
- nrc->paramValues[0] = (const char *) &btime;
- nrc->paramLengths[0] = sizeof (btime);
- postgres_next_request_cont (nrc, NULL);
+ ret = PQexecPrepared (plugin->dbh,
+ "select_expiration_order",
+ 1,
+ paramValues,
+ paramLengths,
+ paramFormats,
+ 1);
+ process_result (plugin,
+ proc, proc_cls,
+ ret);
}
@@ -1260,14 +1091,13 @@ libgnunet_plugin_datastore_postgres_init (void *cls)
}
api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
api->cls = plugin;
- api->get_size = &postgres_plugin_get_size;
+ api->estimate_size = &postgres_plugin_estimate_size;
api->put = &postgres_plugin_put;
- api->next_request = &postgres_plugin_next_request;
- api->get = &postgres_plugin_get;
- api->replication_get = &postgres_plugin_replication_get;
- api->expiration_get = &postgres_plugin_expiration_get;
api->update = &postgres_plugin_update;
- api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
+ api->get_key = &postgres_plugin_get_key;
+ api->get_replication = &postgres_plugin_get_replication;
+ api->get_expiration = &postgres_plugin_get_expiration;
+ api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
api->drop = &postgres_plugin_drop;
GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
"datastore-postgres",
@@ -1287,13 +1117,6 @@ libgnunet_plugin_datastore_postgres_done (void *cls)
struct GNUNET_DATASTORE_PluginFunctions *api = cls;
struct Plugin *plugin = api->cls;
- if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (plugin->next_task);
- plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_free (plugin->next_task_nc);
- plugin->next_task_nc = NULL;
- }
PQfinish (plugin->dbh);
GNUNET_free (plugin);
GNUNET_free (api);
diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c
index 6e77ec364..3710b7eb7 100644
--- a/src/datastore/plugin_datastore_sqlite.c
+++ b/src/datastore/plugin_datastore_sqlite.c
@@ -108,19 +108,14 @@ struct Plugin
sqlite3_stmt *selExpi;
/**
- * Precompiled SQL for insertion.
- */
- sqlite3_stmt *insertContent;
-
- /**
- * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
+ * Precompiled SQL for expiration selection.
*/
- struct NextContext *next_task_nc;
+ sqlite3_stmt *selZeroAnon;
/**
- * Pending task with scheduler for running the next request.
+ * Precompiled SQL for insertion.
*/
- GNUNET_SCHEDULER_TaskIdentifier next_task;
+ sqlite3_stmt *insertContent;
/**
* Should the database be dropped on shutdown?
@@ -326,6 +321,11 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
" WHERE NOT EXISTS (SELECT 1 FROM gn090 WHERE expire < ?1 LIMIT 1) OR expire < ?1 "
" ORDER BY prio ASC LIMIT 1",
&plugin->selExpi) != SQLITE_OK) ||
+ (sq_prepare (plugin->dbh,
+ "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
+ "WHERE (anonLevel = 0 AND type=?1) "
+ "ORDER BY hash DESC LIMIT 1 OFFSET ?2",
+ &plugin->selZeroAnon) != SQLITE_OK) ||
(sq_prepare (plugin->dbh,
"INSERT INTO gn090 (repl, type, prio, "
"anonLevel, expire, hash, vhash, value) "
@@ -367,6 +367,8 @@ database_shutdown (struct Plugin *plugin)
sqlite3_finalize (plugin->selRepl);
if (plugin->selExpi != NULL)
sqlite3_finalize (plugin->selExpi);
+ if (plugin->selZeroAnon != NULL)
+ sqlite3_finalize (plugin->selZeroAnon);
if (plugin->insertContent != NULL)
sqlite3_finalize (plugin->insertContent);
result = sqlite3_close(plugin->dbh);
@@ -436,247 +438,6 @@ delete_by_rowid (struct Plugin* plugin,
/**
- * Context for the universal iterator.
- */
-struct NextContext;
-
-/**
- * Type of a function that will prepare
- * the next iteration.
- *
- * @param cls closure
- * @param nc the next context; NULL for the last
- * call which gives the callback a chance to
- * clean up the closure
- * @return GNUNET_OK on success, GNUNET_NO if there are
- * no more values, GNUNET_SYSERR on error
- */
-typedef int (*PrepareFunction)(void *cls,
- struct NextContext *nc);
-
-
-/**
- * Context we keep for the "next request" callback.
- */
-struct NextContext
-{
- /**
- * Internal state.
- */
- struct Plugin *plugin;
-
- /**
- * Function to call on the next value.
- */
- PluginIterator iter;
-
- /**
- * Closure for iter.
- */
- void *iter_cls;
-
- /**
- * Function to call to prepare the next
- * iteration.
- */
- PrepareFunction prep;
-
- /**
- * Closure for prep.
- */
- void *prep_cls;
-
- /**
- * Statement that the iterator will get the data
- * from (updated or set by prep).
- */
- sqlite3_stmt *stmt;
-
- /**
- * Row ID of the last result.
- */
- unsigned long long last_rowid;
-
- /**
- * Key of the last result.
- */
- GNUNET_HashCode lastKey;
-
- /**
- * Priority of the last value visited.
- */
- unsigned int lastPriority;
-
- /**
- * Number of results processed so far.
- */
- unsigned int count;
-
- /**
- * Set to GNUNET_YES if we must stop now.
- */
- int end_it;
-};
-
-
-/**
- * Continuation of "sqlite_next_request".
- *
- * @param cls the 'struct NextContext*'
- * @param tc the task context (unused)
- */
-static void
-sqlite_next_request_cont (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct NextContext * nc = cls;
- struct Plugin *plugin;
- unsigned long long rowid;
- int ret;
- unsigned int size;
- unsigned int hsize;
- uint32_t anonymity;
- uint32_t priority;
- enum GNUNET_BLOCK_Type type;
- const GNUNET_HashCode *key;
- struct GNUNET_TIME_Absolute expiration;
-
- plugin = nc->plugin;
- plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
- plugin->next_task_nc = NULL;
- if ( (GNUNET_YES == nc->end_it) ||
- (GNUNET_OK != (nc->prep(nc->prep_cls,
- nc))) )
- {
-#if DEBUG_SQLITE
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "sqlite",
- "Iteration completes after %u results\n",
- nc->count);
-#endif
- END:
- nc->iter (nc->iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
- nc->prep (nc->prep_cls, NULL);
- GNUNET_free (nc);
- return;
- }
-
- type = sqlite3_column_int (nc->stmt, 0);
- priority = sqlite3_column_int (nc->stmt, 1);
- anonymity = sqlite3_column_int (nc->stmt, 2);
- expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
- hsize = sqlite3_column_bytes (nc->stmt, 4);
- key = sqlite3_column_blob (nc->stmt, 4);
- size = sqlite3_column_bytes (nc->stmt, 5);
- rowid = sqlite3_column_int64 (nc->stmt, 6);
- if (hsize != sizeof (GNUNET_HashCode))
- {
- GNUNET_break (0);
- if (SQLITE_OK != sqlite3_reset (nc->stmt))
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
- if (GNUNET_OK == delete_by_rowid (plugin, rowid))
- plugin->env->duc (plugin->env->cls,
- - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
- goto END;
- }
-#if DEBUG_SQLITE
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "sqlite",
- "Iterator returns value with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
- type,
- GNUNET_h2s(key),
- priority,
- (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
- (long long) expiration.abs_value);
-#endif
- if (size > MAX_ITEM_SIZE)
- {
- GNUNET_break (0);
- if (SQLITE_OK != sqlite3_reset (nc->stmt))
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
- if (GNUNET_OK == delete_by_rowid (plugin, rowid))
- plugin->env->duc (plugin->env->cls,
- - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
- goto END;
- }
- {
- char data[size];
-
- memcpy (data, sqlite3_column_blob (nc->stmt, 5), size);
- nc->count++;
- nc->last_rowid = rowid;
- nc->lastPriority = priority;
- nc->lastKey = *key;
- if (SQLITE_OK != sqlite3_reset (nc->stmt))
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
- ret = nc->iter (nc->iter_cls, nc,
- &nc->lastKey,
- size, data,
- type, priority,
- anonymity, expiration,
- rowid);
- }
- switch (ret)
- {
- case GNUNET_SYSERR:
- nc->end_it = GNUNET_YES;
- break;
- case GNUNET_NO:
- if (GNUNET_OK == delete_by_rowid (plugin, rowid))
- {
-#if DEBUG_SQLITE
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "sqlite",
- "Removed entry %llu (%u bytes)\n",
- (unsigned long long) rowid,
- size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
-#endif
- plugin->env->duc (plugin->env->cls,
- - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
- }
- break;
- case GNUNET_YES:
- break;
- default:
- GNUNET_break (0);
- }
-}
-
-
-/**
- * Function invoked on behalf of a "PluginIterator" asking the
- * database plugin to call the iterator with the next item.
- *
- * @param next_cls whatever argument was given
- * to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- * should terminate the iteration early
- * (iterator should be still called once more
- * to signal the end of the iteration).
- */
-static void
-sqlite_next_request (void *next_cls,
- int end_it)
-{
- struct NextContext * nc= next_cls;
-
- if (GNUNET_YES == end_it)
- nc->end_it = GNUNET_YES;
- nc->plugin->next_task_nc = nc;
- nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont,
- nc);
-}
-
-
-/**
* Store an item in the datastore.
*
* @param cls closure
@@ -849,355 +610,147 @@ sqlite_plugin_update (void *cls,
/**
- * Internal context for an iteration.
- */
-struct ZeroIterContext
-{
- /**
- * First iterator statement for zero-anonymity iteration.
- */
- sqlite3_stmt *stmt_1;
-
- /**
- * Second iterator statement for zero-anonymity iteration.
- */
- sqlite3_stmt *stmt_2;
-
- /**
- * Desired type for blocks returned by this iterator.
- */
- enum GNUNET_BLOCK_Type type;
-};
-
-
-/**
- * Prepare our SQL query to obtain the next record from the database.
+ * Execute statement that gets a row and call the callback
+ * with the result. Resets the statement afterwards.
*
- * @param cls our "struct ZeroIterContext"
- * @param nc NULL to terminate the iteration, otherwise our context for
- * getting the next result.
- * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
- * GNUNET_SYSERR on error (or end of iteration)
+ * @param plugin the plugin
+ * @param stmt the statement
+ * @param proc processor to call
+ * @param proc_cls closure for 'proc'
*/
-static int
-zero_iter_next_prepare (void *cls,
- struct NextContext *nc)
+static void
+execute_get (struct Plugin *plugin,
+ sqlite3_stmt *stmt,
+ PluginDatumProcessor proc, void *proc_cls)
{
- struct ZeroIterContext *ic = cls;
- struct Plugin *plugin;
+ int n;
+ struct GNUNET_TIME_Absolute expiration;
+ unsigned long long rowid;
+ unsigned int size;
int ret;
- if (nc == NULL)
- {
-#if DEBUG_SQLITE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to clean up iterator state.\n");
-#endif
- sqlite3_finalize (ic->stmt_1);
- sqlite3_finalize (ic->stmt_2);
- return GNUNET_SYSERR;
- }
- plugin = nc->plugin;
-
- /* first try iter 1 */
-#if DEBUG_SQLITE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Restricting to results larger than the last priority %u and key `%s'\n",
- nc->lastPriority,
- GNUNET_h2s (&nc->lastKey));
-#endif
- if ( (SQLITE_OK != sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority)) ||
- (SQLITE_OK != sqlite3_bind_blob (ic->stmt_1, 2,
- &nc->lastKey,
- sizeof (GNUNET_HashCode),
- SQLITE_TRANSIENT)) )
+ n = sqlite3_step (stmt);
+ switch (n)
{
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
- if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
+ case SQLITE_ROW:
+ size = sqlite3_column_bytes (stmt, 5);
+ rowid = sqlite3_column_int64 (stmt, 6);
+ if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
+ {
+ GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
+ "sqlite",
+ _("Invalid data in database. Trying to fix (by deletion).\n"));
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR |
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+ if (GNUNET_OK == delete_by_rowid (plugin, rowid))
+ plugin->env->duc (plugin->env->cls,
+ - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
+ break;
+ }
+ expiration.abs_value = sqlite3_column_int64 (stmt, 3);
+ ret = proc (proc_cls,
+ sqlite3_column_blob (stmt, 4) /* key */,
+ size,
+ sqlite3_column_blob (stmt, 5) /* data */,
+ sqlite3_column_int (stmt, 0) /* type */,
+ sqlite3_column_int (stmt, 1) /* priority */,
+ sqlite3_column_int (stmt, 2) /* anonymity */,
+ expiration,
+ rowid);
+ if (SQLITE_OK != sqlite3_reset (stmt))
LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK,
- "sqlite3_reset");
- return GNUNET_SYSERR;
- }
- if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
- {
-#if DEBUG_SQLITE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Result found using iterator 1\n");
-#endif
- nc->stmt = ic->stmt_1;
- return GNUNET_OK;
- }
- if (ret != SQLITE_DONE)
- {
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK,
- "sqlite3_step");
- if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
+ GNUNET_ERROR_TYPE_ERROR |
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+ if ( (GNUNET_NO == ret) &&
+ (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
+ plugin->env->duc (plugin->env->cls,
+ - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
+ return;
+ case SQLITE_DONE:
+ /* database must be empty */
+ if (SQLITE_OK != sqlite3_reset (stmt))
LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK,
- "sqlite3_reset");
- return GNUNET_SYSERR;
- }
- if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK,
- "sqlite3_reset");
-
- /* now try iter 2 */
- if (SQLITE_OK != sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority))
- {
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
- return GNUNET_SYSERR;
- }
- if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2)))
- {
-#if DEBUG_SQLITE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Result found using iterator 2\n");
-#endif
- nc->stmt = ic->stmt_2;
- return GNUNET_OK;
- }
- if (ret != SQLITE_DONE)
- {
+ GNUNET_ERROR_TYPE_ERROR |
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+ break;
+ case SQLITE_BUSY:
+ case SQLITE_ERROR:
+ case SQLITE_MISUSE:
+ default:
LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK,
+ GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_step");
- if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
+ if (SQLITE_OK != sqlite3_reset (stmt))
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR |
GNUNET_ERROR_TYPE_BULK,
"sqlite3_reset");
- return GNUNET_SYSERR;
+ GNUNET_break (0);
+ database_shutdown (plugin);
+ database_setup (plugin->env->cfg,
+ plugin);
+ break;
}
- if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
+ if (SQLITE_OK != sqlite3_reset (stmt))
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK,
- "sqlite3_reset");
-#if DEBUG_SQLITE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No result found using either iterator\n");
-#endif
- return GNUNET_NO;
+ GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
+
/**
* Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * the given processor for the item.
*
* @param cls our plugin context
* @param type entries of which type should be considered?
* Use 0 for any type.
- * @param iter function to call on each matching value;
+ * @param proc function to call on each matching value;
* will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
*/
static void
-sqlite_plugin_iter_zero_anonymity (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+sqlite_plugin_get_zero_anonymity (void *cls,
+ uint64_t offset,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc,
+ void *proc_cls)
{
struct Plugin *plugin = cls;
- struct GNUNET_TIME_Absolute now;
- struct NextContext *nc;
- struct ZeroIterContext *ic;
- sqlite3_stmt *stmt_1;
- sqlite3_stmt *stmt_2;
- char *q;
+ sqlite3_stmt *stmt;
GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
- now = GNUNET_TIME_absolute_get ();
- GNUNET_asprintf (&q,
- "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
- "WHERE (anonLevel = 0 AND expire > %llu AND prio = ?1 AND type=%d AND hash < ?2) "
- "ORDER BY hash DESC LIMIT 1",
- (unsigned long long) now.abs_value,
- type);
- if (sq_prepare (plugin->dbh, q, &stmt_1) != SQLITE_OK)
+ stmt = plugin->selZeroAnon;
+ if ( (SQLITE_OK != sqlite3_bind_int (stmt, 1, type)) ||
+ (SQLITE_OK != sqlite3_bind_int64 (stmt, 2, offset)) )
{
LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- GNUNET_free (q);
- return;
- }
- GNUNET_free (q);
- GNUNET_asprintf (&q,
- "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
- "WHERE (anonLevel = 0 AND expire > %llu AND prio < ?1 AND type=%d) "
- "ORDER BY prio DESC, hash DESC LIMIT 1",
- (unsigned long long) now.abs_value,
- type);
- if (sq_prepare (plugin->dbh, q, &stmt_2) != SQLITE_OK)
- {
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
- sqlite3_finalize (stmt_1);
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
- GNUNET_free (q);
+ GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_bind_XXXX");
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ LOG_SQLITE (plugin, NULL,
+ GNUNET_ERROR_TYPE_ERROR |
+ GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_reset");
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- GNUNET_free (q);
- nc = GNUNET_malloc (sizeof(struct NextContext) +
- sizeof(struct ZeroIterContext));
- nc->plugin = plugin;
- nc->iter = iter;
- nc->iter_cls = iter_cls;
- nc->stmt = NULL;
- ic = (struct ZeroIterContext*) &nc[1];
- ic->stmt_1 = stmt_1;
- ic->stmt_2 = stmt_2;
- ic->type = type;
- nc->prep = &zero_iter_next_prepare;
- nc->prep_cls = ic;
- nc->lastPriority = INT32_MAX;
- memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
- sqlite_next_request (nc, GNUNET_NO);
+ execute_get (plugin, stmt, proc, proc_cls);
}
-/**
- * Context for get_next_prepare.
- */
-struct GetNextContext
-{
-
- /**
- * Our prepared statement.
- */
- sqlite3_stmt *stmt;
-
- /**
- * Plugin handle.
- */
- struct Plugin *plugin;
-
- /**
- * Key for the query.
- */
- GNUNET_HashCode key;
-
- /**
- * Vhash for the query.
- */
- GNUNET_HashCode vhash;
-
- /**
- * Expected total number of results.
- */
- unsigned int total;
-
- /**
- * Offset to add for the selected result.
- */
- unsigned int off;
-
- /**
- * Is vhash set?
- */
- int have_vhash;
-
- /**
- * Desired block type.
- */
- enum GNUNET_BLOCK_Type type;
-
-};
-
-
-/**
- * Prepare the stmt in 'nc' for the next round of execution, selecting the
- * next return value.
- *
- * @param cls our "struct GetNextContext*"
- * @param nc the general context
- * @return GNUNET_YES if there are more results,
- * GNUNET_NO if there are no more results,
- * GNUNET_SYSERR on internal error
- */
-static int
-get_next_prepare (void *cls,
- struct NextContext *nc)
-{
- struct GetNextContext *gnc = cls;
- int ret;
- int limit_off;
- unsigned int sqoff;
-
- if (nc == NULL)
- {
- sqlite3_finalize (gnc->stmt);
- return GNUNET_SYSERR;
- }
- if (nc->count == gnc->total)
- return GNUNET_NO;
- if (nc->count + gnc->off == gnc->total)
- nc->last_rowid = 0;
- if (nc->count == 0)
- limit_off = gnc->off;
- else
- limit_off = 0;
- sqlite3_reset (nc->stmt);
- sqoff = 1;
- ret = sqlite3_bind_blob (nc->stmt,
- sqoff++,
- &gnc->key,
- sizeof (GNUNET_HashCode),
- SQLITE_TRANSIENT);
- if ((gnc->have_vhash) && (ret == SQLITE_OK))
- ret = sqlite3_bind_blob (nc->stmt,
- sqoff++,
- &gnc->vhash,
- sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
- if ((gnc->type != 0) && (ret == SQLITE_OK))
- ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
- if (ret == SQLITE_OK)
- ret = sqlite3_bind_int64 (nc->stmt, sqoff++, limit_off);
- if (ret != SQLITE_OK)
- return GNUNET_SYSERR;
-#if DEBUG_SQLITE
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "sqlite",
- "Preparing to GET for key `%s' with type %d at offset %u\n",
- GNUNET_h2s (&gnc->key),
- gnc->type,
- limit_off);
-#endif
- ret = sqlite3_step (nc->stmt);
- switch (ret)
- {
- case SQLITE_ROW:
- return GNUNET_OK;
- case SQLITE_DONE:
- return GNUNET_NO;
- default:
- LOG_SQLITE (gnc->plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK,
- "sqlite3_step");
- return GNUNET_SYSERR;
- }
-}
-
/**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Get results for a particular key in the datastore.
*
* @param cls closure
+ * @param offset offset (mod count).
* @param key key to match, never NULL
* @param vhash hash of the value, maybe NULL (to
* match all values that have the right key).
@@ -1206,27 +759,27 @@ get_next_prepare (void *cls,
* there may be!
* @param type entries of which type are relevant?
* Use 0 for any type.
- * @param iter function to call on each matching value;
+ * @param proc function to call on each matching value;
* will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
*/
static void
-sqlite_plugin_get (void *cls,
- const GNUNET_HashCode *key,
- const GNUNET_HashCode *vhash,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter, void *iter_cls)
+sqlite_plugin_get_key (void *cls,
+ uint64_t offset,
+ const GNUNET_HashCode *key,
+ const GNUNET_HashCode *vhash,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
- struct GetNextContext *gnc;
- struct NextContext *nc;
int ret;
int total;
+ int limit_off;
+ unsigned int sqoff;
sqlite3_stmt *stmt;
char scratch[256];
- unsigned int sqoff;
- GNUNET_assert (iter != NULL);
+ GNUNET_assert (proc != NULL);
GNUNET_assert (key != NULL);
GNUNET_snprintf (scratch, sizeof (scratch),
"SELECT count(*) FROM gn090 WHERE hash=?%s%s",
@@ -1236,7 +789,7 @@ sqlite_plugin_get (void *cls,
{
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
sqoff = 1;
@@ -1253,7 +806,7 @@ sqlite_plugin_get (void *cls,
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
sqlite3_finalize (stmt);
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
ret = sqlite3_step (stmt);
@@ -1263,147 +816,64 @@ sqlite_plugin_get (void *cls,
GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK,
"sqlite_step");
sqlite3_finalize (stmt);
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
total = sqlite3_column_int (stmt, 0);
sqlite3_finalize (stmt);
if (0 == total)
{
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
+ limit_off = (int) (offset % total);
+ if (limit_off < 0)
+ limit_off += total;
GNUNET_snprintf (scratch, sizeof (scratch),
"SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ "
"FROM gn090 WHERE hash=?%s%s "
"ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
vhash == NULL ? "" : " AND vhash=?",
type == 0 ? "" : " AND type=?");
-
if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
{
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR |
GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- nc = GNUNET_malloc (sizeof(struct NextContext) +
- sizeof(struct GetNextContext));
- nc->plugin = plugin;
- nc->iter = iter;
- nc->iter_cls = iter_cls;
- nc->stmt = stmt;
- gnc = (struct GetNextContext*) &nc[1];
- gnc->total = total;
- gnc->type = type;
- gnc->key = *key;
- gnc->plugin = plugin;
- gnc->stmt = stmt; /* alias used for freeing at the end! */
- if (NULL != vhash)
- {
- gnc->have_vhash = GNUNET_YES;
- gnc->vhash = *vhash;
- }
- gnc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
- nc->prep = &get_next_prepare;
- nc->prep_cls = gnc;
- sqlite_next_request (nc, GNUNET_NO);
-}
-
-
-/**
- * Execute statement that gets a row and call the callback
- * with the result. Resets the statement afterwards.
- *
- * @param plugin the plugin
- * @param stmt the statement
- * @param iter iterator to call
- * @param iter_cls closure for 'iter'
- */
-static void
-execute_get (struct Plugin *plugin,
- sqlite3_stmt *stmt,
- PluginIterator iter, void *iter_cls)
-{
- int n;
- struct GNUNET_TIME_Absolute expiration;
- unsigned long long rowid;
- unsigned int size;
- int ret;
-
- n = sqlite3_step (stmt);
- switch (n)
+ sqoff = 1;
+ ret = sqlite3_bind_blob (stmt,
+ sqoff++,
+ key,
+ sizeof (GNUNET_HashCode),
+ SQLITE_TRANSIENT);
+ if ((vhash != NULL) && (ret == SQLITE_OK))
+ ret = sqlite3_bind_blob (stmt,
+ sqoff++,
+ vhash,
+ sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
+ if ((type != 0) && (ret == SQLITE_OK))
+ ret = sqlite3_bind_int (stmt, sqoff++, type);
+ if (ret == SQLITE_OK)
+ ret = sqlite3_bind_int64 (stmt, sqoff++, limit_off);
+ if (ret != SQLITE_OK)
{
- case SQLITE_ROW:
- size = sqlite3_column_bytes (stmt, 5);
- rowid = sqlite3_column_int64 (stmt, 6);
- if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
- {
- GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
- "sqlite",
- _("Invalid data in database. Trying to fix (by deletion).\n"));
- if (SQLITE_OK != sqlite3_reset (stmt))
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
- if (GNUNET_OK == delete_by_rowid (plugin, rowid))
- plugin->env->duc (plugin->env->cls,
- - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
- break;
- }
- expiration.abs_value = sqlite3_column_int64 (stmt, 3);
- ret = iter (iter_cls,
- NULL,
- sqlite3_column_blob (stmt, 4) /* key */,
- size,
- sqlite3_column_blob (stmt, 5) /* data */,
- sqlite3_column_int (stmt, 0) /* type */,
- sqlite3_column_int (stmt, 1) /* priority */,
- sqlite3_column_int (stmt, 2) /* anonymity */,
- expiration,
- rowid);
- if (SQLITE_OK != sqlite3_reset (stmt))
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
- if ( (GNUNET_NO == ret) &&
- (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
- plugin->env->duc (plugin->env->cls,
- - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
- return;
- case SQLITE_DONE:
- /* database must be empty */
- if (SQLITE_OK != sqlite3_reset (stmt))
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
- break;
- case SQLITE_BUSY:
- case SQLITE_ERROR:
- case SQLITE_MISUSE:
- default:
LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "sqlite3_step");
- if (SQLITE_OK != sqlite3_reset (stmt))
- LOG_SQLITE (plugin, NULL,
- GNUNET_ERROR_TYPE_ERROR |
- GNUNET_ERROR_TYPE_BULK,
- "sqlite3_reset");
- GNUNET_break (0);
- database_shutdown (plugin);
- database_setup (plugin->env->cfg,
- plugin);
- break;
+ GNUNET_ERROR_TYPE_ERROR |
+ GNUNET_ERROR_TYPE_BULK, "sqlite_bind");
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ return;
}
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ execute_get (plugin, stmt, proc, proc_cls);
+ sqlite3_finalize (stmt);
}
+
/**
- * Context for 'repl_iter' function.
+ * Context for 'repl_proc' function.
*/
struct ReplCtx
{
@@ -1416,22 +886,21 @@ struct ReplCtx
/**
* Function to call for the result (or the NULL).
*/
- PluginIterator iter;
+ PluginDatumProcessor proc;
/**
- * Closure for iter.
+ * Closure for proc.
*/
- void *iter_cls;
+ void *proc_cls;
};
/**
- * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
+ * Wrapper for the processor for 'sqlite_plugin_replication_get'.
* Decrements the replication counter and calls the original
- * iterator.
+ * processor.
*
* @param cls closure
- * @param next_cls closure to pass to the "next" function.
* @param key key for the content
* @param size number of bytes in data
* @param data content stored
@@ -1442,13 +911,11 @@ struct ReplCtx
* @param uid unique identifier for the datum;
* maybe 0 if no unique identifier is available
*
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
- * (continue on call to "next", of course),
- * GNUNET_NO to delete the item and continue (if supported)
+ * @return GNUNET_OK for normal return,
+ * GNUNET_NO to delete the item
*/
static int
-repl_iter (void *cls,
- void *next_cls,
+repl_proc (void *cls,
const GNUNET_HashCode *key,
uint32_t size,
const void *data,
@@ -1462,8 +929,8 @@ repl_iter (void *cls,
struct Plugin *plugin = rc->plugin;
int ret;
- ret = rc->iter (rc->iter_cls,
- next_cls, key,
+ ret = rc->proc (rc->proc_cls,
+ key,
size, data,
type, priority, anonymity, expiration,
uid);
@@ -1494,15 +961,15 @@ repl_iter (void *cls,
* Get a random item for replication. Returns a single random item
* from those with the highest replication counters. The item's
* replication counter is decremented by one IF it was positive before.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Call 'proc' with all values ZERO or NULL if the datastore is empty.
*
* @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
*/
static void
-sqlite_plugin_replication_get (void *cls,
- PluginIterator iter, void *iter_cls)
+sqlite_plugin_get_replication (void *cls,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
struct ReplCtx rc;
@@ -1513,24 +980,24 @@ sqlite_plugin_replication_get (void *cls,
"Getting random block based on replication order.\n");
#endif
rc.plugin = plugin;
- rc.iter = iter;
- rc.iter_cls = iter_cls;
- execute_get (plugin, plugin->selRepl, &repl_iter, &rc);
+ rc.proc = proc;
+ rc.proc_cls = proc_cls;
+ execute_get (plugin, plugin->selRepl, &repl_proc, &rc);
}
/**
* Get a random item that has expired or has low priority.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Call 'proc' with all values ZERO or NULL if the datastore is empty.
*
* @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
*/
static void
-sqlite_plugin_expiration_get (void *cls,
- PluginIterator iter, void *iter_cls)
+sqlite_plugin_get_expiration (void *cls,
+ PluginDatumProcessor proc, void *proc_cls)
{
struct Plugin *plugin = cls;
sqlite3_stmt *stmt;
@@ -1550,11 +1017,11 @@ sqlite_plugin_expiration_get (void *cls,
if (SQLITE_OK != sqlite3_reset (stmt))
LOG_SQLITE (plugin, NULL,
GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
+ proc (proc_cls, NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- execute_get (plugin, stmt, iter, iter_cls);
+ execute_get (plugin, stmt, proc, proc_cls);
}
@@ -1579,7 +1046,7 @@ sqlite_plugin_drop (void *cls)
* @return the size of the database on disk (estimate)
*/
static unsigned long long
-sqlite_plugin_get_size (void *cls)
+sqlite_plugin_estimate_size (void *cls)
{
struct Plugin *plugin = cls;
sqlite3_stmt *stmt;
@@ -1653,14 +1120,13 @@ libgnunet_plugin_datastore_sqlite_init (void *cls)
}
api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
api->cls = &plugin;
- api->get_size = &sqlite_plugin_get_size;
+ api->estimate_size = &sqlite_plugin_estimate_size;
api->put = &sqlite_plugin_put;
- api->next_request = &sqlite_next_request;
- api->get = &sqlite_plugin_get;
- api->replication_get = &sqlite_plugin_replication_get;
- api->expiration_get = &sqlite_plugin_expiration_get;
api->update = &sqlite_plugin_update;
- api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
+ api->get_key = &sqlite_plugin_get_key;
+ api->get_replication = &sqlite_plugin_get_replication;
+ api->get_expiration = &sqlite_plugin_get_expiration;
+ api->get_zero_anonymity = &sqlite_plugin_get_zero_anonymity;
api->drop = &sqlite_plugin_drop;
GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
"sqlite", _("Sqlite database running\n"));
@@ -1684,27 +1150,9 @@ libgnunet_plugin_datastore_sqlite_done (void *cls)
#if DEBUG_SQLITE
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"sqlite",
- "sqlite plugin is doneing\n");
+ "sqlite plugin is done\n");
#endif
- if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
- {
-#if DEBUG_SQLITE
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "sqlite",
- "Canceling next task\n");
-#endif
- GNUNET_SCHEDULER_cancel (plugin->next_task);
- plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
-#if DEBUG_SQLITE
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "sqlite",
- "Prep'ing next task\n");
-#endif
- plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
- GNUNET_free (plugin->next_task_nc);
- plugin->next_task_nc = NULL;
- }
fn = NULL;
if (plugin->drop_on_shutdown)
fn = GNUNET_strdup (plugin->fn);
diff --git a/src/datastore/plugin_datastore_template.c b/src/datastore/plugin_datastore_template.c
index 40b191538..6228e8c0c 100644
--- a/src/datastore/plugin_datastore_template.c
+++ b/src/datastore/plugin_datastore_template.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- (C) 2009 Christian Grothoff (and other contributing authors)
+ (C) 2009, 2011 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -47,7 +47,8 @@ struct Plugin
* @param cls our "struct Plugin*"
* @return number of bytes used on disk
*/
-static unsigned long long template_plugin_get_size (void *cls)
+static unsigned long long
+template_plugin_estimate_size (void *cls)
{
GNUNET_break (0);
return 0;
@@ -88,30 +89,11 @@ template_plugin_put (void *cls,
/**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
- *
- * @param next_cls whatever argument was given
- * to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- * should terminate the iteration early
- * (iterator should be still called once more
- * to signal the end of the iteration).
- */
-static void
-template_plugin_next_request (void *next_cls,
- int end_it)
-{
- GNUNET_break (0);
-}
-
-
-/**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Get one of the results for a particular key in the datastore.
*
* @param cls closure
+ * @param offset offset of the result (mod #num-results);
+ * specific ordering does not matter for the offset
* @param key maybe NULL (to match all entries)
* @param vhash hash of the value, maybe NULL (to
* match all values that have the right key).
@@ -120,16 +102,17 @@ template_plugin_next_request (void *next_cls,
* there may be!
* @param type entries of which type are relevant?
* Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc function to call on each matching value;
+ * will be called with NULL if nothing matches
+ * @param proc_cls closure for proc
*/
static void
-template_plugin_get (void *cls,
- const GNUNET_HashCode * key,
- const GNUNET_HashCode * vhash,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter, void *iter_cls)
+template_plugin_get_key (void *cls,
+ uint64_t offset,
+ const GNUNET_HashCode * key,
+ const GNUNET_HashCode * vhash,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc, void *proc_cls)
{
GNUNET_break (0);
}
@@ -137,34 +120,35 @@ template_plugin_get (void *cls,
/**
- * Get a random item for replication. Returns a single, not expired, random item
- * from those with the highest replication counters. The item's
- * replication counter is decremented by one IF it was positive before.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Get a random item for replication. Returns a single, not expired,
+ * random item from those with the highest replication counters. The
+ * item's replication counter is decremented by one IF it was positive
+ * before. Call 'proc' with all values ZERO or NULL if the datastore
+ * is empty.
*
* @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
*/
static void
-template_plugin_replication_get (void *cls,
- PluginIterator iter, void *iter_cls)
+template_plugin_get_replication (void *cls,
+ PluginDatumProcessor proc, void *proc_cls)
{
GNUNET_break (0);
}
/**
- * Get a random item for expiration.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Get a random item for expiration. Call 'proc' with all values ZERO
+ * or NULL if the datastore is empty.
*
* @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
*/
static void
-template_plugin_expiration_get (void *cls,
- PluginIterator iter, void *iter_cls)
+template_plugin_get_expiration (void *cls,
+ PluginDatumProcessor proc, void *proc_cls)
{
GNUNET_break (0);
}
@@ -196,7 +180,8 @@ template_plugin_expiration_get (void *cls,
static int
template_plugin_update (void *cls,
uint64_t uid,
- int delta, struct GNUNET_TIME_Absolute expire,
+ int delta,
+ struct GNUNET_TIME_Absolute expire,
char **msg)
{
GNUNET_break (0);
@@ -206,21 +191,23 @@ template_plugin_update (void *cls,
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Call the given processor on an item with zero anonymity.
*
* @param cls our "struct Plugin*"
+ * @param offset offset of the result (mod #num-results);
+ * specific ordering does not matter for the offset
* @param type entries of which type should be considered?
* Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc function to call on each matching value;
+ * will be called with NULL if no value matches
+ * @param proc_cls closure for proc
*/
static void
-template_plugin_iter_zero_anonymity (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+template_plugin_get_zero_anonymity (void *cls,
+ uint64_t offset,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc,
+ void *proc_cls)
{
GNUNET_break (0);
}
@@ -253,14 +240,13 @@ libgnunet_plugin_datastore_template_init (void *cls)
plugin->env = env;
api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
api->cls = plugin;
- api->get_size = &template_plugin_get_size;
+ api->estimate_size = &template_plugin_estimate_size;
api->put = &template_plugin_put;
- api->next_request = &template_plugin_next_request;
- api->get = &template_plugin_get;
- api->replication_get = &template_plugin_replication_get;
- api->expiration_get = &template_plugin_expiration_get;
api->update = &template_plugin_update;
- api->iter_zero_anonymity = &template_plugin_iter_zero_anonymity;
+ api->get_key = &template_plugin_get_key;
+ api->get_replication = &template_plugin_get_replication;
+ api->get_expiration = &template_plugin_get_expiration;
+ api->get_zero_anonymity = &template_plugin_get_zero_anonymity;
api->drop = &template_plugin_drop;
GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
"template", _("Template database running\n"));
diff --git a/src/datastore/test_datastore_api.c b/src/datastore/test_datastore_api.c
index 4a4bbc439..deeee7164 100644
--- a/src/datastore/test_datastore_api.c
+++ b/src/datastore/test_datastore_api.c
@@ -102,20 +102,18 @@ get_expiration (int i)
enum RunPhase
{
RP_DONE = 0,
- RP_PUT,
- RP_GET,
- RP_DEL,
- RP_DO_DEL,
- RP_DELVALIDATE,
- RP_RESERVE,
- RP_PUT_MULTIPLE,
- RP_PUT_MULTIPLE_NEXT,
- RP_GET_MULTIPLE,
- RP_GET_MULTIPLE_NEXT, /* 10 */
- RP_GET_MULTIPLE_DONE,
- RP_UPDATE,
- RP_UPDATE_VALIDATE, /* 13 */
- RP_UPDATE_DONE,
+ RP_PUT = 1,
+ RP_GET = 2,
+ RP_DEL = 3,
+ RP_DO_DEL = 4,
+ RP_DELVALIDATE = 5,
+ RP_RESERVE = 6,
+ RP_PUT_MULTIPLE = 7,
+ RP_PUT_MULTIPLE_NEXT = 8,
+ RP_GET_MULTIPLE = 9,
+ RP_GET_MULTIPLE_NEXT = 10,
+ RP_UPDATE = 11,
+ RP_UPDATE_VALIDATE = 12,
RP_ERROR
};
@@ -129,7 +127,9 @@ struct CpsRunContext
void *data;
size_t size;
enum RunPhase phase;
- unsigned long long uid;
+ uint64_t uid;
+ uint64_t offset;
+ uint64_t first_uid;
};
@@ -144,16 +144,15 @@ check_success (void *cls,
const char *msg)
{
struct CpsRunContext *crc = cls;
+
if (GNUNET_OK != success)
{
- ok = 42;
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Operation not successfull: `%s'\n", msg);
+ "Operation %d/%d not successfull: `%s'\n",
+ crc->phase,
+ crc->i,
+ msg);
crc->phase = RP_ERROR;
- GNUNET_SCHEDULER_add_continuation (&run_continuation,
- crc,
- GNUNET_SCHEDULER_REASON_PREREQ_DONE);
- return;
}
GNUNET_free_non_null (crc->data);
crc->data = NULL;
@@ -171,7 +170,8 @@ get_reserved (void *cls,
struct CpsRunContext *crc = cls;
if (0 >= success)
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "%s\n", msg);
+ "Error obtaining reservation: `%s'\n",
+ msg);
GNUNET_assert (0 < success);
crc->rid = success;
GNUNET_SCHEDULER_add_continuation (&run_continuation,
@@ -188,42 +188,48 @@ check_value (void *cls,
enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
- struct GNUNET_TIME_Absolute
- expiration, uint64_t uid)
+ struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid)
{
- static int matched;
struct CpsRunContext *crc = cls;
int i;
- if (key == NULL)
- {
- if (crc->i == 0)
- {
- crc->phase = RP_DEL;
- crc->i = ITERATIONS;
- }
- GNUNET_assert (matched == GNUNET_YES);
- matched = GNUNET_NO;
- GNUNET_SCHEDULER_add_continuation (&run_continuation,
- crc,
- GNUNET_SCHEDULER_REASON_PREREQ_DONE);
- return;
- }
i = crc->i;
+#if 0
+ fprintf (stderr,
+ "Check value got `%s' of size %u, type %d, expire %llu\n",
+ GNUNET_h2s (key),
+ (unsigned int) size,
+ type,
+ (unsigned long long) expiration.abs_value);
+ fprintf (stderr,
+ "Check value iteration %d wants size %u, type %d, expire %llu\n",
+ i,
+ (unsigned int) get_size (i),
+ get_type (i),
+ (unsigned long long) get_expiration(i).abs_value);
+#endif
GNUNET_assert (size == get_size (i));
GNUNET_assert (0 == memcmp (data, get_data(i), size));
GNUNET_assert (type == get_type (i));
GNUNET_assert (priority == get_priority (i));
GNUNET_assert (anonymity == get_anonymity(i));
GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value);
- matched = GNUNET_YES;
- GNUNET_DATASTORE_iterate_get_next (datastore);
+ crc->offset++;
+ if (crc->i == 0)
+ {
+ crc->phase = RP_DEL;
+ crc->i = ITERATIONS;
+ }
+ GNUNET_SCHEDULER_add_continuation (&run_continuation,
+ crc,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
}
static void
delete_value (void *cls,
- const GNUNET_HashCode * key,
+ const GNUNET_HashCode *key,
size_t size,
const void *data,
enum GNUNET_BLOCK_Type type,
@@ -233,36 +239,23 @@ delete_value (void *cls,
expiration, uint64_t uid)
{
struct CpsRunContext *crc = cls;
- if (key == NULL)
- {
- if (crc->data == NULL)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Content %u not found!\n",
- crc->i);
- crc->phase = RP_ERROR;
- }
- else
- {
- crc->phase = RP_DO_DEL;
- }
- GNUNET_SCHEDULER_add_continuation (&run_continuation,
- crc,
- GNUNET_SCHEDULER_REASON_PREREQ_DONE);
- return;
- }
+
GNUNET_assert (crc->data == NULL);
+ GNUNET_assert (NULL != key);
crc->size = size;
crc->key = *key;
crc->data = GNUNET_malloc (size);
memcpy (crc->data, data, size);
- GNUNET_DATASTORE_iterate_get_next (datastore);
+ crc->phase = RP_DO_DEL;
+ GNUNET_SCHEDULER_add_continuation (&run_continuation,
+ crc,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
}
static void
check_nothing (void *cls,
- const GNUNET_HashCode * key,
+ const GNUNET_HashCode *key,
size_t size,
const void *data,
enum GNUNET_BLOCK_Type type,
@@ -272,11 +265,10 @@ check_nothing (void *cls,
expiration, uint64_t uid)
{
struct CpsRunContext *crc = cls;
+
GNUNET_assert (key == NULL);
if (crc->i == 0)
- {
- crc->phase = RP_RESERVE;
- }
+ crc->phase = RP_RESERVE;
GNUNET_SCHEDULER_add_continuation (&run_continuation,
crc,
GNUNET_SCHEDULER_REASON_PREREQ_DONE);
@@ -296,47 +288,28 @@ check_multiple (void *cls,
{
struct CpsRunContext *crc = cls;
- if (key == NULL)
- {
- if (crc->phase != RP_GET_MULTIPLE_DONE)
- {
- fprintf (stderr,
- "Wrong phase: %d\n",
- crc->phase);
- GNUNET_break (0);
- crc->phase = RP_ERROR;
- }
- else
- {
- crc->phase = RP_UPDATE;
- }
- GNUNET_SCHEDULER_add_continuation (&run_continuation,
- crc,
- GNUNET_SCHEDULER_REASON_PREREQ_DONE);
- return;
- }
+ GNUNET_assert (key != NULL);
switch (crc->phase)
{
case RP_GET_MULTIPLE:
crc->phase = RP_GET_MULTIPLE_NEXT;
+ crc->first_uid = uid;
+ crc->offset++;
break;
case RP_GET_MULTIPLE_NEXT:
- crc->phase = RP_GET_MULTIPLE_DONE;
- break;
- case RP_GET_MULTIPLE_DONE:
- /* do not advance further */
+ GNUNET_assert (uid != crc->first_uid);
+ crc->phase = RP_UPDATE;
break;
default:
GNUNET_break (0);
+ crc->phase = RP_ERROR;
break;
}
-#if VERBOSE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Test in phase %u\n", crc->phase);
-#endif
if (priority == get_priority (42))
crc->uid = uid;
- GNUNET_DATASTORE_iterate_get_next (datastore);
+ GNUNET_SCHEDULER_add_continuation (&run_continuation,
+ crc,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
}
@@ -353,31 +326,19 @@ check_update (void *cls,
{
struct CpsRunContext *crc = cls;
- if (key == NULL)
- {
- if (crc->phase != RP_UPDATE_DONE)
- {
- GNUNET_break (0);
- crc->phase = RP_ERROR;
- }
- else
- {
- crc->phase = RP_DONE;
- }
- GNUNET_SCHEDULER_add_continuation (&run_continuation,
- crc,
- GNUNET_SCHEDULER_REASON_PREREQ_DONE);
- return;
- }
+ GNUNET_assert (key != NULL);
if ( (anonymity == get_anonymity (42)) &&
(size == get_size (42)) &&
(priority == get_priority (42) + 100) )
+ crc->phase = RP_DONE;
+ else
{
- crc->phase = RP_UPDATE_DONE;
+ GNUNET_assert (size == get_size (43));
+ crc->offset++;
}
- else
- GNUNET_assert (size == get_size (43));
- GNUNET_DATASTORE_iterate_get_next (datastore);
+ GNUNET_SCHEDULER_add_continuation (&run_continuation,
+ crc,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
}
@@ -427,12 +388,13 @@ run_continuation (void *cls,
crc->i);
#endif
GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
- GNUNET_DATASTORE_iterate_key (datastore,
- &crc->key,
- get_type (crc->i),
- 1, 1, TIMEOUT,
- &check_value,
- crc);
+ GNUNET_DATASTORE_get_key (datastore,
+ crc->offset,
+ &crc->key,
+ get_type (crc->i),
+ 1, 1, TIMEOUT,
+ &check_value,
+ crc);
break;
case RP_DEL:
crc->i--;
@@ -444,12 +406,14 @@ run_continuation (void *cls,
#endif
crc->data = NULL;
GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
- GNUNET_DATASTORE_iterate_key (datastore,
- &crc->key,
- get_type (crc->i),
- 1, 1, TIMEOUT,
- &delete_value,
- crc);
+ GNUNET_assert (NULL !=
+ GNUNET_DATASTORE_get_key (datastore,
+ crc->offset,
+ &crc->key,
+ get_type (crc->i),
+ 1, 1, TIMEOUT,
+ &delete_value,
+ crc));
break;
case RP_DO_DEL:
#if VERBOSE
@@ -467,13 +431,14 @@ run_continuation (void *cls,
{
crc->phase = RP_DEL;
}
- GNUNET_DATASTORE_remove (datastore,
- &crc->key,
- crc->size,
- crc->data,
- 1, 1, TIMEOUT,
- &check_success,
- crc);
+ GNUNET_assert (NULL !=
+ GNUNET_DATASTORE_remove (datastore,
+ &crc->key,
+ crc->size,
+ crc->data,
+ 1, 1, TIMEOUT,
+ &check_success,
+ crc));
break;
case RP_DELVALIDATE:
crc->i--;
@@ -484,12 +449,14 @@ run_continuation (void *cls,
crc->i);
#endif
GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
- GNUNET_DATASTORE_iterate_key (datastore,
- &crc->key,
- get_type (crc->i),
- 1, 1, TIMEOUT,
- &check_nothing,
- crc);
+ GNUNET_assert (NULL !=
+ GNUNET_DATASTORE_get_key (datastore,
+ crc->offset,
+ &crc->key,
+ get_type (crc->i),
+ 1, 1, TIMEOUT,
+ &check_nothing,
+ crc));
break;
case RP_RESERVE:
crc->phase = RP_PUT_MULTIPLE;
@@ -533,16 +500,24 @@ run_continuation (void *cls,
crc);
break;
case RP_GET_MULTIPLE:
- GNUNET_DATASTORE_iterate_key (datastore,
- &crc->key,
- get_type (42),
- 1, 1, TIMEOUT,
- &check_multiple,
- crc);
+ GNUNET_assert (NULL !=
+ GNUNET_DATASTORE_get_key (datastore,
+ crc->offset,
+ &crc->key,
+ get_type (42),
+ 1, 1, TIMEOUT,
+ &check_multiple,
+ crc));
break;
case RP_GET_MULTIPLE_NEXT:
- case RP_GET_MULTIPLE_DONE:
- GNUNET_assert (0);
+ GNUNET_assert (NULL !=
+ GNUNET_DATASTORE_get_key (datastore,
+ crc->offset,
+ &crc->key,
+ get_type (42),
+ 1, 1, TIMEOUT,
+ &check_multiple,
+ crc));
break;
case RP_UPDATE:
GNUNET_assert (crc->uid > 0);
@@ -556,15 +531,14 @@ run_continuation (void *cls,
crc);
break;
case RP_UPDATE_VALIDATE:
- GNUNET_DATASTORE_iterate_key (datastore,
- &crc->key,
- get_type (42),
- 1, 1, TIMEOUT,
- &check_update,
- crc);
- break;
- case RP_UPDATE_DONE:
- GNUNET_assert (0);
+ GNUNET_assert (NULL !=
+ GNUNET_DATASTORE_get_key (datastore,
+ crc->offset,
+ &crc->key,
+ get_type (42),
+ 1, 1, TIMEOUT,
+ &check_update,
+ crc));
break;
case RP_DONE:
#if VERBOSE
@@ -681,6 +655,7 @@ check ()
argv, "test-datastore-api", "nohelp",
options, &run, NULL);
#if START_DATASTORE
+ sleep (1); /* give datastore chance to receive 'DROP' request */
if (0 != GNUNET_OS_process_kill (proc, SIGTERM))
{
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
diff --git a/src/datastore/test_datastore_api_data_sqlite.conf b/src/datastore/test_datastore_api_data_sqlite.conf
index d7c01fe22..931572025 100644
--- a/src/datastore/test_datastore_api_data_sqlite.conf
+++ b/src/datastore/test_datastore_api_data_sqlite.conf
@@ -29,7 +29,7 @@ DATABASE = sqlite
# REJECT_FROM =
# REJECT_FROM6 =
# PREFIX =
-# DEBUG = YES
+#DEBUG = YES
#PREFIX = valgrind --tool=memcheck --leak-check=yes
#BINARY = /home/grothoff/bin/gnunet-service-datastore
diff --git a/src/datastore/test_datastore_api_management.c b/src/datastore/test_datastore_api_management.c
index 5dfb5cea7..41aa7ae3e 100644
--- a/src/datastore/test_datastore_api_management.c
+++ b/src/datastore/test_datastore_api_management.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
+ (C) 2004, 2005, 2006, 2007, 2009, 2011 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -97,9 +97,9 @@ get_expiration (int i)
enum RunPhase
{
- RP_DONE = 0,
RP_PUT,
RP_GET,
+ RP_DONE,
RP_GET_FAIL
};
@@ -112,6 +112,7 @@ struct CpsRunContext
const struct GNUNET_CONFIGURATION_Handle *cfg;
void *data;
enum RunPhase phase;
+ uint64_t offset;
};
@@ -146,42 +147,26 @@ check_value (void *cls,
enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
- struct GNUNET_TIME_Absolute
- expiration, uint64_t uid)
+ struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid)
{
struct CpsRunContext *crc = cls;
int i;
- if (key == NULL)
- {
- crc->i--;
- if (crc->found == GNUNET_YES)
- {
- crc->phase = RP_GET;
- crc->found = GNUNET_NO;
- }
- else
- {
- fprintf (stderr,
- "First not found was %u\n", crc->i);
- crc->phase = RP_GET_FAIL;
- }
- if (0 == crc->i)
- crc->phase = RP_DONE;
- GNUNET_SCHEDULER_add_continuation (&run_continuation,
- crc,
- GNUNET_SCHEDULER_REASON_PREREQ_DONE);
- return;
- }
i = crc->i;
- crc->found = GNUNET_YES;
GNUNET_assert (size == get_size (i));
GNUNET_assert (0 == memcmp (data, get_data(i), size));
GNUNET_assert (type == get_type (i));
GNUNET_assert (priority == get_priority (i));
GNUNET_assert (anonymity == get_anonymity(i));
GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value);
- GNUNET_DATASTORE_iterate_get_next (datastore);
+ crc->offset++;
+ crc->i--;
+ if (crc->i == 0)
+ crc->phase = RP_DONE;
+ GNUNET_SCHEDULER_add_continuation (&run_continuation,
+ crc,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
}
@@ -241,7 +226,7 @@ run_continuation (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Sleeping to give datastore time to clean up\n");
- sleep (5);
+ sleep (1);
crc->phase = RP_GET;
crc->i--;
}
@@ -254,12 +239,13 @@ run_continuation (void *cls,
crc->i);
#endif
GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
- GNUNET_DATASTORE_iterate_key (datastore,
- &crc->key,
- get_type (crc->i),
- 1, 1, TIMEOUT,
- &check_value,
- crc);
+ GNUNET_DATASTORE_get_key (datastore,
+ crc->offset++,
+ &crc->key,
+ get_type (crc->i),
+ 1, 1, TIMEOUT,
+ &check_value,
+ crc);
break;
case RP_GET_FAIL:
#if VERBOSE
@@ -269,12 +255,13 @@ run_continuation (void *cls,
crc->i);
#endif
GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
- GNUNET_DATASTORE_iterate_key (datastore,
- &crc->key,
- get_type (crc->i),
- 1, 1, TIMEOUT,
- &check_nothing,
- crc);
+ GNUNET_DATASTORE_get_key (datastore,
+ crc->offset++,
+ &crc->key,
+ get_type (crc->i),
+ 1, 1, TIMEOUT,
+ &check_nothing,
+ crc);
break;
case RP_DONE:
GNUNET_assert (0 == crc->i);
@@ -372,6 +359,7 @@ check ()
GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1,
argv, "test-datastore-api", "nohelp",
options, &run, NULL);
+ sleep (1); /* give datastore chance to process 'DROP' request */
if (0 != GNUNET_OS_process_kill (proc, SIGTERM))
{
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
diff --git a/src/datastore/test_plugin_datastore.c b/src/datastore/test_plugin_datastore.c
index d38e908ac..f0961f51e 100644
--- a/src/datastore/test_plugin_datastore.c
+++ b/src/datastore/test_plugin_datastore.c
@@ -65,6 +65,7 @@ struct CpsRunContext
enum RunPhase phase;
unsigned int cnt;
unsigned int i;
+ uint64_t offset;
};
@@ -120,6 +121,11 @@ put_value (struct GNUNET_DATASTORE_PluginFunctions * api,
value[0] = k;
msg = NULL;
prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100);
+#if VERBOSE
+ fprintf (stderr,
+ "putting type %u, anon %u under key %s\n",
+ i+1, i, GNUNET_h2s (&key));
+#endif
if (GNUNET_OK != api->put (api->cls,
&key,
size,
@@ -149,9 +155,11 @@ test (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc);
+static uint64_t guid;
+
+
static int
iterate_one_shot (void *cls,
- void *next_cls,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
@@ -164,57 +172,21 @@ iterate_one_shot (void *cls,
{
struct CpsRunContext *crc = cls;
- GNUNET_assert (NULL == next_cls);
GNUNET_assert (key != NULL);
+ guid = uid;
crc->phase++;
#if VERBOSE
fprintf (stderr,
- "Found result type=%u, priority=%u, size=%u, expire=%llu\n",
+ "Found result type=%u, priority=%u, size=%u, expire=%llu, key %s\n",
type, priority, size,
- (unsigned long long) expiration.abs_value);
+ (unsigned long long) expiration.abs_value,
+ GNUNET_h2s (key));
#endif
GNUNET_SCHEDULER_add_now (&test, crc);
return GNUNET_OK;
}
-static uint64_t guid;
-
-static int
-iterate_with_next (void *cls,
- void *next_cls,
- const GNUNET_HashCode * key,
- uint32_t size,
- const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute
- expiration,
- uint64_t uid)
-{
- struct CpsRunContext *crc = cls;
-
- if (key == NULL)
- {
- crc->phase++;
- GNUNET_SCHEDULER_add_now (&test, crc);
- return GNUNET_OK;
- }
- guid = uid;
-#if VERBOSE
- fprintf (stderr,
- "Found result type=%u, priority=%u, size=%u, expire=%llu\n",
- type, priority, size,
- (unsigned long long) expiration.abs_value);
-#endif
- crc->cnt++;
- crc->api->next_request (next_cls,
- GNUNET_NO);
- return GNUNET_OK;
-}
-
-
/**
* Function called when the service shuts
* down. Unloads our datastore plugin.
@@ -274,12 +246,19 @@ test (void *cls,
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test aborted.\n");
crc->phase = RP_ERROR;
- ok = 1;
}
+#if VERBOSE
+ fprintf (stderr, "In phase %d, iteration %u\n",
+ crc->phase,
+ crc->cnt);
+#endif
switch (crc->phase)
{
case RP_ERROR:
+ ok = 1;
GNUNET_break (0);
crc->api->drop (crc->api->cls);
GNUNET_SCHEDULER_add_now (&cleaning_task, crc);
@@ -289,7 +268,7 @@ test (void *cls,
for (j=0;j<PUT_10;j++)
{
put_value (crc->api, j, crc->i);
- cs = crc->api->get_size (crc->api->cls);
+ cs = crc->api->estimate_size (crc->api->cls);
GNUNET_assert (os < cs);
os = cs;
}
@@ -305,11 +284,12 @@ test (void *cls,
break;
}
gen_key (5, &key);
- crc->api->get (crc->api->cls,
- &key, NULL,
- GNUNET_BLOCK_TYPE_ANY,
- &iterate_with_next,
- crc);
+ crc->api->get_key (crc->api->cls,
+ crc->offset++,
+ &key, NULL,
+ GNUNET_BLOCK_TYPE_ANY,
+ &iterate_one_shot,
+ crc);
break;
case RP_UPDATE:
GNUNET_assert (GNUNET_OK ==
@@ -329,18 +309,19 @@ test (void *cls,
GNUNET_SCHEDULER_add_now (&test, crc);
break;
}
- crc->api->iter_zero_anonymity (crc->api->cls,
- 1,
- &iterate_with_next,
- crc);
+ crc->api->get_zero_anonymity (crc->api->cls,
+ 0,
+ 1,
+ &iterate_one_shot,
+ crc);
break;
case RP_REPL_GET:
- crc->api->replication_get (crc->api->cls,
+ crc->api->get_replication (crc->api->cls,
&iterate_one_shot,
crc);
break;
case RP_EXPI_GET:
- crc->api->expiration_get (crc->api->cls,
+ crc->api->get_expiration (crc->api->cls,
&iterate_one_shot,
crc);
break;
diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am
index f980f4206..20aa652ae 100644
--- a/src/fs/Makefile.am
+++ b/src/fs/Makefile.am
@@ -1,4 +1,3 @@
-
INCLUDES = -I$(top_srcdir)/src/include
if MINGW
@@ -173,8 +172,7 @@ check_SCRIPTS = \
test_gnunet_fs_idx.py
endif
-#if !DISABLE_TEST_RUN
-if 0
+if !DISABLE_TEST_RUN
TESTS = \
test_fs_directory \
test_fs_download \
diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c
index 8192b8c1f..8eb2b4331 100644
--- a/src/fs/fs_download.c
+++ b/src/fs/fs_download.c
@@ -756,10 +756,12 @@ try_top_down_reconstruction (struct GNUNET_FS_DownloadContext *dc,
child_block_size = GNUNET_FS_tree_compute_tree_size (drc->depth);
GNUNET_assert (0 == (drc->offset - dr->offset) % child_block_size);
chk_off = (drc->offset - dr->offset) / child_block_size;
- GNUNET_assert (drc->state == BRS_INIT);
- drc->state = BRS_CHK_SET;
- drc->chk = chks[chk_off];
- try_top_down_reconstruction (dc, drc);
+ if (drc->state == BRS_INIT)
+ {
+ drc->state = BRS_CHK_SET;
+ drc->chk = chks[chk_off];
+ try_top_down_reconstruction (dc, drc);
+ }
if (drc->state != BRS_DOWNLOAD_UP)
up_done = GNUNET_NO; /* children not all done */
}
@@ -815,10 +817,11 @@ schedule_block_download (struct GNUNET_FS_DownloadContext *dc,
dr->depth,
GNUNET_h2s (&dr->chk.query));
#endif
- GNUNET_assert (GNUNET_NO ==
- GNUNET_CONTAINER_multihashmap_contains_value (dc->active,
- &dr->chk.query,
- dr));
+ if (GNUNET_NO !=
+ GNUNET_CONTAINER_multihashmap_contains_value (dc->active,
+ &dr->chk.query,
+ dr))
+ return; /* already active */
GNUNET_CONTAINER_multihashmap_put (dc->active,
&dr->chk.query,
dr,
diff --git a/src/fs/fs_test_lib_data.conf b/src/fs/fs_test_lib_data.conf
index 68c5166b3..204bb90cf 100644
--- a/src/fs/fs_test_lib_data.conf
+++ b/src/fs/fs_test_lib_data.conf
@@ -43,7 +43,7 @@ HOSTNAME = localhost
#TOTAL_QUOTA_OUT = 9321
TOTAL_QUOTA_IN = 3932160
TOTAL_QUOTA_OUT = 3932160
-DEBUG = YES
+#DEBUG = YES
#PREFIX = valgrind --tool=memcheck --leak-check=yes
#BINARY = /home/grothoff/bin/gnunet-service-core
@@ -53,8 +53,8 @@ HOSTNAME = localhost
#OPTIONS = -L DEBUG
CONTENT_CACHING = NO
CONTENT_PUSHING = NO
-DEBUG = YES
-#PREFIX = valgrind --tool=memcheck --leak-check=yes
+# DEBUG = YES
+# PREFIX = valgrind --tool=memcheck --leak-check=yes --trace-children=yes
#BINARY = /home/grothoff/gn9/bin/gnunet-service-fs
#PREFIX = xterm -e gdb -x cmd --args
diff --git a/src/fs/gnunet-pseudonym.c b/src/fs/gnunet-pseudonym.c
index 769b4239d..68a760867 100644
--- a/src/fs/gnunet-pseudonym.c
+++ b/src/fs/gnunet-pseudonym.c
@@ -341,7 +341,7 @@ main (int argc, char *const *argv)
0, &GNUNET_GETOPT_set_one, &no_remote_printing},
{'r', "replication", "LEVEL",
gettext_noop ("set the desired replication LEVEL"),
- 0, &GNUNET_GETOPT_set_uint, &bo.replication_level},
+ 1, &GNUNET_GETOPT_set_uint, &bo.replication_level},
{'R', "root", "ID",
gettext_noop
("specify ID of the root of the namespace"),
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 2522cbe7b..acad54501 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -704,9 +704,9 @@ copy_reply (void *cls,
/**
- * Free the given client request.
+ * Free the given request.
*
- * @param cls the client request to free
+ * @param cls the request to free
* @param tc task context
*/
static void
@@ -1182,6 +1182,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
NULL, 0, /* replies_seen */
&handle_p2p_reply,
peerreq);
+ GNUNET_assert (NULL != pr);
peerreq->pr = pr;
GNUNET_break (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (cp->request_map,
@@ -1427,7 +1428,7 @@ cancel_pending_request (void *cls,
const GNUNET_HashCode *query,
void *value)
{
- struct PeerRequest *peerreq = cls;
+ struct PeerRequest *peerreq = value;
struct GSF_PendingRequest *pr = peerreq->pr;
GSF_pending_request_cancel_ (pr);
diff --git a/src/fs/gnunet-service-fs_indexing.c b/src/fs/gnunet-service-fs_indexing.c
index cc99d3962..dc6b82952 100644
--- a/src/fs/gnunet-service-fs_indexing.c
+++ b/src/fs/gnunet-service-fs_indexing.c
@@ -566,7 +566,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
uint32_t anonymity,
struct GNUNET_TIME_Absolute
expiration, uint64_t uid,
- GNUNET_DATASTORE_Iterator cont,
+ GNUNET_DATASTORE_DatumProcessor cont,
void *cont_cls)
{
const struct OnDemandBlock *odb;
diff --git a/src/fs/gnunet-service-fs_indexing.h b/src/fs/gnunet-service-fs_indexing.h
index 6a2c3d4a0..e1154830b 100644
--- a/src/fs/gnunet-service-fs_indexing.h
+++ b/src/fs/gnunet-service-fs_indexing.h
@@ -63,7 +63,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
uint32_t anonymity,
struct GNUNET_TIME_Absolute
expiration, uint64_t uid,
- GNUNET_DATASTORE_Iterator cont,
+ GNUNET_DATASTORE_DatumProcessor cont,
void *cont_cls);
/**
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index 28036150f..4dc9de1b8 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -158,7 +158,7 @@ plan (struct PeerPlan *pp,
rp->transmission_counter);
#endif
-
+ GNUNET_assert (rp->hn == NULL);
if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
rp,
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 7406bed0f..c1074e8bf 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -100,6 +100,20 @@ struct GSF_PendingRequest
GNUNET_PEER_Id sender_pid;
/**
+ * Current offset for querying our local datastore for results.
+ * Starts at a random value, incremented until we get the same
+ * UID again (detected using 'first_uid'), which is then used
+ * to termiante the iteration.
+ */
+ uint64_t local_result_offset;
+
+ /**
+ * Unique ID of the first result from the local datastore;
+ * used to detect wrap-around of the offset.
+ */
+ uint64_t first_uid;
+
+ /**
* Number of valid entries in the 'replies_seen' array.
*/
unsigned int replies_seen_count;
@@ -113,7 +127,7 @@ struct GSF_PendingRequest
* Mingle value we currently use for the bf.
*/
uint32_t mingle;
-
+
};
@@ -273,6 +287,8 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
type);
#endif
pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
+ pr->local_result_offset = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT64_MAX);
pr->public_data.query = *query;
if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
{
@@ -535,7 +551,20 @@ clean_request (void *cls,
void *value)
{
struct GSF_PendingRequest *pr = value;
-
+ GSF_LocalLookupContinuation cont;
+
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Cleaning up pending request for `%s'.\n",
+ GNUNET_h2s (key));
+#endif
+ if (NULL != (cont = pr->llc_cont))
+ {
+ pr->llc_cont = NULL;
+ cont (pr->llc_cont_cls,
+ pr,
+ pr->local_result);
+ }
GSF_plan_notify_request_done_ (pr);
GNUNET_free_non_null (pr->replies_seen);
if (NULL != pr->bf)
@@ -560,6 +589,7 @@ clean_request (void *cls,
void
GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
{
+ if (NULL == pr_map) return; /* already cleaned up! */
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_remove (pr_map,
&pr->public_data.query,
@@ -1023,13 +1053,22 @@ process_local_reply (void *cls,
GNUNET_HashCode query;
unsigned int old_rf;
+ pr->qe = NULL;
+ if (0 == pr->replies_seen_count)
+ {
+ pr->first_uid = uid;
+ }
+ else
+ {
+ if (uid == pr->first_uid)
+ key = NULL; /* all replies seen! */
+ }
if (NULL == key)
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"No further local responses available.\n");
#endif
- pr->qe = NULL;
if (NULL != (cont = pr->llc_cont))
{
pr->llc_cont = NULL;
@@ -1041,9 +1080,10 @@ process_local_reply (void *cls,
}
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "New local response to `%s' of type %u.\n",
+ "Received reply for `%s' of type %d with UID %llu from datastore.\n",
GNUNET_h2s (key),
- type);
+ type,
+ (unsigned long long) uid);
#endif
if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
{
@@ -1061,8 +1101,22 @@ process_local_reply (void *cls,
&process_local_reply,
pr))
{
- if (pr->qe != NULL)
- GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
+ pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+ pr->local_result_offset - 1,
+ &pr->public_data.query,
+ pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
+ ? GNUNET_BLOCK_TYPE_ANY
+ : pr->public_data.type,
+ (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
+ ? UINT_MAX
+ : 1 /* queue priority */,
+ (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
+ ? UINT_MAX
+ : 1 /* max queue size */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_local_reply,
+ pr);
+ GNUNET_assert (NULL != pr->qe);
}
return;
}
@@ -1085,7 +1139,22 @@ process_local_reply (void *cls,
-1, -1,
GNUNET_TIME_UNIT_FOREVER_REL,
NULL, NULL);
- GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
+ pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+ pr->local_result_offset - 1,
+ &pr->public_data.query,
+ pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
+ ? GNUNET_BLOCK_TYPE_ANY
+ : pr->public_data.type,
+ (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
+ ? UINT_MAX
+ : 1 /* queue priority */,
+ (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
+ ? UINT_MAX
+ : 1 /* max queue size */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_local_reply,
+ pr);
+ GNUNET_assert (NULL != pr->qe);
return;
}
prq.type = type;
@@ -1097,12 +1166,16 @@ process_local_reply (void *cls,
GSF_update_datastore_delay_ (pr->public_data.start_time);
process_reply (&prq, key, pr);
pr->local_result = prq.eval;
- if (pr->qe == NULL)
+ if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
{
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Request cancelled, not asking datastore for more\n");
-#endif
+ if (NULL != (cont = pr->llc_cont))
+ {
+ pr->llc_cont = NULL;
+ cont (pr->llc_cont_cls,
+ pr,
+ pr->local_result);
+ }
+ return;
}
if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
@@ -1116,8 +1189,6 @@ process_local_reply (void *cls,
gettext_noop ("# processing result set cut short due to load"),
1,
GNUNET_NO);
- GNUNET_DATASTORE_cancel (pr->qe);
- pr->qe = NULL;
if (NULL != (cont = pr->llc_cont))
{
pr->llc_cont = NULL;
@@ -1127,7 +1198,22 @@ process_local_reply (void *cls,
}
return;
}
- GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
+ pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+ pr->local_result_offset++,
+ &pr->public_data.query,
+ pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
+ ? GNUNET_BLOCK_TYPE_ANY
+ : pr->public_data.type,
+ (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
+ ? UINT_MAX
+ : 1 /* queue priority */,
+ (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
+ ? UINT_MAX
+ : 1 /* max queue size */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_local_reply,
+ pr);
+ GNUNET_assert (NULL != pr->qe);
}
@@ -1147,20 +1233,21 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
GNUNET_assert (NULL == pr->llc_cont);
pr->llc_cont = cont;
pr->llc_cont_cls = cont_cls;
- pr->qe = GNUNET_DATASTORE_iterate_key (GSF_dsh,
- &pr->public_data.query,
- pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
- ? GNUNET_BLOCK_TYPE_ANY
- : pr->public_data.type,
- (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
- ? UINT_MAX
- : 1 /* queue priority */,
- (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
- ? UINT_MAX
- : 1 /* max queue size */,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &process_local_reply,
- pr);
+ pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+ pr->local_result_offset++,
+ &pr->public_data.query,
+ pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
+ ? GNUNET_BLOCK_TYPE_ANY
+ : pr->public_data.type,
+ (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
+ ? UINT_MAX
+ : 1 /* queue priority */,
+ (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
+ ? UINT_MAX
+ : 1 /* max queue size */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_local_reply,
+ pr);
}
diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c
index 121a90bcd..b15207ce8 100644
--- a/src/fs/gnunet-service-fs_put.c
+++ b/src/fs/gnunet-service-fs_put.c
@@ -35,25 +35,50 @@
/**
- * Request to datastore for DHT PUTs (or NULL).
+ * Context for each zero-anonymity iterator.
*/
-static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
+struct PutOperator
+{
-/**
- * Type we will request for the next DHT PUT round from the datastore.
- */
-static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
+ /**
+ * Request to datastore for DHT PUTs (or NULL).
+ */
+ struct GNUNET_DATASTORE_QueueEntry *dht_qe;
+
+ /**
+ * Type we request from the datastore.
+ */
+ enum GNUNET_BLOCK_Type dht_put_type;
+
+ /**
+ * ID of task that collects blocks for DHT PUTs.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier dht_task;
+
+ /**
+ * How many entires with zero anonymity of our type do we currently
+ * estimate to have in the database?
+ */
+ uint64_t zero_anonymity_count_estimate;
+
+ /**
+ * Current offset when iterating the database.
+ */
+ uint64_t current_offset;
+};
-/**
- * ID of task that collects blocks for DHT PUTs.
- */
-static GNUNET_SCHEDULER_TaskIdentifier dht_task;
/**
- * How many entires with zero anonymity do we currently estimate
- * to have in the database?
+ * ANY-terminated list of our operators (one per type
+ * of block that we're putting into the DHT).
*/
-static unsigned int zero_anonymity_count_estimate;
+static struct PutOperator operators[] =
+ {
+ { NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0 },
+ { NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0 },
+ { NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0 },
+ { NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0 }
+ };
/**
@@ -67,26 +92,26 @@ gather_dht_put_blocks (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc);
-
/**
- * If the DHT PUT gathering task is not currently running, consider
- * (re)scheduling it with the appropriate delay.
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ *
+ * @param cls type of blocks to gather
+ * @param tc scheduler context (unused)
*/
static void
-consider_dht_put_gathering (void *cls)
+delay_dht_put_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ struct PutOperator *po = cls;
struct GNUNET_TIME_Relative delay;
- if (GSF_dsh == NULL)
- return;
- if (dht_qe != NULL)
+ po->dht_task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
- if (dht_task != GNUNET_SCHEDULER_NO_TASK)
- return;
- if (zero_anonymity_count_estimate > 0)
+ if (po->zero_anonymity_count_estimate > 0)
{
delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
- zero_anonymity_count_estimate);
+ po->zero_anonymity_count_estimate);
delay = GNUNET_TIME_relative_min (delay,
MAX_DHT_PUT_FREQ);
}
@@ -96,20 +121,9 @@ consider_dht_put_gathering (void *cls)
(hopefully) appear */
delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
}
- dht_task = GNUNET_SCHEDULER_add_delayed (delay,
- &gather_dht_put_blocks,
- cls);
-}
-
-
-/**
- * Function called upon completion of the DHT PUT operation.
- */
-static void
-dht_put_continuation (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
+ po->dht_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &gather_dht_put_blocks,
+ po);
}
@@ -138,31 +152,19 @@ process_dht_put_content (void *cls,
struct GNUNET_TIME_Absolute
expiration, uint64_t uid)
{
- static unsigned int counter;
- static GNUNET_HashCode last_vhash;
- static GNUNET_HashCode vhash;
+ struct PutOperator *po = cls;
+ po->dht_qe = NULL;
if (key == NULL)
{
- dht_qe = NULL;
- consider_dht_put_gathering (cls);
+ po->zero_anonymity_count_estimate = po->current_offset - 1;
+ po->current_offset = 0;
+ po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks,
+ po);
return;
}
- /* slightly funky code to estimate the total number of values with zero
- anonymity from the maximum observed length of a monotonically increasing
- sequence of hashes over the contents */
- GNUNET_CRYPTO_hash (data, size, &vhash);
- if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
- {
- if (zero_anonymity_count_estimate > 0)
- zero_anonymity_count_estimate /= 2;
- counter = 0;
- }
- last_vhash = vhash;
- if (counter < 31)
- counter++;
- if (zero_anonymity_count_estimate < (1 << counter))
- zero_anonymity_count_estimate = (1 << counter);
+ po->zero_anonymity_count_estimate = GNUNET_MAX (po->current_offset,
+ po->zero_anonymity_count_estimate);
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Retrieved block `%s' of type %u for DHT PUT\n",
@@ -178,8 +180,8 @@ process_dht_put_content (void *cls,
data,
expiration,
GNUNET_TIME_UNIT_FOREVER_REL,
- &dht_put_continuation,
- cls);
+ &delay_dht_put_blocks,
+ po);
}
@@ -193,17 +195,20 @@ static void
gather_dht_put_blocks (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- dht_task = GNUNET_SCHEDULER_NO_TASK;
- if (GSF_dsh == NULL)
+ struct PutOperator *po = cls;
+
+ po->dht_task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
- if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
- dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
- dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (GSF_dsh,
+ po->dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh,
+ po->current_offset++,
0, UINT_MAX,
GNUNET_TIME_UNIT_FOREVER_REL,
- dht_put_type++,
- &process_dht_put_content, NULL);
- GNUNET_assert (dht_qe != NULL);
+ po->dht_put_type,
+ &process_dht_put_content, po);
+ if (NULL == po->dht_qe)
+ po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks,
+ po);
}
@@ -213,7 +218,14 @@ gather_dht_put_blocks (void *cls,
void
GSF_put_init_ ()
{
- dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL);
+ unsigned int i;
+
+ i = 0;
+ while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY)
+ {
+ operators[i].dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]);
+ i++;
+ }
}
@@ -223,15 +235,23 @@ GSF_put_init_ ()
void
GSF_put_done_ ()
{
- if (GNUNET_SCHEDULER_NO_TASK != dht_task)
- {
- GNUNET_SCHEDULER_cancel (dht_task);
- dht_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != dht_qe)
+ struct PutOperator *po;
+ unsigned int i;
+
+ i = 0;
+ while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY)
{
- GNUNET_DATASTORE_cancel (dht_qe);
- dht_qe = NULL;
+ if (GNUNET_SCHEDULER_NO_TASK != po->dht_task)
+ {
+ GNUNET_SCHEDULER_cancel (po->dht_task);
+ po->dht_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (NULL != po->dht_qe)
+ {
+ GNUNET_DATASTORE_cancel (po->dht_qe);
+ po->dht_qe = NULL;
+ }
+ i++;
}
}
diff --git a/src/fs/test_fs_download_data.conf b/src/fs/test_fs_download_data.conf
index 0a7eb311a..6bbae9dc9 100644
--- a/src/fs/test_fs_download_data.conf
+++ b/src/fs/test_fs_download_data.conf
@@ -36,7 +36,8 @@ HOSTNAME = localhost
[fs]
PORT = 42471
HOSTNAME = localhost
-ACTIVEMIGRATION = NO
+CONTENT_CACHING = NO
+CONTENT_PUSHING = NO
# DEBUG = YES
#PREFIX = valgrind --tool=memcheck --leak-check=yes
#BINARY = /home/grothoff/bin/gnunet-service-fs
diff --git a/src/fs/test_gnunet_fs_idx.py.in b/src/fs/test_gnunet_fs_idx.py.in
index 3bb3681c6..c97ffd883 100755
--- a/src/fs/test_gnunet_fs_idx.py.in
+++ b/src/fs/test_gnunet_fs_idx.py.in
@@ -31,7 +31,7 @@ try:
pub.expect ("URI is `gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147'.\r")
pub.expect (pexpect.EOF)
- down = pexpect.spawn ('gnunet-download -c test_gnunet_fs_idx_data.conf -o \"COPYING\" gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147')
+ down = pexpect.spawn ('gnunet-download -c test_gnunet_fs_idx_data.conf -o COPYING gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147')
down.expect (re.compile ("Downloading `COPYING\' done \(.*\).\r"));
down.expect (pexpect.EOF);
os.system ('rm COPYING');
diff --git a/src/fs/test_gnunet_fs_ns_data.conf b/src/fs/test_gnunet_fs_ns_data.conf
index 65bac0a15..2086cd0fd 100644
--- a/src/fs/test_gnunet_fs_ns_data.conf
+++ b/src/fs/test_gnunet_fs_ns_data.conf
@@ -36,7 +36,7 @@ HOSTNAME = localhost
[fs]
PORT = 47471
HOSTNAME = localhost
-#DEBUG = YES
+DEBUG = YES
#PREFIX = valgrind --tool=memcheck --leak-check=yes
#BINARY = /home/grothoff/bin/gnunet-service-fs
diff --git a/src/fs/test_gnunet_service_fs_migration_data.conf b/src/fs/test_gnunet_service_fs_migration_data.conf
index a72a98e97..3ab61d76c 100644
--- a/src/fs/test_gnunet_service_fs_migration_data.conf
+++ b/src/fs/test_gnunet_service_fs_migration_data.conf
@@ -53,7 +53,7 @@ HOSTNAME = localhost
ACTIVEMIGRATION = YES
CONTENT_CACHING = YES
CONTENT_PUSHING = YES
-DEBUG = YES
+#DEBUG = YES
#PREFIX = valgrind --tool=memcheck --leak-check=yes
#PREFIX = xterm -e gdb -x cmd --args
diff --git a/src/include/gnunet_datastore_plugin.h b/src/include/gnunet_datastore_plugin.h
index a5c548146..4d717996d 100644
--- a/src/include/gnunet_datastore_plugin.h
+++ b/src/include/gnunet_datastore_plugin.h
@@ -78,26 +78,9 @@ struct GNUNET_DATASTORE_PluginEnvironment
/**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
- *
- * @param next_cls whatever argument was given
- * to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- * should terminate the iteration early
- * (iterator should be still called once more
- * to signal the end of the iteration).
- */
-typedef void (*PluginNextRequest)(void *next_cls,
- int end_it);
-
-
-/**
- * An iterator over a set of items stored in the datastore.
+ * An processor over a set of items stored in the datastore.
*
* @param cls closure
- * @param next_cls closure to pass to the "next" function.
* @param key key for the content
* @param size number of bytes in data
* @param data content stored
@@ -105,24 +88,21 @@ typedef void (*PluginNextRequest)(void *next_cls,
* @param priority priority of the content
* @param anonymity anonymity-level for the content
* @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- * maybe 0 if no unique identifier is available
+ * @param uid unique identifier for the datum
*
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
- * (continue on call to "next", of course),
- * GNUNET_NO to delete the item and continue (if supported)
+ * @return GNUNET_OK to keep the item
+ * GNUNET_NO to delete the item
*/
-typedef int (*PluginIterator) (void *cls,
- void *next_cls,
- const GNUNET_HashCode * key,
- uint32_t size,
- const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute
- expiration,
- uint64_t uid);
+typedef int (*PluginDatumProcessor) (void *cls,
+ const GNUNET_HashCode * key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration,
+ uint64_t uid);
/**
* Get an estimate of how much space the database is
@@ -131,7 +111,7 @@ typedef int (*PluginIterator) (void *cls,
* @param cls closure
* @return number of bytes used on disk
*/
-typedef unsigned long long (*PluginGetSize) (void *cls);
+typedef unsigned long long (*PluginEstimateSize) (void *cls);
/**
@@ -165,10 +145,11 @@ typedef int (*PluginPut) (void *cls,
/**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Get one of the results for a particular key in the datastore.
*
* @param cls closure
+ * @param offset offset of the result (mod #num-results);
+ * specific ordering does not matter for the offset
* @param key key to match, never NULL
* @param vhash hash of the value, maybe NULL (to
* match all values that have the right key).
@@ -177,34 +158,31 @@ typedef int (*PluginPut) (void *cls,
* there may be!
* @param type entries of which type are relevant?
* Use 0 for any type.
- * @param iter function to call on each matching value; however,
- * after the first call to "iter", the plugin must wait
- * until "NextRequest" was called before giving the iterator
- * the next item; finally, the "iter" should be called once
- * once with a NULL value at the end ("next_cls" should be NULL
- * for that last call)
- * @param iter_cls closure for iter
+ * @param proc function to call on the matching value;
+ * proc should be called with NULL if there is no result
+ * @param proc_cls closure for proc
*/
-typedef void (*PluginGet) (void *cls,
- const GNUNET_HashCode *key,
- const GNUNET_HashCode *vhash,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter, void *iter_cls);
+typedef void (*PluginGetKey) (void *cls,
+ uint64_t offset,
+ const GNUNET_HashCode *key,
+ const GNUNET_HashCode *vhash,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc, void *proc_cls);
/**
* Get a random item (additional constraints may apply depending on
- * the specific implementation). Calls 'iter' with all values ZERO or
- * NULL if no item applies, otherwise 'iter' is called once and only
+ * the specific implementation). Calls 'proc' with all values ZERO or
+ * NULL if no item applies, otherwise 'proc' is called once and only
* once with an item, with the 'next_cls' argument being NULL.
*
* @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
*/
-typedef void (*PluginRandomGet) (void *cls,
- PluginIterator iter, void *iter_cls);
+typedef void (*PluginGetRandom) (void *cls,
+ PluginDatumProcessor proc, void *proc_cls);
/**
@@ -238,26 +216,22 @@ typedef int (*PluginUpdate) (void *cls,
/**
- * Select a subset of the items in the datastore and call the given
- * iterator for the first item; then allow getting more items by
- * calling the 'next_request' callback with the given 'next_cls'
- * argument passed to 'iter'.
+ * Select a single item from the datastore at the specified offset
+ * (among those applicable).
*
* @param cls closure
+ * @param offset offset of the result (mod #num-results);
+ * specific ordering does not matter for the offset
* @param type entries of which type should be considered?
- * Myst not be zero (ANY).
- * @param iter function to call on each matching value; however,
- * after the first call to "iter", the plugin must wait
- * until "NextRequest" was called before giving the iterator
- * the next item; finally, the "iter" should be called once
- * once with a NULL value at the end ("next_cls" should be NULL
- * for that last call)
- * @param iter_cls closure for iter
+ * Must not be zero (ANY).
+ * @param proc function to call on the matching value
+ * @param proc_cls closure for proc
*/
-typedef void (*PluginSelector) (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls);
+typedef void (*PluginGetType) (void *cls,
+ uint64_t offset,
+ enum GNUNET_BLOCK_Type type,
+ PluginDatumProcessor proc,
+ void *proc_cls);
/**
@@ -283,10 +257,10 @@ struct GNUNET_DATASTORE_PluginFunctions
void *cls;
/**
- * Get the current on-disk size of the SQ store. Estimates are
- * fine, if that's the only thing available.
+ * Calculate the current on-disk size of the SQ store. Estimates
+ * are fine, if that's the only thing available.
*/
- PluginGetSize get_size;
+ PluginEstimateSize estimate_size;
/**
* Function to store an item in the datastore.
@@ -304,23 +278,14 @@ struct GNUNET_DATASTORE_PluginFunctions
PluginUpdate update;
/**
- * Function called by iterators whenever they want the next value;
- * note that unlike all of the other callbacks, this one does get a
- * the "next_cls" closure which is usually different from the "cls"
- * member of this struct!
- */
- PluginNextRequest next_request;
-
- /**
- * Function to iterate over the results for a particular key
- * in the datastore.
+ * Get a particular datum matching a given hash from the datastore.
*/
- PluginGet get;
+ PluginGetKey get_key;
/**
- * Iterate over content with anonymity level zero.
+ * Get datum (of the specified type) with anonymity level zero.
*/
- PluginSelector iter_zero_anonymity;
+ PluginGetType get_zero_anonymity;
/**
* Function to get a random item with high replication score from
@@ -329,13 +294,13 @@ struct GNUNET_DATASTORE_PluginFunctions
* counters. The item's replication counter is decremented by one
* IF it was positive before.
*/
- PluginRandomGet replication_get;
+ PluginGetRandom get_replication;
/**
* Function to get a random expired item or, if none are expired, one
* with a low priority.
*/
- PluginRandomGet expiration_get;
+ PluginGetRandom get_expiration;
/**
* Delete the database. The next operation is
diff --git a/src/include/gnunet_datastore_service.h b/src/include/gnunet_datastore_service.h
index 53d04e517..c563e5cc9 100644
--- a/src/include/gnunet_datastore_service.h
+++ b/src/include/gnunet_datastore_service.h
@@ -262,7 +262,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
/**
- * An iterator over a set of items stored in the datastore.
+ * Process a datum that was stored in the datastore.
*
* @param cls closure
* @param key key for the content
@@ -275,87 +275,79 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
* @param uid unique identifier for the datum;
* maybe 0 if no unique identifier is available
*/
-typedef void (*GNUNET_DATASTORE_Iterator) (void *cls,
- const GNUNET_HashCode * key,
- size_t size,
- const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute
- expiration, uint64_t uid);
+typedef void (*GNUNET_DATASTORE_DatumProcessor) (void *cls,
+ const GNUNET_HashCode * key,
+ size_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration, uint64_t uid);
/**
- * Iterate over the results for a particular key
- * in the datastore. The iterator will only be called
- * once initially; if the first call did contain a
- * result, further results can be obtained by calling
- * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
+ * Get a result for a particular key from the datastore. The processor
+ * will only be called once.
*
* @param h handle to the datastore
+ * @param offset offset of the result (mod #num-results); set to
+ * a random 64-bit value initially; then increment by
+ * one each time; detect that all results have been found by uid
+ * being again the first uid ever returned.
* @param key maybe NULL (to match all entries)
* @param type desired type, 0 for any
* @param queue_priority ranking of this request in the priority queue
* @param max_queue_size at what queue size should this request be dropped
* (if other requests of higher priority are in the queue)
* @param timeout how long to wait at most for a response
- * @param iter function to call on each matching value;
+ * @param proc function to call on each matching value;
* will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
* @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
+ * cancel
*/
struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
- const GNUNET_HashCode * key,
- enum GNUNET_BLOCK_Type type,
- unsigned int queue_priority,
- unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_Iterator iter,
- void *iter_cls);
+GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
+ uint64_t offset,
+ const GNUNET_HashCode * key,
+ enum GNUNET_BLOCK_Type type,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_DATASTORE_DatumProcessor proc,
+ void *proc_cls);
/**
- * Get all zero-anonymity values from the datastore.
+ * Get a single zero-anonymity value from the datastore.
*
* @param h handle to the datastore
+ * @param offset offset of the result (mod #num-results); set to
+ * a random 64-bit value initially; then increment by
+ * one each time; detect that all results have been found by uid
+ * being again the first uid ever returned.
* @param queue_priority ranking of this request in the priority queue
* @param max_queue_size at what queue size should this request be dropped
* (if other requests of higher priority are in the queue)
* @param timeout how long to wait at most for a response
* @param type allowed type for the operation (never zero)
- * @param iter function to call on a random value; it
+ * @param proc function to call on a random value; it
* will be called once with a value (if available)
- * and always once with a value of NULL at the end.
- * @param iter_cls closure for iter
+ * or with NULL if none value exists.
+ * @param proc_cls closure for proc
* @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
+ * cancel
*/
struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
- unsigned int queue_priority,
- unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- enum GNUNET_BLOCK_Type type,
- GNUNET_DATASTORE_Iterator iter,
- void *iter_cls);
-
-
-/**
- * Function called to trigger obtaining the next result
- * from the datastore. ONLY applies for 'GNUNET_DATASTORE_iterate_*'
- * calls, not for 'get' calls. FIXME: how much mixing of iterate
- * calls with other operations can we permit!? Should we pass
- * the 'QueueEntry' instead of the datastore handle here instead?
- *
- * @param h handle to the datastore
- */
-void
-GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h);
+GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
+ uint64_t offset,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
+ enum GNUNET_BLOCK_Type type,
+ GNUNET_DATASTORE_DatumProcessor proc,
+ void *proc_cls);
/**
@@ -370,21 +362,20 @@ GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h);
* @param max_queue_size at what queue size should this request be dropped
* (if other requests of higher priority are in the queue)
* @param timeout how long to wait at most for a response
- * @param iter function to call on a random value; it
+ * @param proc function to call on a random value; it
* will be called once with a value (if available)
* and always once with a value of NULL.
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
* @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
+ * cancel
*/
struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
unsigned int queue_priority,
unsigned int max_queue_size,
struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_Iterator iter,
- void *iter_cls);
+ GNUNET_DATASTORE_DatumProcessor proc,
+ void *proc_cls);