aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-02-15 15:30:06 +0000
committerChristian Grothoff <christian@grothoff.org>2011-02-15 15:30:06 +0000
commitd5c5c5b962115be085a1cc71a0480d7e2a3eda73 (patch)
tree96bdd8c004a00556ac9df07393aecf17f723636e /src/fs/gnunet-service-fs_pr.c
parent0ad72d359ab3ea5f232f4a8a0eb04700e8c84b49 (diff)
downloadgnunet-d5c5c5b962115be085a1cc71a0480d7e2a3eda73.tar.gz
gnunet-d5c5c5b962115be085a1cc71a0480d7e2a3eda73.zip
stuff
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c126
1 files changed, 50 insertions, 76 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 4dc00f54c..aca63ac94 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -58,6 +58,11 @@ struct GSF_PendingRequest
58 struct GNUNET_CONTAINER_BloomFilter *bf; 58 struct GNUNET_CONTAINER_BloomFilter *bf;
59 59
60 /** 60 /**
61 * Entry for this pending request in the expiration heap, or NULL.
62 */
63 struct GNUNET_CONTAINER_HeapNode *hnode;
64
65 /**
61 * Number of valid entries in the 'replies_seen' array. 66 * Number of valid entries in the 'replies_seen' array.
62 */ 67 */
63 unsigned int replies_seen_count; 68 unsigned int replies_seen_count;
@@ -70,7 +75,7 @@ struct GSF_PendingRequest
70 /** 75 /**
71 * Mingle value we currently use for the bf. 76 * Mingle value we currently use for the bf.
72 */ 77 */
73 int32_t mingle; 78 uint32_t mingle;
74 79
75}; 80};
76 81
@@ -158,8 +163,8 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr)
158 return GNUNET_NO; /* size not changed */ 163 return GNUNET_NO; /* size not changed */
159 if (pr->bf != NULL) 164 if (pr->bf != NULL)
160 GNUNET_CONTAINER_bloomfilter_free (pr->bf); 165 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
161 pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 166 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
162 UINT32_MAX); 167 UINT32_MAX);
163 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 168 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
164 nsize, 169 nsize,
165 BLOOMFILTER_K); 170 BLOOMFILTER_K);
@@ -202,7 +207,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
202 const struct GNUNET_PeerIdentity *target, 207 const struct GNUNET_PeerIdentity *target,
203 const char *bf_data, 208 const char *bf_data,
204 size_t bf_size, 209 size_t bf_size,
205 int32_t mingle, 210 uint32_t mingle,
206 uint32_t anonymity_level, 211 uint32_t anonymity_level,
207 uint32_t priority, 212 uint32_t priority,
208 int32_t ttl, 213 int32_t ttl,
@@ -212,7 +217,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
212 void *rh_cls) 217 void *rh_cls)
213{ 218{
214 struct GSF_PendingRequest *pr; 219 struct GSF_PendingRequest *pr;
215 220 struct GSF_PendingRequest *dpr;
216 221
217 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); 222 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
218 pr->public_data.query = *query; 223 pr->public_data.query = *query;
@@ -228,6 +233,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
228 } 233 }
229 pr->public_data.anonymity_level = anonymity_data; 234 pr->public_data.anonymity_level = anonymity_data;
230 pr->public_data.priority = priority; 235 pr->public_data.priority = priority;
236 pr->public_data.original_priority = priority;
231 pr->public_data.options = options; 237 pr->public_data.options = options;
232 pr->public_data.type = type; 238 pr->public_data.type = type;
233 pr->public_data.start_time = GNUNET_TIME_absolute_get (); 239 pr->public_data.start_time = GNUNET_TIME_absolute_get ();
@@ -265,25 +271,26 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
265 query, 271 query,
266 pr, 272 pr,
267 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 273 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
268 // FIXME: if not a local query, we also need to track the 274 if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
269 // total number of external queries we currently have and
270 // bound it => need an additional heap!
271
272 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
273 pr,
274 pr->start_time.abs_value + pr->ttl);
275
276
277
278 /* make sure we don't track too many requests */
279 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
280 { 275 {
281 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); 276 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
282 GNUNET_assert (pr != NULL); 277 pr,
283 destroy_pending_request (pr); 278 pr->ttl.abs_value);
279 /* make sure we don't track too many requests */
280 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
281 {
282 dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
283 GNUNET_assert (dpr != NULL);
284 if (pr == dpr)
285 break; /* let the request live briefly... */
286 dpr->rh (dpr->rh_cls,
287 dpr,
288 GNUNET_TIME_UNIT_FOREVER_ABS,
289 NULL, 0,
290 GNUNET_SYSERR);
291 GSF_pending_request_cancel_ (dpr);
292 }
284 } 293 }
285
286
287 return pr; 294 return pr;
288} 295}
289 296
@@ -348,8 +355,8 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
348 { 355 {
349 /* we're not the initiator, but the initiator did not give us 356 /* we're not the initiator, but the initiator did not give us
350 any bloom-filter, so we need to create one on-the-fly */ 357 any bloom-filter, so we need to create one on-the-fly */
351 pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 358 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
352 UINT32_MAX); 359 UINT32_MAX);
353 pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count), 360 pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
354 pr->mingle, 361 pr->mingle,
355 BLOOMFILTER_K); 362 BLOOMFILTER_K);
@@ -464,6 +471,9 @@ clean_request (void *cls,
464 GNUNET_free_non_null (pr->replies_seen); 471 GNUNET_free_non_null (pr->replies_seen);
465 if (NULL != pr->bf) 472 if (NULL != pr->bf)
466 GNUNET_CONTAINER_bloomfilter_free (pr->bf); 473 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
474 if (NULL != pr->hnode)
475 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
476 pr->hnode);
467 GNUNET_free (pr); 477 GNUNET_free (pr);
468 return GNUNET_YES; 478 return GNUNET_YES;
469} 479}
@@ -517,7 +527,7 @@ struct ProcessReplyClosure
517 /** 527 /**
518 * Who gave us this reply? NULL for local host (or DHT) 528 * Who gave us this reply? NULL for local host (or DHT)
519 */ 529 */
520 struct ConnectedPeer *sender; 530 struct GSF_ConnectedPeer *sender;
521 531
522 /** 532 /**
523 * When the reply expires. 533 * When the reply expires.
@@ -577,41 +587,9 @@ update_request_performance_data (struct ProcessReplyClosure *prq,
577 587
578 if (prq->sender == NULL) 588 if (prq->sender == NULL)
579 return; 589 return;
580 /* FIXME: adapt code to new API... */ 590 GSF_peer_update_performance_ (prq->sender,
581 for (i=0;i<pr->used_targets_off;i++) 591 pr->start_time,
582 if (pr->used_targets[i].pid == prq->sender->pid) 592 prq->priority);
583 break;
584 if (i < pr->used_targets_off)
585 {
586 cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
587 prq->sender->avg_delay.rel_value
588 = (prq->sender->avg_delay.rel_value *
589 (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N;
590 prq->sender->avg_priority
591 = (prq->sender->avg_priority *
592 (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
593 }
594 if (pr->cp != NULL)
595 {
596 GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
597 [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
598 -1);
599 GNUNET_PEER_change_rc (pr->cp->pid, 1);
600 prq->sender->last_p2p_replies
601 [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
602 = pr->cp->pid;
603 }
604 else
605 {
606 if (NULL != prq->sender->last_client_replies
607 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
608 GNUNET_SERVER_client_drop (prq->sender->last_client_replies
609 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
610 prq->sender->last_client_replies
611 [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
612 = pr->client_request_list->client_list->client;
613 GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
614 }
615} 593}
616 594
617 595
@@ -636,6 +614,7 @@ process_reply (void *cls,
636 struct PutMessage *pm; 614 struct PutMessage *pm;
637 struct ConnectedPeer *cp; 615 struct ConnectedPeer *cp;
638 size_t msize; 616 size_t msize;
617 GNUNET_HashCode chash;
639 618
640#if DEBUG_FS 619#if DEBUG_FS
641 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 620 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -661,6 +640,7 @@ process_reply (void *cls,
661 update_request_performance_data (prq, pr); 640 update_request_performance_data (prq, pr);
662 break; 641 break;
663 case GNUNET_BLOCK_EVALUATION_OK_LAST: 642 case GNUNET_BLOCK_EVALUATION_OK_LAST:
643 /* short cut: stop processing early, no BF-update, etc. */
664 update_request_performance_data (prq, pr); 644 update_request_performance_data (prq, pr);
665 GNUNET_LOAD_update (rt_entry_lifetime, 645 GNUNET_LOAD_update (rt_entry_lifetime,
666 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value); 646 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
@@ -694,18 +674,11 @@ process_reply (void *cls,
694 prq->type); 674 prq->type);
695 return GNUNET_NO; 675 return GNUNET_NO;
696 } 676 }
697 /* FIXME: adapt code to new API! */ 677 /* update bloomfilter */
698 if (pr->client_request_list != NULL) 678 GNUNET_CRYPTO_hash (prq->data,
699 { 679 prq->size,
700 if (pr->replies_seen_size == pr->replies_seen_off) 680 &chash);
701 GNUNET_array_grow (pr->replies_seen, 681 GSF_pending_request_update_ (pr, &chash, 1);
702 pr->replies_seen_size,
703 pr->replies_seen_size * 2 + 4);
704 GNUNET_CRYPTO_hash (prq->data,
705 prq->size,
706 &pr->replies_seen[pr->replies_seen_off++]);
707 refresh_bloomfilter (pr);
708 }
709 if (NULL == prq->sender) 682 if (NULL == prq->sender)
710 { 683 {
711#if DEBUG_FS 684#if DEBUG_FS
@@ -718,11 +691,12 @@ process_reply (void *cls,
718 1, 691 1,
719 GNUNET_NO); 692 GNUNET_NO);
720 } 693 }
721 prq->priority += pr->remaining_priority; 694 prq->priority += pr->public_data.original_priority;
722 pr->remaining_priority = 0; 695 pr->public_data.remaining_priority = 0;
723 pr->results_found++; 696 pr->public_data.original_priority = 0;
697 pr->public_data.results_found++;
724 prq->request_found = GNUNET_YES; 698 prq->request_found = GNUNET_YES;
725 /* finally, pass on to other peers / local clients */ 699 /* finally, pass on to other peer / local client */
726 pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES); 700 pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES);
727 return GNUNET_YES; 701 return GNUNET_YES;
728} 702}