aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_pe.c')
-rw-r--r--src/fs/gnunet-service-fs_pe.c622
1 files changed, 316 insertions, 306 deletions
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index 3c1112333..7933b2dee 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -63,7 +63,8 @@ struct PeerPlan;
63 * to be able to lookup all plan entries corresponding 63 * to be able to lookup all plan entries corresponding
64 * to a given pending request.) 64 * to a given pending request.)
65 */ 65 */
66struct GSF_PendingRequestPlanBijection { 66struct GSF_PendingRequestPlanBijection
67{
67 /** 68 /**
68 * This is a doubly-linked list. 69 * This is a doubly-linked list.
69 */ 70 */
@@ -104,7 +105,8 @@ struct GSF_PendingRequestPlanBijection {
104 * with one entry in each heap of each `struct PeerPlan`. Each 105 * with one entry in each heap of each `struct PeerPlan`. Each
105 * entry tracks information relevant for this request and this peer. 106 * entry tracks information relevant for this request and this peer.
106 */ 107 */
107struct GSF_RequestPlan { 108struct GSF_RequestPlan
109{
108 /** 110 /**
109 * This is a doubly-linked list. 111 * This is a doubly-linked list.
110 */ 112 */
@@ -162,7 +164,8 @@ struct GSF_RequestPlan {
162/** 164/**
163 * Transmission plan for a peer. 165 * Transmission plan for a peer.
164 */ 166 */
165struct PeerPlan { 167struct PeerPlan
168{
166 /** 169 /**
167 * Heap with pending queries (`struct GSF_RequestPlan`), higher weights mean higher priority. 170 * Heap with pending queries (`struct GSF_RequestPlan`), higher weights mean higher priority.
168 */ 171 */
@@ -227,9 +230,9 @@ static unsigned long long plan_count;
227 * @return the associated query 230 * @return the associated query
228 */ 231 */
229static const struct GNUNET_HashCode * 232static const struct GNUNET_HashCode *
230get_rp_key(struct GSF_RequestPlan *rp) 233get_rp_key (struct GSF_RequestPlan *rp)
231{ 234{
232 return &GSF_pending_request_get_data_(rp->pe_head->pr)->query; 235 return &GSF_pending_request_get_data_ (rp->pe_head->pr)->query;
233} 236}
234 237
235 238
@@ -240,10 +243,10 @@ get_rp_key(struct GSF_RequestPlan *rp)
240 * @param rp request to plan 243 * @param rp request to plan
241 */ 244 */
242static void 245static void
243plan(struct PeerPlan *pp, 246plan (struct PeerPlan *pp,
244 struct GSF_RequestPlan *rp) 247 struct GSF_RequestPlan *rp)
245{ 248{
246#define N ((double)128.0) 249#define N ((double) 128.0)
247 /** 250 /**
248 * Running average delay we currently impose. 251 * Running average delay we currently impose.
249 */ 252 */
@@ -252,28 +255,28 @@ plan(struct PeerPlan *pp,
252 struct GSF_PendingRequestData *prd; 255 struct GSF_PendingRequestData *prd;
253 struct GNUNET_TIME_Relative delay; 256 struct GNUNET_TIME_Relative delay;
254 257
255 GNUNET_assert(rp->pp == pp); 258 GNUNET_assert (rp->pp == pp);
256 GNUNET_STATISTICS_set(GSF_stats, 259 GNUNET_STATISTICS_set (GSF_stats,
257 gettext_noop("# average retransmission delay (ms)"), 260 gettext_noop ("# average retransmission delay (ms)"),
258 total_delay * 1000LL / plan_count, GNUNET_NO); 261 total_delay * 1000LL / plan_count, GNUNET_NO);
259 prd = GSF_pending_request_get_data_(rp->pe_head->pr); 262 prd = GSF_pending_request_get_data_ (rp->pe_head->pr);
260 263
261 if (rp->transmission_counter < 8) 264 if (rp->transmission_counter < 8)
262 delay = 265 delay =
263 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 266 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
264 rp->transmission_counter); 267 rp->transmission_counter);
265 else if (rp->transmission_counter < 32) 268 else if (rp->transmission_counter < 32)
266 delay = 269 delay =
267 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 270 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
268 8 + 271 8
269 (1LL << (rp->transmission_counter - 8))); 272 + (1LL << (rp->transmission_counter - 8)));
270 else 273 else
271 delay = 274 delay =
272 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 275 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
273 8 + (1LL << 24)); 276 8 + (1LL << 24));
274 delay.rel_value_us = 277 delay.rel_value_us =
275 GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 278 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
276 delay.rel_value_us + 1); 279 delay.rel_value_us + 1);
277 /* Add 0.01 to avg_delay to avoid division-by-zero later */ 280 /* Add 0.01 to avg_delay to avoid division-by-zero later */
278 avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value_us) / N) + 0.01; 281 avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value_us) / N) + 0.01;
279 282
@@ -299,36 +302,37 @@ plan(struct PeerPlan *pp,
299 * Note: M_PI_4 = PI/4 = arctan(1) 302 * Note: M_PI_4 = PI/4 = arctan(1)
300 */ 303 */
301 rp->priority = 304 rp->priority =
302 round((GSF_current_priorities + 305 round ((GSF_current_priorities
303 1.0) * atan(delay.rel_value_us / avg_delay)) / M_PI_4; 306 + 1.0) * atan (delay.rel_value_us / avg_delay)) / M_PI_4;
304 /* Note: usage of 'round' and 'atan' requires -lm */ 307 /* Note: usage of 'round' and 'atan' requires -lm */
305 308
306 if (rp->transmission_counter != 0) 309 if (rp->transmission_counter != 0)
307 delay.rel_value_us += TTL_DECREMENT * 1000; 310 delay.rel_value_us += TTL_DECREMENT * 1000;
308 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 311 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
309 "Considering (re)transmission number %u in %s\n", 312 "Considering (re)transmission number %u in %s\n",
310 (unsigned int)rp->transmission_counter, 313 (unsigned int) rp->transmission_counter,
311 GNUNET_STRINGS_relative_time_to_string(delay, 314 GNUNET_STRINGS_relative_time_to_string (delay,
312 GNUNET_YES)); 315 GNUNET_YES));
313 rp->earliest_transmission = GNUNET_TIME_relative_to_absolute(delay); 316 rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
314 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 317 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
315 "Earliest (re)transmission for `%s' in %us\n", 318 "Earliest (re)transmission for `%s' in %us\n",
316 GNUNET_h2s(&prd->query), 319 GNUNET_h2s (&prd->query),
317 rp->transmission_counter); 320 rp->transmission_counter);
318 GNUNET_assert(rp->hn == NULL); 321 GNUNET_assert (rp->hn == NULL);
319 if (0 == GNUNET_TIME_absolute_get_remaining(rp->earliest_transmission).rel_value_us) 322 if (0 == GNUNET_TIME_absolute_get_remaining (
320 rp->hn = GNUNET_CONTAINER_heap_insert(pp->priority_heap, 323 rp->earliest_transmission).rel_value_us)
321 rp, 324 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
322 rp->priority); 325 rp,
326 rp->priority);
323 else 327 else
324 rp->hn = 328 rp->hn =
325 GNUNET_CONTAINER_heap_insert(pp->delay_heap, 329 GNUNET_CONTAINER_heap_insert (pp->delay_heap,
326 rp, 330 rp,
327 rp->earliest_transmission.abs_value_us); 331 rp->earliest_transmission.abs_value_us);
328 GNUNET_assert(GNUNET_YES == 332 GNUNET_assert (GNUNET_YES ==
329 GNUNET_CONTAINER_multihashmap_contains_value(pp->plan_map, 333 GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
330 get_rp_key(rp), 334 get_rp_key (rp),
331 rp)); 335 rp));
332#undef N 336#undef N
333} 337}
334 338
@@ -340,7 +344,7 @@ plan(struct PeerPlan *pp,
340 * @return pending request with highest TTL 344 * @return pending request with highest TTL
341 */ 345 */
342struct GSF_PendingRequest * 346struct GSF_PendingRequest *
343get_latest(const struct GSF_RequestPlan *rp) 347get_latest (const struct GSF_RequestPlan *rp)
344{ 348{
345 struct GSF_PendingRequest *ret; 349 struct GSF_PendingRequest *ret;
346 struct GSF_PendingRequestPlanBijection *bi; 350 struct GSF_PendingRequestPlanBijection *bi;
@@ -351,18 +355,18 @@ get_latest(const struct GSF_RequestPlan *rp)
351 if (NULL == bi) 355 if (NULL == bi)
352 return NULL; /* should never happen */ 356 return NULL; /* should never happen */
353 ret = bi->pr; 357 ret = bi->pr;
354 rprd = GSF_pending_request_get_data_(ret); 358 rprd = GSF_pending_request_get_data_ (ret);
355 for (bi = bi->next_PE; NULL != bi; bi = bi->next_PE) 359 for (bi = bi->next_PE; NULL != bi; bi = bi->next_PE)
360 {
361 GNUNET_break (GNUNET_YES ==
362 GSF_pending_request_test_active_ (bi->pr));
363 prd = GSF_pending_request_get_data_ (bi->pr);
364 if (prd->ttl.abs_value_us > rprd->ttl.abs_value_us)
356 { 365 {
357 GNUNET_break(GNUNET_YES == 366 ret = bi->pr;
358 GSF_pending_request_test_active_(bi->pr)); 367 rprd = prd;
359 prd = GSF_pending_request_get_data_(bi->pr);
360 if (prd->ttl.abs_value_us > rprd->ttl.abs_value_us)
361 {
362 ret = bi->pr;
363 rprd = prd;
364 }
365 } 368 }
369 }
366 return ret; 370 return ret;
367} 371}
368 372
@@ -373,100 +377,102 @@ get_latest(const struct GSF_RequestPlan *rp)
373 * @param cls the `struct PeerPlan` 377 * @param cls the `struct PeerPlan`
374 */ 378 */
375static void 379static void
376schedule_peer_transmission(void *cls) 380schedule_peer_transmission (void *cls)
377{ 381{
378 struct PeerPlan *pp = cls; 382 struct PeerPlan *pp = cls;
379 struct GSF_RequestPlan *rp; 383 struct GSF_RequestPlan *rp;
380 struct GNUNET_TIME_Relative delay; 384 struct GNUNET_TIME_Relative delay;
381 385
382 if (NULL != pp->task) 386 if (NULL != pp->task)
383 { 387 {
384 pp->task = NULL; 388 pp->task = NULL;
385 } 389 }
386 else 390 else
387 { 391 {
388 GNUNET_assert(NULL != pp->env); 392 GNUNET_assert (NULL != pp->env);
389 pp->env = NULL; 393 pp->env = NULL;
390 } 394 }
391 /* move ready requests to priority queue */ 395 /* move ready requests to priority queue */
392 while ((NULL != (rp = GNUNET_CONTAINER_heap_peek(pp->delay_heap))) && 396 while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
393 (0 == GNUNET_TIME_absolute_get_remaining 397 (0 == GNUNET_TIME_absolute_get_remaining
394 (rp->earliest_transmission).rel_value_us)) 398 (rp->earliest_transmission).rel_value_us))
399 {
400 GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
401 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
402 rp,
403 rp->priority);
404 }
405 if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
406 {
407 /* priority heap (still) empty, check for delay... */
408 rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
409 if (NULL == rp)
395 { 410 {
396 GNUNET_assert(rp == GNUNET_CONTAINER_heap_remove_root(pp->delay_heap)); 411 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
397 rp->hn = GNUNET_CONTAINER_heap_insert(pp->priority_heap, 412 "No active requests for plan %p.\n",
398 rp, 413 pp);
399 rp->priority); 414 return; /* both queues empty */
400 } 415 }
401 if (0 == GNUNET_CONTAINER_heap_get_size(pp->priority_heap)) 416 delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
402 { 417 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403 /* priority heap (still) empty, check for delay... */ 418 "Sleeping for %s before retrying requests on plan %p.\n",
404 rp = GNUNET_CONTAINER_heap_peek(pp->delay_heap); 419 GNUNET_STRINGS_relative_time_to_string (delay,
405 if (NULL == rp)
406 {
407 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
408 "No active requests for plan %p.\n",
409 pp);
410 return; /* both queues empty */
411 }
412 delay = GNUNET_TIME_absolute_get_remaining(rp->earliest_transmission);
413 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
414 "Sleeping for %s before retrying requests on plan %p.\n",
415 GNUNET_STRINGS_relative_time_to_string(delay,
416 GNUNET_YES), 420 GNUNET_YES),
417 pp); 421 pp);
418 GNUNET_STATISTICS_set(GSF_stats, 422 GNUNET_STATISTICS_set (GSF_stats,
419 gettext_noop("# delay heap timeout (ms)"), 423 gettext_noop ("# delay heap timeout (ms)"),
420 delay.rel_value_us / 1000LL, GNUNET_NO); 424 delay.rel_value_us / 1000LL, GNUNET_NO);
421 425
422 pp->task 426 pp->task
423 = GNUNET_SCHEDULER_add_at(rp->earliest_transmission, 427 = GNUNET_SCHEDULER_add_at (rp->earliest_transmission,
424 &schedule_peer_transmission, 428 &schedule_peer_transmission,
425 pp); 429 pp);
426 return; 430 return;
427 } 431 }
428#if INSANE_STATISTICS 432#if INSANE_STATISTICS
429 GNUNET_STATISTICS_update(GSF_stats, 433 GNUNET_STATISTICS_update (GSF_stats,
430 gettext_noop("# query plans executed"), 434 gettext_noop ("# query plans executed"),
431 1, 435 1,
432 GNUNET_NO); 436 GNUNET_NO);
433#endif 437#endif
434 /* process from priority heap */ 438 /* process from priority heap */
435 rp = GNUNET_CONTAINER_heap_remove_root(pp->priority_heap); 439 rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap);
436 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437 "Executing query plan %p\n", 441 "Executing query plan %p\n",
438 rp); 442 rp);
439 GNUNET_assert(NULL != rp); 443 GNUNET_assert (NULL != rp);
440 rp->hn = NULL; 444 rp->hn = NULL;
441 rp->last_transmission = GNUNET_TIME_absolute_get(); 445 rp->last_transmission = GNUNET_TIME_absolute_get ();
442 rp->transmission_counter++; 446 rp->transmission_counter++;
443 total_delay++; 447 total_delay++;
444 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 448 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
445 "Executing plan %p executed %u times, planning retransmission\n", 449 "Executing plan %p executed %u times, planning retransmission\n",
446 rp, 450 rp,
447 rp->transmission_counter); 451 rp->transmission_counter);
448 GNUNET_assert(NULL == pp->env); 452 GNUNET_assert (NULL == pp->env);
449 pp->env = GSF_pending_request_get_message_(get_latest(rp)); 453 pp->env = GSF_pending_request_get_message_ (get_latest (rp));
450 GNUNET_MQ_notify_sent(pp->env, 454 GNUNET_MQ_notify_sent (pp->env,
451 &schedule_peer_transmission, 455 &schedule_peer_transmission,
452 pp); 456 pp);
453 GSF_peer_transmit_(pp->cp, 457 GSF_peer_transmit_ (pp->cp,
454 GNUNET_YES, 458 GNUNET_YES,
455 rp->priority, 459 rp->priority,
456 pp->env); 460 pp->env);
457 GNUNET_STATISTICS_update(GSF_stats, 461 GNUNET_STATISTICS_update (GSF_stats,
458 gettext_noop("# query messages sent to other peers"), 462 gettext_noop (
459 1, 463 "# query messages sent to other peers"),
460 GNUNET_NO); 464 1,
461 plan(pp, 465 GNUNET_NO);
462 rp); 466 plan (pp,
467 rp);
463} 468}
464 469
465 470
466/** 471/**
467 * Closure for merge_pr(). 472 * Closure for merge_pr().
468 */ 473 */
469struct MergeContext { 474struct MergeContext
475{
470 /** 476 /**
471 * Request we are trying to merge. 477 * Request we are trying to merge.
472 */ 478 */
@@ -490,9 +496,9 @@ struct MergeContext {
490 * #GNUNET_NO if not (merge success) 496 * #GNUNET_NO if not (merge success)
491 */ 497 */
492static int 498static int
493merge_pr(void *cls, 499merge_pr (void *cls,
494 const struct GNUNET_HashCode *query, 500 const struct GNUNET_HashCode *query,
495 void *element) 501 void *element)
496{ 502{
497 struct MergeContext *mpr = cls; 503 struct MergeContext *mpr = cls;
498 struct GSF_RequestPlan *rp = element; 504 struct GSF_RequestPlan *rp = element;
@@ -500,44 +506,44 @@ merge_pr(void *cls,
500 struct GSF_PendingRequestPlanBijection *bi; 506 struct GSF_PendingRequestPlanBijection *bi;
501 struct GSF_PendingRequest *latest; 507 struct GSF_PendingRequest *latest;
502 508
503 GNUNET_break(GNUNET_YES == 509 GNUNET_break (GNUNET_YES ==
504 GSF_pending_request_test_active_(mpr->pr)); 510 GSF_pending_request_test_active_ (mpr->pr));
505 if (GNUNET_OK != 511 if (GNUNET_OK !=
506 GSF_pending_request_is_compatible_(mpr->pr, 512 GSF_pending_request_is_compatible_ (mpr->pr,
507 rp->pe_head->pr)) 513 rp->pe_head->pr))
508 return GNUNET_YES; 514 return GNUNET_YES;
509 /* merge new request with existing request plan */ 515 /* merge new request with existing request plan */
510 bi = GNUNET_new(struct GSF_PendingRequestPlanBijection); 516 bi = GNUNET_new (struct GSF_PendingRequestPlanBijection);
511 bi->rp = rp; 517 bi->rp = rp;
512 bi->pr = mpr->pr; 518 bi->pr = mpr->pr;
513 prd = GSF_pending_request_get_data_(mpr->pr); 519 prd = GSF_pending_request_get_data_ (mpr->pr);
514 GNUNET_CONTAINER_MDLL_insert(PR, 520 GNUNET_CONTAINER_MDLL_insert (PR,
515 prd->pr_head, 521 prd->pr_head,
516 prd->pr_tail, 522 prd->pr_tail,
517 bi); 523 bi);
518 GNUNET_CONTAINER_MDLL_insert(PE, 524 GNUNET_CONTAINER_MDLL_insert (PE,
519 rp->pe_head, 525 rp->pe_head,
520 rp->pe_tail, 526 rp->pe_tail,
521 bi); 527 bi);
522 mpr->merged = GNUNET_YES; 528 mpr->merged = GNUNET_YES;
523#if INSANE_STATISTICS 529#if INSANE_STATISTICS
524 GNUNET_STATISTICS_update(GSF_stats, 530 GNUNET_STATISTICS_update (GSF_stats,
525 gettext_noop("# requests merged"), 531 gettext_noop ("# requests merged"),
526 1, 532 1,
527 GNUNET_NO); 533 GNUNET_NO);
528#endif 534#endif
529 latest = get_latest(rp); 535 latest = get_latest (rp);
530 if (GSF_pending_request_get_data_(latest)->ttl.abs_value_us < 536 if (GSF_pending_request_get_data_ (latest)->ttl.abs_value_us <
531 prd->ttl.abs_value_us) 537 prd->ttl.abs_value_us)
532 { 538 {
533#if INSANE_STATISTICS 539#if INSANE_STATISTICS
534 GNUNET_STATISTICS_update(GSF_stats, 540 GNUNET_STATISTICS_update (GSF_stats,
535 gettext_noop("# requests refreshed"), 541 gettext_noop ("# requests refreshed"),
536 1, 542 1,
537 GNUNET_NO); 543 GNUNET_NO);
538#endif 544#endif
539 rp->transmission_counter = 0; /* reset */ 545 rp->transmission_counter = 0; /* reset */
540 } 546 }
541 return GNUNET_NO; 547 return GNUNET_NO;
542} 548}
543 549
@@ -549,8 +555,8 @@ merge_pr(void *cls,
549 * @param pr request with the entry 555 * @param pr request with the entry
550 */ 556 */
551void 557void
552GSF_plan_add_(struct GSF_ConnectedPeer *cp, 558GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
553 struct GSF_PendingRequest *pr) 559 struct GSF_PendingRequest *pr)
554{ 560{
555 const struct GNUNET_PeerIdentity *id; 561 const struct GNUNET_PeerIdentity *id;
556 struct PeerPlan *pp; 562 struct PeerPlan *pp;
@@ -559,66 +565,66 @@ GSF_plan_add_(struct GSF_ConnectedPeer *cp,
559 struct GSF_PendingRequestPlanBijection *bi; 565 struct GSF_PendingRequestPlanBijection *bi;
560 struct MergeContext mpc; 566 struct MergeContext mpc;
561 567
562 GNUNET_assert(GNUNET_YES == 568 GNUNET_assert (GNUNET_YES ==
563 GSF_pending_request_test_active_(pr)); 569 GSF_pending_request_test_active_ (pr));
564 GNUNET_assert(NULL != cp); 570 GNUNET_assert (NULL != cp);
565 id = GSF_connected_peer_get_identity2_(cp); 571 id = GSF_connected_peer_get_identity2_ (cp);
566 pp = GNUNET_CONTAINER_multipeermap_get(plans, id); 572 pp = GNUNET_CONTAINER_multipeermap_get (plans, id);
567 if (NULL == pp) 573 if (NULL == pp)
568 { 574 {
569 pp = GNUNET_new(struct PeerPlan); 575 pp = GNUNET_new (struct PeerPlan);
570 pp->plan_map = GNUNET_CONTAINER_multihashmap_create(128, GNUNET_NO); 576 pp->plan_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
571 pp->priority_heap = 577 pp->priority_heap =
572 GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MAX); 578 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
573 pp->delay_heap = 579 pp->delay_heap =
574 GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN); 580 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
575 pp->cp = cp; 581 pp->cp = cp;
576 GNUNET_assert(GNUNET_OK == 582 GNUNET_assert (GNUNET_OK ==
577 GNUNET_CONTAINER_multipeermap_put(plans, 583 GNUNET_CONTAINER_multipeermap_put (plans,
578 id, 584 id,
579 pp, 585 pp,
580 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 586 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
581 pp->task = GNUNET_SCHEDULER_add_now(&schedule_peer_transmission, 587 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
582 pp); 588 pp);
583 } 589 }
584 mpc.merged = GNUNET_NO; 590 mpc.merged = GNUNET_NO;
585 mpc.pr = pr; 591 mpc.pr = pr;
586 prd = GSF_pending_request_get_data_(pr); 592 prd = GSF_pending_request_get_data_ (pr);
587 GNUNET_CONTAINER_multihashmap_get_multiple(pp->plan_map, 593 GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
588 &prd->query, 594 &prd->query,
589 &merge_pr, 595 &merge_pr,
590 &mpc); 596 &mpc);
591 if (GNUNET_NO != mpc.merged) 597 if (GNUNET_NO != mpc.merged)
592 return; 598 return;
593 plan_count++; 599 plan_count++;
594 GNUNET_STATISTICS_update(GSF_stats, 600 GNUNET_STATISTICS_update (GSF_stats,
595 gettext_noop("# query plan entries"), 601 gettext_noop ("# query plan entries"),
596 1, 602 1,
597 GNUNET_NO); 603 GNUNET_NO);
598 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 604 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599 "Planning transmission of query `%s' to peer `%s'\n", 605 "Planning transmission of query `%s' to peer `%s'\n",
600 GNUNET_h2s(&prd->query), 606 GNUNET_h2s (&prd->query),
601 GNUNET_i2s(id)); 607 GNUNET_i2s (id));
602 rp = GNUNET_new(struct GSF_RequestPlan); 608 rp = GNUNET_new (struct GSF_RequestPlan);
603 bi = GNUNET_new(struct GSF_PendingRequestPlanBijection); 609 bi = GNUNET_new (struct GSF_PendingRequestPlanBijection);
604 bi->rp = rp; 610 bi->rp = rp;
605 bi->pr = pr; 611 bi->pr = pr;
606 GNUNET_CONTAINER_MDLL_insert(PR, 612 GNUNET_CONTAINER_MDLL_insert (PR,
607 prd->pr_head, 613 prd->pr_head,
608 prd->pr_tail, 614 prd->pr_tail,
609 bi); 615 bi);
610 GNUNET_CONTAINER_MDLL_insert(PE, 616 GNUNET_CONTAINER_MDLL_insert (PE,
611 rp->pe_head, 617 rp->pe_head,
612 rp->pe_tail, 618 rp->pe_tail,
613 bi); 619 bi);
614 rp->pp = pp; 620 rp->pp = pp;
615 GNUNET_assert(GNUNET_YES == 621 GNUNET_assert (GNUNET_YES ==
616 GNUNET_CONTAINER_multihashmap_put(pp->plan_map, 622 GNUNET_CONTAINER_multihashmap_put (pp->plan_map,
617 get_rp_key(rp), 623 get_rp_key (rp),
618 rp, 624 rp,
619 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 625 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
620 plan(pp, 626 plan (pp,
621 rp); 627 rp);
622} 628}
623 629
624 630
@@ -629,7 +635,7 @@ GSF_plan_add_(struct GSF_ConnectedPeer *cp,
629 * @param cp connected peer 635 * @param cp connected peer
630 */ 636 */
631void 637void
632GSF_plan_notify_peer_disconnect_(const struct GSF_ConnectedPeer *cp) 638GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
633{ 639{
634 const struct GNUNET_PeerIdentity *id; 640 const struct GNUNET_PeerIdentity *id;
635 struct PeerPlan *pp; 641 struct PeerPlan *pp;
@@ -637,70 +643,70 @@ GSF_plan_notify_peer_disconnect_(const struct GSF_ConnectedPeer *cp)
637 struct GSF_PendingRequestData *prd; 643 struct GSF_PendingRequestData *prd;
638 struct GSF_PendingRequestPlanBijection *bi; 644 struct GSF_PendingRequestPlanBijection *bi;
639 645
640 id = GSF_connected_peer_get_identity2_(cp); 646 id = GSF_connected_peer_get_identity2_ (cp);
641 pp = GNUNET_CONTAINER_multipeermap_get(plans, id); 647 pp = GNUNET_CONTAINER_multipeermap_get (plans, id);
642 if (NULL == pp) 648 if (NULL == pp)
643 return; /* nothing was ever planned for this peer */ 649 return; /* nothing was ever planned for this peer */
644 GNUNET_assert(GNUNET_YES == 650 GNUNET_assert (GNUNET_YES ==
645 GNUNET_CONTAINER_multipeermap_remove(plans, id, 651 GNUNET_CONTAINER_multipeermap_remove (plans, id,
646 pp)); 652 pp));
647 if (NULL != pp->task) 653 if (NULL != pp->task)
654 {
655 GNUNET_SCHEDULER_cancel (pp->task);
656 pp->task = NULL;
657 }
658 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
659 {
660 GNUNET_break (GNUNET_YES ==
661 GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
662 get_rp_key (rp),
663 rp));
664 while (NULL != (bi = rp->pe_head))
648 { 665 {
649 GNUNET_SCHEDULER_cancel(pp->task); 666 GNUNET_CONTAINER_MDLL_remove (PE,
650 pp->task = NULL; 667 rp->pe_head,
668 rp->pe_tail,
669 bi);
670 prd = GSF_pending_request_get_data_ (bi->pr);
671 GNUNET_CONTAINER_MDLL_remove (PR,
672 prd->pr_head,
673 prd->pr_tail,
674 bi);
675 GNUNET_free (bi);
651 } 676 }
652 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root(pp->priority_heap))) 677 plan_count--;
653 { 678 GNUNET_free (rp);
654 GNUNET_break(GNUNET_YES == 679 }
655 GNUNET_CONTAINER_multihashmap_remove(pp->plan_map, 680 GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
656 get_rp_key(rp), 681 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
682 {
683 GNUNET_break (GNUNET_YES ==
684 GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
685 get_rp_key (rp),
657 rp)); 686 rp));
658 while (NULL != (bi = rp->pe_head)) 687 while (NULL != (bi = rp->pe_head))
659 {
660 GNUNET_CONTAINER_MDLL_remove(PE,
661 rp->pe_head,
662 rp->pe_tail,
663 bi);
664 prd = GSF_pending_request_get_data_(bi->pr);
665 GNUNET_CONTAINER_MDLL_remove(PR,
666 prd->pr_head,
667 prd->pr_tail,
668 bi);
669 GNUNET_free(bi);
670 }
671 plan_count--;
672 GNUNET_free(rp);
673 }
674 GNUNET_CONTAINER_heap_destroy(pp->priority_heap);
675 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root(pp->delay_heap)))
676 { 688 {
677 GNUNET_break(GNUNET_YES == 689 prd = GSF_pending_request_get_data_ (bi->pr);
678 GNUNET_CONTAINER_multihashmap_remove(pp->plan_map, 690 GNUNET_CONTAINER_MDLL_remove (PE,
679 get_rp_key(rp), 691 rp->pe_head,
680 rp)); 692 rp->pe_tail,
681 while (NULL != (bi = rp->pe_head)) 693 bi);
682 { 694 GNUNET_CONTAINER_MDLL_remove (PR,
683 prd = GSF_pending_request_get_data_(bi->pr); 695 prd->pr_head,
684 GNUNET_CONTAINER_MDLL_remove(PE, 696 prd->pr_tail,
685 rp->pe_head, 697 bi);
686 rp->pe_tail, 698 GNUNET_free (bi);
687 bi);
688 GNUNET_CONTAINER_MDLL_remove(PR,
689 prd->pr_head,
690 prd->pr_tail,
691 bi);
692 GNUNET_free(bi);
693 }
694 plan_count--;
695 GNUNET_free(rp);
696 } 699 }
697 GNUNET_STATISTICS_set(GSF_stats, 700 plan_count--;
698 gettext_noop("# query plan entries"), 701 GNUNET_free (rp);
699 plan_count, 702 }
700 GNUNET_NO); 703 GNUNET_STATISTICS_set (GSF_stats,
701 GNUNET_CONTAINER_heap_destroy(pp->delay_heap); 704 gettext_noop ("# query plan entries"),
702 GNUNET_CONTAINER_multihashmap_destroy(pp->plan_map); 705 plan_count,
703 GNUNET_free(pp); 706 GNUNET_NO);
707 GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
708 GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map);
709 GNUNET_free (pp);
704} 710}
705 711
706 712
@@ -714,23 +720,27 @@ GSF_plan_notify_peer_disconnect_(const struct GSF_ConnectedPeer *cp)
714 * @return #GNUNET_YES if @a result was changed, #GNUNET_NO otherwise. 720 * @return #GNUNET_YES if @a result was changed, #GNUNET_NO otherwise.
715 */ 721 */
716int 722int
717GSF_request_plan_reference_get_last_transmission_(struct GSF_PendingRequestPlanBijection *pr_head, 723GSF_request_plan_reference_get_last_transmission_ (struct
718 struct GSF_ConnectedPeer *sender, 724 GSF_PendingRequestPlanBijection
719 struct GNUNET_TIME_Absolute *result) 725 *pr_head,
726 struct GSF_ConnectedPeer *
727 sender,
728 struct GNUNET_TIME_Absolute *
729 result)
720{ 730{
721 struct GSF_PendingRequestPlanBijection *bi; 731 struct GSF_PendingRequestPlanBijection *bi;
722 732
723 for (bi = pr_head; NULL != bi; bi = bi->next_PR) 733 for (bi = pr_head; NULL != bi; bi = bi->next_PR)
734 {
735 if (bi->rp->pp->cp == sender)
724 { 736 {
725 if (bi->rp->pp->cp == sender) 737 if (0 == bi->rp->last_transmission.abs_value_us)
726 { 738 *result = GNUNET_TIME_UNIT_FOREVER_ABS;
727 if (0 == bi->rp->last_transmission.abs_value_us) 739 else
728 *result = GNUNET_TIME_UNIT_FOREVER_ABS; 740 *result = bi->rp->last_transmission;
729 else 741 return GNUNET_YES;
730 *result = bi->rp->last_transmission;
731 return GNUNET_YES;
732 }
733 } 742 }
743 }
734 return GNUNET_NO; 744 return GNUNET_NO;
735} 745}
736 746
@@ -742,41 +752,41 @@ GSF_request_plan_reference_get_last_transmission_(struct GSF_PendingRequestPlanB
742 * @param pr request that is done 752 * @param pr request that is done
743 */ 753 */
744void 754void
745GSF_plan_notify_request_done_(struct GSF_PendingRequest *pr) 755GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
746{ 756{
747 struct GSF_RequestPlan *rp; 757 struct GSF_RequestPlan *rp;
748 struct GSF_PendingRequestData *prd; 758 struct GSF_PendingRequestData *prd;
749 struct GSF_PendingRequestPlanBijection *bi; 759 struct GSF_PendingRequestPlanBijection *bi;
750 760
751 prd = GSF_pending_request_get_data_(pr); 761 prd = GSF_pending_request_get_data_ (pr);
752 while (NULL != (bi = prd->pr_head)) 762 while (NULL != (bi = prd->pr_head))
763 {
764 rp = bi->rp;
765 GNUNET_CONTAINER_MDLL_remove (PR,
766 prd->pr_head,
767 prd->pr_tail,
768 bi);
769 GNUNET_CONTAINER_MDLL_remove (PE,
770 rp->pe_head,
771 rp->pe_tail,
772 bi);
773 GNUNET_assert (bi->pr == pr);
774 if (NULL == rp->pe_head)
753 { 775 {
754 rp = bi->rp; 776 GNUNET_CONTAINER_heap_remove_node (rp->hn);
755 GNUNET_CONTAINER_MDLL_remove(PR, 777 plan_count--;
756 prd->pr_head, 778 GNUNET_break (GNUNET_YES ==
757 prd->pr_tail, 779 GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map,
758 bi); 780 &prd->query,
759 GNUNET_CONTAINER_MDLL_remove(PE, 781 rp));
760 rp->pe_head, 782 GNUNET_free (rp);
761 rp->pe_tail,
762 bi);
763 GNUNET_assert(bi->pr == pr);
764 if (NULL == rp->pe_head)
765 {
766 GNUNET_CONTAINER_heap_remove_node(rp->hn);
767 plan_count--;
768 GNUNET_break(GNUNET_YES ==
769 GNUNET_CONTAINER_multihashmap_remove(rp->pp->plan_map,
770 &prd->query,
771 rp));
772 GNUNET_free(rp);
773 }
774 GNUNET_free(bi);
775 } 783 }
776 GNUNET_STATISTICS_set(GSF_stats, 784 GNUNET_free (bi);
777 gettext_noop("# query plan entries"), 785 }
778 plan_count, 786 GNUNET_STATISTICS_set (GSF_stats,
779 GNUNET_NO); 787 gettext_noop ("# query plan entries"),
788 plan_count,
789 GNUNET_NO);
780} 790}
781 791
782 792
@@ -784,10 +794,10 @@ GSF_plan_notify_request_done_(struct GSF_PendingRequest *pr)
784 * Initialize plan subsystem. 794 * Initialize plan subsystem.
785 */ 795 */
786void 796void
787GSF_plan_init() 797GSF_plan_init ()
788{ 798{
789 plans = GNUNET_CONTAINER_multipeermap_create(256, 799 plans = GNUNET_CONTAINER_multipeermap_create (256,
790 GNUNET_YES); 800 GNUNET_YES);
791} 801}
792 802
793 803
@@ -795,10 +805,10 @@ GSF_plan_init()
795 * Shutdown plan subsystem. 805 * Shutdown plan subsystem.
796 */ 806 */
797void 807void
798GSF_plan_done() 808GSF_plan_done ()
799{ 809{
800 GNUNET_assert(0 == GNUNET_CONTAINER_multipeermap_size(plans)); 810 GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (plans));
801 GNUNET_CONTAINER_multipeermap_destroy(plans); 811 GNUNET_CONTAINER_multipeermap_destroy (plans);
802} 812}
803 813
804 814