aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2010-10-07 13:52:14 +0000
committerChristian Grothoff <christian@grothoff.org>2010-10-07 13:52:14 +0000
commit14d30b9759fad1d797fe0650c11b38b869163e51 (patch)
treee0a1b5b1005bf59bac72b0a8812ccea53c45cb79 /src/fs
parent5f0ced15a7ac2363341fc5657f93f5932bda2719 (diff)
downloadgnunet-14d30b9759fad1d797fe0650c11b38b869163e51.tar.gz
gnunet-14d30b9759fad1d797fe0650c11b38b869163e51.zip
adding support for artificial delays
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/fs.h6
-rw-r--r--src/fs/gnunet-service-fs.c151
2 files changed, 123 insertions, 34 deletions
diff --git a/src/fs/fs.h b/src/fs/fs.h
index 0eba5615c..d48af35b4 100644
--- a/src/fs/fs.h
+++ b/src/fs/fs.h
@@ -123,12 +123,6 @@
123#define AVAILABILITY_TRIALS_MAX 8 123#define AVAILABILITY_TRIALS_MAX 8
124 124
125/** 125/**
126 * By how much (in ms) do we decrement the TTL
127 * at each hop?
128 */
129#define TTL_DECREMENT 5000
130
131/**
132 * Length of the P2P success tracker. Note that 126 * Length of the P2P success tracker. Note that
133 * having a very long list can also hurt performance. 127 * having a very long list can also hurt performance.
134 */ 128 */
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index c2baf1ada..5118cb56c 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -45,6 +45,13 @@
45#define DEBUG_FS GNUNET_NO 45#define DEBUG_FS GNUNET_NO
46 46
47/** 47/**
48 * Should we introduce random latency in processing? Required for proper
49 * implementation of GAP, but can be disabled for performance evaluation of
50 * the basic routing algorithm.
51 */
52#define SUPPORT_DELAYS GNUNET_NO
53
54/**
48 * Maximum number of outgoing messages we queue per peer. 55 * Maximum number of outgoing messages we queue per peer.
49 */ 56 */
50#define MAX_QUEUE_PER_PEER 16 57#define MAX_QUEUE_PER_PEER 16
@@ -140,6 +147,11 @@ struct PendingMessage
140 void *cont_cls; 147 void *cont_cls;
141 148
142 /** 149 /**
150 * Do not transmit this pending message until this deadline.
151 */
152 struct GNUNET_TIME_Absolute delay_until;
153
154 /**
143 * Size of the reply; actual reply message follows 155 * Size of the reply; actual reply message follows
144 * at the end of this struct. 156 * at the end of this struct.
145 */ 157 */
@@ -226,6 +238,11 @@ struct ConnectedPeer
226 struct GNUNET_TIME_Absolute last_transmission_request_start; 238 struct GNUNET_TIME_Absolute last_transmission_request_start;
227 239
228 /** 240 /**
241 * ID of delay task for scheduling transmission.
242 */
243 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
244
245 /**
229 * Average priority of successful replies. Calculated 246 * Average priority of successful replies. Calculated
230 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n 247 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
231 */ 248 */
@@ -976,7 +993,7 @@ consider_migration (void *cls,
976 } 993 }
977 994
978 /* consider scheduling transmission to cp for content migration */ 995 /* consider scheduling transmission to cp for content migration */
979 if (cp->cth != NULL) 996 if (cp->cth != NULL)
980 return GNUNET_YES; 997 return GNUNET_YES;
981 msize = 0; 998 msize = 0;
982 pos = mig_head; 999 pos = mig_head;
@@ -1004,6 +1021,11 @@ consider_migration (void *cls,
1004 msize, 1021 msize,
1005 GNUNET_h2s (key)); 1022 GNUNET_h2s (key));
1006#endif 1023#endif
1024 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1025 {
1026 GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1027 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1028 }
1007 cp->cth 1029 cp->cth
1008 = GNUNET_CORE_notify_transmit_ready (core, 1030 = GNUNET_CORE_notify_transmit_ready (core,
1009 0, GNUNET_TIME_UNIT_FOREVER_REL, 1031 0, GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1336,12 +1358,12 @@ destroy_pending_message (struct PendingMessage *pm,
1336 TransmissionContinuation cont; 1358 TransmissionContinuation cont;
1337 void *cont_cls; 1359 void *cont_cls;
1338 1360
1361 cont = pm->cont;
1362 cont_cls = pm->cont_cls;
1339 if (pml != NULL) 1363 if (pml != NULL)
1340 { 1364 {
1341 GNUNET_assert (pml->pm == pm); 1365 GNUNET_assert (pml->pm == pm);
1342 GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) ); 1366 GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1343 cont = pm->cont;
1344 cont_cls = pm->cont_cls;
1345 destroy_pending_message_list_entry (pml); 1367 destroy_pending_message_list_entry (pml);
1346 } 1368 }
1347 else 1369 else
@@ -1689,7 +1711,15 @@ peer_disconnect_handler (void *cls,
1689 GNUNET_PEER_change_rc (cp->pid, -1); 1711 GNUNET_PEER_change_rc (cp->pid, -1);
1690 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); 1712 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1691 if (NULL != cp->cth) 1713 if (NULL != cp->cth)
1692 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); 1714 {
1715 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1716 cp->cth = NULL;
1717 }
1718 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1719 {
1720 GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1721 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1722 }
1693 while (NULL != (pm = cp->pending_messages_head)) 1723 while (NULL != (pm = cp->pending_messages_head))
1694 destroy_pending_message (pm, 0 /* delivery failed */); 1724 destroy_pending_message (pm, 0 /* delivery failed */);
1695 GNUNET_LOAD_value_free (cp->transmission_delay); 1725 GNUNET_LOAD_value_free (cp->transmission_delay);
@@ -1894,6 +1924,39 @@ shutdown_task (void *cls,
1894 1924
1895 1925
1896/** 1926/**
1927 * We've had to delay a request for transmission to core, but now
1928 * we should be ready. Run it.
1929 *
1930 * @param cls the 'struct ConnectedPeer' for which a request was delayed
1931 * @param tc task context (unused)
1932 */
1933static void
1934delayed_transmission_request (void *cls,
1935 const struct GNUNET_SCHEDULER_TaskContext *tc)
1936{
1937 struct ConnectedPeer *cp = cls;
1938 struct GNUNET_PeerIdentity pid;
1939 struct PendingMessage *pm;
1940
1941 pm = cp->pending_messages_head;
1942 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1943 GNUNET_assert (cp->cth == NULL);
1944 if (pm == NULL)
1945 return;
1946 GNUNET_PEER_resolve (cp->pid,
1947 &pid);
1948 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
1949 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1950 pm->priority,
1951 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1952 &pid,
1953 pm->msize,
1954 &transmit_to_peer,
1955 cp);
1956}
1957
1958
1959/**
1897 * Transmit messages by copying it to the target buffer 1960 * Transmit messages by copying it to the target buffer
1898 * "buf". "buf" will be NULL and "size" zero if the socket was closed 1961 * "buf". "buf" will be NULL and "size" zero if the socket was closed
1899 * for writing in the meantime. In that case, do nothing 1962 * for writing in the meantime. In that case, do nothing
@@ -1912,13 +1975,16 @@ transmit_to_peer (void *cls,
1912{ 1975{
1913 struct ConnectedPeer *cp = cls; 1976 struct ConnectedPeer *cp = cls;
1914 char *cbuf = buf; 1977 char *cbuf = buf;
1915 struct GNUNET_PeerIdentity pid;
1916 struct PendingMessage *pm; 1978 struct PendingMessage *pm;
1979 struct PendingMessage *next_pm;
1980 struct GNUNET_TIME_Absolute now;
1981 struct GNUNET_TIME_Relative min_delay;
1917 struct MigrationReadyBlock *mb; 1982 struct MigrationReadyBlock *mb;
1918 struct MigrationReadyBlock *next; 1983 struct MigrationReadyBlock *next;
1919 struct PutMessage migm; 1984 struct PutMessage migm;
1920 size_t msize; 1985 size_t msize;
1921 unsigned int i; 1986 unsigned int i;
1987 struct GNUNET_PeerIdentity pid;
1922 1988
1923 cp->cth = NULL; 1989 cp->cth = NULL;
1924 if (NULL == buf) 1990 if (NULL == buf)
@@ -1930,33 +1996,48 @@ transmit_to_peer (void *cls,
1930 GNUNET_LOAD_update (cp->transmission_delay, 1996 GNUNET_LOAD_update (cp->transmission_delay,
1931 UINT64_MAX); 1997 UINT64_MAX);
1932 return 0; 1998 return 0;
1933 } 1999 }
1934 GNUNET_LOAD_update (cp->transmission_delay, 2000 GNUNET_LOAD_update (cp->transmission_delay,
1935 GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value); 2001 GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
2002 now = GNUNET_TIME_absolute_get ();
1936 msize = 0; 2003 msize = 0;
1937 while ( (NULL != (pm = cp->pending_messages_head) ) && 2004 min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2005 next_pm = cp->pending_messages_head;
2006 while ( (NULL != (pm = next_pm) ) &&
1938 (pm->msize <= size) ) 2007 (pm->msize <= size) )
1939 { 2008 {
2009 next_pm = pm->next;
2010 if (pm->delay_until.value > now.value)
2011 {
2012 min_delay = GNUNET_TIME_relative_min (min_delay,
2013 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2014 continue;
2015 }
1940 memcpy (&cbuf[msize], &pm[1], pm->msize); 2016 memcpy (&cbuf[msize], &pm[1], pm->msize);
1941 msize += pm->msize; 2017 msize += pm->msize;
1942 size -= pm->msize; 2018 size -= pm->msize;
2019 GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2020 cp->pending_messages_tail,
2021 pm);
2022 if (NULL == pm->pml)
2023 cp->pending_requests--;
1943 destroy_pending_message (pm, cp->pid); 2024 destroy_pending_message (pm, cp->pid);
1944 } 2025 }
1945 if (NULL != pm) 2026 if (pm != NULL)
1946 { 2027 min_delay = GNUNET_TIME_UNIT_ZERO;
1947 GNUNET_PEER_resolve (cp->pid, 2028 if (NULL != cp->pending_messages_head)
1948 &pid); 2029 {
1949 cp->last_transmission_request_start = GNUNET_TIME_absolute_get (); 2030 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
1950 cp->cth = GNUNET_CORE_notify_transmit_ready (core, 2031 cp->delayed_transmission_request_task
1951 pm->priority, 2032 = GNUNET_SCHEDULER_add_delayed (sched,
1952 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 2033 min_delay,
1953 &pid, 2034 &delayed_transmission_request,
1954 pm->msize, 2035 cp);
1955 &transmit_to_peer,
1956 cp);
1957 } 2036 }
1958 else 2037 if (pm == NULL)
1959 { 2038 {
2039 GNUNET_PEER_resolve (cp->pid,
2040 &pid);
1960 next = mig_head; 2041 next = mig_head;
1961 while (NULL != (mb = next)) 2042 while (NULL != (mb = next))
1962 { 2043 {
@@ -1984,7 +2065,7 @@ transmit_to_peer (void *cls,
1984 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2065 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1985 "Pushing migration block `%s' (%u bytes) to `%s'\n", 2066 "Pushing migration block `%s' (%u bytes) to `%s'\n",
1986 GNUNET_h2s (&mb->query), 2067 GNUNET_h2s (&mb->query),
1987 mb->size, 2068 (unsigned int) mb->size,
1988 GNUNET_i2s (&pid)); 2069 GNUNET_i2s (&pid));
1989#endif 2070#endif
1990 break; 2071 break;
@@ -1995,7 +2076,7 @@ transmit_to_peer (void *cls,
1995 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2076 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1996 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n", 2077 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1997 GNUNET_h2s (&mb->query), 2078 GNUNET_h2s (&mb->query),
1998 mb->size, 2079 (unsigned int) mb->size,
1999 GNUNET_i2s (&pid)); 2080 GNUNET_i2s (&pid));
2000#endif 2081#endif
2001 } 2082 }
@@ -2013,9 +2094,9 @@ transmit_to_peer (void *cls,
2013 } 2094 }
2014#if DEBUG_FS 2095#if DEBUG_FS
2015 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2096 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2016 "Transmitting %u bytes to peer %u\n", 2097 "Transmitting %u bytes to peer with PID %u\n",
2017 msize, 2098 (unsigned int) msize,
2018 cp->pid); 2099 (unsigned int) cp->pid);
2019#endif 2100#endif
2020 return msize; 2101 return msize;
2021} 2102}
@@ -2063,7 +2144,15 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2063 destroy_pending_message (cp->pending_messages_tail, 0); 2144 destroy_pending_message (cp->pending_messages_tail, 0);
2064 GNUNET_PEER_resolve (cp->pid, &pid); 2145 GNUNET_PEER_resolve (cp->pid, &pid);
2065 if (NULL != cp->cth) 2146 if (NULL != cp->cth)
2066 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); 2147 {
2148 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2149 cp->cth = NULL;
2150 }
2151 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2152 {
2153 GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
2154 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2155 }
2067 /* need to schedule transmission */ 2156 /* need to schedule transmission */
2068 cp->last_transmission_request_start = GNUNET_TIME_absolute_get (); 2157 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2069 cp->cth = GNUNET_CORE_notify_transmit_ready (core, 2158 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
@@ -3119,6 +3208,12 @@ process_reply (void *cls,
3119 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); 3208 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3120 reply->cont = &transmit_reply_continuation; 3209 reply->cont = &transmit_reply_continuation;
3121 reply->cont_cls = pr; 3210 reply->cont_cls = pr;
3211#if SUPPORT_DELAYS
3212 reply->delay_until
3213 = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3214 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3215 TTL_DECREMENT)));
3216#endif
3122 reply->msize = msize; 3217 reply->msize = msize;
3123 reply->priority = UINT32_MAX; /* send replies first! */ 3218 reply->priority = UINT32_MAX; /* send replies first! */
3124 pm = (struct PutMessage*) &reply[1]; 3219 pm = (struct PutMessage*) &reply[1];
@@ -3557,10 +3652,10 @@ process_local_reply (void *cls,
3557 prq.priority = priority; 3652 prq.priority = priority;
3558 prq.finished = GNUNET_NO; 3653 prq.finished = GNUNET_NO;
3559 prq.request_found = GNUNET_NO; 3654 prq.request_found = GNUNET_NO;
3560 process_reply (&prq, key, pr);
3561 if ( (old_rf == 0) && 3655 if ( (old_rf == 0) &&
3562 (pr->results_found == 1) ) 3656 (pr->results_found == 0) )
3563 update_datastore_delays (pr->start_time); 3657 update_datastore_delays (pr->start_time);
3658 process_reply (&prq, key, pr);
3564 if (prq.finished == GNUNET_YES) 3659 if (prq.finished == GNUNET_YES)
3565 return; 3660 return;
3566 if (pr->qe == NULL) 3661 if (pr->qe == NULL)