aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-06-07 22:08:36 +0000
committerChristian Grothoff <christian@grothoff.org>2011-06-07 22:08:36 +0000
commit4cb7e23cef8a149ac1334519ff898cc05811ac66 (patch)
tree74701b6ef23c97b508a8a382fc929ed4ca783bbf /src/fs/gnunet-service-fs_cp.c
parenta75ced83ea0041bc72e615743aa192f141ead643 (diff)
downloadgnunet-4cb7e23cef8a149ac1334519ff898cc05811ac66.tar.gz
gnunet-4cb7e23cef8a149ac1334519ff898cc05811ac66.zip
add support for delays
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c164
1 files changed, 158 insertions, 6 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 7123db73a..9686c7dfe 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -126,6 +126,45 @@ struct GSF_PeerTransmitHandle
126 126
127 127
128/** 128/**
129 * Handle for an entry in our delay list.
130 */
131struct GSF_DelayedHandle
132{
133
134 /**
135 * Kept in a doubly-linked list.
136 */
137 struct GSF_DelayedHandle *next;
138
139 /**
140 * Kept in a doubly-linked list.
141 */
142 struct GSF_DelayedHandle *prev;
143
144 /**
145 * Peer this transmission belongs to.
146 */
147 struct GSF_ConnectedPeer *cp;
148
149 /**
150 * The PUT that was delayed.
151 */
152 struct PutMessage *pm;
153
154 /**
155 * Task for the delay.
156 */
157 GNUNET_SCHEDULER_TaskIdentifier delay_task;
158
159 /**
160 * Size of the message.
161 */
162 size_t msize;
163
164};
165
166
167/**
129 * Information per peer and request. 168 * Information per peer and request.
130 */ 169 */
131struct PeerRequest 170struct PeerRequest
@@ -184,6 +223,18 @@ struct GSF_ConnectedPeer
184 struct GSF_PeerTransmitHandle *pth_tail; 223 struct GSF_PeerTransmitHandle *pth_tail;
185 224
186 /** 225 /**
226 * Messages (replies, queries, content migration) we would like to
227 * send to this peer in the near future. Sorted by priority, head.
228 */
229 struct GSF_DelayedHandle *delayed_head;
230
231 /**
232 * Messages (replies, queries, content migration) we would like to
233 * send to this peer in the near future. Sorted by priority, tail.
234 */
235 struct GSF_DelayedHandle *delayed_tail;
236
237 /**
187 * Migration stop message in our queue, or NULL if we have none pending. 238 * Migration stop message in our queue, or NULL if we have none pending.
188 */ 239 */
189 struct GSF_PeerTransmitHandle *migration_pth; 240 struct GSF_PeerTransmitHandle *migration_pth;
@@ -757,6 +808,61 @@ peer_request_destroy (void *cls,
757 808
758 809
759/** 810/**
811 * The artificial delay is over, transmit the message now.
812 *
813 * @param cls the 'struct GSF_DelayedHandle' with the message
814 * @param tc scheduler context
815 */
816static void
817transmit_delayed_now (void *cls,
818 const struct GNUNET_SCHEDULER_TaskContext *tc)
819{
820 struct GSF_DelayedHandle *dh = cls;
821 struct GSF_ConnectedPeer *cp = dh->cp;
822
823 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
824 cp->delayed_tail,
825 dh);
826 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
827 {
828 GNUNET_free (dh->pm);
829 GNUNET_free (dh);
830 return;
831 }
832 (void) GSF_peer_transmit_ (cp, GNUNET_NO,
833 UINT32_MAX,
834 REPLY_TIMEOUT,
835 dh->msize,
836 &copy_reply,
837 dh->pm);
838 GNUNET_free (dh);
839}
840
841
842/**
843 * Get the randomized delay a response should be subjected to.
844 *
845 * @return desired delay
846 */
847static struct GNUNET_TIME_Relative
848get_randomized_delay ()
849{
850 struct GNUNET_TIME_Relative ret;
851
852 /* FIXME: replace 5000 with something relating to current observed P2P message latency */
853 ret = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
854 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
855 5000));
856 GNUNET_STATISTICS_update (GSF_stats,
857 gettext_noop ("# artificial delays introduced (ms)"),
858 ret.rel_value,
859 GNUNET_NO);
860
861 return ret;
862}
863
864
865/**
760 * Handle a reply to a pending request. Also called if a request 866 * Handle a reply to a pending request. Also called if a request
761 * expires (then with data == NULL). The handler may be called 867 * expires (then with data == NULL). The handler may be called
762 * many times (depending on the request type), but will not be 868 * many times (depending on the request type), but will not be
@@ -767,6 +873,7 @@ peer_request_destroy (void *cls,
767 * @param cls 'struct PeerRequest' this is an answer for 873 * @param cls 'struct PeerRequest' this is an answer for
768 * @param eval evaluation of the result 874 * @param eval evaluation of the result
769 * @param pr handle to the original pending request 875 * @param pr handle to the original pending request
876 * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
770 * @param expiration when does 'data' expire? 877 * @param expiration when does 'data' expire?
771 * @param type type of the block 878 * @param type type of the block
772 * @param data response data, NULL on request expiration 879 * @param data response data, NULL on request expiration
@@ -776,6 +883,7 @@ static void
776handle_p2p_reply (void *cls, 883handle_p2p_reply (void *cls,
777 enum GNUNET_BLOCK_EvaluationResult eval, 884 enum GNUNET_BLOCK_EvaluationResult eval,
778 struct GSF_PendingRequest *pr, 885 struct GSF_PendingRequest *pr,
886 uint32_t reply_anonymity_level,
779 struct GNUNET_TIME_Absolute expiration, 887 struct GNUNET_TIME_Absolute expiration,
780 enum GNUNET_BLOCK_Type type, 888 enum GNUNET_BLOCK_Type type,
781 const void *data, 889 const void *data,
@@ -825,18 +933,52 @@ handle_p2p_reply (void *cls,
825 GNUNET_break (0); 933 GNUNET_break (0);
826 return; 934 return;
827 } 935 }
936 if ( (reply_anonymity_level != UINT32_MAX) &&
937 (reply_anonymity_level > 1) )
938 {
939 if (reply_anonymity_level - 1 > GSF_cover_content_count)
940 {
941 GNUNET_STATISTICS_update (GSF_stats,
942 gettext_noop ("# replies dropped due to insufficient cover traffic"),
943 1,
944 GNUNET_NO);
945 return;
946 }
947 GSF_cover_content_count -= (reply_anonymity_level - 1);
948 }
949
828 pm = GNUNET_malloc (msize); 950 pm = GNUNET_malloc (msize);
829 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); 951 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
830 pm->header.size = htons (msize); 952 pm->header.size = htons (msize);
831 pm->type = htonl (type); 953 pm->type = htonl (type);
832 pm->expiration = GNUNET_TIME_absolute_hton (expiration); 954 pm->expiration = GNUNET_TIME_absolute_hton (expiration);
833 memcpy (&pm[1], data, data_len); 955 memcpy (&pm[1], data, data_len);
834 (void) GSF_peer_transmit_ (cp, GNUNET_NO, 956 if ( (reply_anonymity_level != UINT32_MAX) &&
835 UINT32_MAX, 957 (reply_anonymity_level != 0) &&
836 REPLY_TIMEOUT, 958 (GSF_enable_randomized_delays == GNUNET_YES) )
837 msize, 959 {
838 &copy_reply, 960 struct GSF_DelayedHandle *dh;
839 pm); 961
962 dh = GNUNET_malloc (sizeof (struct GSF_DelayedHandle));
963 dh->cp = cp;
964 dh->pm = pm;
965 dh->msize = msize;
966 GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
967 cp->delayed_tail,
968 dh);
969 dh->delay_task = GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
970 &transmit_delayed_now,
971 dh);
972 }
973 else
974 {
975 (void) GSF_peer_transmit_ (cp, GNUNET_NO,
976 UINT32_MAX,
977 REPLY_TIMEOUT,
978 msize,
979 &copy_reply,
980 pm);
981 }
840 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) 982 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
841 return; 983 return;
842 if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task) 984 if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
@@ -1492,6 +1634,7 @@ GSF_peer_disconnect_handler_ (void *cls,
1492{ 1634{
1493 struct GSF_ConnectedPeer *cp; 1635 struct GSF_ConnectedPeer *cp;
1494 struct GSF_PeerTransmitHandle *pth; 1636 struct GSF_PeerTransmitHandle *pth;
1637 struct GSF_DelayedHandle *dh;
1495 1638
1496 cp = GNUNET_CONTAINER_multihashmap_get (cp_map, 1639 cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1497 &peer->hashPubKey); 1640 &peer->hashPubKey);
@@ -1542,6 +1685,15 @@ GSF_peer_disconnect_handler_ (void *cls,
1542 GNUNET_assert (0 == pth->cth_in_progress); 1685 GNUNET_assert (0 == pth->cth_in_progress);
1543 GNUNET_free (pth); 1686 GNUNET_free (pth);
1544 } 1687 }
1688 while (NULL != (dh = cp->delayed_head))
1689 {
1690 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1691 cp->delayed_tail,
1692 dh);
1693 GNUNET_SCHEDULER_cancel (dh->delay_task);
1694 GNUNET_free (dh->pm);
1695 GNUNET_free (dh);
1696 }
1545 GNUNET_PEER_change_rc (cp->ppd.pid, -1); 1697 GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1546 if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task) 1698 if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1547 { 1699 {