aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pe.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-07-26 16:27:05 +0000
committerChristian Grothoff <christian@grothoff.org>2011-07-26 16:27:05 +0000
commitbf4a9d8364675b34ac18d505e508006e2b773670 (patch)
treea3bf1d6965c9f7fa4215d2377949f950b71ba087 /src/fs/gnunet-service-fs_pe.c
parent661869cea600301846e9d78475c8c3b4dbde225a (diff)
downloadgnunet-bf4a9d8364675b34ac18d505e508006e2b773670.tar.gz
gnunet-bf4a9d8364675b34ac18d505e508006e2b773670.zip
fixing fs
Diffstat (limited to 'src/fs/gnunet-service-fs_pe.c')
-rw-r--r--src/fs/gnunet-service-fs_pe.c319
1 files changed, 238 insertions, 81 deletions
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index 83733ef8d..4dd54c88e 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -31,6 +31,72 @@
31 31
32 32
33/** 33/**
34 * List of GSF_PendingRequests this request plan
35 * participates with.
36 */
37struct PendingRequestList;
38
39
40/**
41 * DLL of request plans a particular pending request is
42 * involved with.
43 */
44struct GSF_RequestPlanReference
45{
46
47 /**
48 * This is a doubly-linked list.
49 */
50 struct GSF_RequestPlanReference *next;
51
52 /**
53 * This is a doubly-linked list.
54 */
55 struct GSF_RequestPlanReference *prev;
56
57 /**
58 * Associated request plan.
59 */
60 struct GSF_RequestPlan *rp;
61
62 /**
63 * Corresponding PendingRequestList.
64 */
65 struct PendingRequestList *prl;
66};
67
68
69/**
70 * List of GSF_PendingRequests this request plan
71 * participates with.
72 */
73struct PendingRequestList
74{
75
76 /**
77 * This is a doubly-linked list.
78 */
79 struct PendingRequestList *next;
80
81 /**
82 * This is a doubly-linked list.
83 */
84 struct PendingRequestList *prev;
85
86 /**
87 * Array of associated pending requests.
88 */
89 struct GSF_PendingRequest *pr;
90
91 /**
92 * Corresponding GSF_RequestPlanReference.
93 */
94 struct GSF_RequestPlanReference *rpr;
95
96};
97
98
99/**
34 * Information we keep per request per peer. This is a doubly-linked 100 * Information we keep per request per peer. This is a doubly-linked
35 * list (with head and tail in the 'struct GSF_PendingRequestData') 101 * list (with head and tail in the 'struct GSF_PendingRequestData')
36 * with one entry in each heap of each 'struct PeerPlan'. Each 102 * with one entry in each heap of each 'struct PeerPlan'. Each
@@ -55,9 +121,14 @@ struct GSF_RequestPlan
55 struct GNUNET_CONTAINER_HeapNode *hn; 121 struct GNUNET_CONTAINER_HeapNode *hn;
56 122
57 /** 123 /**
58 * Array of associated pending requests. 124 * Head of list of associated pending requests.
59 */ 125 */
60 struct GSF_PendingRequest **prs; 126 struct PendingRequestList *prl_head;
127
128 /**
129 * Tail of list of associated pending requests.
130 */
131 struct PendingRequestList *prl_tail;
61 132
62 /** 133 /**
63 * Earliest time we'd be happy to (re)transmit this request. 134 * Earliest time we'd be happy to (re)transmit this request.
@@ -70,11 +141,6 @@ struct GSF_RequestPlan
70 struct GNUNET_TIME_Absolute last_transmission; 141 struct GNUNET_TIME_Absolute last_transmission;
71 142
72 /** 143 /**
73 * Number of entries in 'prs'.
74 */
75 unsigned int prs_length;
76
77 /**
78 * Current priority for this request for this target. 144 * Current priority for this request for this target.
79 */ 145 */
80 uint64_t priority; 146 uint64_t priority;
@@ -163,7 +229,7 @@ plan (struct PeerPlan *pp,
163 gettext_noop ("# average retransmission delay (ms)"), 229 gettext_noop ("# average retransmission delay (ms)"),
164 total_delay * 1000LL / plan_count, 230 total_delay * 1000LL / plan_count,
165 GNUNET_NO); 231 GNUNET_NO);
166 prd = GSF_pending_request_get_data_ (rp->prs[0]); 232 prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
167 // FIXME: calculate 'rp->priority'! 233 // FIXME: calculate 'rp->priority'!
168 if (rp->transmission_counter < 32) 234 if (rp->transmission_counter < 32)
169 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 235 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
@@ -196,6 +262,32 @@ plan (struct PeerPlan *pp,
196 262
197 263
198/** 264/**
265 * Get the pending request with the highest TTL from the given plan.
266 *
267 * @param rp plan to investigate
268 * @return pending request with highest TTL
269 */
270struct GSF_PendingRequest *
271get_latest (const struct GSF_RequestPlan *rp)
272{
273 struct GSF_PendingRequest *ret;
274 struct PendingRequestList *prl;
275
276 prl = rp->prl_head;
277 ret = prl->pr;
278 prl = prl->next;
279 while (NULL != prl)
280 {
281 if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
282 GSF_pending_request_get_data_ (ret)->ttl.abs_value)
283 ret = prl->pr;
284 prl = prl->next;
285 }
286 return ret;
287}
288
289
290/**
199 * Function called to get a message for transmission. 291 * Function called to get a message for transmission.
200 * 292 *
201 * @param cls closure 293 * @param cls closure
@@ -225,7 +317,7 @@ transmit_message_callback (void *cls,
225 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); 317 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
226 return 0; 318 return 0;
227 } 319 }
228 msize = GSF_pending_request_get_message_ (rp->prs[0], buf_size, buf); 320 msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
229 if (msize > buf_size) 321 if (msize > buf_size)
230 { 322 {
231 /* buffer to small (message changed), try again */ 323 /* buffer to small (message changed), try again */
@@ -314,7 +406,7 @@ schedule_peer_transmission (void *cls,
314 rp); 406 rp);
315#endif 407#endif
316 GNUNET_assert (NULL != rp); 408 GNUNET_assert (NULL != rp);
317 msize = GSF_pending_request_get_message_ (rp->prs[0], 0, NULL); 409 msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
318 pp->pth = GSF_peer_transmit_ (pp->cp, 410 pp->pth = GSF_peer_transmit_ (pp->cp,
319 GNUNET_YES, 411 GNUNET_YES,
320 rp->priority, 412 rp->priority,
@@ -327,6 +419,79 @@ schedule_peer_transmission (void *cls,
327 419
328 420
329/** 421/**
422 * Closure for 'merge_pr'.
423 */
424struct MergeContext
425{
426
427 struct GSF_PendingRequest *pr;
428
429 int merged;
430
431};
432
433
434/**
435 * Iterator that checks if an equivalent request is already
436 * present for this peer.
437 *
438 * @param cls closure
439 * @param node internal node of the heap (ignored)
440 * @param element request plan stored at the node
441 * @param cost cost associated with the node (ignored)
442 * @return GNUNET_YES if we should continue to iterate,
443 * GNUNET_NO if not (merge success)
444 */
445static int
446merge_pr (void *cls,
447 struct GNUNET_CONTAINER_HeapNode *node,
448 void *element,
449 GNUNET_CONTAINER_HeapCostType cost)
450{
451 struct MergeContext *mpr = cls;
452 struct GSF_RequestPlan *rp = element;
453 struct GSF_PendingRequestData *prd;
454 struct GSF_RequestPlanReference *rpr;
455 struct PendingRequestList *prl;
456 struct GSF_PendingRequest *latest;
457
458 if (GNUNET_OK !=
459 GSF_pending_request_is_compatible_ (mpr->pr,
460 rp->prl_head->pr))
461 return GNUNET_YES;
462 /* merge new request with existing request plan */
463 rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
464 prl = GNUNET_malloc (sizeof (struct PendingRequestList));
465 rpr->rp = rp;
466 rpr->prl = prl;
467 prl->rpr = rpr;
468 prl->pr = mpr->pr;
469 prd = GSF_pending_request_get_data_ (mpr->pr);
470 GNUNET_CONTAINER_DLL_insert (prd->rpr_head,
471 prd->rpr_tail,
472 rpr);
473 GNUNET_CONTAINER_DLL_insert (rp->prl_head,
474 rp->prl_tail,
475 prl);
476 mpr->merged = GNUNET_YES;
477 GNUNET_STATISTICS_update (GSF_stats,
478 gettext_noop ("# requests merged"),
479 1,
480 GNUNET_NO);
481 latest = get_latest (rp);
482 if (GSF_pending_request_get_data_ (latest)->ttl.abs_value < prd->ttl.abs_value)
483 {
484 GNUNET_STATISTICS_update (GSF_stats,
485 gettext_noop ("# requests refreshed"),
486 1,
487 GNUNET_NO);
488 rp->transmission_counter = 0; /* reset */
489 }
490 return GNUNET_NO;
491}
492
493
494/**
330 * Create a new query plan entry. 495 * Create a new query plan entry.
331 * 496 *
332 * @param cp peer with the entry 497 * @param cp peer with the entry
@@ -340,7 +505,9 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
340 struct PeerPlan *pp; 505 struct PeerPlan *pp;
341 struct GSF_PendingRequestData *prd; 506 struct GSF_PendingRequestData *prd;
342 struct GSF_RequestPlan *rp; 507 struct GSF_RequestPlan *rp;
343 unsigned int i; 508 struct GSF_RequestPlanReference *rpr;
509 struct PendingRequestList *prl;
510 struct MergeContext mpc;
344 size_t msize; 511 size_t msize;
345 512
346 GNUNET_assert (NULL != cp); 513 GNUNET_assert (NULL != cp);
@@ -359,52 +526,39 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
359 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 526 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
360 } 527 }
361 msize = GSF_pending_request_get_message_ (pr, 0, NULL); 528 msize = GSF_pending_request_get_message_ (pr, 0, NULL);
362 prd = GSF_pending_request_get_data_ (pr); 529 mpc.merged = GNUNET_NO;
363 for (rp = prd->rp_head; NULL != rp; rp = rp->next) 530 mpc.pr = pr;
364 { 531 GNUNET_CONTAINER_heap_iterate (pp->priority_heap, &merge_pr, &mpc);
365 char mbuf[msize]; 532 if (mpc.merged != GNUNET_NO)
366 char xbuf[msize]; 533 return;
367 534 GNUNET_CONTAINER_heap_iterate (pp->delay_heap, &merge_pr, &mpc);
368 GNUNET_assert (msize == GSF_pending_request_get_message_ (pr, msize, mbuf)); 535 if (mpc.merged != GNUNET_NO)
369 if ( (msize == GSF_pending_request_get_message_ (rp->prs[0], msize, xbuf)) && 536 return;
370 (0 == memcmp (xbuf, mbuf, msize)) )
371 {
372 /* add request to existing plan */
373 GNUNET_array_append (rp->prs,
374 rp->prs_length,
375 pr);
376 for (i=0;i<rp->prs_length;i++)
377 if (GSF_pending_request_get_data_ (rp->prs[0])->ttl.abs_value < prd->ttl.abs_value)
378 {
379 GNUNET_STATISTICS_update (GSF_stats,
380 gettext_noop ("# requests refreshed"),
381 1,
382 GNUNET_NO);
383 rp->transmission_counter = 0; /* reset */
384 break;
385 }
386 return;
387 }
388 }
389 plan_count++; 537 plan_count++;
390 GNUNET_STATISTICS_update (GSF_stats, 538 GNUNET_STATISTICS_update (GSF_stats,
391 gettext_noop ("# query plan entries"), 539 gettext_noop ("# query plan entries"),
392 1, 540 1,
393 GNUNET_NO); 541 GNUNET_NO);
394 rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan)); 542 prd = GSF_pending_request_get_data_ (pr);
395#if DEBUG_FS 543#if DEBUG_FS
396 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
397 "Planning transmission of query `%s' to peer `%s' (%p)\n", 545 "Planning transmission of query `%s' to peer `%s'\n",
398 GNUNET_h2s (&prd->query), 546 GNUNET_h2s (&prd->query),
399 GNUNET_i2s (&id), 547 GNUNET_i2s (&id));
400 rp);
401#endif 548#endif
402 GNUNET_array_append (rp->prs, 549 rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
403 rp->prs_length, 550 rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
404 pr); 551 prl = GNUNET_malloc (sizeof (struct PendingRequestList));
405 GNUNET_CONTAINER_DLL_insert (prd->rp_head, 552 rpr->rp = rp;
406 prd->rp_tail, 553 rpr->prl = prl;
407 rp); 554 prl->rpr = rpr;
555 prl->pr = pr;
556 GNUNET_CONTAINER_DLL_insert (prd->rpr_head,
557 prd->rpr_tail,
558 rpr);
559 GNUNET_CONTAINER_DLL_insert (rp->prl_head,
560 rp->prl_tail,
561 prl);
408 plan (pp, rp); 562 plan (pp, rp);
409} 563}
410 564
@@ -422,7 +576,7 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
422 struct PeerPlan *pp; 576 struct PeerPlan *pp;
423 struct GSF_RequestPlan *rp; 577 struct GSF_RequestPlan *rp;
424 struct GSF_PendingRequestData *prd; 578 struct GSF_PendingRequestData *prd;
425 unsigned int i; 579 struct PendingRequestList *prl;
426 580
427 GSF_connected_peer_get_identity_ (cp, &id); 581 GSF_connected_peer_get_identity_ (cp, &id);
428 pp = GNUNET_CONTAINER_multihashmap_get (plans, 582 pp = GNUNET_CONTAINER_multihashmap_get (plans,
@@ -442,29 +596,35 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
442 } 596 }
443 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) 597 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
444 { 598 {
445 for (i=0;i<rp->prs_length;i++) 599 while (NULL != (prl = rp->prl_head))
446 { 600 {
447 prd = GSF_pending_request_get_data_ (rp->prs[i]); 601 GNUNET_CONTAINER_DLL_remove (rp->prl_head,
448 GNUNET_CONTAINER_DLL_remove (prd->rp_head, 602 rp->prl_tail,
449 prd->rp_tail, 603 prl);
450 rp); 604 prd = GSF_pending_request_get_data_ (prl->pr);
605 GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
606 prd->rpr_tail,
607 prl->rpr);
608 GNUNET_free (prl->rpr);
609 GNUNET_free (prl);
451 } 610 }
452 plan_count--;
453 GNUNET_array_grow (rp->prs, rp->prs_length, 0);
454 GNUNET_free (rp); 611 GNUNET_free (rp);
455 } 612 }
456 GNUNET_CONTAINER_heap_destroy (pp->priority_heap); 613 GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
457 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap))) 614 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
458 { 615 {
459 for (i=0;i<rp->prs_length;i++) 616 while (NULL != (prl = rp->prl_head))
460 { 617 {
461 prd = GSF_pending_request_get_data_ (rp->prs[i]); 618 GNUNET_CONTAINER_DLL_remove (rp->prl_head,
462 GNUNET_CONTAINER_DLL_remove (prd->rp_head, 619 rp->prl_tail,
463 prd->rp_tail, 620 prl);
464 rp); 621 prd = GSF_pending_request_get_data_ (prl->pr);
622 GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
623 prd->rpr_tail,
624 prl->rpr);
625 GNUNET_free (prl->rpr);
626 GNUNET_free (prl);
465 } 627 }
466 plan_count--;
467 GNUNET_array_grow (rp->prs, rp->prs_length, 0);
468 GNUNET_free (rp); 628 GNUNET_free (rp);
469 } 629 }
470 GNUNET_STATISTICS_set (GSF_stats, 630 GNUNET_STATISTICS_set (GSF_stats,
@@ -488,28 +648,25 @@ GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
488{ 648{
489 struct GSF_RequestPlan *rp; 649 struct GSF_RequestPlan *rp;
490 struct GSF_PendingRequestData *prd; 650 struct GSF_PendingRequestData *prd;
491 unsigned int i; 651 struct GSF_RequestPlanReference *rpr;
492 652
493 prd = GSF_pending_request_get_data_ (pr); 653 prd = GSF_pending_request_get_data_ (pr);
494 while (NULL != (rp = prd->rp_head)) 654 while (NULL != (rpr = prd->rpr_head))
495 { 655 {
496 for (i=0;i<rp->prs_length;i++) 656 GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
657 prd->rpr_tail,
658 rpr);
659 rp = rpr->rp;
660 GNUNET_CONTAINER_DLL_remove (rp->prl_head,
661 rp->prl_tail,
662 rpr->prl);
663 GNUNET_free (rpr->prl);
664 GNUNET_free (rpr);
665 if (rp->prl_head == 0)
497 { 666 {
498 if (rp->prs[i] == pr) 667 GNUNET_CONTAINER_heap_remove_node (rp->hn);
499 { 668 plan_count--;
500 rp->prs[i] = rp->prs[rp->prs_length - 1]; 669 GNUNET_free (rp);
501 GNUNET_array_grow (rp->prs, rp->prs_length, rp->prs_length-1);
502 if (rp->prs_length == 0)
503 {
504 GNUNET_CONTAINER_heap_remove_node (rp->hn);
505 GNUNET_CONTAINER_DLL_remove (prd->rp_head,
506 prd->rp_tail,
507 rp);
508 plan_count--;
509 GNUNET_free (rp);
510 break;
511 }
512 }
513 } 670 }
514 } 671 }
515 GNUNET_STATISTICS_set (GSF_stats, 672 GNUNET_STATISTICS_set (GSF_stats,