aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pe.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-03-14 19:04:15 +0000
committerChristian Grothoff <christian@grothoff.org>2011-03-14 19:04:15 +0000
commitca98a9146439dca8bc6349ada18e7cbe8ecae63c (patch)
tree43b04d0dbb31c91a885ada60fd5065177208bcd3 /src/fs/gnunet-service-fs_pe.c
parent34b96c3a763101c8291b06ac9f512a9b9fecd20f (diff)
downloadgnunet-ca98a9146439dca8bc6349ada18e7cbe8ecae63c.tar.gz
gnunet-ca98a9146439dca8bc6349ada18e7cbe8ecae63c.zip
adding delay heap
Diffstat (limited to 'src/fs/gnunet-service-fs_pe.c')
-rw-r--r--src/fs/gnunet-service-fs_pe.c109
1 files changed, 82 insertions, 27 deletions
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index af20c4bf3..1c73c8dda 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -59,14 +59,24 @@ struct GSF_RequestPlan
59 struct GSF_PendingRequest *pr; 59 struct GSF_PendingRequest *pr;
60 60
61 /** 61 /**
62 * Earliest time we'd be happy to transmit this request. 62 * Earliest time we'd be happy to (re)transmit this request.
63 */ 63 */
64 struct GNUNET_TIME_Absolute earliest_transmission; 64 struct GNUNET_TIME_Absolute earliest_transmission;
65 65
66 /** 66 /**
67 * Priority for this request for this target. 67 * When was the last time we transmitted this request to this peer? 0 for never.
68 */ 68 */
69 uint32_t priority; 69 struct GNUNET_TIME_Absolute last_transmission;
70
71 /**
72 * Current priority for this request for this target.
73 */
74 uint64_t priority;
75
76 /**
77 * How often did we transmit this request to this peer?
78 */
79 unsigned int transmission_counter;
70 80
71}; 81};
72 82
@@ -77,9 +87,14 @@ struct GSF_RequestPlan
77struct PeerPlan 87struct PeerPlan
78{ 88{
79 /** 89 /**
80 * Heap with pending queries (struct GSF_RequestPlan), smaller weights mean higher priority. 90 * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
81 */ 91 */
82 struct GNUNET_CONTAINER_Heap *heap; 92 struct GNUNET_CONTAINER_Heap *priority_heap;
93
94 /**
95 * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
96 */
97 struct GNUNET_CONTAINER_Heap *delay_heap;
83 98
84 /** 99 /**
85 * Current transmission request handle. 100 * Current transmission request handle.
@@ -114,16 +129,20 @@ static void
114plan (struct PeerPlan *pp, 129plan (struct PeerPlan *pp,
115 struct GSF_RequestPlan *rp) 130 struct GSF_RequestPlan *rp)
116{ 131{
117 GNUNET_CONTAINER_HeapCostType weight;
118 struct GSF_PendingRequestData *prd; 132 struct GSF_PendingRequestData *prd;
119 133
120 prd = GSF_pending_request_get_data_ (rp->pr); 134 prd = GSF_pending_request_get_data_ (rp->pr);
121 weight = 0; // FIXME: calculate real weight!
122 // FIXME: calculate 'rp->earliest_transmission'! 135 // FIXME: calculate 'rp->earliest_transmission'!
123 // fIXME: claculate 'rp->priority'! 136 // fIXME: claculate 'rp->priority'!
124 rp->hn = GNUNET_CONTAINER_heap_insert (pp->heap, 137
125 rp, 138 if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
126 weight); 139 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
140 rp,
141 rp->priority);
142 else
143 rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
144 rp,
145 rp->earliest_transmission.abs_value);
127} 146}
128 147
129 148
@@ -161,7 +180,7 @@ transmit_message_callback (void *cls,
161 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); 180 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
162 return 0; 181 return 0;
163 } 182 }
164 rp = GNUNET_CONTAINER_heap_peek (pp->heap); 183 rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
165 msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf); 184 msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
166 if (msize > buf_size) 185 if (msize > buf_size)
167 { 186 {
@@ -170,8 +189,10 @@ transmit_message_callback (void *cls,
170 return 0; 189 return 0;
171 } 190 }
172 /* remove from root, add again elsewhere... */ 191 /* remove from root, add again elsewhere... */
173 GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->heap)); 192 GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
174 rp->hn = NULL; 193 rp->hn = NULL;
194 rp->last_transmission = GNUNET_TIME_absolute_get ();
195 rp->transmission_counter++;
175 plan (pp, rp); 196 plan (pp, rp);
176 return msize; 197 return msize;
177} 198}
@@ -191,24 +212,34 @@ schedule_peer_transmission (void *cls,
191 struct GSF_RequestPlan *rp; 212 struct GSF_RequestPlan *rp;
192 struct GSF_PendingRequestData *prd; 213 struct GSF_PendingRequestData *prd;
193 size_t msize; 214 size_t msize;
194 struct GNUNET_TIME_Relative delay;
195 215
196 pp->task = GNUNET_SCHEDULER_NO_TASK; 216 pp->task = GNUNET_SCHEDULER_NO_TASK;
197 if (NULL == pp->heap)
198 return;
199 if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
200 return;
201 GNUNET_assert (NULL == pp->pth); 217 GNUNET_assert (NULL == pp->pth);
202 rp = GNUNET_CONTAINER_heap_peek (pp->heap); 218 /* move ready requests to priority queue */
203 prd = GSF_pending_request_get_data_ (rp->pr); 219 while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
204 delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission); 220 (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
205 if (delay.rel_value > 0) 221 {
222 rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap);
223 GNUNET_CONTAINER_heap_insert (pp->priority_heap,
224 rp,
225 rp->priority);
226 if (NULL == (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap)))
227 break;
228 }
229 if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
206 { 230 {
207 pp->task = GNUNET_SCHEDULER_add_delayed (delay, 231 /* priority heap (still) empty, check for delay... */
232 rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
233 if (NULL == rp)
234 return; /* both queues empty */
235 pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
208 &schedule_peer_transmission, 236 &schedule_peer_transmission,
209 pp); 237 pp);
210 return; 238 return;
211 } 239 }
240 /* process from priority heap */
241 rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
242 prd = GSF_pending_request_get_data_ (rp->pr);
212 msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL); 243 msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);
213 pp->pth = GSF_peer_transmit_ (pp->cp, 244 pp->pth = GSF_peer_transmit_ (pp->cp,
214 GNUNET_YES, 245 GNUNET_YES,
@@ -242,7 +273,8 @@ GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
242 if (NULL == pp) 273 if (NULL == pp)
243 { 274 {
244 pp = GNUNET_malloc (sizeof (struct PeerPlan)); 275 pp = GNUNET_malloc (sizeof (struct PeerPlan));
245 pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 276 pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
277 pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
246 GNUNET_CONTAINER_multihashmap_put (plans, 278 GNUNET_CONTAINER_multihashmap_put (plans,
247 &id.hashPubKey, 279 &id.hashPubKey,
248 pp, 280 pp,
@@ -255,10 +287,24 @@ GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
255 prd->rp_tail, 287 prd->rp_tail,
256 rp); 288 rp);
257 plan (pp, rp); 289 plan (pp, rp);
290 if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
291 {
292 /* no request that should be done immediately, figure out delay */
293 if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap))
294 return; /* did not change delay heap top, no need to do anything */
295 GNUNET_assert (NULL == pp->pth);
296 if (GNUNET_SCHEDULER_NO_TASK != pp->task)
297 GNUNET_SCHEDULER_cancel (pp->task);
298 pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
299 &schedule_peer_transmission,
300 pp);
301 return;
302 }
303
258 if (pp->pth != NULL) 304 if (pp->pth != NULL)
259 { 305 {
260 if (rp != GNUNET_CONTAINER_heap_peek (pp->heap)) 306 if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap))
261 return; 307 return; /* did not change priority heap top, no need to do anyhing */
262 GSF_peer_transmit_cancel_ (pp->pth); 308 GSF_peer_transmit_cancel_ (pp->pth);
263 pp->pth = NULL; 309 pp->pth = NULL;
264 } 310 }
@@ -293,7 +339,16 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
293 GSF_peer_transmit_cancel_ (pp->pth); 339 GSF_peer_transmit_cancel_ (pp->pth);
294 if (GNUNET_SCHEDULER_NO_TASK != pp->task) 340 if (GNUNET_SCHEDULER_NO_TASK != pp->task)
295 GNUNET_SCHEDULER_cancel (pp->task); 341 GNUNET_SCHEDULER_cancel (pp->task);
296 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->heap))) 342 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
343 {
344 prd = GSF_pending_request_get_data_ (rp->pr);
345 GNUNET_CONTAINER_DLL_remove (prd->rp_head,
346 prd->rp_tail,
347 rp);
348 GNUNET_free (rp);
349 }
350 GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
351 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
297 { 352 {
298 prd = GSF_pending_request_get_data_ (rp->pr); 353 prd = GSF_pending_request_get_data_ (rp->pr);
299 GNUNET_CONTAINER_DLL_remove (prd->rp_head, 354 GNUNET_CONTAINER_DLL_remove (prd->rp_head,
@@ -301,7 +356,7 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
301 rp); 356 rp);
302 GNUNET_free (rp); 357 GNUNET_free (rp);
303 } 358 }
304 GNUNET_CONTAINER_heap_destroy (pp->heap); 359 GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
305 GNUNET_free (pp); 360 GNUNET_free (pp);
306} 361}
307 362