aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_push.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2014-12-16 17:34:18 +0000
committerChristian Grothoff <christian@grothoff.org>2014-12-16 17:34:18 +0000
commit0b92336e16fd941ec085c66579486244c6bd83f4 (patch)
treebbb47f96c7bba867018003de148654fd8ca0eab5 /src/fs/gnunet-service-fs_push.c
parentf1cc760d1c98e2738430ad24d8004c5c700c1c49 (diff)
downloadgnunet-0b92336e16fd941ec085c66579486244c6bd83f4.tar.gz
gnunet-0b92336e16fd941ec085c66579486244c6bd83f4.zip
do not poll empty database every 100ms for migration, doxygen fixes, code cleanup
Diffstat (limited to 'src/fs/gnunet-service-fs_push.c')
-rw-r--r--src/fs/gnunet-service-fs_push.c167
1 files changed, 104 insertions, 63 deletions
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c
index c80dc8b08..f099f7fdc 100644
--- a/src/fs/gnunet-service-fs_push.c
+++ b/src/fs/gnunet-service-fs_push.c
@@ -181,6 +181,11 @@ static unsigned int mig_size;
181 */ 181 */
182static int enabled; 182static int enabled;
183 183
184/**
185 * Did we find anything in the datastore?
186 */
187static int value_found;
188
184 189
185/** 190/**
186 * Delete the given migration block. 191 * Delete the given migration block.
@@ -205,16 +210,17 @@ find_content (struct MigrationReadyPeer *mrp);
205 210
206 211
207/** 212/**
208 * Transmit the message currently scheduled for 213 * Transmit the message currently scheduled for transmission.
209 * transmission.
210 * 214 *
211 * @param cls the 'struct MigrationReadyPeer' 215 * @param cls the `struct MigrationReadyPeer`
212 * @param buf_size number of bytes available in buf 216 * @param buf_size number of bytes available in @a buf
213 * @param buf where to copy the message, NULL on error (peer disconnect) 217 * @param buf where to copy the message, NULL on error (peer disconnect)
214 * @return number of bytes copied to 'buf', can be 0 (without indicating an error) 218 * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
215 */ 219 */
216static size_t 220static size_t
217transmit_message (void *cls, size_t buf_size, void *buf) 221transmit_message (void *cls,
222 size_t buf_size,
223 void *buf)
218{ 224{
219 struct MigrationReadyPeer *peer = cls; 225 struct MigrationReadyPeer *peer = cls;
220 struct PutMessage *msg; 226 struct PutMessage *msg;
@@ -223,7 +229,7 @@ transmit_message (void *cls, size_t buf_size, void *buf)
223 peer->th = NULL; 229 peer->th = NULL;
224 msg = peer->msg; 230 msg = peer->msg;
225 peer->msg = NULL; 231 peer->msg = NULL;
226 if (buf == NULL) 232 if (NULL == buf)
227 { 233 {
228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 234 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
229 "Failed to migrate content to another peer (disconnect)\n"); 235 "Failed to migrate content to another peer (disconnect)\n");
@@ -234,8 +240,10 @@ transmit_message (void *cls, size_t buf_size, void *buf)
234 GNUNET_assert (msize <= buf_size); 240 GNUNET_assert (msize <= buf_size);
235 memcpy (buf, msg, msize); 241 memcpy (buf, msg, msize);
236 GNUNET_free (msg); 242 GNUNET_free (msg);
237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pushing %u bytes to another peer\n", 243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238 msize); 244 "Pushing %u bytes to %s\n",
245 msize,
246 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
239 find_content (peer); 247 find_content (peer);
240 return msize; 248 return msize;
241} 249}
@@ -246,7 +254,7 @@ transmit_message (void *cls, size_t buf_size, void *buf)
246 * 254 *
247 * @param peer target peer 255 * @param peer target peer
248 * @param block the block 256 * @param block the block
249 * @return GNUNET_YES if the block was deleted (!) 257 * @return #GNUNET_YES if the block was deleted (!)
250 */ 258 */
251static int 259static int
252transmit_content (struct MigrationReadyPeer *peer, 260transmit_content (struct MigrationReadyPeer *peer,
@@ -287,9 +295,13 @@ transmit_content (struct MigrationReadyPeer *peer,
287 ret = GNUNET_NO; 295 ret = GNUNET_NO;
288 } 296 }
289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
290 "Asking for transmission of %u bytes for migration\n", msize); 298 "Asking for transmission of %u bytes to %s for migration\n",
291 peer->th = GSF_peer_transmit_ (peer->peer, GNUNET_NO, 0 /* priority */ , 299 msize,
292 GNUNET_TIME_UNIT_FOREVER_REL, msize, 300 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
301 peer->th = GSF_peer_transmit_ (peer->peer,
302 GNUNET_NO, 0 /* priority */ ,
303 GNUNET_TIME_UNIT_FOREVER_REL,
304 msize,
293 &transmit_message, peer); 305 &transmit_message, peer);
294 return ret; 306 return ret;
295} 307}
@@ -436,19 +448,27 @@ consider_gathering ()
436{ 448{
437 struct GNUNET_TIME_Relative delay; 449 struct GNUNET_TIME_Relative delay;
438 450
439 if (GSF_dsh == NULL) 451 if (NULL == GSF_dsh)
440 return; 452 return;
441 if (mig_qe != NULL) 453 if (NULL != mig_qe)
442 return; 454 return;
443 if (mig_task != GNUNET_SCHEDULER_NO_TASK) 455 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
444 return; 456 return;
445 if (mig_size >= MAX_MIGRATION_QUEUE) 457 if (mig_size >= MAX_MIGRATION_QUEUE)
446 return; 458 return;
447 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); 459 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
448 delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); 460 delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
449 delay = GNUNET_TIME_relative_max (delay, min_migration_delay); 461 delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
462 if (GNUNET_NO == value_found)
463 {
464 /* wait at least 5s if the datastore is empty */
465 delay = GNUNET_TIME_relative_max (delay,
466 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
467 5));
468 }
450 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 469 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
451 "Scheduling gathering task (queue size: %u)\n", mig_size); 470 "Scheduling gathering task (queue size: %u)\n",
471 mig_size);
452 mig_task = 472 mig_task =
453 GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); 473 GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL);
454} 474}
@@ -469,21 +489,28 @@ consider_gathering ()
469 * maybe 0 if no unique identifier is available 489 * maybe 0 if no unique identifier is available
470 */ 490 */
471static void 491static void
472process_migration_content (void *cls, const struct GNUNET_HashCode * key, size_t size, 492process_migration_content (void *cls,
473 const void *data, enum GNUNET_BLOCK_Type type, 493 const struct GNUNET_HashCode *key,
474 uint32_t priority, uint32_t anonymity, 494 size_t size,
475 struct GNUNET_TIME_Absolute expiration, uint64_t uid) 495 const void *data,
496 enum GNUNET_BLOCK_Type type,
497 uint32_t priority,
498 uint32_t anonymity,
499 struct GNUNET_TIME_Absolute expiration,
500 uint64_t uid)
476{ 501{
477 struct MigrationReadyBlock *mb; 502 struct MigrationReadyBlock *mb;
478 struct MigrationReadyPeer *pos; 503 struct MigrationReadyPeer *pos;
479 504
480 mig_qe = NULL; 505 mig_qe = NULL;
481 if (key == NULL) 506 if (NULL == key)
482 { 507 {
483 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for migration...\n"); 508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509 "No content found for migration...\n");
484 consider_gathering (); 510 consider_gathering ();
485 return; 511 return;
486 } 512 }
513 value_found = GNUNET_YES;
487 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us < 514 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
488 MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us) 515 MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
489 { 516 {
@@ -494,34 +521,44 @@ process_migration_content (void *cls, const struct GNUNET_HashCode * key, size_t
494 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 521 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
495 { 522 {
496 if (GNUNET_OK != 523 if (GNUNET_OK !=
497 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 524 GNUNET_FS_handle_on_demand_block (key,
498 anonymity, expiration, uid, 525 size,
526 data,
527 type,
528 priority,
529 anonymity,
530 expiration,
531 uid,
499 &process_migration_content, NULL)) 532 &process_migration_content, NULL))
500 consider_gathering (); 533 consider_gathering ();
501 return; 534 return;
502 } 535 }
503 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 536 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
504 "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n", 537 "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
505 GNUNET_h2s (key), type, mig_size + 1, MAX_MIGRATION_QUEUE); 538 GNUNET_h2s (key),
539 type, mig_size + 1,
540 MAX_MIGRATION_QUEUE);
506 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); 541 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
507 mb->query = *key; 542 mb->query = *key;
508 mb->expiration = expiration; 543 mb->expiration = expiration;
509 mb->size = size; 544 mb->size = size;
510 mb->type = type; 545 mb->type = type;
511 memcpy (&mb[1], data, size); 546 memcpy (&mb[1], data, size);
512 GNUNET_CONTAINER_DLL_insert_after (mig_head, mig_tail, mig_tail, mb); 547 GNUNET_CONTAINER_DLL_insert_after (mig_head,
548 mig_tail,
549 mig_tail,
550 mb);
513 mig_size++; 551 mig_size++;
514 pos = peer_head; 552 for (pos = peer_head; NULL != pos; pos = pos->next)
515 while (pos != NULL)
516 { 553 {
517 if (NULL == pos->th) 554 if (NULL == pos->th)
518 { 555 {
519 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 556 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
520 "Preparing to push best content to peer\n"); 557 "Preparing to push best content to peer %s\n",
558 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
521 if (GNUNET_YES == transmit_content (pos, mb)) 559 if (GNUNET_YES == transmit_content (pos, mb))
522 break; /* 'mb' was freed! */ 560 break; /* 'mb' was freed! */
523 } 561 }
524 pos = pos->next;
525 } 562 }
526 consider_gathering (); 563 consider_gathering ();
527} 564}
@@ -541,18 +578,18 @@ gather_migration_blocks (void *cls,
541 mig_task = GNUNET_SCHEDULER_NO_TASK; 578 mig_task = GNUNET_SCHEDULER_NO_TASK;
542 if (mig_size >= MAX_MIGRATION_QUEUE) 579 if (mig_size >= MAX_MIGRATION_QUEUE)
543 return; 580 return;
544 if (GSF_dsh != NULL) 581 if (NULL == GSF_dsh)
545 { 582 return;
546 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 583 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
547 "Asking datastore for content for replication (queue size: %u)\n", 584 "Asking datastore for content for replication (queue size: %u)\n",
548 mig_size); 585 mig_size);
549 mig_qe = 586 value_found = GNUNET_NO;
550 GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, 587 mig_qe =
551 GNUNET_TIME_UNIT_FOREVER_REL, 588 GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
552 &process_migration_content, NULL); 589 GNUNET_TIME_UNIT_FOREVER_REL,
553 if (NULL == mig_qe) 590 &process_migration_content, NULL);
554 consider_gathering (); 591 if (NULL == mig_qe)
555 } 592 consider_gathering ();
556} 593}
557 594
558 595
@@ -569,10 +606,16 @@ GSF_push_start_ (struct GSF_ConnectedPeer *peer)
569 606
570 if (GNUNET_YES != enabled) 607 if (GNUNET_YES != enabled)
571 return; 608 return;
609 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
610 "Adding peer %s to list for pushing\n",
611 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
612
572 mrp = GNUNET_new (struct MigrationReadyPeer); 613 mrp = GNUNET_new (struct MigrationReadyPeer);
573 mrp->peer = peer; 614 mrp->peer = peer;
574 find_content (mrp); 615 find_content (mrp);
575 GNUNET_CONTAINER_DLL_insert (peer_head, peer_tail, mrp); 616 GNUNET_CONTAINER_DLL_insert (peer_head,
617 peer_tail,
618 mrp);
576} 619}
577 620
578 621
@@ -587,27 +630,25 @@ GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
587{ 630{
588 struct MigrationReadyPeer *pos; 631 struct MigrationReadyPeer *pos;
589 632
590 pos = peer_head; 633 for (pos = peer_head; NULL != pos; pos = pos->next)
591 while (pos != NULL)
592 {
593 if (pos->peer == peer) 634 if (pos->peer == peer)
594 { 635 break;
595 GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos); 636 if (NULL == pos)
596 if (NULL != pos->th) 637 return;
597 { 638 GNUNET_CONTAINER_DLL_remove (peer_head,
598 GSF_peer_transmit_cancel_ (pos->th); 639 peer_tail,
599 pos->th = NULL; 640 pos);
600 } 641 if (NULL != pos->th)
601 if (NULL != pos->msg) 642 {
602 { 643 GSF_peer_transmit_cancel_ (pos->th);
603 GNUNET_free (pos->msg); 644 pos->th = NULL;
604 pos->msg = NULL; 645 }
605 } 646 if (NULL != pos->msg)
606 GNUNET_free (pos); 647 {
607 return; 648 GNUNET_free (pos->msg);
608 } 649 pos->msg = NULL;
609 pos = pos->next;
610 } 650 }
651 GNUNET_free (pos);
611} 652}
612 653
613 654