aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-10-18 20:18:28 +0000
committerChristian Grothoff <christian@grothoff.org>2015-10-18 20:18:28 +0000
commitfc0a84c6e454211f0755e1ce536015e02859bc93 (patch)
treed93308a46e198cbc68d9864abf33c65d16b544c7
parent4ef41d6e9d0fd46311c3267e26f0e73281f090b7 (diff)
downloadgnunet-fc0a84c6e454211f0755e1ce536015e02859bc93.tar.gz
gnunet-fc0a84c6e454211f0755e1ce536015e02859bc93.zip
transition from TRANSPORT_try_connect to modern ATS connectivity API
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c428
1 files changed, 283 insertions, 145 deletions
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
index f4d817223..097253d06 100644
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -75,11 +75,6 @@
75#define MAXIMUM_PENDING_PER_PEER 64 75#define MAXIMUM_PENDING_PER_PEER 64
76 76
77/** 77/**
78 * How often to update our preference levels for peers in our routing tables.
79 */
80#define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
81
82/**
83 * How long at least to wait before sending another find peer request. 78 * How long at least to wait before sending another find peer request.
84 */ 79 */
85#define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30) 80#define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
@@ -331,11 +326,6 @@ struct PeerInfo
331 struct GNUNET_CORE_TransmitHandle *th; 326 struct GNUNET_CORE_TransmitHandle *th;
332 327
333 /** 328 /**
334 * Task for scheduling preference updates
335 */
336 struct GNUNET_SCHEDULER_Task *preference_task;
337
338 /**
339 * What is the identity of the peer? 329 * What is the identity of the peer?
340 */ 330 */
341 struct GNUNET_PeerIdentity id; 331 struct GNUNET_PeerIdentity id;
@@ -378,6 +368,29 @@ struct PeerBucket
378 368
379 369
380/** 370/**
371 * Information about a peer that we would like to connect to.
372 */
373struct ConnectInfo
374{
375
376 /**
377 * Handle to active HELLO offer operation, or NULL.
378 */
379 struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
380
381 /**
382 * Handle to active connectivity suggestion operation, or NULL.
383 */
384 struct GNUNET_ATS_ConnectivitySuggestHandle *sh;
385
386 /**
387 * How much would we like to connect to this peer?
388 */
389 uint32_t strength;
390};
391
392
393/**
381 * Do we cache all results that we are routing in the local datacache? 394 * Do we cache all results that we are routing in the local datacache?
382 */ 395 */
383static int cache_results; 396static int cache_results;
@@ -410,11 +423,17 @@ static struct PeerBucket k_buckets[MAX_BUCKETS];
410 423
411/** 424/**
412 * Hash map of all CORE-connected peers, for easy removal from 425 * Hash map of all CORE-connected peers, for easy removal from
413 * #k_buckets on disconnect. 426 * #k_buckets on disconnect. Values are of type `struct PeerInfo`.
414 */ 427 */
415static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers; 428static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
416 429
417/** 430/**
431 * Hash map of all peers we would like to be connected to.
432 * Values are of type `struct ConnectInfo`.
433 */
434static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
435
436/**
418 * Maximum size for each bucket. 437 * Maximum size for each bucket.
419 */ 438 */
420static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; 439static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
@@ -440,17 +459,11 @@ static struct GNUNET_HashCode my_identity_hash;
440static struct GNUNET_CORE_Handle *core_api; 459static struct GNUNET_CORE_Handle *core_api;
441 460
442/** 461/**
443 * Handle to ATS performance monitoring.
444 */
445static struct GNUNET_ATS_PerformanceHandle *ats_perf;
446
447/**
448 * Handle to ATS connectivity. 462 * Handle to ATS connectivity.
449 */ 463 */
450static struct GNUNET_ATS_ConnectivityHandle *ats_ch; 464static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
451 465
452 466
453
454/** 467/**
455 * Find the optimal bucket for this key. 468 * Find the optimal bucket for this key.
456 * 469 *
@@ -475,65 +488,165 @@ find_bucket (const struct GNUNET_HashCode *hc)
475 488
476 489
477/** 490/**
478 * Let GNUnet core know that we like the given peer. 491 * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
492 * Clean up the "oh" field in the @a cls
479 * 493 *
480 * @param cls the `struct PeerInfo` of the peer 494 * @param cls a `struct ConnectInfo`
481 * @param tc scheduler context. 495 * @param tc unused
482 */ 496 */
483static void 497static void
484update_core_preference (void *cls, 498offer_hello_done (void *cls,
485 const struct GNUNET_SCHEDULER_TaskContext *tc) 499 const struct GNUNET_SCHEDULER_TaskContext *tc)
486{ 500{
487 struct PeerInfo *peer = cls; 501 struct ConnectInfo *ci = cls;
488 uint64_t preference;
489 unsigned int matching;
490 int bucket;
491 struct GNUNET_HashCode phash;
492 502
493 peer->preference_task = NULL; 503 ci->oh = NULL;
494 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) 504}
495 return; 505
496 GNUNET_CRYPTO_hash (&peer->id, 506
497 sizeof (struct GNUNET_PeerIdentity), 507/**
498 &phash); 508 * Function called for all entries in #all_desired_peers to clean up.
499 matching = 509 *
500 GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, 510 * @param cls NULL
501 &phash); 511 * @param peer peer the entry is for
502 if (matching >= 64) 512 * @param value the value to remove
503 matching = 63; 513 * @return #GNUNET_YES
504 bucket = find_bucket (&phash); 514 */
505 if (bucket == GNUNET_SYSERR) 515static int
506 preference = 0; 516free_connect_info (void *cls,
507 else 517 const struct GNUNET_PeerIdentity *peer,
518 void *value)
519{
520 struct ConnectInfo *ci = cls;
521
522 GNUNET_assert (GNUNET_YES ==
523 GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
524 peer,
525 ci));
526 if (NULL != ci->sh)
527 {
528 GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
529 ci->sh = NULL;
530 }
531 if (NULL != ci->oh)
508 { 532 {
509 GNUNET_assert (k_buckets[bucket].peers_size != 0); 533 GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
510 preference = (1LL << matching) / k_buckets[bucket].peers_size; 534 ci->oh = NULL;
511 } 535 }
512 if (preference == 0) 536 GNUNET_free (ci);
537 return GNUNET_YES;
538}
539
540
541/**
542 * Consider if we want to connect to a given peer, and if so
543 * let ATS know. If applicable, the HELLO is offered to the
544 * TRANSPORT service.
545 *
546 * @param pid peer to consider connectivity requirements for
547 * @param h a HELLO message, or NULL
548 */
549static void
550try_connect (const struct GNUNET_PeerIdentity *pid,
551 const struct GNUNET_MessageHeader *h)
552{
553 int bucket;
554 struct GNUNET_HashCode pid_hash;
555 struct ConnectInfo *ci;
556 uint32_t strength;
557
558 GNUNET_CRYPTO_hash (pid,
559 sizeof (struct GNUNET_PeerIdentity),
560 &pid_hash);
561 bucket = find_bucket (&pid_hash);
562 if (bucket < 0)
563 return; /* self? */
564 ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
565 pid);
566
567 if (k_buckets[bucket].peers_size < bucket_size)
568 strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
569 else
570 strength = bucket; /* minimum value of connectivity */
571 if (GNUNET_YES ==
572 GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
573 pid))
574 strength *= 2; /* double for connected peers */
575 else if (k_buckets[bucket].peers_size > bucket_size)
576 strength = 0; /* bucket full, we really do not care about more */
577
578 if ( (0 == strength) &&
579 (NULL != ci) )
513 { 580 {
514 peer->preference_task = 581 /* release request */
515 GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL, 582 GNUNET_assert (GNUNET_YES ==
516 &update_core_preference, peer); 583 free_connect_info (NULL,
584 pid,
585 ci));
517 return; 586 return;
518 } 587 }
519 GNUNET_STATISTICS_update (GDS_stats, 588 if (NULL == ci)
520 gettext_noop ("# Preference updates given to core"), 589 ci = GNUNET_new (struct ConnectInfo);
521 1, GNUNET_NO); 590 if ( (NULL != GDS_transport_handle) &&
522 GNUNET_ATS_performance_change_preference (ats_perf, 591 (NULL != ci->oh) &&
523 &peer->id, 592 (NULL != h) )
524 GNUNET_ATS_PREFERENCE_BANDWIDTH, 593 GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
525 (double) preference, 594 if ( (NULL != GDS_transport_handle) &&
526 GNUNET_ATS_PREFERENCE_END); 595 (NULL != h) )
527 peer->preference_task = 596 ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
528 GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL, 597 h,
529 &update_core_preference, peer); 598 &offer_hello_done,
599 ci);
600 if ( (NULL != ci->sh) &&
601 (ci->strength != strength) )
602 GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
603 if (ci->strength != strength)
604 ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
605 pid,
606 strength);
607 ci->strength = strength;
608}
609
610
611/**
612 * Function called for each peer in #all_desired_peers during
613 * #update_connect_preferences() if we have reason to adjust
614 * the strength of our desire to keep connections to certain
615 * peers. Calls #try_connect() to update the calculations for
616 * the given @a pid.
617 *
618 * @param cls NULL
619 * @param pid peer to update
620 * @param value unused
621 * @return #GNUNET_YES (continue to iterate)
622 */
623static int
624update_desire_strength (void *cls,
625 const struct GNUNET_PeerIdentity *pid,
626 void *value)
627{
628 try_connect (pid, NULL);
629 return GNUNET_YES;
630}
530 631
531 632
633/**
634 * Update our preferences for connectivity as given to ATS.
635 *
636 * @param cls the `struct PeerInfo` of the peer
637 * @param tc scheduler context.
638 */
639static void
640update_connect_preferences ()
641{
642 GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
643 &update_desire_strength,
644 NULL);
532} 645}
533 646
534 647
535/** 648/**
536 * Closure for 'add_known_to_bloom'. 649 * Closure for #add_known_to_bloom().
537 */ 650 */
538struct BloomConstructorContext 651struct BloomConstructorContext
539{ 652{
@@ -567,8 +680,12 @@ add_known_to_bloom (void *cls,
567 struct GNUNET_HashCode key_hash; 680 struct GNUNET_HashCode key_hash;
568 struct GNUNET_HashCode mh; 681 struct GNUNET_HashCode mh;
569 682
570 GNUNET_CRYPTO_hash (key, sizeof (struct GNUNET_PeerIdentity), &key_hash); 683 GNUNET_CRYPTO_hash (key,
571 GNUNET_BLOCK_mingle_hash (&key_hash, ctx->bf_mutator, &mh); 684 sizeof (struct GNUNET_PeerIdentity),
685 &key_hash);
686 GNUNET_BLOCK_mingle_hash (&key_hash,
687 ctx->bf_mutator,
688 &mh);
572 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 689 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573 "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n", 690 "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
574 GNUNET_i2s (key), ctx->bf_mutator); 691 GNUNET_i2s (key), ctx->bf_mutator);
@@ -667,7 +784,9 @@ handle_core_connect (void *cls,
667 GNUNET_break (0); 784 GNUNET_break (0);
668 return; 785 return;
669 } 786 }
670 GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), 1, 787 GNUNET_STATISTICS_update (GDS_stats,
788 gettext_noop ("# peers connected"),
789 1,
671 GNUNET_NO); 790 GNUNET_NO);
672 GNUNET_CRYPTO_hash (peer, 791 GNUNET_CRYPTO_hash (peer,
673 sizeof (struct GNUNET_PeerIdentity), 792 sizeof (struct GNUNET_PeerIdentity),
@@ -684,18 +803,19 @@ handle_core_connect (void *cls,
684 k_buckets[peer_bucket].tail, 803 k_buckets[peer_bucket].tail,
685 ret); 804 ret);
686 k_buckets[peer_bucket].peers_size++; 805 k_buckets[peer_bucket].peers_size++;
687 closest_bucket = GNUNET_MAX (closest_bucket, peer_bucket); 806 closest_bucket = GNUNET_MAX (closest_bucket,
688 if ((peer_bucket > 0) && (k_buckets[peer_bucket].peers_size <= bucket_size)) 807 peer_bucket);
689 {
690 ret->preference_task =
691 GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
692 newly_found_peers++;
693 }
694 GNUNET_assert (GNUNET_OK == 808 GNUNET_assert (GNUNET_OK ==
695 GNUNET_CONTAINER_multipeermap_put (all_connected_peers, 809 GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
696 peer, 810 peer,
697 ret, 811 ret,
698 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 812 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
813 if ( (peer_bucket > 0) &&
814 (k_buckets[peer_bucket].peers_size <= bucket_size))
815 {
816 update_connect_preferences ();
817 newly_found_peers++;
818 }
699 if (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers) && 819 if (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers) &&
700 (GNUNET_YES != disable_try_connect)) 820 (GNUNET_YES != disable_try_connect))
701 { 821 {
@@ -723,7 +843,9 @@ handle_core_disconnect (void *cls,
723 struct GNUNET_HashCode phash; 843 struct GNUNET_HashCode phash;
724 844
725 /* Check for disconnect from self message */ 845 /* Check for disconnect from self message */
726 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) 846 if (0 == memcmp (&my_identity,
847 peer,
848 sizeof (struct GNUNET_PeerIdentity)))
727 return; 849 return;
728 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 850 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
729 "Disconnected %s\n", 851 "Disconnected %s\n",
@@ -736,30 +858,28 @@ handle_core_disconnect (void *cls,
736 GNUNET_break (0); 858 GNUNET_break (0);
737 return; 859 return;
738 } 860 }
739 GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), -1, 861 GNUNET_STATISTICS_update (GDS_stats,
862 gettext_noop ("# peers connected"),
863 -1,
740 GNUNET_NO); 864 GNUNET_NO);
741 GNUNET_assert (GNUNET_YES == 865 GNUNET_assert (GNUNET_YES ==
742 GNUNET_CONTAINER_multipeermap_remove (all_connected_peers, 866 GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
743 peer, 867 peer,
744 to_remove)); 868 to_remove));
745 if (NULL != to_remove->preference_task)
746 {
747 GNUNET_SCHEDULER_cancel (to_remove->preference_task);
748 to_remove->preference_task = NULL;
749 }
750 GNUNET_CRYPTO_hash (peer, 869 GNUNET_CRYPTO_hash (peer,
751 sizeof (struct GNUNET_PeerIdentity), 870 sizeof (struct GNUNET_PeerIdentity),
752 &phash); 871 &phash);
753 current_bucket = find_bucket (&phash); 872 current_bucket = find_bucket (&phash);
754 GNUNET_assert (current_bucket >= 0); 873 GNUNET_assert (current_bucket >= 0);
755 GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head, 874 GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
756 k_buckets[current_bucket].tail, to_remove); 875 k_buckets[current_bucket].tail,
876 to_remove);
757 GNUNET_assert (k_buckets[current_bucket].peers_size > 0); 877 GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
758 k_buckets[current_bucket].peers_size--; 878 k_buckets[current_bucket].peers_size--;
759 while ((closest_bucket > 0) && (k_buckets[closest_bucket].peers_size == 0)) 879 while ( (closest_bucket > 0) &&
880 (0 == k_buckets[closest_bucket].peers_size) )
760 closest_bucket--; 881 closest_bucket--;
761 882 if (NULL != to_remove->th)
762 if (to_remove->th != NULL)
763 { 883 {
764 GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th); 884 GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
765 to_remove->th = NULL; 885 to_remove->th = NULL;
@@ -767,14 +887,18 @@ handle_core_disconnect (void *cls,
767 discarded = 0; 887 discarded = 0;
768 while (NULL != (pos = to_remove->head)) 888 while (NULL != (pos = to_remove->head))
769 { 889 {
770 GNUNET_CONTAINER_DLL_remove (to_remove->head, to_remove->tail, pos); 890 GNUNET_CONTAINER_DLL_remove (to_remove->head,
891 to_remove->tail,
892 pos);
771 discarded++; 893 discarded++;
772 GNUNET_free (pos); 894 GNUNET_free (pos);
773 } 895 }
896 if (k_buckets[current_bucket].peers_size < bucket_size)
897 update_connect_preferences ();
774 GNUNET_STATISTICS_update (GDS_stats, 898 GNUNET_STATISTICS_update (GDS_stats,
775 gettext_noop 899 gettext_noop ("# Queued messages discarded (peer disconnected)"),
776 ("# Queued messages discarded (peer disconnected)"), 900 discarded,
777 discarded, GNUNET_NO); 901 GNUNET_NO);
778 GNUNET_free (to_remove); 902 GNUNET_free (to_remove);
779} 903}
780 904
@@ -784,12 +908,14 @@ handle_core_disconnect (void *cls,
784 * out to the destination. 908 * out to the destination.
785 * 909 *
786 * @param cls the 'struct PeerInfo' of the target peer 910 * @param cls the 'struct PeerInfo' of the target peer
787 * @param size number of bytes available in buf 911 * @param size number of bytes available in @a buf
788 * @param buf where the callee should write the message 912 * @param buf where the callee should write the message
789 * @return number of bytes written to buf 913 * @return number of bytes written to @a buf
790 */ 914 */
791static size_t 915static size_t
792core_transmit_notify (void *cls, size_t size, void *buf) 916core_transmit_notify (void *cls,
917 size_t size,
918 void *buf)
793{ 919{
794 struct PeerInfo *peer = cls; 920 struct PeerInfo *peer = cls;
795 char *cbuf = buf; 921 char *cbuf = buf;
@@ -885,9 +1011,11 @@ process_peer_queue (struct PeerInfo *peer)
885 GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO, 1011 GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
886 GNUNET_CORE_PRIO_BEST_EFFORT, 1012 GNUNET_CORE_PRIO_BEST_EFFORT,
887 GNUNET_TIME_absolute_get_remaining 1013 GNUNET_TIME_absolute_get_remaining
888 (pending->timeout), &peer->id, 1014 (pending->timeout),
1015 &peer->id,
889 ntohs (pending->msg->size), 1016 ntohs (pending->msg->size),
890 &core_transmit_notify, peer); 1017 &core_transmit_notify,
1018 peer);
891 GNUNET_break (NULL != peer->th); 1019 GNUNET_break (NULL != peer->th);
892} 1020}
893 1021
@@ -1029,12 +1157,12 @@ am_closest_peer (const struct GNUNET_HashCode *key,
1029 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, key); 1157 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, key);
1030 pos = k_buckets[bucket_num].head; 1158 pos = k_buckets[bucket_num].head;
1031 count = 0; 1159 count = 0;
1032 while ((pos != NULL) && (count < bucket_size)) 1160 while ((NULL != pos) && (count < bucket_size))
1033 { 1161 {
1034 GNUNET_CRYPTO_hash (&pos->id, 1162 GNUNET_CRYPTO_hash (&pos->id,
1035 sizeof (struct GNUNET_PeerIdentity), 1163 sizeof (struct GNUNET_PeerIdentity),
1036 &phash); 1164 &phash);
1037 if ((bloom != NULL) && 1165 if ((NULL != bloom) &&
1038 (GNUNET_YES == 1166 (GNUNET_YES ==
1039 GNUNET_CONTAINER_bloomfilter_test (bloom, &phash))) 1167 GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
1040 { 1168 {
@@ -1071,8 +1199,9 @@ am_closest_peer (const struct GNUNET_HashCode *key,
1071 * @return Peer to route to, or NULL on error 1199 * @return Peer to route to, or NULL on error
1072 */ 1200 */
1073static struct PeerInfo * 1201static struct PeerInfo *
1074select_peer (const struct GNUNET_HashCode * key, 1202select_peer (const struct GNUNET_HashCode *key,
1075 const struct GNUNET_CONTAINER_BloomFilter *bloom, uint32_t hops) 1203 const struct GNUNET_CONTAINER_BloomFilter *bloom,
1204 uint32_t hops)
1076{ 1205{
1077 unsigned int bc; 1206 unsigned int bc;
1078 unsigned int count; 1207 unsigned int count;
@@ -1788,12 +1917,16 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
1788 ntohl (put->desired_replication_level), 1917 ntohl (put->desired_replication_level),
1789 GNUNET_TIME_absolute_ntoh (put->expiration_time), 1918 GNUNET_TIME_absolute_ntoh (put->expiration_time),
1790 ntohl (put->hop_count), bf, 1919 ntohl (put->hop_count), bf,
1791 &put->key, putlen, 1920 &put->key,
1792 pp, payload, payload_size); 1921 putlen,
1922 pp,
1923 payload,
1924 payload_size);
1793 /* notify monitoring clients */ 1925 /* notify monitoring clients */
1794 GDS_CLIENTS_process_put (options 1926 GDS_CLIENTS_process_put (options
1795 | (GNUNET_OK == forwarded) 1927 | ( (GNUNET_OK == forwarded)
1796 ? GNUNET_DHT_RO_LAST_HOP : 0, 1928 ? GNUNET_DHT_RO_LAST_HOP
1929 : 0 ),
1797 ntohl (put->type), 1930 ntohl (put->type),
1798 ntohl (put->hop_count), 1931 ntohl (put->hop_count),
1799 ntohl (put->desired_replication_level), 1932 ntohl (put->desired_replication_level),
@@ -2130,8 +2263,11 @@ handle_dht_p2p_result (void *cls,
2130 char *tmp; 2263 char *tmp;
2131 2264
2132 tmp = GNUNET_strdup (GNUNET_i2s (&my_identity)); 2265 tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2133 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N RESULT %s: %s->%s (%u)\n", 2266 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2134 GNUNET_h2s (&prm->key), GNUNET_i2s (peer), tmp, 2267 "R5N RESULT %s: %s->%s (%u)\n",
2268 GNUNET_h2s (&prm->key),
2269 GNUNET_i2s (peer),
2270 tmp,
2135 get_path_length + 1); 2271 get_path_length + 1);
2136 GNUNET_free (tmp); 2272 GNUNET_free (tmp);
2137 } 2273 }
@@ -2140,7 +2276,6 @@ handle_dht_p2p_result (void *cls,
2140 { 2276 {
2141 const struct GNUNET_MessageHeader *h; 2277 const struct GNUNET_MessageHeader *h;
2142 struct GNUNET_PeerIdentity pid; 2278 struct GNUNET_PeerIdentity pid;
2143 int bucket;
2144 2279
2145 /* Should be a HELLO, validate and consider using it! */ 2280 /* Should be a HELLO, validate and consider using it! */
2146 if (data_size < sizeof (struct GNUNET_MessageHeader)) 2281 if (data_size < sizeof (struct GNUNET_MessageHeader))
@@ -2165,47 +2300,35 @@ handle_dht_p2p_result (void *cls,
2165 (0 != memcmp (&my_identity, 2300 (0 != memcmp (&my_identity,
2166 &pid, 2301 &pid,
2167 sizeof (struct GNUNET_PeerIdentity))) ) 2302 sizeof (struct GNUNET_PeerIdentity))) )
2168 { 2303 try_connect (&pid,
2169 struct GNUNET_HashCode pid_hash; 2304 h);
2170
2171 GNUNET_CRYPTO_hash (&pid,
2172 sizeof (struct GNUNET_PeerIdentity),
2173 &pid_hash);
2174 bucket = find_bucket (&pid_hash);
2175 if ( (bucket >= 0) &&
2176 (k_buckets[bucket].peers_size < bucket_size) &&
2177 (NULL != GDS_transport_handle) )
2178 {
2179 GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
2180 h,
2181 NULL,
2182 NULL);
2183 GNUNET_TRANSPORT_try_connect (GDS_transport_handle,
2184 &pid,
2185 NULL,
2186 NULL); /*FIXME TRY_CONNECT change */
2187 }
2188 }
2189 } 2305 }
2190 2306
2191 /* append 'peer' to 'get_path' */ 2307 /* append 'peer' to 'get_path' */
2192 { 2308 {
2193 struct GNUNET_PeerIdentity xget_path[get_path_length + 1]; 2309 struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2194 2310
2195 memcpy (xget_path, get_path, 2311 memcpy (xget_path,
2312 get_path,
2196 get_path_length * sizeof (struct GNUNET_PeerIdentity)); 2313 get_path_length * sizeof (struct GNUNET_PeerIdentity));
2197 xget_path[get_path_length] = *peer; 2314 xget_path[get_path_length] = *peer;
2198 get_path_length++; 2315 get_path_length++;
2199 2316
2200 /* forward to local clients */ 2317 /* forward to local clients */
2201 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time), 2318 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2202 &prm->key, get_path_length, xget_path, 2319 &prm->key,
2203 put_path_length, put_path, type, data_size, data); 2320 get_path_length,
2321 xget_path,
2322 put_path_length,
2323 put_path,
2324 type,
2325 data_size,
2326 data);
2204 GDS_CLIENTS_process_get_resp (type, 2327 GDS_CLIENTS_process_get_resp (type,
2205 xget_path, get_path_length, 2328 xget_path,
2329 get_path_length,
2206 put_path, put_path_length, 2330 put_path, put_path_length,
2207 GNUNET_TIME_absolute_ntoh ( 2331 GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2208 prm->expiration_time),
2209 &prm->key, 2332 &prm->key,
2210 data, 2333 data,
2211 data_size); 2334 data_size);
@@ -2220,13 +2343,22 @@ handle_dht_p2p_result (void *cls,
2220 2343
2221 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time), 2344 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2222 &prm->key, 2345 &prm->key,
2223 get_path_length + put_path_length, xput_path, 2346 get_path_length + put_path_length,
2224 type, data_size, data); 2347 xput_path,
2348 type,
2349 data_size,
2350 data);
2225 } 2351 }
2226 /* forward to other peers */ 2352 /* forward to other peers */
2227 GDS_ROUTING_process (type, GNUNET_TIME_absolute_ntoh (prm->expiration_time), 2353 GDS_ROUTING_process (type,
2228 &prm->key, put_path_length, put_path, get_path_length, 2354 GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2229 xget_path, data, data_size); 2355 &prm->key,
2356 put_path_length,
2357 put_path,
2358 get_path_length,
2359 xget_path,
2360 data,
2361 data_size);
2230 } 2362 }
2231 2363
2232 return GNUNET_YES; 2364 return GNUNET_YES;
@@ -2236,7 +2368,7 @@ handle_dht_p2p_result (void *cls,
2236/** 2368/**
2237 * Initialize neighbours subsystem. 2369 * Initialize neighbours subsystem.
2238 * 2370 *
2239 * @return GNUNET_OK on success, GNUNET_SYSERR on error 2371 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2240 */ 2372 */
2241int 2373int
2242GDS_NEIGHBOURS_init () 2374GDS_NEIGHBOURS_init ()
@@ -2260,18 +2392,21 @@ GDS_NEIGHBOURS_init ()
2260 2392
2261 log_route_details_stderr = 2393 log_route_details_stderr =
2262 (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO; 2394 (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2263 ats_perf = GNUNET_ATS_performance_init (GDS_cfg,
2264 NULL,
2265 NULL);
2266 ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg); 2395 ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2267 core_api = 2396 core_api =
2268 GNUNET_CORE_connect (GDS_cfg, NULL, &core_init, &handle_core_connect, 2397 GNUNET_CORE_connect (GDS_cfg, NULL,
2269 &handle_core_disconnect, NULL, GNUNET_NO, NULL, 2398 &core_init,
2270 GNUNET_NO, core_handlers); 2399 &handle_core_connect,
2400 &handle_core_disconnect,
2401 NULL, GNUNET_NO,
2402 NULL, GNUNET_NO,
2403 core_handlers);
2271 if (core_api == NULL) 2404 if (core_api == NULL)
2272 return GNUNET_SYSERR; 2405 return GNUNET_SYSERR;
2273 all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256, 2406 all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2274 GNUNET_NO); 2407 GNUNET_NO);
2408 all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2409 GNUNET_NO);
2275 return GNUNET_OK; 2410 return GNUNET_OK;
2276} 2411}
2277 2412
@@ -2286,13 +2421,16 @@ GDS_NEIGHBOURS_done ()
2286 return; 2421 return;
2287 GNUNET_CORE_disconnect (core_api); 2422 GNUNET_CORE_disconnect (core_api);
2288 core_api = NULL; 2423 core_api = NULL;
2289 GNUNET_ATS_performance_done (ats_perf);
2290 ats_perf = NULL;
2291 GNUNET_ATS_connectivity_done (ats_ch);
2292 ats_ch = NULL;
2293 GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)); 2424 GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2294 GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers); 2425 GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2295 all_connected_peers = NULL; 2426 all_connected_peers = NULL;
2427 GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2428 &free_connect_info,
2429 NULL);
2430 GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2431 all_desired_peers = NULL;
2432 GNUNET_ATS_connectivity_done (ats_ch);
2433 ats_ch = NULL;
2296 if (NULL != find_peer_task) 2434 if (NULL != find_peer_task)
2297 { 2435 {
2298 GNUNET_SCHEDULER_cancel (find_peer_task); 2436 GNUNET_SCHEDULER_cancel (find_peer_task);