aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-07-26 16:27:05 +0000
committerChristian Grothoff <christian@grothoff.org>2011-07-26 16:27:05 +0000
commitbf4a9d8364675b34ac18d505e508006e2b773670 (patch)
treea3bf1d6965c9f7fa4215d2377949f950b71ba087
parent661869cea600301846e9d78475c8c3b4dbde225a (diff)
downloadgnunet-bf4a9d8364675b34ac18d505e508006e2b773670.tar.gz
gnunet-bf4a9d8364675b34ac18d505e508006e2b773670.zip
fixing fs
-rw-r--r--src/fs/gnunet-service-fs.c2
-rw-r--r--src/fs/gnunet-service-fs.h6
-rw-r--r--src/fs/gnunet-service-fs_cp.c4
-rw-r--r--src/fs/gnunet-service-fs_lc.c4
-rw-r--r--src/fs/gnunet-service-fs_pe.c319
-rw-r--r--src/fs/gnunet-service-fs_pr.c68
-rw-r--r--src/fs/gnunet-service-fs_pr.h22
7 files changed, 334 insertions, 91 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index a52e06c02..5ea1bb7c1 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -347,7 +347,7 @@ start_p2p_processing (void *cls,
347 return; /* we're done, 'pr' was already destroyed... */ 347 return; /* we're done, 'pr' was already destroyed... */
348 if (0 != (GSF_PRO_LOCAL_ONLY & prd->options) ) 348 if (0 != (GSF_PRO_LOCAL_ONLY & prd->options) )
349 { 349 {
350 GSF_pending_request_cancel_ (pr); 350 GSF_pending_request_cancel_ (pr, GNUNET_YES);
351 return; 351 return;
352 } 352 }
353 GSF_dht_lookup_ (pr); 353 GSF_dht_lookup_ (pr);
diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h
index ed7cd2e7b..bee814318 100644
--- a/src/fs/gnunet-service-fs.h
+++ b/src/fs/gnunet-service-fs.h
@@ -88,6 +88,12 @@ struct GSF_LocalClient;
88struct GSF_RequestPlan; 88struct GSF_RequestPlan;
89 89
90/** 90/**
91 * DLL of request plans a particular pending request is
92 * involved with.
93 */
94struct GSF_RequestPlanReference;
95
96/**
91 * Our connection to the datastore. 97 * Our connection to the datastore.
92 */ 98 */
93extern struct GNUNET_DATASTORE_Handle *GSF_dsh; 99extern struct GNUNET_DATASTORE_Handle *GSF_dsh;
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index f09516693..ea8a84dfe 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -811,7 +811,7 @@ cancel_pending_request (void *cls,
811 GNUNET_CONTAINER_multihashmap_remove (cp->request_map, 811 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
812 &prd->query, 812 &prd->query,
813 peerreq)); 813 peerreq));
814 GSF_pending_request_cancel_ (pr); 814 GSF_pending_request_cancel_ (pr, GNUNET_NO);
815 GNUNET_free (peerreq); 815 GNUNET_free (peerreq);
816 return GNUNET_OK; 816 return GNUNET_OK;
817} 817}
@@ -1368,7 +1368,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1368 -1, 1368 -1,
1369 GNUNET_NO); 1369 GNUNET_NO);
1370 priority += prd->priority; 1370 priority += prd->priority;
1371 GSF_pending_request_cancel_ (pr); 1371 GSF_pending_request_cancel_ (pr, GNUNET_YES);
1372 GNUNET_assert (GNUNET_YES == 1372 GNUNET_assert (GNUNET_YES ==
1373 GNUNET_CONTAINER_multihashmap_remove (cp->request_map, 1373 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
1374 &gm->query, 1374 &gm->query,
diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c
index f56ea5116..18f5c10dc 100644
--- a/src/fs/gnunet-service-fs_lc.c
+++ b/src/fs/gnunet-service-fs_lc.c
@@ -202,7 +202,7 @@ client_request_destroy (void *cls,
202 GNUNET_CONTAINER_DLL_remove (lc->cr_head, 202 GNUNET_CONTAINER_DLL_remove (lc->cr_head,
203 lc->cr_tail, 203 lc->cr_tail,
204 cr); 204 cr);
205 GSF_pending_request_cancel_ (cr->pr); 205 GSF_pending_request_cancel_ (cr->pr, GNUNET_NO);
206 GNUNET_STATISTICS_update (GSF_stats, 206 GNUNET_STATISTICS_update (GSF_stats,
207 gettext_noop ("# client searches active"), 207 gettext_noop ("# client searches active"),
208 - 1, 208 - 1,
@@ -514,7 +514,7 @@ GSF_client_disconnect_handler_ (void *cls,
514 GNUNET_CONTAINER_DLL_remove (pos->cr_head, 514 GNUNET_CONTAINER_DLL_remove (pos->cr_head,
515 pos->cr_tail, 515 pos->cr_tail,
516 cr); 516 cr);
517 GSF_pending_request_cancel_ (cr->pr); 517 GSF_pending_request_cancel_ (cr->pr, GNUNET_NO);
518 GNUNET_STATISTICS_update (GSF_stats, 518 GNUNET_STATISTICS_update (GSF_stats,
519 gettext_noop ("# client searches active"), 519 gettext_noop ("# client searches active"),
520 - 1, 520 - 1,
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index 83733ef8d..4dd54c88e 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -31,6 +31,72 @@
31 31
32 32
33/** 33/**
34 * List of GSF_PendingRequests this request plan
35 * participates with.
36 */
37struct PendingRequestList;
38
39
40/**
41 * DLL of request plans a particular pending request is
42 * involved with.
43 */
44struct GSF_RequestPlanReference
45{
46
47 /**
48 * This is a doubly-linked list.
49 */
50 struct GSF_RequestPlanReference *next;
51
52 /**
53 * This is a doubly-linked list.
54 */
55 struct GSF_RequestPlanReference *prev;
56
57 /**
58 * Associated request plan.
59 */
60 struct GSF_RequestPlan *rp;
61
62 /**
63 * Corresponding PendingRequestList.
64 */
65 struct PendingRequestList *prl;
66};
67
68
69/**
70 * List of GSF_PendingRequests this request plan
71 * participates with.
72 */
73struct PendingRequestList
74{
75
76 /**
77 * This is a doubly-linked list.
78 */
79 struct PendingRequestList *next;
80
81 /**
82 * This is a doubly-linked list.
83 */
84 struct PendingRequestList *prev;
85
86 /**
87 * Array of associated pending requests.
88 */
89 struct GSF_PendingRequest *pr;
90
91 /**
92 * Corresponding GSF_RequestPlanReference.
93 */
94 struct GSF_RequestPlanReference *rpr;
95
96};
97
98
99/**
34 * Information we keep per request per peer. This is a doubly-linked 100 * Information we keep per request per peer. This is a doubly-linked
35 * list (with head and tail in the 'struct GSF_PendingRequestData') 101 * list (with head and tail in the 'struct GSF_PendingRequestData')
36 * with one entry in each heap of each 'struct PeerPlan'. Each 102 * with one entry in each heap of each 'struct PeerPlan'. Each
@@ -55,9 +121,14 @@ struct GSF_RequestPlan
55 struct GNUNET_CONTAINER_HeapNode *hn; 121 struct GNUNET_CONTAINER_HeapNode *hn;
56 122
57 /** 123 /**
58 * Array of associated pending requests. 124 * Head of list of associated pending requests.
59 */ 125 */
60 struct GSF_PendingRequest **prs; 126 struct PendingRequestList *prl_head;
127
128 /**
129 * Tail of list of associated pending requests.
130 */
131 struct PendingRequestList *prl_tail;
61 132
62 /** 133 /**
63 * Earliest time we'd be happy to (re)transmit this request. 134 * Earliest time we'd be happy to (re)transmit this request.
@@ -70,11 +141,6 @@ struct GSF_RequestPlan
70 struct GNUNET_TIME_Absolute last_transmission; 141 struct GNUNET_TIME_Absolute last_transmission;
71 142
72 /** 143 /**
73 * Number of entries in 'prs'.
74 */
75 unsigned int prs_length;
76
77 /**
78 * Current priority for this request for this target. 144 * Current priority for this request for this target.
79 */ 145 */
80 uint64_t priority; 146 uint64_t priority;
@@ -163,7 +229,7 @@ plan (struct PeerPlan *pp,
163 gettext_noop ("# average retransmission delay (ms)"), 229 gettext_noop ("# average retransmission delay (ms)"),
164 total_delay * 1000LL / plan_count, 230 total_delay * 1000LL / plan_count,
165 GNUNET_NO); 231 GNUNET_NO);
166 prd = GSF_pending_request_get_data_ (rp->prs[0]); 232 prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
167 // FIXME: calculate 'rp->priority'! 233 // FIXME: calculate 'rp->priority'!
168 if (rp->transmission_counter < 32) 234 if (rp->transmission_counter < 32)
169 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 235 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
@@ -196,6 +262,32 @@ plan (struct PeerPlan *pp,
196 262
197 263
198/** 264/**
265 * Get the pending request with the highest TTL from the given plan.
266 *
267 * @param rp plan to investigate
268 * @return pending request with highest TTL
269 */
270struct GSF_PendingRequest *
271get_latest (const struct GSF_RequestPlan *rp)
272{
273 struct GSF_PendingRequest *ret;
274 struct PendingRequestList *prl;
275
276 prl = rp->prl_head;
277 ret = prl->pr;
278 prl = prl->next;
279 while (NULL != prl)
280 {
281 if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
282 GSF_pending_request_get_data_ (ret)->ttl.abs_value)
283 ret = prl->pr;
284 prl = prl->next;
285 }
286 return ret;
287}
288
289
290/**
199 * Function called to get a message for transmission. 291 * Function called to get a message for transmission.
200 * 292 *
201 * @param cls closure 293 * @param cls closure
@@ -225,7 +317,7 @@ transmit_message_callback (void *cls,
225 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); 317 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
226 return 0; 318 return 0;
227 } 319 }
228 msize = GSF_pending_request_get_message_ (rp->prs[0], buf_size, buf); 320 msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
229 if (msize > buf_size) 321 if (msize > buf_size)
230 { 322 {
231 /* buffer to small (message changed), try again */ 323 /* buffer to small (message changed), try again */
@@ -314,7 +406,7 @@ schedule_peer_transmission (void *cls,
314 rp); 406 rp);
315#endif 407#endif
316 GNUNET_assert (NULL != rp); 408 GNUNET_assert (NULL != rp);
317 msize = GSF_pending_request_get_message_ (rp->prs[0], 0, NULL); 409 msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
318 pp->pth = GSF_peer_transmit_ (pp->cp, 410 pp->pth = GSF_peer_transmit_ (pp->cp,
319 GNUNET_YES, 411 GNUNET_YES,
320 rp->priority, 412 rp->priority,
@@ -327,6 +419,79 @@ schedule_peer_transmission (void *cls,
327 419
328 420
329/** 421/**
422 * Closure for 'merge_pr'.
423 */
424struct MergeContext
425{
426
427 struct GSF_PendingRequest *pr;
428
429 int merged;
430
431};
432
433
434/**
435 * Iterator that checks if an equivalent request is already
436 * present for this peer.
437 *
438 * @param cls closure
439 * @param node internal node of the heap (ignored)
440 * @param element request plan stored at the node
441 * @param cost cost associated with the node (ignored)
442 * @return GNUNET_YES if we should continue to iterate,
443 * GNUNET_NO if not (merge success)
444 */
445static int
446merge_pr (void *cls,
447 struct GNUNET_CONTAINER_HeapNode *node,
448 void *element,
449 GNUNET_CONTAINER_HeapCostType cost)
450{
451 struct MergeContext *mpr = cls;
452 struct GSF_RequestPlan *rp = element;
453 struct GSF_PendingRequestData *prd;
454 struct GSF_RequestPlanReference *rpr;
455 struct PendingRequestList *prl;
456 struct GSF_PendingRequest *latest;
457
458 if (GNUNET_OK !=
459 GSF_pending_request_is_compatible_ (mpr->pr,
460 rp->prl_head->pr))
461 return GNUNET_YES;
462 /* merge new request with existing request plan */
463 rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
464 prl = GNUNET_malloc (sizeof (struct PendingRequestList));
465 rpr->rp = rp;
466 rpr->prl = prl;
467 prl->rpr = rpr;
468 prl->pr = mpr->pr;
469 prd = GSF_pending_request_get_data_ (mpr->pr);
470 GNUNET_CONTAINER_DLL_insert (prd->rpr_head,
471 prd->rpr_tail,
472 rpr);
473 GNUNET_CONTAINER_DLL_insert (rp->prl_head,
474 rp->prl_tail,
475 prl);
476 mpr->merged = GNUNET_YES;
477 GNUNET_STATISTICS_update (GSF_stats,
478 gettext_noop ("# requests merged"),
479 1,
480 GNUNET_NO);
481 latest = get_latest (rp);
482 if (GSF_pending_request_get_data_ (latest)->ttl.abs_value < prd->ttl.abs_value)
483 {
484 GNUNET_STATISTICS_update (GSF_stats,
485 gettext_noop ("# requests refreshed"),
486 1,
487 GNUNET_NO);
488 rp->transmission_counter = 0; /* reset */
489 }
490 return GNUNET_NO;
491}
492
493
494/**
330 * Create a new query plan entry. 495 * Create a new query plan entry.
331 * 496 *
332 * @param cp peer with the entry 497 * @param cp peer with the entry
@@ -340,7 +505,9 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
340 struct PeerPlan *pp; 505 struct PeerPlan *pp;
341 struct GSF_PendingRequestData *prd; 506 struct GSF_PendingRequestData *prd;
342 struct GSF_RequestPlan *rp; 507 struct GSF_RequestPlan *rp;
343 unsigned int i; 508 struct GSF_RequestPlanReference *rpr;
509 struct PendingRequestList *prl;
510 struct MergeContext mpc;
344 size_t msize; 511 size_t msize;
345 512
346 GNUNET_assert (NULL != cp); 513 GNUNET_assert (NULL != cp);
@@ -359,52 +526,39 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
359 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 526 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
360 } 527 }
361 msize = GSF_pending_request_get_message_ (pr, 0, NULL); 528 msize = GSF_pending_request_get_message_ (pr, 0, NULL);
362 prd = GSF_pending_request_get_data_ (pr); 529 mpc.merged = GNUNET_NO;
363 for (rp = prd->rp_head; NULL != rp; rp = rp->next) 530 mpc.pr = pr;
364 { 531 GNUNET_CONTAINER_heap_iterate (pp->priority_heap, &merge_pr, &mpc);
365 char mbuf[msize]; 532 if (mpc.merged != GNUNET_NO)
366 char xbuf[msize]; 533 return;
367 534 GNUNET_CONTAINER_heap_iterate (pp->delay_heap, &merge_pr, &mpc);
368 GNUNET_assert (msize == GSF_pending_request_get_message_ (pr, msize, mbuf)); 535 if (mpc.merged != GNUNET_NO)
369 if ( (msize == GSF_pending_request_get_message_ (rp->prs[0], msize, xbuf)) && 536 return;
370 (0 == memcmp (xbuf, mbuf, msize)) )
371 {
372 /* add request to existing plan */
373 GNUNET_array_append (rp->prs,
374 rp->prs_length,
375 pr);
376 for (i=0;i<rp->prs_length;i++)
377 if (GSF_pending_request_get_data_ (rp->prs[0])->ttl.abs_value < prd->ttl.abs_value)
378 {
379 GNUNET_STATISTICS_update (GSF_stats,
380 gettext_noop ("# requests refreshed"),
381 1,
382 GNUNET_NO);
383 rp->transmission_counter = 0; /* reset */
384 break;
385 }
386 return;
387 }
388 }
389 plan_count++; 537 plan_count++;
390 GNUNET_STATISTICS_update (GSF_stats, 538 GNUNET_STATISTICS_update (GSF_stats,
391 gettext_noop ("# query plan entries"), 539 gettext_noop ("# query plan entries"),
392 1, 540 1,
393 GNUNET_NO); 541 GNUNET_NO);
394 rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan)); 542 prd = GSF_pending_request_get_data_ (pr);
395#if DEBUG_FS 543#if DEBUG_FS
396 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
397 "Planning transmission of query `%s' to peer `%s' (%p)\n", 545 "Planning transmission of query `%s' to peer `%s'\n",
398 GNUNET_h2s (&prd->query), 546 GNUNET_h2s (&prd->query),
399 GNUNET_i2s (&id), 547 GNUNET_i2s (&id));
400 rp);
401#endif 548#endif
402 GNUNET_array_append (rp->prs, 549 rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
403 rp->prs_length, 550 rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
404 pr); 551 prl = GNUNET_malloc (sizeof (struct PendingRequestList));
405 GNUNET_CONTAINER_DLL_insert (prd->rp_head, 552 rpr->rp = rp;
406 prd->rp_tail, 553 rpr->prl = prl;
407 rp); 554 prl->rpr = rpr;
555 prl->pr = pr;
556 GNUNET_CONTAINER_DLL_insert (prd->rpr_head,
557 prd->rpr_tail,
558 rpr);
559 GNUNET_CONTAINER_DLL_insert (rp->prl_head,
560 rp->prl_tail,
561 prl);
408 plan (pp, rp); 562 plan (pp, rp);
409} 563}
410 564
@@ -422,7 +576,7 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
422 struct PeerPlan *pp; 576 struct PeerPlan *pp;
423 struct GSF_RequestPlan *rp; 577 struct GSF_RequestPlan *rp;
424 struct GSF_PendingRequestData *prd; 578 struct GSF_PendingRequestData *prd;
425 unsigned int i; 579 struct PendingRequestList *prl;
426 580
427 GSF_connected_peer_get_identity_ (cp, &id); 581 GSF_connected_peer_get_identity_ (cp, &id);
428 pp = GNUNET_CONTAINER_multihashmap_get (plans, 582 pp = GNUNET_CONTAINER_multihashmap_get (plans,
@@ -442,29 +596,35 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
442 } 596 }
443 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) 597 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
444 { 598 {
445 for (i=0;i<rp->prs_length;i++) 599 while (NULL != (prl = rp->prl_head))
446 { 600 {
447 prd = GSF_pending_request_get_data_ (rp->prs[i]); 601 GNUNET_CONTAINER_DLL_remove (rp->prl_head,
448 GNUNET_CONTAINER_DLL_remove (prd->rp_head, 602 rp->prl_tail,
449 prd->rp_tail, 603 prl);
450 rp); 604 prd = GSF_pending_request_get_data_ (prl->pr);
605 GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
606 prd->rpr_tail,
607 prl->rpr);
608 GNUNET_free (prl->rpr);
609 GNUNET_free (prl);
451 } 610 }
452 plan_count--;
453 GNUNET_array_grow (rp->prs, rp->prs_length, 0);
454 GNUNET_free (rp); 611 GNUNET_free (rp);
455 } 612 }
456 GNUNET_CONTAINER_heap_destroy (pp->priority_heap); 613 GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
457 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap))) 614 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
458 { 615 {
459 for (i=0;i<rp->prs_length;i++) 616 while (NULL != (prl = rp->prl_head))
460 { 617 {
461 prd = GSF_pending_request_get_data_ (rp->prs[i]); 618 GNUNET_CONTAINER_DLL_remove (rp->prl_head,
462 GNUNET_CONTAINER_DLL_remove (prd->rp_head, 619 rp->prl_tail,
463 prd->rp_tail, 620 prl);
464 rp); 621 prd = GSF_pending_request_get_data_ (prl->pr);
622 GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
623 prd->rpr_tail,
624 prl->rpr);
625 GNUNET_free (prl->rpr);
626 GNUNET_free (prl);
465 } 627 }
466 plan_count--;
467 GNUNET_array_grow (rp->prs, rp->prs_length, 0);
468 GNUNET_free (rp); 628 GNUNET_free (rp);
469 } 629 }
470 GNUNET_STATISTICS_set (GSF_stats, 630 GNUNET_STATISTICS_set (GSF_stats,
@@ -488,28 +648,25 @@ GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
488{ 648{
489 struct GSF_RequestPlan *rp; 649 struct GSF_RequestPlan *rp;
490 struct GSF_PendingRequestData *prd; 650 struct GSF_PendingRequestData *prd;
491 unsigned int i; 651 struct GSF_RequestPlanReference *rpr;
492 652
493 prd = GSF_pending_request_get_data_ (pr); 653 prd = GSF_pending_request_get_data_ (pr);
494 while (NULL != (rp = prd->rp_head)) 654 while (NULL != (rpr = prd->rpr_head))
495 { 655 {
496 for (i=0;i<rp->prs_length;i++) 656 GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
657 prd->rpr_tail,
658 rpr);
659 rp = rpr->rp;
660 GNUNET_CONTAINER_DLL_remove (rp->prl_head,
661 rp->prl_tail,
662 rpr->prl);
663 GNUNET_free (rpr->prl);
664 GNUNET_free (rpr);
665 if (rp->prl_head == 0)
497 { 666 {
498 if (rp->prs[i] == pr) 667 GNUNET_CONTAINER_heap_remove_node (rp->hn);
499 { 668 plan_count--;
500 rp->prs[i] = rp->prs[rp->prs_length - 1]; 669 GNUNET_free (rp);
501 GNUNET_array_grow (rp->prs, rp->prs_length, rp->prs_length-1);
502 if (rp->prs_length == 0)
503 {
504 GNUNET_CONTAINER_heap_remove_node (rp->hn);
505 GNUNET_CONTAINER_DLL_remove (prd->rp_head,
506 prd->rp_tail,
507 rp);
508 plan_count--;
509 GNUNET_free (rp);
510 break;
511 }
512 }
513 } 670 }
514 } 671 }
515 GNUNET_STATISTICS_set (GSF_stats, 672 GNUNET_STATISTICS_set (GSF_stats,
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 37865d913..4db3505dd 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -385,7 +385,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
385 GNUNET_TIME_UNIT_FOREVER_ABS, 385 GNUNET_TIME_UNIT_FOREVER_ABS,
386 GNUNET_BLOCK_TYPE_ANY, 386 GNUNET_BLOCK_TYPE_ANY,
387 NULL, 0); 387 NULL, 0);
388 GSF_pending_request_cancel_ (dpr); 388 GSF_pending_request_cancel_ (dpr, GNUNET_YES);
389 } 389 }
390 } 390 }
391 GNUNET_STATISTICS_update (GSF_stats, 391 GNUNET_STATISTICS_update (GSF_stats,
@@ -410,6 +410,33 @@ GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
410 410
411 411
412/** 412/**
413 * Test if two pending requests are compatible (would generate
414 * the same query modulo filters and should thus be processed
415 * jointly).
416 *
417 * @param pra a pending request
418 * @param pra another pending request
419 * @return GNUNET_OK if the requests are compatible
420 */
421int
422GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
423 struct GSF_PendingRequest *prb)
424{
425 if ( (pra->public_data.type != prb->public_data.type) ||
426 (0 != memcmp (&pra->public_data.query,
427 &prb->public_data.query,
428 sizeof (GNUNET_HashCode))) ||
429 ( (pra->public_data.type == GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
430 (0 != memcmp (&pra->public_data.namespace,
431 &prb->public_data.namespace,
432 sizeof (GNUNET_HashCode))) ) )
433 return GNUNET_NO;
434 return GNUNET_OK;
435}
436
437
438
439/**
413 * Update a given pending request with additional replies 440 * Update a given pending request with additional replies
414 * that have been seen. 441 * that have been seen.
415 * 442 *
@@ -646,12 +673,47 @@ clean_request (void *cls,
646 * Explicitly cancel a pending request. 673 * Explicitly cancel a pending request.
647 * 674 *
648 * @param pr request to cancel 675 * @param pr request to cancel
676 * @param full_cleanup fully purge the request
649 */ 677 */
650void 678void
651GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) 679GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr,
680 int full_cleanup)
652{ 681{
682 GSF_LocalLookupContinuation cont;
683
653 if (NULL == pr_map) 684 if (NULL == pr_map)
654 return; /* already cleaned up! */ 685 return; /* already cleaned up! */
686 if (GNUNET_YES != full_cleanup)
687 {
688 /* make request inactive (we're no longer interested in more results),
689 but do NOT remove from our data-structures, we still need it there
690 to prevent the request from looping */
691 pr->rh = NULL;
692 if (NULL != (cont = pr->llc_cont))
693 {
694 pr->llc_cont = NULL;
695 cont (pr->llc_cont_cls,
696 pr,
697 pr->local_result);
698 }
699 GSF_plan_notify_request_done_ (pr);
700 if (NULL != pr->qe)
701 {
702 GNUNET_DATASTORE_cancel (pr->qe);
703 pr->qe = NULL;
704 }
705 if (NULL != pr->gh)
706 {
707 GNUNET_DHT_get_stop (pr->gh);
708 pr->gh = NULL;
709 }
710 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
711 {
712 GNUNET_SCHEDULER_cancel (pr->warn_task);
713 pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
714 }
715 return;
716 }
655 GNUNET_assert (GNUNET_YES == 717 GNUNET_assert (GNUNET_YES ==
656 clean_request (NULL, &pr->public_data.query, pr)); 718 clean_request (NULL, &pr->public_data.query, pr));
657} 719}
@@ -763,6 +825,8 @@ process_reply (void *cls,
763 struct GSF_PendingRequest *pr = value; 825 struct GSF_PendingRequest *pr = value;
764 GNUNET_HashCode chash; 826 GNUNET_HashCode chash;
765 827
828 if (NULL == pr->rh)
829 return GNUNET_YES;
766#if DEBUG_FS 830#if DEBUG_FS
767 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 831 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
768 "Matched result (type %u) for query `%s' with pending request\n", 832 "Matched result (type %u) for query `%s' with pending request\n",
diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h
index 2f828e281..1e71aa7ee 100644
--- a/src/fs/gnunet-service-fs_pr.h
+++ b/src/fs/gnunet-service-fs_pr.h
@@ -94,12 +94,12 @@ struct GSF_PendingRequestData
94 /** 94 /**
95 * Fields for the plan module to track a DLL with the request. 95 * Fields for the plan module to track a DLL with the request.
96 */ 96 */
97 struct GSF_RequestPlan *rp_head; 97 struct GSF_RequestPlanReference *rpr_head;
98 98
99 /** 99 /**
100 * Fields for the plan module to track a DLL with the request. 100 * Fields for the plan module to track a DLL with the request.
101 */ 101 */
102 struct GSF_RequestPlan *rp_tail; 102 struct GSF_RequestPlanReference *rpr_tail;
103 103
104 /** 104 /**
105 * Current TTL for the request. 105 * Current TTL for the request.
@@ -242,6 +242,20 @@ GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr);
242 242
243 243
244/** 244/**
245 * Test if two pending requests are compatible (would generate
246 * the same query modulo filters and should thus be processed
247 * jointly).
248 *
249 * @param pra a pending request
250 * @param pra another pending request
251 * @return GNUNET_OK if the requests are compatible
252 */
253int
254GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
255 struct GSF_PendingRequest *prb);
256
257
258/**
245 * Generate the message corresponding to the given pending request for 259 * Generate the message corresponding to the given pending request for
246 * transmission to other peers (or at least determine its size). 260 * transmission to other peers (or at least determine its size).
247 * 261 *
@@ -260,9 +274,11 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
260 * Explicitly cancel a pending request. 274 * Explicitly cancel a pending request.
261 * 275 *
262 * @param pr request to cancel 276 * @param pr request to cancel
277 * @param full_cleanup fully purge the request
263 */ 278 */
264void 279void
265GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr); 280GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr,
281 int full_cleanup);
266 282
267 283
268/** 284/**