aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_pe.c')
-rw-r--r--src/fs/gnunet-service-fs_pe.c374
1 files changed, 165 insertions, 209 deletions
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index 789642fc6..d3ab5026b 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -43,7 +43,7 @@ struct PendingRequestList;
43 */ 43 */
44struct GSF_RequestPlanReference 44struct GSF_RequestPlanReference
45{ 45{
46 46
47 /** 47 /**
48 * This is a doubly-linked list. 48 * This is a doubly-linked list.
49 */ 49 */
@@ -123,12 +123,12 @@ struct GSF_RequestPlan
123 /** 123 /**
124 * Head of list of associated pending requests. 124 * Head of list of associated pending requests.
125 */ 125 */
126 struct PendingRequestList *prl_head; 126 struct PendingRequestList *prl_head;
127 127
128 /** 128 /**
129 * Tail of list of associated pending requests. 129 * Tail of list of associated pending requests.
130 */ 130 */
131 struct PendingRequestList *prl_tail; 131 struct PendingRequestList *prl_tail;
132 132
133 /** 133 /**
134 * Earliest time we'd be happy to (re)transmit this request. 134 * Earliest time we'd be happy to (re)transmit this request.
@@ -209,7 +209,7 @@ static unsigned long long plan_count;
209 */ 209 */
210static void 210static void
211schedule_peer_transmission (void *cls, 211schedule_peer_transmission (void *cls,
212 const struct GNUNET_SCHEDULER_TaskContext *tc); 212 const struct GNUNET_SCHEDULER_TaskContext *tc);
213 213
214 214
215/** 215/**
@@ -219,42 +219,36 @@ schedule_peer_transmission (void *cls,
219 * @param rp request to plan 219 * @param rp request to plan
220 */ 220 */
221static void 221static void
222plan (struct PeerPlan *pp, 222plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
223 struct GSF_RequestPlan *rp)
224{ 223{
225 struct GSF_PendingRequestData *prd; 224 struct GSF_PendingRequestData *prd;
226 struct GNUNET_TIME_Relative delay; 225 struct GNUNET_TIME_Relative delay;
227 226
228 GNUNET_STATISTICS_set (GSF_stats, 227 GNUNET_STATISTICS_set (GSF_stats,
229 gettext_noop ("# average retransmission delay (ms)"), 228 gettext_noop ("# average retransmission delay (ms)"),
230 total_delay * 1000LL / plan_count, 229 total_delay * 1000LL / plan_count, GNUNET_NO);
231 GNUNET_NO);
232 prd = GSF_pending_request_get_data_ (rp->prl_head->pr); 230 prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
233 // FIXME: calculate 'rp->priority'! 231 // FIXME: calculate 'rp->priority'!
234 if (rp->transmission_counter < 32) 232 if (rp->transmission_counter < 32)
235 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 233 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
236 1LL << rp->transmission_counter); 234 1LL << rp->transmission_counter);
237 else 235 else
238 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 236 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, UINT_MAX);
239 UINT_MAX); 237 rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
240 rp->earliest_transmission
241 = GNUNET_TIME_relative_to_absolute (delay);
242#if DEBUG_FS 238#if DEBUG_FS
243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 239 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244 "Earliest (re)transmission for `%s' in %us\n", 240 "Earliest (re)transmission for `%s' in %us\n",
245 GNUNET_h2s (&prd->query), 241 GNUNET_h2s (&prd->query), rp->transmission_counter);
246 rp->transmission_counter); 242#endif
247#endif
248 243
249 GNUNET_assert (rp->hn == NULL); 244 GNUNET_assert (rp->hn == NULL);
250 if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) 245 if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value
251 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, 246 == 0)
252 rp, 247 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
253 rp->priority);
254 else 248 else
255 rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap, 249 rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
256 rp, 250 rp,
257 rp->earliest_transmission.abs_value); 251 rp->earliest_transmission.abs_value);
258 if (GNUNET_SCHEDULER_NO_TASK != pp->task) 252 if (GNUNET_SCHEDULER_NO_TASK != pp->task)
259 GNUNET_SCHEDULER_cancel (pp->task); 253 GNUNET_SCHEDULER_cancel (pp->task);
260 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); 254 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
@@ -277,13 +271,13 @@ get_latest (const struct GSF_RequestPlan *rp)
277 ret = prl->pr; 271 ret = prl->pr;
278 prl = prl->next; 272 prl = prl->next;
279 while (NULL != prl) 273 while (NULL != prl)
280 { 274 {
281 if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value > 275 if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
282 GSF_pending_request_get_data_ (ret)->ttl.abs_value) 276 GSF_pending_request_get_data_ (ret)->ttl.abs_value)
283 ret = prl->pr; 277 ret = prl->pr;
284 prl = prl->next; 278 prl = prl->next;
285 } 279 }
286 return ret; 280 return ret;
287} 281}
288 282
289 283
@@ -295,10 +289,8 @@ get_latest (const struct GSF_RequestPlan *rp)
295 * @param buf where to copy the message, NULL on error (peer disconnect) 289 * @param buf where to copy the message, NULL on error (peer disconnect)
296 * @return number of bytes copied to 'buf', can be 0 (without indicating an error) 290 * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
297 */ 291 */
298static size_t 292static size_t
299transmit_message_callback (void *cls, 293transmit_message_callback (void *cls, size_t buf_size, void *buf)
300 size_t buf_size,
301 void *buf)
302{ 294{
303 struct PeerPlan *pp = cls; 295 struct PeerPlan *pp = cls;
304 struct GSF_RequestPlan *rp; 296 struct GSF_RequestPlan *rp;
@@ -306,24 +298,24 @@ transmit_message_callback (void *cls,
306 298
307 pp->pth = NULL; 299 pp->pth = NULL;
308 if (NULL == buf) 300 if (NULL == buf)
309 { 301 {
310 /* failed, try again... */ 302 /* failed, try again... */
311 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); 303 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
312 return 0; 304 return 0;
313 } 305 }
314 rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); 306 rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
315 if (NULL == rp) 307 if (NULL == rp)
316 { 308 {
317 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); 309 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
318 return 0; 310 return 0;
319 } 311 }
320 msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf); 312 msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
321 if (msize > buf_size) 313 if (msize > buf_size)
322 { 314 {
323 /* buffer to small (message changed), try again */ 315 /* buffer to small (message changed), try again */
324 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); 316 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
325 return 0; 317 return 0;
326 } 318 }
327 /* remove from root, add again elsewhere... */ 319 /* remove from root, add again elsewhere... */
328 GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)); 320 GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
329 rp->hn = NULL; 321 rp->hn = NULL;
@@ -332,15 +324,14 @@ transmit_message_callback (void *cls,
332 total_delay++; 324 total_delay++;
333#if DEBUG_FS 325#if DEBUG_FS
334 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 326 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
335 "Executing plan %p executed %u times, planning retransmission\n", 327 "Executing plan %p executed %u times, planning retransmission\n",
336 rp, 328 rp, rp->transmission_counter);
337 rp->transmission_counter); 329#endif
338#endif
339 plan (pp, rp); 330 plan (pp, rp);
340 GNUNET_STATISTICS_update (GSF_stats, 331 GNUNET_STATISTICS_update (GSF_stats,
341 gettext_noop ("# queries messages sent to other peers"), 332 gettext_noop
342 1, 333 ("# queries messages sent to other peers"), 1,
343 GNUNET_NO); 334 GNUNET_NO);
344 return msize; 335 return msize;
345} 336}
346 337
@@ -353,7 +344,7 @@ transmit_message_callback (void *cls,
353 */ 344 */
354static void 345static void
355schedule_peer_transmission (void *cls, 346schedule_peer_transmission (void *cls,
356 const struct GNUNET_SCHEDULER_TaskContext *tc) 347 const struct GNUNET_SCHEDULER_TaskContext *tc)
357{ 348{
358 struct PeerPlan *pp = cls; 349 struct PeerPlan *pp = cls;
359 struct GSF_RequestPlan *rp; 350 struct GSF_RequestPlan *rp;
@@ -361,59 +352,55 @@ schedule_peer_transmission (void *cls,
361 352
362 pp->task = GNUNET_SCHEDULER_NO_TASK; 353 pp->task = GNUNET_SCHEDULER_NO_TASK;
363 if (pp->pth != NULL) 354 if (pp->pth != NULL)
364 { 355 {
365 GSF_peer_transmit_cancel_ (pp->pth); 356 GSF_peer_transmit_cancel_ (pp->pth);
366 pp->pth = NULL; 357 pp->pth = NULL;
367 } 358 }
368 /* move ready requests to priority queue */ 359 /* move ready requests to priority queue */
369 while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && 360 while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
370 (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) ) 361 (GNUNET_TIME_absolute_get_remaining
371 { 362 (rp->earliest_transmission).rel_value == 0))
372 GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)); 363 {
373 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, 364 GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
374 rp, 365 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
375 rp->priority); 366 }
376 }
377 if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap)) 367 if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
368 {
369 /* priority heap (still) empty, check for delay... */
370 rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
371 if (NULL == rp)
378 { 372 {
379 /* priority heap (still) empty, check for delay... */
380 rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
381 if (NULL == rp)
382 {
383#if DEBUG_FS
384 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
385 "No active requests for plan %p.\n",
386 pp);
387#endif
388 return; /* both queues empty */
389 }
390#if DEBUG_FS 373#if DEBUG_FS
391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 374 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392 "Sleeping for %llu ms before retrying requests on plan %p.\n", 375 "No active requests for plan %p.\n", pp);
393 (unsigned long long) GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value,
394 pp);
395#endif 376#endif
396 pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission), 377 return; /* both queues empty */
397 &schedule_peer_transmission,
398 pp);
399 return;
400 } 378 }
379#if DEBUG_FS
380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381 "Sleeping for %llu ms before retrying requests on plan %p.\n",
382 (unsigned long long)
383 GNUNET_TIME_absolute_get_remaining
384 (rp->earliest_transmission).rel_value, pp);
385#endif
386 pp->task =
387 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
388 (rp->earliest_transmission),
389 &schedule_peer_transmission, pp);
390 return;
391 }
401 /* process from priority heap */ 392 /* process from priority heap */
402 rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); 393 rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
403#if DEBUG_FS > 1 394#if DEBUG_FS > 1
404 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
405 "Executing query plan %p\n", 396#endif
406 rp);
407#endif
408 GNUNET_assert (NULL != rp); 397 GNUNET_assert (NULL != rp);
409 msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL); 398 msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
410 pp->pth = GSF_peer_transmit_ (pp->cp, 399 pp->pth = GSF_peer_transmit_ (pp->cp,
411 GNUNET_YES, 400 GNUNET_YES,
412 rp->priority, 401 rp->priority,
413 GNUNET_TIME_UNIT_FOREVER_REL, 402 GNUNET_TIME_UNIT_FOREVER_REL,
414 msize, 403 msize, &transmit_message_callback, pp);
415 &transmit_message_callback,
416 pp);
417 GNUNET_assert (NULL != pp->pth); 404 GNUNET_assert (NULL != pp->pth);
418} 405}
419 406
@@ -444,9 +431,8 @@ struct MergeContext
444 */ 431 */
445static int 432static int
446merge_pr (void *cls, 433merge_pr (void *cls,
447 struct GNUNET_CONTAINER_HeapNode *node, 434 struct GNUNET_CONTAINER_HeapNode *node,
448 void *element, 435 void *element, GNUNET_CONTAINER_HeapCostType cost)
449 GNUNET_CONTAINER_HeapCostType cost)
450{ 436{
451 struct MergeContext *mpr = cls; 437 struct MergeContext *mpr = cls;
452 struct GSF_RequestPlan *rp = element; 438 struct GSF_RequestPlan *rp = element;
@@ -456,37 +442,30 @@ merge_pr (void *cls,
456 struct GSF_PendingRequest *latest; 442 struct GSF_PendingRequest *latest;
457 443
458 if (GNUNET_OK != 444 if (GNUNET_OK !=
459 GSF_pending_request_is_compatible_ (mpr->pr, 445 GSF_pending_request_is_compatible_ (mpr->pr, rp->prl_head->pr))
460 rp->prl_head->pr))
461 return GNUNET_YES; 446 return GNUNET_YES;
462 /* merge new request with existing request plan */ 447 /* merge new request with existing request plan */
463 rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference)); 448 rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
464 prl = GNUNET_malloc (sizeof (struct PendingRequestList)); 449 prl = GNUNET_malloc (sizeof (struct PendingRequestList));
465 rpr->rp = rp; 450 rpr->rp = rp;
466 rpr->prl = prl; 451 rpr->prl = prl;
467 prl->rpr = rpr; 452 prl->rpr = rpr;
468 prl->pr = mpr->pr; 453 prl->pr = mpr->pr;
469 prd = GSF_pending_request_get_data_ (mpr->pr); 454 prd = GSF_pending_request_get_data_ (mpr->pr);
470 GNUNET_CONTAINER_DLL_insert (prd->rpr_head, 455 GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
471 prd->rpr_tail, 456 GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
472 rpr);
473 GNUNET_CONTAINER_DLL_insert (rp->prl_head,
474 rp->prl_tail,
475 prl);
476 mpr->merged = GNUNET_YES; 457 mpr->merged = GNUNET_YES;
477 GNUNET_STATISTICS_update (GSF_stats, 458 GNUNET_STATISTICS_update (GSF_stats,
478 gettext_noop ("# requests merged"), 459 gettext_noop ("# requests merged"), 1, GNUNET_NO);
479 1,
480 GNUNET_NO);
481 latest = get_latest (rp); 460 latest = get_latest (rp);
482 if (GSF_pending_request_get_data_ (latest)->ttl.abs_value < prd->ttl.abs_value) 461 if (GSF_pending_request_get_data_ (latest)->ttl.abs_value <
483 { 462 prd->ttl.abs_value)
484 GNUNET_STATISTICS_update (GSF_stats, 463 {
485 gettext_noop ("# requests refreshed"), 464 GNUNET_STATISTICS_update (GSF_stats,
486 1, 465 gettext_noop ("# requests refreshed"),
487 GNUNET_NO); 466 1, GNUNET_NO);
488 rp->transmission_counter = 0; /* reset */ 467 rp->transmission_counter = 0; /* reset */
489 } 468 }
490 return GNUNET_NO; 469 return GNUNET_NO;
491} 470}
492 471
@@ -498,8 +477,7 @@ merge_pr (void *cls,
498 * @param pr request with the entry 477 * @param pr request with the entry
499 */ 478 */
500void 479void
501GSF_plan_add_ (struct GSF_ConnectedPeer *cp, 480GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct GSF_PendingRequest *pr)
502 struct GSF_PendingRequest *pr)
503{ 481{
504 struct GNUNET_PeerIdentity id; 482 struct GNUNET_PeerIdentity id;
505 struct PeerPlan *pp; 483 struct PeerPlan *pp;
@@ -511,19 +489,20 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
511 489
512 GNUNET_assert (NULL != cp); 490 GNUNET_assert (NULL != cp);
513 GSF_connected_peer_get_identity_ (cp, &id); 491 GSF_connected_peer_get_identity_ (cp, &id);
514 pp = GNUNET_CONTAINER_multihashmap_get (plans, 492 pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
515 &id.hashPubKey);
516 if (NULL == pp) 493 if (NULL == pp)
517 { 494 {
518 pp = GNUNET_malloc (sizeof (struct PeerPlan)); 495 pp = GNUNET_malloc (sizeof (struct PeerPlan));
519 pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX); 496 pp->priority_heap =
520 pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 497 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
521 pp->cp = cp; 498 pp->delay_heap =
522 GNUNET_CONTAINER_multihashmap_put (plans, 499 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
523 &id.hashPubKey, 500 pp->cp = cp;
524 pp, 501 GNUNET_CONTAINER_multihashmap_put (plans,
525 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 502 &id.hashPubKey,
526 } 503 pp,
504 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
505 }
527 mpc.merged = GNUNET_NO; 506 mpc.merged = GNUNET_NO;
528 mpc.pr = pr; 507 mpc.pr = pr;
529 GNUNET_CONTAINER_heap_iterate (pp->priority_heap, &merge_pr, &mpc); 508 GNUNET_CONTAINER_heap_iterate (pp->priority_heap, &merge_pr, &mpc);
@@ -534,29 +513,23 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
534 return; 513 return;
535 plan_count++; 514 plan_count++;
536 GNUNET_STATISTICS_update (GSF_stats, 515 GNUNET_STATISTICS_update (GSF_stats,
537 gettext_noop ("# query plan entries"), 516 gettext_noop ("# query plan entries"),
538 1, 517 1, GNUNET_NO);
539 GNUNET_NO);
540 prd = GSF_pending_request_get_data_ (pr); 518 prd = GSF_pending_request_get_data_ (pr);
541#if DEBUG_FS 519#if DEBUG_FS
542 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 520 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543 "Planning transmission of query `%s' to peer `%s'\n", 521 "Planning transmission of query `%s' to peer `%s'\n",
544 GNUNET_h2s (&prd->query), 522 GNUNET_h2s (&prd->query), GNUNET_i2s (&id));
545 GNUNET_i2s (&id)); 523#endif
546#endif
547 rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan)); 524 rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
548 rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference)); 525 rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
549 prl = GNUNET_malloc (sizeof (struct PendingRequestList)); 526 prl = GNUNET_malloc (sizeof (struct PendingRequestList));
550 rpr->rp = rp; 527 rpr->rp = rp;
551 rpr->prl = prl; 528 rpr->prl = prl;
552 prl->rpr = rpr; 529 prl->rpr = rpr;
553 prl->pr = pr; 530 prl->pr = pr;
554 GNUNET_CONTAINER_DLL_insert (prd->rpr_head, 531 GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
555 prd->rpr_tail, 532 GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
556 rpr);
557 GNUNET_CONTAINER_DLL_insert (rp->prl_head,
558 rp->prl_tail,
559 prl);
560 plan (pp, rp); 533 plan (pp, rp);
561} 534}
562 535
@@ -577,58 +550,47 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
577 struct PendingRequestList *prl; 550 struct PendingRequestList *prl;
578 551
579 GSF_connected_peer_get_identity_ (cp, &id); 552 GSF_connected_peer_get_identity_ (cp, &id);
580 pp = GNUNET_CONTAINER_multihashmap_get (plans, 553 pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
581 &id.hashPubKey);
582 if (NULL == pp) 554 if (NULL == pp)
583 return; /* nothing was ever planned for this peer */ 555 return; /* nothing was ever planned for this peer */
584 GNUNET_assert (GNUNET_YES == 556 GNUNET_assert (GNUNET_YES ==
585 GNUNET_CONTAINER_multihashmap_remove (plans, 557 GNUNET_CONTAINER_multihashmap_remove (plans,
586 &id.hashPubKey, 558 &id.hashPubKey, pp));
587 pp));
588 if (NULL != pp->pth) 559 if (NULL != pp->pth)
589 GSF_peer_transmit_cancel_ (pp->pth); 560 GSF_peer_transmit_cancel_ (pp->pth);
590 if (GNUNET_SCHEDULER_NO_TASK != pp->task) 561 if (GNUNET_SCHEDULER_NO_TASK != pp->task)
591 { 562 {
592 GNUNET_SCHEDULER_cancel (pp->task); 563 GNUNET_SCHEDULER_cancel (pp->task);
593 pp->task = GNUNET_SCHEDULER_NO_TASK; 564 pp->task = GNUNET_SCHEDULER_NO_TASK;
594 } 565 }
595 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) 566 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
567 {
568 while (NULL != (prl = rp->prl_head))
596 { 569 {
597 while (NULL != (prl = rp->prl_head)) 570 GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
598 { 571 prd = GSF_pending_request_get_data_ (prl->pr);
599 GNUNET_CONTAINER_DLL_remove (rp->prl_head, 572 GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
600 rp->prl_tail, 573 GNUNET_free (prl->rpr);
601 prl); 574 GNUNET_free (prl);
602 prd = GSF_pending_request_get_data_ (prl->pr);
603 GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
604 prd->rpr_tail,
605 prl->rpr);
606 GNUNET_free (prl->rpr);
607 GNUNET_free (prl);
608 }
609 GNUNET_free (rp);
610 } 575 }
576 GNUNET_free (rp);
577 }
611 GNUNET_CONTAINER_heap_destroy (pp->priority_heap); 578 GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
612 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap))) 579 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
580 {
581 while (NULL != (prl = rp->prl_head))
613 { 582 {
614 while (NULL != (prl = rp->prl_head)) 583 GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
615 { 584 prd = GSF_pending_request_get_data_ (prl->pr);
616 GNUNET_CONTAINER_DLL_remove (rp->prl_head, 585 GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
617 rp->prl_tail, 586 GNUNET_free (prl->rpr);
618 prl); 587 GNUNET_free (prl);
619 prd = GSF_pending_request_get_data_ (prl->pr);
620 GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
621 prd->rpr_tail,
622 prl->rpr);
623 GNUNET_free (prl->rpr);
624 GNUNET_free (prl);
625 }
626 GNUNET_free (rp);
627 } 588 }
589 GNUNET_free (rp);
590 }
628 GNUNET_STATISTICS_set (GSF_stats, 591 GNUNET_STATISTICS_set (GSF_stats,
629 gettext_noop ("# query plan entries"), 592 gettext_noop ("# query plan entries"),
630 plan_count, 593 plan_count, GNUNET_NO);
631 GNUNET_NO);
632 594
633 GNUNET_CONTAINER_heap_destroy (pp->delay_heap); 595 GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
634 GNUNET_free (pp); 596 GNUNET_free (pp);
@@ -650,27 +612,22 @@ GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
650 612
651 prd = GSF_pending_request_get_data_ (pr); 613 prd = GSF_pending_request_get_data_ (pr);
652 while (NULL != (rpr = prd->rpr_head)) 614 while (NULL != (rpr = prd->rpr_head))
615 {
616 GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, rpr);
617 rp = rpr->rp;
618 GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, rpr->prl);
619 GNUNET_free (rpr->prl);
620 GNUNET_free (rpr);
621 if (rp->prl_head == 0)
653 { 622 {
654 GNUNET_CONTAINER_DLL_remove (prd->rpr_head, 623 GNUNET_CONTAINER_heap_remove_node (rp->hn);
655 prd->rpr_tail, 624 plan_count--;
656 rpr); 625 GNUNET_free (rp);
657 rp = rpr->rp;
658 GNUNET_CONTAINER_DLL_remove (rp->prl_head,
659 rp->prl_tail,
660 rpr->prl);
661 GNUNET_free (rpr->prl);
662 GNUNET_free (rpr);
663 if (rp->prl_head == 0)
664 {
665 GNUNET_CONTAINER_heap_remove_node (rp->hn);
666 plan_count--;
667 GNUNET_free (rp);
668 }
669 } 626 }
627 }
670 GNUNET_STATISTICS_set (GSF_stats, 628 GNUNET_STATISTICS_set (GSF_stats,
671 gettext_noop ("# query plan entries"), 629 gettext_noop ("# query plan entries"),
672 plan_count, 630 plan_count, GNUNET_NO);
673 GNUNET_NO);
674} 631}
675 632
676 633
@@ -690,8 +647,7 @@ GSF_plan_init ()
690void 647void
691GSF_plan_done () 648GSF_plan_done ()
692{ 649{
693 GNUNET_assert (0 == 650 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (plans));
694 GNUNET_CONTAINER_multihashmap_size (plans));
695 GNUNET_CONTAINER_multihashmap_destroy (plans); 651 GNUNET_CONTAINER_multihashmap_destroy (plans);
696} 652}
697 653