aboutsummaryrefslogtreecommitdiff
path: root/src/datastore
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-24 20:17:39 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-24 20:17:39 +0000
commit3140154d46212e08e0d73ed891a66213a6813075 (patch)
tree018a55a3899207664b388fcf47a679ca54ca6fbf /src/datastore
parentd5fd881c2a044474b54ddf03b6ab8be8d2b75927 (diff)
downloadgnunet-3140154d46212e08e0d73ed891a66213a6813075.tar.gz
gnunet-3140154d46212e08e0d73ed891a66213a6813075.zip
refactoring datastore API to use MQ API, also fixing misc. bugs in new mysql backend
Diffstat (limited to 'src/datastore')
-rw-r--r--src/datastore/datastore.h33
-rw-r--r--src/datastore/datastore_api.c1213
-rw-r--r--src/datastore/gnunet-datastore.c6
-rw-r--r--src/datastore/gnunet-service-datastore.c111
-rw-r--r--src/datastore/perf_datastore_api.c5
-rw-r--r--src/datastore/plugin_datastore_mysql.c454
-rw-r--r--src/datastore/test_datastore_api.c41
-rw-r--r--src/datastore/test_datastore_api_management.c39
-rw-r--r--src/datastore/test_plugin_datastore.c48
-rw-r--r--src/datastore/test_plugin_datastore_data_mysql.conf3
10 files changed, 1058 insertions, 895 deletions
diff --git a/src/datastore/datastore.h b/src/datastore/datastore.h
index 5767ae6a0..dc3d9d1f2 100644
--- a/src/datastore/datastore.h
+++ b/src/datastore/datastore.h
@@ -106,12 +106,10 @@ struct ReleaseReserveMessage
106 * Message to the datastore service asking about specific 106 * Message to the datastore service asking about specific
107 * content. 107 * content.
108 */ 108 */
109struct GetMessage 109struct GetKeyMessage
110{ 110{
111 /** 111 /**
112 * Type is GNUNET_MESSAGE_TYPE_DATASTORE_GET. Size 112 * Type is #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY.
113 * can either be "sizeof(struct GetMessage)" or
114 * "sizeof(struct GetMessage) - sizeof(struct GNUNET_HashCode)"!
115 */ 113 */
116 struct GNUNET_MessageHeader header; 114 struct GNUNET_MessageHeader header;
117 115
@@ -126,8 +124,7 @@ struct GetMessage
126 uint64_t offset GNUNET_PACKED; 124 uint64_t offset GNUNET_PACKED;
127 125
128 /** 126 /**
129 * Desired key (optional). Check the "size" of the 127 * Desired key.
130 * header to see if the key is actually present.
131 */ 128 */
132 struct GNUNET_HashCode key; 129 struct GNUNET_HashCode key;
133 130
@@ -135,6 +132,30 @@ struct GetMessage
135 132
136 133
137/** 134/**
135 * Message to the datastore service asking about specific
136 * content.
137 */
138struct GetMessage
139{
140 /**
141 * Type is #GNUNET_MESSAGE_TYPE_DATASTORE_GET.
142 */
143 struct GNUNET_MessageHeader header;
144
145 /**
146 * Desired content type. (actually an enum GNUNET_BLOCK_Type)
147 */
148 uint32_t type GNUNET_PACKED;
149
150 /**
151 * Offset of the result.
152 */
153 uint64_t offset GNUNET_PACKED;
154
155};
156
157
158/**
138 * Message to the datastore service asking about zero 159 * Message to the datastore service asking about zero
139 * anonymity content. 160 * anonymity content.
140 */ 161 */
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index b2de3d35d..285634759 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -21,7 +21,7 @@
21/** 21/**
22 * @file datastore/datastore_api.c 22 * @file datastore/datastore_api.c
23 * @brief Management for the datastore for files stored on a GNUnet node. Implements 23 * @brief Management for the datastore for files stored on a GNUnet node. Implements
24 * a priority queue for requests (with timeouts). 24 * a priority queue for requests
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 */ 26 */
27#include "platform.h" 27#include "platform.h"
@@ -95,7 +95,6 @@ union QueueContext
95}; 95};
96 96
97 97
98
99/** 98/**
100 * Entry in our priority queue. 99 * Entry in our priority queue.
101 */ 100 */
@@ -118,13 +117,6 @@ struct GNUNET_DATASTORE_QueueEntry
118 struct GNUNET_DATASTORE_Handle *h; 117 struct GNUNET_DATASTORE_Handle *h;
119 118
120 /** 119 /**
121 * Response processor (NULL if we are not waiting for a response).
122 * This struct should be used for the closure, function-specific
123 * arguments can be passed via 'qc'.
124 */
125 GNUNET_CLIENT_MessageHandler response_proc;
126
127 /**
128 * Function to call after transmission of the request. 120 * Function to call after transmission of the request.
129 */ 121 */
130 GNUNET_DATASTORE_ContinuationWithStatus cont; 122 GNUNET_DATASTORE_ContinuationWithStatus cont;
@@ -140,14 +132,10 @@ struct GNUNET_DATASTORE_QueueEntry
140 union QueueContext qc; 132 union QueueContext qc;
141 133
142 /** 134 /**
143 * Task for timeout signalling. 135 * Envelope of the request to transmit, NULL after
136 * transmission.
144 */ 137 */
145 struct GNUNET_SCHEDULER_Task *task; 138 struct GNUNET_MQ_Envelope *env;
146
147 /**
148 * Timeout for the current operation.
149 */
150 struct GNUNET_TIME_Absolute timeout;
151 139
152 /** 140 /**
153 * Priority in the queue. 141 * Priority in the queue.
@@ -161,22 +149,13 @@ struct GNUNET_DATASTORE_QueueEntry
161 unsigned int max_queue; 149 unsigned int max_queue;
162 150
163 /** 151 /**
164 * Number of bytes in the request message following 152 * Expected response type.
165 * this struct. 32-bit value for nicer memory
166 * access (and overall struct alignment).
167 */ 153 */
168 uint32_t message_size; 154 uint16_t response_type;
169
170 /**
171 * Has this message been transmitted to the service?
172 * Only ever #GNUNET_YES for the head of the queue.
173 * Note that the overall struct should end at a
174 * multiple of 64 bits.
175 */
176 int was_transmitted;
177 155
178}; 156};
179 157
158
180/** 159/**
181 * Handle to the datastore service. 160 * Handle to the datastore service.
182 */ 161 */
@@ -191,7 +170,7 @@ struct GNUNET_DATASTORE_Handle
191 /** 170 /**
192 * Current connection to the datastore service. 171 * Current connection to the datastore service.
193 */ 172 */
194 struct GNUNET_CLIENT_Connection *client; 173 struct GNUNET_MQ_Handle *mq;
195 174
196 /** 175 /**
197 * Handle for statistics. 176 * Handle for statistics.
@@ -199,11 +178,6 @@ struct GNUNET_DATASTORE_Handle
199 struct GNUNET_STATISTICS_Handle *stats; 178 struct GNUNET_STATISTICS_Handle *stats;
200 179
201 /** 180 /**
202 * Current transmit handle.
203 */
204 struct GNUNET_CLIENT_TransmitHandle *th;
205
206 /**
207 * Current head of priority queue. 181 * Current head of priority queue.
208 */ 182 */
209 struct GNUNET_DATASTORE_QueueEntry *queue_head; 183 struct GNUNET_DATASTORE_QueueEntry *queue_head;
@@ -216,7 +190,7 @@ struct GNUNET_DATASTORE_Handle
216 /** 190 /**
217 * Task for trying to reconnect. 191 * Task for trying to reconnect.
218 */ 192 */
219 struct GNUNET_SCHEDULER_Task * reconnect_task; 193 struct GNUNET_SCHEDULER_Task *reconnect_task;
220 194
221 /** 195 /**
222 * How quickly should we retry? Used for exponential back-off on 196 * How quickly should we retry? Used for exponential back-off on
@@ -237,11 +211,6 @@ struct GNUNET_DATASTORE_Handle
237 unsigned int result_count; 211 unsigned int result_count;
238 212
239 /** 213 /**
240 * Are we currently trying to receive from the service?
241 */
242 int in_receive;
243
244 /**
245 * We should ignore the next message(s) from the service. 214 * We should ignore the next message(s) from the service.
246 */ 215 */
247 unsigned int skip_next_messages; 216 unsigned int skip_next_messages;
@@ -250,6 +219,110 @@ struct GNUNET_DATASTORE_Handle
250 219
251 220
252/** 221/**
222 * Try reconnecting to the datastore service.
223 *
224 * @param cls the `struct GNUNET_DATASTORE_Handle`
225 */
226static void
227try_reconnect (void *cls);
228
229
230/**
231 * Disconnect from the service and then try reconnecting to the datastore service
232 * after some delay.
233 *
234 * @param h handle to datastore to disconnect and reconnect
235 */
236static void
237do_disconnect (struct GNUNET_DATASTORE_Handle *h)
238{
239 if (NULL == h->mq)
240 {
241 GNUNET_break (0);
242 return;
243 }
244 GNUNET_MQ_destroy (h->mq);
245 h->mq = NULL;
246 h->skip_next_messages = 0;
247 h->reconnect_task
248 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
249 &try_reconnect,
250 h);
251}
252
253
254/**
255 * Free a queue entry. Removes the given entry from the
256 * queue and releases associated resources. Does NOT
257 * call the callback.
258 *
259 * @param qe entry to free.
260 */
261static void
262free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
263{
264 struct GNUNET_DATASTORE_Handle *h = qe->h;
265
266 GNUNET_CONTAINER_DLL_remove (h->queue_head,
267 h->queue_tail,
268 qe);
269 h->queue_size--;
270 GNUNET_free (qe);
271}
272
273
274/**
275 * Handle error in sending drop request to datastore.
276 *
277 * @param cls closure with the datastore handle
278 * @param error error code
279 */
280static void
281mq_error_handler (void *cls,
282 enum GNUNET_MQ_Error error)
283{
284 struct GNUNET_DATASTORE_Handle *h = cls;
285 struct GNUNET_DATASTORE_QueueEntry *qe;
286
287 LOG (GNUNET_ERROR_TYPE_DEBUG,
288 "MQ error, reconnecting to DATASTORE\n");
289 do_disconnect (h);
290 qe = h->queue_head;
291 if ( (NULL != qe) &&
292 (NULL == qe->env) )
293 {
294 union QueueContext qc = qe->qc;
295 uint16_t rt = qe->response_type;
296
297 LOG (GNUNET_ERROR_TYPE_DEBUG,
298 "Failed to receive response from database.\n");
299 free_queue_entry (qe);
300 switch (rt)
301 {
302 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
303 if (NULL != qc.sc.cont)
304 qc.sc.cont (qc.sc.cont_cls,
305 GNUNET_SYSERR,
306 GNUNET_TIME_UNIT_ZERO_ABS,
307 _("DATASTORE disconnected"));
308 break;
309 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
310 if (NULL != qc.rc.proc)
311 qc.rc.proc (qc.rc.proc_cls,
312 NULL,
313 0,
314 NULL, 0, 0, 0,
315 GNUNET_TIME_UNIT_ZERO_ABS,
316 0);
317 break;
318 default:
319 GNUNET_break (0);
320 }
321 }
322}
323
324
325/**
253 * Connect to the datastore service. 326 * Connect to the datastore service.
254 * 327 *
255 * @param cfg configuration to use 328 * @param cfg configuration to use
@@ -258,22 +331,27 @@ struct GNUNET_DATASTORE_Handle
258struct GNUNET_DATASTORE_Handle * 331struct GNUNET_DATASTORE_Handle *
259GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) 332GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
260{ 333{
261 struct GNUNET_CLIENT_Connection *c;
262 struct GNUNET_DATASTORE_Handle *h; 334 struct GNUNET_DATASTORE_Handle *h;
263 335
264 c = GNUNET_CLIENT_connect ("datastore", cfg); 336 LOG (GNUNET_ERROR_TYPE_DEBUG,
265 if (c == NULL) 337 "Establishing DATASTORE connection!\n");
266 return NULL; /* oops */
267 h = GNUNET_new (struct GNUNET_DATASTORE_Handle); 338 h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
268 h->client = c;
269 h->cfg = cfg; 339 h->cfg = cfg;
270 h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg); 340 try_reconnect (h);
341 if (NULL == h->mq)
342 {
343 GNUNET_free (h);
344 return NULL;
345 }
346 h->stats = GNUNET_STATISTICS_create ("datastore-api",
347 cfg);
271 return h; 348 return h;
272} 349}
273 350
274 351
275/** 352/**
276 * Task used by 'transmit_drop' to disconnect the datastore. 353 * Task used by to disconnect from the datastore after
354 * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
277 * 355 *
278 * @param cls the datastore handle 356 * @param cls the datastore handle
279 */ 357 */
@@ -282,39 +360,29 @@ disconnect_after_drop (void *cls)
282{ 360{
283 struct GNUNET_DATASTORE_Handle *h = cls; 361 struct GNUNET_DATASTORE_Handle *h = cls;
284 362
363 LOG (GNUNET_ERROR_TYPE_DEBUG,
364 "Drop sent, disconnecting\n");
285 GNUNET_DATASTORE_disconnect (h, 365 GNUNET_DATASTORE_disconnect (h,
286 GNUNET_NO); 366 GNUNET_NO);
287} 367}
288 368
289 369
290/** 370/**
291 * Transmit DROP message to datastore service. 371 * Handle error in sending drop request to datastore.
292 * 372 *
293 * @param cls the `struct GNUNET_DATASTORE_Handle` 373 * @param cls closure with the datastore handle
294 * @param size number of bytes that can be copied to @a buf 374 * @param error error code
295 * @param buf where to copy the drop message
296 * @return number of bytes written to @a buf
297 */ 375 */
298static size_t 376static void
299transmit_drop (void *cls, size_t size, void *buf) 377disconnect_on_mq_error (void *cls,
378 enum GNUNET_MQ_Error error)
300{ 379{
301 struct GNUNET_DATASTORE_Handle *h = cls; 380 struct GNUNET_DATASTORE_Handle *h = cls;
302 struct GNUNET_MessageHeader *hdr;
303 381
304 if (buf == NULL) 382 LOG (GNUNET_ERROR_TYPE_ERROR,
305 { 383 "Failed to ask datastore to drop tables\n");
306 LOG (GNUNET_ERROR_TYPE_WARNING, 384 GNUNET_DATASTORE_disconnect (h,
307 _("Failed to transmit request to drop database.\n")); 385 GNUNET_NO);
308 GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h);
309 return 0;
310 }
311 GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
312 hdr = buf;
313 hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
314 hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
315 GNUNET_SCHEDULER_add_now (&disconnect_after_drop,
316 h);
317 return sizeof (struct GNUNET_MessageHeader);
318} 386}
319 387
320 388
@@ -333,15 +401,10 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
333 401
334 LOG (GNUNET_ERROR_TYPE_DEBUG, 402 LOG (GNUNET_ERROR_TYPE_DEBUG,
335 "Datastore disconnect\n"); 403 "Datastore disconnect\n");
336 if (NULL != h->th) 404 if (NULL != h->mq)
337 {
338 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
339 h->th = NULL;
340 }
341 if (NULL != h->client)
342 { 405 {
343 GNUNET_CLIENT_disconnect (h->client); 406 GNUNET_MQ_destroy (h->mq);
344 h->client = NULL; 407 h->mq = NULL;
345 } 408 }
346 if (NULL != h->reconnect_task) 409 if (NULL != h->reconnect_task)
347 { 410 {
@@ -350,25 +413,52 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
350 } 413 }
351 while (NULL != (qe = h->queue_head)) 414 while (NULL != (qe = h->queue_head))
352 { 415 {
353 GNUNET_assert (NULL != qe->response_proc); 416 switch (qe->response_type)
354 qe->response_proc (h, NULL); 417 {
418 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
419 if (NULL != qe->qc.sc.cont)
420 qe->qc.sc.cont (qe->qc.sc.cont_cls,
421 GNUNET_SYSERR,
422 GNUNET_TIME_UNIT_ZERO_ABS,
423 _("Disconnected from DATASTORE"));
424 break;
425 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
426 if (NULL != qe->qc.rc.proc)
427 qe->qc.rc.proc (qe->qc.rc.proc_cls,
428 NULL,
429 0,
430 NULL, 0, 0, 0,
431 GNUNET_TIME_UNIT_ZERO_ABS,
432 0);
433 break;
434 default:
435 GNUNET_break (0);
436 }
437 free_queue_entry (qe);
355 } 438 }
356 if (GNUNET_YES == drop) 439 if (GNUNET_YES == drop)
357 { 440 {
358 h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); 441 LOG (GNUNET_ERROR_TYPE_DEBUG,
359 if (NULL != h->client) 442 "Re-connecting to issue DROP!\n");
443 GNUNET_assert (NULL == h->mq);
444 h->mq = GNUNET_CLIENT_connecT (h->cfg,
445 "datastore",
446 NULL,
447 &disconnect_on_mq_error,
448 h);
449 if (NULL != h->mq)
360 { 450 {
361 if (NULL != 451 struct GNUNET_MessageHeader *hdr;
362 GNUNET_CLIENT_notify_transmit_ready (h->client, 452 struct GNUNET_MQ_Envelope *env;
363 sizeof (struct 453
364 GNUNET_MessageHeader), 454 env = GNUNET_MQ_msg (hdr,
365 GNUNET_TIME_UNIT_SECONDS, 455 GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
366 GNUNET_YES, 456 GNUNET_MQ_notify_sent (env,
367 &transmit_drop, 457 &disconnect_after_drop,
368 h)) 458 h);
369 return; 459 GNUNET_MQ_send (h->mq,
370 GNUNET_CLIENT_disconnect (h->client); 460 env);
371 h->client = NULL; 461 return;
372 } 462 }
373 GNUNET_break (0); 463 GNUNET_break (0);
374 } 464 }
@@ -380,67 +470,37 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
380 470
381 471
382/** 472/**
383 * A request has timed out (before being transmitted to the service).
384 *
385 * @param cls the `struct GNUNET_DATASTORE_QueueEntry`
386 */
387static void
388timeout_queue_entry (void *cls)
389{
390 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
391 struct GNUNET_DATASTORE_Handle *h = qe->h;
392
393 GNUNET_STATISTICS_update (h->stats,
394 gettext_noop ("# queue entry timeouts"),
395 1,
396 GNUNET_NO);
397 qe->task = NULL;
398 GNUNET_assert (GNUNET_NO == qe->was_transmitted);
399 LOG (GNUNET_ERROR_TYPE_DEBUG,
400 "Timeout of request in datastore queue\n");
401 /* response_proc's expect request at the head of the queue! */
402 GNUNET_CONTAINER_DLL_remove (h->queue_head,
403 h->queue_tail,
404 qe);
405 GNUNET_CONTAINER_DLL_insert (h->queue_head,
406 h->queue_tail,
407 qe);
408 GNUNET_assert (h->queue_head == qe);
409 qe->response_proc (qe->h, NULL);
410}
411
412
413/**
414 * Create a new entry for our priority queue (and possibly discard other entires if 473 * Create a new entry for our priority queue (and possibly discard other entires if
415 * the queue is getting too long). 474 * the queue is getting too long).
416 * 475 *
417 * @param h handle to the datastore 476 * @param h handle to the datastore
418 * @param msize size of the message to queue 477 * @param env envelope with the message to queue
419 * @param queue_priority priority of the entry 478 * @param queue_priority priority of the entry
420 * @param max_queue_size at what queue size should this request be dropped 479 * @param max_queue_size at what queue size should this request be dropped
421 * (if other requests of higher priority are in the queue) 480 * (if other requests of higher priority are in the queue)
422 * @param timeout timeout for the operation 481 * @param expected_type which type of response do we expect,
423 * @param response_proc function to call with replies (can be NULL) 482 * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
483 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
424 * @param qc client context (NOT a closure for @a response_proc) 484 * @param qc client context (NOT a closure for @a response_proc)
425 * @return NULL if the queue is full 485 * @return NULL if the queue is full
426 */ 486 */
427static struct GNUNET_DATASTORE_QueueEntry * 487static struct GNUNET_DATASTORE_QueueEntry *
428make_queue_entry (struct GNUNET_DATASTORE_Handle *h, 488make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
429 size_t msize, 489 struct GNUNET_MQ_Envelope *env,
430 unsigned int queue_priority, 490 unsigned int queue_priority,
431 unsigned int max_queue_size, 491 unsigned int max_queue_size,
432 struct GNUNET_TIME_Relative timeout, 492 uint16_t expected_type,
433 GNUNET_CLIENT_MessageHandler response_proc,
434 const union QueueContext *qc) 493 const union QueueContext *qc)
435{ 494{
436 struct GNUNET_DATASTORE_QueueEntry *ret; 495 struct GNUNET_DATASTORE_QueueEntry *qe;
437 struct GNUNET_DATASTORE_QueueEntry *pos; 496 struct GNUNET_DATASTORE_QueueEntry *pos;
438 unsigned int c; 497 unsigned int c;
439 498
440 c = 0; 499 c = 0;
441 pos = h->queue_head; 500 pos = h->queue_head;
442 while ((pos != NULL) && (c < max_queue_size) && 501 while ( (NULL != pos) &&
443 (pos->priority >= queue_priority)) 502 (c < max_queue_size) &&
503 (pos->priority >= queue_priority) )
444 { 504 {
445 c++; 505 c++;
446 pos = pos->next; 506 pos = pos->next;
@@ -451,18 +511,17 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
451 gettext_noop ("# queue overflows"), 511 gettext_noop ("# queue overflows"),
452 1, 512 1,
453 GNUNET_NO); 513 GNUNET_NO);
514 GNUNET_MQ_discard (env);
454 return NULL; 515 return NULL;
455 } 516 }
456 ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); 517 qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
457 ret->h = h; 518 qe->h = h;
458 ret->response_proc = response_proc; 519 qe->env = env;
459 ret->qc = *qc; 520 qe->response_type = expected_type;
460 ret->timeout = GNUNET_TIME_relative_to_absolute (timeout); 521 qe->qc = *qc;
461 ret->priority = queue_priority; 522 qe->priority = queue_priority;
462 ret->max_queue = max_queue_size; 523 qe->max_queue = max_queue_size;
463 ret->message_size = msize; 524 if (NULL == pos)
464 ret->was_transmitted = GNUNET_NO;
465 if (pos == NULL)
466 { 525 {
467 /* append at the tail */ 526 /* append at the tail */
468 pos = h->queue_tail; 527 pos = h->queue_tail;
@@ -472,49 +531,23 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
472 pos = pos->prev; 531 pos = pos->prev;
473 /* do not insert at HEAD if HEAD query was already 532 /* do not insert at HEAD if HEAD query was already
474 * transmitted and we are still receiving replies! */ 533 * transmitted and we are still receiving replies! */
475 if ((pos == NULL) && (h->queue_head->was_transmitted)) 534 if ( (NULL == pos) &&
535 (NULL == h->queue_head->env) )
476 pos = h->queue_head; 536 pos = h->queue_head;
477 } 537 }
478 c++; 538 c++;
479#if INSANE_STATISTICS 539#if INSANE_STATISTICS
480 GNUNET_STATISTICS_update (h->stats, 540 GNUNET_STATISTICS_update (h->stats,
481 gettext_noop ("# queue entries created"), 541 gettext_noop ("# queue entries created"),
482 1, GNUNET_NO); 542 1,
543 GNUNET_NO);
483#endif 544#endif
484 GNUNET_CONTAINER_DLL_insert_after (h->queue_head, 545 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
485 h->queue_tail, 546 h->queue_tail,
486 pos, 547 pos,
487 ret); 548 qe);
488 h->queue_size++; 549 h->queue_size++;
489 ret->task = GNUNET_SCHEDULER_add_delayed (timeout, 550 return qe;
490 &timeout_queue_entry,
491 ret);
492 for (pos = ret->next; NULL != pos; pos = pos->next)
493 {
494 if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
495 {
496 GNUNET_assert (NULL != pos->response_proc);
497 /* move 'pos' element to head so that it will be
498 * killed on 'NULL' call below */
499 LOG (GNUNET_ERROR_TYPE_DEBUG,
500 "Dropping request from datastore queue\n");
501 /* response_proc's expect request at the head of the queue! */
502 GNUNET_CONTAINER_DLL_remove (h->queue_head,
503 h->queue_tail,
504 pos);
505 GNUNET_CONTAINER_DLL_insert (h->queue_head,
506 h->queue_tail,
507 pos);
508 GNUNET_STATISTICS_update (h->stats,
509 gettext_noop
510 ("# Requests dropped from datastore queue"), 1,
511 GNUNET_NO);
512 GNUNET_assert (h->queue_head == pos);
513 pos->response_proc (h, NULL);
514 break;
515 }
516 }
517 return ret;
518} 551}
519 552
520 553
@@ -525,78 +558,88 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
525 * @param h handle to the datastore 558 * @param h handle to the datastore
526 */ 559 */
527static void 560static void
528process_queue (struct GNUNET_DATASTORE_Handle *h); 561process_queue (struct GNUNET_DATASTORE_Handle *h)
529
530
531/**
532 * Try reconnecting to the datastore service.
533 *
534 * @param cls the `struct GNUNET_DATASTORE_Handle`
535 */
536static void
537try_reconnect (void *cls)
538{ 562{
539 struct GNUNET_DATASTORE_Handle *h = cls; 563 struct GNUNET_DATASTORE_QueueEntry *qe;
540 564
541 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); 565 if (NULL == (qe = h->queue_head))
542 h->reconnect_task = NULL;
543 h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
544 if (h->client == NULL)
545 { 566 {
546 LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n"); 567 /* no entry in queue */
568 LOG (GNUNET_ERROR_TYPE_DEBUG,
569 "Queue empty\n");
547 return; 570 return;
548 } 571 }
549 GNUNET_STATISTICS_update (h->stats, 572 if (NULL == qe->env)
550 gettext_noop 573 {
551 ("# datastore connections (re)created"), 1, 574 /* waiting for replies */
552 GNUNET_NO); 575 LOG (GNUNET_ERROR_TYPE_DEBUG,
553 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n"); 576 "Head request already transmitted\n");
554 process_queue (h); 577 return;
578 }
579 if (NULL == h->mq)
580 {
581 /* waiting for reconnect */
582 LOG (GNUNET_ERROR_TYPE_DEBUG,
583 "Not connected\n");
584 return;
585 }
586 GNUNET_MQ_send (h->mq,
587 qe->env);
588 qe->env = NULL;
555} 589}
556 590
557 591
592
593
558/** 594/**
559 * Disconnect from the service and then try reconnecting to the datastore service 595 * Function called to check status message from the service.
560 * after some delay.
561 * 596 *
562 * @param h handle to datastore to disconnect and reconnect 597 * @param cls closure
598 * @param sm status message received
599 * @return #GNUNET_OK if the message is well-formed
563 */ 600 */
564static void 601static int
565do_disconnect (struct GNUNET_DATASTORE_Handle *h) 602check_status (void *cls,
603 const struct StatusMessage *sm)
566{ 604{
567 if (NULL == h->client) 605 uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
606 int32_t status = ntohl (sm->status);
607
608 if (msize > 0)
568 { 609 {
569 LOG (GNUNET_ERROR_TYPE_DEBUG, 610 const char *emsg = (const char *) &sm[1];
570 "Client NULL in disconnect, will not try to reconnect\n"); 611
571 return; 612 if ('\0' != emsg[msize - 1])
613 {
614 GNUNET_break (0);
615 return GNUNET_SYSERR;
616 }
572 } 617 }
573 GNUNET_CLIENT_disconnect (h->client); 618 else if (GNUNET_SYSERR == status)
574 h->skip_next_messages = 0; 619 {
575 h->client = NULL; 620 GNUNET_break (0);
576 h->reconnect_task = 621 return GNUNET_SYSERR;
577 GNUNET_SCHEDULER_add_delayed (h->retry_time, 622 }
578 &try_reconnect, 623 return GNUNET_OK;
579 h);
580} 624}
581 625
582 626
583/** 627/**
584 * Function called whenever we receive a message from 628 * Function called to handle status message from the service.
585 * the service. Calls the appropriate handler.
586 * 629 *
587 * @param cls the `struct GNUNET_DATASTORE_Handle` 630 * @param cls closure
588 * @param msg the received message 631 * @param sm status message received
589 */ 632 */
590static void 633static void
591receive_cb (void *cls, 634handle_status (void *cls,
592 const struct GNUNET_MessageHeader *msg) 635 const struct StatusMessage *sm)
593{ 636{
594 struct GNUNET_DATASTORE_Handle *h = cls; 637 struct GNUNET_DATASTORE_Handle *h = cls;
595 struct GNUNET_DATASTORE_QueueEntry *qe; 638 struct GNUNET_DATASTORE_QueueEntry *qe;
639 struct StatusContext rc;
640 const char *emsg;
641 int32_t status = ntohl (sm->status);
596 642
597 h->in_receive = GNUNET_NO;
598 LOG (GNUNET_ERROR_TYPE_DEBUG,
599 "Receiving reply from datastore\n");
600 if (h->skip_next_messages > 0) 643 if (h->skip_next_messages > 0)
601 { 644 {
602 h->skip_next_messages--; 645 h->skip_next_messages--;
@@ -606,252 +649,255 @@ receive_cb (void *cls,
606 if (NULL == (qe = h->queue_head)) 649 if (NULL == (qe = h->queue_head))
607 { 650 {
608 GNUNET_break (0); 651 GNUNET_break (0);
609 process_queue (h); 652 do_disconnect (h);
653 return;
654 }
655 if (NULL != qe->env)
656 {
657 GNUNET_break (0);
658 do_disconnect (h);
610 return; 659 return;
611 } 660 }
612 qe->response_proc (h, msg); 661 if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
662 {
663 GNUNET_break (0);
664 do_disconnect (h);
665 return;
666 }
667 rc = qe->qc.sc;
668 free_queue_entry (qe);
669 if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
670 emsg = (const char *) &sm[1];
671 else
672 emsg = NULL;
673 LOG (GNUNET_ERROR_TYPE_DEBUG,
674 "Received status %d/%s\n",
675 (int) status,
676 emsg);
677 GNUNET_STATISTICS_update (h->stats,
678 gettext_noop ("# status messages received"),
679 1,
680 GNUNET_NO);
681 h->retry_time = GNUNET_TIME_UNIT_ZERO;
682 process_queue (h);
683 if (NULL != rc.cont)
684 rc.cont (rc.cont_cls,
685 status,
686 GNUNET_TIME_absolute_ntoh (sm->min_expiration),
687 emsg);
613} 688}
614 689
615 690
616/** 691/**
617 * Transmit request from queue to datastore service. 692 * Check data message we received from the service.
618 * 693 *
619 * @param cls the `struct GNUNET_DATASTORE_Handle` 694 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
620 * @param size number of bytes that can be copied to @a buf 695 * @param dm message received
621 * @param buf where to copy the drop message
622 * @return number of bytes written to @a buf
623 */ 696 */
624static size_t 697static int
625transmit_request (void *cls, 698check_data (void *cls,
626 size_t size, 699 const struct DataMessage *dm)
627 void *buf)
628{ 700{
629 struct GNUNET_DATASTORE_Handle *h = cls; 701 uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
630 struct GNUNET_DATASTORE_QueueEntry *qe;
631 size_t msize;
632 702
633 h->th = NULL; 703 if (msize != ntohl (dm->size))
634 if (NULL == (qe = h->queue_head))
635 return 0; /* no entry in queue */
636 if (NULL == buf)
637 { 704 {
638 LOG (GNUNET_ERROR_TYPE_DEBUG, 705 GNUNET_break (0);
639 "Failed to transmit request to DATASTORE.\n"); 706 return GNUNET_SYSERR;
640 GNUNET_STATISTICS_update (h->stats,
641 gettext_noop ("# transmission request failures"),
642 1, GNUNET_NO);
643 do_disconnect (h);
644 return 0;
645 }
646 if (size < (msize = qe->message_size))
647 {
648 process_queue (h);
649 return 0;
650 } 707 }
651 LOG (GNUNET_ERROR_TYPE_DEBUG, 708 return GNUNET_OK;
652 "Transmitting %u byte request to DATASTORE\n",
653 msize);
654 memcpy (buf, &qe[1], msize);
655 qe->was_transmitted = GNUNET_YES;
656 GNUNET_SCHEDULER_cancel (qe->task);
657 qe->task = NULL;
658 GNUNET_assert (GNUNET_NO == h->in_receive);
659 h->in_receive = GNUNET_YES;
660 GNUNET_CLIENT_receive (h->client,
661 &receive_cb, h,
662 GNUNET_TIME_absolute_get_remaining (qe->timeout));
663#if INSANE_STATISTICS
664 GNUNET_STATISTICS_update (h->stats,
665 gettext_noop ("# bytes sent to datastore"), msize,
666 GNUNET_NO);
667#endif
668 return msize;
669} 709}
670 710
671 711
672/** 712/**
673 * Process entries in the queue (or do nothing if we are already 713 * Handle data message we got from the service.
674 * doing so).
675 * 714 *
676 * @param h handle to the datastore 715 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
716 * @param dm message received
677 */ 717 */
678static void 718static void
679process_queue (struct GNUNET_DATASTORE_Handle *h) 719handle_data (void *cls,
720 const struct DataMessage *dm)
680{ 721{
722 struct GNUNET_DATASTORE_Handle *h = cls;
681 struct GNUNET_DATASTORE_QueueEntry *qe; 723 struct GNUNET_DATASTORE_QueueEntry *qe;
724 struct ResultContext rc;
682 725
683 if (NULL == (qe = h->queue_head)) 726 if (h->skip_next_messages > 0)
684 {
685 /* no entry in queue */
686 LOG (GNUNET_ERROR_TYPE_DEBUG,
687 "Queue empty\n");
688 return;
689 }
690 if (GNUNET_YES == qe->was_transmitted)
691 { 727 {
692 /* waiting for replies */ 728 process_queue (h);
693 LOG (GNUNET_ERROR_TYPE_DEBUG,
694 "Head request already transmitted\n");
695 return; 729 return;
696 } 730 }
697 if (NULL != h->th) 731 qe = h->queue_head;
732 if (NULL == qe)
698 { 733 {
699 /* request pending */ 734 GNUNET_break (0);
700 LOG (GNUNET_ERROR_TYPE_DEBUG, 735 do_disconnect (h);
701 "Pending transmission request\n");
702 return; 736 return;
703 } 737 }
704 if (NULL == h->client) 738 if (NULL != qe->env)
705 { 739 {
706 /* waiting for reconnect */ 740 GNUNET_break (0);
707 LOG (GNUNET_ERROR_TYPE_DEBUG, 741 do_disconnect (h);
708 "Not connected\n");
709 return; 742 return;
710 } 743 }
711 if (GNUNET_YES == h->in_receive) 744 if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
712 { 745 {
713 /* wait for response to previous query */ 746 GNUNET_break (0);
747 do_disconnect (h);
714 return; 748 return;
715 } 749 }
750#if INSANE_STATISTICS
751 GNUNET_STATISTICS_update (h->stats,
752 gettext_noop ("# Results received"),
753 1,
754 GNUNET_NO);
755#endif
716 LOG (GNUNET_ERROR_TYPE_DEBUG, 756 LOG (GNUNET_ERROR_TYPE_DEBUG,
717 "Queueing %u byte request to DATASTORE\n", 757 "Received result %llu with type %u and size %u with key %s\n",
718 qe->message_size); 758 (unsigned long long) GNUNET_ntohll (dm->uid),
719 h->th 759 ntohl (dm->type),
720 = GNUNET_CLIENT_notify_transmit_ready (h->client, 760 ntohl (dm->size),
721 qe->message_size, 761 GNUNET_h2s (&dm->key));
722 GNUNET_TIME_absolute_get_remaining (qe->timeout), 762 rc = qe->qc.rc;
723 GNUNET_YES, 763 free_queue_entry (qe);
724 &transmit_request, 764 h->retry_time = GNUNET_TIME_UNIT_ZERO;
725 h); 765 process_queue (h);
726 GNUNET_assert (GNUNET_NO == h->in_receive); 766 if (NULL != rc.proc)
727 GNUNET_break (NULL != h->th); 767 rc.proc (rc.proc_cls,
728} 768 &dm->key,
729 769 ntohl (dm->size),
730 770 &dm[1],
731/** 771 ntohl (dm->type),
732 * Dummy continuation used to do nothing (but be non-zero). 772 ntohl (dm->priority),
733 * 773 ntohl (dm->anonymity),
734 * @param cls closure 774 GNUNET_TIME_absolute_ntoh (dm->expiration),
735 * @param result result 775 GNUNET_ntohll (dm->uid));
736 * @param min_expiration expiration time
737 * @param emsg error message
738 */
739static void
740drop_status_cont (void *cls, int32_t result,
741 struct GNUNET_TIME_Absolute min_expiration,
742 const char *emsg)
743{
744 /* do nothing */
745}
746
747
748/**
749 * Free a queue entry. Removes the given entry from the
750 * queue and releases associated resources. Does NOT
751 * call the callback.
752 *
753 * @param qe entry to free.
754 */
755static void
756free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
757{
758 struct GNUNET_DATASTORE_Handle *h = qe->h;
759
760 GNUNET_CONTAINER_DLL_remove (h->queue_head,
761 h->queue_tail,
762 qe);
763 if (qe->task != NULL)
764 {
765 GNUNET_SCHEDULER_cancel (qe->task);
766 qe->task = NULL;
767 }
768 h->queue_size--;
769 qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
770 GNUNET_free (qe);
771} 776}
772 777
773 778
774/** 779/**
775 * Type of a function to call when we receive a message 780 * Type of a function to call when we receive a
776 * from the service. 781 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
777 * 782 *
778 * @param cls closure 783 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
779 * @param msg message received, NULL on timeout or fatal error 784 * @param msg message received
780 */ 785 */
781static void 786static void
782process_status_message (void *cls, 787handle_data_end (void *cls,
783 const struct GNUNET_MessageHeader *msg) 788 const struct GNUNET_MessageHeader *msg)
784{ 789{
785 struct GNUNET_DATASTORE_Handle *h = cls; 790 struct GNUNET_DATASTORE_Handle *h = cls;
786 struct GNUNET_DATASTORE_QueueEntry *qe; 791 struct GNUNET_DATASTORE_QueueEntry *qe;
787 struct StatusContext rc; 792 struct ResultContext rc;
788 const struct StatusMessage *sm;
789 const char *emsg;
790 int32_t status;
791 int was_transmitted;
792 793
793 if (NULL == (qe = h->queue_head)) 794 if (h->skip_next_messages > 0)
794 { 795 {
795 GNUNET_break (0); 796 h->skip_next_messages--;
796 do_disconnect (h); 797 process_queue (h);
797 return; 798 return;
798 } 799 }
799 rc = qe->qc.sc; 800 qe = h->queue_head;
800 if (NULL == msg) 801 if (NULL == qe)
801 { 802 {
802 was_transmitted = qe->was_transmitted; 803 GNUNET_break (0);
803 free_queue_entry (qe); 804 do_disconnect (h);
804 if (was_transmitted == GNUNET_YES)
805 do_disconnect (h);
806 else
807 process_queue (h);
808 if (NULL != rc.cont)
809 rc.cont (rc.cont_cls, GNUNET_SYSERR,
810 GNUNET_TIME_UNIT_ZERO_ABS,
811 _("Failed to receive status response from database."));
812 return; 805 return;
813 } 806 }
814 GNUNET_assert (GNUNET_YES == qe->was_transmitted); 807 if (NULL != qe->env)
815 free_queue_entry (qe);
816 if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
817 (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
818 { 808 {
819 GNUNET_break (0); 809 GNUNET_break (0);
820 h->retry_time = GNUNET_TIME_UNIT_ZERO;
821 do_disconnect (h); 810 do_disconnect (h);
822 if (rc.cont != NULL)
823 rc.cont (rc.cont_cls, GNUNET_SYSERR,
824 GNUNET_TIME_UNIT_ZERO_ABS,
825 _("Error reading response from datastore service"));
826 return; 811 return;
827 } 812 }
828 sm = (const struct StatusMessage *) msg; 813 if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
829 status = ntohl (sm->status);
830 emsg = NULL;
831 if (ntohs (msg->size) > sizeof (struct StatusMessage))
832 {
833 emsg = (const char *) &sm[1];
834 if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
835 {
836 GNUNET_break (0);
837 emsg = _("Invalid error message received from datastore service");
838 }
839 }
840 if ((status == GNUNET_SYSERR) && (emsg == NULL))
841 { 814 {
842 GNUNET_break (0); 815 GNUNET_break (0);
843 emsg = _("Invalid error message received from datastore service"); 816 do_disconnect (h);
817 return;
844 } 818 }
845 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg); 819 rc = qe->qc.rc;
820 free_queue_entry (qe);
821 LOG (GNUNET_ERROR_TYPE_DEBUG,
822 "Received end of result set, new queue size is %u\n",
823 h->queue_size);
824 h->retry_time = GNUNET_TIME_UNIT_ZERO;
825 h->result_count = 0;
826 process_queue (h);
827 /* signal end of iteration */
828 if (NULL != rc.proc)
829 rc.proc (rc.proc_cls,
830 NULL,
831 0,
832 NULL,
833 0,
834 0,
835 0,
836 GNUNET_TIME_UNIT_ZERO_ABS,
837 0);
838}
839
840
841/**
842 * Try reconnecting to the datastore service.
843 *
844 * @param cls the `struct GNUNET_DATASTORE_Handle`
845 */
846static void
847try_reconnect (void *cls)
848{
849 GNUNET_MQ_hd_var_size (status,
850 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
851 struct StatusMessage);
852 GNUNET_MQ_hd_var_size (data,
853 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
854 struct DataMessage);
855 GNUNET_MQ_hd_fixed_size (data_end,
856 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
857 struct GNUNET_MessageHeader);
858 struct GNUNET_DATASTORE_Handle *h = cls;
859 struct GNUNET_MQ_MessageHandler handlers[] = {
860 make_status_handler (h),
861 make_data_handler (h),
862 make_data_end_handler (h),
863 GNUNET_MQ_handler_end ()
864 };
865
866 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
867 h->reconnect_task = NULL;
868 GNUNET_assert (NULL == h->mq);
869 h->mq = GNUNET_CLIENT_connecT (h->cfg,
870 "datastore",
871 handlers,
872 &mq_error_handler,
873 h);
874 if (NULL == h->mq)
875 return;
846 GNUNET_STATISTICS_update (h->stats, 876 GNUNET_STATISTICS_update (h->stats,
847 gettext_noop ("# status messages received"), 1, 877 gettext_noop ("# datastore connections (re)created"),
878 1,
848 GNUNET_NO); 879 GNUNET_NO);
849 h->retry_time = GNUNET_TIME_UNIT_ZERO; 880 LOG (GNUNET_ERROR_TYPE_DEBUG,
881 "Reconnected to DATASTORE\n");
850 process_queue (h); 882 process_queue (h);
851 if (rc.cont != NULL) 883}
852 rc.cont (rc.cont_cls, status, 884
853 GNUNET_TIME_absolute_ntoh (sm->min_expiration), 885
854 emsg); 886/**
887 * Dummy continuation used to do nothing (but be non-zero).
888 *
889 * @param cls closure
890 * @param result result
891 * @param min_expiration expiration time
892 * @param emsg error message
893 */
894static void
895drop_status_cont (void *cls,
896 int32_t result,
897 struct GNUNET_TIME_Absolute min_expiration,
898 const char *emsg)
899{
900 /* do nothing */
855} 901}
856 902
857 903
@@ -874,7 +920,6 @@ process_status_message (void *cls,
874 * @param queue_priority ranking of this request in the priority queue 920 * @param queue_priority ranking of this request in the priority queue
875 * @param max_queue_size at what queue size should this request be dropped 921 * @param max_queue_size at what queue size should this request be dropped
876 * (if other requests of higher priority are in the queue) 922 * (if other requests of higher priority are in the queue)
877 * @param timeout timeout for the operation
878 * @param cont continuation to call when done 923 * @param cont continuation to call when done
879 * @param cont_cls closure for @a cont 924 * @param cont_cls closure for @a cont
880 * @return NULL if the entry was not queued, otherwise a handle that can be used to 925 * @return NULL if the entry was not queued, otherwise a handle that can be used to
@@ -894,33 +939,51 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
894 struct GNUNET_TIME_Absolute expiration, 939 struct GNUNET_TIME_Absolute expiration,
895 unsigned int queue_priority, 940 unsigned int queue_priority,
896 unsigned int max_queue_size, 941 unsigned int max_queue_size,
897 struct GNUNET_TIME_Relative timeout,
898 GNUNET_DATASTORE_ContinuationWithStatus cont, 942 GNUNET_DATASTORE_ContinuationWithStatus cont,
899 void *cont_cls) 943 void *cont_cls)
900{ 944{
901 struct GNUNET_DATASTORE_QueueEntry *qe; 945 struct GNUNET_DATASTORE_QueueEntry *qe;
946 struct GNUNET_MQ_Envelope *env;
902 struct DataMessage *dm; 947 struct DataMessage *dm;
903 size_t msize;
904 union QueueContext qc; 948 union QueueContext qc;
905 949
950 if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
951 {
952 GNUNET_break (0);
953 return NULL;
954 }
955
906 LOG (GNUNET_ERROR_TYPE_DEBUG, 956 LOG (GNUNET_ERROR_TYPE_DEBUG,
907 "Asked to put %u bytes of data under key `%s' for %s\n", 957 "Asked to put %u bytes of data under key `%s' for %s\n",
908 size, 958 size,
909 GNUNET_h2s (key), 959 GNUNET_h2s (key),
910 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), 960 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
911 GNUNET_YES)); 961 GNUNET_YES));
912 msize = sizeof (struct DataMessage) + size; 962 env = GNUNET_MQ_msg_extra (dm,
913 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); 963 size,
964 GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
965 dm->rid = htonl (rid);
966 dm->size = htonl ((uint32_t) size);
967 dm->type = htonl (type);
968 dm->priority = htonl (priority);
969 dm->anonymity = htonl (anonymity);
970 dm->replication = htonl (replication);
971 dm->reserved = htonl (0);
972 dm->uid = GNUNET_htonll (0);
973 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
974 dm->key = *key;
975 memcpy (&dm[1],
976 data,
977 size);
914 qc.sc.cont = cont; 978 qc.sc.cont = cont;
915 qc.sc.cont_cls = cont_cls; 979 qc.sc.cont_cls = cont_cls;
916 qe = make_queue_entry (h, 980 qe = make_queue_entry (h,
917 msize, 981 env,
918 queue_priority, 982 queue_priority,
919 max_queue_size, 983 max_queue_size,
920 timeout, 984 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
921 &process_status_message,
922 &qc); 985 &qc);
923 if (qe == NULL) 986 if (NULL == qe)
924 { 987 {
925 LOG (GNUNET_ERROR_TYPE_DEBUG, 988 LOG (GNUNET_ERROR_TYPE_DEBUG,
926 "Could not create queue entry for PUT\n"); 989 "Could not create queue entry for PUT\n");
@@ -930,20 +993,6 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
930 gettext_noop ("# PUT requests executed"), 993 gettext_noop ("# PUT requests executed"),
931 1, 994 1,
932 GNUNET_NO); 995 GNUNET_NO);
933 dm = (struct DataMessage *) &qe[1];
934 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
935 dm->header.size = htons (msize);
936 dm->rid = htonl (rid);
937 dm->size = htonl ((uint32_t) size);
938 dm->type = htonl (type);
939 dm->priority = htonl (priority);
940 dm->anonymity = htonl (anonymity);
941 dm->replication = htonl (replication);
942 dm->reserved = htonl (0);
943 dm->uid = GNUNET_htonll (0);
944 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
945 dm->key = *key;
946 memcpy (&dm[1], data, size);
947 process_queue (h); 996 process_queue (h);
948 return qe; 997 return qe;
949} 998}
@@ -972,6 +1021,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
972 void *cont_cls) 1021 void *cont_cls)
973{ 1022{
974 struct GNUNET_DATASTORE_QueueEntry *qe; 1023 struct GNUNET_DATASTORE_QueueEntry *qe;
1024 struct GNUNET_MQ_Envelope *env;
975 struct ReserveMessage *rm; 1025 struct ReserveMessage *rm;
976 union QueueContext qc; 1026 union QueueContext qc;
977 1027
@@ -981,14 +1031,18 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
981 "Asked to reserve %llu bytes of data and %u entries\n", 1031 "Asked to reserve %llu bytes of data and %u entries\n",
982 (unsigned long long) amount, 1032 (unsigned long long) amount,
983 (unsigned int) entries); 1033 (unsigned int) entries);
1034 env = GNUNET_MQ_msg (rm,
1035 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1036 rm->entries = htonl (entries);
1037 rm->amount = GNUNET_htonll (amount);
1038
984 qc.sc.cont = cont; 1039 qc.sc.cont = cont;
985 qc.sc.cont_cls = cont_cls; 1040 qc.sc.cont_cls = cont_cls;
986 qe = make_queue_entry (h, 1041 qe = make_queue_entry (h,
987 sizeof (struct ReserveMessage), 1042 env,
988 UINT_MAX, 1043 UINT_MAX,
989 UINT_MAX, 1044 UINT_MAX,
990 GNUNET_TIME_UNIT_FOREVER_REL, 1045 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
991 &process_status_message,
992 &qc); 1046 &qc);
993 if (NULL == qe) 1047 if (NULL == qe)
994 { 1048 {
@@ -1000,11 +1054,6 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1000 gettext_noop ("# RESERVE requests executed"), 1054 gettext_noop ("# RESERVE requests executed"),
1001 1, 1055 1,
1002 GNUNET_NO); 1056 GNUNET_NO);
1003 rm = (struct ReserveMessage *) &qe[1];
1004 rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1005 rm->header.size = htons (sizeof (struct ReserveMessage));
1006 rm->entries = htonl (entries);
1007 rm->amount = GNUNET_htonll (amount);
1008 process_queue (h); 1057 process_queue (h);
1009 return qe; 1058 return qe;
1010} 1059}
@@ -1024,7 +1073,6 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1024 * @param queue_priority ranking of this request in the priority queue 1073 * @param queue_priority ranking of this request in the priority queue
1025 * @param max_queue_size at what queue size should this request be dropped 1074 * @param max_queue_size at what queue size should this request be dropped
1026 * (if other requests of higher priority are in the queue) 1075 * (if other requests of higher priority are in the queue)
1027 * @param timeout how long to wait at most for a response
1028 * @param cont continuation to call when done 1076 * @param cont continuation to call when done
1029 * @param cont_cls closure for @a cont 1077 * @param cont_cls closure for @a cont
1030 * @return NULL if the entry was not queued, otherwise a handle that can be used to 1078 * @return NULL if the entry was not queued, otherwise a handle that can be used to
@@ -1036,29 +1084,31 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1036 uint32_t rid, 1084 uint32_t rid,
1037 unsigned int queue_priority, 1085 unsigned int queue_priority,
1038 unsigned int max_queue_size, 1086 unsigned int max_queue_size,
1039 struct GNUNET_TIME_Relative timeout,
1040 GNUNET_DATASTORE_ContinuationWithStatus cont, 1087 GNUNET_DATASTORE_ContinuationWithStatus cont,
1041 void *cont_cls) 1088 void *cont_cls)
1042{ 1089{
1043 struct GNUNET_DATASTORE_QueueEntry *qe; 1090 struct GNUNET_DATASTORE_QueueEntry *qe;
1091 struct GNUNET_MQ_Envelope *env;
1044 struct ReleaseReserveMessage *rrm; 1092 struct ReleaseReserveMessage *rrm;
1045 union QueueContext qc; 1093 union QueueContext qc;
1046 1094
1047 if (cont == NULL) 1095 if (NULL == cont)
1048 cont = &drop_status_cont; 1096 cont = &drop_status_cont;
1049 LOG (GNUNET_ERROR_TYPE_DEBUG, 1097 LOG (GNUNET_ERROR_TYPE_DEBUG,
1050 "Asked to release reserve %d\n", 1098 "Asked to release reserve %d\n",
1051 rid); 1099 rid);
1100 env = GNUNET_MQ_msg (rrm,
1101 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1102 rrm->rid = htonl (rid);
1052 qc.sc.cont = cont; 1103 qc.sc.cont = cont;
1053 qc.sc.cont_cls = cont_cls; 1104 qc.sc.cont_cls = cont_cls;
1054 qe = make_queue_entry (h, 1105 qe = make_queue_entry (h,
1055 sizeof (struct ReleaseReserveMessage), 1106 env,
1056 queue_priority, 1107 queue_priority,
1057 max_queue_size, 1108 max_queue_size,
1058 timeout, 1109 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1059 &process_status_message,
1060 &qc); 1110 &qc);
1061 if (qe == NULL) 1111 if (NULL == qe)
1062 { 1112 {
1063 LOG (GNUNET_ERROR_TYPE_DEBUG, 1113 LOG (GNUNET_ERROR_TYPE_DEBUG,
1064 "Could not create queue entry to release reserve\n"); 1114 "Could not create queue entry to release reserve\n");
@@ -1068,10 +1118,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1068 gettext_noop 1118 gettext_noop
1069 ("# RELEASE RESERVE requests executed"), 1, 1119 ("# RELEASE RESERVE requests executed"), 1,
1070 GNUNET_NO); 1120 GNUNET_NO);
1071 rrm = (struct ReleaseReserveMessage *) &qe[1];
1072 rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1073 rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1074 rrm->rid = htonl (rid);
1075 process_queue (h); 1121 process_queue (h);
1076 return qe; 1122 return qe;
1077} 1123}
@@ -1087,7 +1133,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1087 * @param queue_priority ranking of this request in the priority queue 1133 * @param queue_priority ranking of this request in the priority queue
1088 * @param max_queue_size at what queue size should this request be dropped 1134 * @param max_queue_size at what queue size should this request be dropped
1089 * (if other requests of higher priority are in the queue) 1135 * (if other requests of higher priority are in the queue)
1090 * @param timeout how long to wait at most for a response
1091 * @param cont continuation to call when done 1136 * @param cont continuation to call when done
1092 * @param cont_cls closure for @a cont 1137 * @param cont_cls closure for @a cont
1093 * @return NULL if the entry was not queued, otherwise a handle that can be used to 1138 * @return NULL if the entry was not queued, otherwise a handle that can be used to
@@ -1101,26 +1146,36 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1101 struct GNUNET_TIME_Absolute expiration, 1146 struct GNUNET_TIME_Absolute expiration,
1102 unsigned int queue_priority, 1147 unsigned int queue_priority,
1103 unsigned int max_queue_size, 1148 unsigned int max_queue_size,
1104 struct GNUNET_TIME_Relative timeout,
1105 GNUNET_DATASTORE_ContinuationWithStatus cont, 1149 GNUNET_DATASTORE_ContinuationWithStatus cont,
1106 void *cont_cls) 1150 void *cont_cls)
1107{ 1151{
1108 struct GNUNET_DATASTORE_QueueEntry *qe; 1152 struct GNUNET_DATASTORE_QueueEntry *qe;
1153 struct GNUNET_MQ_Envelope *env;
1109 struct UpdateMessage *um; 1154 struct UpdateMessage *um;
1110 union QueueContext qc; 1155 union QueueContext qc;
1111 1156
1112 if (cont == NULL) 1157 if (NULL == cont)
1113 cont = &drop_status_cont; 1158 cont = &drop_status_cont;
1114 LOG (GNUNET_ERROR_TYPE_DEBUG, 1159 LOG (GNUNET_ERROR_TYPE_DEBUG,
1115 "Asked to update entry %llu raising priority by %u and expiration to %s\n", 1160 "Asked to update entry %llu raising priority by %u and expiration to %s\n",
1116 uid, 1161 uid,
1117 (unsigned int) priority, 1162 (unsigned int) priority,
1118 GNUNET_STRINGS_absolute_time_to_string (expiration)); 1163 GNUNET_STRINGS_absolute_time_to_string (expiration));
1164 env = GNUNET_MQ_msg (um,
1165 GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1166 um->priority = htonl (priority);
1167 um->expiration = GNUNET_TIME_absolute_hton (expiration);
1168 um->uid = GNUNET_htonll (uid);
1169
1119 qc.sc.cont = cont; 1170 qc.sc.cont = cont;
1120 qc.sc.cont_cls = cont_cls; 1171 qc.sc.cont_cls = cont_cls;
1121 qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority, 1172 qe = make_queue_entry (h,
1122 max_queue_size, timeout, &process_status_message, &qc); 1173 env,
1123 if (qe == NULL) 1174 queue_priority,
1175 max_queue_size,
1176 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1177 &qc);
1178 if (NULL == qe)
1124 { 1179 {
1125 LOG (GNUNET_ERROR_TYPE_DEBUG, 1180 LOG (GNUNET_ERROR_TYPE_DEBUG,
1126 "Could not create queue entry for UPDATE\n"); 1181 "Could not create queue entry for UPDATE\n");
@@ -1129,12 +1184,6 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1129 GNUNET_STATISTICS_update (h->stats, 1184 GNUNET_STATISTICS_update (h->stats,
1130 gettext_noop ("# UPDATE requests executed"), 1, 1185 gettext_noop ("# UPDATE requests executed"), 1,
1131 GNUNET_NO); 1186 GNUNET_NO);
1132 um = (struct UpdateMessage *) &qe[1];
1133 um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1134 um->header.size = htons (sizeof (struct UpdateMessage));
1135 um->priority = htonl (priority);
1136 um->expiration = GNUNET_TIME_absolute_hton (expiration);
1137 um->uid = GNUNET_htonll (uid);
1138 process_queue (h); 1187 process_queue (h);
1139 return qe; 1188 return qe;
1140} 1189}
@@ -1154,7 +1203,6 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1154 * @param queue_priority ranking of this request in the priority queue 1203 * @param queue_priority ranking of this request in the priority queue
1155 * @param max_queue_size at what queue size should this request be dropped 1204 * @param max_queue_size at what queue size should this request be dropped
1156 * (if other requests of higher priority are in the queue) 1205 * (if other requests of higher priority are in the queue)
1157 * @param timeout how long to wait at most for a response
1158 * @param cont continuation to call when done 1206 * @param cont continuation to call when done
1159 * @param cont_cls closure for @a cont 1207 * @param cont_cls closure for @a cont
1160 * @return NULL if the entry was not queued, otherwise a handle that can be used to 1208 * @return NULL if the entry was not queued, otherwise a handle that can be used to
@@ -1168,161 +1216,64 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1168 const void *data, 1216 const void *data,
1169 unsigned int queue_priority, 1217 unsigned int queue_priority,
1170 unsigned int max_queue_size, 1218 unsigned int max_queue_size,
1171 struct GNUNET_TIME_Relative timeout,
1172 GNUNET_DATASTORE_ContinuationWithStatus cont, 1219 GNUNET_DATASTORE_ContinuationWithStatus cont,
1173 void *cont_cls) 1220 void *cont_cls)
1174{ 1221{
1175 struct GNUNET_DATASTORE_QueueEntry *qe; 1222 struct GNUNET_DATASTORE_QueueEntry *qe;
1176 struct DataMessage *dm; 1223 struct DataMessage *dm;
1177 size_t msize; 1224 struct GNUNET_MQ_Envelope *env;
1178 union QueueContext qc; 1225 union QueueContext qc;
1179 1226
1180 if (cont == NULL) 1227 if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1228 {
1229 GNUNET_break (0);
1230 return NULL;
1231 }
1232 if (NULL == cont)
1181 cont = &drop_status_cont; 1233 cont = &drop_status_cont;
1182 LOG (GNUNET_ERROR_TYPE_DEBUG, 1234 LOG (GNUNET_ERROR_TYPE_DEBUG,
1183 "Asked to remove %u bytes under key `%s'\n", 1235 "Asked to remove %u bytes under key `%s'\n",
1184 size, 1236 size,
1185 GNUNET_h2s (key)); 1237 GNUNET_h2s (key));
1238 env = GNUNET_MQ_msg_extra (dm,
1239 size,
1240 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1241 dm->rid = htonl (0);
1242 dm->size = htonl (size);
1243 dm->type = htonl (0);
1244 dm->priority = htonl (0);
1245 dm->anonymity = htonl (0);
1246 dm->uid = GNUNET_htonll (0);
1247 dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1248 dm->key = *key;
1249 memcpy (&dm[1],
1250 data,
1251 size);
1252
1186 qc.sc.cont = cont; 1253 qc.sc.cont = cont;
1187 qc.sc.cont_cls = cont_cls; 1254 qc.sc.cont_cls = cont_cls;
1188 msize = sizeof (struct DataMessage) + size; 1255
1189 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1190 qe = make_queue_entry (h, 1256 qe = make_queue_entry (h,
1191 msize, 1257 env,
1192 queue_priority, 1258 queue_priority,
1193 max_queue_size, 1259 max_queue_size,
1194 timeout, 1260 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1195 &process_status_message,
1196 &qc); 1261 &qc);
1197 if (qe == NULL) 1262 if (NULL == qe)
1198 { 1263 {
1199 LOG (GNUNET_ERROR_TYPE_DEBUG, 1264 LOG (GNUNET_ERROR_TYPE_DEBUG,
1200 "Could not create queue entry for REMOVE\n"); 1265 "Could not create queue entry for REMOVE\n");
1201 return NULL; 1266 return NULL;
1202 } 1267 }
1203 GNUNET_STATISTICS_update (h->stats, 1268 GNUNET_STATISTICS_update (h->stats,
1204 gettext_noop ("# REMOVE requests executed"), 1, 1269 gettext_noop ("# REMOVE requests executed"),
1270 1,
1205 GNUNET_NO); 1271 GNUNET_NO);
1206 dm = (struct DataMessage *) &qe[1];
1207 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1208 dm->header.size = htons (msize);
1209 dm->rid = htonl (0);
1210 dm->size = htonl (size);
1211 dm->type = htonl (0);
1212 dm->priority = htonl (0);
1213 dm->anonymity = htonl (0);
1214 dm->uid = GNUNET_htonll (0);
1215 dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1216 dm->key = *key;
1217 memcpy (&dm[1], data, size);
1218 process_queue (h); 1272 process_queue (h);
1219 return qe; 1273 return qe;
1220} 1274}
1221 1275
1222 1276
1223/**
1224 * Type of a function to call when we receive a message
1225 * from the service.
1226 *
1227 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
1228 * @param msg message received, NULL on timeout or fatal error
1229 */
1230static void
1231process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1232{
1233 struct GNUNET_DATASTORE_Handle *h = cls;
1234 struct GNUNET_DATASTORE_QueueEntry *qe;
1235 struct ResultContext rc;
1236 const struct DataMessage *dm;
1237 int was_transmitted;
1238
1239 if (NULL == msg)
1240 {
1241 qe = h->queue_head;
1242 GNUNET_assert (NULL != qe);
1243 rc = qe->qc.rc;
1244 was_transmitted = qe->was_transmitted;
1245 free_queue_entry (qe);
1246 if (GNUNET_YES == was_transmitted)
1247 {
1248 LOG (GNUNET_ERROR_TYPE_DEBUG,
1249 "Failed to receive response from database.\n");
1250 do_disconnect (h);
1251 }
1252 else
1253 {
1254 process_queue (h);
1255 }
1256 if (NULL != rc.proc)
1257 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1258 0);
1259 return;
1260 }
1261 if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1262 {
1263 GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1264 qe = h->queue_head;
1265 rc = qe->qc.rc;
1266 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1267 free_queue_entry (qe);
1268 LOG (GNUNET_ERROR_TYPE_DEBUG,
1269 "Received end of result set, new queue size is %u\n", h->queue_size);
1270 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1271 h->result_count = 0;
1272 process_queue (h);
1273 if (NULL != rc.proc)
1274 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1275 0);
1276 return;
1277 }
1278 qe = h->queue_head;
1279 GNUNET_assert (NULL != qe);
1280 rc = qe->qc.rc;
1281 if (GNUNET_YES != qe->was_transmitted)
1282 {
1283 GNUNET_break (0);
1284 free_queue_entry (qe);
1285 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1286 do_disconnect (h);
1287 if (rc.proc != NULL)
1288 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1289 0);
1290 return;
1291 }
1292 if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1293 (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1294 (ntohs (msg->size) !=
1295 sizeof (struct DataMessage) +
1296 ntohl (((const struct DataMessage *) msg)->size)))
1297 {
1298 GNUNET_break (0);
1299 free_queue_entry (qe);
1300 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1301 do_disconnect (h);
1302 if (rc.proc != NULL)
1303 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1304 0);
1305 return;
1306 }
1307#if INSANE_STATISTICS
1308 GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1309 GNUNET_NO);
1310#endif
1311 dm = (const struct DataMessage *) msg;
1312 LOG (GNUNET_ERROR_TYPE_DEBUG,
1313 "Received result %llu with type %u and size %u with key %s\n",
1314 (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1315 ntohl (dm->size), GNUNET_h2s (&dm->key));
1316 free_queue_entry (qe);
1317 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1318 process_queue (h);
1319 if (rc.proc != NULL)
1320 rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1321 ntohl (dm->priority), ntohl (dm->anonymity),
1322 GNUNET_TIME_absolute_ntoh (dm->expiration),
1323 GNUNET_ntohll (dm->uid));
1324}
1325
1326 1277
1327/** 1278/**
1328 * Get a random value from the datastore for content replication. 1279 * Get a random value from the datastore for content replication.
@@ -1335,7 +1286,6 @@ process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1335 * @param queue_priority ranking of this request in the priority queue 1286 * @param queue_priority ranking of this request in the priority queue
1336 * @param max_queue_size at what queue size should this request be dropped 1287 * @param max_queue_size at what queue size should this request be dropped
1337 * (if other requests of higher priority are in the queue) 1288 * (if other requests of higher priority are in the queue)
1338 * @param timeout how long to wait at most for a response
1339 * @param proc function to call on a random value; it 1289 * @param proc function to call on a random value; it
1340 * will be called once with a value (if available) 1290 * will be called once with a value (if available)
1341 * and always once with a value of NULL. 1291 * and always once with a value of NULL.
@@ -1347,23 +1297,27 @@ struct GNUNET_DATASTORE_QueueEntry *
1347GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, 1297GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1348 unsigned int queue_priority, 1298 unsigned int queue_priority,
1349 unsigned int max_queue_size, 1299 unsigned int max_queue_size,
1350 struct GNUNET_TIME_Relative timeout,
1351 GNUNET_DATASTORE_DatumProcessor proc, 1300 GNUNET_DATASTORE_DatumProcessor proc,
1352 void *proc_cls) 1301 void *proc_cls)
1353{ 1302{
1354 struct GNUNET_DATASTORE_QueueEntry *qe; 1303 struct GNUNET_DATASTORE_QueueEntry *qe;
1304 struct GNUNET_MQ_Envelope *env;
1355 struct GNUNET_MessageHeader *m; 1305 struct GNUNET_MessageHeader *m;
1356 union QueueContext qc; 1306 union QueueContext qc;
1357 1307
1358 GNUNET_assert (NULL != proc); 1308 GNUNET_assert (NULL != proc);
1359 LOG (GNUNET_ERROR_TYPE_DEBUG, 1309 LOG (GNUNET_ERROR_TYPE_DEBUG,
1360 "Asked to get replication entry in %s\n", 1310 "Asked to get replication entry\n");
1361 GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES)); 1311 env = GNUNET_MQ_msg (m,
1312 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1362 qc.rc.proc = proc; 1313 qc.rc.proc = proc;
1363 qc.rc.proc_cls = proc_cls; 1314 qc.rc.proc_cls = proc_cls;
1364 qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader), 1315 qe = make_queue_entry (h,
1365 queue_priority, max_queue_size, timeout, 1316 env,
1366 &process_result_message, &qc); 1317 queue_priority,
1318 max_queue_size,
1319 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1320 &qc);
1367 if (NULL == qe) 1321 if (NULL == qe)
1368 { 1322 {
1369 LOG (GNUNET_ERROR_TYPE_DEBUG, 1323 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1374,9 +1328,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1374 gettext_noop 1328 gettext_noop
1375 ("# GET REPLICATION requests executed"), 1, 1329 ("# GET REPLICATION requests executed"), 1,
1376 GNUNET_NO); 1330 GNUNET_NO);
1377 m = (struct GNUNET_MessageHeader *) &qe[1];
1378 m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1379 m->size = htons (sizeof (struct GNUNET_MessageHeader));
1380 process_queue (h); 1331 process_queue (h);
1381 return qe; 1332 return qe;
1382} 1333}
@@ -1393,7 +1344,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1393 * @param queue_priority ranking of this request in the priority queue 1344 * @param queue_priority ranking of this request in the priority queue
1394 * @param max_queue_size at what queue size should this request be dropped 1345 * @param max_queue_size at what queue size should this request be dropped
1395 * (if other requests of higher priority are in the queue) 1346 * (if other requests of higher priority are in the queue)
1396 * @param timeout how long to wait at most for a response
1397 * @param type allowed type for the operation (never zero) 1347 * @param type allowed type for the operation (never zero)
1398 * @param proc function to call on a random value; it 1348 * @param proc function to call on a random value; it
1399 * will be called once with a value (if available) 1349 * will be called once with a value (if available)
@@ -1407,31 +1357,32 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1407 uint64_t offset, 1357 uint64_t offset,
1408 unsigned int queue_priority, 1358 unsigned int queue_priority,
1409 unsigned int max_queue_size, 1359 unsigned int max_queue_size,
1410 struct GNUNET_TIME_Relative timeout,
1411 enum GNUNET_BLOCK_Type type, 1360 enum GNUNET_BLOCK_Type type,
1412 GNUNET_DATASTORE_DatumProcessor proc, 1361 GNUNET_DATASTORE_DatumProcessor proc,
1413 void *proc_cls) 1362 void *proc_cls)
1414{ 1363{
1415 struct GNUNET_DATASTORE_QueueEntry *qe; 1364 struct GNUNET_DATASTORE_QueueEntry *qe;
1365 struct GNUNET_MQ_Envelope *env;
1416 struct GetZeroAnonymityMessage *m; 1366 struct GetZeroAnonymityMessage *m;
1417 union QueueContext qc; 1367 union QueueContext qc;
1418 1368
1419 GNUNET_assert (NULL != proc); 1369 GNUNET_assert (NULL != proc);
1420 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); 1370 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1421 LOG (GNUNET_ERROR_TYPE_DEBUG, 1371 LOG (GNUNET_ERROR_TYPE_DEBUG,
1422 "Asked to get %llu-th zero-anonymity entry of type %d in %s\n", 1372 "Asked to get %llu-th zero-anonymity entry of type %d\n",
1423 (unsigned long long) offset, 1373 (unsigned long long) offset,
1424 type, 1374 type);
1425 GNUNET_STRINGS_relative_time_to_string (timeout, 1375 env = GNUNET_MQ_msg (m,
1426 GNUNET_YES)); 1376 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1377 m->type = htonl ((uint32_t) type);
1378 m->offset = GNUNET_htonll (offset);
1427 qc.rc.proc = proc; 1379 qc.rc.proc = proc;
1428 qc.rc.proc_cls = proc_cls; 1380 qc.rc.proc_cls = proc_cls;
1429 qe = make_queue_entry (h, 1381 qe = make_queue_entry (h,
1430 sizeof (struct GetZeroAnonymityMessage), 1382 env,
1431 queue_priority, 1383 queue_priority,
1432 max_queue_size, 1384 max_queue_size,
1433 timeout, 1385 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1434 &process_result_message,
1435 &qc); 1386 &qc);
1436 if (NULL == qe) 1387 if (NULL == qe)
1437 { 1388 {
@@ -1443,11 +1394,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1443 gettext_noop 1394 gettext_noop
1444 ("# GET ZERO ANONYMITY requests executed"), 1, 1395 ("# GET ZERO ANONYMITY requests executed"), 1,
1445 GNUNET_NO); 1396 GNUNET_NO);
1446 m = (struct GetZeroAnonymityMessage *) &qe[1];
1447 m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1448 m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1449 m->type = htonl ((uint32_t) type);
1450 m->offset = GNUNET_htonll (offset);
1451 process_queue (h); 1397 process_queue (h);
1452 return qe; 1398 return qe;
1453} 1399}
@@ -1467,7 +1413,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1467 * @param queue_priority ranking of this request in the priority queue 1413 * @param queue_priority ranking of this request in the priority queue
1468 * @param max_queue_size at what queue size should this request be dropped 1414 * @param max_queue_size at what queue size should this request be dropped
1469 * (if other requests of higher priority are in the queue) 1415 * (if other requests of higher priority are in the queue)
1470 * @param timeout how long to wait at most for a response
1471 * @param proc function to call on each matching value; 1416 * @param proc function to call on each matching value;
1472 * will be called once with a NULL value at the end 1417 * will be called once with a NULL value at the end
1473 * @param proc_cls closure for @a proc 1418 * @param proc_cls closure for @a proc
@@ -1481,28 +1426,44 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1481 enum GNUNET_BLOCK_Type type, 1426 enum GNUNET_BLOCK_Type type,
1482 unsigned int queue_priority, 1427 unsigned int queue_priority,
1483 unsigned int max_queue_size, 1428 unsigned int max_queue_size,
1484 struct GNUNET_TIME_Relative timeout,
1485 GNUNET_DATASTORE_DatumProcessor proc, 1429 GNUNET_DATASTORE_DatumProcessor proc,
1486 void *proc_cls) 1430 void *proc_cls)
1487{ 1431{
1488 struct GNUNET_DATASTORE_QueueEntry *qe; 1432 struct GNUNET_DATASTORE_QueueEntry *qe;
1433 struct GNUNET_MQ_Envelope *env;
1434 struct GetKeyMessage *gkm;
1489 struct GetMessage *gm; 1435 struct GetMessage *gm;
1490 union QueueContext qc; 1436 union QueueContext qc;
1491 1437
1492 GNUNET_assert (NULL != proc); 1438 GNUNET_assert (NULL != proc);
1493 LOG (GNUNET_ERROR_TYPE_DEBUG, 1439 LOG (GNUNET_ERROR_TYPE_DEBUG,
1494 "Asked to look for data of type %u under key `%s'\n", 1440 "Asked to look for data of type %u under key `%s'\n",
1495 (unsigned int) type, GNUNET_h2s (key)); 1441 (unsigned int) type,
1442 GNUNET_h2s (key));
1443 if (NULL == key)
1444 {
1445 env = GNUNET_MQ_msg (gm,
1446 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1447 gm->type = htonl (type);
1448 gm->offset = GNUNET_htonll (offset);
1449 }
1450 else
1451 {
1452 env = GNUNET_MQ_msg (gkm,
1453 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1454 gkm->type = htonl (type);
1455 gkm->offset = GNUNET_htonll (offset);
1456 gkm->key = *key;
1457 }
1496 qc.rc.proc = proc; 1458 qc.rc.proc = proc;
1497 qc.rc.proc_cls = proc_cls; 1459 qc.rc.proc_cls = proc_cls;
1498 qe = make_queue_entry (h, 1460 qe = make_queue_entry (h,
1499 sizeof (struct GetMessage), 1461 env,
1500 queue_priority, 1462 queue_priority,
1501 max_queue_size, 1463 max_queue_size,
1502 timeout, 1464 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1503 &process_result_message,
1504 &qc); 1465 &qc);
1505 if (qe == NULL) 1466 if (NULL == qe)
1506 { 1467 {
1507 LOG (GNUNET_ERROR_TYPE_DEBUG, 1468 LOG (GNUNET_ERROR_TYPE_DEBUG,
1508 "Could not queue request for `%s'\n", 1469 "Could not queue request for `%s'\n",
@@ -1515,20 +1476,6 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1515 1, 1476 1,
1516 GNUNET_NO); 1477 GNUNET_NO);
1517#endif 1478#endif
1518 gm = (struct GetMessage *) &qe[1];
1519 gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1520 gm->type = htonl (type);
1521 gm->offset = GNUNET_htonll (offset);
1522 if (key != NULL)
1523 {
1524 gm->header.size = htons (sizeof (struct GetMessage));
1525 gm->key = *key;
1526 }
1527 else
1528 {
1529 gm->header.size =
1530 htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode));
1531 }
1532 process_queue (h); 1479 process_queue (h);
1533 return qe; 1480 return qe;
1534} 1481}
@@ -1543,16 +1490,14 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1543void 1490void
1544GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) 1491GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1545{ 1492{
1546 struct GNUNET_DATASTORE_Handle *h; 1493 struct GNUNET_DATASTORE_Handle *h = qe->h;
1547 1494
1548 GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1549 h = qe->h;
1550 LOG (GNUNET_ERROR_TYPE_DEBUG, 1495 LOG (GNUNET_ERROR_TYPE_DEBUG,
1551 "Pending DATASTORE request %p cancelled (%d, %d)\n", 1496 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1552 qe, 1497 qe,
1553 qe->was_transmitted, 1498 NULL == qe->env,
1554 h->queue_head == qe); 1499 h->queue_head == qe);
1555 if (GNUNET_YES == qe->was_transmitted) 1500 if (NULL == qe->env)
1556 { 1501 {
1557 free_queue_entry (qe); 1502 free_queue_entry (qe);
1558 h->skip_next_messages++; 1503 h->skip_next_messages++;
diff --git a/src/datastore/gnunet-datastore.c b/src/datastore/gnunet-datastore.c
index ddca4ee06..b3d14c43c 100644
--- a/src/datastore/gnunet-datastore.c
+++ b/src/datastore/gnunet-datastore.c
@@ -137,7 +137,8 @@ do_finish (void *cls,
137static void 137static void
138do_put (void *cls, 138do_put (void *cls,
139 const struct GNUNET_HashCode *key, 139 const struct GNUNET_HashCode *key,
140 size_t size, const void *data, 140 size_t size,
141 const void *data,
141 enum GNUNET_BLOCK_Type type, 142 enum GNUNET_BLOCK_Type type,
142 uint32_t priority, 143 uint32_t priority,
143 uint32_t anonymity, 144 uint32_t anonymity,
@@ -158,7 +159,7 @@ do_put (void *cls,
158 priority, anonymity, 159 priority, anonymity,
159 0 /* FIXME: replication is lost... */, 160 0 /* FIXME: replication is lost... */,
160 expiration, 161 expiration,
161 0, 1, GNUNET_TIME_UNIT_FOREVER_REL, 162 0, 1,
162 &do_finish, NULL); 163 &do_finish, NULL);
163} 164}
164 165
@@ -173,7 +174,6 @@ do_get ()
173 offset, 174 offset,
174 NULL, GNUNET_BLOCK_TYPE_ANY, 175 NULL, GNUNET_BLOCK_TYPE_ANY,
175 0, 1, 176 0, 1,
176 GNUNET_TIME_UNIT_FOREVER_REL,
177 &do_put, NULL); 177 &do_put, NULL);
178} 178}
179 179
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index a67d1c772..57527991c 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -416,16 +416,20 @@ delete_expired (void *cls)
416 * @param expiration expiration time for the content 416 * @param expiration expiration time for the content
417 * @param uid unique identifier for the datum; 417 * @param uid unique identifier for the datum;
418 * maybe 0 if no unique identifier is available 418 * maybe 0 if no unique identifier is available
419 * 419 * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
420 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
421 * (continue on call to "next", of course), 420 * (continue on call to "next", of course),
422 * GNUNET_NO to delete the item and continue (if supported) 421 * #GNUNET_NO to delete the item and continue (if supported)
423 */ 422 */
424static int 423static int
425quota_processor (void *cls, const struct GNUNET_HashCode * key, uint32_t size, 424quota_processor (void *cls,
426 const void *data, enum GNUNET_BLOCK_Type type, 425 const struct GNUNET_HashCode *key,
427 uint32_t priority, uint32_t anonymity, 426 uint32_t size,
428 struct GNUNET_TIME_Absolute expiration, uint64_t uid) 427 const void *data,
428 enum GNUNET_BLOCK_Type type,
429 uint32_t priority,
430 uint32_t anonymity,
431 struct GNUNET_TIME_Absolute expiration,
432 uint64_t uid)
429{ 433{
430 unsigned long long *need = cls; 434 unsigned long long *need = cls;
431 435
@@ -473,12 +477,15 @@ manage_space (unsigned long long need)
473 unsigned long long last; 477 unsigned long long last;
474 478
475 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
476 "Asked to free up %llu bytes of cache space\n", need); 480 "Asked to free up %llu bytes of cache space\n",
481 need);
477 last = 0; 482 last = 0;
478 while ((need > 0) && (last != need)) 483 while ((need > 0) && (last != need))
479 { 484 {
480 last = need; 485 last = need;
481 plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need); 486 plugin->api->get_expiration (plugin->api->cls,
487 &quota_processor,
488 &need);
482 } 489 }
483} 490}
484 491
@@ -1068,7 +1075,7 @@ handle_put (void *cls,
1068 1075
1069 1076
1070/** 1077/**
1071 * Handle GET-message. 1078 * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message.
1072 * 1079 *
1073 * @param cls closure 1080 * @param cls closure
1074 * @param client identification of the client 1081 * @param client identification of the client
@@ -1080,28 +1087,52 @@ handle_get (void *cls,
1080 const struct GNUNET_MessageHeader *message) 1087 const struct GNUNET_MessageHeader *message)
1081{ 1088{
1082 const struct GetMessage *msg; 1089 const struct GetMessage *msg;
1083 uint16_t size;
1084 1090
1085 size = ntohs (message->size);
1086 if ((size != sizeof (struct GetMessage)) &&
1087 (size != sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode)))
1088 {
1089 GNUNET_break (0);
1090 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1091 return;
1092 }
1093 msg = (const struct GetMessage *) message; 1091 msg = (const struct GetMessage *) message;
1094 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1092 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1093 "Processing GET request of type %u\n",
1094 ntohl (msg->type));
1095 GNUNET_STATISTICS_update (stats,
1096 gettext_noop ("# GET requests received"),
1097 1,
1098 GNUNET_NO);
1099 GNUNET_SERVER_client_keep (client);
1100 plugin->api->get_key (plugin->api->cls,
1101 GNUNET_ntohll (msg->offset),
1102 NULL,
1103 NULL,
1104 ntohl (msg->type),
1105 &transmit_item,
1106 client);
1107}
1108
1109/**
1110 * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message.
1111 *
1112 * @param cls closure
1113 * @param client identification of the client
1114 * @param message the actual message
1115 */
1116static void
1117handle_get_key (void *cls,
1118 struct GNUNET_SERVER_Client *client,
1119 const struct GNUNET_MessageHeader *message)
1120{
1121 const struct GetKeyMessage *msg;
1122
1123 msg = (const struct GetKeyMessage *) message;
1124 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095 "Processing GET request for `%s' of type %u\n", 1125 "Processing GET request for `%s' of type %u\n",
1096 GNUNET_h2s (&msg->key), 1126 GNUNET_h2s (&msg->key),
1097 ntohl (msg->type)); 1127 ntohl (msg->type));
1098 GNUNET_STATISTICS_update (stats, 1128 GNUNET_STATISTICS_update (stats,
1099 gettext_noop ("# GET requests received"), 1129 gettext_noop ("# GET KEY requests received"),
1100 1, 1130 1,
1101 GNUNET_NO); 1131 GNUNET_NO);
1102 GNUNET_SERVER_client_keep (client); 1132 GNUNET_SERVER_client_keep (client);
1103 if ( (size == sizeof (struct GetMessage)) && 1133 if (GNUNET_YES !=
1104 (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)) ) 1134 GNUNET_CONTAINER_bloomfilter_test (filter,
1135 &msg->key))
1105 { 1136 {
1106 /* don't bother database... */ 1137 /* don't bother database... */
1107 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1138 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1112,14 +1143,19 @@ handle_get (void *cls,
1112 ("# requests filtered by bloomfilter"), 1143 ("# requests filtered by bloomfilter"),
1113 1, 1144 1,
1114 GNUNET_NO); 1145 GNUNET_NO);
1115 transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 1146 transmit_item (client,
1147 NULL, 0, NULL, 0, 0, 0,
1148 GNUNET_TIME_UNIT_ZERO_ABS,
1116 0); 1149 0);
1117 return; 1150 return;
1118 } 1151 }
1119 plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset), 1152 plugin->api->get_key (plugin->api->cls,
1120 ((size == 1153 GNUNET_ntohll (msg->offset),
1121 sizeof (struct GetMessage)) ? &msg->key : NULL), NULL, 1154 &msg->key,
1122 ntohl (msg->type), &transmit_item, client); 1155 NULL,
1156 ntohl (msg->type),
1157 &transmit_item,
1158 client);
1123} 1159}
1124 1160
1125 1161
@@ -1369,7 +1405,8 @@ disk_utilization_change_cb (void *cls,
1369 _("Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"), 1405 _("Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"),
1370 (long long) payload, 1406 (long long) payload,
1371 (long long) -delta); 1407 (long long) -delta);
1372 plugin->api->estimate_size (plugin->api->cls, &payload); 1408 plugin->api->estimate_size (plugin->api->cls,
1409 &payload);
1373 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1410 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1374 _("New payload: %lld\n"), 1411 _("New payload: %lld\n"),
1375 (long long) payload); 1412 (long long) payload);
@@ -1474,7 +1511,10 @@ static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1474 {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0}, 1511 {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1475 {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 1512 {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1476 sizeof (struct UpdateMessage)}, 1513 sizeof (struct UpdateMessage)},
1477 {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0}, 1514 {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET,
1515 sizeof (struct GetMessage) },
1516 {&handle_get_key, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY,
1517 sizeof (struct GetKeyMessage) },
1478 {&handle_get_replication, NULL, 1518 {&handle_get_replication, NULL,
1479 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, 1519 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1480 sizeof (struct GNUNET_MessageHeader)}, 1520 sizeof (struct GNUNET_MessageHeader)},
@@ -1555,6 +1595,10 @@ process_stat_done (void *cls,
1555 "Failed to obtain value from statistics service, recomputing it\n"); 1595 "Failed to obtain value from statistics service, recomputing it\n");
1556 plugin->api->estimate_size (plugin->api->cls, 1596 plugin->api->estimate_size (plugin->api->cls,
1557 &payload); 1597 &payload);
1598 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1599 _("New payload: %lld\n"),
1600 (long long) payload);
1601
1558 } 1602 }
1559 if (GNUNET_YES == refresh_bf) 1603 if (GNUNET_YES == refresh_bf)
1560 { 1604 {
@@ -1624,7 +1668,13 @@ cleaning_task (void *cls)
1624 expired_kill_task = NULL; 1668 expired_kill_task = NULL;
1625 } 1669 }
1626 if (GNUNET_YES == do_drop) 1670 if (GNUNET_YES == do_drop)
1671 {
1672 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673 "Dropping database!\n");
1627 plugin->api->drop (plugin->api->cls); 1674 plugin->api->drop (plugin->api->cls);
1675 payload = 0;
1676 last_sync++;
1677 }
1628 if (NULL != plugin) 1678 if (NULL != plugin)
1629 { 1679 {
1630 unload_plugin (plugin); 1680 unload_plugin (plugin);
@@ -1651,7 +1701,8 @@ cleaning_task (void *cls)
1651 sync_stats (); 1701 sync_stats ();
1652 if (NULL != stats) 1702 if (NULL != stats)
1653 { 1703 {
1654 GNUNET_STATISTICS_destroy (stats, GNUNET_YES); 1704 GNUNET_STATISTICS_destroy (stats,
1705 GNUNET_YES);
1655 stats = NULL; 1706 stats = NULL;
1656 } 1707 }
1657 GNUNET_free (quota_stat_name); 1708 GNUNET_free (quota_stat_name);
diff --git a/src/datastore/perf_datastore_api.c b/src/datastore/perf_datastore_api.c
index 97774198c..4f1f99a5c 100644
--- a/src/datastore/perf_datastore_api.c
+++ b/src/datastore/perf_datastore_api.c
@@ -332,7 +332,6 @@ delete_value (void *cls,
332 key, 332 key,
333 size, 333 size,
334 data, 1, 1, 334 data, 1, 1,
335 TIMEOUT,
336 &remove_next, crc)); 335 &remove_next, crc));
337} 336}
338 337
@@ -396,7 +395,6 @@ run_continuation (void *cls)
396 (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), 395 (GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
397 1, 396 1,
398 1, 397 1,
399 TIMEOUT,
400 &check_success, crc)); 398 &check_success, crc));
401 break; 399 break;
402 case RP_CUT: 400 case RP_CUT:
@@ -404,7 +402,6 @@ run_continuation (void *cls)
404 GNUNET_assert (NULL != 402 GNUNET_assert (NULL !=
405 GNUNET_DATASTORE_get_for_replication (datastore, 403 GNUNET_DATASTORE_get_for_replication (datastore,
406 1, 1, 404 1, 1,
407 TIMEOUT,
408 &delete_value, 405 &delete_value,
409 crc)); 406 crc));
410 break; 407 break;
@@ -466,7 +463,6 @@ run_continuation (void *cls)
466 (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), 463 (GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
467 1, 464 1,
468 1, 465 1,
469 TIMEOUT,
470 &check_success, crc)); 466 &check_success, crc));
471 break; 467 break;
472 468
@@ -573,7 +569,6 @@ run (void *cls,
573 0, 0, 0, 569 0, 0, 0,
574 GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS), 570 GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS),
575 0, 1, 571 0, 1,
576 TIMEOUT,
577 &run_tests, crc)) 572 &run_tests, crc))
578 { 573 {
579 FPRINTF (stderr, 574 FPRINTF (stderr,
diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c
index bae40d17d..0ae5c1a2e 100644
--- a/src/datastore/plugin_datastore_mysql.c
+++ b/src/datastore/plugin_datastore_mysql.c
@@ -180,7 +180,7 @@ struct Plugin
180#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?" 180#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
181 struct GNUNET_MYSQL_StatementHandle *dec_repl; 181 struct GNUNET_MYSQL_StatementHandle *dec_repl;
182 182
183#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090" 183#define SELECT_SIZE "SELECT SUM(LENGTH(value)+256) FROM gn090"
184 struct GNUNET_MYSQL_StatementHandle *get_size; 184 struct GNUNET_MYSQL_StatementHandle *get_size;
185 185
186#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\ 186#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\
@@ -221,23 +221,22 @@ struct Plugin
221 * 221 *
222 * @param plugin plugin context 222 * @param plugin plugin context
223 * @param uid unique ID of the entry to delete 223 * @param uid unique ID of the entry to delete
224 * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error 224 * @return #GNUNET_OK on success, #GNUNET_NO if no such value exists, #GNUNET_SYSERR on error
225 */ 225 */
226static int 226static int
227do_delete_entry (struct Plugin *plugin, unsigned long long uid) 227do_delete_entry (struct Plugin *plugin,
228 unsigned long long uid)
228{ 229{
229 int ret; 230 int ret;
230 uint64_t uid64 = (uint64_t) uid; 231 uint64_t uid64 = (uint64_t) uid;
231
232 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
233 "Deleting value %llu from gn090 table\n",
234 uid);
235
236 struct GNUNET_MY_QueryParam params_delete[] = { 232 struct GNUNET_MY_QueryParam params_delete[] = {
237 GNUNET_MY_query_param_uint64 (&uid64), 233 GNUNET_MY_query_param_uint64 (&uid64),
238 GNUNET_MY_query_param_end 234 GNUNET_MY_query_param_end
239 }; 235 };
240 236
237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238 "Deleting value %llu from gn090 table\n",
239 uid);
241 ret = GNUNET_MY_exec_prepared (plugin->mc, 240 ret = GNUNET_MY_exec_prepared (plugin->mc,
242 plugin->delete_entry_by_uid, 241 plugin->delete_entry_by_uid,
243 params_delete); 242 params_delete);
@@ -247,7 +246,7 @@ do_delete_entry (struct Plugin *plugin, unsigned long long uid)
247 } 246 }
248 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 247 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
249 "Deleting value %llu from gn090 table failed\n", 248 "Deleting value %llu from gn090 table failed\n",
250 uid); 249 (unsigned long long) uid);
251 return ret; 250 return ret;
252} 251}
253 252
@@ -256,7 +255,7 @@ do_delete_entry (struct Plugin *plugin, unsigned long long uid)
256 * Get an estimate of how much space the database is 255 * Get an estimate of how much space the database is
257 * currently using. 256 * currently using.
258 * 257 *
259 * @param cls our "struct Plugin *" 258 * @param cls our `struct Plugin *`
260 * @return number of bytes used on disk 259 * @return number of bytes used on disk
261 */ 260 */
262static void 261static void
@@ -266,26 +265,33 @@ mysql_plugin_estimate_size (void *cls,
266 struct Plugin *plugin = cls; 265 struct Plugin *plugin = cls;
267 uint64_t total; 266 uint64_t total;
268 int ret; 267 int ret;
269
270 struct GNUNET_MY_QueryParam params_get[] = { 268 struct GNUNET_MY_QueryParam params_get[] = {
271 GNUNET_MY_query_param_end 269 GNUNET_MY_query_param_end
272 }; 270 };
273
274 struct GNUNET_MY_ResultSpec results_get[] = { 271 struct GNUNET_MY_ResultSpec results_get[] = {
275 GNUNET_MY_result_spec_uint64 (&total), 272 GNUNET_MY_result_spec_uint64 (&total),
276 GNUNET_MY_result_spec_end 273 GNUNET_MY_result_spec_end
277 }; 274 };
278 275
279 ret = GNUNET_MY_exec_prepared (plugin->mc, plugin->get_size, params_get); 276 ret = GNUNET_MY_exec_prepared (plugin->mc,
280 if (GNUNET_OK == ret) 277 plugin->get_size,
281 { 278 params_get);
282 if (GNUNET_OK == GNUNET_MY_extract_result (plugin->get_size, results_get)) 279 *estimate = 0;
283 { 280 total = UINT64_MAX;
284 *estimate = (unsigned long long)total; 281 if ( (GNUNET_OK == ret) &&
285 } 282 (GNUNET_OK ==
286 } 283 GNUNET_MY_extract_result (plugin->get_size,
287 else 284 results_get)) )
288 *estimate = 0; 285 {
286 *estimate = (unsigned long long) total;
287 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
288 "Size estimate for MySQL payload is %lld\n",
289 (long long) total);
290 GNUNET_assert (UINT64_MAX != total);
291 GNUNET_break (GNUNET_NO ==
292 GNUNET_MY_extract_result (plugin->get_size,
293 NULL));
294 }
289} 295}
290 296
291 297
@@ -294,7 +300,7 @@ mysql_plugin_estimate_size (void *cls,
294 * 300 *
295 * @param cls closure 301 * @param cls closure
296 * @param key key for the item 302 * @param key key for the item
297 * @param size number of bytes in data 303 * @param size number of bytes in @a data
298 * @param data content stored 304 * @param data content stored
299 * @param type type of the content 305 * @param type type of the content
300 * @param priority priority of the content 306 * @param priority priority of the content
@@ -302,7 +308,7 @@ mysql_plugin_estimate_size (void *cls,
302 * @param replication replication-level for the content 308 * @param replication replication-level for the content
303 * @param expiration expiration time for the content 309 * @param expiration expiration time for the content
304 * @param cont continuation called with success or failure status 310 * @param cont continuation called with success or failure status
305 * @param cont_cls continuation closure 311 * @param cont_cls closure for @a cont
306 */ 312 */
307static void 313static void
308mysql_plugin_put (void *cls, 314mysql_plugin_put (void *cls,
@@ -318,12 +324,9 @@ mysql_plugin_put (void *cls,
318 void *cont_cls) 324 void *cont_cls)
319{ 325{
320 struct Plugin *plugin = cls; 326 struct Plugin *plugin = cls;
321
322 uint64_t lexpiration = expiration.abs_value_us; 327 uint64_t lexpiration = expiration.abs_value_us;
323 uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, 328 uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
324 UINT64_MAX); 329 UINT64_MAX);
325 unsigned long lsize = 0;
326
327 struct GNUNET_HashCode vhash; 330 struct GNUNET_HashCode vhash;
328 struct GNUNET_MY_QueryParam params_insert[] = { 331 struct GNUNET_MY_QueryParam params_insert[] = {
329 GNUNET_MY_query_param_uint32 (&replication), 332 GNUNET_MY_query_param_uint32 (&replication),
@@ -334,7 +337,7 @@ mysql_plugin_put (void *cls,
334 GNUNET_MY_query_param_uint64 (&lrvalue), 337 GNUNET_MY_query_param_uint64 (&lrvalue),
335 GNUNET_MY_query_param_auto_from_type (key), 338 GNUNET_MY_query_param_auto_from_type (key),
336 GNUNET_MY_query_param_auto_from_type (&vhash), 339 GNUNET_MY_query_param_auto_from_type (&vhash),
337 GNUNET_MY_query_param_fixed_size (data, lsize), 340 GNUNET_MY_query_param_fixed_size (data, size),
338 GNUNET_MY_query_param_end 341 GNUNET_MY_query_param_end
339 }; 342 };
340 343
@@ -344,7 +347,6 @@ mysql_plugin_put (void *cls,
344 cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large")); 347 cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
345 return; 348 return;
346 } 349 }
347 lsize = size;
348 GNUNET_CRYPTO_hash (data, 350 GNUNET_CRYPTO_hash (data,
349 size, 351 size,
350 &vhash); 352 &vhash);
@@ -354,15 +356,28 @@ mysql_plugin_put (void *cls,
354 plugin->insert_entry, 356 plugin->insert_entry,
355 params_insert)) 357 params_insert))
356 { 358 {
357 cont (cont_cls, key, size, GNUNET_SYSERR, _("MySQL statement run failure")); 359 cont (cont_cls,
360 key,
361 size,
362 GNUNET_SYSERR,
363 _("MySQL statement run failure"));
358 return; 364 return;
359 } 365 }
360 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 366 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361 "Inserted value `%s' with size %u into gn090 table\n", 367 "Inserted value `%s' with size %u into gn090 table\n",
362 GNUNET_h2s (key), (unsigned int) size); 368 GNUNET_h2s (key),
369 (unsigned int) size);
363 if (size > 0) 370 if (size > 0)
364 plugin->env->duc (plugin->env->cls, size); 371 plugin->env->duc (plugin->env->cls,
365 cont (cont_cls, key, size, GNUNET_OK, NULL); 372 size);
373 GNUNET_break (GNUNET_NO ==
374 GNUNET_MY_extract_result (plugin->insert_entry,
375 NULL));
376 cont (cont_cls,
377 key,
378 size,
379 GNUNET_OK,
380 NULL);
366} 381}
367 382
368 383
@@ -390,18 +405,22 @@ mysql_plugin_put (void *cls,
390 * @param cons_cls continuation closure 405 * @param cons_cls continuation closure
391 */ 406 */
392static void 407static void
393mysql_plugin_update (void *cls, uint64_t uid, int delta, 408mysql_plugin_update (void *cls,
409 uint64_t uid,
410 int delta,
394 struct GNUNET_TIME_Absolute expire, 411 struct GNUNET_TIME_Absolute expire,
395 PluginUpdateCont cont, void *cont_cls) 412 PluginUpdateCont cont,
413 void *cont_cls)
396{ 414{
397 struct Plugin *plugin = cls; 415 struct Plugin *plugin = cls;
398 uint32_t idelta = (uint32_t)delta; 416 uint32_t idelta = (uint32_t) delta;
399 uint64_t lexpire = expire.abs_value_us; 417 uint64_t lexpire = expire.abs_value_us;
400 int ret; 418 int ret;
401 419
402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403 "Updating value %llu adding %d to priority and maxing exp at %s\n", 421 "Updating value %llu adding %d to priority and maxing exp at %s\n",
404 (unsigned long long)uid, delta, 422 (unsigned long long) uid,
423 delta,
405 GNUNET_STRINGS_absolute_time_to_string (expire)); 424 GNUNET_STRINGS_absolute_time_to_string (expire));
406 425
407 struct GNUNET_MY_QueryParam params_update[] = { 426 struct GNUNET_MY_QueryParam params_update[] = {
@@ -416,12 +435,21 @@ mysql_plugin_update (void *cls, uint64_t uid, int delta,
416 plugin->update_entry, 435 plugin->update_entry,
417 params_update); 436 params_update);
418 437
419 if (ret != GNUNET_OK) 438 if (GNUNET_OK != ret)
420 { 439 {
421 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n", 440 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
441 "Failed to update value %llu\n",
422 (unsigned long long) uid); 442 (unsigned long long) uid);
423 } 443 }
424 cont (cont_cls, ret, NULL); 444 else
445 {
446 GNUNET_break (GNUNET_NO ==
447 GNUNET_MY_extract_result (plugin->update_entry,
448 NULL));
449 }
450 cont (cont_cls,
451 ret,
452 NULL);
425} 453}
426 454
427 455
@@ -432,7 +460,7 @@ mysql_plugin_update (void *cls, uint64_t uid, int delta,
432 * @param plugin the plugin handle 460 * @param plugin the plugin handle
433 * @param stmt select statement to run 461 * @param stmt select statement to run
434 * @param proc function to call on result 462 * @param proc function to call on result
435 * @param proc_cls closure for proc 463 * @param proc_cls closure for @a proc
436 * @param params_select arguments to initialize stmt 464 * @param params_select arguments to initialize stmt
437 */ 465 */
438static void 466static void
@@ -474,7 +502,7 @@ execute_select (struct Plugin *plugin,
474 502
475 ret = GNUNET_MY_extract_result (stmt, 503 ret = GNUNET_MY_extract_result (stmt,
476 results_select); 504 results_select);
477 if (ret <= 0) 505 if (GNUNET_OK != ret)
478 { 506 {
479 proc (proc_cls, 507 proc (proc_cls,
480 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 508 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
@@ -489,6 +517,9 @@ execute_select (struct Plugin *plugin,
489 (unsigned int) anonymity, 517 (unsigned int) anonymity,
490 GNUNET_STRINGS_absolute_time_to_string (expiration)); 518 GNUNET_STRINGS_absolute_time_to_string (expiration));
491 GNUNET_assert (value_size < MAX_DATUM_SIZE); 519 GNUNET_assert (value_size < MAX_DATUM_SIZE);
520 GNUNET_break (GNUNET_NO ==
521 GNUNET_MY_extract_result (stmt,
522 NULL));
492 ret = proc (proc_cls, 523 ret = proc (proc_cls,
493 &key, 524 &key,
494 value_size, 525 value_size,
@@ -498,7 +529,8 @@ execute_select (struct Plugin *plugin,
498 anonymity, 529 anonymity,
499 expiration, 530 expiration,
500 uid); 531 uid);
501 if (ret == GNUNET_NO) 532 GNUNET_MY_cleanup_result (results_select);
533 if (GNUNET_NO == ret)
502 { 534 {
503 do_delete_entry (plugin, uid); 535 do_delete_entry (plugin, uid);
504 if (0 != value_size) 536 if (0 != value_size)
@@ -538,16 +570,15 @@ mysql_plugin_get_key (void *cls,
538 struct Plugin *plugin = cls; 570 struct Plugin *plugin = cls;
539 int ret; 571 int ret;
540 uint64_t total; 572 uint64_t total;
541
542 total = -1;
543 struct GNUNET_MY_ResultSpec results_get[] = { 573 struct GNUNET_MY_ResultSpec results_get[] = {
544 GNUNET_MY_result_spec_uint64 (&total), 574 GNUNET_MY_result_spec_uint64 (&total),
545 GNUNET_MY_result_spec_end 575 GNUNET_MY_result_spec_end
546 }; 576 };
547 577
548 if (type != 0) 578 total = UINT64_MAX;
579 if (0 != type)
549 { 580 {
550 if (vhash != NULL) 581 if (NULL != vhash)
551 { 582 {
552 struct GNUNET_MY_QueryParam params_get[] = { 583 struct GNUNET_MY_QueryParam params_get[] = {
553 GNUNET_MY_query_param_auto_from_type (key), 584 GNUNET_MY_query_param_auto_from_type (key),
@@ -560,9 +591,15 @@ mysql_plugin_get_key (void *cls,
560 GNUNET_MY_exec_prepared (plugin->mc, 591 GNUNET_MY_exec_prepared (plugin->mc,
561 plugin->count_entry_by_hash_vhash_and_type, 592 plugin->count_entry_by_hash_vhash_and_type,
562 params_get); 593 params_get);
563 ret = 594 GNUNET_break (GNUNET_OK == ret);
564 GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type, 595 if (GNUNET_OK == ret)
565 results_get); 596 ret =
597 GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
598 results_get);
599 if (GNUNET_OK == ret)
600 GNUNET_break (GNUNET_NO ==
601 GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
602 NULL));
566 } 603 }
567 else 604 else
568 { 605 {
@@ -576,14 +613,20 @@ mysql_plugin_get_key (void *cls,
576 GNUNET_MY_exec_prepared (plugin->mc, 613 GNUNET_MY_exec_prepared (plugin->mc,
577 plugin->count_entry_by_hash_and_type, 614 plugin->count_entry_by_hash_and_type,
578 params_get); 615 params_get);
579 ret = 616 GNUNET_break (GNUNET_OK == ret);
580 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type, 617 if (GNUNET_OK == ret)
581 results_get); 618 ret =
619 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
620 results_get);
621 if (GNUNET_OK == ret)
622 GNUNET_break (GNUNET_NO ==
623 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
624 NULL));
582 } 625 }
583 } 626 }
584 else 627 else
585 { 628 {
586 if (vhash != NULL) 629 if (NULL != vhash)
587 { 630 {
588 struct GNUNET_MY_QueryParam params_get[] = { 631 struct GNUNET_MY_QueryParam params_get[] = {
589 GNUNET_MY_query_param_auto_from_type (key), 632 GNUNET_MY_query_param_auto_from_type (key),
@@ -595,9 +638,15 @@ mysql_plugin_get_key (void *cls,
595 GNUNET_MY_exec_prepared (plugin->mc, 638 GNUNET_MY_exec_prepared (plugin->mc,
596 plugin->count_entry_by_hash_and_vhash, 639 plugin->count_entry_by_hash_and_vhash,
597 params_get); 640 params_get);
598 ret = 641 GNUNET_break (GNUNET_OK == ret);
599 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash, 642 if (GNUNET_OK == ret)
600 results_get); 643 ret =
644 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
645 results_get);
646 if (GNUNET_OK == ret)
647 GNUNET_break (GNUNET_NO ==
648 GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
649 NULL));
601 } 650 }
602 else 651 else
603 { 652 {
@@ -610,12 +659,19 @@ mysql_plugin_get_key (void *cls,
610 GNUNET_MY_exec_prepared (plugin->mc, 659 GNUNET_MY_exec_prepared (plugin->mc,
611 plugin->count_entry_by_hash, 660 plugin->count_entry_by_hash,
612 params_get); 661 params_get);
613 ret = 662 GNUNET_break (GNUNET_OK == ret);
614 GNUNET_MY_extract_result (plugin->count_entry_by_hash, 663 if (GNUNET_OK == ret)
615 results_get); 664 ret =
665 GNUNET_MY_extract_result (plugin->count_entry_by_hash,
666 results_get);
667 if (GNUNET_OK == ret)
668 GNUNET_break (GNUNET_NO ==
669 GNUNET_MY_extract_result (plugin->count_entry_by_hash,
670 NULL));
616 } 671 }
617 } 672 }
618 if ((ret != GNUNET_OK) || (0 >= total)) 673 if ( (GNUNET_OK != ret) ||
674 (0 >= total) )
619 { 675 {
620 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 676 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
621 return; 677 return;
@@ -640,7 +696,8 @@ mysql_plugin_get_key (void *cls,
640 696
641 execute_select (plugin, 697 execute_select (plugin,
642 plugin->select_entry_by_hash_vhash_and_type, 698 plugin->select_entry_by_hash_vhash_and_type,
643 proc, proc_cls, 699 proc,
700 proc_cls,
644 params_select); 701 params_select);
645 } 702 }
646 else 703 else
@@ -654,7 +711,8 @@ mysql_plugin_get_key (void *cls,
654 711
655 execute_select (plugin, 712 execute_select (plugin,
656 plugin->select_entry_by_hash_and_type, 713 plugin->select_entry_by_hash_and_type,
657 proc, proc_cls, 714 proc,
715 proc_cls,
658 params_select); 716 params_select);
659 } 717 }
660 } 718 }
@@ -671,7 +729,8 @@ mysql_plugin_get_key (void *cls,
671 729
672 execute_select (plugin, 730 execute_select (plugin,
673 plugin->select_entry_by_hash_and_vhash, 731 plugin->select_entry_by_hash_and_vhash,
674 proc, proc_cls, 732 proc,
733 proc_cls,
675 params_select); 734 params_select);
676 } 735 }
677 else 736 else
@@ -684,28 +743,31 @@ mysql_plugin_get_key (void *cls,
684 743
685 execute_select (plugin, 744 execute_select (plugin,
686 plugin->select_entry_by_hash, 745 plugin->select_entry_by_hash,
687 proc, proc_cls, 746 proc,
747 proc_cls,
688 params_select); 748 params_select);
689 } 749 }
690 } 750 }
691 751
692} 752}
693 753
694 754
695/** 755/**
696 * Get a zero-anonymity datum from the datastore. 756 * Get a zero-anonymity datum from the datastore.
697 * 757 *
698 * @param cls our "struct Plugin*" 758 * @param cls our `struct Plugin *`
699 * @param offset offset of the result 759 * @param offset offset of the result
700 * @param type entries of which type should be considered? 760 * @param type entries of which type should be considered?
701 * Use 0 for any type. 761 * Use 0 for any type.
702 * @param proc function to call on a matching value or NULL 762 * @param proc function to call on a matching value or NULL
703 * @param proc_cls closure for iter 763 * @param proc_cls closure for @a proc
704 */ 764 */
705static void 765static void
706mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset, 766mysql_plugin_get_zero_anonymity (void *cls,
767 uint64_t offset,
707 enum GNUNET_BLOCK_Type type, 768 enum GNUNET_BLOCK_Type type,
708 PluginDatumProcessor proc, void *proc_cls) 769 PluginDatumProcessor proc,
770 void *proc_cls)
709{ 771{
710 struct Plugin *plugin = cls; 772 struct Plugin *plugin = cls;
711 uint32_t typei = (uint32_t) type; 773 uint32_t typei = (uint32_t) type;
@@ -719,8 +781,10 @@ mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
719 GNUNET_MY_query_param_end 781 GNUNET_MY_query_param_end
720 }; 782 };
721 783
722 execute_select (plugin, plugin->zero_iter, 784 execute_select (plugin,
723 proc, proc_cls, 785 plugin->zero_iter,
786 proc,
787 proc_cls,
724 params_zero_iter); 788 params_zero_iter);
725} 789}
726 790
@@ -749,13 +813,13 @@ struct ReplCtx
749 813
750 814
751/** 815/**
752 * Wrapper for the processor for 'mysql_plugin_get_replication'. 816 * Wrapper for the processor for #mysql_plugin_get_replication().
753 * Decrements the replication counter and calls the original 817 * Decrements the replication counter and calls the original
754 * iterator. 818 * iterator.
755 * 819 *
756 * @param cls closure 820 * @param cls closure
757 * @param key key for the content 821 * @param key key for the content
758 * @param size number of bytes in data 822 * @param size number of bytes in @a data
759 * @param data content stored 823 * @param data content stored
760 * @param type type of the content 824 * @param type type of the content
761 * @param priority priority of the content 825 * @param priority priority of the content
@@ -763,19 +827,18 @@ struct ReplCtx
763 * @param expiration expiration time for the content 827 * @param expiration expiration time for the content
764 * @param uid unique identifier for the datum; 828 * @param uid unique identifier for the datum;
765 * maybe 0 if no unique identifier is available 829 * maybe 0 if no unique identifier is available
766 * 830 * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
767 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
768 * (continue on call to "next", of course), 831 * (continue on call to "next", of course),
769 * GNUNET_NO to delete the item and continue (if supported) 832 * #GNUNET_NO to delete the item and continue (if supported)
770 */ 833 */
771static int 834static int
772repl_proc (void *cls, 835repl_proc (void *cls,
773 const struct GNUNET_HashCode * key, 836 const struct GNUNET_HashCode *key,
774 uint32_t size, 837 uint32_t size,
775 const void *data, 838 const void *data,
776 enum GNUNET_BLOCK_Type type, 839 enum GNUNET_BLOCK_Type type,
777 uint32_t priority, 840 uint32_t priority,
778 uint32_t anonymity, 841 uint32_t anonymity,
779 struct GNUNET_TIME_Absolute expiration, 842 struct GNUNET_TIME_Absolute expiration,
780 uint64_t uid) 843 uint64_t uid)
781{ 844{
@@ -784,21 +847,26 @@ repl_proc (void *cls,
784 int ret; 847 int ret;
785 int iret; 848 int iret;
786 849
787 ret = 850 ret = rc->proc (rc->proc_cls,
788 rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity, 851 key,
789 expiration, uid); 852 size,
853 data,
854 type,
855 priority,
856 anonymity,
857 expiration,
858 uid);
790 if (NULL != key) 859 if (NULL != key)
791 { 860 {
792 struct GNUNET_MY_QueryParam params_proc[] = { 861 struct GNUNET_MY_QueryParam params_proc[] = {
793 GNUNET_MY_query_param_uint64 (&uid), 862 GNUNET_MY_query_param_uint64 (&uid),
794 GNUNET_MY_query_param_end 863 GNUNET_MY_query_param_end
795 }; 864 };
796 865
797 iret = 866 iret = GNUNET_MY_exec_prepared (plugin->mc,
798 GNUNET_MY_exec_prepared (plugin->mc, 867 plugin->dec_repl,
799 plugin->dec_repl, 868 params_proc);
800 params_proc); 869 if (GNUNET_SYSERR == iret)
801 if (iret == GNUNET_SYSERR)
802 { 870 {
803 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 871 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
804 "Failed to reduce replication counter\n"); 872 "Failed to reduce replication counter\n");
@@ -813,35 +881,29 @@ repl_proc (void *cls,
813 * Get a random item for replication. Returns a single, not expired, 881 * Get a random item for replication. Returns a single, not expired,
814 * random item from those with the highest replication counters. The 882 * random item from those with the highest replication counters. The
815 * item's replication counter is decremented by one IF it was positive 883 * item's replication counter is decremented by one IF it was positive
816 * before. Call 'proc' with all values ZERO or NULL if the datastore 884 * before. Call @a proc with all values ZERO or NULL if the datastore
817 * is empty. 885 * is empty.
818 * 886 *
819 * @param cls closure 887 * @param cls closure
820 * @param proc function to call the value (once only). 888 * @param proc function to call the value (once only).
821 * @param proc_cls closure for proc 889 * @param proc_cls closure for @a proc
822 */ 890 */
823static void 891static void
824mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc, 892mysql_plugin_get_replication (void *cls,
893 PluginDatumProcessor proc,
825 void *proc_cls) 894 void *proc_cls)
826{ 895{
827 struct Plugin *plugin = cls; 896 struct Plugin *plugin = cls;
828 uint64_t rvalue; 897 uint64_t rvalue;
829 uint32_t repl; 898 uint32_t repl;
830
831 struct ReplCtx rc; 899 struct ReplCtx rc;
832 rc.plugin = plugin;
833 rc.proc = proc;
834 rc.proc_cls = proc_cls;
835
836 struct GNUNET_MY_QueryParam params_get[] = { 900 struct GNUNET_MY_QueryParam params_get[] = {
837 GNUNET_MY_query_param_end 901 GNUNET_MY_query_param_end
838 }; 902 };
839
840 struct GNUNET_MY_ResultSpec results_get[] = { 903 struct GNUNET_MY_ResultSpec results_get[] = {
841 GNUNET_MY_result_spec_uint32 (&repl), 904 GNUNET_MY_result_spec_uint32 (&repl),
842 GNUNET_MY_result_spec_end 905 GNUNET_MY_result_spec_end
843 }; 906 };
844
845 struct GNUNET_MY_QueryParam params_select[] = { 907 struct GNUNET_MY_QueryParam params_select[] = {
846 GNUNET_MY_query_param_uint32 (&repl), 908 GNUNET_MY_query_param_uint32 (&repl),
847 GNUNET_MY_query_param_uint64 (&rvalue), 909 GNUNET_MY_query_param_uint64 (&rvalue),
@@ -850,27 +912,36 @@ mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
850 GNUNET_MY_query_param_end 912 GNUNET_MY_query_param_end
851 }; 913 };
852 914
915 rc.plugin = plugin;
916 rc.proc = proc;
917 rc.proc_cls = proc_cls;
918
853 if (1 != 919 if (1 !=
854 GNUNET_MY_exec_prepared (plugin->mc, plugin->max_repl, params_get)) 920 GNUNET_MY_exec_prepared (plugin->mc,
921 plugin->max_repl,
922 params_get))
855 { 923 {
856 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 924 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
857 return; 925 return;
858 } 926 }
859 927
860 if (1 != 928 if (GNUNET_OK !=
861 GNUNET_MY_extract_result (plugin->max_repl, results_get)) 929 GNUNET_MY_extract_result (plugin->max_repl,
930 results_get))
862 { 931 {
863 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 932 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
864 return; 933 return;
865 } 934 }
935 GNUNET_break (GNUNET_NO ==
936 GNUNET_MY_extract_result (plugin->max_repl,
937 NULL));
938 rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
939 UINT64_MAX);
866 940
867 rvalue = 941 execute_select (plugin,
868 (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
869 UINT64_MAX);
870
871 execute_select (plugin,
872 plugin->select_replication, 942 plugin->select_replication,
873 &repl_proc, &rc, 943 &repl_proc,
944 &rc,
874 params_select); 945 params_select);
875} 946}
876 947
@@ -880,69 +951,91 @@ mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
880 * 951 *
881 * @param cls closure 952 * @param cls closure
882 * @param proc function to call on each key 953 * @param proc function to call on each key
883 * @param proc_cls closure for proc 954 * @param proc_cls closure for @a proc
884 */ 955 */
885static void 956static void
886mysql_plugin_get_keys (void *cls, 957mysql_plugin_get_keys (void *cls,
887 PluginKeyProcessor proc, 958 PluginKeyProcessor proc,
888 void *proc_cls) 959 void *proc_cls)
889{ 960{
890 struct Plugin *plugin = cls; 961 struct Plugin *plugin = cls;
891 char *query = "SELECT hash FROM gn090";
892 int ret; 962 int ret;
893 MYSQL_STMT *statement; 963 MYSQL_STMT *statement;
894 struct GNUNET_MYSQL_StatementHandle *statements_handle_select = NULL; 964 unsigned int cnt;
895
896
897 struct GNUNET_HashCode key; 965 struct GNUNET_HashCode key;
898 966 struct GNUNET_HashCode last;
899 statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
900
901 statements_handle_select = GNUNET_MYSQL_statement_prepare (plugin->mc,
902 query);
903 GNUNET_assert (proc != NULL);
904
905 struct GNUNET_MY_QueryParam params_select[] = { 967 struct GNUNET_MY_QueryParam params_select[] = {
906 GNUNET_MY_query_param_end 968 GNUNET_MY_query_param_end
907 }; 969 };
908
909 struct GNUNET_MY_ResultSpec results_select[] = { 970 struct GNUNET_MY_ResultSpec results_select[] = {
910 GNUNET_MY_result_spec_auto_from_type (&key), 971 GNUNET_MY_result_spec_auto_from_type (&key),
911 GNUNET_MY_result_spec_end 972 GNUNET_MY_result_spec_end
912 }; 973 };
913 974
914 if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, 975 GNUNET_assert (NULL != proc);
915 statements_handle_select, 976 statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
916 params_select)) 977 if (GNUNET_OK !=
978 GNUNET_MY_exec_prepared (plugin->mc,
979 plugin->get_all_keys,
980 params_select))
917 { 981 {
918 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 982 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
919 _("`%s' for `%s' failed at %s:%d with error: %s\n"), 983 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
920 "mysql_stmt_execute", query, __FILE__, __LINE__, 984 "mysql_stmt_execute",
985 GET_ALL_KEYS,
986 __FILE__,
987 __LINE__,
921 mysql_stmt_error (statement)); 988 mysql_stmt_error (statement));
922 GNUNET_MYSQL_statements_invalidate (plugin->mc); 989 GNUNET_MYSQL_statements_invalidate (plugin->mc);
923 proc (proc_cls, NULL, 0); 990 proc (proc_cls, NULL, 0);
924 return; 991 return;
925 } 992 }
926 993 ret = GNUNET_YES;
927 ret = GNUNET_MY_extract_result (statements_handle_select, 994 cnt = 0;
928 results_select); 995 while (ret == GNUNET_YES)
929 996 {
930 if (ret != MYSQL_NO_DATA) 997 ret = GNUNET_MY_extract_result (plugin->get_all_keys,
998 results_select);
999 if (0 != memcmp (&last,
1000 &key,
1001 sizeof (key)))
1002 {
1003 if (0 != cnt)
1004 proc (proc_cls,
1005 &last,
1006 cnt);
1007 cnt = 1;
1008 last = key;
1009 }
1010 else
1011 {
1012 cnt++;
1013 }
1014 }
1015 if (0 != cnt)
1016 proc (proc_cls,
1017 &last,
1018 cnt);
1019 /* finally, let app know we are done */
1020 proc (proc_cls,
1021 NULL,
1022 0);
1023 if (GNUNET_SYSERR == ret)
931 { 1024 {
932 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1025 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
933 _("`%s' failed at %s:%d with error: %s\n"), 1026 _("`%s' failed at %s:%d with error: %s\n"),
934 "mysql_stmt_fetch", __FILE__, __LINE__, 1027 "mysql_stmt_fetch",
935 mysql_stmt_error (statement)); 1028 __FILE__,
1029 __LINE__,
1030 mysql_stmt_error (statement));
936 GNUNET_MYSQL_statements_invalidate (plugin->mc); 1031 GNUNET_MYSQL_statements_invalidate (plugin->mc);
937 return; 1032 return;
938 } 1033 }
939
940 mysql_stmt_reset (statement);
941} 1034}
942 1035
943 1036
944/** 1037/**
945 * Context for 'expi_proc' function. 1038 * Context for #expi_proc() function.
946 */ 1039 */
947struct ExpiCtx 1040struct ExpiCtx
948{ 1041{
@@ -958,7 +1051,7 @@ struct ExpiCtx
958 PluginDatumProcessor proc; 1051 PluginDatumProcessor proc;
959 1052
960 /** 1053 /**
961 * Closure for proc. 1054 * Closure for @e proc.
962 */ 1055 */
963 void *proc_cls; 1056 void *proc_cls;
964}; 1057};
@@ -966,7 +1059,7 @@ struct ExpiCtx
966 1059
967 1060
968/** 1061/**
969 * Wrapper for the processor for 'mysql_plugin_get_expiration'. 1062 * Wrapper for the processor for #mysql_plugin_get_expiration().
970 * If no expired value was found, we do a second query for 1063 * If no expired value was found, we do a second query for
971 * low-priority content. 1064 * low-priority content.
972 * 1065 *
@@ -980,83 +1073,94 @@ struct ExpiCtx
980 * @param expiration expiration time for the content 1073 * @param expiration expiration time for the content
981 * @param uid unique identifier for the datum; 1074 * @param uid unique identifier for the datum;
982 * maybe 0 if no unique identifier is available 1075 * maybe 0 if no unique identifier is available
983 * 1076 * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
984 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
985 * (continue on call to "next", of course), 1077 * (continue on call to "next", of course),
986 * GNUNET_NO to delete the item and continue (if supported) 1078 * #GNUNET_NO to delete the item and continue (if supported)
987 */ 1079 */
988static int 1080static int
989expi_proc (void *cls, 1081expi_proc (void *cls,
990 const struct GNUNET_HashCode * key, 1082 const struct GNUNET_HashCode *key,
991 uint32_t size, 1083 uint32_t size,
992 const void *data, 1084 const void *data,
993 enum GNUNET_BLOCK_Type type, 1085 enum GNUNET_BLOCK_Type type,
994 uint32_t priority, 1086 uint32_t priority,
995 uint32_t anonymity, 1087 uint32_t anonymity,
996 struct GNUNET_TIME_Absolute expiration, 1088 struct GNUNET_TIME_Absolute expiration,
997 uint64_t uid) 1089 uint64_t uid)
998{ 1090{
999 struct ExpiCtx *rc = cls; 1091 struct ExpiCtx *rc = cls;
1000 struct Plugin *plugin = rc->plugin; 1092 struct Plugin *plugin = rc->plugin;
1001
1002 struct GNUNET_MY_QueryParam params_select[] = { 1093 struct GNUNET_MY_QueryParam params_select[] = {
1003 GNUNET_MY_query_param_end 1094 GNUNET_MY_query_param_end
1004 }; 1095 };
1005 1096
1006 if (NULL == key) 1097 if (NULL == key)
1007 { 1098 {
1008 execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls, 1099 execute_select (plugin,
1100 plugin->select_priority,
1101 rc->proc,
1102 rc->proc_cls,
1009 params_select); 1103 params_select);
1010 return GNUNET_SYSERR; 1104 return GNUNET_SYSERR;
1011 } 1105 }
1012 return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity, 1106 return rc->proc (rc->proc_cls,
1013 expiration, uid); 1107 key,
1108 size,
1109 data,
1110 type,
1111 priority,
1112 anonymity,
1113 expiration,
1114 uid);
1014} 1115}
1015 1116
1016 1117
1017/** 1118/**
1018 * Get a random item for expiration. 1119 * Get a random item for expiration.
1019 * Call 'proc' with all values ZERO or NULL if the datastore is empty. 1120 * Call @a proc with all values ZERO or NULL if the datastore is empty.
1020 * 1121 *
1021 * @param cls closure 1122 * @param cls closure
1022 * @param proc function to call the value (once only). 1123 * @param proc function to call the value (once only).
1023 * @param proc_cls closure for proc 1124 * @param proc_cls closure for @a proc
1024 */ 1125 */
1025static void 1126static void
1026mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc, 1127mysql_plugin_get_expiration (void *cls,
1128 PluginDatumProcessor proc,
1027 void *proc_cls) 1129 void *proc_cls)
1028{ 1130{
1029 struct Plugin *plugin = cls; 1131 struct Plugin *plugin = cls;
1030 uint64_t nt; 1132 struct GNUNET_TIME_Absolute now;
1133 struct GNUNET_MY_QueryParam params_select[] = {
1134 GNUNET_MY_query_param_absolute_time (&now),
1135 GNUNET_MY_query_param_end
1136 };
1031 struct ExpiCtx rc; 1137 struct ExpiCtx rc;
1032 1138
1033 rc.plugin = plugin; 1139 rc.plugin = plugin;
1034 rc.proc = proc; 1140 rc.proc = proc;
1035 rc.proc_cls = proc_cls; 1141 rc.proc_cls = proc_cls;
1036 nt = GNUNET_TIME_absolute_get ().abs_value_us; 1142 now = GNUNET_TIME_absolute_get ();
1037 1143 execute_select (plugin,
1038 struct GNUNET_MY_QueryParam params_select[] = { 1144 plugin->select_expiration,
1039 GNUNET_MY_query_param_uint64 (&nt), 1145 expi_proc,
1040 GNUNET_MY_query_param_end 1146 &rc,
1041 };
1042
1043 execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
1044 params_select); 1147 params_select);
1045
1046} 1148}
1047 1149
1048 1150
1049/** 1151/**
1050 * Drop database. 1152 * Drop database.
1051 * 1153 *
1052 * @param cls the "struct Plugin*" 1154 * @param cls the `struct Plugin *`
1053 */ 1155 */
1054static void 1156static void
1055mysql_plugin_drop (void *cls) 1157mysql_plugin_drop (void *cls)
1056{ 1158{
1057 struct Plugin *plugin = cls; 1159 struct Plugin *plugin = cls;
1058 1160
1059 if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090")) 1161 if (GNUNET_OK !=
1162 GNUNET_MYSQL_statement_run (plugin->mc,
1163 "DROP TABLE gn090"))
1060 return; /* error */ 1164 return; /* error */
1061 plugin->env->duc (plugin->env->cls, 0); 1165 plugin->env->duc (plugin->env->cls, 0);
1062} 1166}
@@ -1065,8 +1169,8 @@ mysql_plugin_drop (void *cls)
1065/** 1169/**
1066 * Entry point for the plugin. 1170 * Entry point for the plugin.
1067 * 1171 *
1068 * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*" 1172 * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment *`
1069 * @return our "struct Plugin*" 1173 * @return our `struct Plugin *`
1070 */ 1174 */
1071void * 1175void *
1072libgnunet_plugin_datastore_mysql_init (void *cls) 1176libgnunet_plugin_datastore_mysql_init (void *cls)
@@ -1077,7 +1181,8 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
1077 1181
1078 plugin = GNUNET_new (struct Plugin); 1182 plugin = GNUNET_new (struct Plugin);
1079 plugin->env = env; 1183 plugin->env = env;
1080 plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql"); 1184 plugin->mc = GNUNET_MYSQL_context_create (env->cfg,
1185 "datastore-mysql");
1081 if (NULL == plugin->mc) 1186 if (NULL == plugin->mc)
1082 { 1187 {
1083 GNUNET_free (plugin); 1188 GNUNET_free (plugin);
@@ -1155,7 +1260,8 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
1155 1260
1156/** 1261/**
1157 * Exit point from the plugin. 1262 * Exit point from the plugin.
1158 * @param cls our "struct Plugin*" 1263 *
1264 * @param cls our `struct Plugin *`
1159 * @return always NULL 1265 * @return always NULL
1160 */ 1266 */
1161void * 1267void *
diff --git a/src/datastore/test_datastore_api.c b/src/datastore/test_datastore_api.c
index 7d4565de6..6ebfee01e 100644
--- a/src/datastore/test_datastore_api.c
+++ b/src/datastore/test_datastore_api.c
@@ -412,7 +412,7 @@ run_continuation (void *cls)
412 GNUNET_DATASTORE_put (datastore, 0, &crc->key, get_size (crc->i), 412 GNUNET_DATASTORE_put (datastore, 0, &crc->key, get_size (crc->i),
413 get_data (crc->i), get_type (crc->i), 413 get_data (crc->i), get_type (crc->i),
414 get_priority (crc->i), get_anonymity (crc->i), 0, 414 get_priority (crc->i), get_anonymity (crc->i), 0,
415 get_expiration (crc->i), 1, 1, TIMEOUT, 415 get_expiration (crc->i), 1, 1,
416 &check_success, crc); 416 &check_success, crc);
417 crc->i++; 417 crc->i++;
418 if (crc->i == ITERATIONS) 418 if (crc->i == ITERATIONS)
@@ -423,10 +423,17 @@ run_continuation (void *cls)
423 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 423 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
424 "Executing GET number %u\n", 424 "Executing GET number %u\n",
425 crc->i); 425 crc->i);
426 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 426 GNUNET_CRYPTO_hash (&crc->i,
427 GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key, 427 sizeof (int),
428 get_type (crc->i), 1, 1, TIMEOUT, 428 &crc->key);
429 &check_value, crc); 429 GNUNET_DATASTORE_get_key (datastore,
430 crc->offset,
431 &crc->key,
432 get_type (crc->i),
433 1,
434 1,
435 &check_value,
436 crc);
430 break; 437 break;
431 case RP_DEL: 438 case RP_DEL:
432 crc->i--; 439 crc->i--;
@@ -436,9 +443,14 @@ run_continuation (void *cls)
436 crc->data = NULL; 443 crc->data = NULL;
437 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 444 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
438 GNUNET_assert (NULL != 445 GNUNET_assert (NULL !=
439 GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key, 446 GNUNET_DATASTORE_get_key (datastore,
440 get_type (crc->i), 1, 1, TIMEOUT, 447 crc->offset,
441 &delete_value, crc)); 448 &crc->key,
449 get_type (crc->i),
450 1,
451 1,
452 &delete_value,
453 crc));
442 break; 454 break;
443 case RP_DO_DEL: 455 case RP_DO_DEL:
444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 456 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -455,7 +467,7 @@ run_continuation (void *cls)
455 } 467 }
456 GNUNET_assert (NULL != 468 GNUNET_assert (NULL !=
457 GNUNET_DATASTORE_remove (datastore, &crc->key, crc->size, 469 GNUNET_DATASTORE_remove (datastore, &crc->key, crc->size,
458 crc->data, 1, 1, TIMEOUT, 470 crc->data, 1, 1,
459 &check_success, crc)); 471 &check_success, crc));
460 break; 472 break;
461 case RP_DELVALIDATE: 473 case RP_DELVALIDATE:
@@ -466,7 +478,7 @@ run_continuation (void *cls)
466 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 478 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
467 GNUNET_assert (NULL != 479 GNUNET_assert (NULL !=
468 GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key, 480 GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key,
469 get_type (crc->i), 1, 1, TIMEOUT, 481 get_type (crc->i), 1, 1,
470 &check_nothing, crc)); 482 &check_nothing, crc));
471 break; 483 break;
472 case RP_RESERVE: 484 case RP_RESERVE:
@@ -479,7 +491,7 @@ run_continuation (void *cls)
479 GNUNET_DATASTORE_put (datastore, crc->rid, &crc->key, get_size (42), 491 GNUNET_DATASTORE_put (datastore, crc->rid, &crc->key, get_size (42),
480 get_data (42), get_type (42), get_priority (42), 492 get_data (42), get_type (42), get_priority (42),
481 get_anonymity (42), 0, get_expiration (42), 1, 1, 493 get_anonymity (42), 0, get_expiration (42), 1, 1,
482 TIMEOUT, &check_success, crc); 494 &check_success, crc);
483 break; 495 break;
484 case RP_PUT_MULTIPLE_NEXT: 496 case RP_PUT_MULTIPLE_NEXT:
485 crc->phase = RP_GET_MULTIPLE; 497 crc->phase = RP_GET_MULTIPLE;
@@ -493,7 +505,6 @@ run_continuation (void *cls)
493 0, 505 0,
494 get_expiration (43), 506 get_expiration (43),
495 1, 1, 507 1, 1,
496 TIMEOUT,
497 &check_success, crc); 508 &check_success, crc);
498 break; 509 break;
499 case RP_GET_MULTIPLE: 510 case RP_GET_MULTIPLE:
@@ -502,7 +513,6 @@ run_continuation (void *cls)
502 crc->offset, 513 crc->offset,
503 &crc->key, 514 &crc->key,
504 get_type (42), 1, 1, 515 get_type (42), 1, 1,
505 TIMEOUT,
506 &check_multiple, crc)); 516 &check_multiple, crc));
507 break; 517 break;
508 case RP_GET_MULTIPLE_NEXT: 518 case RP_GET_MULTIPLE_NEXT:
@@ -512,7 +522,6 @@ run_continuation (void *cls)
512 &crc->key, 522 &crc->key,
513 get_type (42), 523 get_type (42),
514 1, 1, 524 1, 1,
515 TIMEOUT,
516 &check_multiple, crc)); 525 &check_multiple, crc));
517 break; 526 break;
518 case RP_UPDATE: 527 case RP_UPDATE:
@@ -521,7 +530,7 @@ run_continuation (void *cls)
521 GNUNET_DATASTORE_update (datastore, 530 GNUNET_DATASTORE_update (datastore,
522 crc->uid, 100, 531 crc->uid, 100,
523 get_expiration (42), 1, 532 get_expiration (42), 1,
524 1, TIMEOUT, 533 1,
525 &check_success, crc); 534 &check_success, crc);
526 break; 535 break;
527 case RP_UPDATE_VALIDATE: 536 case RP_UPDATE_VALIDATE:
@@ -531,7 +540,6 @@ run_continuation (void *cls)
531 &crc->key, 540 &crc->key,
532 get_type (42), 541 get_type (42),
533 1, 1, 542 1, 1,
534 TIMEOUT,
535 &check_update, crc)); 543 &check_update, crc));
536 break; 544 break;
537 case RP_DONE: 545 case RP_DONE:
@@ -631,7 +639,6 @@ run (void *cls,
631 GNUNET_TIME_relative_to_absolute 639 GNUNET_TIME_relative_to_absolute
632 (GNUNET_TIME_UNIT_SECONDS), 640 (GNUNET_TIME_UNIT_SECONDS),
633 0, 1, 641 0, 1,
634 TIMEOUT,
635 &run_tests, crc)) 642 &run_tests, crc))
636 { 643 {
637 FPRINTF (stderr, 644 FPRINTF (stderr,
diff --git a/src/datastore/test_datastore_api_management.c b/src/datastore/test_datastore_api_management.c
index c9fec79e3..954e61bec 100644
--- a/src/datastore/test_datastore_api_management.c
+++ b/src/datastore/test_datastore_api_management.c
@@ -193,10 +193,18 @@ run_continuation (void *cls)
193 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing `%s' number %u\n", "PUT", 193 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing `%s' number %u\n", "PUT",
194 crc->i); 194 crc->i);
195 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 195 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
196 GNUNET_DATASTORE_put (datastore, 0, &crc->key, get_size (crc->i), 196 GNUNET_DATASTORE_put (datastore,
197 get_data (crc->i), get_type (crc->i), 197 0,
198 get_priority (crc->i), get_anonymity (crc->i), 0, 198 &crc->key,
199 get_expiration (crc->i), 1, 1, TIMEOUT, 199 get_size (crc->i),
200 get_data (crc->i),
201 get_type (crc->i),
202 get_priority (crc->i),
203 get_anonymity (crc->i),
204 0,
205 get_expiration (crc->i),
206 1,
207 1,
200 &check_success, crc); 208 &check_success, crc);
201 crc->i++; 209 crc->i++;
202 if (crc->i == ITERATIONS) 210 if (crc->i == ITERATIONS)
@@ -213,7 +221,8 @@ run_continuation (void *cls)
213 crc->i); 221 crc->i);
214 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 222 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
215 GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key, 223 GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key,
216 get_type (crc->i), 1, 1, TIMEOUT, &check_value, 224 get_type (crc->i), 1, 1,
225 &check_value,
217 crc); 226 crc);
218 break; 227 break;
219 case RP_GET_FAIL: 228 case RP_GET_FAIL:
@@ -221,7 +230,8 @@ run_continuation (void *cls)
221 crc->i); 230 crc->i);
222 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); 231 GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
223 GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key, 232 GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key,
224 get_type (crc->i), 1, 1, TIMEOUT, &check_nothing, 233 get_type (crc->i), 1, 1,
234 &check_nothing,
225 crc); 235 crc);
226 break; 236 break;
227 case RP_DONE: 237 case RP_DONE:
@@ -266,11 +276,18 @@ run (void *cls,
266 now = GNUNET_TIME_absolute_get (); 276 now = GNUNET_TIME_absolute_get ();
267 datastore = GNUNET_DATASTORE_connect (cfg); 277 datastore = GNUNET_DATASTORE_connect (cfg);
268 if (NULL == 278 if (NULL ==
269 GNUNET_DATASTORE_put (datastore, 0, &zkey, 4, "TEST", 279 GNUNET_DATASTORE_put (datastore,
270 GNUNET_BLOCK_TYPE_TEST, 0, 0, 0, 280 0,
271 GNUNET_TIME_relative_to_absolute 281 &zkey,
272 (GNUNET_TIME_UNIT_SECONDS), 0, 1, 282 4,
273 GNUNET_TIME_UNIT_MINUTES, &run_tests, crc)) 283 "TEST",
284 GNUNET_BLOCK_TYPE_TEST,
285 0, 0, 0,
286 GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS),
287 0,
288 1,
289 &run_tests,
290 crc))
274 { 291 {
275 FPRINTF (stderr, "%s", "Test 'put' operation failed.\n"); 292 FPRINTF (stderr, "%s", "Test 'put' operation failed.\n");
276 GNUNET_free (crc); 293 GNUNET_free (crc);
diff --git a/src/datastore/test_plugin_datastore.c b/src/datastore/test_plugin_datastore.c
index 9ec0c53a2..9b85d57da 100644
--- a/src/datastore/test_plugin_datastore.c
+++ b/src/datastore/test_plugin_datastore.c
@@ -109,11 +109,14 @@ put_continuation (void *cls,
109 109
110 if (GNUNET_OK != status) 110 if (GNUNET_OK != status)
111 { 111 {
112 FPRINTF (stderr, "ERROR: `%s'\n", msg); 112 FPRINTF (stderr,
113 "ERROR: `%s'\n",
114 msg);
113 } 115 }
114 else 116 else
115 { 117 {
116 crc->api->estimate_size (crc->api->cls, &cs); 118 crc->api->estimate_size (crc->api->cls,
119 &cs);
117 GNUNET_assert (os <= cs); 120 GNUNET_assert (os <= cs);
118 os = cs; 121 os = cs;
119 stored_bytes += size; 122 stored_bytes += size;
@@ -184,22 +187,30 @@ static uint64_t guid;
184 187
185 188
186static int 189static int
187iterate_one_shot (void *cls, const struct GNUNET_HashCode * key, uint32_t size, 190iterate_one_shot (void *cls,
188 const void *data, enum GNUNET_BLOCK_Type type, 191 const struct GNUNET_HashCode *key,
189 uint32_t priority, uint32_t anonymity, 192 uint32_t size,
190 struct GNUNET_TIME_Absolute expiration, uint64_t uid) 193 const void *data,
194 enum GNUNET_BLOCK_Type type,
195 uint32_t priority,
196 uint32_t anonymity,
197 struct GNUNET_TIME_Absolute expiration,
198 uint64_t uid)
191{ 199{
192 struct CpsRunContext *crc = cls; 200 struct CpsRunContext *crc = cls;
193 201
194 GNUNET_assert (key != NULL); 202 GNUNET_assert (NULL != key);
195 guid = uid; 203 guid = uid;
196 crc->phase++; 204 crc->phase++;
197 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 205 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
198 "Found result type=%u, priority=%u, size=%u, expire=%s, key %s\n", 206 "Found result type=%u, priority=%u, size=%u, expire=%s, key %s\n",
199 type, priority, size, 207 (unsigned int) type,
208 (unsigned int) priority,
209 (unsigned int) size,
200 GNUNET_STRINGS_absolute_time_to_string (expiration), 210 GNUNET_STRINGS_absolute_time_to_string (expiration),
201 GNUNET_h2s (key)); 211 GNUNET_h2s (key));
202 GNUNET_SCHEDULER_add_now (&test, crc); 212 GNUNET_SCHEDULER_add_now (&test,
213 crc);
203 return GNUNET_OK; 214 return GNUNET_OK;
204} 215}
205 216
@@ -219,11 +230,14 @@ unload_plugin (struct GNUNET_DATASTORE_PluginFunctions *api,
219 char *libname; 230 char *libname;
220 231
221 if (GNUNET_OK != 232 if (GNUNET_OK !=
222 GNUNET_CONFIGURATION_get_value_string (cfg, "DATASTORE", "DATABASE", 233 GNUNET_CONFIGURATION_get_value_string (cfg,
234 "DATASTORE",
235 "DATABASE",
223 &name)) 236 &name))
224 { 237 {
225 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 238 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
226 _("No `%s' specified for `%s' in configuration!\n"), "DATABASE", 239 _("No `%s' specified for `%s' in configuration!\n"),
240 "DATABASE",
227 "DATASTORE"); 241 "DATASTORE");
228 return; 242 return;
229 } 243 }
@@ -290,8 +304,16 @@ test (void *cls)
290 break; 304 break;
291 } 305 }
292 gen_key (5, &key); 306 gen_key (5, &key);
293 crc->api->get_key (crc->api->cls, crc->offset++, &key, NULL, 307 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
294 GNUNET_BLOCK_TYPE_ANY, &iterate_one_shot, crc); 308 "Looking for %s\n",
309 GNUNET_h2s (&key));
310 crc->api->get_key (crc->api->cls,
311 crc->offset++,
312 &key,
313 NULL,
314 GNUNET_BLOCK_TYPE_ANY,
315 &iterate_one_shot,
316 crc);
295 break; 317 break;
296 case RP_UPDATE: 318 case RP_UPDATE:
297 crc->api->update (crc->api->cls, 319 crc->api->update (crc->api->cls,
diff --git a/src/datastore/test_plugin_datastore_data_mysql.conf b/src/datastore/test_plugin_datastore_data_mysql.conf
index ac7a3cde1..18eda687e 100644
--- a/src/datastore/test_plugin_datastore_data_mysql.conf
+++ b/src/datastore/test_plugin_datastore_data_mysql.conf
@@ -6,5 +6,4 @@ GNUNET_TEST_HOME = /tmp/test-gnunet-datastore-plugin-mysql/
6DATABASE = mysql 6DATABASE = mysql
7 7
8[datastore-mysql] 8[datastore-mysql]
9DATABASE = gnunet 9DATABASE = gnunetcheck
10