diff options
author | Christian Grothoff <christian@grothoff.org> | 2011-03-14 19:04:15 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2011-03-14 19:04:15 +0000 |
commit | ca98a9146439dca8bc6349ada18e7cbe8ecae63c (patch) | |
tree | 43b04d0dbb31c91a885ada60fd5065177208bcd3 /src/fs/gnunet-service-fs_pe.c | |
parent | 34b96c3a763101c8291b06ac9f512a9b9fecd20f (diff) | |
download | gnunet-ca98a9146439dca8bc6349ada18e7cbe8ecae63c.tar.gz gnunet-ca98a9146439dca8bc6349ada18e7cbe8ecae63c.zip |
adding delay heap
Diffstat (limited to 'src/fs/gnunet-service-fs_pe.c')
-rw-r--r-- | src/fs/gnunet-service-fs_pe.c | 109 |
1 files changed, 82 insertions, 27 deletions
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index af20c4bf3..1c73c8dda 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c | |||
@@ -59,14 +59,24 @@ struct GSF_RequestPlan | |||
59 | struct GSF_PendingRequest *pr; | 59 | struct GSF_PendingRequest *pr; |
60 | 60 | ||
61 | /** | 61 | /** |
62 | * Earliest time we'd be happy to transmit this request. | 62 | * Earliest time we'd be happy to (re)transmit this request. |
63 | */ | 63 | */ |
64 | struct GNUNET_TIME_Absolute earliest_transmission; | 64 | struct GNUNET_TIME_Absolute earliest_transmission; |
65 | 65 | ||
66 | /** | 66 | /** |
67 | * Priority for this request for this target. | 67 | * When was the last time we transmitted this request to this peer? 0 for never. |
68 | */ | 68 | */ |
69 | uint32_t priority; | 69 | struct GNUNET_TIME_Absolute last_transmission; |
70 | |||
71 | /** | ||
72 | * Current priority for this request for this target. | ||
73 | */ | ||
74 | uint64_t priority; | ||
75 | |||
76 | /** | ||
77 | * How often did we transmit this request to this peer? | ||
78 | */ | ||
79 | unsigned int transmission_counter; | ||
70 | 80 | ||
71 | }; | 81 | }; |
72 | 82 | ||
@@ -77,9 +87,14 @@ struct GSF_RequestPlan | |||
77 | struct PeerPlan | 87 | struct PeerPlan |
78 | { | 88 | { |
79 | /** | 89 | /** |
80 | * Heap with pending queries (struct GSF_RequestPlan), smaller weights mean higher priority. | 90 | * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority. |
81 | */ | 91 | */ |
82 | struct GNUNET_CONTAINER_Heap *heap; | 92 | struct GNUNET_CONTAINER_Heap *priority_heap; |
93 | |||
94 | /** | ||
95 | * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first. | ||
96 | */ | ||
97 | struct GNUNET_CONTAINER_Heap *delay_heap; | ||
83 | 98 | ||
84 | /** | 99 | /** |
85 | * Current transmission request handle. | 100 | * Current transmission request handle. |
@@ -114,16 +129,20 @@ static void | |||
114 | plan (struct PeerPlan *pp, | 129 | plan (struct PeerPlan *pp, |
115 | struct GSF_RequestPlan *rp) | 130 | struct GSF_RequestPlan *rp) |
116 | { | 131 | { |
117 | GNUNET_CONTAINER_HeapCostType weight; | ||
118 | struct GSF_PendingRequestData *prd; | 132 | struct GSF_PendingRequestData *prd; |
119 | 133 | ||
120 | prd = GSF_pending_request_get_data_ (rp->pr); | 134 | prd = GSF_pending_request_get_data_ (rp->pr); |
121 | weight = 0; // FIXME: calculate real weight! | ||
122 | // FIXME: calculate 'rp->earliest_transmission'! | 135 | // FIXME: calculate 'rp->earliest_transmission'! |
123 | // fIXME: claculate 'rp->priority'! | 136 | // fIXME: claculate 'rp->priority'! |
124 | rp->hn = GNUNET_CONTAINER_heap_insert (pp->heap, | 137 | |
125 | rp, | 138 | if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) |
126 | weight); | 139 | rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, |
140 | rp, | ||
141 | rp->priority); | ||
142 | else | ||
143 | rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap, | ||
144 | rp, | ||
145 | rp->earliest_transmission.abs_value); | ||
127 | } | 146 | } |
128 | 147 | ||
129 | 148 | ||
@@ -161,7 +180,7 @@ transmit_message_callback (void *cls, | |||
161 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); | 180 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); |
162 | return 0; | 181 | return 0; |
163 | } | 182 | } |
164 | rp = GNUNET_CONTAINER_heap_peek (pp->heap); | 183 | rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); |
165 | msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf); | 184 | msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf); |
166 | if (msize > buf_size) | 185 | if (msize > buf_size) |
167 | { | 186 | { |
@@ -170,8 +189,10 @@ transmit_message_callback (void *cls, | |||
170 | return 0; | 189 | return 0; |
171 | } | 190 | } |
172 | /* remove from root, add again elsewhere... */ | 191 | /* remove from root, add again elsewhere... */ |
173 | GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->heap)); | 192 | GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)); |
174 | rp->hn = NULL; | 193 | rp->hn = NULL; |
194 | rp->last_transmission = GNUNET_TIME_absolute_get (); | ||
195 | rp->transmission_counter++; | ||
175 | plan (pp, rp); | 196 | plan (pp, rp); |
176 | return msize; | 197 | return msize; |
177 | } | 198 | } |
@@ -191,24 +212,34 @@ schedule_peer_transmission (void *cls, | |||
191 | struct GSF_RequestPlan *rp; | 212 | struct GSF_RequestPlan *rp; |
192 | struct GSF_PendingRequestData *prd; | 213 | struct GSF_PendingRequestData *prd; |
193 | size_t msize; | 214 | size_t msize; |
194 | struct GNUNET_TIME_Relative delay; | ||
195 | 215 | ||
196 | pp->task = GNUNET_SCHEDULER_NO_TASK; | 216 | pp->task = GNUNET_SCHEDULER_NO_TASK; |
197 | if (NULL == pp->heap) | ||
198 | return; | ||
199 | if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap)) | ||
200 | return; | ||
201 | GNUNET_assert (NULL == pp->pth); | 217 | GNUNET_assert (NULL == pp->pth); |
202 | rp = GNUNET_CONTAINER_heap_peek (pp->heap); | 218 | /* move ready requests to priority queue */ |
203 | prd = GSF_pending_request_get_data_ (rp->pr); | 219 | while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && |
204 | delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission); | 220 | (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) ) |
205 | if (delay.rel_value > 0) | 221 | { |
222 | rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap); | ||
223 | GNUNET_CONTAINER_heap_insert (pp->priority_heap, | ||
224 | rp, | ||
225 | rp->priority); | ||
226 | if (NULL == (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) | ||
227 | break; | ||
228 | } | ||
229 | if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap)) | ||
206 | { | 230 | { |
207 | pp->task = GNUNET_SCHEDULER_add_delayed (delay, | 231 | /* priority heap (still) empty, check for delay... */ |
232 | rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap); | ||
233 | if (NULL == rp) | ||
234 | return; /* both queues empty */ | ||
235 | pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission), | ||
208 | &schedule_peer_transmission, | 236 | &schedule_peer_transmission, |
209 | pp); | 237 | pp); |
210 | return; | 238 | return; |
211 | } | 239 | } |
240 | /* process from priority heap */ | ||
241 | rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); | ||
242 | prd = GSF_pending_request_get_data_ (rp->pr); | ||
212 | msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL); | 243 | msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL); |
213 | pp->pth = GSF_peer_transmit_ (pp->cp, | 244 | pp->pth = GSF_peer_transmit_ (pp->cp, |
214 | GNUNET_YES, | 245 | GNUNET_YES, |
@@ -242,7 +273,8 @@ GSF_plan_add_ (const struct GSF_ConnectedPeer *cp, | |||
242 | if (NULL == pp) | 273 | if (NULL == pp) |
243 | { | 274 | { |
244 | pp = GNUNET_malloc (sizeof (struct PeerPlan)); | 275 | pp = GNUNET_malloc (sizeof (struct PeerPlan)); |
245 | pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | 276 | pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX); |
277 | pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
246 | GNUNET_CONTAINER_multihashmap_put (plans, | 278 | GNUNET_CONTAINER_multihashmap_put (plans, |
247 | &id.hashPubKey, | 279 | &id.hashPubKey, |
248 | pp, | 280 | pp, |
@@ -255,10 +287,24 @@ GSF_plan_add_ (const struct GSF_ConnectedPeer *cp, | |||
255 | prd->rp_tail, | 287 | prd->rp_tail, |
256 | rp); | 288 | rp); |
257 | plan (pp, rp); | 289 | plan (pp, rp); |
290 | if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap)) | ||
291 | { | ||
292 | /* no request that should be done immediately, figure out delay */ | ||
293 | if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap)) | ||
294 | return; /* did not change delay heap top, no need to do anything */ | ||
295 | GNUNET_assert (NULL == pp->pth); | ||
296 | if (GNUNET_SCHEDULER_NO_TASK != pp->task) | ||
297 | GNUNET_SCHEDULER_cancel (pp->task); | ||
298 | pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission), | ||
299 | &schedule_peer_transmission, | ||
300 | pp); | ||
301 | return; | ||
302 | } | ||
303 | |||
258 | if (pp->pth != NULL) | 304 | if (pp->pth != NULL) |
259 | { | 305 | { |
260 | if (rp != GNUNET_CONTAINER_heap_peek (pp->heap)) | 306 | if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap)) |
261 | return; | 307 | return; /* did not change priority heap top, no need to do anyhing */ |
262 | GSF_peer_transmit_cancel_ (pp->pth); | 308 | GSF_peer_transmit_cancel_ (pp->pth); |
263 | pp->pth = NULL; | 309 | pp->pth = NULL; |
264 | } | 310 | } |
@@ -293,7 +339,16 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) | |||
293 | GSF_peer_transmit_cancel_ (pp->pth); | 339 | GSF_peer_transmit_cancel_ (pp->pth); |
294 | if (GNUNET_SCHEDULER_NO_TASK != pp->task) | 340 | if (GNUNET_SCHEDULER_NO_TASK != pp->task) |
295 | GNUNET_SCHEDULER_cancel (pp->task); | 341 | GNUNET_SCHEDULER_cancel (pp->task); |
296 | while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->heap))) | 342 | while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) |
343 | { | ||
344 | prd = GSF_pending_request_get_data_ (rp->pr); | ||
345 | GNUNET_CONTAINER_DLL_remove (prd->rp_head, | ||
346 | prd->rp_tail, | ||
347 | rp); | ||
348 | GNUNET_free (rp); | ||
349 | } | ||
350 | GNUNET_CONTAINER_heap_destroy (pp->priority_heap); | ||
351 | while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap))) | ||
297 | { | 352 | { |
298 | prd = GSF_pending_request_get_data_ (rp->pr); | 353 | prd = GSF_pending_request_get_data_ (rp->pr); |
299 | GNUNET_CONTAINER_DLL_remove (prd->rp_head, | 354 | GNUNET_CONTAINER_DLL_remove (prd->rp_head, |
@@ -301,7 +356,7 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) | |||
301 | rp); | 356 | rp); |
302 | GNUNET_free (rp); | 357 | GNUNET_free (rp); |
303 | } | 358 | } |
304 | GNUNET_CONTAINER_heap_destroy (pp->heap); | 359 | GNUNET_CONTAINER_heap_destroy (pp->delay_heap); |
305 | GNUNET_free (pp); | 360 | GNUNET_free (pp); |
306 | } | 361 | } |
307 | 362 | ||