diff options
Diffstat (limited to 'src/fs/gnunet-service-fs_pe.c')
-rw-r--r-- | src/fs/gnunet-service-fs_pe.c | 622 |
1 files changed, 316 insertions, 306 deletions
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index 3c1112333..7933b2dee 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c | |||
@@ -63,7 +63,8 @@ struct PeerPlan; | |||
63 | * to be able to lookup all plan entries corresponding | 63 | * to be able to lookup all plan entries corresponding |
64 | * to a given pending request.) | 64 | * to a given pending request.) |
65 | */ | 65 | */ |
66 | struct GSF_PendingRequestPlanBijection { | 66 | struct GSF_PendingRequestPlanBijection |
67 | { | ||
67 | /** | 68 | /** |
68 | * This is a doubly-linked list. | 69 | * This is a doubly-linked list. |
69 | */ | 70 | */ |
@@ -104,7 +105,8 @@ struct GSF_PendingRequestPlanBijection { | |||
104 | * with one entry in each heap of each `struct PeerPlan`. Each | 105 | * with one entry in each heap of each `struct PeerPlan`. Each |
105 | * entry tracks information relevant for this request and this peer. | 106 | * entry tracks information relevant for this request and this peer. |
106 | */ | 107 | */ |
107 | struct GSF_RequestPlan { | 108 | struct GSF_RequestPlan |
109 | { | ||
108 | /** | 110 | /** |
109 | * This is a doubly-linked list. | 111 | * This is a doubly-linked list. |
110 | */ | 112 | */ |
@@ -162,7 +164,8 @@ struct GSF_RequestPlan { | |||
162 | /** | 164 | /** |
163 | * Transmission plan for a peer. | 165 | * Transmission plan for a peer. |
164 | */ | 166 | */ |
165 | struct PeerPlan { | 167 | struct PeerPlan |
168 | { | ||
166 | /** | 169 | /** |
167 | * Heap with pending queries (`struct GSF_RequestPlan`), higher weights mean higher priority. | 170 | * Heap with pending queries (`struct GSF_RequestPlan`), higher weights mean higher priority. |
168 | */ | 171 | */ |
@@ -227,9 +230,9 @@ static unsigned long long plan_count; | |||
227 | * @return the associated query | 230 | * @return the associated query |
228 | */ | 231 | */ |
229 | static const struct GNUNET_HashCode * | 232 | static const struct GNUNET_HashCode * |
230 | get_rp_key(struct GSF_RequestPlan *rp) | 233 | get_rp_key (struct GSF_RequestPlan *rp) |
231 | { | 234 | { |
232 | return &GSF_pending_request_get_data_(rp->pe_head->pr)->query; | 235 | return &GSF_pending_request_get_data_ (rp->pe_head->pr)->query; |
233 | } | 236 | } |
234 | 237 | ||
235 | 238 | ||
@@ -240,10 +243,10 @@ get_rp_key(struct GSF_RequestPlan *rp) | |||
240 | * @param rp request to plan | 243 | * @param rp request to plan |
241 | */ | 244 | */ |
242 | static void | 245 | static void |
243 | plan(struct PeerPlan *pp, | 246 | plan (struct PeerPlan *pp, |
244 | struct GSF_RequestPlan *rp) | 247 | struct GSF_RequestPlan *rp) |
245 | { | 248 | { |
246 | #define N ((double)128.0) | 249 | #define N ((double) 128.0) |
247 | /** | 250 | /** |
248 | * Running average delay we currently impose. | 251 | * Running average delay we currently impose. |
249 | */ | 252 | */ |
@@ -252,28 +255,28 @@ plan(struct PeerPlan *pp, | |||
252 | struct GSF_PendingRequestData *prd; | 255 | struct GSF_PendingRequestData *prd; |
253 | struct GNUNET_TIME_Relative delay; | 256 | struct GNUNET_TIME_Relative delay; |
254 | 257 | ||
255 | GNUNET_assert(rp->pp == pp); | 258 | GNUNET_assert (rp->pp == pp); |
256 | GNUNET_STATISTICS_set(GSF_stats, | 259 | GNUNET_STATISTICS_set (GSF_stats, |
257 | gettext_noop("# average retransmission delay (ms)"), | 260 | gettext_noop ("# average retransmission delay (ms)"), |
258 | total_delay * 1000LL / plan_count, GNUNET_NO); | 261 | total_delay * 1000LL / plan_count, GNUNET_NO); |
259 | prd = GSF_pending_request_get_data_(rp->pe_head->pr); | 262 | prd = GSF_pending_request_get_data_ (rp->pe_head->pr); |
260 | 263 | ||
261 | if (rp->transmission_counter < 8) | 264 | if (rp->transmission_counter < 8) |
262 | delay = | 265 | delay = |
263 | GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, | 266 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, |
264 | rp->transmission_counter); | 267 | rp->transmission_counter); |
265 | else if (rp->transmission_counter < 32) | 268 | else if (rp->transmission_counter < 32) |
266 | delay = | 269 | delay = |
267 | GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, | 270 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, |
268 | 8 + | 271 | 8 |
269 | (1LL << (rp->transmission_counter - 8))); | 272 | + (1LL << (rp->transmission_counter - 8))); |
270 | else | 273 | else |
271 | delay = | 274 | delay = |
272 | GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, | 275 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, |
273 | 8 + (1LL << 24)); | 276 | 8 + (1LL << 24)); |
274 | delay.rel_value_us = | 277 | delay.rel_value_us = |
275 | GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, | 278 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, |
276 | delay.rel_value_us + 1); | 279 | delay.rel_value_us + 1); |
277 | /* Add 0.01 to avg_delay to avoid division-by-zero later */ | 280 | /* Add 0.01 to avg_delay to avoid division-by-zero later */ |
278 | avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value_us) / N) + 0.01; | 281 | avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value_us) / N) + 0.01; |
279 | 282 | ||
@@ -299,36 +302,37 @@ plan(struct PeerPlan *pp, | |||
299 | * Note: M_PI_4 = PI/4 = arctan(1) | 302 | * Note: M_PI_4 = PI/4 = arctan(1) |
300 | */ | 303 | */ |
301 | rp->priority = | 304 | rp->priority = |
302 | round((GSF_current_priorities + | 305 | round ((GSF_current_priorities |
303 | 1.0) * atan(delay.rel_value_us / avg_delay)) / M_PI_4; | 306 | + 1.0) * atan (delay.rel_value_us / avg_delay)) / M_PI_4; |
304 | /* Note: usage of 'round' and 'atan' requires -lm */ | 307 | /* Note: usage of 'round' and 'atan' requires -lm */ |
305 | 308 | ||
306 | if (rp->transmission_counter != 0) | 309 | if (rp->transmission_counter != 0) |
307 | delay.rel_value_us += TTL_DECREMENT * 1000; | 310 | delay.rel_value_us += TTL_DECREMENT * 1000; |
308 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 311 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
309 | "Considering (re)transmission number %u in %s\n", | 312 | "Considering (re)transmission number %u in %s\n", |
310 | (unsigned int)rp->transmission_counter, | 313 | (unsigned int) rp->transmission_counter, |
311 | GNUNET_STRINGS_relative_time_to_string(delay, | 314 | GNUNET_STRINGS_relative_time_to_string (delay, |
312 | GNUNET_YES)); | 315 | GNUNET_YES)); |
313 | rp->earliest_transmission = GNUNET_TIME_relative_to_absolute(delay); | 316 | rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay); |
314 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 317 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
315 | "Earliest (re)transmission for `%s' in %us\n", | 318 | "Earliest (re)transmission for `%s' in %us\n", |
316 | GNUNET_h2s(&prd->query), | 319 | GNUNET_h2s (&prd->query), |
317 | rp->transmission_counter); | 320 | rp->transmission_counter); |
318 | GNUNET_assert(rp->hn == NULL); | 321 | GNUNET_assert (rp->hn == NULL); |
319 | if (0 == GNUNET_TIME_absolute_get_remaining(rp->earliest_transmission).rel_value_us) | 322 | if (0 == GNUNET_TIME_absolute_get_remaining ( |
320 | rp->hn = GNUNET_CONTAINER_heap_insert(pp->priority_heap, | 323 | rp->earliest_transmission).rel_value_us) |
321 | rp, | 324 | rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, |
322 | rp->priority); | 325 | rp, |
326 | rp->priority); | ||
323 | else | 327 | else |
324 | rp->hn = | 328 | rp->hn = |
325 | GNUNET_CONTAINER_heap_insert(pp->delay_heap, | 329 | GNUNET_CONTAINER_heap_insert (pp->delay_heap, |
326 | rp, | 330 | rp, |
327 | rp->earliest_transmission.abs_value_us); | 331 | rp->earliest_transmission.abs_value_us); |
328 | GNUNET_assert(GNUNET_YES == | 332 | GNUNET_assert (GNUNET_YES == |
329 | GNUNET_CONTAINER_multihashmap_contains_value(pp->plan_map, | 333 | GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map, |
330 | get_rp_key(rp), | 334 | get_rp_key (rp), |
331 | rp)); | 335 | rp)); |
332 | #undef N | 336 | #undef N |
333 | } | 337 | } |
334 | 338 | ||
@@ -340,7 +344,7 @@ plan(struct PeerPlan *pp, | |||
340 | * @return pending request with highest TTL | 344 | * @return pending request with highest TTL |
341 | */ | 345 | */ |
342 | struct GSF_PendingRequest * | 346 | struct GSF_PendingRequest * |
343 | get_latest(const struct GSF_RequestPlan *rp) | 347 | get_latest (const struct GSF_RequestPlan *rp) |
344 | { | 348 | { |
345 | struct GSF_PendingRequest *ret; | 349 | struct GSF_PendingRequest *ret; |
346 | struct GSF_PendingRequestPlanBijection *bi; | 350 | struct GSF_PendingRequestPlanBijection *bi; |
@@ -351,18 +355,18 @@ get_latest(const struct GSF_RequestPlan *rp) | |||
351 | if (NULL == bi) | 355 | if (NULL == bi) |
352 | return NULL; /* should never happen */ | 356 | return NULL; /* should never happen */ |
353 | ret = bi->pr; | 357 | ret = bi->pr; |
354 | rprd = GSF_pending_request_get_data_(ret); | 358 | rprd = GSF_pending_request_get_data_ (ret); |
355 | for (bi = bi->next_PE; NULL != bi; bi = bi->next_PE) | 359 | for (bi = bi->next_PE; NULL != bi; bi = bi->next_PE) |
360 | { | ||
361 | GNUNET_break (GNUNET_YES == | ||
362 | GSF_pending_request_test_active_ (bi->pr)); | ||
363 | prd = GSF_pending_request_get_data_ (bi->pr); | ||
364 | if (prd->ttl.abs_value_us > rprd->ttl.abs_value_us) | ||
356 | { | 365 | { |
357 | GNUNET_break(GNUNET_YES == | 366 | ret = bi->pr; |
358 | GSF_pending_request_test_active_(bi->pr)); | 367 | rprd = prd; |
359 | prd = GSF_pending_request_get_data_(bi->pr); | ||
360 | if (prd->ttl.abs_value_us > rprd->ttl.abs_value_us) | ||
361 | { | ||
362 | ret = bi->pr; | ||
363 | rprd = prd; | ||
364 | } | ||
365 | } | 368 | } |
369 | } | ||
366 | return ret; | 370 | return ret; |
367 | } | 371 | } |
368 | 372 | ||
@@ -373,100 +377,102 @@ get_latest(const struct GSF_RequestPlan *rp) | |||
373 | * @param cls the `struct PeerPlan` | 377 | * @param cls the `struct PeerPlan` |
374 | */ | 378 | */ |
375 | static void | 379 | static void |
376 | schedule_peer_transmission(void *cls) | 380 | schedule_peer_transmission (void *cls) |
377 | { | 381 | { |
378 | struct PeerPlan *pp = cls; | 382 | struct PeerPlan *pp = cls; |
379 | struct GSF_RequestPlan *rp; | 383 | struct GSF_RequestPlan *rp; |
380 | struct GNUNET_TIME_Relative delay; | 384 | struct GNUNET_TIME_Relative delay; |
381 | 385 | ||
382 | if (NULL != pp->task) | 386 | if (NULL != pp->task) |
383 | { | 387 | { |
384 | pp->task = NULL; | 388 | pp->task = NULL; |
385 | } | 389 | } |
386 | else | 390 | else |
387 | { | 391 | { |
388 | GNUNET_assert(NULL != pp->env); | 392 | GNUNET_assert (NULL != pp->env); |
389 | pp->env = NULL; | 393 | pp->env = NULL; |
390 | } | 394 | } |
391 | /* move ready requests to priority queue */ | 395 | /* move ready requests to priority queue */ |
392 | while ((NULL != (rp = GNUNET_CONTAINER_heap_peek(pp->delay_heap))) && | 396 | while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && |
393 | (0 == GNUNET_TIME_absolute_get_remaining | 397 | (0 == GNUNET_TIME_absolute_get_remaining |
394 | (rp->earliest_transmission).rel_value_us)) | 398 | (rp->earliest_transmission).rel_value_us)) |
399 | { | ||
400 | GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)); | ||
401 | rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, | ||
402 | rp, | ||
403 | rp->priority); | ||
404 | } | ||
405 | if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap)) | ||
406 | { | ||
407 | /* priority heap (still) empty, check for delay... */ | ||
408 | rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap); | ||
409 | if (NULL == rp) | ||
395 | { | 410 | { |
396 | GNUNET_assert(rp == GNUNET_CONTAINER_heap_remove_root(pp->delay_heap)); | 411 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
397 | rp->hn = GNUNET_CONTAINER_heap_insert(pp->priority_heap, | 412 | "No active requests for plan %p.\n", |
398 | rp, | 413 | pp); |
399 | rp->priority); | 414 | return; /* both queues empty */ |
400 | } | 415 | } |
401 | if (0 == GNUNET_CONTAINER_heap_get_size(pp->priority_heap)) | 416 | delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission); |
402 | { | 417 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
403 | /* priority heap (still) empty, check for delay... */ | 418 | "Sleeping for %s before retrying requests on plan %p.\n", |
404 | rp = GNUNET_CONTAINER_heap_peek(pp->delay_heap); | 419 | GNUNET_STRINGS_relative_time_to_string (delay, |
405 | if (NULL == rp) | ||
406 | { | ||
407 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | ||
408 | "No active requests for plan %p.\n", | ||
409 | pp); | ||
410 | return; /* both queues empty */ | ||
411 | } | ||
412 | delay = GNUNET_TIME_absolute_get_remaining(rp->earliest_transmission); | ||
413 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | ||
414 | "Sleeping for %s before retrying requests on plan %p.\n", | ||
415 | GNUNET_STRINGS_relative_time_to_string(delay, | ||
416 | GNUNET_YES), | 420 | GNUNET_YES), |
417 | pp); | 421 | pp); |
418 | GNUNET_STATISTICS_set(GSF_stats, | 422 | GNUNET_STATISTICS_set (GSF_stats, |
419 | gettext_noop("# delay heap timeout (ms)"), | 423 | gettext_noop ("# delay heap timeout (ms)"), |
420 | delay.rel_value_us / 1000LL, GNUNET_NO); | 424 | delay.rel_value_us / 1000LL, GNUNET_NO); |
421 | 425 | ||
422 | pp->task | 426 | pp->task |
423 | = GNUNET_SCHEDULER_add_at(rp->earliest_transmission, | 427 | = GNUNET_SCHEDULER_add_at (rp->earliest_transmission, |
424 | &schedule_peer_transmission, | 428 | &schedule_peer_transmission, |
425 | pp); | 429 | pp); |
426 | return; | 430 | return; |
427 | } | 431 | } |
428 | #if INSANE_STATISTICS | 432 | #if INSANE_STATISTICS |
429 | GNUNET_STATISTICS_update(GSF_stats, | 433 | GNUNET_STATISTICS_update (GSF_stats, |
430 | gettext_noop("# query plans executed"), | 434 | gettext_noop ("# query plans executed"), |
431 | 1, | 435 | 1, |
432 | GNUNET_NO); | 436 | GNUNET_NO); |
433 | #endif | 437 | #endif |
434 | /* process from priority heap */ | 438 | /* process from priority heap */ |
435 | rp = GNUNET_CONTAINER_heap_remove_root(pp->priority_heap); | 439 | rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap); |
436 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 440 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
437 | "Executing query plan %p\n", | 441 | "Executing query plan %p\n", |
438 | rp); | 442 | rp); |
439 | GNUNET_assert(NULL != rp); | 443 | GNUNET_assert (NULL != rp); |
440 | rp->hn = NULL; | 444 | rp->hn = NULL; |
441 | rp->last_transmission = GNUNET_TIME_absolute_get(); | 445 | rp->last_transmission = GNUNET_TIME_absolute_get (); |
442 | rp->transmission_counter++; | 446 | rp->transmission_counter++; |
443 | total_delay++; | 447 | total_delay++; |
444 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 448 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
445 | "Executing plan %p executed %u times, planning retransmission\n", | 449 | "Executing plan %p executed %u times, planning retransmission\n", |
446 | rp, | 450 | rp, |
447 | rp->transmission_counter); | 451 | rp->transmission_counter); |
448 | GNUNET_assert(NULL == pp->env); | 452 | GNUNET_assert (NULL == pp->env); |
449 | pp->env = GSF_pending_request_get_message_(get_latest(rp)); | 453 | pp->env = GSF_pending_request_get_message_ (get_latest (rp)); |
450 | GNUNET_MQ_notify_sent(pp->env, | 454 | GNUNET_MQ_notify_sent (pp->env, |
451 | &schedule_peer_transmission, | 455 | &schedule_peer_transmission, |
452 | pp); | 456 | pp); |
453 | GSF_peer_transmit_(pp->cp, | 457 | GSF_peer_transmit_ (pp->cp, |
454 | GNUNET_YES, | 458 | GNUNET_YES, |
455 | rp->priority, | 459 | rp->priority, |
456 | pp->env); | 460 | pp->env); |
457 | GNUNET_STATISTICS_update(GSF_stats, | 461 | GNUNET_STATISTICS_update (GSF_stats, |
458 | gettext_noop("# query messages sent to other peers"), | 462 | gettext_noop ( |
459 | 1, | 463 | "# query messages sent to other peers"), |
460 | GNUNET_NO); | 464 | 1, |
461 | plan(pp, | 465 | GNUNET_NO); |
462 | rp); | 466 | plan (pp, |
467 | rp); | ||
463 | } | 468 | } |
464 | 469 | ||
465 | 470 | ||
466 | /** | 471 | /** |
467 | * Closure for merge_pr(). | 472 | * Closure for merge_pr(). |
468 | */ | 473 | */ |
469 | struct MergeContext { | 474 | struct MergeContext |
475 | { | ||
470 | /** | 476 | /** |
471 | * Request we are trying to merge. | 477 | * Request we are trying to merge. |
472 | */ | 478 | */ |
@@ -490,9 +496,9 @@ struct MergeContext { | |||
490 | * #GNUNET_NO if not (merge success) | 496 | * #GNUNET_NO if not (merge success) |
491 | */ | 497 | */ |
492 | static int | 498 | static int |
493 | merge_pr(void *cls, | 499 | merge_pr (void *cls, |
494 | const struct GNUNET_HashCode *query, | 500 | const struct GNUNET_HashCode *query, |
495 | void *element) | 501 | void *element) |
496 | { | 502 | { |
497 | struct MergeContext *mpr = cls; | 503 | struct MergeContext *mpr = cls; |
498 | struct GSF_RequestPlan *rp = element; | 504 | struct GSF_RequestPlan *rp = element; |
@@ -500,44 +506,44 @@ merge_pr(void *cls, | |||
500 | struct GSF_PendingRequestPlanBijection *bi; | 506 | struct GSF_PendingRequestPlanBijection *bi; |
501 | struct GSF_PendingRequest *latest; | 507 | struct GSF_PendingRequest *latest; |
502 | 508 | ||
503 | GNUNET_break(GNUNET_YES == | 509 | GNUNET_break (GNUNET_YES == |
504 | GSF_pending_request_test_active_(mpr->pr)); | 510 | GSF_pending_request_test_active_ (mpr->pr)); |
505 | if (GNUNET_OK != | 511 | if (GNUNET_OK != |
506 | GSF_pending_request_is_compatible_(mpr->pr, | 512 | GSF_pending_request_is_compatible_ (mpr->pr, |
507 | rp->pe_head->pr)) | 513 | rp->pe_head->pr)) |
508 | return GNUNET_YES; | 514 | return GNUNET_YES; |
509 | /* merge new request with existing request plan */ | 515 | /* merge new request with existing request plan */ |
510 | bi = GNUNET_new(struct GSF_PendingRequestPlanBijection); | 516 | bi = GNUNET_new (struct GSF_PendingRequestPlanBijection); |
511 | bi->rp = rp; | 517 | bi->rp = rp; |
512 | bi->pr = mpr->pr; | 518 | bi->pr = mpr->pr; |
513 | prd = GSF_pending_request_get_data_(mpr->pr); | 519 | prd = GSF_pending_request_get_data_ (mpr->pr); |
514 | GNUNET_CONTAINER_MDLL_insert(PR, | 520 | GNUNET_CONTAINER_MDLL_insert (PR, |
515 | prd->pr_head, | 521 | prd->pr_head, |
516 | prd->pr_tail, | 522 | prd->pr_tail, |
517 | bi); | 523 | bi); |
518 | GNUNET_CONTAINER_MDLL_insert(PE, | 524 | GNUNET_CONTAINER_MDLL_insert (PE, |
519 | rp->pe_head, | 525 | rp->pe_head, |
520 | rp->pe_tail, | 526 | rp->pe_tail, |
521 | bi); | 527 | bi); |
522 | mpr->merged = GNUNET_YES; | 528 | mpr->merged = GNUNET_YES; |
523 | #if INSANE_STATISTICS | 529 | #if INSANE_STATISTICS |
524 | GNUNET_STATISTICS_update(GSF_stats, | 530 | GNUNET_STATISTICS_update (GSF_stats, |
525 | gettext_noop("# requests merged"), | 531 | gettext_noop ("# requests merged"), |
526 | 1, | 532 | 1, |
527 | GNUNET_NO); | 533 | GNUNET_NO); |
528 | #endif | 534 | #endif |
529 | latest = get_latest(rp); | 535 | latest = get_latest (rp); |
530 | if (GSF_pending_request_get_data_(latest)->ttl.abs_value_us < | 536 | if (GSF_pending_request_get_data_ (latest)->ttl.abs_value_us < |
531 | prd->ttl.abs_value_us) | 537 | prd->ttl.abs_value_us) |
532 | { | 538 | { |
533 | #if INSANE_STATISTICS | 539 | #if INSANE_STATISTICS |
534 | GNUNET_STATISTICS_update(GSF_stats, | 540 | GNUNET_STATISTICS_update (GSF_stats, |
535 | gettext_noop("# requests refreshed"), | 541 | gettext_noop ("# requests refreshed"), |
536 | 1, | 542 | 1, |
537 | GNUNET_NO); | 543 | GNUNET_NO); |
538 | #endif | 544 | #endif |
539 | rp->transmission_counter = 0; /* reset */ | 545 | rp->transmission_counter = 0; /* reset */ |
540 | } | 546 | } |
541 | return GNUNET_NO; | 547 | return GNUNET_NO; |
542 | } | 548 | } |
543 | 549 | ||
@@ -549,8 +555,8 @@ merge_pr(void *cls, | |||
549 | * @param pr request with the entry | 555 | * @param pr request with the entry |
550 | */ | 556 | */ |
551 | void | 557 | void |
552 | GSF_plan_add_(struct GSF_ConnectedPeer *cp, | 558 | GSF_plan_add_ (struct GSF_ConnectedPeer *cp, |
553 | struct GSF_PendingRequest *pr) | 559 | struct GSF_PendingRequest *pr) |
554 | { | 560 | { |
555 | const struct GNUNET_PeerIdentity *id; | 561 | const struct GNUNET_PeerIdentity *id; |
556 | struct PeerPlan *pp; | 562 | struct PeerPlan *pp; |
@@ -559,66 +565,66 @@ GSF_plan_add_(struct GSF_ConnectedPeer *cp, | |||
559 | struct GSF_PendingRequestPlanBijection *bi; | 565 | struct GSF_PendingRequestPlanBijection *bi; |
560 | struct MergeContext mpc; | 566 | struct MergeContext mpc; |
561 | 567 | ||
562 | GNUNET_assert(GNUNET_YES == | 568 | GNUNET_assert (GNUNET_YES == |
563 | GSF_pending_request_test_active_(pr)); | 569 | GSF_pending_request_test_active_ (pr)); |
564 | GNUNET_assert(NULL != cp); | 570 | GNUNET_assert (NULL != cp); |
565 | id = GSF_connected_peer_get_identity2_(cp); | 571 | id = GSF_connected_peer_get_identity2_ (cp); |
566 | pp = GNUNET_CONTAINER_multipeermap_get(plans, id); | 572 | pp = GNUNET_CONTAINER_multipeermap_get (plans, id); |
567 | if (NULL == pp) | 573 | if (NULL == pp) |
568 | { | 574 | { |
569 | pp = GNUNET_new(struct PeerPlan); | 575 | pp = GNUNET_new (struct PeerPlan); |
570 | pp->plan_map = GNUNET_CONTAINER_multihashmap_create(128, GNUNET_NO); | 576 | pp->plan_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO); |
571 | pp->priority_heap = | 577 | pp->priority_heap = |
572 | GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MAX); | 578 | GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX); |
573 | pp->delay_heap = | 579 | pp->delay_heap = |
574 | GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN); | 580 | GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); |
575 | pp->cp = cp; | 581 | pp->cp = cp; |
576 | GNUNET_assert(GNUNET_OK == | 582 | GNUNET_assert (GNUNET_OK == |
577 | GNUNET_CONTAINER_multipeermap_put(plans, | 583 | GNUNET_CONTAINER_multipeermap_put (plans, |
578 | id, | 584 | id, |
579 | pp, | 585 | pp, |
580 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 586 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
581 | pp->task = GNUNET_SCHEDULER_add_now(&schedule_peer_transmission, | 587 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, |
582 | pp); | 588 | pp); |
583 | } | 589 | } |
584 | mpc.merged = GNUNET_NO; | 590 | mpc.merged = GNUNET_NO; |
585 | mpc.pr = pr; | 591 | mpc.pr = pr; |
586 | prd = GSF_pending_request_get_data_(pr); | 592 | prd = GSF_pending_request_get_data_ (pr); |
587 | GNUNET_CONTAINER_multihashmap_get_multiple(pp->plan_map, | 593 | GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map, |
588 | &prd->query, | 594 | &prd->query, |
589 | &merge_pr, | 595 | &merge_pr, |
590 | &mpc); | 596 | &mpc); |
591 | if (GNUNET_NO != mpc.merged) | 597 | if (GNUNET_NO != mpc.merged) |
592 | return; | 598 | return; |
593 | plan_count++; | 599 | plan_count++; |
594 | GNUNET_STATISTICS_update(GSF_stats, | 600 | GNUNET_STATISTICS_update (GSF_stats, |
595 | gettext_noop("# query plan entries"), | 601 | gettext_noop ("# query plan entries"), |
596 | 1, | 602 | 1, |
597 | GNUNET_NO); | 603 | GNUNET_NO); |
598 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 604 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
599 | "Planning transmission of query `%s' to peer `%s'\n", | 605 | "Planning transmission of query `%s' to peer `%s'\n", |
600 | GNUNET_h2s(&prd->query), | 606 | GNUNET_h2s (&prd->query), |
601 | GNUNET_i2s(id)); | 607 | GNUNET_i2s (id)); |
602 | rp = GNUNET_new(struct GSF_RequestPlan); | 608 | rp = GNUNET_new (struct GSF_RequestPlan); |
603 | bi = GNUNET_new(struct GSF_PendingRequestPlanBijection); | 609 | bi = GNUNET_new (struct GSF_PendingRequestPlanBijection); |
604 | bi->rp = rp; | 610 | bi->rp = rp; |
605 | bi->pr = pr; | 611 | bi->pr = pr; |
606 | GNUNET_CONTAINER_MDLL_insert(PR, | 612 | GNUNET_CONTAINER_MDLL_insert (PR, |
607 | prd->pr_head, | 613 | prd->pr_head, |
608 | prd->pr_tail, | 614 | prd->pr_tail, |
609 | bi); | 615 | bi); |
610 | GNUNET_CONTAINER_MDLL_insert(PE, | 616 | GNUNET_CONTAINER_MDLL_insert (PE, |
611 | rp->pe_head, | 617 | rp->pe_head, |
612 | rp->pe_tail, | 618 | rp->pe_tail, |
613 | bi); | 619 | bi); |
614 | rp->pp = pp; | 620 | rp->pp = pp; |
615 | GNUNET_assert(GNUNET_YES == | 621 | GNUNET_assert (GNUNET_YES == |
616 | GNUNET_CONTAINER_multihashmap_put(pp->plan_map, | 622 | GNUNET_CONTAINER_multihashmap_put (pp->plan_map, |
617 | get_rp_key(rp), | 623 | get_rp_key (rp), |
618 | rp, | 624 | rp, |
619 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | 625 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
620 | plan(pp, | 626 | plan (pp, |
621 | rp); | 627 | rp); |
622 | } | 628 | } |
623 | 629 | ||
624 | 630 | ||
@@ -629,7 +635,7 @@ GSF_plan_add_(struct GSF_ConnectedPeer *cp, | |||
629 | * @param cp connected peer | 635 | * @param cp connected peer |
630 | */ | 636 | */ |
631 | void | 637 | void |
632 | GSF_plan_notify_peer_disconnect_(const struct GSF_ConnectedPeer *cp) | 638 | GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) |
633 | { | 639 | { |
634 | const struct GNUNET_PeerIdentity *id; | 640 | const struct GNUNET_PeerIdentity *id; |
635 | struct PeerPlan *pp; | 641 | struct PeerPlan *pp; |
@@ -637,70 +643,70 @@ GSF_plan_notify_peer_disconnect_(const struct GSF_ConnectedPeer *cp) | |||
637 | struct GSF_PendingRequestData *prd; | 643 | struct GSF_PendingRequestData *prd; |
638 | struct GSF_PendingRequestPlanBijection *bi; | 644 | struct GSF_PendingRequestPlanBijection *bi; |
639 | 645 | ||
640 | id = GSF_connected_peer_get_identity2_(cp); | 646 | id = GSF_connected_peer_get_identity2_ (cp); |
641 | pp = GNUNET_CONTAINER_multipeermap_get(plans, id); | 647 | pp = GNUNET_CONTAINER_multipeermap_get (plans, id); |
642 | if (NULL == pp) | 648 | if (NULL == pp) |
643 | return; /* nothing was ever planned for this peer */ | 649 | return; /* nothing was ever planned for this peer */ |
644 | GNUNET_assert(GNUNET_YES == | 650 | GNUNET_assert (GNUNET_YES == |
645 | GNUNET_CONTAINER_multipeermap_remove(plans, id, | 651 | GNUNET_CONTAINER_multipeermap_remove (plans, id, |
646 | pp)); | 652 | pp)); |
647 | if (NULL != pp->task) | 653 | if (NULL != pp->task) |
654 | { | ||
655 | GNUNET_SCHEDULER_cancel (pp->task); | ||
656 | pp->task = NULL; | ||
657 | } | ||
658 | while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) | ||
659 | { | ||
660 | GNUNET_break (GNUNET_YES == | ||
661 | GNUNET_CONTAINER_multihashmap_remove (pp->plan_map, | ||
662 | get_rp_key (rp), | ||
663 | rp)); | ||
664 | while (NULL != (bi = rp->pe_head)) | ||
648 | { | 665 | { |
649 | GNUNET_SCHEDULER_cancel(pp->task); | 666 | GNUNET_CONTAINER_MDLL_remove (PE, |
650 | pp->task = NULL; | 667 | rp->pe_head, |
668 | rp->pe_tail, | ||
669 | bi); | ||
670 | prd = GSF_pending_request_get_data_ (bi->pr); | ||
671 | GNUNET_CONTAINER_MDLL_remove (PR, | ||
672 | prd->pr_head, | ||
673 | prd->pr_tail, | ||
674 | bi); | ||
675 | GNUNET_free (bi); | ||
651 | } | 676 | } |
652 | while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root(pp->priority_heap))) | 677 | plan_count--; |
653 | { | 678 | GNUNET_free (rp); |
654 | GNUNET_break(GNUNET_YES == | 679 | } |
655 | GNUNET_CONTAINER_multihashmap_remove(pp->plan_map, | 680 | GNUNET_CONTAINER_heap_destroy (pp->priority_heap); |
656 | get_rp_key(rp), | 681 | while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap))) |
682 | { | ||
683 | GNUNET_break (GNUNET_YES == | ||
684 | GNUNET_CONTAINER_multihashmap_remove (pp->plan_map, | ||
685 | get_rp_key (rp), | ||
657 | rp)); | 686 | rp)); |
658 | while (NULL != (bi = rp->pe_head)) | 687 | while (NULL != (bi = rp->pe_head)) |
659 | { | ||
660 | GNUNET_CONTAINER_MDLL_remove(PE, | ||
661 | rp->pe_head, | ||
662 | rp->pe_tail, | ||
663 | bi); | ||
664 | prd = GSF_pending_request_get_data_(bi->pr); | ||
665 | GNUNET_CONTAINER_MDLL_remove(PR, | ||
666 | prd->pr_head, | ||
667 | prd->pr_tail, | ||
668 | bi); | ||
669 | GNUNET_free(bi); | ||
670 | } | ||
671 | plan_count--; | ||
672 | GNUNET_free(rp); | ||
673 | } | ||
674 | GNUNET_CONTAINER_heap_destroy(pp->priority_heap); | ||
675 | while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root(pp->delay_heap))) | ||
676 | { | 688 | { |
677 | GNUNET_break(GNUNET_YES == | 689 | prd = GSF_pending_request_get_data_ (bi->pr); |
678 | GNUNET_CONTAINER_multihashmap_remove(pp->plan_map, | 690 | GNUNET_CONTAINER_MDLL_remove (PE, |
679 | get_rp_key(rp), | 691 | rp->pe_head, |
680 | rp)); | 692 | rp->pe_tail, |
681 | while (NULL != (bi = rp->pe_head)) | 693 | bi); |
682 | { | 694 | GNUNET_CONTAINER_MDLL_remove (PR, |
683 | prd = GSF_pending_request_get_data_(bi->pr); | 695 | prd->pr_head, |
684 | GNUNET_CONTAINER_MDLL_remove(PE, | 696 | prd->pr_tail, |
685 | rp->pe_head, | 697 | bi); |
686 | rp->pe_tail, | 698 | GNUNET_free (bi); |
687 | bi); | ||
688 | GNUNET_CONTAINER_MDLL_remove(PR, | ||
689 | prd->pr_head, | ||
690 | prd->pr_tail, | ||
691 | bi); | ||
692 | GNUNET_free(bi); | ||
693 | } | ||
694 | plan_count--; | ||
695 | GNUNET_free(rp); | ||
696 | } | 699 | } |
697 | GNUNET_STATISTICS_set(GSF_stats, | 700 | plan_count--; |
698 | gettext_noop("# query plan entries"), | 701 | GNUNET_free (rp); |
699 | plan_count, | 702 | } |
700 | GNUNET_NO); | 703 | GNUNET_STATISTICS_set (GSF_stats, |
701 | GNUNET_CONTAINER_heap_destroy(pp->delay_heap); | 704 | gettext_noop ("# query plan entries"), |
702 | GNUNET_CONTAINER_multihashmap_destroy(pp->plan_map); | 705 | plan_count, |
703 | GNUNET_free(pp); | 706 | GNUNET_NO); |
707 | GNUNET_CONTAINER_heap_destroy (pp->delay_heap); | ||
708 | GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map); | ||
709 | GNUNET_free (pp); | ||
704 | } | 710 | } |
705 | 711 | ||
706 | 712 | ||
@@ -714,23 +720,27 @@ GSF_plan_notify_peer_disconnect_(const struct GSF_ConnectedPeer *cp) | |||
714 | * @return #GNUNET_YES if @a result was changed, #GNUNET_NO otherwise. | 720 | * @return #GNUNET_YES if @a result was changed, #GNUNET_NO otherwise. |
715 | */ | 721 | */ |
716 | int | 722 | int |
717 | GSF_request_plan_reference_get_last_transmission_(struct GSF_PendingRequestPlanBijection *pr_head, | 723 | GSF_request_plan_reference_get_last_transmission_ (struct |
718 | struct GSF_ConnectedPeer *sender, | 724 | GSF_PendingRequestPlanBijection |
719 | struct GNUNET_TIME_Absolute *result) | 725 | *pr_head, |
726 | struct GSF_ConnectedPeer * | ||
727 | sender, | ||
728 | struct GNUNET_TIME_Absolute * | ||
729 | result) | ||
720 | { | 730 | { |
721 | struct GSF_PendingRequestPlanBijection *bi; | 731 | struct GSF_PendingRequestPlanBijection *bi; |
722 | 732 | ||
723 | for (bi = pr_head; NULL != bi; bi = bi->next_PR) | 733 | for (bi = pr_head; NULL != bi; bi = bi->next_PR) |
734 | { | ||
735 | if (bi->rp->pp->cp == sender) | ||
724 | { | 736 | { |
725 | if (bi->rp->pp->cp == sender) | 737 | if (0 == bi->rp->last_transmission.abs_value_us) |
726 | { | 738 | *result = GNUNET_TIME_UNIT_FOREVER_ABS; |
727 | if (0 == bi->rp->last_transmission.abs_value_us) | 739 | else |
728 | *result = GNUNET_TIME_UNIT_FOREVER_ABS; | 740 | *result = bi->rp->last_transmission; |
729 | else | 741 | return GNUNET_YES; |
730 | *result = bi->rp->last_transmission; | ||
731 | return GNUNET_YES; | ||
732 | } | ||
733 | } | 742 | } |
743 | } | ||
734 | return GNUNET_NO; | 744 | return GNUNET_NO; |
735 | } | 745 | } |
736 | 746 | ||
@@ -742,41 +752,41 @@ GSF_request_plan_reference_get_last_transmission_(struct GSF_PendingRequestPlanB | |||
742 | * @param pr request that is done | 752 | * @param pr request that is done |
743 | */ | 753 | */ |
744 | void | 754 | void |
745 | GSF_plan_notify_request_done_(struct GSF_PendingRequest *pr) | 755 | GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr) |
746 | { | 756 | { |
747 | struct GSF_RequestPlan *rp; | 757 | struct GSF_RequestPlan *rp; |
748 | struct GSF_PendingRequestData *prd; | 758 | struct GSF_PendingRequestData *prd; |
749 | struct GSF_PendingRequestPlanBijection *bi; | 759 | struct GSF_PendingRequestPlanBijection *bi; |
750 | 760 | ||
751 | prd = GSF_pending_request_get_data_(pr); | 761 | prd = GSF_pending_request_get_data_ (pr); |
752 | while (NULL != (bi = prd->pr_head)) | 762 | while (NULL != (bi = prd->pr_head)) |
763 | { | ||
764 | rp = bi->rp; | ||
765 | GNUNET_CONTAINER_MDLL_remove (PR, | ||
766 | prd->pr_head, | ||
767 | prd->pr_tail, | ||
768 | bi); | ||
769 | GNUNET_CONTAINER_MDLL_remove (PE, | ||
770 | rp->pe_head, | ||
771 | rp->pe_tail, | ||
772 | bi); | ||
773 | GNUNET_assert (bi->pr == pr); | ||
774 | if (NULL == rp->pe_head) | ||
753 | { | 775 | { |
754 | rp = bi->rp; | 776 | GNUNET_CONTAINER_heap_remove_node (rp->hn); |
755 | GNUNET_CONTAINER_MDLL_remove(PR, | 777 | plan_count--; |
756 | prd->pr_head, | 778 | GNUNET_break (GNUNET_YES == |
757 | prd->pr_tail, | 779 | GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map, |
758 | bi); | 780 | &prd->query, |
759 | GNUNET_CONTAINER_MDLL_remove(PE, | 781 | rp)); |
760 | rp->pe_head, | 782 | GNUNET_free (rp); |
761 | rp->pe_tail, | ||
762 | bi); | ||
763 | GNUNET_assert(bi->pr == pr); | ||
764 | if (NULL == rp->pe_head) | ||
765 | { | ||
766 | GNUNET_CONTAINER_heap_remove_node(rp->hn); | ||
767 | plan_count--; | ||
768 | GNUNET_break(GNUNET_YES == | ||
769 | GNUNET_CONTAINER_multihashmap_remove(rp->pp->plan_map, | ||
770 | &prd->query, | ||
771 | rp)); | ||
772 | GNUNET_free(rp); | ||
773 | } | ||
774 | GNUNET_free(bi); | ||
775 | } | 783 | } |
776 | GNUNET_STATISTICS_set(GSF_stats, | 784 | GNUNET_free (bi); |
777 | gettext_noop("# query plan entries"), | 785 | } |
778 | plan_count, | 786 | GNUNET_STATISTICS_set (GSF_stats, |
779 | GNUNET_NO); | 787 | gettext_noop ("# query plan entries"), |
788 | plan_count, | ||
789 | GNUNET_NO); | ||
780 | } | 790 | } |
781 | 791 | ||
782 | 792 | ||
@@ -784,10 +794,10 @@ GSF_plan_notify_request_done_(struct GSF_PendingRequest *pr) | |||
784 | * Initialize plan subsystem. | 794 | * Initialize plan subsystem. |
785 | */ | 795 | */ |
786 | void | 796 | void |
787 | GSF_plan_init() | 797 | GSF_plan_init () |
788 | { | 798 | { |
789 | plans = GNUNET_CONTAINER_multipeermap_create(256, | 799 | plans = GNUNET_CONTAINER_multipeermap_create (256, |
790 | GNUNET_YES); | 800 | GNUNET_YES); |
791 | } | 801 | } |
792 | 802 | ||
793 | 803 | ||
@@ -795,10 +805,10 @@ GSF_plan_init() | |||
795 | * Shutdown plan subsystem. | 805 | * Shutdown plan subsystem. |
796 | */ | 806 | */ |
797 | void | 807 | void |
798 | GSF_plan_done() | 808 | GSF_plan_done () |
799 | { | 809 | { |
800 | GNUNET_assert(0 == GNUNET_CONTAINER_multipeermap_size(plans)); | 810 | GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (plans)); |
801 | GNUNET_CONTAINER_multipeermap_destroy(plans); | 811 | GNUNET_CONTAINER_multipeermap_destroy (plans); |
802 | } | 812 | } |
803 | 813 | ||
804 | 814 | ||