diff options
author | Christian Grothoff <christian@grothoff.org> | 2014-12-16 17:34:18 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2014-12-16 17:34:18 +0000 |
commit | 0b92336e16fd941ec085c66579486244c6bd83f4 (patch) | |
tree | bbb47f96c7bba867018003de148654fd8ca0eab5 /src/fs/gnunet-service-fs_push.c | |
parent | f1cc760d1c98e2738430ad24d8004c5c700c1c49 (diff) | |
download | gnunet-0b92336e16fd941ec085c66579486244c6bd83f4.tar.gz gnunet-0b92336e16fd941ec085c66579486244c6bd83f4.zip |
do not poll empty database every 100ms for migration, doxygen fixes, code cleanup
Diffstat (limited to 'src/fs/gnunet-service-fs_push.c')
-rw-r--r-- | src/fs/gnunet-service-fs_push.c | 167 |
1 files changed, 104 insertions, 63 deletions
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index c80dc8b08..f099f7fdc 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c | |||
@@ -181,6 +181,11 @@ static unsigned int mig_size; | |||
181 | */ | 181 | */ |
182 | static int enabled; | 182 | static int enabled; |
183 | 183 | ||
184 | /** | ||
185 | * Did we find anything in the datastore? | ||
186 | */ | ||
187 | static int value_found; | ||
188 | |||
184 | 189 | ||
185 | /** | 190 | /** |
186 | * Delete the given migration block. | 191 | * Delete the given migration block. |
@@ -205,16 +210,17 @@ find_content (struct MigrationReadyPeer *mrp); | |||
205 | 210 | ||
206 | 211 | ||
207 | /** | 212 | /** |
208 | * Transmit the message currently scheduled for | 213 | * Transmit the message currently scheduled for transmission. |
209 | * transmission. | ||
210 | * | 214 | * |
211 | * @param cls the 'struct MigrationReadyPeer' | 215 | * @param cls the `struct MigrationReadyPeer` |
212 | * @param buf_size number of bytes available in buf | 216 | * @param buf_size number of bytes available in @a buf |
213 | * @param buf where to copy the message, NULL on error (peer disconnect) | 217 | * @param buf where to copy the message, NULL on error (peer disconnect) |
214 | * @return number of bytes copied to 'buf', can be 0 (without indicating an error) | 218 | * @return number of bytes copied to @a buf, can be 0 (without indicating an error) |
215 | */ | 219 | */ |
216 | static size_t | 220 | static size_t |
217 | transmit_message (void *cls, size_t buf_size, void *buf) | 221 | transmit_message (void *cls, |
222 | size_t buf_size, | ||
223 | void *buf) | ||
218 | { | 224 | { |
219 | struct MigrationReadyPeer *peer = cls; | 225 | struct MigrationReadyPeer *peer = cls; |
220 | struct PutMessage *msg; | 226 | struct PutMessage *msg; |
@@ -223,7 +229,7 @@ transmit_message (void *cls, size_t buf_size, void *buf) | |||
223 | peer->th = NULL; | 229 | peer->th = NULL; |
224 | msg = peer->msg; | 230 | msg = peer->msg; |
225 | peer->msg = NULL; | 231 | peer->msg = NULL; |
226 | if (buf == NULL) | 232 | if (NULL == buf) |
227 | { | 233 | { |
228 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 234 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
229 | "Failed to migrate content to another peer (disconnect)\n"); | 235 | "Failed to migrate content to another peer (disconnect)\n"); |
@@ -234,8 +240,10 @@ transmit_message (void *cls, size_t buf_size, void *buf) | |||
234 | GNUNET_assert (msize <= buf_size); | 240 | GNUNET_assert (msize <= buf_size); |
235 | memcpy (buf, msg, msize); | 241 | memcpy (buf, msg, msize); |
236 | GNUNET_free (msg); | 242 | GNUNET_free (msg); |
237 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pushing %u bytes to another peer\n", | 243 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
238 | msize); | 244 | "Pushing %u bytes to %s\n", |
245 | msize, | ||
246 | GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); | ||
239 | find_content (peer); | 247 | find_content (peer); |
240 | return msize; | 248 | return msize; |
241 | } | 249 | } |
@@ -246,7 +254,7 @@ transmit_message (void *cls, size_t buf_size, void *buf) | |||
246 | * | 254 | * |
247 | * @param peer target peer | 255 | * @param peer target peer |
248 | * @param block the block | 256 | * @param block the block |
249 | * @return GNUNET_YES if the block was deleted (!) | 257 | * @return #GNUNET_YES if the block was deleted (!) |
250 | */ | 258 | */ |
251 | static int | 259 | static int |
252 | transmit_content (struct MigrationReadyPeer *peer, | 260 | transmit_content (struct MigrationReadyPeer *peer, |
@@ -287,9 +295,13 @@ transmit_content (struct MigrationReadyPeer *peer, | |||
287 | ret = GNUNET_NO; | 295 | ret = GNUNET_NO; |
288 | } | 296 | } |
289 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 297 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
290 | "Asking for transmission of %u bytes for migration\n", msize); | 298 | "Asking for transmission of %u bytes to %s for migration\n", |
291 | peer->th = GSF_peer_transmit_ (peer->peer, GNUNET_NO, 0 /* priority */ , | 299 | msize, |
292 | GNUNET_TIME_UNIT_FOREVER_REL, msize, | 300 | GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); |
301 | peer->th = GSF_peer_transmit_ (peer->peer, | ||
302 | GNUNET_NO, 0 /* priority */ , | ||
303 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
304 | msize, | ||
293 | &transmit_message, peer); | 305 | &transmit_message, peer); |
294 | return ret; | 306 | return ret; |
295 | } | 307 | } |
@@ -436,19 +448,27 @@ consider_gathering () | |||
436 | { | 448 | { |
437 | struct GNUNET_TIME_Relative delay; | 449 | struct GNUNET_TIME_Relative delay; |
438 | 450 | ||
439 | if (GSF_dsh == NULL) | 451 | if (NULL == GSF_dsh) |
440 | return; | 452 | return; |
441 | if (mig_qe != NULL) | 453 | if (NULL != mig_qe) |
442 | return; | 454 | return; |
443 | if (mig_task != GNUNET_SCHEDULER_NO_TASK) | 455 | if (GNUNET_SCHEDULER_NO_TASK != mig_task) |
444 | return; | 456 | return; |
445 | if (mig_size >= MAX_MIGRATION_QUEUE) | 457 | if (mig_size >= MAX_MIGRATION_QUEUE) |
446 | return; | 458 | return; |
447 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); | 459 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); |
448 | delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); | 460 | delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); |
449 | delay = GNUNET_TIME_relative_max (delay, min_migration_delay); | 461 | delay = GNUNET_TIME_relative_max (delay, min_migration_delay); |
462 | if (GNUNET_NO == value_found) | ||
463 | { | ||
464 | /* wait at least 5s if the datastore is empty */ | ||
465 | delay = GNUNET_TIME_relative_max (delay, | ||
466 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, | ||
467 | 5)); | ||
468 | } | ||
450 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 469 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
451 | "Scheduling gathering task (queue size: %u)\n", mig_size); | 470 | "Scheduling gathering task (queue size: %u)\n", |
471 | mig_size); | ||
452 | mig_task = | 472 | mig_task = |
453 | GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); | 473 | GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); |
454 | } | 474 | } |
@@ -469,21 +489,28 @@ consider_gathering () | |||
469 | * maybe 0 if no unique identifier is available | 489 | * maybe 0 if no unique identifier is available |
470 | */ | 490 | */ |
471 | static void | 491 | static void |
472 | process_migration_content (void *cls, const struct GNUNET_HashCode * key, size_t size, | 492 | process_migration_content (void *cls, |
473 | const void *data, enum GNUNET_BLOCK_Type type, | 493 | const struct GNUNET_HashCode *key, |
474 | uint32_t priority, uint32_t anonymity, | 494 | size_t size, |
475 | struct GNUNET_TIME_Absolute expiration, uint64_t uid) | 495 | const void *data, |
496 | enum GNUNET_BLOCK_Type type, | ||
497 | uint32_t priority, | ||
498 | uint32_t anonymity, | ||
499 | struct GNUNET_TIME_Absolute expiration, | ||
500 | uint64_t uid) | ||
476 | { | 501 | { |
477 | struct MigrationReadyBlock *mb; | 502 | struct MigrationReadyBlock *mb; |
478 | struct MigrationReadyPeer *pos; | 503 | struct MigrationReadyPeer *pos; |
479 | 504 | ||
480 | mig_qe = NULL; | 505 | mig_qe = NULL; |
481 | if (key == NULL) | 506 | if (NULL == key) |
482 | { | 507 | { |
483 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for migration...\n"); | 508 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
509 | "No content found for migration...\n"); | ||
484 | consider_gathering (); | 510 | consider_gathering (); |
485 | return; | 511 | return; |
486 | } | 512 | } |
513 | value_found = GNUNET_YES; | ||
487 | if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us < | 514 | if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us < |
488 | MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us) | 515 | MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us) |
489 | { | 516 | { |
@@ -494,34 +521,44 @@ process_migration_content (void *cls, const struct GNUNET_HashCode * key, size_t | |||
494 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) | 521 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) |
495 | { | 522 | { |
496 | if (GNUNET_OK != | 523 | if (GNUNET_OK != |
497 | GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, | 524 | GNUNET_FS_handle_on_demand_block (key, |
498 | anonymity, expiration, uid, | 525 | size, |
526 | data, | ||
527 | type, | ||
528 | priority, | ||
529 | anonymity, | ||
530 | expiration, | ||
531 | uid, | ||
499 | &process_migration_content, NULL)) | 532 | &process_migration_content, NULL)) |
500 | consider_gathering (); | 533 | consider_gathering (); |
501 | return; | 534 | return; |
502 | } | 535 | } |
503 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 536 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
504 | "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n", | 537 | "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n", |
505 | GNUNET_h2s (key), type, mig_size + 1, MAX_MIGRATION_QUEUE); | 538 | GNUNET_h2s (key), |
539 | type, mig_size + 1, | ||
540 | MAX_MIGRATION_QUEUE); | ||
506 | mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); | 541 | mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); |
507 | mb->query = *key; | 542 | mb->query = *key; |
508 | mb->expiration = expiration; | 543 | mb->expiration = expiration; |
509 | mb->size = size; | 544 | mb->size = size; |
510 | mb->type = type; | 545 | mb->type = type; |
511 | memcpy (&mb[1], data, size); | 546 | memcpy (&mb[1], data, size); |
512 | GNUNET_CONTAINER_DLL_insert_after (mig_head, mig_tail, mig_tail, mb); | 547 | GNUNET_CONTAINER_DLL_insert_after (mig_head, |
548 | mig_tail, | ||
549 | mig_tail, | ||
550 | mb); | ||
513 | mig_size++; | 551 | mig_size++; |
514 | pos = peer_head; | 552 | for (pos = peer_head; NULL != pos; pos = pos->next) |
515 | while (pos != NULL) | ||
516 | { | 553 | { |
517 | if (NULL == pos->th) | 554 | if (NULL == pos->th) |
518 | { | 555 | { |
519 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 556 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
520 | "Preparing to push best content to peer\n"); | 557 | "Preparing to push best content to peer %s\n", |
558 | GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer))); | ||
521 | if (GNUNET_YES == transmit_content (pos, mb)) | 559 | if (GNUNET_YES == transmit_content (pos, mb)) |
522 | break; /* 'mb' was freed! */ | 560 | break; /* 'mb' was freed! */ |
523 | } | 561 | } |
524 | pos = pos->next; | ||
525 | } | 562 | } |
526 | consider_gathering (); | 563 | consider_gathering (); |
527 | } | 564 | } |
@@ -541,18 +578,18 @@ gather_migration_blocks (void *cls, | |||
541 | mig_task = GNUNET_SCHEDULER_NO_TASK; | 578 | mig_task = GNUNET_SCHEDULER_NO_TASK; |
542 | if (mig_size >= MAX_MIGRATION_QUEUE) | 579 | if (mig_size >= MAX_MIGRATION_QUEUE) |
543 | return; | 580 | return; |
544 | if (GSF_dsh != NULL) | 581 | if (NULL == GSF_dsh) |
545 | { | 582 | return; |
546 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 583 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
547 | "Asking datastore for content for replication (queue size: %u)\n", | 584 | "Asking datastore for content for replication (queue size: %u)\n", |
548 | mig_size); | 585 | mig_size); |
549 | mig_qe = | 586 | value_found = GNUNET_NO; |
550 | GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, | 587 | mig_qe = |
551 | GNUNET_TIME_UNIT_FOREVER_REL, | 588 | GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, |
552 | &process_migration_content, NULL); | 589 | GNUNET_TIME_UNIT_FOREVER_REL, |
553 | if (NULL == mig_qe) | 590 | &process_migration_content, NULL); |
554 | consider_gathering (); | 591 | if (NULL == mig_qe) |
555 | } | 592 | consider_gathering (); |
556 | } | 593 | } |
557 | 594 | ||
558 | 595 | ||
@@ -569,10 +606,16 @@ GSF_push_start_ (struct GSF_ConnectedPeer *peer) | |||
569 | 606 | ||
570 | if (GNUNET_YES != enabled) | 607 | if (GNUNET_YES != enabled) |
571 | return; | 608 | return; |
609 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
610 | "Adding peer %s to list for pushing\n", | ||
611 | GNUNET_i2s (GSF_connected_peer_get_identity2_(peer))); | ||
612 | |||
572 | mrp = GNUNET_new (struct MigrationReadyPeer); | 613 | mrp = GNUNET_new (struct MigrationReadyPeer); |
573 | mrp->peer = peer; | 614 | mrp->peer = peer; |
574 | find_content (mrp); | 615 | find_content (mrp); |
575 | GNUNET_CONTAINER_DLL_insert (peer_head, peer_tail, mrp); | 616 | GNUNET_CONTAINER_DLL_insert (peer_head, |
617 | peer_tail, | ||
618 | mrp); | ||
576 | } | 619 | } |
577 | 620 | ||
578 | 621 | ||
@@ -587,27 +630,25 @@ GSF_push_stop_ (struct GSF_ConnectedPeer *peer) | |||
587 | { | 630 | { |
588 | struct MigrationReadyPeer *pos; | 631 | struct MigrationReadyPeer *pos; |
589 | 632 | ||
590 | pos = peer_head; | 633 | for (pos = peer_head; NULL != pos; pos = pos->next) |
591 | while (pos != NULL) | ||
592 | { | ||
593 | if (pos->peer == peer) | 634 | if (pos->peer == peer) |
594 | { | 635 | break; |
595 | GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos); | 636 | if (NULL == pos) |
596 | if (NULL != pos->th) | 637 | return; |
597 | { | 638 | GNUNET_CONTAINER_DLL_remove (peer_head, |
598 | GSF_peer_transmit_cancel_ (pos->th); | 639 | peer_tail, |
599 | pos->th = NULL; | 640 | pos); |
600 | } | 641 | if (NULL != pos->th) |
601 | if (NULL != pos->msg) | 642 | { |
602 | { | 643 | GSF_peer_transmit_cancel_ (pos->th); |
603 | GNUNET_free (pos->msg); | 644 | pos->th = NULL; |
604 | pos->msg = NULL; | 645 | } |
605 | } | 646 | if (NULL != pos->msg) |
606 | GNUNET_free (pos); | 647 | { |
607 | return; | 648 | GNUNET_free (pos->msg); |
608 | } | 649 | pos->msg = NULL; |
609 | pos = pos->next; | ||
610 | } | 650 | } |
651 | GNUNET_free (pos); | ||
611 | } | 652 | } |
612 | 653 | ||
613 | 654 | ||