aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-02-10 12:59:38 +0000
committerChristian Grothoff <christian@grothoff.org>2011-02-10 12:59:38 +0000
commitf54389f6724ecbd39389d53fba7b3bfdb2e0a8eb (patch)
tree11a7156180b22e4eaf784f5b1e400261c00e3ef9 /src/fs/gnunet-service-fs_pr.c
parent3a39cd4cd22e345733ba225e7a4c0b6eecdad7df (diff)
downloadgnunet-f54389f6724ecbd39389d53fba7b3bfdb2e0a8eb.tar.gz
gnunet-f54389f6724ecbd39389d53fba7b3bfdb2e0a8eb.zip
stuff
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c807
1 files changed, 766 insertions, 41 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index ad19a807e..047c07587 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -37,16 +37,39 @@ struct GSF_PendingRequest
37 */ 37 */
38 struct GSF_PendingRequestData public_data; 38 struct GSF_PendingRequestData public_data;
39 39
40 /**
41 * Function to call if we encounter a reply.
42 */
40 GSF_PendingRequestReplyHandler rh; 43 GSF_PendingRequestReplyHandler rh;
41 44
45 /**
46 * Closure for 'rh'
47 */
42 void *rh_cls; 48 void *rh_cls;
43 49
44 const GNUNET_HashCode *replies_seen; 50 /**
51 * Array of hash codes of replies we've already seen.
52 */
53 GNUNET_HashCode *replies_seen;
45 54
55 /**
56 * Bloomfilter masking replies we've already seen.
57 */
46 struct GNUNET_CONTAINER_BloomFilter *bf; 58 struct GNUNET_CONTAINER_BloomFilter *bf;
47 59
60 /**
61 * Number of valid entries in the 'replies_seen' array.
62 */
48 unsigned int replies_seen_count; 63 unsigned int replies_seen_count;
49 64
65 /**
66 * Length of the 'replies_seen' array.
67 */
68 unsigned int replies_seen_size;
69
70 /**
71 * Mingle value we currently use for the bf.
72 */
50 int32_t mingle; 73 int32_t mingle;
51 74
52}; 75};
@@ -56,7 +79,99 @@ struct GSF_PendingRequest
56 * All pending requests, ordered by the query. Entries 79 * All pending requests, ordered by the query. Entries
57 * are of type 'struct GSF_PendingRequest*'. 80 * are of type 'struct GSF_PendingRequest*'.
58 */ 81 */
59static struct GNUNET_CONTAINER_MultiHashMap *requests; 82static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
83
84
85/**
86 * Datastore 'PUT' load tracking.
87 */
88static struct GNUNET_LOAD_Value *datastore_put_load;
89
90
91/**
92 * Are we allowed to migrate content to this peer.
93 */
94static int active_to_migration;
95
96
97/**
98 * Heap with the request that will expire next at the top. Contains
99 * pointers of type "struct PendingRequest*"; these will *also* be
100 * aliased from the "requests_by_peer" data structures and the
101 * "requests_by_query" table. Note that requests from our clients
102 * don't expire and are thus NOT in the "requests_by_expiration"
103 * (or the "requests_by_peer" tables).
104 */
105static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
106
107
108/**
109 * How many bytes should a bloomfilter be if we have already seen
110 * entry_count responses? Note that BLOOMFILTER_K gives us the number
111 * of bits set per entry. Furthermore, we should not re-size the
112 * filter too often (to keep it cheap).
113 *
114 * Since other peers will also add entries but not resize the filter,
115 * we should generally pick a slightly larger size than what the
116 * strict math would suggest.
117 *
118 * @return must be a power of two and smaller or equal to 2^15.
119 */
120static size_t
121compute_bloomfilter_size (unsigned int entry_count)
122{
123 size_t size;
124 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
125 uint16_t max = 1 << 15;
126
127 if (entry_count > max)
128 return max;
129 size = 8;
130 while ((size < max) && (size < ideal))
131 size *= 2;
132 if (size > max)
133 return max;
134 return size;
135}
136
137
138/**
139 * Recalculate our bloom filter for filtering replies. This function
140 * will create a new bloom filter from scratch, so it should only be
141 * called if we have no bloomfilter at all (and hence can create a
142 * fresh one of minimal size without problems) OR if our peer is the
143 * initiator (in which case we may resize to larger than mimimum size).
144 *
145 * @param pr request for which the BF is to be recomputed
146 * @return GNUNET_YES if a refresh actually happened
147 */
148static int
149refresh_bloomfilter (struct GSF_PendingRequest *pr)
150{
151 unsigned int i;
152 size_t nsize;
153 GNUNET_HashCode mhash;
154
155 nsize = compute_bloomfilter_size (pr->replies_seen_off);
156 if ( (bf != NULL) &&
157 (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
158 return GNUNET_NO; /* size not changed */
159 if (pr->bf != NULL)
160 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
161 pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
162 UINT32_MAX);
163 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
164 nsize,
165 BLOOMFILTER_K);
166 for (i=0;i<pr->replies_seen_count;i++)
167 {
168 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
169 pr->mingle,
170 &mhash);
171 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
172 }
173 return GNUNET_YES;
174}
60 175
61 176
62/** 177/**
@@ -92,7 +207,54 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
92 GSF_PendingRequestReplyHandler rh, 207 GSF_PendingRequestReplyHandler rh,
93 void *rh_cls) 208 void *rh_cls)
94{ 209{
95 return NULL; // FIXME 210 struct GSF_PendingRequest *pr;
211
212
213 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
214 pr->public_data.query = *query;
215 if (GNUNET_BLOCK_TYPE_SBLOCK == type)
216 {
217 GNUNET_assert (NULL != namespace);
218 pr->public_data.namespace = *namespace;
219 }
220 if (NULL != target)
221 {
222 pr->public_data.target = *target;
223 pr->has_target = GNUNET_YES;
224 }
225 pr->public_data.anonymity_level = anonymity_data;
226 pr->public_data.priority = priority;
227 pr->public_data.options = options;
228 pr->public_data.type = type;
229 pr->rh = rh;
230 pr->rh_cls = rh_cls;
231 if (replies_seen_count > 0)
232 {
233 pr->replies_seen_size = replies_seen_count;
234 pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
235 memcpy (pr->replies_seen,
236 replies_seen,
237 replies_seen_count * sizeof (struct GNUNET_HashCode));
238 pr->replies_seen_count = replies_seen_count;
239 }
240 if (NULL != bf)
241 {
242 pr->bf = GNUNET_CONTAINER_bloomfilter_copy (bf);
243 pr->mingle = mingle;
244 }
245 else if ( (replies_seen_count > 0) &&
246 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
247 {
248 GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
249 }
250 GNUNET_CONTAINER_multihashmap_put (pr_map,
251 query,
252 pr,
253 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
254 // FIXME: if not a local query, we also need to track the
255 // total number of external queries we currently have and
256 // bound it => need an additional heap!
257 return pr;
96} 258}
97 259
98 260
@@ -109,34 +271,54 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
109 const GNUNET_HashCode *replies_seen, 271 const GNUNET_HashCode *replies_seen,
110 unsigned int replies_seen_count) 272 unsigned int replies_seen_count)
111{ 273{
112 // FIXME 274 unsigned int i;
113} 275 GNUNET_HashCode mhash;
114 276
115 277 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
116 278 return; /* integer overflow */
117/** 279 if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
118 * Get the query for a given pending request. 280 {
119 * 281 /* we're responsible for the BF, full refresh */
120 * @param pr the request 282 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
121 * @return pointer to the query (only valid as long as pr is valid) 283 GNUNET_array_grow (pr->replies_seen,
122 */ 284 pr->replies_seen_size,
123const GNUNET_HashCode * 285 replies_seen_count + pr->replies_seen_count);
124GSF_pending_request_get_query_ (const struct GSF_PendingRequest *pr) 286 memcpy (&pr->replies_seen[pr->replies_seen_count],
125{ 287 replies_seen,
126 return NULL; // FIXME 288 sizeof (GNUNET_HashCode) * replies_seen_count);
127} 289 pr->replies_seen_count += replies_seen;
128 290 if (GNUNET_NO == refresh_bloomfilter (pr))
129 291 {
130/** 292 /* bf not recalculated, simply extend it with new bits */
131 * Get the type of a given pending request. 293 for (i=0;i<pr->replies_seen_count;i++)
132 * 294 {
133 * @param pr the request 295 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
134 * @return query type 296 pr->mingle,
135 */ 297 &mhash);
136enum GNUNET_BLOCK_Type 298 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
137GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr) 299 }
138{ 300 }
139 return 0; // FIXME 301 }
302 else
303 {
304 if (NULL == pr->bf)
305 {
306 /* we're not the initiator, but the initiator did not give us
307 any bloom-filter, so we need to create one on-the-fly */
308 pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
309 UINT32_MAX);
310 pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
311 pr->mingle,
312 BLOOMFILTER_K);
313 }
314 for (i=0;i<pr->replies_seen_count;i++)
315 {
316 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
317 pr->mingle,
318 &mhash);
319 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
320 }
321 }
140} 322}
141 323
142 324
@@ -145,16 +327,102 @@ GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr)
145 * transmission to other peers (or at least determine its size). 327 * transmission to other peers (or at least determine its size).
146 * 328 *
147 * @param pr request to generate the message for 329 * @param pr request to generate the message for
330 * @param do_route are we routing the reply
148 * @param buf_size number of bytes available in buf 331 * @param buf_size number of bytes available in buf
149 * @param buf where to copy the message (can be NULL) 332 * @param buf where to copy the message (can be NULL)
150 * @return number of bytes needed (if > buf_size) or used 333 * @return number of bytes needed (if > buf_size) or used
151 */ 334 */
152size_t 335size_t
153GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, 336GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
337 int do_route,
154 size_t buf_size, 338 size_t buf_size,
155 void *buf) 339 void *buf)
156{ 340{
157 return 0; // FIXME 341 struct PendingMessage *pm;
342 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
343 struct GetMessage *gm;
344 GNUNET_HashCode *ext;
345 size_t msize;
346 unsigned int k;
347 int no_route;
348 uint32_t bm;
349 uint32_t prio;
350 size_t bf_size;
351
352 k = 0;
353 bm = 0;
354 if (GNUNET_YES != do_route)
355 {
356 bm |= GET_MESSAGE_BIT_RETURN_TO;
357 k++;
358 }
359 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
360 {
361 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
362 k++;
363 }
364 if (GNUNET_YES == pr->has_target)
365 {
366 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
367 k++;
368 }
369 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
370 msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
371 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
372 if (buf_size < msize)
373 return msize;
374 gm = (struct GetMessage*) lbuf;
375 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
376 gm->header.size = htons (msize);
377 gm->type = htonl (pr->type);
378 if (GNUNET_YES == do_route)
379 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
380 pr->public_data.priority + 1);
381 else
382 prio = 0;
383 pr->public_data.priority -= prio;
384 gm->priority = htonl (prio);
385 gm->ttl = htonl (pr->ttl);
386 gm->filter_mutator = htonl(pr->mingle);
387 gm->hash_bitmap = htonl (bm);
388 gm->query = pr->query;
389 ext = (GNUNET_HashCode*) &gm[1];
390 k = 0;
391 if (GNUNET_YES != do_route)
392 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
393 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
394 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
395 if (GNUNET_YES == pr->has_target)
396 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
397 if (pr->bf != NULL)
398 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
399 (char*) &ext[k],
400 bf_size);
401 memcpy (buf, gm, msize);
402 return msize;
403}
404
405
406/**
407 * Iterator to free pending requests.
408 *
409 * @param cls closure, unused
410 * @param key current key code
411 * @param value value in the hash map (pending request)
412 * @return GNUNET_YES (we should continue to iterate)
413 */
414static int
415clean_request (void *cls,
416 const GNUNET_HashCode * key,
417 void *value)
418{
419 struct GSF_PendingRequest *pr = value;
420
421 GNUNET_free_non_null (pr->replies_seen);
422 if (NULL != pr->bf)
423 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
424 GNUNET_free (pr);
425 return GNUNET_YES;
158} 426}
159 427
160 428
@@ -166,6 +434,12 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
166void 434void
167GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) 435GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
168{ 436{
437 GNUNET_assert (GNUNET_OK ==
438 GNUNET_CONTAINER_multihashmap_remove (pr_map,
439 &pr->public_data.query,
440 pr));
441 GNUNET_assert (GNUNET_YES ==
442 clean_request (NULL, &pr->public_data.query, pr));
169} 443}
170 444
171 445
@@ -176,10 +450,369 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
176 * @param cls closure for it 450 * @param cls closure for it
177 */ 451 */
178void 452void
179GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, 453GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it,
180 void *cls) 454 void *cls)
455{
456 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
457 (GNUNET_CONTAINER_HashMapIterator) it,
458 cls);
459}
460
461
462
463
464/**
465 * Closure for "process_reply" function.
466 */
467struct ProcessReplyClosure
468{
469 /**
470 * The data for the reply.
471 */
472 const void *data;
473
474 /**
475 * Who gave us this reply? NULL for local host (or DHT)
476 */
477 struct ConnectedPeer *sender;
478
479 /**
480 * When the reply expires.
481 */
482 struct GNUNET_TIME_Absolute expiration;
483
484 /**
485 * Size of data.
486 */
487 size_t size;
488
489 /**
490 * Type of the block.
491 */
492 enum GNUNET_BLOCK_Type type;
493
494 /**
495 * How much was this reply worth to us?
496 */
497 uint32_t priority;
498
499 /**
500 * Anonymity requirements for this reply.
501 */
502 uint32_t anonymity_level;
503
504 /**
505 * Evaluation result (returned).
506 */
507 enum GNUNET_BLOCK_EvaluationResult eval;
508
509 /**
510 * Did we finish processing the associated request?
511 */
512 int finished;
513
514 /**
515 * Did we find a matching request?
516 */
517 int request_found;
518};
519
520
521/**
522 * Update the performance data for the sender (if any) since
523 * the sender successfully answered one of our queries.
524 *
525 * @param prq information about the sender
526 * @param pr request that was satisfied
527 */
528static void
529update_request_performance_data (struct ProcessReplyClosure *prq,
530 struct GSF_PendingRequest *pr)
531{
532 unsigned int i;
533 struct GNUNET_TIME_Relative cur_delay;
534
535 if (prq->sender == NULL)
536 return;
537 /* FIXME: adapt code to new API... */
538 for (i=0;i<pr->used_targets_off;i++)
539 if (pr->used_targets[i].pid == prq->sender->pid)
540 break;
541 if (i < pr->used_targets_off)
542 {
543 cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
544 prq->sender->avg_delay.rel_value
545 = (prq->sender->avg_delay.rel_value *
546 (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N;
547 prq->sender->avg_priority
548 = (prq->sender->avg_priority *
549 (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
550 }
551 if (pr->cp != NULL)
552 {
553 GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
554 [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
555 -1);
556 GNUNET_PEER_change_rc (pr->cp->pid, 1);
557 prq->sender->last_p2p_replies
558 [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
559 = pr->cp->pid;
560 }
561 else
562 {
563 if (NULL != prq->sender->last_client_replies
564 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
565 GNUNET_SERVER_client_drop (prq->sender->last_client_replies
566 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
567 prq->sender->last_client_replies
568 [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
569 = pr->client_request_list->client_list->client;
570 GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
571 }
572}
573
574
575
576/**
577 * We have received a reply; handle it!
578 *
579 * @param cls response (struct ProcessReplyClosure)
580 * @param key our query
581 * @param value value in the hash map (info about the query)
582 * @return GNUNET_YES (we should continue to iterate)
583 */
584static int
585process_reply (void *cls,
586 const GNUNET_HashCode * key,
587 void *value)
588{
589 struct ProcessReplyClosure *prq = cls;
590 struct GSF_PendingRequest *pr = value;
591 struct PendingMessage *reply;
592 struct ClientResponseMessage *creply;
593 struct ClientList *cl;
594 struct PutMessage *pm;
595 struct ConnectedPeer *cp;
596 size_t msize;
597
598#if DEBUG_FS
599 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
600 "Matched result (type %u) for query `%s' with pending request\n",
601 (unsigned int) prq->type,
602 GNUNET_h2s (key));
603#endif
604 GNUNET_STATISTICS_update (stats,
605 gettext_noop ("# replies received and matched"),
606 1,
607 GNUNET_NO);
608 prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
609 prq->type,
610 key,
611 &pr->bf,
612 pr->mingle,
613 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
614 prq->data,
615 prq->size);
616 switch (prq->eval)
617 {
618 case GNUNET_BLOCK_EVALUATION_OK_MORE:
619 update_request_performance_data (prq, pr);
620 break;
621 case GNUNET_BLOCK_EVALUATION_OK_LAST:
622 update_request_performance_data (prq, pr);
623 /* FIXME: adapt code to new API! */
624 while (NULL != pr->pending_head)
625 destroy_pending_message_list_entry (pr->pending_head);
626 if (pr->qe != NULL)
627 {
628 if (pr->client_request_list != NULL)
629 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
630 GNUNET_YES);
631 GNUNET_DATASTORE_cancel (pr->qe);
632 pr->qe = NULL;
633 }
634 pr->do_remove = GNUNET_YES;
635 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
636 {
637 GNUNET_SCHEDULER_cancel (pr->task);
638 pr->task = GNUNET_SCHEDULER_NO_TASK;
639 }
640 GNUNET_break (GNUNET_YES ==
641 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
642 key,
643 pr));
644 GNUNET_LOAD_update (rt_entry_lifetime,
645 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
646 break;
647 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
648 GNUNET_STATISTICS_update (stats,
649 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
650 1,
651 GNUNET_NO);
652#if DEBUG_FS && 0
653 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
654 "Duplicate response `%s', discarding.\n",
655 GNUNET_h2s (&mhash));
656#endif
657 return GNUNET_YES; /* duplicate */
658 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
659 return GNUNET_YES; /* wrong namespace */
660 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
661 GNUNET_break (0);
662 return GNUNET_YES;
663 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
664 GNUNET_break (0);
665 return GNUNET_YES;
666 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
667 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
668 _("Unsupported block type %u\n"),
669 prq->type);
670 return GNUNET_NO;
671 }
672 /* FIXME: adapt code to new API! */
673 if (pr->client_request_list != NULL)
674 {
675 if (pr->replies_seen_size == pr->replies_seen_off)
676 GNUNET_array_grow (pr->replies_seen,
677 pr->replies_seen_size,
678 pr->replies_seen_size * 2 + 4);
679 GNUNET_CRYPTO_hash (prq->data,
680 prq->size,
681 &pr->replies_seen[pr->replies_seen_off++]);
682 refresh_bloomfilter (pr);
683 }
684 if (NULL == prq->sender)
685 {
686#if DEBUG_FS
687 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688 "Found result for query `%s' in local datastore\n",
689 GNUNET_h2s (key));
690#endif
691 GNUNET_STATISTICS_update (stats,
692 gettext_noop ("# results found locally"),
693 1,
694 GNUNET_NO);
695 }
696 prq->priority += pr->remaining_priority;
697 pr->remaining_priority = 0;
698 pr->results_found++;
699 prq->request_found = GNUNET_YES;
700 /* finally, pass on to other peers / local clients */
701 pr->rh (pr->rh_cls, pr, prq->data, prq->size);
702 return GNUNET_YES;
703}
704
705
706/**
707 * Continuation called to notify client about result of the
708 * operation.
709 *
710 * @param cls closure
711 * @param success GNUNET_SYSERR on failure
712 * @param msg NULL on success, otherwise an error message
713 */
714static void
715put_migration_continuation (void *cls,
716 int success,
717 const char *msg)
718{
719 struct GNUNET_TIME_Absolute *start = cls;
720 struct GNUNET_TIME_Relative delay;
721
722 delay = GNUNET_TIME_absolute_get_duration (*start);
723 GNUNET_free (start);
724 /* FIXME: should we really update the load value on failure? */
725 GNUNET_LOAD_update (datastore_put_load,
726 delay.rel_value);
727 if (GNUNET_OK == success)
728 return;
729 GNUNET_STATISTICS_update (stats,
730 gettext_noop ("# datastore 'put' failures"),
731 1,
732 GNUNET_NO);
733}
734
735
736/**
737 * Test if the DATABASE (PUT) load on this peer is too high
738 * to even consider processing the query at
739 * all.
740 *
741 * @return GNUNET_YES if the load is too high to do anything (load high)
742 * GNUNET_NO to process normally (load normal or low)
743 */
744static int
745test_put_load_too_high (uint32_t priority)
746{
747 double ld;
748
749 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
750 return GNUNET_NO; /* very fast */
751 ld = GNUNET_LOAD_get_load (datastore_put_load);
752 if (ld < 2.0 * (1 + priority))
753 return GNUNET_NO;
754 GNUNET_STATISTICS_update (stats,
755 gettext_noop ("# storage requests dropped due to high load"),
756 1,
757 GNUNET_NO);
758 return GNUNET_YES;
759}
760
761
762/**
763 * Iterator called on each result obtained for a DHT
764 * operation that expects a reply
765 *
766 * @param cls closure
767 * @param exp when will this value expire
768 * @param key key of the result
769 * @param get_path NULL-terminated array of pointers
770 * to the peers on reverse GET path (or NULL if not recorded)
771 * @param put_path NULL-terminated array of pointers
772 * to the peers on the PUT path (or NULL if not recorded)
773 * @param type type of the result
774 * @param size number of bytes in data
775 * @param data pointer to the result data
776 */
777void
778GSF_handle_dht_reply_ (void *cls,
779 struct GNUNET_TIME_Absolute exp,
780 const GNUNET_HashCode * key,
781 const struct GNUNET_PeerIdentity * const *get_path,
782 const struct GNUNET_PeerIdentity * const *put_path,
783 enum GNUNET_BLOCK_Type type,
784 size_t size,
785 const void *data)
181{ 786{
182 // FIXME 787 struct GSF_PendingRequest *pr = cls;
788 struct ProcessReplyClosure prq;
789
790 memset (&prq, 0, sizeof (prq));
791 prq.data = data;
792 prq.expiration = exp;
793 prq.size = size;
794 prq.type = type;
795 process_reply (&prq, key, pr);
796 if ( (GNUNET_YES == active_to_migration) &&
797 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
798 {
799#if DEBUG_FS
800 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801 "Replicating result for query `%s' with priority %u\n",
802 GNUNET_h2s (&query),
803 prq.priority);
804#endif
805 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
806 *start = GNUNET_TIME_absolute_get ();
807 GNUNET_DATASTORE_put (dsh,
808 0, &query, dsize, &put[1],
809 type, prq.priority, 1 /* anonymity */,
810 expiration,
811 1 + prq.priority, MAX_DATASTORE_QUEUE,
812 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
813 &put_migration_continuation,
814 start);
815 }
183} 816}
184 817
185 818
@@ -189,18 +822,106 @@ GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it,
189 * this content and possibly passes it on (to local clients or other 822 * this content and possibly passes it on (to local clients or other
190 * peers). Does NOT perform migration (content caching at this peer). 823 * peers). Does NOT perform migration (content caching at this peer).
191 * 824 *
192 * @param other the other peer involved (sender or receiver, NULL 825 * @param cp the other peer involved (sender or receiver, NULL
193 * for loopback messages where we are both sender and receiver) 826 * for loopback messages where we are both sender and receiver)
194 * @param message the actual message 827 * @param message the actual message
195 * @return how valueable was the content to us (0 for not at all), 828 * @return GNUNET_OK if the message was well-formed,
196 * GNUNET_SYSERR if the message was malformed (close connection, 829 * GNUNET_SYSERR if the message was malformed (close connection,
197 * do not cache under any circumstances) 830 * do not cache under any circumstances)
198 */ 831 */
199int 832int
200GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other, 833GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
201 const struct GNUNET_MessageHeader *message) 834 const struct GNUNET_MessageHeader *message)
202{ 835{
203 return GNUNET_SYSERR; // FIXME 836 const struct PutMessage *put;
837 uint16_t msize;
838 size_t dsize;
839 enum GNUNET_BLOCK_Type type;
840 struct GNUNET_TIME_Absolute expiration;
841 GNUNET_HashCode query;
842 struct ProcessReplyClosure prq;
843 struct GNUNET_TIME_Relative block_time;
844 double putl;
845 struct GNUNET_TIME_Absolute *start;
846
847 msize = ntohs (message->size);
848 if (msize < sizeof (struct PutMessage))
849 {
850 GNUNET_break_op(0);
851 return GNUNET_SYSERR;
852 }
853 put = (const struct PutMessage*) message;
854 dsize = msize - sizeof (struct PutMessage);
855 type = ntohl (put->type);
856 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
857 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
858 return GNUNET_SYSERR;
859 if (GNUNET_OK !=
860 GNUNET_BLOCK_get_key (block_ctx,
861 type,
862 &put[1],
863 dsize,
864 &query))
865 {
866 GNUNET_break_op (0);
867 return GNUNET_SYSERR;
868 }
869 /* now, lookup 'query' */
870 prq.data = (const void*) &put[1];
871 if (NULL != cp)
872 prq.sender = cp;
873 else
874 prq.sender = NULL;
875 prq.size = dsize;
876 prq.type = type;
877 prq.expiration = expiration;
878 prq.priority = 0;
879 prq.anonymity_level = 1;
880 prq.finished = GNUNET_NO;
881 prq.request_found = GNUNET_NO;
882 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
883 &query,
884 &process_reply,
885 &prq);
886 if (NULL != cp)
887 {
888 GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
889 GSF_get_peer_performance_data (cp)->trust += prq.priority;
890 }
891 if ( (GNUNET_YES == active_to_migration) &&
892 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
893 {
894#if DEBUG_FS
895 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
896 "Replicating result for query `%s' with priority %u\n",
897 GNUNET_h2s (&query),
898 prq.priority);
899#endif
900 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
901 *start = GNUNET_TIME_absolute_get ();
902 GNUNET_DATASTORE_put (dsh,
903 0, &query, dsize, &put[1],
904 type, prq.priority, 1 /* anonymity */,
905 expiration,
906 1 + prq.priority, MAX_DATASTORE_QUEUE,
907 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
908 &put_migration_continuation,
909 start);
910 }
911 putl = GNUNET_LOAD_get_load (datastore_put_load);
912 if ( (NULL != (cp = prq.sender)) &&
913 (GNUNET_NO == prq.request_found) &&
914 ( (GNUNET_YES != active_to_migration) ||
915 (putl > 2.5 * (1 + prq.priority)) ) )
916 {
917 if (GNUNET_YES != active_to_migration)
918 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
919 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
920 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
921 (unsigned int) (60000 * putl * putl)));
922 GSF_block_peer_migration (cp, block_time);
923 }
924 return GNUNET_OK;
204} 925}
205 926
206 927
@@ -210,7 +931,7 @@ GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other,
210void 931void
211GSF_pending_request_init_ () 932GSF_pending_request_init_ ()
212{ 933{
213 // FIXME 934 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
214} 935}
215 936
216 937
@@ -220,7 +941,11 @@ GSF_pending_request_init_ ()
220void 941void
221GSF_pending_request_done_ () 942GSF_pending_request_done_ ()
222{ 943{
223 // FIXME 944 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
945 &clean_request,
946 NULL);
947 GNUNET_CONTAINER_multihashmap_destroy (pr_map);
948 pr_map = NULL;
224} 949}
225 950
226 951