summaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_pe.c')
-rw-r--r--src/fs/gnunet-service-fs_pe.c629
1 files changed, 309 insertions, 320 deletions
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index 5e85bfdb7..3c1112333 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -11,12 +11,12 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
18 SPDX-License-Identifier: AGPL3.0-or-later 18 SPDX-License-Identifier: AGPL3.0-or-later
19*/ 19 */
20 20
21/** 21/**
22 * @file fs/gnunet-service-fs_pe.c 22 * @file fs/gnunet-service-fs_pe.c
@@ -63,9 +63,7 @@ struct PeerPlan;
63 * to be able to lookup all plan entries corresponding 63 * to be able to lookup all plan entries corresponding
64 * to a given pending request.) 64 * to a given pending request.)
65 */ 65 */
66struct GSF_PendingRequestPlanBijection 66struct GSF_PendingRequestPlanBijection {
67{
68
69 /** 67 /**
70 * This is a doubly-linked list. 68 * This is a doubly-linked list.
71 */ 69 */
@@ -97,7 +95,6 @@ struct GSF_PendingRequestPlanBijection
97 * and one of the origins of the request). 95 * and one of the origins of the request).
98 */ 96 */
99 struct GSF_PendingRequest *pr; 97 struct GSF_PendingRequest *pr;
100
101}; 98};
102 99
103 100
@@ -107,9 +104,7 @@ struct GSF_PendingRequestPlanBijection
107 * with one entry in each heap of each `struct PeerPlan`. Each 104 * with one entry in each heap of each `struct PeerPlan`. Each
108 * entry tracks information relevant for this request and this peer. 105 * entry tracks information relevant for this request and this peer.
109 */ 106 */
110struct GSF_RequestPlan 107struct GSF_RequestPlan {
111{
112
113 /** 108 /**
114 * This is a doubly-linked list. 109 * This is a doubly-linked list.
115 */ 110 */
@@ -161,15 +156,13 @@ struct GSF_RequestPlan
161 * How often did we transmit this request to this peer? 156 * How often did we transmit this request to this peer?
162 */ 157 */
163 unsigned int transmission_counter; 158 unsigned int transmission_counter;
164
165}; 159};
166 160
167 161
168/** 162/**
169 * Transmission plan for a peer. 163 * Transmission plan for a peer.
170 */ 164 */
171struct PeerPlan 165struct PeerPlan {
172{
173 /** 166 /**
174 * Heap with pending queries (`struct GSF_RequestPlan`), higher weights mean higher priority. 167 * Heap with pending queries (`struct GSF_RequestPlan`), higher weights mean higher priority.
175 */ 168 */
@@ -202,7 +195,6 @@ struct PeerPlan
202 * Current message under transmission for the plan. 195 * Current message under transmission for the plan.
203 */ 196 */
204 struct GNUNET_MQ_Envelope *env; 197 struct GNUNET_MQ_Envelope *env;
205
206}; 198};
207 199
208 200
@@ -235,9 +227,9 @@ static unsigned long long plan_count;
235 * @return the associated query 227 * @return the associated query
236 */ 228 */
237static const struct GNUNET_HashCode * 229static const struct GNUNET_HashCode *
238get_rp_key (struct GSF_RequestPlan *rp) 230get_rp_key(struct GSF_RequestPlan *rp)
239{ 231{
240 return &GSF_pending_request_get_data_ (rp->pe_head->pr)->query; 232 return &GSF_pending_request_get_data_(rp->pe_head->pr)->query;
241} 233}
242 234
243 235
@@ -248,8 +240,8 @@ get_rp_key (struct GSF_RequestPlan *rp)
248 * @param rp request to plan 240 * @param rp request to plan
249 */ 241 */
250static void 242static void
251plan (struct PeerPlan *pp, 243plan(struct PeerPlan *pp,
252 struct GSF_RequestPlan *rp) 244 struct GSF_RequestPlan *rp)
253{ 245{
254#define N ((double)128.0) 246#define N ((double)128.0)
255 /** 247 /**
@@ -260,28 +252,28 @@ plan (struct PeerPlan *pp,
260 struct GSF_PendingRequestData *prd; 252 struct GSF_PendingRequestData *prd;
261 struct GNUNET_TIME_Relative delay; 253 struct GNUNET_TIME_Relative delay;
262 254
263 GNUNET_assert (rp->pp == pp); 255 GNUNET_assert(rp->pp == pp);
264 GNUNET_STATISTICS_set (GSF_stats, 256 GNUNET_STATISTICS_set(GSF_stats,
265 gettext_noop ("# average retransmission delay (ms)"), 257 gettext_noop("# average retransmission delay (ms)"),
266 total_delay * 1000LL / plan_count, GNUNET_NO); 258 total_delay * 1000LL / plan_count, GNUNET_NO);
267 prd = GSF_pending_request_get_data_ (rp->pe_head->pr); 259 prd = GSF_pending_request_get_data_(rp->pe_head->pr);
268 260
269 if (rp->transmission_counter < 8) 261 if (rp->transmission_counter < 8)
270 delay = 262 delay =
271 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 263 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
272 rp->transmission_counter); 264 rp->transmission_counter);
273 else if (rp->transmission_counter < 32) 265 else if (rp->transmission_counter < 32)
274 delay = 266 delay =
275 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 267 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
276 8 + 268 8 +
277 (1LL << (rp->transmission_counter - 8))); 269 (1LL << (rp->transmission_counter - 8)));
278 else 270 else
279 delay = 271 delay =
280 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 272 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
281 8 + (1LL << 24)); 273 8 + (1LL << 24));
282 delay.rel_value_us = 274 delay.rel_value_us =
283 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 275 GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
284 delay.rel_value_us + 1); 276 delay.rel_value_us + 1);
285 /* Add 0.01 to avg_delay to avoid division-by-zero later */ 277 /* Add 0.01 to avg_delay to avoid division-by-zero later */
286 avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value_us) / N) + 0.01; 278 avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value_us) / N) + 0.01;
287 279
@@ -307,36 +299,36 @@ plan (struct PeerPlan *pp,
307 * Note: M_PI_4 = PI/4 = arctan(1) 299 * Note: M_PI_4 = PI/4 = arctan(1)
308 */ 300 */
309 rp->priority = 301 rp->priority =
310 round ((GSF_current_priorities + 302 round((GSF_current_priorities +
311 1.0) * atan (delay.rel_value_us / avg_delay)) / M_PI_4; 303 1.0) * atan(delay.rel_value_us / avg_delay)) / M_PI_4;
312 /* Note: usage of 'round' and 'atan' requires -lm */ 304 /* Note: usage of 'round' and 'atan' requires -lm */
313 305
314 if (rp->transmission_counter != 0) 306 if (rp->transmission_counter != 0)
315 delay.rel_value_us += TTL_DECREMENT * 1000; 307 delay.rel_value_us += TTL_DECREMENT * 1000;
316 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 308 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
317 "Considering (re)transmission number %u in %s\n", 309 "Considering (re)transmission number %u in %s\n",
318 (unsigned int) rp->transmission_counter, 310 (unsigned int)rp->transmission_counter,
319 GNUNET_STRINGS_relative_time_to_string (delay, 311 GNUNET_STRINGS_relative_time_to_string(delay,
320 GNUNET_YES)); 312 GNUNET_YES));
321 rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay); 313 rp->earliest_transmission = GNUNET_TIME_relative_to_absolute(delay);
322 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 314 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
323 "Earliest (re)transmission for `%s' in %us\n", 315 "Earliest (re)transmission for `%s' in %us\n",
324 GNUNET_h2s (&prd->query), 316 GNUNET_h2s(&prd->query),
325 rp->transmission_counter); 317 rp->transmission_counter);
326 GNUNET_assert (rp->hn == NULL); 318 GNUNET_assert(rp->hn == NULL);
327 if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us) 319 if (0 == GNUNET_TIME_absolute_get_remaining(rp->earliest_transmission).rel_value_us)
328 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, 320 rp->hn = GNUNET_CONTAINER_heap_insert(pp->priority_heap,
329 rp, 321 rp,
330 rp->priority); 322 rp->priority);
331 else 323 else
332 rp->hn = 324 rp->hn =
333 GNUNET_CONTAINER_heap_insert (pp->delay_heap, 325 GNUNET_CONTAINER_heap_insert(pp->delay_heap,
334 rp, 326 rp,
335 rp->earliest_transmission.abs_value_us); 327 rp->earliest_transmission.abs_value_us);
336 GNUNET_assert (GNUNET_YES == 328 GNUNET_assert(GNUNET_YES ==
337 GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map, 329 GNUNET_CONTAINER_multihashmap_contains_value(pp->plan_map,
338 get_rp_key (rp), 330 get_rp_key(rp),
339 rp)); 331 rp));
340#undef N 332#undef N
341} 333}
342 334
@@ -348,7 +340,7 @@ plan (struct PeerPlan *pp,
348 * @return pending request with highest TTL 340 * @return pending request with highest TTL
349 */ 341 */
350struct GSF_PendingRequest * 342struct GSF_PendingRequest *
351get_latest (const struct GSF_RequestPlan *rp) 343get_latest(const struct GSF_RequestPlan *rp)
352{ 344{
353 struct GSF_PendingRequest *ret; 345 struct GSF_PendingRequest *ret;
354 struct GSF_PendingRequestPlanBijection *bi; 346 struct GSF_PendingRequestPlanBijection *bi;
@@ -359,18 +351,18 @@ get_latest (const struct GSF_RequestPlan *rp)
359 if (NULL == bi) 351 if (NULL == bi)
360 return NULL; /* should never happen */ 352 return NULL; /* should never happen */
361 ret = bi->pr; 353 ret = bi->pr;
362 rprd = GSF_pending_request_get_data_ (ret); 354 rprd = GSF_pending_request_get_data_(ret);
363 for (bi = bi->next_PE; NULL != bi; bi = bi->next_PE) 355 for (bi = bi->next_PE; NULL != bi; bi = bi->next_PE)
364 {
365 GNUNET_break (GNUNET_YES ==
366 GSF_pending_request_test_active_ (bi->pr));
367 prd = GSF_pending_request_get_data_ (bi->pr);
368 if (prd->ttl.abs_value_us > rprd->ttl.abs_value_us)
369 { 356 {
370 ret = bi->pr; 357 GNUNET_break(GNUNET_YES ==
371 rprd = prd; 358 GSF_pending_request_test_active_(bi->pr));
359 prd = GSF_pending_request_get_data_(bi->pr);
360 if (prd->ttl.abs_value_us > rprd->ttl.abs_value_us)
361 {
362 ret = bi->pr;
363 rprd = prd;
364 }
372 } 365 }
373 }
374 return ret; 366 return ret;
375} 367}
376 368
@@ -381,102 +373,100 @@ get_latest (const struct GSF_RequestPlan *rp)
381 * @param cls the `struct PeerPlan` 373 * @param cls the `struct PeerPlan`
382 */ 374 */
383static void 375static void
384schedule_peer_transmission (void *cls) 376schedule_peer_transmission(void *cls)
385{ 377{
386 struct PeerPlan *pp = cls; 378 struct PeerPlan *pp = cls;
387 struct GSF_RequestPlan *rp; 379 struct GSF_RequestPlan *rp;
388 struct GNUNET_TIME_Relative delay; 380 struct GNUNET_TIME_Relative delay;
389 381
390 if (NULL != pp->task) 382 if (NULL != pp->task)
391 { 383 {
392 pp->task = NULL; 384 pp->task = NULL;
393 } 385 }
394 else 386 else
395 { 387 {
396 GNUNET_assert (NULL != pp->env); 388 GNUNET_assert(NULL != pp->env);
397 pp->env = NULL; 389 pp->env = NULL;
398 } 390 }
399 /* move ready requests to priority queue */ 391 /* move ready requests to priority queue */
400 while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && 392 while ((NULL != (rp = GNUNET_CONTAINER_heap_peek(pp->delay_heap))) &&
401 (0 == GNUNET_TIME_absolute_get_remaining 393 (0 == GNUNET_TIME_absolute_get_remaining
402 (rp->earliest_transmission).rel_value_us)) 394 (rp->earliest_transmission).rel_value_us))
403 {
404 GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
405 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
406 rp,
407 rp->priority);
408 }
409 if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
410 {
411 /* priority heap (still) empty, check for delay... */
412 rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
413 if (NULL == rp)
414 { 395 {
415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 396 GNUNET_assert(rp == GNUNET_CONTAINER_heap_remove_root(pp->delay_heap));
416 "No active requests for plan %p.\n", 397 rp->hn = GNUNET_CONTAINER_heap_insert(pp->priority_heap,
417 pp); 398 rp,
418 return; /* both queues empty */ 399 rp->priority);
400 }
401 if (0 == GNUNET_CONTAINER_heap_get_size(pp->priority_heap))
402 {
403 /* priority heap (still) empty, check for delay... */
404 rp = GNUNET_CONTAINER_heap_peek(pp->delay_heap);
405 if (NULL == rp)
406 {
407 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
408 "No active requests for plan %p.\n",
409 pp);
410 return; /* both queues empty */
411 }
412 delay = GNUNET_TIME_absolute_get_remaining(rp->earliest_transmission);
413 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
414 "Sleeping for %s before retrying requests on plan %p.\n",
415 GNUNET_STRINGS_relative_time_to_string(delay,
416 GNUNET_YES),
417 pp);
418 GNUNET_STATISTICS_set(GSF_stats,
419 gettext_noop("# delay heap timeout (ms)"),
420 delay.rel_value_us / 1000LL, GNUNET_NO);
421
422 pp->task
423 = GNUNET_SCHEDULER_add_at(rp->earliest_transmission,
424 &schedule_peer_transmission,
425 pp);
426 return;
419 } 427 }
420 delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
421 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
422 "Sleeping for %s before retrying requests on plan %p.\n",
423 GNUNET_STRINGS_relative_time_to_string (delay,
424 GNUNET_YES),
425 pp);
426 GNUNET_STATISTICS_set (GSF_stats,
427 gettext_noop ("# delay heap timeout (ms)"),
428 delay.rel_value_us / 1000LL, GNUNET_NO);
429
430 pp->task
431 = GNUNET_SCHEDULER_add_at (rp->earliest_transmission,
432 &schedule_peer_transmission,
433 pp);
434 return;
435 }
436#if INSANE_STATISTICS 428#if INSANE_STATISTICS
437 GNUNET_STATISTICS_update (GSF_stats, 429 GNUNET_STATISTICS_update(GSF_stats,
438 gettext_noop ("# query plans executed"), 430 gettext_noop("# query plans executed"),
439 1, 431 1,
440 GNUNET_NO); 432 GNUNET_NO);
441#endif 433#endif
442 /* process from priority heap */ 434 /* process from priority heap */
443 rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap); 435 rp = GNUNET_CONTAINER_heap_remove_root(pp->priority_heap);
444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 436 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
445 "Executing query plan %p\n", 437 "Executing query plan %p\n",
446 rp); 438 rp);
447 GNUNET_assert (NULL != rp); 439 GNUNET_assert(NULL != rp);
448 rp->hn = NULL; 440 rp->hn = NULL;
449 rp->last_transmission = GNUNET_TIME_absolute_get (); 441 rp->last_transmission = GNUNET_TIME_absolute_get();
450 rp->transmission_counter++; 442 rp->transmission_counter++;
451 total_delay++; 443 total_delay++;
452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 444 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
453 "Executing plan %p executed %u times, planning retransmission\n", 445 "Executing plan %p executed %u times, planning retransmission\n",
454 rp, 446 rp,
455 rp->transmission_counter); 447 rp->transmission_counter);
456 GNUNET_assert (NULL == pp->env); 448 GNUNET_assert(NULL == pp->env);
457 pp->env = GSF_pending_request_get_message_ (get_latest (rp)); 449 pp->env = GSF_pending_request_get_message_(get_latest(rp));
458 GNUNET_MQ_notify_sent (pp->env, 450 GNUNET_MQ_notify_sent(pp->env,
459 &schedule_peer_transmission, 451 &schedule_peer_transmission,
460 pp); 452 pp);
461 GSF_peer_transmit_ (pp->cp, 453 GSF_peer_transmit_(pp->cp,
462 GNUNET_YES, 454 GNUNET_YES,
463 rp->priority, 455 rp->priority,
464 pp->env); 456 pp->env);
465 GNUNET_STATISTICS_update (GSF_stats, 457 GNUNET_STATISTICS_update(GSF_stats,
466 gettext_noop ("# query messages sent to other peers"), 458 gettext_noop("# query messages sent to other peers"),
467 1, 459 1,
468 GNUNET_NO); 460 GNUNET_NO);
469 plan (pp, 461 plan(pp,
470 rp); 462 rp);
471} 463}
472 464
473 465
474/** 466/**
475 * Closure for merge_pr(). 467 * Closure for merge_pr().
476 */ 468 */
477struct MergeContext 469struct MergeContext {
478{
479
480 /** 470 /**
481 * Request we are trying to merge. 471 * Request we are trying to merge.
482 */ 472 */
@@ -486,7 +476,6 @@ struct MergeContext
486 * Set to #GNUNET_YES if we succeeded to merge. 476 * Set to #GNUNET_YES if we succeeded to merge.
487 */ 477 */
488 int merged; 478 int merged;
489
490}; 479};
491 480
492 481
@@ -501,9 +490,9 @@ struct MergeContext
501 * #GNUNET_NO if not (merge success) 490 * #GNUNET_NO if not (merge success)
502 */ 491 */
503static int 492static int
504merge_pr (void *cls, 493merge_pr(void *cls,
505 const struct GNUNET_HashCode *query, 494 const struct GNUNET_HashCode *query,
506 void *element) 495 void *element)
507{ 496{
508 struct MergeContext *mpr = cls; 497 struct MergeContext *mpr = cls;
509 struct GSF_RequestPlan *rp = element; 498 struct GSF_RequestPlan *rp = element;
@@ -511,44 +500,44 @@ merge_pr (void *cls,
511 struct GSF_PendingRequestPlanBijection *bi; 500 struct GSF_PendingRequestPlanBijection *bi;
512 struct GSF_PendingRequest *latest; 501 struct GSF_PendingRequest *latest;
513 502
514 GNUNET_break (GNUNET_YES == 503 GNUNET_break(GNUNET_YES ==
515 GSF_pending_request_test_active_ (mpr->pr)); 504 GSF_pending_request_test_active_(mpr->pr));
516 if (GNUNET_OK != 505 if (GNUNET_OK !=
517 GSF_pending_request_is_compatible_ (mpr->pr, 506 GSF_pending_request_is_compatible_(mpr->pr,
518 rp->pe_head->pr)) 507 rp->pe_head->pr))
519 return GNUNET_YES; 508 return GNUNET_YES;
520 /* merge new request with existing request plan */ 509 /* merge new request with existing request plan */
521 bi = GNUNET_new (struct GSF_PendingRequestPlanBijection); 510 bi = GNUNET_new(struct GSF_PendingRequestPlanBijection);
522 bi->rp = rp; 511 bi->rp = rp;
523 bi->pr = mpr->pr; 512 bi->pr = mpr->pr;
524 prd = GSF_pending_request_get_data_ (mpr->pr); 513 prd = GSF_pending_request_get_data_(mpr->pr);
525 GNUNET_CONTAINER_MDLL_insert (PR, 514 GNUNET_CONTAINER_MDLL_insert(PR,
526 prd->pr_head, 515 prd->pr_head,
527 prd->pr_tail, 516 prd->pr_tail,
528 bi); 517 bi);
529 GNUNET_CONTAINER_MDLL_insert (PE, 518 GNUNET_CONTAINER_MDLL_insert(PE,
530 rp->pe_head, 519 rp->pe_head,
531 rp->pe_tail, 520 rp->pe_tail,
532 bi); 521 bi);
533 mpr->merged = GNUNET_YES; 522 mpr->merged = GNUNET_YES;
534#if INSANE_STATISTICS 523#if INSANE_STATISTICS
535 GNUNET_STATISTICS_update (GSF_stats, 524 GNUNET_STATISTICS_update(GSF_stats,
536 gettext_noop ("# requests merged"), 525 gettext_noop("# requests merged"),
537 1, 526 1,
538 GNUNET_NO); 527 GNUNET_NO);
539#endif 528#endif
540 latest = get_latest (rp); 529 latest = get_latest(rp);
541 if (GSF_pending_request_get_data_ (latest)->ttl.abs_value_us < 530 if (GSF_pending_request_get_data_(latest)->ttl.abs_value_us <
542 prd->ttl.abs_value_us) 531 prd->ttl.abs_value_us)
543 { 532 {
544#if INSANE_STATISTICS 533#if INSANE_STATISTICS
545 GNUNET_STATISTICS_update (GSF_stats, 534 GNUNET_STATISTICS_update(GSF_stats,
546 gettext_noop ("# requests refreshed"), 535 gettext_noop("# requests refreshed"),
547 1, 536 1,
548 GNUNET_NO); 537 GNUNET_NO);
549#endif 538#endif
550 rp->transmission_counter = 0; /* reset */ 539 rp->transmission_counter = 0; /* reset */
551 } 540 }
552 return GNUNET_NO; 541 return GNUNET_NO;
553} 542}
554 543
@@ -560,8 +549,8 @@ merge_pr (void *cls,
560 * @param pr request with the entry 549 * @param pr request with the entry
561 */ 550 */
562void 551void
563GSF_plan_add_ (struct GSF_ConnectedPeer *cp, 552GSF_plan_add_(struct GSF_ConnectedPeer *cp,
564 struct GSF_PendingRequest *pr) 553 struct GSF_PendingRequest *pr)
565{ 554{
566 const struct GNUNET_PeerIdentity *id; 555 const struct GNUNET_PeerIdentity *id;
567 struct PeerPlan *pp; 556 struct PeerPlan *pp;
@@ -570,66 +559,66 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
570 struct GSF_PendingRequestPlanBijection *bi; 559 struct GSF_PendingRequestPlanBijection *bi;
571 struct MergeContext mpc; 560 struct MergeContext mpc;
572 561
573 GNUNET_assert (GNUNET_YES == 562 GNUNET_assert(GNUNET_YES ==
574 GSF_pending_request_test_active_ (pr)); 563 GSF_pending_request_test_active_(pr));
575 GNUNET_assert (NULL != cp); 564 GNUNET_assert(NULL != cp);
576 id = GSF_connected_peer_get_identity2_ (cp); 565 id = GSF_connected_peer_get_identity2_(cp);
577 pp = GNUNET_CONTAINER_multipeermap_get (plans, id); 566 pp = GNUNET_CONTAINER_multipeermap_get(plans, id);
578 if (NULL == pp) 567 if (NULL == pp)
579 { 568 {
580 pp = GNUNET_new (struct PeerPlan); 569 pp = GNUNET_new(struct PeerPlan);
581 pp->plan_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO); 570 pp->plan_map = GNUNET_CONTAINER_multihashmap_create(128, GNUNET_NO);
582 pp->priority_heap = 571 pp->priority_heap =
583 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX); 572 GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MAX);
584 pp->delay_heap = 573 pp->delay_heap =
585 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 574 GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
586 pp->cp = cp; 575 pp->cp = cp;
587 GNUNET_assert (GNUNET_OK == 576 GNUNET_assert(GNUNET_OK ==
588 GNUNET_CONTAINER_multipeermap_put (plans, 577 GNUNET_CONTAINER_multipeermap_put(plans,
589 id, 578 id,
590 pp, 579 pp,
591 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 580 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
592 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, 581 pp->task = GNUNET_SCHEDULER_add_now(&schedule_peer_transmission,
593 pp); 582 pp);
594 } 583 }
595 mpc.merged = GNUNET_NO; 584 mpc.merged = GNUNET_NO;
596 mpc.pr = pr; 585 mpc.pr = pr;
597 prd = GSF_pending_request_get_data_ (pr); 586 prd = GSF_pending_request_get_data_(pr);
598 GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map, 587 GNUNET_CONTAINER_multihashmap_get_multiple(pp->plan_map,
599 &prd->query, 588 &prd->query,
600 &merge_pr, 589 &merge_pr,
601 &mpc); 590 &mpc);
602 if (GNUNET_NO != mpc.merged) 591 if (GNUNET_NO != mpc.merged)
603 return; 592 return;
604 plan_count++; 593 plan_count++;
605 GNUNET_STATISTICS_update (GSF_stats, 594 GNUNET_STATISTICS_update(GSF_stats,
606 gettext_noop ("# query plan entries"), 595 gettext_noop("# query plan entries"),
607 1, 596 1,
608 GNUNET_NO); 597 GNUNET_NO);
609 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 598 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
610 "Planning transmission of query `%s' to peer `%s'\n", 599 "Planning transmission of query `%s' to peer `%s'\n",
611 GNUNET_h2s (&prd->query), 600 GNUNET_h2s(&prd->query),
612 GNUNET_i2s (id)); 601 GNUNET_i2s(id));
613 rp = GNUNET_new (struct GSF_RequestPlan); 602 rp = GNUNET_new(struct GSF_RequestPlan);
614 bi = GNUNET_new (struct GSF_PendingRequestPlanBijection); 603 bi = GNUNET_new(struct GSF_PendingRequestPlanBijection);
615 bi->rp = rp; 604 bi->rp = rp;
616 bi->pr = pr; 605 bi->pr = pr;
617 GNUNET_CONTAINER_MDLL_insert (PR, 606 GNUNET_CONTAINER_MDLL_insert(PR,
618 prd->pr_head, 607 prd->pr_head,
619 prd->pr_tail, 608 prd->pr_tail,
620 bi); 609 bi);
621 GNUNET_CONTAINER_MDLL_insert (PE, 610 GNUNET_CONTAINER_MDLL_insert(PE,
622 rp->pe_head, 611 rp->pe_head,
623 rp->pe_tail, 612 rp->pe_tail,
624 bi); 613 bi);
625 rp->pp = pp; 614 rp->pp = pp;
626 GNUNET_assert (GNUNET_YES == 615 GNUNET_assert(GNUNET_YES ==
627 GNUNET_CONTAINER_multihashmap_put (pp->plan_map, 616 GNUNET_CONTAINER_multihashmap_put(pp->plan_map,
628 get_rp_key (rp), 617 get_rp_key(rp),
629 rp, 618 rp,
630 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 619 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
631 plan (pp, 620 plan(pp,
632 rp); 621 rp);
633} 622}
634 623
635 624
@@ -640,7 +629,7 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
640 * @param cp connected peer 629 * @param cp connected peer
641 */ 630 */
642void 631void
643GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) 632GSF_plan_notify_peer_disconnect_(const struct GSF_ConnectedPeer *cp)
644{ 633{
645 const struct GNUNET_PeerIdentity *id; 634 const struct GNUNET_PeerIdentity *id;
646 struct PeerPlan *pp; 635 struct PeerPlan *pp;
@@ -648,70 +637,70 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
648 struct GSF_PendingRequestData *prd; 637 struct GSF_PendingRequestData *prd;
649 struct GSF_PendingRequestPlanBijection *bi; 638 struct GSF_PendingRequestPlanBijection *bi;
650 639
651 id = GSF_connected_peer_get_identity2_ (cp); 640 id = GSF_connected_peer_get_identity2_(cp);
652 pp = GNUNET_CONTAINER_multipeermap_get (plans, id); 641 pp = GNUNET_CONTAINER_multipeermap_get(plans, id);
653 if (NULL == pp) 642 if (NULL == pp)
654 return; /* nothing was ever planned for this peer */ 643 return; /* nothing was ever planned for this peer */
655 GNUNET_assert (GNUNET_YES == 644 GNUNET_assert(GNUNET_YES ==
656 GNUNET_CONTAINER_multipeermap_remove (plans, id, 645 GNUNET_CONTAINER_multipeermap_remove(plans, id,
657 pp)); 646 pp));
658 if (NULL != pp->task) 647 if (NULL != pp->task)
659 {
660 GNUNET_SCHEDULER_cancel (pp->task);
661 pp->task = NULL;
662 }
663 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
664 {
665 GNUNET_break (GNUNET_YES ==
666 GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
667 get_rp_key (rp),
668 rp));
669 while (NULL != (bi = rp->pe_head))
670 { 648 {
671 GNUNET_CONTAINER_MDLL_remove (PE, 649 GNUNET_SCHEDULER_cancel(pp->task);
672 rp->pe_head, 650 pp->task = NULL;
673 rp->pe_tail,
674 bi);
675 prd = GSF_pending_request_get_data_ (bi->pr);
676 GNUNET_CONTAINER_MDLL_remove (PR,
677 prd->pr_head,
678 prd->pr_tail,
679 bi);
680 GNUNET_free (bi);
681 } 651 }
682 plan_count--; 652 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root(pp->priority_heap)))
683 GNUNET_free (rp); 653 {
684 } 654 GNUNET_break(GNUNET_YES ==
685 GNUNET_CONTAINER_heap_destroy (pp->priority_heap); 655 GNUNET_CONTAINER_multihashmap_remove(pp->plan_map,
686 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap))) 656 get_rp_key(rp),
687 {
688 GNUNET_break (GNUNET_YES ==
689 GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
690 get_rp_key (rp),
691 rp)); 657 rp));
692 while (NULL != (bi = rp->pe_head)) 658 while (NULL != (bi = rp->pe_head))
659 {
660 GNUNET_CONTAINER_MDLL_remove(PE,
661 rp->pe_head,
662 rp->pe_tail,
663 bi);
664 prd = GSF_pending_request_get_data_(bi->pr);
665 GNUNET_CONTAINER_MDLL_remove(PR,
666 prd->pr_head,
667 prd->pr_tail,
668 bi);
669 GNUNET_free(bi);
670 }
671 plan_count--;
672 GNUNET_free(rp);
673 }
674 GNUNET_CONTAINER_heap_destroy(pp->priority_heap);
675 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root(pp->delay_heap)))
693 { 676 {
694 prd = GSF_pending_request_get_data_ (bi->pr); 677 GNUNET_break(GNUNET_YES ==
695 GNUNET_CONTAINER_MDLL_remove (PE, 678 GNUNET_CONTAINER_multihashmap_remove(pp->plan_map,
696 rp->pe_head, 679 get_rp_key(rp),
697 rp->pe_tail, 680 rp));
698 bi); 681 while (NULL != (bi = rp->pe_head))
699 GNUNET_CONTAINER_MDLL_remove (PR, 682 {
700 prd->pr_head, 683 prd = GSF_pending_request_get_data_(bi->pr);
701 prd->pr_tail, 684 GNUNET_CONTAINER_MDLL_remove(PE,
702 bi); 685 rp->pe_head,
703 GNUNET_free (bi); 686 rp->pe_tail,
687 bi);
688 GNUNET_CONTAINER_MDLL_remove(PR,
689 prd->pr_head,
690 prd->pr_tail,
691 bi);
692 GNUNET_free(bi);
693 }
694 plan_count--;
695 GNUNET_free(rp);
704 } 696 }
705 plan_count--; 697 GNUNET_STATISTICS_set(GSF_stats,
706 GNUNET_free (rp); 698 gettext_noop("# query plan entries"),
707 } 699 plan_count,
708 GNUNET_STATISTICS_set (GSF_stats, 700 GNUNET_NO);
709 gettext_noop ("# query plan entries"), 701 GNUNET_CONTAINER_heap_destroy(pp->delay_heap);
710 plan_count, 702 GNUNET_CONTAINER_multihashmap_destroy(pp->plan_map);
711 GNUNET_NO); 703 GNUNET_free(pp);
712 GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
713 GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map);
714 GNUNET_free (pp);
715} 704}
716 705
717 706
@@ -725,23 +714,23 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
725 * @return #GNUNET_YES if @a result was changed, #GNUNET_NO otherwise. 714 * @return #GNUNET_YES if @a result was changed, #GNUNET_NO otherwise.
726 */ 715 */
727int 716int
728GSF_request_plan_reference_get_last_transmission_ (struct GSF_PendingRequestPlanBijection *pr_head, 717GSF_request_plan_reference_get_last_transmission_(struct GSF_PendingRequestPlanBijection *pr_head,
729 struct GSF_ConnectedPeer *sender, 718 struct GSF_ConnectedPeer *sender,
730 struct GNUNET_TIME_Absolute *result) 719 struct GNUNET_TIME_Absolute *result)
731{ 720{
732 struct GSF_PendingRequestPlanBijection *bi; 721 struct GSF_PendingRequestPlanBijection *bi;
733 722
734 for (bi = pr_head; NULL != bi; bi = bi->next_PR) 723 for (bi = pr_head; NULL != bi; bi = bi->next_PR)
735 {
736 if (bi->rp->pp->cp == sender)
737 { 724 {
738 if (0 == bi->rp->last_transmission.abs_value_us) 725 if (bi->rp->pp->cp == sender)
739 *result = GNUNET_TIME_UNIT_FOREVER_ABS; 726 {
740 else 727 if (0 == bi->rp->last_transmission.abs_value_us)
741 *result = bi->rp->last_transmission; 728 *result = GNUNET_TIME_UNIT_FOREVER_ABS;
742 return GNUNET_YES; 729 else
730 *result = bi->rp->last_transmission;
731 return GNUNET_YES;
732 }
743 } 733 }
744 }
745 return GNUNET_NO; 734 return GNUNET_NO;
746} 735}
747 736
@@ -753,41 +742,41 @@ GSF_request_plan_reference_get_last_transmission_ (struct GSF_PendingRequestPlan
753 * @param pr request that is done 742 * @param pr request that is done
754 */ 743 */
755void 744void
756GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr) 745GSF_plan_notify_request_done_(struct GSF_PendingRequest *pr)
757{ 746{
758 struct GSF_RequestPlan *rp; 747 struct GSF_RequestPlan *rp;
759 struct GSF_PendingRequestData *prd; 748 struct GSF_PendingRequestData *prd;
760 struct GSF_PendingRequestPlanBijection *bi; 749 struct GSF_PendingRequestPlanBijection *bi;
761 750
762 prd = GSF_pending_request_get_data_ (pr); 751 prd = GSF_pending_request_get_data_(pr);
763 while (NULL != (bi = prd->pr_head)) 752 while (NULL != (bi = prd->pr_head))
764 {
765 rp = bi->rp;
766 GNUNET_CONTAINER_MDLL_remove (PR,
767 prd->pr_head,
768 prd->pr_tail,
769 bi);
770 GNUNET_CONTAINER_MDLL_remove (PE,
771 rp->pe_head,
772 rp->pe_tail,
773 bi);
774 GNUNET_assert (bi->pr == pr);
775 if (NULL == rp->pe_head)
776 { 753 {
777 GNUNET_CONTAINER_heap_remove_node (rp->hn); 754 rp = bi->rp;
778 plan_count--; 755 GNUNET_CONTAINER_MDLL_remove(PR,
779 GNUNET_break (GNUNET_YES == 756 prd->pr_head,
780 GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map, 757 prd->pr_tail,
781 &prd->query, 758 bi);
782 rp)); 759 GNUNET_CONTAINER_MDLL_remove(PE,
783 GNUNET_free (rp); 760 rp->pe_head,
761 rp->pe_tail,
762 bi);
763 GNUNET_assert(bi->pr == pr);
764 if (NULL == rp->pe_head)
765 {
766 GNUNET_CONTAINER_heap_remove_node(rp->hn);
767 plan_count--;
768 GNUNET_break(GNUNET_YES ==
769 GNUNET_CONTAINER_multihashmap_remove(rp->pp->plan_map,
770 &prd->query,
771 rp));
772 GNUNET_free(rp);
773 }
774 GNUNET_free(bi);
784 } 775 }
785 GNUNET_free (bi); 776 GNUNET_STATISTICS_set(GSF_stats,
786 } 777 gettext_noop("# query plan entries"),
787 GNUNET_STATISTICS_set (GSF_stats, 778 plan_count,
788 gettext_noop ("# query plan entries"), 779 GNUNET_NO);
789 plan_count,
790 GNUNET_NO);
791} 780}
792 781
793 782
@@ -795,10 +784,10 @@ GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
795 * Initialize plan subsystem. 784 * Initialize plan subsystem.
796 */ 785 */
797void 786void
798GSF_plan_init () 787GSF_plan_init()
799{ 788{
800 plans = GNUNET_CONTAINER_multipeermap_create (256, 789 plans = GNUNET_CONTAINER_multipeermap_create(256,
801 GNUNET_YES); 790 GNUNET_YES);
802} 791}
803 792
804 793
@@ -806,10 +795,10 @@ GSF_plan_init ()
806 * Shutdown plan subsystem. 795 * Shutdown plan subsystem.
807 */ 796 */
808void 797void
809GSF_plan_done () 798GSF_plan_done()
810{ 799{
811 GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (plans)); 800 GNUNET_assert(0 == GNUNET_CONTAINER_multipeermap_size(plans));
812 GNUNET_CONTAINER_multipeermap_destroy (plans); 801 GNUNET_CONTAINER_multipeermap_destroy(plans);
813} 802}
814 803
815 804