aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs.c')
-rw-r--r--src/fs/gnunet-service-fs.c147
1 files changed, 120 insertions, 27 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 9e53d2fd5..2bfdeb674 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -28,8 +28,6 @@
28 * - consider more precise latency estimation (per-peer & request) -- again load API? 28 * - consider more precise latency estimation (per-peer & request) -- again load API?
29 * - implement test_load_too_high, make decision priority-based, implement forwarding, etc. 29 * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
30 * - introduce random latency in processing 30 * - introduce random latency in processing
31 * - tell other peers to stop migration if our PUTs fail (or if
32 * we don't support migration per configuration?)
33 * - more statistics 31 * - more statistics
34 */ 32 */
35#include "platform.h" 33#include "platform.h"
@@ -190,6 +188,12 @@ struct ConnectedPeer
190 struct GNUNET_TIME_Absolute migration_blocked; 188 struct GNUNET_TIME_Absolute migration_blocked;
191 189
192 /** 190 /**
191 * Time until when we blocked this peer from migrating
192 * data to us.
193 */
194 struct GNUNET_TIME_Absolute last_migration_block;
195
196 /**
193 * Handle for an active request for transmission to this 197 * Handle for an active request for transmission to this
194 * peer, or NULL. 198 * peer, or NULL.
195 */ 199 */
@@ -752,9 +756,14 @@ static int active_migration;
752static double current_priorities; 756static double current_priorities;
753 757
754/** 758/**
755 * Datastore load tracking. 759 * Datastore 'GET' load tracking.
756 */ 760 */
757static struct GNUNET_LOAD_Value *datastore_load; 761static struct GNUNET_LOAD_Value *datastore_get_load;
762
763/**
764 * Datastore 'PUT' load tracking.
765 */
766static struct GNUNET_LOAD_Value *datastore_put_load;
758 767
759 768
760/** 769/**
@@ -769,7 +778,7 @@ update_datastore_delays (struct GNUNET_TIME_Absolute start)
769 struct GNUNET_TIME_Relative delay; 778 struct GNUNET_TIME_Relative delay;
770 779
771 delay = GNUNET_TIME_absolute_get_duration (start); 780 delay = GNUNET_TIME_absolute_get_duration (start);
772 GNUNET_LOAD_update (datastore_load, 781 GNUNET_LOAD_update (datastore_get_load,
773 delay.value); 782 delay.value);
774} 783}
775 784
@@ -1126,12 +1135,20 @@ destroy_pending_message (struct PendingMessage *pm,
1126 TransmissionContinuation cont; 1135 TransmissionContinuation cont;
1127 void *cont_cls; 1136 void *cont_cls;
1128 1137
1129 GNUNET_assert (pml->pm == pm); 1138 if (pml != NULL)
1130 GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) ); 1139 {
1131 cont = pm->cont; 1140 GNUNET_assert (pml->pm == pm);
1132 cont_cls = pm->cont_cls; 1141 GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1133 destroy_pending_message_list_entry (pml); 1142 cont = pm->cont;
1134 cont (cont_cls, tpid); 1143 cont_cls = pm->cont_cls;
1144 destroy_pending_message_list_entry (pml);
1145 }
1146 else
1147 {
1148 GNUNET_free (pm);
1149 }
1150 if (cont != NULL)
1151 cont (cont_cls, tpid);
1135} 1152}
1136 1153
1137 1154
@@ -1636,8 +1653,10 @@ shutdown_task (void *cls,
1636 GNUNET_assert (0 == mig_size); 1653 GNUNET_assert (0 == mig_size);
1637 GNUNET_DHT_disconnect (dht_handle); 1654 GNUNET_DHT_disconnect (dht_handle);
1638 dht_handle = NULL; 1655 dht_handle = NULL;
1639 GNUNET_LOAD_value_free (datastore_load); 1656 GNUNET_LOAD_value_free (datastore_get_load);
1640 datastore_load = NULL; 1657 datastore_get_load = NULL;
1658 GNUNET_LOAD_value_free (datastore_put_load);
1659 datastore_put_load = NULL;
1641 GNUNET_BLOCK_context_destroy (block_ctx); 1660 GNUNET_BLOCK_context_destroy (block_ctx);
1642 block_ctx = NULL; 1661 block_ctx = NULL;
1643 GNUNET_CONFIGURATION_destroy (block_cfg); 1662 GNUNET_CONFIGURATION_destroy (block_cfg);
@@ -1793,14 +1812,17 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1793 1812
1794 GNUNET_assert (pm->next == NULL); 1813 GNUNET_assert (pm->next == NULL);
1795 GNUNET_assert (pm->pml == NULL); 1814 GNUNET_assert (pm->pml == NULL);
1796 pml = GNUNET_malloc (sizeof (struct PendingMessageList)); 1815 if (pr != NULL)
1797 pml->req = pr; 1816 {
1798 pml->target = cp; 1817 pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1799 pml->pm = pm; 1818 pml->req = pr;
1800 pm->pml = pml; 1819 pml->target = cp;
1801 GNUNET_CONTAINER_DLL_insert (pr->pending_head, 1820 pml->pm = pm;
1802 pr->pending_tail, 1821 pm->pml = pml;
1803 pml); 1822 GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1823 pr->pending_tail,
1824 pml);
1825 }
1804 pos = cp->pending_messages_head; 1826 pos = cp->pending_messages_head;
1805 while ( (pos != NULL) && 1827 while ( (pos != NULL) &&
1806 (pm->priority < pos->priority) ) 1828 (pm->priority < pos->priority) )
@@ -2560,6 +2582,11 @@ struct ProcessReplyClosure
2560 * Did we finish processing the associated request? 2582 * Did we finish processing the associated request?
2561 */ 2583 */
2562 int finished; 2584 int finished;
2585
2586 /**
2587 * Did we find a matching request?
2588 */
2589 int request_found;
2563}; 2590};
2564 2591
2565 2592
@@ -2715,6 +2742,7 @@ process_reply (void *cls,
2715 prq->priority += pr->remaining_priority; 2742 prq->priority += pr->remaining_priority;
2716 pr->remaining_priority = 0; 2743 pr->remaining_priority = 0;
2717 pr->results_found++; 2744 pr->results_found++;
2745 prq->request_found = GNUNET_YES;
2718 if (NULL != pr->client_request_list) 2746 if (NULL != pr->client_request_list)
2719 { 2747 {
2720 GNUNET_STATISTICS_update (stats, 2748 GNUNET_STATISTICS_update (stats,
@@ -2800,7 +2828,19 @@ put_migration_continuation (void *cls,
2800 int success, 2828 int success,
2801 const char *msg) 2829 const char *msg)
2802{ 2830{
2803 /* FIXME */ 2831 struct GNUNET_TIME_Absolute *start = cls;
2832 struct GNUNET_TIME_Relative delay;
2833
2834 delay = GNUNET_TIME_absolute_get_duration (*start);
2835 GNUNET_free (start);
2836 GNUNET_LOAD_update (datastore_put_load,
2837 delay.value);
2838 if (GNUNET_OK == success)
2839 return;
2840 GNUNET_STATISTICS_update (stats,
2841 gettext_noop ("# datastore 'put' failures"),
2842 1,
2843 GNUNET_NO);
2804} 2844}
2805 2845
2806 2846
@@ -2830,6 +2870,12 @@ handle_p2p_put (void *cls,
2830 struct GNUNET_TIME_Absolute expiration; 2870 struct GNUNET_TIME_Absolute expiration;
2831 GNUNET_HashCode query; 2871 GNUNET_HashCode query;
2832 struct ProcessReplyClosure prq; 2872 struct ProcessReplyClosure prq;
2873 struct GNUNET_TIME_Absolute *start;
2874 struct GNUNET_TIME_Relative block_time;
2875 double putl;
2876 struct ConnectedPeer *cp;
2877 struct PendingMessage *pm;
2878 struct MigrationStopMessage *msm;
2833 2879
2834 msize = ntohs (message->size); 2880 msize = ntohs (message->size);
2835 if (msize < sizeof (struct PutMessage)) 2881 if (msize < sizeof (struct PutMessage))
@@ -2876,6 +2922,7 @@ handle_p2p_put (void *cls,
2876 prq.expiration = expiration; 2922 prq.expiration = expiration;
2877 prq.priority = 0; 2923 prq.priority = 0;
2878 prq.finished = GNUNET_NO; 2924 prq.finished = GNUNET_NO;
2925 prq.request_found = GNUNET_NO;
2879 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, 2926 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2880 &query, 2927 &query,
2881 &process_reply, 2928 &process_reply,
@@ -2893,6 +2940,8 @@ handle_p2p_put (void *cls,
2893 GNUNET_h2s (&query), 2940 GNUNET_h2s (&query),
2894 prq.priority); 2941 prq.priority);
2895#endif 2942#endif
2943 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
2944 *start = GNUNET_TIME_absolute_get ();
2896 GNUNET_DATASTORE_put (dsh, 2945 GNUNET_DATASTORE_put (dsh,
2897 0, &query, dsize, &put[1], 2946 0, &query, dsize, &put[1],
2898 type, prq.priority, 1 /* anonymity */, 2947 type, prq.priority, 1 /* anonymity */,
@@ -2900,7 +2949,36 @@ handle_p2p_put (void *cls,
2900 1 + prq.priority, MAX_DATASTORE_QUEUE, 2949 1 + prq.priority, MAX_DATASTORE_QUEUE,
2901 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 2950 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2902 &put_migration_continuation, 2951 &put_migration_continuation,
2903 NULL); 2952 start);
2953 }
2954 putl = GNUNET_LOAD_get_load (datastore_put_load);
2955 if ( (GNUNET_NO == prq.request_found) &&
2956 ( (GNUNET_YES != active_migration) ||
2957 (putl > 2.0) ) )
2958 {
2959 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2960 &other->hashPubKey);
2961 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
2962 return GNUNET_OK; /* already blocked */
2963 /* We're too busy; send MigrationStop message! */
2964 if (GNUNET_YES != active_migration)
2965 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
2966 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2967 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2968 (unsigned int) (60000 * putl * putl)));
2969
2970 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
2971 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
2972 sizeof (struct MigrationStopMessage));
2973 pm->msize = sizeof (struct MigrationStopMessage);
2974 pm->priority = UINT32_MAX;
2975 msm = (struct MigrationStopMessage*) &pm[1];
2976 msm->header.size = htons (sizeof (struct MigrationStopMessage));
2977 msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
2978 msm->duration = GNUNET_TIME_relative_hton (block_time);
2979 add_to_pending_messages_for_peer (cp,
2980 pm,
2981 NULL);
2904 } 2982 }
2905 return GNUNET_OK; 2983 return GNUNET_OK;
2906} 2984}
@@ -2925,7 +3003,18 @@ handle_p2p_migration_stop (void *cls,
2925 struct GNUNET_TIME_Relative latency, 3003 struct GNUNET_TIME_Relative latency,
2926 uint32_t distance) 3004 uint32_t distance)
2927{ 3005{
2928 // FIXME! 3006 struct ConnectedPeer *cp;
3007 const struct MigrationStopMessage *msm;
3008
3009 msm = (const struct MigrationStopMessage*) message;
3010 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3011 &other->hashPubKey);
3012 if (cp == NULL)
3013 {
3014 GNUNET_break (0);
3015 return GNUNET_OK;
3016 }
3017 cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
2929 return GNUNET_OK; 3018 return GNUNET_OK;
2930} 3019}
2931 3020
@@ -3110,6 +3199,7 @@ process_local_reply (void *cls,
3110 prq.type = type; 3199 prq.type = type;
3111 prq.priority = priority; 3200 prq.priority = priority;
3112 prq.finished = GNUNET_NO; 3201 prq.finished = GNUNET_NO;
3202 prq.request_found = GNUNET_NO;
3113 process_reply (&prq, key, pr); 3203 process_reply (&prq, key, pr);
3114 if ( (old_rf == 0) && 3204 if ( (old_rf == 0) &&
3115 (pr->results_found == 1) ) 3205 (pr->results_found == 1) )
@@ -3798,7 +3888,8 @@ run (void *cls,
3798 GNUNET_SCHEDULER_shutdown (sched); 3888 GNUNET_SCHEDULER_shutdown (sched);
3799 return; 3889 return;
3800 } 3890 }
3801 datastore_load = GNUNET_LOAD_value_init (); 3891 datastore_get_load = GNUNET_LOAD_value_init ();
3892 datastore_put_load = GNUNET_LOAD_value_init ();
3802 block_cfg = GNUNET_CONFIGURATION_create (); 3893 block_cfg = GNUNET_CONFIGURATION_create ();
3803 GNUNET_CONFIGURATION_set_value_string (block_cfg, 3894 GNUNET_CONFIGURATION_set_value_string (block_cfg,
3804 "block", 3895 "block",
@@ -3821,8 +3912,10 @@ run (void *cls,
3821 block_ctx = NULL; 3912 block_ctx = NULL;
3822 GNUNET_CONFIGURATION_destroy (block_cfg); 3913 GNUNET_CONFIGURATION_destroy (block_cfg);
3823 block_cfg = NULL; 3914 block_cfg = NULL;
3824 GNUNET_LOAD_value_free (datastore_load); 3915 GNUNET_LOAD_value_free (datastore_get_load);
3825 datastore_load = NULL; 3916 datastore_get_load = NULL;
3917 GNUNET_LOAD_value_free (datastore_put_load);
3918 datastore_put_load = NULL;
3826 return; 3919 return;
3827 } 3920 }
3828} 3921}