diff options
-rw-r--r-- | TODO | 1 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs.c | 147 |
2 files changed, 120 insertions, 28 deletions
@@ -1,7 +1,6 @@ | |||
1 | 0.9.0pre2: | 1 | 0.9.0pre2: |
2 | FS: | 2 | FS: |
3 | - measure latencies (core, datastore) => trust economy | 3 | - measure latencies (core, datastore) => trust economy |
4 | - refuse content migration message (or solicit?) | ||
5 | - FS performance benchmarking | 4 | - FS performance benchmarking |
6 | - integrate with DHT (need DHT API to fit block API better first; also, get rid of the continuation!) | 5 | - integrate with DHT (need DHT API to fit block API better first; also, get rid of the continuation!) |
7 | * DHT: [Nate] | 6 | * DHT: [Nate] |
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 9e53d2fd5..2bfdeb674 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c | |||
@@ -28,8 +28,6 @@ | |||
28 | * - consider more precise latency estimation (per-peer & request) -- again load API? | 28 | * - consider more precise latency estimation (per-peer & request) -- again load API? |
29 | * - implement test_load_too_high, make decision priority-based, implement forwarding, etc. | 29 | * - implement test_load_too_high, make decision priority-based, implement forwarding, etc. |
30 | * - introduce random latency in processing | 30 | * - introduce random latency in processing |
31 | * - tell other peers to stop migration if our PUTs fail (or if | ||
32 | * we don't support migration per configuration?) | ||
33 | * - more statistics | 31 | * - more statistics |
34 | */ | 32 | */ |
35 | #include "platform.h" | 33 | #include "platform.h" |
@@ -190,6 +188,12 @@ struct ConnectedPeer | |||
190 | struct GNUNET_TIME_Absolute migration_blocked; | 188 | struct GNUNET_TIME_Absolute migration_blocked; |
191 | 189 | ||
192 | /** | 190 | /** |
191 | * Time until when we blocked this peer from migrating | ||
192 | * data to us. | ||
193 | */ | ||
194 | struct GNUNET_TIME_Absolute last_migration_block; | ||
195 | |||
196 | /** | ||
193 | * Handle for an active request for transmission to this | 197 | * Handle for an active request for transmission to this |
194 | * peer, or NULL. | 198 | * peer, or NULL. |
195 | */ | 199 | */ |
@@ -752,9 +756,14 @@ static int active_migration; | |||
752 | static double current_priorities; | 756 | static double current_priorities; |
753 | 757 | ||
754 | /** | 758 | /** |
755 | * Datastore load tracking. | 759 | * Datastore 'GET' load tracking. |
756 | */ | 760 | */ |
757 | static struct GNUNET_LOAD_Value *datastore_load; | 761 | static struct GNUNET_LOAD_Value *datastore_get_load; |
762 | |||
763 | /** | ||
764 | * Datastore 'PUT' load tracking. | ||
765 | */ | ||
766 | static struct GNUNET_LOAD_Value *datastore_put_load; | ||
758 | 767 | ||
759 | 768 | ||
760 | /** | 769 | /** |
@@ -769,7 +778,7 @@ update_datastore_delays (struct GNUNET_TIME_Absolute start) | |||
769 | struct GNUNET_TIME_Relative delay; | 778 | struct GNUNET_TIME_Relative delay; |
770 | 779 | ||
771 | delay = GNUNET_TIME_absolute_get_duration (start); | 780 | delay = GNUNET_TIME_absolute_get_duration (start); |
772 | GNUNET_LOAD_update (datastore_load, | 781 | GNUNET_LOAD_update (datastore_get_load, |
773 | delay.value); | 782 | delay.value); |
774 | } | 783 | } |
775 | 784 | ||
@@ -1126,12 +1135,20 @@ destroy_pending_message (struct PendingMessage *pm, | |||
1126 | TransmissionContinuation cont; | 1135 | TransmissionContinuation cont; |
1127 | void *cont_cls; | 1136 | void *cont_cls; |
1128 | 1137 | ||
1129 | GNUNET_assert (pml->pm == pm); | 1138 | if (pml != NULL) |
1130 | GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) ); | 1139 | { |
1131 | cont = pm->cont; | 1140 | GNUNET_assert (pml->pm == pm); |
1132 | cont_cls = pm->cont_cls; | 1141 | GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) ); |
1133 | destroy_pending_message_list_entry (pml); | 1142 | cont = pm->cont; |
1134 | cont (cont_cls, tpid); | 1143 | cont_cls = pm->cont_cls; |
1144 | destroy_pending_message_list_entry (pml); | ||
1145 | } | ||
1146 | else | ||
1147 | { | ||
1148 | GNUNET_free (pm); | ||
1149 | } | ||
1150 | if (cont != NULL) | ||
1151 | cont (cont_cls, tpid); | ||
1135 | } | 1152 | } |
1136 | 1153 | ||
1137 | 1154 | ||
@@ -1636,8 +1653,10 @@ shutdown_task (void *cls, | |||
1636 | GNUNET_assert (0 == mig_size); | 1653 | GNUNET_assert (0 == mig_size); |
1637 | GNUNET_DHT_disconnect (dht_handle); | 1654 | GNUNET_DHT_disconnect (dht_handle); |
1638 | dht_handle = NULL; | 1655 | dht_handle = NULL; |
1639 | GNUNET_LOAD_value_free (datastore_load); | 1656 | GNUNET_LOAD_value_free (datastore_get_load); |
1640 | datastore_load = NULL; | 1657 | datastore_get_load = NULL; |
1658 | GNUNET_LOAD_value_free (datastore_put_load); | ||
1659 | datastore_put_load = NULL; | ||
1641 | GNUNET_BLOCK_context_destroy (block_ctx); | 1660 | GNUNET_BLOCK_context_destroy (block_ctx); |
1642 | block_ctx = NULL; | 1661 | block_ctx = NULL; |
1643 | GNUNET_CONFIGURATION_destroy (block_cfg); | 1662 | GNUNET_CONFIGURATION_destroy (block_cfg); |
@@ -1793,14 +1812,17 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp, | |||
1793 | 1812 | ||
1794 | GNUNET_assert (pm->next == NULL); | 1813 | GNUNET_assert (pm->next == NULL); |
1795 | GNUNET_assert (pm->pml == NULL); | 1814 | GNUNET_assert (pm->pml == NULL); |
1796 | pml = GNUNET_malloc (sizeof (struct PendingMessageList)); | 1815 | if (pr != NULL) |
1797 | pml->req = pr; | 1816 | { |
1798 | pml->target = cp; | 1817 | pml = GNUNET_malloc (sizeof (struct PendingMessageList)); |
1799 | pml->pm = pm; | 1818 | pml->req = pr; |
1800 | pm->pml = pml; | 1819 | pml->target = cp; |
1801 | GNUNET_CONTAINER_DLL_insert (pr->pending_head, | 1820 | pml->pm = pm; |
1802 | pr->pending_tail, | 1821 | pm->pml = pml; |
1803 | pml); | 1822 | GNUNET_CONTAINER_DLL_insert (pr->pending_head, |
1823 | pr->pending_tail, | ||
1824 | pml); | ||
1825 | } | ||
1804 | pos = cp->pending_messages_head; | 1826 | pos = cp->pending_messages_head; |
1805 | while ( (pos != NULL) && | 1827 | while ( (pos != NULL) && |
1806 | (pm->priority < pos->priority) ) | 1828 | (pm->priority < pos->priority) ) |
@@ -2560,6 +2582,11 @@ struct ProcessReplyClosure | |||
2560 | * Did we finish processing the associated request? | 2582 | * Did we finish processing the associated request? |
2561 | */ | 2583 | */ |
2562 | int finished; | 2584 | int finished; |
2585 | |||
2586 | /** | ||
2587 | * Did we find a matching request? | ||
2588 | */ | ||
2589 | int request_found; | ||
2563 | }; | 2590 | }; |
2564 | 2591 | ||
2565 | 2592 | ||
@@ -2715,6 +2742,7 @@ process_reply (void *cls, | |||
2715 | prq->priority += pr->remaining_priority; | 2742 | prq->priority += pr->remaining_priority; |
2716 | pr->remaining_priority = 0; | 2743 | pr->remaining_priority = 0; |
2717 | pr->results_found++; | 2744 | pr->results_found++; |
2745 | prq->request_found = GNUNET_YES; | ||
2718 | if (NULL != pr->client_request_list) | 2746 | if (NULL != pr->client_request_list) |
2719 | { | 2747 | { |
2720 | GNUNET_STATISTICS_update (stats, | 2748 | GNUNET_STATISTICS_update (stats, |
@@ -2800,7 +2828,19 @@ put_migration_continuation (void *cls, | |||
2800 | int success, | 2828 | int success, |
2801 | const char *msg) | 2829 | const char *msg) |
2802 | { | 2830 | { |
2803 | /* FIXME */ | 2831 | struct GNUNET_TIME_Absolute *start = cls; |
2832 | struct GNUNET_TIME_Relative delay; | ||
2833 | |||
2834 | delay = GNUNET_TIME_absolute_get_duration (*start); | ||
2835 | GNUNET_free (start); | ||
2836 | GNUNET_LOAD_update (datastore_put_load, | ||
2837 | delay.value); | ||
2838 | if (GNUNET_OK == success) | ||
2839 | return; | ||
2840 | GNUNET_STATISTICS_update (stats, | ||
2841 | gettext_noop ("# datastore 'put' failures"), | ||
2842 | 1, | ||
2843 | GNUNET_NO); | ||
2804 | } | 2844 | } |
2805 | 2845 | ||
2806 | 2846 | ||
@@ -2830,6 +2870,12 @@ handle_p2p_put (void *cls, | |||
2830 | struct GNUNET_TIME_Absolute expiration; | 2870 | struct GNUNET_TIME_Absolute expiration; |
2831 | GNUNET_HashCode query; | 2871 | GNUNET_HashCode query; |
2832 | struct ProcessReplyClosure prq; | 2872 | struct ProcessReplyClosure prq; |
2873 | struct GNUNET_TIME_Absolute *start; | ||
2874 | struct GNUNET_TIME_Relative block_time; | ||
2875 | double putl; | ||
2876 | struct ConnectedPeer *cp; | ||
2877 | struct PendingMessage *pm; | ||
2878 | struct MigrationStopMessage *msm; | ||
2833 | 2879 | ||
2834 | msize = ntohs (message->size); | 2880 | msize = ntohs (message->size); |
2835 | if (msize < sizeof (struct PutMessage)) | 2881 | if (msize < sizeof (struct PutMessage)) |
@@ -2876,6 +2922,7 @@ handle_p2p_put (void *cls, | |||
2876 | prq.expiration = expiration; | 2922 | prq.expiration = expiration; |
2877 | prq.priority = 0; | 2923 | prq.priority = 0; |
2878 | prq.finished = GNUNET_NO; | 2924 | prq.finished = GNUNET_NO; |
2925 | prq.request_found = GNUNET_NO; | ||
2879 | GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, | 2926 | GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, |
2880 | &query, | 2927 | &query, |
2881 | &process_reply, | 2928 | &process_reply, |
@@ -2893,6 +2940,8 @@ handle_p2p_put (void *cls, | |||
2893 | GNUNET_h2s (&query), | 2940 | GNUNET_h2s (&query), |
2894 | prq.priority); | 2941 | prq.priority); |
2895 | #endif | 2942 | #endif |
2943 | start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); | ||
2944 | *start = GNUNET_TIME_absolute_get (); | ||
2896 | GNUNET_DATASTORE_put (dsh, | 2945 | GNUNET_DATASTORE_put (dsh, |
2897 | 0, &query, dsize, &put[1], | 2946 | 0, &query, dsize, &put[1], |
2898 | type, prq.priority, 1 /* anonymity */, | 2947 | type, prq.priority, 1 /* anonymity */, |
@@ -2900,7 +2949,36 @@ handle_p2p_put (void *cls, | |||
2900 | 1 + prq.priority, MAX_DATASTORE_QUEUE, | 2949 | 1 + prq.priority, MAX_DATASTORE_QUEUE, |
2901 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | 2950 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, |
2902 | &put_migration_continuation, | 2951 | &put_migration_continuation, |
2903 | NULL); | 2952 | start); |
2953 | } | ||
2954 | putl = GNUNET_LOAD_get_load (datastore_put_load); | ||
2955 | if ( (GNUNET_NO == prq.request_found) && | ||
2956 | ( (GNUNET_YES != active_migration) || | ||
2957 | (putl > 2.0) ) ) | ||
2958 | { | ||
2959 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
2960 | &other->hashPubKey); | ||
2961 | if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000) | ||
2962 | return GNUNET_OK; /* already blocked */ | ||
2963 | /* We're too busy; send MigrationStop message! */ | ||
2964 | if (GNUNET_YES != active_migration) | ||
2965 | putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5); | ||
2966 | block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, | ||
2967 | 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
2968 | (unsigned int) (60000 * putl * putl))); | ||
2969 | |||
2970 | cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time); | ||
2971 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + | ||
2972 | sizeof (struct MigrationStopMessage)); | ||
2973 | pm->msize = sizeof (struct MigrationStopMessage); | ||
2974 | pm->priority = UINT32_MAX; | ||
2975 | msm = (struct MigrationStopMessage*) &pm[1]; | ||
2976 | msm->header.size = htons (sizeof (struct MigrationStopMessage)); | ||
2977 | msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); | ||
2978 | msm->duration = GNUNET_TIME_relative_hton (block_time); | ||
2979 | add_to_pending_messages_for_peer (cp, | ||
2980 | pm, | ||
2981 | NULL); | ||
2904 | } | 2982 | } |
2905 | return GNUNET_OK; | 2983 | return GNUNET_OK; |
2906 | } | 2984 | } |
@@ -2925,7 +3003,18 @@ handle_p2p_migration_stop (void *cls, | |||
2925 | struct GNUNET_TIME_Relative latency, | 3003 | struct GNUNET_TIME_Relative latency, |
2926 | uint32_t distance) | 3004 | uint32_t distance) |
2927 | { | 3005 | { |
2928 | // FIXME! | 3006 | struct ConnectedPeer *cp; |
3007 | const struct MigrationStopMessage *msm; | ||
3008 | |||
3009 | msm = (const struct MigrationStopMessage*) message; | ||
3010 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
3011 | &other->hashPubKey); | ||
3012 | if (cp == NULL) | ||
3013 | { | ||
3014 | GNUNET_break (0); | ||
3015 | return GNUNET_OK; | ||
3016 | } | ||
3017 | cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration)); | ||
2929 | return GNUNET_OK; | 3018 | return GNUNET_OK; |
2930 | } | 3019 | } |
2931 | 3020 | ||
@@ -3110,6 +3199,7 @@ process_local_reply (void *cls, | |||
3110 | prq.type = type; | 3199 | prq.type = type; |
3111 | prq.priority = priority; | 3200 | prq.priority = priority; |
3112 | prq.finished = GNUNET_NO; | 3201 | prq.finished = GNUNET_NO; |
3202 | prq.request_found = GNUNET_NO; | ||
3113 | process_reply (&prq, key, pr); | 3203 | process_reply (&prq, key, pr); |
3114 | if ( (old_rf == 0) && | 3204 | if ( (old_rf == 0) && |
3115 | (pr->results_found == 1) ) | 3205 | (pr->results_found == 1) ) |
@@ -3798,7 +3888,8 @@ run (void *cls, | |||
3798 | GNUNET_SCHEDULER_shutdown (sched); | 3888 | GNUNET_SCHEDULER_shutdown (sched); |
3799 | return; | 3889 | return; |
3800 | } | 3890 | } |
3801 | datastore_load = GNUNET_LOAD_value_init (); | 3891 | datastore_get_load = GNUNET_LOAD_value_init (); |
3892 | datastore_put_load = GNUNET_LOAD_value_init (); | ||
3802 | block_cfg = GNUNET_CONFIGURATION_create (); | 3893 | block_cfg = GNUNET_CONFIGURATION_create (); |
3803 | GNUNET_CONFIGURATION_set_value_string (block_cfg, | 3894 | GNUNET_CONFIGURATION_set_value_string (block_cfg, |
3804 | "block", | 3895 | "block", |
@@ -3821,8 +3912,10 @@ run (void *cls, | |||
3821 | block_ctx = NULL; | 3912 | block_ctx = NULL; |
3822 | GNUNET_CONFIGURATION_destroy (block_cfg); | 3913 | GNUNET_CONFIGURATION_destroy (block_cfg); |
3823 | block_cfg = NULL; | 3914 | block_cfg = NULL; |
3824 | GNUNET_LOAD_value_free (datastore_load); | 3915 | GNUNET_LOAD_value_free (datastore_get_load); |
3825 | datastore_load = NULL; | 3916 | datastore_get_load = NULL; |
3917 | GNUNET_LOAD_value_free (datastore_put_load); | ||
3918 | datastore_put_load = NULL; | ||
3826 | return; | 3919 | return; |
3827 | } | 3920 | } |
3828 | } | 3921 | } |