diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-06-24 20:17:39 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-06-24 20:17:39 +0000 |
commit | 3140154d46212e08e0d73ed891a66213a6813075 (patch) | |
tree | 018a55a3899207664b388fcf47a679ca54ca6fbf /src/datastore/datastore_api.c | |
parent | d5fd881c2a044474b54ddf03b6ab8be8d2b75927 (diff) | |
download | gnunet-3140154d46212e08e0d73ed891a66213a6813075.tar.gz gnunet-3140154d46212e08e0d73ed891a66213a6813075.zip |
refactoring datastore API to use MQ API, also fixing misc. bugs in new mysql backend
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r-- | src/datastore/datastore_api.c | 1213 |
1 files changed, 579 insertions, 634 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index b2de3d35d..285634759 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c | |||
@@ -21,7 +21,7 @@ | |||
21 | /** | 21 | /** |
22 | * @file datastore/datastore_api.c | 22 | * @file datastore/datastore_api.c |
23 | * @brief Management for the datastore for files stored on a GNUnet node. Implements | 23 | * @brief Management for the datastore for files stored on a GNUnet node. Implements |
24 | * a priority queue for requests (with timeouts). | 24 | * a priority queue for requests |
25 | * @author Christian Grothoff | 25 | * @author Christian Grothoff |
26 | */ | 26 | */ |
27 | #include "platform.h" | 27 | #include "platform.h" |
@@ -95,7 +95,6 @@ union QueueContext | |||
95 | }; | 95 | }; |
96 | 96 | ||
97 | 97 | ||
98 | |||
99 | /** | 98 | /** |
100 | * Entry in our priority queue. | 99 | * Entry in our priority queue. |
101 | */ | 100 | */ |
@@ -118,13 +117,6 @@ struct GNUNET_DATASTORE_QueueEntry | |||
118 | struct GNUNET_DATASTORE_Handle *h; | 117 | struct GNUNET_DATASTORE_Handle *h; |
119 | 118 | ||
120 | /** | 119 | /** |
121 | * Response processor (NULL if we are not waiting for a response). | ||
122 | * This struct should be used for the closure, function-specific | ||
123 | * arguments can be passed via 'qc'. | ||
124 | */ | ||
125 | GNUNET_CLIENT_MessageHandler response_proc; | ||
126 | |||
127 | /** | ||
128 | * Function to call after transmission of the request. | 120 | * Function to call after transmission of the request. |
129 | */ | 121 | */ |
130 | GNUNET_DATASTORE_ContinuationWithStatus cont; | 122 | GNUNET_DATASTORE_ContinuationWithStatus cont; |
@@ -140,14 +132,10 @@ struct GNUNET_DATASTORE_QueueEntry | |||
140 | union QueueContext qc; | 132 | union QueueContext qc; |
141 | 133 | ||
142 | /** | 134 | /** |
143 | * Task for timeout signalling. | 135 | * Envelope of the request to transmit, NULL after |
136 | * transmission. | ||
144 | */ | 137 | */ |
145 | struct GNUNET_SCHEDULER_Task *task; | 138 | struct GNUNET_MQ_Envelope *env; |
146 | |||
147 | /** | ||
148 | * Timeout for the current operation. | ||
149 | */ | ||
150 | struct GNUNET_TIME_Absolute timeout; | ||
151 | 139 | ||
152 | /** | 140 | /** |
153 | * Priority in the queue. | 141 | * Priority in the queue. |
@@ -161,22 +149,13 @@ struct GNUNET_DATASTORE_QueueEntry | |||
161 | unsigned int max_queue; | 149 | unsigned int max_queue; |
162 | 150 | ||
163 | /** | 151 | /** |
164 | * Number of bytes in the request message following | 152 | * Expected response type. |
165 | * this struct. 32-bit value for nicer memory | ||
166 | * access (and overall struct alignment). | ||
167 | */ | 153 | */ |
168 | uint32_t message_size; | 154 | uint16_t response_type; |
169 | |||
170 | /** | ||
171 | * Has this message been transmitted to the service? | ||
172 | * Only ever #GNUNET_YES for the head of the queue. | ||
173 | * Note that the overall struct should end at a | ||
174 | * multiple of 64 bits. | ||
175 | */ | ||
176 | int was_transmitted; | ||
177 | 155 | ||
178 | }; | 156 | }; |
179 | 157 | ||
158 | |||
180 | /** | 159 | /** |
181 | * Handle to the datastore service. | 160 | * Handle to the datastore service. |
182 | */ | 161 | */ |
@@ -191,7 +170,7 @@ struct GNUNET_DATASTORE_Handle | |||
191 | /** | 170 | /** |
192 | * Current connection to the datastore service. | 171 | * Current connection to the datastore service. |
193 | */ | 172 | */ |
194 | struct GNUNET_CLIENT_Connection *client; | 173 | struct GNUNET_MQ_Handle *mq; |
195 | 174 | ||
196 | /** | 175 | /** |
197 | * Handle for statistics. | 176 | * Handle for statistics. |
@@ -199,11 +178,6 @@ struct GNUNET_DATASTORE_Handle | |||
199 | struct GNUNET_STATISTICS_Handle *stats; | 178 | struct GNUNET_STATISTICS_Handle *stats; |
200 | 179 | ||
201 | /** | 180 | /** |
202 | * Current transmit handle. | ||
203 | */ | ||
204 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
205 | |||
206 | /** | ||
207 | * Current head of priority queue. | 181 | * Current head of priority queue. |
208 | */ | 182 | */ |
209 | struct GNUNET_DATASTORE_QueueEntry *queue_head; | 183 | struct GNUNET_DATASTORE_QueueEntry *queue_head; |
@@ -216,7 +190,7 @@ struct GNUNET_DATASTORE_Handle | |||
216 | /** | 190 | /** |
217 | * Task for trying to reconnect. | 191 | * Task for trying to reconnect. |
218 | */ | 192 | */ |
219 | struct GNUNET_SCHEDULER_Task * reconnect_task; | 193 | struct GNUNET_SCHEDULER_Task *reconnect_task; |
220 | 194 | ||
221 | /** | 195 | /** |
222 | * How quickly should we retry? Used for exponential back-off on | 196 | * How quickly should we retry? Used for exponential back-off on |
@@ -237,11 +211,6 @@ struct GNUNET_DATASTORE_Handle | |||
237 | unsigned int result_count; | 211 | unsigned int result_count; |
238 | 212 | ||
239 | /** | 213 | /** |
240 | * Are we currently trying to receive from the service? | ||
241 | */ | ||
242 | int in_receive; | ||
243 | |||
244 | /** | ||
245 | * We should ignore the next message(s) from the service. | 214 | * We should ignore the next message(s) from the service. |
246 | */ | 215 | */ |
247 | unsigned int skip_next_messages; | 216 | unsigned int skip_next_messages; |
@@ -250,6 +219,110 @@ struct GNUNET_DATASTORE_Handle | |||
250 | 219 | ||
251 | 220 | ||
252 | /** | 221 | /** |
222 | * Try reconnecting to the datastore service. | ||
223 | * | ||
224 | * @param cls the `struct GNUNET_DATASTORE_Handle` | ||
225 | */ | ||
226 | static void | ||
227 | try_reconnect (void *cls); | ||
228 | |||
229 | |||
230 | /** | ||
231 | * Disconnect from the service and then try reconnecting to the datastore service | ||
232 | * after some delay. | ||
233 | * | ||
234 | * @param h handle to datastore to disconnect and reconnect | ||
235 | */ | ||
236 | static void | ||
237 | do_disconnect (struct GNUNET_DATASTORE_Handle *h) | ||
238 | { | ||
239 | if (NULL == h->mq) | ||
240 | { | ||
241 | GNUNET_break (0); | ||
242 | return; | ||
243 | } | ||
244 | GNUNET_MQ_destroy (h->mq); | ||
245 | h->mq = NULL; | ||
246 | h->skip_next_messages = 0; | ||
247 | h->reconnect_task | ||
248 | = GNUNET_SCHEDULER_add_delayed (h->retry_time, | ||
249 | &try_reconnect, | ||
250 | h); | ||
251 | } | ||
252 | |||
253 | |||
254 | /** | ||
255 | * Free a queue entry. Removes the given entry from the | ||
256 | * queue and releases associated resources. Does NOT | ||
257 | * call the callback. | ||
258 | * | ||
259 | * @param qe entry to free. | ||
260 | */ | ||
261 | static void | ||
262 | free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) | ||
263 | { | ||
264 | struct GNUNET_DATASTORE_Handle *h = qe->h; | ||
265 | |||
266 | GNUNET_CONTAINER_DLL_remove (h->queue_head, | ||
267 | h->queue_tail, | ||
268 | qe); | ||
269 | h->queue_size--; | ||
270 | GNUNET_free (qe); | ||
271 | } | ||
272 | |||
273 | |||
274 | /** | ||
275 | * Handle error in sending drop request to datastore. | ||
276 | * | ||
277 | * @param cls closure with the datastore handle | ||
278 | * @param error error code | ||
279 | */ | ||
280 | static void | ||
281 | mq_error_handler (void *cls, | ||
282 | enum GNUNET_MQ_Error error) | ||
283 | { | ||
284 | struct GNUNET_DATASTORE_Handle *h = cls; | ||
285 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
286 | |||
287 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
288 | "MQ error, reconnecting to DATASTORE\n"); | ||
289 | do_disconnect (h); | ||
290 | qe = h->queue_head; | ||
291 | if ( (NULL != qe) && | ||
292 | (NULL == qe->env) ) | ||
293 | { | ||
294 | union QueueContext qc = qe->qc; | ||
295 | uint16_t rt = qe->response_type; | ||
296 | |||
297 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
298 | "Failed to receive response from database.\n"); | ||
299 | free_queue_entry (qe); | ||
300 | switch (rt) | ||
301 | { | ||
302 | case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: | ||
303 | if (NULL != qc.sc.cont) | ||
304 | qc.sc.cont (qc.sc.cont_cls, | ||
305 | GNUNET_SYSERR, | ||
306 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
307 | _("DATASTORE disconnected")); | ||
308 | break; | ||
309 | case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: | ||
310 | if (NULL != qc.rc.proc) | ||
311 | qc.rc.proc (qc.rc.proc_cls, | ||
312 | NULL, | ||
313 | 0, | ||
314 | NULL, 0, 0, 0, | ||
315 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
316 | 0); | ||
317 | break; | ||
318 | default: | ||
319 | GNUNET_break (0); | ||
320 | } | ||
321 | } | ||
322 | } | ||
323 | |||
324 | |||
325 | /** | ||
253 | * Connect to the datastore service. | 326 | * Connect to the datastore service. |
254 | * | 327 | * |
255 | * @param cfg configuration to use | 328 | * @param cfg configuration to use |
@@ -258,22 +331,27 @@ struct GNUNET_DATASTORE_Handle | |||
258 | struct GNUNET_DATASTORE_Handle * | 331 | struct GNUNET_DATASTORE_Handle * |
259 | GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | 332 | GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) |
260 | { | 333 | { |
261 | struct GNUNET_CLIENT_Connection *c; | ||
262 | struct GNUNET_DATASTORE_Handle *h; | 334 | struct GNUNET_DATASTORE_Handle *h; |
263 | 335 | ||
264 | c = GNUNET_CLIENT_connect ("datastore", cfg); | 336 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
265 | if (c == NULL) | 337 | "Establishing DATASTORE connection!\n"); |
266 | return NULL; /* oops */ | ||
267 | h = GNUNET_new (struct GNUNET_DATASTORE_Handle); | 338 | h = GNUNET_new (struct GNUNET_DATASTORE_Handle); |
268 | h->client = c; | ||
269 | h->cfg = cfg; | 339 | h->cfg = cfg; |
270 | h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg); | 340 | try_reconnect (h); |
341 | if (NULL == h->mq) | ||
342 | { | ||
343 | GNUNET_free (h); | ||
344 | return NULL; | ||
345 | } | ||
346 | h->stats = GNUNET_STATISTICS_create ("datastore-api", | ||
347 | cfg); | ||
271 | return h; | 348 | return h; |
272 | } | 349 | } |
273 | 350 | ||
274 | 351 | ||
275 | /** | 352 | /** |
276 | * Task used by 'transmit_drop' to disconnect the datastore. | 353 | * Task used by to disconnect from the datastore after |
354 | * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message. | ||
277 | * | 355 | * |
278 | * @param cls the datastore handle | 356 | * @param cls the datastore handle |
279 | */ | 357 | */ |
@@ -282,39 +360,29 @@ disconnect_after_drop (void *cls) | |||
282 | { | 360 | { |
283 | struct GNUNET_DATASTORE_Handle *h = cls; | 361 | struct GNUNET_DATASTORE_Handle *h = cls; |
284 | 362 | ||
363 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
364 | "Drop sent, disconnecting\n"); | ||
285 | GNUNET_DATASTORE_disconnect (h, | 365 | GNUNET_DATASTORE_disconnect (h, |
286 | GNUNET_NO); | 366 | GNUNET_NO); |
287 | } | 367 | } |
288 | 368 | ||
289 | 369 | ||
290 | /** | 370 | /** |
291 | * Transmit DROP message to datastore service. | 371 | * Handle error in sending drop request to datastore. |
292 | * | 372 | * |
293 | * @param cls the `struct GNUNET_DATASTORE_Handle` | 373 | * @param cls closure with the datastore handle |
294 | * @param size number of bytes that can be copied to @a buf | 374 | * @param error error code |
295 | * @param buf where to copy the drop message | ||
296 | * @return number of bytes written to @a buf | ||
297 | */ | 375 | */ |
298 | static size_t | 376 | static void |
299 | transmit_drop (void *cls, size_t size, void *buf) | 377 | disconnect_on_mq_error (void *cls, |
378 | enum GNUNET_MQ_Error error) | ||
300 | { | 379 | { |
301 | struct GNUNET_DATASTORE_Handle *h = cls; | 380 | struct GNUNET_DATASTORE_Handle *h = cls; |
302 | struct GNUNET_MessageHeader *hdr; | ||
303 | 381 | ||
304 | if (buf == NULL) | 382 | LOG (GNUNET_ERROR_TYPE_ERROR, |
305 | { | 383 | "Failed to ask datastore to drop tables\n"); |
306 | LOG (GNUNET_ERROR_TYPE_WARNING, | 384 | GNUNET_DATASTORE_disconnect (h, |
307 | _("Failed to transmit request to drop database.\n")); | 385 | GNUNET_NO); |
308 | GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h); | ||
309 | return 0; | ||
310 | } | ||
311 | GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); | ||
312 | hdr = buf; | ||
313 | hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); | ||
314 | hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP); | ||
315 | GNUNET_SCHEDULER_add_now (&disconnect_after_drop, | ||
316 | h); | ||
317 | return sizeof (struct GNUNET_MessageHeader); | ||
318 | } | 386 | } |
319 | 387 | ||
320 | 388 | ||
@@ -333,15 +401,10 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, | |||
333 | 401 | ||
334 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 402 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
335 | "Datastore disconnect\n"); | 403 | "Datastore disconnect\n"); |
336 | if (NULL != h->th) | 404 | if (NULL != h->mq) |
337 | { | ||
338 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | ||
339 | h->th = NULL; | ||
340 | } | ||
341 | if (NULL != h->client) | ||
342 | { | 405 | { |
343 | GNUNET_CLIENT_disconnect (h->client); | 406 | GNUNET_MQ_destroy (h->mq); |
344 | h->client = NULL; | 407 | h->mq = NULL; |
345 | } | 408 | } |
346 | if (NULL != h->reconnect_task) | 409 | if (NULL != h->reconnect_task) |
347 | { | 410 | { |
@@ -350,25 +413,52 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, | |||
350 | } | 413 | } |
351 | while (NULL != (qe = h->queue_head)) | 414 | while (NULL != (qe = h->queue_head)) |
352 | { | 415 | { |
353 | GNUNET_assert (NULL != qe->response_proc); | 416 | switch (qe->response_type) |
354 | qe->response_proc (h, NULL); | 417 | { |
418 | case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: | ||
419 | if (NULL != qe->qc.sc.cont) | ||
420 | qe->qc.sc.cont (qe->qc.sc.cont_cls, | ||
421 | GNUNET_SYSERR, | ||
422 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
423 | _("Disconnected from DATASTORE")); | ||
424 | break; | ||
425 | case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: | ||
426 | if (NULL != qe->qc.rc.proc) | ||
427 | qe->qc.rc.proc (qe->qc.rc.proc_cls, | ||
428 | NULL, | ||
429 | 0, | ||
430 | NULL, 0, 0, 0, | ||
431 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
432 | 0); | ||
433 | break; | ||
434 | default: | ||
435 | GNUNET_break (0); | ||
436 | } | ||
437 | free_queue_entry (qe); | ||
355 | } | 438 | } |
356 | if (GNUNET_YES == drop) | 439 | if (GNUNET_YES == drop) |
357 | { | 440 | { |
358 | h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); | 441 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
359 | if (NULL != h->client) | 442 | "Re-connecting to issue DROP!\n"); |
443 | GNUNET_assert (NULL == h->mq); | ||
444 | h->mq = GNUNET_CLIENT_connecT (h->cfg, | ||
445 | "datastore", | ||
446 | NULL, | ||
447 | &disconnect_on_mq_error, | ||
448 | h); | ||
449 | if (NULL != h->mq) | ||
360 | { | 450 | { |
361 | if (NULL != | 451 | struct GNUNET_MessageHeader *hdr; |
362 | GNUNET_CLIENT_notify_transmit_ready (h->client, | 452 | struct GNUNET_MQ_Envelope *env; |
363 | sizeof (struct | 453 | |
364 | GNUNET_MessageHeader), | 454 | env = GNUNET_MQ_msg (hdr, |
365 | GNUNET_TIME_UNIT_SECONDS, | 455 | GNUNET_MESSAGE_TYPE_DATASTORE_DROP); |
366 | GNUNET_YES, | 456 | GNUNET_MQ_notify_sent (env, |
367 | &transmit_drop, | 457 | &disconnect_after_drop, |
368 | h)) | 458 | h); |
369 | return; | 459 | GNUNET_MQ_send (h->mq, |
370 | GNUNET_CLIENT_disconnect (h->client); | 460 | env); |
371 | h->client = NULL; | 461 | return; |
372 | } | 462 | } |
373 | GNUNET_break (0); | 463 | GNUNET_break (0); |
374 | } | 464 | } |
@@ -380,67 +470,37 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, | |||
380 | 470 | ||
381 | 471 | ||
382 | /** | 472 | /** |
383 | * A request has timed out (before being transmitted to the service). | ||
384 | * | ||
385 | * @param cls the `struct GNUNET_DATASTORE_QueueEntry` | ||
386 | */ | ||
387 | static void | ||
388 | timeout_queue_entry (void *cls) | ||
389 | { | ||
390 | struct GNUNET_DATASTORE_QueueEntry *qe = cls; | ||
391 | struct GNUNET_DATASTORE_Handle *h = qe->h; | ||
392 | |||
393 | GNUNET_STATISTICS_update (h->stats, | ||
394 | gettext_noop ("# queue entry timeouts"), | ||
395 | 1, | ||
396 | GNUNET_NO); | ||
397 | qe->task = NULL; | ||
398 | GNUNET_assert (GNUNET_NO == qe->was_transmitted); | ||
399 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
400 | "Timeout of request in datastore queue\n"); | ||
401 | /* response_proc's expect request at the head of the queue! */ | ||
402 | GNUNET_CONTAINER_DLL_remove (h->queue_head, | ||
403 | h->queue_tail, | ||
404 | qe); | ||
405 | GNUNET_CONTAINER_DLL_insert (h->queue_head, | ||
406 | h->queue_tail, | ||
407 | qe); | ||
408 | GNUNET_assert (h->queue_head == qe); | ||
409 | qe->response_proc (qe->h, NULL); | ||
410 | } | ||
411 | |||
412 | |||
413 | /** | ||
414 | * Create a new entry for our priority queue (and possibly discard other entires if | 473 | * Create a new entry for our priority queue (and possibly discard other entires if |
415 | * the queue is getting too long). | 474 | * the queue is getting too long). |
416 | * | 475 | * |
417 | * @param h handle to the datastore | 476 | * @param h handle to the datastore |
418 | * @param msize size of the message to queue | 477 | * @param env envelope with the message to queue |
419 | * @param queue_priority priority of the entry | 478 | * @param queue_priority priority of the entry |
420 | * @param max_queue_size at what queue size should this request be dropped | 479 | * @param max_queue_size at what queue size should this request be dropped |
421 | * (if other requests of higher priority are in the queue) | 480 | * (if other requests of higher priority are in the queue) |
422 | * @param timeout timeout for the operation | 481 | * @param expected_type which type of response do we expect, |
423 | * @param response_proc function to call with replies (can be NULL) | 482 | * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or |
483 | * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA | ||
424 | * @param qc client context (NOT a closure for @a response_proc) | 484 | * @param qc client context (NOT a closure for @a response_proc) |
425 | * @return NULL if the queue is full | 485 | * @return NULL if the queue is full |
426 | */ | 486 | */ |
427 | static struct GNUNET_DATASTORE_QueueEntry * | 487 | static struct GNUNET_DATASTORE_QueueEntry * |
428 | make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | 488 | make_queue_entry (struct GNUNET_DATASTORE_Handle *h, |
429 | size_t msize, | 489 | struct GNUNET_MQ_Envelope *env, |
430 | unsigned int queue_priority, | 490 | unsigned int queue_priority, |
431 | unsigned int max_queue_size, | 491 | unsigned int max_queue_size, |
432 | struct GNUNET_TIME_Relative timeout, | 492 | uint16_t expected_type, |
433 | GNUNET_CLIENT_MessageHandler response_proc, | ||
434 | const union QueueContext *qc) | 493 | const union QueueContext *qc) |
435 | { | 494 | { |
436 | struct GNUNET_DATASTORE_QueueEntry *ret; | 495 | struct GNUNET_DATASTORE_QueueEntry *qe; |
437 | struct GNUNET_DATASTORE_QueueEntry *pos; | 496 | struct GNUNET_DATASTORE_QueueEntry *pos; |
438 | unsigned int c; | 497 | unsigned int c; |
439 | 498 | ||
440 | c = 0; | 499 | c = 0; |
441 | pos = h->queue_head; | 500 | pos = h->queue_head; |
442 | while ((pos != NULL) && (c < max_queue_size) && | 501 | while ( (NULL != pos) && |
443 | (pos->priority >= queue_priority)) | 502 | (c < max_queue_size) && |
503 | (pos->priority >= queue_priority) ) | ||
444 | { | 504 | { |
445 | c++; | 505 | c++; |
446 | pos = pos->next; | 506 | pos = pos->next; |
@@ -451,18 +511,17 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | |||
451 | gettext_noop ("# queue overflows"), | 511 | gettext_noop ("# queue overflows"), |
452 | 1, | 512 | 1, |
453 | GNUNET_NO); | 513 | GNUNET_NO); |
514 | GNUNET_MQ_discard (env); | ||
454 | return NULL; | 515 | return NULL; |
455 | } | 516 | } |
456 | ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); | 517 | qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry); |
457 | ret->h = h; | 518 | qe->h = h; |
458 | ret->response_proc = response_proc; | 519 | qe->env = env; |
459 | ret->qc = *qc; | 520 | qe->response_type = expected_type; |
460 | ret->timeout = GNUNET_TIME_relative_to_absolute (timeout); | 521 | qe->qc = *qc; |
461 | ret->priority = queue_priority; | 522 | qe->priority = queue_priority; |
462 | ret->max_queue = max_queue_size; | 523 | qe->max_queue = max_queue_size; |
463 | ret->message_size = msize; | 524 | if (NULL == pos) |
464 | ret->was_transmitted = GNUNET_NO; | ||
465 | if (pos == NULL) | ||
466 | { | 525 | { |
467 | /* append at the tail */ | 526 | /* append at the tail */ |
468 | pos = h->queue_tail; | 527 | pos = h->queue_tail; |
@@ -472,49 +531,23 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | |||
472 | pos = pos->prev; | 531 | pos = pos->prev; |
473 | /* do not insert at HEAD if HEAD query was already | 532 | /* do not insert at HEAD if HEAD query was already |
474 | * transmitted and we are still receiving replies! */ | 533 | * transmitted and we are still receiving replies! */ |
475 | if ((pos == NULL) && (h->queue_head->was_transmitted)) | 534 | if ( (NULL == pos) && |
535 | (NULL == h->queue_head->env) ) | ||
476 | pos = h->queue_head; | 536 | pos = h->queue_head; |
477 | } | 537 | } |
478 | c++; | 538 | c++; |
479 | #if INSANE_STATISTICS | 539 | #if INSANE_STATISTICS |
480 | GNUNET_STATISTICS_update (h->stats, | 540 | GNUNET_STATISTICS_update (h->stats, |
481 | gettext_noop ("# queue entries created"), | 541 | gettext_noop ("# queue entries created"), |
482 | 1, GNUNET_NO); | 542 | 1, |
543 | GNUNET_NO); | ||
483 | #endif | 544 | #endif |
484 | GNUNET_CONTAINER_DLL_insert_after (h->queue_head, | 545 | GNUNET_CONTAINER_DLL_insert_after (h->queue_head, |
485 | h->queue_tail, | 546 | h->queue_tail, |
486 | pos, | 547 | pos, |
487 | ret); | 548 | qe); |
488 | h->queue_size++; | 549 | h->queue_size++; |
489 | ret->task = GNUNET_SCHEDULER_add_delayed (timeout, | 550 | return qe; |
490 | &timeout_queue_entry, | ||
491 | ret); | ||
492 | for (pos = ret->next; NULL != pos; pos = pos->next) | ||
493 | { | ||
494 | if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO)) | ||
495 | { | ||
496 | GNUNET_assert (NULL != pos->response_proc); | ||
497 | /* move 'pos' element to head so that it will be | ||
498 | * killed on 'NULL' call below */ | ||
499 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
500 | "Dropping request from datastore queue\n"); | ||
501 | /* response_proc's expect request at the head of the queue! */ | ||
502 | GNUNET_CONTAINER_DLL_remove (h->queue_head, | ||
503 | h->queue_tail, | ||
504 | pos); | ||
505 | GNUNET_CONTAINER_DLL_insert (h->queue_head, | ||
506 | h->queue_tail, | ||
507 | pos); | ||
508 | GNUNET_STATISTICS_update (h->stats, | ||
509 | gettext_noop | ||
510 | ("# Requests dropped from datastore queue"), 1, | ||
511 | GNUNET_NO); | ||
512 | GNUNET_assert (h->queue_head == pos); | ||
513 | pos->response_proc (h, NULL); | ||
514 | break; | ||
515 | } | ||
516 | } | ||
517 | return ret; | ||
518 | } | 551 | } |
519 | 552 | ||
520 | 553 | ||
@@ -525,78 +558,88 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | |||
525 | * @param h handle to the datastore | 558 | * @param h handle to the datastore |
526 | */ | 559 | */ |
527 | static void | 560 | static void |
528 | process_queue (struct GNUNET_DATASTORE_Handle *h); | 561 | process_queue (struct GNUNET_DATASTORE_Handle *h) |
529 | |||
530 | |||
531 | /** | ||
532 | * Try reconnecting to the datastore service. | ||
533 | * | ||
534 | * @param cls the `struct GNUNET_DATASTORE_Handle` | ||
535 | */ | ||
536 | static void | ||
537 | try_reconnect (void *cls) | ||
538 | { | 562 | { |
539 | struct GNUNET_DATASTORE_Handle *h = cls; | 563 | struct GNUNET_DATASTORE_QueueEntry *qe; |
540 | 564 | ||
541 | h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); | 565 | if (NULL == (qe = h->queue_head)) |
542 | h->reconnect_task = NULL; | ||
543 | h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); | ||
544 | if (h->client == NULL) | ||
545 | { | 566 | { |
546 | LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n"); | 567 | /* no entry in queue */ |
568 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
569 | "Queue empty\n"); | ||
547 | return; | 570 | return; |
548 | } | 571 | } |
549 | GNUNET_STATISTICS_update (h->stats, | 572 | if (NULL == qe->env) |
550 | gettext_noop | 573 | { |
551 | ("# datastore connections (re)created"), 1, | 574 | /* waiting for replies */ |
552 | GNUNET_NO); | 575 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
553 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n"); | 576 | "Head request already transmitted\n"); |
554 | process_queue (h); | 577 | return; |
578 | } | ||
579 | if (NULL == h->mq) | ||
580 | { | ||
581 | /* waiting for reconnect */ | ||
582 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
583 | "Not connected\n"); | ||
584 | return; | ||
585 | } | ||
586 | GNUNET_MQ_send (h->mq, | ||
587 | qe->env); | ||
588 | qe->env = NULL; | ||
555 | } | 589 | } |
556 | 590 | ||
557 | 591 | ||
592 | |||
593 | |||
558 | /** | 594 | /** |
559 | * Disconnect from the service and then try reconnecting to the datastore service | 595 | * Function called to check status message from the service. |
560 | * after some delay. | ||
561 | * | 596 | * |
562 | * @param h handle to datastore to disconnect and reconnect | 597 | * @param cls closure |
598 | * @param sm status message received | ||
599 | * @return #GNUNET_OK if the message is well-formed | ||
563 | */ | 600 | */ |
564 | static void | 601 | static int |
565 | do_disconnect (struct GNUNET_DATASTORE_Handle *h) | 602 | check_status (void *cls, |
603 | const struct StatusMessage *sm) | ||
566 | { | 604 | { |
567 | if (NULL == h->client) | 605 | uint16_t msize = ntohs (sm->header.size) - sizeof (*sm); |
606 | int32_t status = ntohl (sm->status); | ||
607 | |||
608 | if (msize > 0) | ||
568 | { | 609 | { |
569 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 610 | const char *emsg = (const char *) &sm[1]; |
570 | "Client NULL in disconnect, will not try to reconnect\n"); | 611 | |
571 | return; | 612 | if ('\0' != emsg[msize - 1]) |
613 | { | ||
614 | GNUNET_break (0); | ||
615 | return GNUNET_SYSERR; | ||
616 | } | ||
572 | } | 617 | } |
573 | GNUNET_CLIENT_disconnect (h->client); | 618 | else if (GNUNET_SYSERR == status) |
574 | h->skip_next_messages = 0; | 619 | { |
575 | h->client = NULL; | 620 | GNUNET_break (0); |
576 | h->reconnect_task = | 621 | return GNUNET_SYSERR; |
577 | GNUNET_SCHEDULER_add_delayed (h->retry_time, | 622 | } |
578 | &try_reconnect, | 623 | return GNUNET_OK; |
579 | h); | ||
580 | } | 624 | } |
581 | 625 | ||
582 | 626 | ||
583 | /** | 627 | /** |
584 | * Function called whenever we receive a message from | 628 | * Function called to handle status message from the service. |
585 | * the service. Calls the appropriate handler. | ||
586 | * | 629 | * |
587 | * @param cls the `struct GNUNET_DATASTORE_Handle` | 630 | * @param cls closure |
588 | * @param msg the received message | 631 | * @param sm status message received |
589 | */ | 632 | */ |
590 | static void | 633 | static void |
591 | receive_cb (void *cls, | 634 | handle_status (void *cls, |
592 | const struct GNUNET_MessageHeader *msg) | 635 | const struct StatusMessage *sm) |
593 | { | 636 | { |
594 | struct GNUNET_DATASTORE_Handle *h = cls; | 637 | struct GNUNET_DATASTORE_Handle *h = cls; |
595 | struct GNUNET_DATASTORE_QueueEntry *qe; | 638 | struct GNUNET_DATASTORE_QueueEntry *qe; |
639 | struct StatusContext rc; | ||
640 | const char *emsg; | ||
641 | int32_t status = ntohl (sm->status); | ||
596 | 642 | ||
597 | h->in_receive = GNUNET_NO; | ||
598 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
599 | "Receiving reply from datastore\n"); | ||
600 | if (h->skip_next_messages > 0) | 643 | if (h->skip_next_messages > 0) |
601 | { | 644 | { |
602 | h->skip_next_messages--; | 645 | h->skip_next_messages--; |
@@ -606,252 +649,255 @@ receive_cb (void *cls, | |||
606 | if (NULL == (qe = h->queue_head)) | 649 | if (NULL == (qe = h->queue_head)) |
607 | { | 650 | { |
608 | GNUNET_break (0); | 651 | GNUNET_break (0); |
609 | process_queue (h); | 652 | do_disconnect (h); |
653 | return; | ||
654 | } | ||
655 | if (NULL != qe->env) | ||
656 | { | ||
657 | GNUNET_break (0); | ||
658 | do_disconnect (h); | ||
610 | return; | 659 | return; |
611 | } | 660 | } |
612 | qe->response_proc (h, msg); | 661 | if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type) |
662 | { | ||
663 | GNUNET_break (0); | ||
664 | do_disconnect (h); | ||
665 | return; | ||
666 | } | ||
667 | rc = qe->qc.sc; | ||
668 | free_queue_entry (qe); | ||
669 | if (ntohs (sm->header.size) > sizeof (struct StatusMessage)) | ||
670 | emsg = (const char *) &sm[1]; | ||
671 | else | ||
672 | emsg = NULL; | ||
673 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
674 | "Received status %d/%s\n", | ||
675 | (int) status, | ||
676 | emsg); | ||
677 | GNUNET_STATISTICS_update (h->stats, | ||
678 | gettext_noop ("# status messages received"), | ||
679 | 1, | ||
680 | GNUNET_NO); | ||
681 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
682 | process_queue (h); | ||
683 | if (NULL != rc.cont) | ||
684 | rc.cont (rc.cont_cls, | ||
685 | status, | ||
686 | GNUNET_TIME_absolute_ntoh (sm->min_expiration), | ||
687 | emsg); | ||
613 | } | 688 | } |
614 | 689 | ||
615 | 690 | ||
616 | /** | 691 | /** |
617 | * Transmit request from queue to datastore service. | 692 | * Check data message we received from the service. |
618 | * | 693 | * |
619 | * @param cls the `struct GNUNET_DATASTORE_Handle` | 694 | * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` |
620 | * @param size number of bytes that can be copied to @a buf | 695 | * @param dm message received |
621 | * @param buf where to copy the drop message | ||
622 | * @return number of bytes written to @a buf | ||
623 | */ | 696 | */ |
624 | static size_t | 697 | static int |
625 | transmit_request (void *cls, | 698 | check_data (void *cls, |
626 | size_t size, | 699 | const struct DataMessage *dm) |
627 | void *buf) | ||
628 | { | 700 | { |
629 | struct GNUNET_DATASTORE_Handle *h = cls; | 701 | uint16_t msize = ntohs (dm->header.size) - sizeof (*dm); |
630 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
631 | size_t msize; | ||
632 | 702 | ||
633 | h->th = NULL; | 703 | if (msize != ntohl (dm->size)) |
634 | if (NULL == (qe = h->queue_head)) | ||
635 | return 0; /* no entry in queue */ | ||
636 | if (NULL == buf) | ||
637 | { | 704 | { |
638 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 705 | GNUNET_break (0); |
639 | "Failed to transmit request to DATASTORE.\n"); | 706 | return GNUNET_SYSERR; |
640 | GNUNET_STATISTICS_update (h->stats, | ||
641 | gettext_noop ("# transmission request failures"), | ||
642 | 1, GNUNET_NO); | ||
643 | do_disconnect (h); | ||
644 | return 0; | ||
645 | } | ||
646 | if (size < (msize = qe->message_size)) | ||
647 | { | ||
648 | process_queue (h); | ||
649 | return 0; | ||
650 | } | 707 | } |
651 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 708 | return GNUNET_OK; |
652 | "Transmitting %u byte request to DATASTORE\n", | ||
653 | msize); | ||
654 | memcpy (buf, &qe[1], msize); | ||
655 | qe->was_transmitted = GNUNET_YES; | ||
656 | GNUNET_SCHEDULER_cancel (qe->task); | ||
657 | qe->task = NULL; | ||
658 | GNUNET_assert (GNUNET_NO == h->in_receive); | ||
659 | h->in_receive = GNUNET_YES; | ||
660 | GNUNET_CLIENT_receive (h->client, | ||
661 | &receive_cb, h, | ||
662 | GNUNET_TIME_absolute_get_remaining (qe->timeout)); | ||
663 | #if INSANE_STATISTICS | ||
664 | GNUNET_STATISTICS_update (h->stats, | ||
665 | gettext_noop ("# bytes sent to datastore"), msize, | ||
666 | GNUNET_NO); | ||
667 | #endif | ||
668 | return msize; | ||
669 | } | 709 | } |
670 | 710 | ||
671 | 711 | ||
672 | /** | 712 | /** |
673 | * Process entries in the queue (or do nothing if we are already | 713 | * Handle data message we got from the service. |
674 | * doing so). | ||
675 | * | 714 | * |
676 | * @param h handle to the datastore | 715 | * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` |
716 | * @param dm message received | ||
677 | */ | 717 | */ |
678 | static void | 718 | static void |
679 | process_queue (struct GNUNET_DATASTORE_Handle *h) | 719 | handle_data (void *cls, |
720 | const struct DataMessage *dm) | ||
680 | { | 721 | { |
722 | struct GNUNET_DATASTORE_Handle *h = cls; | ||
681 | struct GNUNET_DATASTORE_QueueEntry *qe; | 723 | struct GNUNET_DATASTORE_QueueEntry *qe; |
724 | struct ResultContext rc; | ||
682 | 725 | ||
683 | if (NULL == (qe = h->queue_head)) | 726 | if (h->skip_next_messages > 0) |
684 | { | ||
685 | /* no entry in queue */ | ||
686 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
687 | "Queue empty\n"); | ||
688 | return; | ||
689 | } | ||
690 | if (GNUNET_YES == qe->was_transmitted) | ||
691 | { | 727 | { |
692 | /* waiting for replies */ | 728 | process_queue (h); |
693 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
694 | "Head request already transmitted\n"); | ||
695 | return; | 729 | return; |
696 | } | 730 | } |
697 | if (NULL != h->th) | 731 | qe = h->queue_head; |
732 | if (NULL == qe) | ||
698 | { | 733 | { |
699 | /* request pending */ | 734 | GNUNET_break (0); |
700 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 735 | do_disconnect (h); |
701 | "Pending transmission request\n"); | ||
702 | return; | 736 | return; |
703 | } | 737 | } |
704 | if (NULL == h->client) | 738 | if (NULL != qe->env) |
705 | { | 739 | { |
706 | /* waiting for reconnect */ | 740 | GNUNET_break (0); |
707 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 741 | do_disconnect (h); |
708 | "Not connected\n"); | ||
709 | return; | 742 | return; |
710 | } | 743 | } |
711 | if (GNUNET_YES == h->in_receive) | 744 | if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) |
712 | { | 745 | { |
713 | /* wait for response to previous query */ | 746 | GNUNET_break (0); |
747 | do_disconnect (h); | ||
714 | return; | 748 | return; |
715 | } | 749 | } |
750 | #if INSANE_STATISTICS | ||
751 | GNUNET_STATISTICS_update (h->stats, | ||
752 | gettext_noop ("# Results received"), | ||
753 | 1, | ||
754 | GNUNET_NO); | ||
755 | #endif | ||
716 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 756 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
717 | "Queueing %u byte request to DATASTORE\n", | 757 | "Received result %llu with type %u and size %u with key %s\n", |
718 | qe->message_size); | 758 | (unsigned long long) GNUNET_ntohll (dm->uid), |
719 | h->th | 759 | ntohl (dm->type), |
720 | = GNUNET_CLIENT_notify_transmit_ready (h->client, | 760 | ntohl (dm->size), |
721 | qe->message_size, | 761 | GNUNET_h2s (&dm->key)); |
722 | GNUNET_TIME_absolute_get_remaining (qe->timeout), | 762 | rc = qe->qc.rc; |
723 | GNUNET_YES, | 763 | free_queue_entry (qe); |
724 | &transmit_request, | 764 | h->retry_time = GNUNET_TIME_UNIT_ZERO; |
725 | h); | 765 | process_queue (h); |
726 | GNUNET_assert (GNUNET_NO == h->in_receive); | 766 | if (NULL != rc.proc) |
727 | GNUNET_break (NULL != h->th); | 767 | rc.proc (rc.proc_cls, |
728 | } | 768 | &dm->key, |
729 | 769 | ntohl (dm->size), | |
730 | 770 | &dm[1], | |
731 | /** | 771 | ntohl (dm->type), |
732 | * Dummy continuation used to do nothing (but be non-zero). | 772 | ntohl (dm->priority), |
733 | * | 773 | ntohl (dm->anonymity), |
734 | * @param cls closure | 774 | GNUNET_TIME_absolute_ntoh (dm->expiration), |
735 | * @param result result | 775 | GNUNET_ntohll (dm->uid)); |
736 | * @param min_expiration expiration time | ||
737 | * @param emsg error message | ||
738 | */ | ||
739 | static void | ||
740 | drop_status_cont (void *cls, int32_t result, | ||
741 | struct GNUNET_TIME_Absolute min_expiration, | ||
742 | const char *emsg) | ||
743 | { | ||
744 | /* do nothing */ | ||
745 | } | ||
746 | |||
747 | |||
748 | /** | ||
749 | * Free a queue entry. Removes the given entry from the | ||
750 | * queue and releases associated resources. Does NOT | ||
751 | * call the callback. | ||
752 | * | ||
753 | * @param qe entry to free. | ||
754 | */ | ||
755 | static void | ||
756 | free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) | ||
757 | { | ||
758 | struct GNUNET_DATASTORE_Handle *h = qe->h; | ||
759 | |||
760 | GNUNET_CONTAINER_DLL_remove (h->queue_head, | ||
761 | h->queue_tail, | ||
762 | qe); | ||
763 | if (qe->task != NULL) | ||
764 | { | ||
765 | GNUNET_SCHEDULER_cancel (qe->task); | ||
766 | qe->task = NULL; | ||
767 | } | ||
768 | h->queue_size--; | ||
769 | qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ | ||
770 | GNUNET_free (qe); | ||
771 | } | 776 | } |
772 | 777 | ||
773 | 778 | ||
774 | /** | 779 | /** |
775 | * Type of a function to call when we receive a message | 780 | * Type of a function to call when we receive a |
776 | * from the service. | 781 | * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service. |
777 | * | 782 | * |
778 | * @param cls closure | 783 | * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` |
779 | * @param msg message received, NULL on timeout or fatal error | 784 | * @param msg message received |
780 | */ | 785 | */ |
781 | static void | 786 | static void |
782 | process_status_message (void *cls, | 787 | handle_data_end (void *cls, |
783 | const struct GNUNET_MessageHeader *msg) | 788 | const struct GNUNET_MessageHeader *msg) |
784 | { | 789 | { |
785 | struct GNUNET_DATASTORE_Handle *h = cls; | 790 | struct GNUNET_DATASTORE_Handle *h = cls; |
786 | struct GNUNET_DATASTORE_QueueEntry *qe; | 791 | struct GNUNET_DATASTORE_QueueEntry *qe; |
787 | struct StatusContext rc; | 792 | struct ResultContext rc; |
788 | const struct StatusMessage *sm; | ||
789 | const char *emsg; | ||
790 | int32_t status; | ||
791 | int was_transmitted; | ||
792 | 793 | ||
793 | if (NULL == (qe = h->queue_head)) | 794 | if (h->skip_next_messages > 0) |
794 | { | 795 | { |
795 | GNUNET_break (0); | 796 | h->skip_next_messages--; |
796 | do_disconnect (h); | 797 | process_queue (h); |
797 | return; | 798 | return; |
798 | } | 799 | } |
799 | rc = qe->qc.sc; | 800 | qe = h->queue_head; |
800 | if (NULL == msg) | 801 | if (NULL == qe) |
801 | { | 802 | { |
802 | was_transmitted = qe->was_transmitted; | 803 | GNUNET_break (0); |
803 | free_queue_entry (qe); | 804 | do_disconnect (h); |
804 | if (was_transmitted == GNUNET_YES) | ||
805 | do_disconnect (h); | ||
806 | else | ||
807 | process_queue (h); | ||
808 | if (NULL != rc.cont) | ||
809 | rc.cont (rc.cont_cls, GNUNET_SYSERR, | ||
810 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
811 | _("Failed to receive status response from database.")); | ||
812 | return; | 805 | return; |
813 | } | 806 | } |
814 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); | 807 | if (NULL != qe->env) |
815 | free_queue_entry (qe); | ||
816 | if ((ntohs (msg->size) < sizeof (struct StatusMessage)) || | ||
817 | (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS)) | ||
818 | { | 808 | { |
819 | GNUNET_break (0); | 809 | GNUNET_break (0); |
820 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
821 | do_disconnect (h); | 810 | do_disconnect (h); |
822 | if (rc.cont != NULL) | ||
823 | rc.cont (rc.cont_cls, GNUNET_SYSERR, | ||
824 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
825 | _("Error reading response from datastore service")); | ||
826 | return; | 811 | return; |
827 | } | 812 | } |
828 | sm = (const struct StatusMessage *) msg; | 813 | if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) |
829 | status = ntohl (sm->status); | ||
830 | emsg = NULL; | ||
831 | if (ntohs (msg->size) > sizeof (struct StatusMessage)) | ||
832 | { | ||
833 | emsg = (const char *) &sm[1]; | ||
834 | if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0') | ||
835 | { | ||
836 | GNUNET_break (0); | ||
837 | emsg = _("Invalid error message received from datastore service"); | ||
838 | } | ||
839 | } | ||
840 | if ((status == GNUNET_SYSERR) && (emsg == NULL)) | ||
841 | { | 814 | { |
842 | GNUNET_break (0); | 815 | GNUNET_break (0); |
843 | emsg = _("Invalid error message received from datastore service"); | 816 | do_disconnect (h); |
817 | return; | ||
844 | } | 818 | } |
845 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg); | 819 | rc = qe->qc.rc; |
820 | free_queue_entry (qe); | ||
821 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
822 | "Received end of result set, new queue size is %u\n", | ||
823 | h->queue_size); | ||
824 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
825 | h->result_count = 0; | ||
826 | process_queue (h); | ||
827 | /* signal end of iteration */ | ||
828 | if (NULL != rc.proc) | ||
829 | rc.proc (rc.proc_cls, | ||
830 | NULL, | ||
831 | 0, | ||
832 | NULL, | ||
833 | 0, | ||
834 | 0, | ||
835 | 0, | ||
836 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
837 | 0); | ||
838 | } | ||
839 | |||
840 | |||
841 | /** | ||
842 | * Try reconnecting to the datastore service. | ||
843 | * | ||
844 | * @param cls the `struct GNUNET_DATASTORE_Handle` | ||
845 | */ | ||
846 | static void | ||
847 | try_reconnect (void *cls) | ||
848 | { | ||
849 | GNUNET_MQ_hd_var_size (status, | ||
850 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | ||
851 | struct StatusMessage); | ||
852 | GNUNET_MQ_hd_var_size (data, | ||
853 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, | ||
854 | struct DataMessage); | ||
855 | GNUNET_MQ_hd_fixed_size (data_end, | ||
856 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END, | ||
857 | struct GNUNET_MessageHeader); | ||
858 | struct GNUNET_DATASTORE_Handle *h = cls; | ||
859 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
860 | make_status_handler (h), | ||
861 | make_data_handler (h), | ||
862 | make_data_end_handler (h), | ||
863 | GNUNET_MQ_handler_end () | ||
864 | }; | ||
865 | |||
866 | h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); | ||
867 | h->reconnect_task = NULL; | ||
868 | GNUNET_assert (NULL == h->mq); | ||
869 | h->mq = GNUNET_CLIENT_connecT (h->cfg, | ||
870 | "datastore", | ||
871 | handlers, | ||
872 | &mq_error_handler, | ||
873 | h); | ||
874 | if (NULL == h->mq) | ||
875 | return; | ||
846 | GNUNET_STATISTICS_update (h->stats, | 876 | GNUNET_STATISTICS_update (h->stats, |
847 | gettext_noop ("# status messages received"), 1, | 877 | gettext_noop ("# datastore connections (re)created"), |
878 | 1, | ||
848 | GNUNET_NO); | 879 | GNUNET_NO); |
849 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | 880 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
881 | "Reconnected to DATASTORE\n"); | ||
850 | process_queue (h); | 882 | process_queue (h); |
851 | if (rc.cont != NULL) | 883 | } |
852 | rc.cont (rc.cont_cls, status, | 884 | |
853 | GNUNET_TIME_absolute_ntoh (sm->min_expiration), | 885 | |
854 | emsg); | 886 | /** |
887 | * Dummy continuation used to do nothing (but be non-zero). | ||
888 | * | ||
889 | * @param cls closure | ||
890 | * @param result result | ||
891 | * @param min_expiration expiration time | ||
892 | * @param emsg error message | ||
893 | */ | ||
894 | static void | ||
895 | drop_status_cont (void *cls, | ||
896 | int32_t result, | ||
897 | struct GNUNET_TIME_Absolute min_expiration, | ||
898 | const char *emsg) | ||
899 | { | ||
900 | /* do nothing */ | ||
855 | } | 901 | } |
856 | 902 | ||
857 | 903 | ||
@@ -874,7 +920,6 @@ process_status_message (void *cls, | |||
874 | * @param queue_priority ranking of this request in the priority queue | 920 | * @param queue_priority ranking of this request in the priority queue |
875 | * @param max_queue_size at what queue size should this request be dropped | 921 | * @param max_queue_size at what queue size should this request be dropped |
876 | * (if other requests of higher priority are in the queue) | 922 | * (if other requests of higher priority are in the queue) |
877 | * @param timeout timeout for the operation | ||
878 | * @param cont continuation to call when done | 923 | * @param cont continuation to call when done |
879 | * @param cont_cls closure for @a cont | 924 | * @param cont_cls closure for @a cont |
880 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | 925 | * @return NULL if the entry was not queued, otherwise a handle that can be used to |
@@ -894,33 +939,51 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, | |||
894 | struct GNUNET_TIME_Absolute expiration, | 939 | struct GNUNET_TIME_Absolute expiration, |
895 | unsigned int queue_priority, | 940 | unsigned int queue_priority, |
896 | unsigned int max_queue_size, | 941 | unsigned int max_queue_size, |
897 | struct GNUNET_TIME_Relative timeout, | ||
898 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 942 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
899 | void *cont_cls) | 943 | void *cont_cls) |
900 | { | 944 | { |
901 | struct GNUNET_DATASTORE_QueueEntry *qe; | 945 | struct GNUNET_DATASTORE_QueueEntry *qe; |
946 | struct GNUNET_MQ_Envelope *env; | ||
902 | struct DataMessage *dm; | 947 | struct DataMessage *dm; |
903 | size_t msize; | ||
904 | union QueueContext qc; | 948 | union QueueContext qc; |
905 | 949 | ||
950 | if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | ||
951 | { | ||
952 | GNUNET_break (0); | ||
953 | return NULL; | ||
954 | } | ||
955 | |||
906 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 956 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
907 | "Asked to put %u bytes of data under key `%s' for %s\n", | 957 | "Asked to put %u bytes of data under key `%s' for %s\n", |
908 | size, | 958 | size, |
909 | GNUNET_h2s (key), | 959 | GNUNET_h2s (key), |
910 | GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), | 960 | GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), |
911 | GNUNET_YES)); | 961 | GNUNET_YES)); |
912 | msize = sizeof (struct DataMessage) + size; | 962 | env = GNUNET_MQ_msg_extra (dm, |
913 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); | 963 | size, |
964 | GNUNET_MESSAGE_TYPE_DATASTORE_PUT); | ||
965 | dm->rid = htonl (rid); | ||
966 | dm->size = htonl ((uint32_t) size); | ||
967 | dm->type = htonl (type); | ||
968 | dm->priority = htonl (priority); | ||
969 | dm->anonymity = htonl (anonymity); | ||
970 | dm->replication = htonl (replication); | ||
971 | dm->reserved = htonl (0); | ||
972 | dm->uid = GNUNET_htonll (0); | ||
973 | dm->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
974 | dm->key = *key; | ||
975 | memcpy (&dm[1], | ||
976 | data, | ||
977 | size); | ||
914 | qc.sc.cont = cont; | 978 | qc.sc.cont = cont; |
915 | qc.sc.cont_cls = cont_cls; | 979 | qc.sc.cont_cls = cont_cls; |
916 | qe = make_queue_entry (h, | 980 | qe = make_queue_entry (h, |
917 | msize, | 981 | env, |
918 | queue_priority, | 982 | queue_priority, |
919 | max_queue_size, | 983 | max_queue_size, |
920 | timeout, | 984 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, |
921 | &process_status_message, | ||
922 | &qc); | 985 | &qc); |
923 | if (qe == NULL) | 986 | if (NULL == qe) |
924 | { | 987 | { |
925 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 988 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
926 | "Could not create queue entry for PUT\n"); | 989 | "Could not create queue entry for PUT\n"); |
@@ -930,20 +993,6 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, | |||
930 | gettext_noop ("# PUT requests executed"), | 993 | gettext_noop ("# PUT requests executed"), |
931 | 1, | 994 | 1, |
932 | GNUNET_NO); | 995 | GNUNET_NO); |
933 | dm = (struct DataMessage *) &qe[1]; | ||
934 | dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT); | ||
935 | dm->header.size = htons (msize); | ||
936 | dm->rid = htonl (rid); | ||
937 | dm->size = htonl ((uint32_t) size); | ||
938 | dm->type = htonl (type); | ||
939 | dm->priority = htonl (priority); | ||
940 | dm->anonymity = htonl (anonymity); | ||
941 | dm->replication = htonl (replication); | ||
942 | dm->reserved = htonl (0); | ||
943 | dm->uid = GNUNET_htonll (0); | ||
944 | dm->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
945 | dm->key = *key; | ||
946 | memcpy (&dm[1], data, size); | ||
947 | process_queue (h); | 996 | process_queue (h); |
948 | return qe; | 997 | return qe; |
949 | } | 998 | } |
@@ -972,6 +1021,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
972 | void *cont_cls) | 1021 | void *cont_cls) |
973 | { | 1022 | { |
974 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1023 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1024 | struct GNUNET_MQ_Envelope *env; | ||
975 | struct ReserveMessage *rm; | 1025 | struct ReserveMessage *rm; |
976 | union QueueContext qc; | 1026 | union QueueContext qc; |
977 | 1027 | ||
@@ -981,14 +1031,18 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
981 | "Asked to reserve %llu bytes of data and %u entries\n", | 1031 | "Asked to reserve %llu bytes of data and %u entries\n", |
982 | (unsigned long long) amount, | 1032 | (unsigned long long) amount, |
983 | (unsigned int) entries); | 1033 | (unsigned int) entries); |
1034 | env = GNUNET_MQ_msg (rm, | ||
1035 | GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); | ||
1036 | rm->entries = htonl (entries); | ||
1037 | rm->amount = GNUNET_htonll (amount); | ||
1038 | |||
984 | qc.sc.cont = cont; | 1039 | qc.sc.cont = cont; |
985 | qc.sc.cont_cls = cont_cls; | 1040 | qc.sc.cont_cls = cont_cls; |
986 | qe = make_queue_entry (h, | 1041 | qe = make_queue_entry (h, |
987 | sizeof (struct ReserveMessage), | 1042 | env, |
988 | UINT_MAX, | 1043 | UINT_MAX, |
989 | UINT_MAX, | 1044 | UINT_MAX, |
990 | GNUNET_TIME_UNIT_FOREVER_REL, | 1045 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, |
991 | &process_status_message, | ||
992 | &qc); | 1046 | &qc); |
993 | if (NULL == qe) | 1047 | if (NULL == qe) |
994 | { | 1048 | { |
@@ -1000,11 +1054,6 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
1000 | gettext_noop ("# RESERVE requests executed"), | 1054 | gettext_noop ("# RESERVE requests executed"), |
1001 | 1, | 1055 | 1, |
1002 | GNUNET_NO); | 1056 | GNUNET_NO); |
1003 | rm = (struct ReserveMessage *) &qe[1]; | ||
1004 | rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); | ||
1005 | rm->header.size = htons (sizeof (struct ReserveMessage)); | ||
1006 | rm->entries = htonl (entries); | ||
1007 | rm->amount = GNUNET_htonll (amount); | ||
1008 | process_queue (h); | 1057 | process_queue (h); |
1009 | return qe; | 1058 | return qe; |
1010 | } | 1059 | } |
@@ -1024,7 +1073,6 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
1024 | * @param queue_priority ranking of this request in the priority queue | 1073 | * @param queue_priority ranking of this request in the priority queue |
1025 | * @param max_queue_size at what queue size should this request be dropped | 1074 | * @param max_queue_size at what queue size should this request be dropped |
1026 | * (if other requests of higher priority are in the queue) | 1075 | * (if other requests of higher priority are in the queue) |
1027 | * @param timeout how long to wait at most for a response | ||
1028 | * @param cont continuation to call when done | 1076 | * @param cont continuation to call when done |
1029 | * @param cont_cls closure for @a cont | 1077 | * @param cont_cls closure for @a cont |
1030 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | 1078 | * @return NULL if the entry was not queued, otherwise a handle that can be used to |
@@ -1036,29 +1084,31 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
1036 | uint32_t rid, | 1084 | uint32_t rid, |
1037 | unsigned int queue_priority, | 1085 | unsigned int queue_priority, |
1038 | unsigned int max_queue_size, | 1086 | unsigned int max_queue_size, |
1039 | struct GNUNET_TIME_Relative timeout, | ||
1040 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 1087 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
1041 | void *cont_cls) | 1088 | void *cont_cls) |
1042 | { | 1089 | { |
1043 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1090 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1091 | struct GNUNET_MQ_Envelope *env; | ||
1044 | struct ReleaseReserveMessage *rrm; | 1092 | struct ReleaseReserveMessage *rrm; |
1045 | union QueueContext qc; | 1093 | union QueueContext qc; |
1046 | 1094 | ||
1047 | if (cont == NULL) | 1095 | if (NULL == cont) |
1048 | cont = &drop_status_cont; | 1096 | cont = &drop_status_cont; |
1049 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1097 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1050 | "Asked to release reserve %d\n", | 1098 | "Asked to release reserve %d\n", |
1051 | rid); | 1099 | rid); |
1100 | env = GNUNET_MQ_msg (rrm, | ||
1101 | GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); | ||
1102 | rrm->rid = htonl (rid); | ||
1052 | qc.sc.cont = cont; | 1103 | qc.sc.cont = cont; |
1053 | qc.sc.cont_cls = cont_cls; | 1104 | qc.sc.cont_cls = cont_cls; |
1054 | qe = make_queue_entry (h, | 1105 | qe = make_queue_entry (h, |
1055 | sizeof (struct ReleaseReserveMessage), | 1106 | env, |
1056 | queue_priority, | 1107 | queue_priority, |
1057 | max_queue_size, | 1108 | max_queue_size, |
1058 | timeout, | 1109 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, |
1059 | &process_status_message, | ||
1060 | &qc); | 1110 | &qc); |
1061 | if (qe == NULL) | 1111 | if (NULL == qe) |
1062 | { | 1112 | { |
1063 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1113 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1064 | "Could not create queue entry to release reserve\n"); | 1114 | "Could not create queue entry to release reserve\n"); |
@@ -1068,10 +1118,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
1068 | gettext_noop | 1118 | gettext_noop |
1069 | ("# RELEASE RESERVE requests executed"), 1, | 1119 | ("# RELEASE RESERVE requests executed"), 1, |
1070 | GNUNET_NO); | 1120 | GNUNET_NO); |
1071 | rrm = (struct ReleaseReserveMessage *) &qe[1]; | ||
1072 | rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); | ||
1073 | rrm->header.size = htons (sizeof (struct ReleaseReserveMessage)); | ||
1074 | rrm->rid = htonl (rid); | ||
1075 | process_queue (h); | 1121 | process_queue (h); |
1076 | return qe; | 1122 | return qe; |
1077 | } | 1123 | } |
@@ -1087,7 +1133,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
1087 | * @param queue_priority ranking of this request in the priority queue | 1133 | * @param queue_priority ranking of this request in the priority queue |
1088 | * @param max_queue_size at what queue size should this request be dropped | 1134 | * @param max_queue_size at what queue size should this request be dropped |
1089 | * (if other requests of higher priority are in the queue) | 1135 | * (if other requests of higher priority are in the queue) |
1090 | * @param timeout how long to wait at most for a response | ||
1091 | * @param cont continuation to call when done | 1136 | * @param cont continuation to call when done |
1092 | * @param cont_cls closure for @a cont | 1137 | * @param cont_cls closure for @a cont |
1093 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | 1138 | * @return NULL if the entry was not queued, otherwise a handle that can be used to |
@@ -1101,26 +1146,36 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, | |||
1101 | struct GNUNET_TIME_Absolute expiration, | 1146 | struct GNUNET_TIME_Absolute expiration, |
1102 | unsigned int queue_priority, | 1147 | unsigned int queue_priority, |
1103 | unsigned int max_queue_size, | 1148 | unsigned int max_queue_size, |
1104 | struct GNUNET_TIME_Relative timeout, | ||
1105 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 1149 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
1106 | void *cont_cls) | 1150 | void *cont_cls) |
1107 | { | 1151 | { |
1108 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1152 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1153 | struct GNUNET_MQ_Envelope *env; | ||
1109 | struct UpdateMessage *um; | 1154 | struct UpdateMessage *um; |
1110 | union QueueContext qc; | 1155 | union QueueContext qc; |
1111 | 1156 | ||
1112 | if (cont == NULL) | 1157 | if (NULL == cont) |
1113 | cont = &drop_status_cont; | 1158 | cont = &drop_status_cont; |
1114 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1159 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1115 | "Asked to update entry %llu raising priority by %u and expiration to %s\n", | 1160 | "Asked to update entry %llu raising priority by %u and expiration to %s\n", |
1116 | uid, | 1161 | uid, |
1117 | (unsigned int) priority, | 1162 | (unsigned int) priority, |
1118 | GNUNET_STRINGS_absolute_time_to_string (expiration)); | 1163 | GNUNET_STRINGS_absolute_time_to_string (expiration)); |
1164 | env = GNUNET_MQ_msg (um, | ||
1165 | GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); | ||
1166 | um->priority = htonl (priority); | ||
1167 | um->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
1168 | um->uid = GNUNET_htonll (uid); | ||
1169 | |||
1119 | qc.sc.cont = cont; | 1170 | qc.sc.cont = cont; |
1120 | qc.sc.cont_cls = cont_cls; | 1171 | qc.sc.cont_cls = cont_cls; |
1121 | qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority, | 1172 | qe = make_queue_entry (h, |
1122 | max_queue_size, timeout, &process_status_message, &qc); | 1173 | env, |
1123 | if (qe == NULL) | 1174 | queue_priority, |
1175 | max_queue_size, | ||
1176 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | ||
1177 | &qc); | ||
1178 | if (NULL == qe) | ||
1124 | { | 1179 | { |
1125 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1180 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1126 | "Could not create queue entry for UPDATE\n"); | 1181 | "Could not create queue entry for UPDATE\n"); |
@@ -1129,12 +1184,6 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, | |||
1129 | GNUNET_STATISTICS_update (h->stats, | 1184 | GNUNET_STATISTICS_update (h->stats, |
1130 | gettext_noop ("# UPDATE requests executed"), 1, | 1185 | gettext_noop ("# UPDATE requests executed"), 1, |
1131 | GNUNET_NO); | 1186 | GNUNET_NO); |
1132 | um = (struct UpdateMessage *) &qe[1]; | ||
1133 | um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); | ||
1134 | um->header.size = htons (sizeof (struct UpdateMessage)); | ||
1135 | um->priority = htonl (priority); | ||
1136 | um->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
1137 | um->uid = GNUNET_htonll (uid); | ||
1138 | process_queue (h); | 1187 | process_queue (h); |
1139 | return qe; | 1188 | return qe; |
1140 | } | 1189 | } |
@@ -1154,7 +1203,6 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, | |||
1154 | * @param queue_priority ranking of this request in the priority queue | 1203 | * @param queue_priority ranking of this request in the priority queue |
1155 | * @param max_queue_size at what queue size should this request be dropped | 1204 | * @param max_queue_size at what queue size should this request be dropped |
1156 | * (if other requests of higher priority are in the queue) | 1205 | * (if other requests of higher priority are in the queue) |
1157 | * @param timeout how long to wait at most for a response | ||
1158 | * @param cont continuation to call when done | 1206 | * @param cont continuation to call when done |
1159 | * @param cont_cls closure for @a cont | 1207 | * @param cont_cls closure for @a cont |
1160 | * @return NULL if the entry was not queued, otherwise a handle that can be used to | 1208 | * @return NULL if the entry was not queued, otherwise a handle that can be used to |
@@ -1168,161 +1216,64 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, | |||
1168 | const void *data, | 1216 | const void *data, |
1169 | unsigned int queue_priority, | 1217 | unsigned int queue_priority, |
1170 | unsigned int max_queue_size, | 1218 | unsigned int max_queue_size, |
1171 | struct GNUNET_TIME_Relative timeout, | ||
1172 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 1219 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
1173 | void *cont_cls) | 1220 | void *cont_cls) |
1174 | { | 1221 | { |
1175 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1222 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1176 | struct DataMessage *dm; | 1223 | struct DataMessage *dm; |
1177 | size_t msize; | 1224 | struct GNUNET_MQ_Envelope *env; |
1178 | union QueueContext qc; | 1225 | union QueueContext qc; |
1179 | 1226 | ||
1180 | if (cont == NULL) | 1227 | if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) |
1228 | { | ||
1229 | GNUNET_break (0); | ||
1230 | return NULL; | ||
1231 | } | ||
1232 | if (NULL == cont) | ||
1181 | cont = &drop_status_cont; | 1233 | cont = &drop_status_cont; |
1182 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1234 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1183 | "Asked to remove %u bytes under key `%s'\n", | 1235 | "Asked to remove %u bytes under key `%s'\n", |
1184 | size, | 1236 | size, |
1185 | GNUNET_h2s (key)); | 1237 | GNUNET_h2s (key)); |
1238 | env = GNUNET_MQ_msg_extra (dm, | ||
1239 | size, | ||
1240 | GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); | ||
1241 | dm->rid = htonl (0); | ||
1242 | dm->size = htonl (size); | ||
1243 | dm->type = htonl (0); | ||
1244 | dm->priority = htonl (0); | ||
1245 | dm->anonymity = htonl (0); | ||
1246 | dm->uid = GNUNET_htonll (0); | ||
1247 | dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); | ||
1248 | dm->key = *key; | ||
1249 | memcpy (&dm[1], | ||
1250 | data, | ||
1251 | size); | ||
1252 | |||
1186 | qc.sc.cont = cont; | 1253 | qc.sc.cont = cont; |
1187 | qc.sc.cont_cls = cont_cls; | 1254 | qc.sc.cont_cls = cont_cls; |
1188 | msize = sizeof (struct DataMessage) + size; | 1255 | |
1189 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); | ||
1190 | qe = make_queue_entry (h, | 1256 | qe = make_queue_entry (h, |
1191 | msize, | 1257 | env, |
1192 | queue_priority, | 1258 | queue_priority, |
1193 | max_queue_size, | 1259 | max_queue_size, |
1194 | timeout, | 1260 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, |
1195 | &process_status_message, | ||
1196 | &qc); | 1261 | &qc); |
1197 | if (qe == NULL) | 1262 | if (NULL == qe) |
1198 | { | 1263 | { |
1199 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1264 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1200 | "Could not create queue entry for REMOVE\n"); | 1265 | "Could not create queue entry for REMOVE\n"); |
1201 | return NULL; | 1266 | return NULL; |
1202 | } | 1267 | } |
1203 | GNUNET_STATISTICS_update (h->stats, | 1268 | GNUNET_STATISTICS_update (h->stats, |
1204 | gettext_noop ("# REMOVE requests executed"), 1, | 1269 | gettext_noop ("# REMOVE requests executed"), |
1270 | 1, | ||
1205 | GNUNET_NO); | 1271 | GNUNET_NO); |
1206 | dm = (struct DataMessage *) &qe[1]; | ||
1207 | dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); | ||
1208 | dm->header.size = htons (msize); | ||
1209 | dm->rid = htonl (0); | ||
1210 | dm->size = htonl (size); | ||
1211 | dm->type = htonl (0); | ||
1212 | dm->priority = htonl (0); | ||
1213 | dm->anonymity = htonl (0); | ||
1214 | dm->uid = GNUNET_htonll (0); | ||
1215 | dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); | ||
1216 | dm->key = *key; | ||
1217 | memcpy (&dm[1], data, size); | ||
1218 | process_queue (h); | 1272 | process_queue (h); |
1219 | return qe; | 1273 | return qe; |
1220 | } | 1274 | } |
1221 | 1275 | ||
1222 | 1276 | ||
1223 | /** | ||
1224 | * Type of a function to call when we receive a message | ||
1225 | * from the service. | ||
1226 | * | ||
1227 | * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` | ||
1228 | * @param msg message received, NULL on timeout or fatal error | ||
1229 | */ | ||
1230 | static void | ||
1231 | process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) | ||
1232 | { | ||
1233 | struct GNUNET_DATASTORE_Handle *h = cls; | ||
1234 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
1235 | struct ResultContext rc; | ||
1236 | const struct DataMessage *dm; | ||
1237 | int was_transmitted; | ||
1238 | |||
1239 | if (NULL == msg) | ||
1240 | { | ||
1241 | qe = h->queue_head; | ||
1242 | GNUNET_assert (NULL != qe); | ||
1243 | rc = qe->qc.rc; | ||
1244 | was_transmitted = qe->was_transmitted; | ||
1245 | free_queue_entry (qe); | ||
1246 | if (GNUNET_YES == was_transmitted) | ||
1247 | { | ||
1248 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1249 | "Failed to receive response from database.\n"); | ||
1250 | do_disconnect (h); | ||
1251 | } | ||
1252 | else | ||
1253 | { | ||
1254 | process_queue (h); | ||
1255 | } | ||
1256 | if (NULL != rc.proc) | ||
1257 | rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, | ||
1258 | 0); | ||
1259 | return; | ||
1260 | } | ||
1261 | if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) | ||
1262 | { | ||
1263 | GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)); | ||
1264 | qe = h->queue_head; | ||
1265 | rc = qe->qc.rc; | ||
1266 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); | ||
1267 | free_queue_entry (qe); | ||
1268 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1269 | "Received end of result set, new queue size is %u\n", h->queue_size); | ||
1270 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
1271 | h->result_count = 0; | ||
1272 | process_queue (h); | ||
1273 | if (NULL != rc.proc) | ||
1274 | rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, | ||
1275 | 0); | ||
1276 | return; | ||
1277 | } | ||
1278 | qe = h->queue_head; | ||
1279 | GNUNET_assert (NULL != qe); | ||
1280 | rc = qe->qc.rc; | ||
1281 | if (GNUNET_YES != qe->was_transmitted) | ||
1282 | { | ||
1283 | GNUNET_break (0); | ||
1284 | free_queue_entry (qe); | ||
1285 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
1286 | do_disconnect (h); | ||
1287 | if (rc.proc != NULL) | ||
1288 | rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, | ||
1289 | 0); | ||
1290 | return; | ||
1291 | } | ||
1292 | if ((ntohs (msg->size) < sizeof (struct DataMessage)) || | ||
1293 | (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || | ||
1294 | (ntohs (msg->size) != | ||
1295 | sizeof (struct DataMessage) + | ||
1296 | ntohl (((const struct DataMessage *) msg)->size))) | ||
1297 | { | ||
1298 | GNUNET_break (0); | ||
1299 | free_queue_entry (qe); | ||
1300 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
1301 | do_disconnect (h); | ||
1302 | if (rc.proc != NULL) | ||
1303 | rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, | ||
1304 | 0); | ||
1305 | return; | ||
1306 | } | ||
1307 | #if INSANE_STATISTICS | ||
1308 | GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1, | ||
1309 | GNUNET_NO); | ||
1310 | #endif | ||
1311 | dm = (const struct DataMessage *) msg; | ||
1312 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1313 | "Received result %llu with type %u and size %u with key %s\n", | ||
1314 | (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type), | ||
1315 | ntohl (dm->size), GNUNET_h2s (&dm->key)); | ||
1316 | free_queue_entry (qe); | ||
1317 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
1318 | process_queue (h); | ||
1319 | if (rc.proc != NULL) | ||
1320 | rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type), | ||
1321 | ntohl (dm->priority), ntohl (dm->anonymity), | ||
1322 | GNUNET_TIME_absolute_ntoh (dm->expiration), | ||
1323 | GNUNET_ntohll (dm->uid)); | ||
1324 | } | ||
1325 | |||
1326 | 1277 | ||
1327 | /** | 1278 | /** |
1328 | * Get a random value from the datastore for content replication. | 1279 | * Get a random value from the datastore for content replication. |
@@ -1335,7 +1286,6 @@ process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) | |||
1335 | * @param queue_priority ranking of this request in the priority queue | 1286 | * @param queue_priority ranking of this request in the priority queue |
1336 | * @param max_queue_size at what queue size should this request be dropped | 1287 | * @param max_queue_size at what queue size should this request be dropped |
1337 | * (if other requests of higher priority are in the queue) | 1288 | * (if other requests of higher priority are in the queue) |
1338 | * @param timeout how long to wait at most for a response | ||
1339 | * @param proc function to call on a random value; it | 1289 | * @param proc function to call on a random value; it |
1340 | * will be called once with a value (if available) | 1290 | * will be called once with a value (if available) |
1341 | * and always once with a value of NULL. | 1291 | * and always once with a value of NULL. |
@@ -1347,23 +1297,27 @@ struct GNUNET_DATASTORE_QueueEntry * | |||
1347 | GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | 1297 | GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, |
1348 | unsigned int queue_priority, | 1298 | unsigned int queue_priority, |
1349 | unsigned int max_queue_size, | 1299 | unsigned int max_queue_size, |
1350 | struct GNUNET_TIME_Relative timeout, | ||
1351 | GNUNET_DATASTORE_DatumProcessor proc, | 1300 | GNUNET_DATASTORE_DatumProcessor proc, |
1352 | void *proc_cls) | 1301 | void *proc_cls) |
1353 | { | 1302 | { |
1354 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1303 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1304 | struct GNUNET_MQ_Envelope *env; | ||
1355 | struct GNUNET_MessageHeader *m; | 1305 | struct GNUNET_MessageHeader *m; |
1356 | union QueueContext qc; | 1306 | union QueueContext qc; |
1357 | 1307 | ||
1358 | GNUNET_assert (NULL != proc); | 1308 | GNUNET_assert (NULL != proc); |
1359 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1309 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1360 | "Asked to get replication entry in %s\n", | 1310 | "Asked to get replication entry\n"); |
1361 | GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES)); | 1311 | env = GNUNET_MQ_msg (m, |
1312 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); | ||
1362 | qc.rc.proc = proc; | 1313 | qc.rc.proc = proc; |
1363 | qc.rc.proc_cls = proc_cls; | 1314 | qc.rc.proc_cls = proc_cls; |
1364 | qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader), | 1315 | qe = make_queue_entry (h, |
1365 | queue_priority, max_queue_size, timeout, | 1316 | env, |
1366 | &process_result_message, &qc); | 1317 | queue_priority, |
1318 | max_queue_size, | ||
1319 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, | ||
1320 | &qc); | ||
1367 | if (NULL == qe) | 1321 | if (NULL == qe) |
1368 | { | 1322 | { |
1369 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1323 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1374,9 +1328,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | |||
1374 | gettext_noop | 1328 | gettext_noop |
1375 | ("# GET REPLICATION requests executed"), 1, | 1329 | ("# GET REPLICATION requests executed"), 1, |
1376 | GNUNET_NO); | 1330 | GNUNET_NO); |
1377 | m = (struct GNUNET_MessageHeader *) &qe[1]; | ||
1378 | m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); | ||
1379 | m->size = htons (sizeof (struct GNUNET_MessageHeader)); | ||
1380 | process_queue (h); | 1331 | process_queue (h); |
1381 | return qe; | 1332 | return qe; |
1382 | } | 1333 | } |
@@ -1393,7 +1344,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | |||
1393 | * @param queue_priority ranking of this request in the priority queue | 1344 | * @param queue_priority ranking of this request in the priority queue |
1394 | * @param max_queue_size at what queue size should this request be dropped | 1345 | * @param max_queue_size at what queue size should this request be dropped |
1395 | * (if other requests of higher priority are in the queue) | 1346 | * (if other requests of higher priority are in the queue) |
1396 | * @param timeout how long to wait at most for a response | ||
1397 | * @param type allowed type for the operation (never zero) | 1347 | * @param type allowed type for the operation (never zero) |
1398 | * @param proc function to call on a random value; it | 1348 | * @param proc function to call on a random value; it |
1399 | * will be called once with a value (if available) | 1349 | * will be called once with a value (if available) |
@@ -1407,31 +1357,32 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1407 | uint64_t offset, | 1357 | uint64_t offset, |
1408 | unsigned int queue_priority, | 1358 | unsigned int queue_priority, |
1409 | unsigned int max_queue_size, | 1359 | unsigned int max_queue_size, |
1410 | struct GNUNET_TIME_Relative timeout, | ||
1411 | enum GNUNET_BLOCK_Type type, | 1360 | enum GNUNET_BLOCK_Type type, |
1412 | GNUNET_DATASTORE_DatumProcessor proc, | 1361 | GNUNET_DATASTORE_DatumProcessor proc, |
1413 | void *proc_cls) | 1362 | void *proc_cls) |
1414 | { | 1363 | { |
1415 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1364 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1365 | struct GNUNET_MQ_Envelope *env; | ||
1416 | struct GetZeroAnonymityMessage *m; | 1366 | struct GetZeroAnonymityMessage *m; |
1417 | union QueueContext qc; | 1367 | union QueueContext qc; |
1418 | 1368 | ||
1419 | GNUNET_assert (NULL != proc); | 1369 | GNUNET_assert (NULL != proc); |
1420 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); | 1370 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); |
1421 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1371 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1422 | "Asked to get %llu-th zero-anonymity entry of type %d in %s\n", | 1372 | "Asked to get %llu-th zero-anonymity entry of type %d\n", |
1423 | (unsigned long long) offset, | 1373 | (unsigned long long) offset, |
1424 | type, | 1374 | type); |
1425 | GNUNET_STRINGS_relative_time_to_string (timeout, | 1375 | env = GNUNET_MQ_msg (m, |
1426 | GNUNET_YES)); | 1376 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); |
1377 | m->type = htonl ((uint32_t) type); | ||
1378 | m->offset = GNUNET_htonll (offset); | ||
1427 | qc.rc.proc = proc; | 1379 | qc.rc.proc = proc; |
1428 | qc.rc.proc_cls = proc_cls; | 1380 | qc.rc.proc_cls = proc_cls; |
1429 | qe = make_queue_entry (h, | 1381 | qe = make_queue_entry (h, |
1430 | sizeof (struct GetZeroAnonymityMessage), | 1382 | env, |
1431 | queue_priority, | 1383 | queue_priority, |
1432 | max_queue_size, | 1384 | max_queue_size, |
1433 | timeout, | 1385 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, |
1434 | &process_result_message, | ||
1435 | &qc); | 1386 | &qc); |
1436 | if (NULL == qe) | 1387 | if (NULL == qe) |
1437 | { | 1388 | { |
@@ -1443,11 +1394,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1443 | gettext_noop | 1394 | gettext_noop |
1444 | ("# GET ZERO ANONYMITY requests executed"), 1, | 1395 | ("# GET ZERO ANONYMITY requests executed"), 1, |
1445 | GNUNET_NO); | 1396 | GNUNET_NO); |
1446 | m = (struct GetZeroAnonymityMessage *) &qe[1]; | ||
1447 | m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); | ||
1448 | m->header.size = htons (sizeof (struct GetZeroAnonymityMessage)); | ||
1449 | m->type = htonl ((uint32_t) type); | ||
1450 | m->offset = GNUNET_htonll (offset); | ||
1451 | process_queue (h); | 1397 | process_queue (h); |
1452 | return qe; | 1398 | return qe; |
1453 | } | 1399 | } |
@@ -1467,7 +1413,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1467 | * @param queue_priority ranking of this request in the priority queue | 1413 | * @param queue_priority ranking of this request in the priority queue |
1468 | * @param max_queue_size at what queue size should this request be dropped | 1414 | * @param max_queue_size at what queue size should this request be dropped |
1469 | * (if other requests of higher priority are in the queue) | 1415 | * (if other requests of higher priority are in the queue) |
1470 | * @param timeout how long to wait at most for a response | ||
1471 | * @param proc function to call on each matching value; | 1416 | * @param proc function to call on each matching value; |
1472 | * will be called once with a NULL value at the end | 1417 | * will be called once with a NULL value at the end |
1473 | * @param proc_cls closure for @a proc | 1418 | * @param proc_cls closure for @a proc |
@@ -1481,28 +1426,44 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, | |||
1481 | enum GNUNET_BLOCK_Type type, | 1426 | enum GNUNET_BLOCK_Type type, |
1482 | unsigned int queue_priority, | 1427 | unsigned int queue_priority, |
1483 | unsigned int max_queue_size, | 1428 | unsigned int max_queue_size, |
1484 | struct GNUNET_TIME_Relative timeout, | ||
1485 | GNUNET_DATASTORE_DatumProcessor proc, | 1429 | GNUNET_DATASTORE_DatumProcessor proc, |
1486 | void *proc_cls) | 1430 | void *proc_cls) |
1487 | { | 1431 | { |
1488 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1432 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1433 | struct GNUNET_MQ_Envelope *env; | ||
1434 | struct GetKeyMessage *gkm; | ||
1489 | struct GetMessage *gm; | 1435 | struct GetMessage *gm; |
1490 | union QueueContext qc; | 1436 | union QueueContext qc; |
1491 | 1437 | ||
1492 | GNUNET_assert (NULL != proc); | 1438 | GNUNET_assert (NULL != proc); |
1493 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1439 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1494 | "Asked to look for data of type %u under key `%s'\n", | 1440 | "Asked to look for data of type %u under key `%s'\n", |
1495 | (unsigned int) type, GNUNET_h2s (key)); | 1441 | (unsigned int) type, |
1442 | GNUNET_h2s (key)); | ||
1443 | if (NULL == key) | ||
1444 | { | ||
1445 | env = GNUNET_MQ_msg (gm, | ||
1446 | GNUNET_MESSAGE_TYPE_DATASTORE_GET); | ||
1447 | gm->type = htonl (type); | ||
1448 | gm->offset = GNUNET_htonll (offset); | ||
1449 | } | ||
1450 | else | ||
1451 | { | ||
1452 | env = GNUNET_MQ_msg (gkm, | ||
1453 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); | ||
1454 | gkm->type = htonl (type); | ||
1455 | gkm->offset = GNUNET_htonll (offset); | ||
1456 | gkm->key = *key; | ||
1457 | } | ||
1496 | qc.rc.proc = proc; | 1458 | qc.rc.proc = proc; |
1497 | qc.rc.proc_cls = proc_cls; | 1459 | qc.rc.proc_cls = proc_cls; |
1498 | qe = make_queue_entry (h, | 1460 | qe = make_queue_entry (h, |
1499 | sizeof (struct GetMessage), | 1461 | env, |
1500 | queue_priority, | 1462 | queue_priority, |
1501 | max_queue_size, | 1463 | max_queue_size, |
1502 | timeout, | 1464 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, |
1503 | &process_result_message, | ||
1504 | &qc); | 1465 | &qc); |
1505 | if (qe == NULL) | 1466 | if (NULL == qe) |
1506 | { | 1467 | { |
1507 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1468 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1508 | "Could not queue request for `%s'\n", | 1469 | "Could not queue request for `%s'\n", |
@@ -1515,20 +1476,6 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, | |||
1515 | 1, | 1476 | 1, |
1516 | GNUNET_NO); | 1477 | GNUNET_NO); |
1517 | #endif | 1478 | #endif |
1518 | gm = (struct GetMessage *) &qe[1]; | ||
1519 | gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET); | ||
1520 | gm->type = htonl (type); | ||
1521 | gm->offset = GNUNET_htonll (offset); | ||
1522 | if (key != NULL) | ||
1523 | { | ||
1524 | gm->header.size = htons (sizeof (struct GetMessage)); | ||
1525 | gm->key = *key; | ||
1526 | } | ||
1527 | else | ||
1528 | { | ||
1529 | gm->header.size = | ||
1530 | htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode)); | ||
1531 | } | ||
1532 | process_queue (h); | 1479 | process_queue (h); |
1533 | return qe; | 1480 | return qe; |
1534 | } | 1481 | } |
@@ -1543,16 +1490,14 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, | |||
1543 | void | 1490 | void |
1544 | GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) | 1491 | GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) |
1545 | { | 1492 | { |
1546 | struct GNUNET_DATASTORE_Handle *h; | 1493 | struct GNUNET_DATASTORE_Handle *h = qe->h; |
1547 | 1494 | ||
1548 | GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); | ||
1549 | h = qe->h; | ||
1550 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1495 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1551 | "Pending DATASTORE request %p cancelled (%d, %d)\n", | 1496 | "Pending DATASTORE request %p cancelled (%d, %d)\n", |
1552 | qe, | 1497 | qe, |
1553 | qe->was_transmitted, | 1498 | NULL == qe->env, |
1554 | h->queue_head == qe); | 1499 | h->queue_head == qe); |
1555 | if (GNUNET_YES == qe->was_transmitted) | 1500 | if (NULL == qe->env) |
1556 | { | 1501 | { |
1557 | free_queue_entry (qe); | 1502 | free_queue_entry (qe); |
1558 | h->skip_next_messages++; | 1503 | h->skip_next_messages++; |