aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-07-25 15:38:48 +0000
committerChristian Grothoff <christian@grothoff.org>2011-07-25 15:38:48 +0000
commit68b9a6e898f2f2f48006311adfcb7d71055d9c7c (patch)
tree9f70c851e42741f201199f86d7981d7e9dd872c6
parentab9c4e9d3795f07a12e86717f0a635080728ef81 (diff)
downloadgnunet-68b9a6e898f2f2f48006311adfcb7d71055d9c7c.tar.gz
gnunet-68b9a6e898f2f2f48006311adfcb7d71055d9c7c.zip
deduplicate requests
-rw-r--r--src/fs/gnunet-service-fs_pe.c100
1 files changed, 79 insertions, 21 deletions
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index b52e04712..83733ef8d 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -55,9 +55,9 @@ struct GSF_RequestPlan
55 struct GNUNET_CONTAINER_HeapNode *hn; 55 struct GNUNET_CONTAINER_HeapNode *hn;
56 56
57 /** 57 /**
58 * Associated pending request. 58 * Array of associated pending requests.
59 */ 59 */
60 struct GSF_PendingRequest *pr; 60 struct GSF_PendingRequest **prs;
61 61
62 /** 62 /**
63 * Earliest time we'd be happy to (re)transmit this request. 63 * Earliest time we'd be happy to (re)transmit this request.
@@ -70,6 +70,11 @@ struct GSF_RequestPlan
70 struct GNUNET_TIME_Absolute last_transmission; 70 struct GNUNET_TIME_Absolute last_transmission;
71 71
72 /** 72 /**
73 * Number of entries in 'prs'.
74 */
75 unsigned int prs_length;
76
77 /**
73 * Current priority for this request for this target. 78 * Current priority for this request for this target.
74 */ 79 */
75 uint64_t priority; 80 uint64_t priority;
@@ -158,7 +163,7 @@ plan (struct PeerPlan *pp,
158 gettext_noop ("# average retransmission delay (ms)"), 163 gettext_noop ("# average retransmission delay (ms)"),
159 total_delay * 1000LL / plan_count, 164 total_delay * 1000LL / plan_count,
160 GNUNET_NO); 165 GNUNET_NO);
161 prd = GSF_pending_request_get_data_ (rp->pr); 166 prd = GSF_pending_request_get_data_ (rp->prs[0]);
162 // FIXME: calculate 'rp->priority'! 167 // FIXME: calculate 'rp->priority'!
163 if (rp->transmission_counter < 32) 168 if (rp->transmission_counter < 32)
164 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 169 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
@@ -220,7 +225,7 @@ transmit_message_callback (void *cls,
220 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); 225 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
221 return 0; 226 return 0;
222 } 227 }
223 msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf); 228 msize = GSF_pending_request_get_message_ (rp->prs[0], buf_size, buf);
224 if (msize > buf_size) 229 if (msize > buf_size)
225 { 230 {
226 /* buffer to small (message changed), try again */ 231 /* buffer to small (message changed), try again */
@@ -309,7 +314,7 @@ schedule_peer_transmission (void *cls,
309 rp); 314 rp);
310#endif 315#endif
311 GNUNET_assert (NULL != rp); 316 GNUNET_assert (NULL != rp);
312 msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL); 317 msize = GSF_pending_request_get_message_ (rp->prs[0], 0, NULL);
313 pp->pth = GSF_peer_transmit_ (pp->cp, 318 pp->pth = GSF_peer_transmit_ (pp->cp,
314 GNUNET_YES, 319 GNUNET_YES,
315 rp->priority, 320 rp->priority,
@@ -335,6 +340,8 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
335 struct PeerPlan *pp; 340 struct PeerPlan *pp;
336 struct GSF_PendingRequestData *prd; 341 struct GSF_PendingRequestData *prd;
337 struct GSF_RequestPlan *rp; 342 struct GSF_RequestPlan *rp;
343 unsigned int i;
344 size_t msize;
338 345
339 GNUNET_assert (NULL != cp); 346 GNUNET_assert (NULL != cp);
340 GSF_connected_peer_get_identity_ (cp, &id); 347 GSF_connected_peer_get_identity_ (cp, &id);
@@ -351,12 +358,39 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
351 pp, 358 pp,
352 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 359 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
353 } 360 }
361 msize = GSF_pending_request_get_message_ (pr, 0, NULL);
354 prd = GSF_pending_request_get_data_ (pr); 362 prd = GSF_pending_request_get_data_ (pr);
363 for (rp = prd->rp_head; NULL != rp; rp = rp->next)
364 {
365 char mbuf[msize];
366 char xbuf[msize];
367
368 GNUNET_assert (msize == GSF_pending_request_get_message_ (pr, msize, mbuf));
369 if ( (msize == GSF_pending_request_get_message_ (rp->prs[0], msize, xbuf)) &&
370 (0 == memcmp (xbuf, mbuf, msize)) )
371 {
372 /* add request to existing plan */
373 GNUNET_array_append (rp->prs,
374 rp->prs_length,
375 pr);
376 for (i=0;i<rp->prs_length;i++)
377 if (GSF_pending_request_get_data_ (rp->prs[0])->ttl.abs_value < prd->ttl.abs_value)
378 {
379 GNUNET_STATISTICS_update (GSF_stats,
380 gettext_noop ("# requests refreshed"),
381 1,
382 GNUNET_NO);
383 rp->transmission_counter = 0; /* reset */
384 break;
385 }
386 return;
387 }
388 }
355 plan_count++; 389 plan_count++;
356 GNUNET_STATISTICS_update (GSF_stats, 390 GNUNET_STATISTICS_update (GSF_stats,
357 gettext_noop ("# query plan entries"), 391 gettext_noop ("# query plan entries"),
358 1, 392 1,
359 GNUNET_NO); 393 GNUNET_NO);
360 rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan)); 394 rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
361#if DEBUG_FS 395#if DEBUG_FS
362 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 396 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -365,7 +399,9 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
365 GNUNET_i2s (&id), 399 GNUNET_i2s (&id),
366 rp); 400 rp);
367#endif 401#endif
368 rp->pr = pr; 402 GNUNET_array_append (rp->prs,
403 rp->prs_length,
404 pr);
369 GNUNET_CONTAINER_DLL_insert (prd->rp_head, 405 GNUNET_CONTAINER_DLL_insert (prd->rp_head,
370 prd->rp_tail, 406 prd->rp_tail,
371 rp); 407 rp);
@@ -386,6 +422,7 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
386 struct PeerPlan *pp; 422 struct PeerPlan *pp;
387 struct GSF_RequestPlan *rp; 423 struct GSF_RequestPlan *rp;
388 struct GSF_PendingRequestData *prd; 424 struct GSF_PendingRequestData *prd;
425 unsigned int i;
389 426
390 GSF_connected_peer_get_identity_ (cp, &id); 427 GSF_connected_peer_get_identity_ (cp, &id);
391 pp = GNUNET_CONTAINER_multihashmap_get (plans, 428 pp = GNUNET_CONTAINER_multihashmap_get (plans,
@@ -405,21 +442,29 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
405 } 442 }
406 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) 443 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
407 { 444 {
408 prd = GSF_pending_request_get_data_ (rp->pr); 445 for (i=0;i<rp->prs_length;i++)
409 GNUNET_CONTAINER_DLL_remove (prd->rp_head, 446 {
410 prd->rp_tail, 447 prd = GSF_pending_request_get_data_ (rp->prs[i]);
411 rp); 448 GNUNET_CONTAINER_DLL_remove (prd->rp_head,
449 prd->rp_tail,
450 rp);
451 }
412 plan_count--; 452 plan_count--;
453 GNUNET_array_grow (rp->prs, rp->prs_length, 0);
413 GNUNET_free (rp); 454 GNUNET_free (rp);
414 } 455 }
415 GNUNET_CONTAINER_heap_destroy (pp->priority_heap); 456 GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
416 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap))) 457 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
417 { 458 {
418 prd = GSF_pending_request_get_data_ (rp->pr); 459 for (i=0;i<rp->prs_length;i++)
419 GNUNET_CONTAINER_DLL_remove (prd->rp_head, 460 {
420 prd->rp_tail, 461 prd = GSF_pending_request_get_data_ (rp->prs[i]);
421 rp); 462 GNUNET_CONTAINER_DLL_remove (prd->rp_head,
463 prd->rp_tail,
464 rp);
465 }
422 plan_count--; 466 plan_count--;
467 GNUNET_array_grow (rp->prs, rp->prs_length, 0);
423 GNUNET_free (rp); 468 GNUNET_free (rp);
424 } 469 }
425 GNUNET_STATISTICS_set (GSF_stats, 470 GNUNET_STATISTICS_set (GSF_stats,
@@ -443,16 +488,29 @@ GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
443{ 488{
444 struct GSF_RequestPlan *rp; 489 struct GSF_RequestPlan *rp;
445 struct GSF_PendingRequestData *prd; 490 struct GSF_PendingRequestData *prd;
491 unsigned int i;
446 492
447 prd = GSF_pending_request_get_data_ (pr); 493 prd = GSF_pending_request_get_data_ (pr);
448 while (NULL != (rp = prd->rp_head)) 494 while (NULL != (rp = prd->rp_head))
449 { 495 {
450 GNUNET_CONTAINER_heap_remove_node (rp->hn); 496 for (i=0;i<rp->prs_length;i++)
451 GNUNET_CONTAINER_DLL_remove (prd->rp_head, 497 {
452 prd->rp_tail, 498 if (rp->prs[i] == pr)
453 rp); 499 {
454 plan_count--; 500 rp->prs[i] = rp->prs[rp->prs_length - 1];
455 GNUNET_free (rp); 501 GNUNET_array_grow (rp->prs, rp->prs_length, rp->prs_length-1);
502 if (rp->prs_length == 0)
503 {
504 GNUNET_CONTAINER_heap_remove_node (rp->hn);
505 GNUNET_CONTAINER_DLL_remove (prd->rp_head,
506 prd->rp_tail,
507 rp);
508 plan_count--;
509 GNUNET_free (rp);
510 break;
511 }
512 }
513 }
456 } 514 }
457 GNUNET_STATISTICS_set (GSF_stats, 515 GNUNET_STATISTICS_set (GSF_stats,
458 gettext_noop ("# query plan entries"), 516 gettext_noop ("# query plan entries"),