diff options
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r-- | src/datastore/datastore_api.c | 1150 |
1 files changed, 537 insertions, 613 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index f355dfab5..85e402a4d 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c | |||
@@ -82,7 +82,7 @@ union QueueContext | |||
82 | { | 82 | { |
83 | 83 | ||
84 | struct StatusContext sc; | 84 | struct StatusContext sc; |
85 | 85 | ||
86 | struct ResultContext rc; | 86 | struct ResultContext rc; |
87 | 87 | ||
88 | }; | 88 | }; |
@@ -121,7 +121,7 @@ struct GNUNET_DATASTORE_QueueEntry | |||
121 | * Function to call after transmission of the request. | 121 | * Function to call after transmission of the request. |
122 | */ | 122 | */ |
123 | GNUNET_DATASTORE_ContinuationWithStatus cont; | 123 | GNUNET_DATASTORE_ContinuationWithStatus cont; |
124 | 124 | ||
125 | /** | 125 | /** |
126 | * Closure for 'cont'. | 126 | * Closure for 'cont'. |
127 | */ | 127 | */ |
@@ -167,7 +167,7 @@ struct GNUNET_DATASTORE_QueueEntry | |||
167 | * multiple of 64 bits. | 167 | * multiple of 64 bits. |
168 | */ | 168 | */ |
169 | int was_transmitted; | 169 | int was_transmitted; |
170 | 170 | ||
171 | }; | 171 | }; |
172 | 172 | ||
173 | /** | 173 | /** |
@@ -250,22 +250,19 @@ struct GNUNET_DATASTORE_Handle | |||
250 | * @return handle to use to access the service | 250 | * @return handle to use to access the service |
251 | */ | 251 | */ |
252 | struct GNUNET_DATASTORE_Handle * | 252 | struct GNUNET_DATASTORE_Handle * |
253 | GNUNET_DATASTORE_connect (const struct | 253 | GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) |
254 | GNUNET_CONFIGURATION_Handle | ||
255 | *cfg) | ||
256 | { | 254 | { |
257 | struct GNUNET_CLIENT_Connection *c; | 255 | struct GNUNET_CLIENT_Connection *c; |
258 | struct GNUNET_DATASTORE_Handle *h; | 256 | struct GNUNET_DATASTORE_Handle *h; |
259 | 257 | ||
260 | c = GNUNET_CLIENT_connect ("datastore", cfg); | 258 | c = GNUNET_CLIENT_connect ("datastore", cfg); |
261 | if (c == NULL) | 259 | if (c == NULL) |
262 | return NULL; /* oops */ | 260 | return NULL; /* oops */ |
263 | h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + | 261 | h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) + |
264 | GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); | 262 | GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); |
265 | h->client = c; | 263 | h->client = c; |
266 | h->cfg = cfg; | 264 | h->cfg = cfg; |
267 | h->stats = GNUNET_STATISTICS_create ("datastore-api", | 265 | h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg); |
268 | cfg); | ||
269 | return h; | 266 | return h; |
270 | } | 267 | } |
271 | 268 | ||
@@ -279,26 +276,24 @@ GNUNET_DATASTORE_connect (const struct | |||
279 | * @return number of bytes written to buf | 276 | * @return number of bytes written to buf |
280 | */ | 277 | */ |
281 | static size_t | 278 | static size_t |
282 | transmit_drop (void *cls, | 279 | transmit_drop (void *cls, size_t size, void *buf) |
283 | size_t size, | ||
284 | void *buf) | ||
285 | { | 280 | { |
286 | struct GNUNET_DATASTORE_Handle *h = cls; | 281 | struct GNUNET_DATASTORE_Handle *h = cls; |
287 | struct GNUNET_MessageHeader *hdr; | 282 | struct GNUNET_MessageHeader *hdr; |
288 | 283 | ||
289 | if (buf == NULL) | 284 | if (buf == NULL) |
290 | { | 285 | { |
291 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 286 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
292 | _("Failed to transmit request to drop database.\n")); | 287 | _("Failed to transmit request to drop database.\n")); |
293 | GNUNET_DATASTORE_disconnect (h, GNUNET_NO); | 288 | GNUNET_DATASTORE_disconnect (h, GNUNET_NO); |
294 | return 0; | 289 | return 0; |
295 | } | 290 | } |
296 | GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader)); | 291 | GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); |
297 | hdr = buf; | 292 | hdr = buf; |
298 | hdr->size = htons(sizeof(struct GNUNET_MessageHeader)); | 293 | hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); |
299 | hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP); | 294 | hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP); |
300 | GNUNET_DATASTORE_disconnect (h, GNUNET_NO); | 295 | GNUNET_DATASTORE_disconnect (h, GNUNET_NO); |
301 | return sizeof(struct GNUNET_MessageHeader); | 296 | return sizeof (struct GNUNET_MessageHeader); |
302 | } | 297 | } |
303 | 298 | ||
304 | 299 | ||
@@ -310,55 +305,51 @@ transmit_drop (void *cls, | |||
310 | * @param drop set to GNUNET_YES to delete all data in datastore (!) | 305 | * @param drop set to GNUNET_YES to delete all data in datastore (!) |
311 | */ | 306 | */ |
312 | void | 307 | void |
313 | GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, | 308 | GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop) |
314 | int drop) | ||
315 | { | 309 | { |
316 | struct GNUNET_DATASTORE_QueueEntry *qe; | 310 | struct GNUNET_DATASTORE_QueueEntry *qe; |
317 | 311 | ||
318 | #if DEBUG_DATASTORE | 312 | #if DEBUG_DATASTORE |
319 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 313 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n"); |
320 | "Datastore disconnect\n"); | ||
321 | #endif | 314 | #endif |
322 | if (NULL != h->th) | 315 | if (NULL != h->th) |
323 | { | 316 | { |
324 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | 317 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); |
325 | h->th = NULL; | 318 | h->th = NULL; |
326 | } | 319 | } |
327 | if (h->client != NULL) | 320 | if (h->client != NULL) |
328 | { | 321 | { |
329 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); | 322 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); |
330 | h->client = NULL; | 323 | h->client = NULL; |
331 | } | 324 | } |
332 | if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK) | 325 | if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK) |
333 | { | 326 | { |
334 | GNUNET_SCHEDULER_cancel (h->reconnect_task); | 327 | GNUNET_SCHEDULER_cancel (h->reconnect_task); |
335 | h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | 328 | h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; |
336 | } | 329 | } |
337 | while (NULL != (qe = h->queue_head)) | 330 | while (NULL != (qe = h->queue_head)) |
331 | { | ||
332 | GNUNET_assert (NULL != qe->response_proc); | ||
333 | qe->response_proc (h, NULL); | ||
334 | } | ||
335 | if (GNUNET_YES == drop) | ||
336 | { | ||
337 | h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); | ||
338 | if (h->client != NULL) | ||
338 | { | 339 | { |
339 | GNUNET_assert (NULL != qe->response_proc); | 340 | if (NULL != |
340 | qe->response_proc (h, NULL); | 341 | GNUNET_CLIENT_notify_transmit_ready (h->client, |
341 | } | 342 | sizeof (struct |
342 | if (GNUNET_YES == drop) | 343 | GNUNET_MessageHeader), |
343 | { | 344 | GNUNET_TIME_UNIT_MINUTES, |
344 | h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); | 345 | GNUNET_YES, &transmit_drop, h)) |
345 | if (h->client != NULL) | 346 | return; |
346 | { | 347 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); |
347 | if (NULL != | 348 | h->client = NULL; |
348 | GNUNET_CLIENT_notify_transmit_ready (h->client, | ||
349 | sizeof(struct GNUNET_MessageHeader), | ||
350 | GNUNET_TIME_UNIT_MINUTES, | ||
351 | GNUNET_YES, | ||
352 | &transmit_drop, | ||
353 | h)) | ||
354 | return; | ||
355 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); | ||
356 | h->client = NULL; | ||
357 | } | ||
358 | GNUNET_break (0); | ||
359 | } | 349 | } |
360 | GNUNET_STATISTICS_destroy (h->stats, | 350 | GNUNET_break (0); |
361 | GNUNET_NO); | 351 | } |
352 | GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO); | ||
362 | h->stats = NULL; | 353 | h->stats = NULL; |
363 | GNUNET_free (h); | 354 | GNUNET_free (h); |
364 | } | 355 | } |
@@ -371,20 +362,18 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, | |||
371 | * @param tc scheduler context | 362 | * @param tc scheduler context |
372 | */ | 363 | */ |
373 | static void | 364 | static void |
374 | timeout_queue_entry (void *cls, | 365 | timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
375 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
376 | { | 366 | { |
377 | struct GNUNET_DATASTORE_QueueEntry *qe = cls; | 367 | struct GNUNET_DATASTORE_QueueEntry *qe = cls; |
378 | 368 | ||
379 | GNUNET_STATISTICS_update (qe->h->stats, | 369 | GNUNET_STATISTICS_update (qe->h->stats, |
380 | gettext_noop ("# queue entry timeouts"), | 370 | gettext_noop ("# queue entry timeouts"), |
381 | 1, | 371 | 1, GNUNET_NO); |
382 | GNUNET_NO); | ||
383 | qe->task = GNUNET_SCHEDULER_NO_TASK; | 372 | qe->task = GNUNET_SCHEDULER_NO_TASK; |
384 | GNUNET_assert (qe->was_transmitted == GNUNET_NO); | 373 | GNUNET_assert (qe->was_transmitted == GNUNET_NO); |
385 | #if DEBUG_DATASTORE | 374 | #if DEBUG_DATASTORE |
386 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 375 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
387 | "Timeout of request in datastore queue\n"); | 376 | "Timeout of request in datastore queue\n"); |
388 | #endif | 377 | #endif |
389 | qe->response_proc (qe->h, NULL); | 378 | qe->response_proc (qe->h, NULL); |
390 | } | 379 | } |
@@ -406,12 +395,12 @@ timeout_queue_entry (void *cls, | |||
406 | */ | 395 | */ |
407 | static struct GNUNET_DATASTORE_QueueEntry * | 396 | static struct GNUNET_DATASTORE_QueueEntry * |
408 | make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | 397 | make_queue_entry (struct GNUNET_DATASTORE_Handle *h, |
409 | size_t msize, | 398 | size_t msize, |
410 | unsigned int queue_priority, | 399 | unsigned int queue_priority, |
411 | unsigned int max_queue_size, | 400 | unsigned int max_queue_size, |
412 | struct GNUNET_TIME_Relative timeout, | 401 | struct GNUNET_TIME_Relative timeout, |
413 | GNUNET_CLIENT_MessageHandler response_proc, | 402 | GNUNET_CLIENT_MessageHandler response_proc, |
414 | const union QueueContext *qc) | 403 | const union QueueContext *qc) |
415 | { | 404 | { |
416 | struct GNUNET_DATASTORE_QueueEntry *ret; | 405 | struct GNUNET_DATASTORE_QueueEntry *ret; |
417 | struct GNUNET_DATASTORE_QueueEntry *pos; | 406 | struct GNUNET_DATASTORE_QueueEntry *pos; |
@@ -419,21 +408,18 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | |||
419 | 408 | ||
420 | c = 0; | 409 | c = 0; |
421 | pos = h->queue_head; | 410 | pos = h->queue_head; |
422 | while ( (pos != NULL) && | 411 | while ((pos != NULL) && |
423 | (c < max_queue_size) && | 412 | (c < max_queue_size) && (pos->priority >= queue_priority)) |
424 | (pos->priority >= queue_priority) ) | 413 | { |
425 | { | 414 | c++; |
426 | c++; | 415 | pos = pos->next; |
427 | pos = pos->next; | 416 | } |
428 | } | ||
429 | if (c >= max_queue_size) | 417 | if (c >= max_queue_size) |
430 | { | 418 | { |
431 | GNUNET_STATISTICS_update (h->stats, | 419 | GNUNET_STATISTICS_update (h->stats, |
432 | gettext_noop ("# queue overflows"), | 420 | gettext_noop ("# queue overflows"), 1, GNUNET_NO); |
433 | 1, | 421 | return NULL; |
434 | GNUNET_NO); | 422 | } |
435 | return NULL; | ||
436 | } | ||
437 | ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); | 423 | ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); |
438 | ret->h = h; | 424 | ret->h = h; |
439 | ret->response_proc = response_proc; | 425 | ret->response_proc = response_proc; |
@@ -444,61 +430,49 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | |||
444 | ret->message_size = msize; | 430 | ret->message_size = msize; |
445 | ret->was_transmitted = GNUNET_NO; | 431 | ret->was_transmitted = GNUNET_NO; |
446 | if (pos == NULL) | 432 | if (pos == NULL) |
447 | { | 433 | { |
448 | /* append at the tail */ | 434 | /* append at the tail */ |
449 | pos = h->queue_tail; | 435 | pos = h->queue_tail; |
450 | } | 436 | } |
451 | else | 437 | else |
452 | { | 438 | { |
453 | pos = pos->prev; | 439 | pos = pos->prev; |
454 | /* do not insert at HEAD if HEAD query was already | 440 | /* do not insert at HEAD if HEAD query was already |
455 | transmitted and we are still receiving replies! */ | 441 | * transmitted and we are still receiving replies! */ |
456 | if ( (pos == NULL) && | 442 | if ((pos == NULL) && (h->queue_head->was_transmitted)) |
457 | (h->queue_head->was_transmitted) ) | 443 | pos = h->queue_head; |
458 | pos = h->queue_head; | 444 | } |
459 | } | ||
460 | c++; | 445 | c++; |
461 | GNUNET_STATISTICS_update (h->stats, | 446 | GNUNET_STATISTICS_update (h->stats, |
462 | gettext_noop ("# queue entries created"), | 447 | gettext_noop ("# queue entries created"), |
463 | 1, | 448 | 1, GNUNET_NO); |
464 | GNUNET_NO); | 449 | GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret); |
465 | GNUNET_CONTAINER_DLL_insert_after (h->queue_head, | ||
466 | h->queue_tail, | ||
467 | pos, | ||
468 | ret); | ||
469 | h->queue_size++; | 450 | h->queue_size++; |
470 | ret->task = GNUNET_SCHEDULER_add_delayed (timeout, | 451 | ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret); |
471 | &timeout_queue_entry, | ||
472 | ret); | ||
473 | pos = ret->next; | 452 | pos = ret->next; |
474 | while (pos != NULL) | 453 | while (pos != NULL) |
454 | { | ||
455 | if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO)) | ||
475 | { | 456 | { |
476 | if ( (pos->max_queue < h->queue_size) && | 457 | GNUNET_assert (pos->response_proc != NULL); |
477 | (pos->was_transmitted == GNUNET_NO) ) | 458 | /* move 'pos' element to head so that it will be |
478 | { | 459 | * killed on 'NULL' call below */ |
479 | GNUNET_assert (pos->response_proc != NULL); | ||
480 | /* move 'pos' element to head so that it will be | ||
481 | killed on 'NULL' call below */ | ||
482 | #if DEBUG_DATASTORE | 460 | #if DEBUG_DATASTORE |
483 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 461 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
484 | "Dropping request from datastore queue\n"); | 462 | "Dropping request from datastore queue\n"); |
485 | #endif | 463 | #endif |
486 | GNUNET_CONTAINER_DLL_remove (h->queue_head, | 464 | GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos); |
487 | h->queue_tail, | 465 | GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos); |
488 | pos); | 466 | GNUNET_STATISTICS_update (h->stats, |
489 | GNUNET_CONTAINER_DLL_insert (h->queue_head, | 467 | gettext_noop |
490 | h->queue_tail, | 468 | ("# Requests dropped from datastore queue"), 1, |
491 | pos); | 469 | GNUNET_NO); |
492 | GNUNET_STATISTICS_update (h->stats, | 470 | GNUNET_assert (h->queue_head == pos); |
493 | gettext_noop ("# Requests dropped from datastore queue"), | 471 | pos->response_proc (h, NULL); |
494 | 1, | 472 | break; |
495 | GNUNET_NO); | ||
496 | GNUNET_assert (h->queue_head == pos); | ||
497 | pos->response_proc (h, NULL); | ||
498 | break; | ||
499 | } | ||
500 | pos = pos->next; | ||
501 | } | 473 | } |
474 | pos = pos->next; | ||
475 | } | ||
502 | return ret; | 476 | return ret; |
503 | } | 477 | } |
504 | 478 | ||
@@ -509,8 +483,7 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, | |||
509 | * | 483 | * |
510 | * @param h handle to the datastore | 484 | * @param h handle to the datastore |
511 | */ | 485 | */ |
512 | static void | 486 | static void process_queue (struct GNUNET_DATASTORE_Handle *h); |
513 | process_queue (struct GNUNET_DATASTORE_Handle *h); | ||
514 | 487 | ||
515 | 488 | ||
516 | /** | 489 | /** |
@@ -520,8 +493,7 @@ process_queue (struct GNUNET_DATASTORE_Handle *h); | |||
520 | * @param tc scheduler context | 493 | * @param tc scheduler context |
521 | */ | 494 | */ |
522 | static void | 495 | static void |
523 | try_reconnect (void *cls, | 496 | try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
524 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
525 | { | 497 | { |
526 | struct GNUNET_DATASTORE_Handle *h = cls; | 498 | struct GNUNET_DATASTORE_Handle *h = cls; |
527 | 499 | ||
@@ -534,18 +506,17 @@ try_reconnect (void *cls, | |||
534 | h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | 506 | h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; |
535 | h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); | 507 | h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); |
536 | if (h->client == NULL) | 508 | if (h->client == NULL) |
537 | { | 509 | { |
538 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 510 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
539 | "DATASTORE reconnect failed (fatally)\n"); | 511 | "DATASTORE reconnect failed (fatally)\n"); |
540 | return; | 512 | return; |
541 | } | 513 | } |
542 | GNUNET_STATISTICS_update (h->stats, | 514 | GNUNET_STATISTICS_update (h->stats, |
543 | gettext_noop ("# datastore connections (re)created"), | 515 | gettext_noop |
544 | 1, | 516 | ("# datastore connections (re)created"), 1, |
545 | GNUNET_NO); | 517 | GNUNET_NO); |
546 | #if DEBUG_DATASTORE | 518 | #if DEBUG_DATASTORE |
547 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 519 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n"); |
548 | "Reconnected to DATASTORE\n"); | ||
549 | #endif | 520 | #endif |
550 | process_queue (h); | 521 | process_queue (h); |
551 | } | 522 | } |
@@ -561,25 +532,23 @@ static void | |||
561 | do_disconnect (struct GNUNET_DATASTORE_Handle *h) | 532 | do_disconnect (struct GNUNET_DATASTORE_Handle *h) |
562 | { | 533 | { |
563 | if (h->client == NULL) | 534 | if (h->client == NULL) |
564 | { | 535 | { |
565 | #if DEBUG_DATASTORE | 536 | #if DEBUG_DATASTORE |
566 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 537 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
567 | "client NULL in disconnect, will not try to reconnect\n"); | 538 | "client NULL in disconnect, will not try to reconnect\n"); |
568 | #endif | 539 | #endif |
569 | return; | 540 | return; |
570 | } | 541 | } |
571 | #if 0 | 542 | #if 0 |
572 | GNUNET_STATISTICS_update (stats, | 543 | GNUNET_STATISTICS_update (stats, |
573 | gettext_noop ("# reconnected to DATASTORE"), | 544 | gettext_noop ("# reconnected to DATASTORE"), |
574 | 1, | 545 | 1, GNUNET_NO); |
575 | GNUNET_NO); | ||
576 | #endif | 546 | #endif |
577 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); | 547 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); |
578 | h->skip_next_messages = 0; | 548 | h->skip_next_messages = 0; |
579 | h->client = NULL; | 549 | h->client = NULL; |
580 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time, | 550 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time, |
581 | &try_reconnect, | 551 | &try_reconnect, h); |
582 | h); | ||
583 | } | 552 | } |
584 | 553 | ||
585 | 554 | ||
@@ -590,30 +559,28 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h) | |||
590 | * @param cls the 'struct GNUNET_DATASTORE_Handle' | 559 | * @param cls the 'struct GNUNET_DATASTORE_Handle' |
591 | * @param msg the received message | 560 | * @param msg the received message |
592 | */ | 561 | */ |
593 | static void | 562 | static void |
594 | receive_cb (void *cls, | 563 | receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) |
595 | const struct GNUNET_MessageHeader *msg) | ||
596 | { | 564 | { |
597 | struct GNUNET_DATASTORE_Handle *h = cls; | 565 | struct GNUNET_DATASTORE_Handle *h = cls; |
598 | struct GNUNET_DATASTORE_QueueEntry *qe; | 566 | struct GNUNET_DATASTORE_QueueEntry *qe; |
599 | 567 | ||
600 | h->in_receive = GNUNET_NO; | 568 | h->in_receive = GNUNET_NO; |
601 | #if DEBUG_DATASTORE | 569 | #if DEBUG_DATASTORE |
602 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 570 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n"); |
603 | "Receiving reply from datastore\n"); | ||
604 | #endif | 571 | #endif |
605 | if (h->skip_next_messages > 0) | 572 | if (h->skip_next_messages > 0) |
606 | { | 573 | { |
607 | h->skip_next_messages--; | 574 | h->skip_next_messages--; |
608 | process_queue (h); | 575 | process_queue (h); |
609 | return; | 576 | return; |
610 | } | 577 | } |
611 | if (NULL == (qe = h->queue_head)) | 578 | if (NULL == (qe = h->queue_head)) |
612 | { | 579 | { |
613 | GNUNET_break (0); | 580 | GNUNET_break (0); |
614 | process_queue (h); | 581 | process_queue (h); |
615 | return; | 582 | return; |
616 | } | 583 | } |
617 | qe->response_proc (h, msg); | 584 | qe->response_proc (h, msg); |
618 | } | 585 | } |
619 | 586 | ||
@@ -627,9 +594,7 @@ receive_cb (void *cls, | |||
627 | * @return number of bytes written to buf | 594 | * @return number of bytes written to buf |
628 | */ | 595 | */ |
629 | static size_t | 596 | static size_t |
630 | transmit_request (void *cls, | 597 | transmit_request (void *cls, size_t size, void *buf) |
631 | size_t size, | ||
632 | void *buf) | ||
633 | { | 598 | { |
634 | struct GNUNET_DATASTORE_Handle *h = cls; | 599 | struct GNUNET_DATASTORE_Handle *h = cls; |
635 | struct GNUNET_DATASTORE_QueueEntry *qe; | 600 | struct GNUNET_DATASTORE_QueueEntry *qe; |
@@ -637,27 +602,25 @@ transmit_request (void *cls, | |||
637 | 602 | ||
638 | h->th = NULL; | 603 | h->th = NULL; |
639 | if (NULL == (qe = h->queue_head)) | 604 | if (NULL == (qe = h->queue_head)) |
640 | return 0; /* no entry in queue */ | 605 | return 0; /* no entry in queue */ |
641 | if (buf == NULL) | 606 | if (buf == NULL) |
642 | { | 607 | { |
643 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 608 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
644 | _("Failed to transmit request to DATASTORE.\n")); | 609 | _("Failed to transmit request to DATASTORE.\n")); |
645 | GNUNET_STATISTICS_update (h->stats, | 610 | GNUNET_STATISTICS_update (h->stats, |
646 | gettext_noop ("# transmission request failures"), | 611 | gettext_noop ("# transmission request failures"), |
647 | 1, | 612 | 1, GNUNET_NO); |
648 | GNUNET_NO); | 613 | do_disconnect (h); |
649 | do_disconnect (h); | 614 | return 0; |
650 | return 0; | 615 | } |
651 | } | ||
652 | if (size < (msize = qe->message_size)) | 616 | if (size < (msize = qe->message_size)) |
653 | { | 617 | { |
654 | process_queue (h); | 618 | process_queue (h); |
655 | return 0; | 619 | return 0; |
656 | } | 620 | } |
657 | #if DEBUG_DATASTORE | 621 | #if DEBUG_DATASTORE |
658 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 622 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
659 | "Transmitting %u byte request to DATASTORE\n", | 623 | "Transmitting %u byte request to DATASTORE\n", msize); |
660 | msize); | ||
661 | #endif | 624 | #endif |
662 | memcpy (buf, &qe[1], msize); | 625 | memcpy (buf, &qe[1], msize); |
663 | qe->was_transmitted = GNUNET_YES; | 626 | qe->was_transmitted = GNUNET_YES; |
@@ -666,13 +629,11 @@ transmit_request (void *cls, | |||
666 | GNUNET_assert (GNUNET_NO == h->in_receive); | 629 | GNUNET_assert (GNUNET_NO == h->in_receive); |
667 | h->in_receive = GNUNET_YES; | 630 | h->in_receive = GNUNET_YES; |
668 | GNUNET_CLIENT_receive (h->client, | 631 | GNUNET_CLIENT_receive (h->client, |
669 | &receive_cb, | 632 | &receive_cb, |
670 | h, | 633 | h, GNUNET_TIME_absolute_get_remaining (qe->timeout)); |
671 | GNUNET_TIME_absolute_get_remaining (qe->timeout)); | ||
672 | GNUNET_STATISTICS_update (h->stats, | 634 | GNUNET_STATISTICS_update (h->stats, |
673 | gettext_noop ("# bytes sent to datastore"), | 635 | gettext_noop ("# bytes sent to datastore"), |
674 | 1, | 636 | 1, GNUNET_NO); |
675 | GNUNET_NO); | ||
676 | return msize; | 637 | return msize; |
677 | } | 638 | } |
678 | 639 | ||
@@ -689,53 +650,47 @@ process_queue (struct GNUNET_DATASTORE_Handle *h) | |||
689 | struct GNUNET_DATASTORE_QueueEntry *qe; | 650 | struct GNUNET_DATASTORE_QueueEntry *qe; |
690 | 651 | ||
691 | if (NULL == (qe = h->queue_head)) | 652 | if (NULL == (qe = h->queue_head)) |
692 | { | 653 | { |
693 | #if DEBUG_DATASTORE > 1 | 654 | #if DEBUG_DATASTORE > 1 |
694 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 655 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n"); |
695 | "Queue empty\n"); | ||
696 | #endif | 656 | #endif |
697 | return; /* no entry in queue */ | 657 | return; /* no entry in queue */ |
698 | } | 658 | } |
699 | if (qe->was_transmitted == GNUNET_YES) | 659 | if (qe->was_transmitted == GNUNET_YES) |
700 | { | 660 | { |
701 | #if DEBUG_DATASTORE > 1 | 661 | #if DEBUG_DATASTORE > 1 |
702 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 662 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n"); |
703 | "Head request already transmitted\n"); | ||
704 | #endif | 663 | #endif |
705 | return; /* waiting for replies */ | 664 | return; /* waiting for replies */ |
706 | } | 665 | } |
707 | if (h->th != NULL) | 666 | if (h->th != NULL) |
708 | { | 667 | { |
709 | #if DEBUG_DATASTORE > 1 | 668 | #if DEBUG_DATASTORE > 1 |
710 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 669 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n"); |
711 | "Pending transmission request\n"); | ||
712 | #endif | 670 | #endif |
713 | return; /* request pending */ | 671 | return; /* request pending */ |
714 | } | 672 | } |
715 | if (h->client == NULL) | 673 | if (h->client == NULL) |
716 | { | 674 | { |
717 | #if DEBUG_DATASTORE > 1 | 675 | #if DEBUG_DATASTORE > 1 |
718 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 676 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n"); |
719 | "Not connected\n"); | ||
720 | #endif | 677 | #endif |
721 | return; /* waiting for reconnect */ | 678 | return; /* waiting for reconnect */ |
722 | } | 679 | } |
723 | if (GNUNET_YES == h->in_receive) | 680 | if (GNUNET_YES == h->in_receive) |
724 | { | 681 | { |
725 | /* wait for response to previous query */ | 682 | /* wait for response to previous query */ |
726 | return; | 683 | return; |
727 | } | 684 | } |
728 | #if DEBUG_DATASTORE | 685 | #if DEBUG_DATASTORE |
729 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 686 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
730 | "Queueing %u byte request to DATASTORE\n", | 687 | "Queueing %u byte request to DATASTORE\n", qe->message_size); |
731 | qe->message_size); | ||
732 | #endif | 688 | #endif |
733 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, | 689 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, |
734 | qe->message_size, | 690 | qe->message_size, |
735 | GNUNET_TIME_absolute_get_remaining (qe->timeout), | 691 | GNUNET_TIME_absolute_get_remaining |
736 | GNUNET_YES, | 692 | (qe->timeout), GNUNET_YES, |
737 | &transmit_request, | 693 | &transmit_request, h); |
738 | h); | ||
739 | GNUNET_assert (GNUNET_NO == h->in_receive); | 694 | GNUNET_assert (GNUNET_NO == h->in_receive); |
740 | GNUNET_break (NULL != h->th); | 695 | GNUNET_break (NULL != h->th); |
741 | } | 696 | } |
@@ -767,16 +722,14 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) | |||
767 | { | 722 | { |
768 | struct GNUNET_DATASTORE_Handle *h = qe->h; | 723 | struct GNUNET_DATASTORE_Handle *h = qe->h; |
769 | 724 | ||
770 | GNUNET_CONTAINER_DLL_remove (h->queue_head, | 725 | GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe); |
771 | h->queue_tail, | ||
772 | qe); | ||
773 | if (qe->task != GNUNET_SCHEDULER_NO_TASK) | 726 | if (qe->task != GNUNET_SCHEDULER_NO_TASK) |
774 | { | 727 | { |
775 | GNUNET_SCHEDULER_cancel (qe->task); | 728 | GNUNET_SCHEDULER_cancel (qe->task); |
776 | qe->task = GNUNET_SCHEDULER_NO_TASK; | 729 | qe->task = GNUNET_SCHEDULER_NO_TASK; |
777 | } | 730 | } |
778 | h->queue_size--; | 731 | h->queue_size--; |
779 | qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ | 732 | qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ |
780 | GNUNET_free (qe); | 733 | GNUNET_free (qe); |
781 | } | 734 | } |
782 | 735 | ||
@@ -788,10 +741,8 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) | |||
788 | * @param cls closure | 741 | * @param cls closure |
789 | * @param msg message received, NULL on timeout or fatal error | 742 | * @param msg message received, NULL on timeout or fatal error |
790 | */ | 743 | */ |
791 | static void | 744 | static void |
792 | process_status_message (void *cls, | 745 | process_status_message (void *cls, const struct GNUNET_MessageHeader *msg) |
793 | const struct | ||
794 | GNUNET_MessageHeader * msg) | ||
795 | { | 746 | { |
796 | struct GNUNET_DATASTORE_Handle *h = cls; | 747 | struct GNUNET_DATASTORE_Handle *h = cls; |
797 | struct GNUNET_DATASTORE_QueueEntry *qe; | 748 | struct GNUNET_DATASTORE_QueueEntry *qe; |
@@ -802,74 +753,68 @@ process_status_message (void *cls, | |||
802 | int was_transmitted; | 753 | int was_transmitted; |
803 | 754 | ||
804 | if (NULL == (qe = h->queue_head)) | 755 | if (NULL == (qe = h->queue_head)) |
805 | { | 756 | { |
806 | GNUNET_break (0); | 757 | GNUNET_break (0); |
807 | do_disconnect (h); | 758 | do_disconnect (h); |
808 | return; | 759 | return; |
809 | } | 760 | } |
810 | rc = qe->qc.sc; | 761 | rc = qe->qc.sc; |
811 | if (msg == NULL) | 762 | if (msg == NULL) |
812 | { | 763 | { |
813 | was_transmitted = qe->was_transmitted; | 764 | was_transmitted = qe->was_transmitted; |
814 | free_queue_entry (qe); | 765 | free_queue_entry (qe); |
815 | if (was_transmitted == GNUNET_YES) | 766 | if (was_transmitted == GNUNET_YES) |
816 | do_disconnect (h); | 767 | do_disconnect (h); |
817 | else | 768 | else |
818 | process_queue (h); | 769 | process_queue (h); |
819 | if (rc.cont != NULL) | 770 | if (rc.cont != NULL) |
820 | rc.cont (rc.cont_cls, | 771 | rc.cont (rc.cont_cls, |
821 | GNUNET_SYSERR, | 772 | GNUNET_SYSERR, |
822 | _("Failed to receive status response from database.")); | 773 | _("Failed to receive status response from database.")); |
823 | return; | 774 | return; |
824 | } | 775 | } |
825 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); | 776 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); |
826 | free_queue_entry (qe); | 777 | free_queue_entry (qe); |
827 | if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || | 778 | if ((ntohs (msg->size) < sizeof (struct StatusMessage)) || |
828 | (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) | 779 | (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS)) |
829 | { | 780 | { |
830 | GNUNET_break (0); | 781 | GNUNET_break (0); |
831 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | 782 | h->retry_time = GNUNET_TIME_UNIT_ZERO; |
832 | do_disconnect (h); | 783 | do_disconnect (h); |
833 | if (rc.cont != NULL) | 784 | if (rc.cont != NULL) |
834 | rc.cont (rc.cont_cls, | 785 | rc.cont (rc.cont_cls, |
835 | GNUNET_SYSERR, | 786 | GNUNET_SYSERR, |
836 | _("Error reading response from datastore service")); | 787 | _("Error reading response from datastore service")); |
837 | return; | 788 | return; |
838 | } | 789 | } |
839 | sm = (const struct StatusMessage*) msg; | 790 | sm = (const struct StatusMessage *) msg; |
840 | status = ntohl(sm->status); | 791 | status = ntohl (sm->status); |
841 | emsg = NULL; | 792 | emsg = NULL; |
842 | if (ntohs(msg->size) > sizeof(struct StatusMessage)) | 793 | if (ntohs (msg->size) > sizeof (struct StatusMessage)) |
843 | { | 794 | { |
844 | emsg = (const char*) &sm[1]; | 795 | emsg = (const char *) &sm[1]; |
845 | if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0') | 796 | if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0') |
846 | { | ||
847 | GNUNET_break (0); | ||
848 | emsg = _("Invalid error message received from datastore service"); | ||
849 | } | ||
850 | } | ||
851 | if ( (status == GNUNET_SYSERR) && | ||
852 | (emsg == NULL) ) | ||
853 | { | 797 | { |
854 | GNUNET_break (0); | 798 | GNUNET_break (0); |
855 | emsg = _("Invalid error message received from datastore service"); | 799 | emsg = _("Invalid error message received from datastore service"); |
856 | } | 800 | } |
801 | } | ||
802 | if ((status == GNUNET_SYSERR) && (emsg == NULL)) | ||
803 | { | ||
804 | GNUNET_break (0); | ||
805 | emsg = _("Invalid error message received from datastore service"); | ||
806 | } | ||
857 | #if DEBUG_DATASTORE | 807 | #if DEBUG_DATASTORE |
858 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 808 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
859 | "Received status %d/%s\n", | 809 | "Received status %d/%s\n", (int) status, emsg); |
860 | (int) status, | ||
861 | emsg); | ||
862 | #endif | 810 | #endif |
863 | GNUNET_STATISTICS_update (h->stats, | 811 | GNUNET_STATISTICS_update (h->stats, |
864 | gettext_noop ("# status messages received"), | 812 | gettext_noop ("# status messages received"), |
865 | 1, | 813 | 1, GNUNET_NO); |
866 | GNUNET_NO); | ||
867 | h->retry_time.rel_value = 0; | 814 | h->retry_time.rel_value = 0; |
868 | process_queue (h); | 815 | process_queue (h); |
869 | if (rc.cont != NULL) | 816 | if (rc.cont != NULL) |
870 | rc.cont (rc.cont_cls, | 817 | rc.cont (rc.cont_cls, status, emsg); |
871 | status, | ||
872 | emsg); | ||
873 | } | 818 | } |
874 | 819 | ||
875 | 820 | ||
@@ -901,20 +846,20 @@ process_status_message (void *cls, | |||
901 | */ | 846 | */ |
902 | struct GNUNET_DATASTORE_QueueEntry * | 847 | struct GNUNET_DATASTORE_QueueEntry * |
903 | GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, | 848 | GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, |
904 | uint32_t rid, | 849 | uint32_t rid, |
905 | const GNUNET_HashCode * key, | 850 | const GNUNET_HashCode * key, |
906 | size_t size, | 851 | size_t size, |
907 | const void *data, | 852 | const void *data, |
908 | enum GNUNET_BLOCK_Type type, | 853 | enum GNUNET_BLOCK_Type type, |
909 | uint32_t priority, | 854 | uint32_t priority, |
910 | uint32_t anonymity, | 855 | uint32_t anonymity, |
911 | uint32_t replication, | 856 | uint32_t replication, |
912 | struct GNUNET_TIME_Absolute expiration, | 857 | struct GNUNET_TIME_Absolute expiration, |
913 | unsigned int queue_priority, | 858 | unsigned int queue_priority, |
914 | unsigned int max_queue_size, | 859 | unsigned int max_queue_size, |
915 | struct GNUNET_TIME_Relative timeout, | 860 | struct GNUNET_TIME_Relative timeout, |
916 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 861 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
917 | void *cont_cls) | 862 | void *cont_cls) |
918 | { | 863 | { |
919 | struct GNUNET_DATASTORE_QueueEntry *qe; | 864 | struct GNUNET_DATASTORE_QueueEntry *qe; |
920 | struct DataMessage *dm; | 865 | struct DataMessage *dm; |
@@ -923,42 +868,41 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, | |||
923 | 868 | ||
924 | #if DEBUG_DATASTORE | 869 | #if DEBUG_DATASTORE |
925 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 870 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
926 | "Asked to put %u bytes of data under key `%s' for %llu ms\n", | 871 | "Asked to put %u bytes of data under key `%s' for %llu ms\n", |
927 | size, | 872 | size, |
928 | GNUNET_h2s (key), | 873 | GNUNET_h2s (key), |
929 | GNUNET_TIME_absolute_get_remaining (expiration).rel_value); | 874 | GNUNET_TIME_absolute_get_remaining (expiration).rel_value); |
930 | #endif | 875 | #endif |
931 | msize = sizeof(struct DataMessage) + size; | 876 | msize = sizeof (struct DataMessage) + size; |
932 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); | 877 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); |
933 | qc.sc.cont = cont; | 878 | qc.sc.cont = cont; |
934 | qc.sc.cont_cls = cont_cls; | 879 | qc.sc.cont_cls = cont_cls; |
935 | qe = make_queue_entry (h, msize, | 880 | qe = make_queue_entry (h, msize, |
936 | queue_priority, max_queue_size, timeout, | 881 | queue_priority, max_queue_size, timeout, |
937 | &process_status_message, &qc); | 882 | &process_status_message, &qc); |
938 | if (qe == NULL) | 883 | if (qe == NULL) |
939 | { | 884 | { |
940 | #if DEBUG_DATASTORE | 885 | #if DEBUG_DATASTORE |
941 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 886 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
942 | "Could not create queue entry for PUT\n"); | 887 | "Could not create queue entry for PUT\n"); |
943 | #endif | 888 | #endif |
944 | return NULL; | 889 | return NULL; |
945 | } | 890 | } |
946 | GNUNET_STATISTICS_update (h->stats, | 891 | GNUNET_STATISTICS_update (h->stats, |
947 | gettext_noop ("# PUT requests executed"), | 892 | gettext_noop ("# PUT requests executed"), |
948 | 1, | 893 | 1, GNUNET_NO); |
949 | GNUNET_NO); | 894 | dm = (struct DataMessage *) &qe[1]; |
950 | dm = (struct DataMessage* ) &qe[1]; | 895 | dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT); |
951 | dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); | 896 | dm->header.size = htons (msize); |
952 | dm->header.size = htons(msize); | 897 | dm->rid = htonl (rid); |
953 | dm->rid = htonl(rid); | 898 | dm->size = htonl ((uint32_t) size); |
954 | dm->size = htonl( (uint32_t) size); | 899 | dm->type = htonl (type); |
955 | dm->type = htonl(type); | 900 | dm->priority = htonl (priority); |
956 | dm->priority = htonl(priority); | 901 | dm->anonymity = htonl (anonymity); |
957 | dm->anonymity = htonl(anonymity); | ||
958 | dm->replication = htonl (replication); | 902 | dm->replication = htonl (replication); |
959 | dm->reserved = htonl (0); | 903 | dm->reserved = htonl (0); |
960 | dm->uid = GNUNET_htonll(0); | 904 | dm->uid = GNUNET_htonll (0); |
961 | dm->expiration = GNUNET_TIME_absolute_hton(expiration); | 905 | dm->expiration = GNUNET_TIME_absolute_hton (expiration); |
962 | dm->key = *key; | 906 | dm->key = *key; |
963 | memcpy (&dm[1], data, size); | 907 | memcpy (&dm[1], data, size); |
964 | process_queue (h); | 908 | process_queue (h); |
@@ -987,13 +931,13 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, | |||
987 | */ | 931 | */ |
988 | struct GNUNET_DATASTORE_QueueEntry * | 932 | struct GNUNET_DATASTORE_QueueEntry * |
989 | GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, | 933 | GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, |
990 | uint64_t amount, | 934 | uint64_t amount, |
991 | uint32_t entries, | 935 | uint32_t entries, |
992 | unsigned int queue_priority, | 936 | unsigned int queue_priority, |
993 | unsigned int max_queue_size, | 937 | unsigned int max_queue_size, |
994 | struct GNUNET_TIME_Relative timeout, | 938 | struct GNUNET_TIME_Relative timeout, |
995 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 939 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
996 | void *cont_cls) | 940 | void *cont_cls) |
997 | { | 941 | { |
998 | struct GNUNET_DATASTORE_QueueEntry *qe; | 942 | struct GNUNET_DATASTORE_QueueEntry *qe; |
999 | struct ReserveMessage *rm; | 943 | struct ReserveMessage *rm; |
@@ -1003,32 +947,30 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
1003 | cont = &drop_status_cont; | 947 | cont = &drop_status_cont; |
1004 | #if DEBUG_DATASTORE | 948 | #if DEBUG_DATASTORE |
1005 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 949 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1006 | "Asked to reserve %llu bytes of data and %u entries\n", | 950 | "Asked to reserve %llu bytes of data and %u entries\n", |
1007 | (unsigned long long) amount, | 951 | (unsigned long long) amount, (unsigned int) entries); |
1008 | (unsigned int) entries); | ||
1009 | #endif | 952 | #endif |
1010 | qc.sc.cont = cont; | 953 | qc.sc.cont = cont; |
1011 | qc.sc.cont_cls = cont_cls; | 954 | qc.sc.cont_cls = cont_cls; |
1012 | qe = make_queue_entry (h, sizeof(struct ReserveMessage), | 955 | qe = make_queue_entry (h, sizeof (struct ReserveMessage), |
1013 | queue_priority, max_queue_size, timeout, | 956 | queue_priority, max_queue_size, timeout, |
1014 | &process_status_message, &qc); | 957 | &process_status_message, &qc); |
1015 | if (qe == NULL) | 958 | if (qe == NULL) |
1016 | { | 959 | { |
1017 | #if DEBUG_DATASTORE | 960 | #if DEBUG_DATASTORE |
1018 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 961 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1019 | "Could not create queue entry to reserve\n"); | 962 | "Could not create queue entry to reserve\n"); |
1020 | #endif | 963 | #endif |
1021 | return NULL; | 964 | return NULL; |
1022 | } | 965 | } |
1023 | GNUNET_STATISTICS_update (h->stats, | 966 | GNUNET_STATISTICS_update (h->stats, |
1024 | gettext_noop ("# RESERVE requests executed"), | 967 | gettext_noop ("# RESERVE requests executed"), |
1025 | 1, | 968 | 1, GNUNET_NO); |
1026 | GNUNET_NO); | 969 | rm = (struct ReserveMessage *) &qe[1]; |
1027 | rm = (struct ReserveMessage*) &qe[1]; | 970 | rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); |
1028 | rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); | 971 | rm->header.size = htons (sizeof (struct ReserveMessage)); |
1029 | rm->header.size = htons(sizeof (struct ReserveMessage)); | 972 | rm->entries = htonl (entries); |
1030 | rm->entries = htonl(entries); | 973 | rm->amount = GNUNET_htonll (amount); |
1031 | rm->amount = GNUNET_htonll(amount); | ||
1032 | process_queue (h); | 974 | process_queue (h); |
1033 | return qe; | 975 | return qe; |
1034 | } | 976 | } |
@@ -1057,12 +999,12 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
1057 | */ | 999 | */ |
1058 | struct GNUNET_DATASTORE_QueueEntry * | 1000 | struct GNUNET_DATASTORE_QueueEntry * |
1059 | GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, | 1001 | GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, |
1060 | uint32_t rid, | 1002 | uint32_t rid, |
1061 | unsigned int queue_priority, | 1003 | unsigned int queue_priority, |
1062 | unsigned int max_queue_size, | 1004 | unsigned int max_queue_size, |
1063 | struct GNUNET_TIME_Relative timeout, | 1005 | struct GNUNET_TIME_Relative timeout, |
1064 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 1006 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
1065 | void *cont_cls) | 1007 | void *cont_cls) |
1066 | { | 1008 | { |
1067 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1009 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1068 | struct ReleaseReserveMessage *rrm; | 1010 | struct ReleaseReserveMessage *rrm; |
@@ -1071,31 +1013,29 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
1071 | if (cont == NULL) | 1013 | if (cont == NULL) |
1072 | cont = &drop_status_cont; | 1014 | cont = &drop_status_cont; |
1073 | #if DEBUG_DATASTORE | 1015 | #if DEBUG_DATASTORE |
1074 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1016 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid); |
1075 | "Asked to release reserve %d\n", | ||
1076 | rid); | ||
1077 | #endif | 1017 | #endif |
1078 | qc.sc.cont = cont; | 1018 | qc.sc.cont = cont; |
1079 | qc.sc.cont_cls = cont_cls; | 1019 | qc.sc.cont_cls = cont_cls; |
1080 | qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage), | 1020 | qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage), |
1081 | queue_priority, max_queue_size, timeout, | 1021 | queue_priority, max_queue_size, timeout, |
1082 | &process_status_message, &qc); | 1022 | &process_status_message, &qc); |
1083 | if (qe == NULL) | 1023 | if (qe == NULL) |
1084 | { | 1024 | { |
1085 | #if DEBUG_DATASTORE | 1025 | #if DEBUG_DATASTORE |
1086 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1026 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1087 | "Could not create queue entry to release reserve\n"); | 1027 | "Could not create queue entry to release reserve\n"); |
1088 | #endif | 1028 | #endif |
1089 | return NULL; | 1029 | return NULL; |
1090 | } | 1030 | } |
1091 | GNUNET_STATISTICS_update (h->stats, | 1031 | GNUNET_STATISTICS_update (h->stats, |
1092 | gettext_noop ("# RELEASE RESERVE requests executed"), | 1032 | gettext_noop |
1093 | 1, | 1033 | ("# RELEASE RESERVE requests executed"), 1, |
1094 | GNUNET_NO); | 1034 | GNUNET_NO); |
1095 | rrm = (struct ReleaseReserveMessage*) &qe[1]; | 1035 | rrm = (struct ReleaseReserveMessage *) &qe[1]; |
1096 | rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); | 1036 | rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); |
1097 | rrm->header.size = htons(sizeof (struct ReleaseReserveMessage)); | 1037 | rrm->header.size = htons (sizeof (struct ReleaseReserveMessage)); |
1098 | rrm->rid = htonl(rid); | 1038 | rrm->rid = htonl (rid); |
1099 | process_queue (h); | 1039 | process_queue (h); |
1100 | return qe; | 1040 | return qe; |
1101 | } | 1041 | } |
@@ -1120,14 +1060,14 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, | |||
1120 | */ | 1060 | */ |
1121 | struct GNUNET_DATASTORE_QueueEntry * | 1061 | struct GNUNET_DATASTORE_QueueEntry * |
1122 | GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, | 1062 | GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, |
1123 | uint64_t uid, | 1063 | uint64_t uid, |
1124 | uint32_t priority, | 1064 | uint32_t priority, |
1125 | struct GNUNET_TIME_Absolute expiration, | 1065 | struct GNUNET_TIME_Absolute expiration, |
1126 | unsigned int queue_priority, | 1066 | unsigned int queue_priority, |
1127 | unsigned int max_queue_size, | 1067 | unsigned int max_queue_size, |
1128 | struct GNUNET_TIME_Relative timeout, | 1068 | struct GNUNET_TIME_Relative timeout, |
1129 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 1069 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
1130 | void *cont_cls) | 1070 | void *cont_cls) |
1131 | { | 1071 | { |
1132 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1072 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1133 | struct UpdateMessage *um; | 1073 | struct UpdateMessage *um; |
@@ -1137,34 +1077,33 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, | |||
1137 | cont = &drop_status_cont; | 1077 | cont = &drop_status_cont; |
1138 | #if DEBUG_DATASTORE | 1078 | #if DEBUG_DATASTORE |
1139 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1079 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1140 | "Asked to update entry %llu raising priority by %u and expiration to %llu\n", | 1080 | "Asked to update entry %llu raising priority by %u and expiration to %llu\n", |
1141 | uid, | 1081 | uid, |
1142 | (unsigned int) priority, | 1082 | (unsigned int) priority, |
1143 | (unsigned long long) expiration.abs_value); | 1083 | (unsigned long long) expiration.abs_value); |
1144 | #endif | 1084 | #endif |
1145 | qc.sc.cont = cont; | 1085 | qc.sc.cont = cont; |
1146 | qc.sc.cont_cls = cont_cls; | 1086 | qc.sc.cont_cls = cont_cls; |
1147 | qe = make_queue_entry (h, sizeof(struct UpdateMessage), | 1087 | qe = make_queue_entry (h, sizeof (struct UpdateMessage), |
1148 | queue_priority, max_queue_size, timeout, | 1088 | queue_priority, max_queue_size, timeout, |
1149 | &process_status_message, &qc); | 1089 | &process_status_message, &qc); |
1150 | if (qe == NULL) | 1090 | if (qe == NULL) |
1151 | { | 1091 | { |
1152 | #if DEBUG_DATASTORE | 1092 | #if DEBUG_DATASTORE |
1153 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1093 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1154 | "Could not create queue entry for UPDATE\n"); | 1094 | "Could not create queue entry for UPDATE\n"); |
1155 | #endif | 1095 | #endif |
1156 | return NULL; | 1096 | return NULL; |
1157 | } | 1097 | } |
1158 | GNUNET_STATISTICS_update (h->stats, | 1098 | GNUNET_STATISTICS_update (h->stats, |
1159 | gettext_noop ("# UPDATE requests executed"), | 1099 | gettext_noop ("# UPDATE requests executed"), |
1160 | 1, | 1100 | 1, GNUNET_NO); |
1161 | GNUNET_NO); | 1101 | um = (struct UpdateMessage *) &qe[1]; |
1162 | um = (struct UpdateMessage*) &qe[1]; | 1102 | um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); |
1163 | um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); | 1103 | um->header.size = htons (sizeof (struct UpdateMessage)); |
1164 | um->header.size = htons(sizeof (struct UpdateMessage)); | 1104 | um->priority = htonl (priority); |
1165 | um->priority = htonl(priority); | 1105 | um->expiration = GNUNET_TIME_absolute_hton (expiration); |
1166 | um->expiration = GNUNET_TIME_absolute_hton(expiration); | 1106 | um->uid = GNUNET_htonll (uid); |
1167 | um->uid = GNUNET_htonll(uid); | ||
1168 | process_queue (h); | 1107 | process_queue (h); |
1169 | return qe; | 1108 | return qe; |
1170 | } | 1109 | } |
@@ -1193,14 +1132,14 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, | |||
1193 | */ | 1132 | */ |
1194 | struct GNUNET_DATASTORE_QueueEntry * | 1133 | struct GNUNET_DATASTORE_QueueEntry * |
1195 | GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, | 1134 | GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, |
1196 | const GNUNET_HashCode *key, | 1135 | const GNUNET_HashCode * key, |
1197 | size_t size, | 1136 | size_t size, |
1198 | const void *data, | 1137 | const void *data, |
1199 | unsigned int queue_priority, | 1138 | unsigned int queue_priority, |
1200 | unsigned int max_queue_size, | 1139 | unsigned int max_queue_size, |
1201 | struct GNUNET_TIME_Relative timeout, | 1140 | struct GNUNET_TIME_Relative timeout, |
1202 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 1141 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
1203 | void *cont_cls) | 1142 | void *cont_cls) |
1204 | { | 1143 | { |
1205 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1144 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1206 | struct DataMessage *dm; | 1145 | struct DataMessage *dm; |
@@ -1211,39 +1150,37 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, | |||
1211 | cont = &drop_status_cont; | 1150 | cont = &drop_status_cont; |
1212 | #if DEBUG_DATASTORE | 1151 | #if DEBUG_DATASTORE |
1213 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1152 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1214 | "Asked to remove %u bytes under key `%s'\n", | 1153 | "Asked to remove %u bytes under key `%s'\n", |
1215 | size, | 1154 | size, GNUNET_h2s (key)); |
1216 | GNUNET_h2s (key)); | ||
1217 | #endif | 1155 | #endif |
1218 | qc.sc.cont = cont; | 1156 | qc.sc.cont = cont; |
1219 | qc.sc.cont_cls = cont_cls; | 1157 | qc.sc.cont_cls = cont_cls; |
1220 | msize = sizeof(struct DataMessage) + size; | 1158 | msize = sizeof (struct DataMessage) + size; |
1221 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); | 1159 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); |
1222 | qe = make_queue_entry (h, msize, | 1160 | qe = make_queue_entry (h, msize, |
1223 | queue_priority, max_queue_size, timeout, | 1161 | queue_priority, max_queue_size, timeout, |
1224 | &process_status_message, &qc); | 1162 | &process_status_message, &qc); |
1225 | if (qe == NULL) | 1163 | if (qe == NULL) |
1226 | { | 1164 | { |
1227 | #if DEBUG_DATASTORE | 1165 | #if DEBUG_DATASTORE |
1228 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1166 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1229 | "Could not create queue entry for REMOVE\n"); | 1167 | "Could not create queue entry for REMOVE\n"); |
1230 | #endif | 1168 | #endif |
1231 | return NULL; | 1169 | return NULL; |
1232 | } | 1170 | } |
1233 | GNUNET_STATISTICS_update (h->stats, | 1171 | GNUNET_STATISTICS_update (h->stats, |
1234 | gettext_noop ("# REMOVE requests executed"), | 1172 | gettext_noop ("# REMOVE requests executed"), |
1235 | 1, | 1173 | 1, GNUNET_NO); |
1236 | GNUNET_NO); | 1174 | dm = (struct DataMessage *) &qe[1]; |
1237 | dm = (struct DataMessage*) &qe[1]; | 1175 | dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); |
1238 | dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); | 1176 | dm->header.size = htons (msize); |
1239 | dm->header.size = htons(msize); | 1177 | dm->rid = htonl (0); |
1240 | dm->rid = htonl(0); | 1178 | dm->size = htonl (size); |
1241 | dm->size = htonl(size); | 1179 | dm->type = htonl (0); |
1242 | dm->type = htonl(0); | 1180 | dm->priority = htonl (0); |
1243 | dm->priority = htonl(0); | 1181 | dm->anonymity = htonl (0); |
1244 | dm->anonymity = htonl(0); | 1182 | dm->uid = GNUNET_htonll (0); |
1245 | dm->uid = GNUNET_htonll(0); | 1183 | dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); |
1246 | dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS); | ||
1247 | dm->key = *key; | 1184 | dm->key = *key; |
1248 | memcpy (&dm[1], data, size); | 1185 | memcpy (&dm[1], data, size); |
1249 | process_queue (h); | 1186 | process_queue (h); |
@@ -1258,9 +1195,8 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, | |||
1258 | * @param cls closure | 1195 | * @param cls closure |
1259 | * @param msg message received, NULL on timeout or fatal error | 1196 | * @param msg message received, NULL on timeout or fatal error |
1260 | */ | 1197 | */ |
1261 | static void | 1198 | static void |
1262 | process_result_message (void *cls, | 1199 | process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) |
1263 | const struct GNUNET_MessageHeader *msg) | ||
1264 | { | 1200 | { |
1265 | struct GNUNET_DATASTORE_Handle *h = cls; | 1201 | struct GNUNET_DATASTORE_Handle *h = cls; |
1266 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1202 | struct GNUNET_DATASTORE_QueueEntry *qe; |
@@ -1269,104 +1205,98 @@ process_result_message (void *cls, | |||
1269 | int was_transmitted; | 1205 | int was_transmitted; |
1270 | 1206 | ||
1271 | if (msg == NULL) | 1207 | if (msg == NULL) |
1208 | { | ||
1209 | qe = h->queue_head; | ||
1210 | GNUNET_assert (NULL != qe); | ||
1211 | rc = qe->qc.rc; | ||
1212 | was_transmitted = qe->was_transmitted; | ||
1213 | free_queue_entry (qe); | ||
1214 | if (was_transmitted == GNUNET_YES) | ||
1272 | { | 1215 | { |
1273 | qe = h->queue_head; | 1216 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1274 | GNUNET_assert (NULL != qe); | 1217 | _("Failed to receive response from database.\n")); |
1275 | rc = qe->qc.rc; | 1218 | do_disconnect (h); |
1276 | was_transmitted = qe->was_transmitted; | ||
1277 | free_queue_entry (qe); | ||
1278 | if (was_transmitted == GNUNET_YES) | ||
1279 | { | ||
1280 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1281 | _("Failed to receive response from database.\n")); | ||
1282 | do_disconnect (h); | ||
1283 | } | ||
1284 | else | ||
1285 | { | ||
1286 | process_queue (h); | ||
1287 | } | ||
1288 | if (rc.proc != NULL) | ||
1289 | rc.proc (rc.proc_cls, | ||
1290 | NULL, 0, NULL, 0, 0, 0, | ||
1291 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
1292 | return; | ||
1293 | } | 1219 | } |
1294 | if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) | 1220 | else |
1295 | { | 1221 | { |
1296 | GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); | ||
1297 | qe = h->queue_head; | ||
1298 | rc = qe->qc.rc; | ||
1299 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); | ||
1300 | free_queue_entry (qe); | ||
1301 | #if DEBUG_DATASTORE | ||
1302 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1303 | "Received end of result set, new queue size is %u\n", | ||
1304 | h->queue_size); | ||
1305 | #endif | ||
1306 | if (rc.proc != NULL) | ||
1307 | rc.proc (rc.proc_cls, | ||
1308 | NULL, 0, NULL, 0, 0, 0, | ||
1309 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
1310 | h->retry_time.rel_value = 0; | ||
1311 | h->result_count = 0; | ||
1312 | process_queue (h); | 1222 | process_queue (h); |
1313 | return; | ||
1314 | } | 1223 | } |
1224 | if (rc.proc != NULL) | ||
1225 | rc.proc (rc.proc_cls, | ||
1226 | NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
1227 | return; | ||
1228 | } | ||
1229 | if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) | ||
1230 | { | ||
1231 | GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)); | ||
1232 | qe = h->queue_head; | ||
1233 | rc = qe->qc.rc; | ||
1234 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); | ||
1235 | free_queue_entry (qe); | ||
1236 | #if DEBUG_DATASTORE | ||
1237 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1238 | "Received end of result set, new queue size is %u\n", | ||
1239 | h->queue_size); | ||
1240 | #endif | ||
1241 | if (rc.proc != NULL) | ||
1242 | rc.proc (rc.proc_cls, | ||
1243 | NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
1244 | h->retry_time.rel_value = 0; | ||
1245 | h->result_count = 0; | ||
1246 | process_queue (h); | ||
1247 | return; | ||
1248 | } | ||
1315 | qe = h->queue_head; | 1249 | qe = h->queue_head; |
1316 | GNUNET_assert (NULL != qe); | 1250 | GNUNET_assert (NULL != qe); |
1317 | rc = qe->qc.rc; | 1251 | rc = qe->qc.rc; |
1318 | if (GNUNET_YES != qe->was_transmitted) | 1252 | if (GNUNET_YES != qe->was_transmitted) |
1319 | { | 1253 | { |
1320 | GNUNET_break (0); | 1254 | GNUNET_break (0); |
1321 | free_queue_entry (qe); | 1255 | free_queue_entry (qe); |
1322 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | 1256 | h->retry_time = GNUNET_TIME_UNIT_ZERO; |
1323 | do_disconnect (h); | 1257 | do_disconnect (h); |
1324 | if (rc.proc != NULL) | 1258 | if (rc.proc != NULL) |
1325 | rc.proc (rc.proc_cls, | 1259 | rc.proc (rc.proc_cls, |
1326 | NULL, 0, NULL, 0, 0, 0, | 1260 | NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); |
1327 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 1261 | return; |
1328 | return; | 1262 | } |
1329 | } | 1263 | if ((ntohs (msg->size) < sizeof (struct DataMessage)) || |
1330 | if ( (ntohs(msg->size) < sizeof(struct DataMessage)) || | 1264 | (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || |
1331 | (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || | 1265 | (ntohs (msg->size) != |
1332 | (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) ) | 1266 | sizeof (struct DataMessage) + |
1333 | { | 1267 | ntohl (((const struct DataMessage *) msg)->size))) |
1334 | GNUNET_break (0); | 1268 | { |
1335 | free_queue_entry (qe); | 1269 | GNUNET_break (0); |
1336 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | 1270 | free_queue_entry (qe); |
1337 | do_disconnect (h); | 1271 | h->retry_time = GNUNET_TIME_UNIT_ZERO; |
1338 | if (rc.proc != NULL) | 1272 | do_disconnect (h); |
1339 | rc.proc (rc.proc_cls, | 1273 | if (rc.proc != NULL) |
1340 | NULL, 0, NULL, 0, 0, 0, | 1274 | rc.proc (rc.proc_cls, |
1341 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 1275 | NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); |
1342 | return; | 1276 | return; |
1343 | } | 1277 | } |
1344 | GNUNET_STATISTICS_update (h->stats, | 1278 | GNUNET_STATISTICS_update (h->stats, |
1345 | gettext_noop ("# Results received"), | 1279 | gettext_noop ("# Results received"), 1, GNUNET_NO); |
1346 | 1, | 1280 | dm = (const struct DataMessage *) msg; |
1347 | GNUNET_NO); | ||
1348 | dm = (const struct DataMessage*) msg; | ||
1349 | #if DEBUG_DATASTORE | 1281 | #if DEBUG_DATASTORE |
1350 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1282 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1351 | "Received result %llu with type %u and size %u with key %s\n", | 1283 | "Received result %llu with type %u and size %u with key %s\n", |
1352 | (unsigned long long) GNUNET_ntohll(dm->uid), | 1284 | (unsigned long long) GNUNET_ntohll (dm->uid), |
1353 | ntohl(dm->type), | 1285 | ntohl (dm->type), ntohl (dm->size), GNUNET_h2s (&dm->key)); |
1354 | ntohl(dm->size), | ||
1355 | GNUNET_h2s(&dm->key)); | ||
1356 | #endif | 1286 | #endif |
1357 | free_queue_entry (qe); | 1287 | free_queue_entry (qe); |
1358 | h->retry_time.rel_value = 0; | 1288 | h->retry_time.rel_value = 0; |
1359 | process_queue (h); | 1289 | process_queue (h); |
1360 | if (rc.proc != NULL) | 1290 | if (rc.proc != NULL) |
1361 | rc.proc (rc.proc_cls, | 1291 | rc.proc (rc.proc_cls, |
1362 | &dm->key, | 1292 | &dm->key, |
1363 | ntohl(dm->size), | 1293 | ntohl (dm->size), |
1364 | &dm[1], | 1294 | &dm[1], |
1365 | ntohl(dm->type), | 1295 | ntohl (dm->type), |
1366 | ntohl(dm->priority), | 1296 | ntohl (dm->priority), |
1367 | ntohl(dm->anonymity), | 1297 | ntohl (dm->anonymity), |
1368 | GNUNET_TIME_absolute_ntoh(dm->expiration), | 1298 | GNUNET_TIME_absolute_ntoh (dm->expiration), |
1369 | GNUNET_ntohll(dm->uid)); | 1299 | GNUNET_ntohll (dm->uid)); |
1370 | } | 1300 | } |
1371 | 1301 | ||
1372 | 1302 | ||
@@ -1391,11 +1321,11 @@ process_result_message (void *cls, | |||
1391 | */ | 1321 | */ |
1392 | struct GNUNET_DATASTORE_QueueEntry * | 1322 | struct GNUNET_DATASTORE_QueueEntry * |
1393 | GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | 1323 | GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, |
1394 | unsigned int queue_priority, | 1324 | unsigned int queue_priority, |
1395 | unsigned int max_queue_size, | 1325 | unsigned int max_queue_size, |
1396 | struct GNUNET_TIME_Relative timeout, | 1326 | struct GNUNET_TIME_Relative timeout, |
1397 | GNUNET_DATASTORE_DatumProcessor proc, | 1327 | GNUNET_DATASTORE_DatumProcessor proc, |
1398 | void *proc_cls) | 1328 | void *proc_cls) |
1399 | { | 1329 | { |
1400 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1330 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1401 | struct GNUNET_MessageHeader *m; | 1331 | struct GNUNET_MessageHeader *m; |
@@ -1404,29 +1334,29 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | |||
1404 | GNUNET_assert (NULL != proc); | 1334 | GNUNET_assert (NULL != proc); |
1405 | #if DEBUG_DATASTORE | 1335 | #if DEBUG_DATASTORE |
1406 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1336 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1407 | "Asked to get replication entry in %llu ms\n", | 1337 | "Asked to get replication entry in %llu ms\n", |
1408 | (unsigned long long) timeout.rel_value); | 1338 | (unsigned long long) timeout.rel_value); |
1409 | #endif | 1339 | #endif |
1410 | qc.rc.proc = proc; | 1340 | qc.rc.proc = proc; |
1411 | qc.rc.proc_cls = proc_cls; | 1341 | qc.rc.proc_cls = proc_cls; |
1412 | qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader), | 1342 | qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader), |
1413 | queue_priority, max_queue_size, timeout, | 1343 | queue_priority, max_queue_size, timeout, |
1414 | &process_result_message, &qc); | 1344 | &process_result_message, &qc); |
1415 | if (qe == NULL) | 1345 | if (qe == NULL) |
1416 | { | 1346 | { |
1417 | #if DEBUG_DATASTORE | 1347 | #if DEBUG_DATASTORE |
1418 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1348 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1419 | "Could not create queue entry for GET REPLICATION\n"); | 1349 | "Could not create queue entry for GET REPLICATION\n"); |
1420 | #endif | 1350 | #endif |
1421 | return NULL; | 1351 | return NULL; |
1422 | } | 1352 | } |
1423 | GNUNET_STATISTICS_update (h->stats, | 1353 | GNUNET_STATISTICS_update (h->stats, |
1424 | gettext_noop ("# GET REPLICATION requests executed"), | 1354 | gettext_noop |
1425 | 1, | 1355 | ("# GET REPLICATION requests executed"), 1, |
1426 | GNUNET_NO); | 1356 | GNUNET_NO); |
1427 | m = (struct GNUNET_MessageHeader*) &qe[1]; | 1357 | m = (struct GNUNET_MessageHeader *) &qe[1]; |
1428 | m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); | 1358 | m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); |
1429 | m->size = htons(sizeof (struct GNUNET_MessageHeader)); | 1359 | m->size = htons (sizeof (struct GNUNET_MessageHeader)); |
1430 | process_queue (h); | 1360 | process_queue (h); |
1431 | return qe; | 1361 | return qe; |
1432 | } | 1362 | } |
@@ -1454,13 +1384,13 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, | |||
1454 | */ | 1384 | */ |
1455 | struct GNUNET_DATASTORE_QueueEntry * | 1385 | struct GNUNET_DATASTORE_QueueEntry * |
1456 | GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | 1386 | GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, |
1457 | uint64_t offset, | 1387 | uint64_t offset, |
1458 | unsigned int queue_priority, | 1388 | unsigned int queue_priority, |
1459 | unsigned int max_queue_size, | 1389 | unsigned int max_queue_size, |
1460 | struct GNUNET_TIME_Relative timeout, | 1390 | struct GNUNET_TIME_Relative timeout, |
1461 | enum GNUNET_BLOCK_Type type, | 1391 | enum GNUNET_BLOCK_Type type, |
1462 | GNUNET_DATASTORE_DatumProcessor proc, | 1392 | GNUNET_DATASTORE_DatumProcessor proc, |
1463 | void *proc_cls) | 1393 | void *proc_cls) |
1464 | { | 1394 | { |
1465 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1395 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1466 | struct GetZeroAnonymityMessage *m; | 1396 | struct GetZeroAnonymityMessage *m; |
@@ -1470,31 +1400,30 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1470 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); | 1400 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); |
1471 | #if DEBUG_DATASTORE | 1401 | #if DEBUG_DATASTORE |
1472 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1402 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1473 | "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n", | 1403 | "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n", |
1474 | (unsigned long long) offset, | 1404 | (unsigned long long) offset, |
1475 | type, | 1405 | type, (unsigned long long) timeout.rel_value); |
1476 | (unsigned long long) timeout.rel_value); | ||
1477 | #endif | 1406 | #endif |
1478 | qc.rc.proc = proc; | 1407 | qc.rc.proc = proc; |
1479 | qc.rc.proc_cls = proc_cls; | 1408 | qc.rc.proc_cls = proc_cls; |
1480 | qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage), | 1409 | qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage), |
1481 | queue_priority, max_queue_size, timeout, | 1410 | queue_priority, max_queue_size, timeout, |
1482 | &process_result_message, &qc); | 1411 | &process_result_message, &qc); |
1483 | if (qe == NULL) | 1412 | if (qe == NULL) |
1484 | { | 1413 | { |
1485 | #if DEBUG_DATASTORE | 1414 | #if DEBUG_DATASTORE |
1486 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1415 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1487 | "Could not create queue entry for zero-anonymity procation\n"); | 1416 | "Could not create queue entry for zero-anonymity procation\n"); |
1488 | #endif | 1417 | #endif |
1489 | return NULL; | 1418 | return NULL; |
1490 | } | 1419 | } |
1491 | GNUNET_STATISTICS_update (h->stats, | 1420 | GNUNET_STATISTICS_update (h->stats, |
1492 | gettext_noop ("# GET ZERO ANONYMITY requests executed"), | 1421 | gettext_noop |
1493 | 1, | 1422 | ("# GET ZERO ANONYMITY requests executed"), 1, |
1494 | GNUNET_NO); | 1423 | GNUNET_NO); |
1495 | m = (struct GetZeroAnonymityMessage*) &qe[1]; | 1424 | m = (struct GetZeroAnonymityMessage *) &qe[1]; |
1496 | m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); | 1425 | m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); |
1497 | m->header.size = htons(sizeof (struct GetZeroAnonymityMessage)); | 1426 | m->header.size = htons (sizeof (struct GetZeroAnonymityMessage)); |
1498 | m->type = htonl ((uint32_t) type); | 1427 | m->type = htonl ((uint32_t) type); |
1499 | m->offset = GNUNET_htonll (offset); | 1428 | m->offset = GNUNET_htonll (offset); |
1500 | process_queue (h); | 1429 | process_queue (h); |
@@ -1525,14 +1454,13 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, | |||
1525 | */ | 1454 | */ |
1526 | struct GNUNET_DATASTORE_QueueEntry * | 1455 | struct GNUNET_DATASTORE_QueueEntry * |
1527 | GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, | 1456 | GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, |
1528 | uint64_t offset, | 1457 | uint64_t offset, |
1529 | const GNUNET_HashCode * key, | 1458 | const GNUNET_HashCode * key, |
1530 | enum GNUNET_BLOCK_Type type, | 1459 | enum GNUNET_BLOCK_Type type, |
1531 | unsigned int queue_priority, | 1460 | unsigned int queue_priority, |
1532 | unsigned int max_queue_size, | 1461 | unsigned int max_queue_size, |
1533 | struct GNUNET_TIME_Relative timeout, | 1462 | struct GNUNET_TIME_Relative timeout, |
1534 | GNUNET_DATASTORE_DatumProcessor proc, | 1463 | GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) |
1535 | void *proc_cls) | ||
1536 | { | 1464 | { |
1537 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1465 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1538 | struct GetMessage *gm; | 1466 | struct GetMessage *gm; |
@@ -1541,41 +1469,39 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, | |||
1541 | GNUNET_assert (NULL != proc); | 1469 | GNUNET_assert (NULL != proc); |
1542 | #if DEBUG_DATASTORE | 1470 | #if DEBUG_DATASTORE |
1543 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1471 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1544 | "Asked to look for data of type %u under key `%s'\n", | 1472 | "Asked to look for data of type %u under key `%s'\n", |
1545 | (unsigned int) type, | 1473 | (unsigned int) type, GNUNET_h2s (key)); |
1546 | GNUNET_h2s (key)); | ||
1547 | #endif | 1474 | #endif |
1548 | qc.rc.proc = proc; | 1475 | qc.rc.proc = proc; |
1549 | qc.rc.proc_cls = proc_cls; | 1476 | qc.rc.proc_cls = proc_cls; |
1550 | qe = make_queue_entry (h, sizeof(struct GetMessage), | 1477 | qe = make_queue_entry (h, sizeof (struct GetMessage), |
1551 | queue_priority, max_queue_size, timeout, | 1478 | queue_priority, max_queue_size, timeout, |
1552 | &process_result_message, &qc); | 1479 | &process_result_message, &qc); |
1553 | if (qe == NULL) | 1480 | if (qe == NULL) |
1554 | { | 1481 | { |
1555 | #if DEBUG_DATASTORE | 1482 | #if DEBUG_DATASTORE |
1556 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1483 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1557 | "Could not queue request for `%s'\n", | 1484 | "Could not queue request for `%s'\n", GNUNET_h2s (key)); |
1558 | GNUNET_h2s (key)); | ||
1559 | #endif | 1485 | #endif |
1560 | return NULL; | 1486 | return NULL; |
1561 | } | 1487 | } |
1562 | GNUNET_STATISTICS_update (h->stats, | 1488 | GNUNET_STATISTICS_update (h->stats, |
1563 | gettext_noop ("# GET requests executed"), | 1489 | gettext_noop ("# GET requests executed"), |
1564 | 1, | 1490 | 1, GNUNET_NO); |
1565 | GNUNET_NO); | 1491 | gm = (struct GetMessage *) &qe[1]; |
1566 | gm = (struct GetMessage*) &qe[1]; | 1492 | gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET); |
1567 | gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); | 1493 | gm->type = htonl (type); |
1568 | gm->type = htonl(type); | ||
1569 | gm->offset = GNUNET_htonll (offset); | 1494 | gm->offset = GNUNET_htonll (offset); |
1570 | if (key != NULL) | 1495 | if (key != NULL) |
1571 | { | 1496 | { |
1572 | gm->header.size = htons(sizeof (struct GetMessage)); | 1497 | gm->header.size = htons (sizeof (struct GetMessage)); |
1573 | gm->key = *key; | 1498 | gm->key = *key; |
1574 | } | 1499 | } |
1575 | else | 1500 | else |
1576 | { | 1501 | { |
1577 | gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode)); | 1502 | gm->header.size = |
1578 | } | 1503 | htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode)); |
1504 | } | ||
1579 | process_queue (h); | 1505 | process_queue (h); |
1580 | return qe; | 1506 | return qe; |
1581 | } | 1507 | } |
@@ -1595,18 +1521,16 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) | |||
1595 | GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); | 1521 | GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); |
1596 | h = qe->h; | 1522 | h = qe->h; |
1597 | #if DEBUG_DATASTORE | 1523 | #if DEBUG_DATASTORE |
1598 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1524 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1599 | "Pending DATASTORE request %p cancelled (%d, %d)\n", | 1525 | "Pending DATASTORE request %p cancelled (%d, %d)\n", |
1600 | qe, | 1526 | qe, qe->was_transmitted, h->queue_head == qe); |
1601 | qe->was_transmitted, | ||
1602 | h->queue_head == qe); | ||
1603 | #endif | 1527 | #endif |
1604 | if (GNUNET_YES == qe->was_transmitted) | 1528 | if (GNUNET_YES == qe->was_transmitted) |
1605 | { | 1529 | { |
1606 | free_queue_entry (qe); | 1530 | free_queue_entry (qe); |
1607 | h->skip_next_messages++; | 1531 | h->skip_next_messages++; |
1608 | return; | 1532 | return; |
1609 | } | 1533 | } |
1610 | free_queue_entry (qe); | 1534 | free_queue_entry (qe); |
1611 | process_queue (h); | 1535 | process_queue (h); |
1612 | } | 1536 | } |